当前位置: 首页 > news >正文

Flink Hive Catalog操作案例

在此对Flink读写Hive表操作进行逐步记录,需要指出的是,其中操作Hive分区表和非分区表的DDL有所不同,以下分别记录。

基础环境

Hive-3.1.3
Flink-1.17.1

基本操作与准备

1、上传依赖jar包到flink/lib目录下

cp flink-sql-connector-hive-3.1.3_2.12-1.17.1.jar
cp mysql-connector-j-8.1.0.jar

2、更换planner依赖(Hive集成的推荐设置)

mv /usr/sft/flink-1.17.1/opt/flink-table-planner_2.12-1.17.1.jar /usr/sft/flink-1.17.1/lib/
mv /usr/sft/flink-1.17.1/lib/flink-table-planner-loader-1.17.1.jar /usr/sft/flink-1.17.1/opt/

3、启动Hive MetaStore

nohup hive --service metastore 2>&1 &

4、启动flink集群和sql-client

yarn-session.sh -d -nm flink-cluster
sql-client.sh embedded -s yarn-session

5、在flink sql-client中创建hive catalog

CREATE CATALOG hive WITH ('type' = 'hive','default-database' = 'sty','hive-conf-dir' = '/usr/sft/hive-3.1.3/conf'
);

非分区表读写

1、Hive中建表并插入数据

create table behavior(
username string,
behavior string
);
insert into behavior values('lisi','buy'),('zhangsan','read');

2、使用hive catalog

use catalog hive;

2、flink sql-client中执行数据插入与数据查询(和常规sql一致)

insert into behavior values('sisi','buy'),('tracy','read');
select *from behavior;

在这里插入图片描述

分区表读写

这里和非分区表有所不同,主要体现在建表层面,参考博客:https://www.jianshu.com/p/295066a24092

写入到hive分区表
streamEnv需要开启checkpoint,保证flink写入hive分区表的写入一致性
hive表ddl中需要指定以下TBLPROPERTIES:
sink.partition-commit.trigger:分区提交触发器,单选,可选值为partition-time、process-time(默认), 其中partition-time需要根据当前数据的watermark来判断分区是否需要提交,当watermark + delay大于等于分区上的时间时就会提交该分区元数据;process-time的话根据当前系统处理时间来判断分区是否需要提交,当系统处理时间大于等于分区上的时间就会提交该分区元数据
partition.time-extractor.timestamp-pattern:使用partition-time触发器时使用该配置项。表示从表字段中提取出表达某个分区的时间的格式,需要提取到的时间必须为yyyy-MM-dd HH:mm:ss的格式。比如字段dt的格式为yyyy-MM-dd,则配置为$dt 00:00:00则表示分区时间取值为dt的value的0点0分0秒,可以选择多个表字段组合。当表字段无法抽取出符合的格式时,则使用自定义提取器partition.time-extractor.class。
sink.partition-commit.delay: 表示watermark允许event time的最大乱序时间,使用partition-time触发器时可以使用,默认为0s
sink.partition-commit.policy.kind:分区提交方式,多选,可选值为metastore、success-file、custom,metastore表示写入元数据库,success-file表示往hdfs分区目录写入一个标志文件,custom表示使用自定义提交方式,通常使用metastore,success-file组合
partition.time-extractor.kind:当要使用自定义分区时间提取器时需要配置此项,值配置为custom
partition.time-extractor.class:当要使用自定义分区时间提取器时需要配置此项,值配置为自定义提取器的类路径。在集群中运行时,需要把该类打成jar包放到flink lib目录下。
某个分区触发提交后,后续再有此分区的数据进来,仍然会写入hive该分区。
作者:spongebobZ
链接:https://www.jianshu.com/p/295066a24092
来源:简书

1、hive创建分区表并插入数据

create table userinfo(
name string,
age int
)
partitioned by (dt string)
stored as orc
tblproperties('sink.partition-commit.trigger' = 'partition-time','sink.partition-commit.policy.kind'='metastore,success-file','partition.time-extractor.timestamp-pattern' ='yyyy-MM-dd HH:mm:ss','sink.partition-commit.delay' = '10'
);insert into table userInfo partition(dt='2023-10-26') values('zhangsan',23);
insert into table userInfo partition(dt='2023-10-26') values('lisi',26),('wangwu',27);

注意:若建表时未在tblproperties中配置恰当的sink.partition-commit.policy.kind,flink sql-client插入数据时将遇到如下报错:

Could not execute SQL statement. Reason:
org.apache.flink.connectors.hive.FlinkHiveException: Streaming write to partitioned hive table `hive`.`sty`.`userInfo` without providing a commit policy. Make sure to set a proper value for sink.partition-commit.policy.kind

2、flink sql-client插入与查询数据

insert into  userinfo partition(dt='2023-10-24') values('tracy',26),('lily',27);
select *from userinfo;

在这里插入图片描述

http://www.lryc.cn/news/207827.html

相关文章:

  • NSSCTF做题第9页(3)
  • 从瀑布模式到水母模式:ChatGPT如何赋能软件研发全流程【文末送书五本】
  • 设置使用LibreOffice作为默认程序打开word、excel等文档
  • 创新领航 | 竹云参编《基于区块链的数据资产评估实施指南》正式发布!
  • 【Docker】Linux网桥连接多个命名空间
  • ES6新特性:let关键字详解
  • 鸿运主动安全监控云平台任意文件下载漏洞复现 [附POC]
  • 使用pycharm远程连接到Linux服务器进行开发
  • JavaScript 中 BOM 基础知识有哪些?
  • 【PointNet—论文笔记分享】
  • Mysql8.1.0 windows 绿色版安装
  • 何为自制力?如何提高自制力?
  • 第1篇 目标检测概述 —(3)目标检测评价指标
  • 剑指JUC原理-3.线程常用方法及状态
  • MYSQL8-sql语句使用集合。MYCAT-sql语法使用集合
  • UNIX 域协议(本地通信协议)
  • 分类预测 | MATLAB实现SSA-CNN-BiGRU-Attention数据分类预测(SE注意力机制)
  • 基于FPGA的图像PSNR质量评估计算实现,包含testbench和MATLAB辅助验证程序
  • 算法进修Day-38
  • 8.MySQL内外连接
  • 使用.NET设计一个Epub电子书生成工具
  • 2023-10-26 用C语言实现一个大整数加法
  • [hive] 窗口函数 ROW_NUMBER()
  • TensorFlow和Pytorch两种机器学习框架的比较及优缺点
  • “Can‘t open workbook - unsupported file type: XML“
  • 达芬奇MacOS最新中文版 DaVinci Resolve Studio 18中文注册秘钥
  • 电脑扬声器未插入?4个方法帮你恢复声音!
  • Python - 通过/SSH 获取远程主机的 env 变量
  • ubuntu 下的 使用anaconda 环境运行python 项目
  • MySQL创建定时任务定时执行sql