当前位置: 首页 > news >正文

Flink CDC实时同步mysql数据

官方参考资料:

https://nightlies.apache.org/flink/flink-cdc-docs-master/zh/docs/connectors/flink-sources/mysql-cdc/

Apache Flink 的 Change Data Capture (CDC) 是一种用于捕获数据库变化(如插入、更新和删除操作)的技术。Flink CDC Connector 允许你使用 Flink 从 MySQL 等数据库中读取变化数据,并处理这些流式数据。以下是如何在 Flink 中配置和使用 CDC Connector 读取 MySQL 数据的步骤:

前提条件

  1. MySQL 数据库:确保你已经有一个 MySQL 数据库,并且知道数据库的连接信息(如主机名、端口、用户名、密码、数据库名)。
  2. Flink 环境:你需要在本地或集群上配置好 Flink 环境。
  3. MySQL Binlog:确保 MySQL 数据库启用了 Binlog(Binary Logging),因为 Flink CDC 依赖于 Binlog 来捕获数据变化。

支持的数据库

依赖

Maven dependency

<dependency>

   <groupId>org.apache.flink</groupId>

   <artifactId>flink-connector-mysql-cdc</artifactId>

   <!--  请使用已发布的版本依赖,snapshot 版本的依赖需要本地自行编译。 -->

   <version>3.3-SNAPSHOT</version>

</dependency>

SQL Client JAR

下载链接仅在已发布版本可用,请在文档网站左下角选择浏览已发布的版本。

下载 flink-sql-connector-mysql-cdc-3.3-SNAPSHOT.jar 到 <FLINK_HOME>/lib/ 目录下。

由于 MySQL Connector 采用的 GPLv2 协议与 Flink CDC 项目不兼容,我们无法在 jar 包中提供 MySQL 连接器。 您可能需要手动配置以下依赖:

配置 MySQL Binlog

修改MySQL 配置文件(my.cnf或my.ini):

[mysqld]

server-id = 1

log-bin = mysql-bin

binlog-format = ROW

配置 MySQL 服务器

先创建一个 MySQL 用户,并授权。

  1. 创建 MySQL 用户:

mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';

  1. 向用户授予所需的权限:

mysql> GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';

注意: 在 scan.incremental.snapshot.enabled 参数已启用时(默认情况下已启用)时,不再需要授予 reload 权限。

  1. 刷新用户权限:

mysql> FLUSH PRIVILEGES;

创建 MySQL CDC 表

MySQL CDC 表可以定义如下:

-- 每 3 秒做一次 checkpoint,用于测试,生产配置建议5到10分钟                     

Flink SQL> SET 'execution.checkpointing.interval' = '3s';  

-- 在 Flink SQL中注册 MySQL 表 'orders'

Flink SQL> CREATE TABLE orders (

     order_id INT,

     order_date TIMESTAMP(0),

     customer_name STRING,

     price DECIMAL(10, 5),

     product_id INT,

     order_status BOOLEAN,

     PRIMARY KEY(order_id) NOT ENFORCED

     ) WITH (

     'connector' = 'mysql-cdc',

     'hostname' = 'localhost',

     'port' = '3306',

     'username' = 'root',

     'password' = '123456',

     'database-name' = 'mydb',

     'table-name' = 'orders');

-- 从订单表读取全量数据(快照)和增量数据(binlog)

Flink SQL> SELECT * FROM orders;

关于无主键表

从2.4.0 版本开始支持无主键表,使用无主键表必须设置 scan.incremental.snapshot.chunk.key-column,且只能选择非空类型的一个字段。

在使用无主键表时,需要注意以下两种情况。

  1. 配置 scan.incremental.snapshot.chunk.key-column 时,如果表中存在索引,请尽量使用索引中的列来加快 select 速度。
  2. 无主键表的处理语义由 scan.incremental.snapshot.chunk.key-column 指定的列的行为决定:
  • 如果指定的列不存在更新操作,此时可以保证 Exactly once 语义。
  • 如果指定的列存在更新操作,此时只能保证 At least once 语义。但可以结合下游,通过指定下游主键,结合幂等性操作来保证数据的正确性。
http://www.lryc.cn/news/503750.html

相关文章:

  • 题解 - 自然数无序拆分
  • dfs_bool_void 两种写法感悟
  • MySQL 主从复制与 Binlog 深度解析
  • 大连理工大学《2024年845自动控制原理真题》 (完整版)
  • Java性能调优 - 多线程性能调优
  • 行为树详解(4)——节点参数配置化
  • 计算机网络中的三大交换技术详解与实现
  • 《杨辉三角》
  • ARM学习(35)单元测试框架以及MinGW GCC覆盖率报告
  • 边缘计算+人工智能:让设备更聪明的秘密
  • neo4j知识图谱AOPC的安装方法
  • 图像分割数据集植物图像叶片健康状态分割数据集labelme格式180张3类别
  • Python学习(二)—— 基础语法(上)
  • Cesium-(Primitive)-(CircleOutlineGeometry)
  • 计算机网络技术基础:2.计算机网络的组成
  • EasyExcel使用管道流连接InputStream和OutputStream
  • OpenWebUI连接不上Ollama模型,Ubuntu24.04
  • C#C++获取当前应用程序的安装目录和工作目录
  • Linux中vi和vim的区别详解
  • 2021 年 6 月青少年软编等考 C 语言四级真题解析
  • UE5编辑器下将RenderTarget输出为UTexture并保存
  • 【漏洞复现】CVE-2024-34102 Magento Open Source XXE漏洞
  • soul大数据面试题及参考答案
  • GLM-4-Plus初体验
  • 基于springboot+vue的高校校园交友交流平台设计和实现
  • Nacos 3.0 Alpha 发布,在安全、泛用、云原生更进一步
  • 【前端开发】HTML+CSS网页,可以拿来当作业(免费开源)
  • 【人工智能-中级】卷积神经网络(CNN)的中阶应用:从图像分类到目标检测
  • [笔记] 编译LetMeowIn(C++汇编联编程序)过程
  • 牛客小白月赛107(A~E)