当前位置: 首页 > news >正文

Hudi SQL DDL

本文介绍Hudi在 Spark 和 Flink 中使用SQL创建和更改表的支持。

1.Spark SQL 创建hudi表

1.1 创建非分区表

使用标准CREATE TABLE语法创建表,该语法支持分区和传递表属性。

CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
  [(col_name data_type [COMMENT col_comment], ...)]
  [COMMENT table_comment]
  [PARTITIONED BY (col_name, ...)]
  [ROW FORMAT row_format]
  [STORED AS file_format]
  [LOCATION path]
  [TBLPROPERTIES (property_name=property_value, ...)]
  [AS select_statement];

创建一个非分区表就像创建一个常规表一样简单。

-- create a Hudi table
CREATE TABLE IF NOT EXISTS hudi_table (
  id INT,
  name STRING,
  price DOUBLE
) USING hudi;

SparkSQL和Hudi字段类型比较

Spark

Hudi

Notes

boolean

boolean

byte

int

short

int

integer

int

long

long

date

date

timestamp

timestamp

float

float

double

double

string

string

decimal

decimal

binary

bytes

array

array

map

map

struct

struct

char

not supported

varchar

not supported

numeric

not supported

null

not supported

object

not supported

1.2 创建分区表

分区表可以通过添加partition by子句来创建。分区有助于根据分区列将数据组织到多个文件夹中。它还可以通过限制扫描的元数据、索引和数据的数量来帮助加快查询和索引查找。

CREATE TABLE IF NOT EXISTS hudi_table_partitioned (
  id BIGINT,
  name STRING,
  dt STRING,
  hh STRING
) USING hudi
TBLPROPERTIES (
  type = 'cow'
)
PARTITIONED BY (dt);

还可以通过提供逗号分隔的字段名来创建由多个字段分区的表。例如,“partitioned by dt,hh”

1.3 创建包含键和排序字段的表

表使用键跟踪表中的每个记录。Hudi 为每个新记录自动生成了一个高度压缩的键。如果要使用现有字段作为关键字,可以设置primaryKey选项。通常,这还要配置preCombineField选项,以处理传入写入中具有相同的无序数据和潜在重复记录。

CREATE TABLE IF NOT EXISTS hudi_table_keyed (
  id INT,
  name STRING,
  price DOUBLE,
  ts BIGINT
) USING hudi
TBLPROPERTIES (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
);

1.4 外部表

通常,Hudi表是由流式写入器(如streamer tool,)创建的,后者可能需要一些SQL语句才能在其上运行。可以使用location 语句创建外部表。不需要指定schema和除分区列之外的任何properties,Hudi可以自动识别schemo和配置。

CREATE TABLE hudi_table_external
USING hudi
LOCATION 'file:///tmp/hudi_table/';

1.5 Create Table As Select (CTAS)

Hudi支持CTAS(create table as select)来支持Hudi表的创建和数据加载。为了确保高效地执行此操作,即使对于大负载,CTAS也使用bulk insert写入操作。

1.5.1 使用CATS创建分区表

# create managed parquet table
CREATE TABLE parquet_table
USING parquet
LOCATION 'file:///tmp/parquet_dataset/';# CTAS by loading data into Hudi table
CREATE TABLE hudi_table_ctas
USING hudi
TBLPROPERTIES (
  type = 'cow',
  preCombineField = 'ts'
)
PARTITIONED BY (dt)
AS SELECT * FROM parquet_table;

1.5.2 使用CATS创建非分区表

创建非分区表时也可以使用create table as select。

# create managed parquet table
CREATE TABLE parquet_table
USING parquet
LOCATION 'file:///tmp/parquet_dataset/';# CTAS by loading data into Hudi table
CREATE TABLE hudi_table_ctas
USING hudi
TBLPROPERTIES (
  type = 'cow',
  preCombineField = 'ts'
)
AS SELECT * FROM parquet_table;

1.5.3在使用CATS时设置主键

可以通过在表属性中设置primaryKey。

CREATE TABLE hudi_table_ctas
USING hudi
TBLPROPERTIES (
  type = 'cow',
  primaryKey = 'id'
)
PARTITIONED BY (dt)
AS
SELECT 1 AS id, 'a1' AS name, 10 AS price, 1000 AS dt;

1.5.4使用CATS复制数据

可以通过create table as select 复制外部表数据。

