当前位置: 首页 > news >正文

实时同步:使用 Canal 和 Kafka 解决 MySQL 与缓存的数据一致性问题

目录

1. 准备工作

2. 将需要缓存的数据存储 Redis

3. 监听 canal 存储在 Kafka Topic 中数据


1. 准备工作

1. 开启并配置MySQL的 BinLog(MySQL 8.0 默认开启)

修改配置:C:\ProgramData\MySQL\MySQL Server 8.0\my.ini

log-bin="HELONG-bin"
binlog_format=ROW     # 只能配置行模式, 因为 Cannal 不具备将SQL转化成数据的能力
binlog-do-db=aicloud    # 监控 AI Cloud 项目

如果要同步多个项目:

binlog-do-db=aicloud
binlog-do-db=aicloud2
binlog-do-db=aicloud3

2. 重启MySQL服务

3. 赋值数据同步权限

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

4. 安装并配置 Canal

下载地址:https://github.com/alibaba/canal/releases

① 修改canal.properties

canal.serverMode=kafka
canal.mq.servers=127.0.0.1:9092

canal 监控 binlog 日志,binlog 日志的传输默认使用 MySQL 的复制协议(基于 TCP/IP),

可以使用写代码的方式直接从 MySQL 服务器读取数据,此处使用本地 kafka 进行存储。

② 修改instance.properties

canal.instance.mysql.slaveId=100   # 大于 1 即可
canal.instance.master.address=127.0.0.1:3306
canal.mq.topic=ai-cloud-canal-to-kafka

slaveId 表示从节点 id,canal 的执行原理就是伪装成一个从库去主库同步数据

(主节点的 slaveId = 1)

address 配置连接本地的 MySQL

topic 配置数据发送到 Kafka 的某个主题下

5. 拷贝 Jar 包到 lib

将 canal 下 plugin 下的所有 jar 包拷贝到 lib 目录下。

6. 删除 bin 目录下 startup.bat 里的参数

如果启动时报错:

Unrecognized VM option 'PermSize=128m'

Error: Could not create the Java Virtual Machine.

Error: A fatal exception has occurred. Program will exit.

删除 -XX:PermSize=128m 参数即可。

7. 启动 canal

打开 cmd ,cd 到 bin 目录下,输入 startup.bat 回车

2. 将需要缓存的数据存储 Redis

此时我将这个查询列表接口的数据,存储在 Redis 中:

