当前位置: 首页 > news >正文

C/C++开发,无可避免的多线程(篇六).线程池封装类

一、线程池概念

        线程池是一种多线程处理方式,它包含一个线程工作队列和一个任务队列。当有任务需要处理时,线程池会从线程工作队列中取出一个空闲线程来处理任务,如果线程工作队列中没有空闲线程,则任务会被放入任务队列中等待处理。当线程处理完任务后,它会返回线程工作队列中等待下一个任务的到来。线程池的好处是可以避免频繁创建和销毁线程,提高程序的效率和稳定性。

        在c++中,所谓的任务队列就是一个双端队列,采用先进先出的原则,而任务则是一些系列的功能函数。所谓线程工作队列,就是一组线程,这些线程通常采用容器管理起来。

        线程池可以应用于需要频繁创建和销毁线程的场景,例如网络服务器、数据库连接池等。通过线程池,可以提高线程的复用率,减少线程创建和销毁的开销,从而提高系统的性能和稳定性。同时,线程池还可以控制并发线程的数量,避免系统资源被过度占用,保证系统的可靠性和可用性。

        线程池一般可以提高程序的效率,尤其是具有延时等待、复杂交互、共享线程等情况,因为它可以避免频繁地创建和销毁线程,从而减少了系统开销。同时,线程池还可以控制线程的数量,避免线程数量过多导致系统资源的浪费。在使用线程池时,需要注意线程安全问题,例如共享变量的访问、线程间的同步等。可以通过加锁、使用原子操作等方式来保证线程安全。但是有时候采用线程池是个不明智的举动,例如那些可以连续执行,无需等待,延时的纯计算场景。

二、任务队列设计

        通常任务队列就是将一个个任务(即函数对象或函数指针)装载进一个队列内,然后各个线程从该队列中取出任务执行。任务进入队列遵循先进先出原则,当然复杂的可以给任务执行优先级,进入队列时按优先级排序。另外为了队列跨线程使用的安全,需要通过互斥变量来约束。

        创建taskqueue.h和taskqueue.cpp源文件,在taskqueue.h创建一个TaskQueue类模板,其内置一个std::deque容器和一个std::mutex互斥锁及一些辅助变量,提供任务入列、出列、大小及空队列判断等成员函数,内容如下:

#ifndef _TASK_QUEUE_H_
#define _TASK_QUEUE_H_
/************************************************************************Copyright 2023-03-06, pyfree**File Name       : taskqueue.h*File Mark       : *Summary         : *任务数据队列类,线程安全**Current Version : 1.00*Author          : pyfree*FinishDate      :**Replace Version :*Author          :*FinishDate      :************************************************************************/
#include <queue>
#include <mutex>
#include <string>template <typename T>
class TaskQueue
{
public:TaskQueue(std::string desc = "taskqueue_for_threadsafe");~TaskQueue();///*** 获取队列大小* @return {int } 队列大小*/size_t size();/*** 判定队列是否为空* @return {bool } 是否为空队列*/bool empty();/*** 获取队列头元素* @param it {T&} 头元素* @return {bool } 是否成功*/bool get_front(T &it);/*** 删除元素* @return {bool } 是否成功*/bool pop_front();/*** 获取队列头元素,并从队列终删除* @param it {T&} 头元素* @return {bool } 是否成功*/bool pop_front(T &it);/*** 从队列头开始逐步获取多个元素,并剔除* @param its {queue<T>&} 获取到的元素集* @param sizel {int} 一次获取多少个* @return {bool } 至少获取一个元素以上则成功*/bool get_queue(std::queue<T> &its,unsigned int sizel=5);/*** 从队列尾部添加元素* @param it {T} 被添加元素* @return {void } 无返回*/void add_back(T it);/*** 从队列头部添加元素* @param it {T} 被添加元素* @return {void } 无返回*/void add_front(T it);/*** 清空元素* @return {void }*/void clear();/*** 重置容器元素大小限制* @param sizel {unsigned int} 容器限制长度* @return {void }*/void reQSize(unsigned int sizel);
private:TaskQueue& operator=(const TaskQueue&) {return this;};void over_log(bool addfront_flag);
protected:std::string queue_desc;
private:/点集转发//协议解析结果缓存std::deque<T> cache_queue;	    //队列容器std::mutex m_Mutex;				//线程锁,c++11unsigned int QSize;		        //队列大小约束,超出是会从队列头剔除旧数据腾出空位在对末添加数据int queue_overS;				//队列溢出次数计数
};#include "taskqueue.cpp"#endif //_TASK_QUEUE_H_

        由于这一个类模板,因此要遵循类模板声明定义分离一些规则,因此在头文件结尾添加了#include "taskqueue.cpp"(PS,本专栏的无可避免的模板编程实践(篇一)课题2.4节讲述过原因)。模板还定义了容器队列长度约束,防止容器一直添加内容而无线程处理的异常情况出现,并会打印容器溢出日志信息。类模板的TaskQueue定义放置在taskqueue.cpp实现,由于头文件结尾添加了#include "taskqueue.cpp",其实相当于是在头文件实现,这样分离只是为了统一排版。

