当前位置: 首页 > article >正文

SparkSQL 优化实操

一、基础优化配置

1. 资源配置优化

# 提交Spark作业时的资源配置示例
spark-submit \--master yarn \--executor-memory 8G \--executor-cores 4 \--num-executors 10 \--conf spark.sql.shuffle.partitions=200 \your_spark_app.py

参数说明

  • executor-memory: 每个Executor的内存

  • executor-cores: 每个Executor的CPU核心数

  • num-executors: Executor数量

  • spark.sql.shuffle.partitions: Shuffle操作的分区数(通常设为集群核心数的2-3倍)

2. 内存管理优化

// 在SparkSession初始化时设置
val spark = SparkSession.builder().appName("OptimizedSparkSQL").config("spark.memory.fraction", "0.8")  // 执行和存储内存占总内存的比例.config("spark.memory.storageFraction", "0.3")  // 存储内存占内存比例.getOrCreate()

二、SQL查询优化技巧

1. 分区裁剪(Partition Pruning)

-- 原始查询(全表扫描)
SELECT * FROM sales WHERE dt = '2023-01-01';-- 优化后(确保表按dt分区)
SELECT * FROM sales WHERE dt = '2023-01-01';  -- 自动分区裁剪

2. 谓词下推(Predicate Pushdown)

-- 原始查询(先JOIN后过滤)
SELECT a.*, b.name 
FROM transactions a 
JOIN users b ON a.user_id = b.id
WHERE a.dt = '2023-01-01' AND b.age > 18;-- 优化后(过滤条件下推)
SELECT /*+ MAPJOIN(b) */ a.*, b.name 
FROM (SELECT * FROM transactions WHERE dt = '2023-01-01') a 
JOIN (SELECT id, name FROM users WHERE age > 18) b 
ON a.user_id = b.id;

3. 广播小表(Broadcast Join)

// 方式1: 通过配置自动广播
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760")  // 10MB// 方式2: 手动指定广播
val smallDF = spark.table("small_table")
val largeDF = spark.table("large_table")
largeDF.join(broadcast(smallDF), "key")

三、数据存储优化

1. 文件格式选择

// 写入Parquet格式(列式存储,适合分析)
df.write.parquet("/path/to/parquet")// 写入Delta Lake(支持ACID)
df.write.format("delta").save("/path/to/delta")// 写入ORC(高度压缩)
df.write.orc("/path/to/orc")

2. 分区与分桶

// 按日期分区
df.write.partitionBy("dt").parquet("/path/to/partitioned")// 分桶(适合大表JOIN)
df.write.bucketBy(50, "user_id").sortBy("user_id").saveAsTable("bucketed_table")

四、执行计划分析与优化

1. 查看执行计划

val df = spark.sql("SELECT * FROM sales WHERE amount > 100")
df.explain(true)  // 显示逻辑和物理计划// 更详细的执行计划
spark.sql("EXPLAIN EXTENDED SELECT * FROM sales WHERE amount > 100").show(false)

2. 常见执行计划问题识别

  • 数据倾斜:某个task执行时间远长于其他task

  • 全表扫描:执行计划中出现Scan操作没有过滤条件

  • 非广播Join:出现SortMergeJoin而不是BroadcastHashJoin

  • 数据重复计算:同一子查询被多次执行

3. 解决数据倾斜

// 方法1: 加盐处理
import org.apache.spark.sql.functions._
val skewedKey = "user_id"// 为倾斜键添加随机前缀
val saltedDF = df.withColumn("salted_key", concat(col(skewedKey), lit("_"), floor(rand() * 10)))// 方法2: 单独处理倾斜键
val commonDF = df.filter($"user_id" =!= "skewed_value")
val skewedDF = df.filter($"user_id" === "skewed_value")// 分别处理后union
val result = commonDF.union(skewedDF)

五、缓存策略优化

1. 缓存热数据

val hotDF = spark.sql("SELECT * FROM hot_table")
hotDF.persist(StorageLevel.MEMORY_AND_DISK)  // 内存不足时溢写到磁盘// 检查缓存状态
spark.catalog.cacheTable("hot_table")
spark.catalog.isCached("hot_table")

2. 缓存策略选择

