当前位置: 首页 > news >正文

Spring Boot 整合 RabbitMQ

Spring Boot 整合 RabbitMQ

一、概述:RabbitMQ 是什么?

你可以把 RabbitMQ 想象成一个「快递中转站」。
比如你在网上买了一本书,卖家(生产者)把包裹(消息)交给快递站(RabbitMQ),快递站根据包裹上的地址(规则)把包裹分给不同的快递员(消费者),最后送到你家(业务系统)。

RabbitMQ 是一个专门用来「传递消息」的软件(专业叫「消息中间件」),它能让不同的程序、不同的电脑之间高效地「传小纸条」。


二、RabbitMQ 的「快递分类方式」(交换机类型)

快递站分包裹时,可能按「地址」「重量」「紧急程度」分类。RabbitMQ 也有类似的「分类规则」,叫 交换机(Exchange)。常用的有 4 种:

1. 直连交换机(Direct Exchange)

规则:包裹上必须写「精确地址」(路由键 Routing Key),只有地址完全匹配的快递员才能收到。
例子:卖家给「北京-朝阳区」的包裹,只有负责朝阳区的快递员能接。

2. 扇形交换机(Fanout Exchange)

规则:不管地址,「所有快递员」都能收到包裹(广播模式)。
例子:卖家发「双11大促通知」,所有快递员都要知道,一起准备加班。

3. 主题交换机(Topic Exchange)

规则:地址可以用「通配符」(比如 * 代表一个词,# 代表多个词)。
例子:卖家发「北京.*」的包裹,所有地址以「北京」开头的快递员(如北京-朝阳、北京-海淀)都能收到。

4. 头交换机(Headers Exchange)

规则:不看地址,看包裹上的「标签」(Headers 头信息,比如「优先级=高」)。
例子:卖家标「紧急」的包裹,只有关注「紧急」标签的快递员能接。


三、RabbitMQ 的使用场景(为什么需要它?)

1. 异步处理:省时间!

比如你在淘宝下单,系统需要「扣库存+发短信+更新积分」。如果一步步做,可能要等 5 秒;用 RabbitMQ 可以把「发短信」和「更新积分」的任务丢给 RabbitMQ,主流程只需要 1 秒完成下单,剩下的由其他程序慢慢处理。

2. 流量削峰:防崩溃!

双11时,订单像洪水一样涌来,系统直接处理可能被冲垮。RabbitMQ 像「水库」,把订单暂时存起来,系统按自己的速度慢慢处理(比如每秒处理 1000 单),避免被瞬间的高流量冲垮。

3. 系统解耦:不互相拖累!

比如电商系统有「订单模块」「库存模块」「短信模块」。如果订单模块直接调用库存和短信模块,一旦短信模块崩溃,订单也会失败。用 RabbitMQ 后,订单模块只需要把消息发给 RabbitMQ,其他模块自己来取,互不影响。

四、整合Springboot

1. 配置 RabbitMQ 连接

1.Maven

<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId>
</dependency>

2.配置文件,yml和properties选择一个

spring:rabbitmq:host: 117.185.165.187port: 5672username: rabbitmqpassword: j8iG3KYs7Wmxxx
# RabbitMQ 服务器地址(默认 localhost:5672)
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
# 登录账号密码(默认 guest/guest,注意:远程连接需要改密码!)
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

2、定义「快递规则」:交换机和队列

RabbitMQ 的消息需要通过「交换机(Exchange)」和「队列(Queue)」传递。我们需要先告诉 Spring Boot 要创建哪些交换机和队列。

新建 RabbitMQConfig.java,用 @Bean 声明交换机、队列和绑定关系。

做一个「电商下单后发通知」的功能,需要:

  • 一个直连交换机(order_exchange)。
  • 一个队列(sms_queue),专门存「需要发短信的订单」。
  • 把队列和交换机绑定,路由键是 send_sms(只有路由键匹配的消息才会进这个队列)。
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// 1. 声明直连交换机(名字叫 order_exchange)@Beanpublic DirectExchange orderExchange() {return new DirectExchange("order_exchange");}// 2. 声明队列(名字叫 sms_queue,存需要发短信的订单)@Beanpublic Queue smsQueue() {return new Queue("sms_queue");}// 3. 把队列和交换机绑定,路由键是 send_sms(只有路由键匹配的消息才会进这个队列)@Beanpublic Binding smsBinding(Queue smsQueue, DirectExchange orderExchange) {return BindingBuilder.bind(smsQueue).to(orderExchange).with("send_sms");  // 路由键必须和生产者发送时一致}
}

