当前位置: 首页 > news >正文

RabbitMQ消息模型之Routing-Direct

Routing Direct

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在向 Exchange发送消息时,也必须指定消息的RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的Routing key完全一致,才会接收到消息

image-20191126220145375

  • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与routing key完全匹配的队列。
  • C1:消费者,其所在队列指定了需要routing key 为 error 的消息。
  • C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息。

创建生产者

public class MyProducer {@Testpublic void test() throws Exception {// 交换机String exchange = "logs_direct";// 创建工厂ConnectionFactory factory = new ConnectionFactory();factory.setVirtualHost("/");factory.setHost("xuewei.world");factory.setUsername("xuewei");factory.setPassword("123456");factory.setPort(5672);// 创建连接和通道Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明交换机channel.exchangeDeclare(exchange, "direct");for (int i = 0; i < 3; i++) {// 发布消息channel.basicPublish(exchange, "DEBUG", null, ("DEBUG LOG -> " + i).getBytes());channel.basicPublish(exchange, "INFO", null, ("INFO LOG -> " + i).getBytes());channel.basicPublish(exchange, "WARN", null, ("WARN LOG -> " + i).getBytes());channel.basicPublish(exchange, "ERROR", null, ("ERROR LOG -> " + i).getBytes());}}
}

创建消费者1

public class MyConsumer1 {public static void main(String[] args) throws Exception {// 指定交换机String exchange = "logs_direct";// 创建工厂ConnectionFactory factory = new ConnectionFactory();factory.setVirtualHost("/");factory.setHost("xuewei.world");factory.setUsername("xuewei");factory.setPassword("123456");factory.setPort(5672);// 创建连接和通道Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 绑定交换机channel.exchangeDeclare(exchange, "direct");// 创建临时队列String queue = channel.queueDeclare().getQueue();// 将临时队列绑定exchangechannel.queueBind(queue, exchange, "WARN");channel.queueBind(queue, exchange, "ERROR");// 处理消息channel.basicConsume(queue, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1: " + new String(body));// TODO 业务处理}});}
}

创建消费者2

public class MyConsumer2 {public static void main(String[] args) throws Exception {// 指定交换机String exchange = "logs_direct";// 创建工厂ConnectionFactory factory = new ConnectionFactory();factory.setVirtualHost("/");factory.setHost("xuewei.world");factory.setUsername("xuewei");factory.setPassword("123456");factory.setPort(5672);// 创建连接和通道Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 绑定交换机channel.exchangeDeclare(exchange, "direct");// 创建临时队列String queue = channel.queueDeclare().getQueue();// 将临时队列绑定exchangechannel.queueBind(queue, exchange, "DEBUG");channel.queueBind(queue, exchange, "INFO");// 处理消息channel.basicConsume(queue, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者2: " + new String(body));// TODO 业务处理}});}
}

image-20220526182028082

http://www.lryc.cn/news/248815.html

相关文章:

  • Harmony 应用开发之size 脚本
  • 商家门店小程序怎么做?门店小程序的优势和好处
  • 什么是灯塔工厂?灯塔工厂的作用?
  • 【GEO-AI】SAM-Geo库(segment-geospatial)入门教程
  • ESP32-Web-Server 实战编程-使用文件系统建立强大的 web 系统
  • kubeadm快速搭建k8s高可用集群
  • GoLong的学习之路,进阶,Redis
  • Linux重置MySql密码(简洁版)
  • Ubuntu部署jmeter与ant
  • 如何使用 RestTemplate 进行 Spring Boot 微服务通信示例?
  • 新开普掌上校园服务管理平台service.action RCE漏洞复现 [附POC]
  • 滤波器、卷积核与内核的关系
  • 沉默是金,寡言为贵
  • 【网络奇遇之旅】:那年我与计算机网络的初相遇
  • 量化误差的测量
  • 8年测试工程师分享,我是怎么开展性能测试的(基础篇)
  • 微服务API网关Spring Cloud Gateway实战
  • uniapp打包ios有时间 uniapp打包次数
  • 【笔记+代码】JDK动态代理理解
  • Java八股文面试全套真题【含答案】-Vue篇
  • 介绍比特币上的 sCrypt 开发平台
  • 什么是路由抖动?该如何控制
  • 2023SICTF-web-白猫-RCE
  • 1.用数组输出0-9
  • Selenium 元素不能定位总结
  • 1-2 非阻塞延时实现LED闪烁功能(累计定时中断次数)--多路软件定时器的功能实现
  • 数据类型及强制转换
  • Python----高阶函数
  • Unity地面交互效果——6、地形动态顶点置换和曲面细分
  • Linux系统服务之一次性服务(2)