当前位置: 首页 > news >正文

Canal实现数据同步

1、Canal实现数据同步

canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据。

1.1 Canal工作原理

在这里插入图片描述

原理相对比较简单:

1、canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议。

2、mysql master收到dump请求,开始推送binary log给slave(也就是canal)。

3、canal解析binary log对象(原始为byte流)。

canal需要使用到mysql,我们需要先安装mysql,canal是基于mysql的主从模式实现的,所以必须先开启

binlog。

1.2 安装MySQL

# 搜索MySQL镜像
$ docker search mysql
# 拉取MySQL镜像
$ docker pull mysql:5.5
# 启动MySQL
$ docker run -d -p 3306:3306 -v /home/zhangshixing/linuxmysql/conf:/etc/mysql/conf.d -v /home/zhangshixing/linuxmysql/data:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=123456 --name zsxmysq1 mysql:5.5
$ docker exec -it fc566f7d1857 /bin/bash
$ mysql -h 192.168.94.186 -u root -p

在这里插入图片描述

1.3 开启binlog模式

(1) 、连接到mysql中,并修改 /etc/mysql/my.cnf 需要开启主从模式,开启binlog模式。

执行如下命令,编辑 mysql 配置文件:

$ docker exec -it fc566f7d1857 /bin/bash
# 该目录一定要存在,否则log日志无法写入
$ cd /etc/mysql/
$ vim my.cnf

没有vim,需要安装vim:

$ apt-get update
$ apt-get install vim

修改 my.cnf 配置文件,添加如下配置:

# 开启logbin
log-bin=/var/lib/mysql/mysql-bin
# binlog日志格式
binlog-format=ROW
# mysql主从备份serverId,canal中不能与此相同
server-id=12345

在这里插入图片描述

(2) 、创建账号,用于测试使用,使用root账号创建用户并授予权限

# CREATE USER <用户名> [ IDENTIFIED ] BY [ PASSWORD ] <口令>
# <用户名>格式为 'user_name'@'host_name',这里user_name是用户名,host_name为主机名
# "%" 表示一组主机
create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, INSERT, DELETE, UPDATE, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

(3)、重启mysql容器

$ docker restart fc566f7d1857

(4)、查看配置是否生效

$ docker exec -it fc566f7d1857 /bin/bash
$ mysql -h 192.168.94.186 -u canal -p

在这里插入图片描述

# binlog日志文件
$ show master status;

在这里插入图片描述

# 查看是否配置成功
$ show variables like 'binlog_format';

在这里插入图片描述

查看日志文件:

在这里插入图片描述

使用Navicat新建canal_test 数据库和t_user_1表。

# 查看日志信息
$ mysqlbinlog -vv mysql-bin.000001

在这里插入图片描述

1.4 canal容器安装

下载镜像:

$ docker pull docker.io/canal/canal-server

容器安装:

$ docker run -p 11111:11111 --name canal -d docker.io/canal/canal-server

进入容器,修改核心配置canal.propertiesinstance.propertiescanal.properties 是canal自身的

配置,instance.properties是需要同步数据的数据库连接配置。

执行代码如下:

$ docker exec -it canal /bin/bash
$ cd canal-server/conf/
$ vim canal.properties
$ cd example/
$ vim instance.properties

修改canal.properties的id,不能和mysql的server-id重复,如下图:

在这里插入图片描述

修改instance.properties,配置数据库连接地址:

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

这里的canal.instance.filter.regex有多种配置,如下:

可以参考地址如下:

https://github.com/alibaba/canal/wiki/AdminGuide

mysql 数据解析关注的表,Perl正则表达式。

多个正则之间以逗号(,)分隔,转义符需要双斜杠(\)

常见例子:

1、所有表:.* or .*\\..*

2、canal schema下所有表:canal\\..*

3、canal下的以canal打头的表:canal\\.canal.*

4、canal schema下的一张表:canal.test1

5、多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)

注意:此过滤条件只针对row模式的数据有效(mixed/statement因为不解析sql,所以无法准确提取

tableName进行过滤)。

配置完成后,设置开机启动,并记得重启canal。

$ docker update --restart=always canal
$ docker restart canal

验证配置是否成功:

$ docker exec -it canal /bin/bash
$ cd canal-server/logs/example/
# 查看日志
$ tail -100f example.log  

在这里插入图片描述

1.5 canal服务搭建

当用户执行数据库的操作的时候,binlog 日志会被canal捕获到,并解析出数据。

1.5.1 安装辅助jar包

spring-boot-starter-canal-master中有一个工程starter-canal,它主要提供了SpringBoot环境下

canal的支持,我们需要先安装该工程,在starter-canal目录下执行mvn install,如下图:

在这里插入图片描述

