当前位置: 首页 > news >正文

消息队列RabbitMQ.03.死信交换机的讲解与使用

目录

一、死信队列(延迟队列) 概念讲解

二、确认消息(局部方法处理消息)

三、代码实战

1.编写生产者代码,配置消息、直连交换机、路由键

 1.1代码解析:

2.配置消费者接受类接受直连交换机的路由键

2.1.  String msg,Channel channel ,@Header(AmqpHeaders.DELIVERY_TAG) ,long tag 方法参数解析:

  2.2.channel.basicAck(tag,true); 代码解析

2.3.channel.basicReject(tag,true); 代码解析:

 3.对死信交换机进行测试


 

一、死信队列(延迟队列) 概念讲解


死信,在官网中对应的单词为 “Dead Letter”, 它是 RabbitMQ 的一种消息机制。
一般来说,生产者将消息投递到 broker 或者直接到 queue 里了, consumer queue 取出消息进行消费,如果它一直无法消费某条数据,那么可以把这条消息放入死信队列里面。等待
条件满足了再从死信队列中取出来再次消费,从而避免消息丢失。
死信消息来源:
  • 消息 TTL 过期
  • 队列满了,无法再次添加数据
  • 消息被拒绝(reject nack),并且 requeue =false

二、确认消息(局部方法处理消息)


 默认情况下消息消费者是自动 ack (确认)消息的,如果要手动 ack(确认)则需要修改确认模式为 manual

spring:rabbitmq:listener:simple:acknowledge-mode: manual

三、代码实战


1.编写生产者代码,配置消息、直连交换机、路由键

 @Beanpublic Queue queueA() {//正常Map<String, Object> config = new HashMap<>();//message在该队列queue的存活时间最大为10秒config.put("x-message-ttl", 10000);//x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)config.put("x-dead-letter-exchange", "ExchangeB");//x-dead-letter-routing-key参数是给这个DLX指定路由键config.put("x-dead-letter-routing-key", "bb");return new Queue("queueA",true,false,false,config);}@Beanpublic DirectExchange ExchangeA(){return new DirectExchange("ExchangeA");}@Beanpublic Binding bindingA(){return BindingBuilder.bind(queueA()).to(ExchangeA()).with("aa");}@Beanpublic Queue queueB() {return new Queue("queueB");}@Beanpublic DirectExchange ExchangeB(){return new DirectExchange("ExchangeB");}@Beanpublic Binding bindingB(){return BindingBuilder.bind(queueB()).to(ExchangeB()).with("bb");}
 1.1代码解析:

 Map<String, Object> config = new HashMap<>();
        //message在该队列queue的存活时间最大为10秒
        config.put("x-message-ttl", 10000);
        //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)
        config.put("x-dead-letter-exchange", "ExchangeB");
        //x-dead-letter-routing-key参数是给这个DLX指定路由键
        config.put("x-dead-letter-routing-key", "bb");
        return new Queue("queueA",true,false,false,config);

首先我们需要知道死信产生的原因:

  • 消息过期: 消息在队列中等待的时间超过了指定的过期时间。
  • 消息被拒绝: 消费者拒绝消费消息,并且消息被标记为不可重新投递。
  • 队列满: 队列达到最大容量,无法再接收新的消息。

上述代码的作用是让消息等待10秒,如果10秒内没有进入消费者或者没有被操作那么它就会进入死信;我们就会将它放入消息B也就是当作死信就行处理。

 return new Queue("queueA",true,false,false,config); 中参数的作用:

  • true: 持久化,表示队列在消息代理(例如 RabbitMQ)重启后仍然存在。
  • false: 非独占,表示该队列不会被其他连接独占使用。
  • false: 不自动删除,表示即使没有消费者连接,队列也不会被自动删除。
  • config: 包含了上述配置的 Map 对象,将这些配置应用到队列上。

总体来说,这段代码创建了一个具有消息过期和死信队列功能的队列 "queueA",并配置了过期消息发送到名为 "ExchangeB" 的交换器,并指定了死信的路由键为 "bb"。这样的配置在处理消息的时候能够更加灵活,并且对于消息的生命周期有了额外的控制。


2.配置消费者接受类接受直连交换机的路由键

queueA

@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "queueA")
public class ReceiverQA {@RabbitHandler//手动确认消息public void process(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{log.warn("QA接收到:" + msg);//channel.basicAck(tag,true);//拒绝 ture是否重新入队 会一直 循环 false 会直接变成死信channel.basicReject(tag,true);Thread.sleep(1000);}
}
2.1.  String msg​​​​​​​Channel channel​​​​​​​ @Header(AmqpHeaders.DELIVERY_TAG) ,long tag 方法参数解析:
  • Channel channel

    • Channel 对象表示与消息代理(例如 RabbitMQ)建立的通信通道。该通道提供了与消息代理进行交互的方法,如确认消息、拒绝消息等。在这个方法中,channel 参数用于与消息代理进行通信。
  • @Header(AmqpHeaders.DELIVERY_TAG) long tag

    • @Header 注解用于提取消息头中的信息。在这里,通过 AmqpHeaders.DELIVERY_TAG 提取了消息的传送标签(delivery tag)。
    • long tag 表示消息的唯一标识符。它是一个标识消息的数字,通常在消息代理传递消息给消费者时分配。这个标识符可以用于在确认、拒绝或重新排队消息时指定特定的消息。

