当前位置: 首页 > news >正文

微服务消息队列之——RabbitMQ

本篇文章是基于黑马微服务的课程学习,一些自己理解和看法。让大家对RabbitMQ有一个基本的了解。

1.什么是RabbitMQ

微服务之间的调用是采用OpenFeign,而OpenFeign又是同步调用,可以清晰看下面的例子:

在这里插入图片描述

我们执行图片这样一个业务,我们需要在支付服务完成三步:

  1. 调用用户服务,执行扣减余额
  2. 更新支付服务的交易流水
  3. 调用交易服务,更新订单的状态

第一步和第二步操作需要保证为原子操作

完成三步后,整个支付服务才算完成。这里我们需要思考:支付服务是否需要交易服务反馈过来结果?我们可以发现是不需要的。因此,我们可以采用异步调用的方式去执行交易服务。那么支付服务和交易服务之间媒介,由谁负责——消息队列

1.1 同步调用会导致的问题

  1. 拓展性差
    • 倘若我们需要到支付服务中拓展新的服务,比如:短信服务,计分服务。这样需要到业务修改很多代码,不符合OPC开闭原则,拓展性不好。
  2. 性能下降
    • 支付服务里面子服务太多,影响支付的性能
  3. 级联失败
    • 可能由于没有发送成功短信导致,整个支付失败,这是因小失大

1.2 几种常见的消息队列

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

追求可用性:Kafka、 RocketMQ 、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延迟:RabbitMQ、Kafka

今天我们介绍的是RabbitMQ

1.3 RabbitMQ

1.3.1 核心概念速览

RabbitMQ 是一个开源的“消息代理”(message broker),它实现了 AMQP 0-9-1 协议,也支持 STOMP、MQTT、HTTP 等多种协议,用来在分布式系统或微服务之间做 可靠、异步、解耦 的消息收发。官网地址:https://www.rabbitmq.com/

概念类比/作用
Producer发信人:把消息发到 Exchange
Exchange分拣员:按规则(路由键、通配符等)把消息分到 Queue
Queue信箱:真正存储消息的地方
Binding分拣规则:Exchange 与 Queue 的绑定关系
Consumer收信人:从 Queue 里拿消息
Virtual Host虚拟邮局:逻辑隔离(多租户)
ConnectionTCP 长连接
Channel轻量级“会话”,减少 TCP 复用开销
1.3.2 四种交换机类型(路由策略)
  1. Direct(直连)
    路由键完全匹配才投递。
    例:routingKey = "order.paid" → 只进绑定了该 key 的队列。
  2. Fanout(广播)
    消息发到所有绑定的队列,忽略路由键。
  3. Topic(主题通配)
    路由键支持 *(单级)和 #(多级)。
    例:order.* 匹配 order.paidorder.created
  4. Headers(头信息匹配)
    根据消息头里的键值对过滤,较少用。

2. 部署RabbitMQ

2.1 在Docker中部署

MQ压缩包下载

先把压缩包放在linux系统中,然后dacker load -f [RabbitMQ压缩包] 的镜像加载到Docker容器中。

然后执行创建容器的命令

docker run \-e RABBITMQ_DEFAULT_USER=admin \ -e RABBITMQ_DEFAULT_PASS=123456 \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network hm-net\-d \rabbitmq:3.8-management

部署好了,访问地址:RabbitMQ Management,需要根据自己的虚拟机地址修改前面网络号。

2.2 访问网址

在这里插入图片描述

RabbitMQ 官方管理网站(Web UI) 中完成“创建队列、交换机、用户隔离”只需三步,全程图形界面操作

第 1 步:创建用户(实现用户隔离)

  1. 登录管理后台:http://<host>:15672
  2. 顶部菜单 → AdminAdd a user
  3. 填写用户名、密码、角色(如 management),点击 Add user
  4. 新用户默认 无任何虚拟主机权限,继续下一步。

第 2 步:创建虚拟主机(Vhost)实现数据隔离

  1. 仍在 Admin 页面 → Virtual HostsAdd a new virtual host
  2. 输入名称,如:/pay_vhostAdd virtual host
  3. 点击刚创建的 vhost → Permissions → 给第 1 步的用户 Set permission(赋予读写配置权限)
    • 完成后该用户只能看到 /pay_vhost 下的资源,彻底与其他业务隔离。

第 3 步:在 vhost 内创建交换机和队列

  1. 右上角 Virtual Host 下拉框切换为 /pay_vhost
  2. 顶部菜单 → QueuesAdd a new queue
    • 输入队列名(如 order.queue)、选择 Durable(持久化)→ Add queue
  3. 顶部菜单 → ExchangesAdd a new exchange
    • 输入交换机名(如 order.exchange)、类型(如 direct)、DurableAdd exchange
  4. 进入交换机详情页 → Bindings → 绑定刚创建的队列,填写 Routing key(如 order.paid)→ Bind

3.SpringAMQP

Spring AMQP 是 Spring 生态中专为 AMQP(高级消息队列协议,如 RabbitMQ)设计的“全家桶”级解决方案。它把 RabbitMQ 的底层 API 包装成 Spring 风格的模板、注解和自动配置,开发者几乎不用写原生 RabbitMQ 代码就能完成消息的发送、接收、路由和治理。

