当前位置: 首页 > news >正文

Rabbitmq的几种工作模式

工具类

public class RabbitMQConnection {public static Connection getConnection() throws Exception{//1.创建connectionFactoryConnectionFactory connectionFactory = new ConnectionFactory();//2.配置HostconnectionFactory.setHost("127.0.0.1");//3.设置PortconnectionFactory.setPort(5672);//4.设置账户和密码connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//5.设置VirtualHostconnectionFactory.setVirtualHost("0517");return connectionFactory.newConnection();}
}

点对点(简单)的队列

图解:

        

生产者代码
public class Producer {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setUsername("guest");factory.setPassword("guest");//channel 实现了自动 close 接口 自动关闭 不需要显示关闭try(Connection connection = factory.newConnection(); Channel channel =connection.createChannel()) {/*** 生成一个队列* 1.队列名称* 2.队列里面的消息是否持久化 默认消息存储在内存中* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除* 5.其他参数*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);String message="hello world";/*** 发送一个消息* 1.发送到那个交换机* 2.路由的 key 是哪个* 3.其他的参数信息* 4.发送消息的消息体*/channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("消息发送完毕");}}
}
消费者代码
public class Consumer {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setUsername("guest");factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();System.out.println("等待接收消息....");//推送的消息如何进行消费的接口回调DeliverCallback deliverCallback=(consumerTag, delivery)->{String message= new String(delivery.getBody());System.out.println(message);};//取消消费的一个回调接口 如在消费的时候队列被删除掉了CancelCallback cancelCallback=(consumerTag)->{System.out.println("消息消费被中断");};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答* 3.消费者未成功消费的回调*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}
重点解析:点对点没什么可说的,就是生产者产生消息给到消息队列,第一次已推送的形式给到消费者,之后就是消费者端主动的拉取,需要在生产端创建好队列或在图形化页面创建好队列

工作(公平性)队列模式

图解:

       

生产者代码
public class Task01 {private static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception {try(Channel channel=RabbitMqUtils.getChannel();) {channel.queueDeclare(QUEUE_NAME,false,false,false,null);//从控制台当中接受信息Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){String message = scanner.next();channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("发送消息完成:"+message);}}}
}
消费者代码

消费者1:

public class Consumer1 {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//推送的消息如何进行消费的接口回调DeliverCallback deliverCallback=(consumerTag, delivery)->{String receivedMessage = new String(delivery.getBody());System.out.println("接收到消息:"+receivedMessage);};CancelCallback cancelCallback=(consumerTag)->{System.out.println(consumerTag+"消费者取消消费接口回调逻辑");};System.out.println("C1 消费者启动等待消费......");channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}

消费者2:

public class Consumer2 {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();//推送的消息如何进行消费的接口回调DeliverCallback deliverCallback=(consumerTag, delivery)->{String receivedMessage = new String(delivery.getBody());System.out.println("接收到消息:"+receivedMessage);};CancelCallback cancelCallback=(consumerTag)->{System.out.println(consumerTag+"消费者取消消费接口回调逻辑");};System.out.println("C2 消费者启动等待消费......");channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}
重点解析
        工作(公平性)队列模式,和点对点差不多,就是生产者将消息直接存放到队列中,然后队列默认采用轮询的形式选择消费者进行消费

        当然也可以设置channel.basicQos(i)的形式进行公平分发(谁处理快,谁做的多)

