当前位置: 首页 > news >正文

ASIO 避坑指南:高效、安全与稳健的异步网络编程

ASIO 避坑指南:高效、安全与稳健的异步网络编程

引言

ASIO是很强大的一个异步io库,但服务器除了高效以外,稳定也是关键,这篇文章主要总结了ASIO使用遇到的典型问题和坑:

  • 如何榨干io_context的性能,让CPU和网卡持续饱和工作?
  • 如何安全地关闭一个连接,避免资源泄漏或程序崩溃?(特别是当异步操作还在进行时)
  • 如何正确地实现一个异步写操作,确保数据完整发送且内存安全?
  • 如何管理跨越多个异步操作的对象生命周期
  • 如何设计缓冲区(Buffer) 才能避免悬垂指针或数据竞争?
  • 如何在多线程环境下安全地操作共享资源?
  • 如何处理错误码,哪些错误码要特殊处理,例如operation_abortedeof

上面这些问题处理不好,轻则导致性能低下、资源泄漏,重则引发程序崩溃、数据错误,是基于ASIO开发服务器必须要了解清楚的点。


一、最大化利用 I/O (榨干 io_context 的性能)

很多人包括很多博客对asio的多线程操作仅仅照搬例子进行介绍,没有真正的线上实战,asio多线程有两种方法:

  1. 单一 I/O 上下文 + 多工作线程

    • io_context 实例
    • 多个线程调用 io_context.run()
    • 事件分发机制:操作系统将就绪事件分配给不同工作线程执行
  2. 多 I/O 上下文 (io_context per Thread)

    • 每个线程独占一个 io_context 实例
    • 每个线程调用自己 io_context.run()
    • 资源隔离:Socket/Timer 绑定到特定线程的 io_context

io_context 的核心是事件循环。一个线程调用 run() 通常足以高效处理数千连接,如果多个线程都执行 run() ,那么事件会分配到多个线程中执行,这样你首先要考虑的是线程安全,每个回调如果调用了共享资源都需要枷锁,这会降低运行效率。

单一 I/O 上下文 + 多工作线程

// 创建线程池执行同一个io_context
asio::io_context io;
asio::thread_pool pool(4); // 4个线程// 多个线程执行同一个io_context的run()
for(int i=0; i<4; ++i){asio::post(pool, [&]{ io.run(); });
}// 注意:所有Handler可能在不同线程执行!
socket.async_read_some(..., [](...){// 需要线程同步!可能被任意线程执行
});

优势

  • 最佳负载均衡:内核自动分配事件到空闲线程
  • 简化资源管理:所有操作共享单一I/O上下文

劣势

  • 锁竞争开销:共享资源访问需要同步,抵消多线程收益

ASIO针对这种情况提供了 strand 进行序列化访问

例如:

