当前位置: 首页 > news >正文

RabbitMQ-死信队列常见用法

目录

一、什么是死信

 二、什么是死信队列 

​编辑 三、第一种情景:消息被拒绝时

四、第二种场景:. 消费者发生异常,超过重试次数 。 其实spring框架调用的就是 basicNack

五、第三种场景: 消息的Expiration 过期时长或队列TTL过期时间。

六、 第四种情景:  消息队列达到最大容量


一、什么是死信

    在RabbitMQ 中充当主角的就是消息,在不同场景下,消息会有不同地表现。

死信就是消息在特定场景下的一种表现形式,这些场景包括:

1. 消息被拒绝访问,即 RabbitMQ返回 basicNack 的信号时 或者拒绝basicReject

2. 消费者发生异常,超过重试次数 。 其实spring框架调用的就是 basicNack

3. 消息的Expiration 过期时长或队列TTL过期时间

4. 消息队列达到最大容量

上述场景经常产生死信,即消息在这些场景中时,被称为死信。

 二、什么是死信队列 

    死信队列就是用于储存死信的消息队列,在死信队列中,有且只有死信构成,不会存在其余类型的消息。

   死信队列在 RabbitMQ 中并不会单独存在,往往死信队列都会绑定这一个普通的业务消息队列,当所绑定的消息队列中,有消息变成死信了,那么这个消息就会重新被交换机路由到指定的死信队列中去,我们可以通过对这个死信队列进行监听,从而手动的去对这一消息进行补偿。 人工干预

 三、第一种情景:消息被拒绝时

#设置消费者手动应答模式

spring.rabbitmq.listener.simple.acknowledge-mode = manual

package com.by.consumer;import com.by.model.OrderingOk;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.io.IOException;@Configuration
@Slf4j
public class DeadConsumer {//死信交换机@Beanpublic DirectExchange deadExchange() {return ExchangeBuilder.directExchange("Dead_E01").build();}//死信队列@Beanpublic Queue deadQueue1() {return QueueBuilder.durable("Dead_Q01").build();}//死信交换机与死信队列的绑定@Beanpublic Binding deadBinding1(Queue deadQueue1, DirectExchange deadExchange) {return BindingBuilder.bind(deadQueue1).to(deadExchange).with("RK_DEAD");}//业务队列@Beanpublic Queue queue1() {return QueueBuilder.durable("Direct_Q01").deadLetterExchange("Dead_E01").deadLetterRoutingKey("RK_DEAD")//.ttl(10*1000) //该属性是队列的属性,设置消息的过期时间,消息在队列里面停留时间n毫秒后,就会把这个消息投递到死信交换机,针对的是所有的消息//.maxLength(20) //设置队列存放消息的最大个数,x-max-length属性值,当队列里面消息超过20,会把队列之前的消息依次放进死信队列.build();}//业务交换机@Beanpublic DirectExchange exchange() {return ExchangeBuilder.directExchange("Direct_E01").build();}//业务交换机与队列的绑定@Beanpublic Binding binding1(Queue queue1, DirectExchange exchange) {return BindingBuilder.bind(queue1).to(exchange).with("RK01");}//@RabbitListener(queues = "Direct_Q01")
//    public void receiveMessage(OrderingOk msg) {
//        log.info("消费者1 收到消息:"+ msg );
//        int  i= 5/0;
//    }@RabbitListener(queues = "Direct_Q01")public void receiveMessage(OrderingOk msg, Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.println("消费者1 收到消息:" + msg + " tag:" + deliveryTag);channel.basicReject(deliveryTag, false);
//        try {
//            // 处理消息...
//            int  i= 5/0;
//            // 如果处理成功,手动发送ack确认 ,Yes
//            channel.basicAck(deliveryTag, false);
//        } catch (Exception e) {
//            // 处理失败,可以选择重试或拒绝消息(basicNack或basicReject)  NO
//            channel.basicNack(deliveryTag, false, false); // 并重新入队
//
//        }}
}

四、第二种场景:. 消费者发生异常,超过重试次数 。 其实spring框架调用的就是 basicNack

一般要和自动重启一起使用,否则死信队列收不到消息

#设置消费者自动应答模式
spring.rabbitmq.listener.simple.acknowledge-mode = auto
#开启自动应答重试机制
spring.rabbitmq.listener.simple.retry.enabled=true

