当前位置: 首页 > news >正文

Flink笔记整理(五)

Flink笔记整理(五)

文章目录

  • Flink笔记整理(五)
  • 七、处理函数(最底层最常用最灵活)
    • 7.1基本处理函数(ProcessFunction)
      • 处理函数的功能和使用
      • ProcessFunction解析
    • 7.2按键分区处理函数(KeyedProcessFunction)
      • 定时器(Timer)和定时服务(TimeService)
    • 7.3 窗口处理函数
      • 窗口处理函数的使用
      • ProcessWindowFunction解析
    • 7.4 应用案例——Top N
  • 总结


七、处理函数(最底层最常用最灵活)

之前所介绍的流处理API,无论是基本的转换、聚合,还是更为复杂的窗口操作,其实都是基于DataStream进行转换的,所以可以统称为DataStream API。

在Flink更底层,我们可以不定义任何具体的算子(比如map,filter,或者window),而只是提炼出一个统一的“处理”(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑,所以这一层接口就被叫作“处理函数”(process function)。

在这里插入图片描述

7.1基本处理函数(ProcessFunction)

处理函数的功能和使用

之前学习的转换算子,一般只是针对某种具体操作来定义的,能够拿到的信息比较有限。如果我们想要访问事件的时间戳,或者当前的水位线信息,都是完全做不到的。跟时间相关的操作,目前我们只会用窗口来处理。而在很多应用需求中,要求我们对时间有更精细的控制,需要能够获取水位线,甚至要“把控时间”、定义什么时候做什么事,这就不是基本的时间窗口能够实现的了。

这时就需要使用底层的处理函数。处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册“定时事件”。而且处理函数继承了AbstractRichFunction抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函数还可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方法,可以实现各种自定义的业务逻辑。

处理函数的使用与基本的转换操作类似,只需要直接基于DataStream调用.process()方法就可以了。方法需要传入一个ProcessFunction作为参数,用来定义处理逻辑。

stream.process(new MyProcessFunction())

这里ProcessFunction不是接口,而是一个抽象类,继承了AbstractRichFunction;MyProcessFunction是它的一个具体实现。所以所有的处理函数,都是富函数(RichFunction),富函数可以调用的东西这里同样都可以调用。

ProcessFunction解析

在源码中我们可以看到,抽象类ProcessFunction继承了AbstractRichFunction,有两个泛型类型参数:I表示Input,也就是输入的数据类型;O表示Output,也就是处理完成之后输出的数据类型。内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()。


public abstract class ProcessFunction<I, O> extends AbstractRichFunction {...public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}...
}

ProcessFunction解析

7.2按键分区处理函数(KeyedProcessFunction)

在上节中提到,只有在KeyedStream中才支持使用TimerService设置定时器的操作。所以一般情况下,我们都是先做了keyBy分区之后,再去定义处理操作;代码中更加常见的处理函数是KeyedProcessFunction。

ProcessFunction解析

定时器(Timer)和定时服务(TimeService)

定时器(Timer)和定时服务(TimeService)及例子

7.3 窗口处理函数

除了KeyedProcessFunction,另外一大类常用的处理函数,就是基于窗口的ProcessWindowFunction和ProcessAllWindowFunction了。在第六章窗口函数的介绍中,我们之前已经简单地使用过窗口处理函数了。

窗口处理函数的使用

进行窗口计算,我们可以直接调用现成的简单聚合方法(sum/max/min),也可以通过调用.reduce()或.aggregate()来自定义一般的增量聚合函数(ReduceFunction/AggregateFucntion);而对于更加复杂、需要窗口信息和额外状态的一些场景,我们还可以直接使用全窗口函数、把数据全部收集保存在窗口内,等到触发窗口计算时再统一处理。窗口处理函数就是一种典型的全窗口函数。

窗口处理函数ProcessWindowFunction的使用与其他窗口函数类似,也是基于WindowedStream直接调用方法就可以,只不过这时调用的是.process()。

stream.keyBy( t -> t.f0 ).window( TumblingEventTimeWindows.of(Time.seconds(10)) ).process(new MyProcessWindowFunction())

ProcessWindowFunction解析

ProcessWindowFunction既是处理函数又是全窗口函数。从名字上也可以推测出,它的本质似乎更倾向于“窗口函数”一些。事实上它的用法也确实跟其他处理函数有很大不同。我们可以从源码中的定义看到这一点:
ProcessWindowFunction解析

7.4 应用案例——Top N

案例需求:实时统计一段时间内的出现次数最多的水位。例如,统计最近10秒钟内出现次数最多的两个水位,并且每5秒钟更新一次。我们知道,这可以用一个滑动窗口来实现。于是就需要开滑动窗口收集传感器的数据,按照不同的水位进行统计,而后汇总排序并最终输出前两名。这其实就是著名的“Top N”问题。
案例实现代码


总结

在这里插入图片描述

http://www.lryc.cn/news/410335.html

相关文章:

  • 数据分析概要【数据分析---偏企业】
  • PDF编辑器大分享,这三款加速PDF编辑!
  • Python --Pandas库基础方法(2)
  • 《Programming from the Ground Up》阅读笔记:p75-p87
  • Python面试整理-常用标准库
  • halcon_C#联合halcon打开摄像头
  • 无标题栏窗口通过消息模拟拖动窗口时,无法拖动的一个原因
  • 每天一个数据分析题(四百五十四)- 调研问卷
  • 红酒与家居:打造优雅生活空间
  • 未来生成式 AI 的发展方向,是 Chat 还是 Agent?
  • powershell@日期和时间命令和对象
  • 【Golang 面试 - 基础题】每日 5 题(八)
  • LeetCode 算法:在排序数组中查找元素的第一个和最后一个位置 c++
  • 会话存储、本地存储,路由导航守卫、web会话跟踪、JWT生成token、axios请求拦截、响应拦截
  • strcmp库函数原型
  • 在 Vue.js 项目中延迟加载子组件
  • 何时会用到设计模式、七大设计原则介绍
  • 编程语言发展历史:赋值与相等运算符的变迁历程
  • 求职Leetcode题目(2)
  • 深入探索 Postman:使用 API 性能测试优化你的 Web 服务
  • 校车购票小程序的设计
  • 拯救数据危机!2024年最受欢迎的数据恢复软件评测
  • 记一次因为在html两个地方引入vue.js导致组件注入失败的问题
  • Postman中的智慧重试:API测试用例的错误处理与重试逻辑设置
  • docker部署本地词向量模型
  • 接口自动化中对于文件上传的处理方法
  • Java高频面试题分享
  • kvm虚拟化平台部署
  • 利用arthas热更新class文件
  • 天机学堂 第四天 高并发优化总结