当前位置: 首页 > news >正文

SpringBoot3.x入门到精通系列:3.2 整合 RabbitMQ 详解

🎯 RabbitMQ简介

RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据。它是使用Erlang语言编写的,并且基于AMQP协议。

核心概念

  • Producer: 消息生产者,发送消息的应用
  • Consumer: 消息消费者,接收消息的应用
  • Queue: 消息队列,存储消息的缓冲区
  • Exchange: 交换机,负责接收消息并路由到队列
  • Routing Key: 路由键,Exchange根据它来决定消息路由到哪个队列
  • Binding: 绑定,Exchange和Queue之间的连接关系

交换机类型

  • Direct: 直连交换机,完全匹配路由键
  • Topic: 主题交换机,支持通配符匹配
  • Fanout: 扇出交换机,广播到所有绑定的队列
  • Headers: 头交换机,根据消息头属性路由

🚀 快速开始

1. 添加依赖

<dependencies><!-- SpringBoot Starter Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- SpringBoot RabbitMQ Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- JSON处理 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><!-- 测试依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>

2. RabbitMQ配置

spring:# RabbitMQ配置rabbitmq:# RabbitMQ服务器地址host: localhost# RabbitMQ服务器端口port: 5672# 用户名username: guest# 密码password: guest# 虚拟主机virtual-host: /# 连接配置connection-timeout: 15000# 生产者配置publisher-confirm-type: correlated # 确认模式publisher-returns: true # 开启return机制# 消费者配置listener:simple:# 手动确认模式acknowledge-mode: manual# 并发消费者数量concurrency: 1# 最大并发消费者数量max-concurrency: 10# 每次从队列获取的消息数量prefetch: 1# 重试机制retry:enabled: trueinitial-interval: 1000max-attempts: 3max-interval: 10000multiplier: 1.0# 日志配置
logging:level:org.springframework.amqp: DEBUG

🔧 RabbitMQ配置类

1. 基础配置

package com.example.demo.config;import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {// 队列名称常量public static final String DIRECT_QUEUE = "direct.queue";public static final String TOPIC_QUEUE_1 = "topic.queue.1";public static final String TOPIC_QUEUE_2 = "topic.queue.2";public static final String FANOUT_QUEUE_1 = "fanout.queue.1";public static final String FANOUT_QUEUE_2 = "fanout.queue.2";public static final String DELAY_QUEUE = "delay.queue";public static final String DLX_QUEUE = "dlx.queue";// 交换机名称常量public static final String DIRECT_EXCHANGE = "direct.exchange";public static final String TOPIC_EXCHANGE = "topic.exchange";public static final String FANOUT_EXCHANGE = "fanout.exchange";public static final String DELAY_EXCHANGE = "delay.exchange";public static final String DLX_EXCHANGE = "dlx.exchange";/*** 消息转换器 - 使用JSON格式*/@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}/*** RabbitTemplate配置*/@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(messageConverter());// 设置确认回调rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("消息发送成功: " + correlationData);} else {System.out.println("消息发送失败: " + cause);}});// 设置返回回调rabbitTemplate.setReturnsCallback(returned -> {System.out.println("消息返回: " + returned.getMessage());System.out.println("回复码: " + returned.getReplyCode());System.out.println("回复文本: " + returned.getReplyText());System.out.println("交换机: " + returned.getExchange());System.out.println("路由键: " + returned.getRoutingKey());});return rabbitTemplate;}/*** 监听器容器工厂配置*/@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(messageConverter());return factory;}// ========================= Direct Exchange =========================@Beanpublic Queue directQueue() {return QueueBuilder.durable(DIRECT_QUEUE).build();}@Beanpublic DirectExchange directExchange() {return new DirectExchange(DIRECT_EXCHANGE);}@Beanpublic Binding directBinding() {return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct.routing.key");}// ========================= Topic Exchange =========================@Beanpublic Queue topicQueue1() {return QueueBuilder.durable(TOPIC_QUEUE_1).build();}@Beanpublic Queue topicQueue2() {return QueueBuilder.durable(TOPIC_QUEUE_2).build();}@Beanpublic TopicExchange topicExchange() {return new TopicExchange(TOPIC_EXCHANGE);}@Beanpublic Binding topicBinding1() {return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.*.message");}@Beanpublic Binding topicBinding2() {return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.#");}// ========================= Fanout Exchange =========================@Beanpublic Queue fanoutQueue1() {return QueueBuilder.durable(FANOUT_QUEUE_1).build();}@Beanpublic Queue fanoutQueue2() {return QueueBuilder.durable(FANOUT_QUEUE_2).build();}@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange(FANOUT_EXCHANGE);}@Beanpublic Binding fanoutBinding1() {return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());}@Beanpublic Binding fanoutBinding2() {return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());}// ========================= 延迟队列 =========================@Beanpublic Queue delayQueue() {return QueueBuilder.durable(DELAY_QUEUE).withArgument("x-message-ttl", 60000) // 消息TTL 60秒.withArgument("x-dead-letter-exchange", DLX_EXCHANGE) // 死信交换机.withArgument("x-dead-letter-routing-key", "dlx.routing.key") // 死信路由键.build();}@Beanpublic DirectExchange delayExchange() {return new DirectExchange(DELAY_EXCHANGE);}@Beanpublic Binding delayBinding() {return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("delay.routing.key");}// ========================= 死信队列 =========================@Beanpublic Queue dlxQueue() {return QueueBuilder.durable(DLX_QUEUE).build();}@Beanpublic DirectExchange dlxExchange() {return new DirectExchange(DLX_EXCHANGE);}@Beanpublic Binding dlxBinding() {return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx.routing.key");}
}