package com.by.consumer;import com.by.model.OrderingOk;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.io.IOException;@Configuration
@Slf4j
public class DeadConsumer {//死信交换机@Beanpublic DirectExchange deadExchange() {return ExchangeBuilder.directExchange("Dead_E01").build();}//死信队列@Beanpublic Queue deadQueue1() {return QueueBuilder.durable("Dead_Q01").build();}//死信交换机与死信队列的绑定@Beanpublic Binding deadBinding1(Queue deadQueue1, DirectExchange deadExchange) {return BindingBuilder.bind(deadQueue1).to(deadExchange).with("RK_DEAD");}//业务队列@Beanpublic Queue queue1() {return QueueBuilder.durable("Direct_Q01").deadLetterExchange("Dead_E01").deadLetterRoutingKey("RK_DEAD")//.ttl(10*1000) //该属性是队列的属性,设置消息的过期时间,消息在队列里面停留时间n毫秒后,就会把这个消息投递到死信交换机,针对的是所有的消息//.maxLength(20) //设置队列存放消息的最大个数,x-max-length属性值,当队列里面消息超过20,会把队列之前的消息依次放进死信队列.build();}//业务交换机@Beanpublic DirectExchange exchange() {return ExchangeBuilder.directExchange("Direct_E01").build();}//业务交换机与队列的绑定@Beanpublic Binding binding1(Queue queue1, DirectExchange exchange) {return BindingBuilder.bind(queue1).to(exchange).with("RK01");}//@RabbitListener(queues = "Direct_Q01")
//    public void receiveMessage(OrderingOk msg) {
//        log.info("消费者1 收到消息:"+ msg );
//        int  i= 5/0;
//    }@RabbitListener(queues = "Direct_Q01")public void receiveMessage(OrderingOk msg, Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.println("消费者1 收到消息:" + msg + " tag:" + deliveryTag);int a=10/0;
//        channel.basicReject(deliveryTag, false);
//        try {
//            // 处理消息...
//            int  i= 5/0;
//            // 如果处理成功,手动发送ack确认 ,Yes
//            channel.basicAck(deliveryTag, false);
//        } catch (Exception e) {
//            // 处理失败,可以选择重试或拒绝消息(basicNack或basicReject)  NO
//            channel.basicNack(deliveryTag, false, false); // 并重新入队
//
//        }}
}

五、第三种场景: 消息的Expiration 过期时长或队列TTL过期时间

    //业务队列@Beanpublic Queue queue1() {return QueueBuilder.durable("Direct_Q01").deadLetterExchange("Dead_E01").deadLetterRoutingKey("RK_DEAD").ttl(10*1000) //该属性是队列的属性,设置消息的过期时间,消息在队列里面停留时间n毫秒后,就会把这个消息投递到死信交换机,针对的是所有的消息//.maxLength(20) //设置队列存放消息的最大个数,x-max-length属性值,当队列里面消息超过20,会把队列之前的消息依次放进死信队列.build();}

六、 第四种情景:  消息队列达到最大容量

    //业务队列@Beanpublic Queue queue1() {return QueueBuilder.durable("Direct_Q01").deadLetterExchange("Dead_E01").deadLetterRoutingKey("RK_DEAD")
//                .ttl(10*1000) //该属性是队列的属性,设置消息的过期时间,消息在队列里面停留时间n毫秒后,就会把这个消息投递到死信交换机,针对的是所有的消息.maxLength(5) //设置队列存放消息的最大个数,x-max-length属性值,当队列里面消息超过5,会把队列之前的消息依次放进死信队列.build();}
    /*** 测试直联交换机** @throws IOException* @throws InterruptedException*/@Testvoid contextLoads() throws IOException, InterruptedException {for (int i = 0; i < 8; i++) {OrderingOk orderingOk = OrderingOk.builder().id(1).name("张三"+i).build();directProvide.send(orderingOk);}System.in.read();}

id为1,2,3的被装进了死信队列,因为数据太老,业务队列优先要新的数据 

http://www.lryc.cn/news/338664.html

相关文章:

  • 2024/4/14周报
  • MySQL 社区版 安装总结
  • 二叉排序树的增删改查(java版)
  • linux下coredump问题的定位分析方法
  • 第十届蓝桥杯省赛真题(C/C++大学B组)
  • Scrapy 爬取m3u8视频
  • LVGL简单记录
  • 计算机网络——ARP协议
  • 【C++]C/C++的内存管理
  • 深入理解计算机网络分层结构
  • 亚马逊云科技CTO带你学习云计算降本增效秘诀
  • 快速上手Vue
  • java 目录整理
  • 使用Python的Pillow库进行图像处理书法参赛作品
  • docker 容器指定utf-8编码
  • 单例模式以及常见的两种实现模式
  • Java hashCode() 和 equals()的若干问题解答
  • 高级IO——React服务器简单实现
  • Qt使用插件QPluginLoader 机制开发
  • 双子座 Gemini1.5和谷歌的本质
  • 二百三十、MySQL——MySQL表的索引
  • 并发编程之ThreadLocal使用及原理
  • 软件测试 测试开发丨Pytest结合数据驱动-yaml,熬夜整理蚂蚁金服软件测试高级笔试题
  • 软考数据库---2.SQL语言
  • 基于顺序表实现通讯录
  • 咸鱼之王_手游_开服搭建架设_内购修复无bug运营版
  • 【JSON2WEB】14 基于Amis的CRUD开发30分钟速成
  • Java入门教程||Java 变量
  • 基于Java的校园快递一站式服务系统 (源码+文档+包运行)
  • 通讯录的实现(顺序表版本)