当前位置: 首页 > news >正文

使用 SeaTunnel 建立从 MySQL 到 Databend 的数据同步管道

SeaTunnel 是一个非常易用、超高性能的分布式数据集成平台,支持实时海量数据同步。 每天可稳定高效地同步数百亿数据,已被近百家企业应用于生产,在国内较为普及。

Databend 是一款开源、弹性、低成本,基于对象存储也可以做实时分析的云原生湖仓。

SeaTunnel 架构

SeaTunnel 整体架构:

本文将使用 SeaTunnel 建立从 MySQL 到 Databend 的数据同步管道,实现从 MySQL 数据源同步数据到 Databend 目标表的目的。

SeaTunnel MySQL-CDC 和 Databend Sink Connector

SeaTunnel 的 MySQL CDC 连接器允许从 MySQL 数据库中读取快照数据和增量数据,其实现的原理是基于 debezium-mysql-connector 。

而 Databend 在 PR [Feature][Connector-V2] Support databend source/sink connector 之后也同时在 SeaTunnel 中支持了 Databend 作为 Source 和 Sink Connector。这里我们使用 SeaTunnel 的 MySQL-CDC Source Connector 和 Databend Sink Connector 来搭建数据同步管道。

编译 SeaTunnel

由于上述 Databend Connector 的 PR 刚合并入 SeaTunnel 的 dev 分支,还没有正式 release,所以目前要使用 Databend Connector 的话,需要基于源码对 SeaTunnel 进行构建。

Clone 源码

首先我们需要从 GitHub 克隆 SeaTunnel 源代码。

git clone git@github.com:apache/seatunnel.git
本地安装子项目

在克隆源代码之后,需要运行 ./mvnw 命令将子项目安装到 maven 本地存储库。否则代码无法在 JetBrains IntelliJ IDEA 中正确启动。

./mvnw install -Dmaven.test.skip
构建 SeaTunnel

安装 maven 后,可以使用以下命令进行编译和打包。

mvn clean package -pl seatunnel-dist -am -Dmaven.test.skip=true

构建后的内容在 seatunnel/seatunnel-dist/target 中,我们需要解压 apache-seatunnel-2.3.12-SNAPSHOT-src.tar.gz,得到如下目录: 

bin 下面是可以直接运行的 shell 脚本,能够一键启动 SeaTunnel;

config 中是 jvm options 相关的配置文件;

lib中是运行 SeaTunnel 或者 connector 相关的 jar 包。

创建 connector 配置文件

我们的任务设定是通过 SeaTunnel 从 MySQL 中同步 mydb.t1 表。 配置文件 为 mysql-to-databend.conf:

