当前位置: 首页 > news >正文

RabbitMQ多角度可靠性分析/基于Java代码深度解析

上一篇我介绍了RabbitMQ的基本交换机以及队列,不了解可以再回顾一下。

那么这就带来一个问题,我们在程序中添加一个中间件,把我们的消息依托给MQ,那么我们如何保证我们的消息在这个过程不会丢失。保证它的的可靠性,目前主流从三个角度保证:生产者可靠性,MQ可靠性,消费者可靠性。

生产者可靠性:

所谓生产者可靠性就是我们如何保证将我们的消息从生产者将消息发送到队列或者消息的可靠性

 1. 重试机制

针对网络异常,连接中断,卡顿的自动重发。(萌芽阶段,与客户端的连接,还没有到交换机/d队列)

spring:rabbitmq:/....connection-timeout: 1s template:retry:initial-interval: 1000msmultiplier: 2max-attempts: 3

参数讲解

connetion-timeout连接时长,如果在这个时长内依旧没有连接成功就认为连接失败
initial-interval等待时长,第一次连接失败后多久重新重试
mutiplier等待倍数,结合initial-interval,计算下次等待时长,initial-interval=initial-interval*mutiplier
max-attempts最大重连次数

 

重连机制似乎看起来我们每个程序都需要配置一下,但实际上我们真是企业还是要慎重,因为我们的不断重连是阻塞式的。所以说在追求极度的高并发情况下,需要慎重使用,如果必须使用,我们应该优化参数值,比如连接次数,连接时长等参数值调整。

2. 回调机制

确认消息是否到投递到交换机以及是都正确路由(成长阶段,到达交换机/队列)

Confirm Callback策略

 确认消息是否成功到达交换机

spring:rabbitmq:/...publisher-confirm-type: correlate/none/simple

参数解释 

publisher-confirm-typeSpring AMQP 启用监听RabbitMQ 退回消息的机制
none关闭confirm机制
simple同步阻塞等待MQ回执消息
correlatedMQ异步回调方式返回回执消息

//创建关联数据
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 使用Lambda设置确认回调
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback((correlationData1, ack, cause) -> {
if (ack) {System.out.println("[消息确认] 成功 - ID: " + correlationData.getId());
} else {System.out.println("[消息确认] 失败 - ID: " + correlationData.getId() + ", 原因: " + cause);
}
});

Return Callback策略

确认消息是否被成功路由(异步等待回执消息)

spring:rabbitmq:/...publisher-returns: true
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(returnedMessage -> {//对于需要的接口泛型 接受一个参数 在{}体内写方法体System.err.println("消息投递失败");//...处理逻辑/});

MQ可靠性:

交换机持久化

队列持久化

 

消息持久化

在spring-amqp中消息默认持久化

发送非持久化消息

Message message = MessageBuilder.withBody("hello".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) //PERSISTENT.build();

消息的持久化影响了我的MQ的吞吐量,同时保证了消息的可靠性。为了平衡可靠性性和性能,

RabbitMQ从 3.6版本 引入了惰性队列(lazy queue)。

让我们看一下为什么持久化是怎么影响性能的

如果我们的消息是持久化的,消息依旧会先存放在内存中,当我们的信息量达到一定程度后,服务器page out将消息扇出到磁盘中。这个时刻队列暂时无法接受消息,处于阻塞状态

为了避免从内存写入磁盘这段性能损失,lazy queue先将消息写入磁盘,然后根据消费者的读取速速,将消息扇出到内存中(最多2048条)。

RabbitMQ3.8版本及以下,如若使用多队列,需要手动配置。

从 RabbitMQ3.12 版本开始,所有队列都采用惰性队列,无法使用传统队列模式

消费者可靠性:

http://www.lryc.cn/news/572054.html

相关文章:

  • android 象棋游戏开发
  • Android Studio Profiler使用
  • 数据差异的iOS性能调试:设备日志导出和iOS文件管理
  • Redis之分布式锁(3)
  • 【深度学习】条件随机场(CRF)深度解析:原理、应用与前沿
  • Ubuntu 安装Telnet服务
  • Cursor Pro取消500次请求限制,无限用的体验更好了吗?
  • YOLOv8改进:Neck篇——2024.1全新MFDS-DETR的HS-FPN特征融合层解析
  • 图像特征检测算法SIFT
  • 实现自动胡批量抓取唯品会商品详情数据的途径分享(官方API、网页爬虫)
  • python校园拼团系统
  • voronoi图,凸壳,和早已遗忘的定不定积分
  • LangChain中的向量数据库抽象基类-VectorStore
  • MySQL存储引擎深度解析:InnoDB、MyISAM、MEMORY 与 ARCHIVE 的全面对比与选型建议
  • Docker PowerJob
  • 今天我想清楚了
  • Conda 常用命令大全:从入门到高效使用
  • Vue添加图片作为水印
  • 最大公约数
  • Espresso + Java 详细示例
  • 【音视频】PJSIP库——pjsua命令使用详解
  • CANFD加速是什么?和CANFD有什么区别?
  • 自演进多智能体在医疗临床诊疗动态场景中的应用
  • Jenkins审核插件实战:实现流水线审批控制的最佳实践
  • Vue.js第二节
  • 使用Trace分析Android方法用时
  • 利用Java进行验证码的实现——算数验证码
  • 【AI Study】第四天,Pandas(7)- 实际应用
  • 【图像处理基石】什么是EIS和OIS?
  • C++ Primer Plus 9.2.7 mutable