Dubbo 3.x源码(31)—Dubbo消息的编码解码
上文我们学习了,Dubbo 发起服务调用的源码。现在我们来学习一下Dubbo消息的编码解码。
Dubbo 3.x服务调用源码:
- Dubbo 3.x源码(29)—Dubbo Consumer服务调用源码(1)服务调用入口
- Dubbo 3.x源码(30)—Dubbo Consumer服务调用源码(2)发起远程调用
- Dubbo 3.x源码(31)—Dubbo消息的编码解码
Dubbo 3.x服务引用源码:
- Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
- Dubbo 3.x源码(18)—Dubbo服务引用源码(1)
- Dubbo 3.x源码(19)—Dubbo服务引用源码(2)
- Dubbo 3.x源码(20)—Dubbo服务引用源码(3)
- Dubbo 3.x源码(21)—Dubbo服务引用源码(4)
- Dubbo 3.x源码(22)—Dubbo服务引用源码(5)服务引用bean的获取以及懒加载原理
- Dubbo 3.x源码(23)—Dubbo服务引用源码(6)MigrationRuleListener迁移规则监听器
- Dubbo 3.x源码(24)—Dubbo服务引用源码(7)接口级服务发现订阅refreshInterfaceInvoker
- Dubbo 3.x源码(25)—Dubbo服务引用源码(8)notify订阅服务通知更新
- Dubbo 3.x源码(26)—Dubbo服务引用源码(9)应用级服务发现订阅refreshServiceDiscoveryInvoker
- Dubbo 3.x源码(27)—Dubbo服务引用源码(10)subscribeURLs订阅应用级服务url
Dubbo 3.x服务发布源码:
- Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
- Dubbo 3.x源码(12)—Dubbo服务发布导出源码(1)
- Dubbo 3.x源码(13)—Dubbo服务发布导出源码(2)
- Dubbo 3.x源码(14)—Dubbo服务发布导出源码(3)
- Dubbo 3.x源码(15)—Dubbo服务发布导出源码(4)
- Dubbo 3.x源码(16)—Dubbo服务发布导出源码(5)
- Dubbo 3.x源码(17)—Dubbo服务发布导出源码(6)
- Dubbo 3.x源码(28)—Dubbo服务发布导出源码(7)应用级服务接口元数据发布
目前版本Dubbo底层默认使用netty4作为网络通信框架。在发送消息时,在最后一步涉及到消息的编码或者说序列化,例如consumer发送请求以及provider返回响应。而在接收消息时,第一步就是需要进行消息的解码或者说反序列化,例如provider接收请求以及consumer接收响应。
下面我们仅仅讲解consumer发送请求以及provider接收请求的情况,两外两种情况可以对照着理解。
文章目录
- 编码解码的协议
- Dubbo如何解决TCP粘包和拆包的问题
- InternalEncoder#encode编码
- DubboCountCodec#encode编码
- ExchangeCodec#encode编码
- ExchangeCodec#encodeRequest编码请求
- DubboCodec#encodeRequestData编码请求数据
- InternalDecoder#decode解码
- DubboCountCodec#decode解码
- ExchangeCodec#decode解码
- DubboCodec#decodeBody解码请求体
- 总结
编码解码的协议
在NettyClient的Pipeline中,NettyCodecAdapter的内部类InternalEncoder用于请求的编码,内部类InternalDecoder用于请求的解码。
Dubbo的编码默认采用hessian2 协议,完整的编码的数据格式如下:
主要包括两个部分,定长的header部分,变长的Data(body)部分,header部分保存了请求的基本信息,包括Data的长度等,Data部分保存了具体的请求体数据,长度不固定。
编码的规则如下,参考官方文档:https://dubbo.apache.org/zh/docs3-v2/java-sdk/reference-manual/protocol/overview/#protocol-spec
Bit位 | 字段 | 长度 | 说明 |
---|---|---|---|
0~15 | Magic Number | 16bit | 魔数,Dubbo协议固定采用0xdabb表示 |
16 | Req/Res | 1bit | 标识这是一个请求或响应。请求- 1;响应- 0。 |
17 | 2Way | 1bit | 只有当Req/Res标志为1 (Request)时才有用。如果需要从服务器返回值,则设置为1。 |
18 | Event | 1bit | 标识事件消息,如心跳事件。如果这是一个事件,则设置为1。 |
19~23 | Serialization ID | 5bit | 识别序列化类型:fastjson的值是6。 |
24~31 | Status | 16bit | 只有当Req/Res标志为0 (Response)时才有用,标识响应状态。 |
32~95 | Request ID | 64bit | 请求唯一标识,即创建Request时生成的mid |
96~127 | Data Length | 32bit | 序列化后内容(可变部分)的长度,按字节计数,即Body的长度。 |
变长部分 | Body | 对象序列化的byte[],具有特定的序列化类型,由序列化ID标识 |
Dubbo如何解决TCP粘包和拆包的问题
具体的请求体长度是不固定的,但是Dubbo Header中记录有请求体的长度,因此接收端可以根据Data Length解决TCP粘包和拆包的问题。
- 如果读取数据不足16Byte,说明连一个完整的Header都没有接收到,此时会返回NEED_MORE_INPUT(内部状态码),表示本次请求的数据不再处理,等待对端发送更多的数据之后再重新读取数据。
- 当读取到一个完整的Header,会解析内部的Data Length字段,该字段表示后面的请求体的大小,基于此大小判断当前请求的数据包是否完整,及判断(本次可读的数据长度)是否大于(请求体+header的长度),如果不完整则同样会返回NEED_MORE_INPUT,本次请求的数据不再处理,等待对端传输更多的数据之后再重新读取数据。
- 如果数据包完整,则解析完整的消息进行后续的请求处理。
InternalEncoder#encode编码
InternalEncoder 继承了netty编码类MessageToByteEncoder,属于ChannelOutboundHandler,InternalEncoder#encode方法是实现消息编码的入口。
InternalEncoder#encode方法内部委托DubboCountCodec#encode方法实现编码。
private class InternalEncoder extends MessageToByteEncoder {@Overrideprotected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {//创建一个buffer包装ByteBufChannelBuffer buffer = new NettyBackedChannelBuffer(out);Channel ch = ctx.channel();//NioSocketChannel//基于NioSocketChannel获取对应的NettyChannelNettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);//委托DubboCountCodec#encode方法实现编码codec.encode(channel, buffer, msg);}
}
DubboCountCodec#encode编码
DubboCountCodec#encode方法内部委托DubboCodec#encode方法实现编码。
/*** DubboCountCodec的方法* <p>* 请求编码** @param channel NettyChannel* @param buffer NettyBackedChannelBuffer* @param msg Request*/
@Override
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {//委托DubboCodec#encode方法实现编码codec.encode(channel, buffer, msg);
}
ExchangeCodec#encode编码
DubboCodec#encode方法是由父类ExchangeCodec实现的。ExchangeCodec#encode方法定义了发送的消息编码的骨架方法。
- 如果请求属于Request,那么调用encodeRequest方法对 Request 对象进行编码。这通常表示对当前端主动发起的请求的消息的编码。
- 如果请求属于Response,那么调用encodeResponse方法对 Response对象进行编码。这通常表示对当前端接收到的请求的响应消息的编码。
/*** ExchangeCodec的方法* <p>* 请求编码** @param channel NettyChannel* @param buffer NettyBackedChannelBuffer* @param msg Request*/
@Override
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {if (msg instanceof Request) {// 对 Request 对象进行编码encodeRequest(channel, buffer, (Request) msg);} else if (msg instanceof Response) {// 对 Response 对象进行编码encodeResponse(channel, buffer, (Response) msg);} else {super.encode(channel, buffer, msg);}
}
ExchangeCodec#encodeRequest编码请求
该方法对 Request 对象进行编码,即当前端主动发起的请求的消息的编码。
前面我们介绍了Dubbo默认编码解码的协议组成,包括定长的header,和变长的data,这里实际上就是对该协议的编码的实现,对照着上面的协议详情看即可。
从该方法可知,默认编码协议为Hessian2Serialization,即hessian2 协议。整个Request使用ExchangeCodec编码,而Reuqest Body(Request.mData)则使用DubboCodec编码。
/*** ExchangeCodec的方法* <p>* 请求消息的编码,序列化** @param channel NettyChannel* @param buffer NettyBackedChannelBuffer* @param req Request*/
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {//获取序列化协议,默认Hessian2Serialization,即hessian2 协议Serialization serialization = getSerialization(channel, req);// header./** 协议头16字节*/byte[] header = new byte[HEADER_LENGTH];// set magic number.//魔数,固定0xdabb,16bit,2byteBytes.short2bytes(MAGIC, header);// set request and serialization flag.//设置请求和序列化标志。1byte//Req/Res,1bit,Serialization ID,5bitheader[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());//双向请求标记,1bitif (req.isTwoWay()) {header[2] |= FLAG_TWOWAY;}//是否是事件消息,例如心跳事件,1bitif (req.isEvent()) {header[2] |= FLAG_EVENT;}//对于request,第4个byte位置置空// set request id.//请求唯一Id,64bit,8byte,从第5个字节位置开始Bytes.long2bytes(req.getId(), header, 4);// encode request data./** 编码请求体*///返回缓冲区的writerIndex。即其实写入索引int savedWriteIndex = buffer.writerIndex();//更新 writerIndex,为消息头预留 16 个字节的空间buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);//输出流ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);if (req.isHeartbeat()) {// heartbeat request data is always null//心跳请求的数据总是空的bos.write(CodecSupport.getNullBytesOf(serialization));} else {//即序列化协议获取data的序列化器,默认Hessian2ObjectOutputObjectOutput out = serialization.serialize(channel.getUrl(), bos);if (req.isEvent()) {//编码事件数据encodeEventData(channel, out, req.getData());} else {/** 通过DubboCodec编码请求数据,即RpcInvocation,即Request.mData*/encodeRequestData(channel, out, req.getData(), req.getVersion());}out.flushBuffer();//重置所有计数器和引用if (out instanceof Cleanable) {((Cleanable) out).cleanup();}}bos.flush();bos.close();//获取写入的请求体的字节数int len = bos.writtenBytes();//载荷校验,即请求体大小是否过大,默认最大8388608 Byte,大概8MBcheckPayload(channel, len);//写入请求数据的长度,32bit,4byte,从第13个字节位置开始Bytes.int2bytes(len, header, 12);// write//写入请求头buffer.writerIndex(savedWriteIndex);buffer.writeBytes(header); // write header.buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}
DubboCodec#encodeRequestData编码请求数据
该方法专门对于Request请求的请求体进行编码,也就是对RpcInvocation进行编码,即Request.mData。
RpcInvocation中的信息并不会被全部编码,而是编码某些重要信息,按照顺序包括:协议版本号、服务名(服务接口全路径名)、调用服务版本号、接口名、方法参数类型数组、请求传递的方法参数数组、attachments(附加信息)。
/*** DubboCodec的方法* <p>* 编码请求数据(请求体)** @param channel NettyChannel* @param out 一个序列化实现实例,默认Hessian2ObjectOutput* @param data 序列化的数据,RpcInvocation* @param version 协议版本号,Dubbo RPC protocol version*/
@Override
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {RpcInvocation inv = (RpcInvocation) data;//Dubbo协议版本号 Dubbo RPC protocol versionout.writeUTF(version);// https://github.com/apache/dubbo/issues/6138//服务名,服务接口全路径名String serviceName = inv.getAttachment(INTERFACE_KEY);if (serviceName == null) {serviceName = inv.getAttachment(PATH_KEY);}out.writeUTF(serviceName);//调用服务版本号out.writeUTF(inv.getAttachment(VERSION_KEY));//接口名out.writeUTF(inv.getMethodName());//方法参数类型数组out.writeUTF(inv.getParameterTypesDesc());//方法参数数组Object[] args = inv.getArguments();if (args != null) {for (int i = 0; i < args.length; i++) {//请求传递的方法参数out.writeObject(callbackServiceCodec.encodeInvocationArgument(channel, inv, i));}}//attachments,附加信息out.writeAttachments(inv.getObjectAttachments());
}
InternalDecoder#decode解码
InternalDecoder继承了netty编码类ByteToMessageDecoder,属于ChannelInboundHandler,InternalDecoder#decode方法是实现消息解码的入口。
InternalDecoder#decode方法不断循环的从缓冲区中读取数据并且委托DubboCountCodec#decode方法实现解码。解码的结果将会加入一个list集合并且传递给NettyServerHandler#channelRead方法进行后续业务处理。
该方法会判断数据完整性,如果数据不完整,即发生了拆包,此时需要读取更多的数据,那么本次读取的数据废弃,等待下一次请求到来时在从本次读取的起始位置读取完整的数据。
private class InternalDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {ChannelBuffer message = new NettyBackedChannelBuffer(input);//NettyChannelNettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);// decode object.//循环读取缓冲区数据do {//返回该缓冲区的readerIndexint saveReaderIndex = message.readerIndex();//委托DubboCountCodec#decode方法实现解码Object msg = codec.decode(channel, message);//如果数据不完整,需要读取更多的数据,即发生了拆包if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {//重设readerIndex,在下一次还是从此处读取message.readerIndex(saveReaderIndex);//跳出循环,等待下一个数据包的到来break;} else {//is it possible to go here ?//没读取到任何data数据if (saveReaderIndex == message.readerIndex()) {throw new IOException("Decode without read data.");}if (msg != null) {out.add(msg);}}//如果缓冲区数据可读,那么循环读取} while (message.readable());}
}
DubboCountCodec#decode解码
DubboCountCodec#decode方法主要是新增了多条消息解码的能力,内部委托DubboCodec#decode方法实现解码。
/*** DubboCountCodec的方法* <p>* 请求解码** @param channel NettyChannel* @param buffer NettyBackedChannelBuffer*/
@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {//readerIndexint save = buffer.readerIndex();//创建一个MultiMessage,多消息对象MultiMessage result = MultiMessage.create();do {//委托DubboCodec#decode方法实现解码Object obj = codec.decode(channel, buffer);//如果数据不完整,需要读取更多的数据,即发生了拆包if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {//重设readerIndex,在下一次还是从此处读取buffer.readerIndex(save);//跳出循环,等待下一个数据包的到来break;} else {//解码结果保存result.addMessage(obj);logMessageLength(obj, buffer.readerIndex() - save);save = buffer.readerIndex();}} while (true);if (result.isEmpty()) {return Codec2.DecodeResult.NEED_MORE_INPUT;}//如果只有一条消息,那么直接返回该消息if (result.size() == 1) {return result.get(0);}//返回多条消息return result;
}
ExchangeCodec#decode解码
DubboCodec#decode方法是由父类ExchangeCodec实现的。
- 该方法会首先会获取本次缓冲区的所有可读的数据长度,如果小于header长度(16),那么表示当前数据包中的数据不是完整的一次请求的数据,发生了拆包,那么本次不再处理,返回DecodeResult.NEED_MORE_INPUT。
- 否则会从header中获取请求data的长度,并且计算出正常的请求数据包长度tt = 记录的data长度+ header长度。
- 如果本次缓冲区的所有可读的数据长度小于正常的数据包长度tt,那么表示发生了拆包,那么本次不再处理,返回DecodeResult.NEED_MORE_INPUT。
- 调用decodeBody方法解码并返回请求体。该方法由子类DubboCodec重写。
/*** ExchangeCodec的方法* <p>* 解码** @param channel NettyChannel* @param buffer NettyBackedChannelBuffer*/
@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {//获取缓冲区所有可读的字节数int readable = buffer.readableBytes();//存放header数据的数组byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];//尝试读取header,可能不完整buffer.readBytes(header);//继续解码return decode(channel, buffer, readable, header);
}/*** ExchangeCodec的方法* <p>* 解码** @param channel NettyChannel* @param buffer NettyBackedChannelBuffer* @param readable 可读的数据长度* @param header 头部数据*/
@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {// check magic number.//校验header中的MAGIC魔数if (readable > 0 && header[0] != MAGIC_HIGH|| readable > 1 && header[1] != MAGIC_LOW) {//如果魔数校验不通过int length = header.length;if (header.length < readable) {header = Bytes.copyOf(header, readable);buffer.readBytes(header, length, readable - length);}for (int i = 1; i < header.length - 1; i++) {if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {buffer.readerIndex(buffer.readerIndex() - header.length + i);header = Bytes.copyOf(header, i);break;}}return super.decode(channel, buffer, readable, header);}// check length.//校验数据长度是否小于header长度,即小于16字节//如果小于则表示当前数据包中的数据不是完整的一次请求的数据,发生了拆包,那么本次不再处理if (readable < HEADER_LENGTH) {return DecodeResult.NEED_MORE_INPUT;}// get data length.//从header中获取请求data的长度int len = Bytes.bytes2int(header, 12);// When receiving response, how to exceed the length, then directly construct a response to the client.// see more detail from https://github.com/apache/dubbo/issues/7021.//如果这是一个收到的响应,那么判断如果长度超过最大长度,则直接构造一个Response返回给客户端Object obj = finishRespWhenOverPayload(channel, len, header);if (null != obj) {return obj;}//正常的请求数据包长度int tt = len + HEADER_LENGTH;//如果可读的数据长度小于数据包长度,那么表示发生了拆包,那么本次不再处理if (readable < tt) {return DecodeResult.NEED_MORE_INPUT;}// limit input stream.ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);try {/** 解码请求体*/return decodeBody(channel, is, header);} finally {if (is.available() > 0) {try {if (logger.isWarnEnabled()) {logger.warn("Skip input stream " + is.available());}StreamUtils.skipUnusedStream(is);} catch (IOException e) {logger.warn(e.getMessage(), e);}}}
}
DubboCodec#decodeBody解码请求体
该方法对于到达的消息进行解码,对于Response消息会返回一个Response,而Request消息会返回一个Request,对于心跳请求则响应data总是null。
对于Request消息的data(请求体),会判断参数是否使用当前IO线程直接反序列化请求体,默认false。如果为true,那么直接利用当前IO线程反序列化data,将可以解码出调用方法名、调用方法参数、attachments等信息解码出来。否则,返回一个DecodeableRpcInvocation,内部的Data没有真正的反序列化。
可通过url参数decode.in.io来设置是否使用当前IO线程直接反序列化请求体,默认false,即默认情况下,Dubbo 对于请求体的反序列化是在业务线程上执行的。
实际上,Request消息的data(请求体)最终会在DecodeHandler#received方法中被真正的解码,此时处于业务线程中。
/*** DubboCodec的方法* <p>* 解码数据data** @param channel NettyChannel* @param is buffer* @param header 头部数据*/
@Override
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);// get request id. 获取请求idlong id = Bytes.bytes2long(header, 4);//判断是Response - 0还是Request - 1的解码if ((flag & FLAG_REQUEST) == 0) {//解码response// decode response.Response res = new Response(id);if ((flag & FLAG_EVENT) != 0) {res.setEvent(true);}// get status.byte status = header[3];res.setStatus(status);try {if (status == Response.OK) {Object data;if (res.isEvent()) {byte[] eventPayload = CodecSupport.getPayload(is);if (CodecSupport.isHeartBeat(eventPayload, proto)) {// heart beat response data is always null;data = null;} else {ObjectInput in = CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto);data = decodeEventData(channel, in, eventPayload);}} else {DecodeableRpcResult result;if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {result = new DecodeableRpcResult(channel, res, is,(Invocation) getRequestData(id), proto);result.decode();} else {result = new DecodeableRpcResult(channel, res,new UnsafeByteArrayInputStream(readMessageData(is)),(Invocation) getRequestData(id), proto);}data = result;}res.setResult(data);} else {ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);res.setErrorMessage(in.readUTF());}} catch (Throwable t) {if (log.isWarnEnabled()) {log.warn("Decode response failed: " + t.getMessage(), t);}res.setStatus(Response.CLIENT_ERROR);res.setErrorMessage(StringUtils.toString(t));}return res;} else {//解码request// decode request.Request req = new Request(id);//设置Dubbo RPC protocol version,2.0.2req.setVersion(Version.getProtocolVersion());//是否是双向请求req.setTwoWay((flag & FLAG_TWOWAY) != 0);if ((flag & FLAG_EVENT) != 0) {req.setEvent(true);}try {Object data;//是否是事件请求,例如心跳请求if (req.isEvent()) {byte[] eventPayload = CodecSupport.getPayload(is);//心跳请求响应data总是nullif (CodecSupport.isHeartBeat(eventPayload, proto)) {// heart beat response data is always null;data = null;} else {ObjectInput in = CodecSupport.deserialize(channel.getUrl(), new ByteArrayInputStream(eventPayload), proto);data = decodeEventData(channel, in, eventPayload);}}//其他rpc请求else {DecodeableRpcInvocation inv;//是否使用当前IO线程直接反序列化请求体,默认falseif (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {inv = new DecodeableRpcInvocation(frameworkModel, channel, req, is, proto);//利用IO线程直接反序列化inv.decode();} else {//创建一个DecodeableRpcInvocation,但是还并未反序列化inv = new DecodeableRpcInvocation(frameworkModel, channel, req,new UnsafeByteArrayInputStream(readMessageData(is)), proto);}data = inv;}//设置data,可能还没有真正的反序列化req.setData(data);} catch (Throwable t) {if (log.isWarnEnabled()) {log.warn("Decode request failed: " + t.getMessage(), t);}// bad requestreq.setBroken(true);req.setData(t);}return req;}
}
总结
上次我们学习了Dubbo3.1的 Consumer发起服务调用请求的过程源码,本次我们学习了Dubbo消息的编码解码的源码,另外还有Dubbo Provider处理服务调用请求以及Dubbo Consumer接收服务调用响应这两个阶段的源码后续再聊。