Flink SQL Over Aggregation in Detail

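**Over aggregation (supported in both Batch and Streaming mode):** a special kind of sliding-window aggregate function. It is easiest to understand by comparing it with window aggregation:

Window aggregation: columns that are not listed in GROUP BY cannot be selected directly.

Over aggregation: the original columns are preserved, and an aggregate value is produced for every input row.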
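
The difference can be illustrated outside Flink with a minimal Python sketch. The order data and the helper names `group_by_sum` and `over_sum` are hypothetical; the code only emulates the semantics and says nothing about how Flink executes the query.

```python
from collections import defaultdict

# Hypothetical orders: (order_id, product, amount), already sorted by time
orders = [
    (1, "A", 10),
    (2, "B", 5),
    (3, "A", 7),
]


def group_by_sum(rows):
    """Emulates SELECT product, SUM(amount) ... GROUP BY product.

    Only the grouping key and the aggregate survive; order_id is lost.
    """
    totals = defaultdict(int)
    for _order_id, product, amount in rows:
        totals[product] += amount
    return dict(totals)


def over_sum(rows):
    """Emulates SUM(amount) OVER (PARTITION BY product ORDER BY <time>
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW).

    Every input row is kept and gets a running total appended.
    """
    running = defaultdict(int)
    result = []
    for order_id, product, amount in rows:
        running[product] += amount
        result.append((order_id, product, amount, running[product]))
    return result


grouped = group_by_sum(orders)
windowed = over_sum(orders)
print(grouped)
print(windowed)
```

The grouped result has one entry per product, while the Over-style result has as many rows as the input and still carries `order_id`.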

Note: Over aggregation is used relatively rarely in production.

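**Use case:** computing aggregates over a recent sliding window of data.

**Practical example:** for each order, compute the total order amount of the same product over the preceding hour: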

SELECT
    order_id,
    order_time,
    amount,
    SUM(amount) OVER (
        PARTITION BY product
        ORDER BY order_time
        RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
    ) AS one_hour_prod_amount_sum
FROM Orders

The syntax of an Over aggregation is as follows:

SELECT
    agg_func(agg_col) OVER (
        [PARTITION BY col1[, col2, ...]]
        ORDER BY time_col
        range_definition),
    ...
FROM ...

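ORDER BY: must be a time attribute column (event time or processing time);

PARTITION BY: defines the granularity of the aggregation; in the example above, rows are aggregated per product;

range_definition: defines which rows are included in the aggregate. Flink offers two ways to specify this range: by row count (ROWS) or by time interval (RANGE).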
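
To make the two kinds of range definition concrete, here is a minimal Python sketch of which rows end up in the frame of a given row. The function names `rows_frame` and `range_frame` and the sample data are assumptions made for illustration; both bounds are treated as inclusive, as in standard SQL.

```python
from datetime import datetime, timedelta


def rows_frame(partition, i, preceding):
    """ROWS BETWEEN <preceding> PRECEDING AND CURRENT ROW for row i.

    The frame is counted in rows: row i plus up to `preceding` rows before it.
    """
    return partition[max(0, i - preceding): i + 1]


def range_frame(partition, i, interval):
    """RANGE BETWEEN <interval> PRECEDING AND CURRENT ROW for row i.

    The frame is defined by the ORDER BY value: every row whose time lies
    in [current_time - interval, current_time].
    """
    current_time = partition[i][0]
    return [row for row in partition
            if current_time - interval <= row[0] <= current_time]


# One partition (a single product), sorted by time: (order_time, amount)
t0 = datetime(2021, 12, 24, 22, 0, 0)
partition = [
    (t0, 1),
    (t0 + timedelta(minutes=30), 2),
    (t0 + timedelta(minutes=70), 3),
    (t0 + timedelta(minutes=80), 4),
]

last = len(partition) - 1
rows_total = sum(amount for _, amount in rows_frame(partition, last, 1))
range_total = sum(amount for _, amount in range_frame(partition, last, timedelta(hours=1)))
print(rows_total, range_total)
```

For the last row, the row-based frame holds two rows regardless of their timestamps, while the time-based frame holds every row from the preceding hour, however many there are.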

1. Time-range aggregation

**Example:** for each row, output the sum of amount over the last hour of data for the same product.

CREATE TABLE source_table (
    order_id BIGINT,
    product BIGINT,
    amount BIGINT,
    order_time AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
    WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.order_id.min' = '1',
    'fields.order_id.max' = '2',
    'fields.amount.min' = '1',
    'fields.amount.max' = '10',
    'fields.product.min' = '1',
    'fields.product.max' = '2'
);

CREATE TABLE sink_table (
    product BIGINT,
    order_time TIMESTAMP(3),
    amount BIGINT,
    one_hour_prod_amount_sum BIGINT
) WITH (
    'connector' = 'print'
);

INSERT INTO sink_table
SELECT
    product,
    order_time,
    amount,
    SUM(amount) OVER (
        PARTITION BY product
        ORDER BY order_time
        -- the window covers the last 1 hour of data for one product
        RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
    ) AS one_hour_prod_amount_sum
FROM source_table;

The output looks like this:

+I[2, 2021-12-24T22:08:26.583, 7, 73]
+I[2, 2021-12-24T22:08:27.583, 7, 80]
+I[2, 2021-12-24T22:08:28.583, 4, 84]
+I[2, 2021-12-24T22:08:29.584, 7, 91]
+I[2, 2021-12-24T22:08:30.583, 8, 99]
+I[1, 2021-12-24T22:08:31.583, 9, 138]
+I[2, 2021-12-24T22:08:32.584, 6, 105]
+I[1, 2021-12-24T22:08:33.584, 7, 145]

2. Row-count aggregation

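**Example:** for each row, output the sum of amount over the most recent rows of the same product. Note that ROWS BETWEEN 5 PRECEDING AND CURRENT ROW covers 6 rows (the current row plus the 5 rows before it); use 4 PRECEDING for a window of exactly 5 rows.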

CREATE TABLE source_table (
    order_id BIGINT,
    product BIGINT,
    amount BIGINT,
    order_time AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
    WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.order_id.min' = '1',
    'fields.order_id.max' = '2',
    'fields.amount.min' = '1',
    'fields.amount.max' = '2',
    'fields.product.min' = '1',
    'fields.product.max' = '2'
);

CREATE TABLE sink_table (
    product BIGINT,
    order_time TIMESTAMP(3),
    amount BIGINT,
    one_hour_prod_amount_sum BIGINT
) WITH (
    'connector' = 'print'
);

INSERT INTO sink_table
SELECT
    product,
    order_time,
    amount,
    SUM(amount) OVER (
        PARTITION BY product
        ORDER BY order_time
        -- the window covers the current row and the 5 preceding rows of one product
        ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
    ) AS one_hour_prod_amount_sum
FROM source_table;

The output looks like this:

+I[2, 2021-12-24T22:18:19.147, 1, 9]
+I[1, 2021-12-24T22:18:20.147, 2, 11]
+I[1, 2021-12-24T22:18:21.147, 2, 12]
+I[1, 2021-12-24T22:18:22.147, 2, 12]
+I[1, 2021-12-24T22:18:23.148, 2, 12]
+I[1, 2021-12-24T22:18:24.147, 1, 11]
+I[1, 2021-12-24T22:18:25.146, 1, 10]
+I[1, 2021-12-24T22:18:26.147, 1, 9]
+I[2, 2021-12-24T22:18:27.145, 2, 11]
+I[2, 2021-12-24T22:18:28.148, 1, 10]
+I[2, 2021-12-24T22:18:29.145, 2, 10]
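
The size of the row-based window can be checked against the output above. The sketch below is plain Python, not Flink code, and `rows_sum` is a hypothetical helper. It takes the amount values of the seven consecutive product = 1 rows (22:18:20.147 through 22:18:26.147) and recomputes the sum for the last two of them, whose windows lie entirely within the visible rows.

```python
# amount values of the seven consecutive product = 1 rows in the output above
amounts = [2, 2, 2, 2, 1, 1, 1]


def rows_sum(values, i, preceding):
    """Sum over ROWS BETWEEN <preceding> PRECEDING AND CURRENT ROW for row i."""
    return sum(values[max(0, i - preceding): i + 1])


last = len(amounts) - 1

# 5 PRECEDING plus the current row is a 6-row window
five_preceding = [rows_sum(amounts, i, 5) for i in (last - 1, last)]

# 4 PRECEDING plus the current row is a 5-row window
four_preceding = [rows_sum(amounts, i, 4) for i in (last - 1, last)]

print(five_preceding)
print(four_preceding)
```

With `5 PRECEDING` the sketch reproduces the sums 10 and 9 shown for the rows at 22:18:25.146 and 22:18:26.147, which is what a 6-row window gives. A 5-row window (`4 PRECEDING`) would give 8 and 7 instead.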

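When a single SELECT contains several Over aggregations on the same window, the window can be defined once in a WINDOW clause and referenced by name: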

SELECT
    order_id,
    order_time,
    amount,
    SUM(amount) OVER w AS sum_amount,
    AVG(amount) OVER w AS avg_amount
FROM Orders
-- the WINDOW clause below defines the Over window
WINDOW w AS (
    PARTITION BY product
    ORDER BY order_time
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
)
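
As a rough illustration of what sharing one window definition means, the following Python sketch derives both aggregates from a single frame per row. The sample orders and the helper `frame_w` are hypothetical, and the result type of AVG in SQL depends on the column type, whereas Python always returns a float here.

```python
from datetime import datetime, timedelta

ONE_HOUR = timedelta(hours=1)


def frame_w(rows, current):
    """Amounts of the rows with the same product whose order_time lies
    within the hour up to and including the current row's order_time."""
    time, product, _amount = current
    return [amount for t, p, amount in rows
            if p == product and time - ONE_HOUR <= t <= time]


# Hypothetical orders: (order_time, product, amount), sorted by order_time
t0 = datetime(2021, 12, 24, 22, 0, 0)
orders = [
    (t0, 1, 4),
    (t0 + timedelta(minutes=20), 2, 6),
    (t0 + timedelta(minutes=40), 1, 8),
    (t0 + timedelta(minutes=90), 1, 2),
]

result = []
for row in orders:
    frame = frame_w(orders, row)           # one window definition ...
    sum_amount = sum(frame)                # ... used for SUM(amount) OVER w
    avg_amount = sum_amount / len(frame)   # ... and for AVG(amount) OVER w
    result.append((row[1], row[2], sum_amount, avg_amount))

for line in result:
    print(line)
```

Each output tuple is (product, amount, sum_amount, avg_amount); both aggregates always describe the same set of rows because they come from the same frame.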