当前位置: 首页 > news >正文

38、Flink 的 WindowAssigner 之 GlobalWindows 示例

1、注意

使用 GlobalWindows 需要自定义 Trigger,否则窗口中的数据不会被计算。

2、代码示例

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;public class _05_WindowAssignerGlobal {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> input = env.socketTextStream("localhost", 8888);// 此窗口模式仅在指定了自定义的 trigger 时有用,否则计算不会发生,因为全局窗口没有天然的终点去触发其中积累的数据input.keyBy(e -> e)// 多并行 Task.window(GlobalWindows.create()).trigger(new Trigger<String, GlobalWindow>() {@Overridepublic TriggerResult onElement(String s, long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {return null;}@Overridepublic TriggerResult onProcessingTime(long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {return null;}@Overridepublic TriggerResult onEventTime(long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {return null;}@Overridepublic void clear(GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {}}).apply(new WindowFunction<String, String, String, GlobalWindow>() {@Overridepublic void apply(String s, GlobalWindow globalWindow, Iterable<String> iterable, Collector<String> collector) throws Exception {for (String res : iterable) {collector.collect(res);}}}).print();env.execute();}
}
http://www.lryc.cn/news/364719.html

相关文章:

  • 同事仅靠着自己写的npm包跳槽去了大厂,羡慕了一整天
  • Yocto - bitbake任务中clean和cleanall的区别
  • Spring 中如何控制 Bean 的加载顺序?
  • 【学习笔记】Windows GDI绘图(十)Graphics详解(中)
  • web学习笔记(六十二)
  • 每天CTF小练一点--ctfshow年CTF
  • Java Set接口 - TreeSet类
  • css 理解了原理,绘制三角形就简单了
  • 【JavaEE进阶】——MyBatis操作数据库 (#{}与${} 以及 动态SQL)
  • 电阻应变片的结构
  • 云原生时代:从 Jenkins 到 Argo Workflows,构建高效 CI Pipeline
  • 【数据库系统概论】事务
  • C++-排序算法详解
  • Kotlin 引用(双冒号::)
  • C++ day3练习
  • 命令模式(行为型)
  • 韩雪医生针药结合效果好 患者赠送锦旗表感谢
  • 【队列、堆、栈 解释与区分】
  • NTP网络时间服务器_安徽京准电钟
  • Java:爬虫框架
  • ChatGPT基本原理详细解说
  • Java日期时间处理深度解析:从Date、Calendar到SimpleDateFormat
  • Flutter 中的 CupertinoUserInterfaceLevel 小部件:全面指南
  • 区块链学习记录01
  • python--装饰器
  • Docker:定义未来的软件部署
  • ros常用环境变量
  • python学习 - 爬虫案例 - 爬取链接房产信息入数据库代码实例
  • Git 完整操作之记录
  • mediaPlayer的内存泄露解决方法