当前位置: 首页 > news >正文

深入理解和应用RabbitMQ的Work Queues模型

文章目录

        • 1. 场景模拟
        • 2. 消息发送
        • 3. 消息接收
        • 4. 测试
        • 5. 能者多劳
        • 6. 总结

当你在处理消息时,可能会遇到这样的问题:消息的生产速度远远大于消费速度,导致消息堆积。这时候,Work Queues(工作队列)模型就能派上用场。简单来说,Work Queues 让多个消费者绑定到一个队列,共同消费队列中的消息,从而加快消息处理速度。

1. 场景模拟

我们来模拟一个这样的场景。首先,在控制台创建一个名为 work.queue 的队列。

2. 消息发送

我们通过循环发送大量消息来模拟消息堆积的现象。在 publisher 服务中的 SpringAmqpTest 类中添加一个测试方法:

@Test
public void testWorkQueue() throws InterruptedException {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, message_";for (int i = 0; i < 50; i++) {// 发送消息,每20毫秒发送一次,相当于每秒发送50条消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}
}
3. 消息接收

为了模拟多个消费者绑定同一个队列,我们在 consumer 服务的 SpringRabbitListener 中添加两个新的方法:

@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);
}@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);
}

注意到这两个消费者都设置了 Thread.sleep 来模拟任务耗时:

  • 消费者1:Thread.sleep(20),相当于每秒处理50个消息。
  • 消费者2:Thread.sleep(200),相当于每秒处理5个消息。
4. 测试

启动 ConsumerApplication 后,执行 publisher 服务中编写的发送测试方法 testWorkQueue。结果如下:

消费者1接收到消息:【hello, message_0】21:06:00.869555300
消费者2........接收到消息:【hello, message_1】21:06:00.884518
...
消费者1接收到消息:【hello, message_48】21:06:01.920702500
消费者2........接收到消息:【hello, message_49】21:06:05.723106700

可以看到,消费者1和消费者2各自消费了25条消息:

  • 消费者1快速完成了任务。
  • 消费者2则缓慢处理任务。

消息是平均分配给每个消费者的,并没有考虑到各个消费者的处理能力,导致一个消费者空闲,另一个忙碌。这显然是低效的。

5. 能者多劳

spring 中,可以通过简单配置解决这个问题。修改 consumer 服务的 application.yml 文件,添加如下配置:

spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

再次测试,结果如下:

消费者1接收到消息:【hello, message_0】21:12:51.659664200
消费者2........接收到消息:【hello, message_1】21:12:51.680610
...
消费者2........接收到消息:【hello, message_49】21:12:52.746299900

这次,消费者1处理了更多的消息,消费者2则处理了较少的消息,总耗时在1秒左右,大大提升了效率。这充分利用了每一个消费者的处理能力,有效避免了消息积压问题。

6. 总结

Work Queues 模型的使用要点:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理。
  • 通过设置 prefetch 来控制消费者预取的消息数量。

这样可以更高效地利用资源,提高消息处理速度。

http://www.lryc.cn/news/417788.html

相关文章:

  • 嵌入式面试八股文(三)·野指针产生原因和解决方法、指针函数和函数指针的区别
  • OpenCV 中 CV_8UC1,CV_32FC3,CV_32S等参数的含义
  • v 3 + vite + ts 自适应布局(postcss-pxtorem)
  • (MTK)java文件添加简单接口并配置相应的SELinux avc 权限笔记2
  • Linux安全与高级应用(六)Linux Shell脚本编程的高级应用:条件测试与if语句的妙用
  • 升级MacOS(Mojave)后使用git问题
  • 基于PFC和ECN搭建无损RoCE网络的工作流程分析
  • 射频功率放大器调测简略
  • Linux使用docker搭建Redis 哨兵模式
  • springboot给类进行赋初值的四种方式
  • Day32 | 1049. 最后一块石头的重量 II 494. 目标和 474.一和零
  • linux 查看一个端口是否被占用
  • 【Git】5. 配置 Git
  • C语言:文件处理
  • SpringBoot MybatisPlus selectOne的坑
  • Spring源码-ClassPathXmlApplicationContext的refresh()都做了什么?
  • 网站加密和混淆技术简介
  • Kafka + Kraft 集群搭建教程,附详细配置及自动化安装脚本
  • “Apple Intelligence”的“系统提示词”被曝光了
  • django学习-数据表操作
  • 机器学习-决策树
  • opencascade TopoDS_Shape源码学习【重中之重】
  • Self-study Python Fish-C Note15 P52to53
  • Java小白入门到实战应用教程-异常处理
  • 使用Anaconda安装多个版本的Python并与Pycharm进行对接
  • android系统中data下的xml乱码无法查看问题剖析及解决方法
  • ​MySQL——索引(三)创建索引(2)使用 CREATE INDEX 语句在已经存在的表上创建索引
  • html+css 实现hover选择按钮
  • Python数据可视化利器:Matplotlib详解
  • 2024 NVIDIA开发者社区夏令营环境配置指南(Win Mac)