当前位置: 首页 > news >正文

RabbitMQ工作模式——Routing路由模式

1.Routing路由模式

在这里插入图片描述
Routing生产者代码

public class Producer_Routing {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost("172.16.98.133"); ip 默认值 localhostfactory.setPort(5672);//端口 默认值5672factory.setVirtualHost("/itcast");//虚拟机 默认值factory.setUsername("heima");//用户名 默认guestfactory.setPassword("heima");//密码 默认值 guest//3.创建连接 ConnectionConnection connection = factory.newConnection();//4.创建ChannelChannel channel = connection.creatChannel();/*exchange(String exchange,String type,boolean durable,boolean autoDelete,boolean internal,Map<String,Object> arguments)参数:1.exchange:交换机名称2.type:交换机类型DIRECT("direct"),:定向FANOUT("fanout"),:扇形(广播)发送消息到每一个与之绑定的队列TOPIC("topic"),:通配符方式HEADERS("headers");:参数匹配3.durable:是否持久化4.autoDelete:自动删除5.internal:内部使用。一般为false6.arguments:参数,一般设为null*///5.创建交换机String exchangeName = "test_direct";channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null);//6.创建队列String queue1Name = "test_direct_queue1";String queue2Name = "test_direct_queue2";channel.queueDeclare(queue1Name,true,false,false,null);channel.queueDeclare(queue2Name,true,false,false,null);//7.绑定队列和交换机/*queueBind(String queue,String exchange,String routingKey)参数:1.queue:队列名称2.exchange:交换机名称3.routingKey:路由键,绑定规则如果交换机的类型为:fanout,routingKey设置为空字符串*///队列1的绑定 errorchannel.queueBind(queue1Name,exchangeName,"error");//队列2的绑定 info error warningchannel.queueBind(queue2Name,exchangeName,"info");channel.queueBind(queue2Name,exchangeName,"error");channel.queueBind(queue2Name,exchangeName,"warning");//8.发送消息String body = "日志信息,张三调用了findAll方法...日志级别:info...";	channel.basicPublish(exchangeName,"info",null,body.getBytes());//9.释放资源channel.close();connection.close();}
}

Routing1消费者代码

public class Consumer_Routing1 {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost("172.16.98.133"); ip 默认值 localhostfactory.setPort(5672);//端口 默认值5672factory.setVirtualHost("/itcast");//虚拟机 默认值factory.setUsername("heima");//用户名 默认guestfactory.setPassword("heima");//密码 默认值 guest//3.创建连接 ConnectionConnection connection = factory.newConnection();//4.创建ChannelChannel channel = connection.creatChannel();String queue1Name = "test_direct_queue1";String queue2Name = "test_direct_queue2";/*basicConsume(String queue,boolean autoAck,Consumer callback)参数:1.queue:队列名称2.autoAck:是否自动确认3.callback:回调对象*///接收消息Consumer consumer = new DefaultConsumer(channel){/*回调方法,当收到消息后会自动执行该方法1.consumerTag:标识2.envelope:获取一些信息,交换机,路由key...3.properties:配置信息4.body:数据*/@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){System.out.println("consumerTag" + consumerTag);System.out.println("Exchange" + envelope.getExchange());System.out.println("RoutingKey" + envelope.getRoutingKey());System.out.println("properties" + properties);System.out.println("body" + new String(body));System.out.println("将日志信息存储到数据库......");}};channel.basicConsume("queue2Name",true,consumer);//消费者不能关闭资源}
}

Routing2消费者代码

public class Consumer_Routing2 {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2.设置参数factory.setHost("172.16.98.133"); ip 默认值 localhostfactory.setPort(5672);//端口 默认值5672factory.setVirtualHost("/itcast");//虚拟机 默认值factory.setUsername("heima");//用户名 默认guestfactory.setPassword("heima");//密码 默认值 guest//3.创建连接 ConnectionConnection connection = factory.newConnection();//4.创建ChannelChannel channel = connection.creatChannel();String queue1Name = "test_direct_queue1";String queue2Name = "test_direct_queue2";/*basicConsume(String queue,boolean autoAck,Consumer callback)参数:1.queue:队列名称2.autoAck:是否自动确认3.callback:回调对象*///接收消息Consumer consumer = new DefaultConsumer(channel){/*回调方法,当收到消息后会自动执行该方法1.consumerTag:标识2.envelope:获取一些信息,交换机,路由key...3.properties:配置信息4.body:数据*/@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body){System.out.println("consumerTag" + consumerTag);System.out.println("Exchange" + envelope.getExchange());System.out.println("RoutingKey" + envelope.getRoutingKey());System.out.println("properties" + properties);System.out.println("body" + new String(body));System.out.println("将日志信息打印到控制台......");}};channel.basicConsume("queue1Name",true,consumer);//消费者不能关闭资源}
}

在这里插入图片描述

http://www.lryc.cn/news/174151.html

相关文章:

  • Python字典的增删改查以及嵌套
  • 【淘宝开店】新手入门开网店教程
  • 计网第五章(运输层)(五)(TCP拥塞控制)
  • windows/ubuntu怎么修改hosts文件
  • (日积月累版)大数据基础知识点1-关系型数据库
  • 【开心消消乐】python实现-附ChatGPT解析
  • springBoot源码汇总
  • 代码随想录二刷day39
  • Spring面试题7:面试官:Spring是如何进行异常处理的呢?
  • 华为云云耀云服务器L实例评测|搭建您的私人影院网站
  • Solidity 小白教程:22. Call
  • mySQL 安装
  • 涛然自得周刊(第 10 期):搬到海岛生活是一种什么体验
  • pycharm中恢复原始界面布局_常用快捷键_常用设置
  • docker(7):实战--安装nginx并实现反向代理
  • day-61 代码随想录算法训练营(19)一刷完结撒花
  • C#中对泛型集合元素使用List.Sort()方法排序
  • 【项目】在线音乐播放器测试报告
  • [C++ 网络协议] 多线程服务器端
  • 宝塔部署node后使用pm2管理上传文件路径失效问题
  • postman-pre-request-scripts使用
  • uniapp Echart X轴Y轴文字被遮挡怎么办,或未能铺满整个容器
  • 学习路之PHP--laravel DingoApi
  • 项目篇——java文档搜索引擎
  • 5.2 磁盘CRC32完整性检测
  • 企业内部安全与风控管理图解
  • vscode基于cmake安装opencv库
  • Web 器学习笔记(基础)
  • uniapp中vue3使用uni.createSelectorQuery().in(this)报错
  • k8s-部署