Rabbitmq集成springboot,手动确认消息basicAck、basicNack、basicReject的使用
一、手动确认消息模式
模式 | 成功消费 | 消费失败 | 连接中断 | 性能 |
---|---|---|---|---|
NONE | 自动删除 | 消息丢失 | 消息丢失 | 最高 |
AUTO | 自动删除 | 重试/死信 | 消息重回队列 | 中等 |
MANUAL | 手动删除 | 自定义处理 | 消息重回队列 | 最低 |
手动确认: 消息到达消费者,不会自动确认,会等待消费者调用Basic.Ack命令,才会从内存/磁盘 移除这条消息。
自动确认: 消息只要到达消费者就会自动确认,不会考虑消费者是否正确消费了这些消息,直接从 内存/磁盘 中删除消息;
ack:成功处理消息,RabbitMO从队列中删除该消息
nack:消息处理失败,RabbitMO需要再次投递消息
reject:消息处理失败并拒绝该消息,RabbitMO从队列中删除该消息
手动确认消息basicAck、basicNack、basicReject的使用
1、basicAck
// 确认消息处理成功,basicAck 是 RabbitMQ 的手动确认机制核心方法
// 第一个参数 tag:消息的交付标签,唯一标识这条消息 第二个参数 false:表示不批量确认,仅确认当前这条消息
channel.basicAck(deliveryTag,false);
2、basicNack
//拒绝确认消息,即告诉 RabbitMQ 我们无法处理这条消息。这会导致该消息被重新放回队列中等待再次投递。
//1 第一个参数 表示消息的交付标签,是一个单调递增的标识符,用于唯一标识队列中的一条消息。通过这个标签,RabbitMQ 能够知道你正在处理哪一条消息。
//2 第二个参数 表示是否批量确认 true 表示否定确认当前 tag 以及之前所有未确认的消息;如果为 false,则只否定确认当前 tag 对应的消息。 此处为 false,代表只处理当前这一条消息
//3 第三个参数 如果设置为 true,消息会被重新放回队列,等待再次投递给其他消费者或同一个消费者。如果设置为 false,消息不会被重新入队,
//而是根据队列的配置可能会被丢弃或者路由到死信队列(如果配置了的话)。此处为 true,表示在发生异常时将消息重新入队以便重试。
channel.basicNack(tag, false, true);
3、basicReject
// 拒绝消息,
// 1、第一个参数是消息的交付标签,
// 2、第二个参数表示是否将消息重新入队;false 表示拒绝后不重新入队,消息会被丢弃(如果没有配置死信队列,则可能被直接删除)
channel.basicReject(tag, false);
二、准备基本环境
1、pom.xml引入的java包
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>${springboot-version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>${springboot-version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version><scope>provided</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><version>${springboot-version}</version><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.57</version></dependency></dependencies>
2、yaml配置文件
# 8004是zookeeper服务器的支付服务提供者端口号
server:port: 8004
spring:application:name: cloud-mqrabbitmq:addresses: 192.168.96.133port: 5672username: guestpassword: guestvirtual-host: /#消费者配置listener:#todo 切记,设置了重拾机制,要抛出异常,不可try catch 后不抛异常,否则重试机制失效simple:#开启ack 手动确认消息是否被消费成功acknowledge-mode: manualretry:enabled: true# 消费失败后,继续消费,然后最多消费5次就不再消费。max-attempts: 5# 消费失败后 ,重试初始间隔时间 2秒initial-interval: 2000# 重试最大间隔时间5秒max-interval: 5000# 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间multiplier: 2direct:#开启ack 手动确认消息是否被消费成功acknowledge-mode: manual#todo 切记,设置了重拾机制,要抛出异常,不可try catch 后不抛异常,否则重试机制失效retry:enabled: true# 消费失败后,继续消费,然后最多消费3次就不再消费。max-attempts: 3# 消费失败后 ,重试初始间隔时间 3秒initial-interval: 3000# 重试最大间隔时间max-interval: 7000# 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间multiplier: 2# 生产者配置template:retry:# 开启消息发送失败重试机制enabled: true# 生产者 true-开启消息抵达队列的确认publisher-returns: false#simple 配置用于设置 RabbitMQ 消息生产者的消息确认类型为“简单确认”。这意味着当消息被发送到 RabbitMQ 之后,只有在消息成功投递到队列中后,RabbitMQ 才会向生产者发送一个确认(ack)通知。如果消息未能成功投递,则不会收到确认。#该配置通常与 publisher-returns: true 一起使用以启用消息返回机制,但在此配置中 publisher-returns 被设置为 false,表示不启用消息返回功能publisher-confirm-type: simple
3、主启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** @author 10564*/
@SpringBootApplication
public class ApplicationRabbitmq {public static void main(String[] args) {SpringApplication.run(ApplicationRabbitmq.class, args);}
}
三、手动确认消息(以普通消息为例)
1、定义消息队列Queue名称
package org.xwb.springcloud.constant;/*** @author 10564*/
public class MqConstant {/*** 手动确认消息*/public static final String ACK_MQ_NAME = "ackQueue";
}
2、配置类Configuration
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.xwb.springcloud.constant.MqConstant;/*** 创建RabbitMQ的配置类* @author 10564*/
@Configuration
public class RabbitmqAckConfig {/*** 简单消息队列*/@Beanpublic Queue ackQueue() {//名字(name):队列的名字,用来区分不同的队列。//是否持久化(durable):如果设置为 true,表示即使服务器重启了,这个队列依然存在。//是否独占(exclusive):如果设置为 true,表示只有创建它的连接才能使用这个队列。//是否自动删除(autoDelete):如果设置为 true,表示当不再有消费者使用这个队列时,服务器会自动删除它。return new Queue(MqConstant.ACK_MQ_NAME,true,false,false);}
}
3、生产者Producer
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;import javax.annotation.Resource;/*** @author 10564*/
@Component
public class AckProducer {private static final Logger log = LoggerFactory.getLogger(AckProducer.class);@Resourceprivate RabbitTemplate rabbitTemplate;public void senderAckMessage(String message) {log.info("\nack生产者发送消息:{}\n", message);rabbitTemplate.convertAndSend(MqConstant.ACK_MQ_NAME, message);}
}
4、消费者Consumer
package org.xwb.springcloud.messagetype.ack;import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;import java.util.Date;/*** @author 10564*/
@Component
public class AckConsumer {private static final Logger log = LoggerFactory.getLogger(AckConsumer.class);@RabbitListener(queues = MqConstant.ACK_MQ_NAME)public void receiveAckQueueMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {try {log.info("\nack消费者接收消息:{},tag:{} \n", message, tag);if("basicAck".equals( message)){//todo 确认消息处理成功,// 第一个参数 tag:消息的交付标签,唯一标识这条消息// 第二个参数 false:表示不批量确认,仅确认当前这条消息log.info("\n手动确认 处理成功 basicAck :{} \n", new Date());channel.basicAck(tag, false);}else if("basicNack".equals( message)){//todo 拒绝确认消息,即告诉 RabbitMQ 我们无法处理这条消息。这会导致该消息被重新放回队列中等待再次投递。//todo 1 第一个参数 表示消息的交付标签,是一个单调递增的标识符,用于唯一标识队列中的一条消息。通过这个标签,RabbitMQ 能够知道你正在处理哪一条消息。//todo 2 第二个参数 表示是否批量确认 true 表示否定确认当前 tag 以及之前所有未确认的消息;如果为 false,则只否定确认当前 tag 对应的消息。 此处为 false,代表只处理当前这一条消息//todo 3 第三个参数 如果设置为 true,消息会被重新放回队列,等待再次投递给其他消费者或同一个消费者。如果设置为 false,消息不会被重新入队,//todo 而是根据队列的配置可能会被丢弃或者路由到死信队列(如果配置了的话)。此处为 true,表示在发生异常时将消息重新入队以便重试。log.info("\n手动拒绝确认消息 basicNack :{} \n", new Date());channel.basicNack(tag, false, false);}else if("basicReject".equals( message)){//todo 拒绝消息,//todo 1、第一个参数是消息的交付标签,//todo 2、第二个参数表示是否将消息重新入队;false 表示拒绝后不重新入队,消息会被丢弃(如果没有配置死信队列,则可能被直接删除)log.info("\n手动确认 拒绝消息 basicReject :{} \n", new Date());channel.basicReject(tag, false);}else{throw new RuntimeException("模拟消费失败");}} catch (Exception e) {log.error("\n消费消息异常,抛出异常{} message:{}\n",tag, e.getMessage());//todo 抛出异常,触发 spring retrythrow e;}}
}
5、测试Test
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.xwb.springcloud.messagetype.ack.AckProducer;import javax.annotation.Resource;/*** @author 10564*/
@RestController
@RequestMapping("/mq")
public class MqMessageController {@Resourceprivate AckProducer ackProducer;@GetMapping("/ack")public void ack(String message) {ackProducer.senderAckMessage(message);}
6、测试结果
### ack
GET http://localhost:8004/mq/ack?message=basicAck2025-06-21 23:33:21.758 INFO 19824 --- [nio-8004-exec-1] o.x.s.messagetype.ack.AckProducer :
ack生产者发送消息:basicAck
2025-06-21 23:33:21.771 INFO 19824 --- [ntContainer#0-1] o.x.s.messagetype.ack.AckConsumer :
ack消费者接收消息:basicAck,tag:1
2025-06-21 23:33:21.772 INFO 19824 --- [ntContainer#0-1] o.x.s.messagetype.ack.AckConsumer :
手动确认 处理成功 basicAck :Sat Jun 21 23:33:21 CST 2025 ### ack
GET http://localhost:8004/mq/ack?message=basicNack
2025-06-21 23:33:52.687 INFO 19824 --- [nio-8004-exec-2] o.x.s.messagetype.ack.AckProducer :
ack生产者发送消息:basicNack2025-06-21 23:33:52.690 INFO 19824 --- [ntContainer#0-1] o.x.s.messagetype.ack.AckConsumer :
ack消费者接收消息:basicNack,tag:2 2025-06-21 23:33:52.690 INFO 19824 --- [ntContainer#0-1] o.x.s.messagetype.ack.AckConsumer :
手动拒绝确认消息 basicNack :Sat Jun 21 23:33:52 CST 2025 ### ack
GET http://localhost:8004/mq/ack?message=basicReject
2025-06-21 23:34:14.653 INFO 19824 --- [nio-8004-exec-3] o.x.s.messagetype.ack.AckProducer :
ack生产者发送消息:basicReject2025-06-21 23:34:14.656 INFO 19824 --- [ntContainer#0-1] o.x.s.messagetype.ack.AckConsumer :
ack消费者接收消息:basicReject,tag:3 2025-06-21 23:34:14.656 INFO 19824 --- [ntContainer#0-1] o.x.s.messagetype.ack.AckConsumer :
手动确认 拒绝消息 basicReject :Sat Jun 21 23:34:14 CST 2025