当前位置: 首页 > news >正文

深入浅出 RabbitMQ:工作队列实战(轮训策略VS公平策略)

大家好,我是工藤学编程 🦉一个正在努力学习的小博主,期待你的关注
实战代码系列最新文章😉C++实现图书管理系统(Qt C++ GUI界面版)
SpringBoot实战系列🐷【SpringBoot实战系列】SpringBoot3.X 整合 MinIO 存储原生方案
分库分表分库分表之实战-sharding-JDBC分库分表执行流程原理剖析
消息队列深入浅出 RabbitMQ-简单队列实战

前情摘要:

1、深入浅出 RabbitMQ-核心概念介绍与容器化部署
2、深入浅出 RabbitMQ-简单队列实战


【亲测宝藏】发现一个让 AI 学习秒变轻松的神站!不用啃高数、不用怕编程,高中生都能看懂的人工智能教程来啦!

👉点击跳转,和 thousands of 小伙伴一起用快乐学习法征服 AI,说不定下一个开发出爆款 AI 程序的就是你!


本文章目录

  • 深入浅出 RabbitMQ:工作队列实战(轮训策略VS公平策略)
    • 一、什么是工作队列?
    • 二、轮训策略(Round Robin)实战
      • 2.1 轮训策略原理
      • 2.2 代码实现
        • ① 消息生产者(Send)
        • ② 消息消费者(Recv1 & Recv2)
      • 2.3 轮训策略验证
        • 测试步骤:
        • 预期结果:
      • 2.4 轮训策略的缺点
    • 三、公平策略(Fair Dispatch)实战
      • 3.1 公平策略原理
      • 3.2 代码修改(仅需调整消费者)
      • 3.3 公平策略验证
        • 测试调整:
        • 测试步骤:
        • 预期结果:
    • 四、关键知识点总结
      • 注意事项:
    • 五、测试步骤回顾
    • 六、总结

深入浅出 RabbitMQ:工作队列实战(轮训策略VS公平策略)

一、什么是工作队列?

在实际业务中,当消息生产速度超过消费速度时(比如秒杀场景的订单消息、日志采集的大量数据),单消费者可能会导致消息堆积。工作队列(Work Queue) 通过引入多个消费者共同处理同一队列的消息,实现消息的分布式消费,解决"生产快、消费慢"的问题。

工作队列的核心特点:

  • 多个消费者监听同一队列,形成竞争关系
  • 消息一旦被一个消费者处理,其他消费者不会再收到该消息
  • 默认采用"轮训策略"分配消息,可通过配置优化为"公平策略"

二、轮训策略(Round Robin)实战

2.1 轮训策略原理

RabbitMQ默认的消息分配方式:将消息依次轮流发送给每个消费者,不考虑消费者的处理速度。例如10条消息会被均匀分给2个消费者(各处理5条)。

2.2 代码实现

① 消息生产者(Send)

发送10条消息到队列work_mq_rr,模拟高频率生产场景:

public class Send {private final static String QUEUE_NAME = "work_mq_rr";public static void main(String[] argv) throws Exception {// 1. 配置连接工厂(同简单队列,确保虚拟主机一致)ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.229.128");factory.setUsername("admin");factory.setPassword("password");factory.setVirtualHost("/dev"); // 生产者与消费者需使用同一虚拟主机factory.setPort(5672);try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 2. 声明队列(非持久化,非独占,不自动删除)channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 3. 循环发送10条消息for (int i = 0; i < 10; i++) {String message = "Hello World! " + i;// 发送消息(使用默认交换机,路由键=队列名)channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));System.out.println(" [x] 发送消息: '" + message + "'");}}}
}
② 消息消费者(Recv1 & Recv2)

两个消费者监听同一队列,模拟不同处理速度(此处为简化,暂用相同休眠时间,实际可调整差异):

// 消费者1
public class Recv1 {private final static String QUEUE_NAME = "work_mq_rr";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.229.128");factory.setUsername("admin");factory.setPassword("password");factory.setVirtualHost("/dev"); // 注意:与生产者保持一致(原代码误写为/xdclass1,已修正)factory.setPort(5672);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Recv1 等待消息...(按CTRL+C退出)");// 消息处理回调(模拟处理耗时:3秒)DeliverCallback deliverCallback = (consumerTag, delivery) -> {try {TimeUnit.SECONDS.sleep(3); // 模拟业务处理时间} catch (InterruptedException e) {e.printStackTrace();}String message = new String(delivery.getBody(), "UTF-8");System.out.println("Recv1 接收: '" + message + "'");// 手动确认消息(必须,否则消息会一直存在队列中)// 参数2:false表示仅确认当前消息,true表示确认所有小于当前tag的消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 关闭自动确认(autoAck=false),开启手动确认channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});}
}
// 消费者2(代码与Recv1一致,仅打印标识不同)
public class Recv2 {private final static String QUEUE_NAME = "work_mq_rr";public static void main(String[] argv) throws Exception {// 连接配置与Recv1相同ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.229.128");factory.setUsername("admin");factory.setPassword("password");factory.setVirtualHost("/dev");factory.setPort(5672);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Recv2 等待消息...(按CTRL+C退出)");// 消息处理回调(同样模拟3秒耗时)DeliverCallback deliverCallback = (consumerTag, delivery) -> {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}String message = new String(delivery.getBody(), "UTF-8");System.out.println("Recv2 接收: '" + message + "'");// 手动确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});}
}

2.3 轮训策略验证

测试步骤:
  1. 先启动两个消费者(Recv1、Recv2)
    在这里插入图片描述
    在这里插入图片描述

