当前位置: 首页 > news >正文

RabbitMQ防止消息丢失

生产者没有成功把消息发送到MQ
丢失的原因 :因为网络传输的不稳定性,当生产者在向MQ发送消息的过程中,MQ没有成功接收到消息,但是生产者却以为MQ成功接收到了消息,不会再次重复发送该消息,从而导致消息的丢失。

解决办法 : 有两个解决办法:事务机制和confirm机制,最常用的是confirm机制(发布确认机制)。

注意:

          RabbitMQ的事务机制是同步的,很耗型能,会降低RabbitMQ的吞吐量。

          confirm机制是异步的,生成者发送完一个消息之后,不需要等待RabbitMQ的回调,就可以发送下一个消息,当RabbitMQ成功接收到消息之后会自动异步的回调生产者的一个接口返回成功与否的消息。

两个机制说明如下:

confirm(发布确认)机制
解释:RabbitMQ可以开启 confirm 模式,在生产者那里设置开启 confirm 模式之后,生产者每次写的消息都会分配一个唯一的 id,如果消息成功写入 RabbitMQ 中,RabbitMQ 会给生产者回传一个 ack 消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,生产者可以重新发送。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么可以重发。

代码

yml配置

----------------------------------------------------------------------------------------------------

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
/**
* 交换机回滚
*/
@Component
@Slf4j
public class ExchangeCallback implements RabbitTemplate.ConfirmCallback{/* correlationData 内含消息内容* ack 交换机接受成功或者失败。 true表示交换机接受消息成功, false表示交换机接受失败* cause 表示失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("hello world");String id = correlationData.getId();String message = new String(correlationData.getReturnedMessage().getBody());if (ack){log.info("交换机收到消息id为{}, 消息内容为{}", id, message);}else {log.info("交换机未收到消息id为{}, 消息内容为{}, 原因为{}", id, message, cause);}}
}

----------------------------------------队列防止消息丢失----------------------------------------------------------------

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;/*** 队列防止消息丢失*/
@Slf4j
@Component
public class QueueCallback implements RabbitTemplate.ReturnCallback{@Overridepublic void returnedMessage(Message message,int replyCode, String replyText, String exchange, String routingKey) {log.info("消息 {} 经交换机 {} 通过routingKey={} 路由到队列失败,失败code为:{}, 失败原因为:{}",new String(message.getBody()), exchange, routingKey, replyCode, replyText);}
}

--------------------------引用->controller-----------------------------------------------

//交换机回滚
@Autowired
private ExchangeCallback exchangeCallback;
//队列回滚
@Autowired
private QueueCallback queueCallback;
/*** 初始化交换机监听*/
@PostConstruct
public void init(){  
//交换机
rabbitTemplate.setConfirmCallback(exchangeCallback);
/*** true:交换机无法将消息进行路由时,会将该消息返回给生产者* false:如果发现消息无法进行路由,则直接丢弃*/
rabbitTemplate.setMandatory(true);
//队列
rabbitTemplate.setReturnCallback(queueCallback);
}
/*** 发送消息* 结果:"这是一条消息"*/@GetMapping("/sendMessageTest")public String sendMessageTest(){// 消息类型为object 发送对象也是可以的String msg = "这是一条消息";// 第一个参数为发送消息到那个交换机上,第二个是发送的路由键(交换机进行需要符合绑定的队列),第三个参数为发送的消息
//CommonUtils.dirExchange--自己的交换机名称
//CommonUtils.routingKey --路由Key值 rabbitTemplate.convertAndSend("1235",CommonUtils.routingKey,msg);System.out.println("消息发送成功:"+msg);return "发送成功;发送内容为:"+msg;}

运行结果:

http://www.lryc.cn/news/62188.html

相关文章:

  • ImageJ用户手册——第二部分(ImageJ操作)
  • Java中Lambda表达式(面向初学者)
  • 2023年淮阴工学院五年一贯制专转本数字电子技术考试大纲
  • 使用 GO 编写 Web 应用:学习如何使用 GO 语言编写 Web 应用,包括使用 HTTP 路由、模板引擎等。
  • Leetcode-day4【88】【167】【125】【345】
  • 【IoT】如何使用软件加密(文件夹加密工具.exe),并破解工具
  • Spring Boot——优雅的参数校验
  • 【c语言】typedef的基本用法 | 定义格式
  • 深度学习论文分享(二)Data-driven Feature Tracking for Event Cameras
  • 蛇优化算法
  • 循环神经网络(RNN)简单介绍—包括TF和PyTorch源码,并给出详细注释
  • Struts2 快速入门
  • 关于PullToRefreshView下拉刷新失效问题
  • JAVA开发中的六大原则
  • Matplotlib 安装
  • CF - Li Hua and Pattern
  • 重磅!阿里云云原生合作伙伴计划全新升级:加码核心权益,与伙伴共赢新未来
  • OSCP-Escape(gif绕过)
  • iMazing2023最新免费版iOS设备管理软件
  • Git上传文件代码到GitHub
  • JavaScript概述二(Date+正则表达式+Math+函数+面向对象)
  • 一个朋友弄来的,太牛了,特别是后面内容,不看不知道,一看吓一跳,电话,热线
  • VGA协议实践
  • 毕业5年的同学突然告诉我,他已经是年薪30W的自动化测试工程师....
  • 操作系统原理 —— 进程有哪几种状态?状态之间如何切换?(七)
  • 可算是熬出头了,测试4年,费时8个月,入职阿里,涨薪14K
  • 5款十分小众的软件,知道的人不多但却很好用
  • Linux驱动开发:uboot启动流程详解
  • 分治与减治算法实验: 排序中减治法的程序设计
  • leetcode两数、三数、四数之和