当前位置: 首页 > news >正文

flink的AggregateFunction,merge方法作用范围

背景

AggregateFunction接口是我们经常用的窗口聚合函数,其中有一个merge方法,我们一般情况下也是实现了的,但是你知道吗,其实这个方法只有在你使用会话窗口需要进行窗口合并的时候才需要实现

AggregateFunction.merge方法调用时机

AggregateFunction.merge方法其实只有在使用会话窗口进行窗口合并的时候才会用到,如下所示
在这里插入图片描述

对应的源码首先查看WindowOperator.processElement方法对要合并的窗口的状态进行合并

public void processElement(StreamRecord<IN> element) throws Exception {final Collection<W> elementWindows =windowAssigner.assignWindows(element.getValue(), element.getTimestamp(), windowAssignerContext);// if element is handled by none of assigned elementWindowsboolean isSkippedElement = true;final K key = this.<K>getKeyedStateBackend().getCurrentKey();if (windowAssigner instanceof MergingWindowAssigner) {MergingWindowSet<W> mergingWindows = getMergingWindowSet();for (W window : elementWindows) {// adding the new window might result in a merge, in that case the actualWindow// is the merged window and we work with that. If we don't merge then// actualWindow == windowW actualWindow =mergingWindows.addWindow(window,new MergingWindowSet.MergeFunction<W>() {@Overridepublic void merge(W mergeResult,Collection<W> mergedWindows,W stateWindowResult,Collection<W> mergedStateWindows)throws Exception {triggerContext.key = key;triggerContext.window = mergeResult;triggerContext.onMerge(mergedWindows);for (W m : mergedWindows) {triggerContext.window = m;triggerContext.clear();deleteCleanupTimer(m);}// 合并窗口的状态windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);}});

继续查看AbstractHeapMergingState.mergeNamespaces方法,

public void mergeNamespaces(N target, Collection<N> sources) throws Exception {if (sources == null || sources.isEmpty()) {return; // nothing to do}final StateTable<K, N, SV> map = stateTable;SV merged = null;// merge the sourcesfor (N source : sources) {// get and remove the next source per namespace/keySV sourceState = map.removeAndGetOld(source);if (merged != null && sourceState != null) {//此处合并状态并调用AggregateFunction.merge方法merged = mergeState(merged, sourceState);} else if (merged == null) {merged = sourceState;}}// merge into the target, if neededif (merged != null) {map.transform(target, merged, mergeTransformation);}
}//真正调用AggregateFunction.merge方法合并自定义的状态
@Override
protected ACC mergeState(ACC a, ACC b) {return aggregateTransformation.aggFunction.merge(a, b);
}

这样AggregateFunction.merge的调用过程就清楚了,实际应用中,我们只需要在使用会话窗口时才需要实现这个方法,其他的基于时间窗口的方式不需要实现这个方法,当然实现了也不会有错

http://www.lryc.cn/news/224224.html

相关文章:

  • Day25力扣打卡
  • SpringCloud - OpenFeign 参数传递和响应处理(全网最详细)
  • Postgresql数据类型-布尔类型
  • SPASS-交叉表分析
  • 用Python的requests库来模拟爬取地图商铺信息
  • 使用EvoMap/Three.js模拟无人机灯光秀
  • 11.9存储器实验总结(单ram,双ram,FIFO)
  • linux(ubuntu)安装并使用scrcpy
  • linux rsyslog安装配置
  • 美国Embarcadero公司正式发布2023 RAD Studio Delphi C++ Builder 12 Athens
  • 树莓派4B的测试记录(CPU、FFMPEG)
  • 物联网AI MicroPython学习之语法 二进制与ASCII转换
  • 学之思项目的搭建部署 打jar包失败的解决方法
  • [100天算法】-定长子串中元音的最大数目(day 67)
  • Elastic Observability 8.11:ES|QL、APM 中的通用分析和增强的 SLOs
  • TexGen简单模型对应inp文件简单梳理-2
  • VUE获取当前日期的周日和周六
  • K8S篇之k8s containerd模式fail to pull image certificate signed by unknown authority
  • 算法进阶指南图论 最优贸易
  • 【Android】Debug时禁用主线程ANR限制
  • P6入门:项目初始化1-项目详情介绍
  • 进行 “最佳价格查询器” 的开发
  • Brain Teaser概率类 - 三局两胜制
  • 在现实生活中传感器GV-H130/GV-21的使用
  • 海康Visionmaster-全局脚本:通过通讯触发快速匹配 模块换型的方法
  • 什么是闭包
  • sql6(Leetcode1387使用唯一标识码替换员工ID)
  • qt-C++笔记之Qt中的时间与定时器
  • 【C++】复杂的多继承及其缺陷(菱形继承)
  • esp32-rust-no_std-examples-blinky