  2. 再启动生产者(Send),发送10条消息
    在这里插入图片描述

预期结果:
  • Recv1 接收消息:0、2、4、6、8(偶数)
  • Recv2 接收消息:1、3、5、7、9(奇数)
  • 两者各处理5条,均匀分配,但忽略处理能力差异(即使某消费者处理更快,也不会多分配)

实际结果
在这里插入图片描述
在这里插入图片描述

2.4 轮训策略的缺点

当消费者处理能力不均时(比如Recv1处理需1秒,Recv2需5秒),轮训策略会导致:

  • 处理快的消费者(Recv1)处理完后处于空闲状态
  • 处理慢的消费者(Recv2)积压消息,整体消费效率低

三、公平策略(Fair Dispatch)实战

3.1 公平策略原理

为解决轮训的负载不均问题,公平策略让消费者处理完一条消息后,再接收下一条,实现"能者多劳"。核心配置:通过channel.basicQos(1)设置"预取数"为1,告诉RabbitMQ:“在我处理完当前消息并确认前,不要给我发新消息”。

3.2 代码修改(仅需调整消费者)

在消费者的channel.queueDeclare之后,添加basicQos配置:

// 消费者1(Recv1)添加公平策略配置
channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 关键:设置预取数为1,开启公平策略
channel.basicQos(1); // 每次只接收1条未确认的消息// 后续的消息处理逻辑不变...
// 消费者2(Recv2)同样添加:
channel.basicQos(1);

3.3 公平策略验证

测试调整:

修改Recv1的处理耗时为1秒(快消费者),Recv2仍为3秒(慢消费者):

// Recv1的处理耗时改为1秒
TimeUnit.SECONDS.sleep(1); 
测试步骤:
  1. 启动Recv1(1秒/条)和Recv2(3秒/条)
  2. 启动生产者发送10条消息
预期结果:
  • 快消费者(Recv1)会处理更多消息(比如7-8条)
    在这里插入图片描述

  • 慢消费者(Recv2)处理较少消息(比如2-3条)
    在这里插入图片描述

  • 整体消费效率显著提升,避免"忙的忙死,闲的闲死"

四、关键知识点总结

维度轮训策略(Round Robin)公平策略(Fair Dispatch)
核心逻辑依次轮流分配,不考虑处理速度处理完一条再分配下一条,按能力分配
配置要点默认生效,无需额外配置需设置channel.basicQos(1)
适用场景所有消费者处理速度相近的场景消费者处理速度差异较大的场景
消息确认必须开启手动确认(autoAck=false)必须开启手动确认(autoAck=false)

注意事项:

  1. 手动确认是前提:无论哪种策略,若使用basicQos,必须关闭自动确认(autoAck=false),并在处理完后调用basicAck确认,否则消息会一直堆积。
  2. 预取数调整basicQos(n)n可根据实际场景调整(如n=5表示允许预取5条未确认消息),避免网络频繁交互。

五、测试步骤回顾

  1. 轮训策略测试

    • 启动2个默认消费者(无basicQos
    • 发送10条消息,观察均匀分配结果
  2. 公平策略测试

    • 消费者添加channel.basicQos(1)
    • 差异化设置消费者处理速度,发送消息,观察"能者多劳"效果

六、总结

工作队列通过多消费者分布式消费,解决了"生产快于消费"的问题,而策略的选择直接影响效率:

  • 轮训策略实现简单,适合消费者能力均衡的场景;
  • 公平策略通过basicQos(1)实现负载均衡,适合消费者能力差异大的场景。

实际项目中,需根据消费者处理能力、消息紧急程度等因素选择策略,必要时结合消息优先级、持久化等特性,进一步优化消息处理链路。

觉得有用请点赞收藏!
如果有相关问题,欢迎评论区留言讨论~

http://www.lryc.cn/news/609965.html

相关文章:

  • SpringCloud之Nacos基础认识-服务注册中心
  • 13.Home-面板组件封装
  • Mac桌面仿制项目--让ai一句话生成的
  • mac 技巧
  • 【AI 加持下的 Python 编程实战 2_13】第九章:繁琐任务的自动化(中)——自动批量合并 PDF 文档
  • 大模型×垂直领域:预算、时间、空间三重夹击下的生存法则
  • 2.2 vue2子组件注册使用
  • 西门子PLC S7-1200单轴步进控制电动机
  • Axure设计Web端新增表单页面模板案例
  • 【LeetCode 热题 100】215. 数组中的第K个最大元素——(解法一)快速选择
  • 安卓逆向(基础①-Google Pixel-Root)
  • Visual Studio 2022安装与快捷键全攻略
  • 模型蒸馏(Distillation):原理、算法、应用
  • 【达梦MPP(带主备)集群搭建】
  • Selenium教程(Python 网页自动化测试脚本)
  • 华为2288H V5服务器闪红灯 无法开机案例
  • C++八股文——设计模式
  • JSON Schema
  • mybatis-plus报错Caused by: java.sql.SQLException: 无效的列类型: 1111
  • 使用 Aspose.OCR 将图像文本转换为可编辑文本
  • 微软WSUS替代方案
  • Druid手写核心实现案例 实现一个简单Select 解析,包含Lexer、Parser、AstNode
  • AJAX表单验证项目实战:实时用户名检查
  • curl发送文件bodyParser无法获取请求体的问题分析
  • Stanford CS336 assignment1 | Byte-Pair Encoding (BPE) Tokenizer
  • NeoBase:一款开源、基于AI的数据库管理助手
  • 《Python 实用项目与工具制作指南》· 2.2 变量
  • Java中给List<T> 对象集合去重
  • golang的数组
  • SpringMVC 6+源码分析(三)DispatcherServlet实例化流程 2--(url 与contrller类如何进行映射)