当前位置: 首页 > news >正文

【开发篇】一、处理函数:定时器与定时服务

文章目录

  • 1、基本处理函数
  • 2、定时器和定时服务
  • 3、KeyedProcessFunction下演示定时器
  • 4、process重获取当前watermark

前面API篇完结,对数据的转换、聚合、窗口等,都是基于DataStream的,称DataStreamAPI,如图:
在这里插入图片描述

在Flink底层,可以不定义具体是什么算子,而只是一个统一的处理(process)操作,里面可以自定义逻辑。即图中底层的处理函数层。从下到上,封装越来越重,使用越来越简单。前面用的map等都是Flink封装好的,底层则是process。当现有的算子无法实现需求时,直接用process就行,最底层,最灵活,逻辑你自己开发就行,自定义处理逻辑!!!

1、基本处理函数

处理函数的使用和前面的转换算子一样,基于DataStream对象调用即可:

stream.process(new MyProcessFunction())
  • ProcessFunction不是接口,而是一个抽象类,继承了AbstractRichFunction

  • ProcessFunction的两个泛型:I表示Input,是输入的数据类型;O即Output,是处理完成之后输出的数据类型

  • ProcessFunction抽象类有抽象方法processElement须重写,以及非抽象方法onTimer

public abstract class ProcessFunction<I, O> extends AbstractRichFunction {...public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}...}

抽象方法 processElement:

  • 定义处理元素的逻辑
  • 流中的每个元素都会调用一次这个房啊
  • 三个形参分别为:流中数据value自身、上下文对象ctx获取相关信息、收集器out往下游发处理完的数据

非抽象方法onTimer:

  • 定时器触发时调用这个方法
  • 注册定时器即设一个闹钟,onTimer则是闹钟响了以后要做的事
  • onTimer是基于时间线的一个回调方法
  • onTimer的三个形参分别为:时间戳(timestamp),上下文(ctx),以及收集器(out)

最初的DataStream流在经过不同的操作后会得到不同类型的流,比如keyBy后的KeyedStream,window后的WindowedStream。对于这些不同类型的流,其实都可以直接调用.process()方法进行自定义处理,不过process重载,传参是不同类型的ProcessFunction

关于处理函数的分类:

  • 在什么情况下调用process方法,就传入一个什么类型的ProcessFunction
  • 具体类型,在process下Ctrl+P查看传参提示就行,比如DataStream下传ProcessFunction,按键分区后得到KeyedStream传KeyedProcessFunction

2、定时器和定时服务

ProcessFunction的上下文对象Context有timerService()方法,可返回一个TimerService对象。TimerService是Flink实现定时功能的关键。其常用方法:

  • 获取当前的处理时间
long currentProcessingTime();
  • 获取当前的水位线(事件时间)
long currentWatermark();
  • 注册处理时间定时器,当处理时间超过time时触发
void registerProcessingTimeTimer(long time);
  • 注册事件时间定时器,当水位线超过time时触发
void registerEventTimeTimer(long time);
  • 删除触发时间为time的处理时间定时器
void deleteProcessingTimeTimer(long time);
  • 删除触发时间为time的处理时间定时器
void deleteEventTimeTimer(long time);

注意:

  • 只有在KeyedStream中才支持使用TimerService设置定时器的操作
  • TimerService会以键(key)和时间戳为标准,对定时器进行去重,即同样的key和时间戳下,定时器只会留一个,触发时onTimer只被调用一次

3、KeyedProcessFunction下演示定时器

事件时间下的定时器演示:定义一个5s的定时器,在水位线时间到达5s时触发

public class KeyedProcessTimerDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("node01", 9527).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))  //乱序默认的水位生成器.withTimestampAssigner((element, ts) -> element.getTs() * 1000L)  //时间戳提取);KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// TODO Process:keyedSingleOutputStreamOperator<String> process = sensorKS.process(new KeyedProcessFunction<String, WaterSensor, String>() {/*** 来一条数据调用一次* @param value  每条数据* @param ctx 上下文对象* @param out 采集器* @throws Exception*/@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {//获取当前数据的keyString currentKey = ctx.getCurrentKey();// 获取定时器服务对象TimerService timerService = ctx.timerService();// 数据中提取出来的事件时间Long currentEventTime = ctx.timestamp(); //注册定时任务,水位线被推到5s时触发timerService.registerEventTimeTimer(5000L);System.out.println("当前key=" + currentKey + ",当前时间=" + currentEventTime + ",注册了一个5s的定时器");/*** 时间进展到定时器注册的时间,调用该方法* @param timestamp 当前时间进展,就是定时器被触发时的时间* @param ctx       上下文* @param out       采集器* @throws Exception*/@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {super.onTimer(timestamp, ctx, out);String currentKey = ctx.getCurrentKey();System.out.println("key=" + currentKey + "现在时间是" + timestamp + "定时器触发");}});process.print();env.execute();}
}

