当前位置: 首页 > news >正文

paimon0.9记录

启动paimon

-- 本地模式演示
bin/start-cluster.sh-- 启动sqlclient
bin/sql-client.sh

示例

-- 创建catalog,每次都要创建,创建一个已经存在的catalog相当于`使用`
CREATE CATALOG fs_catalog WITH ('type'='paimon','warehouse'='file:/data/soft/paimon/catalog'
);-- 创建之后需要use
use catalog fs_catalog ;-- 在paimon模式下,只能创建paimon表和TEMPORARY表。
CREATE TEMPORARY TABLE word_table (word STRING
) WITH ('connector' = 'datagen','fields.word.length' = '1'
);-- 控制批流操作,此处设置为流模式
SET 'execution.runtime-mode' = 'streaming';
-- 设置checkpoint间隔,这影响到数据写入的批次大小(100秒写一次),很重要
SET 'execution.checkpointing.interval' = '10 s';-- 流式写入
INSERT INTO word_count SELECT word, COUNT(*) FROM word_table GROUP BY word;-- 流式查询
SELECT word,cnt / 10000 AS `interval` FROM word_count;-- 控制批流操作,此处设置为批模式
SET 'execution.runtime-mode' = 'batch';
-- 取消checkpoin设置
RESET 'execution.checkpointing.interval';
-- 设置结果展示形式,默认: table,能设为 : tableau、changelog
SET 'sql-client.execution.result-mode' = 'tableau';-- 批式查询
SELECT word,cnt / 10000 AS `interval` FROM word_count;

CATALOG

本地+本地

  • 元数据库在本地,数据在本地
 CREATE CATALOG fs_catalog WITH ('type'='paimon','warehouse'='file:/data/soft/paimon/catalog');

本地+hdfs

  • 元数据库在本地,数据在hdfs
CREATE CATALOG hdfs_fs_catalog WITH ('type'='paimon','warehouse' = 'hdfs://wsl01:8020/paimon/catalog'
);

hive+hdfs

  • 元数据库在hive,数据在hdfs
  • 通过hadoop配置找到hdfs相关信息
  • hive 需要下载(根据flink版本)
    • https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.10_2.12/1.20.0/flink-sql-connector-hive-2.3.10_2.12-1.20.0.jar
CREATE CATALOG hive_catalog WITH ('type'='paimon','metastore'='hive','warehouse' = 'hdfs://wsl01:8020/paimon/hive','hive-conf-dir'='/data/soft/hive/apache-hive-3.1.2-bin/conf','hadoop-conf-dir'='/data/soft/hadoop/hadoop-3.1.3/etc/hadoop'); 

jdbc+hive

 CREATE CATALOG jdbc_catalog WITH ('type'='paimon','metastore'='jdbc','uri'='jdbc:mysql://wsl:3306/paimon?useUnicode=true&characterEncoding=UTF-8&user=root&password=root','warehouse' = 'hdfs://wsl01:8020/paimon/hive');

表目录

Schema

  • 记录表结构的变化历史

Snapshot

  • 快照表,paimon数据产生一次变化就会生成一个快照,有三种生成快照的方式
    • 批处理,每次sql更新操作就会形成一个快照
    • 流处理,每次checkpoint就会触发一次数据落地,形成一个快照
    • compaction,每次触发合并都会形成一个快照
      • compaction触发频率基于change-log模式
  • 快照是会自动删除的,默认只保留1小时,但是最少会保留10个版本,1小时到了,但是快照不足10个版本也不会删除,快照没了,但是数据还在,只不过不能通过snapshot-id来查询指定快照版本的数据了
选项必需的默认类型描述
snapshot.time-retainedNo1 hDuration已完成快照的最长时间保留。
snapshot.num-retained.minNo10Integer要保留的已完成快照的最小数量。
snapshot.num-retained.maxNoInteger.MAX_VALUEInteger要保留的已完成快照的最大数量。

Manifest

  • 清单:包括多个数据文件或更改日志文件。

Bucket

  • 桶是最小的数据目录,每个桶就是一个lsm tree,如果有分区,在分区文件夹下建分桶文件夹
  • 每个存储桶(文件夹)中的建议数据大小约为 200MB - 1GB
  • 是数据存储的真正目录
  • 有两种模式
    • 动态分桶:根据每个桶最大数据行数和最大数据量动态分桶
    • 固定分桶 :根据主键的hash值进行分桶,没有主键就一个桶

