当前位置: 首页 > news >正文

C++ 中实现 `Task::WhenAll` 和 `Task::WhenAny` 的两种方案

📚 C++ 中实现 Task::WhenAllTask::WhenAny 的两种方案

引用

  1. 拈朵微笑的花 想一番人世變換 到頭來輸贏又何妨
  2. 日與夜互消長 富與貴難久長 今早的容顏老於昨晚
  3. C++ 标准库异步编程示例(一)
  4. C++ TAP(基于任务的异步编程模式)

🚀 引言:异步编程的需求与挑战

在现代软件开发中,异步编程已成为提升应用性能的关键技术。C# 提供了优雅的 Task.WhenAllTask.WhenAny 机制来管理多个异步任务,但 C++ 标准库中缺乏直接等效功能。本文将深入探讨两种高效实现方案:

异步编程需求
WhenAll 实现
WhenAny 实现
简单轮询方案
高效条件变量方案
简单轮询方案
高效条件变量方案

🔄 方案1:基于轮询的简单实现

🛠️ when_all 实现(简单轮询)

#include <vector>
#include <future>template <typename T>
std::vector<T> when_all(std::vector<std::future<T>>& futures) {std::vector<T> results;for (auto& fut : futures) {results.push_back(fut.get());}return results;
}// void 特化版本
void when_all(std::vector<std::future<void>>& futures) {for (auto& fut : futures) {fut.get();}
}
原理说明
  • 顺序执行:循环遍历每个 future,调用 get() 方法阻塞等待结果
  • 结果收集:对于非 void 任务,结果存储在 vector 中返回
  • 优点:实现简单,代码直观
  • 缺点:顺序等待导致性能瓶颈

🛠️ when_any 实现(轮询方式)

#include <chrono>
#include <vector>
#include <future>template <typename T>
size_t when_any(std::vector<std::future<T>>& futures) {while (true) {for (size_t i = 0; i < futures.size(); ++i) {if (futures[i].wait_for(std::chrono::seconds(0)) == std::future_status::ready) {return i;}}std::this_thread::sleep_for(std::chrono::milliseconds(10));}
}
原理说明
  • 轮询检测:使用 wait_for(0) 非阻塞检查任务状态
  • 指数退避:每次检测后休眠减少 CPU 占用
  • 返回索引:返回第一个完成任务的索引
  • 缺点:CPU 占用高,响应延迟(最大10ms)
开始
遍历所有future
是否ready?
返回索引
休眠10ms

⚡ 方案2:基于条件变量的高效实现

🧩 WhenAll 类设计(高效等待所有任务)

