当前位置: 首页 > news >正文

rabbitmq消息队列详述

1.同步异步介绍:

同步调用:

优势:时效性强,等待到结果后才返回。

劣势:拓展性差、性能下降、级联失败问题

异步调用:

优势:解除耦合,拓展性强、无需等待,性能好、故障隔离(下游服务故障不影响上游业务)缓存消息,流量削峰填谷

劣势:架构复杂度提高、实时性降低、不确定下游业务执行是否成功、业务安全依赖于Broker的可靠性

异步处理:例如支付服务可能涉及很多服务,支付服务、用户服务、交易服务、通知服务、积分服务等,支付服务不在同步调用业务关联度低的服务,而发送消息通知到Broker(消息代理),用户进行支付服务之后,执行用户服务,扣减对应的余额,更新支付状态,其他服务加入到消息代理里面,更改支付状态之后,支付服务发送消息通知给消息代理,调用其他服务交易服务(更新订单状态)、通知服务(短信通知用户)、积分服务(增加用户积分)

可以控制高并发时候请求速率:

2.Mq选型:

消息队列主流有四种MQ类型,分别是:RabbitMQ、ActiveMQ、RocketMQ、Kafka,异步调用中的Broker

3.RabbitMQ介绍:

RabbitMQ整体架构及核心概念:

virtual-host:虚拟主机,起到数据隔离的作用

publisher:消息发送者

consumer:消息的消费者

queue:队列、存储消息

exchange:交换机,负责路由信息

消息发送的注意事项:

交换机只能路由消息,无法存储消息

交换机只会路由消息给与其绑定的队列,队列与交换机必须绑定

 代码层面来书写rabbitMQ的具体实现过程:先声明队列名称、消息内容、使用rabbitTemplate中的方法convertAndSend来将消息发送给队列中。

发送端代码:

@Autowired
private RabbitTemplate rabbitTemplate;@Test
public void testSimpleQueue(){//队列名称String queueName = "simple.queue";//消息String message = "hello,spring amqp!";//发送消息rabbitTemplate.convertAndSend(queueName,message);
}

接收端代码:配置rabbitmq服务端信息,利用rabbitTemplate发送消息,利用@RabbitListener注解声明要监听的队列,监听消息。

@Slf4j
@Component
public class SpringRabbitListener{@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterrupedException{log.info("spring 消费者接收到信息:【" + msg + "】");}
}

4.任务模型和消费者消息推送限制

 work Queues:任务模型,就是让多个消费者绑定到一个队列,共同消费队列中的消息

消费者消息推送限制:默认情况下,RabbitMQ会将消息依次轮询投递给绑定在队列上的每一个消费者,没有考虑消费者是否处理完消息,可能出现消息堆积情况。

修改对应的配置application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息:

spring:rabbitmq:listener:simple:prefetch:1 #每次只能获取一条消息,处理完成才能获取下一个消息

5.交换机:

交换机的作用主要是接收发送者发送的消息,并将消息路由到与其绑定的队列。交换机类型常见有三种:Fanout:广播、Direct:定向、Topic:话题

1. Fanout交换机

Fanout Exchange 会将接收到的消息路由到每一个跟其绑定的queue,广播模式

交换机的作用:

接收publisher发送的消息

将消息按照规则路由到与之绑定的队列

FanoutExchange的会将消息路由到每个绑定的队列

发送消息到交换机的API:

@Test
public void testFanoutExchange(){//交换机名称String exchangeName = "itcast.fanout";//消息String message = "hello,everyone!";//发送消息,参数分别是:交换机名称,RoutingKey(暂时为空),消息rabbitTemplate.convertAndSend(exchangeName,"",message);
}

2.Direct交换机

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,是定向路由

一个Queue都与Exchange设置一个BindingKey

发布者发送消息时,指定消息的RoutingKey

Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

3.Topic交换机

TopicExchange也是基于RoutingKey做消息路由,RoutingKey通常是多个单词的组合,以.分割。Queue与Exchange指定BingdingKey时可以使用通配符:

#:代指0个或者多个单词

*:代指一个单词

6.声明队列和交换机

SpringAMQP提供几个类,用来声明队列、交换机及其绑定关系:

  • Queue:用于声明队列,可以用工厂类QueueBuilder构建
  • Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
  • Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建

代码生成交换机、队列、 绑定队列和交换机 

