当前位置: 首页 > news >正文

spark3 sql优化:同一个表关联多次,优化方案

目录

  • 1.合并查询
  • 2.使用 JOIN 条件的过滤优化
  • 3.使用 Map-side Join 或 Broadcast Join
  • 4.使用 Partitioning 和 Bucketing
  • 5.利用 DataFrame API 进行优化
  • 假设 A 和 B 已经加载为 DataFrame
  • Perform left joins with specific conditions
  • 6.使用缓存或持久化
  • 7.避免笛卡尔积
  • 总结

1.合并查询

如果在 SQL 中的多个 JOIN 操作是针对同一个表,只是条件不同,可以考虑将条件合并成一个查询,从而减少对同一表的多次扫描。例如,将多个 LEFT JOIN 转换成一个 JOIN,使用 CASE 或 FILTER 直接处理不同的关联条件。

优化前:

SELECT A.id, A.col1, A.col2, B1.col3 AS B1_col3, B2.col3 AS B2_col3
FROM A
LEFT JOIN B AS B1 ON A.id = B1.id AND A.col1 = B1.col4
LEFT JOIN B AS B2 ON A.id = B2.id AND A.col2 = B2.col4

优化后:

SELECT A.id, A.col1, A.col2, MAX(CASE WHEN A.col1 = B.col4 THEN B.col3 END) AS B1_col3,MAX(CASE WHEN A.col2 = B.col4 THEN B.col3 END) AS B2_col3
FROM A
LEFT JOIN B ON A.id = B.id
GROUP BY A.id, A.col1, A.col2

2.使用 JOIN 条件的过滤优化

通过精简 JOIN 条件,尽量减少连接的行数。例如,如果 B 表中有索引列,可以直接根据索引列做筛选,而不依赖复杂的条件。

假设对 B 表进行的连接条件中,有部分条件可以通过过滤的方式提前应用,比如通过 WHERE 子句或者 JOIN 之前的 FILTER。

SELECT A.id, A.col1, A.col2, B1.col3 AS B1_col3, B2.col3 AS B2_col3
FROM A
LEFT JOIN B AS B1 ON A.id = B1.id AND A.col1 = B1.col4
LEFT JOIN B AS B2 ON A.id = B2.id AND A.col2 = B2.col4
WHERE B1.col3 IS NOT NULL OR B2.col3 IS NOT NULL

3.使用 Map-side Join 或 Broadcast Join

在 Spark SQL 中,当其中一个表(比如 A)较小且能完全加载到内存时,Spark 会自动选择广播连接,即将小表广播到所有工作节点进行连接计算,而不是进行全表扫描。

如果你知道某个表的规模较小(例如 A),可以手动启用广播连接,减少 shuffle 的开销。

SELECT /*+ BROADCAST(A) */A.id, A.col1, A.col2, B1.col3 AS B1_col3, B2.col3 AS B2_col3
FROM A
LEFT JOIN B AS B1 ON A.id = B1.id AND A.col1 = B1.col4
LEFT JOIN B AS B2 ON A.id = B2.id AND A.col2 = B2.col4

在这里,通过 /*+ BROADCAST(A) */ 强制 Spark 将 A 表广播到各个执行节点,从而避免了对大表 B 进行多次 shuffle。

广播条件:
A 表要相对较小,可以完全加载到内存中。
B 表较大,且 A 表的行数远小于 B。

4.使用 Partitioning 和 Bucketing

在分布式环境下,通过合理的分区和分桶设计,可以减少 JOIN 时的 shuffle 开销。尤其是对于大表,可以考虑对 A 或 B 表做分区(PARTITION BY)或分桶(BUCKET BY)。
– 对 B 表进行分桶(根据 id 或其他相关字段)

CREATE TABLE B (id INT,col3 STRING,col4 STRING
)
USING parquet
CLUSTERED BY (id) INTO 10 BUCKETS;

通过将表按某个字段进行分桶,Spark 在进行连接时能够减少数据的移动和重新分配。

5.利用 DataFrame API 进行优化

如果 SQL 性能不够高,可以尝试将查询转为 DataFrame API 编写,Spark DataFrame API 可能在某些复杂的连接和查询场景下更加高效。

假设 A 和 B 已经加载为 DataFrame

from pyspark.sql import functions as F

Perform left joins with specific conditions

df_A = spark.table("A")
df_B = spark.table("B")df_B1 = df_B.filter(df_B.col4.isNotNull()).select("id", "col3")
df_B2 = df_B.filter(df_B.col4.isNotNull()).select("id", "col3")df_result = df_A.join(df_B1, (df_A.id == df_B1.id) & (df_A.col1 == df_B1.col4), "left") \.join(df_B2, (df_A.id == df_B2.id) & (df_A.col2 == df_B2.col4), "left") \.select(df_A.id, df_A.col1, df_A.col2, df_B1.col3.alias("B1_col3"), df_B2.col3.alias("B2_col3"))df_result.show()