总体来说,这个方法是一个消息处理方法,其中 message 参数表示消息的内容,channel 参数用于与消息代理进行通信,而 tag 参数表示消息的唯一标识符,用于在消息确认、拒绝等操作中指定特定的消息。这样的方法通常用于处理从消息队列中接收到的消息。

  2.2.channel.basicAck(tag,true); 代码解析

 channel.basicAck(tag,true); 用于确认(acknowledge)消息的方法,通常在消费者成功处理消息后调用。以下是该方法的作用和参数含义:

  • channel

    • 这是表示与消息代理建立的通信通道的对象。在很多消息队列系统中,通常通过该通道进行消息的发布、消费等操作。
  • basicAck

    • 这是确认消息的方法。它告诉消息代理,消费者已经成功处理了某个特定的消息。
  • tag

    • 这是消息的唯一标识符或者句柄。在消费者接收到消息时,消息会被分配一个唯一的标识符,这个标识符用于确认或拒绝特定的消息。
  • true

    • 这是一个布尔值,通常表示确认多个消息。如果设置为 true,则表示确认到指定 tag 及其之前的所有消息。如果设置为 false,则仅确认指定的消息。
2.3.channel.basicReject(tag,true); 代码解析:

  channel.basicReject(tag,true);是在使用 AMQP(Advanced Message Queuing Protocol)中的通道(channel)对象时,用于拒绝一条消息的方法。让我们解释这个方法及其参数的意义:

  • tag 参数:

    • tag 表示消息的唯一标识符,通常是通过消费者获取的消息标签。它用于标识要拒绝的特定消息。
  • multiple 参数:

    • multiple 是一个布尔值,用于指定是拒绝单个消息还是多个消息。如果 multiple 为 false,则表示仅拒绝标记为 tag 的单个消息。如果 multiple 为 true,则表示拒绝所有比 tag 小的、未被确认的消息。在这里,true 表示拒绝所有比 tag 小的未确认消息。
  • 作用:

    1. channel.basicReject(tag, true) 的作用是拒绝一条或多条消息。当消费者无法处理接收到的消息时,可以使用该方法将消息标记为拒绝,并根据需要将其重新排队或进入死信队列。

    2. 通过将 multiple 设置为 true,可以一次性拒绝多条消息,这对于批量处理消息的情况很有用。

    3. 拒绝消息会触发相应的处理机制,例如将消息重新排队或将其发送到死信交换器,具体取决于消息代理的配置。

总体来说,channel.basicReject(tag, true) 是用于拒绝一条或多条消息的方法,其中 tag 表示消息的标签,而 multiple 决定了是拒绝单个消息还是多个消息。

queueB 

@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "queueB")
public class ReceiverQB {@RabbitHandlerpublic void process(String msg) {log.warn("QB接收到:" + msg);//去数据库做修改//更改数据库的订单状态为取消//update order set status=-1 where id=#{id} and status=1}
}

 

 3.对死信交换机进行测试

  @RequestMapping("/send6")public String send6(){template.convertAndSend("ExchangeA","aa","43249849234");return "🤣";}

最后页面网址访问方法/send6即可

http://www.lryc.cn/news/287445.html

相关文章:

  • 人工智能原理实验4(2)——贝叶斯、决策求解汽车评估数据集
  • 算力网络:未来计算资源的驱动力
  • java动态导入excel按照表头生成数据库表
  • Java 集合List相关面试题
  • k8s-基础知识(Pod,Deployment,ReplicaSet)
  • matlab查看源代码
  • 【数据库学习】PostgreSQL优化
  • 微信小程序分页加载功能,结合后端实现上拉底部加载下一页数据,数据加载中和暂无数据提示
  • idea 打包跳过测试
  • python sqlite3 线程池封装
  • 亚马逊运营:如何通过自养号测评有效防关联,避免砍单
  • winfrom图像加速渲染时图像不显示
  • Redash 默认key漏洞(CVE-2021-41192)复现
  • Git学习笔记:3 git tag命令
  • 10年软件测试经验,该有什么新的职业规划?
  • 重构改善既有代码的设计-学习(四):简化条件逻辑
  • 【代码---利用一个小程序,读取文件夹中图片,将其合成为一个视频】
  • MVC 和 MVVM的区别
  • redis—Set集合
  • 【jetson笔记】vscode远程调试
  • 大数据处理流程包括哪些环节
  • C++入门篇章1(C++是如何解决C语言不能解决的问题的)
  • java复习篇 数据结构:链表第一节
  • 深入理解与运用Lombok的@Cleanup注解:自动化资源管理利器
  • 【LeetCode每日一题】2865. 美丽塔 I
  • Cute Http File Server 使用文章
  • c#算法(10)——求点到直线的距离
  • [小脚本] maya 命令行常用操作
  • 数据结构·单链表
  • Redis(秒杀活动、持久化之RDB、AOF)