当前位置: 首页 > news >正文

RabbitMQ---订阅模型-Fanout

1、 订阅模型-Fanout

Fanout,也称为广播。
流程图:
在这里插入图片描述

在广播模式下,消息发送流程是这样的:
1) 可以有多个消费者
2) 每个消费者有自己的queue(队列)
3) 每个队列都要绑定到Exchange(交换机)
4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
5) 交换机把消息发送给绑定过的所有队列
6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

1.1、生产者

两个变化:
1) 声明Exchange,不再声明Queue
2) 发送消息到Exchange,不再发送到Queue

public class Send {private final static String EXCHANGE_NAME = "fanout_exchange_test";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明exchange,指定类型为fanoutchannel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 消息内容String message = "Hello everyone";// 发布消息到Exchangechannel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [生产者] Sent '" + message + "'");channel.close();connection.close();}
}

1.2、消费者1

public class Recv {private final static String QUEUE_NAME = "fanout_exchange_queue_1";private final static String EXCHANGE_NAME = "fanout_exchange_test";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [消费者1] received : " + msg + "!");}};// 监听队列,自动返回完成channel.basicConsume(QUEUE_NAME, true, consumer);}
}

1.3、 消费者2

public class Recv2 {private final static String QUEUE_NAME = "fanout_exchange_queue_2";private final static String EXCHANGE_NAME = "fanout_exchange_test";public static void main(String[] argv) throws Exception {// 获取到连接Connection connection = ConnectionUtil.getConnection();// 获取通道Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");// 定义队列的消费者DefaultConsumer consumer = new DefaultConsumer(channel) {// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg = new String(body);System.out.println(" [消费者2] received : " + msg + "!");}};// 监听队列,手动返回完成channel.basicConsume(QUEUE_NAME, true, consumer);}
}

1.4、 测试

我们应该先启动生产者,否则,先启动消费者时,由于要绑定交换机,此时,交换机并不存在所以会报错。
我们运行两个消费者,然后发送1条消息:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

http://www.lryc.cn/news/139661.html

相关文章:

  • nginx 中新增url请求参数
  • [系统] 电脑突然变卡 / 电脑突然** / 各种突发情况解决思路
  • 改进YOLO系列:8.添加SimAM注意力机制
  • Go与Rust的对比与分析
  • SpingMVC拦截器-异常处理的思路,用户体验不好的地方
  • 【C++设计模式】用动画片《少年骇客》(Ben10)来解释策略模式
  • 软件测试及数据分析处理实训室建设方案
  • 切换Debian的crontab的nano编辑器
  • Spring Cloud Alibaba-Sentinel--服务容错
  • Stable Diffusion 系列教程 | 如何获得更高清优质的AI绘画
  • 食品饮料制造行业如何实现数字化转型和工业4.0
  • UE学习记录03----UE5.2 使用MVVM示例
  • 代码审计-审计工具介绍-DAST+SAST+IAST项目
  • 网络安全应急响应预案培训
  • STM32F4X 定时器中断
  • MongoDB +Dataframe+excel透视表
  • PostgreSQL日期相关
  • C++编程法则365天一天一条(8)const_cast去除cv限定
  • 某网站DES加密逆向分析实战
  • 面向对象的理解
  • java ssl加密发送邮件
  • SpringBoot-yml配置文件的使用与优势
  • Layer Normalization(层规范化)
  • redisson参数配置
  • 【基于Arduino的仿生蚂蚁机器人】
  • angular12里面FormGroup做多个项目的相关check
  • TypeScript 的发展与基本语法
  • macOS - 上编译运行 risc-v (spike)
  • Linux--线程地址空间
  • 华为OD机试 - 最佳植树距离 - 二分查找(Java 2023 B卷 100分)