当前位置: 首页 > news >正文

PySPARK带多组参数和标签的SparkSQL批量数据导出到S3的程序

设计一个基于多个带标签SparkSQL模板作为配置文件和多组参数的PySPARK代码程序,实现根据不同的输入参数自动批量地将数据导出为Parquet、CSV和Excel文件到S3上,标签和多个参数(以“_”分割)为组成导出数据文件名,文件已经存在则覆盖原始文件。
代码如下:

import json
from pyspark.sql import SparkSessiondef load_config(config_path):with open(config_path, 'r') as f:return json.load(f)def main(config_path, base_s3_path):# 初始化SparkSession,配置S3和Excel支持spark = SparkSession.builder \.appName("DataExportJob") \.config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1") \.getOrCreate()# 配置S3访问(根据实际环境配置)spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.access.key", "YOUR_ACCESS_KEY")spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "YOUR_SECRET_KEY")spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com")config = load_config(config_path)for template in config['templates']:label = template['label']sql_template = template['sql_template']parameters_list = template['parameters']for params in parameters_list:# 验证参数数量是否匹配placeholders = sql_template.count('{')if len(params) != placeholders:raise ValueError(f"参数数量不匹配,模板需要{placeholders}个参数,但当前参数为{len(params)}个")# 替换SQL中的占位符formatted_sql = sql_template.format(*params)df = spark.sql(formatted_sql)# 生成文件名参数部分param_str = "_".join(params)base_filename = f"{label}_{param_str}"# 定义输出路径output_paths = {'parquet': f"{base_s3_path}/parquet/{base_filename}",'csv': f"{base_s3_path}/csv/{base_filename}",'excel': f"{base_s3_path}/excel/{base_filename}.xlsx"}# 写入Parquetdf.write.mode('overwrite').parquet(output_paths['parquet'])# 写入CSV(自动生成header)df.write.mode('overwrite') \.option("header", "true") \.csv(output_paths['csv'])# 写入Excel(使用spark-excel包)df.write.format("com.crealytics.spark.excel") \.option("header", "true") \.option("inferSchema", "true") \.mode("overwrite") \.save(output_paths['excel'])spark.stop()if __name__ == "__main__":import argparseparser = argparse.ArgumentParser()parser.add_argument('--config', type=str, required=True, help='Path to config JSON file')parser.add_argument('--s3-path', type=str, required=True, help='Base S3 path (e.g., s3a://your-bucket/data)')args = parser.parse_args()main(args.config, args.s3_path)

配置文件示例(config.json)

{"templates": [{"label": "sales_report","sql_template": "SELECT * FROM sales WHERE date = '{0}' AND region = '{1}'","parameters": [["202301", "north"],["202302", "south"]]},{"label": "user_activity","sql_template": "SELECT user_id, COUNT(*) AS cnt FROM activity WHERE day = '{0}' GROUP BY user_id","parameters": [["2023-01-01"],["2023-01-02"]]}]
}

使用说明

  1. 依赖管理

    • 确保Spark集群已安装Hadoop AWS和Spark Excel依赖:
      spark-submit --packages com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1 your_script.py
      
  2. S3配置

    • 替换代码中的YOUR_ACCESS_KEYYOUR_SECRET_KEY为实际AWS凭证
    • 根据S3兼容存储调整endpoint(如使用MinIO需特殊配置)
  3. 执行命令

    spark-submit --packages com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1 \
    data_export.py --config config.json --s3-path s3a://your-bucket/exports
    

输出结构

s3a://your-bucket/exports
├── parquet
│   ├── sales_report_202301_north
│   ├── sales_report_202302_south
│   └── user_activity_2023-01-01
├── csv
│   ├── sales_report_202301_north
│   ├── sales_report_202302_south
│   └── user_activity_2023-01-01
└── excel├── sales_report_202301_north.xlsx├── sales_report_202302_south.xlsx└── user_activity_2023-01-01.xlsx
http://www.lryc.cn/news/530765.html

相关文章:

  • 蓝桥杯备考:模拟算法之字符串展开
  • 使用LLaMA-Factory对AI进行认知的微调
  • @Nullable 注解
  • Arduino大师练成手册 -- 控制 AS608 指纹识别模块
  • Mask R-CNN与YOLOv8的区别
  • 在Ubuntu上使用Docker部署DeepSeek
  • MySQL的覆盖索引
  • 【Numpy核心编程攻略:Python数据处理、分析详解与科学计算】2.12 连续数组:为什么contiguous这么重要?
  • 在React中使用redux
  • lstm预测
  • 《 C++ 点滴漫谈: 二十五 》空指针,隐秘而危险的杀手:程序崩溃的真凶就在你眼前!
  • 【AI】探索自然语言处理(NLP):从基础到前沿技术及代码实践
  • 2025年Android开发趋势全景解读
  • C#面试常考随笔11:Dictionary<K, V>、Hashtable的内部实现原理是什么?效率如何?
  • Linux防火墙基础
  • Qt u盘自动升级软件
  • 【Conda 和 虚拟环境详细指南】
  • Python递归函数深度解析:从原理到实战
  • OpenGL学习笔记(五):Textures 纹理
  • 【TypeScript】基础:数据类型
  • Notepad++消除生成bak文件
  • Android NDK
  • 内部知识库助力组织智力激发与信息共享实现业绩增长
  • 通过F12收集的信息
  • 用Python替代OpenMV IDE显示openmv USB 图像
  • c语言:编译和链接(详解)
  • 数据结构【单链表操作大全详解】【c语言版】(只有输入输出为了方便用的c++)
  • leetcode27.删除有序数组中的重复项
  • [c语言日寄]越界访问:意外的死循环
  • 【c++11】包装器