当前位置: 首页 > news >正文

Flink 统计接入的数据量-滚动窗口和状态的使用

1、概述

在生产场景值,经常需要和上游、下游对数,离线场景可以直接 group by 再 count ,但是实时场景中,如果使用 kafka 作为中间件,中间经过几个 job 的过滤转化后,再对照像 Doris 或 Clickhouse 中最终层的数据,如果出现缺失,很难判断是哪一层缺失的。

2、使用 侧流输出+处理时间的滚动窗口+状态进行数据量级统计

package com.flink.feature.windowcount;import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;/*** 1、先输入数据* 1* 1* 1* 1* 输出结果* main=>:8> 业务处理=>1* main=>:1> 业务处理=>1* main=>:2> 业务处理=>1* main=>:3> 业务处理=>1* 每10秒每个key接受到的数据量=>:2> (1698913020000,1698913030000,窗口统计=>1,4)** 2、再输入数据* 1* 2* 2* 3* 3* 4* 4* 输出结果* main=>:4> 业务处理=>1* main=>:5> 业务处理=>2* main=>:6> 业务处理=>2* main=>:7> 业务处理=>3* main=>:8> 业务处理=>3* main=>:1> 业务处理=>4* main=>:2> 业务处理=>4* 每10秒每个key接受到的数据量=>:2> (1698913030000,1698913040000,窗口统计=>1,1)* 每10秒每个key接受到的数据量=>:7> (1698913030000,1698913040000,窗口统计=>4,2)* 每10秒每个key接受到的数据量=>:6> (1698913030000,1698913040000,窗口统计=>2,2)* 每10秒每个key接受到的数据量=>:6> (1698913030000,1698913040000,窗口统计=>3,2)*/public class UseWindowValidateData {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();OutputTag<Tuple2<String,Integer>> windowCountTag = new OutputTag<Tuple2<String,Integer>>("window_count"){};DataStreamSource<String> source = env.socketTextStream("localhost", 8888);SingleOutputStreamOperator<String> process = source.process(new ProcessFunction<String, String>() {@Overridepublic void processElement(String input, ProcessFunction<String, String>.Context ctx, Collector<String> collector) throws Exception {ctx.output(windowCountTag,new Tuple2<>("窗口统计=>"+input,1));collector.collect("业务处理=>" + input);}});process.getSideOutput(windowCountTag).keyBy(new KeySelector<Tuple2<String,Integer>, String>() {@Overridepublic String getKey(Tuple2<String,Integer> tp) throws Exception {return tp.f0;}}).window(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple4<String,String,String,Integer>, String, TimeWindow>() {private MapState<String, Integer> mapState;@Overridepublic void open(Configuration parameters) throws Exception {MapStateDescriptor<String, Integer> stateDescriptor = new MapStateDescriptor<>("map-state", String.class, Integer.class);mapState = getRuntimeContext().getMapState(stateDescriptor);}@Overridepublic void process(String key, ProcessWindowFunction<Tuple2<String, Integer>, Tuple4<String, String, String, Integer>, String, TimeWindow>.Context ctx, Iterable<Tuple2<String, Integer>> elements, Collector<Tuple4<String, String, String, Integer>> out) throws Exception {for (Tuple2<String, Integer> tp : elements) {Integer res = mapState.get(tp.f0);if (res == null) {res = 0;}res += 1;mapState.put(tp.f0, res);}out.collect(new Tuple4<>(String.valueOf(ctx.window().getStart()),String.valueOf(ctx.window().getEnd()),key,mapState.get(key)));// 每个窗口计算后清空状态mapState.clear();}}).print("每10秒每个key接受到的数据量=>");process.print("main=>");env.execute();}
}

3、测试结果

1)先输入数据

1
1
1
1

输出结果

main=>:8> 业务处理=>1
main=>:1> 业务处理=>1
main=>:2> 业务处理=>1
main=>:3> 业务处理=>1

每10秒每个key接受到的数据量=>:2> (1698913020000,1698913030000,窗口统计=>1,4)

2)再输入数据

1
2
2
3
3
4
4

输出结果

main=>:4> 业务处理=>1
main=>:5> 业务处理=>2
main=>:6> 业务处理=>2
main=>:7> 业务处理=>3
main=>:8> 业务处理=>3
main=>:1> 业务处理=>4
main=>:2> 业务处理=>4

每10秒每个key接受到的数据量=>:2> (1698913030000,1698913040000,窗口统计=>1,1)
每10秒每个key接受到的数据量=>:7> (1698913030000,1698913040000,窗口统计=>4,2)
每10秒每个key接受到的数据量=>:6> (1698913030000,1698913040000,窗口统计=>2,2)
每10秒每个key接受到的数据量=>:6> (1698913030000,1698913040000,窗口统计=>3,2)

http://www.lryc.cn/news/216932.html

相关文章:

  • SpringBoot快速整合canal1.1.5(TCP模式)
  • docker打包container成image,然后将image上传到docker hub
  • 设计模式—创建型模式之原型模式
  • Zygote进程通信为什么用Socket而不是Binder?
  • API接口加密,解决自动化中登录问题
  • COCOS2DX3.17.2 Android升级targetSDK30问题解决方案
  • HarmonyOS鸿蒙原生应用开发设计- 隐私声明
  • 【面试精选】00后卷王带你三天刷完软件测试面试八股文
  • k-means算法c++实现
  • oracle查询哪些用户下有表
  • 机器人连杆惯量参数辨识(估计)
  • 一座 “数智桥梁”,华为助力“天堑变通途”
  • C#知识总结 基础篇(上)
  • 照片编辑软件Affinity Photo 2 for Mac v2.1.1中文激活版 2024年最新中文版下载
  • TPAMI 2023 | Temporal Perceiver:通用时序边界检测方法
  • Unity-UV展开工具
  • springboot actuator jvm监控丢失
  • UDP服务端和客户端通信代码开发流程
  • 数据库实验:SQL的数据定义与单表查询
  • P3398 仓鼠找 sugar
  • C# 发送邮件
  • Zeal下载文档慢的问题
  • HR模块开发(1):简单的开发流程和注意事项
  • 创建Vue实例
  • 2024上海国际人工智能展(CSITF)以“技术,让生活更精彩”为核心理念,以“创新驱动发展,保护知识产权,促进技术贸易”为主题
  • Vue3使用Monaco-editor
  • java 根据ip获取到城市 GeoLite2-City.mmdb
  • kaggle使用说明
  • BUUCTF FLAG 1
  • 万物皆可“云” 从杭州云栖大会看数智生活的未来