当前位置: 首页 > news >正文

Apache Paimon 使用之 Lookup Joins 解析

Lookup Join 是流式查询中的一种 Join,Join 要求一个表具有处理时间属性,另一个表由lookup source connector支持。

Paimon支持在主键表和附加表上进行Lookup Join。

a) 准备

创建一个Paimon表并实时更新它。

-- Create a paimon catalog
CREATE CATALOG my_catalog WITH ('type'='paimon','warehouse'='hdfs://nn:8020/warehouse/path' -- or 'file://tmp/foo/bar'
);USE CATALOG my_catalog;-- Create a table in paimon catalog
CREATE TABLE customers (id INT PRIMARY KEY NOT ENFORCED,name STRING,country STRING,zip STRING
);-- Launch a streaming job to update customers table
INSERT INTO customers ...-- Create a temporary left table, like from kafka
CREATE TEMPORARY TABLE Orders (order_id INT,total INT,customer_id INT,proc_time AS PROCTIME()
) WITH ('connector' = 'kafka','topic' = '...','properties.bootstrap.servers' = '...','format' = 'csv'...
);
b) Normal Lookup(正常查找)

可以在lookup join query中使用customers

-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
c) Retry Lookup(重试查找)

在 Flink 1.16+ ,如果Orders记录(主表)没有 Join 上,是因为相应的customers数据(查找表)尚未准备就绪,可以使用Flink的延迟重试策略进行查找。

-- enrich each order with customer information
SELECT /*+ LOOKUP('table'='c', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */
o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
d) Async Retry Lookup(异步重试查找)

同步重试的问题是,一条记录没返回会阻塞后续记录,导致整个作业被阻塞,可以使用async + allow_unordered以避免阻塞。

-- enrich each order with customer information
SELECT /*+ LOOKUP('table'='c', 'retry-predicate'='lookup_miss', 'output-mode'='allow_unordered', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */
o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN customers /*+ OPTIONS('lookup.async'='true', 'lookup.async-thread-number'='16') */
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;

如果主表(Orders)是CDC流,allow_unordered将被Flink SQL忽略(仅支持附加流),可能阻塞流式任务,可以尝试使用Paimon的audit_log系统表功能(将CDC流转换为附加流)。

8)Query Service

可以运行Flink流作业来启动表的查询服务,当QueryService存在时,Flink Lookup Join将优先从中获取数据,这将有效地提高查询性能。

Flink SQL

CALL sys.query_service('database_name.table_name', parallelism);

Flink Action

<FLINK_HOME>/bin/flink run \/path/to/paimon-flink-action-0.7.0-incubating.jar \query_service \--warehouse <warehouse-path> \--database <database-name> \--table <table-name> \[--parallelism <parallelism>] \[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
http://www.lryc.cn/news/318981.html

相关文章:

  • GO语言-切片底层探索(下)
  • 物理隔离条件下,如何安全高效地进行内外网文件导入导出?
  • 代码随想录 贪心算法-难度题目-区间问题
  • 地理数据 vs. 3D数据
  • Redis删除
  • 力扣细节题:字符串中的最大奇数
  • Unity PS5开发 天坑篇 之 申请开发者与硬件部署01
  • 十四届蓝桥杯省赛Java B组 合并区域
  • SpringBoot高级
  • 机试:偶数分解
  • 一周学会Django5 Python Web开发-Jinja3模版引擎-安装与配置
  • python前端开发
  • web学习笔记(三十三)
  • flask库
  • 专业无网设备如何远程运维?向日葵远程控制能源场景案例解析
  • 基于YOLOv8/YOLOv7/YOLOv6/YOLOv5的稻田虫害检测系统详解(深度学习+Python代码+UI界面+训练数据集)
  • 实现upt下客户端用tftp文件传输协议编写客户端发送下载文件
  • 什么软件可以改ip地址
  • C#,文字排版的折行问题(Word-wrap problem)的算法与源代码
  • VUE+VScode+elementUI开发环境
  • 第十四届蓝桥杯省赛真题 Java A 组【原卷】
  • 可视化展示与交互编辑:探索3D Web轻量化平台HOOPS WEB Platform在BIM中的新可能性
  • Linux(centos)环境下安装Nginx的步骤文档
  • AI毕业论文降重GPTS,避免AI检测,高效完成论文
  • 什么是线程死锁?形成死锁的四个必要条件是什么?如何避免线程死锁?
  • webpack一些常用的Loader和Plugin
  • SpringCloud Bus 消息总线
  • 汽车屏类产品(五):仪表Cluster常用芯片i.MX117x
  • SQLiteC/C++接口详细介绍之sqlite3类(三)
  • Xcode调试Qt 源码