当前位置: 首页 > news >正文

从零开始实现一个简单的 RPC 框架(Java 版)

一、什么是 RPC?

RPC(Remote Procedure Call,远程过程调用)是一种进程间通信技术,允许程序像调用本地函数一样调用远程服务器上的方法。在分布式系统中,RPC 是非常常见且核心的技术之一,它隐藏了底层网络通信的复杂性,使得开发人员可以专注于业务逻辑的设计与实现。

常见的 RPC 框架:

  • Dubbo:阿里巴巴开源的高性能、轻量级 RPC 框架。
  • gRPC:Google 推出的基于 HTTP/2 的高性能 RPC 框架。
  • Thrift:Facebook 开源的跨语言服务框架。
  • Spring Cloud Feign / OpenFeign:集成在 Spring 生态中的声明式 REST 客户端。

这些成熟的框架功能强大,但如果我们想理解其背后的原理,最好的方式就是自己动手实现一个简单的版本。


二、目标与设计思路

我们要实现的是一个最简版的 RPC 框架,具备以下基本功能:

  1. 服务提供者注册服务接口
  2. 服务消费者调用远程方法
  3. 使用 Netty 或 Socket 进行网络通信
  4. 使用序列化机制传输数据(如 JSON 或 JDK 序列化)
  5. 支持同步调用

我们将采用经典的客户端-服务端架构,整体流程如下:

[客户端] --> 发送请求 --> [服务端]
[服务端] --> 处理请求并返回结果 --> [客户端]

三、项目结构设计

为了便于组织代码,我们按照模块划分如下:

simple-rpc/
├── simple-rpc-api/         # 公共接口定义
├── simple-rpc-server/      # 服务提供方
├── simple-rpc-client/      # 服务消费方
└── simple-rpc-common/      # 公共工具类、协议、序列化等

四、公共接口定义(simple-rpc-api)

首先,我们定义一个服务接口,供服务提供方和消费方共同依赖。

// HelloService.java
public interface HelloService {String sayHello(String name);
}

这个接口会在服务端被实现,在客户端被调用。


五、服务提供方实现(simple-rpc-server)

1. 服务实现类

// HelloServiceImpl.java
public class HelloServiceImpl implements HelloService {@Overridepublic String sayHello(String name) {System.out.println("收到请求:" + name);return "Hello, " + name;}
}

2. 启动服务端

我们使用 Netty 来搭建 TCP 服务器,监听客户端请求。

添加 Maven 依赖(Netty):
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.96.Final</version>
</dependency>
编写服务启动类:
// RpcServer.java
public class RpcServer {private final int port;public RpcServer(int port) {this.port = port;}public void start() throws Exception {ServerBootstrap bootstrap = new ServerBootstrap();EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new RpcDecoder())     // 解码请求.addLast(new RpcEncoder())     // 编码响应.addLast(new RpcServerHandler());}});ChannelFuture future = bootstrap.bind(port).sync();System.out.println("RPC 服务已启动,监听端口:" + port);future.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}public static void main(String[] args) throws Exception {new RpcServer(8080).start();}
}

3. 请求处理器(RpcServerHandler)

// RpcServerHandler.java
public class RpcServerHandler extends SimpleChannelInboundHandler<RpcRequest> {private final Map<String, Object> serviceMap = new HashMap<>();public void addService(String serviceName, Object service) {serviceMap.put(serviceName, service);}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception {String serviceName = request.getClassName();Object service = serviceMap.get(serviceName);if (service == null) {RpcResponse response = new RpcResponse();response.setRequestId(request.getRequestId());response.setException(new RuntimeException("找不到对应的服务:" + serviceName));ctx.writeAndFlush(response);return;}try {Method method = service.getClass().getMethod(request.getMethodName(),request.getParameterTypes());Object result = method.invoke(service, request.getParameters());RpcResponse response = new RpcResponse();response.setRequestId(request.getRequestId());response.setResult(result);ctx.writeAndFlush(response);} catch (Exception e) {RpcResponse response = new RpcResponse();response.setRequestId(request.getRequestId());response.setException(e);ctx.writeAndFlush(response);}}
}

六、服务消费方实现(simple-rpc-client)

1. 动态代理调用远程服务

我们通过 Java 动态代理来生成远程调用对象:

// RpcProxy.java
public class RpcProxy {private final String host;private final int port;public RpcProxy(String host, int port) {this.host = host;this.port = port;}public <T> T getProxy(Class<T> serviceClass) {return (T) Proxy.newProxyInstance(getClass().getClassLoader(),new Class<?>[]{serviceClass},new InvocationHandler() {@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {RpcRequest request = new RpcRequest();request.setRequestId(UUID.randomUUID().toString());request.setClassName(method.getDeclaringClass().getName());request.setMethodName(method.getName());request.setParameterTypes(method.getParameterTypes());request.setParameters(args);RpcClient client = new RpcClient(host, port);RpcResponse response = client.send(request);if (response.getException() != null) {throw response.getException();}return response.getResult();}});}
}

2. 网络客户端(RpcClient)

// RpcClient.java
public class RpcClient {private final String host;private final int port;public RpcClient(String host, int port) {this.host = host;this.port = port;}public RpcResponse send(RpcRequest request) throws Exception {EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new RpcEncoder()).addLast(new RpcDecoder()).addLast(new RpcClientHandler());}});ChannelFuture future = bootstrap.connect(host, port).sync();future.channel().writeAndFlush(request).sync();RpcClientHandler handler = (RpcClientHandler) future.channel().pipeline().last();RpcResponse response = handler.getResponse();return response;} finally {group.shutdownGracefully();}}
}