Partition

  • 分区文件夹,名称不固定,根据你的分区字段和值确定
  • 来一条数据,首先要找到他的分区,然后找到他的分桶,而分区好不好找取决去,分区键和主键的关系,如果主键包含分区键,那通过主键可以直接找到分区,如果主键不包含分区键,就需要借助其他数据结构存储主键到分区的映射
    • 同分区更新表,主键包含所有分区字段,通过主键就可以确认分区
    • 跨分区更新表,主键不完整包含所有分区字段,需要借助索引
  • 如果有分区,那么分桶文件夹会存在于对应分区文件夹下

主键表

  • 主键表主键如果重复会根据策略进行更新操作,表中的数据主键是唯一的
  • 由于数据是分区、分桶目录存放的,因此数据有一个寻址过程
CREATE TABLE my_table_pk (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);

分桶

动态分桶

  • 每个桶的数据不应过大,因此当达到行数限制或者存储限制,就会自动创建一个新的桶。
  • 由于动态分桶无法通过固定算法计算主键和分桶的关系,因此需要维护一个索引文件记录,在写入的时候要维护index,在读取的时候要读取index,但是影响都还可以接受

固定分桶

  • 在建表的时候就需要指定桶的数量,默认就会初始化出n个分桶文件夹
  • 根据主键的hash%n,计算主键和分桶的关系,不需要额外维护索引表,效率高
  • 固定分桶的数据应当不会经常变化,如果数据变化较大导致需要缩放分桶时是一个比较复杂的操作
桶的缩放
  • ALTER TABLE 仅修改表的元数据,不会重新组织或重新格式化现有数据。必须通过 INSERT OVERWRITE 实现重新组织现有数
ALTER TABLE my_table SET ('bucket' = '4');INSERT OVERWRITE my_table PARTITION (dt = '2022-01-01')
SELECT * FROM my_table where dt = '2022-01-01'

分区表

  • 在进行更新的时候,需要根据这条数据的目录(分区/分桶),因此可以将分区表分为同分区更新表和跨分区更新表
  • 同分区表在数据更新时只需要借助索引文件(index),将主键和桶进行绑定,不需要管主键和分区的关系,因为主键中包含分区,在数据写入是会将索引文件加载到内存,因此对于写入的效率影响几乎为0,但是需要一部分内存,不过在写入过程结束后,会释放这部分内存
  • 跨分区表在数据更新时不光需要索引文件,将主键和桶进行绑定,还需要将主键和分区绑定,因此对于跨分区表,paimon采用rocksdb进行主键-分区-分桶的绑定,这样必然会导致数据写入过程的效率受到影响(1,rocksdb加载过程,2,key-value查询过程)
-- 同分区
CREATE TABLE my_table_pt_tonfenqu (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh);-- 跨分区
CREATE TABLE my_table_pt_kuafenqu(user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, item_id );

changelog配置

CREATE TABLE my_table_pt_changelog(user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, item_id )
with('change-log'='input'
);

none

  • 批写,批读的表适用

input

  • cdc采集表适用
  • 有完整的修改逻辑适用

lookup

  • 流式场景适用
  • 没有完整的修改逻辑
  • 数据时延要求高的场景适用

full-compaction

  • 流式场景适用
  • 没有完整的修改逻辑
  • 数据时延要求低的场景适用

非主键表


INSERT/UPDATE/DELTE

覆盖

  • 动态分区覆盖:select结果中包含的分区才会被影响,当查询结果为空时,没有查询到任何有效分区,因此不会做任何操作。
  • 静态分区覆盖:不管你的查询结果,默认覆盖所有分区,当查询结果为空时,会将所有分区数据清空。静态分区覆盖写入相当于,先做truncate动作,然后再做写入操作
-- 动态分区(默认)覆盖写入
insert overwrite mytable as select ...
-- 或者
insert overwrite mytable/*+ OPTIONS('dynamic-partition-overwrite' = 'true') */ as select ...-- 静态分区(dynamic-partition-overwrite = false)覆盖写入
insert overwrite mytable/*+ OPTIONS('dynamic-partition-overwrite' = 'false') */ as select ...
  • 覆盖写入可以当作truncate来使用
