当前位置: 首页 > news >正文

延时队列实现实战:如何利用 RabbitMQ 实现延时队列,以满足特定延迟处理需求

实现延时队列,可以通过RabbitMQ的死信队列(Dead-letter queue)特性,“死信队列”是当消息过期,或者队列达到最大长度时,未消费的消息会被加入到死信队列。然后,我们可以对死信队列中的消息进行消费,完成类似“延时”的效果。

下面的示例代码演示了如何在Spring Boot中使用RabbitMQ设置一个订单,然后在15分钟后自动取消。

1. 添加 RabbitMQ 依赖:

在你的pom.xml中加入Spring Boot对RabbitMQ的Starter:

xml

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 配置队列、交换器、绑定和容器

我们创建一个配置类,定义一个正常的队列和一个死信队列,以及相应的交换机和队列的绑定:

java

@Configuration
public class RabbitMQConfig {/* 正常队列配置 */@Beanpublic Queue orderQueue() {Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", "dead_exchange");  // 队列消息过期后发送的交换器args.put("x-dead-letter-routing-key", "dead"); // 队列消息过期后发送的路由键return new Queue("order_queue", true, false, false, args);}@Beanpublic DirectExchange orderExchange() {return new DirectExchange("order_exchange");}@Beanpublic Binding orderBinding() {return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("order");}/* 死信队列配置 */@Beanpublic Queue deadQueue() {return new Queue("dead_queue");}@Beanpublic DirectExchange deadExchange() {return new DirectExchange("dead_exchange");}@Beanpublic Binding deadBinding() {return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");}
}

3. 发送延时消息

创建订单时,我们发送一个延时消息到队列:

java

@Autowired
private RabbitTemplate rabbitTemplate;public void createOrder(String orderId) {//订单创建业务...rabbitTemplate.convertAndSend("order_exchange", "order", orderId, message -> {message.getMessageProperties().setExpiration(String.valueOf(15 * 60 * 1000)); // 15分钟return message;});
}

4. 消费死信队列中的消息

然后,我们需要消费死信队列中的消息,进行订单取消的操作:

java

@RabbitListener(queues = "dead_queue")
public void processDeadLetter(String orderId) {// 订单取消业务...
}
  1. 在RabbitMQ中设置消息的过期时间
    RabbitMQ允许你在两个级别上设置消息的过期时间:队列级别和消息级别。一旦消息过期,它将从队列中删除。
  • 队列级别:你可以在声明队列时通过"x-message-ttl"参数设置过期时间(以毫秒为单位)。这会影响队列中的所有消息。

java

Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000); // 设置60s过期时间
Channel channel = ...;
channel.queueDeclare("my_queue", false, false, false, args); 
  • 消息级别:你也可以在发布消息时单独为每条消息设置过期时间。如果队列级别的TTL和消息级别的TTL都被设置了,那么较小的那个值会被应用。

java

byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().expiration("60000").build(); // 设置60s过期时间
Channel channel = ...;
channel.basicPublish("my_exchange", "my_routing_key", props, messageBodyBytes);

5、RabbitMQ中的死信队列如何工作?

“死信队列”用于接收不可路由的消息,或者由于一些原因不能正确处理的消息。这样能防止原始队列堵塞,也可以进一步处理这些消息。

当以下情况发生时,消息会被投递到死信队列:

  • 消息被拒绝 (basic.reject / basic.nack),并且requeue=false。
  • 在队列中排队的时间过长(超过设置的TTL)。
  • 队列长度溢出(超过设置的最大长度)。

在声明队列时,可以通过"x-dead-letter-exchange"和"x-dead-letter-routing-key"来设置死信交换器和路由键。

 6、如何确保消费者在消费消息时不会发生重复消费的情况?