3.1 广播交换机案例实现

在这里插入图片描述

简单的案例实现,三个模块:一个生产者,一个消费者,一个交换机

Demo文件下载

在这里插入图片描述

在这里插入图片描述

包括三部分:

  • mq-demo:父工程,管理项目依赖
  • publisher:消息的发送者
  • consumer:消息的消费者
3.1.1 依赖安装

注意:JDK版本是11

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.itcast.demo</groupId><artifactId>mq-demo</artifactId><version>1.0-SNAPSHOT</version><modules><module>publisher</module><module>consumer</module></modules><packaging>pom</packaging><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.12</version><relativePath/></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>
</project>

备注:消费者和生产者都要安装这些依赖

3.1.2 配置文件(前提新建一个hmall用户)
spring:rabbitmq:host: 127.0.0.1 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123456 # 密码

怎么新建用户,前面已经写了

3.1.3 声明队列和交换机

在控制台创建队列fanout.queue1fanout.queue2

在这里插入图片描述

然后再创建一个交换机:

在这里插入图片描述

然后绑定两个队列到交换机,首先点击那个新建的交换机:

在这里插入图片描述

在这里插入图片描述

3.1.4 生成者发送消息

到测试类写

package com.itheima.publisher.amqp;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testExchange() {String exchangeName = "hmall.fanout";// 消息String message = "hello, everyone!";rabbitTemplate.convertAndSend(exchangeName, "", message);}
}
3.1.5 消费者接受消息

文件结构
在这里插入图片描述

package com.itheima.consumer.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalDateTime;/*** @program: mq-demo* @description:* @author: WangXin* @create: 2025-07-29 19:55**/
@Component
public class SpringRabbitListener {@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg) {System.out.println("消费者1接收到Fanout消息:【" + msg + "】");}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg) {System.out.println("消费者2接收到Fanout消息:【" + msg + "】");}
}
3.1.6 测试
  • 先启动测试类
  • 再启动消费者

在这里插入图片描述

3.2 基于注解实现

在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时,队列和交换机是程序员定义的,将来项目上线,又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。在这个过程中是很容易出现错误的。

因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。

这里介绍是采用注解的形式创建

3.2.1 Direct模式的交换机

新建一个配置类

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
注解/属性含义运行时效果
@RabbitListener声明一个“消息监听器”Spring 容器启动时会自动注册一个监听器容器(Listener Container),负责从 RabbitMQ 拉消息并调用当前方法。
bindings = @QueueBinding(…)一次性声明“队列-交换机-绑定”三者关系在监听器启动之前,Spring AMQP 会检查 RabbitMQ 中是否存在这些对象; 不存在则自动创建
@Queue(name = “direct.queue1”)定义/引用队列如果服务器上没有 direct.queue1,就按默认持久化、非排他、非自动删除的规则创建出来。
@Exchange(name = “hmall.direct”, type = ExchangeTypes.DIRECT)定义/引用交换机如果没有 hmall.direct 交换机,就创建一个 DIRECT 类型 的持久化交换机。
key = {“red”, “blue”}指定路由键(Routing Key)把队列 direct.queue1 与交换机 hmall.direct 建立 两条绑定
hmall.directdirect.queue1 的 key 分别是 redblue
http://www.lryc.cn/news/604386.html

相关文章:

  • .NET 10 中的新增功能系列文章2——ASP.NET Core 中的新增功能
  • PostGIS安装与pg_dump/pg_restore排错
  • flutter 记录一个奇怪的问题
  • 在 Mac 上用 Vagrant 安装 K8s
  • InfluxDB 3 数据库命名与创建全攻略:规范、限制与实战指南
  • 《零基础入门AI:传统机器学习核心算法解析(KNN、模型调优与朴素贝叶斯)》
  • GaussDB 数据库架构师(十二) 数据库对象修改审计设置
  • Redis学习------缓存穿透
  • llama factory本地部署常见问题
  • Git版本控制器
  • 人工智能与家庭:智能家居的便捷与隐患
  • gdb调试的限制和配置自动生成core
  • 2023 年 NOI 最后一题题解
  • 【C++篇】哈希扩展:位图和布隆过滤器+哈希切割
  • 【Lambda】flatMap使用案例
  • c++之基础B(第一课)
  • dify离线插件打包步骤
  • 在Trae中使用MoonBit月兔
  • 【编号65】广西地理基础数据(道路、水系、四级行政边界、地级城市、DEM等)
  • 在 Elasticsearch 8.19 和 9.1 中引入更强大、更具弹性和可观测性的 ES|QL
  • Buck的Loadline和DVS区别和联系
  • Jenkinsfile 报错
  • 一篇讲清Redis中常见数据类型的用法
  • Three.js 与 WebXR:初识 VR/AR 开发
  • 国产化再进一步,杰和科技推出搭载国产芯片的主板
  • LoRaWAN协议,提升公用事业能源效率的“隐形引擎”
  • Ubuntu22.04.1搭建php运行环境
  • C++ 高性能容器:ankerl::unordered_dense::map
  • 元码智能“大眼睛”机器人首发,智启生活新纪元!
  • RabbitMQ 发送方确认的两大工具 (With Spring Boot)