存储级别描述适用场景
MEMORY_ONLY仅内存小数据集,频繁访问
MEMORY_AND_DISK内存+磁盘中等数据集
MEMORY_ONLY_SER序列化存储内存有限,减少内存占用
DISK_ONLY仅磁盘很少访问的大数据集

六、高级优化技巧

1. 动态资源分配

spark-submit \--conf spark.dynamicAllocation.enabled=true \--conf spark.dynamicAllocation.initialExecutors=5 \--conf spark.dynamicAllocation.minExecutors=2 \--conf spark.dynamicAllocation.maxExecutors=20 \your_app.py

2. 自适应查询执行(AQE)

// Spark 3.0+ 启用AQE
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB")

3. 代码生成优化

// 启用全阶段代码生成(默认已启用)
spark.conf.set("spark.sql.codegen.wholeStage", "true")// 对于复杂表达式,可调优
spark.conf.set("spark.sql.codegen.maxFields", "100")

七、监控与调优

1. Spark UI分析

  • Jobs页面:识别长任务

  • Stages页面:查看数据倾斜

  • Storage页面:检查缓存效率

  • SQL页面:分析查询执行计划

2. 日志分析

# 查看Executor日志中的GC情况
grep "GC" spark-executor-*.log# 检查是否有OOM错误
grep "OutOfMemory" spark-executor-*.log

八、实战优化案例

案例:优化慢速JOIN查询

原始查询

SELECT a.*, b.* 
FROM large_table a 
JOIN small_table b ON a.key = b.key
WHERE a.dt BETWEEN '2023-01-01' AND '2023-01-31'

优化步骤

  1. 确认执行计划:发现是SortMergeJoin

  2. 检查表大小:small_table < 10MB

  3. 应用广播Join

    SELECT /*+ BROADCAST(b) */ a.*, b.* 
    FROM large_table a 
    JOIN small_table b ON a.key = b.key
    WHERE a.dt BETWEEN '2023-01-01' AND '2023-01-31'

  4. 添加分区过滤:确保large_table按dt分区

  5. 调整shuffle分区

    spark.conf.set("spark.sql.shuffle.partitions", "200")

通过以上优化,该查询性能提升了15倍。

http://www.lryc.cn/news/2403842.html

相关文章:

  • 【vLLM 学习】Cpu Offload Lmcache
  • 数据库同步是什么意思?数据库架构有哪些?
  • 【数据结构】详解算法复杂度:时间复杂度和空间复杂度
  • Rest-Assured API 测试:基于 Java 和 TestNG 的接口自动化测试
  • 多模型协同:基于 SAM 分割 + YOLO 检测 + ResNet 分类的工业开关状态实时监控方案
  • 【分销系统商城】
  • LangChainGo入门指南:Go语言实现与OpenAI/Qwen模型集成实战
  • 5.1 HarmonyOS NEXT系统级性能调优:内核调度、I/O优化与多线程管理实战
  • react public/index.html文件使用env里面的变量
  • chili3d 笔记17 c++ 编译hlr 带隐藏线工程图
  • 创建一个纯直线组成的字体库
  • 接口不是json的内容能用Jsonpath获取吗,如果不能,我们选用什么方法处理呢?
  • 使用 Docker Compose 从零部署 TeamCity + PostgreSQL(详细新手教程)
  • Go 语言实现高性能 EventBus 事件总线系统(含网络通信、微服务、并发异步实战)
  • Linux进程(中)
  • 【计算机组成原理】计算机硬件的基本组成、详细结构、工作原理
  • npm error Cannot read properties of null (reading ‘matches‘)
  • MVC分层架构模式深入剖析
  • 【方案分享】蓝牙Beacon定位精度优化(包含KF、EKF与UKF卡尔曼滤波算法详解)
  • 新能源汽车热管理核心技术解析:冬季续航提升40%的行业方案
  • LeetCode 239. 滑动窗口最大值(单调队列)
  • 华为云Flexus+DeepSeek征文|DeepSeek-V3/R1开通指南及使用心得
  • 鸿蒙图片缓存(一)
  • 运行示例程序和一些基本操作
  • 学习数字孪生,为你的职业发展开辟新赛道
  • WebRTC源码线程-1
  • python学习打卡day47
  • MySQL中的内置函数
  • Ansible自动化运维全解析:从设计哲学到实战演进
  • YOLOv8n行人检测实战:从数据集准备到模型训练