当前位置: 首页 > news >正文

【C++进阶】实现C++线程池

文章目录

    • 1. thread_pool.h
    • 2. main.cpp

1. thread_pool.h

#pragma once
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>class ThreadPool
{
public:ThreadPool(size_t);template <class F, class... Args>auto enqueue(F &&f, Args &&...args)-> std::future<typename std::result_of<F(Args...)>::type>;~ThreadPool();private:// 线程池中的工作线程std::vector<std::thread> workers;// 任务队列std::queue<std::function<void()>> tasks;// 同步相关std::mutex queue_mutex;std::condition_variable condition;bool stop;
};
// 构造函数,创建线程并将其放入工作线程中
ThreadPool::ThreadPool(size_t threads): stop(false)
{for (size_t i = 0; i < threads; ++i)workers.emplace_back([this]{for (;;){std::function<void()> task;{std::unique_lock<std::mutex> lock(this->queue_mutex);this->condition.wait(lock,[this]{ return this->stop || !this->tasks.empty(); });if (this->stop && this->tasks.empty())return;task = std::move(this->tasks.front());this->tasks.pop();}task();}});
}
template <class F, class... Args>
auto ThreadPool::enqueue(F &&f, Args &&...args)-> std::future<typename std::result_of<F(Args...)>::type>
{using return_type = typename std::result_of<F(Args...)>::type;auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));std::future<return_type> res = task->get_future();{std::unique_lock<std::mutex> lock(queue_mutex);if (stop)throw std::runtime_error("enqueue on stopped ThreadPool");tasks.emplace([task](){ (*task)(); });}condition.notify_one();return res;
}
// 线程池析构函数,设置停止标志,并等待所有线程完成任务
ThreadPool::~ThreadPool()
{{std::unique_lock<std::mutex> lock(queue_mutex);stop = true;}condition.notify_all();for (std::thread &worker : workers)worker.join();
}
  • 定义了一个名为ThreadPool的类,它包含以下成员:
    • workers:一个std::vector,用于存储线程池中的工作线程
    • tasks:一个std::queue,用于存储待执行的任务
    • queue_mutex:一个互斥锁,用于同步任务队列的访问
    • condition:一个条件变量,用于在添加新任务时唤醒工作线程
    • stop:一个布尔值,表示线程池是否应停止接受新任务并等待所有线程完成后终止。
  • ThreadPool类的构造函数接受一个size_t类型的参数,表示线程池中工作线程的数量。在构造函数中,创建指定数量的工作线程,并在这些线程中执行一个匿名函数,该匿名函数用于从任务队列中获取任务并执行。
  • enqueue成员函数模板用于向线程池添加新任务。它接受一个可调用对象f和其参数args,并将任务添加到任务队列中。enqueue函数返回一个std::future对象,表示任务的异步结果。
  • ThreadPool类的析构函数设置stop标志为true,通知所有工作线程停止接受新任务,并在所有任务完成后终止。然后,调用condition.notify_all()唤醒所有等待的工作线程,最后使用join()等待所有工作线程完成。

2. main.cpp

