当前位置: 首页 > news >正文

FlinkSql-Temporal Joins-Lookup Join

说明

在 Flink SQL 中,Temporal Joins 是一种常见的数据关联操作,特别适用于处理包含时间维度的数据。Lookup Join 是 Temporal Joins 的一种类型,它允许将流数据与维表数据进行关联。使用场景如下:

  1. 实时维度关联: 当您有一个实时的流数据流,并且需要与维表进行关联,以获取维度信息时,Lookup Join 是一个很有用的工具。例如,在电商领域,您可以将实时的订单流与商品维表进行关联,以获取商品的详细信息,如名称、价格、类别等。

  2. 动态数据关联: 如果您的维表数据是动态变化的,例如产品信息或用户配置信息,而且您希望在流数据处理过程中及时地获取最新的维度信息,Lookup Join 可以帮助您实现这一点。您可以将流数据与动态更新的维表进行关联,以确保关联的维度信息始终是最新的。

  3. 事件时间关联: Lookup Join 支持基于事件时间的关联操作,这意味着您可以根据事件发生的时间点来进行关联。这在需要处理时间窗口或事件序列的场景中特别有用。例如,您可以将实时的用户行为数据与用户配置信息进行关联,以便根据用户行为的时间戳获取相应的用户配置。

  4. 高效的维度查询: Lookup Join 通过将维表数据加载到内存中进行索引,提供了高效的维度查询能力。这使得在流数据处理过程中通过内存索引快速查找和关联维度数据成为可能,而无需频繁地访问外部存储系统。

总的来说,Lookup Join 适用于需要实时、动态和高效地关联流数据与维度数据的场景。它可以帮助您获取最新的维度信息,并在流数据处理过程中进行高效的维度查询和关联操作。

假设您有以下两个数据流:

  1. 订单流(Orders Stream)包含实时生成的订单数据,其中每个订单都包含商品ID(productId)和订单数量(quantity)。
  2. 商品维表(Products Dimension Table)包含商品的详细信息,包括商品ID(productId)、商品名称(productName)和商品价格(price)。

您可以使用 Lookup Join 将订单流与商品维表进行关联,以获取订单中商品的详细信息。以下是一个使用 Flink SQL 的示例:

-- 创建订单流表
CREATE TABLE orders (productId INT,quantity INT,orderTime TIMESTAMP(3),WATERMARK FOR orderTime AS orderTime - INTERVAL '5' SECOND
) WITH ('connector.type' = 'kafka','connector.version' = 'universal','connector.topic' = 'orders_topic','connector.properties.bootstrap.servers' = 'kafka:9092','format.type' = 'json'
);-- 创建商品维表
CREATE TABLE products (productId INT,productName STRING,price DECIMAL(10, 2),PRIMARY KEY (productId) NOT ENFORCED
) WITH ('connector.type' = 'jdbc','connector.url' = 'jdbc:mysql://mysql:3306/my_database','connector.table' = 'products','connector.driver' = 'com.mysql.jdbc.Driver','connector.username' = 'username','connector.password' = 'password'
);-- 执行 Lookup Join 操作
CREATE TABLE enrichedOrders AS
SELECT o.*, p.productName, p.price
FROM orders AS o
JOIN products FOR SYSTEM_TIME AS OF o.orderTime AS p
ON o.productId = p.productId;

在上述示例中,我们首先创建了订单流表和商品维表。订单流表从 Kafka 主题中读取实时订单数据,商品维表通过 JDBC 连接到 MySQL 数据库中的商品表。

然后,我们执行 Lookup Join 操作,将订单流表 orders 与商品维表 products 关联起来。通过 JOIN products FOR SYSTEM_TIME AS OF o.orderTime,我们将商品维表与订单流进行关联,并根据订单的事件时间 orderTime 来获取相应时间点的维度信息。

最后,我们将关联后的结果存储在 enrichedOrders 表中,其中包含了订单流的所有字段以及关联的商品名称和价格。

通过这个示例,您可以看到如何使用 Lookup Join 将流数据与维度数据进行关联,以获取实时的维度信息,丰富您的数据分析和处理过程。在实际应用中,您需要根据具体的数据源和业务需求进行相应的配置和调整。

实例demo

--模拟stream表
CREATE view kafka_mock as
select '123' as key, proctime() as _proc; -- proctime()作为处理时间-1,proctime()数据类型为TIMESTAMP_LTZ(3)--可以直接查询的外部系统
CREATE TABLE es_dim(p_key     STRING,p_type    STRING
)
with ('connector' = 'elasticsearch-6','index' = 'index01','document-type' = 'type01','hosts' = 'http://xxx:9200','format' = 'json'
);SELECTa.key,a._proc,CAST(a._proc AS TIMESTAMP(3)) as _proc_local
FROM kafka_mock a
join es_dim FOR SYSTEM_TIME AS OF a._proc as b --利用时态表,关联stream表-2
on a.key = b.p_key

时态表join-查找join
参考:Lookup Join

http://www.lryc.cn/news/254940.html

相关文章:

  • STM32之定时器
  • Canvas鼠标画线
  • Docker 安装部署 Sentinel Dashboard
  • 第21章网络通信
  • 一、运行时数据区域
  • OCR原理解析
  • 使用com组件编辑word
  • 国产Euler(欧拉)系统安装docker
  • Linux 进程控制
  • [ Linux Audio 篇 ] 音频开发入门基础知识
  • 关于高校电子邮件系统开通双因素认证的经验分享-以清华大学为例
  • 「Swift」类淘宝商品瀑布流展示
  • 道可云会展元宇宙平台全新升级,打造3D沉浸式展会新模式
  • Ant Design Pro初始化报错
  • 第16届中国R会议暨2023X-AGI大会开幕,和鲸科技分享ModelOps在数据科学平台中的实践与应用
  • ❀My学习Linux命令小记录(12)❀
  • MySQL学习day05
  • JAVA面试题7
  • 好用免费的AI换脸5个工具
  • 【Linux】公网远程访问AMH服务器管理面板
  • 随笔-这都是命吗
  • 优化网站性能,从容谈CDN加速的部署与运维
  • JavaScript-事件
  • linux的磁盘管理
  • qt-C++笔记之主线程中使用异步逻辑来处理ROS事件循环和Qt事件循环解决相互阻塞的问题
  • 【Docker】从零开始:18.使用Dockerfile构造自己的KingbaseES数据库镜像
  • YOLOv8独家改进《全网无重复 YOLOv8专属打造》感知聚合SERDet检测头:简单高效涨点,即插即用|检测头新颖改进
  • Android Studio中Flutter项目找不到Android真机设备解决方法
  • Vue 静态渲染 v-pre
  • C语言基础概念考查备忘 - 标识符、关键字、预定义标识符、语法检查、语义检查 ... 左值、右值、对象、副作用、未定义行为、sizeof是什么等等