当前位置: 首页 > news >正文

RabbitMQ: Publish/Subscribe结构

生产者 

package com.qf.mq2302.publishSub;import com.qf.mq2302.utils.MQUtils;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class EmitLog {private static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws Exception {Connection conn = MQUtils.getConnection();Channel channel = conn.createChannel();// 在mq中声明一个交换机/*** 第一个参数:交换机的名字* 第二个参数:交换机的类型,fanout代表该交换机会把收到的消息无差别投递给所有他关联的队列*/channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String msg = "hello fanout!";/*** 第一个参数,交换机的名字* 第二个参数:如果交换机是 fanout类型的,可以写空串 ;因为fanout类型的交换机会把消息无差别向关联队列投递*/channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes("utf-8"));channel.close();conn.close();}
}

消费者1

package com.qf.mq2302.publishSub;import com.qf.mq2302.utils.MQUtils;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;public class ReceiveLogs01 {private static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws Exception {Connection conn = MQUtils.getConnection();Channel channel = conn.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 在mq中声明一个名字是随机字符串的队列(队列的所有属性都是默认值),返回队列的名字String queueName = channel.queueDeclare().getQueue();// 把队列和交换机建立好绑定关系/*** 参数1: 队列名* 参数2: 交换机名* 参数3: routingkey,注意,如果交换机是fanout类型,可以写空串*/channel.queueBind(queueName,EXCHANGE_NAME,"");channel.basicQos(1);channel.basicConsume(queueName, false, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {byte[] body = message.getBody();String msg = new String(body, "utf-8");System.out.println("01:"+msg);// 手动ackchannel.basicAck(message.getEnvelope().getDeliveryTag(),false);}},c -> {});}
}

消费者2

package com.qf.mq2302.publishSub;import com.qf.mq2302.utils.MQUtils;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;public class ReceiveLogs02 {private static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws Exception {Connection conn = MQUtils.getConnection();Channel channel = conn.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 在mq中声明一个名字是随机字符串的队列(队列的所有属性都是默认值),返回队列的名字String queueName = channel.queueDeclare().getQueue();// 把队列和交换机建立好绑定关系/*** 参数1: 队列名* 参数2: 交换机名* 参数3: routingkey,注意,如果交换机是fanout类型,可以写空串*/channel.queueBind(queueName,EXCHANGE_NAME,"");channel.basicQos(1);channel.basicConsume(queueName, false, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {byte[] body = message.getBody();String msg = new String(body, "utf-8");System.out.println("02:"+msg);// 手动ackchannel.basicAck(message.getEnvelope().getDeliveryTag(),false);}},c -> {});}
}

消费者3

package com.qf.mq2302.publishSub;import com.qf.mq2302.utils.MQUtils;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;import java.io.IOException;public class ReceiveLogs03 {private static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws Exception {Connection conn = MQUtils.getConnection();Channel channel = conn.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 在mq中声明一个名字是随机字符串的队列(队列的所有属性都是默认值),返回队列的名字String queueName = channel.queueDeclare().getQueue();// 把队列和交换机建立好绑定关系/*** 参数1: 队列名* 参数2: 交换机名* 参数3: routingkey,注意,如果交换机是fanout类型,可以写空串*/channel.queueBind(queueName,EXCHANGE_NAME,"");channel.basicQos(1);channel.basicConsume(queueName, false, new DeliverCallback() {@Overridepublic void handle(String consumerTag, Delivery message) throws IOException {byte[] body = message.getBody();String msg = new String(body, "utf-8");System.out.println("03:"+msg);// 手动ackchannel.basicAck(message.getEnvelope().getDeliveryTag(),false);}},c -> {});}
}

http://www.lryc.cn/news/162251.html

相关文章:

  • 单片机-蜂鸣器
  • 华为云云耀云服务器L实例评测 | 分分钟完成打地鼠小游戏部署
  • Android——数据存储(二)(二十二)
  • appium环境搭建
  • 十五、Webpack打包图片-js-Vue、Label命令、resolve模块解析
  • ARM指令集--数据处理指令
  • Excel embed into a webpage
  • uniapp点击事件在小程序中无法传参
  • ssprompt:一个LLM Prompt分发管理工具
  • 修复 ChatGPT 发生错误的问题
  • 《热题100》字符串、双指针、贪心算法篇
  • 大数据组件Sqoop-安装与验证
  • 运算符重载(个人学习笔记黑马学习)
  • 2023.9.6 Redis 的基本介绍
  • 2023-09-08力扣每日一题
  • adb-linux 调试桥
  • 入门人工智能 —— 使用 Python 进行文件读写,并完成日志记录功能(4)
  • 使用Caffeine实现帖子的缓存来优化网站的运行速度
  • Webpack5 搭建Vue项目(进阶版)
  • 论文阅读:Distortion-Free Wide-Angle Portraits on Camera Phones
  • 力扣每日一题---207. 课程表
  • 在Kubernetes环境中有关Nginx Ingress与API Gateway的连接问题
  • c语言练习44:深入理解strstr
  • 渗透测试漏洞原理之---【业务安全】
  • CentOS查看CPU、内存、网络流量和磁盘 I/O
  • 无人机航线规划
  • react中受控组件与非受控组件
  • 【网络教程】如何解决Docker删除镜像和容器后磁盘空间未释放的问题
  • Python中的进度条显示方案
  • 2023-09-05力扣每日一题