当前位置: 首页 > news >正文

[muduo] ThreadPool | TcpClient | 异步任务 | 通信测试

第九章:线程池(ThreadPool)

在第八章《TcpServer》中,我们了解muduo::net::TcpServer通过EventLoop线程池处理入站连接

这些EventLoop线程主要负责网络I/O:套接字读写和定时器处理,由PollerChannel协调,保持高效响应。

但当EventLoop线程中的MessageCallback函数需要执行耗时操作时会发生什么?

例如:

  • 复杂计算
  • 数据库访问(可能阻塞等待结果)
  • 大文件磁盘写入(可能阻塞)
  • 同步网络请求其他服务

若直接在EventLoop线程执行这些操作,线程将被阻塞,导致该线程管理的所有连接无法处理其他事件,服务器响应能力下降甚至引发超时。

这正是muduo::ThreadPool的价值所在。

线程池解决的问题

核心目标是可能阻塞或耗时的计算任务从关键EventLoop线程卸载

EventLoop线程必须保持空闲以快速处理网络I/O。

ThreadPool提供专用线程组执行任意任务。当EventLoop回调中遇到耗时任务时,将其提交给线程池,由池中的工作线程异步处理,使EventLoop线程快速返回继续处理网络事件。

类比场景:

  • EventLoop线程如同高效的前台接待员,仅处理简短交互(如引导访客、接收邮件)。
  • ThreadPool则是后勤团队,处理复杂请求(如审核详细申请),前台人员可立即返回岗位继续接待。
  • (类似于redismysql实现分类,还有上一篇文章当中的reactor分离,也是一个前台+多个处理人员)

ThreadPool:后台任务执行引擎

