当前位置: 首页 > news >正文

异步通讯组件MQ

RabbitMQ

高性能的异步通讯组件

1. 同步异步和MQ技术选型

MQ:消息队列,存放消息的队列,也就是异步调用中的Broker

1.1 同步调用

微服务一旦拆分,必然涉及到服务之间的相互调用,目前我们服务之间调用采用的都是基于OpenFeign的调用。

这种调用中,调用者发起请求后需要等待服务提供者执行业务返回结果后,才能继续执行后面的业务。也就是说调用者在调用过程中处于阻塞状态,因此我们称这种调用方式为同步调用,也可以叫同步通讯

1.2 异步调用

异步调用方式其实就是基于消息通知的方式,一般包含三个角色:

  • 消息发送者:投递消息的人,就是原来的调用方
  • 消息Broker:管理、暂存、转发消息
  • 消息接收者:接收和处理消息的人,就是原来的服务提供方

在异步调用中,发送者不再直接同步调用接收者的业务接口,而是发送一条消息投递给消息Broker。然后接收者根据自己的需求从消息Broker那里订阅消息。每当发送方发送消息后,接受者都能获取消息并处理。

这样,发送消息的人和接收消息的人就完全解耦了。

1.3 MQ技术选型

追求可用性:Kafka、 RocketMQ 、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延迟:RabbitMQ、Kafka

2. RabbitMQ

RabbitMQ的整体架构及核心概念

  • virtual-host:虚拟主机,起到数据隔离的作用

  • publisher:消息发送者

  • consumer:消息的消费者

  • queue:队列,存储消息

  • exchange:交换机,负责路由消息

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

2.1 入门

消息发送的注意事项

  • 交换机只能路由消息,无法存储消息
  • 交换机只会路由消息给与其绑定的队列,因此队列必须与交换机绑定

3. java客户端

3.1 快速入门

  1. 引入spring-boot-starter-amqp依赖

  2. 进行地址等相关配置

  3. 发送消息:利用RabbitTemplate实现消息发送:

    rabbitTemplate.convertAndSend(queueName,message);
    
  4. 接收消息:利用@RabbitListener注解声明要监听的队列,监听消息

    SpringAMQP提供声明式的消息监听,只需通过注解在方法上声明要监听的队列名称,将来SpringAMQP就会把消息传递给当前方法

    @RabbitListener(queues = "simple.queue")
    

3.2 WorkQueues

  • Work Queueus,任务模型,简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息,可以加快消息处理速度

  • 同一条消息只会被一个消费者处理

  • 在默认情况下,RabbitMQ会将消息依次轮询投递给绑定在队列上的每一个消费者,但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积

​ **解决方法:**因此我们需要修改application.yml,设置preFetch的值为1,确保同一时刻最多投递给消费者1条消息

3.3 交换机

交换机的主要作用是

  • 接收发送者发送的消息,
  • 将消息路由到与其绑定的队列

常见交换机的类型有:

  1. Fanout:广播
  2. Direct:定向
  3. Topic:话题

Fanout交换机

会将接收的消息路由到每一个与其绑定的queue,所以也叫广播模式

三个参数:交换机名,RoutingKey,消息

rabbitTemplate.convertAndSend(exchangeName,null,message);

Direct交换机

Direct交换机会将接收到的消息根据规则路由到指定的队列中,也成为定向路由

  • 队列与交换机的绑定设置一个BindingKey
  • 发布者发送消息时,指定消息的 RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

多个队列具有相同的RoutingKey则和Fanout功能类似

Topic交换机

Topic交换机也是基于RoutingKey做消息路由的,但是routingKey通常时多个单词的组合,并以.进行分割

Topich和Direct交换机的差异?

Topic的routingKey和bindingKey可以是多个单词,以.分割,

Topic的交换机与队列绑定时的bindingKey可以指定通配符

​ #表示0个或多个单词,*表示一个单词

3.6 声明队列交换机

基于代码生成交换机

SpringAMQP提供了几个类,用来声明队列,交换机及其绑定关系

  • Queue:用来声明队列,可以用工厂类QueueBuilder构建
  • Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
  • Binging:用于声明队列和交换机的绑定关系,可以用工厂类BingingBuilder构建

