当前位置: 首页 > news >正文

C++ 网络编程(14) asio多线程模型IOThreadPool

🎯 C++ Boost.Asio 多线程模型快速上手指南:从单线程到多 io_context
📅 更新时间:2025年7月3日
🏷️ 标签:C++ | Boost.Asio | 多线程 | 网络编程 | io_context

文章目录

  • 前言
  • 一、结果模式图
  • 一、IOThreadPool.h实现
    • 1.代码部分
    • 2.代码详解
      • 1.详解一
      • 2.详解二
      • 3.详解三
      • 4.详解四
  • 二、IOThreadPool.cpp实现
    • 1.代码部分
    • 2.代码思路解析
      • 1.构造函数
      • 2.Stop()
  • 三、全局思路
  • 四、安全隐患
  • 五、利用strand改进
    • 1.原理
    • 2.模式图
    • 3.调用strand
  • 六、主线程
  • 七、测试与总结


前言

前文我们学习了一种asio的多线程模式,根据CPU的核数,创建多了线程与多个上下文io_context,然后每个线程使用一个io_context进行调用
今天我们学习第二种asio多线程的模式,使用同一个上下文io_context 然后在多个线程中被调用


一、结果模式图

在这里插入图片描述

一、IOThreadPool.h实现

1.代码部分

#pragma once
#include"Singleton.h"
#include<boost/asio.hpp>class AsioThreadPool:public Singleton<AsioThreadPool>
{
public:friend class Singleton<AsioThreadPool>;~AsioThreadPool() {};AsioThreadPool& operator=(const AsioThreadPool&) = delete;AsioThreadPool(const AsioThreadPool&) = delete;boost::asio::io_context& GetIOService();//返回io_context;void Stop();private:AsioThreadPool(int threadNum = std::thread::hardware_concurrency());boost::asio::io_context _service;std::unique_ptr<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> work_guard;std::vector<std::thread> _threads;};

2.代码详解

1.详解一

我们还是用单例模式,所以这边直接继承我们的单例模板即可

class AsioThreadPool:public Singleton<AsioThreadPool>

2.详解二

我们既然使用的是单例模式,所以我们把拷贝构造和赋值都给禁用掉

AsioThreadPool& operator=(const AsioThreadPool&) = delete;
AsioThreadPool(const AsioThreadPool&) = delete;

3.详解三

boost::asio::io_context& GetIOService();//返回io_context;

这个函数是用来返回我们用的上下文io_context

4.详解四

std::unique_ptr<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> 
work_guard;

这个work_guard是用来绑定上下文io_context 防止在没有任务的时候,io_context提前退出

二、IOThreadPool.cpp实现

1.代码部分

#include "AsioThreadPool.h"
#include<boost/asio.hpp>
#include"Singleton.h"AsioThreadPool::AsioThreadPool(int threadNum):work_guard(new boost::asio::executor_work_guard<boost::asio::io_context::executor_type>(_service.get_executor()))
{for (int i = 0; i < threadNum;i++)//插入线程{_threads.emplace_back([this](){_service.run();});}
}boost::asio::io_context& AsioThreadPool::GetIOService()
{return _service;
}void AsioThreadPool::Stop()
{work_guard.reset();for (auto& t : _threads){t.join();}}

2.代码思路解析

1.构造函数

构造函数中我们要初始化一个work_guard去绑定上下文,防止其提前释放

work_guard(new boost::asio::executor_work_guard<boost::asio::io_context::executor_type>(_service.get_executor()))

然后我们要创建多个线程,并且在每个线程中都启动上下文

for (int i = 0; i < threadNum;i++)//插入线程{_threads.emplace_back([this](){_service.run();});}

2.Stop()

我们在停止函数首先要将绑定到上下文的work_guard解除
然后对于每个线程我们要等待所有线程走完

三、全局思路

构造函数中实现了一个线程池线程池里每个线程都会运行_service.run函数,_service.run函数内部就是从iocp或者epoll获取就绪描述符和绑定的回调函数,进而调用回调函数因为回调函数是在不同的线程里调用的,所以会存在不同的线程调用同一个socket的回调函数的情况
_service.run 内部在Linux环境下调用的是epoll_wait返回所有就绪的描述符列表,在windows上会循环调用 GetQueuedCompletionStatus函数返回就绪的描述符,二者原理类似,进而通过描述符找到对应的注册的回调函数,然后调用回调函数

比如iocp的流程是这样的
在这里插入图片描述
epoll的流程是这样的
在这里插入图片描述

四、安全隐患

IOThreadPool模式有一个隐患,同一个socket的就绪后,触发的回调函数可能在不同的线程里,比如第一次是在线程1,第二次是在线程3,如果这两次触发间隔时间不大,那么很可能出现不同线程并发访问数据的情况,比如在处理读事件时,第一次回调触发后我们从socket的接收缓冲区读数据出来,第二次回调触发,还是从socket的接收缓冲区读数据,就会造成两个线程同时从socket中读数据的情况,会造成数据混乱

五、利用strand改进

1.原理

对于多线程触发回调函数的情况,我们可以利用asio提供的串行类 strand封装一下,这样就可以被串行调用了,其基本原理就是在线程各自调用函数时取消了直接调用的方式,而是利用一个strand类型的对象将要调用的函数投递到strand管理的队列中,再由一个统一的线程调用回调函数,调用是串行的,解决了线程并发带来的安全问题

2.模式图

在这里插入图片描述
图中当socket就绪后并不是由多个线程调用每个socket注册的回调函数,而是将回调函数投递给strand管理的队列,再由strand统一调度派发。

为了让回调函数被派发到strand的队列,我们只需要在注册回调函数时加一层strand的包装即可’’

3.调用strand

我们直接在触发回调函数前绑定strand即可

CSession类中添加一个成员变量

strand<io_context::executor_type> _strand;

CSession的构造函数

CSession::CSession(boost::asio::io_context& io_context, CServer* server):_socket(io_context), _server(server), _b_close(false),_b_head_parse(false), _strand(io_context.get_executor()){boost::uuids::uuid  a_uuid = boost::uuids::random_generator()();_uuid = boost::uuids::to_string(a_uuid);_recv_head_node = make_shared<MsgNode>(HEAD_TOTAL_LEN);
}

可以看到_strand的初始化是放在初始化列表里,利用io_context.get_executor()返回的执行器构造strand

因为在asio中无论iocontext还是strand底层都是通过executor调度的,我们将他理解为调度器就可以了,如果多个iocontextstrand的调度器是一个,那他们的消息派发统一由这个调度器执行

我们利用iocontext的调度器构造strand,这样他们统一由一个调度器管理。在绑定回调函数的调度器时,我们选择strand绑定即可

比如我们在Start函数里添加绑定 ,将回调函数的调用者绑定为_strand

void CSession::Start(){::memset(_data, 0, MAX_LENGTH);_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),boost::asio::bind_executor(_strand, std::bind(&CSession::HandleRead, this,std::placeholders::_1, std::placeholders::_2, SharedSelf())));
}