确保消费者在消费消息时不发生重复消费,一般可以通过以下方式实现:

  • 消息确认机制:RabbitMQ提供了消息确认机制ACK,消费者处理完消息后,返回一个ACK信号,告诉RabbitMQ可以将该消息从队列中删除。如果在处理消息的过程中消费者出现异常(如断开连接),RabbitMQ将不会删除队列中的消息,并且会再次尝试发送该消息给消费者。
  • 幂等操作:在消费者端,确保消息处理逻辑是幂等的,也就是说,无论消息被处理一次还是多次,结果都是相同的。例如,对于一个扣款操作,无论这个操作执行多少次,账户的余额都应该只被扣除一次。
  • 分布式锁:在处理消息之前,消费者首先尝试获取一个分布式锁。只有获取到锁的消费者才能处理消息。处理完毕后释放锁。避免了同一消息被多个消费者处理的情况。

7、设置RabbitMQ中消息的优先级
在RabbitMQ中,对消息的优先级的支持是通过队列来实现的。在声明队列的时候,可以通过x-max-priority参数来指定队列支持的最大优先级。然后在发布消息的时候,可以通过basicPropertiespriority字段来指定消息的优先级。如下:

java

// 声明队列时设置最大优先级
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
channel.queueDeclare("my_queue", false, false, false, args);// 发布消息时设置优先级
AMQP.BasicProperties props = new AMQP.BasicProperties().builder().priority(5).build();
channel.basicPublish("", "my_queue", props, "Hello world".getBytes());

注意,优先级较高的消息将会优先被消费,但是并不保证完全按照优先级顺序消费。

8、RabbitMQ中如何处理消费者异常断开连接的情况?
当RabbitMQ检测到消费者(如一个TCP连接)异常断开,例如因为消费者主机崩溃或因为网络问题,它将关闭该消费者的连接,并将消费者未确认的任何消息重新放入队列。如果你希望一个消息在消费者断开连接时不被再次放入队列,你可以设置该消费者的autoAck参数为true(也就是无需显示确认)。但一般情况下,我们推荐消费者在正确处理消息后发送一个确认应答(basicAck)。

http://www.lryc.cn/news/334577.html

相关文章:

  • 关于在Ubuntu上配置mysql踩的一些坑
  • JSBridge原理 - 前端H5与客户端Native交互
  • 【Java EE】Spring请求如何传递参数详解
  • 菜鸟笔记-Numpy常用函数用法汇总
  • tensorflow.js 如何使用opencv.js通过面部特征点估算脸部姿态并绘制示意图
  • Linux命令-dpkg-divert命令(Debian Linux中创建并管理一个转向列表)
  • flex: 1 是哪些属性的缩写?
  • python基于opencv实现数籽粒
  • OpenCV图像处理——基于OpenCV的ORB算法实现目标追踪
  • 13.JavaWeb XML:构建结构化数据的重要工具
  • 鸿蒙OS实战开发:【多设备自适应服务卡片】
  • 深度学习基础之一:机器学习
  • Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单视频处理实战案例 之五 简单指定视频某片段重复播放效果
  • ARXML处理 - C#的解析代码(二)
  • 关于华为即将举行的鸿蒙春季沟通会的新闻报道
  • MySQL视图及如何导入导出
  • 文心一言上线声音定制功能;通义千问开源模型;openAI又侵权?
  • 课时89:流程控制_函数进阶_函数变量
  • Linux命令-dpkg-preconfigure命令(Debian Linux中软件包安装之前询问问题)
  • SEO优化艺术:精细化技巧揭示与搜索引擎推广全面战略解读
  • 《springcloud alibaba》 四 seata安装以及使用
  • -bash: cd: /etc/hadoop: 没有那个文件或目录
  • JVM字节码与类加载——字节码指令集与解析
  • 景芯2.5GHz A72训练营dummy添加(一)
  • React - 请你说一说setState是同步的还是异步的
  • 设计模式之命令模式(下)
  • 【opencv】示例-demhist.cpp 调整图像的亮度和对比度,并在GUI窗口中实时显示调整后的图像以及其直方图。...
  • 计算机网络---第三天
  • 怎么防止文件被拷贝,复制别人拷贝电脑文件
  • 流式密集视频字幕