当前位置: 首页 > news >正文

Rabbitmq集成springboot,手动确认消息basicAck、basicNack、basicReject的使用

一、手动确认消息模式

模式成功消费消费失败连接中断性能
NONE自动删除消息丢失消息丢失最高
AUTO自动删除重试/死信消息重回队列中等
MANUAL手动删除自定义处理消息重回队列最低

手动确认: 消息到达消费者,不会自动确认,会等待消费者调用Basic.Ack命令,才会从内存/磁盘 移除这条消息。
自动确认: 消息只要到达消费者就会自动确认,不会考虑消费者是否正确消费了这些消息,直接从 内存/磁盘 中删除消息;

ack:成功处理消息,RabbitMO从队列中删除该消息
nack:消息处理失败,RabbitMO需要再次投递消息
reject:消息处理失败并拒绝该消息,RabbitMO从队列中删除该消息

手动确认消息basicAck、basicNack、basicReject的使用

1、basicAck

// 确认消息处理成功,basicAck 是 RabbitMQ 的手动确认机制核心方法
// 第一个参数 tag:消息的交付标签,唯一标识这条消息  第二个参数 false:表示不批量确认,仅确认当前这条消息
channel.basicAck(deliveryTag,false);

2、basicNack

//拒绝确认消息,即告诉 RabbitMQ 我们无法处理这条消息。这会导致该消息被重新放回队列中等待再次投递。
//1 第一个参数 表示消息的交付标签,是一个单调递增的标识符,用于唯一标识队列中的一条消息。通过这个标签,RabbitMQ 能够知道你正在处理哪一条消息。
//2 第二个参数 表示是否批量确认 true 表示否定确认当前 tag 以及之前所有未确认的消息;如果为 false,则只否定确认当前 tag 对应的消息。 此处为 false,代表只处理当前这一条消息
//3 第三个参数 如果设置为 true,消息会被重新放回队列,等待再次投递给其他消费者或同一个消费者。如果设置为 false,消息不会被重新入队,
//而是根据队列的配置可能会被丢弃或者路由到死信队列(如果配置了的话)。此处为 true,表示在发生异常时将消息重新入队以便重试。
channel.basicNack(tag, false, true);

3、basicReject


// 拒绝消息,
// 1、第一个参数是消息的交付标签,
// 2、第二个参数表示是否将消息重新入队;false 表示拒绝后不重新入队,消息会被丢弃(如果没有配置死信队列,则可能被直接删除)
channel.basicReject(tag, false);

二、准备基本环境

1、pom.xml引入的java包

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>${springboot-version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>${springboot-version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version><scope>provided</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><version>${springboot-version}</version><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.57</version></dependency></dependencies>

2、yaml配置文件

# 8004是zookeeper服务器的支付服务提供者端口号
server:port: 8004
spring:application:name: cloud-mqrabbitmq:addresses: 192.168.96.133port: 5672username: guestpassword: guestvirtual-host: /#消费者配置listener:#todo 切记,设置了重拾机制,要抛出异常,不可try catch 后不抛异常,否则重试机制失效simple:#开启ack 手动确认消息是否被消费成功acknowledge-mode: manualretry:enabled: true# 消费失败后,继续消费,然后最多消费5次就不再消费。max-attempts: 5# 消费失败后 ,重试初始间隔时间 2秒initial-interval: 2000# 重试最大间隔时间5秒max-interval: 5000# 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间multiplier: 2direct:#开启ack 手动确认消息是否被消费成功acknowledge-mode: manual#todo 切记,设置了重拾机制,要抛出异常,不可try catch 后不抛异常,否则重试机制失效retry:enabled: true# 消费失败后,继续消费,然后最多消费3次就不再消费。max-attempts: 3# 消费失败后 ,重试初始间隔时间 3秒initial-interval: 3000# 重试最大间隔时间max-interval: 7000# 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间multiplier: 2# 生产者配置template:retry:# 开启消息发送失败重试机制enabled: true# 生产者 true-开启消息抵达队列的确认publisher-returns: false#simple 配置用于设置 RabbitMQ 消息生产者的消息确认类型为“简单确认”。这意味着当消息被发送到 RabbitMQ 之后,只有在消息成功投递到队列中后,RabbitMQ 才会向生产者发送一个确认(ack)通知。如果消息未能成功投递,则不会收到确认。#该配置通常与 publisher-returns: true 一起使用以启用消息返回机制,但在此配置中 publisher-returns 被设置为 false,表示不启用消息返回功能publisher-confirm-type: simple

3、主启动类


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** @author 10564*/
@SpringBootApplication
public class ApplicationRabbitmq {public static void main(String[] args) {SpringApplication.run(ApplicationRabbitmq.class, args);}
}

三、手动确认消息(以普通消息为例)

1、定义消息队列Queue名称

package org.xwb.springcloud.constant;/*** @author 10564*/
public class MqConstant {/*** 手动确认消息*/public static final String ACK_MQ_NAME = "ackQueue";
}

2、配置类Configuration


import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.xwb.springcloud.constant.MqConstant;/*** 创建RabbitMQ的配置类* @author 10564*/
@Configuration
public class RabbitmqAckConfig {/*** 简单消息队列*/@Beanpublic Queue ackQueue() {//名字(name):队列的名字,用来区分不同的队列。//是否持久化(durable):如果设置为 true,表示即使服务器重启了,这个队列依然存在。//是否独占(exclusive):如果设置为 true,表示只有创建它的连接才能使用这个队列。//是否自动删除(autoDelete):如果设置为 true,表示当不再有消费者使用这个队列时,服务器会自动删除它。return new Queue(MqConstant.ACK_MQ_NAME,true,false,false);}
}

3、生产者Producer

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;import javax.annotation.Resource;/*** @author 10564*/
@Component
public class AckProducer {private static final Logger log = LoggerFactory.getLogger(AckProducer.class);@Resourceprivate RabbitTemplate rabbitTemplate;public void senderAckMessage(String message) {log.info("\nack生产者发送消息:{}\n", message);rabbitTemplate.convertAndSend(MqConstant.ACK_MQ_NAME, message);}
}

4、消费者Consumer

package org.xwb.springcloud.messagetype.ack;import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;import java.util.Date;/*** @author 10564*/
@Component
public class AckConsumer {private static final Logger log = LoggerFactory.getLogger(AckConsumer.class);@RabbitListener(queues = MqConstant.ACK_MQ_NAME)public void receiveAckQueueMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws  Exception {try {log.info("\nack消费者接收消息:{},tag:{} \n", message, tag);if("basicAck".equals( message)){//todo  确认消息处理成功,// 第一个参数 tag:消息的交付标签,唯一标识这条消息// 第二个参数 false:表示不批量确认,仅确认当前这条消息log.info("\n手动确认 处理成功 basicAck :{} \n", new Date());channel.basicAck(tag, false);}else if("basicNack".equals( message)){//todo 拒绝确认消息,即告诉 RabbitMQ 我们无法处理这条消息。这会导致该消息被重新放回队列中等待再次投递。//todo  1 第一个参数 表示消息的交付标签,是一个单调递增的标识符,用于唯一标识队列中的一条消息。通过这个标签,RabbitMQ 能够知道你正在处理哪一条消息。//todo  2 第二个参数 表示是否批量确认 true 表示否定确认当前 tag 以及之前所有未确认的消息;如果为 false,则只否定确认当前 tag 对应的消息。 此处为 false,代表只处理当前这一条消息//todo  3 第三个参数 如果设置为 true,消息会被重新放回队列,等待再次投递给其他消费者或同一个消费者。如果设置为 false,消息不会被重新入队,//todo 而是根据队列的配置可能会被丢弃或者路由到死信队列(如果配置了的话)。此处为 true,表示在发生异常时将消息重新入队以便重试。log.info("\n手动拒绝确认消息 basicNack :{} \n", new Date());channel.basicNack(tag, false, false);}else if("basicReject".equals( message)){//todo  拒绝消息,//todo  1、第一个参数是消息的交付标签,//todo  2、第二个参数表示是否将消息重新入队;false 表示拒绝后不重新入队,消息会被丢弃(如果没有配置死信队列,则可能被直接删除)log.info("\n手动确认 拒绝消息 basicReject :{} \n", new Date());channel.basicReject(tag, false);}else{throw new RuntimeException("模拟消费失败");}} catch (Exception e) {log.error("\n消费消息异常,抛出异常{} message:{}\n",tag, e.getMessage());//todo 抛出异常,触发 spring retrythrow e;}}
}

5、测试Test


import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.xwb.springcloud.messagetype.ack.AckProducer;import javax.annotation.Resource;/*** @author 10564*/
@RestController
@RequestMapping("/mq")
public class MqMessageController {@Resourceprivate AckProducer ackProducer;@GetMapping("/ack")public void ack(String message) {ackProducer.senderAckMessage(message);}

6、测试结果


### ack
GET http://localhost:8004/mq/ack?message=basicAck2025-06-21 23:33:21.758  INFO 19824 --- [nio-8004-exec-1] o.x.s.messagetype.ack.AckProducer        : 
ack生产者发送消息:basicAck
2025-06-21 23:33:21.771  INFO 19824 --- [ntContainer#0-1] o.x.s.messagetype.ack.AckConsumer        : 
ack消费者接收消息:basicAck,tag:1 
2025-06-21 23:33:21.772  INFO 19824 --- [ntContainer#0-1] o.x.s.messagetype.ack.AckConsumer        : 
手动确认 处理成功 basicAck :Sat Jun 21 23:33:21 CST 2025 ### ack
GET http://localhost:8004/mq/ack?message=basicNack
2025-06-21 23:33:52.687  INFO 19824 --- [nio-8004-exec-2] o.x.s.messagetype.ack.AckProducer        : 
ack生产者发送消息:basicNack2025-06-21 23:33:52.690  INFO 19824 --- [ntContainer#0-1] o.x.s.messagetype.ack.AckConsumer        : 
ack消费者接收消息:basicNack,tag:2 2025-06-21 23:33:52.690  INFO 19824 --- [ntContainer#0-1] o.x.s.messagetype.ack.AckConsumer        : 
手动拒绝确认消息 basicNack :Sat Jun 21 23:33:52 CST 2025 ### ack
GET http://localhost:8004/mq/ack?message=basicReject
2025-06-21 23:34:14.653  INFO 19824 --- [nio-8004-exec-3] o.x.s.messagetype.ack.AckProducer        : 
ack生产者发送消息:basicReject2025-06-21 23:34:14.656  INFO 19824 --- [ntContainer#0-1] o.x.s.messagetype.ack.AckConsumer        : 
ack消费者接收消息:basicReject,tag:3 2025-06-21 23:34:14.656  INFO 19824 --- [ntContainer#0-1] o.x.s.messagetype.ack.AckConsumer        : 
手动确认 拒绝消息 basicReject :Sat Jun 21 23:34:14 CST 2025 
http://www.lryc.cn/news/573390.html

相关文章:

  • 在 MyBatis 的xml中,什么时候大于号和小于号可以不用转义
  • Axios 在 Vue3 项目中的使用:从安装到组件中的使用
  • 升级到 .NET 9 分步指南
  • “最浅”的陷阱:聊聊二叉树最小深度的递归坑点与解法哲学
  • 秋招Day14 - MySQL - SQL优化
  • c++11标准(5)——并发库(互斥锁)
  • 一、什么是生成式人工智能
  • 终端里的AI黑魔法:OpenCode深度体验与架构揭秘
  • Java ArrayList集合和HashSet集合详解
  • 【论文笔记】【强化微调】TinyLLaVA-Video-R1:小参数模型也能视频推理
  • 人人都是音乐家?腾讯开源音乐生成大模型SongGeneration
  • 旧物回收小程序开发:开启绿色生活新方式
  • Python列表常用操作方法
  • 从语义到推荐:大语言模型(LLM)如何驱动智能选车系统?
  • 首页实现多级缓存
  • AWS-SAA 第二部份:安全性和权限管理
  • 《map和set的使用介绍》
  • Linux TCP/IP协议栈中的TCP输入处理:net/ipv4/tcp_input.c解析
  • TCP 三次握手与四次挥手全流程详解
  • 【智能体】n8n聊天获取链接后爬虫知乎
  • 设计模式精讲 Day 9:装饰器模式(Decorator Pattern)
  • 【RTP】基于mediasoup的RtpPacket的H.264打包、解包和demo 1:不含扩展
  • 2D曲线点云平滑去噪
  • 雨声_锦程_时年
  • linux生产环境下根据关键字搜索指定日志文件命令
  • 软件工程期末试卷选择题版带答案(共214道)
  • 借助ChatGPT快速开发图片转PDF的Python工具
  • Java大厂面试攻略:Spring Boot与微服务架构深度剖析
  • `shallowReactive` 与 `shallowRef`:浅层响应式 API
  • 网络编程及原理(六):三次握手、四次挥手