当前位置: 首页 > news >正文

C 语言编程 — 线程池设计与实现

目录

文章目录

  • 目录
  • 线程池(Thread Pool)
  • tiny-threadpool
    • 数据结构设计
      • Task / Job
      • Task / Job Queue
      • Worker / Thread
      • Thread Pool Manager
    • Public APIs
    • Private Functions
    • 运行示例

线程池(Thread Pool)

线程池(Thread Pool)是一种用于管理多线程环境的技术。Thread Pool 会在 User Process 中预先创建一组 User Threads,并在需要时重复使用它们,而不是频繁的创建新线程。

线程池可以有效提高程序性能和可靠性:

  • 避免频繁创建、销毁线程,降低了系统开销;
  • 限制线程数量,防止过度占用系统资源;
  • 提高了程序的响应速度和吞吐量;
  • 管理线程的生命周期,避免了线程泄漏的风险;
  • 可实现任务优先级。

tiny-threadpool

  • Github:https://github.com/JmilkFan/tiny-threadpool

tiny-threadpool fork 自 C-Thread-Pool,是一个开源的轻量级线程池,实现了以下功能。

  • 符合 ANCI C 和 POSIX 风格;
  • 支持 Thread 的 Pause(暂停)、Resume(恢复)、Wait(等待);
  • 提供简洁的 APIs。

数据结构设计

在这里插入图片描述

Task / Job

Task / Job 本质是由 Producer 发出的任务请求,通常数量庞大,需要高并发的进行处理。

/*** Job 应该包含以下 3 个成员:*  1. 线程入口函数;* 	2. 线程入口函数的参数;* 	3. 指向下一个 Job 的指针。*/
typedef struct job{struct job*  prev;  // 指向下一个 Job,添加 New Job 入队尾(Rear)时,上一次的 Rear Job,应该要指向下一个 New Job,然后 New Job 成为新的 Near Job。void   (*function)(void* arg);  // 线程入口函数的类型声明void*  arg;                     // 线程入口函数的参数的类型声明
} job;

Task / Job Queue

Queue 用于缓存 Jobs,并且是 FIFO 的。在某些场景中,可能会要求 Queue 的长度是可调整的,也可能会要求有多条不同优先级的 Queues。

/*** Job Queue 应该包含以下 5 个成员:*  1. 队头* 	2. 队尾*  3. 队长*  4. 互斥锁,保证高并发 Jobs 入队/出队是 FIFO 的。* 	5. 队列状态:1)空队列;2)非空队列;*/
typedef struct jobqueue{job  *front;              // 指向队头job  *rear;               // 指向队尾int   len;                // 队列长度pthread_mutex_t rwmutex;  // 任务队列的锁bsem *has_jobs;           // 指向一个二元信号量,用于表示 Queue 是否为空。
} jobqueue;/*** 二元信号量,用于标识 Job Queue 是否为空。* 	同样需要使用互斥锁和条件变量来保证高并发处理时二元值的安全性。*/
typedef struct bsem {pthread_mutex_t mutex;  // 信号量的锁pthread_cond_t   cond;  // 信号量的条件int v;                  // 信号量的互斥值//   0:空队列;//   1:非空队列。
} bsem;

Worker / Thread

Worker / Thread 是 Thread Pool 分配出来真正处理 Job 的执行单元,它们是线程入口函数的 Caller(调用者),所以也称为 Consumer。

/*** Thread 应该包含以下 3 个成员:*  1. 友好的 ID,便于调试。区别于 Kernel 分配的 TID。* 	2. 指向 pthread 实体的指针*  3. 指向 Thread Pool 的指针,以此来获得/释放线程池的锁和条件变量*/
typedef struct thread{int       id;              // 友好 IDpthread_t pthread;         // 指向一个 pThread 实体struct thpool_* thpool_p;  // 指向线程池
} thread;

Thread Pool Manager

Thread Pool Manager 用于管理 Thread Pool 资源:例如:创建/销毁多线程、维护任务队列、线程池状态、互斥锁和条件变量等。

当有 Job 需要处理时,Manager 就从 Pool 中获取一个可用的 Thread 来处理。当 Job 完成后,Manager 回收 Thread,而不是销毁它。

/*** Thread Pool Manager 应该包含以下 5 个成员:*  1. 多线程列表*  2. 活跃线程数量* 	3. 工作线程数量(可用线程数 = 活跃线程数量 - 工作线程数量)*  4. 任务队列*  5. 互斥锁*  6. 条件变量*/
typedef struct thpool_{thread**   threads;                // 指向线程指针数组volatile int num_threads_alive;    // 当前活跃的线程数量volatile int num_threads_working;  // 当前工作中的线程数量jobqueue  jobqueue;                // 线程池关联的任务队列pthread_mutex_t  thcount_lock;     // 线程池的锁pthread_cond_t  threads_all_idle;  // 线程池的条件变量
} thpool_;