env{parallelism = 1job.mode = "STREAMING"checkpoint.interval = 2000
}source {MySQL-CDC {base-url="jdbc:mysql://127.0.0.1:3306/mydb"username="root"password="123456"table-names=["mydb.t1"]startup.mode="initial"}
}
sink {Databend {url = "jdbc:databend://127.0.0.1:8000?presigned_url_disabled=true"database = "default"table = "t1"username = "databend"password = "databend"# 批量操作设置batch_size = 2# 如果目标表不存在,是否自动创建auto_create = true}
}

相关的参数设定可以参考 seatunnel MySQL文档 和 seatunnel Databend Connector。

本地启动 MySQL 与 Databend

启动并初始化 MySQL 表数据

本地启动 MySQL 后,创建一个数据库 mydb,在 mydb 中新建一张表并插入 10 条数据:

create database mydb;
use mydb;
create table t1 (a int, b varchar(100));
insert into t1 values(1,'aa')
...
insert into t1 values(10,'bb')

本地启动 Databend

version: '3'
services:databend:image: datafuselabs/databend:v1.2.754-nightlyplatform: linux/arm64ports:- "8000:8000"environment:- QUERY_DEFAULT_USER=databend- QUERY_DEFAULT_PASSWORD=databend- MINIO_ENABLED=truevolumes:- ./data:/var/lib/miniohealthcheck:test: "curl -f localhost:8080/v1/health || exit 1"interval: 2sretries: 10start_period: 2stimeout: 1s

直接 docker-compose up 即可启动 Databend 服务。

启动 SeaTunnel

./bin/seatunnel.sh --config ./bin/mysql-to-databend.conf -m local

启动后 Databend Sink Connector 会首先将 MySQL 表中的全量数据同步过来:

接下来我们往 MySQL 中插入几条数据,就会同步 MySQL 中增量的数据:

可以看到 SeaTunnel 在终端输出的日志: 

以及 Databend 中查询到数据:

说明数据已经及时同步过来了。

目前 Databend Sink Connector 还只支持 Append Only 模式,对于 update、delete 的数据没做处理,会在下一个 seatunnel 的 PR 中实现完整的 CDC 功能。

结论

通过本文我们成功实现了从 MySQL 到 Databend 的实时数据同步管道。这个解决方案具有以下优势:

  1. 简单易用:SeaTunnel 提供了简洁的配置方式,只需少量配置即可建立高效的数据同步管道。
  2. 实时性强:基于 CDC 技术,能够实时捕获 MySQL 的数据变更并同步到 Databend。
  3. 可扩展性好:SeaTunnel 的分布式架构使其能够处理海量数据同步需求。
  4. 低开发成本:无需编写复杂的 ETL 代码,通过配置文件即可完成数据集成任务。

需要注意的是,目前 Databend Sink Connector 还只支持 Append Only 模式,对于 update、delete 的数据没做处理,完整的 CDC 功能将在后续的 PR 中实现。这个方案特别适合需要将 MySQL 数据实时同步到 Databend 进行分析的场景,帮助企业构建实时数据湖仓架构。

关于 Databend

Databend 是一款开源、弹性、低成本,基于对象存储也可以做实时分析的新式湖仓。期待您的关注,一起探索云原生数仓解决方案,打造新一代开源 Data Cloud。

👨‍💻‍ Databend Cloud:databend.cn

📖 Databend 文档:docs.databend.cn

💻 Wechat:Databend

✨ GitHub:github.com/databendlab...

http://www.lryc.cn/news/590420.html

相关文章:

  • Mysql系列--1、库的相关操作
  • 在 IntelliJ IDEA 中添加框架支持的解决方案(没有出现Add Framework Support)
  • AI学习笔记三十一:YOLOv8 C++编译测试(OpenVINO)
  • 使用Telegraf从工业物联网设备收集数据的完整指南
  • Beautiful Soup(BS4)
  • ABP VNext + EF Core 二级缓存:提升查询性能
  • AI炒作,AGI或在2080年之前也无法实现,通用人工智能AGI面临幻灭
  • 【RTSP从零实践】13、TCP传输AAC格式RTP包(RTP_over_TCP)的RTSP服务器(附带源码)
  • 50天50个小项目 (Vue3 + Tailwindcss V4) ✨ | AutoTextEffect(自动打字机)
  • 使用Whistle自定义接口返回内容:Mock流式JSON数据全解析
  • SQL性能分析
  • C# --- 单例类错误初始化 + 没有释放资源导致线程泄漏
  • 【Linux】如何使用nano创建并编辑一个文件
  • 动态规划题解_打家劫舍【LeetCode】
  • 编译原理第四到五章(知识点学习/期末复习/笔试/面试)
  • 部分排序算法的Java模拟实现(复习向,非0基础)
  • AWS ML Specialist 考试备考指南
  • 【Qt】麒麟系统安装套件
  • uniapp写好的弹窗组件
  • OWASP Top 10 攻击场景实战
  • 在 CentOS 8 上彻底卸载 Kubernetes(k8s)
  • 01 启动流程实例
  • ICMR-2025 | 杭电多智能体协作具身导航框架!MMCNav:基于MLLM的多智能体协作户外视觉语言导航
  • 钱包核心标准 BIP32、BIP39、BIP44:从助记词到多链钱包的底层逻辑
  • STM32F4踩坑小记——使用HAL库函数进入HardFault
  • 蓝光三维扫描技术:手机闪光灯模块全尺寸3D检测的高效解决方案
  • HTML基础知识 二(创建容器和表格)
  • 在虚拟环境中复现论文(环境配置)
  • Class<T> 类传递及泛型数组
  • SSH连接复用技术在海外云服务器环境下的稳定性验证与优化方案