当前位置: 首页 > news >正文

RabbitMQ延时队列的两种实现方式

目录

一、延时插件实现

1、版本要求

2、为运行新容器时安装

3、为已运行的容器安装

4、验证安装

5、代码编写

1. 配置类

2. 生产者

3. 消费者

二、死信队列实现

1、代码编写

1. 配置类

2. 生产者

3. 消费者

三、踩坑记录

1、发送消息失败

2、消息过期后未能转发到死信队列

3、消费者消费报错


一、延时插件实现

1、版本要求

RabbitMQ 3.5.7以上

2、为运行新容器时安装

# 1. 拉取带管理界面的镜像
docker pull rabbitmq:3.11-management
​
# 2. 启动容器并启用插件
docker run -d \--name rabbitmq \-p 5672:5672 \-p 15672:15672 \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=password \rabbitmq:3.11-management \bash -c "rabbitmq-plugins enable rabbitmq_delayed_message_exchange && rabbitmq-server"

3、为已运行的容器安装

# 1. 进入正在运行的容器
docker exec -it rabbitmq /bin/bash
​
# 2. 在容器内执行插件安装
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
​
# 3. 退出容器
exit
​
# 4. 重启容器使插件生效
docker restart rabbitmq

4、验证安装

# 方法1:检查插件列表
docker exec rabbitmq rabbitmq-plugins list | grep delayed
​
# 方法2:登录管理界面
# 访问 http://localhost:15672 (使用设置的账号密码登录)
# 在 "Exchanges" 标签页创建交换机时,Type 下拉框会出现 "x-delayed-message" 选项

5、代码编写

1. 配置类

@Configuration
public class RabbitMqConfig {public static final String DELAYED_EXCHANGE = "delayed.exchange";public static final String DELAYED_QUEUE = "delayed.queue";public static final String DELAYED_ROUTING_KEY = "delayed_routing_key";@Beanpublic CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct"); // 交换机类型return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message", // 固定类型true,false,args);}
​@Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE, true);}
​@Beanpublic Binding delayedBinding(Queue delayedQueue, CustomExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}}

2. 生产者

public void send(String exchange, String routing_key,Object data, Integer delayMillis) {// 消息后处理器:设置延时和持久化MessagePostProcessor processor = message -> {// 毫秒message.getMessageProperties().setDelay(delayMillis);// 持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;};
​rabbitTemplate.convertAndSend(exchange, routingKey, data, processor);
}

3. 消费者

@Component
@RabbitListener(queues = RabbitMqConfig.DELAYED_QUEUE)
public class DelayedListener {
​@RabbitHandlerpublic void listener(String data, Channel channel, Message message) {log.warn("消息消费成功,消息内容:{}", data);MessageProperties properties = message.getMessageProperties();long deliveryTag = properties.getDeliveryTag()channel.basicAck(deliveryTag, false);}
​
}

二、死信队列实现

1、代码编写

1. 配置类

@Configuration
public class RabbitMqConfig {public static final String DELAYED_EXCHANGE = "delayed.exchange";public static final String DELAYED_QUEUE = "delayed.queue";public static final String DELAYED_ROUTING_KEY = "delayed_routing_key";
​public static final String NORMAL_EXCHANGE = "normal.exchange";public static final String NORMAL_QUEUE = "normal.queue";public static final String NORMAL_ROUTING_KEY = "normal_routing_key";// 死信队列(延时队列)@Beanpublic Queue delayedQueue() {return QueueBuilder.durable(DELAYED_QUEUE).build();}
​// 死信交换机@Beanpublic DirectExchange delayedExchange() {return new DirectExchange(DELAYED_EXCHANGE);}
​// 绑定死信队列到死信交换机@Beanpublic Binding delayedBinding(Queue delayedQueue, DirectExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY);}
​// 普通队列@Beanpublic Queue normalQueue() {return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DELAYED_EXCHANGE).deadLetterRoutingKey(DELAYED_ROUTING_KEY).build();}
​// 普通交换机@Beanpublic DirectExchange normalExchange() {return new DirectExchange(NORMAL_EXCHANGE);}
​// 绑定普通队列到普通交换机@Beanpublic Binding normalBinding(Queue normalQueue, DirectExchange normalExchange) {return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY);}}

2. 生产者

public void send(String exchange, String routing_key, Object data, Integer delayMillis) {String uuid = IdUtil.simpleUUID();// 消息入库略,uuid为主键MessageProperties properties = new MessageProperties();// 设置TTL,单位毫秒properties.setExpiration(String.valueOf(delayMillis));// 消息持久化(2 表示持久化)properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
​Message msg = rabbitTemplate.getMessageConverter().toMessage(data, properties);rabbitTemplate.send(exchange, routingKey, msg, new CorrelationData(uuid));
}

3. 消费者

@Component
@RabbitListener(queues = RabbitMqConfig.DELAYED_QUEUE)
public class DelayedListener {
​@RabbitHandlerpublic void listener(String data, Channel channel, Message message) {log.warn("消息消费成功,消息内容:{}", data);MessageProperties properties = message.getMessageProperties();long deliveryTag = properties.getDeliveryTag()channel.basicAck(deliveryTag, false);}
​
}

三、踩坑记录

1、发送消息失败

原因RabbitTemplate 配置了消息抵达确认,消息ID没有传值。

RabbitTemplate rabbitTemplate = new RabbitTemplate();
// 消息抵达确认通知
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {String msgId = data.getId();if (ack) {log.info("消息抵达队列成功:{}", data);} else {log.error("消息未能发送成功,消息ID:{}", data.getId(), cause);}
});

