当前位置: 首页 > news >正文

Fink CDC数据同步(二)MySQL数据同步

1 开启binlog日志

2 数据准备

use bigdata;
drop table if exists user;CREATE TABLE `user`(`id` INTEGER NOT NULL AUTO_INCREMENT,`name` VARCHAR(20) NOT NULL DEFAULT '',`birth` VARCHAR(20) NOT NULL DEFAULT '',`gender` VARCHAR(10) NOT NULL DEFAULT '',PRIMARY KEY(`id`)
);
ALTER TABLE user AUTO_INCREMENT = 1001;insert into user values(default , '东契奇' , '1995-01-01' , '男');
insert into user values(default , '斯蒂芬' , '1996-12-21' , '男');
insert into user values(default , '里奥梅西' , '1993-05-10' , '男');
insert into user values(default , '凯里欧文' , '1994-08-06' , '男');
insert into user values(default , '张淋艳' , '1997-12-01' , '女');
insert into user values(default , '王珊珊' , '1995-03-01' , '女');
insert into user values(default , '唐佳丽' , '1994-07-01' , '女');
insert into user values(default , '杨力维' , '1995-10-20' , '女');select * from user;

3 jar包依赖

在flink/lib目录下添加依赖:

flink-sql-connector-mysql-cdc-2.3.0.jar

下载地址:

Central Repository: com/ververica/flink-sql-connector-mysql-cdc

4 启动sql-client

# 启动服务
/opt/flink/flink-1.16.2/bin/start-cluster.sh 
# 启动sql-client
/opt/flink/flink-1.16.2/bin/sql-client.sh

设置模式

set sql-client.execution.result-mode = tableau;

设置checkpont

set execution.checkpointing.interval=30sec;

建mysql的映射表

CREATE TABLE if not exists mysql_user (id     STRING,name   STRING,birth  STRING,gender    STRING,PRIMARY KEY (`id`) NOT ENFORCED
) WITH ('connector'= 'mysql-cdc','hostname'= '192.168.0.1','port'= '3306','username'= 'user','password'='password','server-time-zone'= 'Asia/Shanghai','debezium.snapshot.mode'='initial','database-name'= 'bigdata1','table-name'= 'user'
); 

执行查询语句,会生成一个flink job任务

select * from mysql_user; 

5 常用参数表

参数名

必填

默认值

类型

参数描述

connector

String

指定connector,这里填 mysql-cdc

hostname

String

MySql server 的主机名或者 IP 地址

username

String

连接 MySQL 数据库的用户名

password

String

连接 MySQL 数据库的密码

database-name

String

需要监控的数据库名,支持正则表达式

table-name

String

需要监控的表名,支持正则表达式

port

3306

Integer

MySQL 服务的端口号

server-id

Integer

当开启scan.incremental.snapshot.enabled时,建议指定server-id;server-id 可以是单个值,如5400; 也可以提供数值范围,如5400-5408

scan.incremental.snapshot.enabled

TRUE

Boolean

增量快照是读取表快照的新机制;和旧的快照读相比有以下优点:1. 并行读取 2. 支持checkpoint 3. 不需要锁表;当需要并行读取时,server-id需要设置数值范围,如5400-5408

scan.incremental.snapshot.chunk.size

8096

Integer

表快照的块大小

scan.snapshot.fetch.size

1024

Integer

每次读表接受的最大值

scan.startup.mode

initial

String

MySQL CDC 启动模式,有效值:initial 和 latest-offset

connect.timeout

30s

Duration

connector 连接 MySQL 服务的最长等待超时时间

connect.max-retries

3

Integer

connector 创建 MySQL 连接的重试次数

connection.pool.size

20

Integer

连接池的大小


系列文章 

 Fink CDC数据同步(一)环境部署icon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017355?spm=1001.2014.3001.5502
Fink CDC数据同步(二)MySQL数据同步icon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017472?spm=1001.2014.3001.5501
Fink CDC数据同步(三)Flink集成Hiveicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017571?spm=1001.2014.3001.5501
Fink CDC数据同步(四)Mysql数据同步到Kafkaicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023747?spm=1001.2014.3001.5501
Fink CDC数据同步(五)Kafka数据同步Hiveicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023837?spm=1001.2014.3001.5501

Fink CDC数据同步(六)数据入湖Hudiicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023939?spm=1001.2014.3001.5502

http://www.lryc.cn/news/295383.html

相关文章:

  • JavaWeb后端开发(第一期):Maven基础、Maven的安装配置、如何创建maven项目模块、maven的生命周期
  • Windows SDK(四)鼠标和键盘消息处理
  • LabVIEW汽车自燃监测预警系统
  • 数据图表方案,企业视频生产数据可视化
  • 【HarmonyOS应用开发】APP应用的通知(十五)
  • 开启一个服务,将服务器指定的文件读取,传播到网上其他终端
  • nii convert to 2D image【python】
  • C语言指针学习 之 指针是什么
  • 【文本到上下文 #10】探索地平线:GPT 和 NLP 中大型语言模型的未来
  • (四)elasticsearch 源码之索引流程分析
  • 飞天使-k8s知识点16-kubernetes实操1-pod
  • 【gcc】webrtc发送侧 基于丢包更新码率
  • 数字经济的未来:探索Web3的商业模式
  • Centos7部署MetaBase-v0.48.3
  • 【计算机网络】Socket的SO_TIMEOUT与连接超时时间
  • 解密 ARMS 持续剖析:如何用一个全新视角洞察应用的性能瓶颈?
  • 【OJ比赛日历】春节快乐 #02.10-02.16 #9场
  • 前端下载文件有哪些方式
  • vscode预览github上的markdown效果
  • 使用PaddleNLP识别垃圾邮件:用BERT做中文邮件内容分类,验证集准确率高达99.6%以上(附公开数据集)
  • 在bash或脚本中,如何并行执行命令或任务(命令行、parallel、make)
  • 拼音笔记笔记
  • 13. Threejs案例-绘制3D文字
  • clickhouse清理日志。
  • JS中实现继承
  • spring boot学习第九篇:操作mongo的集合和集合中的数据
  • momentJs推导日历组件
  • Linux C/C++ 原始套接字:打造链路层ping实现
  • TCP 粘包/拆包
  • 【Spring Boot 3】应用启动执行特定逻辑