当前位置: 首页 > news >正文

RabbitMQ第二章(RocketMQ的五大工作模式)

文章目录

  • 一、simple模式(即最简单的收发模式)
  • 二、work工作模式(资源的竞争)
  • 三、Exchange发布订阅(交换机)
    • 3.1 Fanout交换机
    • 3.2 Direct交换机
    • 3.3 Topic交换机

一、simple模式(即最简单的收发模式)

1.消息产生消息,将消息放入队列
2.消息的消费者(consumer) 监听 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删
除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失,这里可以设置成手动的
ack,但如果设置成手动ack,处理完后要及时发送ack消息给队列,否则会造成内存溢出)
在这里插入图片描述
实现步骤如下:
(1),引入相关的依赖

<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency>

(2),新建一个队列:simple.queue
在这里插入图片描述
(3),yml添加配置

spring:rabbitmq:host: 192.168.150.101 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码

(4),然后在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:

@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, spring amqp!";// 发送消息rabbitTemplate.convertAndSend(queueName, message);}

(5),消息接收

@Component
public class SpringRabbitListener {// 利用RabbitListener来声明要监听的队列信息// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。// 可以看到方法体中接收的就是消息体的内容@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring 消费者接收到消息:【" + msg + "】");}
}

二、work工作模式(资源的竞争)

1.消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2同时监听同一个队列,消息被消费。C1
C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患:高并发情况下,默认会产生某一个消息
被多个消费者共同使用,可以设置一个开关(syncronize) 保证一条消息只能被一个消费者使用)。
在这里插入图片描述

(1),我们修改consumer服务的application.yml文件,添加配置:

spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

(2),在publisher服务中的SpringAmqpTest类中添加一个测试方法:

/*** workQueue* 向队列中不停发送消息,模拟消息堆积。*/
@Test
public void testWorkQueue() throws InterruptedException {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, message_";for (int i = 0; i < 50; i++) {// 发送消息,每20毫秒发送一次,相当于每秒发送50条消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}
}

(3),要模拟多个消费者绑定同一个队列,我们在consumer服务的SpringRabbitListener中添加2个新的方法:

@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);
}@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);
}

三、Exchange发布订阅(交换机)

在这里插入图片描述
1、每个消费者监听自己的队列;
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队
列都将接收到消息。

3.1 Fanout交换机

Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。
在广播模式下,消息发送流程是这样的:
在这里插入图片描述

  • 1) 可以有多个队列
  • 2) 每个队列都要绑定到Exchange(交换机)
  • 3) 生产者发送的消息,只能发送到交换机
  • 4) 交换机把消息发送给绑定过的所有队列
  • 5) 订阅队列的消费者都能拿到消息

(1),声明队列和交换机
在这里插入图片描述
(2),消息发送
@Test
public void testFanoutExchange() {
// 交换机名称
String exchangeName = “hmall.fanout”;
// 消息
String message = “hello, everyone!”;
rabbitTemplate.convertAndSend(exchangeName, “”, message);
}
(3),消息接收
@RabbitListener(queues = “fanout.queue1”)
public void listenFanoutQueue1(String msg) {
System.out.println(“消费者1接收到Fanout消息:【” + msg + “】”);
}

@RabbitListener(queues = “fanout.queue2”)
public void listenFanoutQueue2(String msg) {
System.out.println(“消费者2接收到Fanout消息:【” + msg + “】”);
}

3.2 Direct交换机

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
[图片]

(1),声明队列和交换机

  1. 声明一个名为hmall.direct的交换机
  2. 声明队列direct.queue1,绑定hmall.direct,bindingKey为blud和red
  3. 声明队列direct.queue2,绑定hmall.direct,bindingKey为yellow和red
  4. 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
  5. 在publisher中编写测试方法,向hmall.direct发送消息
    (2),消息接收
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

(3),消息发送

@Test
public void testSendDirectExchange() {// 交换机名称String exchangeName = "hmall.direct";// 消息String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

3.3 Topic交换机

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。
只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!

BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert
在这里插入图片描述
(1),声明队列和交换机

  • topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:
    • china.news
    • china.weather
  • topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:
    • china.news
    • japan.news
      (2),消息发送
/*** topicExchange*/
@Test
public void testSendTopicExchange() {// 交换机名称String exchangeName = "hmall.topic";// 消息String message = "喜报!孙悟空大战哥斯拉,胜!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

(3),消息接收

@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg){System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg){System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}
http://www.lryc.cn/news/582946.html

相关文章:

  • 二进制安全-汇编语言-04-第一个程序
  • 为什么elementui的<el-table-column label=“名称“ prop=“name“ label不用写成:label
  • Docker快速部署Hive服务
  • C++ 遍历可变参数的几种方法
  • 零基础|宝塔面板|frp内网穿透|esp32cam远程访问|微信小程序
  • 链表算法之【移除链表元素】
  • 【深度学习新浪潮】什么是上下文长度?
  • C++异步编程入门
  • 猿人学js逆向比赛第一届第十五题
  • Java面试基础:概念
  • 部署并运行Vim/Vmamba在ImageNet上的训练与测试
  • JavaScript之数组方法详解
  • (C++)list列表相关基础用法(C++教程)(STL库基础教程)
  • HTTP/3.x协议详解:基于QUIC的下一代Web传输协议
  • 音频被动降噪技术
  • nng库使用
  • Android Handler机制与底层原理详解
  • Java 阻塞队列:7种类型全解析
  • 华为eNSP防火墙实验(包含详细步骤)
  • AR 双缝干涉实验亮相:创新科技实验范式,开拓 AR 技术新局​
  • Kafka多组消费:同一Topic,不同Group ID
  • 如何用Python编程计算权重?
  • 常见的网络攻击方式及防御措施
  • 分布式接口幂等性的演进和最佳实践,含springBoot 实现(Java版本)
  • 【c++学习记录】状态模式,实现一个登陆功能
  • 【ES实战】ES客户端线程量分析
  • 从 .proto 到 Python:使用 Protocol Buffers 的完整实践指南
  • 实战Linux进程状态观察:R、S、D、T、Z状态详解与实验模拟
  • 蓝桥杯 第十六届(2025)真题思路复盘解析
  • 50天50个小项目 (Vue3 + Tailwindcss V4) ✨ | StickyNavbar(粘性导航栏)