Hudi SQL DDL
本文介绍Hudi在 Spark 和 Flink 中使用SQL创建和更改表的支持。
1.Spark SQL 创建hudi表
1.1 创建非分区表
使用标准CREATE TABLE语法创建表,该语法支持分区和传递表属性。
CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
[(col_name data_type [COMMENT col_comment], ...)]
[COMMENT table_comment]
[PARTITIONED BY (col_name, ...)]
[ROW FORMAT row_format]
[STORED AS file_format]
[LOCATION path]
[TBLPROPERTIES (property_name=property_value, ...)]
[AS select_statement];
创建一个非分区表就像创建一个常规表一样简单。
-- create a Hudi table
CREATE TABLE IF NOT EXISTS hudi_table (
id INT,
name STRING,
price DOUBLE
) USING hudi;
SparkSQL和Hudi字段类型比较
Spark | Hudi | Notes |
boolean | boolean | |
byte | int | |
short | int | |
integer | int | |
long | long | |
date | date | |
timestamp | timestamp | |
float | float | |
double | double | |
string | string | |
decimal | decimal | |
binary | bytes | |
array | array | |
map | map | |
struct | struct | |
char | not supported | |
varchar | not supported | |
numeric | not supported | |
null | not supported | |
object | not supported |
1.2 创建分区表
分区表可以通过添加partition by子句来创建。分区有助于根据分区列将数据组织到多个文件夹中。它还可以通过限制扫描的元数据、索引和数据的数量来帮助加快查询和索引查找。
CREATE TABLE IF NOT EXISTS hudi_table_partitioned (
id BIGINT,
name STRING,
dt STRING,
hh STRING
) USING hudi
TBLPROPERTIES (
type = 'cow'
)
PARTITIONED BY (dt);
还可以通过提供逗号分隔的字段名来创建由多个字段分区的表。例如,“partitioned by dt,hh”
1.3 创建包含键和排序字段的表
表使用键跟踪表中的每个记录。Hudi 为每个新记录自动生成了一个高度压缩的键。如果要使用现有字段作为关键字,可以设置primaryKey选项。通常,这还要配置preCombineField选项,以处理传入写入中具有相同键的无序数据和潜在重复记录。
CREATE TABLE IF NOT EXISTS hudi_table_keyed (
id INT,
name STRING,
price DOUBLE,
ts BIGINT
) USING hudi
TBLPROPERTIES (
type = 'cow',
primaryKey = 'id',
preCombineField = 'ts'
);
1.4 外部表
通常,Hudi表是由流式写入器(如streamer tool,)创建的,后者可能需要一些SQL语句才能在其上运行。可以使用location 语句创建外部表。不需要指定schema和除分区列之外的任何properties,Hudi可以自动识别schemo和配置。
CREATE TABLE hudi_table_external
USING hudi
LOCATION 'file:///tmp/hudi_table/';
1.5 Create Table As Select (CTAS)
Hudi支持CTAS(create table as select)来支持Hudi表的创建和数据加载。为了确保高效地执行此操作,即使对于大负载,CTAS也使用bulk insert写入操作。
1.5.1 使用CATS创建分区表
# create managed parquet table
CREATE TABLE parquet_table
USING parquet
LOCATION 'file:///tmp/parquet_dataset/';# CTAS by loading data into Hudi table
CREATE TABLE hudi_table_ctas
USING hudi
TBLPROPERTIES (
type = 'cow',
preCombineField = 'ts'
)
PARTITIONED BY (dt)
AS SELECT * FROM parquet_table;
1.5.2 使用CATS创建非分区表
创建非分区表时也可以使用create table as select。
# create managed parquet table
CREATE TABLE parquet_table
USING parquet
LOCATION 'file:///tmp/parquet_dataset/';# CTAS by loading data into Hudi table
CREATE TABLE hudi_table_ctas
USING hudi
TBLPROPERTIES (
type = 'cow',
preCombineField = 'ts'
)
AS SELECT * FROM parquet_table;
1.5.3在使用CATS时设置主键
可以通过在表属性中设置primaryKey。
CREATE TABLE hudi_table_ctas
USING hudi
TBLPROPERTIES (
type = 'cow',
primaryKey = 'id'
)
PARTITIONED BY (dt)
AS
SELECT 1 AS id, 'a1' AS name, 10 AS price, 1000 AS dt;
1.5.4使用CATS复制数据
可以通过create table as select 复制外部表数据。
# create managed parquet table
CREATE TABLE parquet_table
USING parquet
LOCATION 'file:///tmp/parquet_dataset/*.parquet';# CTAS by loading data into hudi table
CREATE TABLE hudi_table_ctas
USING hudi
LOCATION 'file:///tmp/hudi/hudi_tbl/'
TBLPROPERTIES (
type = 'cow'
)
AS SELECT * FROM parquet_table;
1.6 设置Hudi配置
可以通过不同的方法传递给定hudi表的配置。
使用set命令
可以使用set命令来设置Hudi的任何写入配置。这将适用于整个spark会话中的操作。
set hoodie.insert.shuffle.parallelism = 100;
set hoodie.upsert.shuffle.parallelism = 100;
set hoodie.delete.shuffle.parallelism = 100;
使用表属性
还可以在创建表时配置表选项。这将仅适用于当前表,并覆盖任何SET命令值。
CREATE TABLE IF NOT EXISTS tableName (
colName1 colType1,
colName2 colType2,
...
) USING hudi
TBLPROPERTIES (
primaryKey = '${colName1}',
type = 'cow',
${hoodie.config.key1} = '${hoodie.config.value1}',
${hoodie.config.key2} = '${hoodie.config.value2}',
....
);e.g.
CREATE TABLE IF NOT EXISTS hudi_table (
id BIGINT,
name STRING,
price DOUBLE
) USING hudi
TBLPROPERTIES (
primaryKey = 'id',
type = 'cow',
hoodie.cleaner.fileversions.retained = '20',
hoodie.keep.max.commits = '20'
);
1.7 表属性
可以在创建表时设置表属性。
1.7.1 常用表属性
参数名 | 默认值 | 描述 |
type | cow | 要创建的表类型。type='cow'创建COPY-ON-WRITE表,type='mor'创建MERGE-ON-READ表。与hoodie.datasource.write.table.type相同。 |
primaryKey | uuid | 表的主键字段名,用逗号分隔。与hoodie.datasource.write.recordkey.field相同。如果忽略此配置,hudi将自动生成主键。如果明确设置,则主键生成将使用用户配置。 |
preCombineField | 表的预合并字段。它用于在多个版本之间解析记录的最终版本。通常,事件时间或其他类似列将用于排序目的。Hudi将能够使用preCombine字段值处理无序数据。 |
primaryKey、preCombineField、type和其他属性区分大小写。
1.7.2并发写入器的Passing Lock Providers
Hudi需要一个锁提供程序来支持并发写入程序或异步表服务。用户也可以将这些表属性传递到TBLPROPERTIES中。下面是一个基于Zookeeper的配置示例。
-- Properties to use Lock configurations to support Multi Writers
TBLPROPERTIES(
hoodie.write.lock.zookeeper.url = "zookeeper",
hoodie.write.lock.zookeeper.port = "2181",
hoodie.write.lock.zookeeper.lock_key = "tableName",
hoodie.write.lock.provider = "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider",
hoodie.write.concurrency.mode = "optimistic_concurrency_control",
hoodie.write.lock.zookeeper.base_path = "/tableName"
)
1.7.3为表启用列统计信息/记录级别索引
Hudi提供了利用有关表的丰富元数据和索引、加快DML和查询的能力。例如:可以启用列统计信息的收集来执行快速数据跳过,或者可以使用以下表属性使用记录级别索引来执行快速更新或点查找。
TBLPROPERTIES('hoodie.metadata.index.column.stats.enable' = 'true''hoodie.metadata.record.index.enable' = 'true'
)
1.8 Spark Alter Table
语法
-- Alter table name
ALTER TABLE oldTableName RENAME TO newTableName;-- Alter table add columns
ALTER TABLE tableIdentifier ADD COLUMNS(colAndType [, colAndType]);
样例
--rename to:
ALTER TABLE hudi_table RENAME TO hudi_table_renamed;--add column:
ALTER TABLE hudi_table ADD COLUMNS(remark STRING);
1.9 修改表属性
语法
-- alter table ... set|unset
ALTER TABLE tableIdentifier SET|UNSET TBLPROPERTIES (table_property = 'property_value');
样例
ALTER TABLE hudi_table SET TBLPROPERTIES (hoodie.keep.max.commits = '10');
ALTER TABLE hudi_table SET TBLPROPERTIES ("note" = "don't drop this table");ALTER TABLE hudi_table UNSET TBLPROPERTIES IF EXISTS (hoodie.keep.max.commits);
ALTER TABLE hudi_table UNSET TBLPROPERTIES IF EXISTS ('note');
当前,尝试更改列类型可能会引发错误。不支持将列colName的oldColType更改为colName的newColType。由于一个开放的SPARK问题。
1.10 修改配置属性
还可以通过alter table SET SERDEPROPERTIES更改表的写入配置。
语法
-- alter table ... set|unset
ALTER TABLE tableName SET SERDEPROPERTIES ('property' = 'property_value');
样例
ALTER TABLE hudi_table SET SERDEPROPERTIES ('key1' = 'value1');
1.11 展示和删除分区
语法
-- Show partitions
SHOW PARTITIONS tableIdentifier;-- Drop partition
ALTER TABLE tableIdentifier DROP PARTITION ( partition_col_name = partition_col_val [ , ... ] );
样例
--Show partition:
SHOW PARTITIONS hudi_table;--Drop partition:
ALTER TABLE hudi_table DROP PARTITION (dt='2021-12-09', hh='10');
1.12 使用限制
Hudi目前在使用Spark SQL创建/更改表时有以下限制。
1) ALTER TABLE ... RENAME TO ...使用AWS Glue Data Catalog作为配置单元元存储时不支持,因为Glue本身不支持表重命名。
2)Spark SQL创建的新Hudi表将默认设置hoodie.datasource.write.hive_style_partitioning=true,以便于使用。这可以使用表属性重写。
2.Flink SQL 创建Hudi表
2.1 创建catalog
catalog有助于管理SQL表,如果catalog保留表定义,则可以在会话之间共享表。对于hms模式,catalog还补充了配置单元同步选项。
样例
CREATE CATALOG hoodie_catalog
WITH ('type'='hudi','catalog.path' = '${catalog default root path}','hive.conf.dir' = '${directory where hive-site.xml is located}','mode'='hms' -- supports 'dfs' mode that uses the DFS backend for table DDLs persistence);
选项
选项名 | 必填项 | 默认值 | 描述 |
catalog.path | true | catalog表存储的默认路径,该路径用于自动推断表路径,默认表路径:${catalog.path}/${db_name}/${table_name} | |
default-database | false | default | 默认数据库名称 |
hive.conf.dir | false | hive-site.xml所在的目录,仅在hms模式下有效 | |
mode | false | dfs | 支持使用hms持久化表选项的hms模式 |
table.external | false | false | 是否创建外部表,仅在hms模式下有效 |
2.2 Create Table
CREATE TABLE hudi_table2(
id int,
name string,
price double
)
WITH (
'connector' = 'hudi',
'path' = 's3://bucket-name/hudi/',
'table.type' = 'MERGE_ON_READ' -- this creates a MERGE_ON_READ table, default is COPY_ON_WRITE
);
2.3 修改表
ALTER TABLE tableA RENAME TO tableB;