当前位置: 首页 > news >正文

PySpark用sort-merge join解决数据倾斜的完整案例

假设有两个大表 table1 和 table2 ,并通过 sort-merge join 来解决可能的数据倾斜问题。

from pyspark.sql import SparkSession
from pyspark.sql.functions import col# 初始化SparkSession
spark = SparkSession.builder.appName("SortMergeJoinExample").getOrCreate()# 加载数据,假设数据来自parquet文件
table1 = spark.read.parquet("path/to/table1.parquet")
table2 = spark.read.parquet("path/to/table2.parquet")# 查看表的大小
print("table1 size: ", table1.count())
print("table2 size: ", table2.count())# 为了演示数据倾斜,假设我们直接使用join,这里用inner join举例
joined = table1.join(table2, table1["id"] == table2["id"], "inner")# 先对连接键进行排序,为sort-merge join做准备sorted_table1 = table1.sortWithinPartitions("id")
sorted_table2 = table2.sortWithinPartitions("id")# 使用sort-merge join进行连接
joined = sorted_table1.join(sorted_table2, sorted_table1["id"] == sorted_table2["id"], "inner")# 触发Action,查看执行计划,此时可以去Spark WebUI查看任务执行情况
joined.count()# 停止SparkSession
spark.stop()

代码解释

初始化SparkSession:创建一个SparkSession对象,这是与Spark交互的入口。

spark = SparkSession.builder.appName("SortMergeJoinExample").getOrCreate()

加载数据并查看表大小:从Parquet文件加载两张表,并打印出它们的行数,以此来了解表的规模。

table1 = spark.read.parquet("path/to/table1.parquet")
table2 = spark.read.parquet("path/to/table2.parquet")print("table1 size: ", table1.count())
print("table2 size: ", table2.count())

数据预处理:在进行 sort-merge join 之前,对两个表按照连接键 id 在每个分区内进行排序。

sorted_table1 = table1.sortWithinPartitions("id")
sorted_table2 = table2.sortWithinPartitions("id")

执行sort-merge join:利用排序后的表,执行 sort-merge join 操作,这里选择的是内连接。

joined = sorted_table1.join(sorted_table2, sorted_table1["id"] == sorted_table2["id"], "inner")

触发Action并查看执行情况:调用 count() 方法触发一个Action,此时Spark会真正执行整个计算流程。与此同时,可以打开Spark WebUI(通常是 http://your-spark-master:4040 ),在 Stages 页面查看任务执行计划,尤其是查看各个阶段的数据分布情况,确认数据倾斜是否得到解决。

joined.count()

停止SparkSession:任务完成后,关闭SparkSession释放资源。

spark.stop()

要在Spark WebUI中查看数据倾斜:

  • 在执行 joined.count() 后,迅速打开浏览器访问Spark WebUI。进入 Stages 标签页,找到正在执行的 join 相关阶段。查看每个任务的处理数据量,如果之前存在数据倾斜,经过 sort-merge join 处理后,各个任务处理的数据量应该相对均匀。
http://www.lryc.cn/news/518941.html

相关文章:

  • sklearn-逻辑回归-制作评分卡
  • scrapy爬取图片
  • 在 Vue 项目中使用地区级联选
  • 【简博士统计学习方法】第1章:1. 统计学习的定义与分类
  • 利用 Python 脚本批量创建空白 Markdown 笔记
  • 【Qt】C++11 Lambda表达式
  • 怎样提高服务器中的数据传输速度?
  • Vue 封装公告滚动
  • JVM实战—12.OOM的定位和解决
  • 【python翻译软件V1.0】
  • Spring Boot中的依赖注入是如何工作
  • ubuntu22.04 编译安装libvirt 10.x
  • [fastadmin] 第三十四篇 FastAdmin 商城模块标签使用详解
  • (2024,LLaVA-Bench (Wilder),LLaVA-NeXT,LLaMA3,Qwen-1.5,语言模型扩展)
  • IPEX-LLM开发项目过程中的技术总结和心得
  • HTTP/HTTPS ②-Cookie || Session || HTTP报头
  • 【软考】软件设计师
  • K8s Pod OOMKilled,监控却显示内存资源并未打满
  • C++ 原子变量
  • linux网络 | http结尾、理解长连接短链接与cookie
  • 金融项目实战 02|接口测试分析、设计以及实现
  • 二、智能体强化学习——深度强化学习核心算法
  • Mysql--架构篇--存储引擎InnoDB(内存结构,磁盘结构,存储结构,日志管理,锁机制,事务并发控制等)
  • JVM实战—13.OOM的生产案例
  • client-go 的 QPS 和 Burst 限速
  • 使用docker-compose安装Redis的主从+哨兵模式
  • 数据结构(Java版)第七期:LinkedList与链表(二)
  • ant-design-vue 1.X 通过id获取a-input组件失败
  • Flutter:吸顶效果
  • MATLAB语言的数据类型