当前位置: 首页 > news >正文

项目实战二:RPC

一.初识RPC

RPC的具体作用是做到让客户端调用服务端的服务像是调用本地函数一样。所以RPC配合TCP和UDP都行,只要能发送一帧 json 让服务端能收到并且响应就行。

二. 项目粗略流程

一个客户端一个服务端,双方在能正常通信的情况下,算作网络层封装结束。

为我们RPC项目构建一个简单“字典”,上面记录服务端能允许的函数信息。

客户端   json_encode  以及  服务端  parser 实现

三.  项目问题

思路问题:

开发需要解决的问题:

1.若客户端多线程调用,如何判断哪个响应是哪个线程发出的?

        每一个请求协议加上callerid标识

2.多线程发送会出现tcp粘包的情况,如何解?

        每个请求/响应头两个字节用作长度记录,接收方先读前两字节再读后面

3.如何避免数据被篡改?

        请求/响应数据包前加上CRC循环冗余验证

具体处理分包和粘包代码:

接收:
char header[6] = {0};
int ret = recv(connfd,hreader,6,0);    //第一次接收unsigned int crc32 = *(unsigned int*)header;
unsigned short len = *(unsigned short*)(header+4);    //从第一次接收里取出长度char *data = malloc((len + 1) * sizeof(char *));
memset(data,0,(len + 1) * sizeof(char *));
ret = recv(connfd,data,len,0);    //第二次接收free(data);
发送:char *header2 = rpc_header_encode(response);
ret = send(connfd,header2,6,0);
ret = send(connfd,response,strlen(response),0);
其中header_encode:
char *rpc_header_encode(char *msg) {char *header = rpc_malloc(KRPC_HEADER_LENGTH);unsigned int crc32 = calc_crc32(msg, strlen(msg));*(unsigned int *)header = crc32;*(unsigned short*)(header+4) = (unsigned short)strlen(msg);return header;
}
free(header2);

代码具体问题:

4.用户调用不同的函数,但是流程都是一样的 encode  -> session -> decode,只有传的参数不同,用户端如何根据不同的函数组织不同的json发送给服务器?

答:

给json里面传method方法名
const char *zrpc_caller_name() { //返回上两级调用的函数的函数名void *return_addr = __builtin_return_address(1);Dl_info info;if (0 != dladdr(return_addr, &info) && info.dli_sname != NULL) {return info.dli_sname;}return 0;
}
char *rpc_request_json_encode(int numargs, ...) { //caller_name的上一级const char *method = zrpc_caller_name();cJSON_AddStringToObject(root, "method", method); //写入json method字段...
}
char *sayhello(char *msg,int length){    //caller_name的上两级char *request = rpc_request_json_encode(2,msg,length); //由参数 生成json...
}

5.用户调用函数,需要根据传入的内容组织一个json,但是问题是,参数的值可以直接传入但是用户端如何知道用户传入参数的名字?比如下图右边的 "a"  和 "b"

参数解析:method:告诉服务器调用哪个方法。callerid:可以标识调用同一个函数的不同线程,也可以权限判断。

6.同样是组织json的时候,我们需要设置一个值来接收用户传进来的参数的值,但是问题是用户可能传int 也可能传char *,接收参数该设置什么类型呢?

5+6答:通过字典,我们能够根据函数名找到对应的结点item,里面会存有对应函数中参数的类型和名称以及参数个数。参照这个来构建json的params部分。

char *rpc_request_json_encode(int numargs, ...) {...struct zrpc_func *func = zrpc_get_caller_table(); //获取表头while (func) {if (0 == strcmp(func->method, method)) break;func = func->next;}if (func == NULL) return NULL;cJSON *params = cJSON_CreateObject(); //组织参数va_list args;//通过内存地址偏移逐个读取栈上的参数va_start(args, numargs);//根据numargs的地址,定位可变参数的起始位置int i = 0;for (i = 0;i < func->count;i ++) {if (0 == strcmp(func->types[i], "int")) {//向JSON对象 params里面添加一个 key-value,key是func->params[i],value是va_arg(args, int)cJSON_AddNumberToObject(params, func->params[i], va_arg(args, int));} else if (0 == strcmp(func->types[i], "double")) {cJSON_AddNumberToObject(params, func->params[i], va_arg(args, double));} else if (0 == strcmp(func->types[i], "float")) {cJSON_AddNumberToObject(params, func->params[i], va_arg(args, double));} else if (0 == strcmp(func->types[i], "char *") || 0 == strcmp(func->types[i], "char*")) {cJSON_AddStringToObject(params, func->params[i], va_arg(args, char *));} else {printf("types: %s\n", func->types[i]);assert(0);}}...
}

