当前位置: 首页 > news >正文

RabbitMQ延迟消息(通过死信交换机实现)

延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间后才收到消息

通过DLX和TTL模拟出延迟队列的功能,即,消息发送以后,不让消费者拿到,而是等待过期时间,变成死信后,发送给死信交换机再路由到死信队列进行消费

1、声明延迟队列

package com.smart.wms.config.rabbitmq;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 功能描述:* 配置交换机、队列、绑定关系* @Authoer: bgq* @Date:2024/6/4 17:33*/
@Configuration
public class MaterialStockExchangeConfig {public static final String MATERIAL_STOCK_EXCHANGE = "wms.material.exchange";public static final String MATERIAL_STOCK_QUEUE = "wms.material.queue";public static final String MATERIAL_DLX_DIRECT_EXCHANGE = "wms.dlx.exchange";public static final String MATERIAL_DLX_QUEUE = "wms.dlx.queue";public static final String MATERIAL_TTL__ROUTING_KEY = "ttl";public static final String MATERIAL_DLX_DELAYED_KEY = "dlx";@Beanpublic Queue ttlQueue() {return QueueBuilder.durable(MATERIAL_STOCK_QUEUE) // 指定队列的名称.ttl(10000) // 指定 TTL 为 10 秒,这里可设置过期时间也可以在发送消息时设置过期时间.deadLetterExchange(MATERIAL_DLX_DIRECT_EXCHANGE) // 指定死信交换机.deadLetterRoutingKey(MATERIAL_DLX_DELAYED_KEY) // 指定死信交换机的 RoutingKey.build();}/*** 声明TTl交换机*/@Beanpublic DirectExchange directExchange(){return new DirectExchange(MATERIAL_STOCK_EXCHANGE);}/*** 声明ttl交换机与队列的关联关系*/@Beanpublic Binding directBinding(){return BindingBuilder.bind(ttlQueue()).to(directExchange()).with(MATERIAL_TTL__ROUTING_KEY);}/*** 声明死信交换机*/@Beanpublic DirectExchange dlxDirect(){return new DirectExchange(MATERIAL_DLX_DIRECT_EXCHANGE);}/*** 声明死信队列*/@Beanpublic Queue dlxQueue(){return new Queue(MATERIAL_DLX_QUEUE);}/*** 声明死信交换机与队列关联关系*/@Beanpublic Binding tlxBinding(){return BindingBuilder.bind(dlxQueue()).to(dlxDirect()).with(MATERIAL_DLX_DELAYED_KEY);}}

2、监听死信队列消费

@RabbitListener(queues = MaterialStockExchangeConfig.MATERIAL_DLX_QUEUE)//监听的队列public void process(Message message, Channel channel) throws Exception {// 进入消息消费业务逻辑String body = new String(message.getBody());log.info("消息,参数:{}",body);JSONObject bodyJson = JSONUtil.parseObj(body);//业务逻辑TODOchannel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

3、发送消息

@RequestMapping("/sendTTLMessage")
public void  sendTTLMessage(){JSONObject jsonObject = new JSONObject();jsonObject.putOpt("orderId",wmsSendOrder.getId());rabbitMQSendUtils.sendMessage(MaterialStockExchangeConfig.MATERIAL_STOCK_EXCHANGE,             MaterialStockExchangeConfig.MATERIAL_TTL__ROUTING_KEY, jsonObject, wmsSendOrder.getId().toString());log.info("消息发送成功!");
}

http://www.lryc.cn/news/376429.html

相关文章:

  • Java - 分支结构 - if…else/switch
  • web安全渗透测试十大常规项(一):web渗透测试之XML和XXE外部实体注入
  • 任务3.8.2 利用RDD计算总分与平均分
  • 探索磁力搜索引擎:互联网资源获取的新视角
  • 立创开源学习篇(一)
  • 2024/6/18 英语每日一段
  • 时隔一年,SSD大涨价?
  • 【TB作品】MSP430G2553,单片机,口袋板,流量积算仪设计
  • 九、数据结构(并查集)
  • 大模型开发技术基础
  • 芯片验证分享9 —— 芯片调试
  • java 面试题--基础
  • 必看!!! 2024 最新 PG 硬核干货大盘点(上)
  • Redis 高可用 sentinel
  • 【数据结构】练习集
  • 驱动开发(四):Linux内核中断
  • btrace:binder_transaction+eBPF+Golang实现通用的Android APP动态行为追踪工具
  • C# OCCT Winform 界面搭建
  • System.Dynamic.ExpandoObject的使用说明
  • adb之ps命令用法
  • Ubuntu-24.04-live-server-amd64安装界面中文版
  • Git的3个主要区域
  • 【操作系统】操作系统实验02-生产者消费者程序改进
  • TCP协议是安全的吗?
  • c语言回顾-结构体(2)
  • Prometheus常见exporter安装部署
  • DGit的使用
  • ElasticSearch学习篇13_《检索技术核心20讲》进阶篇之LSM树
  • 简单好用的C++日志库spdlog使用示例
  • python 方法运行计时装饰模式实现