当前位置: 首页 > news >正文

RabbitMQ 高级特性之发送方确认

1. 简介

当消息从 producer 发送给 broker是,消息丢失了,那么 broker 就不会收到消息,那么后续的持久化机制、消息确认模式也就没有作用。于是,我们需要尽可能地保证 producer 发送地消息成功到达 broker。broker 中包含交换机与队列,于是,我们不仅需要保证消息能够成功到达交换机,也需要成功到达我们所指定的队列。

2. 事务与 publisher cnofirm(发送方确认)机制

对于上面这个问题,RabbitMQ 提出了两种解决策略,即:

  • 使用事务
  • publisher confirm 机制

由于事务会比较消耗性能,下面就主要讲解 publisher confirm 机制。

3. publisher confirm 机制

在 publisher confirm 机制下,分为两个模式:

  • confirm 确认模式
  • return 退回模式

这两个模式都需要在配置文件中添加下面的配置项:

spring:rabbitmq:publisher-confirm-type: correlated #消息发送确认

3.1 confirm 确认模式

下面是生产者的代码:

    @RequestMapping("/confirm")public String confirm() {String message = "confirm test";/*** 若在此处设置回调函数,那么会影响到所有 rabbitTemplate* 并且只能发送一次消息,因为发送那个多次消息就相当于设置了多个回调函数,规定只能设置一次*/rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/**** @param correlationData correlation data for the callback.* @param ack true: 消息到达交换机; false: 消息没有到达交换机* @param cause An optional cause, for nack, when available, otherwise null.*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {System.out.println("接收到消息, 消息 id: " + (null == correlationData ? null : correlationData.getId()));} else {System.out.println("未接收到消息, 消息 id: " + (null == correlationData ? null : correlationData.getId()) + ", cause: " + cause);}}});String id = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(id);confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, Constants.CONFIRM_ROUTINGKEY, message, correlationData);return "消息发送成功";}

上面的代码与在 spring 环境中发送消息的代码类似,但是多出了设置回调函数这一步。

ConfirmCallback 即为回调函数,当消息不论消息是否发哦是那个到了 broker,都会给 producer 发送一个 ack,若 ack 为 true,就表示消息到达了交换机;若 ack 为 false,就表示消息没有到达交换机。我们可以根据 ack 来进行不同的业务操作。

代码运行结果如下:

 当连续发送两条消息时的结果如下:

报错信息中显示:一个 RabbitMQTemplate 实例是能设置一次 ConfirmCallback。

但是当我们连续两次调用该接口时,就导致 ConfirmCallback 被创建了两次,也就会报错。对于这种情况,我们可以将 RabbitMQTemplate 提取出来,自定义一个 RabbitMQTemplate,在这个类中通过设置回调函数达到之创建一次就能使用多次的效果,代码如下:

@Configuration
public class RabbitTemplateConfig {@Bean("normalRabbitTemplate")public RabbitTemplate normalRabbitTemplate(ConnectionFactory connectionFactory) {return new RabbitTemplate(connectionFactory);}@Bean("confirmRabbitTemplate")public RabbitTemplate confirmRabbitTemplate1(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/**** @param correlationData correlation data for the callback.* @param ack true: 消息到达交换机; false: 消息没有到达交换机* @param cause An optional cause, for nack, when available, otherwise null.*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {System.out.println("接收到消息, 消息 id: " + messageId);} else {System.out.println("未接收到消息, 消息 id: " + messageId + ", cause: " + cause);}}});return rabbitTemplate;}
}

 在上面的配置类中,我们创建了两个方法用来返回 RabbitMQTemplate,这是因为如果我们只创建 confirmRabbitMQTemplate,那么别的接口使用到的 RabbitMQTemplate 就是我们修改后的结果,这时就需要再定义一个 normalRabbitMQTemplate 用来返回没有设置回调函数的 RabbitMQTemplate。这样就不会影响别的接口使用。调用者根据自己的需要注入不同的实例。

修改后的接口如下:

@RequestMapping("/producer")
@RestController
public class ProducerController {@Resource(name = "normalRabbitTemplate")private RabbitTemplate rabbitTemplate;@Resource(name = "confirmRabbitTemplate")private RabbitTemplate confirmRabbitTemplate;/*** 确认模式*/@RequestMapping("/confirm")public String confirm() {String message = "confirm test";String id = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(id);confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, Constants.CONFIRM_ROUTINGKEY, message, correlationData);return "消息发送成功";}}

接下来再连续调用这个接口,观察是否会报错:

这一次代码就没有报错。 

当我们使用了未声明的交换机,代码的运行结果如下:

未接收到消息, 消息 id: dcfab6f0-0d9a-470f-9f8e-5b225df71021, 
cause: channel error; 
protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm.exchangeaaa' in vhost 'extension', class-id=60, method-id=40)

报错信息中表示没有该交换机。

当我们使用了错误的 BindingKey,代码的运行结果如下:

这里居然显示消息发送成功,但是上面的代码使用了错误的 BindingKey,消息只是到了交换机,交换机并没有根据 BindingKey 找到与之绑定的队列,那么消息也就没有到达队列中,即消息发送失败。

