当前位置: 首页 > news >正文

13-netty基础-手写rpc-消费方生成代理-05

 netty系列文章:

01-netty基础-socket
02-netty基础-java四种IO模型
03-netty基础-多路复用select、poll、epoll
04-netty基础-Reactor三种模型
05-netty基础-ByteBuf数据结构
06-netty基础-编码解码
07-netty基础-自定义编解码器
08-netty基础-自定义序列化和反序列化
09-netty基础-手写rpc-原理-01
10-netty基础-手写rpc-定义协议头-02
11-netty基础-手写rpc-支持多序列化协议-03
12-netty基础-手写rpc-编解码-04
13-netty基础-手写rpc-消费方生成代理-05
14-netty基础-手写rpc-提供方(服务端)-06

1 功能逻辑

在客户端启动的时候要为添加了BonnieRemoteReference注解的属性生成一个代理类;代理类的主要功能:在spring容器加载完BeanDefinition之后,在Bean初始化之前,触发生成代理类。
逻辑:

  1. 获取到所有的BeanDefinition
  2. 拿到BeanDefinition对应的class
  3. 遍历class下的所有被BonnieRemoteReference修饰的属性(成员变量)
  4. 为被BonnieRemoteReference修饰的属性,使用BeanDefinitionBuilder构建BeanDefinition,设置interfaceClass、serviceAddress、servicePort属性,并放入到spring容器中,对象的类型为SpringRpcReferenceBean;
  5. SpringRpcReferenceBean实现FactoryBean接口,然后在getObject中返回代理对象。
  6. 编写NettyClient代码

补充:

Spring 的 FactoryBean 是一个工厂 bean 接口,用于自定义 bean 的创建逻辑。它的核心作用是:

  • 当容器获取该 bean 时(如 getBean("xxx")),实际返回的是 getObject() 方法创建的对象,而非 SpringRpcReferenceBean 自身实例。
  • 常用于创建复杂对象(如远程服务代理、数据库连接池等)

2 重点代码介绍

2.1 触发生成代理类入口代码

在spring容器加载BeanDefinition之后,在Bean初始化之前执行,实现接口BeanFactoryPostProcessor接口中postProcessBeanFactory方法即可
 

获取所有的beanDefinitionNames
String[] beanDefinitionNames = beanFactory.getBeanDefinitionNames();

获取beanClassName对应的类信息
Class<?> clazz = ClassUtils.resolveClassName(beanClassName, this.classLoader);

获取clazz上的所有属性(成员变量)
ReflectionUtils.doWithFields(clazz, this::parseRpcReference);

当前这个field是否被BonnieRemoteReference注解修饰
BonnieRemoteReference remoteReference = AnnotationUtils.getAnnotation(field, BonnieRemoteReference.class);

生成SpringRpcReferenceBean的BeanDefinition
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(SpringRpcReferenceBean.class)
放入属性,远程调用中需要的内容,比如是那个类,以及地址端口信息
builder.addPropertyValue("interfaceClass", field.getType());
builder.addPropertyValue("serviceAddress", rpcClientProperties.getServiceAddress());
builder.addPropertyValue("servicePort", rpcClientProperties.getServicePort());
BeanDefinition beanDefinition = builder.getBeanDefinition();
rpcRefBeanDefinitionMap.put(field.getName(), beanDefinition);

放入到spring容器中
registry.registerBeanDefinition(entry.getKey(), entry.getValue());