7.RPC只是提供给用户一个“直接连接”服务器,能够让函数 encode->session->decode 中把算力交给服务器的工具,那要如何制定一个标准,让用户使用的时候,符合我们的代码风格?避免出现用户写一个函数我们就要服务器对应实现一个接口。

:register.json,相当于一本字典。里面存储了服务端的ip和端口信息,以及提供的函数的相关信息。

8.如何存储这个register.json?

服务端负责链式存储,但是客户端和服务端都能获取。register.json里面是有许多函数的信息,而每一个函数信息都被存入一个结构体结点,所有的register.json,即为一个结构体链表。

客户端:到链表中比对找到对应的method相同的结点,参照结点的函数信息构建json request。

服务端:到链表中比对找到对应的method相同的结点,执行对应的parser函数。

7+8答:register相关代码实现:

​
register相关的两个函数:char *rpc_read_register_json(char *filename){    //把register从磁盘读出来int fd = open(filename,O_RDONLY);if(fd == -1){perror(open);return NULL;}off_t file_size = lseek(fd,0,SEEK_END);//long longlseek(fd,0,SEEK_SET);char *buffer = rpc_malloc(file_size + 1);if(buffer == NULL){perror("malloc");close(fd);return NULL;}int rlen = read(fd,buffer,file_size);buffer[rlen] = '\0';close(fd);return buffer;
}int rpc_store_register_json(char *json){    //构建存储方式方便操作cJSON *root = cJSON_Parse(json);cJSON *remote = cJSON_GetObjectItem(root,"remote");rpc_server_ip = cJSON_GetStringValue(remote);cJSON *port = cJSON_GetObjectItem(root,"port");rpc_server_port = cjson_port->valueint;cJSON *config = cJSON_GetObjectItem(root,"config");int config_size = cJSON_GetArraySize(config);int i = 0;cJSON *item = NULL;for(i = 0;i < config_size;i++){//构建每个函数结点  加入链表//构建结点struct rpc_func *func = (struct rpc_func *)rpc_malloc(sizeof(struct rpc_func));memset(func,0,sizeof(struct rpc_func));item = cJSON_GetArrayItem(config,i);cJSON *method = cJSON_GetObjectItem(item, "method");func->method = method->valuestring;cJSON *rettype = cJSON_GetObjectItem(item, "rettype");func->rettype = method->valuestring;cJSON *params = cJSON_GetObjectItem(item, "params");int params_size = cJSON_GetArraySize(params);cJSON *types = cJSON_GetObjectItem(item, "types");int types_size = cJSON_GetArraySize(types);assert(params_size == types_size); //指参数个数和参数类型个数相等int j = 0;for(j = 0;j < params_size;j++){cJSON *param = cJSON_GetArrayItem(params,j);func->params[j] = param->valuestring;cJSON *type = cJSON_GetArrayItem(types,j);func->types[j] = type->valuestring;}func->count = params_size; //参数用了几个//插入链表func->next = rpc_caller_table;rpc_caller_table = func;}return 0;
}​

四.项目代码

下面按照执行流程给出代码:

pre:用户端加载字典

1.用户调用

int main(int argc, char *argv[]) {if (argc != 2)return 0;rpc_load_register(argv[1]);char *res = sayhello("abc", 3);printf("res = %s\n",res);
}

2.跳转到method.c中的sayhello函数

char *sayhello(char *msg, int length) {	//客户端调用函数char *request = rpc_request_json_encode(2, msg, length); char *response = rpc_client_session(request);	char *result = rpc_response_json_decode(response);  char *ret = strdup(result);free(result);free(response);free(request);return ret;
}

3.执行第一个函数:参照字典组织request json