1.5.2 canal服务工程搭建

spring-boot-starter-canal-master中创建一个工程canal-test

pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>cana-test</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><name>cana-test</name><description>Demo project for Spring Boot</description><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.0.RELEASE</version><relativePath/></parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>com.xpand</groupId><artifactId>starter-canal</artifactId><version>0.0.1-SNAPSHOT</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

application.yml配置

# canal配置
canal.client.instances.example.host=192.168.94.186
canal.client.instances.example.port=11111

监听创建

创建一个CanalDataEventListener类,实现对表增删改操作的监听,代码如下:

package com.example.canatest.config;import com.alibaba.otter.canal.protocol.CanalEntry;
import com.xpand.starter.canal.annotation.*;/*** @author zhangshixing*/
@CanalEventListener
public class MyEventListener {/*** 增加数据监听** @param eventType* @param rowData*/@InsertListenPointpublic void onEvent1(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {System.out.println("InsertListenPoint");rowData.getAfterColumnsList().forEach((c) -> System.out.println("By--Annotation: " + c.getName() + " ::   " + c.getValue()));}/*** 修改数据监听** @param rowData*/@UpdateListenPointpublic void onEvent2(CanalEntry.RowData rowData) {System.out.println("UpdateListenPoint");rowData.getAfterColumnsList().forEach((c) -> System.out.println("By--Annotation: " + c.getName() + " ::   " + c.getValue()));}/*** 刪除数据监听** @param eventType*/@DeleteListenPointpublic void onEvent3(CanalEntry.EventType eventType) {System.out.println("DeleteListenPoint");}/*** 自定义数据修改监听** @param eventType* @param rowData*/@ListenPoint(destination = "example", schema = "canal_test", table = {"t_user_1", "t_user_2"}, eventType = CanalEntry.EventType.INSERT)public void onEvent4(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {System.out.println("CustomListenPoint");rowData.getAfterColumnsList().forEach((c) -> System.out.println("By--Annotation: " + c.getName() + " ::   " + c.getValue()));}
}

启动类创建

创建启动类,代码如下:

package com.example.canatest;import com.xpand.starter.canal.annotation.EnableCanalClient;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
@EnableCanalClient
public class CanaTestApplication {public static void main(String[] args) {SpringApplication.run(CanaTestApplication.class, args);}
}

测试

启动canal微服务,然后修改任意数据库的表数据,canal微服务后台输出信息。

1、插入数据

在这里插入图片描述

在这里插入图片描述

2、修改数据

在这里插入图片描述

在这里插入图片描述

3、删除数据

在这里插入图片描述

在这里插入图片描述

4、自定义插入

在这里插入图片描述

在这里插入图片描述

http://www.lryc.cn/news/180309.html

相关文章:

  • 数据库学习笔记——DDL
  • MATLAB算法实战应用案例精讲-【人工智能】边缘计算(附python代码实现)
  • 精彩回顾 | 迪捷软件亮相2023世界智能网联汽车大会
  • 【ShaderLab PBR 嗜血边缘角色_美式朋克风格_“Niohoggr“_角色渲染(第一篇)】
  • python经典百题之围圈报数
  • Google Earth Engine(GEE)案例——如何去除和过滤Landsat和sentinel等系列影像集合中的空影像(三种方法)
  • Leetcode 69.x的平方根
  • Node18.x基础使用总结(二)
  • LCD 的RGB接口(SYNC Mode/ SYNC-DE Mode/ DE Mode)
  • flink生成水位线记录方式--周期性水位线生成器
  • 百度资源搜索平台出现:You do not have the proper credential to access this page.怎么办?
  • 树莓派CM4开启I2C与UART串口登录同时serial0映射到ttyS0 开启多串口
  • 【租车骑绿道】python实现-附ChatGPT解析
  • 【ONE·Linux || 多线程(一)】
  • 华为智能企业上网行为管理安全解决方案(1)
  • Acwing 240. 食物链
  • c++ 容器适配器
  • 正则表达式的应用领域及基本语法解析
  • CIP或者EtherNET/IP中的PATH是什么含义?
  • 使用lombok进行bulider之后调取HashMap的自定义方法进行对象操作报空指针异常(pojo也适用)
  • 矩阵-day14
  • 上古神器:十六位应用程序 Debug 的基本使用
  • [学习笔记]ARXML - Data Format
  • Go_原子操作和锁
  • 初识Java 12-1 流
  • 【软件工程_UML—StartUML作图工具】startUML怎么画interface接口
  • 单片机之瑞萨RL78定时计数器
  • 手机号码格式校验:@Phone(自定义参数校验注解)
  • ORACLE Redo Log Buffer 重做日志缓冲区机制的设计
  • PWN Test_your_nc Write UP