通过 Flink 和 CDC 从 Oracle 数据库获取增量数据,并将这些增量数据同步到 MySQL 数据库中
目录
✅ 1. 引入必要的依赖
✅ 2. CDC 配置
✅ 3. 设计增量同步流程
✅ 4. Flink SQL 流程
✅ 4.1 Oracle CDC 数据源
✅ 4.2 MySQL 数据目标
✅ 4.3 字段映射与数据处理
✅ 4.4 增量同步
✅ 5. 完整的 Flink Job 示例
✅ 5.1 Java 程序示例
✅ 6. 注意事项
✅ 1. 确保 CDC 源是独立运行的
✅ 2. 确保处理逻辑是并行的
✅ 3. 任务并行调度与资源配置
✅ 4. 将 Source 拆分为不同 Job(可选)
✅ 5. 验证并行执行是否生效
✅ 示例更新后的处理代码:
✅ 总结
✅ 7. 总结
✅ 1. 引入必要的依赖
你已经引入了 flink-connector-oracle-cdc
和 flink-connector-mysql-cdc
,这是对的。除此之外,你还需要引入 Flink 的 flink-connector-jdbc
来支持写入到 MySQL。
✅ 2. CDC 配置
Flink 提供了 CDC 连接器用于增量数据读取(捕获数据变更)。为了从 Oracle 读取增量数据,你需要使用 Flink Oracle CDC
连接器,同时从 MySQL 写入数据时使用 Flink MySQL CDC
连接器。
✅ 3. 设计增量同步流程
在 Flink 中,你需要做以下几件事:
-
从 Oracle 中捕获增量数据。
-
根据需求进行字段映射。
-
将处理后的数据同步到 MySQL。
下面是一个简单的 Flink SQL 作业来实现这个过程:
✅ 4. Flink SQL 流程
✅ 4.1 Oracle CDC 数据源
首先,我们需要从 Oracle 中获取增量数据。为了做到这一点,使用 Flink 的 Oracle CDC 连接器。
CREATE TABLE oracle_source (id INT,name STRING,email STRING,modified_time TIMESTAMP(3),PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'oracle-cdc','hostname' = 'your_oracle_host','port' = 'your_oracle_port','username' = 'your_oracle_username','password' = 'your_oracle_password','database-name' = 'your_database','table-name' = 'your_oracle_table','debezium.snapshot.mode' = 'initial', -- 'initial' for the first snapshot'debezium.scan.startup.mode' = 'earliest-offset'
);
✅ 4.2 MySQL 数据目标
然后,创建一个表来映射 MySQL 目标表,并进行字段映射。
CREATE TABLE mysql_sink (id INT,full_name STRING, -- name字段映射到 full_nameemail STRING,updated_time TIMESTAMP(3),PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc','hostname' = 'your_mysql_host','port' = 'your_mysql_port','username' = 'your_mysql_username','password' = 'your_mysql_password','database-name' = 'your_database','table-name' = 'your_mysql_table'
);
✅ 4.3 字段映射与数据处理
你可以在 Flink SQL 查询中做字段映射和转换。假设你需要把 name
映射到 MySQL 中的 full_name
字段,并且对 modified_time
字段进行一些处理。
INSERT INTO mysql_sink
SELECT id,name AS full_name, -- 字段映射email,modified_time AS updated_time -- 字段映射
FROM oracle_source;
✅ 4.4 增量同步
这样配置之后,Flink 将从 Oracle 捕获增量数据并同步到 MySQL。Flink 会自动处理增量数据的捕获和写入。你还可以对数据进行更复杂的处理(如数据清洗、转换等):
-- 在 SQL 查询中对数据进行进一步处理
INSERT INTO mysql_sink
SELECT id,CONCAT(name, ' (Processed)') AS full_name, -- 字段映射并做简单的数据处理email,CAST(DATE_FORMAT(modified_time, 'yyyy-MM-dd HH:mm:ss') AS TIMESTAMP) AS updated_time -- 数据格式转换
FROM oracle_source;
✅ 5. 完整的 Flink Job 示例
这个 Flink Job 会读取 Oracle 数据库的增量数据,进行必要的字段映射和转换后,将数据同步到 MySQL。
✅ 5.1 Java 程序示例
如果你希望用 Java 来实现相同的功能,这里是一个基本的例子:
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;public class FlinkOracleToMySQL {public static void main(String[] args) throws Exception {// 设置 Flink 流式执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置 CDC 连接器从 Oracle 获取增量数据DataStream<Row> oracleSourceStream = env.addSource(new OracleSourceFunction());// 对数据进行映射DataStream<Row> mappedStream = oracleSourceStream.map(new MapFunction<Row, Row>() {@Overridepublic Row map(Row row) throws Exception {// 做字段映射、转换等操作String fullName = row.getField(1).toString(); // 假设 name 是第 2 个字段return Row.of(row.getField(0), fullName, row.getField(2), row.getField(3));}});// 定义 JDBC Sink,将数据写入 MySQLmappedStream.addSink(JdbcSink.sink("INSERT INTO your_mysql_table (id, full_name, email, updated_time) VALUES (?, ?, ?, ?)",(statement, row) -> {statement.setInt(1, (Integer) row.getField(0));statement.setString(2, (String) row.getField(1));statement.setString(3, (String) row.getField(2));statement.setTimestamp(4, (Timestamp) row.getField(3));},new JdbcExecutionOptions.Builder().withBatchSize(1000).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://your_mysql_host:3306/your_database").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("your_mysql_username").withPassword("your_mysql_password").build()));// 执行 Flink 作业env.execute("Oracle to MySQL Incremental Sync");}
}
✅ 6. 注意事项
-
时间戳字段:确保源表和目标表的时间戳字段能够正确地转换。你可以使用 Flink SQL 中的内建函数
CAST
或DATE_FORMAT
来进行时间格式的转换。 -
字段映射:在 SQL 中直接做字段映射。如果有需要可以在 Java 代码中进行更多的转换和处理。
-
增量同步配置:CDC 的增量同步依赖于数据库的
binlog
或redo log
。确保你的 Oracle 数据库已经正确配置了log-miner
或其他 CDC 配置。 -
执行增量同步过程中可能会遇到实时性的问题,这个问题在开发过程中已经遇到,下面是代码展示:
怎么才能实现 SHOP_PLU 和 SHOP_STK 表的并行处理,确保两张表的 CDC 源和数据处理逻辑能够独立并行执行
public class OracleToMySQLSyncJob {private static final Logger LOG = LoggerFactory.getLogger(OracleToMySQLSyncJob.class);public static void main(String[] args) throws Exception {LOG.info("启动Oracle到MySQL数据同步作业...");// 创建Flink执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 简化配置:提高并行度以加快处理速度int parallelism = 4; // 固定使用4个并行度env.setParallelism(parallelism);// 简化配置:减少检查点频率以提高性能env.enableCheckpointing(30000); // 30秒检查点间隔// 简化配置:减少缓冲区超时以提高吞吐量env.setBufferTimeout(50); // 50ms缓冲区超时LOG.info("Flink简化配置 - 检查点间隔: 30000ms, 并行度: {}, 缓冲区超时: 50ms", parallelism);// 创建Oracle CDC Source - 处理SHOP_PLU和SHOP_STK表SourceFunction<String> shopPluSource = createShopPluSource();SourceFunction<String> shopStkSource = createShopStkSource();// 添加SHOP_PLU数据源 - CDC Source必须为并行度1DataStreamSource<String> shopPluStream = env.addSource(shopPluSource, "SHOP_PLU CDC Source").setParallelism(1);// 添加SHOP_STK数据源 - CDC Source必须为并行度1DataStreamSource<String> shopStkStream = env.addSource(shopStkSource, "SHOP_STK CDC Source").setParallelism(1);// 数据转换和写入MySQLprocessAndSink(shopPluStream, "SHOP_PLU");processAndSink(shopStkStream, "SHOP_STK");LOG.info("数据流已配置完成 - 使用两个独立的CDC源分别处理SHOP_PLU和SHOP_STK表");// 开始执行Oracle到MySQL数据同步作业env.execute("Oracle to MySQL Sync Job");}/*** 创建SHOP_PLU表的Oracle CDC Source*/private static SourceFunction<String> createShopPluSource() {return createOracleSource(new String[]{"CMPINT.SHOP_PLU"}, "SHOP_PLU");}/*** 创建SHOP_STK表的Oracle CDC Source*/private static SourceFunction<String> createShopStkSource() {return createOracleSource(new String[]{"CMPINT.SHOP_STK"}, "SHOP_STK");}/*** 创建Oracle CDC Source*/private static SourceFunction<String> createOracleSource(String[] syncTables, String sourceName) {// Oracle连接信息String oracleHost = "localhost";int oraclePort = 11521;String oracleSid = "orcl";String oracleUsername = "sys";String oraclePassword = "sys";String oracleSchema = "shop";LOG.info("创建{}源 - 表: {}", sourceName, String.join(", ", syncTables));Properties debeziumProperties = new Properties();debeziumProperties.setProperty("decimal.handling.mode", "string");debeziumProperties.setProperty("database.tablename.case.insensitive", "false");// 配置快照模式 - 只捕获增量变更,不读取全量数据debeziumProperties.setProperty("snapshot.mode", "never");debeziumProperties.setProperty("snapshot.locking.mode", "none");// 简化配置:基本性能配置debeziumProperties.setProperty("database.history.store.only.captured.tables.ddl", "true");debeziumProperties.setProperty("log.mining.strategy", "online_catalog");debeziumProperties.setProperty("database.connection.adapter", "logminer");// 简化配置:提高批处理大小以加快处理速度debeziumProperties.setProperty("max.batch.size", "4096");debeziumProperties.setProperty("max.queue.size", "16384");// 简化配置:减少LogMiner休眠时间以提高速度debeziumProperties.setProperty("log.mining.sleep.time.default.ms", "100");debeziumProperties.setProperty("log.mining.batch.size.default", "20000");// 简化配置:基本连接配置debeziumProperties.setProperty("database.connection.timeout.ms", "30000");debeziumProperties.setProperty("database.query.timeout.ms", "60000");// 添加详细日志配置用于调试debeziumProperties.setProperty("log.mining.continuous.mine", "true");debeziumProperties.setProperty("log.mining.archive.log.hours", "1");debeziumProperties.setProperty("log.mining.archive.log.only.mode", "false");// 错误处理和重试配置debeziumProperties.setProperty("errors.tolerance", "all");debeziumProperties.setProperty("errors.log.enable", "true");debeziumProperties.setProperty("errors.log.include.messages", "true");return OracleSource.<String>builder().hostname(oracleHost).port(oraclePort).database(oracleSid).schemaList(oracleSchema).tableList(syncTables).username(oracleUsername).password(oraclePassword).deserializer(new JsonDebeziumDeserializationSchema()).debeziumProperties(debeziumProperties).build();} }
为了实现真正的并行处理,需要从以下几个关键方面来确保两张表的 CDC 源和数据处理逻辑可以并行运行:
✅ 1. 确保 CDC 源是独立运行的
你已经将两个表的 CDC 源分别配置为:
DataStreamSource<String> shopPluStream = env.addSource(shopPluSource, "SHOP_PLU CDC Source").setParallelism(1);DataStreamSource<String> shopStkStream = env.addSource(shopStkSource, "SHOP_STK CDC Source").setParallelism(1);
Flink CDC(如使用 Ververica 提供的 OracleSource
)通常要求源并行度为 1,这在目前不可避免。但这不会影响整体并行性,只要下游操作并行即可。
✅ 2. 确保处理逻辑是并行的
你要做的,是确保 processAndSink()
中的数据转换和写入逻辑设置了更高的并行度。
例如:
private static void processAndSink(DataStream<String> sourceStream, String tableName) {sourceStream.map(new YourParseOrTransformFunction()) // 可自定义逻辑.setParallelism(4) // <-- 设置 map 操作的并行度.addSink(new YourMySQLSink()) // 自定义 MySQL Sink.setParallelism(4); // <-- 设置 sink 的并行度
}
这样可以确保即使 Source 是串行的,下游的处理和写入是并行运行的。
✅ 3. 任务并行调度与资源配置
确保任务的资源允许并行化生效:
-
设置 Flink 的
TaskManager
的slots
数量足够(至少 8 个以上,便于两个流各占多个并行任务)。 -
调度策略设置为
SlotSharingGroup
,确保两个 CDC 流能被同时调度:shopPluStream.slotSharingGroup("shop_plu_group"); shopStkStream.slotSharingGroup("shop_stk_group");
✅ 4. 将 Source 拆分为不同 Job(可选)
如果你追求极致隔离和并行性,还可以将两个 CDC 流拆成两个完全独立的 Flink Job,这样两个作业可以部署在不同的资源池上,互不干扰:
-
Job A:同步
SHOP_PLU
表 -
Job B:同步
SHOP_STK
表
适合高吞吐场景,但复杂度更高。
✅ 5. 验证并行执行是否生效
你可以通过以下方法检查并行处理是否生效:
-
使用 Flink Web UI 查看每个 Operator 的并行度与运行状态
-
观察两个表的处理是否互不影响、是否同时在运行
-
查看 CPU/Memory 负载是否均衡分布在多个 task slot 上
✅ 示例更新后的处理代码:
private static void processAndSink(DataStream<String> sourceStream, String tableName) {sourceStream.map(new SimpleStringToRowFunction(tableName)).name(tableName + " Parser").setParallelism(4).addSink(new MySQLSinkFunction()).name(tableName + " MySQL Sink").setParallelism(4);
}
✅ 总结
目标 | 方法 |
---|---|
两个表独立采集 | 分别构建并 add 两个 OracleSource ,每个并行度设为 1 |
数据转换和 Sink 并行处理 | 在 map() 和 addSink() 设置更高的并行度(如 4) |
Flink 实际并发生效 | 保证 Flink 配置了足够的 Task Slot |
最大隔离性 | 将两个流变成两个独立作业(部署两个 Job) |
✅ 7. 总结
-
使用 Flink 的 Oracle CDC 和 MySQL CDC 连接器从 Oracle 捕获增量数据,并写入 MySQL。
-
使用 Flink SQL 或 Java 代码对字段进行映射和转换。
-
配置好日志和异常处理,确保 Flink 作业能够稳定运行。