当前位置: 首页 > news >正文

Spark Shuffle中的数据结构

文章目录

  • 1.Shuffle中的三种数据结构
  • 2.AppendOnlyMap原理
    • 2.1 聚合
    • 2.2 扩容
    • 2.3 排序
    • 2.4 为什么是数组?
  • 3.ExternalAppendOnlyMap原理
    • 3.1 工作原理
    • 3.2 AppendOnlyMap大小估计
      • 3.2.1 为什么要估计大小?
      • 3.2.2 估计大小浅析
        • 3.2.2.1 什么时候采样?
        • 3.2.2.2 大小估算
        • 3.2.2.3 采样后
    • 3.3 Spill过程与排序
    • 3.4 全局聚合merge-sort Shuffle
  • 4.PartitionedAppendOnlyMap原理
  • 5.PartitionedPairBuffer原理

1.Shuffle中的三种数据结构

在Spark Shuffle机制原理中,介绍了三种Shuffle的数据结构。
在这里插入图片描述
Spark中的PartitionedAppendOnlyMap和ExternalAppendOnlyMap都基于AppendOnlyMap实现。因此,我们先介绍AppendOnlyMap的原理。

2.AppendOnlyMap原理

AppendOnlyMap实际上是一个只支持record添加和对Value进行更新的HashMap。AppendOnlyMap只使用数组来存储元素,根据元素的 Hash值确定存储位置,如果存储元素时发生Hash值冲突,则使用二次地址探测法来解决Hash值冲突。
对于每个新来的<K,V>record,先使用Hash(K)计算其存放位置,如果存放位置为空,就把record存放到该位置。如果该位置已经被占用,就使用二次探测法来找下一个空闲位置。
在这里插入图片描述

2.1 聚合

  • 首先要明确的是,我们是对相同的key做聚合,而不是对相同的key的哈希值做聚合,不同的key哈希值有可能相等。
  • 当两个key的哈希值相同时,后来的key会将Hash(Key)+n*n来找到该key应该在的位置。
  • 当相同的key到来时,直接在列表中相同key的位置进行聚合。

2.2 扩容

如果插入的record太多,则很快会被填满。Spark的解决方案是,如果AppendOnlyMap的利用率达到70%,那么就扩张一倍,扩张意味着原来的Hash()失效,因此对所有Key进行rehash,重新排列每个Key的位置。

2.3 排序

由于AppendOnlyMap采用了数组作为底层存储结构,可以支持快速排序等排序算法。先将数组中所有的<K,V>record转移到数组的前端,用begin和end来标示起始位置,然后调用排序算法对[begin,end]中的record进行排序。对于需要按Key进行排序的操作,如sortByKey(),可以按照Key值进行排序;对于其他操作,只按照Key的Hash值进行排序即可。
在这里插入图片描述

2.4 为什么是数组?

  • 减少内存开销:传统哈希表(如 Java HashMap)为解决哈希冲突,采用 “数组 + 链表 / 红黑树” 结构,每个节点需要额外存储指针(如链表节点的next引用),这会增加内存占用。而AppendOnlyMap仅使用单个数组存储键值对,通过二次探测法解决冲突,避免了指针开销,更适合存储海量数据。
  • 便于直接排序:AppendOnlyMap的底层数组可以直接作为排序的数据源。

3.ExternalAppendOnlyMap原理

3.1 工作原理

Spark基于AppendOnlyMap设计实现了基于内存+磁盘的ExternalAppendOnlyMap,用于Shuffle Read端大规模数据聚合。
ExternalAppendOnlyMap的工作原理是:

  • 先持有一个AppendOnlyMap来不断接收和聚合新来的record,AppendOnlyMap快被装满时检查一下内存剩余空间是否可以扩展,可直接在内存中扩展,不可扩展则对AppendOnlyMap中的record进行排序,然后将record都spill到磁盘上。
  • 因为record不断到来,可能会多次填满AppendOnlyMap,所以这个spill过程可以出现多次,最终形成多个spill文件。
  • 等record都处理完,此时AppendOnlyMap中可能还留存一些聚合后的record,磁盘上也有多个spill文件。因为这些数据都经过了部分聚合,还需要进行全局聚合(merge)。

