当前位置: 首页 > news >正文

(18)不重启服务动态停止、启动RabbitMQ消费者

        我们在消费RabbitMQ消息的过程中,有时候可能会想先暂停消费一段时间,然后过段时间再启动消费者,这个需求怎么实现呢?我们可以借助RabbitListenerEndpointRegistry这个类来实现,它的全类名是org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry,通过这个类可以实现全部队列消息的启动、停止消费,也可以实现指定队列消息的启动、停止消费。具体的原因感兴趣的话可以参考一下我前面的这篇博客(17)不重启服务动态调整RabbitMQ消费者数量,里面有相应的源码分析。

停止、启动全部队列消费

        RabbitListenerEndpointRegistry类提供了start()方法和stop()方法,可以看到底层都是通过调用getListenerContainers()获取到所有队列的消费监听容器列表,然后遍历挨个调用对应的start()方法和stop()方法。

	@Overridepublic void start() {for (MessageListenerContainer listenerContainer : getListenerContainers()) {startIfNecessary(listenerContainer);}}@Overridepublic void stop() {for (MessageListenerContainer listenerContainer : getListenerContainers()) {listenerContainer.stop();}}

        我们只需要获取到RabbitListenerEndpointRegistry对象,然后调用其start()方法和stop()方法即可实现启动/停止所有队列消费。

        实现代码如下所示:

@Resource
RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;@RequestMapping(value = "/startStopAllConsumer")
@ApiOperation(value = "启动/暂停全部队列消息消费")
public Response startStopAllConsumer(@RequestParam(value = "consumeSwitch", required = true) boolean consumeSwitch) {log.info("启动/暂停全部队列消息消费,consumeSwitch:{}",consumeSwitch);if(consumeSwitch){rabbitListenerEndpointRegistry.start();}else {rabbitListenerEndpointRegistry.stop();}return Response.success();
}

        传入开关参数为false,会停止所有队列消费者消费,调用后控制台看到如下日志

2023-09-04 19:43:11.480 +0800 [TID: N/A] [http-nio-8080-exec-4] INFO  c.b.t.m.p.w.PayCashierMockController:67 - 启动/暂停全部队列消息消费,consumeSwitch:false
2023-09-04 19:43:11.556 +0800 [TID: N/A] [http-nio-8080-exec-4] INFO  o.s.a.r.l.SimpleMessageListenerContainer:586 - Waiting for workers to finish.
2023-09-04 19:43:12.352 +0800 [TID: N/A] [http-nio-8080-exec-4] INFO  o.s.a.r.l.SimpleMessageListenerContainer:589 - Successfully waited for workers to finish.
可以看到消息监听容器关闭的日志,然后再传入开关参数为true,调用后会启动所有队列消息消费。

停止、启动指定队列消费

        上面提到了RabbitListenerEndpointRegistry.getListenerContainers()可以获取到所有队列的消费监听容器列表,我们可以使用MessageListenerContainer中获取消费的队列名进行判断,以实现指定队列的停止、启动消费。

        实现代码如下所示:

@Resource
RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;@RequestMapping(value = "/startStopConsumer")
@ApiOperation(value = "启动/暂停指定队列消息消费")
public Response startStopConsumer(@RequestParam(value = "queueName", required = false) String queueName,@RequestParam(value = "consumeSwitch", required = true) boolean consumeSwitch) {log.info("启动/暂停指定队列消息消费,consumeSwitch:{},queueName:{}",consumeSwitch,queueName);//获取所有消息监听容器Collection<MessageListenerContainer> listenerContainers = rabbitListenerEndpointRegistry.getListenerContainers();for (MessageListenerContainer container : listenerContainers) {SimpleMessageListenerContainer con = (SimpleMessageListenerContainer) container;//消息监听容器要消费的队列名称集合List<String> queueNamesList = Arrays.asList(con.getQueueNames());//判断容器中的队列名称是否包含需要调整的队列名参数if (queueNamesList.contains(queueName)) {if(consumeSwitch){con.start();}else{con.stop();}}}return Response.success();
}

传入开关参数为false,停止pay_work_notify队列消费者消费,调用后控制台看到如下日志

2023-09-04 19:51:37.130 +0800 [TID: N/A] [http-nio-8080-exec-1] INFO  c.b.t.m.p.w.PayCashierMockController:80 - 启动/暂停指定队列消息消费,consumeSwitch:false,queueName:pay_work_notify
2023-09-04 19:51:37.200 +0800 [TID: N/A] [http-nio-8080-exec-1] INFO  o.s.a.r.l.SimpleMessageListenerContainer:586 - Waiting for workers to finish.
2023-09-04 19:51:37.903 +0800 [TID: N/A] [http-nio-8080-exec-1] INFO  o.s.a.r.l.SimpleMessageListenerContainer:589 - Successfully waited for workers to finish.
可以看到消息监听容器关闭的日志,然后再传入开关参数为true,调用后会启动pay_work_notify队列消息消费。

http://www.lryc.cn/news/154291.html

相关文章:

  • 数据仓库的流程
  • MyBatis-Plus深入 —— 条件构造器与插件管理
  • C语言结构体的初始化方式
  • Vue生成多文件pdf准考证
  • Rust的derive思考
  • Python常用模块
  • Java“牵手”京东商品评论数据接口方法,京东商品评论接口,京东商品评价接口,行业数据监测,京东API实现批量商品评论内容数据抓取示例
  • 算法leetcode|75. 颜色分类(rust重拳出击)
  • 网络安全(黑客)自学笔记学习路线
  • NoSQL:非关系型数据库分类
  • 【Eclipse】Project interpreter not specified 新建项目时,错误提示,已解决
  • OPENCV实现图像查找
  • vue仿企微文档给页面加水印(水印内容可自定义,超简单)
  • “金融级”数字底座:从时代的“源启”,到“源启”的时代
  • zabbix自动发现linux系统挂载的nas盘,并实现读写故障的监控告警
  • 无涯教程-JavaScript - DAYS函数
  • 48、springboot 的国际化之让用户在程序界面上弄个下拉框,进行动态选择语言
  • FPGA可重配置原理及实现(1)——导论
  • Ubuntu系统下使用宝塔面板实现一键搭建Z-Blog个人博客的方法和流程
  • 数据结构 | 第一章 绪论
  • python爬虫入门教程(非常详细):如何快速入门Python爬虫?
  • ElementUI浅尝辄止21:Tree 树形控件
  • 插入排序,选择排序,交换排序,归并排序和非比较排序(C语言版)
  • 【每日一题】1041. 困于环中的机器人
  • C# 采用3DES-MAC进行签名 base64解码与编码
  • AI绘画:StableDiffusion实操教程-完美世界-魔女(附高清图下载)
  • python excel 读取及写入固定格式
  • SQL Server进阶教程读书笔记
  • DHTMLX Gantt 8.0.5 Crack -甘特图
  • RHCA之路---EX280(5)