当前位置: 首页 > news >正文

如何保障消息一定能发送到RabbitMQ?

我们知道,RabbitMQ的消息最终是存储在Queue上的,而在Queue之前还要经过Exchange,那么这个过程中就有两个地方可能导致消息丢失。第一个是Producer到Exchange的过程,第二个是Exchange到Queue的过程。
在这里插入图片描述
为了解决这个问题,有两种方案,一种是通过confirm机制,另外一种是事务机制,因为事务机制并不推荐,这里先介绍Confirm机制。

Publisher Confirm是一种机制,用于确保消息已经被Exchange成功接收和处理。一旦消息成功到达Exchange并被处理,RabbitMQ会向消息生产者发送确认信号(ACK)。如果由于某种原因(例如,Exchange不存在或路由键不匹配)消息无法被处理,RabbitMQ会向消息生产者发送否定信号(NACK)。

//启用Publisher Confirmschannel.confirmSelect();//设置Publisher Confirms回调channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println("Message confirmed with deliveryTag:"+deliveryTag);//在这里处理消息确认}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println("Message not confirmed with deliveryTag:"+deliveryTag);//在这里处理消息未确认}});

Publisher Returns机制与Publisher Confirms类似,但用于处理在消息无法路由到任何队列时的情况。当RabbitMQ在无法路由消息时将消息返回给消息生产者,但是如果能正常路由,则不会返回消息。

//启用Publisher Returnschannel.addReturnListener(new ReturnListener() {@Overridepublic void handleReturn(int replyCode, String replyTest, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Message returned with replayCode: "+replyCode);//在这里处理消息发送到Queue失败的返回}});

通过以上方式,我们注册了两个回调监听,用于在消息发送到Exchange或者Queue失败时进行异常处理。通常我们可以在失败时精心报警或者重试来保障一定能发送成功。

完整代码:

package com.example.demo.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;public class PublisherCallbacksExample {public static void main(String[] args) throws Exception{ConnectionFactory factory=new ConnectionFactory();factory.setHost("localhost");try(Connection connection=factory.newConnection();Channel channel=connection.createChannel()){//启用Publisher Confirmschannel.confirmSelect();//设置Publisher Confirms回调channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println("Message confirmed with deliveryTag:"+deliveryTag);//在这里处理消息确认}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println("Message not confirmed with deliveryTag:"+deliveryTag);//在这里处理消息未确认}});//启用Publisher Returnschannel.addReturnListener(new ReturnListener() {@Overridepublic void handleReturn(int replyCode, String replyTest, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Message returned with replayCode: "+replyCode);//在这里处理消息发送到Queue失败的返回}});String exchangeName = "my_exchange";String routingKey = "my_routing_key";String message = "Hello,RabbitMQ!";//发布消息到Exchangechannel.basicPublish(exchangeName,routingKey,true,null,message.getBytes());//等待Publisher Confirmsif (!channel.waitForConfirms()) {System.out.println("Message was not confirmed!");}//关闭通道和连接channel.close();}}
}

另外,这里如果发送到Queue之后,是否一定能持久化下来,是否一定不丢,这就是另外一个话题了。

http://www.lryc.cn/news/323897.html

相关文章:

  • 【web前端】CSS语法
  • JS+CSS3点击粒子烟花动画js特效
  • docker镜像复制与常见命令
  • 如何在linux环境上部署单机ES(以8.12.2版本为例)
  • 如何利用人工智能技术实现企业营销效率提升10倍(下)
  • 【PHP + 代码审计】数组函数
  • Keepalive与idle监测及性能优化
  • DS-红黑树(RBTree)
  • ubuntu 如何使用阿里云盘
  • sqlite3 交叉编译
  • 【AI生成文章】flutter ChangeNotifierProvider 实用场景举例
  • 【0274】从shared init file或local init file加载relation cache(2 - 1)
  • 蓝桥杯-02-2023蓝桥杯c/c++省赛B组题目
  • 欧拉筛+并查集
  • 《桥接模式(极简c++)》
  • jconsole的使用
  • JavaScript详细教程
  • Hive自定义GenericUDF函数
  • 伊理威科技:抖音开网店新手刚做选啥品
  • 【爬虫】专栏文章索引
  • 【Linux】Linux开发工具-vim / 编译器-gcc/g++ / 调试器-gdb / git操作 / 项目自动化构建工具-make/Makefile
  • 解决VM重新打开后找不到共享文件夹的问题
  • uni app 空挡接龙
  • oracle表备份及还原
  • 牛客小白月赛89补题1(ABCD)(偏难)
  • 内存条@电脑支持的最大内存@升级内存硬件
  • 如何了解AI基础概念
  • Apache James数据库存储用户信息的密码加密问题
  • 大数据分布式事务的深入理解?
  • LeetCode hot100-17