当前位置: 首页 > news >正文

大数据技术架构(组件)31——Spark:Optimize--->JVM On Compute

2.1.9.4、Optimize--->JVM On Compute

首要的一个问题就是GC,那么先来了解下其原理:

1、内存管理其实就是对象的管理,包括对象的分配和释放,如果显式的释放对象,只要把该对象赋值为null,即该对象变为不可达.GC将负责回收这些不可达对象的内存空间。

2、GC采用有向图的方式记录并管理堆中的所有对象,通过这些方式来确定哪些对象是可达的,哪些对象是不可达的。根据上图的JVM内存分配来看,当Eden满了之后,一个小型的GC将会被触发,Eden和Survivor1中幸存的仍被使用的对象被复制到Survivor2中。同时Survivor1和Survivor2区域进行交换,当一个对象生存的时间足够长,或者Survivor2满了,那么就会把存活的对象移到Old代,当Old空间快满的时候,这个时候会触发一个Full GC.

根据以上简单对GC的回顾,Spark GC调优的目的是确保Old代只存生命周期长的RDD,Young 代只保存短生命周期的对象,尽量避免发生Full GC。

那么这里梳理一下spark中关于Jvm的一些参数调优以及一些调优步骤:

1、针对MetaSpace:

-XX:MetaspaceSize:初始化元空间的大小

-XX:MaxMetaspaceSize:最大元空间大小

-XX:MinMetaspaceFreeRatio:扩大空间的最小比率,当GC后,内存占用超过这一比率后,就会扩大空间

-XX:MaxMetaspaceFreeRatio:缩小空间的最小比率,当GC后,内存占用低于这一比率,就会缩小空间。

默认的Metaspace只会受限于本地内存大小,当Metaspace达到MetaSpaceSize的当前大小时,就会触发GC.

2、GC查看步骤:

2.1、首先查看GC统计日志观察GC启动次数是否太多,可以给JVM设置参数-verbose:GC -XX:+PrintGCDetails,那么就可以在Worker日志中看到每次GC花费的时间;如果某个任务在结束之前,多次发生了Full GC,那么说明执行该任务的内存不够

spark-submit --name "app-name" \

--master local[4] \

--conf spark.shuffle.spill=false \

--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \

jar_name.jar

2.2、如果GC信息显示,Old代空间快满了,那么可以降低spark.memory.storageFraction来减少RDD缓存占用的内存。先不要考虑执行性能问题,先让程序跑起来再说

2.3、如果Major GC比较少,但是Minor GC比较多,可以把Eden内存调大些。

3、计算内存和存储内存调整(钨丝计划就是专门来解决JVM性能问题的)

两者之间没有硬性界限,可以相互借用空间,通过参数spark.memory.fraction(默认0.75)来设置整个堆空间的比例。

spark.storage.memoryFraction:设置RDD持久化数据在Executor内存能占用的比例,默认是0.6;如果作业有较多的RDD持久化的话,那么该参数值可以调高些,避免内存不够缓存所有的数据,导致溢写磁盘;如果作业中shuffle类操作比较多,且频繁发生GC,那么可以适当调低该参数值。

spark.yarn.executor.memoryoverhead:如果数据量很大,导致Stage内存溢出,导致后面的Stage无法获取数据,如出现Shuffle file not found、Executor Task lost、Out Of Memory等问题时,可以调整该参数增大堆外内存。

spark.core.connection.ack.wait.timeout:当然对于not found ,file lost问题也可能是因为某些task去其他节点上拉取数据,而该节点正好正在进行GC,导致连接超时(默认60s),那么可以试着调大该参数值。

spark.shuffle.memoryFraction:设置shuffle过程一个task拉取上一个Stage的task输出后,进行聚合操作时能够使用Executor内存的比例,默认是0.2;如果shuffle使用的内存超过了这个限制,那么就会把多余的数据溢写到磁盘中,如果作业中RDD持久化的操作比较少的话,shuffle比较多的话,那么可以调大该值,降低缓存内存占用比例。

2.1.9.5、Optimize--->Shuffle On Compute

更详细的参数配置见2.1.8.6部分。

1、使用Broadcast实现Mapper端Shuffle。

也就是常说的MapJoin,即将较小的RDD进行广播到Executor上,让该Executor上的所有Task都共享该数据

2、Shuffle传输过程中的序列化和压缩。

序列化和压缩

spark.serializer=org.apache.spark.serializer.KryoSerializer

spark.shuffle.compress=true

spark.shuffle.spill.compress=true

spark.io.compression.codec=snappy

使用KryoSerializer的原因是因为其支持relocation,也就是说在把对象进行序列化之后进行排序,这种排序效果和先对数据排序再序列化是一样的。这个属性会在UnsafeShuffleWriter进行排序中用到。

