当前位置: 首页 > news >正文

flink判断两个事件之间有没有超时(不使用CEP)

1.为啥不使用cep呢,cep的超时时间设置不好配置化,无法满足扩展要求

2.超时怎么界定。A事件发生后,过了N时间,还没有收到B事件,算超时。

代码如下:


import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;@Slf4j
public class AsyncModelTimeoutHandler extends KeyedProcessFunction<String, JSONObject, JSONObject> {private static final long serialVersionUID = -61608451659272532L;private transient ValueState<Long> firstDataTime;private transient ValueState<Long> secondDataTime;private transient ValueState<String> eventType;@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor<Long> firstDataDescriptor = new ValueStateDescriptor<>("firstDataTime", Long.class);firstDataTime = getRuntimeContext().getState(firstDataDescriptor);ValueStateDescriptor<Long> secondDataDescriptor = new ValueStateDescriptor<>("secondDataTime", Long.class);secondDataTime = getRuntimeContext().getState(secondDataDescriptor);ValueStateDescriptor<String> eventTypeDescriptor = new ValueStateDescriptor<>("eventType", String.class);eventType = getRuntimeContext().getState(eventTypeDescriptor);}@Overridepublic void processElement(JSONObject value, KeyedProcessFunction<String, JSONObject, JSONObject>.Context ctx, Collector<JSONObject> out) throws Exception {Long currentTimestamp = value.getLong("ts");if (value.containsKey("timeout")) {//异步请求消息long timeout = value.getLong("timeout");firstDataTime.update(currentTimestamp + timeout);eventType.update(value.getString("event"));ctx.timerService().registerProcessingTimeTimer(currentTimestamp + timeout);} else {secondDataTime.update(currentTimestamp);}}@Overridepublic void onTimer(long timestamp, KeyedProcessFunction<String, JSONObject, JSONObject>.OnTimerContext ctx, Collector<JSONObject> out) throws Exception {Long firstTime = firstDataTime.value();Long lastTime = secondDataTime.value();if (lastTime == null || (firstTime != null && lastTime >= firstTime)) {//超时了log.info("AsyncModelTimeoutHandler onTimer handle triggerTime={}, firstTime={}, secondTime={},key={}", timestamp, firstTime, lastTime, ctx.getCurrentKey());JSONObject r = new JSONObject();r.put("id", ctx.getCurrentKey());r.put("judgeTime", timestamp);r.put("event", eventType.value());out.collect(r);}firstDataTime.clear();secondDataTime.clear();eventType.clear();}
}
http://www.lryc.cn/news/534036.html

相关文章:

  • 二级C语言题解:十进制转其他进制、非素数求和、重复数统计
  • 打家劫舍3
  • 练习题(2025.2.9)
  • 【练习】PAT 乙 1074 宇宙无敌加法器
  • 网络防御高级02-综合实验
  • UITableView的复用原理
  • SQL条件分支中的大讲究
  • Cherry Studio:一站式多模型AI交互平台深度解析 可配合大模型搭建私有知识库问答系统
  • 工业相机,镜头的选型及实战
  • C++模板学习从专家到入门:关键字typename与class
  • BFS算法篇——FloodFill问题的高效解决之道(下)
  • Android性能优化
  • 1、http介绍
  • 2.6 寒假训练营补题
  • kafka生产者之发送模式与ACK
  • 笔记:蓝桥杯python搜索(3-2)——DFS剪支和记忆化搜索
  • ChatBox+硅基流动Deepseek_R1开源API 满血(671B)部署教程,全程干货无废话
  • 35~37.ppt
  • 畅快使用DeepSeek-R1的方法
  • 【人工智能】Python中的序列到序列(Seq2Seq)模型:实现机器翻译
  • 【算法】动态规划专题⑥ —— 完全背包问题 python
  • 记一次基于manifest v3开发谷歌插件
  • C# OpenCvSharp 部署MOWA:多合一图像扭曲模型
  • 本地部署DeepSeek-R1模型(新手保姆教程)
  • 神经网络常见激活函数 5-PReLU函数
  • 2025我的第二次社招,写在春招之季
  • Visual Studio Code中文出现黄色框子的解决办法
  • threejs开源代码之-旋转的彩色立方体
  • visual studio 2008的试用版评估期已结束的解决办法
  • 解锁 DeepSeek 模型高效部署密码:蓝耘平台深度剖析与实战应用