当前位置: 首页 > news >正文

【线程系列之五】线程池介绍C语言

一、基本概念

1.1 概念

线程池(Thread Pool)是一种基于池化技术管理线程的机制,旨在减少线程创建和销毁的开销,提高系统资源的利用率,以及更好地控制系统中同时运行的线程数量。线程池通过预先创建一定数量的线程,并将这些线程放入一个“池”中,当有新任务到来时,不是立即创建新线程来执行,而是从线程池中取出一个空闲的线程来执行该任务。如果所有线程都在忙碌,则新任务会等待直到有线程变得空闲。

在C语言中,由于标准库(如C89/C99/C11等)不支持线程或线程池,因此通常需要使用第三方库如POSIX线程(pthread)来实现线程池。

1.2 应用场景【C语言】

C语言中的线程池应用场景与在其他编程语言中类似,主要包括以下几类:

  • 高并发服务器: 在网络服务器中处理大量客户端请求。每个请求可以分配给一个线程池中的线程进行处理,以提高响应速度和吞吐量。

  • 数据处理和计算密集型任务: 当需要处理大量数据或执行复杂的计算时,可以将任务分配到线程池中并行执行,以缩短总体执行时间。

  • 资源密集型任务: 对于需要频繁访问共享资源(如数据库、文件系统等)的任务,使用线程池可以减少线程创建和销毁的开销,并可能通过更高效的资源管理来提高性能。

  • 异步操作: 在需要执行非阻塞操作(如异步I/O、异步网络请求等)时,线程池可以用来处理这些异步操作的结果或回调。

  • 定时任务和周期性任务: C语言本身不直接支持定时器和周期性任务,但你可以使用线程池中的线程来模拟这些功能,通过轮询或睡眠机制来检查时间并执行相应的任务。

1.3 实现线程池步骤
  • 定义线程池结构: 包括线程数量、任务队列、互斥锁、条件变量等;

  • 实现任务队列: 用于存储待执行的任务;

  • 编写线程工作函数: 该函数将被多个线程同时执行,并从任务队列中取出任务进行处理;

  • 初始化和管理线程池: 包括创建线程、启动线程、销毁线程等操作;

  • 添加任务到线程池: 提供接口将新任务添加到任务队列中,并通知等待的线程。

由于C语言相对底层,因此实现这些功能需要更多的手动编码和对系统资源的管理。此外,你还需要考虑线程安全和性能优化等问题,第三方库可在C++、python中查找,避免从头实现线程池。

下文将具体说明C语言实现线程池的代码。

二、实现

2.1 定义线程池结构

首先,定义一个包含线程池所需所有信息的结构体,如线程数组、任务队列、互斥锁、条件变量等。

#include <pthread.h>  
#include <stdbool.h>  
#include <stdlib.h> #define MAX_THREADS 4 // 任务队列节点  
typedef struct task {  void (*function)(void*);  void* arg;  struct task* next;  
} task_t;  // 线程池结构体  
typedef struct {  pthread_t threads[MAX_THREADS]; // 线程数组  pthread_mutex_t queue_mutex;    // 保护任务队列的互斥锁  pthread_cond_t queue_cond;      // 任务队列的条件变量  bool stop;                      // 线程池停止标志  task_t* head;                   // 任务队列头指针  task_t* tail;                   // 任务队列尾指针  int thread_count;               // 当前活跃线程数  int active_threads;             // 最大活跃线程数(可配置)  
} threadpool_t;
2.2 初始化线程池

实现一个函数来初始化线程池,包括创建线程、初始化同步等。

