当前位置: 首页 > news >正文

PySpark中python环境打包和JAR包依赖

在 PySpark 中打包 Python 环境并调度到集群是处理依赖一致性的关键步骤。以下是完整的解决方案,包含环境打包、分发和配置方法:

一、环境打包方法

  1. 使用 Conda 打包环境
# 创建 Conda 环境
conda create -n pyspark_env python=3.8
conda activate pyspark_env
conda install numpy pandas pyarrow  # 安装所需依赖# 导出环境为 YAML 文件
conda env export > environment.yml# 打包环境(Linux/macOS)
conda-pack -n pyspark_env -o pyspark_env.tar.gz
  1. 使用 Virtualenv 打包环境
# 创建虚拟环境
python3 -m venv pyspark_env
source pyspark_env/bin/activate  # Linux/macOS
pyspark_env\Scripts\activate  # Windows# 安装依赖
pip install numpy pandas pyarrow# 打包环境(需使用第三方工具)
pip install virtualenv-pack
virtualenv-pack -f -o pyspark_env.tar.gz

二、分发环境到集群

方法 1:通过 --archives 参数上传
在提交作业时,使用 --archives 参数将打包的环境分发到所有节点:

# 将环境包上传到 HDFS,避免每次提交都重新传输:
hdfs dfs -put pyspark_env.tar.gz /path/in/hdfs/spark-submit \--master yarn \--deploy-mode cluster \--py-files helper.py\ # python依赖文件,比如第三方代码等--archives hdfs:///path/in/hdfs/pyspark_env.tar.gz#environment \your_script.py

三、配置 PySpark 使用打包环境

  1. 设置 Python 解释器路径
    在代码中指定 Executor 使用打包环境中的 Python:
import os
os.environ["PYSPARK_PYTHON"] = "./environment/bin/python"  # 对应 --archives 指定的目录名
os.environ["PYSPARK_DRIVER_PYTHON"] = "./environment/bin/python"  # Cluster 模式需要,如果是client模式,driver_python配置本地python路径,比如/opt/conda/bin/python, 需注意本地python和集群打包python的版本一致from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PackagedEnvApp").getOrCreate()
  1. 编写pyspark脚本
import os
os.environ["PYSPARK_PYTHON"] = "./environment/bin/python"from pyspark.sql import SparkSession
import pandas as pd  # 使用打包环境中的 pandasspark = SparkSession.builder.appName("PackagedEnvExample").getOrCreate()# 使用 pandas 处理数据
pdf = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
df = spark.createDataFrame(pdf)
df.show()spark.stop()
  1. 提交作业
spark-submit \--master yarn \--deploy-mode cluster \--archives pyspark_env.tar.gz#environment \example.py
  1. 配置优先级与运行模式详解
  • 配置优先级规则
    Spark 配置的优先级从高到低如下:

    SparkSession.build.config
    spark-submit --conf参数
    spark-defaults.conf
    系统默认值
  • 关键结论:

    • SparkSession.builder.config() 优先级最高,会覆盖其他配置
    • spark-submit 参数优先级次之
    • 特殊参数例外:–master 和 --deploy-mode 在 spark-submit 中具有最高优先级
  • deploy-mode 配置规则

    设置方式是否生效说明
    spark-submit✅ 总是生效命令行参数具有最终决定权
    SparkSession 代码❌ 当 spark-submit 指定时无效spark-submit 未指定,则代码配置生效
    spark-defaults⚠️ 最低优先级仅当其他方式未配置时生效

四、替代方案

  1. Docker 容器
    使用 Docker 打包完整环境,通过 Kubernetes 调度:
# Dockerfile 示例
FROM apache/spark-py:v3.3.2
RUN pip install pandas numpy# 构建并推送镜像
docker build -t my-spark-image:v1 .
docker push myregistry/my-spark-image:v1# 提交作业
spark-submit \--master k8s://https://kubernetes-host:port \--conf spark.kubernetes.container.image=myregistry/my-spark-image:v1 \...
  1. PySpark 内置依赖管理
    通过 --py-files 参数上传 Python 文件 / 包:
spark-submit \--py-files my_module.zip,another_dep.py \your_script.py

五、Pyspark调用Xgboost或者LightGBM

5.1 调用 XGBoost 模型

  1. 准备依赖
    下载 XGBoost 的 Spark 扩展 jar 包:可以从 XGBoost 的官方 GitHub 发布页面 或者 Maven 仓库下载与你使用的 XGBoost 和 Spark 版本兼容的xgboost4j-spark和xgboost4j的 jar 包。例如,如果你使用的是 Spark 3.3.0 和 XGBoost 1.6.2,可以下载对应的版本。
    下载其他依赖:确保scala-library等相关依赖也在合适的版本,因为xgboost4j-spark会依赖它们。
  2. 配置 Spark 提交参数
    在使用spark-submit提交作业时,通过–jars参数指定上述下载的 jar 包路径。例如:
spark-submit \--master yarn \--deploy-mode cluster \--jars /path/to/xgboost4j-spark-1.6.2.jar,/path/to/xgboost4j-1.6.2.jar,/path/to/scala-library-2.12.10.jar \your_script.py

也可以将这些 jar 包上传到 HDFS,然后使用 HDFS 路径:

