当前位置: 首页 > news >正文

【Node.js】worker_threads 多线程

Node.js 中的 worker_threads 模块

worker_threads 模块是 Node.js 中用于创建多线程处理的工具。

尽管 JavaScript 是单线程的,但有时候在处理计算密集型任务或长时间运行的操作时,单线程的运行会导致主线程被阻塞,影响服务器性能。

为了解决这种问题,worker_threads 模块允许我们在同一个进程内创建并运行多个线程,每个线程有自己的事件循环,但共享进程的内存空间。

基本概念

  • 主线程:主线程是 Node.js 程序默认执行代码的地方,通常是单线程运行,执行同步和异步的事件循环。
  • Worker(工作线程):工作线程是与主线程平行执行的额外线程,用于处理复杂、长时间运行的任务,不会阻塞主线程的执行。

何时使用 worker_threads

  • 当需要处理 CPU 密集型 任务(如大型计算、图像处理、数据加密等)时。
  • 当需要保持 异步 I/O 操作的同时,不阻塞主线程时。

基本使用方法

1. 创建一个简单的 Worker

我们可以通过 Worker 类创建工作线程。每个工作线程运行一个独立的 JavaScript 文件。

// main.js
const { Worker } = require('worker_threads');// 创建一个新的 Worker,并指定 worker 执行的脚本文件
const worker = new Worker('./worker.js');// 监听 worker 发回的消息
worker.on('message', (message) => {console.log(`Received from worker: ${message}`);
});// 向 worker 发送消息
worker.postMessage('Start task');// worker.js
const { parentPort } = require('worker_threads');// 监听来自主线程的消息
parentPort.on('message', (message) => {console.log(`Worker received: ${message}`);// 进行一些耗时操作let result = 0;for (let i = 0; i < 1e9; i++) {result += i;}// 将结果发回主线程parentPort.postMessage(result);
});

在这个例子中,主线程(main.js)创建了一个 Worker 线程(worker.js),并通过 parentPort 与其通信。主线程可以向 Worker 发送任务,Worker 在处理完后将结果返回给主线程。

2. 数据通信

主线程和 Worker 通过 postMessage()message 事件来传递数据。可以发送任意可以序列化的 JavaScript 数据类型,如字符串、对象、数组等。

  • 主线程向 Worker 发送消息
worker.postMessage('Some data');
  • Worker 向主线程发送消息
parentPort.postMessage('Some result');

3. 共享内存(SharedArrayBuffer)

worker_threads 支持通过 SharedArrayBuffer 来在多个线程之间共享内存。这种机制可以避免频繁的消息传递开销,提高性能。

// main.js
const { Worker } = require('worker_threads');const sharedBuffer = new SharedArrayBuffer(4);  // 分配 4 字节的共享内存
const sharedArray = new Int32Array(sharedBuffer);const worker = new Worker('./worker.js', { workerData: sharedBuffer });worker.on('message', () => {console.log('Modified shared array:', sharedArray);
});// worker.js
const { parentPort, workerData } = require('worker_threads');const sharedArray = new Int32Array(workerData);// 修改共享数组
sharedArray[0] = 42;parentPort.postMessage('Shared data modified');

这里,SharedArrayBuffer 是共享内存的核心,它允许主线程和 Worker 线程访问相同的内存空间。我们用 Int32Array 对内存进行操作,修改数据后,主线程可以立即读取结果,无需通过消息传递。

4. 工作线程与主线程的生命周期

  • 启动和终止

    • 当创建一个 Worker 实例时,线程会自动启动。
    • 当 Worker 执行完所有任务或调用 worker.terminate() 时,线程会退出。
  • 自动终止
    如果工作线程的事件循环为空(没有待处理的事件),Worker 会自动退出。

worker.terminate().then(() => {console.log('Worker terminated');
});

5. 错误处理

在多线程环境下,处理错误尤为重要。我们可以使用 error 事件来捕获线程中的错误。

worker.on('error', (err) => {console.error('Worker error:', err);
});

如果 Worker 出现错误,会触发 error 事件,主线程可以处理这个错误。

