当前位置: 首页 > news >正文

Linux上用C++和GCC开发程序实现两个不同MySQL实例下单个Schema稳定高效的数据迁移到其它MySQL实例

设计一个在Linux上运行的GCC C++程序,同时连接三个不同的MySQL实例,其中两个实例中分别有两个Schema的表结构分别与第三实例中两个Schema个结构完全相同,同时复制两个实例中两个Schema里的所有表的数据到第三个实例中两个Schema里,使用以下快速高效的方法,加入异常处理,支持每隔固定时间重试一定次数,每张表的复制运行状态和记录条数,开始结束时间戳,运行时间,以及每个批次的运行状态,开始结束时间戳,运行时间,总记录条数都要到运行日志文件里,日志文件全部放在logs目录下,每天一个.log扩展名的文本文件,文件名带日期,所有运行参数写在json配置文件中。

C++解决方案,采用模块化设计并包含详细注释:

该实现结合了C++的高效性和现代C++的特性,同时利用MySQL X DevAPI提供的高性能接口,能够有效处理大规模数据复制需求。

#include <iostream>
#include <fstream>
#include <chrono>
#include <thread>
#include <vector>
#include <map>
#include <mysqlx/xdevapi.h>
#include <nlohmann/json.hpp>
#include <filesystem>
#include <mutex>using namespace std;
using namespace mysqlx;
using json = nlohmann::json;
namespace fs = filesystem;// 配置结构体
struct Config {struct DBConfig {string host;int port;string user;string password;};DBConfig source1;DBConfig source2;DBConfig target;int max_retries = 3;int retry_interval = 60; // secondsmap<string, vector<string>> schema_mapping;
};// 日志管理器类
class Logger {
private:mutex log_mutex;string log_dir = "logs";string get_current_date() {auto now = chrono::system_clock::now();time_t t = chrono::system_clock::to_time_t(now);tm tm = *localtime(&t);char buffer[11];strftime(buffer, sizeof(buffer), "%Y%m%d", &tm);return string(buffer);}public:Logger() {if (!fs::exists(log_dir)) {fs::create_directory(log_dir);}}void log(const string& message) {lock_guard<mutex> lock(log_mutex);ofstream log_file(log_dir + "/" + get_current_date() + ".log",ios::app);auto now = chrono::system_clock::now();time_t t = chrono::system_clock::to_time_t(now);log_file << put_time(localtime(&t), "%Y-%m-%d %H:%M:%S") << " | " << message << endl;}
};// 数据库操作类
class DBOperator {Config& config;Logger& logger;Session connect_db(const Config::DBConfig& db_conf) {try {return Session(db_conf.host, db_conf.port, db_conf.user, db_conf.password);} catch (const Error& e) {logger.log("Connection error: " + string(e.what()));throw;}}public:DBOperator(Config& cfg, Logger& log) : config(cfg), logger(log) {}void copy_table_data(Schema& source_schema, Schema& target_schema,const string& table_name,int batch_size = 1000) {auto start_time = chrono::system_clock::now();string log_prefix = "Table[" + table_name + "] ";try {Table source_table = source_schema.getTable(table_name);Table target_table = target_schema.getTable(table_name);int total_rows = 0;RowResult res = source_table.select("*").execute();while (const Row row = res.fetchOne()) {vector<Row> batch;for (int i = 0; i < batch_size && row; ++i) {batch.push_back(row);total_rows++;if (i < batch_size-1) row = res.fetchOne();}target_table.insert().rows(batch).execute();}auto end_time = chrono::system_clock::now();auto duration = chrono::duration_cast<chrono::milliseconds>(end_time - start_time);logger.log(log_prefix + "SUCCESS | Rows: " + to_string(total_rows) +" | Duration: " + to_string(duration.count()) + "ms");} catch (const Error& e) {logger.log(log_prefix + "ERROR: " + string(e.what()));throw;}}void copy_schema(Session& source_session, Session& target_session,const string& source_schema_name,const string& target_schema_name,const vector<string>& tables) {Schema source_schema = source_session.getSchema(source_schema_name);Schema target_schema = target_session.getSchema(target_schema_name);for (const auto& table : tables) {for (int attempt = 0; attempt < config.max_retries; ++attempt) {try {copy_table_data(source_schema, target_schema, table);break;} catch (...) {if (attempt == config.max_retries-1) {logger.log("Table[" + table + "] FAILED after " + to_string(config.max_retries) + " attempts");}this_thread::sleep_for(chrono::seconds(config.retry_interval));}}}}
};int main(int argc, char** argv) {try {// 加载配置文件ifstream config_file("config.json");json config_json = json::parse(config_file);Config config;config.source1 = config_json["source1"];config.source2 = config_json["source2"];config.target = config_json["target"];config.schema_mapping = config_json["schema_mapping"];Logger logger;DBOperator db_operator(config, logger);// 连接数据库Session source1 = db_operator.connect_db(config.source1);Session source2 = db_operator.connect_db(config.source2);Session target = db_operator.connect_db(config.target);// 执行复制任务for (const auto& [source_schema, target_schema] : config.schema_mapping) {vector<string> tables = config.schema_mapping[source_schema];// 从第一个源复制db_operator.copy_schema(source1, target, source_schema, target_schema, tables);// 从第二个源复制db_operator.copy_schema(source2, target, source_schema, target_schema, tables);}} catch (const exception& e) {cerr << "Fatal error: " << e.what() << endl;return 1;}return 0;
}

配置文件示例 (config.json):

{"source1": {"host": "source1.example.com","port": 3306,"user": "user","password": "password"},"source2": {"host": "source2.example.com","port": 3307,"user": "user","password": "password"},"target": {"host": "target.example.com","port": 3308,"user": "user","password": "password"},"max_retries": 3,"retry_interval": 60,"schema_mapping": {"source_schema1": ["table1", "table2"],"source_schema2": ["table3", "table4"]}
}

关键特性说明:

  1. 配置管理
  • 使用JSON配置文件存储数据库连接参数和复制参数
  • 支持灵活的模式映射配置
  • 可配置重试策略和批量大小
  1. 日志系统
  • 自动创建日志目录
  • 按天滚动日志文件
  • 详细记录每个操作的元数据
  • 线程安全的日志写入
  1. 数据复制
  • 使用MySQL X DevAPI进行高效数据操作
  • 批量插入机制提升性能
  • 自动重试机制和错误隔离
  • 表级并行复制(可扩展)
  1. 异常处理
  • 多级异常捕获机制
  • 连接失败自动重试
  • 数据操作错误隔离
  • 资源自动释放保障
  1. 扩展性
  • 模块化设计便于功能扩展
  • 支持动态表列表配置
  • 可扩展的多线程支持

编译运行:

# 安装依赖
sudo apt-get install libmysqlcppconn-dev nlohmann-json3-dev# 编译
g++ -std=c++17 -o db_sync main.cpp -lmysqlcppconn8 -lpthread# 运行
./db_sync

建议的优化扩展方向:

  1. 并行处理
  • 使用线程池实现表级并行复制
  • 异步IO提升吞吐量
  • 连接池管理数据库连接
  1. 监控增强
  • 实时进度报告
  • Prometheus指标导出
  • 健康检查端点
  1. 高级功能
  • 增量复制支持
  • 模式校验机制
  • 数据一致性校验
  • 自动断点续传
  1. 部署优化
  • Docker容器化封装
  • Systemd服务集成
  • 配置热加载支持
http://www.lryc.cn/news/545187.html

相关文章:

  • RabbitMQ系列(一)架构解析
  • XSL 语言:XML 样式表的语言基础与应用
  • 【计算机网络】常见tcp/udp对应的应用层协议,端口
  • ExpMoveFreeHandles函数分析和备用空闲表的关系
  • 微服务学习(1):RabbitMQ的安装与简单应用
  • 基于javaweb的SSM+Maven幼儿园管理系统设计和实现(源码+文档+部署讲解)
  • 企业级本地知识库部署指南(Windows优化版)
  • 5. Nginx 负载均衡配置案例(附有详细截图说明++)
  • Redis---缓存穿透,雪崩,击穿
  • 计算机毕业设计SpringBoot+Vue.js人口老龄化社区服务与管理平台 (源码+文档+PPT+讲解)
  • 【异地访问本地DeepSeek】Flask+内网穿透,轻松实现本地DeepSeek的远程访问
  • Nacos + Dubbo3 实现微服务的Rpc调用
  • 散户如何实现自动化交易下单——篇1:体系介绍与获取同花顺资金账户和持仓信息
  • 基于Electron的应用程序安全测试基础 — 提取和分析.asar文件的案例研究
  • vue中computed方法使用;computed返回函数
  • 大语言模型的评测
  • 【Vue3】浅谈setup语法糖
  • EasyRTC嵌入式WebRTC技术与AI大模型结合:从ICE框架优化到AI推理
  • 如何管理路由器
  • 【NTN 卫星通信】低轨卫星通信需要解决的关键问题
  • DOM HTML:深入理解与高效运用
  • 如何进行OceanBase 运维工具的部署和表性能优化
  • docker简介-学习与参考
  • AcWing 蓝桥杯集训·每日一题2025·密接牛追踪2
  • LeetCode 每日一题 2025/2/24-2025/3/2
  • TeX Live 2025 最新版安装与中文环境配置全教程(Windows/Mac/Linux)
  • Android实现漂亮的波纹动画
  • JAVA学习笔记038——bean的概念和常见注解标注
  • 自然语言处理NLP入门 -- 第十节NLP 实战项目 2: 简单的聊天机器人
  • 【网络安全 | 渗透工具】小程序反编译分析源码 | 图文教程