当前位置: 首页 > news >正文

FlinkCDC数据实时同步Mysql到ES

考大家一个问题,如果想要把数据库的数据同步到别的地方,比如es,mongodb,大家会采用哪些方案呢? :::

  1. 定时扫描同步?

  2. 实时日志同步?

定时同步是一个很好的方案,比较简单,但是如果对实时要求比较高的话,定时同步就有点不合适了。今天给大家介绍一种实时同步方案,就是是使用flinkcdc 来读取数据库日志,并且写入到elasticsearch中。

1.什么是flinkcdc?

Flink CDC(Change Data Capture)是指通过 Apache Flink 实现的一种数据变化捕获技术。CDC 可以实时捕获数据库中的数据变化,如插入、更新、删除操作,并将这些变化数据流式地传输到其他系统或存储中。通过 Flink CDC,用户可以实时监控数据库中的数据变化,并将这些变化数据用于实时分析、ETL(Extract, Transform, Load)等应用场景。Flink CDC 通常用于构建实时数据管道,帮助用户实现实时数据同步和分析。

2.flinkcdc发展趋势?

目前在github 上大概有5k 的star,也有越来越多的人使用。

3.flinkcdc有什么优势?

说到实时同步,canal 是比较常用的方案

canal,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。 这句介绍有几个关键字:增量日志,增量数据订阅和消费。

canal的把自己伪装成MySQL slave,模拟MySQL slave的交互协议向MySQL Mater发送 dump协议,MySQL mater收到canal发送过来的dump请求,开始推送binary log给canal,然后canal解析binary log,再发送到存储目的地,比如MySQL,Kafka,Elastic Search等等。

那么 flinkcdc 和canal 对比,有什么不同呢?

这是网上的一个对比。可以看到 flinkcdc 和canal 一样,也是通过读取数据库日志的方式做到实时同步的,这个和很多实时同步的工具原理相同,比如 ogg debezium 都是这样做的,flinkcdc 的优势是基于flink 这个强大的实时计算引擎,可以做到集群部署,高可用等等,并且社区活跃,支持的平台多,像 mysql oracle mongodb 主流数据库都是支持的。而canal只支持mysql。

还有一个优势,flinkcdc 是基于java实现的,背靠大数据这个大平台,解决方案也是比较多的。源码阅读修改起来也是比较方便的。

4.一个例子

光说不练假把式,简单的写一个把mysql 数据实时同步到es的例子,使用flinksql的方式,只需要简单的几行sql

依赖
flink-1.15.0
flink-sql-connector-elasticsearch7-1.15.0.jar
flink-sql-connector-mysql-cdc-2.2.1.jar
mysql 5.7
es 7.9.3

安装好flink 之后,把 flink-sql-connector-elasticsearch7-1.15.0.jar flink-sql-connector-mysql-cdc-2.2.1.jar 上传到 flink lib 目录 启动flink

./start-cluster.sh

打开flink sql 窗口

./start-cluster.sh

创建和mysql 关联的表

CREATE TABLE products (id INT,name STRING,description STRING,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = 'localhost','port' = '3306','username' = 'root','password' = '123456','database-name' = 'mydb','table-name' = 'products');
CREATE TABLE orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,PRIMARY KEY (order_id) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = 'localhost','port' = '3306','username' = 'root','password' = '123456','database-name' = 'mydb','table-name' = 'orders');

创建和es 同步的表

CREATE TABLE enriched_orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,product_name STRING,product_description STRING,PRIMARY KEY (order_id) NOT ENFORCED) WITH ('connector' = 'elasticsearch-7','hosts' = 'http://192.168.91.134:9200','index' = 'enriched_orders');

创建读取mysql写入es任务

INSERT INTO enriched_ordersSELECT o.*, p.name, p.descriptionFROM orders AS oLEFT JOIN products AS p ON o.product_id = p.id;

执行这个任务后,mysql 的数据就能实时同步至es了

当然数据源也是支持很多种,比如 oracle mongodb sqlserver 写入端也支持 es kafka hive 等等,看大家需要。想我们的业务场景,是先将mysql 数据同步到kafka,然后再消费kafka 消息,把数据写入到es, hive,starrocks 等等。并且使用了checkpoint 做为断点恢复的保障。

5.最后

附上一些涉及的到网址,方便大家查阅

flinkcdc 官网 

flinkcdc github

flink 官网

flink 文档

http://www.lryc.cn/news/236172.html

相关文章:

  • 【Feign】 基于 Feign 远程调用、 自定义配置、性能优化、实现 Feign 最佳实践
  • 小迪安全笔记(3)——基础入门3、基础入门4
  • SOME/IP 协议介绍(六)接口设计的兼容性规则
  • 吴恩达《机器学习》8-5->8-6:特征与直观理解I、样本与值观理解II
  • 『亚马逊云科技产品测评』活动征文|借助AWS EC2搭建服务器群组运维系统Zabbix+spug
  • 文件转换,简简单单,pdf转word,不要去找收费的了,自己学了之后免费转,之后就复制粘贴就ok了
  • Jmeter——循环控制器中实现Counter计数器的次数重置
  • [创业之路-85]:IT创业成功老板的品质、创业失败老板的特征、成功领导者的品质、失败管理者的特征
  • 警惕.360勒索病毒,您需要知道的预防和恢复方法。
  • 人力资源小程序
  • 【多线程 - 10、线程同步3 ThreadLocal】
  • 【Flink 问题集】The generic type parameters of ‘Collector‘ are missing
  • 数据分析—将txt文件转为csv文件;将csv文件转为xls文件
  • 【算法】二分查找-20231120
  • WPF实现将鼠标悬浮在按钮上时弹出菜单
  • 车载以太网-传输层-UDP
  • uniapp如何上传文件,使用API是什么
  • 【狂神说Java】Docker概述 | Docker安装 | Docker的常用命令
  • Git精讲
  • 读书笔记:Effective C++ 3.0版2005年Scott Meyers : 55条建议(47-55)
  • Golang Context 的并发安全性探究
  • C++中只能有一个实例的单例类
  • 单张图像3D重建:原理与PyTorch实现
  • 340条样本就能让GPT-4崩溃,输出有害内容高达95%?OpenAI的安全防护措施再次失效
  • 2023.11.17使用flask将多个图片文件上传至服务器
  • 母婴服务预约小程序的效果如何
  • Flask实现cookie 开发
  • 2311rust,到60版本更新
  • 【开源】基于Vue和SpringBoot的微信小程序的音乐平台
  • adb手机调试常用命令