当前位置: 首页 > news >正文

flink自定义窗口分配器

背景

我们知道处理常用的滑动窗口分配器,滚动窗口分配器,全局窗口分配器,会话窗口分配器外,我们可以实现自己的自定义窗口分配器,以实现我们的自己的窗口逻辑

自定义窗口分配器的实现

package wikiedits.assigner;import com.google.common.collect.Lists;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import java.util.Collection;
import java.util.Collections;public class IntervalWindowAssignerextends WindowAssigner<Object, TimeWindow> {private static final long serialVersionUID = 1L;private long windowSize = 60 * 1000L;private IntervalWindowAssigner() {}@Overridepublic Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {long startTime = timestamp -  (timestamp % windowSize);long endTime = startTime + windowSize;return Lists.newArrayList(new TimeWindow(startTime, endTime));}@Overridepublic Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {return EventTimeTrigger.create();}@Override public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {return new TimeWindow.Serializer();}@Override public boolean isEventTime() {return true;}
}

注意,TimeWindow时间窗口是左边右开的形式,参见下图所示
在这里插入图片描述
代码里面是以maxTimeStamp()为准的

http://www.lryc.cn/news/187151.html

相关文章:

  • iOS CGRect CGPoint NSRange等结构体的NSLog打印输出
  • Viper FTP Mac/ftp管理工具
  • web漏洞-xml外部实体注入(XXE)
  • Impeller-Flutter的新渲染引擎
  • python 面试算法题
  • Python中的yield关键字
  • 怎么压缩pdf文件?分享缩小pdf文件的简单方法
  • 51单片机可调幅度频率波形信号发生器( proteus仿真+程序+原理图+报告+讲解视频)
  • Vuex的介绍
  • mysql基础语法速成版
  • Docker镜像 配置ssh
  • 12.2 实现键盘模拟按键
  • 《DevOps 精要:业务视角》- 读书笔记(七)
  • 【随想】每日两题Day.12(实则一题)
  • 基于复旦微JFM7K325T FPGA的高性能PCIe总线数据预处理载板(100%国产化)
  • 什么是原型链(prototype chain)?如何实现继承?
  • RabbitMQ 5种工作模式介绍和Springboot具体实现
  • C++ - 可变模版参数 - emplace相关接口函数 - 移动构造函数 和 移动赋值运算符重载 的 默认成员函数
  • 总结三:计算机网络面经
  • 服务器数据恢复-VMWARE ESX SERVER虚拟机数据恢复案例
  • COCI 2021-2022 #1 - Set 题解
  • 分享40个极具商业价值的chatGPT提问prompt
  • 一文搞懂到底什么是元宇宙
  • 【重拾C语言】六、批量数据组织(四)线性表—栈和队列
  • 力扣刷题-哈希表-一个字符串是否能够由另一个字符串中的字符组成
  • Android使用AOP切面编程
  • Flutter学习笔记
  • 软件生命周期中的概念设计和详细设计的主要任务是什么
  • 大数据学习(2)Hadoop-分布式资源计算hive(1)
  • 深入探究HTML表单与JavaScript的关系