# create managed parquet table
CREATE TABLE parquet_table
USING parquet
LOCATION 'file:///tmp/parquet_dataset/*.parquet';# CTAS by loading data into hudi table
CREATE TABLE hudi_table_ctas
USING hudi
LOCATION 'file:///tmp/hudi/hudi_tbl/'
TBLPROPERTIES (
  type = 'cow'
)
AS SELECT * FROM parquet_table;

1.6 设置Hudi配置

可以通过不同的方法传递给定hudi表的配置。

使用set命令

可以使用set命令来设置Hudi的任何写入配置。这将适用于整个spark会话中的操作。

set hoodie.insert.shuffle.parallelism = 100;
set hoodie.upsert.shuffle.parallelism = 100;
set hoodie.delete.shuffle.parallelism = 100;

使用表属性

还可以在创建表时配置表选项。这将仅适用于当前表,并覆盖任何SET命令值。

CREATE TABLE IF NOT EXISTS tableName (
  colName1 colType1,
  colName2 colType2,
  ...
) USING hudi
TBLPROPERTIES (
  primaryKey = '${colName1}',
  type = 'cow',
  ${hoodie.config.key1} = '${hoodie.config.value1}',
  ${hoodie.config.key2} = '${hoodie.config.value2}',
  ....
);e.g.
CREATE TABLE IF NOT EXISTS hudi_table (
  id BIGINT,
  name STRING,
  price DOUBLE
) USING hudi
TBLPROPERTIES (
  primaryKey = 'id',
  type = 'cow',
  hoodie.cleaner.fileversions.retained = '20',
  hoodie.keep.max.commits = '20'
);

1.7 表属性

可以在创建表时设置表属性

1.7.1 常用表属性

参数名

默认值

描述

type

cow

要创建的表类型。type='cow'创建COPY-ON-WRITE表,type='mor'创建MERGE-ON-READ表。与hoodie.datasource.write.table.type相同。

primaryKey

uuid

表的主键字段名,用逗号分隔。与hoodie.datasource.write.recordkey.field相同。如果忽略此配置,hudi将自动生成主键。如果明确设置,则主键生成将使用用户配置。

preCombineField

表的预合并字段。它用于在多个版本之间解析记录的最终版本。通常,事件时间或其他类似列将用于排序目的。Hudi将能够使用preCombine字段值处理无序数据。

primaryKey、preCombineField、type和其他属性区分大小写。

1.7.2并发写入器的Passing Lock Providers 

Hudi需要一个锁提供程序来支持并发写入程序或异步表服务。用户也可以将这些表属性传递到TBLPROPERTIES中。下面是一个基于Zookeeper的配置示例。

-- Properties to use Lock configurations to support Multi Writers
TBLPROPERTIES(
  hoodie.write.lock.zookeeper.url = "zookeeper",
  hoodie.write.lock.zookeeper.port = "2181",
  hoodie.write.lock.zookeeper.lock_key = "tableName",
  hoodie.write.lock.provider = "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider",
  hoodie.write.concurrency.mode = "optimistic_concurrency_control",
  hoodie.write.lock.zookeeper.base_path = "/tableName"
)

1.7.3为表启用列统计信息/记录级别索引

Hudi提供了利用有关表的丰富元数据和索引、加快DML和查询的能力。例如:可以启用列统计信息的收集来执行快速数据跳过,或者可以使用以下表属性使用记录级别索引来执行快速更新或点查找。

TBLPROPERTIES('hoodie.metadata.index.column.stats.enable' = 'true''hoodie.metadata.record.index.enable' = 'true' 
)

1.8 Spark Alter Table

语法

-- Alter table name
ALTER TABLE oldTableName RENAME TO newTableName;-- Alter table add columns
ALTER TABLE tableIdentifier ADD COLUMNS(colAndType [, colAndType]);

样例

--rename to:
ALTER TABLE hudi_table RENAME TO hudi_table_renamed;--add column:
ALTER TABLE hudi_table ADD COLUMNS(remark STRING);

1.9 修改表属性

语法

-- alter table ... set|unset
ALTER TABLE tableIdentifier SET|UNSET TBLPROPERTIES (table_property = 'property_value');

样例

ALTER TABLE hudi_table SET TBLPROPERTIES (hoodie.keep.max.commits = '10');
ALTER TABLE hudi_table SET TBLPROPERTIES ("note" = "don't drop this table");ALTER TABLE hudi_table UNSET TBLPROPERTIES IF EXISTS (hoodie.keep.max.commits);
ALTER TABLE hudi_table UNSET TBLPROPERTIES IF EXISTS ('note');