insert overwrite mytable/*+ OPTIONS('dynamic-partition-overwrite' = 'false') */ as select * from mytable where 1=2;
  • 指定分区:可以通过 PARTITION (key1 = value1, key2 = value2, …)指定分区, 当指定分区后,覆盖写入(上述操作)只影响当前分区,否则影响所有分区
-- 动态覆盖指定分区
INSERT OVERWRITE my_table PARTITION (key1 = value1, key2 = value2, ...) SELECT ...-- 静态清空指定分区(指定分区后,查询值中不能包含分区列)
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite'='false') */ 
PARTITION (k0 = 0) SELECT k1, v FROM my_table WHERE false;

更新

  • 主键表,INSERT相同主键,会根据merge-engine策略,对指定主键的行数据进行更新操作
  • 新版本也支持UPDATE语句进行更新
    • 1、必须是主键表
    • 2、聚合引擎必须是 deduplicate or partial-update
    • 3、flink版本必须1.7及以上版本
    • 4、只适用于批处理

删除

  • 新版本支持DELETE语句
    • 1、必须是主键表
    • 2、聚合引擎必须是 deduplicate
    • 3、flink版本必须1.7及以上版本
    • 4、只适用于批处理

SELECT

批查询

  • 指定快照版本,查询指定的快照,查询快照不会展示-D的数据
SET 'execution.runtime-mode' = 'batch';
RESET 'execution.checkpointing.interval';-- 根据快照版本
SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;
-- 根据快照时间
SELECT * FROM t /*+ OPTIONS('scan.timestamp' = '2023-12-09 23:09:12') */;
-- 根据tag
SELECT * FROM t /*+ OPTIONS('scan.tag-name' = 'my-tag') */;
-- 根据watermark
SELECT * FROM t /*+ OPTIONS('scan.watermark' = '1678883047356') */; 
  • 根据快照范围查询
-- incremental between snapshot ids
SELECT * FROM t /*+ OPTIONS('incremental-between' = '12,20') */;-- incremental between snapshot time mills
SELECT * FROM t /*+ OPTIONS('incremental-between-timestamp' = '1692169000000,1692169900000') */;

流查询

SET 'execution.runtime-mode' = 'streaming';-- 只读取最新的数据
SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest') */;
-- 在当数据的基础上,只读取最新的数据
SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest-full') */;-- 从指定快照开始读取最新数据(不包含指定快照)
SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '1') */;
-- 从指定快照开始读取最新数据(包含指定快照)
SELECT * FROM t_partition /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '1') */;-- 根据快照时间
SELECT * FROM t /*+ OPTIONS('scan.timestamp' = '2023-12-09 23:09:12') */;-- 根据data文件产生时间
SELECT * FROM t /*+ OPTIONS('scan.file-creation-time-millis' = '1678883047356') */;

消费查询

  • 当 stream 读取 Paimon 表时,当上一个作业停止时,新启动的作业可以从上一个进度继续消耗
  • 读取策略有两种
    • ‘consumer.mode’ = ‘at-least-once’ 非对齐快照,至少一次语义,不丢,可能会重复,读取效率快
    • ‘consumer.mode’ = ‘exactly-once’ 对齐快照,精确一次语义,受限于checkpoint间隔,一次checkpoint出一次数据,效率慢
SET 'execution.checkpointing.interval'='60 s'
SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid', 'consumer.expiration-time' = '1 h', 'consumer.mode' = 'exactly-once') */;
  • 您可以重置具有指定 Consumer ID 和下一个快照 ID 的 Consumer,并删除具有指定 Consumer ID 的 Consumer。首先,您需要使用此 Consumer ID 停止流式处理任务,然后执行重置 Consumer Action 作业。
-- 删除consumerbin/flink run \lib/paimon-flink-action-0.9.0.jar \reset-consumer \--warehouse file:///data/soft/paimon/catalog \--database default \ --table t \--consumer_id myid-- 不删除consumer,将consumer指针切换到指定snapshot,添加下边这行
--next_snapshot <next-snapshot-id>

