RabbitMQ 全面指南:从基础概念到高级特性实现
目录
1. MQ
1.1 MQ 概述
1.2 系统间通信方式
1.3 MQ的作用
2. RabbitMQ
2.1 RabbitMQ 简介
2.2 RabbitMQ 工作模式
2.2.1 Simple (简单模式)
2.2.2 Work Queue (工作队列模式)
2.2.3 Publish/Subscribe (发布/订阅模式)
2.2.4 Routing(路由模式)
2.2.5 Topics(通配符模式)
2.2.6 RPC(RPC通信模式)
2.2.7 Publisher Confirms(发布确认模式)
2.3 RabbitMQ 的实现
2.3.1 原生AMQP客户端
2.3.1.1 引入依赖
2.3.1.2 代码实现
2.3.1.2.1 Work Queues (工作队列)
2.3.1.2.1.1 提取常量
2.3.1.2.1.2 生产者代码
2.3.1.2.1.3 消费者代码
2.3.1.2.2 Publish/Subscribe (发布/订阅)
2.3.1.2.2.1 提取常量
2.3.1.2.2.2 生产者代码
2.3.1.2.2.3 消费者代码
2.3.1.2.3 Routing (路由模式)
2.3.1.2.3.1 提取常量
2.3.1.2.3.2 生产者代码
2.3.1.2.3.3 消费者代码
2.3.1.2.4 Topics (通配符模式)
2.3.1.2.4.1 提取常量
2.3.1.2.4.2 生产者代码
2.3.1.2.4.3 消费者代码
2.3.1.2.5 RPC (RPC 通信)
2.3.1.2.5.1提取常量
2.3.1.2.5.2 Clinet 代码
2.3.1.2.5.3 Server 代码
2.3.1.2.6 Publisher Confirms (发布确认)
2.3.1.2.6.1 提取常量
2.3.1.2.6.2 单独确认
2.3.1.2.6.3 批量确认
2.3.1.2.6.4 异步确认
2.3.1.2.6.5 时间对比
2.3.2 Spring AMQP
2.3.2.1 引入依赖
2.3.2.2 代码实现
2.3.2.2.1 配置 Constants
2.3.2.2.2 配置 Config
2.3.2.2.3 ProducerController
2.3.2.2.4 WorkListener
2.3.2.2.5 FanoutListener
2.3.2.2.6 DirectListener
2.3.2.2.7 TopicsListener
2.3.3 两种实现的对比
2.3.4 发送对象
2.3.4.1 创建和配置 MessageConverter
2.4 RabbitMQ 高级特性
2.4.1 消息确认
2.4.1.1 消息确认介绍
2.4.1.1.1 自动确认
2.4.1.1.2 手动确认
2.4.1.2 手动确认方法
2.4.1.2.1 肯定确认
2.4.1.2.2 否定确认
2.4.1.2.3 消费者确认机制
2.4.1.3 代码实现
2.4.1.3.1 配置确认机制
2.4.1.3.2 Controller
2.4.1.3.3 NoneListener
2.4.1.3.4 AutoListener
2.4.1.3.5 ManualListener
2.4.2 持久性
2.4.2.1 交换机持久化
2.4.2.2 队列持久化
2.4.2.3 消息持久化
2.4.2.3.1 RabbitMQ 客户端实现
2.4.2.3.2 RabbitTemplate 实现
2.4.3 发送方确认
2.4.3.1 Confirm 机制概述
2.4.3.2 配置 Confirm 机制
2.4.3.3 Confirm 代码实现
2.4.3.4 Return 机制概述
2.4.3.5 配置 Return 机制
2.4.3.6 Return 代码实现
2.4.4 重试机制
2.4.4.1 重试机制详解
2.4.4.2 重试核心概念
2.4.4.3 重试机制配置
2.4.4.4 自动确认重试模式
2.4.4.5 手动确认重试模式
2.4.5 TTL (Time to Live)
2.4.5.1 TTL 的基本概念
2.4.5.2 设置消息的 TTL
2.4.5.3 设置队列的 TTL
2.4.5.4 最短TTL共享的原理
2.4.6 死信
2.4.7 延迟队列
2.4.7.1 延迟队列概念
2.4.7.2 代码实现
2.4.7.3 延迟队列问题
2.4.7.4 问题解决方案
2.4.8 事务
2.4.8.1 事务介绍
2.4.9 消息分发
2.4.9.1 消息分发概念
2.4.9.2 解决方案
2.4.9.3 应用场景
2.4.9.3.1 限流
2.4.9.3.2 非公平分发 (负载均衡)
2.4.9.3.3 Spring AMQP 代码实现
2.4.10 幂等性保障
2.4.10.1 幂等性的定义与资源消耗问题
2.4.10.2 MQ的幂等性
2.4.10.3 解决方案
2.4.10.4 顺序性保障方案
2.4.10.5 消息积压
2.4.10.5.1 积压原因
2.4.10.5.2 解决方案
2.4.10.5.2.1 提高消费者效率
2.4.10.5.2.2 限制生产者速率
2.4.10.5.2.3 资源与配置优化
2.11 Raft 算法
2.12 仲裁队列
2.12.1 仲裁队列概念
2.12.2 仲裁队列工作机制
2.13 HAProxy 负载均衡
2.14 推拉模式
2.14.1 拉模式 (Pull Mode)
2.14.2 推模式 (Push Mode)
1. MQ
1.1 MQ 概述
消息队列(MQ)是一种在分布式系统中用于通信的关键组件。从字面意思上看,MQ 本质是一个队列,遵循 FIFO(先入先出)原则,队列中存储的内容是消息(message)。消息可以非常简单,比如只包含文本字符串或 JSON 数据,也可以很复杂,如内嵌对象。MQ 主要用于分布式系统之间的通信,解决数据传递的效率和可靠性问题。
1.2 系统间通信方式
在分布式系统中,系统之间的调用通常有两种方式:
a)同步通信:直接调用对方的服务,数据从一端发出后立即到达另一端。这种方式响应快,但可能导致调用方阻塞,尤其在处理耗时操作时效率低下。
b)异步通信:数据从一端发出后,先进入一个容器进行临时存储,当满足特定条件(如接收方准备好)时,再由容器转发给另一端。MQ 就是这个容器的具体实现,它解耦了发送方和接收方,提高了系统的灵活性和可扩展性。
1.3 MQ的作用
MQ 的核心工作是接收、存储和转发消息。
a)异步解耦:在业务流程中,一些耗时操作(如发送注册短信或邮件通知)不需要即时返回结果。MQ 可以将这些操作异步化。例如,用户注册后,系统立即返回注册成功消息,同时将通知任务放入 MQ;MQ 在后台异步处理通知,避免了用户等待。这降低了系统耦合度,提升了响应速度。
b)流量削峰:面对突发流量(如秒杀或促销活动),系统可能因过载而崩溃。MQ 能缓冲请求,将峰值流量排队处理。例如,在高并发场景下,请求先进入 MQ 队列,系统根据自身处理能力逐步消费消息,防止资源耗尽。这避免了为处理峰值而过度投资资源,优化了成本效率。
c)消息分发:当多个系统需要对同一数据做出响应时,MQ 可实现高效的消息分发。例如,支付成功后,支付系统向 MQ 发送一条消息;其他系统(如订单系统、库存系统)订阅该消息,无需轮询数据库。这减少了冗余查询,提高了数据一致性和系统性能。
d)延迟通知:MQ 支持延迟消息功能,适用于在特定时间后触发操作的场景。例如,在电子商务平台中,用户下单后未支付,系统将超时取消订单的任务放入 MQ 延迟队列;MQ 在指定时间(如下单后 30 分钟)自动发送消息,触发取消流程。这简化了定时任务管理,提升了用户体验。
2. RabbitMQ
有很多基于 MQ 实现的产品,例如 Kafka、RocketMQ、RabbitMQ...等。
a)Kafka:主要用于日志收集和传输,追求高吞吐量,性能卓越,单机吞吐级达到十万级。是由日志采集需求的首选。
b)RocketMQ:采用 Java 语言开发,在设计师借鉴了 Kafka,使用于追求可靠性比较高,并发量比较大的场景。但支持的客户端语言不多,且社区活跃度一般。
c)RabbitMQ:采用 Erlang 语言开发,MQ 功能比较完备,且几乎支持所有主流语言,开源提供的界面非常友好。性能较好,吞吐量能达万级,社区活跃度高。
2.1 RabbitMQ 简介
RabbitMQ 是 MQ 的一种流行实现,它基于 AMQP(Advanced Message Queuing Protocol)(高级消息队列协议),提供了可靠的消息传递、队列管理和路由功能。并能处理高吞吐量和复杂消息路由需求。
MQ 是异步通信容器的一种实现,RabbitMQ 是 MQ 的一种实现
2.2 RabbitMQ 工作模式
RabbitMQ是一个流行的消息队列系统,支持多种工作模式来处理消息的生产和消费。这些模式适用于不同场景,帮助实现高效、可靠的消息传递。
2.2.1 Simple (简单模式)
- 描述:这是最基本的点对点模式。生产者(P)将消息发送到队列(Queue),消费者(C)从队列中取出消息。队列充当缓存区,确保消息在传递过程中不会丢失。
- 特点:
- 一个生产者对应一个消费者。
- 每条消息只能被消费一次,不会重复。
- 简单易用,适合入门级应用。
- 适用场景:消息需要被单个消费者处理的场景,例如日志记录或简单任务队列。
原理示例:生产者发送消息到队列,消费者直接拉取消息。如果队列为空,消费者会等待新消息。
2.2.2 Work Queue (工作队列模式)
- 描述:扩展了简单模式,允许一个生产者对应多个消费者(C1, C2等)。消息被分发到不同的消费者,实现负载均衡。
- 特点:
- 消息不会重复,每个消费者接收不同的消息。
- 支持并行处理,提高系统吞吐量。
- 队列自动管理消息分发。
- 使用 RabbitMQ 默认内置 direct 类型交换器实现
- 适用场景:集群环境中的异步任务处理,例如12306短信通知服务:订单消息发送到队列,多个短信服务实例竞争消息并发送通知。
2.2.3 Publish/Subscribe (发布/订阅模式)
- 描述:引入交换机(Exchange)角色。生产者发送消息到交换机,交换机将消息复制并广播到所有绑定的队列,每个队列对应一个消费者。
- 特点:
- 一个生产者,多个消费者。
- 消息被复制多份,每个消费者接收相同消息。
- 交换机类型包括fanout(广播)、direct(定向)、topic(通配符)和 headers(头部匹配),但 headers 性能较差,较少使用。
- 使用 fanout 类型交换器实现
- 适用场景:消息需要被多个消费者同时接收的场景,例如实时通知或广播消息(如新闻更新)。
每种类型的交换机都代表不同模式,交换机类型介绍:
a)Fanout:广播,将消息交给所有绑定到交换机的队列 (Publish/Subscribe 模式)
b)Direct:定向,把消息交给符合指定 routing key 的队列 (Routing 模式)
c)Topic:通配符,把消息交个符合 routing pattern (路由模式) 的队列 (Topics 模式)
2.2.4 Routing(路由模式)
- 描述:发布/订阅模式的变种,增加路由键(RoutingKey)。生产者发送消息时指定RoutingKey,交换机根据BindingKey规则将消息筛选后路由到特定队列。
- 特点:
- 基于RoutingKey的精确匹配(相等匹配)。
- 消息只分发给符合规则的消费者队列。
- 比发布/订阅更精细控制消息分发。
- 使用 direct 类型交换器实现
- 适用场景:需要根据特定规则分发消息的场景,例如将错误日志路由到专门的处理服务。
2.2.5 Topics(通配符模式)
- 描述:路由模式的升级版,支持通配符匹配RoutingKey。RoutingKey使用点分隔符(如"order.*"),交换机根据模式规则路由消息。
- 特点:
- 基于通配符(如$*$匹配一个词,$#$匹配多个词)进行模式匹配。
- 比Routing模式更灵活,支持复杂过滤。
- 路由效率高,适合动态规则。
- 使用 topic 类型交换器实现
- 适用场景:需要灵活匹配和过滤消息的场景,例如订单系统中的多级分类(如"order.paid"或"order.canceled")。
原理示例:RoutingKey如"order.paid",BindingKey如"order.*",交换机匹配后转发消息。
2.2.6 RPC(RPC通信模式)
- 描述:实现远程过程调用(RPC),生产者发送请求消息,消费者处理并返回响应。通过两个队列(请求队列和响应队列)模拟回调机制。
- 特点:
- 无传统生产者和消费者角色,更像客户端-服务器模型。
- 支持同步通信,生产者等待响应。
- 队列用于传递请求和响应。
- 适用场景:分布式系统中的远程调用,例如微服务间的方法调用。
2.2.7 Publisher Confirms(发布确认模式)
- 描述:确保消息可靠发送到RabbitMQ服务器的机制。生产者将通道设置为confirm模式后,每条消息获得唯一ID,服务器异步发送确认(ACK)表示消息已接收。
- 特点:
- 提供消息可靠性保证,避免丢失。
- 异步确认机制,不影响生产者性能。
- 支持追踪消息状态。
- 适用场景:对数据安全性要求高的场景,例如金融交易或订单处理(如支付系统)。
一个 RabbitMQ 可以有多个虚拟机,这个虚拟机主要的功能就是进行逻辑上的数据隔离,一个虚拟机里有多个 exchange (交换机) 和 queue (队列) 。消息先进入交换机,由交换机分配给队列。
内置交换机:每个虚拟机初始自带的七个交换机
2.3 RabbitMQ 的实现
2.3.1 原生AMQP客户端
2.3.1.1 引入依赖
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version>
</dependency>
2.3.1.2 代码实现
2.3.1.2.1 Work Queues (工作队列)
2.3.1.2.1.1 提取常量
public class Constants {public static final String HOST = "121.51.242.94";public static final int PORT = 5672;public static final String USER = "admin";public static final String PASSWORD = "admin";public static final String VIRTUAL_HOST = "test";public static final String WORK_QUEUE1 = "work.queue";public static final String WORK_QUEUE2 = "work.queue2";}
在Java开发中,提取常量到一个专门的类中,能显著提升代码的可维护性、可读性和健壮性。
2.3.1.2.1.2 生产者代码
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER);factory.setPassword(Constants.PASSWORD);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(Constants.WORK_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.WORK_QUEUE2, true, false, false, null);for (int i = 0; i < 10; i++) {String msg = "hello " + i;channel.basicPublish("", Constants.WORK_QUEUE1, null, msg.getBytes());channel.basicPublish("",Constants.WORK_QUEUE2,null,msg.getBytes());}System.out.println("消息发送成功");channel.close();connection.close();}}
代码解析:
ConnectionFactory factory = new ConnectionFactory();
作用:初始化 RabbitMQ 连接工厂对象,用于配置与 RabbitMQ 服务器的连接参数
Connection connection = factory.newConnection(); // 创建TCP连接
Channel channel = connection.createChannel(); // 创建AMQP信道
作用:Connection 用于维护物理 TCP 连接;Channel:轻量级逻辑通道,用于复用连接,实际 AMQP 操作接口。
channel.queueDeclare(Constants.WORK_QUEUE1, true, false, false, null);
channel.queueDeclare(Constants.WORK_QUEUE2, true, false, false, null);
作用:声明消息队列
参数详解:
queue:队列名称,必须唯一
durable:true 表示队列持久化 (服务器重启后保留)
exclusive:false 表示非独占队列 (允许多个消费者)
autoDelete:false 表示无消费者时不自动删除
arguments:额外参数 (如 TTL、死信设置等),null 表示默认
for (int i = 0; i < 10; i++) {String msg = "hello " + i;channel.basicPublish("", Constants.WORK_QUEUE1, null, msg.getBytes());channel.basicPublish("", Constants.WORK_QUEUE2, null, msg.getBytes());
}
作用:发送消息
参数详解:
第一个 " " 表示使用默认 RabbitMQ 内置交换机 (direct 类型)。简化操作,适合简单场景;但灵活性较低,因为无法使用其他交换机类型的高级功能。
WORK_QUEUE1/WORK_QUEUE2:路由键 ( 当没有 交换机与 队列绑定的 Routing Key 时,用队列名来确定发送消息到哪个队列)
null:消息属性 (如优先级、过期时间、自定义 props等)
msg.getBytes():消息体 (字节数组形式)
channel.close(); // 关闭信道(释放资源)
connection.close(); // 关闭TCP连接
作用:资源释放
工作流程图:
graph LR
A[创建连接工厂] --> B[配置服务器参数]
B --> C[建立TCP连接]
C --> D[创建AMQP信道]
D --> E[声明持久化队列]
E --> F[循环发布消息]
F --> G[关闭信道]
G --> H[关闭连接]
2.3.1.2.1.3 消费者代码
public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER);factory.setPassword(Constants.PASSWORD);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(Constants.WORK_QUEUE1, true, false, false, null);DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接受到消息" + new String(body));}};channel.basicConsume(Constants.WORK_QUEUE1,true,consumer);channel.close();connection.close();}
}
代码解析:
DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接受到消息" + new String(body));}
};
DefualtConsumer:RabbitMQ 客户端的基础消费者类,表示创建消费者对象。channel (消息通道对象) 表示与 RabbitMQ 服务器的连接通道。
方法重写:
handleDelivery():消息到达时的回调方法
consumerTag:消费者唯一标识符
envelope:包含消息元数据 (如 routing key,exchange 等)
properties:消息属性 (如 优先级、持久化标志、RPC 标志等)
body:消息内容的字节数组
channel.basicConsume(Constants.WORK_QUEUE1, true, consumer);
作用:开始从指定队列消费信息
参数解析:
Constants.WORK_QUEUE1:表示要消费的队列名称
true:自动确认模式标志 (自动从队列删除),false 则需要手动调用 basicAck() 确认
consumer:创建的消费者对象
2.3.1.2.2 Publish/Subscribe (发布/订阅)
在 发布/订阅 模型中,多了一个 Exchange (交换机)
2.3.1.2.2.1 提取常量
public class Constants {public static final String HOST = "120.53.241.96";public static final int PORT = 5672;public static final String USER = "admin";public static final String PASSWORD = "admin";public static final String VIRTUAL_HOST = "test";public static final String WORK_QUEUE1 = "work.queue";public static final String WORK_QUEUE2 = "work.queue2";public static final String FANOUT_QUEUE1 = "fanout.queue";public static final String FANOUT_QUEUE2 = "fanout.queue2";public static final String FANOUT_EXCHANGE = "fanout.exchange";}
2.3.1.2.2.2 生产者代码
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.USER);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,"");channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,"");for (int i = 0; i < 20; i++) {String msg = "hello fanout" + i;channel.basicPublish(Constants.FANOUT_EXCHANGE,"",null,msg.getBytes());System.out.println("消息发送成功");}channel.close();connection.close();}
}
代码详解:
channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
作用:用于声明一个交换器,Fanout 类型会将消息广播到所有绑定队列
参数意义:
Constants.FANOUT_EXCHANGE:声明的交换器名称
builtinExchangeType:交换器类型 (枚举类),FANOUT 表示广播模式。
true:durable 参数,表示交换器是否持久化
channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, "");
channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");
作用:将队列绑定到交换器,使交换器能将消息路由到队列
参数意义:
Constants.FANOUT_QUEUE1:表示要绑定的 队列名称
Constants.FANOUT_EXCHANGE:表示要绑定的 交换器名称
“” :routingkey 参数,在 FANOUT 模式中,routing 被忽略,长设置为 空字符串,所有消息无条件广播到所有交换器绑定的队列。在其他模式中表示 交换器和队列名 所绑定的 routing key。
for (int i = 0; i < 20; i++) {String msg = "hello fanout" + i;channel.basicPublish(Constants.FANOUT_EXCHANGE, "", null, msg.getBytes());System.out.println("消息发送成功");
}
作用:发送消息
参数意义:
Constants.FANOUT_EXCHANGE:表示 交换机 名称
" " :表示 交换机 和 队列 所绑定的 routing key
null:消息属性 (如优先级、过期时间等)
msg.getBytes():消息体 (字节数组形式)
关键点:
- Fanout特性: 路由键被忽略,所有消息无条件发送到所有绑定队列。
- 对于 Fanout 交换器,路由键的值是否为空都会被忽略。为了代码可读性和避免混淆,在发布到Fanout交换器时,可以将路由键设置为空字符串(
""
),但这并非强制要求,RabbitMQ内部会忽略它。这个在 queueBind 方法中绑定 交换器 和 队列 时体现。 - 持久化: 队列和交换器设置为
durable=true
,确保服务器重启后元数据不丢失(但消息本身持久化需要额外设置)。
2.3.1.2.2.3 消费者代码
public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1. 建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT); //需要提前开放端口号connectionFactory.setUsername(Constants.USER_NAME);//账号connectionFactory.setPassword(Constants.PASSWORD); //密码connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机Connection connection = connectionFactory.newConnection();//2. 开启信道Channel channel = connection.createChannel();//3. 声明队列channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);//4. 消费消息DefaultConsumer consumer = new DefaultConsumer(channel){//从队列中收到消息, 就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息:"+ new String(body));}};channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer);channel.close();connection.close();}
}
2.3.1.2.3 Routing (路由模式)
2.3.1.2.3.1 提取常量
public class Constants {public static final String ROUTING_QUEUE1 = "routing.queue";public static final String ROUTING_QUEUE2 = "routing.queue2";public static final String ROUTING_EXCHANGE = "routing.exchange";
}
2.3.1.2.3.2 生产者代码
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER);factory.setPassword(Constants.PASSWORD);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(Constants.ROUTING_QUEUE1,true,false,false,null);channel.queueDeclare(Constants.ROUTING_QUEUE2, true, false, false, null);channel.exchangeDeclare(Constants.ROUTING_EXCHANGE, BuiltinExchangeType.DIRECT,true);channel.queueBind(Constants.ROUTING_QUEUE1,Constants.ROUTING_EXCHANGE,"a");channel.queueBind(Constants.ROUTING_QUEUE2,Constants.ROUTING_EXCHANGE,"b");channel.queueBind(Constants.ROUTING_QUEUE2,Constants.ROUTING_EXCHANGE,"c");String msg = "hello routing_queue 1, routing key is a";channel.basicPublish(Constants.ROUTING_EXCHANGE,"a",null,msg.getBytes());String msg2 = "hello routing_queue 2, routing key is b";channel.basicPublish(Constants.ROUTING_EXCHANGE,"b",null,msg2.getBytes());String msg3 = "hello routing_queue 2, routing key is c";channel.basicPublish(Constants.ROUTING_EXCHANGE,"c",null,msg3.getBytes());System.out.println("生产信息完毕");channel.close();connection.close();}}
代码详解:
与 FANOUT 模式唯一需要区别的有三点。首先 exchangeDeclare 方法声明的第二个参数需要指定为 DIRECT 模式。第二 queueBind 方法第三个参数需要绑定 交换机与队列之间的 routing key。第三 basicPublish 方法使用时,第二个参数需要 交换机与队列 之间正确的 routing key,交换机依据已经绑定的 routing key 来分发消息。
2.3.1.2.3.3 消费者代码
public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER);factory.setPassword(Constants.PASSWORD);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(Constants.ROUTING_QUEUE1,true,false,false,null);DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumer1 消费完毕" + new String(body));}};channel.basicConsume(Constants.ROUTING_QUEUE1,true,consumer);channel.close();connection.close();}
}
2.3.1.2.4 Topics (通配符模式)
2.3.1.2.4.1 提取常量
public class Constants {public static final String TOPIC_QUEUE1 = "topic.queue";public static final String TOPIC_QUEUE2 = "topic.queue2";public static final String TOPIC_EXCHANGE = "topic.exchange";
}
2.3.1.2.4.2 生产者代码
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER);factory.setPassword(Constants.PASSWORD);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC,true);channel.queueBind(Constants.TOPIC_QUEUE1,Constants.TOPIC_EXCHANGE,"*.x.*");channel.queueBind(Constants.TOPIC_QUEUE2,Constants.TOPIC_EXCHANGE,"*.*.y");channel.queueBind(Constants.TOPIC_QUEUE2,Constants.TOPIC_EXCHANGE,"xy.#");String msg = "hello this is *.x.*";channel.basicPublish(Constants.TOPIC_EXCHANGE,"a.x.b",null,msg.getBytes());String msg1 = "hello this is *.*.y";channel.basicPublish(Constants.TOPIC_EXCHANGE,"a.b.y",null,msg1.getBytes());String msg2 = "hello this is xy.#";channel.basicPublish(Constants.TOPIC_EXCHANGE,"ab.x.y",null,msg2.getBytes());channel.close();connection.close();}
}
代码详解:与 ROUTING 模式相比有三个不同点
首先使用 exchangeDeclare 方法时,第二个参数要设置成 TOPIC 模式。第二 使用 queueBind 方法时,第三个参数可以用 * 或 # 来替代字符串,用 * 实现精细过滤 (如匹配固定格式:区域.传感器. 类型),用 # 实现批量订阅 (如匹配所有以 log. 开头的日志消息)。第三就是要改变 basicPublish 方法中第二个参数,使其匹配绑定时的 routing key。
2.3.1.2.4.3 消费者代码
public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER);factory.setPassword(Constants.PASSWORD);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumer1 消费完毕" + new String(body));}};channel.basicConsume(Constants.TOPIC_QUEUE1,true,consumer);channel.close();connection.close();}
}
2.3.1.2.5 RPC (RPC 通信)
RPC(Remote Procedure Call,远程过程调用)是一种网络通信协议,允许程序在分布式系统中调用另一台计算机上的函数或过程,就像调用本地函数一样。它简化了分布式应用的开发,隐藏了底层网络细节。
2.3.1.2.5.1提取常量
public class Constants {public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";
}
2.3.1.2.5.2 Clinet 代码
public class Client {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER);factory.setPassword(Constants.PASSWORD);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(Constants.RPC_REQUEST_QUEUE,true,false,false,null);channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE,true,false,false,null);String msg = "hello this is Controller";String id = UUID.randomUUID().toString();AMQP.BasicProperties props = new AMQP.BasicProperties().builder().correlationId(id).replyTo(Constants.RPC_RESPONSE_QUEUE).build();channel.basicPublish("",Constants.RPC_REQUEST_QUEUE,props,msg.getBytes());BlockingQueue<String> response = new ArrayBlockingQueue<>(1);DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body);System.out.println("接收到消息" + msg);if(id.equals(properties.getCorrelationId())) {response.offer(msg);}}};channel.basicConsume(Constants.RPC_RESPONSE_QUEUE,true,consumer);String result = response.take();System.out.println("RPC Controller 响应结果" + result);channel.close();connection.close();}}
逻辑上 Client 需要创建 AMQP.BasicProperties 对象 并修改 correlationId 和 replyTo 参数,传递到 basicPublish 方法的 basicProperties 参数,最后通过内置交换机传递到 request 队列。接收 Server 从 Client replyTo 参数指定队列返回的值,需要判定 correlationId 是否改变。
代码详解:
String msg = "hello this is Controller";
String id = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties().builder().correlationId(id).replyTo(Constants.RPC_RESPONSE_QUEUE).build();
channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());
作用:构建并发送 RPC 请求消息到请求队列,同设置相应关联信息。
参数详解:
id:使用 UUID.randomUUID().toString() 生成唯一标识符,用于匹配请求和响应
props:消息属性对象,通过 builder() 设置
correlationId:关联 ID,确保响应与请求匹配
replyTo:指定响应队列
2.3.1.2.5.3 Server 代码
public class Server {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER);factory.setPassword(Constants.PASSWORD);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE,true,false,false,null);channel.queueDeclare(Constants.RPC_REQUEST_QUEUE,true,false,false,null);//channel.basicQos(1);DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String request = new String(body,"UTF-8");System.out.println("接收到请求:"+ request);String response = "针对request:"+ request +", 响应成功";AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().correlationId(properties.getCorrelationId()).build();channel.basicPublish("",Constants.RPC_RESPONSE_QUEUE,basicProperties,response.getBytes());channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume(Constants.RPC_REQUEST_QUEUE,false,consumer);}
}
逻辑上 Server 通过 channel.basicQos() 来限制获取信息的数量。Server 通过 basicConsume 来进行内部检验 correlationId、向 Client 指定的 replyTo 队列生产消息。可以通过 basicConsume 的 auto_ack = false 设置手动确认,并在消费逻辑末尾调用 basicAck() 完成手动确认。
2.3.1.2.6 Publisher Confirms (发布确认)
这里提到的 Publisher Confirms 针对生产者实现的机制。作为消息中间件 就会面临消息丢失的问题。消息丢失大概分为以下三种:
a)生产者问题:因为应用程序故障,网络抖动等各种原因,生产者没有成功向 broker 发送消息。
b)消息中间件自身问题:生产者成功发送给了 Broker,但是 Broker 没有把消息保存好,导致消息丢失。
c)消费者问题:Broker 发送消息到消费者,消费者在消费消息时,因为没有处理好,导致 broker 将消费失败的消息从队列中删除了。
针对 A 和 C 类问题,我们可以使用 确认机制 (Acknowledgements) 来确保消息的可靠传递;针对 B 类问题,Broker 可以通过,持久化存储、部署高可用集群、定期备份等方式避免。
生产者确认 和 消费者确认:
方面 | 生产者确认 (Publisher Confirms) | 消费者手动确认 (Manual ACK) |
---|---|---|
目的 | 确保消息被 broker 成功接收和存储,解决生产者问题。 | 确保消息被消费者成功处理,解决消费者问题。 |
触发角色 | 由 broker 触发,生产者被动接收确认。 | 由消费者主动触发(发送 ACK/NACK)。 |
机制 | Broker 在消息持久化后发送 confirm 给生产者。 | 消费者调用 basicAck 或 basicNack 通知 broker。 |
解决的核心问题 | 防止消息在发送到 broker 过程中丢失(如网络故障)。 | 防止消息在消费过程中丢失(如消费者崩溃)。 |
配置方式 | 生产者代码中启用(如 channel.confirmSelect() )。 | 消费者代码中设置 auto_ack=False 并手动确认。 |
可靠性影响 | 保证消息从生产者到 broker 的可靠性。 | 保证消息从 broker 到消费者的可靠性。 |
典型使用场景 | 生产者发送关键数据(如订单创建),需确保存储。 | 消费者处理耗时任务(如支付),需确保完成才删除。 |
失败处理 | 生产者收到 nack 后重发消息。 | 消息重新入队(通过 NACK 或超时)。 |
性能影响 | 轻微延迟(等待 broker 确认)。 | 可能增加延迟(消费者需手动确认),但提高可靠性。 |
2.3.1.2.6.1 提取常量
public class Constants {public static final String PUBLISHER_CONFIRMS_QUEUE1 = "publisher.confirms.queue1";public static final String PUBLISHER_CONFIRMS_QUEUE2 = "publisher.confirms.queue2";public static final String PUBLISHER_CONFIRMS_QUEUE3 = "publisher.confirms.queue3";
}
这里主要展示生产者的三种确认机制:
2.3.1.2.6.2 单独确认
public class Producer {public static Integer MESSAGE_COUNT = 1000000;public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {//单独确认机制publishingMessagesIndividually();}private static void publishingMessagesIndividually() throws IOException, TimeoutException, InterruptedException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER);factory.setPassword(Constants.PASSWORD);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.confirmSelect();channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1,true,false,false,null);long startTime = System.currentTimeMillis();for (int i = 0; i < 100; i++) {String msg = "Message " + i;channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE1,null,msg.getBytes());channel.waitForConfirmsOrDie(5000);}long endTime = System.currentTimeMillis();System.out.println("单独确认最终耗费时间:" + (endTime - startTime) + "ms");}}
代码解析:
channel.confirmSelect();
- 作用:将信道设置为 Publisher Confirms 模式
- 行为:
- 启用后,Broker 会对每条成功接收的消息发送 basic.ack 确认
- 未启用时,消息发送后无法获取投递状态
channel.waitForConfirmsOrDie(5000);
- 作用:同步等待所有未确认消息的投递结果
- 参数:timeout 表示最大等待时间(毫秒)
- 行为:
- 成功:当所有消息被 Broker 确认时正常返回
- 失败:超时或收到 basic 时抛出 IOException
2.3.1.2.6.3 批量确认
public class Producer {public static Integer MESSAGE_COUNT = 1000000;public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {//批量确认publishingMessagesInBatches();}private static void publishingMessagesInBatches() throws IOException, TimeoutException, InterruptedException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER);factory.setPassword(Constants.PASSWORD);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.confirmSelect();channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2,true,false,false,null);long startTime = System.currentTimeMillis();Integer count = 0;for (int i = 0; i <MESSAGE_COUNT ; i++) {count++;String msg = "Massage" + i;channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes());if(count == 200){channel.waitForConfirmsOrDie(5000);count = 0;}}if (count > 0) {channel.waitForConfirmsOrDie(500);}long endTime = System.currentTimeMillis();System.out.println("批量确认最终耗费时间" + (endTime - startTime) + "ms");}
}
代码解析:
与上述单独确认只有一点不同,通过设置 count 来组织一次批量确认多少个 数据。
2.3.1.2.6.4 异步确认
public class Producer {public static Integer MESSAGE_COUNT = 1000000;public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {//异步确认handlingPublisherConfirmsAsynchronously();}private static void handlingPublisherConfirmsAsynchronously() throws IOException, TimeoutException, InterruptedException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setVirtualHost(Constants.VIRTUAL_HOST);factory.setUsername(Constants.USER);factory.setPassword(Constants.PASSWORD);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.confirmSelect();channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3,true,false,false,null);long startTime = System.currentTimeMillis();SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long l, boolean b) throws IOException {if(b){confirmSeqNo.headSet(l+1).clear();}else{confirmSeqNo.remove(l);}}@Overridepublic void handleNack(long l, boolean b) throws IOException {handleAck(l,b);System.out.println("丢失数据" + l);}});for (int i = 0; i < MESSAGE_COUNT; i++) {String msg = "hello publisher confirms"+i;long seqNo = channel.getNextPublishSeqNo();channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes());confirmSeqNo.add(seqNo);}while (!confirmSeqNo.isEmpty()){Thread.sleep(10);}long endTime = System.currentTimeMillis();System.out.println("异步确认最终耗时" + (endTime - startTime) + "ms");}}
代码解析:
a)channel.addConfirmListener()
这个方法用于注册监听器,处理 Broker 返回的确认事件 (ACK 或 NACK)
b)事件会经历的过程
-
步骤 1: 获取序列号
事件首先通过 channel.getNextPublishSeqNo () 获取一个唯一的序列号(delivery tag)。这个序列号是一个单调递增的 long 值,用于标识消息。 -
步骤 2: 发送消息
序列号获取后,立即通过 channel.basicPublish () 将消息发送到 RabbitMQ broker。此时,消息进入队列,但 broker 尚未确认。发送操作是同步的,但是确认是异步的。 -
步骤 3: 存储未确认序列号
序列号被存入一个数据结构(如 confirmSeqNo,通常是一个并发集合,例如 ConcurrentNavigableMap<Long, String> 或类似实现)。这个集合用于跟踪所有已发布但未被 broker 确认的序列号。 -
步骤 4: 监听器处理 ACK/NACK
broker 异步处理消息后,会发送 ACK(确认)或 NACK(未确认)回客户端。监听器(如 ConfirmListener)的 handleAck 或 handleNack 方法被触发。- ACK 处理:如果 broker 确认消息成功处理,handleAck 被调用
- NACK处理: 如果broker无法处理消息(如队列满或错误),handleNack 被调用。
-
步骤 5: 状态更新和清理
在 handleAck 或 handleNack 中,开发者需要从 confirmSeqNo 移除已确认的序列号,并执行自定义逻辑(如重发 NACK 的消息)。 -
总结流程顺序:
序列号获取 → 消息发送 → 序列号存入 confirmSeqNo → 监听器异步处理 ACK/NACK → 执行 handleAck/handleNack 方法。这是一个标准的生产者 - 消费者模式,其中 confirmSeqNo 充当了未确认消息的缓冲区。
c)confirmSeqNo 的作用
confirmSeqNo 是一个线程安全地 SortedSet<Long> 集合,使用(Collections.synchronizedSortedSet(new TreeSet<>) 创建),用于存储已经发布但尚未被确认的消息序列号。
d)seqNo 的含义和消息编号
seqNo (序列号):是 RabbitMQ 为每个发布的消息分配的唯一标识符,用于确认机制
<1>在代码中,long seqNo = channel.getNextPublishSeqNo() 用于获取当前的消息序列号。
<2>序列号是单调递增的 Long 类型值,由 RabbitMQ 客户端在启用确认(channel.confirmSelect()) 后自动生成。
e)Tag 与 SeqNo 的区别
<1>Tag (Delivery Tag):这是消费者端的概念,用于表示消息在消费过程中的唯一性
<2>seqNo (序列号):这是发布者端的概念,专门用于发布者确认机制。
f)监听器与 confirmSeqNo 的交互
channel.addConfirmListener() 在 for 循环之前注册,因此它在后台异步运行。
交互过程:当 Broker 处理完消息并返回确认事件 (AKC 或 NACK) 时,监听器的 handleACK 或handleNack 方法被调用,这些方法使用序列号参数操作 confirmSeqNo:
<1> 在 handleAck 中,如果 boolean b 为true,表示确认多个消息,择一处所有小于等于 l+1 的序列号;如果为 false,表示确认单个消息,则移除 l 。
<2> 在 handleNack 中,类似处理,但额外打印“丢失数据”日志。
2.3.1.2.6.5 时间对比
上述基于 MESSAGE_COUNT = 100000 计算的时间,由于单独确认实在是太慢,这里就不做对比了。
2.3.2 Spring AMQP
对于 RabbitMQ 开发,Spring 也提供了封装。Spring AMQP(2.3.2)通过集成Spring生态,大幅简化了消息队列的实现。相比原生AMQP(2.3.1),它减少了50%以上的代码量,提升了可维护性。
2.3.2.1 引入依赖
<!--Spring MVC相关依赖-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope>
</dependency>
<!--RabbitMQ相关依赖-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.3.2.2 代码实现
2.3.2.2.1 配置 Constants
public class Constants {public static final String WORK_QUEUE = "WORK_QUEUE";public static final String FANOUT_QUEUE1 = "Fanout_QUEUE1";public static final String FANOUT_QUEUE2 = "Fanout_QUEUE2";public static final String FANOUT_EXCHANGE = "Fanout_EXCHANGE";public static final String DIRECT_QUEUE1 = "DIRECT_QUEUE1";public static final String DIRECT_QUEUE2 = "DIRECT_QUEUE2";public static final String DIRECT_EXCHANGE = "DIRECT_EXCHANGE";public static final String TOPIC_QUEUE1 = "TOPIC_QUEUE1";public static final String TOPIC_QUEUE2 = "TOPIC_QUEUE2";public static final String TOPIC_EXCHANGE = "TOPIC_EXCHANGE";}
在Java开发中,提取常量到一个专门的类中,能显著提升代码的可维护性、可读性和健壮性。
2.3.2.2.2 配置 Config
@Configuration
public class RabbitMQConfig {//work_queue@Bean("workQueue")public Queue workQueue(){return QueueBuilder.durable(Constants.WORK_QUEUE).build();}//fanout_queue@Bean("fanoutQueue1")public Queue fanoutQueue1(){return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build();}@Bean("fanoutQueue2")public Queue fanoutQueue2(){return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build();}@Bean("fanoutExchange")public FanoutExchange fanoutExchange(){return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).build();}@Bean("bindingFanoutQueue1")public Binding bindingFanoutQueue1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,@Qualifier("fanoutQueue1") Queue queue){return BindingBuilder.bind(queue).to(fanoutExchange);}@Bean("bindingFanoutQueue2")public Binding bindingFanoutQueue2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,@Qualifier("fanoutQueue2") Queue queue){return BindingBuilder.bind(queue).to(fanoutExchange);}// direct_queue@Bean("directQueue1")public Queue directQueue1(){return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();}@Bean("directQueue2")public Queue directQueue2(){return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();}@Bean("directExchange")public DirectExchange directExchange(){return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).build();}@Bean("bindingDirectQueue1")public Binding bindingDirectQueue1(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue1") Queue queue){return BindingBuilder.bind(queue).to(directExchange()).with("a");}@Bean("bindingDirectQueue2")public Binding bindingDirectQueue2(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue2") Queue queue){return BindingBuilder.bind(queue).to(directExchange()).with("b");}@Bean("bindingDirectQueue3")public Binding bindingDirectQueue3(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue2") Queue queue){return BindingBuilder.bind(queue).to(directExchange()).with("c");}@Bean("topicQueue1")public Queue topicQueue1(){return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();}@Bean("topicQueue2")public Queue topicQueue2(){return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();}@Bean("topicExchange")public TopicExchange topicExchange(){return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).build();}@Bean("bindingTopicQueue1")public Binding bindingTopicQueue1(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue1") Queue queue){return BindingBuilder.bind(queue).to(topicExchange).with("*.x.*");}@Bean("bindingTopicQueue2")public Binding bindingTopicQueue2(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue2") Queue queue){return BindingBuilder.bind(queue).to(topicExchange).with("*.*.y");}@Bean("bindingTopicQueue3")public Binding bindingTopicQueue3(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue2") Queue queue){return BindingBuilder.bind(queue).to(topicExchange).with("xy'.#");}}
可以通过配置 Config 一次性大量的声明,队列、交换机、绑定关系等,大幅度缩减了频繁创建文件的次数。
2.3.2.2.3 ProducerController
@RestController
@RequestMapping("/producer")
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/work")public String work(){for (int i = 0; i <10 ; i++) {rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE,"hello work queue" + i);}return "发送成功";}@RequestMapping("/fanout")public String fanout(){for (int i = 0; i <10 ; i++) {rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE,"","hello fanout queue" + i);}return "发送成功";}@RequestMapping("/direct/{routingKey}")public String direct(@PathVariable("routingKey") String routingKey){rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE,routingKey,"hello direct this is routingKey " + routingKey);return "发送成功";}@RequestMapping("/topic/{routingKey}")public String topic(@PathVariable("routingKey") String routingKey){rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE,routingKey,"hello topic " + routingKey);return "发送成功";}
}
上方代码是一个基于Spring Boot的RabbitMQ消息生产者控制器(ProducerController),用于向RabbitMQ消息队列发送消息。它实现了四种常见的消息队列模式,通过HTTP接口触发消息发送。在实际应用中,它可以作为微服务架构中的生产者模块,用于解耦系统组件、实现异步处理或事件驱动架构。
通过以上接口,开发者可以轻松触发消息发送,验证 RabbitMQ 的配置和消费者逻辑。
2.3.2.2.4 WorkListener
@Component
public class WorkListener {@RabbitListener( queues = Constants.WORK_QUEUE )public void workListener1(String message) {System.out.println("队列["+Constants.WORK_QUEUE+"] 接收到消息:" + message);}@RabbitListener( queues = Constants.WORK_QUEUE )public void workListener2(String message) {System.out.println("队列["+Constants.WORK_QUEUE+"] 接收到消息:" + message);}
}
上方代码定义了一个 Spring 组建,用于实现 RabbitMQ 消息队列的并行消费功能。
代码详解:@RabbitListener 不仅可用于方法,还可用于类级别。当标注在处理方法时如上图代码所示,当标注在类时,需要搭配 @RabbitHandler 使用,将 @RabbitHandler 标注在类的方法上即可使用。
@RabbitHandler 标注的代码不能有相同类型的参数,程序运行时 会根据不同参数类型选择不同方法。
2.3.2.2.5 FanoutListener
@Component
public class FanoutListener {@RabbitListener(queues = Constants.FANOUT_QUEUE1)public void fanoutListener1(String message){System.out.println("队列["+Constants.FANOUT_QUEUE1+"] 接收到消息:" + message);}@RabbitListener(queues = Constants.FANOUT_QUEUE2)public void fanoutListener2(String message){System.out.println( "队列["+Constants.FANOUT_QUEUE2+"] 接收到消息:" + message);}
}
2.3.2.2.6 DirectListener
@Component
public class DirectListener {@RabbitListener(queues = Constants.DIRECT_QUEUE1)public void queueListener1(String msg) throws InterruptedException {System.out.println("队列["+Constants.DIRECT_QUEUE1+"] 接收到消息:" + msg);}@RabbitListener(queues = Constants.DIRECT_QUEUE2)public void queueListener2(String msg) throws InterruptedException {System.out.println("队列["+Constants.DIRECT_QUEUE2+"] 接收到消息:" + msg);}
}
2.3.2.2.7 TopicsListener
public class TopicsListener {@RabbitListener(queues = Constants.TOPIC_QUEUE1)public void topicListener1(String message){System.out.println( "队列["+Constants.TOPIC_QUEUE1+"] 接收到消息:" + message);}@RabbitListener(queues = Constants.TOPIC_QUEUE2)public void topicListener2(String message){System.out.println( "队列["+Constants.TOPIC_QUEUE2+"] 接收到消息:" + message);}
}
2.3.3 两种实现的对比
特性 | 原生AMQP客户端 | Spring AMQP |
---|---|---|
连接管理 | 手动创建/关闭连接 | 自动连接池管理 |
线程安全 | 需自行处理 | 内置线程安全 |
资源声明 | 显式声明队列/交换机 | 注解自动声明 |
异常处理 | 手动捕获异常 | 统一异常处理机制 |
事务支持 | 手动提交/回滚 | @Transactional 注解 |
序列化 | 手动处理字节流 | 自动消息转换器 |
配置方式 | 硬编码配置 | 配置文件/YAML |
-
核心特点对比:
- 原生AMQP客户端:使用RabbitMQ等消息代理的原生API(如Java AMQP客户端库),需要手动管理连接、通道、队列和交换机。代码更底层、灵活,但实现繁琐(例如,需自行处理异常、资源释放)。适用于需要精细控制消息流或对性能有极致要求的场景。
- Spring AMQP:基于Spring框架的抽象层,提供声明式配置(如注解驱动),简化了消息生产、消费和资源管理。它封装了原生API,减少了样板代码,支持与Spring Boot无缝集成。适用于快速开发、维护性要求高的企业应用。
2.3.4 发送对象
如果通过 RabbitTemplate 发送一个对象作为消息,我们需要对该对象进行序列化。Spring AMQP 推荐使用 JSON 序列化,Spring AMQP 提供了 Jackson2JsonMessageConverter 和 MappingJackson2MessageConverter 等转换器,我们需要把一个 MessageConverter 设置到 RabbitTemplate 中。
JSON 序列化的优点:
- 兼容性:JSON 格式被广泛支持,便于不同语言系统间交互。
- 安全性:相比 Java 原生序列化,JSON 减少了安全风险(如反序列化漏洞)。
- 性能:Jackson 库高效,支持自定义序列化规则。
2.3.4.1 创建和配置 MessageConverter
@Beanpublic MessageConverter jsonMessageConverter() {return new Jackson2JsonMessageConverter();}//ConnectionFactory@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(messageConverter); // 设置 JSON 转换器return rabbitTemplate;}
代码详解:
@Bean
public MessageConverter jsonMessageConverter() {return new Jackson2JsonMessageConverter();
}
- 作用:创建 JSON 格式的消息转换器
- 实现类 :Jackson2JsonMessageConverter (基于 Jackson 库)
- 核心功能:
- 将 Java 对象自动序列化为 JSON 格式发送
- 将接收的 JSON 消息自动反序列化为 Java 对象
@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(messageConverter); // 设置 JSON 转换器return rabbitTemplate;}
参数解析:
a)ConnectionFactory connectionFactory
- 类型:Spring 自动注入的 Bean
- 作用:管理 RabbitMQ 服务器的物理连接
- 包含配置:
- 主机地址(host)
- 端口号(port)
- 虚拟主机(virtual host)
- 用户名/密码
- 默认来源:Spring Boot 的 spring.rabbitmq.* 配置项
b)MessageConverter massagConverter
- 类型:依赖注入的 Bean
- 作用:指定消息的序列化/反序列化方式
- 此处来源:前文定义的 jsonMessageConverter()
- 替代方案:默认使用 SimpleMessageConverter (字节数组格式)
方法实现:
a) new RabbitTemplate(connectionFactory)
创建 RabbitTemplate 实例
建立与 RabbitMQ 服务器的连接通道
b)setMessageConverter(messageConverter)
关键配置:将 JSON 转换器绑定到模板
c)return rabbitTemplate
返回配置完成的模板 Bean
可在其他组件通过 @Autowrite 注入使用,替换默认的 rabbitTemplate。
2.4 RabbitMQ 高级特性
2.4.1 消息确认
2.4.1.1 消息确认介绍
消息确认是消息传递系统(如消息队列)中的一种机制,用于确保消息被消费者正确处理,防止消息丢失或重复处理。它通过让消费者向生产者或代理发送确认信号来实现。本部分将详细介绍消息确认的概念、类型和实现方法。
2.4.1.1.1 自动确认
消息代理在将消息传递给消费者后立即自动确认,无需消费者干预。这种方式简单高效,但风险较高:如果消费者处理消息时崩溃,消息可能丢失,因为没有重试机制。
2.4.1.1.2 手动确认
消费者在处理消息后,需要显式发送确认信号给消息代理(如RabbitMQ)。这提供了更高的控制性,确保消息只有在成功处理后才被标记为完成。如果处理失败,消费者可以选择重新入队或丢弃消息。
2.4.1.2 手动确认方法
手动确认的实现通常涉及以下步骤,以消息队列系统(如RabbitMQ)为例
a)消费者订阅消息:消费者连接到队列,并声明需要手动确认模式。
b)处理消息:消费者接收消息并执行业务逻辑(如数据处理或存储)。
c)发送确认信号:如果处理成功,消费者发送一个确认(ACK)信号给代理;如果失败,发送否定确认(NACK)信号,让消息重新入队或丢弃。
d)代理响应:代理收到ACK后,从队列中删除消息;收到NACK后,根据配置重试或移至死信队列。
2.4.1.2.1 肯定确认
Channel.basicAck(long deliveryTag,boolean multiple)
参数说明:
a)deliveryTag:消息的唯一标识,是一个单调递增的 64 为长整型。deliveryTag 是每个 Channel 独立维护的,在每个通道上都是唯一的。
b)multiple:是否批量确认,在某些情况下为了减少网络流量,可以对一系列连续的 deliveryTag 进行批量确认。multiple 为 true 则一次性 ack 所有小于等于 deliveryTag 的消息,如果为 false 则只针对当前的 deliveryTag 的消息。
2.4.1.2.2 否定确认
RabbitMQ 在2.0.0 版本开始引入 Channel.basicReject 命令,消费者客户端可以调用这个方法来告诉 RabbitMQ 拒绝接收这个消息。
a)Channel.basicReject(long deliveryTag,boolean multiple)
b)Channel.basicNack(long deliveryTag,boolean multiple, boolean requeue)
参数说明:
requeue :如果为 true 则表示重新入队,false 则表示不重新入队。
Basic.Reject 只能一次拒绝一条消息,如果想要批量拒绝消息,可以使用 Basic.Nack 命令,消费者客户端可以一次性拒绝多条消息。
2.4.1.2.3 消费者确认机制
在AMQP协议(如RabbitMQ)中,Basic.Ack 、Basic.Nack 、Basic.Reject是协议层定义的消息确认机制,而Channel.basicAck、Channel.basicNack、Channel.basicReject是 Java 客户端API的实现。它们的对应关系是协议规范与代码实现的映射,均由消费者端调用,用于向Broker确认消息处理状态。这三种策略主要针对消费者端的消息确认:AUTO、MANUAL、NONE。
- 消费者端确认:由消费者调用,用于向Broker报告消息处理成功或失败(例如,Basic.Ack表示处理成功,Basic.Nack 表示处理失败)。
针对于 RabbitMQ 的 Java 客户端上实现生产者确认:
- 生产端确认:由生产端调用,用于向Broker确认消息是否成功持久化或路由(例如,Broker返回Ack 表示消息已接收,Nack 表示消息丢失或失败)
- 在代码中,通过 channel.confirmSelect() 启用发布者确认模式,然后使用 channel.addConfirmListener 从 SortedSet 数据类型的集合中,调用 handleAck 或 handleNack 异步处理Broker的响应。
2.4.1.3 代码实现
下方基于 SpringBoot 来演示消息的确认机制,Spring-AMQP 对 RabbitMQ 提供的库进行了进一步的封装,对消息确认机制提供了三种策略:
public enum AcknowledgeMode { NONE,MANUAL,AUTO;
}
这个枚举定义了三种消息确认模式:
- NONE:表示不进行任何确认。消息发送后,系统不会等待接收方的响应,适用于对可靠性要求不高的场景(如日志记录),但可能导致消息丢失。
- MANUAL:表示手动确认。接收方需要显式调用确认操作(例如,通过API调用),消息才会被视为处理完成。这提供了最高控制度,但增加了开发复杂度。
- AUTO:表示自动确认。系统在消息被接收后立即自动确认,无需用户干预。这平衡了可靠性和效率,适用于大多数标准场景
方面 | AUTO模式 | NONE模式 |
---|---|---|
确认时机 | 消息被消费端接收后立即由代理自动确认。 | 没有自动确认;消费端不发送确认,消息保持未确认状态。 |
消息处理风险 | 高丢失风险:如果消费端处理失败,消息无法恢复。 | 低丢失风险:消息会重新投递或保留,但可能导致重复处理。 |
性能影响 | 高性能:消息快速移除,减少队列压力。 | 低性能:消息滞留队列,可能引起积压或延迟。 |
适用场景 | 允许消息丢失的场景,如监控数据、实时流处理。 | 需要高可靠性的场景,如事务处理、关键业务逻辑。 |
实现本质 | 代理主动处理确认,消费端被动接收。 | 消费端“不行动”,代理被动等待或超时处理。 |
消息状态详细对比:
模式 | 初始状态 | 状态变化时机 | 最终状态 | 执行是否影响状态 | 主要风险 |
---|---|---|---|---|---|
NONE模式 | 未确认 | 无变化(消费端不确认) | 始终未确认 | 否(broker不关注执行) | 低丢失风险,但可能导致重复处理或延迟 |
AUTO模式 | 未确认 | 消费端接收后立即确认 | 已确认 | 否(broker不检查执行结果) | 高丢失风险(处理失败时消息无法恢复) |
未确认指的是 Broker 中的 Unacked 状态
2.4.1.3.1 配置确认机制
在 application.yml 中配置相关属性
rabbitmq:listener:simple:acknowledge-mode: auto
2.4.1.3.2 Controller
@RestController
@RequestMapping("/ack")
public class ProducerController {@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/auto")public String auto() {for (int i = 0; i < 1; i++) {rabbitTemplate.convertAndSend(Constants.AKC_EXCHANGE_AUTO,"abc","auto 发送消息:" + i);}return "auto success";}@RequestMapping("/none")public String none() {for (int i = 0; i < 1; i++) {rabbitTemplate.convertAndSend(Constants.AKC_EXCHANGE_NONE,"abc","none 发送消息:" + i);}return "none success";}@RequestMapping("/manual")public String manual() {for (int i = 0; i < 1; i++) {rabbitTemplate.convertAndSend(Constants.AKC_EXCHANGE_MANUAL,"abc","manual 发送消息:" + i);}return "manual success";}
2.4.1.3.3 NoneListener
@Component
public class NoneListener {@RabbitListener(queues = Constants.AKC_QUEUE_NONE)public void listen(Message message, Channel channel) throws Exception {System.out.println("接收到消息: "+ new String(message.getBody(),"UTF-8")+"deliveryTag: " + message.getMessageProperties().getDeliveryTag() );System.out.println("auto 业务逻辑处理完毕");}
}
2.4.1.3.4 AutoListener
@Component
public class AutoListener {@RabbitListener(queues = Constants.AKC_QUEUE_AUTO)public void listen(Message message, Channel channel) throws Exception {System.out.println("接收到消息: "+ new String(message.getBody(),"UTF-8")+"deliveryTag: " + message.getMessageProperties().getDeliveryTag() );System.out.println("auto 业务逻辑处理完毕");}
}
消费端不确认收到消息时会自动重复消息入队
2.4.1.3.5 ManualListener
@Component
public class ManualListener {@RabbitListener(queues = Constants.AKC_QUEUE_MANUAL)public void listen(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try{System.out.println("接收到消息: "+ new String(message.getBody(),"UTF-8")+"deliveryTag: " + message.getMessageProperties().getDeliveryTag() );System.out.println("auto 业务逻辑处理完毕");channel.basicAck(deliveryTag,false);}catch(Exception e){// tag 是否批量 是否确认channel.basicNack(deliveryTag,false,true);}}
}
消费端必须掉用 channel.basicAck 来确认收到消息,当出现异常没有确认收到消息时,可以手动调整 Nack 参数来决定是否让消息重新入队。
2.4.2 持久性
在 RabbitMQ 中,持久性是确保消息在服务意外停止或重启后不丢失的关键机制。它通过交换器、队列和消息三部分的持久化来实现。
2.4.2.1 交换机持久化
交换器持久化:
在 Spring AMQP 中,交换器是消息路由的入口。如果交换器不持久化,RabbitMQ服务重启后,其元数据(如名称、类型)会丢失,导致消息无法被正确路由。实现时,在声明交换器时将 durable 参数设为$true$。
Exchange exchange = ExchangeBuilder.topicExchange("myExchange").durable(true).build();
2.4.2.2 队列持久化
队列持久化:
在 Spring AMQP 中,队列是消息存储的容器。如果队列不持久化,服务重启后队列会被删除,所有消息也会丢失。实现时,在声明队列时将 durable 参数设为$true$。也就是在声明队列时,使用 .durable() 来使队列持久化。
Queue queue = QueueBuilder.durable("myQueue").build(); // 默认durable=true
2.4.2.3 消息持久化
消息持久化:
消息本身需要显式设置为持久化,否则即使队列持久化,消息也可能在重启后丢失。
2.4.2.3.1 RabbitMQ 客户端实现
在 RabbitMQ 客户端中,可以通过设置 MessageProperties.PERSISTENT_TEXT_PLAIN ,将这个参数传入 channel.basicPublish() 中 完成消息持久化,当然 队列持久化是消息持久化的前提。
String messageContent = "This is a persistent message";// 2. 设置消息属性为持久化channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,messageContent.getBytes());
代码详解:
MessageProperties.PERSISTENT_TEXT_PLAIN 是库 com.rabbitmq.client.MessageProperties 类中的一个静态常量,不需要用户手动编写代码,用于设置消息为持久化模式。
MessageProperties.PERSISTENT_TEXT_PLAIN的定义:
public static final BasicProperties PERSISTENT_TEXT_PLAIN =new BasicProperties("text/plain",null,null,2, //deliveryMode0, null, null, null,null, null, null, null,null, null);
2.4.2.3.2 RabbitTemplate 实现
如果使用 RabbitTemplate 发送持久化消息,代码如下:
public class test {public static void main(String[] args) {String message = "This is a persistent message";Message messageObject = new Message(message.getBytes(), new MessageProperties());messageObject.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack", messageObject);}
}
代码详解:
a)Message:Message 是 org.springframework.amqp.core.Message 类
Message 是 org.springframework.amqp.core.Message 类是Spring AMQP框架中的核心类,用于封装一条AMQP消息。它表示发送或接收的消息实体,包含消息体(payload)和消息属性(如头部信息、路由键等)。主要作用包括:MessageProperties
- 存储消息的原始字节数据(body)。
- 提供访问和修改消息元数据的接口(通过 MessageProperties)。
- 在RabbitMQ客户端和服务器之间传输消息时,确保数据结构的标准化。
参数解析:
message.getBytes,将字符串消息转换为字节数组,作为消息的实际内容(body),以便RabbitMQ处理。消息体必须是字节数组,因为AMQP协议支持二进制数据传输。;
new MessageProperties(),初始化消息的属性对象,用于设置消息的元数据(如头部、优先级、持久化模式等)。默认情况下, new MessageProperties() 创建一个空属性对象,后续可以通过 getMessageProperties() 方法修改信息属性 (如是否持久化等.....)。
b)setDeliveryMode:方法用于设置消息的传递模式(delivery mode)
主要作用包括:
- 控制消息在 RabbitMQ 中的存储方式:设置为 PERSISTENT 时,消息会被写入磁盘,确保即使 RabbitMQ 服务器重启或崩溃,消息也不会丢失(持久化)。这提高了消息的可靠性,适用于关键业务场景。
- 如果未设置或设置为 NON_PERSISTENT,消息仅存储在内存中,服务器重启后消息会丢失(非持久化),但性能更高。
参数解析:
MessageDeliveryMode,是 MessageDelivery 枚举类 (来自 org.springframework.amqp.core.MessageDeliveryMode)。主要用来指定传递模式,可选值有两个:
MessageDeliveryMode.PERSISTENT 消息持久化,消息保存到磁盘。
MessageDeliveryMode.NON_PERSISTENT 非持久化,消息仅保存在内存中。
2.4.3 发送方确认
在使用RabbitMQ时,消息持久化可以防止服务器崩溃导致的消息丢失,但如果消息在传输过程中丢失(例如RabbitMQ重启期间生产者投递失败),消息根本未到达服务器,持久化也无法解决。RabbitMQ提供了confirm机制(发送方确认)来确保生产者知道消息是否成功到达Exchange。相比事务机制,confirm机制性能更高,是实际工作中的首选方案。
2.4.3.1 Confirm 机制概述
Confirm 机制允许生产者设置一个回调监听 (ConfirmCallback)。无论消息是否到达 Exchange,RabbitMQ 都会触发回调:
- 如果消息成功到达Exchange,回调返回 ack = true。
- 如果消息未到达Exchange(例如网络故障或Exchange不存在),回调返回 ack = false,并提供失败原因(cause)。
- 该机制仅确认消息是否到达 Exchange,不保证消息被 Queue 处理(后续需结合return退回模式处理Queue级错误)
以下步骤基于Spring Boot环境,使用Spring AMQP库。整个过程分为配置RabbitMQ、设置回调逻辑、发送消息和测试。
2.4.3.2 配置 Confirm 机制
listener:simple:acknowledge-mode: manualpublisher-confirm-type: correlated
2.4.3.3 Confirm 代码实现
逻辑上需要先创建 RabbitTemplate Bean,让新创建的 RabbitTemplate 实现 ConfirmCallback 接口,使其替代默认的 RabbitTemplate。
然后需要重写 ConfirmCallback 接口内的 confirm 方法,以此来实现 Confirm 机制。
最后完善 confirm 内的逻辑,如果为 ack 即 Exchange 收到消息,完善相应逻辑;如果为 false 可以打印失败 cause 来完善业务逻辑。cause 是 confirm 方法的原生参数,开发者可直接在方法实现中访问该参数获取消息发送失败的具体原因,无需额外操作。
具体代码如下:
@RequestMapping("/confirm")public String confirm() {rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println("Confirm 生产端发送成功" );if(b){System.out.printf("Exchange 接收到消息 ,消息ID: %s \n",correlationData == null? null : correlationData.getId());}else {System.out.printf("Exchange 未接收到消息,消息ID:%s,cause:%s\n",correlationData == null? null : correlationData.getId(),s);}}});CorrelationData correlationData = new CorrelationData("1");rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE+1,"abc","confirm.test",correlationData);return "confirm success";}
如上图代码所述,RabbitTemplate 是单利对象,所以存在两个问题。
a)在 confirm 中设置 RabbitTemplate 会影响所有使用 RabbitTemplate 的方法
b)重复调用接口会提示错误
可以直接创建一个新的 RabbitTemplate 类,但是需要创建一个原本的 rabbitTemplate 给其他方法调用。修改完的 RabbitTemplate 代码如下:
@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {return new RabbitTemplate(connectionFactory);}@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println("Confirm 生产端发送成功" );if(b){System.out.printf("Exchange 接收到消息 ,消息ID: %s \n" , correlationData == null? null : correlationData.getId());}else {System.out.printf("Exchange 未接收到消息,消息ID:%s , cause: %s\n",correlationData == null? null : correlationData.getId(),s);}}});return rabbitTemplate;}
}
ProducerController代码如下:
@RestController
@RequestMapping("/ack")
public class ProducerController {@AutowiredRabbitTemplate rabbitTemplate;@AutowiredRabbitTemplate confirmRabbitTemplate;@RequestMapping("/manual")public String manual() {for (int i = 0; i < 1; i++) {rabbitTemplate.convertAndSend(Constants.AKC_EXCHANGE_MANUAL,"abc","manual 发送消息:" + i);}return "manual success";}@RequestMapping("/confirm")public String confirm() {CorrelationData correlationData = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE,"abc","confirm.test",correlationData);return "confirm success";}
}
代码运行结果如下:
2.4.3.4 Return 机制概述
在RabbitMQ中,当消息从Exchange路由到Queue时,如果消息无法被任何队列消费(例如路由键不匹配或队列不存在),可以配置系统将消息退回给发送者。这种机制称为“消息退回模式”,它通过设置回调函数来处理退回的消息。以下是详细的实现步骤,帮助您逐步理解和应用该机制。所有解释基于RabbitMQ的标准行为,确保真实可靠。
2.4.3.5 配置 Return 机制
spring:rabbitmq:listener:simple:acknowledge-mode: manual # 启用消息接收手动确认publisher-confirm-type: correlated # 启用消息发送确认,支持退回模式
2.4.3.6 Return 代码实现
逻辑上需要使用 rabbitTemplate.setReturnsCallback() 方法,对 returnedMessage() 进行重写达到业务逻辑。 我们可以把 return 机制和 confirm 机制搭配使用。
修改完的 RabbitTemplate 如下:
@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println("Confirm 生产端发送成功" );if(b){System.out.printf("Exchange 接收到消息 ,消息ID: %s \n" , correlationData == null? null : correlationData.getId());}else {System.out.printf("Exchange 未接收到消息,消息ID:%s , cause: %s\n",correlationData == null? null : correlationData.getId(),s);}}});rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.printf("消息被退回:%s",returnedMessage);}});return rabbitTemplate;}
ProducerController 如下所示:
@RequestMapping("/confirm")public String confirm() {CorrelationData correlationData = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE,"abc","confirm.test",correlationData);return "confirm success";}@RequestMapping("/confirmFalse")public String confirmFalse() {CorrelationData correlationData = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE,"abc11","confirm.test",correlationData);return "confirm success";}
代码结果如下:
ReturnMessage 包含以下属性:
public class ReturnedMessage {//返回的消息对象,包含了消息体和消息属性 private final Message message;//由Broker提供的回复码, 表⽰消息⽆法路由的原因. 通常是⼀个数字代码,每个数字代表不同的含义.private final int replyCode;//⼀个⽂本字符串, 提供了⽆法路由消息的额外信息或错误描述. private final String replyText;//消息被发送到的交换机名称 private final String exchange;//消息的路由键,即发送消息时指定的键 private final String routingKey;
}
2.4.4 重试机制
2.4.4.1 重试机制详解
RabbitMQ的重试机制是处理消息传递失败的关键策略,用于应对网络故障、服务不可用或资源不足等问题。它能自动或手动地将处理失败的消息重新入队,提高系统可靠性。但需注意,如果失败由程序逻辑错误(如代码缺陷)引起,重试可能无效,因此需设置合理的重试次数。
2.4.4.2 重试核心概念
RabbitMQ 的重试机制基于消息确认模式:
- 自动确认模式(Acknowledge-Mode: Auto):RabbitMQ在消息投递给消费者后自动确认。如果消费者处理失败(如抛出异常),RabbitMQ会根据配置参数自动重试消息。例如,设置重试次数 n,RabbitMQ会负责重发消息 n 次。
- 手动确认模式(Acknowledge-Mode: Manual):消费者需显式调用确认方法。如果处理失败,应用程序可选择是否重试(通过设置 requeue 参数)。这给予应用更多控制权,但重试逻辑需由开发者实现。
关键区别:
- 自动模式:重试由RabbitMQ内部管理,适合简单场景。
- 手动模式:重试由应用程序逻辑控制,适合需要自定义重试策略的复杂系统。
2.4.4.3 重试机制配置
rabbitmq:listener:simple:acknowledge-mode: auto/manualretry:enabled: true # 开启重试机制initial-interval: 5000ms # 初始重试间隔,例如5秒max-attempts: 5 # 最大重试次数(包括首次消
2.4.4.4 自动确认重试模式
rabbitmq:listener:simple:acknowledge-mode: autoretry:enabled: true # 开启重试机制initial-interval: 5000ms # 初始重试间隔,例如5秒max-attempts: 5 # 最大重试次数(包括首次消费)
没开启重试机制之前,RabbitMQ 能自动的将处理失败的消息一直重新入队也不会抛出异常。
重试机制开启之后,会根据设置的次数重新入队,当设置的次数耗尽也没有解决问题时,就会抛出异常。
代码如下:
@Component
public class RetryListener {@RabbitListener( queues = Constants.RETRY_QUEUE )public String retryAuto(Message message) {String msg = new String(message.getBody());long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("消息:%s , coorelationId = %s\n", msg, deliveryTag);int i = 1/0;return "returnAuto 消费端执行完毕";}
}
@RequestMapping("/retryAuto")public String retryAuto() {CorrelationData correlationData = new CorrelationData("1");rabbitTemplate.convertAndSend(Constants.RETRY_EXCHANGE,"abc","retry auto",correlationData);return "retryAuto success";}
运行结果如下:
2.4.4.5 手动确认重试模式
rabbitmq:listener:simple:acknowledge-mode: manualretry:enabled: true # 开启重试机制initial-interval: 5000ms # 初始重试间隔,例如5秒max-attempts: 5 # 最大重试次数(包括首次消
手动确认时,重试机制不生效,只有在自动确认时才会生效,这里就不做进一步的演示了。
2.4.5 TTL (Time to Live)
TTL(Time to Live,过期时间)是RabbitMQ中用于控制消息或队列生命周期的机制。当消息或队列超过设定的存活时间后,未被消费的消息会被自动清除。这适用于电商订单超时取消(如24小时未付款自动取消订单)或退款超时处理(如7天未处理自动退款)等场景。
2.4.5.1 TTL 的基本概念
- TTL定义:过期时间,单位为毫秒(ms)。消息或队列在超过TTL后,如果未被消费,RabbitMQ会自动删除它们。
- 应用场景:避免消息堆积,实现超时业务逻辑(如订单自动取消)。
- 关键规则:
- 如果同时设置消息TTL和队列TTL,RabbitMQ取两者中较小的值作为实际TTL。
- TTL为 0 表示消息必须立即投递给消费者,否则被丢弃;未设置TTL表示消息永不过期。
2.4.5.2 设置消息的 TTL
先创建 MessagePostProcessor 类对象,重写 postProcessMessage 方法,在重写的方法里通过 message.getMessageProperties().setExpiration("10000") 设置过期时间,单位毫秒。最后把该 MessagePostProcessor 类对象作为参数传输到 rabbitTemplate.convertAndSend。
@RequestMapping("/ttlMessage")public String ttlMessage(){System.out.println("ttl......");MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("10000");return message;}};System.out.println("ttl2 .....");rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE,"abc","ttl",messagePostProcessor);return "ttlMessage success";}
2.4.5.3 设置队列的 TTL
设置队列 TTL 步骤简单,只需要在声明队列时加上 .ttl() 就行。
@Bean("ttlQueue")public Queue ttlQueue(){return QueueBuilder.durable(Constants.TTL_QUEUE).ttl(10000).build();}
2.4.5.4 最短TTL共享的原理
消息队列会扫描当前队列中所有消息的TTL值,并选择最小的TTL作为队列的全局TTL。这意味着所有消息的过期时间会受到最短TTL的约束。
假设队列中有以下消息:
- 消息A:TTL=5秒
- 消息B:TTL=10秒
- 消息C:TTL=3秒
系统会将队列的全局TTL设置为3秒(最短的TTL)。此时,所有消息将在3秒后过期,无论其原始TTL是多少。
当队列不是TTL队列时,会对TTL消息按顺序进行消费,也就是TTL消息的时间总和。在非TTL队列中,TTL消息按FIFO顺序消费,但需警惕因顺序阻塞导致的过期风险。
2.4.6 死信
死信(Dead Message)是指因某些原因无法被消费的消息。当消息在一个队列中变成死信后,会被重新发送到另一个交换器(DLX,Dead Letter Exchange),绑定该交换器的队列称为死信队列(DLQ,Dead Letter Queue)。消息变成死信的常见情况包括:
- 消息被拒绝(Basic.Reject/Basic.Nack)且 requeue 参数为 false。
- 消息过期。
- 队列达到最大长度。通过 .maxLength
当声明队列时,可以在队列加上 .deadLetterExchange() 参数绑定死信交换机,接着加上 .deadLetterRoutingKey() 参数指定路由给 死信交换机 所绑定的死信队列。
2.4.7 延迟队列
2.4.7.1 延迟队列概念
延迟队列(Delayed Queue)是一种消息传递机制,消息在发送后不会立即被消费者获取,而是等待特定时间后才可被消费。这在许多场景中非常有用,例如:
- 智能家居:用户指令在指定时间后执行。
- 日常管理:会议前15分钟自动提醒。
- 用户留存:注册7天后发送短信。
RabbitMQ本身不直接支持延迟队列,但可以通过TTL(Time To Live)和死信队列(Dead Letter Exchange)组合模拟实现。然而,这种方式存在局限性,尤其在处理不同延迟时间的消息时。
如下图所示就是最简单的延迟队列模型:
TTL机制允许设置消息的存活时间(单位:毫秒)。当消息过期时,它会被路由到死信队列(DLX),消费者从死信队列消费消息,实现延迟效果。关键步骤包括:
- 声明一个正常队列(normal_queue),并绑定到死信交换器(DLX)。
- 生产者发送消息时,设置消息的TTL(例如10秒或20秒)。
- 消费者监听死信队列(dlx_queue),消费过期消息。
2.4.7.2 代码实现
声明相关对象:
@Bean("normalQueue")public Queue normalQueue(){return QueueBuilder.durable(Constants.NORMAL_QUEUE).deadLetterExchange(Constants.DL_EXCHANGE).deadLetterRoutingKey("dlx").ttl(40000).build();}@Bean("normalExchange")public DirectExchange normalExchange(){return ExchangeBuilder.directExchange(Constants.NORMAL_EXCHANGE).build();}@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalQueue") Queue normalQueue, @Qualifier("normalExchange") Exchange normalExchange){return BindingBuilder.bind(normalQueue).to(normalExchange).with("normal").noargs();}@Bean("dlQueue")public Queue dlQueue(){return QueueBuilder.durable(Constants.DL_QUEUE).build();}@Bean("dlExchange")public DirectExchange dlExchange(){return ExchangeBuilder.directExchange(Constants.DL_EXCHANGE).build();}@Bean("dlBinding")public Binding dlBinding(@Qualifier("dlQueue") Queue dlQueue, @Qualifier("dlExchange") DirectExchange dlExchange){return BindingBuilder.bind(dlQueue).to(dlExchange).with("dlx");}
调用客户端:
@RequestMapping("/dl")public String dl() {rabbitTemplate.convertAndSend(Constants.NORMAL_EXCHANGE,"normal","dl");return "dl success";}
2.4.7.3 延迟队列问题
当队列中包含不同TTL的消息时,RabbitMQ只检查队首消息的过期时间。如果队首消息的TTL较长,后续短TTL消息会被阻塞,直到队首消息过期。
情况一 :
- 先发送一条TTL为20秒的消息,再发送一条TTL为10秒的消息。
- 理想情况下,10秒消息应更早过期进入死信队列。
- 实际结果:RabbitMQ先处理队首(20秒消息),导致10秒消息在20秒后才被消费
情况二:
- 先发送一条TTL为10秒的消息,再发送一条TTL为20秒的消息。
- 理想情况下,10秒消息应更早过期(约 t=10 秒),20秒稍后过期(约 t=20 秒)
- 10秒消息在 t=10 秒被消费,20秒消息在 t=20 秒被消费,间隔约10秒。
原因在于RabbitMQ的队列设计:
- 队列检查原则:RabbitMQ只监控队列头部(第一个消息)的过期时间。只有当头部消息过期或被消费后,才会检查下一个消息。
- TTL计算起点:消息的TTL是从它被发布到队列时开始计算的(绝对时间),但RabbitMQ只在处理头部时“发现”过期。
- 潜在问题:如果头部消息的TTL较长,它会阻塞后续消息的过期检查,即使后续消息的TTL更短。这可能导致短TTL消息延迟过期。
2.4.7.4 问题解决方案
解决方案一:为不同延迟时间创建独立队列
要解决此问题,核心思路是避免混合不同TTL的消息在同一个队列中。RabbitMQ官方推荐为每个延迟时间创建独立的队列。这样,每个队列只处理单一TTL,确保过期检查的准确性。
a)设计多个队列:为每个需支持的延迟时间(如10秒、20秒)创建独立的正常队列。
- 每个队列绑定到同一个死信交换器。
- 设置队列的.deadLetterExchange() 参数 和 .deadLetterRoutingKey()
b)生产者发送消息:根据消息的延迟需求,发送到对应的队列。
c)消费者统一监听死信队列:死信队列接收所有过期消息,消费者无需修改。
解决方案二:添加插件
通过下载插件地址如下: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
添加完插件之后再创建延迟队列,只需要声明延迟交换机时添加 .delayed() 方法,设置消息时使用 message.setDelayLong() 指定延迟时间,无需额外设置过期时间或创建死信队列来完成延迟队列的功能。
延迟队列:消息到达延迟时间的时候才会到达延迟队列
TTL + 死信:通过指定一个队列的过期时间,当过期时间到达时转移到另一个死信队列,从而到达延迟队列的功能。
2.4.8 事务
2.4.8.1 事务介绍
RabbitMQ基于AMQP协议实现了事务机制,确保消息的发送和接收具有原子性(即全部成功或全部失败),Spring AMQP通过事务管理器简化了配置。
在Spring中,需配置 RabbitTransactionManager 和 RabbitTemplate 来启用事务。以下是代码解析:
- RabbitTransactionManager:管理RabbitMQ连接的事务。
- RabbitTemplate :设置 channelTransacted(true) ,表示使用事务通道发送消息。
- 在方法上加上 @Transactional 来启用事务管理
代码如下:
@Beanpublic RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// 以后把这个注入进去rabbitTemplate.setChannelTransacted(true);return rabbitTemplate;}@Beanpublic RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);}@Transactional@RequestMapping("/transAction")public String transAction() {transRabbitTemplate.convertAndSend(Constants.TEST_EXCHANGE,"test","trans");transRabbitTemplate.convertAndSend(Constants.TEST_EXCHANGE,"test","trans");return "transAction success";}
三者缺一不可,缺少一个 事物 的特性都不会实现。发布确认模式和事务模式相冲突,只能同时开启一个。
2.4.9 消息分发
2.4.9.1 消息分发概念
RabbitMQ的消息分发机制在多个消费者场景下能有效扩展系统吞吐量,但默认的轮询策略可能导致负载不均衡。
消息分发的基本概念与问题
- 默认轮询分发:当队列有多个消费者时,RabbitMQ默认采用轮询策略分发消息。每条消息只发给一个消费者,但分发顺序固定,不考虑消费者处理速度。例如:
- 消费者A处理速度快(每秒10条消息)。
- 消费者B处理速度慢(每秒2条消息)。
- 结果:消费者B可能积压消息,消费者A空闲,整体吞吐量下降。
- 核心问题:轮询分发无法适应消费者处理能力差异,导致资源浪费和系统瓶颈。
2.4.9.2 解决方案
RabbitMQ提供 channel.basicQos(prefetchCount) 方法来解决负载不均衡问题。其工作原理类似于TCP/IP的“滑动窗口”机制:
- 参数说明:
- prefetchCount:设置消费者允许的最大未确认消息数量(整数)。例如,(prefetchCount) = 5 表示每个消费者最多持有5条未处理消息。
- 计数机制:RabbitMQ发送消息时计数+1,消费者确认消息后计数-1。当计数达 prefetchCount 上限时,RabbitMQ暂停向该消费者发送新消息。
- 特殊值 prefetchCount = 0 表示无上限(恢复默认轮询)。
2.4.9.3 应用场景
2.4.9.3.1 限流
- 场景描述:系统需处理突发流量,防止下游服务过载。例如:
- 订单系统最大处理能力为5000请求/秒。
- 秒杀活动时,消息队列涌入10000请求/秒。
- 未限流时:所有请求直接压垮订单系统。
- 限流后:通过设置 prefetchCount 控制消费者拉取速率,只允许处理能力范围内的请求进入。
只在手动确认的时候才生效,自动确认时不生效。
2.4.9.3.2 非公平分发 (负载均衡)
- 场景描述:优先将消息分发给处理能力强的消费者,避免慢消费者拖累整体性能。
- 实现原理:
- 设置 prefetchCount = 1:每个消费者一次只处理一条消息。
- 结果:快速消费者完成处理后立即获取新消息,慢消费者不会阻塞队列。
- 优势:在高并发场景下显著提升效率,减少平均响应时间。
2.4.9.3.3 Spring AMQP 代码实现
通过设置 application.yml 中的参数来设置 prefetchCount 的大小
spring:rabbitmq:listener:simple:acknowledge-mode: manual # 手动确认模式prefetch: 5 # 设置 prefetchCount = 5
2.4.10 幂等性保障
幂等性是计算机科学中的关键概念,尤其在分布式系统和消息队列中至关重要。它确保操作被多次执行时,对系统状态的影响保持一致,从而避免重复处理导致的数据不一致。
2.4.10.1 幂等性的定义与资源消耗问题
幂等性(Idempotency)是指一个操作无论被执行一次还是多次,其最终对系统状态的影响都相同。数学上,如果一个函数 f 是幂等的,则满足: f(f(x)) = f(x)。例如,绝对值函数 |x| 是幂等的,因为 ||x|| = |x| 。 这意味着:
- 重复调用只对系统进行一次资源消耗:理想情况下,幂等操作在第一次执行时消耗资源(如数据库写入、网络请求),后续重复调用不会改变系统状态,因此资源消耗通常为零或极少(例如,仅返回缓存结果)。
- 资源消耗不一定相同:第一次调用可能涉及完整业务逻辑(如扣款),而后续调用可能快速返回成功响应(不执行实际操作)。因此,幂等性保证状态一致性,但不保证每次调用的资源开销相同。
- 数据库的 select 操作是幂等的,因为它不改变资源状态(即使查询结果可能因其他操作而变化)。
- 相反,i++ 操作是非幂等的,因为多次调用会导致 i 的值递增。
在幂等操作中:
- 只有第一次调用会改变系统内存储数据的资源(如数据库写入、文件更新)。
- 后续重复调用可能消耗其他资源,如计算(CPU用于状态检查)、网络(响应传输)或缓存(内存读取),但这些资源消耗不改变持久化存储的数据状态
2.4.10.2 MQ的幂等性
RabbitMQ的传输保障模式:
- 最多一次(at-most-once):消息发送后,如果处理失败(如消费者崩溃、Broker 传输给消费者失败),消息可能丢失,但不会重复。这适合对消息丢失不敏感的场景,比如日志记录。
- 最少一次(at-least-once):消息保证至少被处理一次,不会丢失。但如果处理失败,RabbitMQ会重试发送消息,这可能导致消息被多次处理(即重复)。这是RabbitMQ的默认模式。比如,通过发布者确认、消费者确认、Broker 持久化等来保证 “最少一次” 的功能实现。
- 恰好一次(exactly-once):RabbitMQ无法原生支持,因为分布式系统中存在网络故障、节点失败等问题,很难保证消息只传输一次。实现“恰好一次”需要复杂的机制,通常由应用层自己处理。
在 MQ 中,幂等性指同一条消息被多次消费时,对系统的影响一致。RabbitMQ 支持“最多一次”和“最少一次”的传输保障,无法实现“恰好一次”。RabbitMQ的 “最少一次” 模式通过重试保证消息不丢失,但重试机制带来的问题就是消息的重复。可以通过应用层设计(如幂等性)来规避问题。重复消息可能源于:
- 发送时重复:网络故障导致生产者重发相同消息。
- 投递时重复:消费者处理成功但确认失败,服务端重试投递。
2.4.10.3 解决方案
解决幂等性的核心是为消息添加唯一标识,并确保消费前检查状态。以下是常用方法:
全局局唯一 ID:
- 为每条消息分配唯一 ID(如 UUID 或 RabbitMQ 自带 ID),消费者处理前检查该 ID 是否已消费。
- 使用 Redis 的原子操作 SETNX(set if not exist):将消息 ID 作为 key,执行 SETNX messageID 1。如果返回 1(key 不存在),则正常消费;返回 0(key 存在),则丢弃消息。
它通过提供唯一标识来跟踪操作,确保重复请求不会导致副作用。
业务逻辑判断:
- 在业务层检查数据状态,例如:
- 查询数据库记录是否存在(如支付订单时验证是否已处理)。
- 使用乐观锁机制:更新数据前检查版本号,确保操作只执行一次。
- 状态机控制:业务对象定义状态(如“未处理”到“已完成”),仅当状态匹配时才处理。
2.4.10.4 顺序性保障方案
消息顺序性指消费者处理消息的顺序与生产者发送的顺序一致。例如,生产者发送消息序列 msg_1, msg_2, msg_3,消费者也应顺序处理 msg_1, msg_2, msg_3。
RabbitMQ本身不保证严格全局顺序性,尤其在分布式环境中。业务场景如用户资料修改,通常只需保证同一用户的消息顺序(局部顺序),而非全局顺序。
常见打破顺序性的场景:
- 多个消费者:当队列配置多个消费者时,消息可能被并行处理,导致顺序错乱。
- 网络波动或异常:网络问题可能导致消息确认丢失,引发消息重新入队和重试,破坏顺序。
- 消息重试:消费者处理失败或确认丢失时,消息重试可能打乱原始顺序。
- 消息路由问题:复杂路由规则可能将消息分发到不同队列,无法保证全局顺序。
- 死信队列:消息转入死信队列后,消费顺序可能不一致。
顺序性保障分为局部顺序性(单个队列内)和全局顺序性(跨队列或消费者)。全局顺序性实现困难,RabbitMQ更注重吞吐量和可用性。
常用策略包括:
方案一:单队单消费者
使用单个队列和单个消费者处理消息。RabbitMQ基于先进先出原则保证队列内顺序。简单但吞吐量低。
方案二:分区消费
将队列划分为多个分区 (如按用户ID哈希),每个分区由单一消费者处理,保证分区内的顺序。RabbitMQ本身不直接支持分区,需业务逻辑实现或借助框架(如Spring Cloud Stream)。
方案三:消息确认机制
使用手动确认模式,消费者处理完消息后发送显式 ACK,RbbitMQ 才移除消息,防止消息重试导致的乱序。
方案四:业务逻辑控制
在消息队列中嵌入序列号,消费者根据序列号排序处理。
2.4.10.5 消息积压
消息积压指队列中待处理消息超过消费者能力,导致堆积。
2.4.10.5.1 积压原因
- 消息生产过快:生产者速率过高,超过消费者处理能力。
- 消费者处理能力不足:
- 业务逻辑复杂或耗时。
- 代码性能低(如高时间复杂度算法)。
- 系统资源限制(CPU、内存、I/O)。
- 异常处理不当,导致消息未确认或重试。
- 网络问题:网络延迟或不稳定,影响消息接收和确认。
- RabbitMQ服务器配置不足:硬件或参数设置不合理。
消息积压可能导致系统性能下降或崩溃,需及时监控和干预。
2.4.10.5.2 解决方案
2.4.10.5.2.1 提高消费者效率
- 增加消费者实例:水平扩展,如新增服务器或容器。
- 优化业务逻辑:简化代码,使用多线程或异步处理。例如,将耗时操作拆分为子任务。
- 设置 prefetchCount :限制每个消费者预取消息数,避免单个消费者阻塞。
- 当消费者有代码逻辑错误切当设置为重新入队时,就会一直重复循环导致消息积压。
2.4.10.5.2.2 限制生产者速率
- 流量控制:生产者端根据消费者能力动态调整发送速率,如使用令牌桶算法。
- 限流工具:集成限流组件(如Redis或Nginx),设置消息发送上限。
- 设置消息过期时间:为消息添加TTL(Time-To-Live),未消费消息自动过期并转入死信队列。
2.4.10.5.2.3 资源与配置优化
- 升级服务器硬件:提升CPU、内存或磁盘性能。
- 调整RabbitMQ参数:如增加内存阈值、优化队列持久化设置。
- 监控告警:使用工具(如Prometheus)监控队列深度,及时告警。
2.11 Raft 算法
Raft 是一种分布式一致性算法,旨在简化共识过程,确保数据在节点间一致。它将问题分解为三个子问题:Leader 选举、日志复制和安全性。以下是关键概念:
-
节点角色:
- Leader(领导者):负责处理所有客户端请求,并将操作作为日志项复制到其他节点。Leader 定期发送心跳消息维持领导地位。
- Follower(跟随者):接收并应用 Leader 的日志条目,不直接处理客户端请求。
- Candidate(候选者):当 Follower 在超时时间内未收到 Leader 心跳时,它会转变为 Candidate 并发起选举,尝试成为新 Leader。
-
任期(Term):
Raft 将时间划分为任期,每个任期以选举开始。如果选举成功,Leader 管理集群直到任期结束;如果失败(如未选出 Leader),则任期终止,新任期开始。任期机制确保集群状态有序更迭。
-
选举过程(Leader Election):
- 触发条件:Follower 在选举超时(election timeout)内未收到 Leader 心跳时,会转变为 Candidate。
- 投票机制:Candidate 向所有节点发送投票请求,获得多数票(> N/2)的节点成为新 Leader。
- 过程示例:
- Follower 超时未收到心跳,成为 Candidate。
- Candidate 发起投票,其他节点响应。
- 若获得多数票,Candidate 成为 Leader;否则,选举失败,重新开始。
2.12 仲裁队列
RabbitMQ 的仲裁队列是一种基于 Raft 一致性算法实现的高可用队列机制,用于解决分布式系统中的数据复制和容错问题。它通过多节点复制队列数据,确保在节点故障时服务不中断,提供持久化、FIFO(先进先出)的特性。
2.12.1 仲裁队列概念
- 定义与作用:仲裁队列是 RabbitMQ 3.8 版本引入的队列类型,用于替代传统的镜像队列。它通过在多个节点间复制队列数据,实现高可用性和数据安全性。当某个节点宕机时,队列能自动切换到其他节点继续服务,避免单点故障。
- 核心机制:基于 Raft 共识算法,利用 Quorum(多数决)机制确保操作一致性。例如,对于一个有 N 个副本的队列,任何操作(如消息写入)必须获得超过半数的节点同意才能提交。
- 与镜像队列的对比:镜像队列是旧版方案,存在设计缺陷(如性能瓶颈和复杂性),已被弃用。仲裁队列优化了数据复制过程,提高了可靠性和效率。官方建议迁移到仲裁队列,并提供了迁移指南。
2.12.2 仲裁队列工作机制
仲裁队列在 RabbitMQ 中的实现依赖 Raft 的日志复制和 Quorum 机制,确保数据一致性和高可用。
-
队列副本结构:
- 每个仲裁队列有多个副本,分布在不同的 RabbitMQ 节点上。例如,复制因子(replication factor)为 5 时,有 1个主副本(Leader)和 4 个从副本(Follower)。无论多少个节点,最多非分布只能是1主4从。
- 客户端(生产者或消费者)只与主副本交互,主副本负责复制操作到从副本。
-
日志复制(Log Replication):
- 写入过程:生产者发送消息时,主副本将消息作为日志项广播给所有从副本。只有当超过半数的副本(包括主副本)将消息持久化到磁盘后,主副本才向生产者确认。这确保了数据安全,避免慢副本影响性能。
- 数学表达:设副本数为 $N$,需 $\lfloor N/2 \rfloor + 1$ 个副本确认写入。例如,$N=5$ 时,需 $3$ 个副本确认。
- 读取过程:消费者从主副本获取消息,主副本保证消息顺序(FIFO)。
- 写入过程:生产者发送消息时,主副本将消息作为日志项广播给所有从副本。只有当超过半数的副本(包括主副本)将消息持久化到磁盘后,主副本才向生产者确认。这确保了数据安全,避免慢副本影响性能。
-
故障恢复:
- 如果主副本节点故障,剩余节点通过 Raft 选举从副本中选出一个新主副本。
- 新主副本继续服务,确保队列可用性。选举和故障切换过程在秒级完成,最小化服务中断。
Redis集群的故障迁移机制是独立设计的,基于Gossip协议和多数派投票,但并未直接采用或依赖Raft算法。Redis 没有实现完整的Raft算法。它省略了日志复制、任期管理等关键部分,仅使用简单的投票和休眠策略。
仲裁队列使用 Raft 算法,在接收半数以上的从节点后返回响应时,才修改数据库。Raft 强制要求写操作必须获得多数派确认才能提交,而 Redis 集群默认采用异步复制,主节点立即修改数据库并响应客户端。
2.13 HAProxy 负载均衡
面对大量业务访问和高并发请求,RabbitMQ 集群需要负载均衡来提升性能和可靠性。如果客户端直接连接特定节点,可以避免数据丢失,但是无法避免服务的停止。以下有两个关键问题:
a)节点故障风险:如果 节点1 宕机,所有客户端连接中断,导致服务不可用。
b)负载不均衡:当节点1 宕机所有客户端连接到 节点2,造成该节点网络负载过高,而其他节点资源闲置,浪费硬件资源。
引入负载均衡器(如 HAProxy)能有效分摊客户端连接,避免上述问题。HAProxy 作为开源负载均衡器,提供高可用性和流量分发功能,特别适合 RabbitMQ 集群。
通过 HAProxy 实现负载均衡,RabbitMQ 集群获得以下好处:
- 高可用性:节点故障时,流量自动转移,避免服务中断。
- 负载均衡:连接均匀分摊到所有节点,最大化资源利用率。
- 简化客户端配置:客户端只需记住 HAProxy 的统一入口,无需感知后端节点变化。
- 可扩展性:轻松添加或移除 RabbitMQ 节点,而无需修改客户端代码。
在分布式系统中,主副本选举需要超过半数的副本同意才能生效;生产者发送一条消息时,需要超过半数的队列副本将消息写入磁盘后,系统才能向生产者进行确认。
2.14 推拉模式
在 Spring Boot 应用中,RabbitMQ 作为消息队列系统,支持两种消息消费模式:推模式(Push) 和 拉模式(Pull)。这两种模式各有特点,适用于不同的场景,如异步解耦、消息分发(一拆多)或异步点对点通信(一传一)。
2.14.1 拉模式 (Pull Mode)
在拉模式中,消费者主动从队列中获取消息,而不是被动接收。这类似于“消费者主动请求”的机制,一次只获取一条消息,适合需要精细控制消息处理的场景。
- 核心方法:在 Spring Boot 中,使用 rabbitTemplate 的 receive 方法实现拉模式。该方法会同步地从指定队列拉取消息。
- 底层机制:对应 RabbitMQ 的 AMQP 协议方法 channel.babicGet,这是一个同步操作,消费者调用后,如果队列中有消息,则立即返回一条;如果没有消息,则阻塞或返回空。
- 优点:
- 控制权在消费者:消费者可以按需拉取消息,避免消息过载。
- 适合低吞吐场景:例如,当消费者处理能力有限时,可以控制拉取频率。
- 简单调试:同步操作易于测试和日志记录。
- 缺点:
- 延迟较高:如果队列空,消费者需要轮询,增加响应时间。
- 资源消耗:频繁调用可能增加网络开销。
- 应用场景:异步点对点通信(一传一),如需要手动确认消息的场景,或消费者处理速度较慢时。
2.14.2 推模式 (Push Mode)
在推模式中,Broker(RabbitMQ 服务器)主动将消息推送给消费者,消费者被动接收。这类似于“订阅”机制,Broker 会自动分发消息,适合高吞吐场景。
- 核心方法:在 Spring Boot 中,使用 @RabbitListener 注解实现推模式。消费者定义一个监听器方法,Broker 在有消息时自动调用该方法。
- 底层机制:对应 RabbitMQ 的 AMQP 协议方法 channel.basicConsume,这是一个异步操作。Broker 维护一个推送通道,消息到达队列时立即推送给所有注册的消费者。
- 优点:
- 实时性好:消息到达时立即推送,减少延迟。
- 高吞吐:适合消息量大时,Broker 自动分发(一拆多),支持多个消费者并行处理。
- 自动管理:Spring Boot 处理连接和重试,简化代码。
- 缺点:
- 控制权在 Broker:消费者无法选择是否接收消息,如果消息涌入,可能导致消费者过载。
- 资源占用:可能占用更多内存和线程,尤其在消息峰值时。
- 应用场景:异步解耦和消息分发(一拆多),如事件驱动架构、批量处理或实时通知系统。
如果觉得对你有帮助的话,请给博主一键三连吧,这对我真的很重要
(>人<;) 求你了~
(๑・́ω・̀๑) 拜托啦~
(≧∇≦)ノ 求求你啦~
(ಥ_ಥ) 真的求你了…
(;へ:) 行行好吧~