package com.bonnie.protocol.spring.reference;import com.bonnie.protocol.annotation.BonnieRemoteReference;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;import java.lang.reflect.Field;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;@Slf4j
public class SpringRpcReferencePostProcessor implements ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor {private ApplicationContext applicationContext;private ClassLoader classLoader;//保存发布的引用bean的信息private final Map<String, BeanDefinition> rpcRefBeanDefinitionMap = new ConcurrentHashMap<>();private RpcClientProperties rpcClientProperties;public SpringRpcReferencePostProcessor(RpcClientProperties rpcClientProperties) {this.rpcClientProperties = rpcClientProperties;}/*** 实现postProcessBeanFactory方法,spring容器加载了bean的定义文件之后, 在bean实例化之前执行* 1、将类型的存在的BonnieRemoteReference注解的属性,构造BeanDefinition放在容器中,beanName是类的全限定名, BeanDefinition(类的全限定名,客户端IP,客户端端口号)* @param beanFactory* @throws BeansException*/@Overridepublic void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {// 获取到所有的beanDefinitionString[] beanDefinitionNames = beanFactory.getBeanDefinitionNames();// 遍历for (String beanDefinitionName : beanDefinitionNames) {BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanDefinitionName);String beanClassName = beanDefinition.getBeanClassName();if (Objects.nonNull(beanClassName)) {// 获取到这个类的所有fieldClass<?> clazz = ClassUtils.resolveClassName(beanClassName, this.classLoader);// 该方法遍历class对象中的所有的field属性,并且作为参数传入到parseRpcReference方法中ReflectionUtils.doWithFields(clazz, this::parseRpcReference);}}// 将生成的BeanDefinition放入到容器中BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;Set<Map.Entry<String, BeanDefinition>> entries = this.rpcRefBeanDefinitionMap.entrySet();for (Map.Entry<String, BeanDefinition> entry : entries) {if (applicationContext.containsBean(entry.getKey())) {log.warn("SpringContext already register bean {}", entry.getKey());} else {registry.registerBeanDefinition(entry.getKey(), entry.getValue());log.info("registered RpcReferenceBean {} success", entry.getKey());}}}private void parseRpcReference(Field field) {// 当前这个field是否被BonnieRemoteReference注解修饰BonnieRemoteReference remoteReference = AnnotationUtils.getAnnotation(field, BonnieRemoteReference.class);// BonnieRemoteReference注解修饰if (Objects.nonNull(remoteReference)) {BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(SpringRpcReferenceBean.class);builder.addPropertyValue("interfaceClass", field.getType());builder.addPropertyValue("serviceAddress", rpcClientProperties.getServiceAddress());builder.addPropertyValue("servicePort", rpcClientProperties.getServicePort());BeanDefinition beanDefinition = builder.getBeanDefinition();rpcRefBeanDefinitionMap.put(field.getName(), beanDefinition);}}@Overridepublic void setBeanClassLoader(ClassLoader classLoader) {this.classLoader = classLoader;}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}}

2.2 生成代理类代码

上面会被BonnieRemoteReference修饰的属性(Field)为生成SpringRpcReferenceBean对象,并添加相关的属性。

实现FactoryBean接口,当spring获取SpringRpcReferenceBean对象的时候,调用的就是里面的getObject对象,在getObject里面生成一个代理类,即可代理被BonnieRemoteReference修饰的类。

package com.bonnie.protocol.spring.reference;import lombok.Setter;
import org.springframework.beans.factory.FactoryBean;import java.lang.reflect.Proxy;/*** 创建SpringRpcReferenceBean的代理对象*/
@Setter
public class SpringRpcReferenceBean implements FactoryBean<Object> {private String serviceAddress;private Integer servicePort;private Class<?> interfaceClass;/*** 返回由工厂创建的目标Bean实例* @return* @throws Exception*/@Overridepublic Object getObject() throws Exception {System.out.println("代理类 serviceAddress "+serviceAddress);System.out.println("代理类 servicePort "+servicePort);System.out.println("代理类 interfaceClass "+interfaceClass);// 为BonnieRemoteReference生成一个代理类return Proxy.newProxyInstance(interfaceClass.getClassLoader(),new Class<?>[]{interfaceClass},new RpcInvokerProxy(serviceAddress, servicePort));}/*** 返回目标Bean的类型* @return*/@Overridepublic Class<?> getObjectType() {return this.interfaceClass;}}

2.3 代理类handler

这块主要是在发生rpc调用的时候,组装请求信息,并通过nettyClient向服务端发起连接并且发送请求。

package com.bonnie.protocol.spring.reference;import com.alibaba.fastjson.JSONObject;
import com.bonnie.protocol.core.Header;
import com.bonnie.protocol.core.RpcProtocol;
import com.bonnie.protocol.core.RpcRequest;
import com.bonnie.protocol.core.RpcResponse;
import com.bonnie.protocol.enums.ReqTypeEnum;
import com.bonnie.protocol.enums.RpcConstant;
import com.bonnie.protocol.enums.SerialTypeEnum;
import com.bonnie.protocol.netty.NettyClient;
import io.netty.channel.DefaultEventLoop;
import io.netty.util.concurrent.DefaultPromise;import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;public class RpcInvokerProxy implements InvocationHandler {private String host;private Integer port;public RpcInvokerProxy(String host, Integer port) {this.host = host;this.port = port;}@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {/*** 构建发送的请求报文,首先去创建RequestHold类,在这个类定义一个原子自增的RequestId,* 在一个就是每次请求都会有结果,那么请求id和结果的关系要有一个映射关系*/RpcProtocol<RpcRequest> reqProtocol = new RpcProtocol<>();long requestId = RequestHolder.REQUEST_ID.incrementAndGet();System.out.println("生成的requestId:" + requestId);Header header = new Header();header.setMagic(RpcConstant.MAGIC);header.setSerialType(SerialTypeEnum.JAVA_SERIAL.getCode());header.setReqType(ReqTypeEnum.REQUEST.getCode());header.setRequestId(requestId);header.setLength(0);RpcRequest rpcRequest = new RpcRequest();rpcRequest.setClassName(method.getDeclaringClass().getName());rpcRequest.setMethodName(method.getName());rpcRequest.setParams(args);rpcRequest.setParameterTypes(method.getParameterTypes());reqProtocol.setHeader(header);reqProtocol.setContent(rpcRequest);// 发起远程调用NettyClient nettyClient = new NettyClient(host, port);System.out.println("代理发送到服务端请求内容:" + JSONObject.toJSONString(reqProtocol));// new DefaultEventLoop(),是用来去执行监听器的RpcFuture<RpcResponse> future = new RpcFuture<>(new DefaultPromise<RpcResponse>(new DefaultEventLoop()));// 在发起请求之前,添加映射关系到map中RequestHolder.REQUEST_MAP.put(header.getRequestId(), future);// 客户端发送数据nettyClient.sendRequest(reqProtocol);// 通过promise,异步等待服务端发送数据来,不然就会一直在此等待// get方法得到的是RpcResponse类,然后调用getData方法获取到数据return future.getPromise().get().getData();}
}

2.4 netty客户端代码

这块主要包含创建客户端、向服务端发起连接、发送请求,也会设置前文中自定义编解码、序列化的操作

package com.bonnie.protocol.netty;import com.bonnie.protocol.code.BonnieDecoder;
import com.bonnie.protocol.code.BonnieEncoder;
import com.bonnie.protocol.core.RpcProtocol;
import com.bonnie.protocol.core.RpcRequest;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class NettyClient {private final Bootstrap bootstrap;private final NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();private String serviceAddress;private Integer servicePort;public NettyClient(String serviceAddress, Integer servicePort) {log.info("开始初始化NettyClient======");bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {log.info("开始初始化RpcClientInitializer======");ch.pipeline().addLast(new LoggingHandler()).addLast(new BonnieEncoder()).addLast(new BonnieDecoder()).addLast(new RpcClientHandler());}});this.serviceAddress = serviceAddress;this.servicePort = servicePort;}/*** 发送数据* @param protocol* @throws Exception*/public void sendRequest(RpcProtocol<RpcRequest> protocol) {try {System.out.println(this.serviceAddress+ "===="+this.servicePort);final ChannelFuture channelFuture  = bootstrap.connect(this.serviceAddress, this.servicePort).sync();// 注册一个监听器,如果出问题就关闭groupchannelFuture.addListener(listener -> {if (channelFuture.isSuccess()) {log.info("connect rpc server {} success.",this.serviceAddress);} else {log.error("connect rpc server {} failed. ",this.servicePort);channelFuture.cause().printStackTrace();eventLoopGroup.shutdownGracefully();}});log.info("begin transfer data");// 向服务端发送数据channelFuture.channel().writeAndFlush(protocol);} catch (InterruptedException e) {e.printStackTrace();}}}

2.5 netty客户端接收服务端响应数据

package com.bonnie.protocol.netty;import com.alibaba.fastjson.JSONObject;
import com.bonnie.protocol.core.RpcProtocol;
import com.bonnie.protocol.core.RpcResponse;
import com.bonnie.protocol.spring.reference.RequestHolder;
import com.bonnie.protocol.spring.reference.RpcFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class RpcClientHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcResponse>> {/*** 接收服务端响应数据* @param ctx* @param msg* @throws Exception*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcResponse> msg) throws Exception {long requestId = msg.getHeader().getRequestId();log.info("接收服务端响应的结果====== requestId {} {}", requestId, JSONObject.toJSONString(msg));// 删除映射关系RpcFuture<RpcResponse> future = RequestHolder.REQUEST_MAP.remove(requestId);// 我们之前说异步等待服务端发送数据过来,那么只要服务端发送数据过来,就会调用管道RpcClentHandler的read方法// 那么当初future.getPromise().get()如果不再阻塞获取数据呢?就是通过给Promise中的Success设置值,同时会唤醒阻塞的线程// 一当唤醒线程, future.getPromise().get()就会不再阻塞,就获取到服务端返回的数据future.getPromise().setSuccess(msg.getContent());}}

http://www.lryc.cn/news/613326.html

相关文章:

  • 车辆特征与车牌识别准确率↑29%:陌讯多模态融合算法实战解析
  • [spring-cloud: 动态刷新]-源码分析
  • 基于MATLAB实现支持向量机(SVM)分类
  • android 之 Kotlin中Handler的使用
  • 栅栏密码的加密解密原理
  • zookeeper因jute.maxbuffer启动异常问题排查处理
  • 使用 decimal 包解决 go float 浮点数运算失真
  • 可执行文件的生成与加载执行
  • Linux的进程间通信
  • 嵌入式学习硬件(一)ARM体系架构
  • 简单手写Transformer:原理与代码详解
  • Java中的反射机制
  • 土壤盐分传感器与土壤电导率传感器直接的关系
  • 深入理解String类:揭秘Java字符串常量池的优化机制
  • 【2025最新版】火狐浏览器(官方版)安装-附教程
  • 飞算JavaAI深度解析:Java开发者的智能革命
  • AUTOSAR进阶图解==>AUTOSAR_EXP_BSWDistributionGuide
  • 损耗对信号质量的影响
  • Java 八大经典排序算法全解析
  • 数组指针-函数指针-回调函数
  • 人工智能——自动微分
  • Docker容器部署harbor-小白级教学
  • Dlib库是什么?白话,详细介绍版
  • python中用xlrd、xlwt读取和写入Excel中的日期值
  • GIT操作卡顿
  • 机器学习核心算法与实践要素(全篇)
  • java excel转图片常用的几种方法
  • 玳瑁的嵌入式日记D14-0807(C语言)
  • NVIDIA/k8s-device-plugin仓库中GPU无法识别问题的issues分析报告
  • Linux学习记录 DNS