当前位置: 首页 > news >正文

Spark 优化全攻略:从 “卡成 PPT“ 到 “飞一般体验“

哈喽各位数据打工人~ 是不是总被 Spark 任务的 "慢" 折磨?咖啡续到天亮,任务还在 "龟速" 爬行?😭 别慌!今天把 Spark 优化的 "家底" 全抖出来,从资源到代码,从理论到实操,保证小白也能看懂~ 所有知识点一个不落,赶紧码住!

一、开局先给 "粮草":资源配置优化🍚(性能调优第一步)

Spark 任务跑不快,八成是 "饿" 的!就像打游戏没装备,再牛的操作也白搭~ 资源配置是优化的第一步,给够资源,性能直接翻倍!

1. 核心资源参数详解(提交脚本必看)

提交 Spark 任务时,这些参数决定了你的任务能调动多少 "兵力":

bin/spark-submit \--class com.bigdata.spark.Analysis \  # 主类路径--master yarn \  # 生产环境必用yarn!--deploy-mode cluster \  # 集群模式--num-executors 80 \  # Executor(小兵)数量--driver-memory 6g \  # Driver(指挥官)内存--executor-memory 6g \  # 每个小兵的内存--executor-cores 3 \  # 每个小兵的CPU核数/usr/opt/modules/spark/jar/spark.jar \  # 任务jar包

👉 这些参数控制着 "小兵数量"、"每个小兵的力气(CPU)" 和 "背包大小(内存)",直接影响并行能力!

2. 不同集群模式的资源分配技巧

Spark 有两种主要运行模式,资源分配套路不同:

  • Standalone 模式:直接看集群总资源!比如 15 台机器,每台 8G 内存 + 2 核 CPU,就配 15 个 Executor,每个 8G 内存 + 2 核(把资源用满)。
  • Yarn 模式:按资源队列分配!比如队列有 400G 内存 + 100 核,就配 50 个 Executor(400/8=50),每个 8G 内存 + 2 核(100/50=2)。

3. 资源调优的 "真香" 效果

  • 加 Executor 数量:4 个 Executor×2 核 → 8 个 task 并行;8 个 Executor×2 核 → 16 个 task 并行(直接翻倍!)。
  • 加 CPU 核数:4 个 Executor×2 核 → 8 个 task;4 个 Executor×4 核 → 16 个 task(并行能力翻倍!)。
  • 加内存:内存大了能缓存更多数据(少写磁盘)、给 shuffle 更多空间(少磁盘 IO)、减少 GC(避免频繁 "卡顿")。

4. 生产环境配置参考

--num-executors:50~100  # 小兵数量
--driver-memory:1G~5G  # 指挥官内存(够用就行)
--executor-memory:6G~10G  # 小兵背包大小
--executor-cores:3  # 小兵的手(核数)
--master:必须是yarn!  # 生产环境标配

二、RDD 优化:让数据 "少干活"💼(避免重复劳动)

RDD 是 Spark 的 "打工人",优化 RDD 就是让它们别做无用功~ 这部分细节超多,一个都不能漏!

1. RDD 复用:别让数据 "重复加班"

比如计算 "用户活跃率" 和 "用户留存率" 时,都需要 "用户登录记录"。如果每次都重新计算这份数据,相当于让员工重复做同一份报表 —— 纯纯浪费!😤
优化:把重复用的 RDD 存为一个变量,后续直接调用,一次计算多次复用~

2. RDD 持久化:给数据 "记笔记"📝

Spark 默认每次用 RDD 都会重新计算(从父 RDD 开始算),就像每次写作业都重新抄题,傻爆了!必须持久化!

  • 怎么做:用cache()persist()把 RDD 存到内存 / 磁盘,下次直接取~
  • 关键细节
    • 内存不够?用序列化!把数据压缩变小,塞到内存里(比如persist(StorageLevel.MEMORY_ONLY_SER))。
    • 怕数据丢了?开副本机制!每个数据存两个副本到不同节点,丢了一个还有备份(persist(StorageLevel.MEMORY_ONLY_2))。

3. 早过滤:提前 "扔垃圾"🗑️

拿到原始数据后,第一时间把没用的过滤掉!就像网购拆快递,先扔包装盒再摆东西,省空间又高效~
例子:分析用户订单时,先过滤掉测试订单、无效订单,再处理剩下的有效数据,内存占用直接减半!

三、并行度调节:让 CPU"不摸鱼"⚡(资源利用最大化)

并行度就是同时跑多少个 task(任务)。如果 CPU 核闲着,就是在浪费钱!💸

1. 并行度的核心逻辑

  • 并行度= 每个 Stage 的 task 数量。
  • 若并行度太低:比如 20 个 Executor×3 核 = 60 核,但只跑 40 个 task,20 个核摸鱼(资源浪费!)。
  • 理想状态:task 数量 = 总 CPU 核数 ×2~3 倍!这样核跑完一个 task 马上接下一个,不闲着~

2. 为什么是 2~3 倍?

