当前位置: 首页 > article >正文

[特殊字符] 使用增量同步+MQ机制将用户数据同步到Elasticsearch

在开发用户搜索功能时,我们通常会将用户信息存储到 Elasticsearch(简称 ES) 中,以提高搜索效率。本篇文章将详细介绍我们是如何实现 MySQL 到 Elasticsearch 的增量同步,以及如何通过 MQ 消息队列实现用户信息实时更新 的机制。

一、整体思路

为了保证用户数据在 MySQL 与 ES 之间保持一致,我们采用了以下 双通道同步策略

  1. 定时任务 + 游标机制:实现 MySQL 到 ES 的增量同步

  2. 通过 MQ(消息队列) 实现实时同步用户更新/删除操作到 ES


二、定时任务增量同步逻辑详解

我们定义了一个定时任务 syncUserDataToESJob,主要用于从 user 表中 增量拉取变动数据,并同步到 ES。

✨ 增量拉取机制

为了避免全量同步的高开销,我们使用了 “更新时间 + 主键 ID”双重游标,实现分页增量同步:

List<User> usersBatch = userClient.selectIncrementalUsers(lastSyncTime, lastMaxId, PAGE_SIZE);

其中:

  • lastSyncTime 表示上次同步的最大更新时间

  • lastMaxId 用于处理相同更新时间下的并发写入

🧠 同步逻辑核心代码如下:

@XxlJob("syncUserDataToESJob")
@GlobalTransactional
public void syncUserData() {Date lastSyncTime = syncPointService.getLastSyncTime();Long lastMaxId = syncPointService.getLastMaxId();if (lastSyncTime == null) {lastSyncTime = new Date(0); // 默认从最早开始lastMaxId = 0L;}Date maxUpdateTime = lastSyncTime;Long maxId = lastMaxId;boolean hasNewData = false;while (true) {List<User> usersBatch = userClient.selectIncrementalUsers(lastSyncTime, lastMaxId, PAGE_SIZE);if (usersBatch.isEmpty()) break;hasNewData = true;List<EsUserDoc> esDocs = usersBatch.stream().map(this::convertToEsDoc).collect(Collectors.toList());esClient.bulkIndex(esDocs);for (User u : usersBatch) {Date updateTime = u.getUpdateTime();if (updateTime.after(maxUpdateTime)) {maxUpdateTime = updateTime;maxId = u.getId();} else if (updateTime.equals(maxUpdateTime) && u.getId() > maxId) {maxId = u.getId();}}lastSyncTime = maxUpdateTime;lastMaxId = maxId;}// 同步删除数据List<Long> deletedUserIds = userClient.selectDeletedUserIds(syncPointService.getLastSyncTime(), syncPointService.getLastMaxId());if (!deletedUserIds.isEmpty()) {esClient.bulkDeleteByIds(deletedUserIds);}if (hasNewData) {log.info("更新同步点:maxUpdateTime = {}, maxId = {}", maxUpdateTime, maxId);syncPointService.updateLastSyncPoint(maxUpdateTime, maxId);} else {log.info("本次没有增量数据,不更新同步点");}
}

📝 特别说明:

  • syncPointService 用于记录上次同步的时间点和 ID,保证每次定时任务可重复安全执行。

  • 如果服务中断重启,也不会造成数据丢失或重复。


三、用户修改通过 MQ 实时同步到 ES

虽然定时任务可以周期性同步,但如果用户更新昵称、头像、标签等信息,等待下一次定时任务才能生效,可能会造成 数据延迟

为此,我们引入了 消息队列机制,实现实时更新:

✅ 使用 MQ 的同步方案

  1. 用户信息发生变化时,在业务服务中发送一条消息:

UserUpdateMessage message = new UserUpdateMessage(userId);
rabbitTemplate.convertAndSend("user.topic.exchange", "user.update", message);
  1. 在 ES 同步服务中监听消息并处理:

@RabbitListener(queues = "user.update.queue")
public void onUserUpdate(UserUpdateMessage msg) {User user = userClient.getUserById(msg.getUserId());if (user != null) {EsUserDoc doc = convertToEsDoc(user);esClient.index(doc);}
}

💡 好处:

  • 实时:用户更新后立即同步到 ES

  • 解耦:业务逻辑与搜索逻辑分离

  • 高性能:避免频繁更新 ES


四、总结与展望

通过“定时任务 + 增量游标” 和 “消息队列实时更新” 的结合方案,我们实现了对用户数据高效且可靠的同步到 Elasticsearch。

同步方式特点使用场景
定时任务批量、容错性强周期性同步新增/修改/删除
MQ 实时快速、解耦用户主动更新资料时快速生效

未来我们还可以扩展以下能力:

  • 引入 Canal + Binlog 监听实现更彻底的实时同步

  • 支持多租户分库分表的场景下数据同步

  • 引入失败重试机制保障消息不丢


希望本文对你在做数据同步或 ES 架构设计时有所启发,欢迎点赞、收藏、评论交流!

http://www.lryc.cn/news/2385375.html

相关文章:

  • LeetCode 2942.查找包含给定字符的单词:使用库函数完成
  • 【mediasoup】MS_DEBUG_DEV 等日志形式转PLOG输出
  • 打卡第27天:函数的定义与参数
  • python训练营day34
  • 人工智能在医疗影像诊断上的最新成果:更精准地识别疾病
  • 塔能节能平板灯:点亮苏州某零售工厂节能之路
  • 3DMAX插件UV工具UV Tools命令参数详解
  • Docker 与微服务架构:从单体应用到容器化微服务的迁移实践
  • 《岁月深处的童真》
  • 文件夹图像批处理教程
  • RL电路的响应
  • 30-消息队列
  • 跨域解决方案之JSONP
  • 【AI测试革命】第七期:AI性能测试的深度实践——从智能建模到自动化调优的全链路升级
  • Thinkphp6使用token+Validate验证防止表单重复提交
  • AppAgentx 开源AI手机操控使用分享
  • Axure设计之带分页的穿梭框原型
  • 嵌入式硬件篇---陀螺仪|PID
  • 电机控制储备知识学习(五) 三项直流无刷电机(BLDC)学习(四)
  • Java—— 网络爬虫
  • Baklib内容中台的主要构成是什么?
  • 深度解析 Java 中介者模式:重构复杂交互场景的优雅方案
  • 家用和类似用途电器的安全 第1部分:通用要求 与2005版差异(7)
  • HTTP Digest 认证:原理剖析与服务端实现详解
  • untiy实现汽车漫游
  • PID项目---硬件设计
  • Pluto实验报告——基于FM的音频信号传输并解调恢复
  • 【Redis】AOF日志
  • Leetcode 2792. 计算足够大的节点数
  • 《关于浔川社团退出DevPress社区及内容撤回的声明》