当前位置: 首页 > news >正文

RabbitMQ消息可靠性投递

RabbitMQ消息投递的路径为:
生产者 —> 交换机 —> 队列 —> 消费者
在RabbitMQ工作的过程中,每个环节消息都可能传递失败,那么RabbitMQ是如何监听消息是否成功投递的呢?
确认模式(confirm)可以监听消息是否从生产者成功传递到交换机。
退回模式(return)可以监听消息是否从交换机成功传递到队列。
消费者消息确认(Consumer Ack)可以监听消费者是否成功处理消息。

开启确认模式

publisher-confirm-type: correlated

@SpringBootTest
public class ProducerTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testConfirm(){// 定义确认模式的回调方法,消息向交换机发送后会调用confirm方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/*** 被调用的回调方法* @param correlationData 相关配置信息* @param ack 交换机是否成功收到了消息* @param cause 失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack){System.out.println("confirm接受成功!");}else{System.out.println("confirm接受失败,原因为:"+cause);// 做一些处理。}}});rabbitTemplate.convertAndSend("my_topic_exchange","my_routing","send message...");}
}

消息回退

spring:rabbitmq:host: 192.168.0.162port: 5672username: guestpassword: guestvirtual-host: /# 开启确认模式publisher-confirm-type: correlated# 开启回退模式publisher-returns: true
@SpringBootTest
public class ProducerTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testReturn(){// 定义退回模式的回调方法。交换机发送到队列失败后才会执行returnedMessage方法rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {/*** @param returned 失败后将失败信息封装到参数中*/@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println("消息对象:"+returned.getMessage());System.out.println("错误码:"+returned.getReplyCode());System.out.println("错误信息:"+returned.getReplyText());System.out.println("交换机:"+returned.getExchange());System.out.println("路由键:"+returned.getRoutingKey());// 处理消息...}});rabbitTemplate.convertAndSend("my_topic_exchange","my_routing1","send message...");}
}

消息确认
自动确认:spring.rabbitmq.listener.simple.acknowledge=“none”
手动确认:spring.rabbitmq.listener.simple.acknowledge=“manual”
消费者处理消息时定义手动签收和拒绝签收的情况

@Component
public class AckConsumer {@RabbitListener(queues = "my_queue")public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException {// 消息投递序号,消息每次投递该值都会+1long deliveryTag = message.getMessageProperties().getDeliveryTag();try {int i = 1/0; //模拟处理消息出现bugSystem.out.println("成功接受到消息:"+message);// 签收消息/*** 参数1:消息投递序号* 参数2:是否一次可以签收多条消息*/channel.basicAck(deliveryTag,true);}catch (Exception e){System.out.println("消息消费失败!");Thread.sleep(2000);// 拒签消息/*** 参数1:消息投递序号* 参数2:是否一次可以拒签多条消息* 参数3:拒签后消息是否重回队列*/channel.basicNack(deliveryTag,true,true);}}
}
http://www.lryc.cn/news/222779.html

相关文章:

  • 汽车网络安全渗透测试概述
  • NOIP2023模拟14联测35 charlotte
  • 绿色科技和可持续发展技术
  • 建链时,please install openssl! use “openssl version“ command to check.
  • “Redis与Spring整合及缓存优化“
  • 腾讯云3年云服务器价格及购买教程
  • cortex-A7核 中断实验(按键中断实验)
  • .NET Framework中自带的泛型委托Action
  • DAIR-V2X-V 3D检测数据集 转为Kitti格式 | 可视化
  • 深入理解指针:【探索指针的高级概念和应用二】
  • 腾讯觅影数智医疗影像平台获颁世界互联网领先科技成果大奖
  • 鸿蒙开发工具DevEco Studio的下载和安装
  • 【原理篇】四、自定义starter
  • redisTemplate不支持zpopmax,解决方案使用reverseRangeWithScore
  • 基于深度模型的日志异常检测
  • 最大连续子数组
  • 【FastCAE源码阅读5】使用VTK实现鼠标拾取对象并高亮
  • 【全志H616 使用标准库 完成自制串口库(分文件实现) orangepi zero2(开源)】.md updata: 23/11/07
  • 小白学爬虫:手机app分享商品短连接获取淘宝商品链接接口|淘宝淘口令接口|淘宝真实商品链接接口|淘宝商品详情接口
  • python 应用之 request 请求调用
  • BeanUtils.copyProperties浅拷贝的坑你得知道?
  • ubuntu安装rabbitMQ 并 开启记录消息的日志
  • 思维模型 首因效应
  • Redis极速上手开发手册【Redis全面复习】
  • [动态规划] (十四) 简单多状态 LeetCode LCR 091.粉刷房子
  • 【VSS版本控制工具】
  • 数据持久化技术(Python)的使用
  • 第23章(上)_索引原理之索引与约束
  • 金蝶云星空BOS设计器中基础资料字段属性“过滤”设置获取当前界面的基础资料值作为查询条件
  • OFDM深入学习及MATLAB仿真