RabbitMQ--消息顺序性
看本章之前强烈建议先去看博主的这篇博客
RabbitMQ--消费端单线程与多线程-CSDN博客
一、消息顺序性概念
消息顺序性是指消息在生产者发送的顺序和消费者接收处理的顺序保持一致。
二、RabbitMQ 顺序性保证机制
情况 顺序保证情况 备注 单队列,单消费者 消息严格按发送顺序消费 最简单且唯一保证顺序的场景 单队列,多个消费者 无法保证全局顺序,但可以设置 QoS 保证消费者串行处理自己收到的消息 通过 basicQos(1)
保证每个消费者一次只处理一条消息,但整体队列消息按消费者分配,顺序不保证消息确认和重发机制 如果未正确使用 ack,消息重发可能导致顺序乱 需开启手动确认,确保消息处理完毕后才 ack 消息重试与死信机制 可能导致消息顺序错乱 需要设计合理的重试策略和死信队列策略
三、顺序性的保证方式
单队列单消费者
保证消息完全顺序消费。适合严格顺序场景。
消息确认机制
使用手动确认
autoAck=false
,处理完后再basicAck
,防止消息乱序重发。QoS(basicQos)
设置
basicQos(1)
,保证消费者一次只处理一条消息,避免多条消息并发处理导致乱序。业务分区设计
按某个字段(比如订单ID)分区到不同队列,保证分区内顺序。
四、原生 Java 示例
1. 依赖
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version> </dependency>
2. 生产者代码
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;public class Producer {private static final String QUEUE_NAME = "order_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明队列,持久化channel.queueDeclare(QUEUE_NAME, true, false, false, null);for (int i = 1; i <= 10; i++) {String message = "Order Message " + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("Sent: " + message);Thread.sleep(100); // 模拟发送间隔}}} }
3. 消费者代码(单个消费者,保证顺序)
import com.rabbitmq.client.*;public class Consumer {private static final String QUEUE_NAME = "order_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 设置每次只处理一条消息,避免乱序channel.basicQos(1);System.out.println("Waiting for messages...");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody());System.out.println("Received: " + message);try {// 模拟处理消息Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();} finally {// 手动确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);System.out.println("Ack sent for: " + message);}};// 关闭自动确认,开启手动确认channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});} }
4. 多消费者并发消费注意事项
多个消费者消费同一队列,消息分发是轮询,整体消息顺序无法保证。
basicQos(1)
只保证单个消费者串行处理自己拿到的消息,但多个消费者间消息顺序无保证。若需要严格顺序,需要保证单消费者消费或者分队列处理。
五、Spring Boot 示例
1.
pom.xml
依赖<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.
application.yml
spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestlistener:simple:# 每个消费者预取消息数量,类似 basicQos(1)prefetch: 1
3. RabbitMQ 配置类
import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class RabbitConfig {public static final String QUEUE_NAME = "order_queue";@Beanpublic Queue orderQueue() {return new Queue(QUEUE_NAME, true); // 持久化队列} }
4. 生产者代码
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;@Component public class Producer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMessages() throws InterruptedException {for (int i = 1; i <= 10; i++) {String message = "Order Message " + i;rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_NAME, message);System.out.println("Sent: " + message);Thread.sleep(100);}} }
5. 消费者代码
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component public class Consumer {@RabbitListener(queues = RabbitConfig.QUEUE_NAME)public void receiveMessage(String message) throws InterruptedException {System.out.println("Received: " + message);// 模拟消息处理时间,确保消息顺序Thread.sleep(500);System.out.println("Processed: " + message);} }
6. 主启动类
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class RabbitOrderApplication implements CommandLineRunner {@Autowiredprivate Producer producer;public static void main(String[] args) {SpringApplication.run(RabbitOrderApplication.class, args);}@Overridepublic void run(String... args) throws Exception {producer.sendMessages();} }
六、总结
方面 说明 单队列单消费者 保证严格消息顺序,消息先进先出。 单队列多消费者 消息轮询分发,整体顺序无法保证;设置 basicQos(1)
保证单个消费者顺序处理自己的消息。消息确认机制 手动 ack,避免消息未处理完成就确认导致顺序乱。 Spring Boot 配置 spring.rabbitmq.listener.simple.prefetch=1
控制每个消费者预取消息数。业务设计建议 对于严格顺序场景,推荐单队列单消费者或消息分区+单消费者方案。 如果要严格保证消息顺序性:
1. 单队列单消费者
2. 多消费者分区顺序
当你只要求 “某一类业务 ID 下的顺序”一致,如订单、用户、设备号等,而不要求全局顺序时,这种方案很好。
不能做到全局顺序消费!
不同队列之间顺序是无法控制的
比如
order_1
和order_5
属于不同分区,它们的处理时间会交叉,整体顺序就乱了。多消费者分区顺序代码样例
利用多个队列(分区),每个队列绑定一个消费者,保证队列内消息顺序;
生产者根据某个分区键(如订单ID哈希)选择发送到对应队列,保证同一个分区的消息顺序。
多消费者分区顺序消费示例(Spring Boot)
1. 项目结构与依赖
pom.xml
添加:<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2. 配置类:定义多个队列与交换机绑定
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class RabbitConfig {public static final int PARTITION_COUNT = 3;@Beanpublic DirectExchange directExchange() {return new DirectExchange("order_exchange");}@Beanpublic Queue queue0() {return new Queue("order_queue_0", true);}@Beanpublic Queue queue1() {return new Queue("order_queue_1", true);}@Beanpublic Queue queue2() {return new Queue("order_queue_2", true);}@Beanpublic Binding binding0(Queue queue0, DirectExchange directExchange) {return BindingBuilder.bind(queue0).to(directExchange).with("partition_0");}@Beanpublic Binding binding1(Queue queue1, DirectExchange directExchange) {return BindingBuilder.bind(queue1).to(directExchange).with("partition_1");}@Beanpublic Binding binding2(Queue queue2, DirectExchange directExchange) {return BindingBuilder.bind(queue2).to(directExchange).with("partition_2");} }
3. 生产者:根据订单ID哈希选择分区,发送到对应RoutingKey
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;@Component public class Producer {@Autowiredprivate RabbitTemplate rabbitTemplate;private static final int PARTITION_COUNT = RabbitConfig.PARTITION_COUNT;public void sendOrder(String orderId, String message) {int partition = Math.abs(orderId.hashCode()) % PARTITION_COUNT;String routingKey = "partition_" + partition;rabbitTemplate.convertAndSend("order_exchange", routingKey, message);System.out.println("Sent to " + routingKey + ": " + message);} }
4. 消费者:为每个队列配置单独消费者,保证分区顺序
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component public class Consumer {@RabbitListener(queues = "order_queue_0")public void receivePartition0(String message) {System.out.println("Partition 0 received: " + message);// 业务处理,保证队列内顺序}@RabbitListener(queues = "order_queue_1")public void receivePartition1(String message) {System.out.println("Partition 1 received: " + message);}@RabbitListener(queues = "order_queue_2")public void receivePartition2(String message) {System.out.println("Partition 2 received: " + message);} }
5. 测试调用示例(主程序)
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class PartitionOrderApplication implements CommandLineRunner {@Autowiredprivate Producer producer;public static void main(String[] args) {SpringApplication.run(PartitionOrderApplication.class, args);}@Overridepublic void run(String... args) throws Exception {// 发送多条订单消息,orderId不同分区for (int i = 0; i < 20; i++) {String orderId = "order" + i;String message = "Order message for " + orderId;producer.sendOrder(orderId, message);Thread.sleep(100);}} }
6. 说明
消息根据订单ID哈希决定发送哪个队列
每个队列由单个消费者消费,保证该分区消息顺序
多个队列+多消费者,实现并发消费和分区顺序
🔁 顺序保证范围
粒度 保证情况 同一个 orderId
✅ 顺序消费(始终落在同一队列) 不同 orderId
❌ 不保证顺序(本来就不是要求)
✅ 结论
你这套方案:
👍 是 Spring Boot 下 RabbitMQ 顺序消费的推荐做法
👍 保证了“每个订单 ID 的消息顺序”
👍 可扩展,增加分区数提升并发能力