PySpark中python环境打包和JAR包依赖
在 PySpark 中打包 Python 环境并调度到集群是处理依赖一致性的关键步骤。以下是完整的解决方案,包含环境打包、分发和配置方法:
一、环境打包方法
- 使用 Conda 打包环境
# 创建 Conda 环境
conda create -n pyspark_env python=3.8
conda activate pyspark_env
conda install numpy pandas pyarrow # 安装所需依赖# 导出环境为 YAML 文件
conda env export > environment.yml# 打包环境(Linux/macOS)
conda-pack -n pyspark_env -o pyspark_env.tar.gz
- 使用 Virtualenv 打包环境
# 创建虚拟环境
python3 -m venv pyspark_env
source pyspark_env/bin/activate # Linux/macOS
pyspark_env\Scripts\activate # Windows# 安装依赖
pip install numpy pandas pyarrow# 打包环境(需使用第三方工具)
pip install virtualenv-pack
virtualenv-pack -f -o pyspark_env.tar.gz
二、分发环境到集群
方法 1:通过 --archives 参数上传
在提交作业时,使用 --archives 参数将打包的环境分发到所有节点:
# 将环境包上传到 HDFS,避免每次提交都重新传输:
hdfs dfs -put pyspark_env.tar.gz /path/in/hdfs/spark-submit \--master yarn \--deploy-mode cluster \--py-files helper.py\ # python依赖文件,比如第三方代码等--archives hdfs:///path/in/hdfs/pyspark_env.tar.gz#environment \your_script.py
三、配置 PySpark 使用打包环境
- 设置 Python 解释器路径
在代码中指定 Executor 使用打包环境中的 Python:
import os
os.environ["PYSPARK_PYTHON"] = "./environment/bin/python" # 对应 --archives 指定的目录名
os.environ["PYSPARK_DRIVER_PYTHON"] = "./environment/bin/python" # Cluster 模式需要,如果是client模式,driver_python配置本地python路径,比如/opt/conda/bin/python, 需注意本地python和集群打包python的版本一致from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PackagedEnvApp").getOrCreate()
- 编写pyspark脚本
import os
os.environ["PYSPARK_PYTHON"] = "./environment/bin/python"from pyspark.sql import SparkSession
import pandas as pd # 使用打包环境中的 pandasspark = SparkSession.builder.appName("PackagedEnvExample").getOrCreate()# 使用 pandas 处理数据
pdf = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
df = spark.createDataFrame(pdf)
df.show()spark.stop()
- 提交作业
spark-submit \--master yarn \--deploy-mode cluster \--archives pyspark_env.tar.gz#environment \example.py
- 配置优先级与运行模式详解
-
配置优先级规则
Spark 配置的优先级从高到低如下: -
关键结论:
- SparkSession.builder.config() 优先级最高,会覆盖其他配置
- spark-submit 参数优先级次之
- 特殊参数例外:–master 和 --deploy-mode 在 spark-submit 中具有最高优先级
-
deploy-mode 配置规则
设置方式 是否生效 说明 spark-submit
✅ 总是生效 命令行参数具有最终决定权 SparkSession
代码❌ 当 spark-submit
指定时无效若 spark-submit
未指定,则代码配置生效spark-defaults
⚠️ 最低优先级 仅当其他方式未配置时生效
四、替代方案
- Docker 容器
使用 Docker 打包完整环境,通过 Kubernetes 调度:
# Dockerfile 示例
FROM apache/spark-py:v3.3.2
RUN pip install pandas numpy# 构建并推送镜像
docker build -t my-spark-image:v1 .
docker push myregistry/my-spark-image:v1# 提交作业
spark-submit \--master k8s://https://kubernetes-host:port \--conf spark.kubernetes.container.image=myregistry/my-spark-image:v1 \...
- PySpark 内置依赖管理
通过 --py-files 参数上传 Python 文件 / 包:
spark-submit \--py-files my_module.zip,another_dep.py \your_script.py
五、Pyspark调用Xgboost或者LightGBM
5.1 调用 XGBoost 模型
- 准备依赖
下载 XGBoost 的 Spark 扩展 jar 包:可以从 XGBoost 的官方 GitHub 发布页面 或者 Maven 仓库下载与你使用的 XGBoost 和 Spark 版本兼容的xgboost4j-spark和xgboost4j的 jar 包。例如,如果你使用的是 Spark 3.3.0 和 XGBoost 1.6.2,可以下载对应的版本。
下载其他依赖:确保scala-library等相关依赖也在合适的版本,因为xgboost4j-spark会依赖它们。 - 配置 Spark 提交参数
在使用spark-submit提交作业时,通过–jars参数指定上述下载的 jar 包路径。例如:
spark-submit \--master yarn \--deploy-mode cluster \--jars /path/to/xgboost4j-spark-1.6.2.jar,/path/to/xgboost4j-1.6.2.jar,/path/to/scala-library-2.12.10.jar \your_script.py
也可以将这些 jar 包上传到 HDFS,然后使用 HDFS 路径:
hdfs dfs -put /path/to/xgboost4j-spark-1.6.2.jar /lib/
hdfs dfs -put /path/to/xgboost4j-1.6.2.jar /lib/
hdfs dfs -put /path/to/scala-library-2.12.10.jar /lib/spark-submit \--master yarn \--deploy-mode cluster \--jars hdfs:///lib/xgboost4j-spark-1.6.2.jar,hdfs:///lib/xgboost4j-1.6.2.jar,hdfs:///lib/scala-library-2.12.10.jar \your_script.py
- Python 代码示例
在 Python 代码中,导入相关模块并使用 XGBoost 的 Spark 接口:
from pyspark.sql import SparkSession
from xgboost.spark import XGBoostClassifierspark = SparkSession.builder \.appName("XGBoostOnSpark") \.getOrCreate()# 假设data是一个包含特征和标签的DataFrame
data = spark.read.csv("your_data.csv", header=True, inferSchema=True)
feature_cols = [col for col in data.columns if col != "label"]
label_col = "label"# 创建XGBoost分类器
model = XGBoostClassifier(num_round=10, objective="binary:logistic")
# 拟合模型
model.fit(data, label_col=label_col, features_col=feature_cols)
5.2 调用 LightGBM 模型
- 准备依赖
下载 LightGBM 的 Spark 扩展 jar 包:从 LightGBM 的官方 GitHub 发布页面或者 Maven 仓库获取lightgbm4j-spark相关的 jar 包,以及lightgbm4j的 jar 包。注意选择与你的 Spark 和 LightGBM 版本适配的版本。
处理其他依赖:同样要保证scala-library等依赖的兼容性。 - 配置 Spark 提交参数
和 XGBoost 类似,使用spark-submit时通过–jars参数指定 jar 包路径。例如:
spark-submit \--master yarn \--deploy-mode cluster \--jars /path/to/lightgbm4j-spark-3.3.1.jar,/path/to/lightgbm4j-3.3.1.jar,/path/to/scala-library-2.12.10.jar \your_script.py
或者上传到 HDFS 后使用 HDFS 路径:
hdfs dfs -put /path/to/lightgbm4j-spark-3.3.1.jar /lib/
hdfs dfs -put /path/to/lightgbm4j-3.3.1.jar /lib/
hdfs dfs -put /path/to/scala-library-2.12.10.jar /lib/spark-submit \--master yarn \--deploy-mode cluster \--jars hdfs:///lib/lightgbm4j-spark-3.3.1.jar,hdfs:///lib/lightgbm4j-3.3.1.jar,hdfs:///lib/scala-library-2.12.10.jar \your_script.py
- Python 代码示例
在 Python 代码中,导入模块并使用 LightGBM 的 Spark 接口:
from pyspark.sql import SparkSession
from lightgbm4j.spark import LightGBMClassifierspark = SparkSession.builder \.appName("LightGBMOnSpark") \.getOrCreate()# 假设data是一个包含特征和标签的DataFrame
data = spark.read.csv("your_data.csv", header=True, inferSchema=True)
feature_cols = [col for col in data.columns if col != "label"]
label_col = "label"# 创建LightGBM分类器
params = {"objective": "binary","num_leaves": 31,"learning_rate": 0.05,"feature_fraction": 0.9
}
model = LightGBMClassifier(params=params, num_round=10)
# 拟合模型
model.fit(data, label_col=label_col, features_col=feature_cols)