当前位置: 首页 > news >正文

消息队列常见问题(1)-如何保障不丢消息

目录

1. 为什么消息队列会丢消息?

2. 怎么保障消息可靠传递?

2.1 生产者不丢消息

2.2 服务端不丢消息

2.3 消费者不丢消息

3. 消息丢失如何快速止损?

3.1 完善监控

3.2 完善止损工具


1. 为什么消息队列会丢消息?

现在主流的消息队列都会提供完善的高可用解决方案,但是我们依然会有多种原因导致消息丢失,可能得原因包括:生产者生产消息失败、服务端存储消息失败、消费者消息处理失败。

其中常见的生产者生产消息失败的原因包括:

  • 消息体过大
  • 网络传输异常
  • 配置错误(例如:topic配置错误)
  • 生产者应用程序异常

服务端存储消息失败的常见原因包括:

  • 配置问题(例如:高性能方面考虑,未配置主从同步、未配置持久化)
  • 存储空间不足/存储介质故障

消费者消息处理失败的常见原因包括:

  • 消费者应用程序异常;
  • 消费者处理超时;
  • 消费过程中出现服务重启等问题;

2. 怎么保障消息可靠传递?

2.1 生产者不丢消息

  • 快速重试:在程序设计时,需要关注异常处理机制,我们需要遵循的原则是:异常处理 + 自动重试 + 告警(缺一不可),详细展开讲就是:1、任何系统异常,避免在不确定情况下随意捕获异常,从而导致被错误”兜底“处理。2、在生产消息异常情况下,需要支持异常情况下的自动重试,且多次重试需要有一定间隔时间。(此方案要求消费者幂等)3、遇到预期之外的异常及时埋点、告警等。
  • 消息补偿:引入"异常补偿服务",通过异常补偿服务收集生产者、消费者的异常消息进行持久化 + 重试。

2.2 服务端不丢消息

MQ Cluster不丢消息关键参考消息队列高可用方案,总结下来就是:

  • 持久化:持久化的目的是在服务故障或宕机时,消息不会丢失。对于RabbitMQ、Kafka、RocketMQ等不同的消息队列,除了消息日志被持久化之外,还需要关注元数据的持久化(例如:RabbitMQ中的Exchange元数据、Queue元数据等)、偏移量的持久化等。
  • 消息备份:RabbitMQ、Kafka、RocketMQ都支持消息备份,但是消息备份机制上存在一些差异,Kafka是针对每个分区都有主副本和多个从副本,RabbitMQ是采用镜像队列的方式,RocketMQ是每个消息主题都有主节点和多个从节点。
  • ACK确认机制:RabbitMQ、Kafka、RocketMQ在消息的ACK确认机制上差异不大,区别在于Kafka是基于分区的消息提交机制,也即某个分区所有消息消费完成后进行ACK;RabbitMQ是基于消费者的消息确认机制,即只有当消费者成功消费并处理了某条消息后,才会进行ACK确认;RocketMQ采用基于消费者组的消息确认机制,即只有当某个消费者组中所有消费者都成功消费并处理了某条消息后,才会进行确认。

2.3 消费者不丢消息

  • 快速重试:类似于生产者的解决方案,对于消息消费的异常需要感知并进行重试。在消费者的重试上需要注意:1、消息的重试次数需要有限,避免无限重试影响后续的消费;2、消息的消费需要幂等,避免前一次消费正常,再次消费时出现错误。
  • 监控消息:除了正常的消费队列,引入延迟的监控队列,在监控队列中通过状态等属性,监听消费者处理的正确性,对于消费异常的情况可以发送补偿消息。

3. 消息丢失如何快速止损?

3.1 完善监控

  • 实时监控:就是无论是生产者,还是消费者都需要及时捕获处理异常,并进行告警处理。
  • 旁路监控:就是引入监控队列/定时任务的方案,检查生产者与消费者的数据一致性,对于生产者与消费者数据不一致的场景进行及时告警处理。
  • 趋势监控:趋势监控是一种发现大规模问题的方法,也即埋点记录每一秒钟/每一分钟发出去的消息数,若代码变更后导致生产消息数/消费消息数明显降低,则需要及时关注进行处理。

3.2 完善止损工具

  • 消息生产工具:在日常开发中,建议养成在生产者发送消息前打印消息体日志的习惯。在发现数据异常后,可以重新手动发送消息。
  • 数据检查/回退工具:止损工具大家容易想到把消息重新生产一遍,也知道消息的消费需要具有幂等性。但是生产环境通常比较复杂,偶尔会产生一些异常数据导致消息生产/消费失败,或者消息处理一半产生预料之外的脏数据。这里建议建设相关一些数据快速回退工具、数据正确性工具,加快故障处理速度。

http://www.lryc.cn/news/112236.html

相关文章:

  • Circle of Mistery 2023牛客暑期多校训练营5 B
  • VC9、VC10、VC11等等各对应什么版本的Visual Studio,以及含义
  • 两数相加 LeetCode热题100
  • Python基础 P2数字类型与优先级进阶练习
  • CAPL通过继电器实现CAN容错性自动化测试
  • elasticsearch 配置用户名和密码
  • 侯捷 C++面向对象编程笔记——9 复合 委托
  • 状态模式——对象状态及其转换
  • Linux一阶段复习
  • 宝塔Linux面板怎么升级?升级命令及失败解决方法
  • 前端面试的性能优化部分(6)每天10个小知识点
  • 2023年 Java 面试八股文(20w字)
  • 银河麒麟服务器ky10-server在线一键安装docker
  • spring boot中web容器配置
  • DNSlog注入(利用DNSlog平台将SQL盲注变成回显注入)
  • vim学习笔记(致敬vim作者)
  • 力扣 -- 139. 单词拆分
  • 百度秋招攻略,百度网申笔试面试详解
  • nohup Java -jar 生成的nohup.out 文件一直增加,如何处理
  • 静态页面与动态页面的区别及部署jpress应用
  • 华为数通HCIA-华为VRP系统基础
  • 基于Azure OpenAI Service 的知识库搭建实验⼿册
  • 第七节--结构体
  • Docker学习(二十四)报错速查手册
  • 一种SpringBoot下Scheduler定时任务优雅退出方案
  • DNS部署与安全详解(上)
  • 【51单片机】晨启科技,酷黑版,音乐播放器
  • 基于SPSSPRO实现层次分析法(AHP)
  • Spring Test中使用MockMvc进行上传文件单元测试时,报NullPointerException
  • HTTP常用状态码及其含义