当前位置: 首页 > news >正文

flink写入hudi MOR表

第一步:创建flink内存表从kafka读取数据:

DROP TABLE IF EXISTS HUDI_KAFKA_DEBEZIUM_ZHANG;
CREATE TABLE IF NOT EXISTS HUDI_KAFKA_DEBEZIUM_ZHANG(
ID STRING comment '编码'
,NAME STRING comment '名称'
,PRIMARY KEY(RCLNT,RLDNR,RRCTY,RVERS,RYEAR,ROBJNR,COBJNR,SOBJNR,RTCUR,RUNIT,DRCRK,RPMAX) NOT ENFORCED
) with (
'connector'='kafka',
'topic'='GLFUNCT_DEBEZIUM_TRANSFER',
--'scan.startup.mode'='earliest-offset',
'scan.startup.mode'='timestamp',
'scan.startup.timestamp-millis'='1725811200000',
'properties.group.id'='KAFKA_GLFUNCT_CHANGELOG_HUDI7',
'properties.bootstrap.servers'='10.66.28.69:9092,10.66.28.70:9092,10.66.28.61:9092',
'value.format'='debezium-json',
'scan.topic-partition-discovery.interval' = '10000',
'value.debezium-json.ignore-parse-errors' = 'true'
);

第二步:创建MOR类型的hudi表

DROP TABLE IF EXISTS HUDI_ZHANG;
CREATE TABLE IF NOT EXISTS HUDI_ZHANG(
ID STRING comment '编码'
,NAME STRING comment '名称'
,PRIMARY KEY(ID,NAME) NOT ENFORCED
)with (
'connector' = 'hudi',
'path' = 'hdfs://nameservice1/user/hive/warehouse/hudi_ods_sap.db/HUDI_ZHANG',
'table.type' = 'MERGE_ON_READ',
'hive_sync.skip_ro_suffix' = 'true',
'hoodie.datasource.write.recordkey.field' = 'ID,NAME',
'write.operation' = 'upsert',
--'write.precombine.field' = 'ETL_DT',
'write.tasks' = '4',
'index.bootstrap.enabled' = 'true',
'write.insert.drop.duplicates'='true',
'compaction.tasks' = '4',
'compaction.async.enabled' = 'true',
'compaction.trigger.strategy' = 'time_elapsed',
'compaction.delta_seconds' = '1200',
'changelog.enabled' = 'true',
'read.streaming.enabled' = 'true',
'read.streaming.check-interval' = '1',
'hive_sync.enable' = 'true',
'hive_sync.mode' = 'hms',
'hive_sync.metastore.uris' = 'thrift://pld3cwztmg01:9083',
--'hive_sync.jdbc_url' = 'jdbc:hive2://pld3cwztmg01:10000',
'hive_sync.table' = 'ZHANG',
'hive_sync.db' = 'hudi_ods_sap',
'hive_sync.username' = 'hive',
'hive_sync.password' = 'hive'
);

第三步:把kafka表写入到hudi表即可

insert into HUDI_ZHANG select * from HUDI_KAFKA_DEBEZIUM_ZHANG where RCLNT = '300'; 

以上就是从kafka读取数据写入到hudi表,且表类型是MOR。

http://www.lryc.cn/news/434835.html

相关文章:

  • 智能工厂程序设计 之-2 (Substrate) :三个世界--“存在的意义”-“‘我’的价值的实现” 之2
  • 概要设计例题
  • 注册表模式:使用注册表和装饰器函数的模块化设计
  • 怎样将vue项目 部署在ngixn的子目录下
  • FPGA开发:Verilog数字设计基础
  • 哈希表,算法
  • Java数组的定义及遍历
  • 【电路笔记】-反相运算放大器
  • 【电子通识】半导体工艺——刻蚀工艺
  • vue-router 之如何在模版(template)中获取路由配置信息?
  • HPL 源码结构分析
  • Java代码审计篇 | ofcms系统审计思路讲解 - 篇3 | 文件上传漏洞审计
  • 【Kubernetes】常见面试题汇总(五)
  • MySQL 解决时区相关问题
  • SpringSecurity Context 中 获取 和 更改 当前用户信息的问题
  • Makefile的四种赋值运算符
  • framebuffer
  • 7.科学计算模块Numpy(4)ndarray数组的常用操作(二)
  • 抖音评论区截流脚本软件详细使用教学,抖音私域获客引流的五种方法。
  • Linux_kernel移植uboot07
  • Day-04-QFile打开文件的两种方式
  • 第三部分:4---进程地址空间
  • 【Android】程序开发组件—探究Jetpack
  • pytorch torch.norm函数介绍
  • 【lc_hot100】刷题心得
  • FANUC 数控 A06B-6058-H227 伺服放大器
  • Python将表格文件中某些列的数据整体向上移动一行
  • 基于YOLOv8的PCB缺陷检测算法,加入一种基于内容引导注意力(CGA)的混合融合方案(一)
  • 如何在红米手机中恢复已删除的照片?(6 种方式可供选择)
  • 嵌入式实时操作系统(RTOS):原理、应用与发展