task 执行速度有快有慢(比如有的数据多,有的少)。如果 task 数量 = 核数,快的核跑完就闲着;多设 2~3 倍,快的核能继续干活,整体效率飙升!

3. 默认并行度与 Stage 划分

  • 默认并行度:官方默认 200(别和 Flink 混了!)。
  • Stage 划分:遇到 shuffle 算子(如 reduceByKey)就会分 Stage。比如read→map→flatmap→reduceByKey→foreach,前三个算子一个 Stage,后面一个 Stage。

四、广播大变量:内存 "瘦身术"🧙‍♀️(减少重复存储)

普通变量在每个 task 里都存一份,内存直接爆炸!广播变量能让内存消耗 "断崖式下跌"~

1. 问题场景

比如一个 20M 的变量被 500 个 task 共用:

  • 普通变量:500 个副本→500×20M=10G 内存(血亏!)。
  • 广播变量:每个 Executor 存 1 份,20 个 Executor→20×20M=400M 内存(直接省 25 倍!)。

2. 广播变量工作原理

  • Driver 先存 1 份,task 运行时先从本地 Executor 的 BlockManager 取,没有就从 Driver 或其他节点拉取,之后所有 task 共用这一份。

五、Kryo 序列化:数据 "压缩包"📦(更快更小)

默认用 Java 序列化,但效率低!Kryo 序列化速度快 10 倍,数据体积更小~

1. 为什么用 Kryo?

  • Java 序列化:方便但慢,数据大(需要实现 Serializable 接口)。
  • Kryo 序列化:速度快 10 倍,体积小,但需要注册类型(Spark 2.0 后,简单类型 / 字符串默认用 Kryo 啦~)。

2. 配置方法

val conf = new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").registerKryoClasses(Array(classOf[YourClass]))  // 注册需要序列化的类

六、本地化等待时长:数据 "少跑腿"🏃‍♂️(减少网络传输)

Spark 希望 task 在数据所在节点跑(少传数据),但节点资源可能被占满,这时候要等一等!

1. 本地化等级(性能从高到低)

  • PROCESS_LOCAL:task 和数据在同一个 Executor(最好!)。
  • NODE_LOCAL:同一节点不同 Executor(数据进程内传)。
  • RACK_LOCAL:同一机架不同节点(网络传)。
  • ANY:任意节点(最差,跨机架传)。

2. 调节技巧

  • 默认等 3 秒,若日志里很多 NODE_LOCAL/ANY,延长等待时间(比如 5 秒),让 task 等节点资源释放。
  • 别太长!否则等待时间比省的传输时间还多,反而变慢~
  • 配置:spark.locality.wait=5s(测试用 client 模式看日志调优)。

七、算子调优:选对工具干得快🔧(每个算子都有讲究)

算子用错,努力白费!这部分是优化核心,细节拉满~

1. mapPartitions:批量处理 "加速器"

  • 普通 map:逐条处理数据(1 万条数据调用 1 万次函数,比如建 1 万次数据库连接)。
  • mapPartitions:按分区处理(1 个分区调用 1 次函数,1 万条数据建 1 次连接)。
  • 注意:数据量大别用!一次加载整个分区数据可能 OOM(内存溢出)~

2. foreachPartition:数据库写入 "神器"

和 mapPartitions 同理,写数据库时用它:一个分区建一次连接,批量写入,比 foreach(逐条建连接)快 10 倍!

// 优化写法
rdd.foreachPartition(iter => {val conn = getDBConnection()  // 一个分区建一次连接iter.foreach(data => conn.insert(data))  // 批量写conn.close()
})

3. filter + coalesce:解决数据 "肥瘦不均"

  • 问题:filter 后分区数据量差异大(比如一个分区 100 条,一个 800 条),导致 task 忙闲不均(数据倾斜)。
  • 解决:用 coalesce 重分区,让每个分区数据量均匀。
    • coalesce vs repartition
      • 减少分区(A>B):用 coalesce(默认不 shuffle,差距大时设shuffle=true)。
      • 增加分区(A<B):用 repartition(底层是 coalesce+shuffle)。

4. repartition:破解 SparkSQL"并行度锁死"

  • SparkSQL 的并行度由 HDFS 文件 split 数决定,用户改不了!如果数据量大 + 逻辑复杂,task 少就会很慢。
  • 解法:SQL 查出来的 RDD 立即用 repartition 重分区,后续 Stage 并行度就由你控制了~

5. reduceByKey:shuffle"预聚合王者"

  • groupByKey:map 端不聚合,全量 shuffle 到 reduce 端(网络传输量大)。
  • reduceByKey:map 端先本地聚合(比如先算每个单词在本节点的次数),再 shuffle(传输量骤减)。
  • 结论:能用 reduceByKey 就别用 groupByKey!

八、Shuffle 调优:数据 "搬家" 不堵车🚚(最容易踩坑的环节)

Shuffle 是数据在节点间传输的过程,最容易卡壳,这 5 个参数必调!

1. map 端缓冲区:小推车 "扩容"

默认 32KB,数据量大时会频繁溢写到磁盘(比如 640KB 数据要溢写 20 次)。调大到 64KB 减少 IO:

