当前位置: 首页 > news >正文

浅析RabbitMQ死信队列

原文首发于公众号【CSJerry】
在这里插入图片描述

在现代分布式系统中,消息队列扮演着至关重要的角色。它们可以实现应用程序之间的异步通信,并确保数据的可靠传输和处理。而在这个领域中,RabbitMQ作为一种强大而受欢迎的消息队列解决方案,具备了高效、可靠和灵活的特性。

然而,即使使用了RabbitMQ,我们仍然会遇到一些不可预料的情况,例如消费者无法处理某些消息、消息过期或者队列溢出等。为了解决这些问题,RabbitMQ引入了死信队列(Dead Letter Queue)的概念,为开发人员提供了一种有效的错误处理机制。

那么,究竟什么是死信队列呢?

本文结合Spring Boot使用RabbitMQ的死信队列,着重从是什么、为什么、怎么用几个方面对死信队列进行简单介绍。

1. 是什么:

  • 死信队列(Dead Letter Queue)是一种特殊的消息队列,用于存储无法被消费的消息。
  • 当消息满足某些条件无法被正常消费时,将被发送到死信队列中进行处理。
  • 死信队列提供了一种延迟处理、异常消息处理等场景的解决方案。

2. 为什么

  • 用来处理消费者无法正确处理的消息,避免消息丢失或积压
  • 实现延迟消息处理,例如订单超时未支付,可以将该消息发送到死信队列,然后再进行后续处理。
  • 用于实现消息重试机制,当消费者处理失败时,将消息重新发送到死信队列进行重试。
  • 提高了系统的可伸缩性和容错性,能够应对高并发和异常情况。

3. 怎么用

  1. 在Spring Boot中配置和使用死信队列:
    • 首先,在pom.xml文件中添加RabbitMQ的依赖项。
    • 然后,在application.properties文件中配置RabbitMQ连接信息。
    • 接下来,创建生产者和消费者代码,并通过注解将队列和交换机进行绑定。
    • 在队列的声明中添加死信队列的相关参数,如x-dead-letter-exchangex-dead-letter-routing-key等。
    • 最后,在消费者中编写处理消息的逻辑,包括对异常消息进行处理,并设置是否重新发送到死信队列。

简而言之,死信队列可以认为是一个正常队列的备用队列(或者说是兜底队列),当正常队列的消息无法消费的时候mq会重新把该消息发送到死信交换机,由死信交换机根据路由键将消息投递到备用队列,启动服务备用方案。

消息从正常队列到死信队列的三种情况:

1、消息被否定确认使用 channel.basicNackchannel.basicReject ,并且此时requeue 属性被设置为false

2、消息在队列中的时间超过了设置的TTL())时间。

3、消息数量超过了队列的容量限制()。

当一个队列中的消息满足上述三种情况任一个时,改消息就会从原队列移至死信队列,若改队列没有绑定死信队列则消息被丢弃。

4. 实战

以下是一个简单的Spring Boot集成RabbitMQ的死信队列示例代码:

  • 配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=rabbit
spring.rabbitmq.password=123456
# 开启消费者手动确认
spring.rabbitmq.listener.type=direct# 发送到队列失败时的手动处理
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.publisher-returns=true# 发送到交换机手动确认
spring.rabbitmq.publisher-confirm-type=simple
  • 配置类
