- noWatermarks
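```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkWaterMark {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Each line received on localhost:8888 becomes one String record
        DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);
        // noWatermarks(): the strategy generates no watermarks at all
        WatermarkStrategy<String> watermark = WatermarkStrategy.noWatermarks();
        SingleOutputStreamOperator<String> source = socketSource.assignTimestampsAndWatermarks(watermark);
        source.print(); // sink so the job produces visible output
        env.execute();
    }
}
```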
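There is not much to say about noWatermarks(): it generates no watermarks at all, so on an unbounded stream such as this socket source, event-time windows and timers never fire. It is mainly useful for jobs that rely purely on processing time.

- forMonotonousTimestamps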
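```java
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkWaterMark {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Input lines are expected to look like "<epochMillis>,<other fields>"
        DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);
        WatermarkStrategy<String> watermark = WatermarkStrategy.<String>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        // The first comma-separated field is the event time in milliseconds
                        String time = element.split(",")[0];
                        return Long.parseLong(time);
                    }
                });
        SingleOutputStreamOperator<String> source = socketSource.assignTimestampsAndWatermarks(watermark);
        source.print();
        env.execute();
    }
}
```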
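There is not much to say about forMonotonousTimestamps() either. If you choose forMonotonousTimestamps, event timestamps must be non-decreasing (ascending). A record that arrives out of order falls behind the current watermark and is treated as late data; if the event-time window it belongs to has already fired, the record is discarded by default, so out-of-order input can lose a lot of data.
The source code shows that forMonotonousTimestamps is built on the same watermark generator as forBoundedOutOfOrderness, with the out-of-orderness bound simply set to 0: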
```java
public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {

    public AscendingTimestampsWatermarks() {
        super(Duration.ofMillis(0));
    }
}
```
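To make the 0 ms bound concrete, here is a minimal plain-Java sketch (no Flink dependency) that mimics the formula of BoundedOutOfOrdernessWatermarks with a bound of 0, i.e. watermark = maxTimestamp - 0 - 1. The class name MonotonousWatermarkSim is hypothetical, and the sketch assumes a watermark is emitted after every record, whereas Flink emits watermarks periodically (every 200 ms by default). "Late" here only means the record is at or behind the watermark; whether it is actually dropped depends on the downstream window.

```java
/**
 * Hypothetical plain-Java simulation (not Flink code) of the watermark formula used by
 * AscendingTimestampsWatermarks: watermark = maxTimestamp - 0 - 1.
 * Assumption: a watermark is emitted after every record (Flink emits periodically).
 */
public class MonotonousWatermarkSim {

    static final long OUT_OF_ORDERNESS_MS = 0;

    // Same initial value as BoundedOutOfOrdernessWatermarks, so the first watermark is Long.MIN_VALUE
    long maxTimestamp = Long.MIN_VALUE + OUT_OF_ORDERNESS_MS + 1;

    long currentWatermark() {
        return maxTimestamp - OUT_OF_ORDERNESS_MS - 1;
    }

    /** True if the record is at or behind the watermark that was current when it arrived. */
    boolean isLate(long ts) {
        boolean late = ts <= currentWatermark();
        maxTimestamp = Math.max(maxTimestamp, ts); // corresponds to onEvent
        return late;
    }

    public static void main(String[] args) {
        MonotonousWatermarkSim sim = new MonotonousWatermarkSim();
        long[] timestamps = {1000, 2000, 2000, 1500, 3000};
        for (long ts : timestamps) {
            boolean late = sim.isLate(ts);
            System.out.printf("ts=%d late=%b watermark=%d%n", ts, late, sim.currentWatermark());
        }
    }
}
```

Running main shows that a repeated timestamp (the second 2000) is still on time, because the watermark is only 1999, while 1500, arriving after 2000, is behind that watermark.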
- forBoundedOutOfOrderness
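```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkWaterMark {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);
        // Tolerate up to 2000 ms of disorder; the explicit <String> lets withTimestampAssigner type-check
        WatermarkStrategy<String> watermark = WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofMillis(2000))
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        // The first comma-separated field is the event time in milliseconds
                        String time = element.split(",")[0];
                        return Long.parseLong(time);
                    }
                });
        SingleOutputStreamOperator<String> source = socketSource.assignTimestampsAndWatermarks(watermark);
        source.print();
        env.execute();
    }
}
```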
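The principle of the bounded-out-of-orderness strategy was covered in an earlier article. With the 2 s bound set in the code above, the watermark trails the largest event timestamp seen so far by 2 s (precisely, maxTimestamp - 2000 - 1 ms). The gap is measured against that maximum, not against the immediately preceding record: a record that lags the maximum by more than 2 s falls behind the watermark and counts as late. If the event-time window it belongs to has already fired, the window operator discards it by default, unless allowedLateness or a side output for late data is configured.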
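The drop rule can be sketched in plain Java as well. The following simulation (hypothetical class BoundedOutOfOrdernessSim, no Flink dependency) applies the same watermark formula with the 2000 ms bound and assumes a 5 s tumbling event-time window with allowedLateness = 0, so a record is dropped only when the maximum timestamp of its window is already at or below the watermark. As before, it assumes a watermark is emitted after every record rather than periodically.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical plain-Java simulation (not Flink code): bounded-out-of-orderness watermark
 * (maxTimestamp - bound - 1) plus the "window already fired" drop rule of an event-time
 * tumbling window with allowedLateness = 0. Assumes one watermark per record.
 */
public class BoundedOutOfOrdernessSim {

    static final long OUT_OF_ORDERNESS_MS = 2000; // Duration.ofMillis(2000) in the article
    static final long WINDOW_SIZE_MS = 5000;      // assumed 5 s tumbling window

    long maxTimestamp = Long.MIN_VALUE + OUT_OF_ORDERNESS_MS + 1;

    long currentWatermark() {
        return maxTimestamp - OUT_OF_ORDERNESS_MS - 1;
    }

    /** Returns true if the record is dropped because its window has already fired. */
    boolean process(long ts) {
        long watermark = currentWatermark(); // watermark current when the record arrives
        long windowStart = ts - Math.floorMod(ts, WINDOW_SIZE_MS);
        long windowMaxTimestamp = windowStart + WINDOW_SIZE_MS - 1;
        boolean dropped = windowMaxTimestamp <= watermark;
        maxTimestamp = Math.max(maxTimestamp, ts); // corresponds to onEvent
        return dropped;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSim sim = new BoundedOutOfOrdernessSim();
        long[] timestamps = {1000, 4000, 6500, 3000, 7000, 4900, 9000};
        List<Long> dropped = new ArrayList<>();
        for (long ts : timestamps) {
            long wm = sim.currentWatermark();
            boolean behind = ts <= wm;
            boolean drop = sim.process(ts);
            if (drop) {
                dropped.add(ts);
            }
            System.out.printf("ts=%d watermark=%d behindWatermark=%b dropped=%b%n", ts, wm, behind, drop);
        }
        System.out.println("dropped=" + dropped);
    }
}
```

With these inputs, 3000 arrives behind the watermark (4499) but its window [0, 5000) has not fired yet, so it is still counted. Then 7000 pushes the watermark to 4999, the window [0, 5000) fires, and 4900, arriving after that, is dropped.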