当前位置: 首页 > news >正文

C++11实现线程池(2)固定线程池

二、FixedThreadPool 的实现

2.1 需求

FixedThreadPool 是一个固定大小的线程池,在创建时会指定线程池中线程的数量。每当有任务提交到线程池时,线程池会利用预先创建的线程来执行任务,直到达到线程池的最大线程数。

2.2 SyncQueue 同步队列的设计

同步队列为线程池中三层结构(同步服务层、排队层、异步服务层 )的中间层,主要作用:

  • 保证任务队列中共享数据的线程安全;
  • 为上层同步服务层提供添加新任务的接口,为下层异步服务层提供获取任务的接口;
  • 限制任务数上限,避免因任务过多导致内存暴涨。

同步队列实现用到 C++11 特性(互斥锁、条件变量、右值引用、std::movestd::forward 等 ),还支持 Stop 接口终止任务,且优化了任务获取逻辑(一次加锁取出队列所有数据,减少加锁次数、避免数据拷贝 )。

2.3 SyncQueue 类型的代码实现

同步队列类型:生产者消费者队列
const int MaxTaskCount = 200;
template<class T>
class SyncQueue
{
private:std::list<T> m_queue;               // 任务缓冲区,也可考虑 deque<T> m_queue;mutable std::mutex m_mutex;        // 缓冲区的互斥锁std::condition_variable m_notEmpty;// 不为空的条件变量,对应消费者std::condition_variable m_notFull; // 不为满的条件变量,对应生产者int m_maxSize;                     // 缓冲区任务数的上限值bool m_needStop;                   // 同步队列的停止标志,true 表示停止工作bool IsFull() const;bool IsEmpty() const;template<class F>void Add(F &&task); 
public:SyncQueue(int maxsize);~SyncQueue();void Put(const T &task);void Put(T &&task);void Take(std::list<T> &list);void Take(T &task);void Stop();bool Empty() const;bool Full() const;size_t Size() const;size_t Count() const;
};
同步队列函数实现(Take、Add、Stop 等 )
  • Take 函数
    创建 unique_lock 获取互斥锁,通过条件变量 m_notEmpty 等待(判断式为队列停止标志或队列不为空 );被唤醒后检查条件,满足则取出队列任务并唤醒添加任务的线程,不满足则释放锁继续等待。
  • Add 函数
    流程与 Take 类似,获取锁后检查条件(队列停止标志或队列未满 ),满足则插入任务并唤醒取任务线程,不满足则释放锁等待。
  • Stop 函数
    获取锁后设置停止标志 m_needStop,唤醒所有等待线程;因线程在 m_needStop 为 true 时退出,可让等待线程相继退出。同时将 m_notFull.notify_all() 放在 lock_guard 保护范围外做性能优化(避免唤醒线程额外等待锁释放 )。

完整函数实现代码(示例 ):

#include<list>
#include<mutex>
#include<condition_variable>
#include<iostream>
using namespace std;const int MaxTaskCount = 200;
template<class T>
class SyncQueue
{
private:list<T> m_queue;               mutable mutex m_mutex;        condition_variable m_notEmpty;condition_variable m_notFull; int m_maxSize;                     bool m_needStop;                   bool IsFull() const {bool full = m_queue.size() >= m_maxSize;if(full) {cout << "m_queue 已经满了,需要等待..." << endl;}return full;}bool IsEmpty() const {bool empty = m_queue.empty();if(empty) {cout << "m_queue 已经空了,需要等待..." << endl;}return empty;}template<class F>void Add(F &&task) {unique_lock<mutex> locker(m_mutex);m_notFull.wait(locker, [this]{return m_needStop || !IsFull();});if(m_needStop) {return ;}m_queue.push_back(forward<F>(task));m_notEmpty.notify_one();}
public:SyncQueue(int maxsize) : m_maxSize(maxsize), m_needStop(false) {}~SyncQueue() {}void Put(const T &task) {Add(task);}void Put(T &&task) {Add(forward<T>(task));}void Take(list<T> &list) {unique_lock<mutex> locker(m_mutex);m_notEmpty.wait(locker, [this]{return m_needStop || !IsEmpty();});if(m_needStop) {return ;}list = move(m_queue);m_notFull.notify_one();}void Take(T &task) {unique_lock<mutex> locker(m_mutex);m_notEmpty.wait(locker, [this]{return m_needStop || !IsEmpty();});if(m_needStop) {return ;}task = m_queue.front();m_queue.pop_front();m_notFull.notify_one();}void Stop() {unique_lock<mutex> locker(m_mutex);m_needStop = true;// 唤醒所有等待的线程,让它们检测 m_needStop 并退出m_notEmpty.notify_all();m_notFull.notify_all();}bool Empty() const {lock_guard<mutex> locker(m_mutex);return m_queue.empty();}bool Full() const {lock_guard<mutex> locker(m_mutex);return m_queue.size() >= m_maxSize;}size_t Size() const {lock_guard<mutex> locker(m_mutex);return m_queue.size();}size_t Count() const {return m_queue.size();}
};

2.4 FixedThreadPool 线程池的设计

线程池包含三层(同步服务层、排队层、异步服务层 ),遵循生产者 - 消费者模式:

