当前位置: 首页 > news >正文

使用Flink CDC实时监控MySQL数据库变更

在现代数据架构中,实时数据处理变得越来越重要。Flink CDC(Change Data Capture)是一种强大的工具,可以帮助我们实时捕获数据库的变更,并进行处理。本文将介绍如何使用Flink CDC从MySQL数据库中读取变更数据,并将其打印到控制台。

环境准备

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.0.0</version>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version>
</dependency>
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.23</version>
</dependency>
  1. 获取Flink执行环境

首先,我们需要获取Flink的执行环境。这是所有Flink作业的起点。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  1. 启用检查点和设置并行度

为了确保作业的容错性和状态恢复,我们需要启用检查点,并设置作业的并行度。

env.enableCheckpointing(500); // 每500毫秒创建一个检查点
env.setParallelism(1); // 设置作业的并行度为1
  1. 使用Debezium Source读取MySQL的binlog

接下来,我们使用Debezium Source读取MySQL的binlog。我们需要配置MySQL的连接信息、监控的数据库和表、反序列化器以及启动选项。

DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder().serverTimeZone("Asia/Shanghai") // 设置时区为亚洲/上海.hostname("localhost") // MySQL的IP地址.port(3306) // MySQL的端口.username("root") // MySQL的用户名.password("123456") // MySQL的密码.databaseList("my_db") // 监控的数据库.tableList("my_db.user") // 监控的数据库下的表.deserializer(new JsonDebeziumDeserializationSchema()) // 反序列化.startupOptions(StartupOptions.initial()) // 启动选项.build();

这里 JsonDebeziumDeserializationSchema类的代码如下:

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;import java.util.List;/**
*  自定义DeserializationSchema进行反序列化。
*/public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {@Overridepublic void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {//创建JSON对象用于存储最终数据JSONObject result = new JSONObject();String topic = sourceRecord.topic();String[] fields = topic.split("\\.");String database = fields[1];String tableName = fields[2];Struct value  = (Struct)sourceRecord.value();//获取before数据Struct before = value.getStruct("before");JSONObject beforeJson = getJson(before);//获取after数据Struct after = value.getStruct("after");JSONObject afterJson = getJson(after);//获取操作类型Envelope.Operation operation = Envelope.operationFor(sourceRecord);//将字段写入JSON对象result.put("database",database);result.put("tableName",tableName);result.put("type",operation);result.put("before",beforeJson);result.put("after",afterJson);//输出数据collector.collect(result.toJSONString());}/***  获取字段值并写入result对象* @param before* @return*/private JSONObject getJson(Struct before) {JSONObject jsonObject = new JSONObject();if(before != null){Schema beforeSchema = before.schema();List<Field> beforeFields = beforeSchema.fields();for (Field field : beforeFields) {Object beforeValue = before.get(field);jsonObject.put(field.name(), beforeValue);}}return jsonObject;}@Overridepublic TypeInformation getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}
}
  1. 添加数据源并打印数据

将Debezium源函数添加到Flink环境中,生成一个数据流,并将数据流中的数据打印到控制台。

DataStream<String> dataStreamSource = env.addSource(sourceFunction, TypeInformation.of(String.class));
DataStreamSink<String> print = dataStreamSource.print();
  1. 启动任务

最后,启动Flink作业,开始处理数据流。

env.execute("Flink-CDC");

6.测试

在这里插入图片描述

总结

通过上述步骤,我们可以使用Flink CDC实时监控MySQL数据库的变更,并将变更数据以JSON格式打印出来。这种方法不仅适用于数据监控,还可以用于实时数据处理和分析。

http://www.lryc.cn/news/380661.html

相关文章:

  • 学生课程信息管理系统
  • 如何看待鸿蒙HarmonyOS?
  • 【论文复现|智能算法改进】一种基于多策略改进的鲸鱼算法
  • yarn安装配置及使用教程
  • 有那么点道理。
  • 蔚蓝资源包和数据分析
  • MySQL----利用Mycat配置读写分离
  • 【科学计算与可视化】2. pandas 基础
  • 医学记录 --- 腋下异味
  • 【Linux】进程间通信_1
  • Linux Kernel入门到精通系列讲解(RV-Kernel 篇) 5.6 在kernel 中实现系统复位和系统关机驱动
  • 如何在Java中进行单元测试?
  • 代码随想录训练营Day32
  • 代码随想录训练营Day31
  • Docker 多阶段构建
  • Linux应急响应——知攻善防应急靶场-Linux(1)
  • 基于CDMA的多用户水下无线光通信(1)——背景介绍
  • vlan三层交换技术--交换机--(自作)
  • 基于springboot websocket和okhttp实现消息中转
  • @PostConstruct 注解的方法用于资源的初始化
  • (一)SvelteKit教程:hello world
  • 华为Atlas NPU ffmpeg 编译安装
  • Python 虚拟环境 requirements.txt 文件生成 ;pipenv导出pip安装文件
  • Less与Sass的区别
  • 力扣-2663
  • CausalMMM:基于因果结构学习的营销组合建模
  • 编译 CUDA 程序的基本知识和步骤
  • [SAP ABAP] 排序内表数据
  • 【UML用户指南】-21-对基本行为建模-活动图
  • 【web2】jquary,bootstrap,vue