线程池分析与设计
线程池
基本功能接口
C++11 及以后的标准中,std::packaged_task
和std::future
是并发编程中用于任务封装和结果获取的重要组件,它们通常与线程配合使用,实现异步操作。
std::packaged_task
std::packaged_task
:封装可调用对象为异步任务,它是一个模板类,用于封装任何可调用对象(包括函数、lambda
、函数对象等),并且它还需要与std::future
关联使用,当std::packaged_task
被执行时,其中封装的任务也会运行,结果会存储在内部,和这个std::packaged_task
关联的std::future
可以进行调用。
核心功能
- 异步任务封装:将任务(在线程池中每个任务起始就是一个函数)打包起来,让它可以异步执行
- 和
std::future
绑定:因为每一个std::packaged_task()
会对应一个std::future
,所以在std::packaged_task
执行之后,其中的任务也运行了,结果就存储在内部,等待std::future
通过get_future()
调用. - 执行任务:可以直接通过
operator()
调用,也可传递给线程执行(std::thread
接收std::packaged_task()
当参数就行)
简单函数的示例
这里先给出一个简单的小示例,之后会结合线程池进行阐述
#include <future>
#include <thread>
#include <iostream>int add(int a, int b) {return a + b;
}int main()
{// 1、首先需要封装任务-->这里是封装add(),需要一个int做返回值以及两个int参数std::packaged_task<int(int, int)> task(add);// 2、和std::future绑定 异步任务task调用get_future进行绑定std::future<int> f = task.get_future();// 3、在一个线程中执行这个封装好的异步任务std::thread t(std::move(task), 10, 20);// 这里需要强调一下,封装后的异步任务不可复制,只能进行移动,所以再传给线程做参数时,只能使用std::move()// 因为这里是异步执行的,所以主线程可以执行其他任务// 现在获取这个线程中执行的结果// future() 具有唯一性,使用get()获取一次之后,就不能获取第二次了,如果结果没有就绪就阻塞等待结果就绪int res = f.get(); // 现在就可以打印获得的这个结果了std::cout << "res = " << res << std::endl;// 在创建线程之后,必须 join或者detach,否则就会出现错误t.join();return 0;
}
那么接下来是关于std::future
获取结果
std::future
用于获取异步操作(线程、任务)执行的结果,可以理解为一种类似于“未来结果的占位符”,因为你启动一个异步线程时,可能无法立即得到结果,但是可以使用std::future
对象在未来某个时刻获取结果。
核心功能
- 可以通过
get()
方法获取异步操作得到的结果(返回值),如果在调用get()
时,异步操作还未完成,那么就会阻塞当前线程等待有结果产生。 - 可以通过
valid()
判断future
是否与一个有效的异步操作关联成功,可以通过wait()
阻塞等待结果,也可以通过wait_for()
或wait_until()
等待指定时长之后返回状态。
上方已经给出了示例用法,都是一样的,这里就不给了,待会直接上线程池相关的示例。
任务入队操作
当有新任务到来时,任务会被添加到任务队列中,这个过程中,需要先获取互斥锁,保证任务队列的线程安全,添加任务后,通过条件变量通知等待的线程有新任务到来。我这里将任务划分成了不带返回值的普通任务和带返回值的任务,其中带返回值的任务使用异步封包的方式进行封装,分别如下:
带返回值的异步任务提交到任务队
步骤:
- 通过
std::bind()
和std::make_shared()
创建一个包装了任务的std::package_task
- 获取其对应的
std::future
用于获取任务执行结果 - 在临界区内(加锁)将任务添加到任务队列
tasks
中 - 通知一个等待的线程有新任务
以下是线程池提交带有返回值的任务的示例过程
template<typename F, typename... Args>
auto SubmitRetTask(F&& func, Args... args) -> std::future<std::invoke_result_t<F, Args...>> {// 首先定义返回类型auto ret_type = std::invoke_result_t<F, Args...>;// 状态判断if (is_shuntdown_.load() || !is_available_.load()) {// 返回一个控制return std::future<ret_type>();}// 开始封装异步任务auto bind_task = std::bind(std::forward<F>(func), std::forward<Args>(args...));auto task = std::make_shared<std::packaged_task<ret_type>()>(std::move(bind_task));std::future<ret_type> res = task.get_future();{// 在临界区加锁,将任务添加到任务队列中std::lock_guard<std::mutex> lock(task_mutex_);tasks_.emplace([task](){(*task)();});}task_cv_.notify_one();return res;
}// 用到的成员变量
std::queue<std::function<void()> tasks_; // 任务队列
std::atomic<bool> is_shutdown_; // 线程是否关闭
std::atomic<bool> is_available_; // 线程池是否还有效std::mutex task_mutex_; // 任务锁
std::condition_variable task_cv_; // 条件变量,用于阻塞任务
不带返回值的普通任务
template<typename F, typename... Args>
void SubmitTask(F&& func, Args... args) {// 终止条件if (is_shutdown_.load() || !is_available_.load()) {return;}// 封装任务auto task = std::bind(std::forward<F>(func), std::forward<Args>(args...));{std::lock_guard<std::mutex> lock(task_mutex_);tasks.emplace([task](){task(); // 调用对应的任务});}// 唤醒一个阻塞中的线程task_cv_.notify_one();
}
所以可以看出,起始线程池中任务的提交过程整体思路都是一致的,只是有返回值的提交上,添加了std::packaged_task
与std::future
来做异步任务的封装而已。
工作线程取出任务执行过程
在工作线程开启之后,需要去任务队列中取出任务然后执行。主要的过程是,获取互斥锁保证资源的互斥访问,然后检查任务队列是否为空,如果为空,就需要通过条件变量阻塞,等待任务添加进来。获取到任务之后就会执行任务,执行完毕马上继续获取任务,除非线程池停止并且任务队列为空。
主要的过程如下:
- 由于每次都会取出一个任务
task
,每个任务都是一个函数std::function<void()>
- 无限循环,一直访问任务队列,直到线程池停止,然后任务队列为空
- 取出任务队列中的任务,执行
我的取出任务的接口函数
成员变量信息:
using ThreadPtr = std::shared_ptr<std::thread>;
using Task = std::function<void()>;// 一个线程信息结构体,包含管理线程的智能指针
struct ThreadInfo {ThreadInfo();~ThreadInfo();ThreadPtr ptr{nullptr};
}// 每一个线程的信息都是有一个智能指针来管理
using ThreadInfoPtr = std::shared_ptr<ThreadInfo>;// 线程数组
std::vector<ThreadInfoPtr> work_threads_;
添加线程函数:
void ThreadPool::AddThread() {// 先从任务队列中取出一个任务auto func = [this]() {while (true) {Task task;{// 首先获取互斥锁std::unique_lock<std::mutex> lock(task_mutex_);// 通过条件变量等待条件满足task_cv_.wait(lock, [this](){return is_shutdown_.load() || !tasks.empty();});if (is_shutdown_.load() && tasks.empty()) {return;}// 取出任务task = std::move(tasks.front());tasks.pop();}task();}};// 将取出来的任务封装到线程中添加到线程池ThreadInfoPtr thread_ptr = std::shared_ptr<std::thread>();thread_ptr->ptr = std::make_shared<ThreadInfo>(std::move(func));// 添加到线程池中work_threads_.emplace_back(std::move(thread_ptr));
}
线程池类设计
线程池类负责创建线程池、销毁线程池以及管理线程队列、任务队列以及添加任务或者取出任务执行等操作。
类定义如下:
class ThreadPool{
public:explicit ThreadPool(uint32_t thread_count);// 禁止拷贝线程池ThreadPool(const ThreadPool&) = delete;ThreadPool& operator=(const ThreadPool&) = delete;~ThreadPool();bool Start(); // 启动线程池void Stop(); // 停止线程池// 提交任务,分别有普通任务和带返回值的任务template<typename F, typename... Args>void SubmitTask(F&& func, Args... args) {if (is_shutdown_.load() || !is_available_.load()) {return;}auto task = std::bind(std::forward<F>(func), std::forward<Args>(args...));{std::unique_lock<std::mutex> lock(task_mutex_);// 添加任务tasks.emplace([task](){task();});}// 唤醒一个等待任务的阻塞线程task_cv_.notify_one();}// 提交带有返回值的任务template<typename F, typename... Args>auto SubmitRetTask(F&& func, Args... args) -> std::future<std::invoke_result_t<F, Args...>> {auto ret_type = std::invoke_result_t<F, Args...>;// 检查变量判断是否还能继续if (is_shutdown_.load() || !is_available_.load()) {return std::future<ret_type>(); // 此时需要返回一个空对象}auto bind_task = std::bind(std::forward<F>(func), std::forward<Args>(args...));// 用packaged_task和shared_ptr封装异步任务auto task = std::make_shared<std::packaged_task<ret_type>()>(std::move(bind_task));// 与future绑定std::future<ret_type> res = task.get_future();{std::unique_lock<std::mutex> lock(task_mutex_);tasks_.emplace([task](){(*task)();});}// 唤醒等待线程task_cv_.notify_one();return res;}private:// 增加线程函数void AddThread();// 通过智能指针来管理线程using ThreadPtr = std::shared_ptr<std::thread>;using Task = std::function<void()>;struct ThreadInfo{ThreadInfo();~ThreadInfo();ThreadInfo ptr{nullptr};}using ThreadInfoPtr = std::shared_ptr<ThreadInfo>;std::vector<ThreadInfoPtr> works_threads_; std::queue<Task> tasks_;std::mutex task_mutex_;std::condition_variable task_cv_;std::atomic<uint32_t> thread_count_;std::atomic<bool> is_shutdown_;std::atomic<bool> is_available_;
}
接口实现
构造与析构
我这里的思路是构造函数初始化一些基本的成员变量,比如thread_count_
,is_shutdown_
,is_available_
就够了,在启动线程池时采取初始化,并且创建线程添加到线程池中,所以构造函数如下:
explicit ThreadPool::ThreadPool(uint32_t thread_count) : thread_count_(thread_count), is_shutdown_(false), is_available(false){}
析构函数和构造函数的思路类似,里面由Stop()
这个接口来处理线程池的终止
ThreadPool::~ThreadPool() { Stop();}
Start() 启动线程池和 Stop()终止线程池
Start()
负责启动线程池,然后循环创建线程并且添加到容器中
bool ThreadPool::Start() {if (!is_available_.load()) {is_availeable_.store(true);uint32_t thread_count = thread_count_.load();for (uint32_t i = 0; i < thread_count; i++) {AddThread(); // 由这个添加函数完成创建线程并且绑定任务添加到容器中}return true;}return false;
}
Stop()
代表线程池停止接口,首先需要将所有相关的成员变量置为停止状态下对应的值,然后停止所有进程,回收所有进程,保证所有进程只join()
一次
void ThreadPool::Stop() {if (!is_shotdown_.load()) {return ;}// 将对应的变量置为退出状态is_shutdown_.store(true);is_available_.store(false);// 通知所有线程task_cv_.notify_all();// 回收所有线程for (auto& thread_info_ptr : work_threads_) {if (thread_info_ptr && thread_info_ptr->ptr) {std::thread& t = *thread_info_ptr->ptr;if (t.joinable()) {t.join();}}}// 清空所有线程容器work_threads_.clear();{// 在线程池关闭的时候,还需要将任务队列中的所有任务popstd::lock_guard<std::mutex> lock(task_mutex_);while (!tasks_.empty()) {tasks_.pop();}}
}
取出任务绑定线程然后添加到线程函数 AddThread()
AddThread()
这个函数主要是从任务队列中取出任务,然后将其绑定到线程,并且添加到容器中.
void ThreadPool::AddThread() {// 取出任务auto func = [this]() {while(true) {Task task;{std::unqiue_lock<std::mutex> lock(task_mutex_);task_cv_.wait(lock, [this](){return is_shutdown_.load() || !tasks.empty(); });if (is_shutdown_.load() && tasks.empty()) {return;}// 取出任务task = std::move(tasks.front());tasks.pop();}task();}}// 将其封装为线程ThreadInfoPtr thread_ptr = std::make_shared<ThreadInfo>();thread_ptr->ptr = std::make_shared<std::thread>(std::move(func));work_threads_.emplace_back(std::move(thread_ptr));
}
线程池
基本功能接口
C++11 及以后的标准中,std::packaged_task
和std::future
是并发编程中用于任务封装和结果获取的重要组件,它们通常与线程配合使用,实现异步操作。
std::packaged_task
std::packaged_task
:封装可调用对象为异步任务,它是一个模板类,用于封装任何可调用对象(包括函数、lambda
、函数对象等),并且它还需要与std::future
关联使用,当std::packaged_task
被执行时,其中封装的任务也会运行,结果会存储在内部,和这个std::packaged_task
关联的std::future
可以进行调用。
核心功能
- 异步任务封装:将任务(在线程池中每个任务起始就是一个函数)打包起来,让它可以异步执行
- 和
std::future
绑定:因为每一个std::packaged_task()
会对应一个std::future
,所以在std::packaged_task
执行之后,其中的任务也运行了,结果就存储在内部,等待std::future
通过get_future()
调用. - 执行任务:可以直接通过
operator()
调用,也可传递给线程执行(std::thread
接收std::packaged_task()
当参数就行)
简单函数的示例
这里先给出一个简单的小示例,之后会结合线程池进行阐述
#include <future>
#include <thread>
#include <iostream>int add(int a, int b) {return a + b;
}int main()
{// 1、首先需要封装任务-->这里是封装add(),需要一个int做返回值以及两个int参数std::packaged_task<int(int, int)> task(add);// 2、和std::future绑定 异步任务task调用get_future进行绑定std::future<int> f = task.get_future();// 3、在一个线程中执行这个封装好的异步任务std::thread t(std::move(task), 10, 20);// 这里需要强调一下,封装后的异步任务不可复制,只能进行移动,所以再传给线程做参数时,只能使用std::move()// 因为这里是异步执行的,所以主线程可以执行其他任务// 现在获取这个线程中执行的结果// future() 具有唯一性,使用get()获取一次之后,就不能获取第二次了,如果结果没有就绪就阻塞等待结果就绪int res = f.get(); // 现在就可以打印获得的这个结果了std::cout << "res = " << res << std::endl;// 在创建线程之后,必须 join或者detach,否则就会出现错误t.join();return 0;
}
那么接下来是关于std::future
获取结果
std::future
用于获取异步操作(线程、任务)执行的结果,可以理解为一种类似于“未来结果的占位符”,因为你启动一个异步线程时,可能无法立即得到结果,但是可以使用std::future
对象在未来某个时刻获取结果。
核心功能
- 可以通过
get()
方法获取异步操作得到的结果(返回值),如果在调用get()
时,异步操作还未完成,那么就会阻塞当前线程等待有结果产生。 - 可以通过
valid()
判断future
是否与一个有效的异步操作关联成功,可以通过wait()
阻塞等待结果,也可以通过wait_for()
或wait_until()
等待指定时长之后返回状态。
上方已经给出了示例用法,都是一样的,这里就不给了,待会直接上线程池相关的示例。
任务入队操作
当有新任务到来时,任务会被添加到任务队列中,这个过程中,需要先获取互斥锁,保证任务队列的线程安全,添加任务后,通过条件变量通知等待的线程有新任务到来。我这里将任务划分成了不带返回值的普通任务和带返回值的任务,其中带返回值的任务使用异步封包的方式进行封装,分别如下:
带返回值的异步任务提交到任务队
步骤:
- 通过
std::bind()
和std::make_shared()
创建一个包装了任务的std::package_task
- 获取其对应的
std::future
用于获取任务执行结果 - 在临界区内(加锁)将任务添加到任务队列
tasks
中 - 通知一个等待的线程有新任务
以下是线程池提交带有返回值的任务的示例过程
template<typename F, typename... Args>
auto SubmitRetTask(F&& func, Args... args) -> std::future<std::invoke_result_t<F, Args...>> {// 首先定义返回类型auto ret_type = std::invoke_result_t<F, Args...>;// 状态判断if (is_shuntdown_.load() || !is_available_.load()) {// 返回一个控制return std::future<ret_type>();}// 开始封装异步任务auto bind_task = std::bind(std::forward<F>(func), std::forward<Args>(args...));auto task = std::make_shared<std::packaged_task<ret_type>()>(std::move(bind_task));std::future<ret_type> res = task.get_future();{// 在临界区加锁,将任务添加到任务队列中std::lock_guard<std::mutex> lock(task_mutex_);tasks_.emplace([task](){(*task)();});}task_cv_.notify_one();return res;
}// 用到的成员变量
std::queue<std::function<void()> tasks_; // 任务队列
std::atomic<bool> is_shutdown_; // 线程是否关闭
std::atomic<bool> is_available_; // 线程池是否还有效std::mutex task_mutex_; // 任务锁
std::condition_variable task_cv_; // 条件变量,用于阻塞任务
不带返回值的普通任务
template<typename F, typename... Args>
void SubmitTask(F&& func, Args... args) {// 终止条件if (is_shutdown_.load() || !is_available_.load()) {return;}// 封装任务auto task = std::bind(std::forward<F>(func), std::forward<Args>(args...));{std::lock_guard<std::mutex> lock(task_mutex_);tasks.emplace([task](){task(); // 调用对应的任务});}// 唤醒一个阻塞中的线程task_cv_.notify_one();
}
所以可以看出,起始线程池中任务的提交过程整体思路都是一致的,只是有返回值的提交上,添加了std::packaged_task
与std::future
来做异步任务的封装而已。
工作线程取出任务执行过程
在工作线程开启之后,需要去任务队列中取出任务然后执行。主要的过程是,获取互斥锁保证资源的互斥访问,然后检查任务队列是否为空,如果为空,就需要通过条件变量阻塞,等待任务添加进来。获取到任务之后就会执行任务,执行完毕马上继续获取任务,除非线程池停止并且任务队列为空。
主要的过程如下:
- 由于每次都会取出一个任务
task
,每个任务都是一个函数std::function<void()>
- 无限循环,一直访问任务队列,直到线程池停止,然后任务队列为空
- 取出任务队列中的任务,执行
我的取出任务的接口函数
成员变量信息:
using ThreadPtr = std::shared_ptr<std::thread>;
using Task = std::function<void()>;// 一个线程信息结构体,包含管理线程的智能指针
struct ThreadInfo {ThreadInfo();~ThreadInfo();ThreadPtr ptr{nullptr};
}// 每一个线程的信息都是有一个智能指针来管理
using ThreadInfoPtr = std::shared_ptr<ThreadInfo>;// 线程数组
std::vector<ThreadInfoPtr> work_threads_;
添加线程函数:
void ThreadPool::AddThread() {// 先从任务队列中取出一个任务auto func = [this]() {while (true) {Task task;{// 首先获取互斥锁std::unique_lock<std::mutex> lock(task_mutex_);// 通过条件变量等待条件满足task_cv_.wait(lock, [this](){return is_shutdown_.load() || !tasks.empty();});if (is_shutdown_.load() && tasks.empty()) {return;}// 取出任务task = std::move(tasks.front());tasks.pop();}task();}};// 将取出来的任务封装到线程中添加到线程池ThreadInfoPtr thread_ptr = std::shared_ptr<std::thread>();thread_ptr->ptr = std::make_shared<ThreadInfo>(std::move(func));// 添加到线程池中work_threads_.emplace_back(std::move(thread_ptr));
}
线程池类设计
线程池类负责创建线程池、销毁线程池以及管理线程队列、任务队列以及添加任务或者取出任务执行等操作。
类定义如下:
class ThreadPool{
public:explicit ThreadPool(uint32_t thread_count);// 禁止拷贝线程池ThreadPool(const ThreadPool&) = delete;ThreadPool& operator=(const ThreadPool&) = delete;~ThreadPool();bool Start(); // 启动线程池void Stop(); // 停止线程池// 提交任务,分别有普通任务和带返回值的任务template<typename F, typename... Args>void SubmitTask(F&& func, Args... args) {if (is_shutdown_.load() || !is_available_.load()) {return;}auto task = std::bind(std::forward<F>(func), std::forward<Args>(args...));{std::unique_lock<std::mutex> lock(task_mutex_);// 添加任务tasks.emplace([task](){task();});}// 唤醒一个等待任务的阻塞线程task_cv_.notify_one();}// 提交带有返回值的任务template<typename F, typename... Args>auto SubmitRetTask(F&& func, Args... args) -> std::future<std::invoke_result_t<F, Args...>> {auto ret_type = std::invoke_result_t<F, Args...>;// 检查变量判断是否还能继续if (is_shutdown_.load() || !is_available_.load()) {return std::future<ret_type>(); // 此时需要返回一个空对象}auto bind_task = std::bind(std::forward<F>(func), std::forward<Args>(args...));// 用packaged_task和shared_ptr封装异步任务auto task = std::make_shared<std::packaged_task<ret_type>()>(std::move(bind_task));// 与future绑定std::future<ret_type> res = task.get_future();{std::unique_lock<std::mutex> lock(task_mutex_);tasks_.emplace([task](){(*task)();});}// 唤醒等待线程task_cv_.notify_one();return res;}private:// 增加线程函数void AddThread();// 通过智能指针来管理线程using ThreadPtr = std::shared_ptr<std::thread>;using Task = std::function<void()>;struct ThreadInfo{ThreadInfo();~ThreadInfo();ThreadInfo ptr{nullptr};}using ThreadInfoPtr = std::shared_ptr<ThreadInfo>;std::vector<ThreadInfoPtr> works_threads_; std::queue<Task> tasks_;std::mutex task_mutex_;std::condition_variable task_cv_;std::atomic<uint32_t> thread_count_;std::atomic<bool> is_shutdown_;std::atomic<bool> is_available_;
}
接口实现
构造与析构
我这里的思路是构造函数初始化一些基本的成员变量,比如thread_count_
,is_shutdown_
,is_available_
就够了,在启动线程池时采取初始化,并且创建线程添加到线程池中,所以构造函数如下:
explicit ThreadPool::ThreadPool(uint32_t thread_count) : thread_count_(thread_count), is_shutdown_(false), is_available(false){}
析构函数和构造函数的思路类似,里面由Stop()
这个接口来处理线程池的终止
ThreadPool::~ThreadPool() { Stop();}
Start() 启动线程池和 Stop()终止线程池
Start()
负责启动线程池,然后循环创建线程并且添加到容器中
bool ThreadPool::Start() {if (!is_available_.load()) {is_availeable_.store(true);uint32_t thread_count = thread_count_.load();for (uint32_t i = 0; i < thread_count; i++) {AddThread(); // 由这个添加函数完成创建线程并且绑定任务添加到容器中}return true;}return false;
}
Stop()
代表线程池停止接口,首先需要将所有相关的成员变量置为停止状态下对应的值,然后停止所有进程,回收所有进程,保证所有进程只join()
一次
void ThreadPool::Stop() {if (!is_shotdown_.load()) {return ;}// 将对应的变量置为退出状态is_shutdown_.store(true);is_available_.store(false);// 通知所有线程task_cv_.notify_all();// 回收所有线程for (auto& thread_info_ptr : work_threads_) {if (thread_info_ptr && thread_info_ptr->ptr) {std::thread& t = *thread_info_ptr->ptr;if (t.joinable()) {t.join();}}}// 清空所有线程容器work_threads_.clear();{// 在线程池关闭的时候,还需要将任务队列中的所有任务popstd::lock_guard<std::mutex> lock(task_mutex_);while (!tasks_.empty()) {tasks_.pop();}}
}
取出任务绑定线程然后添加到线程函数 AddThread()
AddThread()
这个函数主要是从任务队列中取出任务,然后将其绑定到线程,并且添加到容器中.
void ThreadPool::AddThread() {// 取出任务auto func = [this]() {while(true) {Task task;{std::unqiue_lock<std::mutex> lock(task_mutex_);task_cv_.wait(lock, [this](){return is_shutdown_.load() || !tasks.empty(); });if (is_shutdown_.load() && tasks.empty()) {return;}// 取出任务task = std::move(tasks.front());tasks.pop();}task();}}// 将其封装为线程ThreadInfoPtr thread_ptr = std::make_shared<ThreadInfo>();thread_ptr->ptr = std::make_shared<std::thread>(std::move(func));work_threads_.emplace_back(std::move(thread_ptr));
}
最后,希望自己继续加油,学无止境,还请各位大佬海涵,如有错误请直接指出,我一定会及时修改。如果侵权,请联系我删除~