如果说是多个队列按照下面的

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// 1. 声明直连交换机(名字叫 order_exchange)@Beanpublic DirectExchange orderExchange() {return new DirectExchange("order_exchange");}// 2. 声明 3 个队列(短信、积分、日志)@Beanpublic Queue smsQueue() {return new Queue("sms_queue");  // 存需要发短信的订单}@Beanpublic Queue scoreQueue() {return new Queue("score_queue");  // 存需要更新积分的订单}@Beanpublic Queue logQueue() {return new Queue("log_queue");  // 存需要记录日志的订单}// 3. 绑定 sms_queue(路由键 send_sms)@Beanpublic Binding smsBinding(Queue smsQueue, DirectExchange orderExchange) {return BindingBuilder.bind(smsQueue).to(orderExchange).with("send_sms");  // 路由键:只有 send_sms 的消息会进 sms_queue}// 4. 绑定 score_queue(路由键 update_score)@Beanpublic Binding scoreBinding(Queue scoreQueue, DirectExchange orderExchange) {return BindingBuilder.bind(scoreQueue).to(orderExchange).with("update_score");  // 路由键:只有 update_score 的消息会进 score_queue}// 5. 绑定 logQueue(路由键 log_order)@Beanpublic Binding logBinding(Queue logQueue, DirectExchange orderExchange) {return BindingBuilder.bind(logQueue).to(orderExchange).with("log_order");  // 路由键:只有 log_order 的消息会进 log_queue}}

3、生产者:发送消息(卖家发包裹)

RabbitTemplate(Spring 提供的发消息工具)发送消息到交换机。

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class OrderService {// 注入 RabbitTemplate(Spring 自动帮我们创建好的发消息工具)@Autowiredprivate RabbitTemplate rabbitTemplate;// 用户下单后,发送消息到 RabbitMQpublic void createOrder(String orderInfo) {// 1. 主流程:扣库存、保存订单(这里简化,直接打印)System.out.println("主流程:订单已保存,开始扣库存...");// 2. 异步任务:发送短信通知(把消息发给 RabbitMQ)rabbitTemplate.convertAndSend("order_exchange",  // 交换机名字"send_sms",        // 路由键(和队列绑定的路由键一致)orderInfo          // 消息内容(比如订单详情));System.out.println("已发送短信通知任务到 RabbitMQ");}
}

4、消费者:接收消息(快递员收包裹)

@RabbitListener 注解监听队列,自动接收并处理消息。

新建消费者服务类

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SmsConsumer {// 监听 sms_queue 队列,有消息就自动触发这个方法@RabbitListener(queues = "sms_queue")public void sendSms(String orderInfo) {System.out.println("收到短信任务,正在发送...");// 这里调用短信接口(比如阿里云短信),实际代码需要替换System.out.println("已给用户发送短信:" + orderInfo);}
}

如果说是多线程处理就多添加一个配置concurrency = "5"

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SmsConsumer {// 监听 sms_queue 队列,有消息就自动触发这个方法@RabbitListener(queues = "sms_queue",concurrency = "5")public void sendSms(String orderInfo) {System.out.println("收到短信任务,正在发送...");// 这里调用短信接口(比如阿里云短信),实际代码需要替换System.out.println("已给用户发送短信:" + orderInfo);}
}
1、如何避免消息被重复处理?

如果你的场景是「多个消费者抢着处理同一条消息」(比如并行加速),需要确保 一条消息只被一个消费者处理。RabbitMQ 默认已经帮你实现了这一点!

2、原理:消息确认机制(ACK)
  • 当消费者收到消息后,RabbitMQ 会等待消费者「确认」(ACK)。
  • 如果消费者正常处理完消息并返回 ACK,RabbitMQ 会删除这条消息,不会再发给其他消费者。
  • 如果消费者处理失败(比如崩溃),RabbitMQ 会重新将消息分发给其他消费者。
3、注意事项
1. 消息幂等性(防重复处理)

如果消费者处理消息时,因为网络问题导致 ACK 未成功返回,RabbitMQ 会重新发送消息,可能导致重复处理。
解决方法

  • 消息里加唯一标识(如订单号)。
  • 处理前检查是否已处理过(比如查数据库)。
2. 消费者数量别太多!

