当前位置: 首页 > news >正文

大数据-玩转数据-Flink窗口函数

一、Flink窗口函数

前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素.
window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种.
ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以对到来的元素进行增量聚合 . ProcessWindowFunction 可以得到一个包含这个窗口中所有元素的迭代器, 以及这些元素所属窗口的一些元数据信息.
ProcessWindowFunction不能被高效执行的原因是Flink在执行这个函数之前, 需要在内部缓存这个窗口上所有的元素。
除了一些简单聚合,比如 sum,max,min,maxBay,minBay ,有以下窗口聚合函数。

二、ReduceFunction(增量聚合函数)

输入和输出必须一致

package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class Window_s_function {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).reduce(new ReduceFunction<WaterSensor>() {@Overridepublic WaterSensor reduce(WaterSensor value1,WaterSensor value2) throws Exception {System.out.println("Window_s_function.reduce");value1.setVc ( value1.getVc() + value2.getVc());return (value1);}}).print();env.execute();}
}

运行结果
在这里插入图片描述
在这里插入图片描述

三、AggregateFunction(增量聚合函数)

输入和输出可以不一致

package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.kafka.common.metrics.stats.Avg;import java.util.List;public class Window_s_function_2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).aggregate(new AggregateFunction<WaterSensor, Avg, Double>() {@Overridepublic Avg createAccumulator() {return new Avg();}@Overridepublic Avg add(WaterSensor value, Avg acc) {acc.sum += value.getVc();acc.couunt++;return acc;}@Overridepublic Double getResult(Avg acc) {return acc.sum * 1.0 / acc.couunt;}@Overridepublic Avg merge(Avg avg, Avg acc1) {return null;}},new ProcessWindowFunction<Double, String, String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<Double> elements,Collector<String> out) throws Exception {Double result = elements.iterator().next();long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + "  " + endtime +  " key: " + key + " result: " + result);}}).print();env.execute();}public static class Avg {public Integer sum = 0;public Long couunt = 0L;};
}

运行结果
在这里插入图片描述
在这里插入图片描述

四、ProcessWindowFunction(全窗口函数)

上面例子里已经用到

new ProcessWindowFunction<Double, String, String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<Double> elements,Collector<String> out) throws Exception {Double result = elements.iterator().next();long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + "  " + endtime +  " key: " + key + " result: " + result);}}
http://www.lryc.cn/news/143017.html

相关文章:

  • Docker网络-探索容器网络如何相互通信
  • ESP32-CAM模块Arduino环境搭建测试
  • webassembly001 webassembly简述
  • vue 使用C-Lodop打印小票
  • 【C++进阶(二)】STL大法--vector的深度剖析以及模拟实现
  • 1. import pandas as pd 导入库
  • DMK5框选变量之后不显示其他位置的此变量高亮
  • 0061__Appium
  • 【DEVOPS】需求跟踪管理全面落地
  • 算法修炼Day57|647. 回文子串 ● 516.最长回文子序列
  • 呈现数据的精妙之道:选择合适的可视化方法
  • 数据结构(Java实现)-java对象的比较
  • Wolfram Mathematica 13 for Mac 数学计算工具
  • 系统架构设计高级技能 · Web架构
  • 再写CentOS7升级OpenSSL-1.0.1U
  • HBase--技术文档--基本概念--《快速扫盲》
  • 如何利用SFTP协议远程实现更安全的文件传输 ——【内网穿透】
  • 深度学习8:详解生成对抗网络原理
  • sql入门-多表查询
  • 软考A计划-网络工程师-必考知识点-上
  • kafka复习:(17)seekToBeginning的用法
  • C# textBox1.Text=““与textBox1.Clear()的区别
  • CnetSDK .NET OCR SDK Crack
  • Python最新面试题汇总及答案
  • 设计模式(单例模式,工厂模式),线程池
  • 在mybatis中的mapper.xml中如何使用parameterType实现方法单个传参,对象传参,多参数传参.
  • No120.精选前端面试题,享受每天的挑战和学习
  • c# 访问sqlServer数据库时的连接字符串
  • 排序算法概述
  • ChatGPT在高等教育中的应用利弊探讨