Lookup Join

  • lookup join 常用于关联维表、字典表,丰富主表的数据
  • lookup join 有一个明显的问题,当关联的字典表,关联数据缺失的时候,主表数据也会丢失
  • paimon也可以作为维表、字典表
CREATE CATALOG fs_catalog WITH ('type'='paimon','warehouse'='file:/data/soft/paimon/catalog');use catalog fs_catalog;-- Create a table in paimon catalog
CREATE TABLE customers (id INT PRIMARY KEY NOT ENFORCED,name STRING,country STRING,zip STRING
);CREATE TEMPORARY TABLE orders (order_id INT,total INT,customer_id INT,proc_time AS PROCTIME()
) WITH ('connector' = 'kafka','topic' = 'ws_test','properties.bootstrap.servers' = '172.16.11.95:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'latest-offset','format' = 'csv'
);SET 'sql-client.execution.result-mode' = 'tableau'; 
SET 'execution.checkpointing.interval'='10 s';SELECT o.order_id, o.total, c.country, c.zip
FROM orders AS o
-- lookup join  关联维表,一定要用左关联,否则维度信息不存在时,会导致主流丢失
LEFT JOIN customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;

重试lookup

  • 除了使用left join 防止主流丢失外,还可以使用retry-strategy 需要flink 1.16+ ,会阻塞主流,重试600次后 才能接受下一条数据
SELECT /*+ LOOKUP('table'='c', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */
o.order_id, o.total, c.country, c.zip
FROM orders AS o
LEFT  JOIN customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
  • 如果主表是cdc流,设置允许乱序不生效,异步重试不会异步,依然会阻塞
  • 异步重试,但是得’output-mode’=‘allow_unordered’,允许乱序
  • 通过audit_log系统表,将 CDC 流转换为 append 流即可
SELECT /*+ LOOKUP('table'='c', 'retry-predicate'='lookup_miss', 'output-mode'='allow_unordered', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */
o.order_id, o.total, c.country, c.zip
FROM orders AS o
LEFT  JOIN customers /*+ OPTIONS('lookup.async'='true', 'lookup.async-thread-number'='16') */
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;

lookup优化

  • 针对于每个分区都是全量数据的情况(按天分区,每天都是全量字典数据),lookup join 可以指定只扫描最新分区
SELECT o.order_id, o.total, c.country, c.zip
FROM orders AS o
LEFT JOIN customers /*+ OPTIONS('lookup.dynamic-partition'='max_pt()', 'lookup.dynamic-partition.refresh-interval'='1 h') */
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
  • 对于维表,可以开启query service,来优化lookup join效率,关联维表时会优先使用query service提供的服务
<FLINK_HOME>/bin/flink run \/path/to/paimon-flink-action-0.9.0.jar \query_service \--warehouse <warehouse-path> \--database <database-name> \--table <table-name> \[--parallelism <parallelism>] \[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]

ALTER

修改表

-- 修改表属性
ALTER TABLE my_table SET ('write-buffer-size' = '256 MB'
);
ALTER TABLE my_table RESET ('write-buffer-size');-- 修改表名
ALTER TABLE my_table RENAME TO my_table_new;

修改字段

-- 新增字段
ALTER TABLE my_table ADD (c1 INT, c2 STRING);
-- 新增字段,指定位置
ALTER TABLE my_table ADD c INT FIRST;
-- 新增字段,指定位置
ALTER TABLE my_table ADD c INT AFTER b;
-- 删除字段
ALTER TABLE my_table DROP (c1, c2);
-- 修改字段
ALTER TABLE my_table MODIFY buy_count BIGINT COMMENT 'buy count';
-- 字段改名
ALTER TABLE my_table RENAME c0 TO c1;

Watermark

-- 新增watermark
ALTER TABLE my_table ADD (ts AS TO_TIMESTAMP(log_ts) AFTER log_ts,WATERMARK FOR ts AS ts - INTERVAL '1' HOUR
);
-- 删除watermark
ALTER TABLE my_table DROP WATERMARK;
-- 修改watermark
ALTER TABLE my_table MODIFY WATERMARK FOR ts AS ts - INTERVAL '2' HOUR