char *rpc_request_json_encode(int numargs, ...) {//组织一个json 3部分:method函数名,params函数参数,callerid标识cJSON *root = cJSON_CreateObject(); //组织methodconst char *method = rpc_caller_name();cJSON_AddStringToObject(root, "method", method);struct rpc_func *func = rpc_get_caller_table(); //获取表头while (func) {if (0 == strcmp(func->method, method))break;func = func->next;}if (func == NULL)return NULL;cJSON *params = cJSON_CreateObject(); //组织参数va_list args;//通过内存地址偏移逐个读取栈上的参数va_start(args, numargs);//根据numargs的地址,定位可变参数的起始位置int i = 0;for (i = 0; i < func->count; i ++) {if (0 == strcmp(func->types[i], "int")) {//向JSON对象 params里面添加一个 key-value,key是func->params[i],value是va_arg(args, int)cJSON_AddNumberToObject(params, func->params[i], va_arg(args, int));} else if (0 == strcmp(func->types[i], "double")) {cJSON_AddNumberToObject(params, func->params[i], va_arg(args, double));} else if (0 == strcmp(func->types[i], "float")) {cJSON_AddNumberToObject(params, func->params[i], va_arg(args, double));} else if (0 == strcmp(func->types[i], "char *") || 0 == strcmp(func->types[i], "char*")) {cJSON_AddStringToObject(params, func->params[i], va_arg(args, char *));} else {printf("types: %s\n", func->types[i]);assert(0);}}va_end(args);//将另外两个JSON对象写到root对象里面 key是params value是传好的值cJSON_AddItemToObject(root, "params", params);cJSON_AddNumberToObject(root, "callerid", rpc_get_callerid());char *out = cJSON_Print(root); //取出整个JSON   作为dataprintf("send the request json of %s:\n%s",func->method,out);//cJSON_Delete(params);cJSON_Delete(root);return out;
}

4.执行第二个函数,和服务端建立连接,发送数据等待接收

