当前位置: 首页 > news >正文

SPARK outputDeterministicLevel的作用--任务全部重试或者部分重试

背景

目前spark的repartition()方法是随机分配数据到下游,这会导致一个问题,有时候如果我们用repartition方法的时候,如果任务发生了重试,就有可能导致任务的数据不准确,那这个时候改怎么解决这个问题呢?

分析

在Spark RDD中存在着名为outputDeterministicLevel的变量,如下:

private[spark] final lazy val outputDeterministicLevel: DeterministicLevel.Value = {if (isReliablyCheckpointed) {DeterministicLevel.DETERMINATE} else {getOutputDeterministicLevel}}

那么该变量的作用是什么呢?让我们分析一下:
改变量最终会被StageisIndeterminate方法调用:

 def isIndeterminate: Boolean = {rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE}

而该方法会被DAGScheduler调用,有两处地方会被调用:

  • submitMissingTasks中调用
   private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {logDebug("submitMissingTasks(" + stage + ")")// Before find missing partition, do the intermediate state clean work first.// The operation here can make sure for the partially completed intermediate stage,// `findMissingPartitions()` returns all partitions every time.stage match {case sms: ShuffleMapStage if stage.isIndeterminate && !sms.isAvailable =>mapOutputTracker.unregisterAllMapOutput(sms.shuffleDep.shuffleId)case _ =>}

该方法主要用于在重新提交失败的stage时候,用来判断是否需要重新计算上游的所有任务。

  • handleTaskCompletion中调用
      case FetchFailed(bmAddress, shuffleId, _, mapIndex, _, failureMessage) =>。。。val noResubmitEnqueued = !failedStages.contains(failedStage)failedStages += failedStagefailedStages += mapStageif (noResubmitEnqueued) {// If the map stage is INDETERMINATE, which means the map tasks may return// different result when re-try, we need to re-try all the tasks of the failed// stage and its succeeding stages, because the input data will be changed after the// map tasks are re-tried.// Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is// guaranteed to be determinate, so the input data of the reducers will not change// even if the map tasks are re-tried.if (mapStage.isIndeterminate) {

这里如果任务Fetch失败了,根据该shuffle所对应的上游stage是不是isIndeterminate来向DAGScheduler提交ResubmitFailedStages事件,从而调用submitMissingTasks方法进行上游所有任务或者单个任务的重试。

再回到outputDeterministicLevel变量,该变量会调用getOutputDeterministicLevel方法进行循环调用上游的outputDeterministicLevel变量来确定outputDeterministicLevel的值。

结论

所以根据以上分析,我们可以改写对应的RDD的outputDeterministicLevel变量或者getOutputDeterministicLevel方法来进行stage任务的全部重试与否

http://www.lryc.cn/news/14462.html

相关文章:

  • 图数据库中的 OLTP 与 OLAP 融合实践
  • Shader Graph简介
  • kubectl
  • 实验室设计SICOLAB第三方检测中心实验室设计
  • GPS经纬度转距离
  • 7-周赛333总结
  • 电子招标采购系统源码—互联网+招标采购
  • SQL注入和XSS攻击
  • js Map的使用
  • 企业应该怎么管理香港服务器?
  • 软件设计(十四)-UML建模(上)
  • 本地主机搭建服务器后如何让外网访问?快解析内网端口映射
  • Flink-Table API 和 SQL(基本API、流处理的表、时间属性和窗口、聚合查询、联结查询、函数、SQL客户端、连接到外部系统)
  • C++入门:数据抽象
  • WRF进阶:使用IO选项控制WRF变量输出/WRF指定变量输出添加/删除
  • 一文读懂功率放大器(功率放大器的特性是什么意思)
  • 微信小程序阻止页面返回(包滑动、自动返回键)
  • 视频直播美颜sdk的发展史
  • 【Mysql】存储过程
  • Day895.MySql误删数据还原方案 -MySQL实战
  • Java方法引用
  • C++教程之迭代器Iterator
  • 容联七陌:ChatGPT大模型能力为智能客服带来新方向
  • 【Linux 多线程同步】使用同步和互斥实现生产消费模型
  • 【TypeScript】TypeScript的接口和对象类型(interface):
  • 7、函数与异常
  • Julia 语言环境安装
  • 5.1 线程
  • 通讯录的实现
  • Urho3D导航