hdfs dfs -put /path/to/xgboost4j-spark-1.6.2.jar /lib/
hdfs dfs -put /path/to/xgboost4j-1.6.2.jar /lib/
hdfs dfs -put /path/to/scala-library-2.12.10.jar /lib/spark-submit \--master yarn \--deploy-mode cluster \--jars hdfs:///lib/xgboost4j-spark-1.6.2.jar,hdfs:///lib/xgboost4j-1.6.2.jar,hdfs:///lib/scala-library-2.12.10.jar \your_script.py
  1. Python 代码示例
    在 Python 代码中,导入相关模块并使用 XGBoost 的 Spark 接口:
from pyspark.sql import SparkSession
from xgboost.spark import XGBoostClassifierspark = SparkSession.builder \.appName("XGBoostOnSpark") \.getOrCreate()# 假设data是一个包含特征和标签的DataFrame
data = spark.read.csv("your_data.csv", header=True, inferSchema=True)
feature_cols = [col for col in data.columns if col != "label"]
label_col = "label"# 创建XGBoost分类器
model = XGBoostClassifier(num_round=10, objective="binary:logistic")
# 拟合模型
model.fit(data, label_col=label_col, features_col=feature_cols)

5.2 调用 LightGBM 模型

  1. 准备依赖
    下载 LightGBM 的 Spark 扩展 jar 包:从 LightGBM 的官方 GitHub 发布页面或者 Maven 仓库获取lightgbm4j-spark相关的 jar 包,以及lightgbm4j的 jar 包。注意选择与你的 Spark 和 LightGBM 版本适配的版本。
    处理其他依赖:同样要保证scala-library等依赖的兼容性。
  2. 配置 Spark 提交参数
    和 XGBoost 类似,使用spark-submit时通过–jars参数指定 jar 包路径。例如:
spark-submit \--master yarn \--deploy-mode cluster \--jars /path/to/lightgbm4j-spark-3.3.1.jar,/path/to/lightgbm4j-3.3.1.jar,/path/to/scala-library-2.12.10.jar \your_script.py

或者上传到 HDFS 后使用 HDFS 路径:

hdfs dfs -put /path/to/lightgbm4j-spark-3.3.1.jar /lib/
hdfs dfs -put /path/to/lightgbm4j-3.3.1.jar /lib/
hdfs dfs -put /path/to/scala-library-2.12.10.jar /lib/spark-submit \--master yarn \--deploy-mode cluster \--jars hdfs:///lib/lightgbm4j-spark-3.3.1.jar,hdfs:///lib/lightgbm4j-3.3.1.jar,hdfs:///lib/scala-library-2.12.10.jar \your_script.py
  1. Python 代码示例
    在 Python 代码中,导入模块并使用 LightGBM 的 Spark 接口:
from pyspark.sql import SparkSession
from lightgbm4j.spark import LightGBMClassifierspark = SparkSession.builder \.appName("LightGBMOnSpark") \.getOrCreate()# 假设data是一个包含特征和标签的DataFrame
data = spark.read.csv("your_data.csv", header=True, inferSchema=True)
feature_cols = [col for col in data.columns if col != "label"]
label_col = "label"# 创建LightGBM分类器
params = {"objective": "binary","num_leaves": 31,"learning_rate": 0.05,"feature_fraction": 0.9
}
model = LightGBMClassifier(params=params, num_round=10)
# 拟合模型
model.fit(data, label_col=label_col, features_col=feature_cols)
http://www.lryc.cn/news/585387.html

相关文章:

  • tensor
  • Word表格默认格式修改成三线表,一劳永逸,提高生产力!
  • 上位机知识篇---高效下载安装方法
  • 05 rk3568 debian11 root用户 声音服务PulseAudio不正常
  • PyTorch 与 Spring AI 集成实战
  • 2025Nginx最新版讲解/面试
  • 【yolo】模型训练参数解读
  • 七、gateway服务创建
  • WPS、Word加载项开发流程(免费最简版本)
  • [Meetily后端框架] 多模型-Pydantic AI 代理-统一抽象 | SQLite管理
  • VLLM部署DeepSeek-LLM-7B-Chat 模型
  • Lecture #19 : Multi-Version Concurrency Control
  • Jenkins 版本升级与插件问题深度复盘:从 2.443 到 2.504.3 及功能恢复全解析
  • FPGA实现SDI转LVDS视频发送,基于GTX+OSERDES2原语架构,提供2套工程源码和技术支持
  • Java进阶---并发编程
  • 【C/C++ shared_ptr 和 unique_ptr可以互换吗?】
  • 【AI News | 20250710】每日AI进展
  • 一个中层管理者应该看什么书籍?
  • 使用Python将目录中的JPG图片按后缀数字从小到大顺序纵向拼接,很适合老师发的零散图片拼接一个图片
  • 谷歌独立站是什么?谷歌独立站建站引流完全指南
  • HarmonyOS基础概念
  • Python中类静态方法:@classmethod/@staticmethod详解和实战示例
  • C#中的设计模式:构建更加优雅的代码
  • 链接代理后无法访问网络
  • C++入门基础篇(二)
  • HandyJSON使用详情
  • 使用Spring Boot和PageHelper实现数据分页
  • Excel快捷键
  • 20250710-2-Kubernetes 集群部署、配置和验证-网络组件存在的意义?_笔记
  • leetcode:377. 组合总和 Ⅳ[完全背包]