C++ 网络编程(14) asio多线程模型IOThreadPool
🎯 C++ Boost.Asio 多线程模型快速上手指南:从单线程到多 io_context
📅 更新时间:2025年7月3日
🏷️ 标签:C++ | Boost.Asio | 多线程 | 网络编程 | io_context
文章目录
- 前言
- 一、结果模式图
- 一、IOThreadPool.h实现
- 1.代码部分
- 2.代码详解
- 1.详解一
- 2.详解二
- 3.详解三
- 4.详解四
- 二、IOThreadPool.cpp实现
- 1.代码部分
- 2.代码思路解析
- 1.构造函数
- 2.Stop()
- 三、全局思路
- 四、安全隐患
- 五、利用strand改进
- 1.原理
- 2.模式图
- 3.调用strand
- 六、主线程
- 七、测试与总结
前言
前文我们学习了一种asio
的多线程模式,根据CPU
的核数,创建多了线程与多个上下文io_context
,然后每个线程使用一个io_context
进行调用
今天我们学习第二种asio
多线程的模式,使用同一个上下文io_context
然后在多个线程中被调用
一、结果模式图
一、IOThreadPool.h实现
1.代码部分
#pragma once
#include"Singleton.h"
#include<boost/asio.hpp>class AsioThreadPool:public Singleton<AsioThreadPool>
{
public:friend class Singleton<AsioThreadPool>;~AsioThreadPool() {};AsioThreadPool& operator=(const AsioThreadPool&) = delete;AsioThreadPool(const AsioThreadPool&) = delete;boost::asio::io_context& GetIOService();//返回io_context;void Stop();private:AsioThreadPool(int threadNum = std::thread::hardware_concurrency());boost::asio::io_context _service;std::unique_ptr<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> work_guard;std::vector<std::thread> _threads;};
2.代码详解
1.详解一
我们还是用单例模式,所以这边直接继承我们的单例模板即可
class AsioThreadPool:public Singleton<AsioThreadPool>
2.详解二
我们既然使用的是单例模式,所以我们把拷贝构造和赋值都给禁用掉
AsioThreadPool& operator=(const AsioThreadPool&) = delete;
AsioThreadPool(const AsioThreadPool&) = delete;
3.详解三
boost::asio::io_context& GetIOService();//返回io_context;
这个函数是用来返回我们用的上下文io_context
4.详解四
std::unique_ptr<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>>
work_guard;
这个work_guard
是用来绑定上下文io_context
防止在没有任务的时候,io_context提前退出
二、IOThreadPool.cpp实现
1.代码部分
#include "AsioThreadPool.h"
#include<boost/asio.hpp>
#include"Singleton.h"AsioThreadPool::AsioThreadPool(int threadNum):work_guard(new boost::asio::executor_work_guard<boost::asio::io_context::executor_type>(_service.get_executor()))
{for (int i = 0; i < threadNum;i++)//插入线程{_threads.emplace_back([this](){_service.run();});}
}boost::asio::io_context& AsioThreadPool::GetIOService()
{return _service;
}void AsioThreadPool::Stop()
{work_guard.reset();for (auto& t : _threads){t.join();}}
2.代码思路解析
1.构造函数
构造函数中我们要初始化一个work_guard
去绑定上下文,防止其提前释放
work_guard(new boost::asio::executor_work_guard<boost::asio::io_context::executor_type>(_service.get_executor()))
然后我们要创建多个线程,并且在每个线程中都启动上下文
for (int i = 0; i < threadNum;i++)//插入线程{_threads.emplace_back([this](){_service.run();});}
2.Stop()
我们在停止函数首先要将绑定到上下文的work_guard
解除
然后对于每个线程我们要等待所有线程走完
三、全局思路
构造函数中实现了一个线程池,线程池里每个线程都会运行_service.run
函数,_service.run
函数内部就是从iocp
或者epoll
获取就绪描述符和绑定的回调函数,进而调用回调函数,因为回调函数是在不同的线程里调用的,所以会存在不同的线程调用同一个socket的回调函数的情况
_service.run
内部在Linux环境下调用的是epoll_wait
返回所有就绪的描述符列表,在windows上会循环调用 GetQueuedCompletionStatus
函数返回就绪的描述符,二者原理类似,进而通过描述符找到对应的注册的回调函数,然后调用回调函数
比如iocp的流程是这样的
epoll的流程是这样的
四、安全隐患
IOThreadPool模式有一个隐患,同一个socket
的就绪后,触发的回调函数可能在不同的线程里,比如第一次是在线程1,第二次是在线程3,如果这两次触发间隔时间不大,那么很可能出现不同线程并发访问数据的情况,比如在处理读事件时,第一次回调触发后我们从socket的接收缓冲区读数据出来,第二次回调触发,还是从socket的接收缓冲区读数据,就会造成两个线程同时从socket中读数据的情况,会造成数据混乱
五、利用strand改进
1.原理
对于多线程触发回调函数的情况,我们可以利用asio提供的串行类 strand
封装一下,这样就可以被串行调用了,其基本原理就是在线程各自调用函数时取消了直接调用的方式,而是利用一个strand类型的对象将要调用的函数投递到strand管理的队列中,再由一个统一的线程调用回调函数,调用是串行的,解决了线程并发带来的安全问题
2.模式图
图中当socket就绪后并不是由多个线程调用每个socket
注册的回调函数,而是将回调函数投递给strand
管理的队列,再由strand
统一调度派发。
为了让回调函数被派发到strand
的队列,我们只需要在注册回调函数时加一层strand的包装即可’’
3.调用strand
我们直接在触发回调函数前绑定strand即可
在CSession
类中添加一个成员变量
strand<io_context::executor_type> _strand;
CSession
的构造函数
CSession::CSession(boost::asio::io_context& io_context, CServer* server):_socket(io_context), _server(server), _b_close(false),_b_head_parse(false), _strand(io_context.get_executor()){boost::uuids::uuid a_uuid = boost::uuids::random_generator()();_uuid = boost::uuids::to_string(a_uuid);_recv_head_node = make_shared<MsgNode>(HEAD_TOTAL_LEN);
}
可以看到_strand
的初始化是放在初始化列表里,利用io_context.get_executor()
返回的执行器构造strand
因为在asio
中无论iocontext
还是strand
,底层都是通过executor
调度的,我们将他理解为调度器就可以了,如果多个iocontext
和strand
的调度器是一个,那他们的消息派发统一由这个调度器执行
我们利用iocontext
的调度器构造strand
,这样他们统一由一个调度器管理。在绑定回调函数的调度器时,我们选择strand绑定即可
比如我们在Start
函数里添加绑定 ,将回调函数的调用者绑定为_strand
void CSession::Start(){::memset(_data, 0, MAX_LENGTH);_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),boost::asio::bind_executor(_strand, std::bind(&CSession::HandleRead, this,std::placeholders::_1, std::placeholders::_2, SharedSelf())));
}
boost::asio::bind_executor(_strand, handler)
的作用是:把 handler
(回调)绑定到 _strand
上。
_strand
是一个 “串行化执行器”,保证所有绑定到它的 handler 都会被顺序、一个接一个地执行,即使在多线程环境下也不会并发执行。
这样可以避免多线程下的并发访问问题(比如成员变量的竞态条件),保证同一个 session 的回调不会并发执行
六、主线程
#include <iostream>
#include<boost/asio.hpp>
#include"CSession.h"
#include"CServer.h"
#include"AsioIOServicePool.h"
#include"AsioThreadPool.h"using namespace std;
bool bstop = false;
std::condition_variable cond_quit;
std::mutex mutex_quit;int main()
{try {auto pool = AsioThreadPool::GetInstance();boost::asio::io_context ioc;boost::asio::signal_set signals(ioc, SIGINT, SIGTERM);signals.async_wait([&ioc](auto, auto){ioc.stop();pool->Stop();std::unique_lock<std::mutex> lock(mutex_quit);bstop = true;cond_quit.notify_one();});CServer s(pool->GetIOService(), 10086);{std::unique_lock<std::mutex> lock(mutex_quit);while (!bstop){cond_quit.wait(lock); }}}catch (std::exception& e){std::cerr << "Exception: " << e.what();}
}
我们这里设置了条件变量 _bstop
,然后用锁
搭配cond_quit
当触发关闭信号的时候,再修改条件变量 _bstop
使得主线程运行下去,不然会卡住
因为如果没有条件变量,主线程在执行完 CServer s(pool->GetIOService(), 10086)
; 之后,会直接往下走,如果没有其他阻塞代码,主线程就会退出,整个进程也会结束(即使后台有线程池的线程在跑)。
这会导致服务刚启动就立刻退出,无法正常提供服务
七、测试与总结
我们在客户端创建100个线程,每个线程进行500收发信息,我们来测试现在服务器的多线程效果如何
实际的生产和开发中,我们尽可能利用C++特性,使用多核的优势,将io_context
分布在不同的线程中效率更可取一点,但也要防止线程过多导致cpu切换带来的时间片开销,所以尽量让开辟的线程数小于或等于cpu的核数,从而利用多核优势