当前位置: 首页 > news >正文

RabbitMQ--消息顺序性

看本章之前强烈建议先去看博主的这篇博客

              RabbitMQ--消费端单线程与多线程-CSDN博客

一、消息顺序性概念

消息顺序性是指消息在生产者发送的顺序消费者接收处理的顺序保持一致。


二、RabbitMQ 顺序性保证机制

情况顺序保证情况备注
单队列,单消费者消息严格按发送顺序消费最简单且唯一保证顺序的场景
单队列,多个消费者无法保证全局顺序,但可以设置 QoS 保证消费者串行处理自己收到的消息通过 basicQos(1) 保证每个消费者一次只处理一条消息,但整体队列消息按消费者分配,顺序不保证
消息确认和重发机制如果未正确使用 ack,消息重发可能导致顺序乱需开启手动确认,确保消息处理完毕后才 ack
消息重试与死信机制可能导致消息顺序错乱需要设计合理的重试策略和死信队列策略


三、顺序性的保证方式

  1. 单队列单消费者

    • 保证消息完全顺序消费。适合严格顺序场景。

  2. 消息确认机制

    • 使用手动确认 autoAck=false,处理完后再 basicAck,防止消息乱序重发。

  3. QoS(basicQos)

    • 设置 basicQos(1),保证消费者一次只处理一条消息,避免多条消息并发处理导致乱序。

  4. 业务分区设计

    • 按某个字段(比如订单ID)分区到不同队列,保证分区内顺序。


四、原生 Java 示例


1. 依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version>
</dependency>

2. 生产者代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {private static final String QUEUE_NAME = "order_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明队列,持久化channel.queueDeclare(QUEUE_NAME, true, false, false, null);for (int i = 1; i <= 10; i++) {String message = "Order Message " + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("Sent: " + message);Thread.sleep(100);  // 模拟发送间隔}}}
}

3. 消费者代码(单个消费者,保证顺序)

import com.rabbitmq.client.*;public class Consumer {private static final String QUEUE_NAME = "order_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 设置每次只处理一条消息,避免乱序channel.basicQos(1);System.out.println("Waiting for messages...");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());System.out.println("Received: " + message);try {// 模拟处理消息Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();} finally {// 手动确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);System.out.println("Ack sent for: " + message);}};// 关闭自动确认,开启手动确认channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});}
}

4. 多消费者并发消费注意事项

  • 多个消费者消费同一队列,消息分发是轮询,整体消息顺序无法保证

  • basicQos(1) 只保证单个消费者串行处理自己拿到的消息,但多个消费者间消息顺序无保证。

  • 若需要严格顺序,需要保证单消费者消费或者分队列处理。


五、Spring Boot 示例


1. pom.xml 依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. application.yml

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestlistener:simple:# 每个消费者预取消息数量,类似 basicQos(1)prefetch: 1

3. RabbitMQ 配置类

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {public static final String QUEUE_NAME = "order_queue";@Beanpublic Queue orderQueue() {return new Queue(QUEUE_NAME, true); // 持久化队列}
}

4. 生产者代码

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class Producer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessages() throws InterruptedException {for (int i = 1; i <= 10; i++) {String message = "Order Message " + i;rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_NAME, message);System.out.println("Sent: " + message);Thread.sleep(100);}}
}