📊 消息实体类

package com.example.demo.dto;import com.fasterxml.jackson.annotation.JsonFormat;
import java.io.Serializable;
import java.time.LocalDateTime;public class MessageDto implements Serializable {private static final long serialVersionUID = 1L;private String id;private String content;private String type;private String sender;@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")private LocalDateTime timestamp;// 构造函数public MessageDto() {this.timestamp = LocalDateTime.now();}public MessageDto(String id, String content, String type, String sender) {this();this.id = id;this.content = content;this.type = type;this.sender = sender;}// Getter和Setter方法public String getId() { return id; }public void setId(String id) { this.id = id; }public String getContent() { return content; }public void setContent(String content) { this.content = content; }public String getType() { return type; }public void setType(String type) { this.type = type; }public String getSender() { return sender; }public void setSender(String sender) { this.sender = sender; }public LocalDateTime getTimestamp() { return timestamp; }public void setTimestamp(LocalDateTime timestamp) { this.timestamp = timestamp; }@Overridepublic String toString() {return "MessageDto{" +"id='" + id + '\'' +", content='" + content + '\'' +", type='" + type + '\'' +", sender='" + sender + '\'' +", timestamp=" + timestamp +'}';}
}

📤 消息生产者

package com.example.demo.service;import com.example.demo.config.RabbitConfig;
import com.example.demo.dto.MessageDto;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.UUID;@Service
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 发送Direct消息*/public void sendDirectMessage(String content) {MessageDto message = new MessageDto(UUID.randomUUID().toString(),content,"DIRECT","Producer");rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE,"direct.routing.key",message);System.out.println("发送Direct消息: " + message);}/*** 发送Topic消息*/public void sendTopicMessage(String routingKey, String content) {MessageDto message = new MessageDto(UUID.randomUUID().toString(),content,"TOPIC","Producer");rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE,routingKey,message);System.out.println("发送Topic消息 [" + routingKey + "]: " + message);}/*** 发送Fanout消息*/public void sendFanoutMessage(String content) {MessageDto message = new MessageDto(UUID.randomUUID().toString(),content,"FANOUT","Producer");rabbitTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE,"", // Fanout交换机忽略路由键message);System.out.println("发送Fanout消息: " + message);}/*** 发送延迟消息*/public void sendDelayMessage(String content, int delaySeconds) {MessageDto message = new MessageDto(UUID.randomUUID().toString(),content,"DELAY","Producer");// 设置消息属性rabbitTemplate.convertAndSend(RabbitConfig.DELAY_EXCHANGE,"delay.routing.key",message,msg -> {// 设置消息过期时间msg.getMessageProperties().setExpiration(String.valueOf(delaySeconds * 1000));return msg;});System.out.println("发送延迟消息 [" + delaySeconds + "s]: " + message);}/*** 发送带优先级的消息*/public void sendPriorityMessage(String content, int priority) {MessageDto message = new MessageDto(UUID.randomUUID().toString(),content,"PRIORITY","Producer");rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE,"direct.routing.key",message,msg -> {msg.getMessageProperties().setPriority(priority);return msg;});System.out.println("发送优先级消息 [" + priority + "]: " + message);}
}

