当前位置: 首页 > news >正文

Flink -- window(窗口)

1、窗口主要分成三大种:
        1、Time Window (时间窗口):固定时间触发一次窗口

                a、SlidingEventTimeWindows: 滑动的事件时间窗口

public class Demo1TImeWindow {public static void main(String[] args) throws Exception {/*** 时间窗口:由时间触发的窗口* SlidingEventTimeWindows: 滑动的事件时间窗口* SlidingProcessingTimeWindows:滑动的处理时间窗口* TumblingEventTimeWindows:滚动的事件时间窗口* TumblingProcessingTimeWindows:滚动的处理时间窗口*/StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> linesDS = env.socketTextStream("master", 8888);//解析数据,取出时间字段DataStream<Tuple2<String, Long>> wordAndTsDS = linesDS.map(line -> {String[] split = line.split(",");String word = split[0];//将时间戳转换成long类型long ts = Long.parseLong(split[1]);return Tuple2.of(word, ts);}, Types.TUPLE(Types.STRING, Types.LONG));//设置时间字段和水位线DataStream<Tuple2<String, Long>> assDS = wordAndTsDS.assignTimestampsAndWatermarks(WatermarkStrategy//水位线前移1秒.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner((kv, ts) -> kv.f1));/*** 每隔5秒统计最近10秒单词的数量*/SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = assDS.map(kv -> Tuple2.of(kv.f0, 1), Types.TUPLE(Types.STRING, Types.INT));kvDS.keyBy(kv -> kv.f0)//滑动的事件时间窗口.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).sum(1).print();env.execute();}
}

                b、SlidingProcessingTimeWindows:滑动的处理时间窗口

public class Demo03ProcessingTime {public static void main(String[] args)  throws Exception{/*** 数据处理时间:一般会结合窗口使用,一般值的是接受数据后对数据操作的时间* 需求:每过5秒中统计15秒内的单词的数量*///构建Flink的环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//使用socket模拟实时的操作DataStreamSource<String> wordDS = env.socketTextStream("master", 8888);//将接受的数据的转换成kv的格式SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = wordDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING,Types.INT));//按照单词进行分组KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(key -> key.f0);//划分窗口,窗口的大小是10秒钟,滑动的时间是5秒钟WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));//对统计的单词进行求和SingleOutputStreamOperator<Tuple2<String, Integer>> countDS = windowDS.sum(1);countDS.print();//启动Flinkenv.execute();}
}

                c、TumblingEventTimeWindows:滚动的事件时间窗口

kvDS.keyBy(kv -> kv.f0) 
//滚动的事件时间窗口.window(TumblingEventTimeWindows.of(Time.seconds(5))).sum(1).print();env.execute();

                d、TumblingProcessingTimeWindows:滚动的处理时间窗口

kvDS.keyBy(kv -> kv.f0)//滚动的处理时间窗口: 
.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum(1).print();env.execute();
        2、Session Window (会话窗口):如果一段时间没有数据就会生成一个窗口,将前面的数据拉去过来一起计算。

        1、 ProcessingTimeSessionWindows: 处理时间的会话窗口,是针对每一个key都会统计他的数量。

        2、EventTimeSessionWindows: 事件时间的会话窗口(需要由时间字段和水位线)(使用的比较少)

public class Demo3SessionWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> linesDS = env.socketTextStream("master", 8888);/*** ProcessingTimeSessionWindows: 处理时间的会话窗口* EventTimeSessionWindows: 事件时间的会话窗口(需要由时间字段和水位线)**/linesDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT)).keyBy(kv -> kv.f0)//某一个key如果5秒没有数据产生,将前面的数据放一起进行计算.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))).sum(1).print();env.execute();}
}
        3、Count Window (统计窗口):固定的数据量计算一次

                1、countWindow(10): 滚动的统计窗口
                 2、countWindow(10,2):滑动的统计窗口

public class Demo2CountWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> linesDS = env.socketTextStream("master", 8888);/*** 实时统计单词的数量,每10条数据统计一次*  .countWindow(10): 滚动的统计窗口*  .countWindow(10,2):滑动的统计窗口*/linesDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT)).keyBy(kv -> kv.f0)//每隔10条数据计算一次,同一个key每隔10条计算一次.countWindow(10).sum(1).print();env.execute();}
}

注意:对于事件时间,需要指定时间字段和水位线,处理时间不需要指定。

http://www.lryc.cn/news/221909.html

相关文章:

  • 原语:串并转换器
  • 没网络也能安装.Net 3.5!如何脱机安装.NET Framework 3.5
  • JVM运行时数据区-虚拟机栈
  • Java中介者模式
  • 前端框架Vue学习 ——(五)前端工程化Vue-cli脚手架
  • App备案-iOS云管理式证书 Distribution Managed 公钥及证书SHA-1指纹的获取方法
  • Spring -Spring之依赖注入源码解析
  • Spire.Office for .NET 8.10.2 同步更新-Crk
  • MFC 基础篇(一)
  • Android技术-修改SO导出符号
  • flutter 打包apk
  • Halcon如何使用SaperaLT库连接dalsa相机
  • Vue 嵌套路由 多级路由规则
  • pandas教程:Introduction to pandas Data Structures pandas的数据结构
  • MinIO 分布式文件(对象)存储
  • HTML表单标签
  • 【黑马程序员】SpringCloud——Eureka
  • 目标跟踪(DeepSORT)
  • 2 任务2: 使用趋动云GPU进行猫狗识别实践
  • 技术分享 | app自动化测试(Android)--显式等待机制
  • 机器学习基础之《回归与聚类算法(5)—分类的评估方法》
  • 如何在macbook上删除文件?Mac删除文件的多种方法
  • Java代码Demo——Map根据key或value排序
  • 一个Linux自动备份脚本的示例
  • [论文阅读]PV-RCNN++
  • 测试老鸟整理,Postman加密接口测试-Rsa/Aes对参数加密(详细总结)
  • JavaScript使用对象
  • 微带线的ABCD矩阵的推导、转换与级联-Matlab计算实例
  • “网站不安全”该如何解决
  • gitlab数据备份和恢复