当前位置: 首页 > news >正文

linux高性能服务器-线程池实现

文章目录

        • 定义
        • 应用场景
        • 任务类型
        • 线程数量
        • 数据结构设计:
          • 任务设计:
          • 队列设计:
          • 线程池设计
        • 接口设计

定义

线程池属于生产消费模型,管理维持固定数量线程的池式结构,避免线程频繁的创建和销毁

应用场景

当一类任务耗时,严重影响当前线程处理其他任务,异步执行

任务类型

耗时任务:

  • CPU密集型
  • IO密集型 ( 网络IO 磁盘IO)
线程数量

n * proc

数据结构设计:
任务设计:
typedef struct task_s {void * next;handler_pt func;void * arg;
} task_t;

生产者线程: 发布任务
消费者线程: 取出任务,执行任务
数据结构为链表

队列设计:

typedef struct task_queue_s {void * head;void **tail;int block; // 是否阻塞 spinlock_t lock; // 自选锁pthread_muxtex_t mutex;pthread_cond_t cond;
}task_queue_t;

队列: 存储任务,调度线程池 ,双端开口,先进先出,在多线程中执行,需要加锁
功能: 调取线程池中的消费者线程, 如果此时队列为空,谁(线程)来取任务,谁阻塞休眠
当允许一个进程进入临界资源(互斥状态)。
自旋锁: 其他线程空转cpu,一直等待进入临界资源
互斥锁:切换cpu, 让出执行权, 线程阻塞住,操作系统调用其他的线程

某个线程持有临界资源的时间 < 线程切换的时间 , 自旋锁 ,时间复杂度为0(1)
生产者新增任务,消费者取出任务 ,0(1),均为移动指针完成(尾插法,头插法)
故使用自旋锁 spinlock_t lock

线程池设计
struct thredpool_t {atomic_int quit;  // 原子变量int thrd_count;pthread_t * threads;task_queue_t task_queue;
};

原子操作:一个线程在执行过程中,其他线程不能执行这个线程的内部操作,只能看到线程执行前或者执行后
应用场景: 某一个基础类型给的变量