CDC

  • 支持表结构变更,但是支持的有限,
    • 不支持重命名表、删除字段,
    • 修改字段名会重新添加新字段
    • 修改字段类型时存储只能放大,不能缩小。varchar(30)->varch(20) ❌
  • 支持添加计算列 --computed_column
  • 字段类型对应
    • tinyint1-not-bool:强制tinyint(1) 转tinyint(默认转boolean)
    • to-string:所有字段类型 都变成string
  • 如果不设置,默认checkpoint间隔3分钟
    • -Dexecution.checkpointing.interval=‘180 s’

MYSQL-CDC

flink-sql-connector-mysql-cdc-3.1.x.jar
mysql-connector-java-8.0.27.jar
放到lib目录下,重启cluster

同步表

mysql表主键id,paimon重新定义主键为id,name


bin/flink run \
lib/paimon-flink-action-0.9.0.jar \
mysql_sync_table \
--warehouse file:///data/soft/paimon/catalog \
--database mysql_cdc \
--primary_keys 'id,name' \
--table demo_1 \
--mysql_conf hostname=wsl \
--mysql_conf username=root \
--mysql_conf password=root \
--mysql_conf database-name=test \
--mysql_conf table-name='demo3'

mysql表主键id,paimon重新定义主键为id,name,paimon定义分区键为 name

bin/flink run \
lib/paimon-flink-action-0.9.0.jar \
mysql_sync_table \
--warehouse file:///data/soft/paimon/catalog \
--database mysql_cdc \
--primary_keys 'id,name' \
--partition_keys 'name' \
--table demo_2 \
--mysql_conf hostname=wsl \
--mysql_conf username=root \
--mysql_conf password=root \
--mysql_conf database-name=test \
--mysql_conf table-name='demo3'

mysql表主键id,paimon重新定义主键为id,partition_col(计算列),paimon定义分区键为 partition_col
这样每天新增的数据会进入新的分区,但是不能修改,因为修改的话会-D +I,不同分区会重复

bin/flink run \
lib/paimon-flink-action-0.9.0.jar \
mysql_sync_table \
--warehouse file:///data/soft/paimon/catalog \
--database mysql_cdc \
--primary_keys 'id,partition_col' \
--partition_keys 'partition_col' \
--computed_column 'partition_col=date_format(create_date,'yyyy-MM-dd')' \
--table demo_3 \
--mysql_conf hostname=wsl \
--mysql_conf username=root \
--mysql_conf password=root \
--mysql_conf database-name=test \
--mysql_conf table-name='demo3' \
--table_conf bucket=-1 \
--table_conf changelog-producer=input \
--table_conf sink.parallelism=1

同步库

http://www.lryc.cn/news/500175.html

相关文章:

  • Java 中 List 接口的学习笔记
  • 【原生js案例】webApp实现鼠标移入移出相册放大缩小动画
  • LVGL9 定时器模块
  • Qt学习笔记第51到60讲
  • 网页设计--axios作业
  • SpringBoot 整合 Avro 与 Kafka 详解
  • 若依 ruoyi VUE el-select 直接获取 选择option 的 label和value
  • 大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制
  • 修改MySQL存储路径
  • Git常用的命令【提交与回退】
  • 详解:HTTP/HTTPS协议
  • 0.96寸OLED---STM32
  • 保姆级教学 uniapp绘制二维码海报并保存至相册,真机正常展示图片二维码
  • 常用Vim操作
  • 【C#】NET 9中LINQ的新特性-CountBy
  • Trimble X9三维激光扫描仪高效应对化工厂复杂管道扫描测绘挑战【沪敖3D】
  • 【数据结构】文件和外部排序
  • 新手学习:网页前端、后端、服务器Tomcat和数据库的基本介绍
  • 机器学习贝叶斯模型原理
  • 【C++】实现100以内素数的求解
  • Python 浏览器自动化新利器:DrissionPage,让网页操作更简单!
  • Rust学习笔记_13——枚举
  • Postgresql 格式转换笔记整理
  • AI开发:卷积神经网络CNN原理初识,简易例程 - 机器学习
  • 详细介绍vue的递归组件(重要)
  • 【单片机基础知识】基础知识(CortexM系列、STM32系统框架、存储器映射、寄存器映射)
  • yolov5导出命令
  • RabbitMQ的常用术语介绍
  • Docker魔法:用docker run -p轻松开通容器服务大门
  • 【后端面试总结】Redis过期删除策略