当前位置: 首页 > news >正文

【SpringCloud】微服务技术栈入门4 - RabbitMQ初探

目录

    • RabbitMQ
      • 安装 rabbitmq
      • SpringAMQP 基础队列
      • WorkQueue
      • 路由发布订阅 FanoutExchange
      • DirectExchange
      • TopicExchange

RabbitMQ


安装 rabbitmq

首先确保自己已经安装好了 docker

是 docker 拉取镜像文件:docker pull rabbitmq:3-management

拉取完毕,打开容器

docker run \-e RABBITMQ_DEFAULT_USER=itcast \-e RABBITMQ_DEFAULT_PASS=123321 \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-management

浏览器访问虚拟机的 15672 端口,即可看见 rabbitmq 管理界面

在这里插入图片描述

我们可以在 admin 选项卡内添加新的用户

其中的can access virtual hosts表示当前用户对应的虚拟主机
建议不同用户对应不同的虚拟主机,可以实现隔离效果

虚拟主机可以点击上图右侧的 virtual hosts 按钮新建


SpringAMQP 基础队列

由于使用官方原生操作 rabbitmq 的方式太过生草,代码巨多,不适合日常开发,推荐改用 SpringAMQP 来简化操作

导入坐标

<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId>
</dependency>
<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--单元测试-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>

配置 rabbitmq 链接

logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
spring:rabbitmq:host: 192.168.113.146 # rabbitMQ的ip地址port: 5672 # 端口username: itcastpassword: 123321virtual-host: /

publisher 编写测试类测试 AMQP

由于我们的 rabbitmq 默认没有创建队列 simple.queue 所以你直接发送是接收不到任何信息的,必须要先进行判断,如果队列不存在那就先创建对应队列后在发送,才可以接受得到!

@Test
public void testSendMessage2SimpleQueue() {RabbitAdmin admin = new RabbitAdmin(rabbitTemplate);String queueName = "simple.queue";String message = "hello, spring amqp!";// 队列是否存在的判断if (Objects.isNull(admin.getQueueProperties(queueName))) {Queue queue = new Queue(queueName);admin.declareQueue(queue);}// 发送消息到消息队列rabbitTemplate.convertAndSend(queueName, message);
}

不出意外的话,你在 rabbitmq 控制台的 queue 选项内,就可以看见新创建的 simple.queue 队列,里面包含着我们发送的第一条信息


consumer 消费对应队列中的消息

监听之前也要和 publisher 配置相同的 application.yaml,这样才可以连接到 rabbitmq

新建监听消费类 SpringRabbitListener ,传入如下代码执行监听

@Component
public class SpringRabbitListener {// 设置消费者需要监听的队列@RabbitListener(queues = "simple.queue")public void listenWorkQueue1(String msg) throws InterruptedException {// 获取队列中信息System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);}
}

WorkQueue

在这里插入图片描述

上图展示了工作队列的流程图,实际上就是增加了一个消费者来消费队列中的消息

由于我们上一节已经创建了 simple.queue,这里就不用判断了,直接往里面每隔 20ms 插入一条信息

@Test
public void testSendMessage2WorkQueue() throws InterruptedException {String queueName = "simple.queue";String message = "hello, message__";for (int i = 1; i <= 50; i++) {rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}
}

同理,按照流程图指示,为设置两个消费者监听器
注意!第一个消费者每隔 20ms 接受一次消息,而第二个消费者则是每隔 200ms 接收一次

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);
}@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);
}

先运行消费者,然后使用 publisher 插入 50 条消息

从日志输出我们发现,1、2 消费者处理了同样多的数据(各自 25 条),但很显然第二个消费者慢很多,因为它每隔 200ms 才处理一个消息

出现这一情况的原因是消息预取,也就是说所有消费者获取同样多的消息,而不在乎自己每个多久处理一次消息


解决消息分配不均问题

配置文件添加 prefetch 配置项,他表示必须先处理完 1 个消息后,才可以取出下一消息进行处理,有效规避了一瞬间预取全部消息堆积到一个消费者上的场面