当前,尝试更改列类型可能会引发错误。不支持将列colName的oldColType更改为colName的newColType。由于一个开放的SPARK问题

1.10 修改配置属性

还可以通过alter table SET SERDEPROPERTIES更改表的写入配置。

语法

-- alter table ... set|unset
ALTER TABLE tableName SET SERDEPROPERTIES ('property' = 'property_value');

样例

 ALTER TABLE hudi_table SET SERDEPROPERTIES ('key1' = 'value1');

1.11 展示和删除分区

语法

-- Show partitions
SHOW PARTITIONS tableIdentifier;-- Drop partition
ALTER TABLE tableIdentifier DROP PARTITION ( partition_col_name = partition_col_val [ , ... ] );

样例

--Show partition:
SHOW PARTITIONS hudi_table;--Drop partition:
ALTER TABLE hudi_table DROP PARTITION (dt='2021-12-09', hh='10');

1.12 使用限制

Hudi目前在使用Spark SQL创建/更改表时有以下限制。

1)  ALTER TABLE ... RENAME TO ...使用AWS Glue Data Catalog作为配置单元元存储时不支持,因为Glue本身不支持表重命名。

2Spark SQL创建的新Hudi表将默认设置hoodie.datasource.write.hive_style_partitioning=true,以便于使用。这可以使用表属性重写。

2.Flink SQL 创建Hudi表

2.1 创建catalog

catalog有助于管理SQL表,如果catalog保留表定义,则可以在会话之间共享表。对于hms模式,catalog还补充了配置单元同步选项。

样例

CREATE CATALOG hoodie_catalog
  WITH ('type'='hudi','catalog.path' = '${catalog default root path}','hive.conf.dir' = '${directory where hive-site.xml is located}','mode'='hms' -- supports 'dfs' mode that uses the DFS backend for table DDLs persistence);

选项

选项名

必填项

默认值

描述

catalog.path

true

catalog表存储的默认路径,该路径用于自动推断表路径,默认表路径:${catalog.path}/${db_name}/${table_name}

default-database

false

default

默认数据库名称

hive.conf.dir

false

hive-site.xml所在的目录,仅在hms模式下有效

mode

false

dfs

支持使用hms持久化表选项的hms模式

table.external

false

false

是否创建外部表,仅在hms模式下有效

2.2  Create Table

CREATE TABLE hudi_table2(
  id int, 
  name string, 
  price double
)
WITH (
'connector' = 'hudi',
'path' = 's3://bucket-name/hudi/',
'table.type' = 'MERGE_ON_READ' -- this creates a MERGE_ON_READ table, default is COPY_ON_WRITE
);

2.3 修改表

ALTER TABLE tableA RENAME TO tableB;

http://www.lryc.cn/news/183782.html

相关文章:

  • gin 框架的 JSON Render
  • 《Dataset Condensation with Differentiable Siamese Augmentation》
  • 多普勒频率相关内容介绍
  • win10睡眠快捷方式
  • C++中的static和extern关键字
  • JAVA经典百题之找完数
  • CSS 滚动驱动动画 view-timeline-inset
  • ansible部署二进制k8s
  • Nginx限流熔断
  • QQ登录的具体流程
  • 用JMeter对HTTP接口进行压测(一)压测脚本的书写、调试思路
  • 接着聊聊如何从binlog文件恢复误delete的数据,模拟Oracle的闪回功能
  • 计算机竞赛 深度学习机器视觉车道线识别与检测 -自动驾驶
  • pyqt5使用经验总结
  • 【MQTT】mosquitto库中SSL/TLS相关API接口
  • 假期题目整合
  • Redisson—分布式服务
  • volatile使用方法
  • 提升您的 Go 应用性能的 6 种方法
  • 计算摄像技术02 - 颜色空间
  • Pytorch笔记之分类
  • 【目标检测】——PE-YOLO精读
  • Java 数组转集合
  • Elasticsearch:ES|QL 查询语言简介
  • qt qml中listview出现卡顿情况时的常用处理方法
  • Elasticsearch基础操作演示总结
  • Spring 作用域解析器AnnotationScopeMetadataResolver
  • 如何发布一个 NPM 包
  • Flask小项目教程(含MySQL与前端部分)
  • Eureka