当前位置: 首页 > article >正文

mongodb源码分析session异步接受asyncSourceMessage()客户端流变Message对象

mongo/transport/service_state_machine.cpp已经分析startSession创建ASIOSession过程,并且验证connection是否超过限制,ASIOSession和connection是循环接受客户端命令,状态转变流程是:State::Created 》 State::Source 》State::SourceWait 》 State::Process 》 State::SinkWait  》 State::Source 》State::SourceWait

State::Created,     //session刚刚创建,但是还没有接受任何命令
State::Source,      //去接受客户端新的命令
State::SourceWait,  // 等待客户端新的命令
State::Process,     // 将接受的命令发送给mongodb数据库
State:: SinkWait,    // 等待将命令的执行结果返回给客户端

session异步接受asyncSourceMessage()客户端流变Message对象代码调用链如下

  1. mongo/transport/service_state_machine.cpp的_sourceMessage方法,返回viod
  2. mongo/transport/session_asio.h的asyncSourceMessage方法,返回Future<Message>
  3. mongo/transport/session_asio.h的sourceMessageImpl方法,返回Future<Message>
  4. mongo/transport/session_asio.h的read方法,返回Future<void>
  5. mongo/transport/session_asio.h的opportunisticRead方法,返回Future<void>

mongo/transport/service_state_machine.cpp的方法_sourceMessage主要状态State::Source变State::SourceWait,TransportLayerASIO模式包含两种线程模型:adaptive(动态线程模型)和synchronous(同步线程模型)。adaptive模式线程设计采用动态线程方式,线程数和 mongodb压力直接相关。

同步线程模型调用_session()->sourceMessage()获取消息。

动态线程模型调用_session()->asyncSourceMessage()异步获取消息,后面重点分析动态线程异步获取消息逻辑。

void ServiceStateMachine::_sourceMessage(ThreadGuard guard) {invariant(_inMessage.empty());invariant(_state.load() == State::Source);LOG(1) << "conca _sourceMessage State::Source";_state.store(State::SourceWait);LOG(1) << "conca _sourceMessage store State::SourceWait";guard.release();auto sourceMsgImpl = [&] {if (_transportMode == transport::Mode::kSynchronous) {MONGO_IDLE_THREAD_BLOCK;return Future<Message>::makeReady(_session()->sourceMessage());} else {invariant(_transportMode == transport::Mode::kAsynchronous);return _session()->asyncSourceMessage();}};sourceMsgImpl().getAsync([this](StatusWith<Message> msg) {if (msg.isOK()) {_inMessage = std::move(msg.getValue());invariant(!_inMessage.empty());}_sourceCallback(msg.getStatus());});
}

一般来说,每条消息均包含一个标准消息头,并后跟特定于请求的数据。标准消息头的结构如下

struct MsgHeader {int32   messageLength; // total message size, including thisint32   requestID;     // identifier for this messageint32   responseTo;    // requestID from the original request//   (used in responses from the database)int32   opCode;        // message type
}

messageLength

消息的总大小(以字节为单位)。该总数包括保存消息长度的 4 个字节。

requestID

客户端或数据库生成的标识符,可用于唯一标识此消息。

responseTo

从客户端消息中获取的 requestID

opCode

消息类型。 有关详细信息,请参阅操作码。

Mongodb协议由msg header + msg body组成,一个完整的mongodb报文内容格式如下:

后面重点研究_session()->asyncSourceMessage()代码,_session()获取当前_session,对应的实现代码是mongo/transport/session_asio.h,mongo/transport/session_asio.h的asyncSourceMessage方法如下:

  Future<Message> asyncSourceMessage(const BatonHandle& baton = nullptr) override {ensureAsync();return sourceMessageImpl(baton);}

mongo/transport/session_asio.h的sourceMessageImpl方法如下:

