当前位置: 首页 > news >正文

1.RPC基本原理

文章目录

  • RPC
    • 1.定义
    • 2.概念
    • 3.优缺点
    • 4.RPC结构
    • 5.RPC消息协议
      • 5.1 消息边界
      • 5.2 内容
      • 5.3 压缩
    • 6.RPC的实现
      • 6.1 divide_protocol.py
      • 6.2 server.py
      • 6.3 client.py

RPC

1.定义

远程过程调用(remote procedure call)

2.概念

广义:所有通过网络进行通讯,的调用统称为RPC调用

狭义:不采用http协议的方式,采用自定义格式的二进制方式

3.优缺点

  • 优点
    • 效率高
    • 发起rpc调用的一方,可以忽略RPC的具体实现,如同编写本地函数调用
  • 缺点
    • 通用性不高

4.RPC结构

  • client(caller):调用者
  • client stub(bundle args/unbundle ret vals):客户端存根
  • client network service
  • server network service
  • server stub(bundle ret vals/unbundle args)

5.RPC消息协议

5.1 消息边界

  • 分隔符(\r\n)
  • 长度声明法(例如HTTP中 Content-Length)

5.2 内容

  • 二进制
  • 文本内容

5.3 压缩

  • 压缩处理是一把双刃剑,减少数据量减轻带宽压力同时,额外增加了压缩和解压缩的时间

6.RPC的实现

6.1 divide_protocol.py

import struct
from io import BytesIOclass InvalidOperation(Exception):...class DivideProtocol(object):"""float divide(1:int num1, 2:int num2=1)"""def _read_all(self, size):"""读取指定长度的字节:param size: 长度:return: 读取出的二进制数据"""if isinstance(self.conn, BytesIO):# BytesIO类型,用于演示buff = b''have = 0while have < size:chunk = self.conn.read(size - have)have += len(chunk)buff += chunkreturn buffelse:# socket类型buff = b''have = 0while have < size:chunk = self.conn.recv(size - have)have += len(chunk)buff += chunk# 客户端关闭了连接if len(chunk) == 0:raise EOFError()return buffdef args_encode(self, num1, num2=1):"""对调用参数进行编码:param num1: int:param num2: int:return: 编码后的二进制数据"""# 处理参数num1, 4字节整型buff = struct.pack('!B', 1)buff += struct.pack('!i', num1)# 处理参数num2, 4字节整型,如为默认值1,则不再放到消息中if num2 != 1:buff += struct.pack('!B', 2)buff += struct.pack('!i', num2)# 处理消息总长度,4字节无符号整型length = len(buff)# 处理方法名,字符串类型name = 'divide'# 字符串长度,4字节无符号整型msg = struct.pack('!I', len(name))msg += name.encode()msg += struct.pack('!I', length) + buffreturn msgdef args_decode(self, connection):"""获取调用参数并进行解码:param connection: 传输工具对象,如socket对象或者BytesIO对象,从中可以读取消息数据:return: 解码后的参数字典"""# 保存到当前对象中,供_read_all方式使用self.conn = connectionparam_name_map = {1: 'num1',2: 'num2'}param_len_map = {1: 4,2: 4}# 用于保存解码后的参数字典args = dict()# 读取消息总长度,4字无节符号整数buff = self._read_all(4)length = struct.unpack('!I', buff)[0]# 记录已读取的长度have = 0# 读取第一个参数,4字节整型buff = self._read_all(1)have += 1param_seq = struct.unpack('!B', buff)[0]param_len = param_len_map[param_seq]buff = self._read_all(param_len)have += param_lenargs[param_name_map[param_seq]] = struct.unpack('!i', buff)[0]if have >= length:return args# 读取第二个参数,4字节整型buff = self._read_all(1)have += 1param_seq = struct.unpack('!B', buff)[0]param_len = param_len_map[param_seq]buff = self._read_all(param_len)have += param_lenargs[param_name_map[param_seq]] = struct.unpack('!i', buff)[0]return argsdef result_encode(self, result):"""对调用的结果进行编码:param result: float 或 InvalidOperation对象:return: 编码后的二进制数据"""if isinstance(result, float):# 没有异常,正常执行# 处理结果类型,1字节无符号整数buff = struct.pack('!B', 1)# 处理结果值, 4字节floatbuff += struct.pack('!f', result)else:# 发生了InvalidOperation异常# 处理结果类型,1字节无符号整数buff = struct.pack('!B', 2)# 处理异常结果值, 字符串# 处理字符串长度, 4字节无符号整数buff += struct.pack('!I', len(result.message))# 处理字符串内容buff += result.message.encode()return buffdef result_decode(self, connection):"""对调用结果进行解码:param connection: 传输工具对象,如socket对象或者BytesIO对象,从中可以读取消息数据:return: 结果数据"""self.conn = connection# 取出结果类型, 1字节无符号整数buff = self._read_all(1)result_type = struct.unpack('!B', buff)[0]if result_type == 1:# float的结果值, 4字节floatbuff = self._read_all(4)result = struct.unpack('!f', buff)[0]return resultelse:# InvalidOperation对象# 取出字符串长度, 4字节无符号整数buff = self._read_all(4)str_len = struct.unpack('!I', buff)[0]buff = self._read_all(str_len)message = buff.decode()return InvalidOperation(message)class MethodProtocol(object):def __init__(self, connection):self.conn = connectiondef _read_all(self, size):"""读取指定长度的字节:param size: 长度:return: 读取出的二进制数据"""if isinstance(self.conn, BytesIO):# BytesIO类型,用于演示buff = b''have = 0while have < size:chunk = self.conn.read(size - have)have += len(chunk)buff += chunkreturn buffelse:# socket类型buff = b''have = 0while have < size:print('have=%d size=%d' % (have, size))chunk = self.conn.recv(size - have)have += len(chunk)buff += chunkif len(chunk) == 0:raise EOFError()return buffdef get_method_name(self):# 获取方法名# 读取字符串长度,4字节无符号整型buff = self._read_all(4)str_len = struct.unpack('!I', buff)[0]# 读取字符串buff = self._read_all(str_len)name = buff.decode()return name