#include <vector>
#include <future>
#include <mutex>
#include <condition_variable>
#include <thread>class WhenAll {
public:void add_future(std::future<void> fut) {std::lock_guard<std::mutex> lock(mtx);futures.push_back(std::move(fut));count++;}void wait() {std::unique_lock<std::mutex> lock(mtx);if (count == 0) return;for (auto& fut : futures) {std::thread([&, this] {fut.wait();std::lock_guard<std::mutex> lock(mtx);if (--count == 0) cv.notify_all();}).detach();}cv.wait(lock, [this] { return count == 0; });}private:std::vector<std::future<void>> futures;std::mutex mtx;std::condition_variable cv;int count = 0;
};
架构解析
WhenAll
- futures: vector>
- mtx: mutex
- cv: condition_variable
- count: int
+add_future(future fut)
+wait()
工作原理
  1. 添加任务:通过 add_future 添加异步任务
  2. 监控线程:为每个任务创建监控线程
  3. 条件等待:主线程等待条件变量 cv
  4. 完成通知:最后完成的任务触发 notify_all()
  5. 资源释放:监控线程自动分离(detach
性能优势
  • 零轮询:完全消除CPU空转
  • 即时响应:任务完成立即通知
  • 线程安全:互斥锁保护共享状态

🧩 WhenAny 类设计(高效等待任意任务)

#include <vector>
#include <future>
#include <mutex>
#include <condition_variable>template <typename T>
class WhenAny {
public:template <typename Func>void add_task(Func func) {std::lock_guard<std::mutex> lock(mtx);futures.push_back(std::async(std::launch::async, [this, func] {auto result = func();{std::lock_guard<std::mutex> lock(mtx);if (!completed) {completed = true;completed_index = futures.size() - 1;cv.notify_all();}}return result;}));}size_t wait() {std::unique_lock<std::mutex> lock(mtx);cv.wait(lock, [this] { return completed; });return completed_index;}std::future<T>& get_future(size_t index) {return futures[index];}private:std::vector<std::future<T>> futures;std::mutex mtx;std::condition_variable cv;bool completed = false;size_t completed_index = 0;
};
架构解析
WhenAny
- futures: vector>
- mtx: mutex
- cv: condition_variable
- completed: bool
- completed_index: size_t
+add_task(Func func)
+wait()
+get_future(size_t index)
工作流程
主线程任务线程WhenAny对象add_task(任务1)启动异步任务add_task(任务2)启动异步任务wait()任务完成返回完成索引主线程任务线程WhenAny对象
关键特性
  1. 泛型支持:模板化设计支持任意返回类型
  2. 一次性通知completed 标志确保只通知一次
  3. 结果获取get_future 方法获取已完成任务的结果
  4. 线程安全:互斥锁保护共享状态

🧪 使用示例与场景分析

基本使用示例

#include <iostream>
#include <chrono>
#include <thread>int main() {// 示例1: WhenAll 使用WhenAll wa;wa.add_future(std::async([] { std::this_thread::sleep_for(std::chrono::seconds(1));std::cout << "任务1完成\n";}));wa.add_future(std::async([] { std::this_thread::sleep_for(std::chrono::seconds(2));std::cout << "任务2完成\n"; }));std::cout << "等待所有任务...\n";wa.wait();std::cout << "所有任务完成!\n";// 示例2: WhenAny 使用WhenAny<int> wany;wany.add_task([] {std::this_thread::sleep_for(std::chrono::seconds(3));return 100;});wany.add_task([] {std::this_thread::sleep_for(std::chrono::seconds(1));return 200;});std::cout << "等待任意任务完成...\n";size_t index = wany.wait();auto& fut = wany.get_future(index);std::cout << "任务" << index << "最先完成,结果: " << fut.get() << "\n";return 0;
}

实际应用场景

  1. 微服务聚合:并行调用多个微服务,等待所有响应
  2. 竞态条件处理:多个数据源查询,使用第一个返回结果
  3. 资源加载:并行加载多个资源文件,等待全部完成
  4. 超时处理:多个备用服务调用,使用最先响应的服务

🚀 高级优化与扩展

性能优化技术

  1. 线程池集成:避免为每个任务创建新线程

    void wait(ThreadPool& pool) {std::unique_lock<std::mutex> lock(mtx);if (count == 0) return;for (auto& fut : futures) {pool.enqueue([&, this] {fut.wait();std::lock_guard<std::mutex> lock(mtx);if (--count == 0) cv.notify_all();});}cv.wait(lock, [this] { return count == 0; });
    }
    
  2. 共享future优化:避免多次get()调用

    void add_future(std::shared_future<void> shared_fut) {std::lock_guard<std::mutex> lock(mtx);shared_futures.push_back(shared_fut);count++;
    }
    
  3. 批量任务添加:减少锁竞争

    void add_futures(const std::vector<std::future<void>>& new_futures) {std::lock_guard<std::mutex> lock(mtx);futures.insert(futures.end(), new_futures.begin(), new_futures.end());count += new_futures.size();
    }
    

功能扩展

  1. 超时支持:添加 wait_forwait_until

    template <typename Rep, typename Period>
    bool wait_for(const std::chrono::duration<Rep, Period>& timeout) {std::unique_lock<std::mutex> lock(mtx);return cv.wait_for(lock, timeout, [this] { return count == 0; });
    }
    
  2. 混合类型支持:使用 std::variant

    class WhenAnyVariant {
    public:template <typename Func>void add_task(Func func) {using ResultType = decltype(func());std::lock_guard<std::mutex> lock(mtx);futures.push_back(std::async(std::launch::async, [=] {auto result = func();{std::lock_guard<std::mutex> lock(mtx);if (!completed) {completed = true;completed_index = futures.size() - 1;cv.notify_all();}}return std::variant<ResultType>(std::move(result));}));}// ... 其他成员
    private:std::vector<std::future<std::variant<int, double, std::string>>> futures;
    };
    
  3. 进度追踪:添加进度回调

    void wait(std::function<void(int)> progress_callback) {std::unique_lock<std::mutex> lock(mtx);if (count == 0) return;for (auto& fut : futures) {std::thread([&, this] {fut.wait();std::lock_guard<std::mutex> lock(mtx);progress_callback(++completed_count);if (completed_count == count) cv.notify_all();}).detach();}cv.wait(lock, [this] { return completed_count == count; });
    }
    