@Configuration
public class FanoutConfig{//声明FanoutExchange交换机,hmall.fanout交换机名称@Beanpublic FanoutExchange fanoutExchange(){return ExchangeBuilder.fanoutExchange("hmall.fanout").build();}//声明第1个队列@Beanpublic Queue fanoutQueue1(){return QueueBuilder.durable("fanout.queue1").build();}//绑定队列1和交换机@Beanpublic Binding bingingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}
}

SpringAMQP还提供了基于@RabbitListener注解来声明队列和交换机的方式:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消费者1接收到Direct消息:【"+msg+"】");
}

7.消息转换器

 Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。

存在下列问题:

  • JDK的序列化有安全风险
  • JDK序列化的消息太大
  • JDK序列化的消息可读性差

 建议采用JSON序列化代替默认的JDK序列化,做两件事情:

在publisher和consumer中都要引用jackson依赖:

<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>

在publisher和consumer中都要配置MessageConverter:

@Bean
public MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();
}

 8.发送者确认和发送者重连

发送者重连:因为网络波动,发送者连接MQ失败的情况,通过配置开启连接失败后的重连机制:

spring:rabbitmq:connection-timeout:1s #设置MQ的连接超时时间template:retry:enabled:true # 开启超时重试机制initial-interval:1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待市场倍数,下次等待时长 = initial - interval*multipliermax-attempts:3 # 最大重试次数

 注:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能

如果对于业务性能有要求,建议禁用重试机制,如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

发送者确认:

SpringAMQP提供了Publisher Confirm 和 Publisher Return 两种确认机制,开启确认机制后,当发送者发送消息给MQ后,MQ会返回确认结果给发送者。返回的结果有以下几种情况:

  • 消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
  • 其它情况都会返回NACK,告知投递失败

 在publisher这个微服务的application.yml中添加配置:

spring:rabbitmq:publisher-confirm-type:correlated # 开启publisher confirm机制,并设置confirm类型publisher-returns:true # 开启publisher return机制

配置说明:

这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制 
  • simple:同步阻塞等待MQ的回执消息
  • correlated:MQ异步回调方式返回回执消息

9.MQ的可靠性

在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题:

  • 一旦MQ宕机,内存中的消息会丢失
  • 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞

10.数据持久化

包括三个方面:交换机持久化、队列持久化、消息持久化

前面演示的是被动持久化策略,消息类型是非持久化的,只有在消息队列满了后会被迫写入磁盘。这种策略由于磁盘读写瓶颈导致消息队列堵塞。为了解决这个问题才有了同步持久化策略,不会导致消息队列阻塞

11.Lazy Queue

惰性队列特征:

接收到消息后直接存入磁盘,不再存储到内存

消费者要消费消息时才会从磁盘中读取并加载到内存(可以提前缓存部分消息到内存,最多2048条)在3.12版本后,所有队列都是Lazy Queue模式,无法更改。

要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为Lazy即可:

@RabbitListener注解监听队列名称@Queue来监听

保证消息可靠性

1.首先通过配置可以让交换机、队列、以及发送的消息都持久化。这样队列中的消息会持久化到磁盘,MQ重启消息依然存在

2.RabbitMQ在3.6版本引用了LazyQueue,并在3.12版本后会称为队列的默认模式LazyQueue会将所有消息都持久化

3.开启持久化和生产者确认时,RabbitMQ只有在消息持久化完成后才会给生产者返回ACK回执

非持久化数据mq重启之后会消失

12.消费者确认机制

消费者确认机制是为了确认消费者是否成功处理消息。当消费者处理消息结束后,应向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

SpringAMQP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式:

none:不处理。即消息投递给消费者后立刻ack消息会立刻从MQ删除。非常不安全,不建议使用

manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活

auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack,当业务出现异常时,根据异常判断返回不同结果

  • 如果是业务异常,会自动返回nack
  • 如果是消息处理或者校验异常,自动返回reject
spring:rabbitmq:listener:simple:prefetch:1acknowledge-mode:none # none,关闭ack;manual,手动ack;auto:自动ack

13.失败重试机制

SpringAMQP提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限的requeue到mq。可以通过在application.yaml文件中添加配置来开启重试机制

spring:rabbitmq:listener:simple:prefetch:1retry:enabled:true # 开启消费者失败重试initial-interval:1000ms # 初始的失败等待时长为1秒multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,包含三种不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

