当前位置: 首页 > news >正文

【Node.js】RabbitMQ 延时消息

概述

在 RabbitMQ 中实现延迟消息通常需要借助插件(如 RabbitMQ 延迟队列插件),因为 RabbitMQ 本身不原生支持延迟消息。

延迟消息的一个典型场景是,当消息发布到队列后,等待一段时间再由消费者消费。这可以通过配置 TTL(Time-To-Live)和死信队列(DLX, Dead Letter Exchange)实现,或者通过 RabbitMQ 的延迟插件实现。

安装插件

下载地址

在这里插入图片描述

直接点击下载,然后将下载后的文件直接放入在 plugins 目录中:

在这里插入图片描述

启动插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

检查是否成功

打开可视化面板,访问 http://localhost:15672/#/ 账号密码都是 guest

发现新增了一个延迟队列类型 x-delayed-message:

在这里插入图片描述

延迟消息实现步骤

1. 创建一个延迟交换机

RabbitMQ 延迟插件允许我们使用一种特殊的交换机类型 x-delayed-message,可以设置延迟时间。

2. 发送延迟消息

通过设置消息属性中的 x-delay 来定义延迟时间。

3. 消费消息

消费者在消息到达指定的延迟时间后可以消费。

代码示例

1. 发送延迟消息的生产者代码
const amqp = require('amqplib');async function sendDelayedMessage() {const exchangeName = 'delayed_exchange';const routingKey = 'my_routing_key';const delayTime = 5000; // 延迟 5 秒// 连接到 RabbitMQ 服务器const connection = await amqp.connect('amqp://localhost');const channel = await connection.createChannel();// 声明延迟交换机await channel.assertExchange(exchangeName, 'x-delayed-message', {durable: true,arguments: {'x-delayed-type': 'direct', // 交换机的基础类型},});const message = 'Hello, this is a delayed message!';// 发送带有延迟的消息channel.publish(exchangeName, routingKey, Buffer.from(message), {headers: {'x-delay': delayTime, // 设置延迟时间},});console.log(`[x] Sent delayed message: "${message}" with delay: ${delayTime}ms`);// 关闭连接setTimeout(() => {connection.close();}, 1000);
}sendDelayedMessage().catch(console.error);
2. 消费延迟消息的消费者代码
const amqp = require('amqplib');async function consumeDelayedMessage() {const exchangeName = 'delayed_exchange';const queueName = 'delayed_queue';const routingKey = 'my_routing_key';// 连接到 RabbitMQ 服务器const connection = await amqp.connect('amqp://localhost');const channel = await connection.createChannel();// 声明队列并绑定到交换机await channel.assertQueue(queueName, { durable: true });await channel.bindQueue(queueName, exchangeName, routingKey);console.log('[*] Waiting for messages in delayed queue. To exit press CTRL+C');// 消费消息channel.consume(queueName, (msg) => {if (msg !== null) {console.log(`[x] Received delayed message: "${msg.content.toString()}"`);channel.ack(msg); // 手动确认消息}});
}consumeDelayedMessage().catch(console.error);
  1. 生产者部分

    • 使用 x-delayed-message 交换机,它允许消息在交换机中保留一段时间(通过 x-delay 属性),再发布到相应的队列。
    • 通过设置消息属性 headers: { 'x-delay': delayTime } 来指定延迟的时间。
  2. 消费者部分

    • 声明一个队列并将其绑定到延迟交换机,消费者从队列中接收消息。
    • 当消息的延迟时间到达后,消息被投递到队列并由消费者处理。

总结

使用延迟插件可以简化 RabbitMQ 中延迟消息的实现。

通过 x-delayed-message 交换机和 x-delay 属性,开发者可以灵活地控制消息的延迟发送时间。这种方式常用于需要延迟执行某些任务的场景,例如订单超时处理、延迟通知、预约外卖时延迟预约消息推送到商家等。

http://www.lryc.cn/news/440668.html

相关文章:

  • 前后端分离Vue美容店会员信息管理系统o7grs
  • 初学Linux(学习笔记)
  • 新增的标准流程
  • WebSocket 协议
  • [mysql]mysql排序和分页
  • 开源 AI 智能名片 S2B2C 商城小程序中的全渠道供应策略
  • 一次渲染十万条数据:前端技术优化(上)
  • springboot实训学习笔记(5)(用户登录接口的主逻辑)
  • python中网络爬虫框架
  • GEC6818初次连接使用
  • 解释下不同Gan模型之间的异同点
  • Hadoop的一些高频面试题 --- hdfs、mapreduce以及yarn的面试题
  • Day99 代码随想录打卡|动态规划篇--- 01背包问题
  • 往证是什么意思
  • Camunda流程引擎并发性能优化
  • spring springboot 日志框架
  • 【D3.js in Action 3 精译_022】3.2 使用 D3 完成数据准备工作
  • 电脑怎么禁用软件?5个方法速成,小白必入!
  • 力扣之181.超过经理收入的员工
  • C++语法应用:从return机制看返回指针,返回引用
  • Linux5-echo,>,tail
  • sqlgun靶场训练
  • 简化登录流程,助力应用建立用户体系
  • 【研发日记】嵌入式处理器技能解锁(六)——ARM的Cortex-M4内核
  • 深度学习经典模型之T5
  • 10.第二阶段x86游戏实战2-反编译自己的程序加深堆栈的理解
  • ARM总复习
  • ​​使用ENVI之大气校正(下)
  • C++(学习)2024.9.18
  • 认知小文2《成功之路:习惯、学习与实践》