当前位置: 首页 > news >正文

Canal+Kafka实现MySQL与Redis数据同步(二)

Canal+Kafka实现MySQL与Redis数据同步(二)

创建MQ消费者进行同步

在application.yml配置文件加上kafka的配置信息:

spring:kafka:# Kafka服务地址bootstrap-servers: 127.0.0.1:9092consumer:# 指定一个默认的组名group-id: consumer-group1#序列化反序列化key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringDeserializervalue-serializer: org.apache.kafka.common.serialization.StringDeserializer# 批量抓取batch-size: 65536# 缓存容量buffer-memory: 524288

根据上面Kafka消费命令那里,我们知道了json数据的结构,可以创建一个CanalBean对象进行接收:

public class CanalBean {//数据private List<TbCommodityInfo> data;//数据库名称private String database;private long es;//递增,从1开始private int id;//是否是DDL语句private boolean isDdl;//表结构的字段类型private MysqlType mysqlType;//UPDATE语句,旧数据private String old;//主键名称private List<String> pkNames;//sql语句private String sql;private SqlType sqlType;//表名private String table;private long ts;//(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等private String type;//getter、setter方法
}
public class MysqlType {private String id;private String commodity_name;private String commodity_price;private String number;private String description;//getter、setter方法
}
public class SqlType {private int id;private int commodity_name;private int commodity_price;private int number;private int description;
}

最后就可以创建一个消费者CanalConsumer进行消费:

@Component
public class CanalConsumer {//日志记录private static Logger log = LoggerFactory.getLogger(CanalConsumer.class);//redis操作工具类@Resourceprivate RedisClient redisClient;//监听的队列名称为:canaltopic@KafkaListener(topics = "canaltopic")public void receive(ConsumerRecord<?, ?> consumer) {String value = (String) consumer.value();log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(),consumer.partition(), consumer.offset(), value);//转换为javaBeanCanalBean canalBean = JSONObject.parseObject(value, CanalBean.class);//获取是否是DDL语句boolean isDdl = canalBean.getIsDdl();//获取类型String type = canalBean.getType();//不是DDL语句if (!isDdl) {List<TbCommodityInfo> tbCommodityInfos = canalBean.getData();//过期时间long TIME_OUT = 600L;if ("INSERT".equals(type)) {//新增语句for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {String id = tbCommodityInfo.getId();//新增到redis中,过期时间是10分钟redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);}} else if ("UPDATE".equals(type)) {//更新语句for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {String id = tbCommodityInfo.getId();//更新到redis中,过期时间是10分钟redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);}} else {//删除语句for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {String id = tbCommodityInfo.getId();//从redis中删除redisClient.deleteKey(id);}}}}
}

测试MySQL与Redis同步

mysql对应的表结构如下:

CREATE TABLE `tb_commodity_info` (`id` varchar(32) NOT NULL,`commodity_name` varchar(512) DEFAULT NULL COMMENT '商品名称',`commodity_price` varchar(36) DEFAULT '0' COMMENT '商品价格',`number` int(10) DEFAULT '0' COMMENT '商品数量',`description` varchar(2048) DEFAULT '' COMMENT '商品描述',PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品信息表';

首先在MySQL创建表。然后启动项目,接着新增一条数据:

INSERT INTO `canaldb`.`tb_commodity_info` (`id`, `commodity_name`, `commodity_price`, `number`, `description`) VALUES ('3e71a81fd80711eaaed600163e046cc3', '叉包', '3.99', '3', '大叉包,老喜欢');

tb_commodity_info表查到新增的数据:

img

Redis也查到了对应的数据,证明同步成功!

img

如果更新呢?试一下Update语句:

UPDATE `canaldb`.`tb_commodity_info` SET `commodity_name`='青菜包',`description`='便宜的青菜包' WHERE `id`='3e71a81fd80711eaaed600163e046cc3';

img

img

没有问题!

总结

canal的缺点:

  1. canal只能同步增量数据。
  2. 不是实时同步,是准实时同步。
  3. 存在一些bug,不过社区活跃度较高,对于提出的bug能及时修复。
  4. MQ顺序性问题。
    网的回答,大家参考一下
    img

尽管有一些缺点,毕竟没有一样技术(产品)是完美的,合适最重要。

http://www.lryc.cn/news/236493.html

相关文章:

  • NOIP2023模拟19联测40 诡异键盘
  • 算法设计与分析 | 众数问题(c语言)
  • sql server外键设置
  • R语言实现多变量孟德尔随机化分析(1)
  • 搭建 AI 图像生成器 (SAAS) php laravel
  • Maven引用本地jar包
  • 一起学docker系列之五docker的常用命令--操作容器的命令
  • WPF打开对话框选择文件、选择文件夹
  • nginx学习(3)
  • 【系统架构设计】计算机公共基础知识: 4 数据库系统
  • 主键问题以及分布式 id
  • ReentranReadWriteLock 使用案例
  • “我们把最扎心的话,说给了自己最亲近的人” 何解?| IDCF
  • MongoDB之索引和聚合
  • 【GEE】基于GEE进行非监督学习
  • 多视图聚类的论文阅读(一)
  • K-Means算法进行分类
  • 深度学习交通车辆流量分析 - 目标检测与跟踪 - python opencv 计算机竞赛
  • 网络协议入门 笔记一
  • 系列十一、你平时工作用过的JVM常用基本配置参数有哪些?
  • 如何为视频添加旁白,有哪些操作技巧?
  • 如何简单挖掘公益SRC?
  • PhpStorm激活
  • mysql 怎么做定时备份 / mysql 备份 / sql文件导出
  • 416. 分割等和子集问题(动态规划)
  • 【软件安装】Centos系统中安装docker容器(华为云HECS云耀服务器)
  • GitHub Proxy 快速下载github文件
  • 大厂秋招真题【栈】Bilibili2019秋招-简单表达式求值
  • (一)RISC-V 指令集及寄存器介绍
  • 二十三种设计模式:解密职责链模式-购物优惠活动的设计艺术