5. 消费者代码

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class Consumer {@RabbitListener(queues = RabbitConfig.QUEUE_NAME)public void receiveMessage(String message) throws InterruptedException {System.out.println("Received: " + message);// 模拟消息处理时间,确保消息顺序Thread.sleep(500);System.out.println("Processed: " + message);}
}

6. 主启动类

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class RabbitOrderApplication implements CommandLineRunner {@Autowiredprivate Producer producer;public static void main(String[] args) {SpringApplication.run(RabbitOrderApplication.class, args);}@Overridepublic void run(String... args) throws Exception {producer.sendMessages();}
}

六、总结

方面说明
单队列单消费者保证严格消息顺序,消息先进先出。
单队列多消费者消息轮询分发,整体顺序无法保证;设置 basicQos(1) 保证单个消费者顺序处理自己的消息。
消息确认机制手动 ack,避免消息未处理完成就确认导致顺序乱。
Spring Boot 配置spring.rabbitmq.listener.simple.prefetch=1 控制每个消费者预取消息数。
业务设计建议对于严格顺序场景,推荐单队列单消费者或消息分区+单消费者方案。

如果要严格保证消息顺序性:

        1. 单队列单消费者 

        2. 多消费者分区顺序

                当你只要求 “某一类业务 ID 下的顺序”一致,如订单、用户、设备号等,而不要求全局顺序时,这种方案很好。

                不能做到全局顺序消费!

                        不同队列之间顺序是无法控制的

                        比如 order_1order_5 属于不同分区,它们的处理时间会交叉,整体顺序就乱了。

多消费者分区顺序代码样例

  • 利用多个队列(分区),每个队列绑定一个消费者,保证队列内消息顺序;

  • 生产者根据某个分区键(如订单ID哈希)选择发送到对应队列,保证同一个分区的消息顺序。


多消费者分区顺序消费示例(Spring Boot)


1. 项目结构与依赖

pom.xml 添加:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 配置类:定义多个队列与交换机绑定

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {public static final int PARTITION_COUNT = 3;@Beanpublic DirectExchange directExchange() {return new DirectExchange("order_exchange");}@Beanpublic Queue queue0() {return new Queue("order_queue_0", true);}@Beanpublic Queue queue1() {return new Queue("order_queue_1", true);}@Beanpublic Queue queue2() {return new Queue("order_queue_2", true);}@Beanpublic Binding binding0(Queue queue0, DirectExchange directExchange) {return BindingBuilder.bind(queue0).to(directExchange).with("partition_0");}@Beanpublic Binding binding1(Queue queue1, DirectExchange directExchange) {return BindingBuilder.bind(queue1).to(directExchange).with("partition_1");}@Beanpublic Binding binding2(Queue queue2, DirectExchange directExchange) {return BindingBuilder.bind(queue2).to(directExchange).with("partition_2");}
}

3. 生产者:根据订单ID哈希选择分区,发送到对应RoutingKey

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class Producer {@Autowiredprivate RabbitTemplate rabbitTemplate;private static final int PARTITION_COUNT = RabbitConfig.PARTITION_COUNT;public void sendOrder(String orderId, String message) {int partition = Math.abs(orderId.hashCode()) % PARTITION_COUNT;String routingKey = "partition_" + partition;rabbitTemplate.convertAndSend("order_exchange", routingKey, message);System.out.println("Sent to " + routingKey + ": " + message);}
}

4. 消费者:为每个队列配置单独消费者,保证分区顺序

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class Consumer {@RabbitListener(queues = "order_queue_0")public void receivePartition0(String message) {System.out.println("Partition 0 received: " + message);// 业务处理,保证队列内顺序}@RabbitListener(queues = "order_queue_1")public void receivePartition1(String message) {System.out.println("Partition 1 received: " + message);}@RabbitListener(queues = "order_queue_2")public void receivePartition2(String message) {System.out.println("Partition 2 received: " + message);}
}

5. 测试调用示例(主程序)

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class PartitionOrderApplication implements CommandLineRunner {@Autowiredprivate Producer producer;public static void main(String[] args) {SpringApplication.run(PartitionOrderApplication.class, args);}@Overridepublic void run(String... args) throws Exception {// 发送多条订单消息,orderId不同分区for (int i = 0; i < 20; i++) {String orderId = "order" + i;String message = "Order message for " + orderId;producer.sendOrder(orderId, message);Thread.sleep(100);}}
}

6. 说明

  • 消息根据订单ID哈希决定发送哪个队列

  • 每个队列由单个消费者消费,保证该分区消息顺序

  • 多个队列+多消费者,实现并发消费和分区顺序

🔁 顺序保证范围

粒度保证情况
同一个 orderId✅ 顺序消费(始终落在同一队列)
不同 orderId❌ 不保证顺序(本来就不是要求)

✅ 结论

你这套方案:

  • 👍 是 Spring Boot 下 RabbitMQ 顺序消费的推荐做法

  • 👍 保证了“每个订单 ID 的消息顺序

  • 👍 可扩展,增加分区数提升并发能力

http://www.lryc.cn/news/597930.html

相关文章:

  • 【华为】笔试真题训练_20250611
  • go下载包
  • 数据库常用DDL语言
  • 文件管理困境如何破?ZFile+cpolar打造随身云盘新体验
  • M²IV:面向大型视觉-语言模型中高效且细粒度的多模态上下文学习
  • RabbitMQ简述
  • 【硬件-笔试面试题】硬件/电子工程师,笔试面试题-16,(知识点:电平转换电路)
  • RabbitMQ—仲裁队列
  • <论文>(斯坦福)DSPy:将声明式语言模型调用编译为自我优化的pipeline
  • 等保二级、三级配置表(阿里云)
  • RuoYi-Vue 项目 Docker 全流程部署实战教程
  • 【LeetCode数据结构】二叉树的应用(一)——单值二叉树问题、相同的树问题、对称二叉树问题、另一棵树的子树问题详解
  • [数据结构]#6 树
  • JVM Java虚拟机
  • 【Qt开发】信号与槽(一)
  • 机器学习入门指南它来了
  • LeetCodeOJ题:回文链表
  • Java学习----原型模式
  • vant-field 显示radio
  • 【Java】空指针(NullPointerException)异常深度攻坚:从底层原理到架构级防御,老司机的实战经验
  • 高级 JAVA 工程师卷 1
  • 【硬件-笔试面试题】硬件/电子工程师,笔试面试题-20,(知识点:热阻的概念,散热)
  • 专题:2025微短剧行业生态构建与跨界融合研究报告|附100+份报告PDF汇总下载
  • Python 使用环境下编译 FFmpeg 及 PyAV 源码(英特尔篇)
  • 第4章唯一ID生成器——4.1 分布式唯一ID
  • 中小企业安全落地:低成本漏洞管理与攻击防御方案
  • 深度分析Java内存模型
  • 企业级数据分析创新实战:基于表格交互与智能分析的双引擎架构
  • 【REACT18.x】CRA+TS+ANTD5.X实现useImperativeHandle让父组件修改子组件的数据
  • 赋能决策与创新的数据引擎:数据分析平台的价值与未来