运行:注意时间戳8s时水位线为8s-3s-1ms < 5s,即当前最大事件时间 - 等待延迟时间 - 1ms,因此未触发,且同一个key,同一个定时时间,只有一个定时器生效:

在这里插入图片描述

看下不同key的效果,注意,水位线是多少和key没关系,s1,9,9进去,直接水位线变成9-3-1ms > 5s,三个定时器都触发

在这里插入图片描述

再用处理时间下的定时器:

public class KeyedProcessTimerDemo {public static void main(String[] args) throws Exception {//...重复代码略,同上KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// TODO Process:keyedSingleOutputStreamOperator<String> process = sensorKS.process(new KeyedProcessFunction<String, WaterSensor, String>() {@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {//获取当前数据的keyString currentKey = ctx.getCurrentKey();TimerService timerService = ctx.timerService();//当前数据的处理时间long currentTs = timerService.currentProcessingTime();//定时器不用水位线为标杆,直接处理时间加5stimerService.registerProcessingTimeTimer(currentTs + 5000L);System.out.println("当前key=" + currentKey + ",当前时间=" + currentTs + ",注册了一个5s后的定时器");}//...重复代码略,同上
}

运行:

在这里插入图片描述

4、process重获取当前watermark

还是用上面的socket流,但process逻辑不玩定时器,验证下watermark:

//...重复代码略,同上@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {     // 获取 process的 当前watermarklong currentWatermark = timerService.currentWatermark();System.out.println("当前数据=" + value + ",当前watermark=" + currentWatermark);}

此时可以看到,s1,1,1进去,水位线本应为1000ms-3000ms-1ms = -2001,但通过timerService获取到的却是起始值,就那个Long.MIN,直到s1,5进去,才获取到-2001,依次往下,都差一个

在这里插入图片描述

在process重获取当前的watermark,显示的是上一次的watermark,因为process还没接收到这条数据对应的生成的新的watermark。关键点:watermark也是一个数据,要跟着流中对应的那个数据往下游流。

在这里插入图片描述
在这里插入图片描述

上图示意了为什么s5,5获取到的水位线为-2001,因为此时process还没接收到这条数据对应的生成的新的watermark(1999还在process框外,框内只有一个-2001)

http://www.lryc.cn/news/207989.html

相关文章:

  • 重入漏洞EtherStore
  • 账号运营的底层逻辑---获客思维
  • Pinia中如何实现数据持久化操作
  • 【owt-server】RTC视频接收调用流程学习笔记1: Call::CreateVideoReceiveStream 前后
  • 淘宝商品链接获取淘宝商品评论数据(用 Python实现淘宝商品评论信息抓取)
  • 十九、类型信息(1)
  • 十八、字符串(3)
  • 基于SSM的酒店预约及管理系统设计与实现
  • MIxformerV2的onnx和tensorrt加速
  • Kotlin 中let 、run 、with、apply、also的用法与区别
  • PHP函数的定义与最简单后门原理
  • PlantSimulation访问本地Excel文件的方法
  • 使用微PE工具箱制作winU盘启动盘~重装系统
  • 漏洞复现-jquery-picture-cut 任意文件上传_(CVE-2018-9208)
  • Golang Websocket框架:实时通信的新选择
  • ExoPlayer架构详解与源码分析(7)——SampleQueue
  • 第二证券:基本面改善预期强化 机构聚焦科技成长
  • 大语言模型在天猫AI导购助理项目的实践!
  • 【STM32】GPIO控制LED(HAL库版)
  • 第27届亚洲国际动力传动与控制技术展览会盛大开幕,意大利国家展团闪耀回归
  • 永恒之蓝漏洞 ms17_010 详解
  • 汽车托运全流程介绍
  • 【API篇】八、Flink窗口函数
  • React JSX常用语法总结
  • DVWA-Cross Site Request Forgery (CSRF)
  • 浅谈安科瑞可编程电测仪表在老挝某项目的应用
  • Java项目源码合集
  • Python学习笔记--生成器
  • 【Python学习】—Python基础语法(五)
  • 【js】JavaScript清除所有(多个)定时器的方法: