当前位置: 首页 > news >正文

Rpc - RpcCaller 模块

为什么需要 RpcCaller 模块 

这里我们举个例子,想象一下,如果你需要和远方的朋友交流:

简化沟通

  • 没有 RpcCaller:你需要写信→装信封→贴邮票→投递→等待回复
  • 有了 RpcCaller:你只需拿起电话说"嘿,你们那天气怎么样?"

多种联系方式

  • 同步调用:像打电话,你等在电话旁直到得到回答
  • 异步调用:像发短信,你可以先去做别的事,回复来了再看
  • 回调调用:像留言"有消息请回电",对方会在有结果时主动联系你

隐藏复杂性

  • 你不需要知道电话信号如何传输
  • 你只关心"问问题"和"得到答案"
  • RpcCaller 处理所有中间过程

统一接口

  • 不管你是打电话、发短信还是留言
  • 都用同一种方式表达你的请求
  • 不需要学习不同的沟通方式

简单来说,RpcCaller 就是让你能够像调用本地函数一样简单地调用远程服务,而不必关心网络通信的复杂细节,同时还能根据需要选择不同的调用方式。

RpcCall模块的过程

我们要给服务器发送请求,肯定是需要requestor模块来实现的,RpcRequest 对象是关键组件,它封装了远程调用的所有必要信息(方法名、参数、请求ID等)。所以我们在构造函数的时候,要创建一个requestor对象,接着就是实现三个call函数,那这三个函数是干嘛的呢,也就是说这三个call函数是为了简便用户的步骤的,用户只需要把方法名和参数给过来,call函数会在内部,进行一系列的操作,最终获取到结果把结果再返回给用户。这样用户就不用再考虑底层是怎么传输的了。

至于call函数的步骤就是先进行组织请求然后发送请求最后就是等待响应即可     

那么该如何组织请求呢,使用工厂模式创建 RpcRequest 对象,设置唯一标识符 (UUID),设置消息类型 (REQ_RPC),设置方法名和参数。这其中有两次类型转换,第一次是需要把RpcRequest 类型的req_msg对象转换为基类 BaseMessage 的类型,这么转换的原因是因为send函数的参数接受通用的 BaseMessage类型,而不是特定的 RpcRequest类型。第二次转换时把BaseMessage类型的rsp_msg 转换成RpcResponse 类型的,这次转换的原因是因为rsp_msg是BaseMessage::ptr类型,只包含基类方法和属性,但我们需要访问 RpcResponse 特有的方法,如rcode()和result(),这些方法在基类 BaseMessage 中不存在。

对于异步调用函数的结果,不同于同步调用只需要创建一个BaseMessage类型的 rsp_msg即可,它还需要创建一个共享的 promise 对象json_promise ,用于后续设置异步结果,这个json_promise 需要是智能指针,不能定义局部对象,定义局部对象就会导致promise超出作用域之后释放掉,当再获取结果的时候就会出问题。 获取 Future 对象,从 promise 获取对应的 future 对象作为函数输出参数返回给调用者。还需要绑定 RpcCaller::Callback 方法作为回调函数,传递 promise 对象到回调,以便结果到达时设置值,使用 std::placeholders 预留响应消息的位置

对于回调调用函数来说,也是不同于上述两种的,它的结果是通过用户提供的回调函数直接处理,不返回任何结果对象,也就是说需要先设置一个适配器函数Callback1,这个Callback1可以将将Requestor 传来的 BaseMessage 响应转换为用户期望的 JSON 格式,作为底层消息系统和用户代码之间的转换层。然后再通过设置的回调函数Requestor::RequestCallback req_cb =std::bind(&RpcCaller::Callback1,this, cb, std::placeholders::_1);这个回调函数的意思就是当 Requestor 调用 req_cb(response_message) 时,实际执行的是:this->Callback1(cb, response_message),   然后就是响应到达 → Requestor 调用 req_cb(response) → 实际执行 Callback1(cb, response) → 最终调用用户回调 cb(result),这就是一个异步回调编程模式:我们在发送请求时就设置好了接收响应的处理函数,然后立即返回。当响应最终到达时,系统会自动调用这个处理函数,完成后续流程。就好比你给朋友发短信(send)时告诉他"回复请打这个电话号码"(req_cb),当朋友回复时,他拨打的号码会转接到秘书(Callback1),秘书整理信息后通知你(cb)。

流程图

函数分析

构造函数

RpcCaller(const Requestor::ptr &requestor): _requestor(requestor){}
  • 功能:初始化 RpcCaller 对象
  • 参数:接收一个 Requestor 共享指针
  • 作用:存储 Requestor 实例以便后续发送请求使用

同步调用函数

bool call(const BaseConnection::ptr &conn, const std::string &method, const Json::Value &params, Json::Value &result) {DLOG("开始同步rpc调用...");//1. 组织请求auto req_msg = MessageFactory::create<RpcRequest>();req_msg->setId(UUID::uuid());req_msg->setMType(MType::REQ_RPC);req_msg->setMethod(method);req_msg->setParams(params);BaseMessage::ptr rsp_msg;//2. 发送请求bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), rsp_msg);if (ret == false) {ELOG("同步Rpc请求失败!");return false;}DLOG("收到响应,进行解析,获取结果!");//3. 等待响应auto rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(rsp_msg);if (!rpc_rsp_msg) {ELOG("rpc响应,向下类型转换失败!");return false;}if (rpc_rsp_msg->rcode() != RCode::RCODE_OK) {ELOG("rpc请求出错:%s", errReason(rpc_rsp_msg->rcode()));return false;}result = rpc_rsp_msg->result();DLOG("结果设置完毕!");return true;}
  • 功能:
    • 同步方式执行远程调用
  • 参数:
    • conn: 网络连接对象
    • method: 要调用的远程方法名
    • params: JSON格式的参数
    • result: 用于存储调用结果的引用
  • 流程:
    • 创建 RPC 请求消息,设置唯一 ID、类型、方法名和参数
    • 声明响应消息变量 rsp_msg
    • 调用 Requestor 的同步 send 方法发送请求并等待响应
    • 检查发送是否成功
    • 将 BaseMessage 类型响应转为 RpcResponse 类型
    • 检查响应码是否为 OK
    • 从响应中提取 result 字段并设置到输出参数
  • 返回值:
    • 调用成功返回 true,失败返回 false
  • 特点:
    • 阻塞调用,直到收到响应或出错才返回

异步调用函数

bool call(const BaseConnection::ptr &conn, const std::string &method, const Json::Value &params, JsonAsyncResponse &result) {//向服务器发送异步回调请求,设置回调函数,回调函数中会传入一个promise对象,在回调函数中去堆promise设置数据auto req_msg = MessageFactory::create<RpcRequest>();req_msg->setId(UUID::uuid());req_msg->setMType(MType::REQ_RPC);req_msg->setMethod(method);req_msg->setParams(params);auto json_promise = std::make_shared<std::promise<Json::Value>>() ;result = json_promise->get_future();Requestor::RequestCallback cb = std::bind(&RpcCaller::Callback, this, json_promise, std::placeholders::_1);bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), cb);if (ret == false) {ELOG("异步Rpc请求失败!");return false;}return true;}
  • 功能:
    • 异步方式执行远程调用
  • 参数:
    • conn: 网络连接对象
    • method: 要调用的远程方法名
    • params: JSON格式的参数
    • result: 返回 future 对象,用于将来获取结果

异步调用与同步调用的result区别:

result的类型不同:

  • 同步调用:Json::Value &result(普通JSON对象引用)
    • 直接存储结果的变量
    • 函数返回前会被填充结果值
  • 异步调用:JsonAsyncResponse &result(即std::future<Json::Value> &result)
    • 一个future对象,代表"未来会有的结果"
    • 函数立即返回,结果稍后可用

result获取方式不同:

  • 同步调用:函数返回时,结果已经直接放在传入的变量中
  • 异步调用:函数返回时只获得future对象,需要后续调用:
     Json::Value final_result = result.get(); // 获取实际结果,可能会阻塞直到结果可用
  • 流程:
    • 创建 RPC 请求消息,设置唯一 ID、类型、方法名和参数
    • 创建 promise 对象并获取对应的 future
    • 创建回调函数,绑定 Callback 方法和 promise 对象
    • 调用 Requestor 的回调式 send 方法发送请求
    • 检查发送是否成功
    • 设置 future 到输出参数并返回
  • 返回值:
    • 请求发送成功返回 true,失败返回 false
  • 特点:
    • 非阻塞调用,立即返回 future,客户端可以选择何时等待结果

回调式调用函数

 bool call(const BaseConnection::ptr &conn, const std::string &method, const Json::Value &params, const JsonResponseCallback &cb) {auto req_msg = MessageFactory::create<RpcRequest>();req_msg->setId(UUID::uuid());req_msg->setMType(MType::REQ_RPC);req_msg->setMethod(method);req_msg->setParams(params);Requestor::RequestCallback req_cb = std::bind(&RpcCaller::Callback1, this, cb, std::placeholders::_1);bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), req_cb);if (ret == false) {ELOG("回调Rpc请求失败!");return false;}return true;}
  • 功能:
    • 基于回调的远程调用
  • 参数:
    • conn: 网络连接对象
    • method: 要调用的远程方法名
    • params: JSON 格式的参数
    • cb: 用户提供的回调函数,接收 JSON 结果
  • 流程:
    • 创建 RPC 请求消息,设置唯一 ID、类型、方法名和参数
    • 创建内部回调 req_cb,绑定 Callback1 方法和用户回调函数
    • 调用 Requestor 的回调式 send 方法发送请求
    • 检查发送是否成功
  • 返回值:
    • 请求发送成功返回 true,失败返回 false
  • 特点:
    • 非阻塞调用,结果通过回调函数异步处理,适合事件驱动编程

