当前位置: 首页 > news >正文

SpringBoot 通过集成 Flink CDC 来实时追踪 MySql 数据变动

一、概述

Flink CDC 是一个基于 Apache Flink 的数据捕获工具,能够实时捕获和处理数据库的变动事件。通过集成 Flink CDC,可以实时追踪 MySQL 数据库中的数据变动,构建高效的数据处理和分析应用。本文将介绍如何在 SpringBoot 项目中集成 Flink CDC,并实现对 MySQL 数据变动的实时追踪。

二、准备工作

1. 环境准备
  • JDK 1.8+
  • Maven 3.6+
  • MySQL 数据库
  • Apache Flink 1.12+
  • SpringBoot 2.5+
2. 创建 MySQL 数据库和表
CREATE DATABASE test_db;USE test_db;CREATE TABLE users (id INT AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,email VARCHAR(255) NOT NULL,created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
​

三、集成步骤

1. 引入依赖

在 SpringBoot 项目的 pom.xml 中添加必要的依赖:

<dependencies><!-- Spring Boot Dependencies --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency><!-- Flink Dependencies --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.12.0</version></dependency><!-- Flink CDC Dependencies --><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.0.0</version></dependency>
</dependencies>
​
2. 配置 Flink CDC

在 SpringBoot 项目中创建 Flink CDC 配置类:

import com.ververica.cdc.connectors.mysql.MySQLSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FlinkCdcConfig {@Beanpublic DataStreamSource<String> mysqlSource(StreamExecutionEnvironment env) {MySQLSource<String> source = MySQLSource.<String>builder().hostname("localhost").port(3306).databaseList("test_db").tableList("test_db.users").username("root").password("password").deserializer(new JsonDebeziumDeserializationSchema()).startupOptions(StartupOptions.initial()).build();return env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source");}
}
​
3. 创建 Flink 作业

在 SpringBoot 项目中创建 Flink 作业:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;@Component
public class FlinkJobRunner implements CommandLineRunner {private final StreamExecutionEnvironment env;private final DataStreamSource<String> mysqlSource;public FlinkJobRunner(StreamExecutionEnvironment env, DataStreamSource<String> mysqlSource) {this.env = env;this.mysqlSource = mysqlSource;}@Overridepublic void run(String... args) throws Exception {mysqlSource.print();env.execute("Flink CDC Job");}
}
​
4. 启动 SpringBoot 应用

运行 SpringBoot 应用,启动后会自动执行 Flink 作业,并打印 MySQL 数据库中 users 表的变动。

四、验证和测试

1. 插入测试数据

向 MySQL 数据库中插入数据:

INSERT INTO users (name, email) VALUES ('Alice', 'alice@example.com');
INSERT INTO users (name, email) VALUES ('Bob', 'bob@example.com');
​
2. 验证输出

查看 SpringBoot 应用的控制台输出,确认是否正确捕获并打印了 MySQL 数据库中的变动。

http://www.lryc.cn/news/571354.html

相关文章:

  • 基于高性能的光频域反射(OFDR)分布式光纤传感解决方案
  • 爬虫技术:从基础到高级,探索数据抓取的奥秘
  • 深度融合数智化,百胜软件联合华为云加速零售行业转型升级
  • 【Manus第三篇-Prompt优化】两周实战,一套注意力视角的prompt优化框架,真的有用!
  • 【笔记】MSYS2 的 MinGW64 环境中正确安装 Python 相关环境管理工具 (Poetry、Virtualenv、Pipenv 和 UV)
  • 复现 apache HTTPD 换行解析漏洞(CVE-2017-15715)
  • ABP vNext + Sentry + ELK Stack:打造高可用异常跟踪与日志可视化平台
  • STM32的内部RC与外部晶振电路
  • python打卡day52
  • C++ 学习 多线程 2025年6月17日18:41:30
  • 插入排序C语言版
  • 容器部署springboot项目--入门
  • Vue-8-前端框架Vue之应用基础响应式数据和计算属性
  • 如何设计一个敏感词过滤系统
  • OpenCV 图像仿射变换之旋转
  • flutter的widget的执行顺序,单个组建的执行顺序
  • 什么是数据清洗?数据清洗有哪些步骤?
  • 算法导论第九章:顺序统计的艺术 - 高效查找中位数与第K小元素
  • 【AI分享:LangGraph 开源项目的深度分析报告
  • Spring Boot 数据校验: Bean Validation 注解、分组校验与全局异常处理
  • SSRF3 任意文件读取
  • 游戏引擎学习路径与技术栈指南
  • 基于Qt的配置管理界面实现:保存与加载配置文件
  • SpringCloud + Zookeeper + Feign整合及Feign原理
  • JSON-RPC 2.0 与 1.0 对比总结
  • java面试总结-20250616
  • 字符操作函数续上
  • 图扑 HT 3D 场景视频嵌入应用功能
  • cuda编程笔记(4)--纹理内存
  • OpenCV——图像形态学