当前位置: 首页 > news >正文

RabbitMQ-Exchanges交换机

  一、介绍    

        RabbitMQ消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至不知道这些消息传递到了哪些队列中。

       相反,生产者只能将消息发送到交换机,交换机工作的内容非常简单,一方面他接受来自生产者的消息,另一方面他将他们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们放到许多队列还是说应该丢弃他们。这就由交换机来决定。

  二、类型
         1、类型

        总共有以下类型:直接(direct)[路由],主题(topic),标题(headers),扇出(fanout)[发布订阅],

          默认类型[无名类型] 通过("")进行标识

 channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes());

          第一个参数是交换机名称,空字符串表示默认或无名的交换机;消息能路由发送到队列中,其实是由routingKey(bindingKey)绑定key指定的,如果它存在的话。

      2、 临时队列

       每当我们连接到Rabbit时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称。其次一旦我们断开了消费者连接,队列将被自动删除。

       创建临时队列的方式如下

 String queueName = channel.queueDeclare().getQueue();
     3、绑定(bingings)

       binding其实时exchange和queue之间的桥梁,他告诉我们exchange和哪个队列进行了绑定关系

    4、fanout

        他是将接收到的所有消息广播到他知道的所有队列中

        消费者,另一个复制即可

public class ReceiveLogs01 {//交换机的名称public static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//声明一个交换机channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//声明一个队列  临时队列/*** 队列的名称是随机的* 当消费者断开与队列的连接的时候,队列就自动删除*/String queueName = channel.queueDeclare().getQueue();/*** 绑定队列与交换机*/channel.queueBind(queueName,EXCHANGE_NAME,"");System.out.println("等待接收消息,把接收到的消息打印在屏幕上......");//接收消息DeliverCallback deliverCallback = (consumerTag,message) ->{System.out.println("01控制台打印接收到的消息:"+new String(message.getBody()));};CancelCallback cancelCallback = (consumerTag) ->{};channel.basicConsume(queueName,true,deliverCallback,cancelCallback);}
}

   生产者

public class EmitLog {//交换机的名称public static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//声明交换机channel.exchangeDeclare(EXCHANGE_NAME,"fanout");Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){String message = scanner.next();channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());System.out.println("生产者发出消息:"+message);}}
}

     结果

   5、direct

      消息只去到他绑定的routingKey队列中,支持多重绑定,当exchange的绑定类型是direct,但是他绑定的多个队列的key如果都相同,在这种情况下虽然绑定类型是direct但是他表现的就和fanout有点类似了。

     生产者

public class DirectLogs {//交换机的名称public static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//声明交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){String message = scanner.next();channel.basicPublish(EXCHANGE_NAME,"warning",null,message.getBytes());System.out.println("生产者发出消息:"+message);}}
}

   消费者1

public class ReceiveLogs01 {//交换机名称public static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//声明一个交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//声明一个队列channel.queueDeclare("console",false,false,false,null);//绑定队列与交换机channel.queueBind("console",EXCHANGE_NAME,"info");channel.queueBind("console",EXCHANGE_NAME,"warning");DeliverCallback deliverCallback = (consumerTag,message) ->{System.out.println("direct01控制台打印接收到的消息:"+new String(message.getBody()));};CancelCallback cancelCallback = (consumerTag) ->{};channel.basicConsume("console",true,deliverCallback,cancelCallback);}
}

  消费者2

