当前位置: 首页 > news >正文

RabbitMq(具体怎么用,看这一篇即可)

RabbitMq汇总

  • 1.RabbitMq的传统实现方式
  • 2.SpringAMQP简化RabbitMq开发
    • 2.1 基本消息队列(BasicQueue)
    • 2.2 工作消息队列(WorkQueue)
    • 2.3 发布订阅 -- 广播(Fanout)
    • 2.4 发布订阅 -- 路由(Direct)
    • 2.5 发布订阅 -- 主题(Topic)
  • 2.SpringAMQP声明交换机和队列
    • 2.1 使用bean的方式声明交换机和队列
    • 2.2 使用注解的方式声明交换机和队列

1.RabbitMq的传统实现方式

动手实现一个简单的消息队列
在这里插入图片描述

无论时发布消息还是消费消息,都要建立连接, 所以我们可以将这个步骤抽取出来

public class ConnectionUtil {/*** 建立与RabbitMQ的连接* @return* @throws Exception*/public static Connection getConnection() throws Exception {// 定义连接工厂ConnectionFactory factory = new ConnectionFactory();// 设置服务地址factory.setHost("192.168.202.128");// 端口factory.setPort(5672);// 设置账号信息,用户名、密码、vhostfactory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("123321");// 通过工厂获取连接Connection connection = factory.newConnection();return connection;}
}

一、发布消息

  1. 建立连接
  2. 创建通道
  3. 创建队列
  4. 发送消息
  5. 关闭通道和连接
public class PublisherTest {@Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.建立连接Connection connection = ConnectionUtil.getConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.发送消息String message = "hello, rabbitmq!";channel.basicPublish("", queueName, null, message.getBytes());System.out.println("发送消息成功:【" + message + "】");// 5.关闭通道和连接channel.close();connection.close();}
}

二、订阅消息

  1. 建立连接
  2. 创建通道
  3. 创建队列
  4. 订阅消息
public class ConsumerTest {public static void main(String[] args) throws IOException, TimeoutException {// 1.建立连接Connection connection = ConnectionUtil.getConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.订阅消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 5.处理消息String message = new String(body);System.out.println("接收到消息:【" + message + "】");}});System.out.println("等待接收消息。。。。");}
}

2.SpringAMQP简化RabbitMq开发

一、引入依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

二、在发布者消费者两端,都要配置MQ地址
SpringAMQP提供了配置来简化手动创建连接这一复杂的过程

spring:rabbitmq:host: 192.168.202.128 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: itcast # 用户名password: 123321 # 密码

三、简化发送消息
SpringAMQP提供了RabbitTemplate类来简化发送消息的步骤

@Autowired
private RabbitTemplate rabbitTemplate;

四、简化订阅消息
SpringAMQP提供了@RabbitListener注解来简化订阅消息的步骤

@RabbitListener(queues = "simple.queue")
public void listenMessage(String msg) throws InterruptedException {}

2.1 基本消息队列(BasicQueue)

最基本的队列模型:一个生产者发送消息到一个队列,一个消费者从队列中取消息

在这里插入图片描述

实际开发中,我们通常事先在rabbitMq界面创建好队列,然后只要记住队列的名称

一、发布消息
  注入RabbitTemplate来简化操作, RabbitTemplate在执行convertAndSend方法时,会自动开启通道, 往指定名称的队列中发送消息, 并在方法结束后关闭连接和通道

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, spring amqp!";// 发送消息rabbitTemplate.convertAndSend(queueName, message);}
}

二、订阅消息
使用@RabbitListener注解实现对队列的订阅

@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring 消费者接收到消息:【" + msg + "】");}
}

总结:
基本消息队列模型中,在生产者和消费者之间,只有队列这一个媒介

  • 生产者只要知道往哪个队列发送消息
  • 消费者只要知道订阅哪个队列中的消息

2.2 工作消息队列(WorkQueue)

在基本消息队列(BasicQueue)中,我们只有一个消费者, 在工作消息队列模型下,我们可以设置多个消费者同时订阅一个队列
在这里插入图片描述
一、发布消息
  实现和基本消息队列时是一样的,只要知道往哪个队列发送消息, 这里我们演示发送多条信息

@Test
public void testWorkQueue() throws InterruptedException {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, message_";for (int i = 0; i < 20; i++) {// 发送消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}
}

二、订阅消息

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);
}@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);
}

总结:
工作消息队列模型中,在生产者和消费者之间,只有队列这一个媒介(跟基本模型一样)

  • 生产者只要知道往哪个队列发送消息
  • 消费者只要知道订阅哪个队列中的消息(同一个队列)

2.3 发布订阅 – 广播(Fanout)

广播模式下,引入了一个新的概念:交换机

  交换机是一个消息的中转站, 它可以实现广播、定向、通配符等不同形式的消息递交方式; 交换机只负责递交消息, 并不具备存储消息的能力, 消息最终存储媒介, 依旧是队列, 因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

在这里插入图片描述

一、发布消息
  现在就不是往队列中发消息了, 发送者只要知道往哪个交换机发送消息, 不用去关心交换机将消息转发给哪些队列

@Test
public void testFanoutExchange() {// 队列名称String exchangeName = "itcast.fanout";// 消息String message = "hello, everyone!";rabbitTemplate.convertAndSend(exchangeName, "", message);
}

  如果你想了解交换机会往哪些队列中发送消息,可以登录rabbitMq的界面,查看交换机详情, 里面会详细罗列当前交换机绑定的队列
在这里插入图片描述

二、订阅消息
消息的最终存储媒介,依旧是队列.消费者只要知道订阅哪个队列中的消息

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}