3.2 AppendOnlyMap大小估计

3.2.1 为什么要估计大小?

一种简单的解决方法是在每次插入record或对现有record的Value进行更新后,都扫描一下AppendOnlyMap中存放的record,计算每个record的实际对象大小并相加,但这样会非常耗时。而且一般AppendOnlyMap会插入几万甚至几百万个record,如果每个record进入AppendOnlyMap都计算一遍,则开销会很大。
Spark设计
了一个增量式的高效估算算法,在每个record插入或更新时根据历史统计值和当前变化量直接估算当前AppendOnlyMap的大小,算法的复杂度是 O (1),开销很小。在record插入和聚合过程中会定期对当前AppendOnlyMap中的record进行抽样,然后精确计算这些record的总大小、总个数、更新个数及平均值等,并作为历史统计值。进行抽样是因为AppendOnlyMap中的record可能有上万个,难以对每个都精确计算。之后,每当有record插入或更新时,会根据历史统计值和历史平均的变化值,增量估算AppendOnlyMap的总大小,详见Spark源码中的SizeTracker.estimateSize()方法。抽样也会定期进行,更新统计值以获得更高的精度。这个后面有时间研究一下。

3.2.2 估计大小浅析

经过翻阅spark源码,发现估计不同类型对象的方式有所不同,这里只分析数组方式。
对于普通数组来说,元素类型是int,float等,可以直接获取准确大小,不需要进行估计了。但是AppendOnlyMap中通常是java复杂对象。

3.2.2.1 什么时候采样?

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/collection/SizeTracker.scala#L67
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala

  • 初始化时采样:初始化时调用 resetSamples(),将首次采样时机 nextSampleNum 设为 1(即第 1 次更新后采样)。nextSampleNum表示的就是下一次采样的时机。
  • 初始化后采样:nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong,其中SAMPLE_GROWTH_RATE的值为1.1,经查阅,应该是一个经验值,numUpdates表示的是当前数组的更新次数,如果当前是100,那么下一次采样的时机为100+100+1.1=110次时。
3.2.2.2 大小估算

数组元素的大小由两部分构成,一个是数组对象本身的固定开销(对象头),这里的数组对象本身是由数组封装的,通常大小比较固定;一个是数组元素本身内存的开销,数组元素可能是java对象或者对象指针。

  • 一个数组元素的精确总大小 = 数组元素头+指针大小+指针指向的对象大小
  • 抽样时机与抽样个数: private val ARRAY_SIZE_FOR_SAMPLING = 400,这里的400的数字在源码中是写死的,也就是说当数组长度超过400的时候,才会去进行抽样,private val ARRAY_SAMPLE_SIZE = 100抽样的方式也很简单,进行两次抽样,每次抽取100个元素。
  • 抽样的大小:先取两次抽样的最小值为val size = math.min(s1, s2)(减小共享对象对抽样的影响)。整体元素的大小为max(s1, s2) + size × ((总长度 - 100) / 100)
3.2.2.3 采样后
  • 采样后,将采样结果封装成对象,放进一个队列中。并且该队列仅保留最近的两次采样。根据这两次最近的采样估算每次更新的平均字节数。
  • 每次更新平均字节数:val bytesDelta = 最近两次采样的大小差 / 最近两次采样的更新次数差,注意这两次指的不是上面大小计算的两次,而是两次大小估算整个流程输出的结果。
  • 本次采样估算:根据平均字节增长量,估算本次采样后的增量,当前估算大小 = 上次采样大小 + 增量

时间复杂度:估算过程仅依赖内存中的采样数据和累计更新次数,因此是 O(1) 时间复杂度,高效且适合高频调用。

