当前位置: 首页 > news >正文

springcloud rocketmq 新增的消费者组从哪里开始消费

如果新建一个新的消费者组,是否会消费历史消息,导致重复消费?

直接在 console 界面新增消费者组,但是没有办法绑定订阅关系,没有找到入口,在 控制台项目源码 rocketmq-externals 也没有找到可以确定订阅关系的接口,在阿里云的生产控制台也没有绑定的入口。
在这里插入图片描述
在这里插入图片描述
所以只能是消费者启动后再注册订阅关系。
消费者从哪里消费的计算:
RebalancePushImpl.java
在这里插入图片描述

默认走的是:CONSUME_FROM_LAST_OFFSET 规则,按照官方说法,是从最后的消费位点开始继续消费。
关键的获取消费位点的逻辑:readOffset方法:
RemoteBrokerOffsetStore.java
在这里插入图片描述
集群模式下,是从远程获取的偏移量,跟据 fetchConsumeOffsetFromBroker 方法:
在这里插入图片描述
在这里插入图片描述
报错,其实就是服务端没有该消费者组的offset,被catch住默认返回 -1.
又不是重试队列,所以拿最大的偏置,broker-a queue-8 的 brokerOffset 是 25
在这里插入图片描述
出来到了 RebalanceImpl.java 的 updateProcessQueueTableInRebalance 方法。
在这里插入图片描述
然后会被添加到 pullRequestList 通过 this.dispatchPullRequest(pullRequestList)

控制台topic消费进度中已经保存了新的消费者组的消费进度,但 consumeOffset都是 0, 还有 759 个消息没有消费。
在这里插入图片描述

消费者消费了一些比较早前的消息:
在这里插入图片描述
在这里插入图片描述

消费进度也随之更新。
在这里插入图片描述
为什么和官方的说法不一致呢?CONSUME_FROM_LAST_OFFSET 为什么没有起到作用?
参考官方的修复:Fix CONSUME_FROM_LAST_OFFSET mode may pull data from 0L #4909

~~
至于这个console怎么看?参考以下:
在rocketmq的控制台中,选择 topic -> consumer manage,就可以查看一个主题下的消费者组、集群、队列的消费情况。
在这里插入图片描述
其中,10.122.24.41 是本人的内网ip,如果我本地线程卡住了(或者debug中),这个在线状态也会下线的。目前我是分配到了其中的8个集群队列,broker-a(8~15)。
在这里插入图片描述
offsetTable 的内容和我所描述的一致。
此外:
我还有对比组,是之前创建的废弃的消费者组,集群位点 brokerOffset 不变,消费位点 consumerOffset 落后了许多,落后的总量 diffTotal 代表此消费者组还有这么多未消费的消息。而且也没有在线的消费者客户端 consumerClient。
在这里插入图片描述
如果这时,我配置启动消费者去消费此消费者组。预计会消费 delay = 294 个消息。
结果也确实如此,将消息消费完,而且分配到了所有集群的所有队列。
在这里插入图片描述

测试结果:默认策略会从offset = 0 开始消费。
在这里插入图片描述
所以该参数没有用,还是从0开始消费了,这时候只能靠消费者组重置位点操作了。

~~
tags过滤,是服务端过滤,为什么会直接将不需要的消息也丢失掉呢?
这就要涉及到订阅关系一致性。
在这里插入图片描述
在这里插入图片描述

参考:https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-5-x-series/developer-reference/message-filtering

tags过滤会将不匹配的直接跳过(丢失)
我的理解是,现在没有为某个tags有单独记录消费进度的地方,所谓的服务端过滤,也只是说用hashcode快速匹配拉取而已,之后也是直接将offset拉到队列尾的。

参考:https://mp.weixin.qq.com/s/RnS675dt-wErnEuolK6Zeg?spm=a2c6h.12873639.article-detail.18.3ba035175CHVos

http://www.lryc.cn/news/407996.html

相关文章:

  • Redis-缓存
  • MySQL练习05
  • [C++][STL源码剖析] 详解AVL树的实现
  • Kubernetes存储 - Node本地存储卷
  • Cocos Creator2D游戏开发-(2)Cocos 常见名词
  • 【不同设备间的数据库连接】被连接设备如何开权限给申请连接的设备
  • Whisper离线部署问题处理
  • 【Hive SQL】数据探查-数据抽样
  • 微信答题小程序产品研发-需求分析与原型设计
  • 基础模板Mybatis-plus+Springboot+Mysql开发配置文件
  • java-poi实现excel自定义注解生成数据并导出
  • LeetCode707 设计链表
  • [Mysql-DDL数据操作语句]
  • google 浏览器插件开发简单学习案例:TodoList;打包成crx离线包
  • 如何学习Doris:糙快猛的大数据之路(从入门到专家)
  • 梯度下降算法,gradient descent algorithm
  • Spring boot 2.0 升级到 3.3.1 的相关问题 (六)
  • C++模版基础知识与STL基本介绍
  • Android 防止重复点击
  • 使用阿里云云主机通过nginx搭建文件服务器
  • 微信Android一面凉经(2024)
  • VMware、Docker - 让虚拟机走主机代理,解决镜像封禁问题
  • 版本管理|为什么不推荐使用Git Rebase
  • Https post 请求时绕过证书验证方案
  • C# 数组常用遍历方式
  • 【JavaScript】详解Day.js:轻量级日期处理库的全面指南
  • AI算法与图像处理 | 吴恩达团队新作!多模态方向
  • 云服务器Ubuntu18.04进行Nginx配置
  • SQL labs-SQL注入(四,sqlmap对于post传参方式的注入)
  • R包:plot1cell单细胞可视化包