Javar如何用RabbitMQ订单超时处理
RabbitMQ 订单超时处理方案
使用 RabbitMQ 的 TTL + 死信队列(DLX)
RabbitMQ 的 TTL(Time-To-Live) 和 死信队列(Dead Letter Exchange) 是处理订单超时的常见方案。核心思路是设置消息的过期时间,超时后自动转入死信队列进行后续处理。
步骤:
-
创建订单时发送延迟消息
订单创建后,向 RabbitMQ 发送一条带有 TTL 的消息(例如 30 分钟超时)。消息的 TTL 可以通过队列或消息本身设置。// 设置消息的 TTL(单位:毫秒) AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("1800000") // 30分钟 = 30 * 60 * 1000.build(); channel.basicPublish("", "order_queue", properties, message.getBytes());
-
配置死信队列
定义一个死信交换器(DLX)和死信队列,绑定到原始队列。当消息超时后,RabbitMQ 会自动将其路由到死信队列。// 定义死信交换器和队列 channel.exchangeDeclare("order_dlx", "direct"); channel.queueDeclare("order_dead_letter_queue", true, false, false, null); channel.queueBind("order_dead_letter_queue", "order_dlx", "dead_letter");// 原始队列绑定死信交换器 Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", "order_dlx"); args.put("x-dead-letter-routing-key", "dead_letter"); channel.queueDeclare("order_queue", true, false, false, args);
-
消费死信队列处理超时订单
监听死信队列,接收到消息时检查订单状态。若订单未支付,则执行取消逻辑(如释放库存、标记订单状态)。DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);// 解析订单ID,检查状态if (orderService.isOrderUnpaid(orderId)) {orderService.cancelOrder(orderId);} }; channel.basicConsume("order_dead_letter_queue", true, deliverCallback, consumerTag -> {});
使用 RabbitMQ 插件 rabbitmq_delayed_message_exchange
如果需要更灵活的延迟时间(如动态设置不同订单的超时时间),可以使用官方插件 rabbitmq_delayed_message_exchange
,支持延迟消息投递。
步骤:
-
安装插件
在 RabbitMQ 服务器执行:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
-
声明延迟交换器
在 Java 代码中声明一个x-delayed-message
类型的交换器。Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); channel.exchangeDeclare("delayed_exchange", "x-delayed-message", true, false, args); channel.queueDeclare("delayed_queue", true, false, false, null); channel.queueBind("delayed_queue", "delayed_exchange", "delayed_routing_key");
-
发送延迟消息
通过x-delay
头设置延迟时间(毫秒)。Map<String, Object> headers = new HashMap<>(); headers.put("x-delay", 1800000); // 30分钟延迟 AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(headers).build(); channel.basicPublish("delayed_exchange", "delayed_routing_key", props, message.getBytes());
-
消费延迟消息
监听队列处理超时逻辑,与死信队列方案类似。
注意事项
-
幂等性处理
消息可能重复投递(如消费者处理失败),需确保取消订单逻辑的幂等性。 -
消息持久化
若需要可靠性,将队列和消息设置为持久化:channel.queueDeclare("order_queue", true, false, false, args); // 持久化队列 AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2) // 持久化消息.build();
-
替代方案
高精度场景可结合数据库定时任务或 Redis 的键过期通知,但 RabbitMQ 方案更适用于分布式系统解耦。