当前位置: 首页 > news >正文

Spark UI中Shuffle dataSize 和shuffle bytes written 指标区别

背景

本文基于Spark 3.1.1
目前在做一些知识回顾的时候,发现了一些很有意思的事情,就是Spark UI中ShuffleExchangeExec 的dataSize和shuffle bytes written指标是不一样的,
那么在AQE阶段的时候,是以哪个指标来作为每个Task分区大小的参考呢

结论

先说结论 dataSzie指标是 是存在内存中的UnsafeRow 的大小的总和,AQE阶段(规则OptimizeSkewedJoin/CoalesceShufflePartitions)用到判断分区是否倾斜或者合并分区的依据是来自于这个值,
shuffle bytes written指的是写入文件的字节数,会区分压缩和非压缩,如果在开启了压缩(也就是spark.shuffle.compress true)和未开启压缩的情况下,该值的大小是不一样的。
开启压缩如下:
在这里插入图片描述
未开启压缩如下:
在这里插入图片描述

先说杂谈

这两个指标的值都在 ShuffleExchangeExec中:

case class ShuffleExchangeExec(override val outputPartitioning: Partitioning,child: SparkPlan,shuffleOrigin: ShuffleOrigin = ENSURE_REQUIREMENTS)extends ShuffleExchangeLike {private lazy val writeMetrics =SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)private[sql] lazy val readMetrics =SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)override lazy val metrics = Map("dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size")) ++ readMetrics ++ writeMetrics

dataSize指标来自于哪里

涉及到datasize的数据流是怎么样的如下,一切还是得从ShuffleMapTask这个shuffle的起始操作讲起:

ShuffleMapTask||\/
runTask||\/
dep.shuffleWriterProcessor.write //这里的shuffleWriterProcessor是来自于 ShuffleExchangeExec中的createShuffleWriteProcessor||\/
writer.write()  //这里是writer 是 UnsafeShuffleWriter类型的实例||\/
insertRecordIntoSorter||\/
UnsafeRowSerializerInstance.writeValue||\/
dataSize.add(row.getSizeInBytes)

这里的 rowUnsafeRow的实例,这样就获取到了实际内存中的每个分区的大小,
而ShuffleMapTask runTask 方法最终返回的是MapStatus,而该MapStatus最终是在UnsafeShuffleWriter的closeAndWriteOutput方法中被赋值的:

void closeAndWriteOutput() throws IOException {assert(sorter != null);updatePeakMemoryUsed();serBuffer = null;serOutputStream = null;final SpillInfo[] spills = sorter.closeAndGetSpills();sorter = null;final long[] partitionLengths;try {partitionLengths = mergeSpills(spills);} finally {for (SpillInfo spill : spills) {if (spill.file.exists() && !spill.file.delete()) {logger.error("Error while deleting spill file {}", spill.file.getPath());}}}mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId);}

shuffle bytes written指标来自哪里

基本流程和dataSize 一样,还是来自于ShuffleMapTask

ShuffleMapTask||\/
runTask||\/
dep.shuffleWriterProcessor.write //这里的shuffleWriterProcessor是来自于 ShuffleExchangeExec中的createShuffleWriteProcessor||\/
writer.write()  //这里是writer 是 UnsafeShuffleWriter类型的实例||\/
closeAndWriteOutput||\/
sorter.closeAndGetSpills() ->  writeSortedFile -> writer.commitAndGet -> writeMetrics.incBytesWritten(committedPosition - reportedPosition) -> serializerManager.wrapStream(blockId, mcs) // 这里进行了压缩||\/
mergeSpills||\/
mergeSpillsUsingStandardWriter||\/
mergeSpillsWithFileStream -> writeMetrics.incBytesWritten(numBytesWritten)||\/
writeMetrics.decBytesWritten(spills[spills.length - 1].file.length())
http://www.lryc.cn/news/212962.html

相关文章:

  • Java——Map.getOrDefault方法详解
  • 银河集团香港优才计划95分获批案例展示!看看是如何申请的?
  • Python class中以`_`开头的类特殊方法
  • 2023云栖大会开幕:全球数万开发者参会,展现AI时代的云计算创新
  • [量化投资-学习笔记004]Python+TDengine从零开始搭建量化分析平台-EMA均线
  • KaiwuDB 获山东省工信厅“信息化应用创新优秀解决方案”奖
  • Python-常用的量化交易代码片段
  • Netty优化-rpc
  • 【Docker 内核详解】cgroups 资源限制(一):概念、作用、术语
  • MATLAB——一维小波的多层分解
  • C++的拷贝构造函数
  • 【手机端远程连接服务器】安装和配置cpolar+JuiceSSH:实现手机端远程连接服务器
  • Jupyter Notebook的使用
  • vue 使用vue-office预览word、excel,pdf同理
  • 【Spring Boot 源码学习】RedisAutoConfiguration 详解
  • Linux中如何进行粘贴复制
  • 多输入多输出 | Matlab实现k-means-LSTM(k均值聚类结合长短期记忆神经网络)多输入多输出组合预测
  • 学习笔记3——JVM基础知识
  • 图像处理:图片二值化学习,以及代码中如何实现
  • 如果你点击RabbitMQ Service - start了,但http://localhost:15672/#/还是访问不了,那么请看这篇博客!
  • Shell 脚本学习 day01
  • esp32 rust linux
  • 一文了解Elasticsearch
  • 一篇文章认识【性能测试】
  • linux环境mysql安装配置踩坑
  • 相关性网络图 | 热图中添加显著性
  • cocosCreator 之 微信小游戏授权设置和调用wxAPI获取用户信息
  • element ui el-table表格纵向横向滚动条去除并隐藏空白占位列
  • 防止python进程重复执行
  • LV.12 D13 C工程与寄存器封装 学习笔记