当前位置: 首页 > news >正文

消费 Kafka 一个TOPIC数据,插入到另一个KAFKA的TOPIC

从 Kafka 消费 CDC 数据(变更捕获,需 Upsert 语义)
用 kafka 连接器 + 主键 + 处理函数 模拟 Upsert,示例:
CREATE TABLE `KAFKA_TEST_0002` (
`LGL_PERN_CODE` VARCHAR COMMENT 'LGL_PERN_CODE',
`LBLTY_ACCT_NUM` VARCHAR COMMENT 'LBLTY_ACCT_NUM',
`ACCT_NM` VARCHAR COMMENT 'ACCT_NM',
`CUST_NUM` VARCHAR COMMENT 'CUST_NUM',
`NAT_CODE` VARCHAR COMMENT 'NAT_CODE',
-- 声明主键(用于 Upsert 去重)
PRIMARY KEY (`LBLTY_ACCT_NUM`) NOT ENFORCED 
) WITH (
'connector' = 'kafka',  -- 恢复为 kafka 连接器
'topic' = 'KAFKA_TEST_0002',
'properties.bootstrap.servers' = '10.57.48.38:21007,10.57.48.37:21007,10.57.48.36:21007',
'properties.group.id' = '7a074dd07bfb4d4da39eb0f5773b952b',
'scan.startup.mode' = 'earliest-offset',
'format' = 'debezium-json',  -- 适配 CDC 格式
'debezium-json.ignore-parse-errors' = 'true',
'debezium-json.schema-include' = 'true',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.kerberos.domain.name' = 'hadoop.124dba82_3b54_0125_81e4_110652049a41.com',
'properties.sasl.kerberos.service.name' = 'kafka'
);

-- 如需 Upsert 输出,再通过 Sink 写入 upsert-kafka
CREATE TABLE KafkaUpsertSink (
`LBLTY_ACCT_NUM` VARCHAR,
`LGL_PERN_CODE` VARCHAR,
`ACCT_NM` VARCHAR,
PRIMARY KEY (`LBLTY_ACCT_NUM`) NOT ENFORCED 
) WITH (
'connector' = 'upsert-kafka',  -- Sink 侧使用 upsert-kafka
'topic' = 'sink_topic',
'properties.bootstrap.servers' = '...',
'key.format' = 'json',
'value.format' = 'json'
);

-- 业务逻辑:从 Kafka 读 CDC 数据,处理后 Upsert 写入
INSERT INTO KafkaUpsertSink
SELECT LBLTY_ACCT_NUM, LGL_PERN_CODE, ACCT_NM
FROM `KAFKA_TEST_0002`;

http://www.lryc.cn/news/587884.html

相关文章:

  • Docker配置国内镜像源
  • CompletableFuture 源码解析
  • Linux 系统下的 Sangfor VDI 客户端安装与登录完全攻略 (CentOS、Ubuntu、麒麟全线通用)
  • HTTP协议版本对比
  • Apache部署
  • Ubuntu-25.04 Wayland桌面环境安装Anaconda3之后无法启动anaconda-navigator问题解决
  • Can201-Introduction to Networking:Data Plane数据平面
  • vue2/3生命周期使用建议
  • hive的相关的优化
  • Linux 系统管理基础教程
  • 图像分割论文中的评价指标
  • 从零实现一个基于 mem0的具有长期记忆的Text2SQL代理
  • R 语言科研绘图第 64 期 --- 哑铃图
  • 当前(2024-07-14)视频插帧(VFI)方向的 SOTA 基本被三篇顶会工作占据,按“精度-速度-感知质量”三条线总结如下,供你快速定位最新范式
  • 设计模式》》门面模式 适配器模式 区别
  • js与vue基础学习
  • Linux 基础命令详解:从入门到实践(1)
  • 基于Hadoop的竞赛网站日志数据分析与可视化(上)
  • STM32介绍和GPIO
  • Spring Boot启动原理:从main方法到内嵌Tomcat的全过程
  • Datawhale AI夏令营-基于带货视频评论的用户洞察挑战赛
  • [Python] -实用技巧4-Python中浅拷贝与深拷贝的区别详解
  • 工业软件加密锁复制:一场技术与安全的博弈
  • 借助DeepSeek编写输出漂亮表格的chdb客户端
  • 终端安全最佳实践
  • IIS错误:Service Unavailable HTTP Error 503. The service is unavailable.
  • SpringAi笔记
  • OpenCV 视频处理与摄像头操作详解
  • MySQL Innodb Cluster配置
  • 【CV综合实战】基于深度学习的工业压力表智能检测与读数系统【3】使用OpenCV读取分割后的压力表读数