当前位置: 首页 > news >正文

深入浅出 RabbitMQ-交换机详解与发布订阅模型实战

大家好,我是工藤学编程 🦉一个正在努力学习的小博主,期待你的关注
实战代码系列最新文章😉C++实现图书管理系统(Qt C++ GUI界面版)
SpringBoot实战系列🐷【SpringBoot实战系列】SpringBoot3.X 整合 MinIO 存储原生方案
分库分表分库分表之实战-sharding-JDBC分库分表执行流程原理剖析
消息队列深入浅出 RabbitMQ-工作队列实战(轮训策略VS公平策略)

前情摘要:

1、深入浅出 RabbitMQ-核心概念介绍与容器化部署
2、深入浅出 RabbitMQ-简单队列实战
3、深入浅出 RabbitMQ-工作队列实战(轮训策略VS公平策略)


【亲测宝藏】发现一个让 AI 学习秒变轻松的神站!不用啃高数、不用怕编程,高中生都能看懂的人工智能教程来啦!

👉点击跳转,和 thousands of 小伙伴一起用快乐学习法征服 AI,说不定下一个开发出爆款 AI 程序的就是你!


本文章目录

  • 玩转RabbitMQ:交换机详解与发布订阅模型实战
    • 一、为什么需要交换机(Exchange)?
    • 二、RabbitMQ交换机类型全解析
      • 关键细节:
    • 三、发布订阅模型:基于Fanout交换机的实战
      • 3.1 发布订阅模型的典型场景
      • 3.2 核心原理
    • 四、代码实战:发布订阅模型实现
      • 环境准备
      • 4.1 消息生产者(Send):发送消息到Fanout交换机
      • 4.2 消息消费者(Recv1 & Recv2):监听并接收广播消息
    • 五、实战验证:广播消息是否生效
      • 测试步骤:
      • 关键现象解析:
    • 六、核心知识点总结

玩转RabbitMQ:交换机详解与发布订阅模型实战

一、为什么需要交换机(Exchange)?

在简单队列和工作队列中,生产者直接将消息发送到队列,但实际业务中,我们常需要更灵活的消息路由能力:比如"将订单消息发给物流系统和支付系统"、“将error级别的日志只发给告警服务”。这时,交换机(Exchange) 就成了关键角色。

简单说,交换机是RabbitMQ中消息的"中转站":

  • 生产者不再直接发送消息到队列,而是发送到交换机;
  • 交换机根据预设的路由规则,将消息转发到一个或多个队列;
  • 交换机本身不存储消息:若没有队列绑定它,或没有匹配的路由规则,消息会直接丢失。

二、RabbitMQ交换机类型全解析

RabbitMQ提供4种交换机类型,核心差异在于路由规则的不同。其中前3种最常用,最后一种(Headers)因灵活性低较少使用。

