RabbitMQ:SpringAMQP 多消费者绑定同一队列
目录
- 一、案例需求
- 二、代码实现
- 三、总结
生产者源码
消费者源码
一、案例需求
模拟WorkQueue,实现一个队列绑定多个消费者。
- 在RabbitMQ的控制台创建一个队列,命名为
work.queue
。 - 在生产者服务中定义测试方法,在1s内产生50条消息,发送到
work.queue
。 - 在消费这服务中定义两个消息监听,都监听
work.queue
队列。 - 消费者1每秒处理50条消息,消费者2每秒处理5条消息。
二、代码实现
生产者
@Test
public void workQueueTest() throws InterruptedException {for (int i = 0; i < 50; i++) {String queueName = "work.queue";String message = String.format("hello %s, spring amqp!", i + 1);rabbitTemplate.convertAndSend(queueName, message);Thread.sleep(200);}}
消费者
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String message){System.out.println(String.format("消费者1,收到了work.queue: %s", message));
}@RabbitListener(queues = "work.queue")
public void listenSimpleQueue2(String message){System.err.println(String.format("消费者2,收到了work.queue: %s", message));
}
三、总结
默认情况下,RabbitMQ会将消息依次轮询投递给绑定在队列上的所有消费者。但是这并没有考虑到消费者是否已经处理完消息,可能会出现消息堆积。
因此我们需要修改application.yml
,设置prefetch
值为1,确保同一时刻最多投递给消费者1条消息。
spring:rabbitmq:listener:simple:prefetch: 1
- 多个消费者绑定到一个队列,可以加快消费处理速度。
- 同一个消息只会被一个消费者处理。
- 通过设置
prefetch
来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳。