当前位置: 首页 > news >正文

源码解析FlinkKafkaConsumer支持周期性水位线发送

背景

当flink消费kafka的消息时,我们经常会用到FlinkKafkaConsumer进行水位线的发送,本文就从源码看下FlinkKafkaConsumer.assignTimestampsAndWatermarks指定周期性水位线发送的流程

FlinkKafkaConsumer水位线发送

1.首先从Fetcher类开始,创建Fetcher类的时候会构建一个周期性的水位线发送线程并启动

        // if we have periodic watermarks, kick off the interval schedulerif (timestampWatermarkMode == WITH_WATERMARK_GENERATOR && autoWatermarkInterval > 0) {PeriodicWatermarkEmitter<T, KPH> periodicEmitter =new PeriodicWatermarkEmitter<>(checkpointLock,subscribedPartitionStates,watermarkOutputMultiplexer,processingTimeProvider,autoWatermarkInterval);periodicEmitter.start();}

2.随后,PeriodicWatermarkEmitter中注册处理时间定时器,周期性执行

        public void start() {timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);}@Overridepublic void onProcessingTime(long timestamp) {synchronized (checkpointLock) {for (KafkaTopicPartitionState<?, ?> state : allPartitions) {// 这里当前算子任务消费的kafka 分区分别记录每个分区的水位值state.onPeriodicEmit();}//这里当前算子会把自己消费的kafka分区的所有水位线取最小值后当成当前算子任务自身的水位线发送出去,注意这里是当前算子任务级别的watermarkOutputMultiplexer.onPeriodicEmit();}// schedule the next watermarktimerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);}}

3.对应state.onPeriodicEmit();记录每个kafka分区的水位线方法

    @Overridepublic void onPeriodicEmit(WatermarkOutput output) {final org.apache.flink.streaming.api.watermark.Watermark next = wms.getCurrentWatermark();if (next != null) {output.emitWatermark(new Watermark(next.getTimestamp()));}}
其中 WatermarkOutput output.emitWatermark(new Watermark(next.getTimestamp()))代码如下:public DeferredOutput(OutputState state) {this.state = state;}@Overridepublic void emitWatermark(Watermark watermark) {state.setWatermark(watermark.getTimestamp());}
所以这里最终效果只是对应state(kafka分区[注意,一个算子任务有可能消费好几个kafka分区])上设置了水位线/*** Returns true if the watermark was advanced, that is if the new watermark is larger than* the previous one.** <p>Setting a watermark will clear the idleness flag.*/public boolean setWatermark(long watermark) {this.idle = false;final boolean updated = watermark > this.watermark;// 这里也可以看出来,即使代码里面发送了更小值的水位线,水位线也不会回退this.watermark = Math.max(watermark, this.watermark);return updated;}        

4.对应算子任务组合当前任务消费的所有分区水位线的方法

private void updateCombinedWatermark() {long minimumOverAllOutputs = Long.MAX_VALUE;boolean hasOutputs = false;boolean allIdle = true;for (OutputState outputState : watermarkOutputs) {if (!outputState.isIdle()) {minimumOverAllOutputs = Math.min(minimumOverAllOutputs, outputState.getWatermark());allIdle = false;}hasOutputs = true;}// if we don't have any outputs minimumOverAllOutputs is not valid, it's still// at its initial Long.MAX_VALUE state and we must not emit that// 如果算子任务不消费任何分区,它不会发出任何水位线,这里是不是就是kafka消费者要小于kafka主题的原因所在???if (!hasOutputs) {return;}if (allIdle) {// 如果当前算子任务处于空闲时间,标识空闲,以便后续算子可以继续推进underlyingOutput.markIdle();} else if (minimumOverAllOutputs > combinedWatermark) {combinedWatermark = minimumOverAllOutputs;underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs));}}```
http://www.lryc.cn/news/192983.html

相关文章:

  • Nginx:动静分离(示意图+配置讲解)
  • 通讯网关软件024——利用CommGate X2Access实现Modbus TCP数据转储Access
  • vim工具的使用
  • Docker学习_存储篇
  • 微信小程序获取当前日期时间
  • Unity关键词语音识别
  • SpringBoot的配置文件——.yml和.properties
  • Retrieve Anything To Augment Large Language Models
  • 什么是面向对象编程
  • c++视觉处理----固定阈值操作:Threshold()函数,实时处理:二值化,反二值化,截断,设为零,反向设为零
  • KWin、libdrm、DRM从上到下全过程 —— drmModeAddFBxxx(8)
  • 【问题解决】Ubuntu 安装 SeisSol 依赖 easi 报错解决: undefined reference to `H5free_memory‘
  • 循环小数(Repeating Decimals, ACM/ICPC World Finals 1990, UVa202)rust解法
  • [GAMES101]透视投影变换矩阵中为什么需要改变z值
  • sklearn处理离散变量的问题——以决策树为例
  • QT 数据库表格----QSqlTableModel
  • Vue_Bug Failed to fetch extension, trying 4 more times
  • 缩短从需求到上线的距离:集成多种工程实践的稳定框架 | 开源日报 No.55
  • 基于秃鹰优化的BP神经网络(分类应用) - 附代码
  • C++笔记之std::future的用法
  • openssl学习——消息认证码原理
  • Netty使用SslHandler实现加密通信-单向认证篇
  • Jetpack:007-Kotlin中的Button
  • opencv图形绘制2
  • “华为杯”研究生数学建模竞赛2019年-【华为杯】A题:无线智能传播模型(附优秀论文及Pyhton代码实现)(续)
  • 爬虫 | 正则、Xpath、BeautifulSoup示例学习
  • nginx的location的优先级和匹配方式
  • 深入了解Spring Boot Actuator
  • 【SQL】NodeJs 连接 MySql 、MySql 常见语句
  • SSH 基础学习使用