Worker 线程池

虽然 worker_threads 允许我们创建多个 Worker,但直接为每个任务创建一个新的 Worker 可能效率较低。为此,我们可以创建一个 线程池,通过复用 Worker 来处理多个任务。

线程池实现(简单示例):

const { Worker } = require('worker_threads');class ThreadPool {constructor(size) {this.size = size;this.workers = [];this.tasks = [];// 初始化线程池for (let i = 0; i < size; i++) {this.workers.push(this.createWorker());}}createWorker() {const worker = new Worker('./worker.js');worker.on('message', () => {this.executeNextTask(worker);});return worker;}executeNextTask(worker) {if (this.tasks.length === 0) {return;}const task = this.tasks.shift();worker.postMessage(task);}runTask(task) {const availableWorker = this.workers.find(w => w.isIdle);if (availableWorker) {availableWorker.isIdle = false;availableWorker.postMessage(task);} else {this.tasks.push(task);}}
}const pool = new ThreadPool(4);pool.runTask('Task 1');
pool.runTask('Task 2');

在这个简单的示例中,我们创建了一个大小为 4 的线程池,任务可以通过 runTask 方法提交到线程池中。线程池会依次执行任务,并复用空闲的线程。

与其他多线程解决方案的比较

  • child_process 模块:允许在 Node.js 中创建独立的进程,进程间通过消息传递进行通信,但资源隔离更强,消耗较大。相比之下,worker_threads 在线程间共享内存,创建成本和通信成本较低。
  • 异步操作:虽然 Node.js 的异步 I/O 可以通过事件驱动模型来处理大量任务,但对于 CPU 密集型任务,异步操作并不适合,此时可以使用 worker_threads 来实现并行计算。

总结

  • worker_threads 是 Node.js 中用于多线程处理的核心工具。
  • 它允许在单个进程内创建多个线程,线程间可以通过消息传递和共享内存进行通信。
  • 非常适合用于处理计算密集型任务,避免主线程的阻塞。
  • 虽然 worker_threads 增强了并行计算的能力,但需要合理管理线程的创建和销毁,避免线程资源的浪费。
http://www.lryc.cn/news/454950.html

相关文章:

  • 贪心算法c++
  • 【STM32】 TCP/IP通信协议(3)--LwIP网络接口
  • 15分钟学 Python 第39天:Python 爬虫入门(五)
  • 使用Pytorch构建自定义层并在模型中使用
  • 学习记录:js算法(五十六):从前序与中序遍历序列构造二叉树
  • qt使用QDomDocument读写xml文件
  • Oracle架构之表空间详解
  • springboot整合seata
  • 鸿蒙开发(NEXT/API 12)【二次向用户申请授权】程序访问控制
  • docker export/import 和 docker save/load 的区别
  • 明星周边销售网站开发:SpringBoot技术全解析
  • STM32+ADC+扫描模式
  • R语言绘制散点图
  • 安装最新 MySQL 8.0 数据库(教学用)
  • 微信小程序开发-配置文件详解
  • TCP/UDP初识
  • 【大数据】在线分析、近线分析与离线分析
  • 【unity进阶知识9】序列化字典,场景,vector,color,Quaternion
  • 传奇GOM引擎架设好进游戏后提示请关闭非法外挂,重新登录,如何处理?
  • OpenCV视频I/O(15)视频写入类VideoWriter之标识视频编解码器函数fourcc()的使用
  • rust log选型
  • 数据库-分库分表
  • 基于SSM的校园社团管理系统的设计 社团信息管理 智慧社团管理社团预约系统 社团活动管理 社团人员管理 在线社团管理社团资源管理(源码+定制+文档)
  • 【SVN】一文读懂Subversion(SVN)
  • nginx打包部署前端vue项目全过程【保姆级教程】
  • From SAM to CAMs
  • 【NLP自然语言处理】01-基础学习路径简介
  • ffmpeg取rtsp流音频数据保存声音为wav文件
  • 《数字图像处理基础》学习01-数字图像处理的相关基础知识
  • C#-泛型学习笔记