Node.js 中基于请求 ID 实现简单队列(即时阻止策略/排队等待策略)
在Node.js 中基于请求 ID 实现简单队列
下面示例演示两种策略,以同一个请求 ID 为单位:
-
即时阻止策略:如果已有相同 ID 的请求在处理,直接报错并返回。
-
排队等待策略:后续相同 ID 的请求不报错,而是挂起,直到首个请求完成后一起接收相同的处理结果。
Powered by Moshow@https://zhengkai.blog.csdn.net/
一、核心思路
-
用一个
Map
存储「正在处理」的条目,key 为请求 ID,value 包含:-
当前正在执行的 Promise
-
一个用于挂起后续请求的队列数组
-
-
新请求到来时,检查 Map:
-
如果不存在对应条目:将当前请求注册为「首个处理」,并执行异步业务逻辑
-
如果已存在对应条目:
-
即时阻止策略:直接抛错
-
排队等待策略:返回一个挂起的 Promise,入队等待
-
-
-
首个请求完成(resolve 或 reject)后:
-
删除 Map 中的条目
-
将结果(成功或失败)广播给队列中的所有挂起请求
-
二、代码示例
队列管理类
// queue-manager.js
class QueueManager {constructor() {this.map = new Map(); // key: requestId, value: { promise, queue }}/*** 注册请求* @param {string} id 请求 ID* @param {() => Promise<any>} handler 首次请求的异步逻辑* @param {Object} options* @param {boolean} options.rejectOnDuplicate 是否即时拒绝重复请求* @returns {Promise<any>}* @author Moshow@https://zhengkai.blog.csdn.net/*/enqueue(id, handler, options = { rejectOnDuplicate: true }) {// 未在处理队列中,直接执行 handlerif (!this.map.has(id)) {const queue = [];const entry = { queue };this.map.set(id, entry);// 执行核心业务逻辑entry.promise = handler().then(result => {// 广播成功结果给所有挂起请求queue.forEach(({ resolve }) => resolve(result));return result;}).catch(err => {// 广播错误给所有挂起请求queue.forEach(({ reject }) => reject(err));throw err;}).finally(() => {// 清理this.map.delete(id);});return entry.promise;}// 已有正在处理的相同 IDconst entry = this.map.get(id);// ****** 即时阻止策略:直接抛错if (options.rejectOnDuplicate) {return Promise.reject(new Error(`Request ${id} 正在处理中,请稍后再试`));}// ****** 排队等待策略:返回挂起的 Promise,结果由首个请求结束时统一广播return new Promise((resolve, reject) => {entry.queue.push({ resolve, reject });});}
}module.exports = new QueueManager();
排队等待策略:返回一个挂起的 Promise,入队等待
即时阻止策略:直接抛错
// app.js
const express = require('express');
const queueManager = require('./queue-manager');
const app = express();app.get('/process', async (req, res) => {const requestId = req.query.id;if (!requestId) {return res.status(400).send('缺少 id');}try {// Powered by Moshow@https://zhengkai.blog.csdn.net/// strategy 1: 即时拒绝 rejectOnDuplicate: true// const result = await queueManager.enqueue(requestId, () => doWork(requestId), { rejectOnDuplicate: true });// strategy 2: 排队等待并共享结果 rejectOnDuplicate: falseconst result = await queueManager.enqueue(requestId, () => doWork(requestId), { rejectOnDuplicate: false });res.json({ status: 'ok', data: result });} catch (err) {res.status(429).json({ status: 'error', message: err.message });}
});async function doWork(id) {// 模拟耗时操作await new Promise(r => setTimeout(r, 2000));return { id, timestamp: Date.now() };
}app.listen(3000, () => console.log('Server started on port 3000'));
三、使用说明与扩展
-
即时阻止
-
优点:后续请求瞬间反馈,不占用额外内存。
-
缺点:用户需自行决定重试时机,可能客户端不停打重试包。
-
-
排队等待并共享结果
-
优点:后续请求无需重试,直接拿到同样的处理结果。
-
缺点:高并发时可能积压大量挂起请求,需监控内存和队列长度。(或者配置超过30分钟则清空队列+终端业务重新来,根据业务调整)
-
-
异步错误处理
-
在实际业务中,记得对
handler
内部的异常或超时做额外保护,避免永久挂起。
-
-
更细粒度的队列
-
如果想要限制同时处理的总量(不只是同 ID),可在
QueueManager
里再加一个全局计数器或池子。
-
-
分布式场景
-
若服务水平扩展到多台机器,需要引入分布式锁(如 Redis 的 Redlock)或共享队列(如 Kafka、RabbitMQ)来保证同一 ID 只被一台机器处理。
-
这样我们就能在 Node.js/Express 中灵活地基于请求 ID 实现:要么即时拒绝重复请求,要么统一排队等待并共用第一个请求的处理结果。根据业务场景,选择最合适的策略即可。