Future<Message> sourceMessageImpl(const BatonHandle& baton = nullptr) {static constexpr auto kHeaderSize = sizeof(MSGHEADER::Value);std::cout << "conca sourceMessageImpl" << std::endl;auto headerBuffer = SharedBuffer::allocate(kHeaderSize);auto ptr = headerBuffer.get();return read(asio::buffer(ptr, kHeaderSize), baton).then([headerBuffer = std::move(headerBuffer), this, baton]() mutable {if (checkForHTTPRequest(asio::buffer(headerBuffer.get(), kHeaderSize))) {return sendHTTPResponse(baton);}const auto msgLen = size_t(MSGHEADER::View(headerBuffer.get()).getMessageLength());if (msgLen < kHeaderSize || msgLen > MaxMessageSizeBytes) {StringBuilder sb;sb << "recv(): message msgLen " << msgLen << " is invalid. "<< "Min " << kHeaderSize << " Max: " << MaxMessageSizeBytes;const auto str = sb.str();LOG(0) << str;return Future<Message>::makeReady(Status(ErrorCodes::ProtocolError, str));}LOG(1) << "msgLen:" << msgLen << " kHeaderSize " << kHeaderSize;if (msgLen == kHeaderSize) {// This probably isn't a real case since all (current) messages have bodies.if (_isIngressSession) {networkCounter.hitPhysicalIn(msgLen);}return Future<Message>::makeReady(Message(std::move(headerBuffer)));}auto buffer = SharedBuffer::allocate(msgLen);memcpy(buffer.get(), headerBuffer.get(), kHeaderSize);LOG(1) << " buffer.get() " << buffer.get();MsgData::View msgView(buffer.get());return read(asio::buffer(msgView.data(), msgView.dataLen()), baton).then([this, buffer = std::move(buffer), msgLen]() mutable {if (_isIngressSession) {networkCounter.hitPhysicalIn(msgLen);}return Message(std::move(buffer));});});}

mongo/transport/session_asio.h的sourceMessageImpl代码异步获取消息,先读取kHeaderSize长度数据,再读取Body具体信息。

 read(asio::buffer(ptr, kHeaderSize), baton)读取mongodb头部header数据,解析出header中的messageLength字段。

 if (msgLen < kHeaderSize || msgLen > MaxMessageSizeBytes)检查messageLength字段是否在指定的合理范围,该字段不能小于Header整个头部大小,也不能超过MaxMessageSizeBytes最大长度。

 if (msgLen == kHeaderSize)如果只有头部信息直接返回

Header len检查通过,说明读取header数据完成,read继续读取body信息。

最后将上面步骤读取的buffer封装成Message对象,返回给上级Message,后面再根据message具体调用MongoDB数据库。

mongo/transport/session_asio.h的read方法如下:

 Future<void> read(const MutableBufferSequence& buffers, const BatonHandle& baton = nullptr) {
#ifdef MONGO_CONFIG_SSLif (_sslSocket) {std::cout << "conca read _sslSocket" << std::endl;return opportunisticRead(*_sslSocket, buffers, baton);} else if (!_ranHandshake) {invariant(asio::buffer_size(buffers) >= sizeof(MSGHEADER::Value));std::cout << "conca read !_ranHandshake" << std::endl;return opportunisticRead(_socket, buffers, baton).then([this, buffers]() mutable {_ranHandshake = true;return maybeHandshakeSSLForIngress(buffers);}).then([this, buffers, baton](bool needsRead) mutable {if (needsRead) {return read(buffers, baton);} else {return Future<void>::makeReady();}});}
#endifreturn opportunisticRead(_socket, buffers, baton);}

mongo/transport/session_asio.h的opportunisticRead方法代码,来自 MongoDB 的网络层,是一个使用 Asio 库实现的异步读取函数。它的主要功能是尝试从流中读取数据到缓冲区。

    Future<void> opportunisticRead(Stream& stream,const MutableBufferSequence& buffers,const BatonHandle& baton = nullptr) {std::error_code ec;size_t size;if (MONGO_unlikely(transportLayerASIOshortOpportunisticReadWrite.shouldFail()) &&_blockingMode == Async) {asio::mutable_buffer localBuffer = buffers;std::cout << "conca opportunisticRead asio::read 11" << std::endl;if (buffers.size()) {localBuffer = asio::mutable_buffer(buffers.data(), 1);}size = asio::read(stream, localBuffer, ec);if (!ec && buffers.size() > 1) {ec = asio::error::would_block;}} else {std::cout << "conca opportunisticRead asio::read" << std::endl;size = asio::read(stream, buffers, ec);std::cout << "conca opportunisticRead asio::read size is " << size<< std::endl;}if (((ec == asio::error::would_block) || (ec == asio::error::try_again)) &&(_blockingMode == Async)) {// asio::read is a loop internally, so some of buffers may have been read into already.// So we need to adjust the buffers passed into async_read to be offset by size, if// size is > 0.MutableBufferSequence asyncBuffers(buffers);if (size > 0) {asyncBuffers += size;}std::cout << "conca opportunisticRead asyncBuffers" << std::endl;if (baton && baton->networking()) {return baton->networking()->addSession(*this, NetworkingBaton::Type::In).then([&stream, asyncBuffers, baton, this] {return opportunisticRead(stream, asyncBuffers, baton);});}return asio::async_read(stream, asyncBuffers, UseFuture{}).ignoreValue();} else {return futurize(ec);}}

http://www.lryc.cn/news/2401304.html

相关文章:

  • 【数据分析】什么是鲁棒性?
  • 适老化场景重构:现代家政老年照护虚拟仿真实训室建设方案​
  • Qt/C++学习系列之QGroupBox控件的简单使用
  • Ubuntu设置之初始化
  • 如何轻松地将数据从 iPhone传输到iPhone 16
  • 开源供应链攻击持续发酵,多个软件包仓库惊现恶意组件
  • Docker Compose 备忘
  • 量子计算+AI:特征选择与神经网络优化创新应用
  • 算法分析与设计-动态规划、贪心算法
  • 光伏功率预测新突破:TCN-ECANet-GRU混合模型详解与复现
  • React组件基础
  • 2025年5月24日系统架构设计师考试题目回顾
  • ABP 框架集成 EasyAbp.Abp.GraphQL 构建高性能 GraphQL API
  • C# 用户控件(User Control)详解:创建、使用与最佳实践
  • OpenWrt 搭建 samba 服务器的方法并解决 Windows 不允许访问匿名服务器(0x80004005的错误)的方法
  • 【 Redis | 完结篇 缓存优化 】
  • AI数据集构建:从爬虫到标注的全流程指南
  • Android 颜色百分比对照
  • AI破局:饿了么如何搅动即时零售江湖
  • 04 APP 自动化- Appium toast 元素定位列表滑动
  • 判断它是否引用了外部库
  • 物流项目第十期(轨迹微服务)
  • Python 入门到进阶全指南:从语言特性到实战项目
  • 【数据库】关系数据理论--规范化
  • SQL 中 JOIN 的执行顺序优化指南
  • Oracle双平面适用场景讨论会议
  • OD 算法题 B卷【矩阵稀疏扫描】
  • 使用BERT/BiLSTM + CRF 模型进行NER进展记录~
  • HarmonyOS运动开发:精准估算室内运动的距离、速度与步幅
  • Web攻防-SQL注入高权限判定跨库查询文件读写DNS带外SecurePriv开关绕过