muduo::ThreadPool类管理一组待命的工作线程,关键设计要素包括:

  1. 线程集合:使用muduo::Thread类(第二章:线程)创建指定数量的工作线程
  2. 任务队列:线程安全的双端队列(std::deque),通过互斥锁(MutexLock)和条件变量(Condition)实现同步
  3. 工作线程逻辑:每个线程循环执行:取任务→执行→等待新任务
  4. 任务提交(run():通过run()方法将函数对象加入队列
  5. 阻塞等待(take():队列空时,工作线程通过条件变量(notEmpty_)阻塞休眠
  6. 唤醒机制:添加新任务时通过notEmpty_.notify()唤醒等待线程
  7. 有界队列(可选):可设置队列最大容量(maxQueueSize_),满时run()可能阻塞(通过notFull_),防止任务积压
  8. 启停控制start()启动线程池,stop()停止并等待线程退出

使用ThreadPool执行异步任务

通过创建实例、启动线程池并调用run()提交任务即可使用:

#include "muduo/base/ThreadPool.h"
#include "muduo/base/CurrentThread.h" 
#include "muduo/base/CountDownLatch.h" 
#include <cstdio>
#include <string>
#include <unistd.h>
#include <functional>// 工作线程中运行的函数
void print(const std::string& msg) {printf("线程池任务: %s, 进程ID: %d, 线程ID: %d\n",msg.c_str(), getpid(), muduo::CurrentThread::tid());sleep(1); // 模拟耗时操作
}int main() {printf("主线程启动. 进程ID: %d, 线程ID: %d\n", getpid(), muduo::CurrentThread::tid());muduo::ThreadPool pool("MyWorkerPool"); // 1. 创建线程池pool.setMaxQueueSize(5);               // 设置队列最大容量为5pool.start(3);                          // 2. 启动3个工作线程sleep(1); // 等待线程启动printf("线程池已启动.\n");// 3. 提交10个任务for (int i = 0; i < 10; ++i) {char task_msg[32];snprintf(task_msg, sizeof task_msg, "任务%d", i);pool.run(std::bind(print, std::string(task_msg))); // 提交任务}printf("任务提交完成.\n");sleep(10); // 等待任务执行pool.stop(); // 4. 停止线程池printf("线程池已停止.\n");return 0;
}

(需包含Muduo头文件并链接库)

使用muduo网络库中的线程池组件处理并发任务的基本流程,包含四个关键操作:
创建线程池、启动线程、提交任务、停止线程池。

原理

线程池管理一组工作线程,避免频繁创建销毁线程的开销。

任务提交后进入队列,空闲线程自动获取任务执行。

muduo库的ThreadPool采用生产者-消费者模型,主线程提交任务(生产者),工作线程处理任务(消费者)。

解析

初始化阶段
  • muduo::ThreadPool pool("MyWorkerPool")创建名为"MyWorkerPool"的线程池实例。
    pool.setMaxQueueSize(5)限制任务队列最多容纳5个未处理任务,防止内存过度消耗。

    pool.start(3)启动3个常驻工作线程,这些线程会持续从任务队列获取任务执行。启动后立即进入等待任务状态。

任务提交阶段
  • 循环提交10个打印任务,每个任务绑定print函数和字符串参数。

    当任务提交速度超过处理速度时,后提交的任务会因队列满(max=5)而阻塞,直到有线程处理完队列任务。

    print函数显示任务信息、进程PID和线程TID,通过sleep(1)模拟耗时操作。注意线程ID由muduo::CurrentThread::tid()获取,这是muduo库提供的跨平台线程标识。

终止阶段

sleep(10)确保所有任务完成,pool.stop()安全停止线程池:停止接受新任务,等待正在执行的任务完成,最后回收线程资源。

执行效果说明

程序输出将显示:

  1. 主线程信息
  2. 3个工作线程循环执行10个任务(每个任务间隔1秒)
  3. 由于只有3个线程,10个任务需约4秒完成(3并发+队列等待)
  4. 最终线程池停止信息

该模式适用于需要批量处理短生命周期任务的场景,如网络请求处理、日志写入等I/O密集型操作。

运行
理解:货架上有五个空位,三个三个的往上放货物
在这里插入图片描述

执行流程:

  1. 主线程创建线程池并启动3个工作线程
  2. 工作线程启动后立即尝试从空队列取任务,进入阻塞等待
  3. 主循环提交10个任务,工作线程被唤醒并并发执行
  4. 输出显示不同线程ID执行任务,主线程快速返回
  5. 任务队列满时run()可能阻塞,直到有空闲
  6. stop()通知所有线程退出,等待清理

线程池内部机制

核心交互流程:
在这里插入图片描述
加锁到任务队列中,再加锁的执行任务

关键源码(简化版):

// ThreadPool.h
class ThreadPool : noncopyable {
public:using Task = std::function<void ()>; // 任务类型explicit ThreadPool(const string& name = "ThreadPool");void setMaxQueueSize(int maxSize);    // 设置队列容量void start(int numThreads);           // 启动线程void stop();                          // 停止线程void run(Task f);                     // 提交任务private:bool isFull() const;                  // 队列是否满Task take();                          // 获取任务void runInThread();                   // 工作线程主循环mutable MutexLock mutex_;             // 保护队列和状态Condition notEmpty_;                  // 非空条件变量Condition notFull_;                   // 非满条件变量std::deque<Task> queue_;              // 任务队列size_t maxQueueSize_ = 0;             // 队列最大容量bool running_ = false;                // 运行状态
};// ThreadPool.cc
void ThreadPool::start(int numThreads) {running_ = true;// 创建numThreads个工作线程threads_.reserve(numThreads);for (int i = 0; i < numThreads; ++i) {threads_.emplace_back(new Thread(std::bind(&ThreadPool::runInThread, this)));threads_.back()->start();}
}void ThreadPool::run(Task task) {if (threads_.empty()) {task(); // 无线程时直接执行} else {MutexLockGuard lock(mutex_);while (isFull() && running_) {notFull_.wait(); // 队列满时等待}queue_.push_back(std::move(task));notEmpty_.notify(); // 通知工作线程}
}ThreadPool::Task ThreadPool::take() {MutexLockGuard lock(mutex_);while (queue_.empty() && running_) {notEmpty_.wait(); // 队列空时等待}Task task;if (!queue_.empty()) {task = queue_.front();queue_.pop_front();if (maxQueueSize_ > 0) {notFull_.notify(); // 通知任务提交者}}return task;
}void ThreadPool::runInThread() {while (running_) {Task task(take()); // 循环取任务if (task) task();  // 执行任务}
}

⭕核心功能

任务队列管理
ThreadPool 使用 std::deque<Task> 作为任务队列,通过 maxQueueSize_ 控制队列容量。

任务类型为 std::function<void()>,表示无参数无返回值的可调用对象

线程工作流程

  • 启动时通过 start() 创建多个工作线程,每个线程执行 runInThread() 循环。
  • runInThread() 不断调用 take() 获取任务并执行。若队列为空,线程在 notEmpty_ 条件变量上等待。
  • take() 从队列头部取出任务,若队列从满变为非满,通知 notFull_ 唤醒可能阻塞的任务提交者。

任务提交逻辑

  • run() 提交任务时,若队列已满且线程池在运行,提交线程在 notFull_ 上等待。
  • 任务入队后通过 notEmpty_.notify() 唤醒一个工作线程。
  • 若线程池未启动或无工作线程,任务直接在当前线程执行。

同步机制

  • MutexLock 保护队列和状态变量(running_)。
  • 两个 Condition 变量(notEmpty_notFull_)实现生产者-消费者模型,避免忙等待。

代码设计简洁高效,适合处理大量短期任务,典型应用如网络服务器的请求处理。

总结

功能描述作用/优势
工作线程基于muduo::Thread的线程集合提供独立于I/O线程的任务执行环境
任务队列线程安全的双端队列存储std::function<void ()类型任务解耦任务提交与执行,支持异步处理
run()方法提交任务到队列,队列满时可能阻塞用户接口,确保任务安全加入队列
take()方法工作线程从队列取任务,队列空时阻塞核心任务获取机制,保证工作线程高效等待
runInThread()工作线程主循环,执行take()和任务定义工作线程行为,维持任务处理循环
互斥锁保护队列和运行状态确保多线程访问共享数据的正确性
条件变量notEmpty_notFull_协调线程间通信实现高效的任务调度与资源管理
有界队列通过maxQueueSize_限制队列容量防止任务堆积导致内存溢出,提供背压机制

结论

muduo::ThreadPool是Muduo库中处理异步任务的核心组件,通过线程安全的任务队列和工作线程池,有效隔离耗时操作与网络I/O处理。其价值体现在:

  1. 资源隔离:保护EventLoop线程免受阻塞,确保网络高吞吐
  2. 弹性扩展:通过线程数配置适应不同计算负载
  3. 流量控制:有界队列防止资源耗尽
  4. 简化并发封装底层线程同步细节,提供简洁API

下一章我们将探讨TcpClient类,了解如何主动连接远程服务并管理TCP连接

第十章:TcpClient


第10章:TcpClient

  • 在上一章第9章:线程池中,我们讨论了muduo::ThreadPool如何通过卸载阻塞任务来保持EventLoop线程的响应能力。

  • 在此之前,我们探讨了TcpServer(第8章:TcpServer),它允许使用EventLoop线程池构建监听并管理多个传入TCP连接的服务器。

但如果想编写主动连接到远程服务器的程序(而不是等待连接)呢?

这就是网络客户端的作用。

  • 需要一种方法来连接到特定地址和端口,处理连接建立的异步性,管理连接后的数据交换,并可能需要处理连接失败和重试。

  • 手动实现这些需要创建套接字、设置为非阻塞模式、发起connect(2)系统调用(在非阻塞模式下会立即返回但稍后完成连接)、使用EventLoopChannel监视套接字的可写性(表示连接完成)或错误,然后管理已连接的套接字

这非常复杂,尤其是在需要自动重连等功能时。

这正是muduo::net::TcpClient要解决的问题。


TcpClient 解决了什么问题?

muduo::net::TcpClient为TCP连接的客户端提供了高级抽象。它处理以下完整流程:

  1. 发起连接:创建非阻塞套接字并调用connect()
  2. 监控连接状态:使用EventLoop等待连接成功建立或失败
  3. 管理连接:连接成功后,创建并管理单个TcpConnection对象(第6章:TcpConnection)来处理数据传输(发送和接收),使用其EventLoopBuffer(第7章:Buffer)
  4. 处理断开连接:在连接关闭时通知
  5. 自动重试(可选):如果启用,在初始连接失败或连接丢失时自动尝试重连

TcpClient想象成一位专门代表,其职责是呼叫另一个办公室(TcpServer)并建立单条通信线路。

  • 它处理拨号、等待对方接听以及通话接通后的线路管理。

  • 如果线路忙或通话中断,可以配置自动重拨。

TcpClient:连接发起者

muduo::net::TcpClient是在Muduo中创建TCP客户端应用程序的主要类。

以下是muduo::net::TcpClient的关键概念:

  1. 建立单一连接:与TcpServer不同,TcpClient对象设计用于建立和管理到一个特定远程服务器地址的连接,不处理多个传入连接
  2. 单个EventLoop拥有TcpClient对象必须在单个EventLoop线程中存在和使用,通常是客户端的主循环。所有回调和内部操作都在此循环线程中发生
  3. 拥有Connector:内部使用Connector对象,该组件负责异步connect()系统调用、监视套接字通道的连接完成/失败,并实现重试逻辑
  4. 拥有TcpConnection(连接时):当Connector成功建立连接后,TcpClient会为新套接字创建TcpConnection对象,所有数据I/O都通过该对象进行
  5. 使用相同回调:与TcpServer类似,通过熟悉的ConnectionCallbackMessageCallbackWriteCompleteCallback暴露连接生命周期和数据事件
  6. 连接/断开/停止控制:提供connect()disconnect()stop()方法控制客户端状态
  7. 自动重试enableRetry()方法允许在连接失败或丢失时启用自动重连

使用TcpClient:连接到回声服务器

让我们编写一个连接到第8章:TcpServer中构建的回声服务器的简单客户端。

该客户端将连接、发送消息、接收回声并断开连接。

需要:

  • 客户端的EventLoop
  • 服务器地址的InetAddress
  • 回调函数
#include "muduo/net/TcpClient.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/InetAddress.h"
#include "muduo/base/Logging.h"  // 用于LOG_INFO
#include <cstdio>  // 用于printf
#include <string>  // 用于std::string// 我们*单条连接*的TcpConnectionPtr(shared_ptr指向TcpConnection)
muduo::net::TcpConnectionPtr clientConnection;// 示例连接回调
void onConnection(const muduo::net::TcpConnectionPtr& conn) {LOG_INFO << "客户端连接 " << (conn->connected() ? "建立" : "断开");if (conn->connected()) {// 连接建立时存储连接指针clientConnection = conn;LOG_INFO << "已连接到 " << conn->peerAddress().toIpPort();// 发送初始消息std::string message = "你好,来自客户端!\n";printf("发送: %s", message.c_str());conn->send(message);} else {// 连接断开,清空存储的指针clientConnection.reset();LOG_INFO << "已断开与 " << conn->peerAddress().toIpPort() << " 的连接";// 实际应用中可在此处调用loop.quit()或处理重连conn->getLoop()->quit();  // 简单示例:断开时退出循环}
}// 示例消息回调(处理回声数据)
void onMessage(const muduo::net::TcpConnectionPtr& conn,muduo::net::Buffer* buf,muduo::Timestamp receiveTime) {LOG_INFO << "从 " << conn->name() << " 接收到 " << buf->readableBytes() << " 字节";// 将回声数据转为字符串std::string msg = buf->retrieveAllAsString();printf("收到回声消息: %s", msg.c_str());// 简单示例:收到回声后发送下一条消息// 实际协议中应处理消息并决定是否响应// std::string nextMessage = "另一条消息...\n";// conn->send(nextMessage); // 按需持续发送
}int main() {LOG_INFO << "main(): 进程ID = " << getpid();muduo::net::EventLoop loop;  // 客户端的事件循环// 服务器地址:localhost (127.0.0.1),端口9988muduo::net::InetAddress serverAddr("127.0.0.1", 9988);// 创建TcpClient实例muduo::net::TcpClient client(&loop, serverAddr, "EchoClient");// --- 配置客户端 ---// 设置回调client.setConnectionCallback(onConnection);client.setMessageCallback(onMessage);// 可选:启用连接失败/断开自动重连// client.enableRetry();// LOG_INFO << "已启用重连";// --- 启动连接过程 ---printf("启动客户端,正在连接到 %s...\n", serverAddr.toIpPort().c_str());client.connect();// --- 运行客户端循环 ---printf("运行客户端循环...\n");loop.loop();  // 阻塞直到调用loop.quit()// loop.loop()在连接关闭且onConnection调用loop->quit()后退出printf("客户端已停止。main()退出。\n");return 0;
}

(需要包含必要的Muduo头文件并链接库)

这段代码是使用muduo网络库(一个高性能C++网络库)实现的TCP客户端程序,负责连接到指定服务器,发送初始消息 并接收服务器返回的数据(回声)。

关键组件说明

必要头文件

  • TcpClient.h:客户端核心类
  • EventLoop.h:事件循环处理类
  • InetAddress.h:网络地址封装类
  • Logging.h:日志输出工具

主要执行流程

全局连接对象

muduo::net::TcpConnectionPtr clientConnection; // 保存当前有效连接

连接状态回调

void onConnection(const muduo::net::TcpConnectionPtr& conn) {// 连接建立时:// 1. 存储连接对象// 2. 发送初始问候消息// 连接断开时:// 1. 清除连接对象// 2. 退出事件循环
}

数据接收回调

void onMessage(/*参数略*/) {// 1. 读取接收缓冲区数据// 2. 打印接收内容(示例为回声服务)// 3. 可扩展发送后续消息
}

主程序逻辑

int main() {// 1. 创建事件循环对象// 2. 指定服务器地址(127.0.0.1:9988)// 3. 创建客户端实例// 4. 绑定回调函数// 5. 发起连接请求// 6. 运行事件循环(持续处理网络事件)
}

运行

  1. 客户端启动后自动连接服务器
  2. 连接成功后立即发送"你好,来自客户端!"
  3. 接收服务器返回的相同消息(回声)
  4. 断开连接时自动退出程序

注:实际使用时需配合对应的回声服务器运行,示例默认使用本地127.0.0.1:9988

在这里插入图片描述

main()关键部分解析:

  • muduo::net::EventLoop loop;:创建客户端的单一事件循环
  • muduo::net::InetAddress serverAddr("127.0.0.1", 9988);:指定服务器地址
  • muduo::net::TcpClient client(...);:创建关联事件循环和服务器地址的客户端实例
  • client.setConnectionCallback(...);:设置连接状态变更回调
  • client.connect();:启动异步连接过程
  • loop.loop();:启动事件循环

TcpClient内部机制:Connector与状态管理

TcpClient依赖内部Connector对象管理连接建立阶段和重试,同时管理状态标志和用于保存连接的shared_ptr

调用client.connect()时的流程:

在这里插入图片描述


核心源码解析

TcpClient.h关键成员(详见附带的代码):

class TcpClient : noncopyable {public:// 构造函数TcpClient(EventLoop* loop, const InetAddress& serverAddr, const string& nameArg);~TcpClient();void connect();    // 启动连接void disconnect(); // 优雅断开void stop();       // 停止连接和重试// 设置用户回调void setConnectionCallback(ConnectionCallback cb);void setMessageCallback(MessageCallback cb);private:EventLoop* loop_;ConnectorPtr connector_;  // 连接处理器TcpConnectionPtr connection_; // 当前连接// ... 其他成员
};

Connector类(详见附带的Connector.h/cc)关键方法:

  • start():由TcpClient::connect()调用,加入事件循环队列
  • handleWrite():处理连接完成事件
  • retry():实现带退避策略的重连机制

总结

功能描述优势
连接发起器启动与远程服务器的连接过程提供高级连接建立抽象
EventLoop绑定完全在单个事件循环线程内运行简化并发控制
Connector组件处理异步connect和重试逻辑封装复杂的状态管理
单一连接管理通过TcpConnection管理活动连接简化客户端连接管理逻辑
标准回调接口使用与TcpServer相同的回调机制统一编程模型
自动重试机制支持可配置的退避重试策略提升客户端容错能力

结论

muduo::net::TcpClient是构建Muduo客户端应用的核心抽象。

  • 通过内部Connector封装异步连接建立的复杂性,并通过标准TcpConnection对象管理通信。

  • 通过回调机制定义连接生命周期行为,配合可选的重试功能,可轻松构建高可靠的网络客户端。

  • 单线程设计模型与TcpServer形成完美互补,共同构成Muduo网络库的核心架构。

完结撒花~

test code: https://github.com/lvy010/Common-C-_Lib/tree/main/test

http://www.lryc.cn/news/573481.html

相关文章:

  • 【单调栈】-----【Largest Rectangle in a Histogram】
  • NuttX Socket 源码学习
  • C++ 第一阶段项目一:实现简易计算器
  • MCPServer编程与CLINE配置调用MCP
  • Taro 状态管理全面指南:从本地状态到全局方案
  • 人工智能学习57-TF训练
  • 逆向入门(16)程序逆向篇-Cabeca
  • 成长笔记——多串口发送与接收
  • Python 数据分析与可视化 Day 3 - Pandas 数据筛选与排序操作
  • springboot垃圾分类网站
  • 关于 Kyber:抗量子密码算法 Kyber 详解
  • 【软考高级系统架构论文】论多源数据集成及应用
  • 组件之间的双向绑定:v-model
  • GitHub OAuth 认证示例
  • 闲庭信步使用SV进行图像处理系列教程介绍
  • 2025年- H83-Lc191--139.单词拆分(动态规划)--Java版
  • 吴恩达:从斯坦福到 Coursera,他的深度学习布道之路
  • C++基础练习-二维数组
  • C++ 文件读写
  • GPT-1 与 BERT 架构
  • 开源项目分析:EDoRA | 了解如何基于peft实现EDoRA方法
  • 【软考高级系统架构论文】论无服务器架构及其应用
  • 博图SCL语言GOTO语句深度解析:精准跳转
  • 深入解析ID3算法:信息熵驱动的决策树构建基石
  • GO语言---数组
  • 基于Spring Boot瀚森健身房会员管理系统设计与实现【源码+文档】
  • 作为测试人员,平时用什么大模型?怎么用?
  • 《深入解析:如何通过CSS集成WebGPU实现高级图形效果》
  • 【软考高级系统架构论文】论企业应用系统的数据持久层架构设计
  • 【FineDance】舞蹈多样性的得来