当前位置: 首页 > news >正文

flink的异常concurrent.TimeoutException: Heartbeat of TaskManager with id的解决

背景

在使用flink进行集成测试时,我们会使用MiniClusterWithClientResource类,但是当我们断点导致在某个方法执行的时间比较长时,会有错误发生,那么该如何解决这个错误呢?

处理concurrent.TimeoutException: Heartbeat of TaskManager with id错误

其实关键的配置是heartbeat.timeout,这个错误是JobManager抛出的,意思是和某个TaskManager的心跳中断超过了指定的时间,我们把这个参数配置到MiniClusterWithClientResource类中就可以了,代码如下所示:

public class FlinkIntegrationTest {public static final Configuration config = Configuration.fromMap(new HashMap<String, String>() {{put("heartbeat.timeout", "300000");}});@ClassRulepublic static MiniClusterWithClientResource flinkCluster =new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(config).setNumberSlotsPerTaskManager(1).setNumberTaskManagers(3).build());@Testpublic void testStateFlatMap() throws Exception {StatefulFlatMap statefulFlatMap = new StatefulFlatMap();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// configure your test environmentenv.setParallelism(2);// values are collected in a static variableCollectSink.values.clear();// create a stream of custom elements and apply transformationsenv.fromElements("world", "hi").keyBy(e -> "1").flatMap(statefulFlatMap).addSink(new CollectSink());// executeenv.execute();// verify your resultsassertTrue(CollectSink.values.containsAll(Lists.newArrayList("hello world", "hello hi world")));}@Testpublic void testStateFlatMap1() throws Exception {StatefulFlatMap statefulFlatMap = new StatefulFlatMap();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// configure your test environmentenv.setParallelism(2);// values are collected in a static variableCollectSink.values.clear();// create a stream of custom elements and apply transformationsenv.fromElements("world", "hi", "world").keyBy(e -> e).flatMap(statefulFlatMap).addSink(new CollectSink());// executeenv.execute();// verify your resultsassertTrue(CollectSink.values.containsAll(Lists.newArrayList("hello world", "hello hi", "hello world world")));}// create a testing sinkprivate static class CollectSink implements SinkFunction<String> {// must be staticpublic static final List<String> values = Collections.synchronizedList(new ArrayList<>());@Overridepublic void invoke(String value, Context context) throws Exception {values.add(value);}}}
http://www.lryc.cn/news/241324.html

相关文章:

  • 火电安全事故vr模拟仿真培训强交互更真实
  • ELK企业级日志分析平台
  • .NET面试题1
  • mongodb 日志详情
  • Oracle中文显示???????解决办法
  • Java查询数据放入word模板中并在前端导出下载
  • HarmonyOS ArkTS 应用添加弹窗(八)
  • 排序算法-----快速排序(非递归实现)
  • el-input限制输入整数等分析
  • 医院手术麻醉信息系统全套源码,自主版权,支持二次开发
  • canvas扩展001:利用fabric绘制图形,可以平移,旋转,放缩
  • 什么是机器学习
  • 电子桌牌如何赋能数字化会务?以深圳程序员节为例。
  • 打包和部署Java应用程序:Maven和Shell脚本的实用方法
  • Windows Python3安装salt模块失败处理
  • RabbitMQ 消息队列编程
  • 基于安卓android微信小程序的个人管理小程序
  • 免费图书教材配套资料:Spark大数据技术与应用(第2版)
  • SecureCRT9汉化版安装
  • 【VSCode】VSCode 使用
  • 【ARM 嵌入式 编译系列 2.2 -- 如何在Makefile 中添加编译时间 | 编译作者| 编译 git id】
  • 海康威视监控相机的SDK与opencv调用(非工业相机)
  • VUE项目部署过程中遇到的错误:POST http://124.60.11.183:9090/test/login 405 (Not Allowed)
  • MongoDB——索引(单索引,复合索引,索引创建、使用)
  • ebpf实战(一)-------监控udp延迟
  • 中西部各省市翻译协会、公关协会会长金秋圆桌会议圆满结束
  • 极盾故事|“五步”构建某三甲医院数据安全管理集成平台
  • 【开题报告】基于uni-app的恋爱打卡app的设计与实现
  • Python 2.7 在 Debian 服务器上获取 URL 时的 SSL 验证失败问题与解决方案
  • 导出文件到指定路径??