public class ReceiveLogs02 {//交换机名称public static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//声明一个交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//声明一个队列channel.queueDeclare("disk",false,false,false,null);//绑定队列与交换机channel.queueBind("disk",EXCHANGE_NAME,"error");DeliverCallback deliverCallback = (consumerTag,message) ->{System.out.println("direct02控制台打印接收到的消息:"+new String(message.getBody()));};CancelCallback cancelCallback = (consumerTag) ->{};channel.basicConsume("disk",true,deliverCallback,cancelCallback);}
}
      5、topic

           发送到类型是topic交换机的消息的routing_key不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说“stock.usd.nyse”,"nyse.vmw","quick.orange.rabbit"这种类型的。但是这个单词列表最多不能超过255个字节。【* 可以代替一个单词;#可以替代零个或多个单词】

          例如Q1->绑定的是orange带三个单词的字符串(*.orange.*)

                 Q2->绑定的是最后一个是rabbit的3个单词(*.*.rabbit)

                          第一个单词是lazy的多个单词(lazy.#)

         当一个队列绑定键是#,那么这个队列将接收所有数据,有点像fanout;如果队列绑定键当中没有#h和*出现,那么该队列绑定类型就是direct了。

        生产者

public class EmitLogTopic {//交换机名称public static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();Map<String, String> bindingKeyMap = new HashMap<>();bindingKeyMap.put("quick.orange.rabbit","被队列Q1Q2接收到");bindingKeyMap.put("lazy.orange.eleplant","被队列Q1Q2接收到");bindingKeyMap.put("quick.orange.fox","被队列Q1接收到");bindingKeyMap.put("lazy.brown.fox","被队列Q2接收到");bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列Q2接收一次");bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配Q2");for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {String routingKey = bindingKeyEntry.getKey();String message = bindingKeyEntry.getValue();channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());System.out.println("生产者发出消息:"+message);}}
}

  消费者1

public class ReceiveLogsTopic01 {//交换机名称public static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//声明交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);//声明队列String queueName = "Q1";channel.queueDeclare(queueName,false,false,false,null);channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");System.out.println("Q1等待接收消息。。。。。。");//接收消息channel.basicConsume(queueName,true,(consumeTag,message)->{System.out.println(new String(message.getBody()));System.out.println(" 接收队列:"+queueName+" 绑定键:"+message.getEnvelope().getRoutingKey());},(message)->{});}
}

   消费者2

public class ReceiveLogsTopic02 {//交换机名称public static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtil.getChannel();//声明交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);//声明队列String queueName = "Q2";channel.queueDeclare(queueName,false,false,false,null);channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");System.out.println("Q2等待接收消息。。。。。。");//接收消息channel.basicConsume(queueName,true,(consumeTag,message)->{System.out.println(new String(message.getBody()));System.out.println(" 接收队列:"+queueName+" 绑定键:"+message.getEnvelope().getRoutingKey());},(message)->{});}
}

http://www.lryc.cn/news/3219.html

相关文章:

  • 离散数学 课时二 命题逻辑等值演算
  • Debezium系列之:事件扁平化转换SMT,简化debezium数据格式,为数据添加head,为值添加键值对
  • 内网渗透(十八)之Windows协议认证和密码抓取-本地认证(NTML哈希和LM哈希)
  • Portraiture全新4.0最新版人像磨皮插件更新内容
  • 前端也能悄悄对视频截图?js实现对视频按帧缓存
  • TCP、UDP网络编程面试题
  • 用网络调试助手测试PLC-Reocrder收听模式的过程
  • 牛客小白月赛66
  • 加载sklearn新闻数据集出错 fetch_20newsgroups() HTTPError: HTTP Error 403: Forbidden解决方案
  • 图解LeetCode——剑指 Offer 53 - I. 在排序数组中查找数字 I
  • python 实现热门音乐分析 附代码+数据 +论文
  • 【2335. 装满杯子需要的最短总时长】
  • 再不跳槽,就晚了
  • Java 内存结构解密
  • ROS小车研究笔记2/11/2023:使用ssh远程登录小车
  • koa ts kick off 搭建项目的基本架子
  • h2database源码解析-查询优化器原理
  • 2月11日,30秒知全网,精选7个热点
  • vue组件的构成 <template> <script> <style>节点的使用 <
  • windows + vscode + rust
  • 二十九、异常处理
  • RTOS之二环境搭建初识RTOS
  • 【Java】 JAVA Notes
  • Java笔记-volatile和AtomicInteger
  • [标准库]STM32F103R8T6 高级定时器--PWM输出和带死区互补PWM输出
  • Camtasia2023最新版电脑视频录屏记录编辑软件
  • 管理用户安全性
  • 分享113个JS菜单导航,总有一款适合您
  • RuoYi-Cloud 部署
  • DockerFile文件详解