当前位置: 首页 > news >正文

Flink实现kafka到kafka、kafka到doris的精准一次消费

1 流程图

2 Flink来源表建模

--来源-城市topic
CREATE TABLE NJ_QL_JC_SSJC_SOURCE (
record string 
) WITH ('connector' = 'kafka','topic' = 'QL_JC_SSJC','properties.bootstrap.servers' = '172.*.*.*:9092','properties.group.id' = 'QL_JC_SSJC_NJ_QL_JC_SSJC_SOURCE','scan.startup.mode' = 'group-offsets','properties.isolation.level' = 'read_committed','properties.auto.offset.reset' = 'earliest','format' = 'raw'
);
--来源-中台kafka-topic
CREATE TABLE ODS_QL_JC_SSJC_SOURCE (
sscsdm string,
extract_time TIMESTAMP,
record string
) WITH ('connector' = 'kafka','topic' = 'ODS_QL_JC_SSJC','properties.bootstrap.servers' = '172.*.*.*:21007,172.*.*.*:21007,172.*.*.*:21007','properties.security.protocol' = 'SASL_PLAINTEXT','properties.sasl.kerberos.service.name' = 'kafka','properties.kerberos.domain.name' = 'hadoop.hadoop.com','properties.group.id' = 'ODS_QL_JC_SSJC_SOURCE_ODS_QL_JC_SSJC_SOURCE','scan.startup.mode' = 'group-offsets','properties.auto.offset.reset' = 'earliest','properties.isolation.level' = 'read_committed','sink.semantic' = 'exactly-once','format' = 'json'
);

3 Flink去向表建模

--去向-中台kafka-topic
CREATE TABLE KAFKA_ODS_QL_JC_SSJC_SINK  (
sscsdm string,
extract_time TIMESTAMP,
record string
) WITH ('connector' = 'kafka','topic' = 'ODS_QL_JC_SSJC','properties.bootstrap.servers' = '172.*.*.*:21007,172.*.*.*:21007,172.*.*.*:21007','properties.security.protocol' = 'SASL_PLAINTEXT','properties.sasl.kerberos.service.name' = 'kafka','properties.kerberos.domain.name' = 'hadoop.hadoop.com','format' = 'json', 'properties.transaction.timeout.ms' = '900000'
);
--去向-Doris表
CREATE TABLE DORIS_ODS_QL_JC_SSJC_SINK (sscsdm STRING,extract_time TIMESTAMP,record STRING
) WITH ('connector' = 'doris','fenodes' = '3.*.*.*:8030,3.*.*.*:8030,3.*.*.*:8030','table.identifier' = 'doris_d.ods_ql_jc_ssjc','username' = 'root','password' = '********','sink.properties.two_phase_commit' = 'true' 
);

http://www.lryc.cn/news/187358.html

相关文章:

  • Outlook屏蔽Jira AI提醒
  • 毛玻璃 has 选择器卡片悬停效果
  • [hive]解决group by 字段超过系统规定64个
  • 生成老年人的声音sox
  • DC2DC电源设计注意事项--1,Feedback
  • 计算机视觉处理的开源框架
  • 最新AI智能创作系统源码AI绘画系统/支持GPT联网提问/支持Prompt应用
  • 2019架构真题案例(四十八)
  • zabbix自定义监控内容和自动发现
  • 导引服务机器人 通用技术条件
  • 今日头条文章采集ChatGPT3.5/4.0驱动浏览器改写文章软件说明文档
  • Mac系统清理工具BuhoCleaner
  • SpringBoot集成WebSocket讲解
  • GNOME 45 动态三层缓存补丁更新
  • [论文笔记]Poly-encoder
  • vs2022中配置PCL1.13.1(附带提供属性表.props文件)
  • 基于共生生物优化的BP神经网络(分类应用) - 附代码
  • GIN框架路由的实现原理
  • Android Studio版本升级后的问题 gradle降级、jdk升级
  • 浏览器插件开发爬虫记录
  • 万万没想到,我用文心一言开发了一个儿童小玩具
  • SQL sever中的视图
  • 如何理解数据序列化
  • 07_项目开发_用户信息列表
  • flutter ios打包
  • 【无公网IP内网穿透】基于NATAPP搭建Web站点
  • 智能AI创作系统ChatGPT详细搭建教程/AI绘画系统/支持GPT联网提问/支持Prompt应用/支持国内AI模型
  • 【技能树笔记】网络篇——练习题解析(五)
  • Java集合(二)--- 集合元素的遍历操作Iterator以及foreach
  • 数据结构:排序- 插入排序(插入排序and希尔排序) , 选择排序(选择排序and堆排序) , 交换排序(冒泡排序and快速排序) , 归并排序