boost::asio::bind_executor(_strand, handler) 的作用是:把 handler(回调)绑定到 _strand 上。
_strand 是一个 “串行化执行器”,保证所有绑定到它的 handler 都会被顺序、一个接一个地执行,即使在多线程环境下也不会并发执行
这样可以避免多线程下的并发访问问题(比如成员变量的竞态条件),保证同一个 session 的回调不会并发执行

六、主线程


#include <iostream>
#include<boost/asio.hpp>
#include"CSession.h"
#include"CServer.h"
#include"AsioIOServicePool.h"
#include"AsioThreadPool.h"using namespace std;
bool bstop = false;
std::condition_variable cond_quit;
std::mutex mutex_quit;int main()
{try {auto pool = AsioThreadPool::GetInstance();boost::asio::io_context ioc;boost::asio::signal_set signals(ioc, SIGINT, SIGTERM);signals.async_wait([&ioc](auto, auto){ioc.stop();pool->Stop();std::unique_lock<std::mutex> lock(mutex_quit);bstop = true;cond_quit.notify_one();});CServer s(pool->GetIOService(), 10086);{std::unique_lock<std::mutex> lock(mutex_quit);while (!bstop){cond_quit.wait(lock); }}}catch (std::exception& e){std::cerr << "Exception: " << e.what();}
}

我们这里设置了条件变量 _bstop ,然后用搭配cond_quit 当触发关闭信号的时候,再修改条件变量 _bstop
使得主线程运行下去,不然会卡住

因为如果没有条件变量,主线程在执行完 CServer s(pool->GetIOService(), 10086); 之后,会直接往下走,如果没有其他阻塞代码,主线程就会退出,整个进程也会结束(即使后台有线程池的线程在跑)
这会导致服务刚启动就立刻退出,无法正常提供服务

七、测试与总结

我们在客户端创建100个线程,每个线程进行500收发信息,我们来测试现在服务器的多线程效果如何
在这里插入图片描述

实际的生产和开发中,我们尽可能利用C++特性,使用多核的优势,将io_context分布在不同的线程中效率更可取一点,但也要防止线程过多导致cpu切换带来的时间片开销,所以尽量让开辟的线程数小于或等于cpu的核数,从而利用多核优势

http://www.lryc.cn/news/579299.html

相关文章:

  • PyTorch 安装使用教程
  • EXCEL小妙招——判断A列和B列是否相等
  • AI时代SEO关键词策略
  • cv610将音频chn0配置为g711a,chn1配置为 aac编码,记录
  • Java 大视界 -- Java 大数据机器学习模型在自然语言处理中的跨语言信息检索与知识融合(331)
  • Docker:容器化技术的基石与实践指南
  • 机器学习在智能能源管理中的应用:需求响应与可再生能源整合
  • ECharts 安装使用教程
  • 计算机视觉的新浪潮:扩散模型(Diffusion Models)技术剖析与应用前景
  • 第8章网络协议-NAT
  • 多种方法实现golang中实现对http的响应内容生成图片
  • 50天50个小项目 (Vue3 + Tailwindcss V4) ✨ | ButtonRippleEffect(按钮涟漪效果)
  • springboot切面编程
  • Softhub软件下载站实战开发(十):实现图片视频上传下载接口
  • 全角半角空格在网页中占位符和编码emsp;ensp;
  • CentOS 6操作系统安装
  • 毫米波雷达 – 深度学习
  • ubuntu 22.04 LTS 安装preempt-rt
  • C++2d我的世界V1.4
  • 模型预测专题:强鲁棒性DPCC
  • YOLOv11剪枝与量化(二)通道剪枝技术原理
  • Dify 工作流全栈解析:从零构建你的 AI 应用流程引擎
  • 【Java面试】Redis的poll函数epoll函数区别?
  • springboot 显示打印加载bean耗时工具类
  • 【大模型学习 | MINIGPT-4原理】
  • MYSQL基础内容
  • dial tcp 10.1.68.88:3306: connect: cannot assign requested address
  • Python 数据分析:numpy,说人话,说说数组维度。听故事学知识点怎么这么容易?
  • 深度剖析NumPy核心函数reshape()
  • 使用Scapy构造OSPF交互报文欺骗网络设备与主机建立Full关系