void* worker(void* arg) {  threadpool_t* pool = (threadpool_t*)arg;  while (true) {  pthread_mutex_lock(&pool->queue_mutex);  // 如果线程池已停止且没有任务,则退出循环  if (pool->stop && pool->head == NULL) {  pthread_mutex_unlock(&pool->queue_mutex);  break;  }  // 等待任务或线程池停止信号  while (!pool->stop && pool->head == NULL && pool->thread_count >= pool->active_threads) {  pthread_cond_wait(&pool->queue_cond, &pool->queue_mutex);  }  // 取出任务(如果线程池未停止且队列不为空)  task_t* task = NULL;  if (!pool->stop && pool->head != NULL) {  task = pool->head;  pool->head = task->next;  if (pool->head == NULL) {  pool->tail = NULL;  }  pool->thread_count--;  }  pthread_mutex_unlock(&pool->queue_mutex);  // 执行任务(如果任务不为空)  if (task != NULL) {  (*(task->function))(task->arg);  free(task); // 释放任务节点内存(如果任务节点是动态分配的)  }  }  return NULL;  
}  void threadpool_init(threadpool_t* pool, int active_threads) {  pool->stop = false;  pool->head = pool->tail = NULL;  pool->thread_count = 0;  pool->active_threads = active_threads;  pthread_mutex_init(&pool->queue_mutex, NULL);  pthread_cond_init(&pool->queue_cond, NULL);  for (int i = 0; i < active_threads; i++) {  pthread_create(&pool->threads[i], NULL, worker, pool);  }  
}
2.3 添加任务到线程池

实现一个函数来向线程池的任务队列中添加任务,并唤醒一个等待的线程(如果有的话)。

void threadpool_add_task(threadpool_t* pool, void (*function)(void*), void* arg) {  task_t* new_task = (task_t*)malloc(sizeof(task_t));  new_task->function = function;  new_task->arg = arg;  new_task->next = NULL;  pthread_mutex_lock(&pool->queue_mutex);  if (pool->tail == NULL) {  pool->head = pool->tail = new_task;  } else {  pool->tail->next = new_task;  pool->tail = new_task;  }  pthread_cond_signal(&pool->queue_cond);  pthread_mutex_unlock(&pool->queue_mutex);  
}
2.4 销毁线程池

实现一个函数来停止所有线程并销毁线程池。

void threadpool_destroy(threadpool_t* pool) {  pool->stop = true;  pthread_cond_broadcast(&pool->queue_cond);  for (int i = 0; i < MAX_THREADS; i++) {  pthread_join(pool->threads[i], NULL);  }  pthread_mutex_destroy(&pool->queue_mutex);  pthread_cond_destroy(&pool->queue_cond);  
}

三、完整简单示例

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <pthread.h>
#include <unistd.h>#define MAX_THREADS 5typedef struct task {void (*function)(void*);void* arg;struct task* next;
} task_t;typedef struct {pthread_t threads[MAX_THREADS];pthread_mutex_t queue_mutex;pthread_cond_t queue_cond;bool stop;task_t* head;task_t* tail;int thread_count;int active_threads;
} threadpool_t;void* worker(void* arg) {threadpool_t* pool = (threadpool_t*)arg;while (true) {pthread_mutex_lock(&pool->queue_mutex);while (pool->head == NULL && !pool->stop) {pthread_cond_wait(&pool->queue_cond, &pool->queue_mutex);}if (pool->stop && pool->head == NULL) {pthread_mutex_unlock(&pool->queue_mutex);break;}task_t* task = pool->head;if (task != NULL) {pool->head = task->next;if (pool->head == NULL) {pool->tail = NULL;}}pthread_mutex_unlock(&pool->queue_mutex);if (task != NULL) {(*(task->function))(task->arg);free(task); // 释放任务节点内存}sleep(1);}return NULL;
}void threadpool_init(threadpool_t* pool, int active_threads) {pool->stop = false;pool->head = pool->tail = NULL;pool->thread_count = 0;pool->active_threads = active_threads;pthread_mutex_init(&pool->queue_mutex, NULL);pthread_cond_init(&pool->queue_cond, NULL);for (int i = 0; i < active_threads; ++i) {pthread_create(&pool->threads[i], NULL, worker, pool);}
}void threadpool_add_task(threadpool_t* pool, void (*function)(void*), void* arg) {task_t* new_task = (task_t*)malloc(sizeof(task_t));new_task->function = function;new_task->arg = arg;new_task->next = NULL;pthread_mutex_lock(&pool->queue_mutex);if (pool->tail == NULL) {pool->head = pool->tail = new_task;} else {pool->tail->next = new_task;pool->tail = new_task;}pthread_cond_signal(&pool->queue_cond);pthread_mutex_unlock(&pool->queue_mutex);
}void threadpool_destroy(threadpool_t* pool) {pool->stop = true;pthread_mutex_lock(&pool->queue_mutex);pthread_cond_broadcast(&pool->queue_cond);pthread_mutex_unlock(&pool->queue_mutex);for (int i = 0; i < pool->active_threads; ++i) {pthread_join(pool->threads[i], NULL);printf("%d\r\n", i);}pthread_mutex_destroy(&pool->queue_mutex);pthread_cond_destroy(&pool->queue_cond);
}// 示例任务函数
void example_task(void* arg) {int task_id = *(int*)arg;printf("Task %d is executing\n", task_id);free(arg); // 释放参数内存sleep(1);  // 模拟任务执行时间
}int main() {threadpool_t pool;threadpool_init(&pool, 2); // 初始化线程池,最大活跃线程数为2// 添加示例任务for (int i = 0; i < MAX_THREADS; ++i) {int* arg = (int*)malloc(sizeof(int));*arg = i;threadpool_add_task(&pool, example_task, arg);}// 等待一段时间,观察任务执行情况sleep(5);// 销毁线程池threadpool_destroy(&pool);return 0;
}

运行结果为:

在这里插入图片描述

这段代码演示了一个简单的线程池的使用方式。在主函数中,我们初始化了一个线程池(最大活跃线程数为2),然后添加了5个示例任务(每个任务执行时间模拟为1秒)。在任务执行完毕后,通过调用 threadpool_destroy 函数来销毁线程池,确保所有任务都被执行完毕。

注意:这里销毁的时候保证知道数组中有多少个有效的线程ID,进而销毁。

可在结构体中添加pool->num_threads ,即用来保存线程池中线程数量的成员变量,应该在初始化线程池时进行设置,并在后续操作中根据需要使用,则不会出现在销毁时越界或死锁。

这个示例展示了如何使用线程池来管理并发执行的任务,以及如何通过互斥锁和条件变量来实现线程安全的任务队列操作。

http://www.lryc.cn/news/401132.html

相关文章:

  • 【学习css3】使用flex和grid实现等高元素布局
  • 如何防止Eclipse格式化程序在行注释开头插入空格
  • Nextjs 调用组件内的方法
  • ip地址是电脑还是网线决定的
  • Hadoop中HDFS、Hive 和 HBase三者之间的关系
  • opencv—常用函数学习_“干货“_10
  • Jmeter二次开发Demo
  • MongoDB综合实战篇(超容易)
  • 框架设计MVVM
  • RK3399基础部分
  • linux高级编程(广播与组播)
  • Andriod Stdio新建Kotlin的Jetpack Compose简单项目
  • Linux多线程编程-哲学家就餐问题详解与实现(C语言)
  • 从C向C++18——演讲比赛流程管理系统
  • QThread和std::thread
  • LeetCode 算法:组合总和 c++
  • 【两大3D转换SDK对比】HOOPS Exchange VS. CAD Exchanger
  • Openerstry + lua + redis根据请求参数实现动态路由转发
  • 数字名片-Pushmall 智能AI数字名片7月更新计划
  • 21. Python代码快速查看数组分布
  • 记录些Redis题集(3)
  • OracleLinux6.9升级UEK内核
  • React学习笔记03-----手动创建和运行
  • ubantu22.04安装OceanBase 数据库
  • 【linux】【深度学习】fairseq框架安装踩坑
  • 【Python爬虫教程】第7篇-requests模块的cookies保存和使用
  • 微信小程序开发基础知识6----使用npm包
  • 如何在element中table的 v-for中 使用slot-scope?
  • 企业网络实验dhcp-snooping、ip source check,防非法dhcp服务器、自动获取ip(虚拟机充当DHCP服务器)、禁手动修改IP
  • 20. Python读取.mat格式文件通用函数