        这里公平的意思是谁做的多,谁处理的多,并不是平均分配的意思

发布订阅模式

图解:

       

生产者代码
public class ProducerFanout {//定义交换机名称private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws  Exception{//创建连接Connection connection = RabbitMQConnection.getConnection();//创建通道Channel channel = connection.createChannel();//通道关联交换机(创建交换机)(fanout类型会自动创建)//channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);//channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);String msg = "程子强你好";channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());channel.close();connection.close();}
}
消费者代码

消费者1:

public class MailConsumer {/*** 定义邮件队列*/private static final String QUEUE_NAME = "fanout_email_queue";/*** 定义交换机的名称*/private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws  Exception{System.out.println("邮件消费者...");// 创建我们的连接Connection connection = RabbitMQConnection.getConnection();// 创建我们通道final Channel channel = connection.createChannel();// 关联队列消费者关联队列channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "UTF-8");System.out.println("邮件消费者获取消息:" + msg);}};// 开始监听消息 自动签收channel.basicConsume(QUEUE_NAME, true, defaultConsumer);}
}

消费者2:

public class SmsConsumer {/*** 定义短信队列*/private static final String QUEUE_NAME = "fanout_email_sms";/*** 定义交换机的名称*/private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws Exception{System.out.println("短信消费者...");// 创建我们的连接Connection connection = RabbitMQConnection.getConnection();// 创建我们通道final Channel channel = connection.createChannel();// 关联队列消费者关联队列channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "UTF-8");System.out.println("短信消费者获取消息:" + msg);}};// 开始监听消息 自动签收channel.basicConsume(QUEUE_NAME, true, defaultConsumer);}
}
重点解析
       发布订阅模式,和前两种模式不同这里用到了一个fanout类型的交换机(具体交换机的类型和概念小伙伴们可以自行查阅下,这里主要讲工作模式),生产者将消息发送给这个交换机,这个交换机把消息发送给每一个和其绑定的队列(注意fanout类型的交换机不需要key所以生产者传递直接传""就好)

路由模式Routing

图解:

       

生产者代码
public class ReceiveLogsDirect {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws Exception {try (Connection connection = RabbitMQConnection.getConnection(); Channel channel =connection.createChannel()) {//创建交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true);//创建多个 bindingKeyMap<String, String> bindingKeyMap = new HashMap<>();bindingKeyMap.put("info","普通 info 信息");bindingKeyMap.put("warning","警告 warning 信息");bindingKeyMap.put("error","错误 error 信息");//debug 没有消费这接收这个消息 所有就丢失了bindingKeyMap.put("debug","调试 debug 信息");for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){String bindingKey = bindingKeyEntry.getKey();String message = bindingKeyEntry.getValue();channel.basicPublish(EXCHANGE_NAME,bindingKey, null,message.getBytes("UTF-8"));System.out.println("生产者发出消息:" + message);}}}
}
消费者代码

消费者1:

public class ReceiveLogsDirect01 {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] args) throws  Exception{Connection connection = RabbitMQConnection.getConnection();Channel channel = connection.createChannel();//写不写都可以,如果代码创建在生产端写,如果是浏览器创建,就不需要写这段代码channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true);String queueName = "disk";channel.queueBind(queueName, EXCHANGE_NAME, "error");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");message="接收绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message;File file = new File("E:\\xxx\\rabbitmq_info.txt");//路径任意写FileUtils.writeStringToFile(file,message,"UTF-8");System.out.println("错误日志已经接收");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}

消费者2:

public class ReceiveLogsDirect02 {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {Connection connection = RabbitMQConnection.getConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true);String queueName = "console";channel.queueBind(queueName, EXCHANGE_NAME, "info");channel.queueBind(queueName, EXCHANGE_NAME, "warning");System.out.println("等待接收消息.....");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" 接收绑定键 :"+delivery.getEnvelope().getRoutingKey()+", 消息:"+message);};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
}
重点解析
       路由模式,用到了direct类型的交换机,简单讲就是队列通过key和交换机进行绑定,生产者那边传入的key和消息给交换机,如果该队列绑定的key与其传入的key相同则,交换机讲该消息传给对应的队列,一个队列可以绑定多个key

通配符模式Topics(主题)

图解:

       

生产者代码
public class ProducerTopic {/*** 定义交换机的名称*/private static final String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) throws Exception{//  创建ConnectionConnection connection = RabbitMQConnection.getConnection();// 创建ChannelChannel channel = connection.createChannel();// 通道关联交换机channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);String msg = "我是xxx";channel.basicPublish(EXCHANGE_NAME, "czq.hhh.aa", null, msg.getBytes());channel.close();connection.close();}
}
消费者代码

消费者1:

public class SmsConsumer {/*** 定义短信队列*/private static final String QUEUE_NAME = "topic_sms_queue";/*** 定义交换机的名称*/private static final String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) throws Exception{System.out.println("短信消费者...");//  创建ConnectionConnection connection = RabbitMQConnection.getConnection();// 创建ChannelChannel channel = connection.createChannel();// 关联队列消费者关联队列channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "czq.#");DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "UTF-8");System.out.println("短信消费者获取消息:" + msg);}};// 开始监听消息 自动签收channel.basicConsume(QUEUE_NAME, true, defaultConsumer);}
}

消费者2:

public class MailConsumer {/*** 定义邮件队列*/private static final String QUEUE_NAME = "topic_email_queue";/*** 定义交换机的名称*/private static final String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) throws Exception{System.out.println("邮件消费者...");//  创建ConnectionConnection connection = RabbitMQConnection.getConnection();// 创建ChannelChannel channel = connection.createChannel();// 关联队列消费者关联队列channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.boyatop.#");DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "UTF-8");System.out.println("邮件消费者获取消息:" + msg);}};// 开始监听消息 自动签收channel.basicConsume(QUEUE_NAME, true, defaultConsumer);}
}
重点解析
       通配符模式,用到了topic类型的交换机,简单讲与通配符模式原理大致,差别在于根据队列绑定的路由建模糊转发到具体的队列中存放。其中#号表示支持匹配多个词;*号表示只能匹配一个词,假如同一个队列与交换机直接设置的多个模糊的key都符合传入的,那么也只传送一次
http://www.lryc.cn/news/417604.html

相关文章:

  • 如何在 Debian 上安装运行极狐GitLab Runner?【二】
  • 简单的docker学习 第13章 CI/CD与Jenkins(下)
  • 基于STM32设计的智能鱼缸_带鱼儿数量视觉识别(华为云IOT)(202)
  • 立体连接模式下的传播与沟通:AI智能名片小程序的创新应用与深度剖析
  • 基于Python的Scrapy爬虫的个性化书籍推荐系统【Django框架、超详细系统设计原型】
  • 二叉树bst
  • elasticsearch的使用(二)
  • YOLOv8由pt文件中读取模型信息
  • js遍历效率
  • QModbus例程分析
  • Vue万字学习笔记(入门1)
  • Cesium手动建模模型用Cesiumlab转3D Tiles模型位置不对,调整模型位置至指定经纬度
  • 学习C语言第23天(程序环境和预处理)
  • Ubuntu22.04安装
  • 从入门到自动化:一篇文章掌握Python的80%
  • 开源的主流机器学习框架
  • RabbitMQ:发送者的可靠性之配置发送者重试机制
  • 基于深度学习的大规模MIMO信道状态信息反馈
  • 在Docker中部署Rasa NLU服务
  • SQL语句创建数据库(增删查改)
  • 微信小程序-Vant组件库的使用
  • 为什么企业需要进行能源体系认证?
  • 【日常记录-MySQL】EVENT
  • 嵌入式学习day12(LinuxC高级)
  • pytorch中的hook机制register_forward_hook
  • 使用Gin框架返回JSON、XML和HTML数据
  • 网工内推 | 国企运维工程师,华为认证优先,最高年薪20w
  • c# 使用异步函数实现线程的功能
  • MySQL之MySQL server has gone away复现测试
  • 编程深水区之并发④:Web多线程