交换机类型核心路由规则适用场景路由键(RoutingKey)作用
Direct(定向)消息路由键与队列绑定的路由键完全匹配单队列精准路由(如订单状态通知)必须指定,且需完全匹配
Fanout(扇形)无视路由键,广播消息到所有绑定的队列发布订阅(如微信公众号、日志广播)无需指定,绑定后均能接收
Topic(主题)路由键与绑定的模式模糊匹配(支持*#多规则路由(如按级别/模块路由日志)需符合模式(*匹配1个词,#匹配多个)
Headers(头信息)根据消息头(Headers)的键值对匹配极少使用(可被Topic替代)不依赖路由键,依赖消息头属性

关键细节:

  • Direct:最严格的匹配,适合一对一的精准路由;
  • Fanout:转发速度最快,因为无需解析路由键,适合广播场景;
  • Topic:最灵活,支持通配符,能覆盖大部分复杂路由需求。

三、发布订阅模型:基于Fanout交换机的实战

发布订阅(Publish/Subscribe)是RabbitMQ中经典的消息模式,核心是"一条消息被多个消费者同时接收",其底层依赖Fanout交换机实现广播能力。

3.1 发布订阅模型的典型场景

  • 微信公众号:作者发一篇文章,所有订阅者都能收到;
  • 日志系统:一条错误日志,同时发给告警服务、存储服务、分析服务;
  • 实时通知:秒杀活动开始,所有在线用户收到通知。

3.2 核心原理

  1. 生产者将消息发送到Fanout交换机
  2. 交换机将消息广播到所有与之绑定的队列
  3. 每个队列对应一个消费者,因此所有消费者都能收到消息。

注意:每个消费者需要创建自己的队列(通常是临时队列),并绑定到Fanout交换机,否则无法接收消息。

四、代码实战:发布订阅模型实现

环境准备

  • RabbitMQ服务已启动(参考前文部署教程);
  • 依赖:amqp-client(同前文,版本5.10.0+);
  • 虚拟主机:/dev(确保生产者和消费者一致)。

4.1 消息生产者(Send):发送消息到Fanout交换机

public class Send {// 交换机名称(全局唯一,生产者和消费者需一致)private final static String EXCHANGE_NAME = "exchange_fanout";public static void main(String[] argv) throws Exception {// 1. 创建连接工厂并配置参数ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.229.128");    // RabbitMQ服务器地址factory.setUsername("admin");       // 用户名factory.setPassword("password");    // 密码factory.setVirtualHost("/dev");     // 虚拟主机(必须一致)factory.setPort(5672);              // AMQP协议端口// 2. 创建连接和信道(try-with-resources自动关闭)try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 3. 声明交换机:类型为Fanout(扇形)/*** 参数说明:* 1. exchange:交换机名称* 2. type:交换机类型(BuiltinExchangeType.FANOUT)* 3. durable:是否持久化(重启后交换机依然存在)* 4. autoDelete:是否自动删除(最后一个绑定解除后删除)* 5. internal:是否内部交换机(一般为false,外部可发送消息)* 6. arguments:额外参数*/channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, false, false, false, null);// 4. 发送消息到交换机(Fanout交换机无需指定路由键)String message = "Hello World! 这是一条广播消息~";/*** 参数说明:* 1. exchange:目标交换机名称* 2. routingKey:路由键(Fanout类型无用,可设为空)* 3. props:消息属性(如持久化、优先级等)* 4. body:消息体(字节数组)*/channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));System.out.println(" [x] 生产者已发送消息: '" + message + "'");}}
}

4.2 消息消费者(Recv1 & Recv2):监听并接收广播消息

两个消费者逻辑一致,均需绑定到Fanout交换机,通过临时队列接收消息。

// 消费者1
public class Recv1 {private final static String EXCHANGE_NAME = "exchange_fanout";public static void main(String[] argv) throws Exception {// 1. 连接配置(与生产者完全一致)ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.229.128");factory.setUsername("admin");factory.setPassword("password");factory.setVirtualHost("/dev");factory.setPort(5672);// 2. 创建连接和信道(消费者保持长连接,不自动关闭)Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 3. 声明交换机(与生产者一致,确保交换机存在)channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, false, false, false, null);// 4. 创建临时队列(发布订阅模式专用)/*** 特点:* - 随机生成队列名称(如amq.gen-xxxx)* - 独占队列(exclusive=true):仅当前连接可访问,连接关闭后自动删除* - 自动删除(autoDelete=true):最后一个消费者断开后删除*/String queueName = channel.queueDeclare().getQueue(); // 无参数表示创建临时队列System.out.println("Recv1 绑定的临时队列名称:" + queueName);// 5. 绑定队列与交换机(Fanout交换机无需指定路由键)channel.queueBind(queueName, EXCHANGE_NAME, "");// 6. 定义消息处理回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Recv1 收到消息:" + message);// 若需手动确认消息,可添加 channel.basicAck(...)(此处简化用自动确认)};// 7. 开始消费消息(自动确认模式,适合简单场景)channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});System.out.println("Recv1 已启动,等待接收消息...(按CTRL+C退出)");}
}
// 消费者2(代码与Recv1完全一致,仅打印标识不同)
public class Recv2 {private final static String EXCHANGE_NAME = "exchange_fanout";public static void main(String[] argv) throws Exception {// 连接配置、交换机声明、临时队列创建、绑定逻辑与Recv1一致ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.229.128");factory.setUsername("admin");factory.setPassword("password");factory.setVirtualHost("/dev");factory.setPort(5672);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, false, false, false, null);String queueName = channel.queueDeclare().getQueue();System.out.println("Recv2 绑定的临时队列名称:" + queueName);channel.queueBind(queueName, EXCHANGE_NAME, "");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Recv2 收到消息:" + message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});System.out.println("Recv2 已启动,等待接收消息...(按CTRL+C退出)");}
}

五、实战验证:广播消息是否生效

测试步骤:

  1. 启动消费者:先运行Recv1和Recv2,控制台会输出各自绑定的临时队列名称(如amq.gen-abc123);
    在这里插入图片描述在这里插入图片描述

  2. 启动生产者:运行Send,发送一条消息;
    在这里插入图片描述

  3. 观察结果:Recv1和Recv2的控制台均会打印收到的消息,说明Fanout交换机成功将消息广播到两个队列。
    在这里插入图片描述
    在这里插入图片描述

关键现象解析:

  • 若未绑定队列到交换机:生产者发送的消息会因"无匹配队列"而丢失;
  • 临时队列的作用:每个消费者独占一个临时队列,确保消息能被各自接收,且消费者退出后队列自动清理,不占用资源。

六、核心知识点总结

  1. 交换机的核心职责:转发消息,不存储,路由规则由类型决定;
  2. Fanout交换机特点
    • 广播消息到所有绑定的队列,无视路由键;
    • 适合"一对多"的发布订阅场景,转发效率最高;
  3. 临时队列设计
    • 随机名称+独占(exclusive=true)+自动删除(autoDelete=true);
    • 避免手动创建队列的麻烦,适合临时消费场景;
  4. 消息不丢失的前提:必须有队列与交换机绑定,且路由规则匹配。

觉得有用请点赞收藏!
如果有相关问题,欢迎评论区留言讨论~

http://www.lryc.cn/news/610687.html

相关文章:

  • 华为云云产品的发展趋势:技术创新驱动数字化未来
  • 查看部署在K8S服务的资源使用情况
  • 蓝桥杯----DS1302实时时钟
  • Could not load the Qt platform plugin “xcb“ in “无法调试与显示Opencv
  • 【升级打怪实录】uniapp - android 静态声明权限和动态请求权限的区别
  • AI+OA原生应用 麦当秀AIPPT
  • 用 PyTorch 实现一个简单的神经网络:从数据到预测
  • lesson32:Pygame模块详解:从入门到实战的2D游戏开发指南
  • 阿里云招Java研发咯
  • day 46 神经网络-简版
  • 从零用java实现小红书springboot_vue_uniapp(15)评论和im添加图片
  • vue和react的框架原理
  • Elasticsearch向量库
  • React18 严格模式下的双重渲染之谜
  • 使用maven-shade-plugin解决es跨版本冲突
  • DHTMLX重磅发布React Scheduler组件,赋能日程管理开发!
  • PDF 文本提取技术深度对比:基于规则与基于模型的两种实现
  • 数学建模-线性规划。
  • 2025国赛数学建模C题详细思路模型代码获取,备战国赛算法解析——层次分析法
  • Java+Redis+SpringBoot定时器-定时发布商品
  • UNet改进(30):SageAttention在UNet中的4-Bit量化实现详解
  • 多参数状态监测集成终端设备怎么选
  • 日常反思总结2025.8.5
  • 2025金九银十Java后端面试攻略
  • 关于为什么ctrl c退不出来SecureCRT命令行的原因及其解决方法:
  • 变频器实习DAY21 区分BU和SUB 区分BJT和MOS 体二极管
  • SAP-ABAP:SAP接口全生命周期核心规范-开发运维注意事项
  • 第十七天:原码、反码、补码与位运算
  • 【Unity笔记】Unity TextMeshPro 字体显示为方块的终极解决方案(含中文、特殊字符支持)
  • GitLab:一站式 DevOps 平台的全方位解析