当前位置: 首页 > news >正文

使用Apache Doris自动同步整个 MySQL/Oracle 数据库进行数据分析

Flink-Doris-Connector 1.4.0 允许用户一步将包含数千个表的整个数据库(MySQL或Oracle )摄取到Apache Doris(一种实时分析数据库)中。

通过内置的Flink CDC,连接器可以直接将上游源的表模式和数据同步到Apache Doris,这意味着用户不再需要编写DataStream程序或在Doris中预先创建映射表。

当 Flink 作业启动时,Connector 会自动检查源数据库和 Apache Doris 之间的数据等效性。如果数据源包含 Doris 中不存在的表,Connector 会自动在 Doris 中创建相同的表,并利用 Flink 的侧输出来方便一次摄取多个表;如果源中发生架构更改,它将自动获取 DDL 语句并在 Doris 中进行相同的架构更改。
 

一、快速开始

  • 对于MySQL:

下载 JAR 文件:https://github.com/apache/doris-flink-connector/releases/tag/1.4.0


行家:

<dependency><groupId>org.apache.doris</groupId><artifactId>flink-doris-connector-1.15</artifactId><!--artifactId>flink-doris-connector-1.16</artifactId--><!--artifactId>flink-doris-connector-1.17</artifactId--><version>1.4.0</version>
</dependency>
  • 对于Oracle:

下载 JAR 文件:
Flink 1.15:http://justtmp-bj-1308700295.cos.ap-beijing.myqcloud.com/oracle/flink-doris-connector-1.15-1.5.0-SNAPSHOT.jar
Flink 1.16:http://justtmp-bj-1308700295.cos.ap-beijing.myqcloud.com/oracle/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar
Flink 1.17:http://justtmp-bj-1308700295.cos.ap-beijing.myqcloud.com/oracle/flink-doris-connector-1.17-1.5.0-SNAPSHOT.jar


如何使用它

例如,要将整个 MySQL 数据库引入mysql_dbDoris(MySQL 表名以tbl或test开头),只需执行以下命令(无需提前在Doris 中创建表):

<FLINK_HOME>/bin/flink run \-Dexecution.checkpointing.interval=10s \-Dparallelism.default=1 \-c org.apache.doris.flink.tools.cdc.CdcTools \lib/flink-doris-connector-1.16-1.4.0.jar \mysql-sync-database \--database test_db \--mysql-conf hostname=127.0.0.1 \--mysql-conf username=root \--mysql-conf password=123456 \--mysql-conf database-name=mysql_db \--including-tables "tbl|test.*" \--sink-conf fenodes=127.0.0.1:8030 \--sink-conf username=root \--sink-conf password=123456 \--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \--sink-conf sink.label-prefix=label1 \--table-conf replication_num=1