#include <iostream>
#include <vector>
#include <random>
#include <chrono>
#include "thread_pool.h"// 矩阵相乘的函数
void multiply(const std::vector<std::vector<int>> &A, const std::vector<std::vector<int>> &B, std::vector<std::vector<int>> &C, size_t row)
{size_t num_columns = B[0].size();size_t num_inner = A[0].size();for (size_t col = 0; col < num_columns; ++col){C[row][col] = 0;for (size_t inner = 0; inner < num_inner; ++inner){C[row][col] += A[row][inner] * B[inner][col];}}
}int main()
{const size_t matrix_size = 1000;std::vector<std::vector<int>> A(matrix_size, std::vector<int>(matrix_size));std::vector<std::vector<int>> B(matrix_size, std::vector<int>(matrix_size));std::vector<std::vector<int>> C(matrix_size, std::vector<int>(matrix_size));// 随机填充矩阵 A 和 Bstd::random_device rd;std::mt19937 gen(rd());std::uniform_int_distribution<> dis(1, 10);for (size_t i = 0; i < matrix_size; ++i){for (size_t j = 0; j < matrix_size; ++j){A[i][j] = dis(gen);B[i][j] = dis(gen);}}// 使用线程池进行矩阵相乘ThreadPool pool(20); // 根据自己处理器支持的线程数设定,越大耗时越短auto start = std::chrono::high_resolution_clock::now();std::vector<std::future<void>> results;for (size_t i = 0; i < matrix_size; ++i){results.emplace_back(pool.enqueue(multiply, std::cref(A), std::cref(B), std::ref(C), i));}for (auto &&result : results){result.get();}auto end = std::chrono::high_resolution_clock::now();std::chrono::duration<double> elapsed = end - start;std::cout << "Time elapsed: " << elapsed.count() << " seconds" << std::endl;return 0;
}
  • 定义了一个名为multiply的函数,用于计算矩阵相乘。这个函数接受两个输入矩阵AB,一个输出矩阵C以及一个行索引。在这个示例中,每个线程将负责计算矩阵C中的一行。
  • main函数中,首先定义了矩阵的大小(matrix_size),并创建了大小为matrix_size的二维矩阵A、B和C。
  • 使用随机数生成器填充矩阵A和B的元素。
  • 创建一个包含4个工作线程的线程池pool
  • 记录开始时间,然后将每行矩阵相乘的任务添加到线程池中。这里使用了std::crefstd::ref来传递矩阵的引用,以避免不必要的拷贝。
  • 使用results向量存储每个任务返回的std::future对象。
  • 遍历results向量,并调用每个std::future对象的get()方法,以确保所有任务都已完成。
  • 记录结束时间,计算并输出所用时间。
http://www.lryc.cn/news/59017.html

相关文章:

  • Redis常用五种数据类型
  • C++ Primer第五版_第十一章习题答案(1~10)
  • GEE:使用LandTrendr进行森林变化检测详解
  • docker项目实施
  • springboot实现邮箱验证码功能
  • Java 进阶(5) Java IO流
  • “终于我从字节离职了...“一个年薪40W的测试工程师的自白...
  • 设计模式之策略模式(C++)
  • 从工厂普工到Python女程序员,聊聊这一路我是如何逆袭的?
  • 全国青少年信息素养大赛2023年python·选做题模拟二卷
  • 分布式事务Seata原理
  • 用ChatGPT怎么赚钱?普通人用这5个方法也能赚到生活费
  • ( “树” 之 DFS) 110. 平衡二叉树 ——【Leetcode每日一题】
  • nvm软件使用-同一个环境下控制多个不同node版本
  • 连续两个南航的研究生面试出了从来没出现过的问题,本科和研究生都是计算机专业的,竟然说static是不可更改的。
  • How to install nacos/nacos-server:v2.1.2-slim with docker
  • Rust社区引发舆论危机,问题到底出在哪儿?
  • C++算法恢复训练之归并排序
  • 使用Process Explorer和Clumsy工具定位软件高CPU占用问题
  • 为何巴菲特和马斯克站在了一起?
  • 企业数字化转型全是坑?这几篇数字化转型成功案例,减少70%损失
  • 13.Java面向对象----嵌套类
  • Redis数据迁移过程,使用jedis客户端发送命令,需要注意string和byte类型的命令,如果使用的转换字符编码不一致,会导致丢数据
  • 第六章 IA-32指令类型
  • 基于BenchmarkSQL的Oracle数据库tpcc性能测试
  • Dapr和Rainbond集成,实现云原生BaaS和模块化微服务开发
  • 全国青少年信息素养大赛2023年python·选做题模拟五卷
  • itop-3568开发板驱动学习笔记(18)tasklet 机制
  • 全国青少年电子信息智能创新大赛(复赛)python·模拟二卷
  • 对标ChatGPT的开源中文方案