当前位置: 首页 > news >正文

通过 Flink 和 CDC 从 Oracle 数据库获取增量数据,并将这些增量数据同步到 MySQL 数据库中

目录

✅ 1. 引入必要的依赖

✅ 2. CDC 配置

✅ 3. 设计增量同步流程

✅ 4. Flink SQL 流程

✅ 4.1 Oracle CDC 数据源

✅ 4.2 MySQL 数据目标

✅ 4.3 字段映射与数据处理

✅ 4.4 增量同步

✅ 5. 完整的 Flink Job 示例

✅ 5.1 Java 程序示例

✅ 6. 注意事项

✅ 1. 确保 CDC 源是独立运行的

✅ 2. 确保处理逻辑是并行的

✅ 3. 任务并行调度与资源配置

✅ 4. 将 Source 拆分为不同 Job(可选)

✅ 5. 验证并行执行是否生效

✅ 示例更新后的处理代码:

✅ 总结

✅ 7. 总结


✅ 1. 引入必要的依赖

你已经引入了 flink-connector-oracle-cdcflink-connector-mysql-cdc,这是对的。除此之外,你还需要引入 Flink 的 flink-connector-jdbc 来支持写入到 MySQL。

✅ 2. CDC 配置

Flink 提供了 CDC 连接器用于增量数据读取(捕获数据变更)。为了从 Oracle 读取增量数据,你需要使用 Flink Oracle CDC 连接器,同时从 MySQL 写入数据时使用 Flink MySQL CDC 连接器。

✅ 3. 设计增量同步流程

在 Flink 中,你需要做以下几件事:

  1. 从 Oracle 中捕获增量数据。

  2. 根据需求进行字段映射。

  3. 将处理后的数据同步到 MySQL。

下面是一个简单的 Flink SQL 作业来实现这个过程:

✅ 4. Flink SQL 流程

✅ 4.1 Oracle CDC 数据源

首先,我们需要从 Oracle 中获取增量数据。为了做到这一点,使用 Flink 的 Oracle CDC 连接器。

CREATE TABLE oracle_source (id INT,name STRING,email STRING,modified_time TIMESTAMP(3),PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'oracle-cdc','hostname' = 'your_oracle_host','port' = 'your_oracle_port','username' = 'your_oracle_username','password' = 'your_oracle_password','database-name' = 'your_database','table-name' = 'your_oracle_table','debezium.snapshot.mode' = 'initial',  -- 'initial' for the first snapshot'debezium.scan.startup.mode' = 'earliest-offset'
);

✅ 4.2 MySQL 数据目标

然后,创建一个表来映射 MySQL 目标表,并进行字段映射。

CREATE TABLE mysql_sink (id INT,full_name STRING, -- name字段映射到 full_nameemail STRING,updated_time TIMESTAMP(3),PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc','hostname' = 'your_mysql_host','port' = 'your_mysql_port','username' = 'your_mysql_username','password' = 'your_mysql_password','database-name' = 'your_database','table-name' = 'your_mysql_table'
);

✅ 4.3 字段映射与数据处理

你可以在 Flink SQL 查询中做字段映射和转换。假设你需要把 name 映射到 MySQL 中的 full_name 字段,并且对 modified_time 字段进行一些处理。

INSERT INTO mysql_sink
SELECT id,name AS full_name,    -- 字段映射email,modified_time AS updated_time  -- 字段映射
FROM oracle_source;

✅ 4.4 增量同步

这样配置之后,Flink 将从 Oracle 捕获增量数据并同步到 MySQL。Flink 会自动处理增量数据的捕获和写入。你还可以对数据进行更复杂的处理(如数据清洗、转换等):