/*** 获取历史聊天记录(对话/绘图)** @param type* @return {@link ResponseEntity }*/
@RequestMapping("/list")
public ResponseEntity getHistoryList(Integer type, Integer model) {String listCacheKey = RedisUtil.getListCacheKey(SecurityUtil.getCurrentUser().getUid(), model, type);Object list = redisTemplate.opsForValue().get(listCacheKey);if (ObjectUtil.isNull(list)) {LambdaQueryWrapper<Answer> queryWrapper = new LambdaQueryWrapper<>();queryWrapper.eq(Answer::getUid, SecurityUtil.getCurrentUser().getUid());queryWrapper.eq(Answer::getType, type);queryWrapper.eq(Answer::getModel, model);queryWrapper.orderByDesc(Answer::getAid);List<Answer> answerList = answerService.list(queryWrapper);List<Long> userIds = answerList.stream().map(Answer::getUid).collect(Collectors.toList());Map<Long, User> userIdMap = userService.selectByIds(userIds).stream().collect(Collectors.toMap(User::getUid, Function.identity()));List<AnswerVo> answerVoList = answerList.stream().map(answer -> AnswerVoUtil.getListAnswerVo(answer, userIdMap)).collect(Collectors.toList());// 缓存 1 天redisTemplate.opsForValue().set(listCacheKey, answerVoList, 1, TimeUnit.DAYS);return ResponseEntity.success(answerVoList);} else {return ResponseEntity.success(list);}
}
/*** 查询列表存储 Redis 缓存** @param uid* @param model* @param type* @return {@link String }*/
public static String getListCacheKey(Long uid, Integer model, Integer type) {return "LIST_CACHE_KEY_" + uid + "_" + model + "_" + type;
}

3. 监听 Kafka Topic 中数据并删除 Redis 缓存

首先对数据库中需要缓存的数据进行一些修改操作:

此时,使用 kafka ui(下载地址划到最底下),刷新 kafka 对应 topic 下的 message,就可以看到当前所作出的修改:

执行修改操作:将 “如何学习Spring???”修改成 “如何学习Spring??”

执行删除操作:

由此可见,对数据库的每一个修改操作,都是对应固定格式的一个数据,所以可以监听对应的  topic 并针对 data 中的数据进行一个提取,得到一个  cacheKey,然后删除对应的缓存,使得下一次的查询去访问数据库,并同步缓存。

【代码示例】

/*** canal 监控 binlog 日志,将修改的数据存储 kafka topic 中* 监听 kafka topic 中的数据** @param data* @param ack* @throws JsonProcessingException*/
@KafkaListener(topics = {KafkaConstant.CANAL_TOPIC})
public void canalListen(String data, Acknowledgment ack) throws JsonProcessingException {HashMap<String, Object> map = objectMapper.readValue(data, HashMap.class);if (map.isEmpty()) {ack.acknowledge();return;}// 匹配上对应的数据库和数据表if (KafkaConstant.TARGET_DATABASE.equals(map.get(KafkaConstant.DATABASE_KEY).toString()) &&KafkaConstant.TARGET_TABLE.equals(map.get(KafkaConstant.TABLE_KEY).toString())) {// 更新缓存 List<Map<String, Object>> list = (List<Map<String, Object>>) map.get(KafkaConstant.DATA_KEY);if (!CollectionUtils.isEmpty(list)) {for (Map<String, Object> answerMap : list) {String answerListCacheKey = RedisUtil.getListCacheKey(Long.valueOf(answerMap.get("uid").toString()),Integer.parseInt(answerMap.get("model").toString()),Integer.parseInt(answerMap.get("type").toString()));// 删除缓存,让下一次查询走数据库,并同步缓存redisTemplate.delete(answerListCacheKey);}}}//  手动确认应答ack.acknowledge();
}
/*** canal 同步数据到 kafka*/
public static final String CANAL_TOPIC = "ai-cloud-canal-to-kafka";/*** 数据库,缓存数据一致性的*/public static final String DATABASE_KEY = "database";public static final String TABLE_KEY = "table";public static final String DATA_KEY = "data";public static final String TARGET_DATABASE = "aicloud";public static final String TARGET_TABLE = "answer";

【补充】

kafka ui 下载地址:​​​​​​https://github.com/provectus/kafka-ui/tags

修改配置

kafka:clusters:- name: kafka3_clusterbootstrapServers: 127.0.0.1:9092

 

http://www.lryc.cn/news/407195.html

相关文章:

  • WINUI——Microsoft.UI.Xaml.Markup.XamlParseException:“无法找到与此错误代码关联的文本。
  • C语言 | Leetcode C语言题解之第283题移动零
  • WPF项目实战视频《二》(主要为prism框架)
  • 【微信小程序实战教程】之微信小程序 WXS 语法详解
  • Android中Service学习记录
  • Elasticsearch:Java ECS 日志记录 - log4j2
  • MongoDB自学笔记(四)
  • 时序分解 | Matlab基于CEEMDAN-CPO-VMD的CEEMDAN结合冠豪猪优化算法(CPO)优化VMD二次分解
  • 新版海螺影视主题模板M3.1全解密版本多功能苹果CMSv10后台自适应主题
  • 汽车免拆诊断案例 | 2014 款上汽名爵 GT 车发动机无法起动
  • vue3前端开发-小兔鲜项目-登录功能的业务接口调用
  • 【Linux】vim编辑器使用详解
  • 手机怎么设置不同的ip地址
  • SpringBoot读取配置的6种方式
  • 1.1 openCv -- 介绍
  • 探索PostgreSQL的GUI工具:提升数据库管理效率
  • 【从零开始实现stm32无刷电机FOC】【实践】【5/7 stm32 adc外设的高级用法】
  • springcloud接入seata管理分布式事务
  • Android APP 音视频(02)MediaProjection录屏与MediaCodec编码
  • java中log4j.properties配置文件浅析
  • RV1126 Linux 系统,接外设,时好时坏(二)排查问题的常用命令
  • 鸿蒙北向开发 DevEco Studio 4.1 下载安装傻瓜式教程
  • pglogical扩展的基本用法介绍
  • 2024年虚拟主机转移教程
  • Python 函数对象和函数调用
  • sql注入的专项练习 sqlilabs(含代码审计)
  • 淄博网站建设贵不贵
  • 【学习笔记】无人机系统(UAS)的连接、识别和跟踪(十)-无人机A2X服务
  • 基于迁移学习的手势分类模型训练
  • 个性化音频生成GPT-SoVits部署使用和API调用