3、为了避免Spark下的JVM GC可能会导致Shuffle拉取文件失败的问题,可以使用以下措施:

3.1、调整获取Shuffle数据的重试次数,默认是3次

3.2、调整获取Shuffle数据重试间隔,通过spark.shuffle.io.retryWait参数配置,默认为5s

3.3、适当增大reduce端的缓存空间,否则会spill到磁盘,同时也减少GC次数,可以通过spark.reducer.maxSizeInFlight参数配置

3.4、ShuffleMapTask端也可以增大Map任务写缓存,可以通过spark.shuffle.file.buffer,默认为32k

3.5、可以适当调大计算内存,减少溢写磁盘。

2.1.9.6、Optimize---->Data Skew

spark层面的数据倾斜定位可以通过以下几个方面:

1、通过spark web ui界面,查看每个Stage下的每个Task运行的数据量大小

2、通过Log日志分析定位是在哪个Stage中出现了倾斜,然后再定位到具体的Shuffle代码

3、代码走读,重点看Join,各种byKey的关键性代码

4、数据特征分布分析

针对数据倾斜有以下几种解决手段:

2.1.9.6.1、聚合过滤导致倾斜的Keys

可过滤针对业务逻辑中不需要的倾斜数据,例如无效数值

2.1.9.6.2、提高并行度

其主要思想在于把一个Task处理的数据量拆分为多份给不同的task进行处理,进而减轻Task的压力,其本质在于数据的分区策略。

例如

1、通过repartition或者coalesce进行重分区

2、对外部数据读取设置最小分区数

3、在使用涉及到shuffle类算子时,可以显示指定分区数(默认spark会推导分区数)

4、设置默认spark.default.parallelism并行度

2.1.9.6.3、随机Key二次聚合

使用场景:对于各种byKey操作,可以将每个key通过加上随机数前缀进行拆分,先做局部聚合,然后将随机数拆掉在做全局聚合。

2.1.9.6.4、MapJoin

使用场景:两个RDD的数据量,其中一个RDD的数据量特别小,可以放到内存中。

2.1.9.6.5、采样倾斜Key单独处理

使用场景:两个RDD进行join操作,如果一个RDD倾斜严重,那么可以通过采样方式进行拆分,然后再分别和另外一个RDD进行join,最后把结果进行union。

2.1.9.6.6、随机Join

使用场景:两个RDD中的某一个Key或者某几个Key对应的数量很大,那么在Join的时候会发生倾斜。可以将RDD1中的一个或者几个Key加上随机数前缀,然后RDD2在相同的Key上做同样的处理。

2.1.9.6.7、扩容Join

使用场景:如果两个RDD的倾斜Key特别多,则可以将其中一个RDD的数据进行扩容N倍,另一个RDD的每条数据都打上一个n以内的随机前缀,最后进行join

http://www.lryc.cn/news/2196.html

相关文章:

  • ETL基础概念及要求详解
  • 刷题记录:牛客NC23054华华开始学信息学 线段树+分块
  • 二叉搜索树(查找,插入,删除)
  • C# PictureEdit 加载图片
  • 3种方法设置PDF“打开密码”,总有一种适合你
  • 第三章 数据链路层(点到点的传输服务)-计算机网络(笔记)
  • volatile关键字与CAS机制
  • LeetCode题解 动态规划(四):416 分割等和子集;1049 最后一块石头的重量 II
  • 【FFMPEG源码分析】从ffplay源码摸清ffmpeg框架(二)
  • PCIE 学习笔记(入门简介)
  • 锁的优化机制了解嘛?请进!
  • 5.点赞功能 Redis
  • Java序列化和反序列化(详解)
  • 【刷题篇】链表(上)
  • ConcurrentHashMap设计思路
  • Unity基于GraphView的行为树编辑器
  • 网络流量传输MTU解析
  • 30个HTML+CSS前端开发案例(四)
  • 《TPM原理及应用指南》学习 —— TPM执行环境3
  • 实验名称:经典同步问题:生成者与消费者问题
  • EasyCVR视频云存储的架构解析与Sharelist云存挂载方法介绍
  • 电机参数中力矩单位kgf.cm,Nm,mNm表示的含义
  • 使用scikit-learn为PyTorch 模型进行超参数网格搜索
  • Windeployqt 打包,缺少dll 的解决方法
  • 第四章:搭建Windows server AD域和树域
  • 【解决方案】老旧小区升级改造,视频智能化能力如何提升居民安全感?
  • 【遇见青山】项目难点:缓存穿透的解决方案
  • 单一职责原则|SOLID as a rock
  • 使用百度地图官方WEB API,提示 “ APP 服务被禁用“ 问题的解决方法
  • nodejs如何实现Digest摘要认证?