当前位置: 首页 > news >正文

Spark UI中 Shuffle Exchange 和 BroadcastExchange 中的 dataSize 值为什么不一样

背景

Spark 3.5
最近在看Spark UI 上的一些指标看到一个很有意思的东西, 相邻的Shuffle Exechange 和 BroadcastExechange 中的 datasize 居然不一样,
前者为 765KB, 后者为 64.5MB。差别还不少,中间就增加了一个 AQEShuffleRead 计划

结论

Shuffle Exechange 中的是真实 UnsafeRow的大小
BroadcastExechange 中的是 MemoryBlock 类型数据结构所占的大小 ,而不是UnsafeRow的大小。
BroadcastExechange中的datasize大小 和 2的整数倍接近。

现象以及分析

上图:
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

两个同样的 ShuffleExechange 记录条数和 ShuffleExechange 中 datasize 大小不一样,而在BroadcastExechange 中 dataSize 大小却是一样的(都是64.5MB)
关于 ShuffleExchange中的 dataSize的计算可以参考:Spark UI中Shuffle dataSize 和shuffle bytes written 指标区别,这里重点分析一下后者.
直接看BroadcastExechange代码:

  override lazy val relationFuture: Future[broadcast.Broadcast[Any]] = {SQLExecution.withThreadLocalCaptured[broadcast.Broadcast[Any]](session, BroadcastExchangeExec.executionContext) {try {// Setup a job tag here so later it may get cancelled by tag if necessary.sparkContext.addJobTag(jobTag)sparkContext.setInterruptOnCancel(true)val beforeCollect = System.nanoTime()// Use executeCollect/executeCollectIterator to avoid conversion to Scala typesval (numRows, input) = child.executeCollectIterator()...val relation = mode.transform(input, Some(numRows))val dataSize = relation match {case map: HashedRelation =>map.estimatedSizecase arr: Array[InternalRow] =>arr.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sumcase _ =>throw new SparkException("[BUG] BroadcastMode.transform returned unexpected " +s"type: ${relation.getClass.getName}")}longMetric("dataSize") += dataSize

其中child.executeCollectIterator() 是在把数据从各个 Executor 收集到 Driver 端来,便于进行广播操作。
最主要的是 mode.transform(input, Some(numRows)),这里的数据流如下:


HashedRelationBroadcastMode.transform||\/
HashedRelation.apply(rows, key, numRows.toInt, isNullAware = isNullAware)||\/
UnsafeHashedRelation.apply(input, key, sizeEstimate, mm, isNullAware, allowsNullKey,ignoresDuplicatedKey)||\/
new UnsafeHashedRelation(key.size, numFields, binaryMap)

最终调用的 UnsafeHashedRelation.estimatedSize的方法:

  override def estimatedSize: Long = binaryMap.getTotalMemoryConsumption

getTotalMemoryConsumptiondataPages所占用的大小再加上longArray的大小:

  public long getTotalMemoryConsumption() {long totalDataPagesSize = 0L;for (MemoryBlock dataPage : dataPages) {totalDataPagesSize += dataPage.size();}return totalDataPagesSize + ((longArray != null) ? longArray.memoryBlock().size() : 0L);}

那么 BytesToBytesMap 是怎么分配的呢?如下:

    val binaryMap = new BytesToBytesMap(taskMemoryManager,// Only 70% of the slots can be used before growing, more capacity help to reduce collision(sizeEstimate * 1.5 + 1).toInt,pageSizeBytes)

默认的PageSize值为:defaultPageSizeBytes:

  private lazy val defaultPageSizeBytes = {val minPageSize = 1L * 1024 * 1024   // 1MBval maxPageSize = 64L * minPageSize  // 64MBval cores = if (numCores > 0) numCores else Runtime.getRuntime.availableProcessors()// Because of rounding to next power of 2, we may have safetyFactor as 8 in worst caseval safetyFactor = 16val maxTungstenMemory: Long = tungstenMemoryMode match {case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.poolSizecase MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.poolSize}val size = ByteArrayMethods.nextPowerOf2(maxTungstenMemory / cores / safetyFactor)val chosenPageSize = math.min(maxPageSize, math.max(minPageSize, size))if (Utils.isG1GC && tungstenMemoryMode == MemoryMode.ON_HEAP) {chosenPageSize - Platform.LONG_ARRAY_OFFSET} else {chosenPageSize}}

这个跟内存以及core有关。
当在进行val loc = binaryMap.lookup 以及loc.append操作的时候就会进行dataPage以及longArray的分配。而该size的大小并不是实际占用的大小,而是分配给该dataPage的大小。其实你会发现该datasize的大小几乎和2的倍数接近。

http://www.lryc.cn/news/285800.html

相关文章:

  • 阿里云优惠券领取入口、使用方法和限制条件,2024最新
  • 自己构建webpack+vue3+ts
  • 【AI】小白入门笔记
  • GPT应用开发:编写插件获取实时天气信息
  • 揭开Spring MVC的真面目
  • AI大模型开发架构设计(3)——如何打造自己的大模型
  • Linux C语言开发(三)运算符和表达式
  • Spring-AOP入门案例
  • 中仕教育:国考调剂和补录的区别是什么?
  • ESP32-TCP服务端(Arduino)
  • HCIA-HarmonyOS设备开发认证-序
  • Med-YOLO:3D + 医学影像 + 检测框架
  • Docker部署Golang服务
  • C#,字符串匹配(模式搜索)Sunday算法的源代码
  • makefile 编译动态链接库使用(.so库文件)
  • Hive 数仓及数仓设计方案
  • Ubuntu使用docker-compose安装redis
  • 大数据安全 | 期末复习(上)| 补档
  • Kylin 安装novnc 远程访问
  • 神经网络算法与逻辑回归:优势与差异
  • 【蓝桥杯冲冲冲】动态规划初步[USACO2006 OPEN] 县集市
  • C#,入门教程(30)——扎好程序的笼子,错误处理 try catch
  • 操作教程|JumpServer堡垒机结合Ansible进行批量系统初始化
  • 序列化VS反序列化
  • 新数智空间:阿里云边缘云持续保持中国公有云市场第一
  • 【开源】基于JAVA语言的陕西非物质文化遗产网站
  • C++(Qt)软件调试---静态分析工具clang-tidy(18)
  • 2401llvm,clang的重构引擎
  • 【C语言深度剖析——第四节(关键字4)】《C语言深度解剖》+蛋哥分析+个人理解
  • 鸿蒙开发系列教程(五)--ArkTS语言:组件开发