接口设计
static task_queue_t * __taksqueue_create() {task_queue_t  * queue = malloc(sizeof(*queue));int ret;ret = pthrad_mutex_init(&mutex);if(ret == 0) {ret = pthread_cond_init(&cond);if(ret == 0) {queue->head = NULL;queue->tail = &(queue->head);queue->block = 1;return queue;}pthread_cond_destory(&queue);}pthread_muext_destory(&queue->mutex);return NULL;
}static void __add_task(task_queue_t  * queue, void * task) {void **link = (void **)task; // malloc*link = NULL; // task->next = NULL;spinlock_lock(&queue->lock);*queue->tail = link; // 末尾添加新的节点queue->tail = link // tail 指向新的尾节点spinlock_unlock(&qeuue->lock);pthread_cond_signal(&queue->cond); // 有任务,唤醒休眠的线程
}static task_t * __pop__task(task_queue_t * queue) {spinlock_lock(&queue->lock);if(queue->head) {spinlock_unclock(&queue->lock);return NULL;}task_t *task;task = queue->head;queue->head = task->next;if(queue-head == NULL) {queue->tail = &queue->head; // &NULL}spinlock_unlock(&queue->lock);return task;
} static void * __get_task (task_queue_t *queue) {task_t *task;while((task = __pop_task(queue))== NULL) {pthread_mutex_lock(&queue->lock);if(queue->block == 0) {rerurn NULL;}// pthread_cond_wait 执行过程:// 1. 先unlock(&mutex)// 2. cond 休眠// 3, 生产者 发送signal// 4. cond 唤醒// 5. 既然上clock(&mutex)pthead_cond_wait(&queue->cond,&queue->mutex);  //休眠pthread_mutex_unlock(&queue->mutex);} return task;
}static void __taskqueue_destroy(task_queue_t * queue) {task_t *task;while((task) = _pop_task(queue)) {free(ptr:task);}spinlock_destroy(&queue->lock);pthread_cond_destory(&queue->cond);pthread_mutex_destory(&queue->mutex);free(ptr:queue);
}// 消费者线程 ,取出任务,执行任务
static void * __thrdpoll_worker(void *arg) {thrdpool_t *pool = (thrdpool *)arg;task_t *task;void *ctx;while(atomic_load(&pool->quit) == 0) {  //原子读task = (task *) __get_task(poll->task_queue);if(!task) {break;}handler_pt func = task->func;ctx = taks->arg;free(task);func(ctx);}return NULL;
}
// 设置队列为非阻塞,并唤醒所有的线程
static void __nonblock(task_queue_t *queue) {pthread->mutex_lock =&queue->lock;queue->block = 0;pthread_mutex_unclock(&queue->mutex);pthread_cond_broadcast(queue->cond); 
}
// 创建线程,回滚式创建对象
static int __threads_create(thrdpool * pool, size_t thrd_count) {pthread_attr_t attr;int ret;ret = pthread_attr_init(&attr); //初始化线程参数if (ret == 0) {pool_>threads = (pthread_t *)malloc(sizeof(pthread_t) * thrd_count);if(pool_threads) {int i = 0;for(;i < thrd_count; i++) {if(pthread_create(&pool->threads[i),&attr,start_routine(),NULL);break; // 创建线程失败,返回}pool->thrd_count  = i; pthread_attr_destory(&attr);if( i == thrd_count)reurn 0;__threads_terminante(pool); // 如果创建的线程数量不等于thrd_count,把创建的线程全部销毁free(pool->threads); // 释放堆空间}}return ret;
}// 创建线程池
static thrdpool_t *  thrdpool_create(int thrd_count) {thrdpool_t * pool;poll = (thrdpool_t *)malloc(sizeof(poll);if(!pool) return NULL;task_queue_t  *queue = __taskqueue_create();if(queue) {pool->task_queue = queue;atomic_init(&pool->quit, 0);if(__threads_create(pool,thrd_count) == 0 ) {return pool;}__taskqueue_destory(pool->taks_queue);}free(pool);return NULL;
}static void __threads_terminate(thrdpool_t * pool) {atomic_store(&queue->quit,1); //原子写__nonblock(pool->task_queue); // 设置非阻塞队列,唤醒所有的线程int i;for(i=0; i<pool->thrd_count;i++) {pthread_join(pool->thread[i],NULL);}
}// 生产者创建任务
static int thrdpool_post(thrdpool_t  * pool, handler_pt func, void *arg) {if (atomic_load(&pool->quit) == 1 ) {  //判断线程池是否标记退出return -1;}task * task = (task_t *)malloc(sizeof(task_t));if(!task)  return -1;task->func = func; // 初始化task->arg = arg;__add_task(pool->task_queue,task); // 添加任务return 0;
}//等待所有线程结束,释放资源
static thrdpool_wait(thrdpool_t *pool) {int i;for(i=0; i<poll->thrd_count;i++) {pthread_join(pool->thread[i],NULL);}__taskqueue_destory(pool->taks_queue);free(pool->threads);free(pool);
}
http://www.lryc.cn/news/343434.html

相关文章:

  • 算法训练营第56天|LeetCode 583.两个字符串的删除操作 72.编辑距离
  • 首页最新 多IP浏览器防关联:如何配置多个独立且稳定的IP地址?
  • 电脑连接公司打印机教程
  • JavaScript 中的 Promise.all
  • 机器视觉_联合编程(二)
  • AUTOCRAWLER : A Progressive Understanding Web Agent for WebCrawler Generation
  • php使用服务器端和客户端加密狗环境部署及使用记录(服务器端windows环境下部署、linux环境宝塔面板部署、客户端部署加密狗)
  • Android selinux权限
  • Flutter笔记:Widgets Easier组件库(9)使用弹窗
  • 【解决Android Studio】cmake报错找不到vulkan包
  • 手动卸载32 位office
  • python selenium 滑动后获取动态追加的元素
  • 【idea-sprongboot项目】在linux服务器上纯远程开发方式
  • ADC模-数转换原理与实现
  • Android 文件传输
  • 一起深度学习
  • servlet-会话(cookie与session)
  • windows11忘记登录密码怎么办?
  • C#里如何设置输出路径,不要net7.0-windows
  • 知名员工上网行为管理系统推荐榜单
  • 第12章 软件测试基础(第三部分)测试类型、测试工具
  • open-vm-tools使用虚机的拷贝/粘切
  • CKEditor编辑器的简单使用方法,取值,赋值
  • 创建一个线程对象需要花费多少内存空间
  • Java -- (part23)
  • 1. C++入门:命名空间及输入输出
  • 【Kotlin】Java三目运算转成 kotlin 表达
  • 如何安全可控地进行内外网跨网络传输文件?
  • Python Json数据解析
  • pyinstaller打包pytorch和transformers程序