当前位置: 首页 > news >正文

Boost.Asio 异步写:为什么多次 async_write_some 会导致乱序,以及如何解决

Boost.Asio 异步写:为什么多次 async_write_some 会导致乱序,以及如何解决

问题背景

在网络编程中,我们经常需要向对端发送多条消息,例如:

WriteToSocketErr("Hello ");
WriteToSocketErr("World");

这两个函数内部调用:

_socket->async_write_some(...);

很多人会以为:既然是在同一个线程里顺序调用两次 async_write_some,发送顺序也应该是固定的,对端就会先收到 "Hello " 再收到 “World”。

但事实并非如此 —— 对端有可能先收到 “World”,再收到 "Hello "。

为什么会出现乱序?

1. async_write_some 是异步的

当我们调用:

async_write_some(buffer, handler);

这个函数会立即返回,并不是立刻执行底层 write(),而是:

  • 把写操作和回调注册到事件循环(io_context)
  • 等到 epoll(或 kqueue 等)通知 fd 可写时,再调用回调执行实际的写操作

2. 多个异步写操作的竞争

如果我们连续快速发起两个 async_write_some

async_write_some("Hello ", handler1);
async_write_some("World", handler2);

这两个操作都会注册在 io_context 的任务队列里:

  • 当 epoll 通知 fd 可写时,io_context.run() 会调度回调执行
  • 关键问题:哪个回调先被调度执行是不确定的,取决于事件循环内部的调度顺序
  • 可能先执行 handler2,再执行 handler1

结果:

  • 内核先执行 write(fd, "World", 5),再执行 write(fd, "Hello ", 6)
  • 对端先收到 “World”,后收到 "Hello "

3. TCP 协议层面的保证

需要澄清的是:

  • TCP 保证同一次 write() 调用的内容不会被拆散;“World” 永远是连续的
  • TCP 保证数据包的顺序性:先写入内核缓冲区的数据先发送
  • 但是,如果多个 async_write_some 的回调执行顺序不确定,就会导致写入内核缓冲区的顺序不确定

总结:乱序的根本原因

  • async_write_some 只是注册回调,不是立即写入
  • 同时注册多个 async_write_some 时,Boost.Asio 无法保证它们的执行顺序
  • 执行顺序的不确定性导致了最终发送顺序的不确定性

解决方案:使用发送队列

要让对端收到严格有序的数据,关键原则是:

同一时刻只发起一次异步写操作,上一次写完成后再发起下一次写。

实现方案

  1. 定义消息队列和状态标志
class Session {
private:std::queue<std::shared_ptr<MsgNode>> _send_queue;bool _writing = false;  // 标记是否有正在进行的写操作std::shared_ptr<asio::ip::tcp::socket> _socket;
};
  1. 封装发送函数
void Session::Send(const std::string& data) {auto node = std::make_shared<MsgNode>(data.c_str(), data.size());_send_queue.push(node);// 如果当前没有正在进行的写操作,启动写操作if (!_writing) {DoWrite();}
}
  1. 串行写操作的核心函数