#include "taskqueue.h"#include <stdio.h>template <typename T>
TaskQueue<T>::TaskQueue(std::string desc): queue_desc(desc), QSize(100), queue_overS(0)
{};template <typename T>
TaskQueue<T>::~TaskQueue()
{}
//
template <typename T>
size_t TaskQueue<T>::size()
{std::unique_lock<std::mutex> lock(m_Mutex);return cache_queue.size();
}template <typename T>
bool TaskQueue<T>::empty()
{std::unique_lock<std::mutex> lock(m_Mutex); return cache_queue.empty();
}template <typename T>
bool TaskQueue<T>::get_front(T &it) 
{std::unique_lock<std::mutex> lock(m_Mutex);bool ret = !cache_queue.empty();if (ret) { it = cache_queue.front(); }return ret;
}template <typename T>
bool TaskQueue<T>::pop_front() 
{std::unique_lock<std::mutex> lock(m_Mutex);bool ret = !cache_queue.empty();if (ret) { cache_queue.pop_front();}return ret;
}template <typename T>
bool TaskQueue<T>::pop_front(T &it)
{std::unique_lock<std::mutex> lock(m_Mutex);bool ret = !cache_queue.empty();if (ret) {it = cache_queue.front();cache_queue.pop_front();}return ret;
};template <typename T>
bool TaskQueue<T>::get_queue(std::queue<T> &its,unsigned int sizel)
{std::unique_lock<std::mutex> lock(m_Mutex);while (!cache_queue.empty()){its.push(cache_queue.front());cache_queue.pop_front();if (its.size() >= sizel){break;}}return !its.empty();
};template <typename T>
void TaskQueue<T>::add_back(T it) 
{{std::unique_lock<std::mutex> lock(m_Mutex);if (cache_queue.size() > QSize) {queue_overS++;cache_queue.pop_front();}cache_queue.push_back(it);}  over_log(false);
}template <typename T>
void TaskQueue<T>::add_front(T it)
{{std::unique_lock<std::mutex> lock(m_Mutex);if (cache_queue.size() > QSize) {queue_overS++;cache_queue.pop_front();}cache_queue.push_front(it);}over_log(true);
}template <typename T>
void TaskQueue<T>::clear()
{std::unique_lock<std::mutex> lock(m_Mutex);cache_queue.clear();queue_overS = 0;
}template <typename T>
void TaskQueue<T>::over_log(bool addfront_flag)
{if (queue_overS >= 10) {//每溢出10次,报告一次if(addfront_flag)printf("add item to queue %s at first,but the size of TaskQueue is up to limmit size: %d.\n", queue_desc.c_str(), QSize);elseprintf("add item to queue %s at end,but the size of TaskQueue is up to limmit size: %d.\n", queue_desc.c_str(), QSize);	queue_overS = 0;}
}template <typename T>
void TaskQueue<T>::reQSize(unsigned int sizel)
{QSize = sizel;
}

        其实一直跟着本人博客的应该很好理解这些代码,毕竟数据队列在前面将TCP/Socket通信开发实战案例博文内就讲述过,这里很类似,只是做了一些调整而已。

三、线程池类设计

        线程池的设计,首选它包含一个任务队列,然后包括一组任务线程,这组线程采用std::vector来装载管理,任务线程是用来执行任务的。简单来说就是从任务队列取出任务,把活干了。

#ifndef _THREAD_EPOLL_H_
#define _THREAD_EPOLL_H_
#include "taskqueue.h"#include <functional>
#include <thread>
#include <future>class ThreadPool
{
public:ThreadPool(const int n_threads = 5);        //线程池显式构造函数,默认创建及初始化5个线程~ThreadPool();void init();                                // 初始化线程池void close();                               // 等待正执行任务完成并关闭线程池void setTaskQSize(unsigned int sizel);// 提交一个任务(function)给线程池做异步处理template <typename F, typename... Args>auto addTask(F &&f, Args &&...args) -> std::future<decltype(f(args...))>;//指定auto返回值格式std::future<>friend class ThreadWorker;                  //友元线程工作类
private:ThreadPool(const ThreadPool &) = delete;ThreadPool(ThreadPool &&) = delete;ThreadPool &operator=(const ThreadPool &) = delete;ThreadPool &operator=(ThreadPool &&) = delete;
private:bool m_running;                                         // 线程池是否处于运行态势TaskQueue<std::function<void()> > m_taskqueue;          // 任务队列,跨线程安全std::vector<std::thread>          m_threads;            // 作业线程队列std::mutex                        m_conditional_mutex;  // 线程休眠锁互斥变量std::condition_variable           m_conditional_lock;   // 线程环境锁,可以让线程处于休眠或者唤醒状态
};class ThreadWorker          // 工作类线程
{
private:int m_id;               // 工作idThreadPool *m_pool;     // 所属线程池
public:ThreadWorker(ThreadPool *pool, const int id);// 构造函数void operator()();      // 重载()操作          
};
#include "thread_pool.cpp"  //有模板成员函数存在
#endif //_THREAD_EPOLL_H_

        由于addTask函数是成员函数模板,因此同样在头文件末尾添加#include "thread_pool.cpp"。

        下来就是实现线程池的重头戏了,其中最关键的就是添加任务进入队列的addTask函数,因为我们的任务队列元素是一个函数对象,采用的是通用多态函数包装器std::function类模板包裹的void()函数对象。而往线程池添加任务是各种函数及参数集。通过转发调用包装器std::bind函数模板,将传入函数及参数集转为函数对象,再由类模板 std::packaged_task包装可调用 (Callable) 目标函数,即std::bind函数模板绑定转换后的函数对象。最后采用std::make_shared函数模板将函数包装为 std::shared_ptr实例,随即将该实例通过lambda表达式的函数包装器,最终转换为满足可复制构造 (CopyConstructible) 和可复制赋值 (CopyAssignable)的函数对象void()加入任务队列。

        thread_pool.cpp

#include "thread_pool.h"
//ThreadPool类
// 线程池构造函数
ThreadPool::ThreadPool(const int n_threads): m_threads(std::vector<std::thread>(n_threads)), m_running(false)
{
}ThreadPool::~ThreadPool()
{close();std::unique_lock<std::mutex> lock(m_conditional_mutex);m_taskqueue.clear();m_threads.clear();
}
//
void ThreadPool::init()
{for (int i = 0; i < m_threads.size(); ++i){m_threads.at(i) = std::thread(ThreadWorker(this, i)); // 分配工作线程}m_running = true;
}
//
void ThreadPool::close()
{m_running = false;m_conditional_lock.notify_all();        // 通知,唤醒所有工作线程for (int i = 0; i < m_threads.size(); ++i){if (m_threads.at(i).joinable())     // 判断线程是否在等待{m_threads.at(i).join();         // 将线程加入到等待队列}}
}void ThreadPool::setTaskQSize(unsigned int sizel)
{std::unique_lock<std::mutex> lock(m_conditional_mutex);m_taskqueue.reQSize(sizel);
}
// 关键函数
template <typename F, typename... Args>
auto ThreadPool::addTask(F &&f, Args &&...args) -> std::future<decltype(f(args...))>
{// 采用std::bind绑定函数f及形参集args...创建函数对象(decltype获取函数对象类型)std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...); /*以 args-func 为 <T> 的构造函数参数列表,构造 T 类型对象并将它包装于 std::shared_ptr,*T是类模板 std::packaged_task 包装的可调用 (Callable) 目标函数,即std::bind绑定的f(arg...)*可移动的共享互斥体所有权包装,封装到共享指针中,支持复制构造,并隐藏了函数参数集*/auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()> >(func);//再次包装,构建一个lambda表达式的函数包装器,满足可复制构造 (CopyConstructible) 和可复制赋值 (CopyAssignable)的函数对象void()std::function<void()> warpper_func = [task_ptr](){(*task_ptr)();};m_taskqueue.add_back(warpper_func); // 队列通用安全封包函数,并压入安全队列m_conditional_lock.notify_one();    // 唤醒一个等待中的线程return task_ptr->get_future();      // 返回先前注册的任务指针,task_ptr类型为std::packaged_task
}
//ThreadWorker类
// 构造函数
ThreadWorker::ThreadWorker(ThreadPool *pool, const int id) : m_pool(pool), m_id(id)
{
}
// 重载()操作
void ThreadWorker::operator()()
{std::function<void()> func;       // 定义基础函数类funcbool result = false;              // 是否成功从队列中取出元素while (m_pool->m_running){{//锁脱离作用域解锁// 为线程环境加锁,互访问工作线程的休眠和唤醒std::unique_lock<std::mutex> lock(m_pool->m_conditional_mutex);if (m_pool->m_taskqueue.empty())                // 如果任务队列为空,阻塞当前线程{m_pool->m_conditional_lock.wait(lock);      // 等待条件变量通知,开启线程}result = m_pool->m_taskqueue.pop_front(func);   // 取出任务队列中的首元素}if (result) func();                                 // 如果成功取出,执行工作函数}
}

        上述代码可以看到,创建的线程是以工作分配管理线程类ThreadWorker的operator操作函数(这个知识点可以参考本主题篇四的博文)为线程功能函数来实现线程循环体的,就是不断从任务队列中获取到任务,干活。

四、线程池应用测试

        上面代码就设计了一个基本的线程池了,下来就调用问题。

        创建test.h和test.cpp源文件,直接上代码,如下。有两个函数:函数threadPool_test顾名思义就是测试线程池的,main_doit用来测试纯主线程执行同等任务的效率;return_variance函数就带返回值的线程功能函数(即工作任务),out_variance函数是返回值为void,但是返回信息是通过参数引用传递的线程功能函数:

//test.h
#ifndef _TEST_H_
#define _TEST_H_void threadPool_test();
void main_doit();#endif //_TEST_H_
//test.cpp
#include <iostream>
#include <math.h>
#include <random>
#include <chrono>
#include <unistd.h>         //usleep
#include "thread_pool.h"const unsigned int sizel = 100000;
auto end_poll = std::chrono::system_clock::now();double return_variance(int min, int max)
{if(min>max) return 0.0;int size = (max-min+1);double sum = ((double)(min+max)*size)/2.0;double mean =  sum/size;// 求方差double variance  = 0.0;for (int i = min ; i <= max ; i++){variance = variance + pow(i-mean,2);}variance = variance/size;// std::cout<<mean<<" \n";     // 均值// std::cout<<variance<<" \n"; // 方差//usleep(10);                    //让任务等待一下...//测试线程池执行效率使用,只能大概,毕竟先进先出if((min+1)==sizel) end_poll = std::chrono::system_clock::now(); return variance;
};void out_variance(double &out,int min, int max)
{out = return_variance(min,max);
};void main_doit()
{//测试主线程自己执行任务时,间隔时间// 提交方差操作,总共100个auto start = std::chrono::system_clock::now();for (size_t i = 0; i < sizel; i++){return_variance(i,i+rand()/sizel);}auto end = std::chrono::system_clock::now();std::chrono::duration<double> diff = end-start;std::cout << "main product and consum diff.count() = " <<  diff.count() << "\n";//主线程生成加执行一条龙
}
//
void threadPool_test()
{//测试线程池执行任务时,主线程经历间隔时间// 创建10个线程的线程池ThreadPool pool(10);pool.setTaskQSize(sizel);//重设队列大小限制,防止溢出// 初始化线程池pool.init();// 提交方差操作,总共100个auto start = std::chrono::system_clock::now();for (size_t i = 0; i < sizel; i++){pool.addTask(return_variance, i, i+rand()/sizel);}auto end = std::chrono::system_clock::now();std::chrono::duration<double> diff = end-start;start = std::chrono::system_clock::now();//生产完成时刻std::cout << "main product diff.count() = " << diff.count() << "\n";//主线程生成任务//取得返回值测试(任务结果)pool.setTaskQSize(100);//重新设回容器大小限制// 使用ref传递的输出参数提交函数double output_ref;auto future1 = pool.addTask(out_variance, std::ref(output_ref), 9, 99);// 等待操作输出完成future1.get();std::cout << "[9,99] variance is equals to " << output_ref << std::endl;// 使用return参数提交函数auto future2 = pool.addTask(return_variance, 8, 88);// 等待乘法输出完成double variance = future2.get();std::cout << "[8,88] variance is equals to " << variance << std::endl;// 关闭线程池pool.close();diff = end_poll-start;//线程池执行任务时间间隔std::cout << "main consum diff.count() = " << diff.count() << "\n";//线程池执行任务
}

        另外可以通过线程池的addTask返回对象(std::future)实现对函数返回值的获取,std::future的知识点见本课题的篇二的“3.6 线程辅助-Future”。

        在main.cpp源文件内,调用者两个函数来测试:

#include "test.h"int main(int argc, char* argv[])
{threadPool_test();main_doit();return 0;
}

        编译及测试g++ main.cpp test.cpp -o test.exe -std=c++11,运行测程序:

         是不是很惊讶,多线程怎么还比主线程单独执行这些同等任务更高效。正如前面所说的,主线程向线程池添加任务是通过一些列转换的,目的就是为了适应各个功能函数转换为统一的函数对象,但是这样就增加了很多计算成本,以及线程锁的使用,各个线程进行了资源竞争,效率当然就差了。尤其还是这种纯粹计算、连续执行、无延时要求无等待要求的功能函数。

        这就涉及到多线程编程最核心的问题了资源竞争。CPU有多核,可以同时执行多个线程是没有问题的。但是控制台(资源)却只有一个,同时只能有一个线程拥有这个唯一的控制台,将数字输出。大多情况下,程序可能只分配了一个CPU内核在运算(本例子任务对资源的诉求是没满足系统多核调度的判定),各个线程只是每次运行任务是抢占CPU的一段运行时间而已,各个线程之间(包括主线程)相互抢夺及切换要耗费大量时间,当然就没有单个主线程运行来得快。

        为此,又在关闭线程池前,加入了一句等待:

std::this_thread::sleep_for(std::chrono::seconds(100));  //等待,防止线程池没完成就退出了
// 关闭线程池
pool.close();

        重新编译测试,去资源监测器查看,的确创建了10个子线程,只是CPU使用压力太小,无法促使系统做出更多CPU资源的倾斜分配。

        所有我们在开篇时就指出,线程池可以应用于需要频繁创建和销毁线程的场景,例如网络服务器、数据库连接池等,这些场景才是它们的主战场。

        当然说到,在多CPU多核情况下,让各个线程运行在各个不同的CPU的内核上,和控制台线程(主线程)齐驱并驾,这就是多核并行,这就另外一个主题了。

http://www.lryc.cn/news/33723.html

相关文章:

  • HIVE中如何实现针对IPv6 CIDR的查询
  • 【微信小程序】-- 生命周期(二十八)
  • Kafka 概述
  • 详解Java8中如何通过方法引用获取属性名/::的使用
  • 0106广度优先搜索和最短路径-无向图-数据结构和算法(Java)
  • 僵尸(Zombie)进程
  • JS实现:有一对兔子,从出生后第3个月起每个月都生一对兔子,小兔子长到第三个月后每个月又生一对兔子,假如兔子都不死,问每个月的兔子总数为多少?
  • Verilog如何编写一个基础的Testbench
  • 基于JavaEE社区物业管理系统开发与实现(附源码资料)
  • 问一下ChatGPT:DIKW金字塔模型
  • javaScript基础面试题 ---闭包
  • 如何自定义您的网站实时聊天图标
  • Vue侦听器Watch
  • 云快充研发中心平台架构师谈云原生稳定性建设之路
  • ENVI IDL学习笔记之基本操作
  • 多线程面试题
  • YARN运行流程
  • java八股系列——SpringMVC从接受请求到完成响应的过程
  • Elasticsearch索引全生命周期
  • 汇编指令学习(LOOP)
  • Linux 配置本地yum源
  • 【PyTorch】教程:torch.nn.LeakyReLU
  • 【刷题】-- 基础 -- 二分查找
  • Spark MLlib 特征工程
  • CentOS7 完全卸载 php
  • 关于OCS认证里必须知晓的内容
  • 创业做电商难不难?新人做电商怎么才能挣钱?
  • 【项目设计】高并发内存池(七)[性能测试和提升]
  • PHP:Laravel cast array json数据存数据库时unicode 编码问题和update更新不触发数据转换
  • 自动化测试总结--断言