但是为什么会出现上面那种情况呢?这就需要使用到退回模式。

3.2 return 退回模式

在确认模式末尾,我们发现,即使消息没有成功到达队列,依然会提示消息发送成功,这是不符合逻辑的。

在 RabbitMQ 中,confirm 确认模式之是保证了 producer 与 交换机之间的消息可靠传输,并没有保证交换机与队列之间的可靠传输。于是,return 退回模式就针对这一缺陷进行补充。

当消息没有成功到达队列后,就会将这条消息退回给 producer,并且携带退回的原因。

退回模式与确认模式并不冲突,二者可以分开,也可以同时存在,下面的代码将二者合并在一起进行编写,代码如下:

@Configuration
public class RabbitTemplateConfig {/**** @param connectionFactory 会根据配置文件中的 rabbitmq 配置自动填充参数* @return*/@Bean("normalRabbitTemplate")public RabbitTemplate normalRabbitTemplate(ConnectionFactory connectionFactory) {return new RabbitTemplate(connectionFactory);}/*** 确认模式使用* 使用 AtomicBoolean, 在消息被退回时打印不同的日志* @param connectionFactory* @return*/@Bean("confirmRabbitTemplate1")public RabbitTemplate confirmRabbitTemplate1(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/**** @param correlationData correlation data for the callback.* @param ack true: 消息到达交换机; false: 消息没有到达交换机* @param cause An optional cause, for nack, when available, otherwise null.*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String messageId = null == correlationData ? null : correlationData.getId();if (ack) {System.out.println("接收到消息, 消息 id: " + messageId);} else {System.out.println("未接收到消息, 消息 id: " + messageId + ", cause: " + cause);}}});/*** 若消息没有到达队列,就退回*/rabbitTemplate.setMandatory(true); //启用强制路由检查rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println("消息退退回: " + returned);}});return rabbitTemplate;}
}

在上面的代码中,使用 setMandatory 开启强制路由检查,当 setMandatory 设置为 true,那么就会进行消息是否成功到达队列的判断,若没有声明为 true,即使编写了 return 模式的代码,也就不会生效。

我们使用 ReturnCallBack 回调函数实现消息没有到达队列后的逻辑。

当我们使用了错误的 BindingKey,代码的运行结果如下:

消息退退回: ReturnedMessage [message=(Body:'confirm test' MessageProperties [headers={spring_returned_message_correlation=f4db7a24-f6aa-499c-8409-65c42605576a}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=confirm.exchange, routingKey=keyaaa]接收到消息, 消息 id: f4db7a24-f6aa-499c-8409-65c42605576a

这次就识别出了消息没有成功到达队列的情况,但是依然有一个不足:

既然消息没有发送到指定队列,那为什么还会弹出接收到消息的日志呢?

于是我们需要针对上面的代码进行改变,此处不过多进行讲解。

http://www.lryc.cn/news/579590.html

相关文章:

  • NV133NV137美光固态闪存NV147NV148
  • c++中的绑定器
  • 在Linux服务器上使用kvm创建虚拟机
  • 国内MCP服务平台推荐!aibase.cn上线MCP服务器集合平台
  • 儿童几岁开始可以使用益智玩具?
  • 解决python报not found libzbar-64.dll的问题
  • 基于 SpringBoot+Vue.js+ElementUI 的 “花开富贵“ 花园管理系统设计与实现7000字论文
  • 基于Hadoop的餐饮大数据分析系统的设计与实现
  • 刷卡登入数据获取
  • 纯前端批量下载
  • CPT204-Advanced OO Programming: Sorting排序
  • 扣子空间PPT生产力升级:AI智能生成与多模态创作新时代
  • JS模块导出导入笔记 —— 默认导出 具名导出
  • 行波进位加法器 (Carry-Propagate Adder)
  • UE5 瞄准偏移(AimOffset)功能详解
  • 独立开发者软件出海:如何用Semrush高效洞察与增长
  • RJ45 连接器(水晶头)的引脚定义
  • 贪心专题练习
  • 强实时运动控制内核MotionRT750(一):驱动安装、内核配置与使用
  • 马斯克脑机接口(Neuralink)技术进展,已经实现瘫痪患者通过BCI控制电脑、玩视频游戏、学习编程,未来盲人也能恢复视力了
  • OpenEuler 24.03 用 Ansible 一键完成 SSH 互信 —— 从踩坑到最终方案
  • 站在 Java 程序员的角度如何学习和使用 AI?从 MVC 到智能体,范式变了!
  • 渗透测试中 phpinfo() 的信息利用分析
  • Part 0:射影几何,变换与估计-第三章:3D射影几何与变换
  • 工作中用到过哪些设计模式?是怎么实现的?
  • Robot---能打羽毛球的机器人
  • Linux操作系统之文件(二):重定向
  • 物联网MQTT协议与实践:从零到精通的硬核指南
  • 【王阳明代数】基于Perplexica二次开发的道装资源标识符与重定向知识路由系统
  • 使用HAProxy搭建Web群集:原理、步骤与实战总结