当前位置: 首页 > news >正文

flink如何支持kafka容灾自动切换

背景

在flink消费kafka消息时,我们会指定连接的kafka服务器的地址以及起始消费偏移等信息,一旦指定,当kafka服务器挂掉后,flink也会由于连接不上服务器而导致失败,这里想要解决的问题是当kafka在机房A挂掉后,如果机房B有对kafka进行容灾的频道,那么flink怎么可以做到自动切换到机房B的进行kafka消费?同理,当机房A数据恢复后,如何自动切回到机房A进行消费?这个过程自动发生而不需要手动修改kafka的地址

技术实现

flink消费kafka的实现类是FlinkKafkaConsumerBase,这个类内部有一个功能:可以自动发现满足某个规则的kafka主题并消费,其关键代码如下:

private void createAndStartDiscoveryLoop(AtomicReference<Exception> discoveryLoopErrorRef) {discoveryLoopThread =new Thread(() -> {try {// --------------------- partition discovery loop// ---------------------// throughout the loop, we always eagerly check if we are still// running before// performing the next operation, so that we can escape the loop as// soon as possiblewhile (running) {if (LOG.isDebugEnabled()) {LOG.debug("Consumer subtask {} is trying to discover new partitions ...",getRuntimeContext().getIndexOfThisSubtask());}final List<KafkaTopicPartition> discoveredPartitions;try {discoveredPartitions =partitionDiscoverer.discoverPartitions();} catch (AbstractPartitionDiscoverer.WakeupException| AbstractPartitionDiscoverer.ClosedException e) {// the partition discoverer may have been closed or woken up// before or during the discovery;// this would only happen if the consumer was canceled;// simply escape the loopbreak;}// no need to add the discovered partitions if we were closed// during the meantimeif (running && !discoveredPartitions.isEmpty()) {kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);}// do not waste any time sleeping if we're not running anymoreif (running && discoveryIntervalMillis != 0) {try {Thread.sleep(discoveryIntervalMillis);} catch (InterruptedException iex) {// may be interrupted if the consumer was canceled// midway; simply escape the loopbreak;}}}} catch (Exception e) {discoveryLoopErrorRef.set(e);} finally {// calling cancel will also let the fetcher loop escape// (if not running, cancel() was already called)if (running) {cancel();}}},"Kafka Partition Discovery for "+ getRuntimeContext().getTaskNameWithSubtasks());discoveryLoopThread.start();}

如上所示,他是通过开启一个线程,然后定时检测的方式来发现是否有新的符合规则条件的主题,如果有添加到消费队列中,读者会不会很好奇,我们讨论的是flink如何对kafka进行容灾切换,你和我说这个主题自动发现做什么?
其实这里想表达的是一样的思路,我们进行kafka容灾的切换也是可以这样做,我们开启一个线程,然后线程里面不停的检测当前消费的kafka集群的连通性是否正常,如果连接不上,那么表明发生了kafka的机房容灾,flink需要切换到kafka的机房B进行消费,那么这里剩下的就只是如何确定消费的偏移量的问题了

http://www.lryc.cn/news/573685.html

相关文章:

  • C++,Qt事件处理机制编程开发练习全解析,23000字解析!!
  • 二、Generative adversarial network (GAN)
  • 深入理解Spring MVC:构建灵活Web应用的基石
  • Elasticsearch Kibana (一)
  • React纯函数和hooks原理
  • 开发语言本身只是提供了一种解决问题的工具
  • Qt应用中处理Linux信号:实现安全退出的技术指南
  • 对射式红外传感器计次旋转编码器计次
  • 消息队列:基本知识
  • day039-nginx配置补充
  • VSCode性能调优:从卡顿到丝滑的终极方案
  • React 核心原理与Fiber架构
  • java中关于异步转同步的一些解决方案的对比与思考。【spring mvc堵塞式】
  • 【前后前】导入Excel文件闭环模型:Vue3前端上传Excel文件,【Java后端接收、解析、返回数据】,Vue3前端接收展示数据
  • 华为云Flexus+DeepSeek征文|在Dify-LLM平台中开发童话故事精灵工作流AI Agent
  • 【DDD】——带你领略领域驱动设计的独特魅力
  • C4.5算法深度解析:决策树进化的里程碑
  • 《HTTP权威指南》 第7章 缓存
  • mysql join的原理及过程
  • C++法则10:引用本身是一个“别名”(alias),一旦绑定到一个对象后,就不能再重新绑定到其他对象。
  • 【递归,搜索与回溯算法】记忆化搜索(二)
  • 如何处理RocketMQ的各种线上问题
  • 【Python学习笔记】报错:Unindent amount does not match previous indent
  • Spring Boot 项目初始化
  • AWS 使用图形化界面创建 EKS 集群(零基础教程)
  • LabVIEW图像拼接原理与实现 链接附件有演示录像
  • 如何用AI开发完整的小程序<9>—UI自适应与游戏页优化
  • 关于uniapp解析SSE响应数据的处理
  • 【学习笔记】深入理解Java虚拟机学习笔记——第11章 后端编译与优化
  • 关于CH32开发板烧录说明