当前位置: 首页 > news >正文

spark shuffle 剖析

ShuffleExchangeExec
  private lazy val writeMetrics =SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)private[sql] lazy val readMetrics =SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)

用在了两个地方,承接的是前后两个stage 的metrics

  /*** A [[ShuffleDependency]] that will partition rows of its child based on* the partitioning scheme defined in `newPartitioning`. Those partitions of* the returned ShuffleDependency will be the input of shuffle.*/@transientlazy val shuffleDependency : ShuffleDependency[Int, InternalRow, InternalRow] = {val dep = ShuffleExchangeExec.prepareShuffleDependency(inputRDD,child.output,outputPartitioning,serializer,writeMetrics)metrics("numPartitions").set(dep.partitioner.numPartitions)val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics("numPartitions") :: Nil)dep}
  protected override def doExecute(): RDD[InternalRow] = {// Returns the same ShuffleRowRDD if this plan is used by multiple plans.if (cachedShuffleRDD == null) {cachedShuffleRDD = new ShuffledRowRDD(shuffleDependency, readMetrics)}cachedShuffleRDD}

在这里插入图片描述

一般情况是,两个metrics 相同。 write 在前,read 在后

如果下个shuffle read task 没有完成或者失败,就会出现read 比write 少的情况。

broadcast
  /** Remove all blocks belonging to the given broadcast. */def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean): Unit = {val future = driverEndpoint.askSync[Future[Seq[Int]]](RemoveBroadcast(broadcastId, removeFromMaster))future.failed.foreach(e =>logWarning(s"Failed to remove broadcast $broadcastId" +s" with removeFromMaster = $removeFromMaster - ${e.getMessage}", e))(ThreadUtils.sameThread)if (blocking) {// the underlying Futures will timeout anyway, so it's safe to use infinite timeout hereRpcUtils.INFINITE_TIMEOUT.awaitResult(future)}}
http://www.lryc.cn/news/241074.html

相关文章:

  • C语言之认识柔性数组(flexible array)
  • 【MATLAB基础绘图第17棒】绘制玫瑰图
  • Qt 基于海康相机的视频绘图
  • FlinkCDC实现主数据与各业务系统数据的一致性(瀚高、TIDB)
  • JSP:Servlet
  • react中的state
  • VR全景航拍要注意什么,航拍图片如何处理
  • Spark---集群搭建
  • Linux上通过SSL/TLS和start tls连接到LDAP服务器
  • 【华为OD题库-034】字符串化繁为简-java
  • 斯坦福大学引入FlashFFTConv来优化机器学习中长序列的FFT卷积
  • 信息系统项目管理师-干系人管理论文提纲
  • Windmill:最快的自托管开源工作流引擎
  • 线性代数 - 几何原理
  • 火电厂电气部分设计
  • 界面组件DevExpress Reporting v23.1 - Web报表设计器功能升级
  • 小程序Canvas 2D问题解决,如安卓drawImage不执行、动态高度设置、高度1365(或4096)限制等
  • 人工智能对网络安全的影响越来越大
  • JavaEE(SpringMVC)期末复习
  • 微服务保护 Sentinel
  • 【无标题】文本超过一行隐藏,鼠标经过显示提示框
  • 成为独立开发者有多难
  • C++ 正则表达式使用
  • VSCode任务tasks.json中的问题匹配器problemMatcher的问题匹配模式ProblemPattern详解
  • CSS 实现文本框签名
  • Spring 定时任务如何到达某一指定时间点后,触发任务机制
  • PDF Reader Pro 3.0.1.0(pdf阅读器)
  • 【rust:tauri-app踩坑记录】dangerousRemoteDomainIpcAccess 不适用于IP地址,临时解决方案
  • [Docker]八.Docker 容器跨主机通讯
  • 面试cast:reinterpret_cast/const_cast/static_cast/dynamic_cast