C++ 中实现 `Task::WhenAll` 和 `Task::WhenAny` 的两种方案
📚 C++ 中实现 Task::WhenAll
和 Task::WhenAny
的两种方案
引用:
拈朵微笑的花
想一番人世變換
到頭來輸贏又何妨
日與夜互消長
富與貴難久長
今早的容顏老於昨晚
- C++ 标准库异步编程示例(一)
- C++ TAP(基于任务的异步编程模式)
🚀 引言:异步编程的需求与挑战
在现代软件开发中,异步编程已成为提升应用性能的关键技术。C# 提供了优雅的 Task.WhenAll
和 Task.WhenAny
机制来管理多个异步任务,但 C++ 标准库中缺乏直接等效功能。本文将深入探讨两种高效实现方案:
🔄 方案1:基于轮询的简单实现
🛠️ when_all
实现(简单轮询)
#include <vector>
#include <future>template <typename T>
std::vector<T> when_all(std::vector<std::future<T>>& futures) {std::vector<T> results;for (auto& fut : futures) {results.push_back(fut.get());}return results;
}// void 特化版本
void when_all(std::vector<std::future<void>>& futures) {for (auto& fut : futures) {fut.get();}
}
原理说明
- 顺序执行:循环遍历每个 future,调用
get()
方法阻塞等待结果 - 结果收集:对于非 void 任务,结果存储在 vector 中返回
- 优点:实现简单,代码直观
- 缺点:顺序等待导致性能瓶颈
🛠️ when_any
实现(轮询方式)
#include <chrono>
#include <vector>
#include <future>template <typename T>
size_t when_any(std::vector<std::future<T>>& futures) {while (true) {for (size_t i = 0; i < futures.size(); ++i) {if (futures[i].wait_for(std::chrono::seconds(0)) == std::future_status::ready) {return i;}}std::this_thread::sleep_for(std::chrono::milliseconds(10));}
}
原理说明
- 轮询检测:使用
wait_for(0)
非阻塞检查任务状态 - 指数退避:每次检测后休眠减少 CPU 占用
- 返回索引:返回第一个完成任务的索引
- 缺点:CPU 占用高,响应延迟(最大10ms)
⚡ 方案2:基于条件变量的高效实现
🧩 WhenAll
类设计(高效等待所有任务)
#include <vector>
#include <future>
#include <mutex>
#include <condition_variable>
#include <thread>class WhenAll {
public:void add_future(std::future<void> fut) {std::lock_guard<std::mutex> lock(mtx);futures.push_back(std::move(fut));count++;}void wait() {std::unique_lock<std::mutex> lock(mtx);if (count == 0) return;for (auto& fut : futures) {std::thread([&, this] {fut.wait();std::lock_guard<std::mutex> lock(mtx);if (--count == 0) cv.notify_all();}).detach();}cv.wait(lock, [this] { return count == 0; });}private:std::vector<std::future<void>> futures;std::mutex mtx;std::condition_variable cv;int count = 0;
};
架构解析
工作原理
- 添加任务:通过
add_future
添加异步任务 - 监控线程:为每个任务创建监控线程
- 条件等待:主线程等待条件变量
cv
- 完成通知:最后完成的任务触发
notify_all()
- 资源释放:监控线程自动分离(
detach
)
性能优势
- 零轮询:完全消除CPU空转
- 即时响应:任务完成立即通知
- 线程安全:互斥锁保护共享状态
🧩 WhenAny
类设计(高效等待任意任务)
#include <vector>
#include <future>
#include <mutex>
#include <condition_variable>template <typename T>
class WhenAny {
public:template <typename Func>void add_task(Func func) {std::lock_guard<std::mutex> lock(mtx);futures.push_back(std::async(std::launch::async, [this, func] {auto result = func();{std::lock_guard<std::mutex> lock(mtx);if (!completed) {completed = true;completed_index = futures.size() - 1;cv.notify_all();}}return result;}));}size_t wait() {std::unique_lock<std::mutex> lock(mtx);cv.wait(lock, [this] { return completed; });return completed_index;}std::future<T>& get_future(size_t index) {return futures[index];}private:std::vector<std::future<T>> futures;std::mutex mtx;std::condition_variable cv;bool completed = false;size_t completed_index = 0;
};
架构解析
工作流程
关键特性
- 泛型支持:模板化设计支持任意返回类型
- 一次性通知:
completed
标志确保只通知一次 - 结果获取:
get_future
方法获取已完成任务的结果 - 线程安全:互斥锁保护共享状态
🧪 使用示例与场景分析
基本使用示例
#include <iostream>
#include <chrono>
#include <thread>int main() {// 示例1: WhenAll 使用WhenAll wa;wa.add_future(std::async([] { std::this_thread::sleep_for(std::chrono::seconds(1));std::cout << "任务1完成\n";}));wa.add_future(std::async([] { std::this_thread::sleep_for(std::chrono::seconds(2));std::cout << "任务2完成\n"; }));std::cout << "等待所有任务...\n";wa.wait();std::cout << "所有任务完成!\n";// 示例2: WhenAny 使用WhenAny<int> wany;wany.add_task([] {std::this_thread::sleep_for(std::chrono::seconds(3));return 100;});wany.add_task([] {std::this_thread::sleep_for(std::chrono::seconds(1));return 200;});std::cout << "等待任意任务完成...\n";size_t index = wany.wait();auto& fut = wany.get_future(index);std::cout << "任务" << index << "最先完成,结果: " << fut.get() << "\n";return 0;
}
实际应用场景
- 微服务聚合:并行调用多个微服务,等待所有响应
- 竞态条件处理:多个数据源查询,使用第一个返回结果
- 资源加载:并行加载多个资源文件,等待全部完成
- 超时处理:多个备用服务调用,使用最先响应的服务
🚀 高级优化与扩展
性能优化技术
-
线程池集成:避免为每个任务创建新线程
void wait(ThreadPool& pool) {std::unique_lock<std::mutex> lock(mtx);if (count == 0) return;for (auto& fut : futures) {pool.enqueue([&, this] {fut.wait();std::lock_guard<std::mutex> lock(mtx);if (--count == 0) cv.notify_all();});}cv.wait(lock, [this] { return count == 0; }); }
-
共享future优化:避免多次get()调用
void add_future(std::shared_future<void> shared_fut) {std::lock_guard<std::mutex> lock(mtx);shared_futures.push_back(shared_fut);count++; }
-
批量任务添加:减少锁竞争
void add_futures(const std::vector<std::future<void>>& new_futures) {std::lock_guard<std::mutex> lock(mtx);futures.insert(futures.end(), new_futures.begin(), new_futures.end());count += new_futures.size(); }
功能扩展
-
超时支持:添加
wait_for
和wait_until
template <typename Rep, typename Period> bool wait_for(const std::chrono::duration<Rep, Period>& timeout) {std::unique_lock<std::mutex> lock(mtx);return cv.wait_for(lock, timeout, [this] { return count == 0; }); }
-
混合类型支持:使用
std::variant
class WhenAnyVariant { public:template <typename Func>void add_task(Func func) {using ResultType = decltype(func());std::lock_guard<std::mutex> lock(mtx);futures.push_back(std::async(std::launch::async, [=] {auto result = func();{std::lock_guard<std::mutex> lock(mtx);if (!completed) {completed = true;completed_index = futures.size() - 1;cv.notify_all();}}return std::variant<ResultType>(std::move(result));}));}// ... 其他成员 private:std::vector<std::future<std::variant<int, double, std::string>>> futures; };
-
进度追踪:添加进度回调
void wait(std::function<void(int)> progress_callback) {std::unique_lock<std::mutex> lock(mtx);if (count == 0) return;for (auto& fut : futures) {std::thread([&, this] {fut.wait();std::lock_guard<std::mutex> lock(mtx);progress_callback(++completed_count);if (completed_count == count) cv.notify_all();}).detach();}cv.wait(lock, [this] { return completed_count == count; }); }
⚠️ 注意事项与最佳实践
异常处理
void add_future(std::future<void> fut) {std::lock_guard<std::mutex> lock(mtx);futures.push_back(std::async(std::launch::async, [fut = std::move(fut)]() mutable {try {fut.get();} catch (const std::exception& e) {std::cerr << "任务异常: " << e.what() << std::endl;}}));count++;
}
资源管理最佳实践
- 避免线程泄漏:使用RAII包装线程
- 预防死锁:锁粒度最小化
- 内存安全:使用智能指针管理共享数据
- 性能监控:添加任务执行时间统计
平台适配性
- 跨平台考虑:使用标准库确保可移植性
- 编译器支持:确保C++17及以上特性
- 异步模型:与平台特定API(如IOCP/epoll)集成
📊 性能对比分析
方案类型 | CPU占用 | 响应延迟 | 内存开销 | 适用场景 |
---|---|---|---|---|
简单轮询 | 高 (持续10-100%) | 高 (10ms级) | 低 | 任务少、低频率 |
条件变量 | 低 (<1%) | 低 (µs级) | 中 | 高并发、实时系统 |
线程池集成 | 中 (可控) | 低 (µs级) | 高 | 大规模任务处理 |
🔮 未来发展与C++标准展望
C++23/26异步特性
- std::execution:标准执行器支持
- 协程增强:更简洁的异步代码编写
- 网络库集成:与标准网络库协同工作
社区解决方案
- Boost.Asio:提供
async_wait_all
等实用工具 - Folly库:Facebook的高性能异步工具集
- Qt Concurrent:跨平台异步框架
💎 总结
本文详细探讨了在C++中实现 Task.WhenAll
和 Task.WhenAny
的两种核心方案:
- 简单轮询方案:适用于轻量级场景,实现简单但效率较低
- 条件变量方案:高性能实现,适用于生产环境
- 零轮询设计减少CPU占用
- 即时响应确保最佳性能
- 扩展性强,支持超时、进度回调等高级功能
最佳实践建议:
- 小型工具类使用简单轮询方案
- 高性能服务器使用条件变量方案
- 大规模并行处理集成线程池
- 始终考虑异常安全和资源管理