⚠️ 注意事项与最佳实践

异常处理

void add_future(std::future<void> fut) {std::lock_guard<std::mutex> lock(mtx);futures.push_back(std::async(std::launch::async, [fut = std::move(fut)]() mutable {try {fut.get();} catch (const std::exception& e) {std::cerr << "任务异常: " << e.what() << std::endl;}}));count++;
}

资源管理最佳实践

  1. 避免线程泄漏:使用RAII包装线程
  2. 预防死锁:锁粒度最小化
  3. 内存安全:使用智能指针管理共享数据
  4. 性能监控:添加任务执行时间统计

平台适配性

  1. 跨平台考虑:使用标准库确保可移植性
  2. 编译器支持:确保C++17及以上特性
  3. 异步模型:与平台特定API(如IOCP/epoll)集成

📊 性能对比分析

方案类型CPU占用响应延迟内存开销适用场景
简单轮询高 (持续10-100%)高 (10ms级)任务少、低频率
条件变量低 (<1%)低 (µs级)高并发、实时系统
线程池集成中 (可控)低 (µs级)大规模任务处理

在这里插入图片描述

🔮 未来发展与C++标准展望

C++23/26异步特性

  1. std::execution:标准执行器支持
  2. 协程增强:更简洁的异步代码编写
  3. 网络库集成:与标准网络库协同工作

社区解决方案

  1. Boost.Asio:提供 async_wait_all 等实用工具
  2. Folly库:Facebook的高性能异步工具集
  3. Qt Concurrent:跨平台异步框架

💎 总结

本文详细探讨了在C++中实现 Task.WhenAllTask.WhenAny 的两种核心方案:

  1. 简单轮询方案:适用于轻量级场景,实现简单但效率较低
  2. 条件变量方案:高性能实现,适用于生产环境
    • 零轮询设计减少CPU占用
    • 即时响应确保最佳性能
    • 扩展性强,支持超时、进度回调等高级功能
少量任务
大量任务
高实时性
可接受延迟
选择实现方案
任务数量
简单轮询方案
实时性要求
条件变量方案
线程池集成方案

最佳实践建议

  • 小型工具类使用简单轮询方案
  • 高性能服务器使用条件变量方案
  • 大规模并行处理集成线程池
  • 始终考虑异常安全和资源管理
http://www.lryc.cn/news/602003.html

相关文章:

  • Android启动时间优化大全
  • i节点学习
  • JavaScript核心概念全解析
  • Flutter中 Provider 的基础用法超详细讲解(二)之ChangeNotifierProvider
  • Vim 编辑器工作模式及操作指南
  • Spring AI 项目实战(二十一):Spring Boot + AI +DeepSeek驱动的智能题库系统(附完整源码)
  • zabbix-agent静默安装
  • @RefreshScope 核心原理深度解析:Spring Boot 的动态魔法
  • 抗辐照芯片在低轨卫星星座CAN总线通讯及供电系统的应用探讨
  • 第二阶段-第二章—8天Python从入门到精通【itheima】-138节(MySQL的综合案例)
  • 【程序员私房菜】python洋葱炒王中王火腿肠
  • 数据结构基础内容(第二篇:线性结构)
  • 【LeetCode刷题指南】--设计循环队列
  • 自由学习记录(74)
  • 【LeetCode 热题 100】51. N 皇后——回溯
  • c语言结构体字节对齐
  • EPOLLONESHOT 深度解析:Linux epoll 的单次触发机制
  • LeetCode 1577.数的平方等于两数乘积的方法数
  • 详解力扣高频SQL50题之180. 连续出现的数字【困难】
  • Spring MVC设计精粹:源码级架构解析与实践指南
  • 网络基础19:OSPF多区域实验
  • 俄罗斯方块游戏开发(面向对象编程)
  • Python-初学openCV——图像预处理(四)——滤波器
  • Redis6.0+安装教程(Linux)
  • vscode找不到python解释器的解决方案
  • VINS外参精确自标定飘的问题
  • Triton编译
  • C++ 多线程 std::thread::joinable
  • 3.Linuxvim编辑器及快捷键的使用
  • 【奔跑吧!Linux 内核(第二版)】第4章:内核编译和调试