DataFrame API 可以对复杂的 JOIN 和条件执行更多优化,比如延迟执行和缓存策略。

6.使用缓存或持久化

如果你在多次查询中重复使用某些中间结果(例如对 B 表的过滤结果或计算结果),可以选择缓存或持久化某些 DataFrame。

df_B1_cached = df_B1.cache()
df_B2_cached = df_B2.cache()df_result = df_A.join(df_B1_cached, (df_A.id == df_B1_cached.id) & (df_A.col1 == df_B1_cached.col4), "left") \.join(df_B2_cached, (df_A.id == df_B2_cached.id) & (df_A.col2 == df_B2_cached.col4), "left")

缓存对于反复使用的子查询可以减少重新计算的开销。

7.避免笛卡尔积

笛卡尔积会导致非常高的计算开销和内存占用,因此在 JOIN 时需要确保条件足够明确,避免无条件的多表连接。你可以使用 EXPLAIN 来分析查询计划,检查是否出现了笛卡尔积。
查询计划中的 CartesianProduct 或 CROSS JOIN

EXPLAIN SELECT A.id, A.col1, A.col2, B.col3 FROM A JOIN B ON A.id = B.id

Spark SQL / Hive 中,查询计划可能会显示 CartesianProduct 或类似的描述,指明两张表间进行了笛卡尔积连接。

== Physical Plan ==
CartesianProduct(0)

PostgreSQL、MySQL 等关系型数据库,通常会标明连接类型。如果执行计划中显示了 CROSS JOIN,则明确表示笛卡尔积。

->  Seq Scan on table_a  (cost=0.00..10.00 rows=100 width=20)
->  Seq Scan on table_b  (cost=0.00..10.00 rows=100 width=20)
->  Hash Join  (cost=200.00..220.00 rows=1000 width=100)

如果这里显示了 CROSS JOIN,就意味着没有任何连接条件,导致笛卡尔积的生成。

通过查看执行计划(EXPLAIN)了解是否存在不必要的全表扫描。

总结

合并查询: 用 CASE WHEN 合并多个 JOIN。
简化 JOIN 条件: 提前通过 WHERE 子句过滤无效数据。
广播连接: 对小表使用 BROADCAST,减少 shuffle 开销。
分区和分桶: 对大表进行分区或分桶优化 JOIN 性能。
使用 DataFrame API: 在某些复杂查询中,DataFrame API 性能更优。
缓存数据: 重复使用的数据可以进行缓存或持久化。
避免笛卡尔积: 确保 JOIN 有明确的条件,避免全表扫描。

http://www.lryc.cn/news/499963.html

相关文章:

  • JavaWeb学习(4)(四大域、HttpSession原理(面试)、SessionAPI、Session实现验证码功能)
  • Ubuntu22.04系统源码编译OpenCV 4.10.0(包含opencv_contrib)
  • 【Unity高级】在编辑器中如何让物体围绕一个点旋转固定角度
  • 2024.11.29——[HCTF 2018]WarmUp 1
  • AGameModeBase和游戏模式方法
  • Swift 扩展
  • 【NebulaGraph】官方查询语言nGQL教程1 (四)
  • 阿里云负载均衡SLB实践
  • 鸿蒙技术分享:❓❓[鸿蒙应用开发]怎么更好的管理模块生命周期?
  • 深度解析 Ansible:核心组件、配置、Playbook 全流程与 YAML 奥秘(上)
  • LabVIEW气缸摩擦力测试系统
  • Leetcode. 688骑士在棋盘上的概率
  • TCP/IP 协议栈高效可靠的数据传输机制——以 Linux 4.19 内核为例
  • Ubuntu22.04搭建LAMP环境(linux服务器学习笔记)
  • 鸿蒙面试---1208
  • java基础教程第16篇( 正则表达式)
  • Docker部署的gitlab升级的详细步骤(升级到17.6.1版本)
  • 【如何制定虚拟货币的补仓策略并计算回本和盈利】
  • 给图像去除水印攻
  • Linux之封装线程库和线程的互斥
  • PH热榜 | 2024-12-08
  • LeetCode刷题day20——贪心
  • CCF编程能力等级认证GESP—C++3级—20241207
  • Microi 吾码:大数据浪潮中的智能领航者
  • Lua语言入门 - Lua 数组
  • gulp应该怎么用,前端批量自动化替换文件
  • 石岩湿地公园的停车场收费情况
  • A7157 基于Java+SSM+mysql+jsp的医院挂号系统的设计与实现 源码 文档 配置 全套资料
  • 数据处理与统计分析——11-Pandas-Seaborn可视化
  • 【计算机网络】实验13:运输层端口