第三种:

失败消息处理策略:使用RepublishMessageRecoverer策略:

  • 1.首先,定义接收失败消息的交换机、队列及其绑定关系
  • 2.然后,定义RepublishMessageRecoverer:
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
}

14.业务幂等性

幂等概念:指的是基本对一个字段进行同等操作,非幂等概念:对一个业务操作可能涉及不同的字段来实现操作,非幂等性。

15. 保持不同服务状态一致性

两个服务,服务1,服务2,

  • 首先,服务1会正在用户完成交易之后利用MQ消息通知服务2,完成状态同步。
  • 其次,为了保证MQ消息可靠性,采用生产者确认机制,消费者确认、消费者失败重试等策略,确保消息投递和处理的可靠性。同时也开启了MQ的持久化,避免因服务宕机导致消息丢失。
  • 最后,我们还在交易服务更新订单状态时做了业务幂等判断,避免因消息重复消费导致订单状态异常。

 16.延迟队列

延迟消息:发送者发送消息时指定一个时间消费之不会立刻收到消息,而是在指定时间之后才收到消息

延迟任务:设置在一定时间之后才执行的任务

17.死信交换机

当一个队列中的消息满足下列情况之一时,就会成为死信

  • 消费者使用basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
  • 要投递的队列消息堆积满了,最早的消息可能成为死信

 如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机

 18.延迟消息插件

这个插件可以将普通交换机改造为支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue",durable = "true"),exchange = @Exchange(name = "delay.direct",delayed = "true"),key = "delay"))
public void listenDelayMessage(String msg){log.info("接收到delay.queue的延迟消息:{}",msg);
}
@Bean
public DirectExchange delayExchange(){return ExchangeBuilder.directExchange("delay.direct").delayed() // 设置delay的属性为true.durable(true) //持久化.build();
}

发送消息时需要通过消息头x-delay来设置过期时间:

@Test
void testPublisherDelayMessage(){//1.创建消息String message = "hello,delayed message";//2.发送消息,利用消息后置处理器添加信息头rabbitTemplate.converAndSend("delay.direct","delay",message,new MessagePostProcessor(){@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 添加延迟消息属性message.getMessageProperties().setDelay(5000);return message;}});
}

延迟消息尽可能不能过长。

http://www.lryc.cn/news/607928.html

相关文章:

  • python创建一个excel文件
  • PHP 与 MySQL 详解实战入门(2)
  • Removing Digits(Dynamic Programming)
  • 【第三章】变量也疯狂:深入剖析 Python 数据类型与内存原理
  • Android13文件管理USB音乐无专辑图片显示的是同目录其他图片
  • 【NLP舆情分析】基于python微博舆情分析可视化系统(flask+pandas+echarts) 视频教程 - 微博舆情数据可视化分析-热词情感趋势柱状图
  • 机器学习 —— 决策树
  • 从C++0基础到C++入门(第十五节:switch语句)
  • 计算机网络:为什么IPv6没有选择使用点分十进制
  • 如何修复非json数据
  • Gemini CLI
  • 深入 Go 底层原理(五):内存分配机制
  • 操作系统-lecture5(线程)
  • Vue3核心语法基础
  • 【大模型入门】3.从头实现GPT模型以生成文本
  • 相对路径 绝对路径
  • UniappDay07
  • sqli-labs:Less-19关卡详细解析
  • Qt 槽函数被执行多次,并且使用Qt::UniqueConnection无效【已解决】
  • 24黑马SpringCloud的Docker本地目录挂载出现相关问题解决
  • Tushare对接OpenBB分析A股与港股市场
  • 解锁智能油脂润滑系统:加速度与温振传感器选型协同攻略
  • 深度学习核心:卷积神经网络 - 原理、实现及在医学影像领域的应用
  • 【Java】在一个前台界面中动态展示多个数据表的字段及数据
  • 定制开发开源AI智能名片S2B2C商城小程序的特点、应用与发展研究
  • 自进化智能体综述:通往人工超级智能之路
  • SpringBoot IOC
  • C++之vector类的代码及其逻辑详解 (中)
  • 【自动化运维神器Ansible】YAML语法详解:Ansible Playbook的基石
  • vue引入阿里巴巴矢量图库的方式