当前位置: 首页 > news >正文

Flink:Temporal Table Function(时态表函数)和 Temporal Join

《大数据平台架构与原型实现:数据中台建设实战》博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。

我们知道,时态表(确切地说应该是版本表)提供了回溯历史的能力,也就是能读取一条记录过去某个时刻所对应的值。要想查询版本表在过去某个时刻对应的值,我们得在查询时把这个时间作为参数传递给版本表,但这个时间参数绝不会是一个 where 条件,它是另一个维度(时间维度)上的参数,那么用怎样的形式才能把这个时间参数合理地表达到查询中呢? Flink 使用了 UDF 的形式,主要思路就是:注册一个 UDF 来指代一张版本表,表名不能有参数,但函数可以有,这时把想访问版本表的目标时间点作为参数传给这个UDF,返回的就是当时表中的数据了,这个 UDF 就被称作:Temporal Table Function!

例如:以下代码将汇率表 currency_rates 注册成了时态表函数 rates。(注意:目前在 Flink SQL 中是不支持定义 Temporal Table Function 的!只能以代码方式定义,但是 SQL 中可以定义 Temporal Table DDL

rates = tEnv.from("currency_rates").createTemporalTableFunction("update_time", "currency")tEnv.createTemporarySystemFunction("rates", rates); 

然后,使用下面的 SQL 就能查询出在 11:05 时的汇率信息了:

SELECT * FROM rates('11:05');

可以说:是时态表函数是访问时态表的“入口”,是时态表的“正确打开方式”!

但是,像上面那样直接查询某一时刻版本表上的数据的情形其实并不多,真正常见是:其他表主动 Join 一张时态表,期望获得表中记录所代表的事件在发生时刻时态表中的当时的数据,就是我们曾经解释的“当时对当时”的需求场景(典型案例:Join 汇率表计算订单当时的总价):

-- 基于时态表函数实现的Join,由于指定的 order_time 是一个事件时间
-- 所以该SQL实现的是:基于事件时间的 Temporal Join,也就是 Join 事件发生时刻关联表当时的值
SELECTSUM(amount * rate) AS amount
FROMorders,LATERAL TABLE (rates(order_time))
WHERErates.currency = orders.currency

上面的 SQL 就是标准的 Temporal Table Function Join 语法,SQL 中使用了关键字 LATERAL TABLE,填入一个 Temporal Table Function / 时态表函数 rates,设定传给时态表的时间属性(基于什么时间查找时态表上的版本)order_time

这里,官方文档其实隐去了一个背景信息,order_time 其实是 orders 表的事件时间属性,所以,上述使用 Temporal Table Function Join 语法实现的是:基于事件时间的 Temporal Join,这种 Join 还可以通过 FOR SYSTEM_TIME AS OF 关键字实现, Temporal Table Function Join 语法除了能实现基于事件时间的 Temporal Join 外,还能实现基于处理时间的 Temporal Join 了,语法不变,只要将传给 rates 函数的时间属性从一个事件时间改为一个处理时间就可以了,就像 [ 官方文档 ] 给出的示例中那样,使用了一个 o_proctime 字段,这个字段是 orders 表的处理时间属性:

-- 基于时态表函数实现的Join,由于指定的 o_proctime 是一个处理时间
-- 所以该SQL实现的是:基于处理时间的 Temporal Join,也就是总是 Join 关联表当前最新状态的数据
SELECTo_amount, r_rate
FROMOrders,LATERAL TABLE (rates(o_proctime))
WHEREr_currency = o_currency
http://www.lryc.cn/news/310480.html

相关文章:

  • Go语言中的时间控制:定时器技术详细指南
  • 面试笔记系列六之redis+kafka+zookeeper基础知识点整理及常见面试题
  • Golang动态高效JSON解析技巧
  • 双重检验锁
  • 【RISC-V 指令集】RISC-V DSP 扩展指令集介绍(一)
  • RocketMQ - CentOS 7.x 安装单机版并测试
  • [JavaWeb玩耍日记]HTML+CSS+JS快速使用
  • 如何使用ArcGIS Pro创建最低成本路径
  • Neoverse CSS N3:实现市场领先能效的最快途径
  • JavaScript实现的计时器效果
  • 仿函数(Functor(c++))
  • 智能汽车加速车规级存储应用DS2431P+TR 汽车级EEPROM 存储器IC
  • js json转换成字符串
  • Linux笔记--基本操作指令
  • 论文阅读:基于超像素的图卷积语义分割(图结构数据)
  • 记录踩过的坑-macOS下使用VS Code
  • 30天JS挑战(第十四天)------数据的复制
  • 【洛谷 P8682】[蓝桥杯 2019 省 B] 等差数列 题解(数学+排序+辗转相除法)
  • Linux:kubernetes(k8s)部署CNI网络插件(4)
  • docker save 命令 docker load 命令 快速复制容器
  • Apache Flink连载(三十七):Flink基于Kubernetes部署(7)-Kubernetes 集群搭建-3
  • 【机器学习】实验6,基于集成学习的 Amazon 用户评论质量预测
  • 【寸铁的刷题笔记】图论、bfs、dfs
  • vue2 + axios + mock.js封装过程,包含mock.js获取数据时报404状态的解决记录,带图文,超详细!!!
  • 对象变更记录objectlog工具(持续跟新)
  • 平衡二叉树,二叉树的路径,左叶子之和
  • Sodinokibi勒索病毒最新变种,解密工具更新到2.0版本
  • css 鼠标移入放大的效果
  • Transformer模型分布式并行通信量浅析
  • PMP考试之20240304