当前位置: 首页 > news >正文

Flink之Watermark策略代码模板

方式作用
WatermarkStrategy.noWatermarks()不生成watermark
WatermarkStrategy.forMonotonousTimestamps()紧跟最大事件时间watermark生成策略
WatermarkStrategy.forBoundedOutOfOrderness()允许乱序watermark生成策略
WatermarkStrategy.forGenerator()自定义watermark生成策略
  • noWatermarks
    public class FlinkWaterMark throws Exception {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 获取数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 构造watermark生成策略,选择不生成watermarkWatermarkStrategy<UserEvent2> watermark = WatermarkStrategy.noWatermarks();// 将构造完成的watermark分配给数据流SingleOutputStreamOperator<UserEvent2> source = socketSource.assignTimestampsAndWatermarks(watermark);// ...env.execute();}
    }
    
    关于noWaterMarks()的使用没有太多内容.
  • forMonotonousTimestamps
    public class FlinkWaterMark throws Exception {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 获取数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 构造watermark, 使用紧跟最大事件时间策略WatermarkStrategy<String> watermark = WatermarkStrategy.<String>forMonotonousTimestamps()// 抽取时间时间, 根据数据中实际情况选择.withTimestampAssigner(new SerializableTimestampAssigner<String>() {@Overridepublic long extractTimestamp(String element, long recordTimestamp) {/*** 这里是样例代码,实际情况根据具体业务具体数据特性抽取对应的时间**/String time = element.split(",")[0];long timestamp = Long.parseLong(time);return timestamp;}});// 将构造完成的watermark分配给数据流SingleOutputStreamOperator<UserEvent2> source = socketSource.assignTimestampsAndWatermarks(watermark);// ...env.execute();}
    }
    
    对于forMonotonousTimestamps()可说内容并不多,如果选择了forMonotonousTimestamps这种方式就必须保证事件时间严格有序,如果出现乱序的情况可能存在大量数据丢失的问题.
    通过源码内容可以看到forMonotonousTimestamps底层也是使用的forBoundedOutOfOrderness方式,只不过将容错时间设置为了0,源码如下:
    // 首先看这里,继承的BoundedOutOfOrdernessWatermarks
    public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {/** Creates a new watermark generator with for ascending timestamps. */public AscendingTimestampsWatermarks() {super(Duration.ofMillis(0)); // 这里将容错时间设置为了0}
    }
    
  • forBoundedOutOfOrderness
    public class FlinkWaterMark throws Exception {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 获取数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 构造watermark, 使用允许水位线乱序策略,并设置最大容错时间为2sWatermarkStrategy<String> watermark = WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(2000))// 抽取时间时间, 根据数据中实际情况选择.withTimestampAssigner(new SerializableTimestampAssigner<String>() {@Overridepublic long extractTimestamp(String element, long recordTimestamp) {/*** 这里是样例代码,实际情况根据具体业务具体数据特性抽取对应的时间**/String time = element.split(",")[0];long timestamp = Long.parseLong(time);return timestamp;}});// 将构造完成的watermark分配给数据流SingleOutputStreamOperator<UserEvent2> source = socketSource.assignTimestampsAndWatermarks(watermark);// ...env.execute();}
    }
    
    对于允许乱序策略前面文章有介绍过其原理,比如代码中设置容错时间为2S,那么前后的数据差最大只能是2S,如果差值大于2S,后来的这条数据就会被抛弃.
http://www.lryc.cn/news/186569.html

相关文章:

  • ubuntu 安装postgresql,增加VECTOR向量数据库插件 踏坑详细流程
  • 基于Springboot实现影视影院订票选座管理系统【项目源码+论文说明】分享
  • mysql批量插入数据,跳过唯一索引报错
  • 论文阅读--Energy efficiency in heterogeneous wireless access networks
  • Redis的C客户端(hiredis库)使用
  • 光引擎、光模块、光器件之间的关系和区别
  • 【办公-excel】两个时间相减 (二) - 带毫秒的时间进行相减操作
  • 二次封装View Design的table组件,实现宽度自适应,内容在一行展示
  • Node.js代码漏洞扫描工具介绍——npm audit
  • node.js知识系列(3)-每天了解一点
  • Zabbix监控系统 自定义监控项、自动发现与自动注册
  • Python信号之分享
  • 环信web、uniapp、微信小程序SDK报错详解---登录篇
  • DAZ To UMA⭐五.模型在Blender中的配置教程
  • 网络安全工具汇总
  • day-65 代码随想录算法训练营(19)图论 part 04
  • C++ - 完美语义(右值引用的中篇) - lambda表达式
  • 常见排序算法详解
  • 监控搭建-Prometheus
  • 指纹浏览器开发指南-EasyBR
  • qml入门
  • 一文熟练使用python修改Excel中的数据
  • java Spring Boot在配置文件中关闭热部署
  • 【物联网】Arduino+ESP8266物联网开发(一):开发环境搭建 安装Arduino和驱动
  • 自定义UI对象转流程节点
  • P1-P5_动手学深度学习-pytorch(李沐版,粗浅的笔记)
  • Android Studio修改模拟器AVD Manger目录
  • STM32--MQ2烟雾传感器
  • GitHub要求开启2FA,否则不让用了。
  • Python 编程基础 | 第三章-数据类型 | 3.6、元组