3. 客户端处理器(RpcClientHandler)

// RpcClientHandler.java
public class RpcClientHandler extends SimpleChannelInboundHandler<RpcResponse> {private RpcResponse response;public RpcResponse getResponse() {return response;}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) {this.response = msg;}
}

七、通用组件(simple-rpc-common)

1. 协议定义

我们自定义一个简单的 RPC 协议,包含请求和响应结构。

请求体(RpcRequest):
// RpcRequest.java
public class RpcRequest {private String requestId;private String className;private String methodName;private Class<?>[] parameterTypes;private Object[] parameters;// getter/setter
}
响应体(RpcResponse):
// RpcResponse.java
public class RpcResponse {private String requestId;private Object result;private Throwable exception;// getter/setter
}

2. 序列化与反序列化(JSON 示例)

我们可以使用 Jackson 来进行 JSON 序列化。

// JsonUtil.java
public class JsonUtil {private static final ObjectMapper mapper = new ObjectMapper();public static byte[] serialize(Object obj) {try {return mapper.writeValueAsBytes(obj);} catch (Exception e) {throw new RuntimeException(e);}}public static <T> T deserialize(byte[] data, Class<T> clazz) {try {return mapper.readValue(data, clazz);} catch (Exception e) {throw new RuntimeException(e);}}
}

3. 编解码器(Netty Handler)

编码器(RpcEncoder):
// RpcEncoder.java
public class RpcEncoder extends MessageToByteEncoder {@Overrideprotected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {byte[] data = JsonUtil.serialize(msg);out.writeInt(data.length);out.writeBytes(data);}
}
解码器(RpcDecoder):
// RpcDecoder.java
public class RpcDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {if (in.readableBytes() < 4) {return;}in.markReaderIndex();int length = in.readInt();if (in.readableBytes() < length) {in.resetReaderIndex();return;}byte[] data = new byte[length];in.readBytes(data);try {Object obj = JsonUtil.deserialize(data, RpcRequest.class);out.add(obj);} catch (Exception e) {throw new RuntimeException(e);}}
}

八、测试运行

1. 启动服务端

// 启动服务端
public class ServerMain {public static void main(String[] args) throws Exception {RpcServer server = new RpcServer(8080);HelloService helloService = new HelloServiceImpl();server.addHandler(HelloService.class.getName(), helloService);server.start();}
}

2. 调用远程服务

// 客户端调用
public class ClientMain {public static void main(String[] args) {RpcProxy proxy = new RpcProxy("127.0.0.1", 8080);HelloService service = proxy.getProxy(HelloService.class);String result = service.sayHello("World");System.out.println("服务端返回结果:" + result);}
}

九、总结与优化建议

当前实现的优点:

  • 结构清晰,易于理解和扩展。
  • 使用 Netty 提高通信性能。
  • 支持同步调用,满足基础需求。

可以进一步优化的方向:

  1. 异步调用支持:增加 Future/Promise 机制。
  2. 服务注册中心:引入 Zookeeper、Eureka、Consul 等注册中心。
  3. 负载均衡策略:多个服务实例时选择合适的调用节点。
  4. 异常处理增强:超时重试、熔断机制。
  5. 协议扩展:支持 Protobuf、Thrift 等更高效的序列化格式。
  6. 日志与监控:添加调用链追踪、性能统计等功能。

http://www.lryc.cn/news/590959.html

相关文章:

  • kubeadm 部署 K8S(v1.23.1)集群
  • 直播带货与开源AI智能名片链动2+1模式S2B2C商城小程序:重塑电商营销新格局
  • python 【技术面试题和HR面试题】➕列表操作、条件判断、循环、函数定义编程题
  • 从0开始学习R语言--Day49--Lasso-Cox 回归
  • 十五、K8s可观测能力:日志收集
  • 【41】MFC入门到精通——MFC中 GetLBText()、GetWindowText()、SetWindowText区别
  • PyTorch笔记8----------卷积神经网络
  • 魔术公式轮胎simulink模型建立及参数拟合
  • 【机器学习】第三章 分类算法
  • HANA SQLScript中的变量类型汇总
  • 从现场出发:能源系统中的智能设备与实际落地工具解读
  • ClickHouse 多表 JOIN 时 SELECT * 语法错误解析与解决方案
  • 不同相机CMOS噪点对荧光计算的影响
  • AWS WebRTC:RTP讲解
  • 磁盘分区(D盘分给C盘)
  • 学习笔记(39):结合生活案例,介绍 10 种常见模型
  • IPC进程间通信 interprocess communicate
  • 09-three.js Materials
  • 如何解决pip安装报错ModuleNotFoundError: No module named ‘flask’问题
  • 串口232通讯数据传输丢失的原因、不可靠性及底层原理分析
  • 12.9 Mixtral-8x7B核心技术解密:如何用1/3参数实现4倍推理速度碾压LLaMA2?
  • RabbitMQ概述和工作模式
  • 苍穹外卖项目日记(day11)
  • 优先队列的实现
  • vue中的this.$set
  • Spring Cloud LoadBalancer 详解
  • 理解 PS1/PROMPT 及 macOS iTerm2 + zsh 终端配置优化指南
  • javaScript中数组常用的函数方法
  • 【Java开发日记】我们来说说 LockSupport 的 park 和 unpark
  • python Flask 框架入门