异步回调处理函数

void Callback(std::shared_ptr<std::promise<Json::Value>> result, const BaseMessage::ptr &msg)  {auto rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(msg);if (!rpc_rsp_msg) {ELOG("rpc响应,向下类型转换失败!");return ;}if (rpc_rsp_msg->rcode() != RCode::RCODE_OK) {ELOG("rpc异步请求出错:%s", errReason(rpc_rsp_msg->rcode()));return ;}result->set_value(rpc_rsp_msg->result());}
  • 功能:处理异步调用的响应
  • 参数:
  • result: promise 对象指针
  • msg: 收到的响应消息
  • 流程:
    • 将 BaseMessage 响应转换为 RpcResponse 类型
    • 检查类型转换是否成功
    • 检查响应码是否为 OK
    • 从响应中提取 result 字段并设置到 promise
  • 作用:在响应到达时被调用,处理结果并通过 promise 机制传递给等待的 future
  • 说明:由 Requestor 的 onResponse 方法间接调用

用户回调处理函数

void Callback1(const JsonResponseCallback &cb, const BaseMessage::ptr &msg)  {auto rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(msg);if (!rpc_rsp_msg) {ELOG("rpc响应,向下类型转换失败!");return ;}if (rpc_rsp_msg->rcode() != RCode::RCODE_OK) {ELOG("rpc回调请求出错:%s", errReason(rpc_rsp_msg->rcode()));return ;}cb(rpc_rsp_msg->result());}
  • 功能:处理回调式调用的响应
  • 参数:
  • cb: 用户提供的回调函数
  • msg: 收到的响应消息
  • 流程:
    • 将 BaseMessage 响应转换为 RpcResponse 类型
    • 检查类型转换是否成功
    • 检查响应码是否为 OK
    • 从响应中提取 result 字段并调用用户回调函数
  • 作用:在响应到达时被调用,处理结果并通过用户回调函数返回
  • 说明:由 Requestor 的 onResponse 方法间接调用

所有代码