void Session::DoWrite() {if (_send_queue.empty()) {_writing = false;return;}_writing = true;auto node = _send_queue.front();_socket->async_write_some(asio::buffer(node->_msg + node->_cur_len, node->_total_len - node->_cur_len),[this, node](const boost::system::error_code& ec, std::size_t bytes_transferred) {if (!ec) {node->_cur_len += bytes_transferred;if (node->_cur_len < node->_total_len) {// 当前消息还没写完,继续写剩余部分DoWrite();} else {// 当前消息写完了,处理下一个消息_send_queue.pop();DoWrite();}} else {// 错误处理_writing = false;HandleError(ec);}});
}

方案优势

  • 保证顺序:写操作是串行的,对端收到顺序与 Send() 调用顺序一致
  • 处理部分写入async_write_some 可能只写入部分数据,代码能正确处理
  • 高效:避免了阻塞等待,保持了异步编程的优势

线程安全性考虑

单线程情况(推荐)

如果你的程序采用典型的单线程 Boost.Asio 模型:

  • 只在一个线程内调用 io_context.run()
  • 所有对消息队列的操作都在这个线程里完成

→ 不需要加锁

原因

  • Boost.Asio 在单线程模型下,所有回调都在 io_context.run() 中顺序执行
  • 不会出现并发访问 _send_queue 的情况
  • 天然线程安全

多线程情况

如果出现以下情况:

  • 多个线程同时调用 io_context.run()
  • 或者其他线程直接访问 _send_queue

→ 需要加锁保护队列

class Session {
private:std::queue<std::shared_ptr<MsgNode>> _send_queue;std::mutex _queue_mutex;  // 保护队列的互斥锁bool _writing = false;
};void Session::Send(const std::string& data) {auto node = std::make_shared<MsgNode>(data.c_str(), data.size());{std::lock_guard<std::mutex> lock(_queue_mutex);_send_queue.push(node);if (!_writing) {DoWrite();}}
}

实践建议

  1. 优先使用单线程模型:简单、高效、不需要考虑锁的问题
  2. 避免直接使用多个 async_write_some:始终通过队列机制来保证顺序
  3. 错误处理:在写操作失败时,要正确重置 _writing 状态
  4. 使用 async_write 而非 async_write_some:如果不需要处理部分写入,可以使用 async_write 来简化代码

进阶话题

1. 为什么不推荐用 async_write?

你可能会想:既然 async_write_some 有乱序问题,为什么不直接用 async_write

// 看起来更简单的方式
asio::async_write(*_socket, asio::buffer("Hello "), handler1);
asio::async_write(*_socket, asio::buffer("World"), handler2);

问题依然存在

  • async_write 内部也是异步的,多次调用仍然会有乱序问题
  • 它只是保证单次写操作的完整性,不解决多次写操作的顺序问题

正确做法:仍然需要队列机制。

2. 性能优化:批量写入

如果你的应用需要频繁发送小消息,可以考虑批量写入:

void Session::DoWrite() {if (_send_queue.empty()) {_writing = false;return;}_writing = true;// 批量处理多个小消息std::vector<asio::const_buffer> buffers;std::vector<std::shared_ptr<MsgNode>> current_batch;// 收集一批消息(最多N个或总大小不超过M字节)while (!_send_queue.empty() && current_batch.size() < 10) {auto node = _send_queue.front();_send_queue.pop();buffers.push_back(asio::buffer(node->_msg, node->_total_len));current_batch.push_back(node);}// 一次性写入多个bufferasio::async_write(*_socket, buffers,[this, current_batch](const boost::system::error_code& ec, std::size_t bytes_transferred) {if (!ec) {DoWrite();  // 继续处理下一批} else {_writing = false;HandleError(ec);}});
}

3. 常见陷阱与调试技巧

陷阱1:忘记处理部分写入

// 错误示例:假设async_write_some总是写完所有数据
_socket->async_write_some(buffer, [](auto ec, auto bytes) {// 危险!可能只写了部分数据
});

陷阱2:在回调中直接递归调用

// 可能导致栈溢出
void DoWrite() {_socket->async_write_some(buffer, [this](auto ec, auto bytes) {DoWrite();  // 危险的递归调用});
}

调试技巧

  • 使用 Wireshark 抓包验证发送顺序
  • 添加日志记录每次写操作的时间戳和内容
  • 在测试环境中故意增加网络延迟来暴露问题

4. 完整的消息节点实现

class MsgNode {
public:MsgNode(const char* data, size_t len) : _total_len(len), _cur_len(0) {_msg = new char[len];memcpy(_msg, data, len);}~MsgNode() {delete[] _msg;}char* _msg;size_t _total_len;size_t _cur_len;// 禁止拷贝MsgNode(const MsgNode&) = delete;MsgNode& operator=(const MsgNode&) = delete;
};

相关技术对比

方案优点缺点适用场景
多个async_write_some简单直接乱序问题❌ 不推荐
发送队列保证顺序,高效代码复杂度稍高✅ 推荐
同步写入简单,天然有序阻塞,性能差低并发场景
批量写入高性能实现复杂高频小消息

总结

Boost.Asio 的 async_write_some 是异步的,多次调用会导致执行顺序不确定,从而造成发送数据乱序。

解决方案

  • 使用发送队列,确保同一时刻只有一个活跃的写操作
  • 在单线程模型下不需要加锁
  • 在多线程模型下需要用互斥锁保护队列

核心原则:串行化写操作,保证数据发送的顺序性。

进阶优化

  • 对于高频小消息,考虑批量写入
  • 注意处理部分写入的情况
  • 避免递归调用导致的栈溢出

这个问题在实际项目中很常见,理解其根本原因和解决方案对于编写可靠的网络程序至关重要。

http://www.lryc.cn/news/587847.html

相关文章:

  • 机器学习中的朴素贝叶斯(Naive Bayes)模型
  • 微软发布BioEmu模型
  • Web3:Foundry使用指南
  • 银河麒麟KYSEC安全机制详解
  • 《C++初阶之STL》【泛型编程 + STL简介】
  • 宝塔面板常见问题
  • 【算法】贪心算法:将数组和减半的最少操作次数C++
  • ubuntu22.04下配置qt5.15.17开发环境
  • 2.查询操作-demo
  • 解决Chrome此扩展程序不再受支持,因此已停用
  • 代数基本定理
  • 史上最清楚!读者,写者问题(操作系统os)
  • 美联储降息趋缓叠加能源需求下调,泰国证券交易所新一代交易系统架构方案——高合规、强韧性、本地化的跨境金融基础设施解决方案
  • 软考 系统架构设计师系列知识点之杂项集萃(110)
  • 在Adobe Substance 3D Painter中,已经有基础图层,如何新建一个图层A,clone基础图层的纹理和内容到A图层
  • K8S的Helm包管理器
  • WebView 性能调试全流程:卡顿问题实战还原与优化路径解析
  • 基于 Gitlab、Jenkins与Jenkins分布式、SonarQube 、Nexus 的 CiCd 全流程打造
  • 考完数通,能转云计算/安全方向吗?转型路径与拓展路线分析
  • 计算机毕业设计Java医学生在线学习平台系统 基于 Java 的医学生在线学习平台设计与开发 Java 医学在线教育学习系统的设计与实现
  • 【云服务器安全相关】如何使用 `ping` 命令排查云服务器网络连接问题
  • Java实现文件自动下载,XXL-Job定时任务中的HTTP文件下载最佳实践
  • JAVA 设计模式 适配器
  • 设计模式之适配器模式:让不兼容的接口协同工作的艺术
  • 闲庭信步使用图像验证平台加速FPGA的开发:第十四课——图像二值化的FPGA实现
  • 使用aiohttp实现高并发爬虫
  • 未来手机会自动充电吗
  • vscode 源码编译
  • TCP半关闭
  • 使用layui的前端框架过程中,无法加载css和js怎么办?