// 创建strand绑定到io_context
asio::strand<asio::io_context::executor_type> my_strand = asio::make_strand(io.get_executor());// 通过strand分发处理程序
socket.async_read_some(asio::bind_executor(my_strand, [](...){// 保证同一strand上的handler不会并发执行connections.erase(id); // 无需锁!}
));

每线程独立 I/O 上下文 (io_context per Thread)

这是推荐做法,经过本人验证,能极大提高并发处理能力

// 每个线程拥有独立io_context
std::vector<std::unique_ptr<asio::io_context>> io_contexts;
std::vector<std::thread> threads;for(int i=0; i<4; ++i){io_contexts.emplace_back(std::make_unique<asio::io_context>());threads.emplace_back([ioc=io_contexts.back().get()]{ioc->run(); // 每个线程运行自己的io_context});
}// 连接绑定到特定io_context
auto& io = *io_contexts[connection_id % 4];
tcp::socket socket(io);

优势

  • 处理程序始终在同一线程执行,避免线程切换开销
  • 能更大程度发挥单个io的性能

保证异步操作链的持续

一个异步操作完成时,在其完成处理函数 (Completion Handler) 中发起下一个异步操作(如 async_read 后发起 async_write,或继续 async_read),这样可以保持 I/O 通道持续忙碌,避免轮询

要注意的是,一定要避免在 回调 中做耗时同步操作阻塞事件循环。

高效 Buffer 管理

asio::buffer 是视图: 它不拥有数据,只是指向现有内存块的引用,处理不当会导致野指针、数据损坏或程序崩溃。底层数据必须在异步操作期间保持有效!

绝对要避免使用栈分配的内容做作为 Buffer,例如下面这个就是典型的错误:

//错误示范
void do_async_write(tcp::socket& socket) {char buffer[1024]; // 错误:栈分配缓冲区generate_data(buffer, 1024); // 填充数据// 异步写操作 - 缓冲区可能在函数返回后失效!socket.async_write_some(asio::buffer(buffer, 1024),[](const asio::error_code& ec, size_t bytes) {// 此时原buffer栈帧已销毁 - 野指针访问!});
} // 函数退出,栈缓冲区被销毁!

除非你用的是协程模式,否则不要用栈分配内存做 Buffer,因为异步操作结束后,栈内存会被回收,Buffer 就会变成无效的,过一段时间在执行回调你的buffer里面就是野指针,因此要严格保证 async_read/async_write 使用的 buffer 底层内存在其整个操作期间(从调用开始到 Handler 执行结束)有效且不被修改

你应该使用智能指针来分配缓冲,并让这个智能指针跟随回调函数,直至回调函数结束,典型的就是让lambda把这个智能指针捕获,让它跟着回调函数的生命周期。

void send_large_data(tcp::socket& socket) {// 使用shared_ptr管理堆缓冲区auto buf = std::make_shared<std::vector<char>>(generate_large_data());asio::async_write(socket, asio::buffer(*buf),// 捕获智能指针延长生命周期[buf](const asio::error_code& ec, size_t) {// 缓冲区在lambda销毁前保持有效});
}

或者作为session的成员变量

class Connection : public std::enable_shared_from_this<Connection> {std::array<char, 8192> buffer_; // 成员缓冲区void start_read() {auto self(shared_from_this());socket_.async_read_some(asio::buffer(buffer_),[self](const asio::error_code& ec, size_t length) {if (!ec) self->process_data(length);});}
};

如果是linux系统,还可以用零拷贝缓冲区注册方法,让io和回调都操作这个缓冲区,从而避免了数据拷贝。

// 注册持久内存到io_context
auto buf = std::make_shared<std::array<char, 4096>>();
asio::io_context& ioc = socket.get_executor().context();// 显式注册缓冲区(Linux专属优化)
const bool registered = asio::register_buffer(ioc, asio::buffer(*buf), asio::buffer_registration::permanent);socket.async_read_some(asio::buffer(*buf),[buf](const asio::error_code& ec, size_t bytes) {// 缓冲区保持注册状态});

这种尤其适合高频小包数据的处理

安全关闭 Socket 和连接

关闭是异步编程中最容易出资源泄漏或崩溃的地方。关闭做的不好,会出现如下问题:

  • 资源泄漏(文件描述符、内存)
  • 大量CLOSE_WAIT状态连接
  • 程序崩溃(访问已销毁对象)
  • 数据丢失(未发送完的数据)

Socket的关闭有shutdown()close()两个行数

socket.shutdown

shutdown可以理解为是关闭通知,有三种模式(shutdown_receive, shutdown_send, shutdown_both),通知对端“我不会再发数据了”(shutdown_send)或“我不想再收数据了”(shutdown_receive)。

shutdown执行后,后续的 async_read 会立即完成并返回 asio::error::shut_down (如果接收端关闭),后续的 async_write 会立即完成并返回 asio::error::shut_down (如果发送端关闭)。

需要注意的是,shutdown()后,Socket 描述符依然有效。

socket.close

socket.close会释放系统资源(Socket 描述符),它会隐式地执行 shutdown(both)。任何挂起(Pending)的异步操作(async_read, async_write, async_connect 等)会立即取消,它们的回调函数会被调用,并传入 asio::error::operation_aborted 错误码。

因此,在读写回调中,遇到asio::error::operation_aborted 错误码要特殊处理,避免重复关闭

回调函数设计时,应检查错误码,如果是 operation_aborted,通常意味着 Socket 正在被关闭/销毁,回调函数应该:

  • 忽略这个操作的结果。
  • 清理相关的资源(如释放为这次操作分配的 Buffer)。
  • 避免再访问Socket

当调用 socket.close() 取消操作时,包含 socket 的对象(如 connection)可能正在被销毁

安全关闭方法

关闭分服务器主动关闭,以及对方客户端主动关闭,两种不同方式的关闭处理方式不太一样

  • 服务器主动关闭
  1. 标记关闭开始,执行shutdown(socket, asio::ip::tcp::socket::shutdown_receive); // 告诉对方我不再接收数据了
  2. 检查是否有待发送数据,无数据 → 立即关闭,有数据 → 等待当前写操作完成
// 在管理类中触发关闭
void ConnectionManager::stop_all() {for (auto& conn : connections_) {conn->safe_shutdown();}
}// Connection::safe_shutdown实现:
void safe_shutdown() {if (shutdown_initiated_.exchange(true)) return;// 1. 停止接收新数据socket_.shutdown(shutdown_receive);// 2. 检查写状态if (!writing_) {final_close();  // 无待发数据直接关闭}// 否则等待进行中的写操作完成
}
  • 客户端主动关闭
  1. async_read Handler 中检测到 error_code == asio::error::eof (对方正常关闭发送端)
  2. (可选)如果还有数据要发送,可以尝试发送(但对方可能已关闭接收端,会出错)。
  3. 调用 socket.close()

下面是一个客户端的安全关闭示例:

void Connection::handle_read_error(asio::error_code ec) {if (ec == asio::error::eof) {// 客户端发送了FIN包safe_shutdown();}else if (ec == asio::error::operation_aborted) {// 正常关闭过程中的取消// 不进行任何操作,连接即将销毁return;}
}

因此,一个安全的关闭不仅仅是close,还要针对不同的错误码来执行不同的关闭策略,在接收和发送的错误码处理不一样,建议一个回话应该对错误码处理单独提取一个函数,如下:

class Connection : public std::enable_shared_from_this<Connection> {
private:asio::ip::tcp::socket socket_;std::queue<std::vector<char>> write_queue_;bool writing_;std::atomic<bool> shutdown_initiated_;std::array<char, 1024> read_buffer_;
public:Connection(asio::ip::tcp::socket socket): socket_(std::move(socket)), writing_(false),shutdown_initiated_(false) {}void start() {read_header();  // 开始读循环}void safe_shutdown() {if (shutdown_initiated_.exchange(true)) return;// 1. 停止接受新数据asio::error_code ec;socket_.shutdown(asio::ip::tcp::socket::shutdown_receive, ec);// 忽略错误:可能已关闭// 2. 检查写队列if (!writing_) {// 无待发送数据,直接关闭final_close();} else {// 等待进行中的写操作完成// final_close将在写回调中调用}}private:void read_header() {auto self(shared_from_this());socket_.async_read_some(asio::buffer(read_buffer_),[this, self](asio::error_code ec, size_t length) {if (ec) {handle_read_error(ec);return;}process_data(length);read_header();  // 继续读});}void handle_read_error(asio::error_code ec) {if (ec == asio::error::eof) {// 客户端正常关闭safe_shutdown();} else if (ec == asio::error::operation_aborted) {// 关闭过程中的正常取消} else {// 其他错误final_close();}}void async_write_data(std::vector<char> data) {// 将数据加入队列bool write_in_progress = !write_queue_.empty();write_queue_.push(std::move(data));if (!write_in_progress && !writing_) {start_write_chain();}}void start_write_chain() {writing_ = true;auto self(shared_from_this());auto& buf = write_queue_.front();asio::async_write(socket_, asio::buffer(buf),[this, self](asio::error_code ec, size_t /*bytes*/) {writing_ = false;if (ec) {handle_write_error(ec);return;}write_queue_.pop();if (!write_queue_.empty()) {start_write_chain();} else if (shutdown_initiated_) {// 所有数据已发送,安全关闭final_close();}});}void handle_write_error(asio::error_code ec) {if (ec == asio::error::operation_aborted) {// 正常取消,忽略} else if (ec == asio::error::broken_pipe || ec == asio::error::connection_reset) {// 连接已断开final_close();} else {// 其他错误处理safe_shutdown();}}void final_close() {asio::error_code ignore_ec;// 取消所有异步操作socket_.cancel(ignore_ec);// 关闭socketsocket_.shutdown(asio::ip::tcp::socket::shutdown_both, ignore_ec);socket_.close(ignore_ec);// 清理资源decltype(write_queue_) empty;std::swap(write_queue_, empty);}
};

上面的例子不仅展示了安全关闭,还包含了安全发送,关闭和发生接收关联紧密,因此很难用一句函数就能涵盖,上面的例子的发送用来一个队列,这引出下一节,如何用asio进行高并发的安全的异步写操作

安全的异步写操作

除了上面提到的buffer有效性外,异步写操作有其特定要点:

  • 同一 Socket 上的多个并发 async_write 操作是未定义行为! TCP 是流协议,数据顺序必须保证。
  • 必须使用队列(见上面的例子)。同一时间只允许一个 async_write 操作在进行,等回调完了之后,再写入下一个

写回调应该作如下工作:

  • 检查 error_code (包括 operation_aborted,具体见上节安全关闭)。
  • 处理 bytes_transferred (通常成功时等于请求量)。
  • 释放或标记该次写操作使用的 Buffer 可重用/释放。
  • **检查写队列:如果队列非空,取出下一批数据发起新的async_write,如果队列为空,设置 writing_ = false

如上面的例子所示,安全大并发的异步写操作,应该如下:

class Connection : public std::enable_shared_from_this<Connection> {
private:asio::ip::tcp::socket socket_;std::queue<std::vector<char>> write_queue_;bool writing_;std::atomic<bool> shutdown_initiated_;std::array<char, 1024> read_buffer_;
public://这里省略其他函数,在安全关闭里已经展示完整代码void async_write_data(std::vector<char> data) {// 将数据加入队列bool write_in_progress = !write_queue_.empty();write_queue_.push(std::move(data));if (!write_in_progress && !writing_) {start_write_chain();}}void start_write_chain() {writing_ = true;auto self(shared_from_this());auto& buf = write_queue_.front();asio::async_write(socket_, asio::buffer(buf),[this, self](asio::error_code ec, size_t /*bytes*/) {writing_ = false;if (ec) {handle_write_error(ec);return;}write_queue_.pop();if (!write_queue_.empty()) {start_write_chain();} else if (shutdown_initiated_) {// 所有数据已发送,安全关闭final_close();}});}void handle_write_error(asio::error_code ec) {if (ec == asio::error::operation_aborted) {// 正常取消,忽略} else if (ec == asio::error::broken_pipe || ec == asio::error::connection_reset) {// 连接已断开final_close();} else {// 其他错误处理safe_shutdown();}}
};
  • 要有个写队列,示例中的std::queue<std::shared_ptr<std::string>> write_queue_;
  • 写状态标记writing_,主要作用是在关闭时,检查是否还没写完,没写完就等写完再关闭socket
  • 关闭标记shutdown_initiated_,这个主要作用是关闭标记,如果写完发现已经关闭,直接调用close,通过shutdown_initiated_writing_可以实现安全的关闭,同时保证写数据不会丢失
http://www.lryc.cn/news/575737.html

相关文章:

  • 游戏App前端安全加固:利用AI云防护技术抵御恶意攻击
  • vue3 json 转 实体
  • 临床开发计划:从实验室到市场的战略蓝图
  • day48-硬件学习之GPT定时器、UART及I2C
  • 面试150 判断子序列
  • 【已解决】Android Studio gradle遇到unresolved reference错误
  • 鸿蒙 SplitLayout 组件解析:折叠屏分割布局与多端适配指南
  • 视频关键帧提取
  • 跟着AI学习C#之项目实战-电商平台 Day1
  • Python打卡:Day36
  • mac电脑安装vscode的力扣插件报错解决办法
  • 板凳-------Mysql cookbook学习 (十--11)
  • Spring Boot高并发 锁的使用方法
  • Flutter 多平台项目开发指南
  • 使用java语言,计算202503291434距离当前时间,是否大于三天
  • SQL SERVER存储过程
  • 赋能 Java 工程,飞算科技重新定义智能开发
  • 自动化测试--app自动化测试之给手机设置锁屏图案
  • 桌面小屏幕实战课程:DesktopScreen 11 SPI 水墨屏
  • httpClient连接配置超时时间该设置多长才合适?
  • 从提示工程(Prompt Engineering)到上下文工程(Context Engineering)
  • 【RabbitMQ】多系统下的安装配置与编码使用(python)
  • 深入解析Python多服务器监控告警系统:从原理到生产部署
  • Linux IPV4/IPV6配置终极指南
  • 解锁AI无限潜能!景联文科技数据产品矩阵再升级:多语言题库、海量语料、垂域代码库,全面赋能大模型训练
  • java解决超大二维矩阵数组引起的内存占用过大问题
  • 【redis使用场景——缓存——数据过期策略 】
  • 新手向:Neo4j的安装与使用
  • CTF:PHP 多关卡绕过挑战
  • Flink部署与应用——Flink架构概览