并发线程(21)——线程池
文章目录
- 二十一、day21
- 1. 线程池实现
- 1.1 完整代码
- 1.2 解释
二十一、day21
我们之前在学习std::future
、std::async
、std::promise
相关的知识时,通过std::promise
和packaged_task
构建了一个可用的线程池,可参考文章:并发编程(6)——future、promise、async,线程池 | 爱吃土豆的个人博客。但只是将代码给出,简单做了介绍,从本节开始,将从头开始学习如何通过构建线程池以及一些其他和线程池有关的知识。
参考:
恋恋风辰官方博客
【C++】线程池 - AirCL - 博客园
线程池工作原理和实现 - 【C语言版 】C/C++_哔哩哔哩_bilibili
基于C++11实现的异步线程池【C/C++】_哔哩哔哩_bilibili
1. 线程池实现
1.1 完整代码
#ifndef __THREAD_POOL_H__
#define __THREAD_POOL_H__
#include <atomic>
#include <condition_variable>
#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>class NoneCopy {
public:~NoneCopy() {}
protected:NoneCopy() {}
private:NoneCopy(const NoneCopy&) = delete;NoneCopy& operator=(const NoneCopy&) = delete;
};class ThreadPool : public NoneCopy {
public:// delte拷贝构造和拷贝赋值// ThreadPool(const ThreadPool&) = delete;// ThreadPool& operator=(const ThreadPool&) = delete;// 简单单例模式的实现static ThreadPool& instance() {static ThreadPool ins;return ins;}// 对任务(不接受参数并返回 void 的可调用对象)进行包装的包装器,用Task作为别名using Task = std::packaged_task<void()>;// 析构函数~ThreadPool() {stop();}// 该函数用于插入新任务至队列,并返回新任务的futuretemplate <class F, class... Args>auto commit(F&& f, Args&&... args) ->std::future<decltype(std::forward<F>(f)(std::forward<Args>(args)...))> {using RetType = decltype(f(args...)); // 使用RetType作为可调用对象返回类型的别名if (stop_.load()) // 线程池是否处于关闭状态return std::future<RetType>{};// 将可调用对象和参数用装饰器packaged_task进行包装auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));// 从包装器获取futurestd::future<RetType> ret = task->get_future();// 在{}内使用lock_guard锁定互斥量,并将task使用emplace插入至任务队列{std::lock_guard<std::mutex> cv_mt(cv_mt_);tasks_.emplace([task] { (*task)(); });}cv_lock_.notify_one();return ret;}// 返回线程数量int idleThreadCount() {return thread_num_;}
private:ThreadPool(unsigned int num = std::thread::hardware_concurrency()): stop_(false) {{if (num <= 1)thread_num_ = 2;elsethread_num_ = num;}start();}// 启动线程池void start() {for (int i = 0; i < thread_num_; ++i) {// 使用容器存储线程对象pool_.emplace_back([this]() { // 每个线程执行的lamda函数(线程池执行的任务相同)while (!this->stop_.load()) {Task task;{std::unique_lock<std::mutex> cv_mt(cv_mt_);this->cv_lock_.wait(cv_mt, [this] {return this->stop_.load() || !this->tasks_.empty();});if (this->tasks_.empty())return;task = std::move(this->tasks_.front());this->tasks_.pop();}this->thread_num_--;task();this->thread_num_++;}});}}// 将线程池关闭void stop() {stop_.store(true); // 将判断变量置为truecv_lock_.notify_all(); // 将所有线程唤醒for (auto& td : pool_) {if (td.joinable()) { // 打印线程池中所有的线程idstd::cout << "join thread " << td.get_id() << std::endl;td.join();}}}
private:std::mutex cv_mt_; // 互斥量std::condition_variable cv_lock_; // 条件变量std::atomic_bool stop_; // 布尔类型,配合条件变量使用std::atomic_int thread_num_; // 线程数量std::queue<Task> tasks_; // 任务队列,每个任务使用packaged_task进行包装std::vector<std::thread> pool_; // 使用容器存储线程
};
#endif // !__THREAD_POOL_H__
1.2 解释
1)首先我们需要设计一个基类,所有继承此基类的所有类均会 delete
拷贝构造和拷贝赋值(如果父类 delete
、default
、explicit
构造函数,那么同样会作用于子类),基类 NoneCopy
定义如下:
class NoneCopy {
public:~NoneCopy(){}
protected:NoneCopy(){}
private:NoneCopy(const NoneCopy&) = delete;NoneCopy& operator=(const NoneCopy&) = delete;
};class ThreadPool : public NoneCopy {}
当 ThreadPool
类继承基类 NoneCopy
后,会自动继承父类的拷贝控制属性,ThreadPool
的拷贝构造和拷贝赋值会隐式 delete
。而且我们需要将 ThreadPool
的默认构造函数设置为 private
或 protected
,确保外部无法直接调用 ThreadPool()
来创建实例,只能通过 instance()
方法获取唯一实例。
2)因为线程池不能被拷贝也不能被赋值,并且必须是单例模型,所以接下来我们实现 ThreadPool
的单例,我们在前文说过,单例模式有三种实现方式:1. 通过std::call_once
和std::once_flag
实现;2. 通过静态成员函数实现;3. 智能指针双重检测实现。区别在于1和3可以显式定义删除器,而2没办法定义删除器,但是2的实现方式最简单。我们这里使用第二种方式即可:
// 简单单例模式的实现
static ThreadPool& instance() {static ThreadPool ins;return ins;
}
但注意,该方法只能在C++11及以后的平台才可以使用,因为返回局部静态变量只有在C++11及以上是线程安全的
3)在线程池中,我们需要将任务存储至任务队列中,但因为队列要求存储数据元素的类型相同,我们这里必须定义任务的类型为:
using Task = std::packaged_task<void()>;
std::queue<Task> tasks_;
tasks_.emplace([task] { (*task)(); });
队列使用 STL
的 queue
即可。任务队列 tasks_的元素类型是std::packaged_task<void()>
,但为什么后面我们在 commit
函数中插入任务时插入的是一个lambda
函数?
-
std::packaged_task<void()>
:std::packaged_task<void()>
表示packaged_task
接受任何无传递参数并返回void
的可调用对象。它可以封装任何符合这个签名的可调用对象,包括普通函数、函数对象、和 lambda 表达式。 -
lambda 表达式:
[task] { (*task)(); }
,这个表达式本身是一个可调用对象。它捕获了外部变量task
,并在调用时执行(*task)()
,这实际上是调用了packaged_task
封装的函数。 -
插入到队列:
在使用
tasks_.emplace(...)
时,实际上在构造一个std::function<void()>
类型的对象,而这个对象可以被std::packaged_task
接受。由于 lambda 表达式符合
void()
的函数签名,因此std::packaged_task
能够接受它并正确地存储。
std::packaged_task
重载了operator()
运算符,并通过重载的operator()
来执行包装的可调用对象。比如在命令std::packaged_task<void(int)> task(myFunction);
中,执行task()
其实就算再执行myFunction()
。所以在lambda
函数中(*task)()
其实是在调用可执行函数,只不过我们将其封装到了一个无返回值且无参数的lambda
函数中,这是为了所有类型的可调用对象都可以通过一个无返回值且无参数的lambda
函数封装,以便存储至任务队列中。
4)ThreadPool
的其他私有成员定义如下:
std::mutex cv_mt_; // 互斥量std::condition_variable cv_lock_; // 条件变量std::atomic_bool stop_; // 布尔类型,配合条件变量使用std::atomic_int thread_num_; // 线程数量std::queue<Task> tasks_; // 任务队列,每个任务使用packaged_task进行包装std::vector<std::thread> pool_; // 使用容器存储线程
thread_num_
是线程池当前可用线程的数量,当我们调用一个线程执行任务时,thread_num_
会减一,当任务完成后会加一,保证线程池不超负载。pool_
用于存储线程,因为线程的创建和销毁存在一定的开销,我们需要将线程存放至vector
中,以便复用线程。当线程中的任务执行完毕后,线程池会进入 “可联结状态”(joinable),但std::thread
对象本身不会自动销毁,仍旧保留在 vector 中,让线程处于空闲状态,等待下一个任务,以便复用。stop_
用于判断线程池是否被销毁,默认是False
,当stop_ == True
时,线程池销毁,此时会将任务队列和线程容器全部销毁(等所有任务执行完毕后才会销毁)。
5)ThreadPool
的构造函数被设置为 private,这是为了确保外部无法直接调用 ThreadPool()
来创建实例,只能通过 instance()
方法获取唯一实例:
ThreadPool(unsigned int num = std::thread::hardware_concurrency()): stop_(false) {{if (num <= 1)thread_num_ = 2;elsethread_num_ = num;}start();
}
std::thread::hardware_concurrency()
是thread
类是一个静态方法,用于获取当前计算机的CPU核心数,我们可以根据这个结果在线程池中创建出数量相等的线程,每个线程独自占有一个CPU核心,这些线程就不用分时复用CPU时间片,此时程序的并发效率是最高的。
6)start
函数的主要功能是启动线程,并将线程存储到一个 vector
中进行管理。在线程的回调函数中(lambda
函数执行过程中),线程的主要任务是从任务队列中取出任务并执行。如果队列中有任务,线程会弹出任务并执行;如果队列为空,线程将挂起,等待新的任务被添加被通知(notify_one()
)。这种实现通过条件变量来避免线程在无任务时的忙等问题,从而有效减少资源浪费。而部分初学者在实现线程池时,可能会采用简单的循环检查方式(即当任务队列为空时,线程会不断循环检查队列状态)。这种方式容易导致线程忙等,占用大量 CPU 资源,进而造成性能下降和资源浪费。实现代码如下:
void start() {for (int i = 0; i < thread_num_; ++i) {// 使用容器存储线程对象pool_.emplace_back([this]() { // 每个线程执行的lamda函数(线程池执行的任务相同)while (!this->stop_.load()) { // 判断线程池是否停止Task task; // 初始化一个接受无参并无返回值可调用对象的packaged_task{std::unique_lock<std::mutex> cv_mt(cv_mt_);// 条件变量判断,当满足任务队列不为空或者stop_为true,并且当前线程被唤醒时,退出挂起状态this->cv_lock_.wait(cv_mt, [this] {return this->stop_.load() || !this->tasks_.empty();});// 队列为空,那么就只有stop_为true一种情况,此时无任务需要处理,直接退出if (this->tasks_.empty())return;// 处理队列剩余任务task = std::move(this->tasks_.front());this->tasks_.pop();}this->thread_num_--; // 减少一个线程数用来执行task任务task(); // 执行任务this->thread_num_++; // task任务在线程内是同步执行的,所以当task任务执行完后,可用的线程数加一}});}
}
线程池会启动数量为thread_num_
的线程,每个线程执行以下程序:
- 将当前线程插入至
pool_
容器内进行管理 - 循环判断线程池是否关闭,如果不关闭,那么该线程将会无止境的进行工作
- 为了避免while循环占用CPU资源,使用条件变量挂起当前当前,直至满足(任务队列不为空或者
stop_
为true
),并且当前线程被唤醒时,退出挂起状态。挂起状态时,当前线程会释放锁让其他线程可以访问共享资源;线程被唤醒时,当前线程会重新拿取锁- 如果队列为空,且当前线程被唤醒,那就只有一种可能:线程池关闭。此时无任务需要处理,直接退出
- 如果队列不为空,取出任务队列的第一个元素
- 因为在当前线程内的任务执行是同步的,所以在执行任务前需要将可用线程数减一,待执行完任务后,可用线程数加一。
- 最后,循环第二步~第四步
7)当线程池被销毁时,必须等待线程池中的所有任务执行完毕后才可以销毁资源:
~ThreadPool() {stop();
}// 将线程池关闭
void stop() {stop_.store(true); // 将判断变量置为truecv_lock_.notify_all(); // 将所有线程唤醒for (auto& td : pool_) {if (td.joinable()) { // 打印线程池中所有的线程idstd::cout << "join thread " << td.get_id() << std::endl;td.join();}}
}
stop
函数中我们将停止标记设置为true
,并且调用条件变量的notify_all
唤醒所有线程,被唤醒的线程获取任务队列中的任务,如果队列为空,则该线程直接返回,其他线程继续执行任务,直至所有任务结束,销毁资源。
8) 接下来我们需要封装一个接口用于投递任务给线程池,我们在上面说过,我们需要将任务存储至任务队列中,但因为队列要求存储数据元素的类型相同,我们这里必须定义任务的类型。我们定义了线程池执行的任务类型为 void(void)
的可调用对象,但是现实情况是存在不同类型的任务,返回类型已经参数类型军可能不同,我们应该如何将其封装为一个 void(void)
对象而不影响其使用功能?我们在一开始简单的说了可以通过 packaged_task
重载的 ()
运算符进行调用函数,但具体过程需要详细说明。
我们可以通过 bind
将一个函数绑定为 void(void)
类型:
int functionint(int param) {std::cout << "param is " << param << std::endl;return 0;
}
void bindfunction() {std::function<int(void)> functionv = std::bind(functionint, 3);functionv();
}
假如我们的可执行任务是 functionint
,它接受一个类型为 int
的参数,返回 int
类型,那么我们可以通过 std::bind
以及 std::function
将其绑定为一个 std::function<int(void)>
对象,我们通过执行 std::function
对象即可执行可执行函数,而且 std::function
是类型是 int(void)
。因为我们这里手动给 functionint
了一个参数 3,所以 functionint
的参数类型是 void
,但是它的返回类型仍然是 int
。而但我们的任务队列要放入返回值为void
,参数也为void
的函数,该怎么办呢?
我们可以通过 lambda
生成一个返回值和参数都为 void
的函数,函数内部调用 functionv
即可,我们将上面的函数functionint
和调用的参数3打包放入任务队列:
void pushtasktoque() {std::function<int(void)> functionv = std::bind(functionint, 3);using Task = std::packaged_task<void()>;std::queue<Task> taskque;taskque.emplace([functionv]() {functionv();});
}
因为我们是通过一个类型为void(void)
的 lambda 表达式间接调用的可调用对象,我们可以将这个 lambda 投递至任务队列。
当任务执行完成后,由 packaged_task
封装的任务会返回 future
对象,我们可以通过调用 future
对象的 get()
或 wait()
函数获取任务结果。修改上面的代码,使其返回带有任务结果的 future 对象:
std::future<int> committask() {std::function<int(void)> functionv = std::bind(functionint, 3);auto taskf = std::make_shared<std::packaged_task<int(void)>>(functionv);auto res = taskf->get_future();using Task = std::packaged_task<void()>;std::queue<Task> taskque;taskque.emplace([taskf]() {(*taskf)();});return res;
}
我们将上面这个函数封装为一个适用于多种情况的模板函数,为了保证对象类型的不变,使用 C++ 的完美转发:
template <class F, class... Args>
auto commit(F&& f, Args&&... args) -> std::future<decltype(std::forward<F>(f)(std::forward<Args>(args)...))> {using RetType = decltype(f(args...)); // 使用RetType作为可调用对象返回类型的别名if (stop_.load()) // 线程池是否处于关闭状态return std::future<RetType>{};// 将可调用对象和参数用装饰器packaged_task进行包装auto task = std::make_shared<std::packaged_task<RetType()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));// 从包装器获取futurestd::future<RetType> ret = task->get_future();// 在{}内使用lock_guard锁定互斥量,并将task使用emplace插入至任务队列{std::lock_guard<std::mutex> cv_mt(cv_mt_);tasks_.emplace([task] { (*task)(); });}cv_lock_.notify_one();return ret;
}
commit 函数执行完后需要返回 std::packaged_task 对象的 future,以便获取任务执行结果,但是 future 的类型和可调用对象绑定,我们不知道任务执行的结果是什么,我们应该如何确定函数的返回类型?
我们通过 C++11 的尾置推导,std::future<decltype(std::forward<F>(f)(std::forward<Args>(args)...))>
,decltype
会根据根据表达式推断表达式的结果类型,我们用future
存储这个类型,这个future
就是返回值类型。
我们在对 thread、async、future源码进行分析的文章中详细介绍了完美转发原理,并且单独写了一篇文章对完美转发进行分析,可参考文章:
并发编程(1)——线程、thread源码解析 | 爱吃土豆的个人博客
并发编程(8)—— std::async、std::future 源码解析 | 爱吃土豆的个人博客
C++——完美转发(引用折叠+forward) | 爱吃土豆的个人博客