  • 同步服务层(生产者 ):不断将新任务添加到排队层(同步队列 ),需提供添加任务接口;
  • 排队层(同步队列 ):保证上下层共享数据安全访问,限制任务数上限;
  • 异步服务层(消费者 ):由线程池中预先创建的线程处理排队层任务,需提供停止接口。

线程池成员变量:

  • 线程组(m_threadgroup ):存储预先创建的线程;
  • 同步队列(m_queue ):用于任务排队和线程同步;
  • 运行标志(m_running ):原子变量,标记线程池是否运行;
  • 一次性标志(m_flag ):保证停止操作仅执行一次。

2.5 FixedThreadPool 代码实现

class FixedThreadPool
{
public:using Task = function<void(void)>;
private:list<shared_ptr<thread>> m_threadgroup; // 线程组SyncQueue<Task> m_queue;                // 同步队列atomic_bool m_running;                  // 运行标志,true 运行 / false 停止once_flag m_flag;                       // 保证 Stop 操作仅执行一次void Start(int numThreads) {m_running = true;for (int i = 0; i < numThreads; ++i) {m_threadgroup.push_back(make_shared<thread>(&FixedThreadPool::RunInThread, this));}}void RunInThread() {while (m_running) {Task task;m_queue.Take(task);if (task && m_running) {task();}}}void StopThreadGroup() {m_queue.Stop();m_running = false;for (auto &thread : m_threadgroup) {if (thread) {thread->join();}}m_threadgroup.clear();}
public:FixedThreadPool(int numThreads = thread::hardware_concurrency()): m_queue(MaxTaskCount), m_running(false) {Start(numThreads);}~FixedThreadPool() {Stop();}void Stop() {call_once(m_flag, [this] { StopThreadGroup(); });}void AddTask(Task&& task) {m_queue.Put(forward<Task>(task));}void AddTask(const Task& task) {m_queue.Put(task);}
};

2.6 线程池的拒绝策略

线程池任务队列满时,有以下拒绝策略:

  • AbortPolicy(中止策略 ):默认策略,直接抛出 RejectedExecutionException,调用者可捕获并自定义处理;
  • DiscardPolicy(抛弃策略 ):直接丢弃被拒绝任务,不做任何操作;
  • DiscardOldestPolicy(抛弃最老策略 ):抛弃阻塞队列中最老任务(队列中下一个要执行的任务 ),重新提交被拒任务;若队列是优先队列,可能抛弃最高优先级任务,不建议与优先队列混用;
  • CallerRunsPolicy(调用者运行策略 ):在调用者线程中执行被拒任务,实现调节机制(不抛异常、不丢弃任务,让调用者线程执行任务,间接 “节流” )。

2.7 实现调用者运行策略的代码示例

以同步队列 Add 函数为例,演示超时等待与拒绝策略:

template<class F>
int Add(F&& task) {unique_lock<mutex> locker(m_mutex);// 等待 1 秒,若队列仍满则返回拒绝if (!m_notFull.wait_for(locker, chrono::seconds(1), [this] { return m_needStop || !IsFull(); })) {return 1; // 任务队列满,添加失败}if (m_needStop) {return 2; // 任务队列停止工作}m_queue.push_back(forward<F>(task));m_notEmpty.notify_one();return 0; // 添加成功
}void AddTask(Task&& task) {if (m_queue.Put(forward<Task>(task)) != 0) {cerr << "task queue is full, Add task fail." << endl;// 调用者运行策略:在当前线程执行任务task(); }
}

2.8 测试用例

测试 1:基本任务提交与结果获取
FixedThreadPool pool;void Add(int a, int b, promise<int>& c_promise) {cout << "add begin ..." << endl;this_thread::sleep_for(chrono::milliseconds(2000));int c = a + b;c_promise.set_value(c);this_thread::sleep_for(chrono::milliseconds(1000));cout << "add end ... " << endl;
}void add_a() {promise<int> c_promise;future<int> a_future = c_promise.get_future();function<void(void)> f = bind(Add, 10, 20, ref(c_promise));pool.AddTask(f);cout << "add_a: " << a_future.get() << endl;
}void add_b() {promise<int> c_promise;future<int> a_future = c_promise.get_future();function<void(void)> f = bind(Add, 20, 30, ref(c_promise));pool.AddTask(f);cout << "add_b: " << a_future.get() << endl;
}void add_c() {promise<int> c_promise;future<int> a_future = c_promise.get_future();function<void(void)> f = bind(Add, 30, 40, ref(c_promise));pool.AddTask(f);cout << "add_c: " << a_future.get() << endl;
}int main() {thread tha(add_a);thread thb(add_b);thread thc(add_c);tha.join();thb.join();thc.join();return 0;
}
测试 2:内存分配任务测试
ThreadPool pool;void my_malloc(int size, promise<int*>& c_promise) {int* p = (int*)malloc(size);c_promise.set_value(p);
}void my_free(int* p) {free(p);
}void my_a() {int n = 10;promise<int*> c_promise;future<int*> a_future = c_promise.get_future();function<void(void)> f = bind(my_malloc, sizeof(int) * n, ref(c_promise));pool.AddTask(f);int* p = a_future.get();if (p == nullptr) {cout << "失败 ... " << endl;exit(1);}*p = 5;cout << "p:  " << p << "  *p: " << *p << endl;pool.AddTask(bind(my_free, p));
}// my_b、my_c 类似 my_a,调整内存分配大小等参数 ...int main() {thread tha(my_a);thread thb(my_b);thread thc(my_c);tha.join();thb.join();thc.join();return 0;
}

2.9 FixedThreadPool 的使用场景

FixedThreadPool 适用于以下场景,核心优势是线程数固定可控,平衡资源占用与任务并发:

