手写RPC框架--7.封装响应
RPC框架-Gitee代码(麻烦点个Starred, 支持一下吧)
RPC框架-GitHub代码(麻烦点个Starred, 支持一下吧)
封装响应
- 封装响应
- a.封装响应
- b.请求id生成器(雪花算法)
- c.抽象序列化
- d.建立序列化工厂
- e.hessian的序列化方式(拓展)
封装响应
a.封装响应
在core模块下的com.dcyrpc
的enumeration
包
在包内创建ResponseCode
枚举:定义响应码枚举
/*** 响应码枚举*/
public enum ResponseCode {SUCCESS((byte) 1, "成功"), FAIL((byte) 2, "失败");private byte code;private String desc;ResponseCode(byte code, String desc) {this.code = code;this.desc = desc;}
}
在core模块下com.dcyrpc
的transprt.message
包下
创建DcyRpcResponse
类:服务提供方回复的响应
/*** 服务提供方回复的响应*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class DcyRpcResponse {// 请求的idprivate long requestId;// 压缩的类型private byte compressType;// 序列化的方式private byte serializeType;// 响应码类型:1 成功,2 异常private byte code;// 具体的消息体private Object body;
}
在core模块channelhandler.handler
包下创建DcyRpcResponseEncoder
类:对调用结果进行编码
/*** 编码器** 4B magic(魔数值) -- Drpc.getBytes()* 1B version(版本) -- 1* 2B header length(首部的长度)* 4B full length(报文的总长度)* 1B serialize (序列化类型的长度)* 1B compress(压缩类型的长度)* 1B code(响应码)* 8B requestId** Object body*/
@Slf4j
public class DcyRpcResponseEncoder extends MessageToByteEncoder<DcyRpcResponse> {@Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, DcyRpcResponse dcyRpcResponse, ByteBuf byteBuf) throws Exception {// 4个字节魔数值byteBuf.writeBytes(MessageFormatConstant.MAGIC);// 1个字节版本号byteBuf.writeByte(MessageFormatConstant.VERSION);// 2个字节的头部的长度byteBuf.writeShort(MessageFormatConstant.HEADER_LENGTH);// 总长度未知,不知道body的长度byteBuf.writerIndex(byteBuf.writerIndex() + MessageFormatConstant.FULL_FIELD_LENGTH);// 响应码byteBuf.writeByte(dcyRpcResponse.getCode());// 序列化类型byteBuf.writeByte(dcyRpcResponse.getSerializeType());// 压缩类型byteBuf.writeByte(dcyRpcResponse.getCompressType());// 8个字节的请求idbyteBuf.writeLong(dcyRpcResponse.getRequestId());// 写入请求体body(requestPayload)byte[] body = getBodyBytes(dcyRpcResponse.getBody());if (body != null) {byteBuf.writeBytes(body);byteBuf.writeInt(MessageFormatConstant.HEADER_LENGTH + body.length);}int bodyLength = body ==null ? 0 : body.length;// 重新处理报文的总长度// 先获取当前的写指针的位置int writerIndex = byteBuf.writerIndex();// 将写指针的位置移动到总长度的位置上byteBuf.writerIndex(MessageFormatConstant.MAGIC.length + MessageFormatConstant.VERSION_LENGTH + MessageFormatConstant.HEADER_FIELD_LENGTH);byteBuf.writeInt(MessageFormatConstant.HEADER_LENGTH + bodyLength);// 将写指针归位byteBuf.writerIndex(writerIndex);}private byte[] getBodyBytes(Object body) {// 心跳请求没有payloadif (body == null) {return null;}// 对象序列化成字节数组try {ByteArrayOutputStream baos = new ByteArrayOutputStream();ObjectOutputStream outputStream = new ObjectOutputStream(baos);outputStream.writeObject(body);return baos.toByteArray();} catch (IOException e) {log.error("序列化时出现异常");throw new RuntimeException(e);}}
}
修改DcyRpcBootstrap
的start()
方法
- 添加
DcyRpcResponseEncoder
响应编码器
// 略....
// 3.配置服务器
serverBootstrap = serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// TODO 核心内容,需要添加很多入栈和出栈的handlersocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.INFO))// 对报文进行解码.addLast(new DcyRpcRequestDecoder())// 根据请求进行方法调用.addLast(new MethodCallHandler())// 对响应结果进行编码.addLast(new DcyRpcResponseEncoder());}});
// 略....
在core模块channelhandler.handler
包下创建DcyRpcResponseEncoder
类:服务请求方对响应结果进行解码
/*** 解码器*/
@Slf4j
public class DcyRpcResponseDecoder extends LengthFieldBasedFrameDecoder {public DcyRpcResponseDecoder() {// 找到当前报文的总长度,截取报文,截取出来的报文可以进行解析super(// 最大帧的长度,超过这个maxFrameLength值,会直接丢弃MessageFormatConstant.MAX_FRAME_LENGTH,// 长度字段偏移量MessageFormatConstant.MAGIC.length + MessageFormatConstant.VERSION_LENGTH + MessageFormatConstant.HEADER_FIELD_LENGTH,// 长度的字段的长度MessageFormatConstant.FULL_FIELD_LENGTH,// todo 负载的适配长度-(MessageFormatConstant.MAGIC.length + MessageFormatConstant.VERSION_LENGTH + MessageFormatConstant.HEADER_FIELD_LENGTH + MessageFormatConstant.FULL_FIELD_LENGTH),// 跳过的字段0);}@Overrideprotected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {Object decode = super.decode(ctx, in);if (decode instanceof ByteBuf) {ByteBuf byteBuf = (ByteBuf) decode;return decodeFrame(byteBuf);}return null;}private Object decodeFrame(ByteBuf byteBuf) {// 1.解析魔数byte[] magic = new byte[MessageFormatConstant.MAGIC.length];byteBuf.readBytes(magic);// 检测魔数值是否匹配for (int i = 0; i < magic.length; i++) {if (magic[i] != MessageFormatConstant.MAGIC[i]) {throw new RuntimeException("获得的请求类型不匹配");}}// 2.解析版本号byte version = byteBuf.readByte();if (version > MessageFormatConstant.VERSION) {throw new RuntimeException("获得的请求版本不被支持");}// 3.解析头部的长度short headLength = byteBuf.readShort();// 4.解析总长度int fullLength = byteBuf.readInt();// 5.解析响应码byte responseCode = byteBuf.readByte();// 6.解析序列化类型byte serializeType = byteBuf.readByte();// 7.解析压缩型byte compressType = byteBuf.readByte();// 8.解析请求Idlong requestId = byteBuf.readLong();DcyRpcResponse dcyRpcResponse = new DcyRpcResponse();dcyRpcResponse.setCode(responseCode);dcyRpcResponse.setCompressType(compressType);dcyRpcResponse.setSerializeType(serializeType);dcyRpcResponse.setRequestId(requestId);// 9.解析消息体payloadint bodyLength = fullLength - headLength;byte[] body = new byte[bodyLength];byteBuf.readBytes(body);// 解压缩和反序列化// todo 解压缩// 反序列化try (ByteArrayInputStream bis = new ByteArrayInputStream(body);ObjectInputStream ois = new ObjectInputStream(bis)) {Object object = ois.readObject();dcyRpcResponse.setBody(object);} catch (IOException | ClassNotFoundException e) {log.error("请求【{}】反序列化时出现异常", requestId, e);throw new RuntimeException(e);}return dcyRpcResponse;}
}
修改channelHandler
包下的ConsumerChannelInitializer
类:添加入站的解码器 DcyRpcResponseDecoder()
public class ConsumerChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline()// netty自带的日志处理器.addLast(new LoggingHandler(LogLevel.INFO))// 消息编码器.addLast(new DcyRpcRequestEncoder())// 入站的解码器.addLast(new DcyRpcResponseDecoder())// 处理结果.addLast(new MySimpleChannelInboundHandler());}
}
修改channelhandler.handler
包下的MySimpleChannelInboundHandler
类:将响应结果的ByteBuf改成DcyRpcResponse
/*** 处理响应结果*/
public class MySimpleChannelInboundHandler extends SimpleChannelInboundHandler<DcyRpcResponse> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, DcyRpcResponse dcyRpcResponse) throws Exception {// 异步// 服务提供方,给予的结果Object returnValue = dcyRpcResponse.getBody();// 从全局的挂起的请求中,寻找与之匹配的待处理 completeFutureCompletableFuture<Object> completableFuture = DcyRpcBootstrap.PENDING_REQUEST.get(1L);completableFuture.complete(returnValue);}
}
在core模块channelhandler.handler
包的MethodCallHandler
类:封装响应结果
// 略...
// 3.封装响应
DcyRpcResponse dcyRpcResponse = DcyRpcResponse.builder().code(ResponseCode.SUCCESS.getCode()).requestId(dcyRpcRequest.getRequestId()).compressType(dcyRpcRequest.getCompressType()).serializeType(dcyRpcRequest.getSerializeType()).body(result).build();// 4.写出响应
channelHandlerContext.channel().writeAndFlush(dcyRpcResponse);// 略...
b.请求id生成器(雪花算法)
在当前项目中,我们需要给请求一个唯一标识,用来标识一个请求和响应的关联关系,我们要求请求的id必须唯一,且不能占用过大的空间,可用的方案如下:
-
1.自增id,单机的自增id不能解决不重复的问题,微服务情况下我们需要一个稳定的发号服务才能保证,但是这样做性能偏低。
-
2.uuid,将uuid作为唯一标识占用空间太大
-
3.雪花算法,最优解。
雪花算法(snowflake)最早是twitter内部使用分布式环境下的唯一ID生成算法,他使用64位long类型的数据存储
id,具体如下:
0 - 0000000000 0000000000 0000000000 0000000000 0 - 0000000000 - 000000000000
符号位 时间戳 机器码 序列号
最高位表示符号位,其中0代表整数,1代表负数,而id一般都是正数,所以最高位为0。
通过雪花算法实现 – (世界上没有一片雪花是一样的) 5+5+42+12=64位
- 机房号(数据中心)5bit
- 机器号 5bit
- 时间戳(long)原本64位表示的时间,必须减少到42位,可以自由选择一个时间。如: 公司的成立日期
- 序列化 12bit:同一个机房的同一个机器号的同一个时间可能因并发量需要很多个Id
时间戳 (42) 机房号 (5) 机器号 (5) 序列号 (12)
101010101010101010101010101010101010101011 10101 10101 101011010101
在common块下的com.dcy
包下创建IdGenerator
类
/*** 请求id的生成器:雪花算法*/
public class IdGenerator {// 起始时间戳private static final long START_STAMP = DateUtil.get("2022-1-1").getTime();// 机房号public static final long DATA_CENTER_BIT = 5L;// 机器号public static final long MACHINE_BIT = 5L;// 序列化号public static final long SEQUENCE_BIT = 5L;// 机房号的最大值public static final long DATA_CENTER_MAX = ~(-1L << DATA_CENTER_BIT);// 机器号的最大值public static final long MACHINE_MAX = ~(-1L << MACHINE_BIT);// 序列号的最大值public static final long SEQUENCE_MAX = ~(-1L << SEQUENCE_BIT);// 时间戳需要左移的位数public static final long TIMESTAMP_LEFT = DATA_CENTER_BIT + MACHINE_BIT + SEQUENCE_BIT;// 机房号需要左移的位数public static final long DATA_CENTER_LEFT = MACHINE_BIT + SEQUENCE_BIT;// 机器号需要左移的位数public static final long MACHINE_LEFT = SEQUENCE_BIT;private long dataCenterId;private long machineId;private LongAdder sequenceId = new LongAdder();private long lastTimeStamp = -1;public IdGenerator(long dataCenterId, long machineId) {//参数是否合法if (dataCenterId > DATA_CENTER_MAX || machineId > MACHINE_MAX) {throw new IllegalArgumentException("传入的数据中心编号和机器编号不合法");}this.dataCenterId = dataCenterId;this.machineId = machineId;}public long getId() {// 1.处理时间戳的问题long currentTime = System.currentTimeMillis();long timeStamp = currentTime - START_STAMP;// 2.判断时钟回拨if (timeStamp < lastTimeStamp) {throw new RuntimeException("服务器进行了时钟回调");}// 3.对sequenceId做一些处理:如果是同一个时间节点,必须自增if (timeStamp == lastTimeStamp) {sequenceId.increment();if (sequenceId.sum() >= SEQUENCE_MAX) {timeStamp = getNextTimeStamp();sequenceId.reset();}} else {sequenceId.reset();}// 执行结束将时间戳赋值给lastTimeStamplastTimeStamp = timeStamp;long sequence = sequenceId.sum();return timeStamp << TIMESTAMP_LEFT | dataCenterId << DATA_CENTER_LEFT | machineId << MACHINE_LEFT | sequence;}private long getNextTimeStamp() {// 获取当前的时间戳long current = System.currentTimeMillis() - START_STAMP;// 如果一样就一直循环,直到下一个时间戳while (current == lastTimeStamp) {current = System.currentTimeMillis() - START_STAMP;}return current;}
}
在common块下的com.dcy
包下创建DateUtil
类:用于时间日期相关的工具类
/*** 时间日期相关的工具类*/
public class DateUtil {public static Date get(String pattern) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");try {return sdf.parse(pattern);} catch (ParseException e) {throw new RuntimeException(e);}}
}
在DcyRpcBootstrap
类中添加IdGenerator
// 略...
private int port = 8088;public static final IdGenerator ID_GENERATOR = new IdGenerator(1, 2);
// 略...
在RpcConsumerInvocationHandler
类的封装报文位置,将requestId的值设置为通过id生成器获取
// 略...
DcyRpcRequest dcyRpcRequest = DcyRpcRequest.builder().requestId(DcyRpcBootstrap.ID_GENERATOR.getId()).compressType((byte) 1).serializeType((byte) 1).requestType(RequestType.REQUEST.getId()).requestPayload(requestPayload).build();
c.抽象序列化
在core模块下创建serialize
包
在该包下创建Serializer
接口:序列化器
public interface Serializer {/*** 序列化* @param object 待序列化的对象实例* @return 字节数组*/byte[] serializer(Object object);/*** 反序列化* @param bytes 待反序列化的字节数组* @param clazz 目标类的class对象* @return 目标实例* @param <T> 目标类泛型*/<T> T deserialize(byte[] bytes, Class<T> clazz);
}
在serialize
包下创建impl
包,创建JdkSerializer
类,实现Serializer
接口:jdk序列化器
@Slf4j
public class JdkSerializer implements Serializer{@Overridepublic byte[] serializer(Object object) {// 心跳请求没有payloadif (object == null) {return null;}// 对象序列化成字节数组try (ByteArrayOutputStream baos = new ByteArrayOutputStream();ObjectOutputStream outputStream = new ObjectOutputStream(baos)) {outputStream.writeObject(object);return baos.toByteArray();} catch (IOException e) {log.error("序列化对象【{}】时出现异常", object);throw new SerializeException(e);}}@Overridepublic <T> T deserialize(byte[] bytes, Class<T> clazz) {if (bytes == null || clazz == null) {return null;}// 字节数组转成对象序列化try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);ObjectInputStream objectInputStream = new ObjectInputStream(bais)) {return (T) objectInputStream.readObject();} catch (IOException | ClassNotFoundException e) {log.error("反序列化对象【{}】时出现异常", clazz);throw new SerializeException(e);}}
}
在common的exceptions
中创建SerializeException
类:序列化异常处理
public class SerializeException extends RuntimeException{public SerializeException() {super();}public SerializeException(String message) {super(message);}public SerializeException(Throwable cause) {super(cause);}
}
d.建立序列化工厂
在consumer
包下的Application
启动类中添加序列化方法
// 略...
DcyRpcBootstrap.getInstance().application("first-dcyrpc-consumer").registry(new RegistryConfig("zookeeper://127.0.0.1:2181")).serialize("jdk").reference(reference);
// 略...
在DcyRpcBootstrap
类中,添加序列化配置项和方法
// 略...
public static String SERIALIZE_TYPE = "jdk";
// 略...
/*** 配置序列化的方式* @param serializeType* @return*/
public DcyRpcBootstrap serialize(String serializeType) {SERIALIZE_TYPE = serializeType;return this;
}
在core模块下的serialize
包下创建SerializerWrapper
类:序列化包装类
@NoArgsConstructor
@AllArgsConstructor
@Data
public class SerializerWrapper {private byte code;private String type;private Serializer serializer;
}
在core模块下的serialize
包下创建SerializerFactory
类:序列化工厂类
/*** 序列化工厂类*/
public class SerializerFactory {private final static Map<String, SerializerWrapper> SERIALIZER_CACHE = new ConcurrentHashMap<>(8);private final static Map<Byte, SerializerWrapper> SERIALIZER_CACHE_CODE = new ConcurrentHashMap<>(8);static {SerializerWrapper jdk = new SerializerWrapper((byte) 1, "jdk", new JdkSerializer());SerializerWrapper json = new SerializerWrapper((byte) 2, "json", new JsonSerializer());SERIALIZER_CACHE.put("jdk", jdk);SERIALIZER_CACHE.put("json", json);SERIALIZER_CACHE_CODE.put((byte) 1, jdk);SERIALIZER_CACHE_CODE.put((byte) 2, json);}/*** 使用工厂方法获取一个SerializerWrapper* @param serializeType 序列化的类型* @return*/public static SerializerWrapper getSerializer(String serializeType) {return SERIALIZER_CACHE.get(serializeType);}public static SerializerWrapper getSerializer(byte serializeCode) {return SERIALIZER_CACHE_CODE.get(serializeCode);}
}
修改DcyRpcRequestEncoder
类,在请求类,对请求添加有关序列化的代码
//略...
// 8个字节的请求id
byteBuf.writeLong(dcyRpcRequest.getRequestId());// 写入请求体body(requestPayload)
// 1.根据配置的序列化方式进行序列化
Serializer serializer = SerializerFactory.getSerializer(dcyRpcRequest.getSerializeType()).getSerializer();
byte[] body = serializer.serializer(dcyRpcRequest.getRequestPayload());// 2.根据配置的压缩方式进行压缩if (body != null) {byteBuf.writeBytes(body);byteBuf.writeInt(MessageFormatConstant.HEADER_LENGTH + body.length);
}
//略...
修改RpcConsumerInvocationHandler
类:修改填写序列化器的代码
// 略...
DcyRpcRequest dcyRpcRequest = DcyRpcRequest.builder().requestId(DcyRpcBootstrap.ID_GENERATOR.getId()).compressType((byte) 1).serializeType(SerializerFactory.getSerializer(DcyRpcBootstrap.SERIALIZE_TYPE).getCode()).requestType(RequestType.REQUEST.getId()).requestPayload(requestPayload).build();
// 略...
修改DcyRpcRequestDecoder
类,在响应类,对请求添加反序列化的代码
// 略...
// 9.解析消息体payload
int payloadLength = fullLength - headLength;
byte[] payload = new byte[payloadLength];
byteBuf.readBytes(payload);// 解压缩和反序列化
// todo 解压缩// 反序列化
Serializer serializer = SerializerFactory.getSerializer(serializeType).getSerializer();
RequestPayload requestPayload = serializer.deserialize(payload, RequestPayload.class);dcyRpcRequest.setRequestPayload(requestPayload);return dcyRpcRequest;
修改DcyRpcResponseEncoder
类:在响应类,对响应添加序列化的代码
// 略...
// 8个字节的请求id
byteBuf.writeLong(dcyRpcResponse.getRequestId());// 写入请求体body(requestPayload)
// 对响应做序列化器
Serializer serializer = SerializerFactory.getSerializer(dcyRpcResponse.getSerializeType()).getSerializer();
byte[] body = serializer.serializer(dcyRpcResponse.getBody());if (body != null) {byteBuf.writeBytes(body);byteBuf.writeInt(MessageFormatConstant.HEADER_LENGTH + body.length);
}
// 略...
修改DcyRpcResponseDecoder
类,在请求类,对响应添加反序列化的代码
// 略...
// 9.解析消息体payload
int payloadLength = fullLength - headLength;
byte[] payload = new byte[payloadLength];
byteBuf.readBytes(payload);// 解压缩和反序列化
// todo 解压缩Serializer serializer = SerializerFactory.getSerializer(dcyRpcResponse.getSerializeType()).getSerializer();
Object body = serializer.deserialize(payload, Object.class);dcyRpcResponse.setBody(body);return dcyRpcResponse;
// 略...
e.hessian的序列化方式(拓展)
Hessian序列化是一种支持动态类型、跨语言、基于对象传输的网络协议,Java对象序列化的二进制流可以被其他语言(如,c++,python)。特性如下:
- 1.自描述序列化类型。不依赖外部描述文件或者接口定义,用一个字节表示常用的基础类型,极大缩短二进制流。
- 2.语言无关,支持脚本语言
- 3.协议简单,比Java原生序列化高效
- 4.相比hessian1,hessian2中增加了压缩编码,其序列化二进制流大小是Java序列化的50%,序列化耗时是Java序列化的30%,反序列化耗时是Java序列化的20%
序列化操作:
- 1.new一个Hessian2Output,传入一个OutputStream
- 2.writeObject(),传入具体要序列化的对象
- 3.flush()
反序列化操作:
- 1.new一个Hessian2Output,传入一个InputStream
- 2.readObject()
/*** hessian序列化器*/
@Slf4j
public class HessianSerializer implements Serializer {@Overridepublic byte[] serializer(Object object) {// 心跳请求没有payloadif (object == null) {return null;}// 对象序列化成字节数组try (ByteArrayOutputStream baos = new ByteArrayOutputStream();) {Hessian2Output hessian2Output = new Hessian2Output(baos);hessian2Output.writeObject(object);hessian2Output.flush();log.info("对象使用hessian【{}】完成了序列化", object);return baos.toByteArray();} catch (IOException e) {log.error("使用hessian序列化对象【{}】时出现异常", object);throw new SerializeException(e);}}@Overridepublic <T> T deserialize(byte[] bytes, Class<T> clazz) {if (bytes == null || clazz == null) {return null;}// 字节数组转成对象序列化try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);) {HessianInput hessianInput = new HessianInput(bais);T t = (T) hessianInput.readObject();log.info("对象使用hessian【{}】完成了反序列化", clazz);return t;} catch (IOException e) {log.error("使用hessian反序列化对象【{}】时出现异常", clazz);throw new SerializeException(e);}}
}