当前位置: 首页 > news >正文

大数据-玩转数据-Flink定时器

一、说明

基于处理时间或者事件时间处理过一个元素之后, 注册一个定时器, 然后指定的时间执行.
Context和OnTimerContext所持有的TimerService对象拥有以下方法:
currentProcessingTime(): Long 返回当前处理时间
currentWatermark(): Long 返回当前watermark的时间戳
registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前key的processing time的定时器。当processing time到达定时时间时,触发timer。
registerEventTimeTimer(timestamp: Long): Unit 会注册当前key的event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。
deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。

二、基于处理时间的定时器

package com.lyh.flink08;import com.lyh.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;public class ProcessTime {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> stream = env.socketTextStream("hadoop100", 9999).map(line -> {String[] datas = line.split(",");return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));});stream.keyBy(WaterSensor::getId).process(new KeyedProcessFunction<String, WaterSensor, String>() {@Overridepublic void processElement(WaterSensor value,Context ctx,Collector<String> out) throws Exception {ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5000);out.collect(value.toString());}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {System.out.println(timestamp);out.collect("wo be chu fa le ");}}).print();env.execute();}
}

三、基于事件时间的定时器

package com.lyh.flink08;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;import java.time.Duration;public class EventTime_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> stream = env.socketTextStream("hadoop100", 9999).map(line -> {String[] datas = line.split(",");return new WaterSensor(datas[0],Long.valueOf(datas[1]),Integer.valueOf(datas[2]));});WatermarkStrategy<WaterSensor> wms = WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element,recordTimestamp) -> element.getTs() * 1000);stream.assignTimestampsAndWatermarks(wms).keyBy(WaterSensor::getId).process(new KeyedProcessFunction<String, WaterSensor, String>() {@Overridepublic void processElement(WaterSensor value,Context ctx,Collector<String> out) throws Exception {System.out.println(ctx.timestamp());ctx.timerService().registerProcessingTimeTimer(ctx.timestamp()+5000);out.collect(value.toString());}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {System.out.println("定时器被触发了");}}).print();env.execute();}
}
http://www.lryc.cn/news/149999.html

相关文章:

  • Linux 操作系统实战视频课 - GPIO 基础介绍
  • ChatGPT在医疗保健信息管理和电子病历中的应用前景如何?
  • 安防监控/视频存储/视频汇聚平台EasyCVR接入海康Ehome车载设备出现收流超时的原因排查
  • 【zookeeper】zookeeper监控指标查看
  • Flink 如何处理反压?
  • JAVA基础-JDBC
  • 嵌入式学习笔记(1)ARM的编程模式和7种工作模式
  • [NSSCTF Round #15NSSCTF 2nd]——Web、Misc、Crypto方向 详细Writeup
  • Metasploit“MSF”连接postgresql时因排序规则版本不匹配导致无法连接
  • CCF CSP题解:矩阵运算(202305-2)
  • 划分字母区间【贪心算法】
  • 低代码的探索之路
  • easyUI combobox不可手动输入和禁用
  • RV64和ARM64栈结构差异
  • 将 Python 与 RStudio IDE 配合使用(R与Python系列第一篇)
  • 数据库访问性能优化
  • vue 预览 有token验证的 doc、docx、pdf、xlsx、csv、图片 并下载
  • WPF数据视图
  • C++ new/delete 与 malloc/free 的区别?
  • 【数学建模】常微分,偏微分方程
  • 浙大数据结构之09-排序1 排序
  • Pydantic 学习随笔
  • 11 mysql float/double/decimal 的数据存储
  • 【高效数据结构——位图bitmap】
  • ArrayList LinkedList
  • iOS砸壳系列之三:Frida介绍和使用
  • Git学习——细节补充
  • 【设计模式】Head First 设计模式——装饰者模式 C++实现
  • layui实现数据列表的复选框回显
  • 关于使用RT-Thread系统读取stm32的adc无法连续转换的问题解决