C++11实现线程池(2)固定线程池
二、FixedThreadPool 的实现
2.1 需求
FixedThreadPool 是一个固定大小的线程池,在创建时会指定线程池中线程的数量。每当有任务提交到线程池时,线程池会利用预先创建的线程来执行任务,直到达到线程池的最大线程数。
2.2 SyncQueue 同步队列的设计
同步队列为线程池中三层结构(同步服务层、排队层、异步服务层 )的中间层,主要作用:
- 保证任务队列中共享数据的线程安全;
- 为上层同步服务层提供添加新任务的接口,为下层异步服务层提供获取任务的接口;
- 限制任务数上限,避免因任务过多导致内存暴涨。
同步队列实现用到 C++11 特性(互斥锁、条件变量、右值引用、std::move
、std::forward
等 ),还支持 Stop
接口终止任务,且优化了任务获取逻辑(一次加锁取出队列所有数据,减少加锁次数、避免数据拷贝 )。
2.3 SyncQueue 类型的代码实现
同步队列类型:生产者消费者队列
const int MaxTaskCount = 200;
template<class T>
class SyncQueue
{
private:std::list<T> m_queue; // 任务缓冲区,也可考虑 deque<T> m_queue;mutable std::mutex m_mutex; // 缓冲区的互斥锁std::condition_variable m_notEmpty;// 不为空的条件变量,对应消费者std::condition_variable m_notFull; // 不为满的条件变量,对应生产者int m_maxSize; // 缓冲区任务数的上限值bool m_needStop; // 同步队列的停止标志,true 表示停止工作bool IsFull() const;bool IsEmpty() const;template<class F>void Add(F &&task);
public:SyncQueue(int maxsize);~SyncQueue();void Put(const T &task);void Put(T &&task);void Take(std::list<T> &list);void Take(T &task);void Stop();bool Empty() const;bool Full() const;size_t Size() const;size_t Count() const;
};
同步队列函数实现(Take、Add、Stop 等 )
- Take 函数:
创建unique_lock
获取互斥锁,通过条件变量m_notEmpty
等待(判断式为队列停止标志或队列不为空 );被唤醒后检查条件,满足则取出队列任务并唤醒添加任务的线程,不满足则释放锁继续等待。 - Add 函数:
流程与Take
类似,获取锁后检查条件(队列停止标志或队列未满 ),满足则插入任务并唤醒取任务线程,不满足则释放锁等待。 - Stop 函数:
获取锁后设置停止标志m_needStop
,唤醒所有等待线程;因线程在m_needStop
为true
时退出,可让等待线程相继退出。同时将m_notFull.notify_all()
放在lock_guard
保护范围外做性能优化(避免唤醒线程额外等待锁释放 )。
完整函数实现代码(示例 ):
#include<list>
#include<mutex>
#include<condition_variable>
#include<iostream>
using namespace std;const int MaxTaskCount = 200;
template<class T>
class SyncQueue
{
private:list<T> m_queue; mutable mutex m_mutex; condition_variable m_notEmpty;condition_variable m_notFull; int m_maxSize; bool m_needStop; bool IsFull() const {bool full = m_queue.size() >= m_maxSize;if(full) {cout << "m_queue 已经满了,需要等待..." << endl;}return full;}bool IsEmpty() const {bool empty = m_queue.empty();if(empty) {cout << "m_queue 已经空了,需要等待..." << endl;}return empty;}template<class F>void Add(F &&task) {unique_lock<mutex> locker(m_mutex);m_notFull.wait(locker, [this]{return m_needStop || !IsFull();});if(m_needStop) {return ;}m_queue.push_back(forward<F>(task));m_notEmpty.notify_one();}
public:SyncQueue(int maxsize) : m_maxSize(maxsize), m_needStop(false) {}~SyncQueue() {}void Put(const T &task) {Add(task);}void Put(T &&task) {Add(forward<T>(task));}void Take(list<T> &list) {unique_lock<mutex> locker(m_mutex);m_notEmpty.wait(locker, [this]{return m_needStop || !IsEmpty();});if(m_needStop) {return ;}list = move(m_queue);m_notFull.notify_one();}void Take(T &task) {unique_lock<mutex> locker(m_mutex);m_notEmpty.wait(locker, [this]{return m_needStop || !IsEmpty();});if(m_needStop) {return ;}task = m_queue.front();m_queue.pop_front();m_notFull.notify_one();}void Stop() {unique_lock<mutex> locker(m_mutex);m_needStop = true;// 唤醒所有等待的线程,让它们检测 m_needStop 并退出m_notEmpty.notify_all();m_notFull.notify_all();}bool Empty() const {lock_guard<mutex> locker(m_mutex);return m_queue.empty();}bool Full() const {lock_guard<mutex> locker(m_mutex);return m_queue.size() >= m_maxSize;}size_t Size() const {lock_guard<mutex> locker(m_mutex);return m_queue.size();}size_t Count() const {return m_queue.size();}
};
2.4 FixedThreadPool 线程池的设计
线程池包含三层(同步服务层、排队层、异步服务层 ),遵循生产者 - 消费者模式:
- 同步服务层(生产者 ):不断将新任务添加到排队层(同步队列 ),需提供添加任务接口;
- 排队层(同步队列 ):保证上下层共享数据安全访问,限制任务数上限;
- 异步服务层(消费者 ):由线程池中预先创建的线程处理排队层任务,需提供停止接口。
线程池成员变量:
- 线程组(
m_threadgroup
):存储预先创建的线程; - 同步队列(
m_queue
):用于任务排队和线程同步; - 运行标志(
m_running
):原子变量,标记线程池是否运行; - 一次性标志(
m_flag
):保证停止操作仅执行一次。
2.5 FixedThreadPool 代码实现
class FixedThreadPool
{
public:using Task = function<void(void)>;
private:list<shared_ptr<thread>> m_threadgroup; // 线程组SyncQueue<Task> m_queue; // 同步队列atomic_bool m_running; // 运行标志,true 运行 / false 停止once_flag m_flag; // 保证 Stop 操作仅执行一次void Start(int numThreads) {m_running = true;for (int i = 0; i < numThreads; ++i) {m_threadgroup.push_back(make_shared<thread>(&FixedThreadPool::RunInThread, this));}}void RunInThread() {while (m_running) {Task task;m_queue.Take(task);if (task && m_running) {task();}}}void StopThreadGroup() {m_queue.Stop();m_running = false;for (auto &thread : m_threadgroup) {if (thread) {thread->join();}}m_threadgroup.clear();}
public:FixedThreadPool(int numThreads = thread::hardware_concurrency()): m_queue(MaxTaskCount), m_running(false) {Start(numThreads);}~FixedThreadPool() {Stop();}void Stop() {call_once(m_flag, [this] { StopThreadGroup(); });}void AddTask(Task&& task) {m_queue.Put(forward<Task>(task));}void AddTask(const Task& task) {m_queue.Put(task);}
};
2.6 线程池的拒绝策略
线程池任务队列满时,有以下拒绝策略:
- AbortPolicy(中止策略 ):默认策略,直接抛出
RejectedExecutionException
,调用者可捕获并自定义处理; - DiscardPolicy(抛弃策略 ):直接丢弃被拒绝任务,不做任何操作;
- DiscardOldestPolicy(抛弃最老策略 ):抛弃阻塞队列中最老任务(队列中下一个要执行的任务 ),重新提交被拒任务;若队列是优先队列,可能抛弃最高优先级任务,不建议与优先队列混用;
- CallerRunsPolicy(调用者运行策略 ):在调用者线程中执行被拒任务,实现调节机制(不抛异常、不丢弃任务,让调用者线程执行任务,间接 “节流” )。
2.7 实现调用者运行策略的代码示例
以同步队列 Add
函数为例,演示超时等待与拒绝策略:
template<class F>
int Add(F&& task) {unique_lock<mutex> locker(m_mutex);// 等待 1 秒,若队列仍满则返回拒绝if (!m_notFull.wait_for(locker, chrono::seconds(1), [this] { return m_needStop || !IsFull(); })) {return 1; // 任务队列满,添加失败}if (m_needStop) {return 2; // 任务队列停止工作}m_queue.push_back(forward<F>(task));m_notEmpty.notify_one();return 0; // 添加成功
}void AddTask(Task&& task) {if (m_queue.Put(forward<Task>(task)) != 0) {cerr << "task queue is full, Add task fail." << endl;// 调用者运行策略:在当前线程执行任务task(); }
}
2.8 测试用例
测试 1:基本任务提交与结果获取
FixedThreadPool pool;void Add(int a, int b, promise<int>& c_promise) {cout << "add begin ..." << endl;this_thread::sleep_for(chrono::milliseconds(2000));int c = a + b;c_promise.set_value(c);this_thread::sleep_for(chrono::milliseconds(1000));cout << "add end ... " << endl;
}void add_a() {promise<int> c_promise;future<int> a_future = c_promise.get_future();function<void(void)> f = bind(Add, 10, 20, ref(c_promise));pool.AddTask(f);cout << "add_a: " << a_future.get() << endl;
}void add_b() {promise<int> c_promise;future<int> a_future = c_promise.get_future();function<void(void)> f = bind(Add, 20, 30, ref(c_promise));pool.AddTask(f);cout << "add_b: " << a_future.get() << endl;
}void add_c() {promise<int> c_promise;future<int> a_future = c_promise.get_future();function<void(void)> f = bind(Add, 30, 40, ref(c_promise));pool.AddTask(f);cout << "add_c: " << a_future.get() << endl;
}int main() {thread tha(add_a);thread thb(add_b);thread thc(add_c);tha.join();thb.join();thc.join();return 0;
}
测试 2:内存分配任务测试
ThreadPool pool;void my_malloc(int size, promise<int*>& c_promise) {int* p = (int*)malloc(size);c_promise.set_value(p);
}void my_free(int* p) {free(p);
}void my_a() {int n = 10;promise<int*> c_promise;future<int*> a_future = c_promise.get_future();function<void(void)> f = bind(my_malloc, sizeof(int) * n, ref(c_promise));pool.AddTask(f);int* p = a_future.get();if (p == nullptr) {cout << "失败 ... " << endl;exit(1);}*p = 5;cout << "p: " << p << " *p: " << *p << endl;pool.AddTask(bind(my_free, p));
}// my_b、my_c 类似 my_a,调整内存分配大小等参数 ...int main() {thread tha(my_a);thread thb(my_b);thread thc(my_c);tha.join();thb.join();thc.join();return 0;
}
2.9 FixedThreadPool 的使用场景
FixedThreadPool 适用于以下场景,核心优势是线程数固定可控,平衡资源占用与任务并发:
并发限制
当需执行大量任务但希望限制并发线程数时,可通过它控制线程数量,避免系统资源(CPU、内存)过度占用,减少线程竞争导致的性能下降。稳定且可控的任务执行
若任务量稳定、执行时间短,FixedThreadPool 能提供固定线程环境,避免频繁创建 / 销毁线程的开销,保证执行效率稳定。服务器应用
处理服务器大量请求时,可根据硬件配置(如 CPU 核心数)和负载预期设置线程池大小,平衡性能与资源利用率,高效响应客户端请求。批量任务处理
需并发处理一批任务(如批量数据处理、文件上传 / 下载)时,线程池可统一管理线程,调度任务执行,简化并发逻辑。
注意:因线程数固定,若任务量超过线程数且队列满,新任务会被拒绝。使用时需结合系统负载、任务特性(执行时长、数量)合理配置线程池大小。