-- 在 SQL 查询中对数据进行进一步处理
INSERT INTO mysql_sink
SELECT id,CONCAT(name, ' (Processed)') AS full_name,   -- 字段映射并做简单的数据处理email,CAST(DATE_FORMAT(modified_time, 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) AS updated_time  -- 数据格式转换
FROM oracle_source;

✅ 5. 完整的 Flink Job 示例

这个 Flink Job 会读取 Oracle 数据库的增量数据,进行必要的字段映射和转换后,将数据同步到 MySQL。

✅ 5.1 Java 程序示例

如果你希望用 Java 来实现相同的功能,这里是一个基本的例子:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;public class FlinkOracleToMySQL {public static void main(String[] args) throws Exception {// 设置 Flink 流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置 CDC 连接器从 Oracle 获取增量数据DataStream<Row> oracleSourceStream = env.addSource(new OracleSourceFunction());// 对数据进行映射DataStream<Row> mappedStream = oracleSourceStream.map(new MapFunction<Row, Row>() {@Overridepublic Row map(Row row) throws Exception {// 做字段映射、转换等操作String fullName = row.getField(1).toString();  // 假设 name 是第 2 个字段return Row.of(row.getField(0), fullName, row.getField(2), row.getField(3));}});// 定义 JDBC Sink,将数据写入 MySQLmappedStream.addSink(JdbcSink.sink("INSERT INTO your_mysql_table (id, full_name, email, updated_time) VALUES (?, ?, ?, ?)",(statement, row) -> {statement.setInt(1, (Integer) row.getField(0));statement.setString(2, (String) row.getField(1));statement.setString(3, (String) row.getField(2));statement.setTimestamp(4, (Timestamp) row.getField(3));},new JdbcExecutionOptions.Builder().withBatchSize(1000).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://your_mysql_host:3306/your_database").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("your_mysql_username").withPassword("your_mysql_password").build()));// 执行 Flink 作业env.execute("Oracle to MySQL Incremental Sync");}
}

✅ 6. 注意事项

  • 时间戳字段:确保源表和目标表的时间戳字段能够正确地转换。你可以使用 Flink SQL 中的内建函数 CASTDATE_FORMAT 来进行时间格式的转换。

  • 字段映射:在 SQL 中直接做字段映射。如果有需要可以在 Java 代码中进行更多的转换和处理。

  • 增量同步配置:CDC 的增量同步依赖于数据库的 binlogredo log。确保你的 Oracle 数据库已经正确配置了 log-miner 或其他 CDC 配置。

  • 执行增量同步过程中可能会遇到实时性的问题,这个问题在开发过程中已经遇到,下面是代码展示:

怎么才能实现 SHOP_PLU 和 SHOP_STK 表的并行处理,确保两张表的 CDC 源和数据处理逻辑能够独立并行执行

​
public class OracleToMySQLSyncJob {private static final Logger LOG = LoggerFactory.getLogger(OracleToMySQLSyncJob.class);public static void main(String[] args) throws Exception {LOG.info("启动Oracle到MySQL数据同步作业...");// 创建Flink执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 简化配置:提高并行度以加快处理速度int parallelism = 4; // 固定使用4个并行度env.setParallelism(parallelism);// 简化配置:减少检查点频率以提高性能env.enableCheckpointing(30000); // 30秒检查点间隔// 简化配置:减少缓冲区超时以提高吞吐量env.setBufferTimeout(50); // 50ms缓冲区超时LOG.info("Flink简化配置 - 检查点间隔: 30000ms, 并行度: {}, 缓冲区超时: 50ms", parallelism);// 创建Oracle CDC Source - 处理SHOP_PLU和SHOP_STK表SourceFunction<String> shopPluSource = createShopPluSource();SourceFunction<String> shopStkSource = createShopStkSource();// 添加SHOP_PLU数据源 - CDC Source必须为并行度1DataStreamSource<String> shopPluStream = env.addSource(shopPluSource, "SHOP_PLU CDC Source").setParallelism(1);// 添加SHOP_STK数据源 - CDC Source必须为并行度1DataStreamSource<String> shopStkStream = env.addSource(shopStkSource, "SHOP_STK CDC Source").setParallelism(1);// 数据转换和写入MySQLprocessAndSink(shopPluStream, "SHOP_PLU");processAndSink(shopStkStream, "SHOP_STK");LOG.info("数据流已配置完成 - 使用两个独立的CDC源分别处理SHOP_PLU和SHOP_STK表");// 开始执行Oracle到MySQL数据同步作业env.execute("Oracle to MySQL Sync Job");}/*** 创建SHOP_PLU表的Oracle CDC Source*/private static SourceFunction<String> createShopPluSource() {return createOracleSource(new String[]{"CMPINT.SHOP_PLU"}, "SHOP_PLU");}/*** 创建SHOP_STK表的Oracle CDC Source*/private static SourceFunction<String> createShopStkSource() {return createOracleSource(new String[]{"CMPINT.SHOP_STK"}, "SHOP_STK");}/*** 创建Oracle CDC Source*/private static SourceFunction<String> createOracleSource(String[] syncTables, String sourceName) {// Oracle连接信息String oracleHost = "localhost";int oraclePort = 11521;String oracleSid = "orcl";String oracleUsername = "sys";String oraclePassword = "sys";String oracleSchema = "shop";LOG.info("创建{}源 - 表: {}", sourceName, String.join(", ", syncTables));Properties debeziumProperties = new Properties();debeziumProperties.setProperty("decimal.handling.mode", "string");debeziumProperties.setProperty("database.tablename.case.insensitive", "false");// 配置快照模式 - 只捕获增量变更,不读取全量数据debeziumProperties.setProperty("snapshot.mode", "never");debeziumProperties.setProperty("snapshot.locking.mode", "none");// 简化配置:基本性能配置debeziumProperties.setProperty("database.history.store.only.captured.tables.ddl", "true");debeziumProperties.setProperty("log.mining.strategy", "online_catalog");debeziumProperties.setProperty("database.connection.adapter", "logminer");// 简化配置:提高批处理大小以加快处理速度debeziumProperties.setProperty("max.batch.size", "4096");debeziumProperties.setProperty("max.queue.size", "16384");// 简化配置:减少LogMiner休眠时间以提高速度debeziumProperties.setProperty("log.mining.sleep.time.default.ms", "100");debeziumProperties.setProperty("log.mining.batch.size.default", "20000");// 简化配置:基本连接配置debeziumProperties.setProperty("database.connection.timeout.ms", "30000");debeziumProperties.setProperty("database.query.timeout.ms", "60000");// 添加详细日志配置用于调试debeziumProperties.setProperty("log.mining.continuous.mine", "true");debeziumProperties.setProperty("log.mining.archive.log.hours", "1");debeziumProperties.setProperty("log.mining.archive.log.only.mode", "false");// 错误处理和重试配置debeziumProperties.setProperty("errors.tolerance", "all");debeziumProperties.setProperty("errors.log.enable", "true");debeziumProperties.setProperty("errors.log.include.messages", "true");return OracleSource.<String>builder().hostname(oracleHost).port(oraclePort).database(oracleSid).schemaList(oracleSchema).tableList(syncTables).username(oracleUsername).password(oraclePassword).deserializer(new JsonDebeziumDeserializationSchema()).debeziumProperties(debeziumProperties).build();}
}    ​

为了实现真正的并行处理,需要从以下几个关键方面来确保两张表的 CDC 源和数据处理逻辑可以并行运行:

✅ 1. 确保 CDC 源是独立运行的

你已经将两个表的 CDC 源分别配置为:

DataStreamSource<String> shopPluStream = env.addSource(shopPluSource, "SHOP_PLU CDC Source").setParallelism(1);DataStreamSource<String> shopStkStream = env.addSource(shopStkSource, "SHOP_STK CDC Source").setParallelism(1);

Flink CDC(如使用 Ververica 提供的 OracleSource通常要求源并行度为 1,这在目前不可避免。但这不会影响整体并行性,只要下游操作并行即可。


✅ 2. 确保处理逻辑是并行的

你要做的,是确保 processAndSink() 中的数据转换和写入逻辑设置了更高的并行度

例如:

private static void processAndSink(DataStream<String> sourceStream, String tableName) {sourceStream.map(new YourParseOrTransformFunction()) // 可自定义逻辑.setParallelism(4)  // <-- 设置 map 操作的并行度.addSink(new YourMySQLSink())           // 自定义 MySQL Sink.setParallelism(4);  // <-- 设置 sink 的并行度
}

这样可以确保即使 Source 是串行的,下游的处理和写入是并行运行的。


✅ 3. 任务并行调度与资源配置

确保任务的资源允许并行化生效:

  • 设置 Flink 的 TaskManagerslots 数量足够(至少 8 个以上,便于两个流各占多个并行任务)。

  • 调度策略设置为 SlotSharingGroup,确保两个 CDC 流能被同时调度:

    shopPluStream.slotSharingGroup("shop_plu_group");
    shopStkStream.slotSharingGroup("shop_stk_group");
    

✅ 4. 将 Source 拆分为不同 Job(可选)

如果你追求极致隔离和并行性,还可以将两个 CDC 流拆成两个完全独立的 Flink Job,这样两个作业可以部署在不同的资源池上,互不干扰:

  • Job A:同步 SHOP_PLU

  • Job B:同步 SHOP_STK

适合高吞吐场景,但复杂度更高。


✅ 5. 验证并行执行是否生效

你可以通过以下方法检查并行处理是否生效:

  • 使用 Flink Web UI 查看每个 Operator 的并行度与运行状态

  • 观察两个表的处理是否互不影响、是否同时在运行

  • 查看 CPU/Memory 负载是否均衡分布在多个 task slot 上


✅ 示例更新后的处理代码:

private static void processAndSink(DataStream<String> sourceStream, String tableName) {sourceStream.map(new SimpleStringToRowFunction(tableName)).name(tableName + " Parser").setParallelism(4).addSink(new MySQLSinkFunction()).name(tableName + " MySQL Sink").setParallelism(4);
}

✅ 总结

目标方法
两个表独立采集分别构建并 add 两个 OracleSource,每个并行度设为 1
数据转换和 Sink 并行处理map()addSink() 设置更高的并行度(如 4)
Flink 实际并发生效保证 Flink 配置了足够的 Task Slot
最大隔离性将两个流变成两个独立作业(部署两个 Job)

✅ 7. 总结

  • 使用 Flink 的 Oracle CDC 和 MySQL CDC 连接器从 Oracle 捕获增量数据,并写入 MySQL。

  • 使用 Flink SQL 或 Java 代码对字段进行映射和转换。

  • 配置好日志和异常处理,确保 Flink 作业能够稳定运行。

http://www.lryc.cn/news/609765.html

相关文章:

  • [GESP202306 四级] 2023年6月GESP C++四级上机题超详细题解,附带讲解视频!
  • Spring Boot + ShardingSphere 实现分库分表 + 读写分离实战
  • AWS VPC Transit Gateway 可观测最佳实践
  • 【物联网】基于树莓派的物联网开发【23】——树莓派安装SQLite嵌入式数据库
  • 16_OpenCV_漫水填充(floodFill)
  • Nginx vs Spring Cloud Gateway:限流功能深度对比与实践指南
  • Spring Cloud Gateway 实现登录校验:构建统一认证入口
  • 图片的放大缩小选择全屏
  • XSS的原型链污染1--原型链解释
  • 笔记本电脑联想T14重启后无法识别外置红米屏幕
  • Django + Vue 项目部署(1panel + openresty)
  • AI“炼金术”:破解绿色水泥的配方密码
  • 三防平板电脑是什么?这款三防平板支持红外测温!
  • 电脑上不了网怎么办?【图文详解】wifi有网络但是电脑连不上网?网络设置
  • 电脑一键重装系统win7/win10/win11无需U盘(无任何捆绑软件图文教程)
  • Ribbon 核心原理与架构详解:服务负载均衡的隐形支柱
  • 工作流绑定卡片优化用户体验-练习我要找工作智能体
  • 【CVPR2025】计算机视觉|AnomalyNCD:让工业异常分类“脱胎换骨”!
  • transformer与神经网络
  • ubuntu24.01安装odoo18
  • 纯前端使用ExcelJS插件导出Excel
  • 计算机视觉(2)车规摄像头标准
  • 5天挑战网络编程 -DAY1(linux版)
  • python:讲懂决策树,为理解随机森林算法做准备,以示例带学习,通俗易懂,容易理解和掌握
  • 句子表征-文本匹配--representation-based/interactive-based
  • 学习日志27 python
  • 基于开源AI智能名片链动2+1模式与S2B2C商城小程序的直播营销销量转化机制研究
  • 短剧小程序系统开发:引领影视消费新潮流
  • 【世纪龙科技】汽车自动变速器拆装虚拟实训软件
  • 音视频文案字幕一键提取,免费使用,效率软件!