6.2 server.py

import socket
import threadingfrom customize_rpc.divide_protocol import DivideProtocol, MethodProtocol, InvalidOperationclass Handlers:@staticmethoddef divide(num1, num2=1):"""除法:param num1::param num2::return:"""if num2 == 0:raise InvalidOperation()val = num1 / num2return valclass ServerStub(object):def __init__(self, connection, handlers):"""服务器存根:param connection: 与客户端的socket连接:param handlers: 存放被调用的方法"""self._process_map = {'divide': self._process_divide,}self.conn = connectionself.method_proto = MethodProtocol(self.conn)self.handlers = handlersdef process(self):"""被服务器调用的入口,服务器收到请求后调用该方法"""# 获取解析调用请求的方法名name = self.method_proto.get_method_name()# 调用对应的处理方法self._process_map[name]()def _process_divide(self):"""执行divide本地调用,并将结果返回给客户端"""# 接收调用参数proto = DivideProtocol()args = proto.args_decode(self.conn)# 进行本地divide调用try:result = self.handlers.divide(**args)except InvalidOperation as e:result = e# 构造返回值消息并返回result = proto.result_encode(result)self.conn.sendall(result)class Server(object):def __init__(self, host, port, handlers):self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)self.host = hostself.port = portself.sock.bind((host, port))self.handlers = handlersdef serve(self):"""开始服务"""self.sock.listen(128)print("开始监听")while True:conn, addr = self.sock.accept()print("建立链接%s" % str(addr))stub = ServerStub(conn, self.handlers)try:while True:stub.process()except EOFError:print("客户端关闭连接")# 关闭服务端连接conn.close()class ThreadServer(object):def __init__(self, host, port, handlers):self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)self.host = hostself.port = portself.sock.bind((host, port))self.handlers = handlersdef serve(self):"""开始服务"""self.sock.listen(128)print("开始监听")while True:conn, addr = self.sock.accept()print("建立链接%s" % str(addr))t = threading.Thread(target=self.handle, args=(conn,))t.start()def handle(self, client):stub = ServerStub(client, self.handlers)try:while True:stub.process()except EOFError:print("客户端关闭连接")client.close()if __name__ == '__main__':server = Server('127.0.0.1', 8000, Handlers)server.serve()    

6.3 client.py

import time
import socketfrom customize_rpc.divide_protocol import DivideProtocol, InvalidOperationclass Channel(object):"""连接通道"""def __init__(self, host, port):self.host = hostself.port = portdef get_connection(self):"""获取一个tcp连接"""sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)sock.connect((self.host, self.port))return sockclass ClientStub(object):"""客户端存根"""def __init__(self, channel: Channel):self.channel = channelself.conn = self.channel.get_connection()def divide(self, num1, num2=1):# 构造proto = DivideProtocol()args = proto.args_encode(num1, num2)self.conn.sendall(args)result = proto.result_decode(self.conn)if isinstance(result, InvalidOperation):raise resultelse:return resultif __name__ == '__main__':channel = Channel('127.0.0.1', 8000)stub = ClientStub(channel)for i in range(5):try:val = stub.divide(i * 100, 10)except InvalidOperation as e:print(e.message)else:print(val)time.sleep(1)
http://www.lryc.cn/news/511237.html

相关文章:

  • vue2/3,Spring Boot以及生产环境跨域解决方案
  • 【centos8 镜像修改】centos8 镜像修改阿里云
  • 多线程编程初探:掌握基本概念与核心原理
  • 【信息系统项目管理师】第13章:项目资源管理过程详解
  • vue3封装而成的APP ,在版本更新后,页面显示空白
  • GEE云计算、多源遥感、高光谱遥感技术蓝碳储量估算;红树林植被指数计算及提取
  • 【知识】cuda检测GPU是否支持P2P通信及一些注意事项
  • 用 Python 生成功能强大的二维码工具(支持自定义颜色与 Logo)
  • RTX5 数据队列传输流程
  • 24.try块怎么用 C#例子
  • 【机器学习 | 数据挖掘】智能推荐算法
  • 120.【C语言】数据结构之快速排序(详解Hoare排序算法)
  • uniapp通过v-if进行判断时,会出现闪屏?【已解决】
  • 各种网站(学习资源、常用工具及其他,持续更新中~)
  • 网络技术-QoS策略以及如何定义 流分类,流行为,流策略
  • 线程晨考day20
  • 【ES6复习笔记】迭代器(10)
  • MySQL的TIMESTAMP类型字段非空和默认值属性的影响
  • 【Linux进程】初悉进程
  • Python学习之路(5)— 使用C扩展
  • 动态规划34:446. 等差数列划分 II - 子序列
  • PPT画图——如何设置导致图片为600dpi
  • 【模块系列】STM321.69TFT屏幕
  • 大模型辅助测试的正确打开方式?
  • 三相电的相电压、线电压、额定值、有效值,变比,零序电压,零序电流,三相三线制的三角形连接,三相四线制的星形连接
  • 电商网站的基础用户数在100万,日活跃用户数在1万左右,系统下单TPS最大支持1000,应用服务要保证高可用。请预估该网站每天的使用成本。
  • 线性代数期末总复习的点点滴滴(1)
  • python+reportlab创建PDF文件
  • 2024最新qrcode.min.js生成二维码Demo
  • 【Microi吾码】开源力量赋能低代码创新,重塑软件开发生态格局