当前位置: 首页 > news >正文

RabbitMQ学习总结-消息的可靠性

保证MQ消息的可靠性,主要从三个方面:发送者确认可靠性,MQ确认可靠性,消费者确认可靠性。

1.发送者可靠性:主要依赖于发送者重试机制,发送者确认机制;

发送者重试机制,其实就是配置文件配置重试规则,当消息发送失败后,会根据配置的重试次数,进行多次发送重试,如代码:

spring:rabbitmq:connection-timeout: 1s # 设置MQ的连接超时时间template:retry:enabled: true # 开启超时重试机制initial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multipliermax-attempts: 3 # 最大重试次数

发送者确认机制:则是依赖于消息的回执,这其中包括发送者回执,和消费者回执两种,但是这种回执都比较耗性能,会导致消息消费的很慢。并且,这也是需要在配置文件中做配置的:

spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型publisher-returns: true # 开启publisher return机制

并且还要有代码的实现,这种方式极大的影响了性能,:

@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("触发return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}
@Test
void testPublisherConfirm() {// 1.创建CorrelationDataCorrelationData cd = new CorrelationData();// 2.给Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 2.1.Future发生异常时的处理逻辑,基本不会触发log.error("send message fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执log.debug("发送消息成功,收到 ack!");}else{ // result.getReason(),String类型,返回nack时的异常描述log.error("发送消息失败,收到 nack, reason : {}", result.getReason());}}});// 3.发送消息rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}

2.MQ自身的可靠性:交换机/队列/消息都实现持久化,消息不会丢失,如果是在项目中通过代码创建的交换机/队列/消息,spring默认就是持久化的,如果在mq的客户端手工配置,那就要选定各个参数了。持久化后的消息会直接进入磁盘,不在经过内存了,正常来讲有IO的操作会慢才对,但是在实际的操作中却是非常快。

MQ队列最怕的就是消息积压,导致内存溢出。在3.12版本以后,MQ直接默认就是Laz懒惰队列的模式了,这个模式会直接加载到磁盘,当用到消息的时候,会从磁盘加载到内存,磁盘空间很大,支持数百万级别的存储,所以内存溢出的可能性就会大大降低。我们可以在mq客户端手动设置为lazy队列,也可以在代码中直接实现,代码如下:

@RabbitListener(queuesToDeclare = @Queue(name = "lazy.queue",durable = "true",arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){log.info("接收到 lazy.queue的消息:{}", msg);
}

3.消费者的可靠性:

3.1消费者消费消息后,向MQ发送回执,让MQ知道消息是否正常被消费了,目前回执有三种:

ack:成功处理了消息,MQ从队列中就会删除消息,正常。

nack:失败处理了消息,MQ需要再次投递消息,这会出现一直重试的问题。

reject:消息失败,并拒绝了消息,并且从队列中删除了消息。这个消息被删除了,岂不是数据就丢失了。

对于以上三种回执,基本回执都是固定的,AMQP提供了消息确认的方式,不用写代码,配置就可以,配置有三种:none-配置它失败了,消息会被删除,auto-失败了,消息会回到MQ重新投递,不会丢失,不会被删除,manual-太麻烦,算了。

不过,对于auto的配置,对于返回的异常,会有两种判断:1,如果是业务异常,会自动返回nack

如果是消息处理或者校验异常,会直接进行reject

spring:rabbitmq:listener:simple:acknowledge-mode: auto

3.2 生产者有重试机制,消费者也有重试机制,但是,对于消费者的重试,如果一直失败,那就要有一定的策略,可以把这个失败的消息放到另一个交换机上,后续人工进行干预,这样可以保证消息不丢失。

对于消费者的重试配置:

spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000ms # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

如何把消息发送到另一个交换机上呢?

在消费者服务定义一个处理失败消息队列的交换机,这样就可以把消息存储过去了

@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

4.业务的幂等性判断

4.1.对于一条MQ消息,为了防止被重复消费,可以做一个唯一的msgID,当消费的时候可以先检查下这个ID,如果已经消费过了,那就不能再消费了,一定程度上可以避免被重复消费,代码如下:

@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jjmc.setCreateMessageIds(true);return jjmc;
}

4.2.还有一种保证消费唯一性的就是业务上的判断,当需要消费消息的时候,可以先提前去查询下需要消费消息的状态,如果状态已经发生了改变,自然也不用再去消费这条消息了

5.对于以上所有保证消息可靠性的方案,其实都不能完全保证,最终需要一个兜底的方案,兜底方案我们可以采取一个定时任务的方式,定时轮询检查消息是否消费。比如时间间隔多少秒进行一次轮询检查,这种方式我们可以理解为主动查询。这种兜底很大程度上可以保证业务上的一致性。

http://www.lryc.cn/news/320637.html

相关文章:

  • 2024蓝桥杯每日一题(BFS)
  • 力扣思路题:最长特殊序列1
  • c# 的ref 和out
  • ONLYOFFICE文档8.0全新发布:私有部署、卓越安全的协同办公解决方案
  • Mar 14 | Datawhale 01~04 打卡 | Leetcode面试下
  • 【计算机网络】什么是http?
  • 【python开发】并发编程(上)
  • 用c语言实现扫雷游戏
  • LeetCode 2882.删去重复的行
  • 对OceanBase进行 sysbench 压测前,如何用 obdiag巡检
  • 每天学习几道面试题|Kafka架构设计类
  • .rmallox勒索病毒解密方法|勒索病毒解决|勒索病毒恢复|数据库修复
  • 安卓性能优化面试题 11-15
  • Python错题集-9PermissionError:[Errno 13] (权限错误)
  • QT TCP通信介绍
  • 保姆级教学!微信小程序设计全攻略!
  • 日期差值的计算
  • 为什么需要Occupancy?
  • SSA优化最近邻分类预测(matlab代码)
  • nginx相关内容的安装
  • 基于SpringBoot和Echarts的全国地震可视化分析实战
  • 基于YOLOv8/YOLOv7/YOLOv6/YOLOv5的农作物害虫检测系统(深度学习模型+UI界面+训练数据集)
  • 21 # 高级类型:条件类型
  • Java之List.steam().sorted(Comparator.comparing())排序异常解决方案
  • js判断对象是否有某个属性
  • CleanMyMac X2024永久免费的强大的Mac清理工具
  • 等保测评的知识
  • 【算法】多路归并(鱼塘钓鱼)
  • unity3d Animal Controller的Animal组件中General基础部分理解
  • css背景从上到下颜色渐变、css背景从左到右颜色渐变、 css框线展示外阴影、css框线展示内阴影