📥 消息消费者

package com.example.demo.service;import com.example.demo.config.RabbitConfig;
import com.example.demo.dto.MessageDto;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;import java.io.IOException;@Service
public class MessageConsumer {/*** 消费Direct消息*/@RabbitListener(queues = RabbitConfig.DIRECT_QUEUE)public void consumeDirectMessage(MessageDto message, Message msg, Channel channel) throws IOException {try {System.out.println("接收到Direct消息: " + message);// 模拟业务处理Thread.sleep(1000);// 手动确认消息channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);System.out.println("Direct消息处理完成");} catch (Exception e) {System.err.println("处理Direct消息失败: " + e.getMessage());// 拒绝消息并重新入队channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);}}/*** 消费Topic消息 - 队列1*/@RabbitListener(queues = RabbitConfig.TOPIC_QUEUE_1)public void consumeTopicMessage1(MessageDto message, Message msg, Channel channel) throws IOException {try {System.out.println("队列1接收到Topic消息: " + message);// 模拟业务处理Thread.sleep(500);// 手动确认消息channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);System.out.println("队列1 Topic消息处理完成");} catch (Exception e) {System.err.println("队列1处理Topic消息失败: " + e.getMessage());channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);}}/*** 消费Topic消息 - 队列2*/@RabbitListener(queues = RabbitConfig.TOPIC_QUEUE_2)public void consumeTopicMessage2(MessageDto message, Message msg, Channel channel) throws IOException {try {System.out.println("队列2接收到Topic消息: " + message);// 模拟业务处理Thread.sleep(500);// 手动确认消息channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);System.out.println("队列2 Topic消息处理完成");} catch (Exception e) {System.err.println("队列2处理Topic消息失败: " + e.getMessage());channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);}}/*** 消费Fanout消息 - 队列1*/@RabbitListener(queues = RabbitConfig.FANOUT_QUEUE_1)public void consumeFanoutMessage1(MessageDto message, Message msg, Channel channel) throws IOException {try {System.out.println("Fanout队列1接收到消息: " + message);// 模拟业务处理Thread.sleep(300);// 手动确认消息channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);System.out.println("Fanout队列1消息处理完成");} catch (Exception e) {System.err.println("Fanout队列1处理消息失败: " + e.getMessage());channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);}}/*** 消费Fanout消息 - 队列2*/@RabbitListener(queues = RabbitConfig.FANOUT_QUEUE_2)public void consumeFanoutMessage2(MessageDto message, Message msg, Channel channel) throws IOException {try {System.out.println("Fanout队列2接收到消息: " + message);// 模拟业务处理Thread.sleep(300);// 手动确认消息channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);System.out.println("Fanout队列2消息处理完成");} catch (Exception e) {System.err.println("Fanout队列2处理消息失败: " + e.getMessage());channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);}}/*** 消费死信消息*/@RabbitListener(queues = RabbitConfig.DLX_QUEUE)public void consumeDlxMessage(MessageDto message, Message msg, Channel channel) throws IOException {try {System.out.println("接收到死信消息: " + message);// 处理死信消息的业务逻辑// 比如记录日志、发送告警、人工处理等// 手动确认消息channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);System.out.println("死信消息处理完成");} catch (Exception e) {System.err.println("处理死信消息失败: " + e.getMessage());channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, false);}}
}

🎮 Controller层

package com.example.demo.controller;import com.example.demo.service.MessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;import java.util.HashMap;
import java.util.Map;@RestController
@RequestMapping("/api/rabbitmq")
@CrossOrigin(origins = "*")
public class RabbitMQController {@Autowiredprivate MessageProducer messageProducer;/*** 发送Direct消息*/@PostMapping("/direct")public ResponseEntity<Map<String, String>> sendDirectMessage(@RequestBody Map<String, String> request) {String content = request.get("content");messageProducer.sendDirectMessage(content);Map<String, String> response = new HashMap<>();response.put("status", "success");response.put("message", "Direct消息发送成功");return ResponseEntity.ok(response);}/*** 发送Topic消息*/@PostMapping("/topic")public ResponseEntity<Map<String, String>> sendTopicMessage(@RequestBody Map<String, String> request) {String content = request.get("content");String routingKey = request.getOrDefault("routingKey", "topic.test.message");messageProducer.sendTopicMessage(routingKey, content);Map<String, String> response = new HashMap<>();response.put("status", "success");response.put("message", "Topic消息发送成功");response.put("routingKey", routingKey);return ResponseEntity.ok(response);}/*** 发送Fanout消息*/@PostMapping("/fanout")public ResponseEntity<Map<String, String>> sendFanoutMessage(@RequestBody Map<String, String> request) {String content = request.get("content");messageProducer.sendFanoutMessage(content);Map<String, String> response = new HashMap<>();response.put("status", "success");response.put("message", "Fanout消息发送成功");return ResponseEntity.ok(response);}/*** 发送延迟消息*/@PostMapping("/delay")public ResponseEntity<Map<String, String>> sendDelayMessage(@RequestBody Map<String, Object> request) {String content = (String) request.get("content");Integer delaySeconds = (Integer) request.getOrDefault("delaySeconds", 10);messageProducer.sendDelayMessage(content, delaySeconds);Map<String, String> response = new HashMap<>();response.put("status", "success");response.put("message", "延迟消息发送成功");response.put("delaySeconds", delaySeconds.toString());return ResponseEntity.ok(response);}/*** 发送优先级消息*/@PostMapping("/priority")public ResponseEntity<Map<String, String>> sendPriorityMessage(@RequestBody Map<String, Object> request) {String content = (String) request.get("content");Integer priority = (Integer) request.getOrDefault("priority", 0);messageProducer.sendPriorityMessage(content, priority);Map<String, String> response = new HashMap<>();response.put("status", "success");response.put("message", "优先级消息发送成功");response.put("priority", priority.toString());return ResponseEntity.ok(response);}
}

📊 最佳实践

1. 消息可靠性

