Spring Boot集成RabbitMQ终极指南:从配置到高级消息处理
1. 什么是 RabbitMQ?
RabbitMQ 是一个开源的 消息代理(Message Broker),实现了 AMQP(Advanced Message Queuing Protocol) 协议。它充当应用程序之间的中间人,负责接收、存储和转发消息,实现 异步通信、应用解耦 和 流量削峰。
https://www.rabbitmq.com/img/tutorials/intro/rabbitmq-exchanges.webp
(生产者发送消息到 Exchange,Exchange 路由到 Queue,消费者从 Queue 获取消息)
1.1. 核心概念
(1) 消息流模型
(2) 关键组件
组件 | 作用 |
---|---|
生产者 | 发送消息的应用(如订单服务) |
消费者 | 接收消息的应用(如库存服务) |
Exchange | 接收生产者消息,根据规则路由到队列(核心路由中枢) |
Queue | 存储消息的缓冲区(FIFO),消息在此等待消费者处理 |
Binding | Exchange 和 Queue 之间的绑定规则(如路由键匹配) |
Channel | 轻量级连接(复用 TCP 连接),减少资源开销 |
Virtual Host | 虚拟隔离环境(类似命名空间),用于多租户隔 |
1.2. 消息生命周期
1.3. 高级特性
特性 | 说明 |
---|---|
消息持久化 | 消息写入磁盘(防止服务器重启丢失) |
ACK 确认机制 | 消费者处理成功后发送 ACK,否则重新入队 |
QoS 预取 | 限制消费者未确认消息数(防止消息积压) |
死信队列(DLX) | 处理失败的消息转发到特殊队列(用于重试/分析) |
TTL(生存时间) | 设置消息/队列的过期时间(自动删除超时消息) |
插件扩展 | 支持 MQTT、STOMP 等协议(如物联网场景) |
2. 核心概念与实现方案
1. 核心依赖
在pom.xml
中添加依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. 配置文件
# application.yml
spring:application:name: rabbitmq-demorabbitmq:# 基础连接配置host: ${RABBITMQ_HOST:localhost}port: ${RABBITMQ_PORT:5672}username: ${RABBITMQ_USER:guest}password: ${RABBITMQ_PASS:guest}virtual-host: ${RABBITMQ_VHOST:/}# 连接池配置connection-factory:cache:mode: CONNECTIONsize: 5connection-timeout: 5000# 发布者确认配置publisher-confirm-type: correlatedpublisher-returns: true# 消费者配置listener:simple:acknowledge-mode: manualconcurrency: 5-10prefetch: 10# 自定义应用配置
app:rabbitmq:max-retry-count: 3# 死信队列配置dead-letter:exchange: dlx.exchangerouting-key: dlx.routing.key# 重试配置retry:enabled: truemax-attempts: 3initial-interval: 1000max-interval: 10000multiplier: 2.0# 交换机配置exchanges:order:name: order.exchangetype: directdurable: truepayment:name: payment.exchangetype: topicdurable: truenotification:name: notification.exchangetype: fanoutdurable: truedlx:name: dlx.exchangetype: directdurable: true# 队列配置queues:order:name: order.queuedurable: truearguments:x-dead-letter-exchange: dlx.exchangex-dead-letter-routing-key: dlx.routing.keypayment:name: payment.queuedurable: trueemail:name: email.queuedurable: truesms:name: sms.queuedurable: truedlq:name: dlq.queuedurable: true# 绑定配置bindings:order:queue: order.queueexchange: order.exchangerouting-key: order.routing.keypayment:queue: payment.queueexchange: payment.exchangerouting-key: payment.#email:queue: email.queueexchange: notification.exchangesms:queue: sms.queueexchange: notification.exchangedlq:queue: dlq.queueexchange: dlx.exchangerouting-key: dlx.routing.key# 日志配置
logging:level:org.springframework.amqp: DEBUGcom.example.rabbitmqdemo: TRACE# 服务端口
server:port: 8080
3. RabbitMQ自定义配置类
package com.example.rabbitmqdemo.config.properties;import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;import java.util.Map;@Data
@Builder
@Component
@NoArgsConstructor
@AllArgsConstructor
@ConfigurationProperties(prefix = "app.rabbitmq")
public class RabbitMQCustomProperties {// 最大重试次数private int maxRetryCount = 3;// 死信交换机配置private DeadLetterConfig deadLetter;// 队列配置private Map<String, QueueConfig> queues;// 交换机配置private Map<String, ExchangeConfig> exchanges;// 绑定配置private Map<String, BindingConfig> bindings;// 重试配置private RetryConfig retry;// 内部配置类@Data@Builder@NoArgsConstructor@AllArgsConstructorpublic static class DeadLetterConfig {private String exchange;private String routingKey;}@Data@Builder@NoArgsConstructor@AllArgsConstructorpublic static class QueueConfig {private String name;private boolean durable;private Map<String, Object> arguments;}@Data@Builder@NoArgsConstructor@AllArgsConstructorpublic static class ExchangeConfig {private String name;private String type;private boolean durable;}@Data@Builder@NoArgsConstructor@AllArgsConstructorpublic static class BindingConfig {private String queue;private String exchange;private String routingKey;}@Data@Builder@NoArgsConstructor@AllArgsConstructorpublic static class RetryConfig {private boolean enabled;private int maxAttempts;private long initialInterval;private long maxInterval;private double multiplier;}
}
4. 应用配置绑定类
package com.example.rabbitmqdemo.config;import com.example.rabbitmqdemo.config.properties.RabbitMQCustomProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Configuration;@Configuration
@EnableConfigurationProperties(RabbitMQCustomProperties.class)
public class AppConfig {// 启用配置属性绑定
}
5. RabbitMQ配置类(使用配置属性)
package com.example.rabbitmqdemo.config;import com.example.rabbitmqdemo.config.properties.RabbitMQCustomProperties;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {private final RabbitMQCustomProperties customProperties;@Autowiredpublic RabbitMQConfig(RabbitMQCustomProperties customProperties) {this.customProperties = customProperties;}// 创建交换机@Beanpublic DirectExchange orderExchange() {RabbitMQCustomProperties.ExchangeConfig exchange = customProperties.getExchanges().get("order");return new DirectExchange(exchange.getName(),exchange.isDurable(),false);}@Beanpublic TopicExchange paymentExchange() {RabbitMQCustomProperties.ExchangeConfig exchange = customProperties.getExchanges().get("payment");return new TopicExchange(exchange.getName(),exchange.isDurable(),false);}@Beanpublic FanoutExchange notificationExchange() {RabbitMQCustomProperties.ExchangeConfig exchange = customProperties.getExchanges().get("notification");return new FanoutExchange(exchange.getName(),exchange.isDurable(),false);}@Beanpublic DirectExchange dlxExchange() {RabbitMQCustomProperties.ExchangeConfig exchange = customProperties.getExchanges().get("dlx");return new DirectExchange(exchange.getName(),exchange.isDurable(),false);}// 创建队列@Beanpublic Queue orderQueue() {RabbitMQCustomProperties.QueueConfig queue = customProperties.getQueues().get("order");return QueueBuilder.durable(queue.getName()).withArguments(queue.getArguments()).build();}@Beanpublic Queue paymentQueue() {RabbitMQCustomProperties.QueueConfig queue = customProperties.getQueues().get("payment");return QueueBuilder.durable(queue.getName()).build();}@Beanpublic Queue emailQueue() {RabbitMQCustomProperties.QueueConfig queue = customProperties.getQueues().get("email");return QueueBuilder.durable(queue.getName()).build();}@Beanpublic Queue smsQueue() {RabbitMQCustomProperties.QueueConfig queue = customProperties.getQueues().get("sms");return QueueBuilder.durable(queue.getName()).build();}@Beanpublic Queue dlqQueue() {RabbitMQCustomProperties.QueueConfig queue = customProperties.getQueues().get("dlq");return QueueBuilder.durable(queue.getName()).build();}// 创建绑定@Beanpublic Binding orderBinding() {RabbitMQCustomProperties.BindingConfig binding = customProperties.getBindings().get("order");return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(binding.getRoutingKey());}@Beanpublic Binding paymentBinding() {RabbitMQCustomProperties.BindingConfig binding = customProperties.getBindings().get("payment");return BindingBuilder.bind(paymentQueue()).to(paymentExchange()).with(binding.getRoutingKey());}@Beanpublic Binding emailBinding() {return BindingBuilder.bind(emailQueue()).to(notificationExchange());}@Beanpublic Binding smsBinding() {return BindingBuilder.bind(smsQueue()).to(notificationExchange());}@Beanpublic Binding dlqBinding() {RabbitMQCustomProperties.BindingConfig binding = customProperties.getBindings().get("dlq");return BindingBuilder.bind(dlqQueue()).to(dlxExchange()).with(binding.getRoutingKey());}
}
6. RabbitMQ工具类(使用配置属性)
package com.example.rabbitmqdemo.util;import com.example.rabbitmqdemo.config.properties.RabbitMQCustomProperties;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.UUID;@Component
public class RabbitMQUtil {private final RabbitTemplate rabbitTemplate;private final RabbitMQCustomProperties customProperties;@Autowiredpublic RabbitMQUtil(RabbitTemplate rabbitTemplate, RabbitMQCustomProperties customProperties) {this.rabbitTemplate = rabbitTemplate;this.customProperties = customProperties;}/*** 发送普通消息* @param exchange 交换机名称* @param routingKey 路由键* @param message 消息对象*/public void sendMessage(String exchange, String routingKey, Object message) {rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {setCommonMessageProperties(msg);return msg;});}/*** 发送延迟消息* @param exchange 交换机名称* @param routingKey 路由键* @param message 消息对象* @param delay 延迟时间(毫秒)*/public void sendDelayedMessage(String exchange, String routingKey, Object message, int delay) {rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {setCommonMessageProperties(msg);msg.getMessageProperties().setDelay(delay);return msg;});}/*** 发送广播消息* @param message 消息对象*/public void sendBroadcastMessage(Object message) {String exchangeName = customProperties.getExchanges().get("notification").getName();rabbitTemplate.convertAndSend(exchangeName, "", message,msg -> {setCommonMessageProperties(msg);return msg;});}/*** 发送死信消息* @param message 消息对象*/public void sendDeadLetterMessage(Object message) {RabbitMQCustomProperties.DeadLetterConfig dlConfig = customProperties.getDeadLetter();rabbitTemplate.convertAndSend(dlConfig.getExchange(), dlConfig.getRoutingKey(), message,msg -> {setCommonMessageProperties(msg);return msg;});}// 设置通用消息属性private void setCommonMessageProperties(Message message) {message.getMessageProperties().setMessageId(UUID.randomUUID().toString());message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);}
}
7. 消息生产者服务(使用配置属性)
package com.example.rabbitmqdemo.service;import com.example.rabbitmqdemo.config.properties.RabbitMQCustomProperties;
import com.example.rabbitmqdemo.dto.OrderDTO;
import com.example.rabbitmqdemo.util.RabbitMQUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class MessageProducer {private final RabbitMQUtil rabbitMQUtil;private final RabbitMQCustomProperties customProperties;@Autowiredpublic MessageProducer(RabbitMQUtil rabbitMQUtil, RabbitMQCustomProperties customProperties) {this.rabbitMQUtil = rabbitMQUtil;this.customProperties = customProperties;}/*** 发送订单创建消息*/public void sendOrderCreatedMessage(OrderDTO order) {RabbitMQCustomProperties.BindingConfig binding = customProperties.getBindings().get("order");rabbitMQUtil.sendMessage(binding.getExchange(), binding.getRoutingKey(), order);System.out.println("订单创建消息已发送: " + order.getOrderId());}/*** 发送支付成功消息*/public void sendPaymentSuccessMessage(String paymentInfo) {RabbitMQCustomProperties.BindingConfig binding = customProperties.getBindings().get("payment");rabbitMQUtil.sendMessage(binding.getExchange(), binding.getRoutingKey(), paymentInfo);System.out.println("支付成功消息已发送: " + paymentInfo);}/*** 发送系统通知消息(广播)*/public void sendSystemNotification(String notification) {rabbitMQUtil.sendBroadcastMessage(notification);System.out.println("系统通知已广播: " + notification);}/*** 发送延迟订单取消消息*/public void sendDelayedOrderCancelMessage(OrderDTO order, int delayMillis) {RabbitMQCustomProperties.BindingConfig binding = customProperties.getBindings().get("order");rabbitMQUtil.sendDelayedMessage(binding.getExchange(), binding.getRoutingKey(), order, delayMillis);System.out.println("订单取消延迟消息已发送: " + order.getOrderId() + ", 延迟: " + delayMillis + "ms");}
}
8. 消息消费者服务(使用配置属性)
package com.example.rabbitmqdemo.service;import com.example.rabbitmqdemo.config.properties.RabbitMQCustomProperties;
import com.example.rabbitmqdemo.dto.OrderDTO;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;@Service
public class MessageConsumer {private final RabbitMQCustomProperties customProperties;@Autowiredpublic MessageConsumer(RabbitMQCustomProperties customProperties) {this.customProperties = customProperties;}/*** 处理订单创建消息*/@RabbitListener(queues = "#{@orderQueue.name}")public void handleOrderMessage(OrderDTO order, Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();String messageId = message.getMessageProperties().getMessageId();try {System.out.println("收到订单消息: " + messageId);System.out.println("订单ID: " + order.getOrderId() + ", 金额: " + order.getAmount());// 模拟业务处理processOrder(order);// 手动ACK确认channel.basicAck(deliveryTag, false);System.out.println("订单处理完成: " + order.getOrderId());} catch (BusinessException e) {// 业务异常,不需要重试System.err.println("订单处理业务异常: " + e.getMessage());channel.basicReject(deliveryTag, false);} catch (Exception e) {// 其他异常,根据重试次数决定是否重新入队System.err.println("订单处理异常: " + e.getMessage());if (shouldRetry(message)) {// 重新入队进行重试channel.basicNack(deliveryTag, false, true);System.out.println("订单消息重新入队: " + messageId);} else {// 超过最大重试次数,拒绝并不重新入队channel.basicNack(deliveryTag, false, false);System.out.println("订单消息进入死信队列: " + messageId);}}}// 判断是否应该重试private boolean shouldRetry(Message message) {Integer retryCount = message.getMessageProperties().getHeader("x-retry-count");if (retryCount == null) {retryCount = 0;}// 使用配置的最大重试次数return retryCount < customProperties.getMaxRetryCount();}// 模拟订单处理逻辑private void processOrder(OrderDTO order) throws BusinessException {// 实际业务逻辑if (order.getAmount().compareTo(BigDecimal.ZERO) <= 0) {throw new BusinessException("订单金额异常");}// 其他处理...}// 自定义业务异常private static class BusinessException extends Exception {public BusinessException(String message) {super(message);}}
}
9. 消息控制器(完整调用示例)
package com.example.rabbitmqdemo.controller;import com.example.rabbitmqdemo.config.properties.RabbitMQCustomProperties;
import com.example.rabbitmqdemo.dto.OrderDTO;
import com.example.rabbitmqdemo.service.MessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;import java.math.BigDecimal;@RestController
@RequestMapping("/api/messages")
public class MessageController {private final MessageProducer messageProducer;private final RabbitMQCustomProperties customProperties;@Autowiredpublic MessageController(MessageProducer messageProducer, RabbitMQCustomProperties customProperties) {this.messageProducer = messageProducer;this.customProperties = customProperties;}/*** 发送订单创建消息*/@PostMapping("/order")public String sendOrderMessage(@RequestBody OrderDTO order) {messageProducer.sendOrderCreatedMessage(order);return "订单消息已发送";}/*** 发送支付成功消息*/@PostMapping("/payment/{paymentId}")public String sendPaymentMessage(@PathVariable String paymentId) {messageProducer.sendPaymentSuccessMessage("支付ID: " + paymentId);return "支付消息已发送";}/*** 发送系统通知*/@PostMapping("/notification")public String sendNotification(@RequestParam String content) {messageProducer.sendSystemNotification(content);return "系统通知已发送";}/*** 发送延迟订单取消消息*/@PostMapping("/delayed-order")public String sendDelayedOrder(@RequestBody OrderDTO order, @RequestParam int delaySeconds) {// 使用配置中的重试间隔作为基础延迟单位long baseDelay = customProperties.getRetry().getInitialInterval();long delayMillis = delaySeconds * 1000 + baseDelay;messageProducer.sendDelayedOrderCancelMessage(order, (int) delayMillis);return "延迟订单消息已发送,延迟时间: " + delaySeconds + "秒";}
}