摄取Oracle数据库:请参考示例代码(https://github.com/apache/doris-flink-connector/pull/156)。


表现如何

当涉及到同步整个数据库(包含数百甚至数千个活动或不活动的表)时,大多数用户希望在几秒钟内完成。因此我们测试了连接器,看看它是否符合要求:

  • 1000 个 MySQL 表,每个表有 100 个字段。所有表都是活动的(这意味着它们不断更新,每次数据写入涉及一百多行)

  • Flink作业检查点:10s

经过压力测试,系统表现出较高的稳定性,主要指标如下:

根据早期采用者的反馈,该Connector在生产环境中的万表数据库同步中也提供了高性能和系统稳定性。这证明Apache Doris和Flink CDC的结合能够高效可靠地进行大规模数据同步。

二、它如何使数据工程师受益

工程师不再需要担心表创建或表模式维护,从而节省了数天繁琐且容易出错的工作。之前在Flink CDC中,需要为每个表创建一个Flink作业,并在源端建立日志解析链路,但现在通过全库摄取,源数据库的资源消耗大大减少。也是增量更新和全量更新的统一解决方案。

其他特性

1、连接维度表和事实表

常见的做法是将维度表放在Doris中,通过Flink的实时流进行Join查询。Flink-Doris-Connector 1.4.0基于Flink 的 Async I/O实现了异步 Lookup Join,因此 Flink 实时流不会因为查询而阻塞。此外,连接器还允许您将多个查询合并为一个大查询,并将其立即发送给 Doris 进行处理。这提高了此类连接查询的效率和吞吐量。

2、节俭 SDK

我们在 Connector 中引入了 Thrift-Service SDK,用户不再需要使用 Thrift 插件或在编译时配置 Thrift 环境。这使得编译过程变得更加简单。

3、按需流加载

数据同步过程中,当没有新的数据摄入时,不会发出Stream Load请求。这样可以避免不必要的集群资源消耗。

4、后端节点轮询

对于数据摄取,Doris 调用前端节点获取后端节点列表,并随机选择一个发起摄取请求。该后端节点将是协调器。Flink-Doris-Connector 1.4.0 允许用户启用轮询机制,即在每个Flink 检查点都有不同的后端节点作为 Coordinator,以避免单个后端节点长期承受过大的压力。

5、支持更多数据类型

除了常见的数据类型外,Flink-Doris-Connector 1.4.0 还支持 Doris 中的 DecimalV3/DateV2/DateTimev2/Array/JSON。


三、用法示例

可以通过DataStream或FlinkSQL(有界流)从Doris读取数据。支持谓词下推。

CREATE TABLE flink_doris_source (name STRING,age INT,score DECIMAL(5,2)) WITH ('connector' = 'doris','fenodes' = '127.0.0.1:8030','table.identifier' = 'database.table','username' = 'root','password' = 'password','doris.filter.query' = 'age=18'
);
​
SELECT * FROM flink_doris_source;


连接维度表和事实表:

CREATE TABLE fact_table (`id` BIGINT,`name` STRING,`city` STRING,`process_time` as proctime()
) WITH ('connector' = 'kafka',
...
);
​
create table dim_city(`city` STRING,`level` INT ,`province` STRING,`country` STRING
) WITH ('connector' = 'doris','fenodes' = '127.0.0.1:8030','jdbc-url' = 'jdbc:mysql://127.0.0.1:9030','lookup.jdbc.async' = 'true','table.identifier' = 'dim.dim_city','username' = 'root','password' = ''
);
​
SELECT a.id, a.name, a.city, c.province, c.country,c.level 
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city


写入Apache Doris:

CREATE TABLE doris_sink (name STRING,age INT,score DECIMAL(5,2)) WITH ('connector' = 'doris','fenodes' = '127.0.0.1:8030','table.identifier' = 'database.table','username' = 'root','password' = '','sink.label-prefix' = 'doris_label',//json write in'sink.properties.format' = 'json','sink.properties.read_json_by_line' = 'true'
);
http://www.lryc.cn/news/157801.html

相关文章:

  • 【1++的数据结构】之哈希(一)
  • 【网络编程】深入了解UDP协议:快速数据传输的利器
  • WordPress(5)在主题中添加文章字数和预计阅读时间
  • STM32WB55开发(1)----套件概述
  • CUDA相关知识科普
  • 恒运资本:总市值和总资产区别?
  • CTF安全竞赛介绍
  • DC/DC开关电源学习笔记(四)开关电源电路主要器件及技术动态
  • 数据可视化与数字孪生:理解两者的区别
  • C++ socket编程(TCP)
  • ldd用于打印程序或库文件所依赖的共享库列表
  • vue+elementUI el-table实现单选
  • 前端组件库造轮子——Message组件开发教程
  • 单片机第二季:温度传感器DS18B20
  • 抓包工具fiddler的基础知识
  • 监控基本概念
  • 【数据结构】 七大排序详解(壹)——直接插入排序、希尔排序、选择排序、堆排序
  • 【Linux】高级IO --- Reactor网络IO设计模式
  • Agisoft Metashape相机标定笔记
  • vue-cropper在ie11下选择本地图片后,无显示、拒绝访问的问题
  • Excel VSTO开发11-自定义菜单项
  • stm32之30.DMA
  • 【LeetCode75】第四十九题 数组中的第K个最大元素
  • 嵌入式面试笔试刷题(day14)
  • 好用免费的Chat GPT(亲测有用)
  • SpringBoot项目--电脑商城【上传头像】
  • 优化SOCKS5的方法
  • 使用 HelpLook Chatbot,让AI聊天机器人变成销售经理
  • MT9700 80mΩ,可调快速响应限流配电开关芯片
  • RabbitMQ之延迟队列