3.3 Spill过程与排序

  • 当AppendOnlyMap达到内存限制时,会将record排序后写入磁盘中。排序是为了方便下一步全局聚合(聚合内存和磁盘上的record)时可以采用更高效的merge-sort(外部排序+聚合)。注意,这里为了后面的高效聚合,因此进行了排序
  • sortByKey()等操作定义了按照Key进行的排序。
  • 如groupByKey(),并没有定义Key的排序方法,也不需要输出结果是按照Key进行排序的。在这种情况下,Spark采用按照Key的Hash值进行排序的方法,这样既可以进行merge-sort,又不要求操作定义Key排序的方法。这种方法的问题是会出现Hash值冲突,也就是不同Key具有相同的Hash值。为了解决这个问题,Spark在merge-sort的同时会比较Key的Hash值是否相等,以及Key的实际值是否相等。

3.4 全局聚合merge-sort Shuffle

在这里插入图片描述

  • 当所有数据输入完成后,会对当前内存中的AppendOnlyMap进行排序,并且所有的spill文件都是排序好的,现在需要进行全局聚合。
  • 聚合使用的是一个最小堆的结构,将AppendOnlyMap中的第一个元素,和所有spill在磁盘上的文件都读取第一个元素维护一个最小堆。每次取堆顶元素进行聚合,直到堆顶元素不相同为止,再取堆顶元素进行下一轮聚合。

4.PartitionedAppendOnlyMap原理

PartitionedAppendOnlyMap用于在Shuffle Write端对record进行聚合(combine)。PartitionedAppendOnlyMap的功能和实现与ExternalAppendOnlyMap的功能和实现基本一样,唯一区别是PartitionedAppendOnlyMap中的Key是“PartitionId+Key”,这样既可以根据partitionId进行排序(面向不需要按Key进行排序的操作),也可以根据partitionId+Key进行排序(面向需要按Key进行排序的操作),从而在Shuffle Write阶段可以进行聚合、排序和分区。

5.PartitionedPairBuffer原理

PartitionedPairBuffer本质上是一个基于内存+磁盘的Array,随着数据添加,不断地扩容,当到达内存限制时,就将Array中的数据按照partitionId或partitionId+Key进行排序,然后spill到磁盘上,该过程可以进行多次,最后对内存中和磁盘上的数据进行全局排序,输出或者提供给下一个操作。

http://www.lryc.cn/news/622881.html

相关文章:

  • 亚马逊S3的使用简记(游戏资源发布更新)
  • 后台管理系统-4-vue3之pinia实现导航栏按钮控制左侧菜单栏的伸缩
  • 二进制为什么使用记事本读取会出乱码
  • 密码学入门笔记4:分组密码常见算法1——DES
  • Custom SRP - Baked Light
  • 用Pygame开发桌面小游戏:从入门到发布
  • 搜索 AI 搜索 概率论基础教程第3章条件概率与独立性(二)
  • 概率论基础教程第3章条件概率与独立性(一)
  • 《P4180 [BJWC2010] 严格次小生成树》
  • [极客时间]LangChain 实战课 ----- 代理(上)|(12)ReAct框架,推理与行动的协同
  • Manus AI与多语言手写识别的技术突破与行业变革
  • 《Python学习之字典(一):基础操作与核心用法》
  • 【每日一题】Day5
  • 电路设计——复位电路
  • 设计模式之静态代理
  • Java 10 新特性及具体应用
  • ABB焊接机器人弧焊省气
  • 多机编队——(6)解决机器人跟踪过程中mpc控制转圈问题
  • 【轨物方案】预防性运维:轨物科技用AI+机器人重塑光伏电站价值链
  • MyBatis极速通关中篇:核心配置精讲与复杂查询实战
  • 大模型教机器人叠衣服:2025年”语言理解+多模态融合“的智能新篇
  • Tomcat架构深度解析:从Server到Servlet的全流程揭秘
  • blender制作动画导入unity两种方式
  • ENSP的简单动态路由rip协议配置
  • 广东省省考备考(第七十八天8.16)——资料分析、判断推理(强化训练)
  • Docker目录的迁移
  • GaussDB 数据库架构师修炼(十三)安全管理(3)-行级访问控制
  • 6JSON格式转python并实现数据可视化
  • 在ubuntu系统上离线安装jenkins的做法
  • 零基础学习人工智能的完整路线规划