spring:rabbitmq:host: 192.168.113.146 # rabbitMQ的ip地址port: 5672 # 端口username: itcastpassword: 123321virtual-host: /listener:simple:prefetch: 1

此时重复上方操作,发现实现了“能者多劳”的效果,消费者 1 由于处理速度快,故其消费了绝大多数消息,而消费者 2 处理消息极少

这样做将整体处理时长压缩到 1s 及以下


路由发布订阅 FanoutExchange

在这里插入图片描述

设置路由发布订阅需要分为三步,设置路由 Exchange、设置队列 Queue、将队列绑定到路由上

创建 fanout 配置文件 FanoutConfig,我们按照以下的代码简单创建 1 个路由以及 2 个队列,并实行绑定操作

@Configuration
public class FanoutConfig {// 配置路由:itcast.fanout@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("itcast.fanout");}// 配置队列:fanout.queue1@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}// 绑定队列1到交换机@Beanpublic Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}// fanout.queue2@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}// 绑定队列2到交换机@Beanpublic Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}

由于他们都添加了 bean 注解,故直接运行 consumer 项目,他们会自动装配

此时打开 rabbitmq 控制面板,进入 exchange 选项,就可以找到我们新创建的路由以及对应的队列绑定关系了

在这里插入图片描述


至于后续的发布者发布信息以及消费者消费信息的代码,大家可以参照上一小节来自己补全,这里就不过多赘述了


DirectExchange

在这里插入图片描述

可以将其理解为带规则的路由转发机制,通过 bindingkey 和 routingkey 相一致配对来实现转发操作


配置消费者监听

// value 监听队列
// exchange 监听路由
// key bindingkey
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))

然后发布者再向对应的路由发送带 routingkey 的消息

@Test
public void testSendDirectExchange() {// 交换机名称String exchangeName = "itcast.direct";// 消息String message = "hello, red!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

TopicExchange

在这里插入图片描述

和 directexchange 类似,但是 routingkey 为多个单词的列表,具体格式参照上图

至于发布者与消费者的书写方式和 directexchange 基本一致,需要注意的就是 routingkey 和 bindingkey 的书写方式而已


http://www.lryc.cn/news/184030.html

相关文章:

  • cefsharp(117.2.20)cef117.2.2最新体验版
  • layui在上传图片在前端处理图片压缩
  • js 事件参考
  • 卷积网络的发展历史-LeNet
  • (2023,GPT-4V,LLM,LMM,功能和应用)大型多模态模型的黎明:GPT-4V(ision) 的初步探索
  • 【C++设计模式之装饰模式:结构型】分析及示例
  • 绘制散点图、曲线图、折线图和环形图失败, 设置迭代次数和进度无法保存图片
  • micro-ROS中对消息的内存管理
  • Springboot中使用拦截器、过滤器、监听器
  • 代码随想录二刷day45
  • 泊车功能专题介绍 ———— AVP系统基础数据交互内容
  • 蓝桥杯每日一题2023.10.6
  • 7、【Qlib】【主要组件】Data Layer:数据框架与使用
  • Kubernetes安装部署 1
  • 在VS Code中优雅地编辑csv文件
  • LCR 128.库存管理 I
  • eigen::Affine3d 转换
  • 【Python从入门到进阶】38、selenium关于Chrome handless的基本使用
  • 给Python项目创建一个虚拟环境(enev)
  • 【RK3588】YOLO V5在瑞芯微板子上部署问题记录汇总
  • 别人做的百度百科词条信息不全,如何更正自己的百度百科词条
  • [论文精读]U-Net: Convolutional Networks for BiomedicalImage Segmentation
  • Godot Identifier “File“ not declared in the current scope.
  • Java ORM Bee,多表关联更新
  • Java 读取excel文件
  • PageRank(上):数据分析 | 数据挖掘 | 十大算法之一
  • 吃鸡达人专享!提高战斗力,分享干货,查询装备皮肤,保护账号安全!
  • 力扣第101题 c++ 递归 迭代 双方法 +注释 ~
  • Go:实现SMTP邮件发送订阅功能(包含163邮箱、163企业邮箱、谷歌gmail邮箱)
  • Scala第十六章节