  1. 并发限制
    当需执行大量任务但希望限制并发线程数时,可通过它控制线程数量,避免系统资源(CPU、内存)过度占用,减少线程竞争导致的性能下降。

  2. 稳定且可控的任务执行
    若任务量稳定、执行时间短,FixedThreadPool 能提供固定线程环境,避免频繁创建 / 销毁线程的开销,保证执行效率稳定。

  3. 服务器应用
    处理服务器大量请求时,可根据硬件配置(如 CPU 核心数)和负载预期设置线程池大小,平衡性能与资源利用率,高效响应客户端请求。

  4. 批量任务处理
    需并发处理一批任务(如批量数据处理、文件上传 / 下载)时,线程池可统一管理线程,调度任务执行,简化并发逻辑。

注意:因线程数固定,若任务量超过线程数且队列满,新任务会被拒绝。使用时需结合系统负载、任务特性(执行时长、数量)合理配置线程池大小。

http://www.lryc.cn/news/611617.html

相关文章:

  • Java Stream API 详解(Java 8+)
  • 云计算一阶段Ⅱ——12. SELinux 加固 Linux 安全
  • 8.6学习总结
  • AI增强的软件测试工具
  • 网站、域名、IP在什么场景下需要备案
  • 动态代理常用的两种方式?
  • OA系统详解:有哪些功能、主流产品怎么选?
  • 自己本地搭建的服务器怎么接公网?公网IP直连服务器方法,和只有内网IP直接映射到互联网
  • 深度解析:AI如何重塑供应链?从被动响应到预测性防御的三大核心实践
  • 希尔排序:高效插入排序的进阶之道
  • 【JS-7-ajax】AJAX技术:现代Web开发的异步通信核心
  • 【Java String】类深度解析:从原理到高效使用技巧
  • 生成网站sitemap.xml地图教程
  • 从代码学习LLM - llama3 PyTorch版
  • GitHub Spark公共预览版上线
  • 利用OJ判题的多语言优雅解耦方法深入体会模板方法模式、策略模式、工厂模式的妙用
  • 本地服务器端部署基于大模型的通用OCR项目——dots.ocr
  • 达梦数据库日常运维命令
  • cdn是什么
  • 【C++】unordered系列容器使用及封装
  • 生成式 AI 重塑自动驾驶仿真:4D 场景生成技术的突破与实践
  • QT----不同线程中信号发送了槽函数没反应QObject::connect: Cannot queue arguments of typeXXX
  • SG105 Pro 网管交换机的3种VLAN配置
  • java实现生成自定义二维码
  • 软考信息安全工程师11月备考
  • Ragflow介绍与安装
  • 考研408_数据结构笔记(第四章 串)
  • Spearman 相关系数与 Pearson 相关系数的区别
  • Java 工具类的“活化石”:Apache Commons 核心用法、性能陷阱与现代替代方案
  • 湖南14个市州分流线得分率对比