concurrency 不是越大越好!如果消费者数量超过服务器 CPU 核心数,反而会因为线程切换浪费资源。
建议:根据业务耗时调整,比如处理耗时 1 秒的任务,消费者数量 = CPU 核心数 × 2 比较合理。

3. 手动确认消息(高级场景)

默认是自动 ACK(auto_ack=true),但如果处理消息可能失败(比如调用外部接口超时),建议用手动 ACK。

@RabbitListener(queues = "order_queue", ackMode = "MANUAL")  // 手动确认
public void processOrder(String orderInfo, Channel channel, Message message) {try {// 处理消息...channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);  // 手动确认成功} catch (Exception e) {// 处理失败,重新入队(或发送到死信队列)channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}
}

五、常见问题 & 注意事项

1. 消息丢失怎么办?

  • 开启「消息持久化」:在声明队列和交换机时,设置 durable=true(默认是 true,重启 RabbitMQ 后消息不丢失)。
  • 生产者确认:配置 spring.rabbitmq.publisher-confirm-type=correlated,确保消息成功发到交换机。
  • 消费者确认:默认是 auto_ack=true(自动确认),如果需要手动确认(比如处理消息时可能失败),可以设置 @RabbitListener(ackMode = "MANUAL"),处理完再调用 channel.basicAck()

2. 重复消费怎么办?

  • 消息里加唯一标识(如订单号),消费者处理前检查是否已处理过(比如查数据库)。

3. RabbitMQ 连不上?

  • 检查 application.properties 里的 hostportusernamepassword 是否正确。
  • 远程连接时,RabbitMQ 默认禁止 guest 用户,需要新建用户并授权(管理界面操作)。

六、总结

用 Spring Boot 整合 RabbitMQ 超简单!核心步骤就 4 步:

  1. 配连接:在 application.properties 里填 RabbitMQ 地址。
  2. 定义规则:用 @Bean 声明交换机、队列和绑定关系。
  3. 发消息:用 RabbitTemplate.convertAndSend() 发送。
  4. 收消息:用 @RabbitListener 监听队列。

适合用 Spring Boot + RabbitMQ 的场景

  • 电商、物流等需要「异步任务」的系统。
  • 高并发场景(如双11订单洪峰)。
  • 多个模块需要「松耦合」协作的系统(如订单、短信、积分模块)。
http://www.lryc.cn/news/584306.html

相关文章:

  • 大语言模型驱动智能语音应答:技术演进与架构革新
  • Java Reference类及其实现类深度解析:原理、源码与性能优化实践
  • 聊一聊 Linux 上对函数进行 Hook 的两种方式
  • 使用EasyExcel动态合并单元格(模板方法)
  • Centos 7下使用C++使用Rdkafka库实现生产者消费者
  • Houdini 分布式解算效率瓶颈突破:渲染 101 云集群实战指南
  • 编程实践:单例模式(懒汉模式+饿汉模式)
  • 面试技术问题总结一
  • android TabLayout 标题栏切换 事件拦截
  • 【Linux系统】冯诺依曼体系结构 | 初识操作系统
  • Redis数据安全性分析
  • Spring Boot快速搭建RESTful应用
  • P1722 矩阵 II 题解 DFS深度优先遍历与卡特兰数(Catalan number)解
  • 【WPF实战】MVVM中如何从数据模型反查自定义控件实例(ImageView + Halcon)
  • C++类对象多态底层原理及扩展问题
  • Zotero+zotmoov+坚果云同步
  • 2023年华为杯研究生数学建模竞赛E题脑卒中临床智能分析
  • 我的世界Java版1.21.4的Fabric模组开发教程(十五)方块实体渲染器
  • 北京一家IPO业绩持续性存疑,关联交易频繁独立性堪忧
  • iOS 抓包详细教程:从零搭建、操作到实战调试的全流程指南
  • C++ -- STL -- vector
  • 北斗舞动在线监测装置:电力安全的“智慧守护者”
  • 大健康IP如何借“合规创新”抢占行业新风口|创客匠人
  • 基于Python的程序员数据分析与可视化系统的设计与实现
  • linxu内核的signal fault和arm内核的flault
  • 网络综合实验
  • Flowable21条件事件------------持续更新中
  • 【LeetCode100】--- 2.字母异位词分组【复习回顾】
  • 【LeetCode 热题 100】148. 排序链表——(解法二)分治
  • 数据结构与算法之美:广义表