当前位置: 首页 > news >正文

Flink CDC 3.0 Starrocks建表失败会导致任务卡主!

Flink CDC 3.0 Starrocks建表失败会导致任务卡主!

现象

StarRocks建表失败,然后任务自动重启,重启完毕后数据回放,jobMaster打印下面日志后,整个任务会卡主

There are already processing requests. Wait for processing

原因分析

前提概要:可以先阅读CDC表变更处理流程然后再读下面会更加清晰

涉及类包括SchemaRegistrySchemaOperatorStarRocksMetadataApplier

SchemaRegistry->handleEventFromOperator方法执行建表失败后会导致任务重启,但是jobMaster不会重启,因此SchemaRegistry.requestHandler.pendingSchemaChanges无法删除导致任务卡主!

public void flushSuccess(TableId tableId, int sinkSubtask) {flushedSinkWriters.add(sinkSubtask);if (flushedSinkWriters.equals(activeSinkWriters)) {LOG.info("All sink subtask have flushed for table {}. Start to apply schema change.",tableId.toString());PendingSchemaChange waitFlushSuccess = pendingSchemaChanges.get(0);//执行表结构变更操作!applySchemaChange(tableId, waitFlushSuccess.getChangeRequest().getSchemaChangeEvent());waitFlushSuccess.getResponseFuture().complete(wrap(new ReleaseUpstreamResponse()));if (RECEIVED_RELEASE_REQUEST.equals(waitFlushSuccess.getStatus())) {//异常会跳过删除pendingSchame!startNextSchemaChangeRequest();}}
}
//删除pendingSchemaChanges中已经完成的pendingSchame
private void startNextSchemaChangeRequest() {this.pendingSchemaChanges.remove(0);this.flushedSinkWriters.clear();...
}public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(SchemaChangeRequest request) {//历史pendingSchame未删除导致,卡主if (pendingSchemaChanges.isEmpty()) {LOG.info("Received schema change event request from table {}. Start to buffer requests for others.",request.getTableId().toString());if (request.getSchemaChangeEvent() instanceof CreateTableEvent&& schemaManager.schemaExists(request.getTableId())) {return CompletableFuture.completedFuture(wrap(new SchemaChangeResponse(false)));}CompletableFuture<CoordinationResponse> response =CompletableFuture.completedFuture(wrap(new SchemaChangeResponse(true)));schemaManager.applySchemaChange(request.getSchemaChangeEvent());pendingSchemaChanges.add(new PendingSchemaChange(request, response));pendingSchemaChanges.get(0).startToWaitForReleaseRequest();return response;} else {LOG.info("There are already processing requests. Wait for processing.");CompletableFuture<CoordinationResponse> response = new CompletableFuture<>();pendingSchemaChanges.add(new PendingSchemaChange(request, response));return response;}
}

解决办法

  1. 让建表执行成功
  2. catch住异常,将schame删除后再异常重启(未验证)
http://www.lryc.cn/news/307190.html

相关文章:

  • 基于 LVGL 使用 SquareLine Studio 快速设计 UI 界面
  • Selenium IDE插件录制网页,解放双手
  • 【LeetCode】【滑动窗口长度不固定】978 最长湍流子数组
  • 水库安全监测方案(福建地区水库安全监测案例分享)
  • Oracle内存计算应用模式
  • ELK日志系统
  • C++:list容器(非原生指针迭代器的实现)
  • 抖音视频批量下载软件|视频评论采集工具
  • Oracle RMAN 备份恢复
  • 【MySQL】学习和总结联合查询
  • Flink应用场景
  • 产品渲染3D效果图一张多少钱,哪个平台更有性价比?
  • 云原生之容器编排实践-ruoyi-cloud项目部署到K8S:MySQL8
  • go interface{} 和string的转换问题
  • 【Git教程】(三)提交详解 —— add、commit、status、stach命令的说明,提交散列值与历史,多次提交及忽略 ~
  • vue3个人网站电子宠物
  • 2.22 作业
  • office word保存pdf高质量设置
  • 微服务设计模式
  • 10.网络游戏逆向分析与漏洞攻防-游戏网络架构逆向分析-接管游戏发送数据的操作
  • 将SU模型导入ARCGIS,并获取高度信息,多面体转SHP文件(ARCMAP)
  • 【电子通识】为什么单片机芯片上会有多组VDD电源?
  • 跟我学C++中级篇——单实例和静态化
  • 下载 axios.js 文件到本地【linux】
  • 一些matlab的常用用法。在MATLAB中,如何实现数据的导入和导出?
  • 数学建模【插值与拟合】
  • 汽修专用产品---选型介绍 汽修示波器 汽车示波器 汽车电子 汽修波形 汽车传感器波形 汽车检测
  • 如何将简历项目部署到自己的域名下
  • Redisson - 实现Java的Redis分布式和可扩展解决方案
  • 如何利用EXCEL批量插入图片