@Configuration
@Slf4j
public class RabbitCof {@Resourceprivate MqKeys mqKeys;@Bean("normalQueue")public Queue normalQueue() {/*** 为普通队列绑定交换机*/Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", mqKeys.DIE_EXCHANGE);args.put("x-dead-letter-routing-key", mqKeys.DIE_ROUTING_KEY);args.put("x-message-ttl", 1000); // 队列中的消息未被消费则1秒后过期return new Queue(mqKeys.NORMAL_QUEUE, true, false, false, args);}@Bean("normalExchange")public Exchange normalExchange() {return new DirectExchange(mqKeys.NORMAL_EXCHANGE);}@Bean("normalBind")public Binding normalBinding(@Qualifier("normalQueue") Queue normalQueue, @Qualifier("normalExchange") Exchange normalExchange) {return BindingBuilder.bind(normalQueue).to(normalExchange).with(mqKeys.ROUTING_KEY).noargs();}/*** 死信队列* @return*/@Bean("dieQueue")public Queue dlQueue() {return new Queue(mqKeys.DIE_QUEUE, true, false, false);}/*** 死信交换机* @return*/@Bean("dieExchange")public Exchange dlExchange() {return new DirectExchange(mqKeys.DIE_EXCHANGE);}@Bean("dieBind")public Binding dlBinding(@Qualifier("dieQueue") Queue dlQueue, @Qualifier("dieExchange") Exchange dlExchange) {return BindingBuilder.bind(dlQueue).to(dlExchange).with(mqKeys.DIE_ROUTING_KEY).noargs();}@Resourceprivate ConnectionFactory connectionFactory;@Beanpublic RabbitTemplate rabbitTemplate() {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);/*** 消费者确认收到消息后,手动ack回调处理* spring.rabbitmq.publisher-confirm-type=simple*/rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean ack, String cause)->{if(!ack) {log.info("消息投递到交换机失败,correlationData={} ,ack={}, cause={}", correlationData == null ? "null" : correlationData.getId(), ack, cause);} else {log.info("消息成功投递到交换机,correlationData={} ,ack={}, cause={}", correlationData == null ? "null" : correlationData.getId(), ack, cause);}});/*** 消息投递到队列失败回调处理* spring.rabbitmq.listener.direct.acknowledge-mode=manual* spring.rabbitmq.publisher-returns=true*/rabbitTemplate.setReturnsCallback((returnedMessage)->{Message message = returnedMessage.getMessage();log.error("分发到到队列失败, body->{}", message.getBody());});return rabbitTemplate;}
}
  • 生产者类
@Component
public class Producer {@Resourceprivate  MqKeys mqKeys;@Resourceprivate RabbitTemplate rabbitTemplate;public void sendMessage(String message) {rabbitTemplate.convertAndSend(mqKeys.NORMAL_EXCHANGE, mqKeys.ROUTING_KEY, message);}
}
  • 消费者类
@Component
@RabbitListener(queues = "normal.queue")
@Slf4j
public class Consumer {@RabbitHandlerpublic void handleMessage(String data, Message message, Channel channel) {boolean success = false;int retryCount = 3;System.out.println(message.toString());long deliveryTag = message.getMessageProperties().getDeliveryTag();while (!success && retryCount-- > 0){try {// 处理消息log.info("收到消息: {}, deliveryTag = {}", data, deliveryTag);// 正常处理完毕,手动确认,此处不确认让他进入死信队列
//                success = true;
//                channel.basicAck(deliveryTag, false);Thread.sleep(3 * 1000L);}catch (Exception e){log.error("程序异常:{}", e.getMessage());}}// 达到最大重试次数后仍然消费失败if(!success){try {log.info("move to die queue");// 手动拒绝,移至死信队列/***deliveryTag – the tag from the received AMQP.Basic.GetOk or AMQP.Basic.Delivermultiple – true to reject all messages up to and including the supplied delivery tag; false to reject just the supplied delivery tag.requeue – true if the rejected message(s) should be requeued rather than discarded/dead-lettered*/channel.basicNack(deliveryTag, false, false);} catch (IOException e) {e.printStackTrace();}}}
}

以上代码演示了如何在Spring Boot中配置一个普通队列和一个死信队列,然后通过生产者发送消息到普通队列,在消费者中处理消息,并模拟了当发生异常时将消息重新发送到死信队列。

参考连接

  • [rabbit 官网]Dead Letter Exchanges — RabbitMQ

  • 具体代码仓库

http://www.lryc.cn/news/110963.html

相关文章:

  • ELK 企业级日志分析系统(ElasticSearch、Logstash 和 Kiabana 详解)
  • 数学建模—多元线性回归分析
  • win10 64位 vs2017 qt5.12.6 pcl1.9.1 vtk8.1.1配置安装步骤
  • 【项目 计网1】4.1 网络结构模式 4.2MAC地址、IP地址、端口
  • uni-app:分页实现多选功能
  • 问道管理:沪指窄幅震荡跌0.18%,有色、汽车等板块走低
  • Kotlin 协程与 Flow
  • 设备管理系统与物联网的融合:实现智能化设备监控和维护
  • 三、从官方源码精简出第1个FreeRTOS
  • __call__函数的用法
  • golang定时任务库cron实践
  • Julia 流程控制
  • 问题解决方案
  • kubernetes基于helm部署gitlab-operator
  • ChatGPT在在线客服和呼叫中心中的应用如何?
  • C++多线程环境下的单例类对象创建
  • “深入解析JVM内部机制:从字节码到垃圾回收“
  • 音频系统项目与音频算法研究方向分类
  • 单例模式和工厂模式
  • 两个镜头、视野、分辨率不同的相机(rgb、红外)的视野校正
  • kettle 连接jdbc
  • PyTorch中加载模型权重 A匹配B|A不匹配B
  • @FeignClient指定多个url实现负载均衡
  • vue diff 双端比较算法
  • 初识React: 基础(概念 特点 高效原因 虚拟DOM JSX语法 组件)
  • 自监督去噪:Neighbor2Neighbor原理分析与总结
  • 简单工厂模式(Simple Factory)
  • Agent:OpenAI的下一步,亚马逊云科技站在第5层
  • JMeter 4.x 简单使用
  • 深入NLTK:Python自然语言处理库高级教程