spark.conf.set("spark.shuffle.file.buffer", "64k")

2. reduce 端拉取缓冲区:货车 "扩容"

默认 48MB,调大到 96MB,一次拉更多数据,减少网络传输次数:

spark.conf.set("spark.reducer.maxSizeInFlight", "96m")

3. 拉取重试次数:网络差就 "多试几次"

默认重试 3 次,大数据量 shuffle 易失败。调大到 60 次提高稳定性:

spark.conf.set("spark.shuffle.io.maxRetries", "60")

4. 拉取等待间隔:失败了 "歇会儿再试"

默认等 5 秒,调大到 60 秒,给网络 / GC 留缓冲时间:

spark.conf.set("spark.shuffle.io.retryWait", "60s")

5. SortShuffle 排序阈值:不需要排序就 "跳过"

SortShuffleManager 默认当 reduce task<200 时不排序。若确定不需要排序,调大阈值(比如 500),减少排序开销:

spark.conf.set("spark.shuffle.sort.bypassMergeThreshold", "500")

九、JVM 调优:内存 "合理分家"🏠(避免频繁 "清理房间")

JVM 内存分块不合理,会频繁 GC(垃圾回收),导致任务卡顿~

1. 静态内存管理:给执行区 "腾空间"

  • 静态模式下:Storage(缓存 RDD / 广播数据)占 60%,Execution(shuffle 中间数据)占 40%,两者独立。
  • 若 Execution 内存不够(频繁 GC),调小 Storage 占比:
spark.conf.set("spark.storage.memoryFraction", "0.4")  // 降到40%

2. 统一内存管理:内存 "动态分配"

  • 统一模式下:Storage 和 Execution 各占 50%,支持动态占用(shuffle 内存不够时自动用 Storage 的),无需手动调!

3. 堆外内存:额外 "储物间" 扩容

若常报 "out of memory"、"executor lost",可能堆外内存不够(默认≈300MB)。调大到 1G 以上:

--conf spark.executor.memoryOverhead=2g  # 堆外内存设2G

总结:Spark 优化 "全地图"🗺️

  1. 资源配置:按模式分资源,Executor、内存、核给够。
  2. RDD 优化:复用、持久化(序列化 + 副本)、早过滤。
  3. 并行度:task 数量 = 核数 ×2~3 倍,避免资源浪费。
  4. 广播变量:减少变量副本,内存直降 25 倍 +。
  5. Kryo 序列化:比 Java 快 10 倍,数据更小。
  6. 本地化等待:根据日志调时长,减少网络传输。
  7. 算子调优:mapPartitions/foreachPartition 批量处理,coalesce 解决倾斜,reduceByKey 替代 groupByKey。
  8. Shuffle 调优:5 个参数(缓冲区、重试、阈值)全调对。
  9. JVM 调优:静态内存分比例,堆外内存不够就扩容。

按这个清单一步步优化,你的 Spark 任务绝对能从 "卡成 PPT" 变成 "飞一般体验"!🚀 赶紧收藏实践吧~ 有问题评论区问我哦~

http://www.lryc.cn/news/617599.html

相关文章:

  • Vlanif 实验
  • 第16届蓝桥杯Python青少组_省赛_中/高级组_2025年5月真题
  • 国企社招 | 中国邮政2025年社会招聘开启
  • 腾讯前端面试模拟详解
  • Java 之抽象类和接口
  • AIStarter修复macOS 15兼容问题:跨平台AI项目管理新体验
  • docker是什么以及镜像命令详解
  • C++模板的补充
  • 【读代码】微软开源Agentic-RAG深度解析
  • Profile.vue组件详细解析
  • SDH 和 OTN 的帧结构对比
  • 3.数据类型和类型装换
  • Spring-Security-5.7.11升级6.5.2
  • Unity笔记(五)知识补充——场景切换、退出游戏、鼠标隐藏锁定、随机数、委托
  • 前端面试:promise...then与asnyc ...await
  • 简单了解MongoDB数据存储
  • ‌太钢建材:筑就未来,品质见证
  • 软考倒计时 巧用芝麻倒计时软件 助力高效备考 有效提升备考效率
  • DNS(域名系统)
  • 关于线性DP模板
  • 月报 Vol.02:新增条件编译属性 cfg、#alias属性、defer表达式,增加 tuple struct 支持
  • 基于OpenCV的实时美颜技术:从传统算法到深度学习融合实现
  • 基恩士 CA CNX10U 视觉连接器REPEATER CA CN or CV- C 日本原装进口
  • 【软考中级网络工程师】知识点之 TCP 协议深度剖析
  • 嵌入式第二十五课!!!文件读写操作补充与应用、以及文件流定位相关函数接口
  • idea没有 Active Profiles 的填写位置
  • 企业级 IT 运维服务平台数据备份方案:基于 rsync 的自动化实现
  • 视觉相机偏移补偿
  • 在macOS上扫描192.168.1.0/24子网的所有IP地址
  • 初识影刀:将多个相同格式EXCEL中内容汇总到一个EXCEL文件中去