当前位置: 首页 > news >正文

基于电商场景的高并发RocketMQ实战-Consumer端队列负载均衡分配机制、并发消费以及消费进度提交

🌈🌈🌈🌈🌈🌈🌈🌈
【11来了】文章导读地址:点击查看文章导读!
🍁🍁🍁🍁🍁🍁🍁🍁

Consumer 端队列负载均衡分配机制

topic 是有一堆的 queue,而且分布在不同的 broker 上

并且在消费时,将多个 queue 分配给多个 consumer,每一个 consumer 会分配到一部分的 queue 进行消费

每个 consumer 会获取到 Topic 下包含的 queue 的信息 以及 每个 consumer group 下包含多少的 consumer ,那么 consumer 都使用相同的算法去做一次分配

  • Topic 下包含的 queue 的信息可以在 Broker 中获取
  • 每个 consumer group 下包含多少了 consumer 的信息也可以在 Broker 获取,因为每个 consumer 启动后,都会将 Broker 中进行注册

Consumer 分配队列:

Consumer 端队列的分配是通过 RebalanceService 这个组件实现的,拉取 Topic 的 queue 信息,拉取 consumer group 信息,根据算法分配 queue,确认自己需要拉取哪些 queue 的消息

RebalanceService 这个组件是在 Broker 中的,主要负责实现消息队列的动态负载均衡和自动分配,确保消息队列在消费者组内均匀分配,并在消费者组发生变化时进行动态调整,通过动态负载均衡和自动分配消息队列,保证了消费者组在消费消息时的 高效性和可靠性

那么分配好队列之后,Consumer 就知道自己分配了哪些 queue 了,Consumer 就可以去 Broker 中对应的 queue 进行数据的拉取,这里 Consumer 消息的拉取在 RocketMQ 中有两种实现(DefaultMQPushConsumer、DefaultMQPullConsumer, 但是在底层全部都是通过 pull 拉取消息进行消费的):

  • push 模式:服务端有数据后推送给客户端,实时性很高,但是增加了服务端工作量
  • pull 模式:客户端主动去服务端拉取数据,会导致数据接收不及时

RocketMQ 的长轮询:

RocketMQ 中使用了 长轮询 的方式,兼顾了 push 和 pull 两种模式的优点

长轮询: 长轮询本质上也是轮询,服务端在没有数据的时候并不是马上返回数据,而是会先将请求挂起,此时有一个长轮询后台线程每隔 5s 会去检查 queue 中是否有新的消息,如果有则去唤醒客户端请求,否则如果超过 15s 就会判断客户端请求超时

Consumer 端并发消费以及消费进度提交

Consumer 去 Broker 中拉取消息的线程只有一个,拉取到消息之后会将消息存放在 ProcessQueue 中,每一个 ConsumeQueue 都会对应一个 ProcessQueue

消息被拉取到会放在 ProcessQueue 中,等待线程池进行 并发消息 ,线程池处理消息时,就会调用到我们在创建生产者时注册的监听器中的 consumeMessage 方法,在这里会执行我们自己定义的业务逻辑,之后会返回状态码:SUCCESS 或 RECONSUME_LATER 等等,如果消费成功,线程会去 ProcessQueue 中删除对应的消息,并且会记录 consumer group 对于 queue 的消费进度 ,以实通过异步提交到 broker 中去,流程图如下:

在这里插入图片描述

Consumer 处理失败时的延迟消费机制:

在 consumer 消费消息失败的时候,线程池会将消费失败的消息发送到 Broker 中,在 Broker 中,对失败的消息进行一个 Topic 的改写为:RETRY_Topic_%,会根据之前的 Topic 名称进行改写,改写后呢,作为一个 延迟消息 重新写入 Commitlog 和 ConsumeQueue 中,再通过专门处理延迟消息的后台线程监听延迟消息是否到达延迟时间,当时间到达之后,会将改写后的 Topic 再重新改写为原来的 Topic 名称并写入 Commitlog,之后等待被消费者再次消费即可

http://www.lryc.cn/news/270312.html

相关文章:

  • 【Java开发岗面试】八股文—数据库MySQLRedis
  • IntelliJ IDEA [设置] 隐藏 .idea 等 .XXX 文件夹
  • 每日一题——LeetCode961
  • 基于Unity Editor开发一个技能编辑器可能涉及到的内容
  • Ubuntu 22.04 安装ftp实现与windows文件互传
  • EasyPoi使用案例
  • 分布式系统架构设计之分布式数据存储的分类和组合策略
  • javaEE -18(11000字 JavaScript入门 - 3)
  • LangChain.js 实战系列:入门介绍
  • pyCharm 打印控制台中文乱码解决办法
  • 计算机基础--Linux详解
  • 基于OpenAI的Whisper构建的高效语音识别模型:faster-whisper
  • cfa一级考生复习经验分享系列(十六)
  • 数模学习day05-插值算法
  • hive中struct相关函数总结
  • macos下转换.dmg文件为 .iso .cdr文件的简单方法
  • ALSA学习(5)——设备中的alsa
  • uniapp中组件库的丰富NumberBox 步进器的用法
  • 【Matlab】基于遗传算法优化BP神经网络 (GA-BP)的数据时序预测
  • 计算机毕业设计 基于HTML5+CSS3的在线英语阅读分级平台的设计与实现 Java实战项目 附源码+文档+视频讲解
  • 云原生|kubernetes|kubernetes资源备份和集群迁移神器velero的部署和使用
  • 【26.4K⭐】ShareX:一款开源免费、功能强大且丰富的截屏录屏软件
  • 什么是ajax,为什么使用ajax?
  • AI面板识别 - 华为OD统一考试
  • Linux之磁盘分区,挂载
  • 2核2G3M服务器上传速度多少?以阿里云和腾讯云为例
  • Cisco模拟器-OSPF路由协议
  • SpEL 的使用
  • 数据采集实战:电商详情页数据埋点
  • 计算机网络——计算大题(七)