生产者实际发送消息未传消息ID:

错误格式

rabbitTemplate.convertAndSend(exchange, routingKey, data);

正确格式

String uuid = IdUtil.simpleUUID();
rabbitTemplate.convertAndSend(exchange, routingKey, data, new CorrelationData(uuid));

2、消息过期后未能转发到死信队列

原因:正常消息未绑定死信队列,消息过期自动删除,而不会转发到死信队列中。

错误格式

@Bean
public Queue delayedNormalQueue() {return QueueBuilder.durable(NORMAL_QUEUE).build();
}

正确格式

@Bean
public Queue delayedNormalQueue() {return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DELAYED_EXCHANGE) // 指定死信交换机.deadLetterRoutingKey(DELAYED_ROUTING_KEY) // 指定死信路由键.build();
}

3、消费者消费报错

原因:发送的消息由于自定义的 MessageProperties ,其中缺失了 contentType 参数,需要使用转化器进行转换,而不是直接发送消息。

错误格式

MessageProperties properties = new MessageProperties();
properties.setExpiration(String.valueOf(delayMillis));
​
Message msg = new Message(message.getBytes(), properties);
rabbitTemplate.convertAndSend(exchange, routingKey, msg, new CorrelationData(uuid));

正确格式

MessageProperties properties = new MessageProperties();
properties.setExpiration(String.valueOf(delayMillis));
​
Message msg = rabbitTemplate.getMessageConverter().toMessage(message, properties);
rabbitTemplate.send(exchange, routingKey, msg, new CorrelationData(uuid));

http://www.lryc.cn/news/611238.html

相关文章:

  • 磁悬浮转子的“静音术”:深度解析无接触抑制旋转幽灵的奥秘
  • 基于华为开发者空间的Open WebUI数据分析与可视化实战
  • 【Linux系统编程】线程概念与控制
  • MATLAB实现菲涅尔法全息成像仿真
  • Spring Boot 整合 Web 开发全攻略
  • Java面试宝典:深入解析JVM运行时数据区
  • Linux 内存管理之 Rmap 反向映射(二)
  • EP01:【DL 第二弹】张量(Tensor)的创建和常用方法
  • BloodHound 8.0 首次亮相,在攻击路径管理方面进行了重大升级
  • IPD研发管理——决策评审DCP指南
  • Java从入门到精通 - 集合框架(一)
  • MySQL主从延迟到崩溃:Binlog格式、半同步复制与GTID的博弈
  • 视频转二维码在教育场景中的深度应用
  • 结合opencv解释图像处理中的结构元素(Structuring Element)
  • 【Java企业级开发】(七)Spring框架
  • 区块链:重构信任的价值互联网革命​
  • 场外个股期权的额度为何受限?
  • 浮动IP(Floating IP)的删除通常需要满足什么条件
  • 基于ZYNQ ARM+FPGA的声呐数据采集系统设计
  • uniapp转app时,cover-view的坑
  • 什么情况下浮动IP(Floating IP)会“漂移”(Drift)
  • OneCode 3.0 前端架构全面研究
  • ​​机器学习贝叶斯算法
  • MinIO01-入门
  • 本地部署文档管理平台 BookStack 并实现外部访问( Windows 版本)
  • Claude Code 完整指南:入门到应用
  • Flux.1系列模型解析--Flux.1 Tools
  • 鸿蒙组件装饰器深度解析:@Component vs @ComponentV2
  • 代码随想录day57图论7
  • LLM开发——语言模型会根据你的提问方式来改变答案