【RabbitMQ】高级特性—消息确认详解
文章目录
- 消息确认机制
- 手动确认原理
- 手动确认方法
- 肯定确认——basicAck
- 否定确认——basicReject
- 否定确认——basicNack
- 代码示例
- AcknowledgeMode. NONE
- 1. 配置确认机制
- 2. 发送消息
- 3. 写消费端逻辑
- 4. 运行程序
- AcknowledgeMode. AUTO
- 1. 配置确认机制
- 2. 重新运行程序
- AcknowledgeMode. MANUAL
- 1. 配置确认机制
- 2. 消费端手动确认逻辑
- 3. 异常时拒绝签收
消息确认机制
生产者发送消息之后,到达消费端之后,可能会有以下情况:
- 消息处理成功
- 消息处理异常
RabbitMQ
向消费者发送消息之后,就会把这条消息删除掉
- 但是如果消息发出去之后,处理异常,而生产者不知道,就把消息删除了,这种情况就会出问题
如何确保消费端已经成功接收,并正确处理了呢?
- 为了保证消息从队列可靠地到达消费者,
RabbitMQ
提供了消息确认机制(messageacknowledgement
)
消费者在订阅队列的时候,可以指定 autoAck
参数,根据这个参数设置,消息确认机制分为以下两种:
- 自动确认:当
autoAck=true
,RabbitMQ
会自动把发出去的消息设置为确认,然后从内存(或磁盘) 中删除,而不管消费者是否真正地消费到了这些消息- 自动确认模式适合对于消息可靠性要求不高的场景
- 钉钉的已读未读,只要点开了消息,不管读没读消息,一律为“已读”
- 手动确认:当
autoAck=false
,RabbitMQ
会等待消费者显式地调用Basic.Ack
命令,恢复确认信号后才从内存(硬盘) 中移去消息- 这种模式适合对消息可靠性要求比较高的场景
- 发送消息,只有对方回复了之后,才视为“已读”
手动确认原理
当 autoAck
参数置为 false
,对于 RabbitMQ
服务端而言,队列中的消息分成了两个部分
- 等待投递给消费者的消息
- 已经投递给消费者,但是还没有收到消费者确认信号的消息
如果RabbitMQ
一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则RabbitMQ
会安排该消息重新进入队列,等待投递给下一个消费者(也有可能还是原来那个消费者)
从 RabbitMQ
的 Web
管理平台上,也可以但看当前队列中 Ready
状态和 Unacked
状态的消息数
Ready
:等待投递给消费者的消息数Unacked
:已经投递给消费者,但是未收到消费者确认信号的消息数
手动确认方法
消费者在收到消息之后,可以选择确认,也可以选择直接拒绝或者跳过,RabbitMQ
也提供了不同的确认应答的的方式,消费者客户端可以调用与其对应的 channel
的先关方法,共有三种
- 肯定确认
- 否定确认
- 否定确认
肯定确认——basicAck
肯定确认:Channel.basicAck(long deliveryTag, boolean multiple)
RabbitMQ
已知道该消息并且成功的处理消息,可以将其丢弃了delivertTag
- 消息的唯一标识,它是一个单调递增的
64
位的长整型值。 - 是每个通道(
Channel
) 独立维护的,所以在每个通道上都是唯一的 - 当消费者确认(
ack
) 一条消息时,必须使用对应的通道上进行确认
- 消息的唯一标识,它是一个单调递增的
multiple
:- 是否批量确认
- 在某些情况下,为了减少网络流量,可以对一系列连续的
deliveryTag
进行批量确认 - 值为
true
,则会一次性ack
所有小于或等于指定deliveryTag
的消息 - 值为
false
,则只确认当前指定deliveryTag
的消息
deliveryTag
是RabbitMQ
中消息确认机制的一个重要组成部分,它确保了消息传递的可靠性和顺序性
否定确认——basicReject
否定确认:Channel.basicReject(long deliveryTag, boolean requeue)
RabbitMQ
在2.0.0
版本开始引入了Basic.Reject
这个命令,消费者客户端可以调用channel.basicReject
方法来告诉RabbitMQ
拒绝这个消息deliveryTag
:参考channel.basicAck
requeue
- 表示拒绝后,这条消息如何处理(是否重新入队)
- 如果
requeue
参数设为true
,则RabbitMQ
会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者 - 如果
requeue
参数设为false
,则RabbitMQ
会把消息从队列中移除,而不会把它发送给新的消费者
否定确认——basicNack
否定确认:Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
Basic.Reject
一次只能拒绝一条消息,如果想要批量拒绝,则可以使用Basic.Nack
这个命令- 消费者客户端可以调用
channel.basicNack
方法来实现 - 参数介绍可以参考上面两个方法
multiple
参数设置为true
,则表示拒绝deliveryTag
编号之前所有未被当前消费者确认的消息
代码示例
我们基于 Spring Boot
来演示消息的确认机制,使用方式和使用 RabbitMQ Java Client
有一定差异
Spring-AMQP
对消息确认机制提供了三种策略
public enum AcknowledgeMode {NONE,MANUAL,AUTO;
}
AcknowledgeMode.NONE
- 这种模式下,消息一旦投递给消费者,不管消费者是否成功处理了消息,
RabbitMQ
都会自动确认消息,从RabbitMQ
队列中移除消息 - 如果消费者处理消息失败,消息可能会丢失
- 这种模式下,消息一旦投递给消费者,不管消费者是否成功处理了消息,
AcknowledgeMode.AUTO
(默认)- 消费者在消息处理成功时会自动确认消息
- 如果处理过程中抛出了异常,则不会确认消息
AcknowledgeMode.MANUAL
- 手动确认模式下,消费者必须在成功处理消息后显示调用
basicAck
方法来确认消息 - 如果消息未被确认,
RabbitMQ
会认为消息尚未被成功处理,并且会在消费者可用时重新投递该消息 - 这种模式提高了消息处理的可靠性,因为即使消费者处理消息后失败,消息也不回丢失,而是可以被重新处理
- 手动确认模式下,消费者必须在成功处理消息后显示调用
主要流程:
- 配置确认机制(自动确认/手动确认)
- 生产者发送消息
- 消费端逻辑
- 测试
AcknowledgeMode. NONE
- 这种模式下,消息一旦投递给消费者,不管消费者是否成功处理了消息,
RabbitMQ
都会自动确认消息,从RabbitMQ
队列中移除消息 - 如果消费者处理消息失败,消息可能会丢失
1. 配置确认机制
spring: rabbitmq: addresses: amqp://guest:guest@127.0.0.1:5672/coding listener: simple: acknowledge-mode: none
2. 发送消息
队列,交换机配置
public class Constant { public static final String ACK_EXCHANGE_NAME = "ack_exchange"; public static final String ACK_QUEUE = "ack_queue";
}
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean; public class RabbitMQConfig { // 1. 交换机 @Bean("ackExchange") public Exchange ackExchange() { return ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build(); } // 2. 队列 @Bean("ackQueue") public Queue ackQueue() { return QueueBuilder.durable(Constant.ACK_QUEUE).build(); } // 3. 队列和交换机绑定 Binding @Bean("ackBinding")public Binding ackBinding(@Qualifier("ackExchange") Exchange exchange, @Qualifier("ackQueue") Queue queue) { return BindingBuilder.bind(queue).to(exchange).with("ack").noargs(); }
}
通过接口发送消息:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; @RestController
@RequestMapping("/producer")
public class ProductController { @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/ack") public String ack() { rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack", "consumer ack test..."); return "发送成功!"; }
}
3. 写消费端逻辑
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; @Component
public class AckQueueListener { // 指定监听队列的名称 @RabbitListener(queues = Constant.ACK_QUEUE) public void ListenerQueue(Message message, Channel channel) throws Exception { System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"), message.getMessageProperties().getDeliveryTag()); // 模拟处理失败 int num = 3 / 0; System.out.println("处理完成"); }
}
这个代码运行结果是正常的,运行后消息会被签收:Ready
为 0,unacked
为 0
4. 运行程序
调用接口,发送消息: http://127.0.0.1:8080/producer/ack
开启消费者,控制台输出:
管理界面:
- 可以看到,消费者处理失败,但是消息已经从
RabbitMQ
中移除
AcknowledgeMode. AUTO
- 消费者在消息处理成功时会自动确认消息
- 如果处理过程中抛出了异常,则不会确认消息
1. 配置确认机制
配置确认机制
spring: rabbitmq: addresses: amqp://guest:guest@127.0.0.1:5672/coding listener: simple: acknowledge-mode: auto
2. 重新运行程序
调用接口,发送消息: http://127.0.0.1:8080/producer/ack
可以看到队列中有一条消息,unacked
的为 0(需要先把消费者注掉,注掉相关注解即可)
开启消费者,控制台不断输出错误信息
管理界面
- 从日志上可以看出,当消费者出现异常时,
RabbitMQ
会不断重发 - 由于异常,多次重试还是失败,消息没被确认,也无法
nack
,就一直是unacked
状态,导致消息积压
AcknowledgeMode. MANUAL
1. 配置确认机制
配置确认机制
spring: rabbitmq: addresses: amqp://guest:guest@127.0.0.1:5672/coding listener: simple: acknowledge-mode: manual
2. 消费端手动确认逻辑
import com.example.rabbit_features.constant.Constant;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; @Component
public class AckQueueListener { // 指定监听队列的名称 @RabbitListener(queues = Constant.ACK_QUEUE) public void ListenerQueue(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { // 1. 接收消息 System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"), message.getMessageProperties().getDeliveryTag()); // 2. 处理业务逻辑 System.out.println("处理业务逻辑"); // 手动设置一个异常,来测试异常拒绝机制 // int num = 3 / 0; // 3. 手动签收 channel.basicAck(deliveryTag, true); } catch (Exception e) { // 4. 异常了就拒绝签收 // 第三个参数 requeue,是否重新发送,如果为 true,则会重发;若为 false,则直接丢弃 channel.basicNack(deliveryTag, true, true); } }
}
- 这个代码运行的结果是正常的,运行后消息会被签收:
Read
为 0,unacked
为 0
控制台输出:
管理界面:
3. 异常时拒绝签收
主动设置异常
@Component
public class AckQueueListener { // 指定监听队列的名称 @RabbitListener(queues = Constant.ACK_QUEUE) public void ListenerQueue(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { // 1. 接收消息 System.out.printf("接收到消息: %s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"), message.getMessageProperties().getDeliveryTag()); // 2. 处理业务逻辑 System.out.println("处理业务逻辑"); // 手动设置一个异常,来测试异常拒绝机制 int num = 3 / 0; // 3. 手动签收 channel.basicAck(deliveryTag, true); } catch (Exception e) { // 4. 异常了就拒绝签收 // 第三个参数 requeue,是否重新发送,如果为 true,则会重发;若为 false,则直接丢弃 channel.basicNack(deliveryTag, true, true); } }
}
运行结果:消费异常时不断重试,deliveryTag
从 1 递增
控制台日志:
管理界面:unacked
也变成了 1