#pragma once
#include "requestor.hpp"namespace bitrpc {namespace client {class RpcCaller {public:using ptr = std::shared_ptr<RpcCaller>;using JsonAsyncResponse = std::future<Json::Value>;using JsonResponseCallback = std::function<void(const Json::Value&)>;RpcCaller(const Requestor::ptr &requestor): _requestor(requestor){}//requestor中的处理是针对BaseMessage进行处理的//用于在rpccaller中针对结果的处理是针对 RpcResponse里边的result进行的bool call(const BaseConnection::ptr &conn, const std::string &method, const Json::Value &params, Json::Value &result) {DLOG("开始同步rpc调用...");//1. 组织请求auto req_msg = MessageFactory::create<RpcRequest>();req_msg->setId(UUID::uuid());req_msg->setMType(MType::REQ_RPC);req_msg->setMethod(method);req_msg->setParams(params);BaseMessage::ptr rsp_msg;//2. 发送请求bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), rsp_msg);if (ret == false) {ELOG("同步Rpc请求失败!");return false;}DLOG("收到响应,进行解析,获取结果!");//3. 等待响应auto rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(rsp_msg);if (!rpc_rsp_msg) {ELOG("rpc响应,向下类型转换失败!");return false;}if (rpc_rsp_msg->rcode() != RCode::RCODE_OK) {ELOG("rpc请求出错:%s", errReason(rpc_rsp_msg->rcode()));return false;}result = rpc_rsp_msg->result();DLOG("结果设置完毕!");return true;}bool call(const BaseConnection::ptr &conn, const std::string &method, const Json::Value &params, JsonAsyncResponse &result) {//向服务器发送异步回调请求,设置回调函数,回调函数中会传入一个promise对象,在回调函数中去堆promise设置数据auto req_msg = MessageFactory::create<RpcRequest>();req_msg->setId(UUID::uuid());req_msg->setMType(MType::REQ_RPC);req_msg->setMethod(method);req_msg->setParams(params);auto json_promise = std::make_shared<std::promise<Json::Value>>() ;result = json_promise->get_future();Requestor::RequestCallback cb = std::bind(&RpcCaller::Callback, this, json_promise, std::placeholders::_1);bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), cb);if (ret == false) {ELOG("异步Rpc请求失败!");return false;}return true;}bool call(const BaseConnection::ptr &conn, const std::string &method, const Json::Value &params, const JsonResponseCallback &cb) {auto req_msg = MessageFactory::create<RpcRequest>();req_msg->setId(UUID::uuid());req_msg->setMType(MType::REQ_RPC);req_msg->setMethod(method);req_msg->setParams(params);Requestor::RequestCallback req_cb = std::bind(&RpcCaller::Callback1, this, cb, std::placeholders::_1);bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), req_cb);if (ret == false) {ELOG("回调Rpc请求失败!");return false;}return true;}private:void Callback1(const JsonResponseCallback &cb, const BaseMessage::ptr &msg)  {auto rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(msg);if (!rpc_rsp_msg) {ELOG("rpc响应,向下类型转换失败!");return ;}if (rpc_rsp_msg->rcode() != RCode::RCODE_OK) {ELOG("rpc回调请求出错:%s", errReason(rpc_rsp_msg->rcode()));return ;}cb(rpc_rsp_msg->result());}void Callback(std::shared_ptr<std::promise<Json::Value>> result, const BaseMessage::ptr &msg)  {auto rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(msg);if (!rpc_rsp_msg) {ELOG("rpc响应,向下类型转换失败!");return ;}if (rpc_rsp_msg->rcode() != RCode::RCODE_OK) {ELOG("rpc异步请求出错:%s", errReason(rpc_rsp_msg->rcode()));return ;}result->set_value(rpc_rsp_msg->result());}private:Requestor::ptr _requestor;};}
}
http://www.lryc.cn/news/572210.html

相关文章:

  • 获取gitlab上项目分支版本(二)
  • 据字典是什么?和数据库、数据仓库有什么关系?
  • 解锁 JavaScript 模块化:ES6 Module 语法深度指南
  • OpenGL——单位向量点乘和叉乘在几何的应用
  • 从C++编程入手设计模式——装饰器模式
  • implement用法
  • 客户催单-01.需求分析和设计
  • 【单片机】51单片机学习笔记
  • 数据结构 4 (栈和队列)
  • 苍穹外卖-2025 完成基础配置环节(详细图解)
  • 拼多多商家端 anti_content 补环境分析
  • 如何使用 USB 数据线将文件从 PC 传输到 iPhone
  • 【漏洞复现】Apache Kafka Connect 任意文件读取漏洞(CVE-2025-27817)
  • 数控滑台在精密制造中起着至关重要的作用
  • 主成分分析(PCA)例题——给定协方差矩阵
  • camel-ai Agent模块- CriticAgent
  • 用 python 开发一个可调用工具的 AI Agent,实现电脑配置专业评价
  • Vim:从入门到进阶的高效文本编辑器之旅
  • 微信小程序传参过来了,但是数据没有获取到
  • THUCNEWS数据集-文本分类
  • C++(运算符重载)
  • 2025虚幻引擎文件与文件夹命名规律
  • 代理 AI 时代的隐私重构:从边界控制到信任博弈
  • MySQL RC隔离级别惊现间隙锁:是bug吗?
  • 如何在中将网络改为桥接模式并配置固定IP地址
  • openLayers切换基于高德、天地图切换矢量、影像、地形图层
  • Zabbix监控系统安装部署(图文)
  • Linux简单了解以及VM虚拟机的安装使用(后端程序员)
  • 探秘阿里云EBS存储:云计算的存储基石
  • LINUX 619 NFS rsync