当前位置: 首页 > news >正文

RocketMq消费者动态订阅topic

一般情况就是生产者将消息发送到指定的topic,消费者订阅指定的topic下的消息进行消费,这种情况下的topic是写死的。

代码示例:

        // 初始化消费者,指定消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_group");//指定RocketMq服务器地址consumer.setNamesrvAddr("localhost:9876");//每次拉取最多拉取消息10条 consumer.setConsumeMessageBatchMaxSize(10);/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费* 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//订阅topic下的说有tagconsumer.subscribe("topic_test", "*");// 注册消息监听器consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {msgs.forEach(msg->{log.info("--topic:"+msg.getTopic() + " --tags:"+msg.getTags());log.info("--body:"+new String(msg.getBody()));})return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();

简单了解一下注册监听器:

消费消息的方法consumeMessage有两个参数

  1. List<MessageExt> msgs:消费一次性从消息队列里面拉取的消息列表,可通过setConsumeMessageBatchMaxSize()方法来配置每次最多拉取消息条数
  2. ConsumeConcurrentlyContext context: 是 RocketMQ 并发消费上下文对象,它包含了当前消息消费的一些上下文信息和控制参数。

返回ConsumeConcurrentlyStatus类型:

  1. CONSUME_SUCCESS:用于确定同一批消息全部消费成功
  2. RECONSUME_LATER:如果消息消费失败,同一批的所有消息将重新被消费,直到达到最大重试次数。如果同一批的单条消息消费失败返回了RECONSUME_LATER,那么同一批的其他消息即使被成功消费了也会被重新消费。

现在有这么一种情况:生产者的topic可以由用户动态配置,消费者具体消费哪些topic下的消息是不知道的,当消费者正在运行时可以实现动态订阅topic。

实现方式:

@Slf4j
@Component
@Getter
public class DynamicMonitoringEvent {private DefaultMQPushConsumer consumer;private final List<String> topics = new CopyOnWriteArrayList<>();@PostConstructprivate void init(){getInstance();start();}public  DefaultMQPushConsumer getInstance(){log.info("初始化消费者");if (consumer == null){DefaultMQPushConsumer dconsumer = new DefaultMQPushConsumer(group);dconsumer.setNamesrvAddr(nameServer);dconsumer.setConsumeMessageBatchMaxSize(10);dconsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);dconsumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs){log.info("--topic:"+msg.getTopic() + " --tags:"+msg.getTags());log.info("--body:"+new String(msg.getBody()));//处理业务逻辑}return  ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer = dconsumer;}return consumer;}public void start(){try {consumer.start();} catch (MQClientException e) {e.printStackTrace();}}// 动态设定topic的订阅// 注意:每调用一次setConsumer方法,topic监听会累计而不是覆盖public void setConsumer(String topic) {try {if (!StringUtils.hasText(topic) || topics.contains(topic)){return;}topics.add(topic);// 动态添加topic监听consumer.subscribe(topic, "*");//也可以重新注册监听器//  dconsumer.registerMessageListener(new MessageListenerConcurrently() {//     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,//                                                     ConsumeConcurrentlyContext context) {//         for (MessageExt msg : msgs){//                 log.info("--topic:"+msg.getTopic() + " --tags:"+msg.getTags());//                 log.info("--body:"+new String(msg.getBody()));//         }//         return  ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// }log.info("动态订阅topic:{} 成功", topic);} catch (MQClientException e) {log.error("动态订阅topic:{} 失败", topic, e);}}
}

在外部调用setConsumer方法实现动态订阅topic

@Autowired
private DynamicMonitoringEvent dynamicMonitoringEvent;public void setDynamicTopic() {Set<String> topics = //获取topic列表for (String topic : topics){dynamicMonitoringEvent.setConsumer(topic);}}

注意:每调用一次setConsumer方法,topic监听会累计而不是覆盖,所以我们需要做一下判断,防止重复订阅同一个topic,如果注册了新的监听器,会覆盖之前的监听器,一个消费者只能又一个监听器

http://www.lryc.cn/news/626078.html

相关文章:

  • 聚合链路与软件网桥的原理及配置方法
  • 【LeetCode 热题 100】279. 完全平方数——(解法一)记忆化搜索
  • JVM原生的assert关键字
  • 手写C++ string类实现详解
  • 使用redis读写锁实现抢券功能
  • 怎样平衡NLP技术发展中数据质量和隐私保护的关系?
  • JVM 面试精选 20 题(续)
  • JVM对象创建和内存分配
  • SpringAI接入openAI配置出现的问题全解析
  • 今日行情明日机会——20250819
  • Java开发面试实战:Spring Boot微服务与数据库优化案例分析
  • 星图云开发者平台新功能速递 | 微服务管理器:无缝整合异构服务,释放云原生开发潜能
  • 微服务如何集成swagger3
  • 微服务-08.微服务拆分-拆分商品服务
  • UE5 使用RVT制作地形材质融合
  • idea如何设置tab为4个空格
  • CSS backdrop-filter:给元素背景添加模糊与色调的高级滤镜
  • Day08 Go语言学习
  • Ansible 中的文件包含与导入机制
  • 常见 GC 收集器与适用场景:从吞吐量到亚毫秒停顿的全景指南
  • NestJS 依赖注入方式全解
  • TDengine IDMP 运维指南(3. 使用 Ansible 部署)
  • 【上升跟庄买入】副图/选股指标,动态黄色线由下向上穿越绿色基准线时,发出买入信号
  • day32-进程与线程(5)
  • Ubuntu 下面安装搜狗输入法debug记录
  • Ubuntu一键安装harbor脚本
  • WSL虚拟机(我的是ubuntu20.04)将系统文件转移到E盘
  • 机器学习之决策树:从原理到实战(附泰坦尼克号预测任务)
  • LINUX819 shell:for for,shift ,{} ,array[0] array[s] ,declare -x -a
  • 中科米堆CASAIM提供机加工件来料自动化测量尺寸方案