Public APIs

typedef struct thpool_* threadpool;// 创建一个包含有指定数量线程的线程池
threadpool thpool_init(int num_threads);// 添加 task 到 task queue 中。
int thpool_add_work(threadpool, void (*function_p)(void*), void* arg_p);// 等待所有 tasks 执行完。
void thpool_wait(threadpool);// 暂停所有 tasks,使它们进入 sleep 状态。通过信号机制来实现。
void thpool_pause(threadpool);// 恢复所有 tasks 继续执行。
void thpool_resume(threadpool);// 销毁所有 tasks,如果有 task 正在执行,则先等待 task 执行完。
void thpool_destroy(threadpool);// 获取当前正在工作中的线程数量。
int thpool_num_threads_working(threadpool);

Private Functions

// 构造 struct thread,并调用 pthread_create() 创建线程
static int thread_init (thpool_* thpool_p, struct thread** thread_p, int id) // 当线程被暂停时会在这里休眠
static void thread_hold(int sig_id) // 线程在此函数中执行任务
static void* thread_do(struct thread* thread_p) // 销毁 struct thread
static void thread_destroy (thread* thread_p) // 任务队列相关的操作集合
static int jobqueue_init(jobqueue* jobqueue_p) 
static void jobqueue_clear(jobqueue* jobqueue_p) 
static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob) 
static struct job* jobqueue_pull(jobqueue* jobqueue_p) 
static void jobqueue_destroy(jobqueue* jobqueue_p) // 信号量相关的操作集合
static void bsem_init(bsem *bsem_p, int value) 
static void bsem_reset(bsem *bsem_p) 
static void bsem_post(bsem *bsem_p) 
static void bsem_post_all(bsem *bsem_p) 
static void bsem_wait(bsem* bsem_p)

运行示例

$ gcc example.c thpool.c -D THPOOL_DEBUG -pthread -o example$ ./example
Making threadpool with 4 threads
THPOOL_DEBUG: Created thread 0 in pool
THPOOL_DEBUG: Created thread 1 in pool
THPOOL_DEBUG: Created thread 2 in pool
THPOOL_DEBUG: Created thread 3 in pool
Adding 40 tasks to threadpool
Thread #245600256 working on 0
Thread #246136832 working on 2
Thread #246673408 working on 3
Thread #246673408 working on 6
Thread #246673408 working on 7
...
Killing threadpool
http://www.lryc.cn/news/39551.html

相关文章:

  • 并发编程要点
  • HDFS黑名单退役服务器
  • 基于stm32智能语音电梯消毒系统
  • FreeRTOS系列第1篇---为什么选择FreeRTOS?
  • 基于.NET Core内置浏览器窗体应用程序界面框架
  • 【数据结构初阶】一文带你学会归并排序(递归非递归)
  • Simulink壁咚(一)——What and How
  • 【PyTorch】Pytorch基础第0章
  • Android学习总结
  • 虚拟机ubuntu安装samba服务
  • 开发板中的内存压力测试,你了解多少?
  • MATLAB | 这些花里胡哨的热图怎么画
  • Java开发的一些编码建议
  • 【YOLOv8/YOLOv7/YOLOv5/YOLOv4/Faster-rcnn系列算法改进NO.59】引入ASPP模块
  • C++STL set/multiset容器 构造和赋值 大小和交换 插入和删除 查找和统计
  • 产品研发项目进度管理软件工具有哪些推荐?整理10款最佳进度管理软件
  • 「ML 实践篇」分类系统:图片数字识别
  • 从大专到测开,上海某字母站大厂的面试题,岗位是测开(25K*16)
  • 【面试题】Python软件工程师能力评估试题(一)
  • Java八股文(Java多线程面试题)
  • 小程序当前页面如何分享别的页面内容呢?
  • 编写Java哪个编译器好
  • 第十六章 Java为什么使用序列化
  • 28岁小公司程序员,无车无房不敢结婚,要不要转行?
  • 出道即封神的ChatGPT,现在怎么样了?
  • 【计算机视觉】CNN 可视化算法
  • 自动抓取服务器巡检、登录、执行命令记录+备份脚本
  • 如何用Python求解微分方程组
  • 【微信小程序】-- 自定义组件 - behaviors(三十九)
  • 【微信小程序】-- 自定义组件 - 父子组件之间的通信(三十八)