  • 开启生产者确认机制
  • 使用持久化队列和消息
  • 实现消费者手动确认
  • 配置死信队列处理失败消息

2. 性能优化

  • 合理设置预取数量
  • 使用批量操作
  • 优化序列化方式
  • 监控队列长度

3. 高可用性

  • 配置集群模式
  • 使用镜像队列
  • 实现故障转移
  • 监控系统状态

本文关键词: RabbitMQ, 消息队列, AMQP, 异步通信, 微服务, 解耦

http://www.lryc.cn/news/609467.html

相关文章:

  • mac 锁屏不断网 2025
  • Java基础-斗地主游戏
  • 亚马逊撤离Google购物广告:重构流量生态的战略博弈
  • 编译 Paddle 遇到 flashattnv3 段错误问题解决
  • 37. line-height: 1.2 与 line-height: 120% 的区别
  • YAML文件
  • Vue Router快速入门
  • 高精度实战:YOLOv11交叉口目标行为全透视——轨迹追踪×热力图×滞留分析(附完整代码)
  • 深度学习TR3周:Pytorch复现Transformer
  • 第三阶段—8天Python从入门到精通【itheima】-143节(pyspark实战——数据计算——flatmap方法)
  • 大型语言模型落地应用全景指南:微调、提示工程、多模态与企业级解决方案
  • Perl 面向对象编程深入解析
  • 如何计算卷积层的计算量?从参数到公式的详细推导
  • PPT自动化 python-pptx - 11 : 备注页 (Notes Slides)
  • JUCE VST AI 开源
  • 进程生命周期管理:从创建到终止的完整逻辑
  • 解锁高并发LLM推理:动态批处理、令牌流和使用vLLM的KV缓存秘密
  • Oracle ASH的手册
  • Git简易教程
  • javacc实现简单SQL解析器
  • JSqlParser学习笔记 快速使用JSqlParser
  • [特殊字符] Ubuntu 下 MySQL 离线部署教学(含手动步骤与一键脚本)
  • 虚拟机中查看和修改文件权限
  • SelectDB:新一代实时数仓的核心引擎与应用实战
  • Python day34
  • Druid学习笔记 03、Druid的AstNode类详解与其他产品测试体验
  • 阿里云-通义灵码:解锁云原生智能开发新能力,让云开发更“灵”~
  • 【Linux操作系统】简学深悟启示录:进程初步
  • [spring-cloud: @LoadBalanced @LoadBalancerClient]-源码分析
  • DevOps平台大比拼:Gitee、Jenkins与CircleCI如何选型?