char *rpc_client_session(char *request) {const char *ip = "192.168.88.9";unsigned short port = 9096;int connfd = connect_tcpserver(ip, port);//sendchar *header = rpc_header_encode(request);int sent = send(connfd, header, RPC_HEADER_LENGTH, 0);sent = send(connfd, request, strlen(request), 0);free(header);// recv    char resheader[RPC_HEADER_LENGTH] = {0};
-------------------------卡住等待服务端执行---------------------------------------------------------int ret = recv(connfd, resheader, RPC_HEADER_LENGTH, 0);assert (ret == RPC_HEADER_LENGTH);unsigned int crc32 = *(unsigned int *)resheader;unsigned short length = *(unsigned short *)(resheader + 4);char *payload = rpc_malloc((length + 1) * sizeof(char *));if (!payload)return 0;memset(payload, 0, (length + 1) * sizeof(char *));ret = recv(connfd, payload, length, 0);if (ret) {//printf("recv response json success,total length is :%d\n", ret);}assert(ret == length);close(connfd);return payload;}

pre:服务器加载字典

5.服务器监听到端口有请求,接收后等待处理,处理后send返回

void server_session(void *arg) {int connfd = *(int *)arg;free(arg);int ret = 0;struct pollfd fds;fds.fd = connfd;fds.events = POLLIN;while (1) {char header[6] = {0};int ret = recv(connfd, header, 6, 0); //第一次接收if (ret == 0) {close(connfd);break;}unsigned int crc32 = *(unsigned int *)header;unsigned short len = *(unsigned short *)(header + 4); //从第一次接收里取出长度char *data = rpc_malloc((len + 1) * sizeof(char *));memset(data, 0, (len + 1) * sizeof(char *));ret = recv(connfd, data, len, 0); //第二次接收//printf("recv : %s\n", data);
------------------------卡住等待parse处理--------------------------------------------------char *response = rpc_parser(data);	//业务处理char *header2 = rpc_header_encode(response);ret = send(connfd, header2, 6, 0);ret = send(connfd, response, strlen(response), 0);if (ret) {//printf("send response json success,total length is :%d\n", ret);}rpc_free(header2);rpc_free(data);}
}

6.服务器,解析request json后参照字典找到对应函数再执行对应handler

char *rpc_parser(char *json) {   //1.decode request  解析请求cJSON *root = cJSON_Parse(json);if (root == NULL)return NULL;cJSON *method = cJSON_GetObjectItem(root, "method");cJSON *params = cJSON_GetObjectItem(root, "params");cJSON *callerid = cJSON_GetObjectItem(root, "callerid");struct rpc_func *func = rpc_get_caller_table();//遍历字典while (func) {if (0 == strcmp(method->valuestring, func->method)) {printf("find the same method aboud %s\n", func->method);struct rpc_task *task = (struct rpc_task *)malloc(sizeof(struct rpc_task));//为什么需要构建一个task?//解答:符合设计模式,把静态的参数统一传入,利于延展性task->method = method->valuestring;task->callerid = callerid->valueint;//2.handler  计算结果
-------------------------卡住等待结果计算---------------------------------------------------char *result = func->handler(params, task); //rpc_response_json_encode_sayhelloprintf("get the result json of %s:\n%s", func->method, result);//难点:如何把handler和对应的函数绑定?//解答:在解析register.json的时候,通过函数名,到method.c编译完//		生成的动态库中找到函数入口地址,返回给handler完成初始化return result;}func = func->next;}
}//handle指向的函数:
char *rpc_response_json_encode_sayhello(cJSON *params, struct rpc_task *task) {cJSON *cjson_msg = cJSON_GetObjectItem(params, "msg");cJSON *cjson_length = cJSON_GetObjectItem(params, "length");char *ret =  rpc_handle_sayhello(cjson_msg->valuestring, cjson_length->valueint);//3.将计算结果打包成jsoncJSON *root = cJSON_CreateObject();cJSON_AddStringToObject(root, "method", task->method);cJSON_AddStringToObject(root, "result", ret);cJSON_AddNumberToObject(root, "callerid", task->callerid);char *out = cJSON_Print(root);cJSON_Delete(root);return out;
}char *rpc_handle_sayhello(char *msg, int length) { //具体计算过程int i = 0;for (i = 0; i < length / 2; i ++) {char tmp = msg[i];msg[i] = msg[length - i - 1];msg[length - i - 1] = tmp;}return msg;
}

7.执行第三个函数:客户端接收到response json,解析取出result对应的value值,返回给第三个函数。

        三个函数都执行完,最开始调用的函数sayhello也返回,至此流程结束,用户拿到答案!

char *rpc_response_json_decode(char *json) {cJSON *root = cJSON_Parse(json); //将响应的字符串json解析为json对象if (root == NULL) {return NULL;}cJSON *result = cJSON_GetObjectItem(root, "result");return cJSON_Print(result);//返回到下面的函数
}char *sayhello(char *msg, int length) {	//客户端调用函数char *request = rpc_request_json_encode(2, msg, length);  char *response = rpc_client_session(request);	char *result = rpc_response_json_decode(response);  resultchar *ret = strdup(result);free(result);free(response);free(request);return ret; //返回到下面的函数
}

结果:

服务端:

客户端:

五.性能测试

客户端请求1w次的qps

六.功能拓展

通过增加一个函数max来感受增加一个功能需要做什么:

第一步:在register.json中增加一个函数信息

第二步:在method.c中添加提供给客户端调用的函数

第三步:在method.c中添加服务端handler的具体函数

第四步:在method.h中添加对应头文件

结果:

待完善:自动生成代码

在register.json上新提供一个函数,就自动生成服务端代码,让用户可以直接通过接口调用。

思路:参考上面添加max,可以通过fprint,在method.c和method.h中添加对应的代码,红框圈起来的地方即需要根据函数具体参数信息if-else判断修改的地方。

代码参考:

https://github.com/0voice

本文代码存储:

He Jiangtao / 9.2_RPC · GitLab

http://www.lryc.cn/news/609826.html

相关文章:

  • 17.6 超拟人大模型CharacterGLM技术解析:92.7%角色一致性+虚拟偶像互动提升300%,如何吊打GPT-4?
  • C++-异常
  • Python----大模型(量化 Quantization)
  • MySQL详解(一)
  • 从零开始的云计算生活——项目实战
  • 商标续展如果逾期了还有办法补救吗?
  • 消息系统技术文档
  • 学习嵌入式第十九天
  • 系统一个小时多次Full GC,导致系统线程停止运行,影响系统的性能,可靠性
  • 靶场(二十八)---小白心得靶场体会---Mantis
  • 前端VUE基础环境搭建
  • STM32_Hal库学习SPI
  • ctfshow:pwn85(高级ROP 64 位 Partial-RELRO)、pwn141
  • 探访WAIC2025:当AI成为双刃剑,合合信息如何破解真假难题
  • ZYNQ-按键消抖
  • 如何在 Ubuntu 24.04 LTS 上安装 Docker
  • Baumer工业相机堡盟工业相机如何通过YoloV8深度学习模型实现路口车辆速度的追踪识别(C#代码UI界面版)
  • Apache Spark 的结构化流
  • bypass
  • 基于PSO-NSGAIII混合优化的生产调度算法matlab仿真,输出甘特图,对比PSO和NSGAIII
  • 开源的现代数据探索和可视化平台:Apache Superset 从 PyPI 安装 Superset
  • 基于深度学习的医学图像分析:使用PatchGAN实现医学图像分割
  • 优选算法 力扣 11. 盛最多水的容器 双指针降低时间复杂度 贪心策略 C++题解 每日一题
  • AI开灯的几种方法,与物理世界的交互过渡
  • AUTOSAR CP:深度揭秘APPL层(Application Layer)!SWC分配策略与端口交互的终极指南
  • 交叉验证:原理、作用与在机器学习流程中的位置
  • LeetCode 135:分糖果
  • lodash的替代品es-toolkit详解
  • 认识爬虫 —— xpath提取
  • Go语言高并发价格监控系统设计