当前位置: 首页 > news >正文

使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表

使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表

package flink;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class FlinkSQL_CDC {public static void main(String[] args) throws Exception {//
//        Configuration conf = new Configuration();
//        conf.setInteger("rest.port",3335);
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);//1.创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//2.创建Flink-MySQL-CDC的SourceTableResult tableResult = tableEnv.executeSql("CREATE TABLE table_name (" +"  id INT primary key," +"  name STRING" +") WITH (" +"  'connector' = 'mysql-cdc'," +"  'hostname' = 'hadoop102'," +"  'port' = '3306'," +"  'username' = 'root'," +"  'password' = 'xxxx'," +"  'database-name' = 'student'," +"  'table-name' = 'table_name'," +"'server-time-zone' = 'Asia/Shanghai'," +"'scan.startup.mode' = 'initial'" +")");// 2. 注册SinkTable: sink_sensor
//        tableEnv.executeSql("" +
//                "CREATE TABLE kafka_binlog ( " +
//                "  user_id INT, " +
//                "  user_name STRING, " +
//                "`proc_time` as PROCTIME()" +
//                ") WITH ( " +
//                "  'connector' = 'kafka', " +
//                "  'topic' = 'test2', " +
//                "  'properties.bootstrap.servers' = 'hadoop102:9092', " +
//                "  'format' = 'json' " +
//                ")" +
//                "");//upsert-kafkatableEnv.executeSql("" +"CREATE TABLE kafka_binlog ( " +"  user_id INT, " +"  user_name STRING, " +"`proc_time` as PROCTIME()," +"  PRIMARY KEY (user_id) NOT ENFORCED" +") WITH ( " +"  'connector' = 'upsert-kafka', " +"  'topic' = 'test2', " +"  'properties.bootstrap.servers' = 'hadoop102:9092', " +"  'key.format' = 'json' ," +"  'value.format' = 'json' " +")" +"");// 3. 从SourceTable 查询数据, 并写入到 SinkTabletableEnv.executeSql("insert into kafka_binlog select * from table_name");tableEnv.executeSql("select * from kafka_binlog").print();env.execute();}}
http://www.lryc.cn/news/145389.html

相关文章:

  • ios开发 swift5 苹果系统自带的图标 SF Symbols
  • Linux内核源码分析 (3)调度器的实现
  • 网络安全法+网络安全等级保护
  • 持续集成对软件项目管理的作用
  • 【Qt QAxObject】使用 QAxObject 高效任意读写 Excel 表
  • java八股文面试[多线程]——自旋锁
  • 分布式系统的多数据库,实现分布式事务回滚(1.7.0 seata整合2.0.4nacos)
  • PDF可以修改内容吗?有什么注意的事项?
  • 自动泊车的自动驾驶控制算法
  • Java doc等文件生成PDF、多个PDF合并
  • 【C++】list类的模拟实现
  • 机械臂+2d相机实现复合机器人定位抓取
  • 网络编程 http 相关基础概念
  • LatexEasy公式渲染教程
  • 十年测试工程师叙述自动化测试学习思路
  • SpringAOP详解(下)
  • 主流软件漏洞跟踪 Apache RocketMQ NameServer 远程代码执行漏洞(CVE-2023-37582)
  • Element table根据字段合并表格(可多字段合并),附带拖拽列动态合并
  • C++标准库STL容器详解
  • ParNew垃圾收集器(Serial+多线程)是干什么用的?
  • 【Android】AES解密抛出异常Cipher functions:OPENSSL_internal:WRONG_FINAL_BLOCK_LENGTH
  • 菜鸟教程《Python 3 教程》笔记(2):数据类型转换
  • JVM运行时参数查看
  • 每日一题:leetcode 1267 统计参与通信的服务器
  • Unity打包Windows程序,概率性出现无法全屏或分辨率不匹配
  • 消息中间件 介绍
  • JAVA-字符串长度
  • [oneAPI] 基于BERT预训练模型的SWAG问答任务
  • 如何为winform控件注册事件
  • 【LeetCode-面试经典150题-day15】