总结:

  • 生产者只要知道往哪个交换机发送消息, 不用去关心交换机将消息转发给哪些队列
  • 消费者只要知道订阅哪个队列中的消息

2.4 发布订阅 – 路由(Direct)

  之前我们学习了广播(Fanout), 在广播模式下, 只要往交换机发送消息, 那么交换机会将消息转发给所有绑定的队列, 而路由(Direct)又称为定向
  交换机绑定了A、B两个队列,我们往交换机中发消息时, 最终希望交换机只把消息转发给B队列,这个过程即路由
在这里插入图片描述

一、发布消息
往交换机发消息的时候,需要指定交换机转发给哪个队列(拥有通用routingKey的队列), 此处我们设置的routingKey为red

@Test
public void testSendDirectExchange() {// 交换机名称String exchangeName = "itcast.direct";// 消息String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

我们发现这个交换机绑定了两个队列, 每个队列都设置了两个routingKey, 其中都有red, 所以这两个队列都能收到消息
在这里插入图片描述

二、订阅消息

@RabbitListener(queues =  "direct.queue1")
public void listenDirectQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(queues =  "direct.queue2")
public void listenDirectQueue2(String msg){System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}

总结:

  • 生产者不仅要知道往哪个交换机发消息, 同时还要通过路由秘钥(routingKey)指定交换机将消息转发给哪些队列(拥有同样的路由秘钥,即可转发)
  • 消费者只要知道订阅哪个队列中的消息(一直都没变过)

2.5 发布订阅 – 主题(Topic)

  主题(Topic)是对路由(Direct)的一种补充, 我们希望某一个组的队列都能收到消息, 但是在rabbitMq中无法将队列编组, 有了主题(Topic)后,我们可以将这一组的队列的routingKey都设置为china.#, 那只会Routingkey只要符合通配符规则, 例如china.jiangsuchina.jiangsu.suzhou 这个组都可以接收到消息
在这里插入图片描述

  • #:匹配一个或多个词
    item.#:能够匹配item.spu.insert 或者 item.spu
  • *:匹配不多不少恰好1个词
    item.*:只能匹配item.spu

一、发布消息
跟路由时一样, 交换机拿到routingKey后会去匹配对应的队列

@Test
public void testSendTopicExchange() {// 交换机名称String exchangeName = "itcast.topic";// 消息String message = "喜报!孙悟空大战哥斯拉,胜!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

当下判断两个队列都符合routingKey
在这里插入图片描述

二、订阅消息

@RabbitListener(queues =  "topic.queue1")
public void listenTopicQueue1(String msg){System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}@RabbitListener(queues =  "topic.queue2")
public void listenTopicQueue2(String msg){System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}

总结:

  • 生产者不仅要知道往哪个交换机发消息, 同时还要通过路由秘钥(routingKey)指定交换机将消息转发给哪些队列(拥有同样的路由秘钥,即可转发)
  • 交换机会根据routingKey来匹配符合条件的队列(这个过程是交换机来完成,所以我们不用关心)
  • 消费者只要知道订阅哪个队列中的消息(一直都没变过)

2.SpringAMQP声明交换机和队列

2.1 使用bean的方式声明交换机和队列

启动项目后, 就会在rabbitMq中创建交换机和队列, 包括两者的绑定关系

package cn.itcast.mq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfig {/*** 声明交换机* @return Fanout类型交换机*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("itcast.fanout");}/*** 第1个队列*/@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 第2个队列*/@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}

2.2 使用注解的方式声明交换机和队列

Direct定向
基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}

Topic主题

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "china.#"
))
public void listenTopicQueue1(String msg){System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "#.news"
))
public void listenTopicQueue2(String msg){System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}
http://www.lryc.cn/news/27006.html

相关文章:

  • 第九届蓝桥杯省赛 C++ A/B组 - 全球变暖
  • Leetcode.2359 找到离给定两个节点最近的节点
  • DCDC/LDO Auto-Discharge
  • linux 中的log
  • 基于ubuntu的STM32嵌入式软件开发(四)——应用软件工程的修改、Makefile及编译脚本的编写
  • MQTT协议分析
  • 基于树莓派4B设计的音视频播放器(从0开始)
  • MSF手机渗透实验(未成功)(CVE-2019-2215 Binder UA)
  • 系列十二、MySQL管理
  • [游戏架构] 有限状态机的实际应用
  • 【站外SEO】如何利用外部链接来提高你的网站排名
  • OSCP-课外4(修复web访问、Mysql UDF提权)
  • 深信服面经---云计算方向(附问题知识点解析)
  • MySQL面试题-基础篇
  • 高通平台开发系列讲解(摄像头篇)QCM6490 上摄像头驱动开发
  • MOV压敏电阻应用推荐及选型要点说明
  • Pytorch学习笔记(8):正则化(L1、L2、Dropout)与归一化(BN、LN、IN、GN)
  • Azure OpenAI 官方指南 01|GPT-3 的原理揭秘与微调技巧
  • 神垕古镇景区三方背后的博弈,争夺许昌第一家5A景区主导权
  • 【C++】vector的模拟实现(SGI版本)
  • 【9】SCI易中期刊推荐——工程技术-计算机:软件工程(中科院4区)
  • SOTA!目标检测开源框架YOLOv6 3.0版本来啦
  • svn使用
  • LeetCode 1487. Making File Names Unique【字符串,哈希表】中等
  • Java——电话号码的字母组合
  • LDR6028市面上最具有性价比的Type-C OTG音频协议方案
  • SpringMVC-0228
  • 【测试岗】那个准点下班的人,比我先升职了...
  • 【C++】适配器模式 -- stack/queue/dqueue
  • sql server 分页查询