一般这些声明写在消费者中,发送者不需要关心声明,

fanout交换机的操作:

    // 交换机的声明@Beanpublic FanoutExchange fanoutExchange(){
//        return new FanoutExchange("hamll.fanout");return ExchangeBuilder.fanoutExchange("hmall.fanout").build();}// 队列的声明@Beanpublic Queue fanoutQueue1(){
//        return new Queue("fanout.queue1");return QueueBuilder.durable("fanout.queue1").build();}// 绑定关系的声明@Beanpublic Binding fanoutQueue1Binding(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}

但是direct交换机的绑定存在问题,如下

    // 交换机的声明@Beanpublic DirectExchange directExchange(){
//        return new FanoutExchange("hamll.fanout");return ExchangeBuilder.directExchange("hamll.direct").build();}// 队列的声明@Beanpublic Queue directQueue1(){
//        return new Queue("fanout.queue1");return QueueBuilder.durable("direct.queue1").build();}// 绑定关系的声明@Beanpublic Binding directQueue1BindingRed(Queue directQueue1,DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}// 绑定关系的声明@Beanpublic Binding directQueue1BindingYellow(Queue directQueue1,DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("yellow");}

由于一次只能绑定一个关键词,所以当需要绑定的bindingKey越多,Bean也就越多,代码臃肿

解决方案:使用基于@RabbitListener注解来声明队列和交换机发方法

    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),  // 队列 exchange = @Exchange(name = "hmall.direct"), // 交换机key ={"red","blue"} // bindingKey))public void ListenDirectQueue1(String message) throws InterruptedException {System.out.println("消费者1接收到消息:" + message+","+ LocalTime.now());}

3.7 消息转换器

建议使用JSON序列化代替默认的JDK虚拟化

  1. 赢入jackson依赖
  2. 在publisher和consumer中都配置MessageConverter

4. 黑马商城业务改造

需求:改造余额支付功能,不再同步调用交易服务的OpenFeign接口进行远程调用,而是采用异步MQ通知交易服务更新订单状态

  • 对于边缘业务,不做远程调用,而是做消息通知(异步通知)

高级

1. 发送者重连

2. 发送者确认

3. MQ持久化

4. LazyQueue

5. 消费者确认

6. 失败重连

7. 业务幂等

8. 延迟消息

http://www.lryc.cn/news/602119.html

相关文章:

  • 【Linux系统】Ext2文件系统 | 软硬链接
  • 医疗人工智能高质量数据集和语料库建设路径探析
  • HOT100——链表篇Leetcode206. 反转链表
  • qt 心跳包
  • Java面试宝典:Spring Boot
  • 解决MySQL 1055错误:ONLY_FULL_GROUP_BY问题详解(MySQL 8.0版)
  • Java项目接口权限校验的灵活实现
  • Datawhale AI夏令营 task2 笔记问题汇总收集
  • Python 实现服务器自动故障处理工具:从监控到自愈的完整方案
  • PCS液相色谱柱:专为碱性化合物设计的高性能色谱柱
  • Python 异常 (Exception) 深度解析
  • 项目进度如何控制
  • 新手向:破解VMware迁移难题
  • 元宇宙经济与数字经济的异同:虚实交织下的经济范式对比
  • 【实时Linux实战系列】在实时应用中进行负载均衡
  • PyTorch武侠演义 第二卷:高塔中的注意力秘境 第1章:残卷指引
  • 安宝特案例丨AR+AI赋能轨道交通制造:破解人工装配难题的创新实践
  • 绳子切割 图论
  • RPC 详解
  • 图论(BFS)构造邻接表(运用队列实现搜索)
  • 持续集成CI与自动化测试
  • 鱼皮项目简易版 RPC 框架开发(三)
  • Redis反弹Shell
  • UniappDay04
  • 【跳跃游戏】
  • Vue、微信小程序、Uniapp 面试题整理最新整合版
  • Entity Framework Core (EF Core) 中Database
  • uniapp,uview icon加载太慢了,老是显示叉叉,将远程加载改到本地加载。
  • 设计模式(二十三)行为型:模板方法模式详解
  • 常用设计模式系列(十四)—模板方法模式