当前位置: 首页 > news >正文

Dubbo 3.x源码(32)—Dubbo Provider处理服务调用请求源码

基于Dubbo 3.1,详细介绍了Dubbo Provider处理服务调用请求源码

上文我们学习了,Dubbo消息的编码解的源码。现在我们来学习一下Dubbo Provider处理服务调用请求源码。

当前consumer发起了rpc请求,经过请求编码之后到达provider端,在经过InternalDecoder#decode解码请求解码之后,后续调用链路为:InternalDecoder、IdleStateHandler#channelRead、NettyServerHandler#channelRead, NettyServerHandler#channelRead方法将会进行后续业务处理。

Dubbo 3.x服务调用源码:

  1. Dubbo 3.x源码(29)—Dubbo Consumer服务调用源码(1)服务调用入口
  2. Dubbo 3.x源码(30)—Dubbo Consumer服务调用源码(2)发起远程调用
  3. Dubbo 3.x源码(31)—Dubbo消息的编码解码
  4. Dubbo 3.x源码(32)—Dubbo Provider处理服务调用请求源码

Dubbo 3.x服务引用源码:

  1. Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
  2. Dubbo 3.x源码(18)—Dubbo服务引用源码(1)
  3. Dubbo 3.x源码(19)—Dubbo服务引用源码(2)
  4. Dubbo 3.x源码(20)—Dubbo服务引用源码(3)
  5. Dubbo 3.x源码(21)—Dubbo服务引用源码(4)
  6. Dubbo 3.x源码(22)—Dubbo服务引用源码(5)服务引用bean的获取以及懒加载原理
  7. Dubbo 3.x源码(23)—Dubbo服务引用源码(6)MigrationRuleListener迁移规则监听器
  8. Dubbo 3.x源码(24)—Dubbo服务引用源码(7)接口级服务发现订阅refreshInterfaceInvoker
  9. Dubbo 3.x源码(25)—Dubbo服务引用源码(8)notify订阅服务通知更新
  10. Dubbo 3.x源码(26)—Dubbo服务引用源码(9)应用级服务发现订阅refreshServiceDiscoveryInvoker
  11. Dubbo 3.x源码(27)—Dubbo服务引用源码(10)subscribeURLs订阅应用级服务url

Dubbo 3.x服务发布源码:

  1. Dubbo 3.x源码(11)—Dubbo服务的发布与引用的入口
  2. Dubbo 3.x源码(12)—Dubbo服务发布导出源码(1)
  3. Dubbo 3.x源码(13)—Dubbo服务发布导出源码(2)
  4. Dubbo 3.x源码(14)—Dubbo服务发布导出源码(3)
  5. Dubbo 3.x源码(15)—Dubbo服务发布导出源码(4)
  6. Dubbo 3.x源码(16)—Dubbo服务发布导出源码(5)
  7. Dubbo 3.x源码(17)—Dubbo服务发布导出源码(6)
  8. Dubbo 3.x源码(28)—Dubbo服务发布导出源码(7)应用级服务接口元数据发布

文章目录

  • NettyServerHandler#channelRead业务处理入口
  • AbstractPeer#received接收请求
  • MultiMessageHandler#received处理多个消息
  • HeartbeatHandler处理心跳消息
  • AllChannelHandler#received分发任务
    • getPreferredExecutorService获取首选线程池
  • ChannelEventRunnable#run业务线程执行
  • DecodeHandler#received解码消息体
  • HeaderExchangeHandler#received处理消息
  • HeaderExchangeHandler#handleRequest处理请求
    • DubboProtocol.requestHandler#reply真实方法调用
      • DubboProtocol#getInvoker获取invoker
      • CallbackRegistrationInvoker#invoke过滤器链调用
    • AbstractProxyInvoker#invoke业务实现接口调用
      • doInvoke执行业务调用
  • 总结

NettyServerHandler#channelRead业务处理入口

该方法内部根据netty NioSocketChannel获取对应的dubbo NettyChannel,然后通过handler#received方法处理消息。关于这个handler,我们在服务的导出时就说过了,这实际上时很多个handler的嵌套对象,而最外层的就是在这里的handler,即NettyServer。

此时调用线程还是IO线程。

/*** NettyServerHandler的方法* <p>* 请求业务处理入口** @param ctx DefaultChannelHandlerContext* @param msg Request* @throws Exception*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//根据netty NioSocketChannel获取对应的dubbo NettyChannelNettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);/** 通过NettyServer#received方法处理消息*/handler.received(channel, msg);// trigger qos handlerctx.fireChannelRead(msg);
}

AbstractPeer#received接收请求

NettyServer#received接收请求的方法由其父类AbstractPeer#received实现,该方法内部又委托MultiMessageHandler#received方法实现。

此时调用线程还是IO线程。

/*** AbstractPeer的方法* * @param ch NettyChannel* @param msg 消息*/
@Override
public void received(Channel ch, Object msg) throws RemotingException {if (closed) {return;}//调用MultiMessageHandler#received方法handler.received(ch, msg);
}

MultiMessageHandler#received处理多个消息

MultiMessageHandler#received方法主要是处理同时接收到多条消息的情况。如果msg属于MultiMessage,那么循环调用下层handler#received方法,否则直接调用下层handler#received方法,下层handler是HeartbeatHandler。

此时调用线程还是IO线程。

/*** MultiMessageHandler的方法* <p>* 处理多消息的情况** @param channel  NettyChannel* @param message 消息*/
@SuppressWarnings("unchecked")
@Override
public void received(Channel channel, Object message) throws RemotingException {//如果是多消息类型,即包含多条消息if (message instanceof MultiMessage) {MultiMessage list = (MultiMessage) message;//循环处理多条消息for (Object obj : list) {try {//调用下层HeartbeatHandler#received方法handler.received(channel, obj);} catch (Throwable t) {logger.error("MultiMessageHandler received fail.", t);try {handler.caught(channel, t);} catch (Throwable t1) {logger.error("MultiMessageHandler caught fail.", t1);}}}} else {//直接调用下层HeartbeatHandler#received方法handler.received(channel, message);}
}

HeartbeatHandler处理心跳消息

该方法专门用于处理心跳请求和响应,对于心跳消息,那么这里处理了之后直接return了,不再进行下层handler处理,而普通rpc请求则继续调用下层AllChannelHandler#received方法处理。

此时调用线程还是IO线程。

/*** HeartbeatHandler的方法* <p>* 处理心跳请求** @param channel NettyChannel* @param message 消息*/
@Override
public void received(Channel channel, Object message) throws RemotingException {//为NettyChannel设置READ_TIMESTAMP属性,置为当前时间戳setReadTimestamp(channel);//是否是心跳请求if (isHeartbeatRequest(message)) {Request req = (Request) message;//如果是双向请求if (req.isTwoWay()) {//那么直接创建一个Response返回Response res = new Response(req.getId(), req.getVersion());res.setEvent(HEARTBEAT_EVENT);//直接发送响应channel.send(res);if (logger.isDebugEnabled()) {int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()+ ", cause: The channel has no data-transmission exceeds a heartbeat period"+ (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));}}//直接返回,不用后续处理return;}//如果是心跳响应,直接返回,不用后续处理if (isHeartbeatResponse(message)) {if (logger.isDebugEnabled()) {logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName());}return;}//调用下层AllChannelHandler#received方法handler.received(channel, message);
}

AllChannelHandler#received分发任务

将当前消息包装为一个ChannelEventRunnable分发给对应的线程池执行,这里的线程池就是dubbo业务线程池,到此IO线程的任务结束。


这种方式实现了线程资源的隔离,释放了IO线程,可以快速处理更多的IO操作,提升了系统吞吐量。


实际上这对应的就是AllDispatcher的线程模型,也是Dubbo的默认线程模型,即所有消息都派发到业务线程池,包括建立连接CONNECTED、关闭连接DISCONNECTED、接受请求RECEIVED、处理异常CAUGHT、发送响应SENT等等,响应消息会优先使用对于请求所使用的线程池。这是默认策略。

/*** AllChannelHandler的方法* <p>* 处理普通rpc请求请求** @param channel NettyChannel* @param message 消息*/
@Override
public void received(Channel channel, Object message) throws RemotingException {//获取对应的线程池,可能是ThreadlessExecutorExecutorService executor = getPreferredExecutorService(message);try {//创建一个线程任务ChannelEventRunnable,通过线程池执行//这里的handler是DecodeHandlerexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {if (message instanceof Request && t instanceof RejectedExecutionException) {sendFeedback(channel, (Request) message, t);return;}throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}
}

getPreferredExecutorService获取首选线程池

该方法获取处理业务的首选线程池,目前这种方法主要是为了方便消费者端的线程模型而定制的。这是Dubo2.7.5之后的线程模型的优化:

  1. 对于响应消息,那么从DefaultFuture的静态字段缓存映射FUTURES中获取请求id对应的DefaultFuture。请求的id就是对应的响应的id。
    1. 然后获取执DefaultFuture的执行器,对于默认的同步请求,那自然是ThreadlessExecutor,这里面阻塞着发起同步调用的线程,将回调直接委派给发起调用的线程。对于异步请求,则获取异步请求的线程池。
    2. 因此后续的处理包括响应体的解码都是由发起调用的线程来执行,这样减轻了业务线程池的压力。后面我们讲消费者接受响应的时候,会讲解对应的源码。
  2. 对于请求消息,则使用共享executor执行后续逻辑。对于共享线程池,默认为FixedThreadPool,固定200线程,阻塞队列长度为0,拒绝策略为打印异常日志并且抛出异常。
/*** WrappedChannelHandler的方法* <p>* 目前,这种方法主要是为了方便消费者端的线程模型而定制的。* 1. 使用ThreadlessExecutor,又名,将回调直接委派给发起调用的线程。* 2. 使用共享executor执行回调。** @param msg 消息* @return 执行器*/
public ExecutorService getPreferredExecutorService(Object msg) {//如果是响应消息if (msg instanceof Response) {Response response = (Response) msg;//从DefaultFuture的静态字段缓存映射FUTURES中获取请求id对应的DefaultFutureDefaultFuture responseFuture = DefaultFuture.getFuture(response.getId());// a typical scenario is the response returned after timeout, the timeout response may have completed the future//一个典型的场景是响应超时后返回,超时后的响应可能已经完成if (responseFuture == null) {//获取当前服务器或客户端的共享执行器return getSharedExecutorService();} else {//获取执future的执行器,对于默认的同步请i去,那自然是ThreadlessExecutor,这里面阻塞着发起同步调用的线程ExecutorService executor = responseFuture.getExecutor();if (executor == null || executor.isShutdown()) {//获取当前服务器或客户端的共享执行器executor = getSharedExecutorService();}return executor;}} else {//获取当前服务器或客户端的共享执行器return getSharedExecutorService();}
}/*** get the shared executor for current Server or Client** @return*/
public ExecutorService getSharedExecutorService() {// Application may be destroyed before channel disconnected, avoid create new application model// see https://github.com/apache/dubbo/issues/9127//在断开通道之前,应用程序可能被销毁,避免创建新的应用程序模型if (url.getApplicationModel() == null || url.getApplicationModel().isDestroyed()) {return GlobalResourcesRepository.getGlobalExecutorService();}// note: url.getOrDefaultApplicationModel() may create new application modelApplicationModel applicationModel = url.getOrDefaultApplicationModel();ExecutorRepository executorRepository =applicationModel.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();//从执行器仓库中根据url获取对应的执行器,默认INTERNAL_SERVICE_EXECUTOR对应的执行器ExecutorService executor = executorRepository.getExecutor(url);if (executor == null) {executor = executorRepository.createExecutorIfAbsent(url);}return executor;
}

ChannelEventRunnable#run业务线程执行

ChannelEventRunnable#run方法就是业务线程执行的入口,构建任务的时候传递的handler是DecodeHandler。此时调用线程已经变成了业务线程。

该方法将会判断通道状态的各种状态,并且调用handler的各种对应的方法处理。这里的handler是DecodeHandler。

@Override
public void run() {//通道状态如果是处理消息状态,该状态占大多数情况if (state == ChannelState.RECEIVED) {try {/** 调用DecodeHandler#received方法*/handler.received(channel, message);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is " + message, e);}} else {//其他状态switch (state) {case CONNECTED:try {handler.connected(channel);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);}break;case DISCONNECTED:try {handler.disconnected(channel);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);}break;case SENT:try {handler.sent(channel, message);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is " + message, e);}break;case CAUGHT:try {handler.caught(channel, exception);} catch (Exception e) {logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel+ ", message is: " + message + ", exception is " + exception, e);}break;default:logger.warn("unknown state: " + state + ", message is " + message);}}}

DecodeHandler#received解码消息体

这个DecodeHandler主要是对于请求消息或者响应消息的消息体进行解码,对于请求消息(request.mData),这一步将会解码出调用的方法名,方法参数类型,传递的方法参数,attachments等信息,随后调用下层HeaderExchangeHandler#received方法。

此前的InternalDecoder#decode方法解码了Reuqest,但是对于内部的data并没有解码,而是留给了业务线程在DecodeHandler中去操作,这样可以尽量快速释放IO线程。

/*** DecodeHandler的方法** 消息体的解码** @param channel NettyChannel* @param message 消息*/
@Override
public void received(Channel channel, Object message) throws RemotingException {//Decodeable类型,直接处理if (message instanceof Decodeable) {decode(message);}//请求消息if (message instanceof Request) {decode(((Request) message).getData());}//响应消息if (message instanceof Response) {decode(((Response) message).getResult());}//调用下层HeaderExchangeHandler#received方法handler.received(channel, message);
}

HeaderExchangeHandler#received处理消息

HeaderExchangeHandler#received方法对于消息进行分类并调用不同的方法继续处理。

对于请求消息,如果是双向消息,那么调用handleRequest方法继续处理,将会创建Response对象,然后调用dubboProtocol.requestHandler完成请求处理获取结果,并将结果封装到Response中后返回给客户端。如果是单向消息则仅仅调用dubboProtocol.requestHandler完成请求处理即可。

对于响应消息,将会调用DefaultFuture.received方法处理,此时就会根据响应id获取对应的DefaultFuture,将响应结果设置进去。这里的源码我们后面将consumer获取响应结果的时候再讲解。

/*** HeaderExchangeHandler的方法* <p>* 分类处理消息** @param channel NettyChannel* @param message 消息*/
@Override
public void received(Channel channel, Object message) throws RemotingException {final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);//请求消息if (message instanceof Request) {// handle request.Request request = (Request) message;if (request.isEvent()) {//处理事件消息handlerEvent(channel, request);} else {if (request.isTwoWay()) {//额外处理双向消息handleRequest(exchangeChannel, request);} else {//处理单向消息,直接调用下层DubboProtocol.requestHandler#received方法handler.received(exchangeChannel, request.getData());}}}//响应消息else if (message instanceof Response) {handleResponse(channel, (Response) message);}//字符串else if (message instanceof String) {if (isClientSide(channel)) {Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());logger.error(e.getMessage(), e);} else {String echo = handler.telnet(channel, (String) message);if (StringUtils.isNotEmpty(echo)) {channel.send(echo);}}} else {handler.received(exchangeChannel, message);}
}

HeaderExchangeHandler#handleRequest处理请求

该方法用于处理双向请求,也就是普通rpc请求。将会创建Response对象,然后调用dubboProtocol.requestHandler完成请求处理获取结果,并将结果封装到Response中后返回给客户端。

/*** HeaderExchangeHandler的方法* <p>* 处理双向rpc请求消息** @param channel NettyChannel* @param req     请求消息*/
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {//创建一个Response响应对象//id是请求id,这样当响应返回之后,请求端便能找到对应的DefaultFuture进行后续结果的封装Response res = new Response(req.getId(), req.getVersion());//如果是损坏的请求,则直接返回错误响应,通常解码失败这样返回if (req.isBroken()) {Object data = req.getData();String msg;if (data == null) {msg = null;} else if (data instanceof Throwable) {msg = StringUtils.toString((Throwable) data);} else {msg = data.toString();}res.setErrorMessage("Fail to decode request due to: " + msg);res.setStatus(Response.BAD_REQUEST);channel.send(res);return;}// find handler by message class.//获取请求体 DecodeableRpcInvocationObject msg = req.getData();try {/** 异步调用下层handler  DubboProtocol.requestHandler#reply方法* DubboProtocol.requestHandler是DubboProtocol中的一个匿名内部类实现*/CompletionStage<Object> future = handler.reply(channel, msg);/** 阻塞等待直到reply调用完成,则将结果设置到Response中,然后调用channel.send方法将调用结果返回给服务消费端*/future.whenComplete((appResult, t) -> {try {if (t == null) {res.setStatus(Response.OK);res.setResult(appResult);} else {res.setStatus(Response.SERVICE_ERROR);res.setErrorMessage(StringUtils.toString(t));}//将调用结果返回给服务消费端channel.send(res);} catch (RemotingException e) {logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);}});} catch (Throwable e) {//调用异常处理,设置SERVICE_ERROR状态码res.setStatus(Response.SERVICE_ERROR);res.setErrorMessage(StringUtils.toString(e));//将异常调用结果返回给服务消费端channel.send(res);}
}

DubboProtocol.requestHandler#reply真实方法调用

该方法首先根据请求参数从DubboProtocol的exporterMap缓存中获取对应的exporter,然后获取exporter中的invoker,然后调用invoker#invoke方法对于真实的方法实现进行调用,然后返回调用结果。

这里获取的Invoker,实际上是FilterChainBuilder的内部类CallbackRegistrationInvoker,我们在前面学习consumer发起服务调用请求的时候,实际上也是走的这个方法。可见dubbo在方法的复用的设计上还是很牛的。

CallbackRegistrationInvoker实际上内部持有是一个过滤器链,调用它的invoke方法,会触发一系列过滤器filter的的调用,执行过滤操作。

/*** DubboProtocol.ExchangeHandler的匿名内部类实现*/
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {if (!(message instanceof Invocation)) {throw new RemotingException(channel, "Unsupported request: "+ (message == null ? null : (message.getClass().getName() + ": " + message))+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());}//转换为调用抽象Invocation inv = (Invocation) message;//获取对应的服务提供者invoker实例Invoker<?> invoker = getInvoker(channel, inv);inv.setServiceModel(invoker.getUrl().getServiceModel());// switch TCCLif (invoker.getUrl().getServiceModel() != null) {Thread.currentThread().setContextClassLoader(invoker.getUrl().getServiceModel().getClassLoader());}// need to consider backward-compatibility if it's a callback//如果是回调方法,需要考虑向后兼容,验回调方法是否存在if (Boolean.TRUE.toString().equals(inv.getObjectAttachmentWithoutConvert(IS_CALLBACK_SERVICE_INVOKE))) {String methodsStr = invoker.getUrl().getParameters().get("methods");boolean hasMethod = false;if (methodsStr == null || !methodsStr.contains(",")) {hasMethod = inv.getMethodName().equals(methodsStr);} else {String[] methods = methodsStr.split(",");for (String method : methods) {if (inv.getMethodName().equals(method)) {hasMethod = true;break;}}}if (!hasMethod) {logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()+ " not found in callback service interface ,invoke will be ignored."+ " please update the api interface. url is:"+ invoker.getUrl()) + " ,invocation is :" + inv);return null;}}//设置调用端地址RpcContext.getServiceContext().setRemoteAddress(channel.getRemoteAddress());/** 基于invoker调用invoke方法,内部将会对服务接口的真实实现进行调用并获取结果*/Result result = invoker.invoke(inv);//同步等待结果并将结果封装为CompletableFuture返回return result.thenApply(Function.identity());
}

DubboProtocol#getInvoker获取invoker

该方法获取从调用端传递的RpcInvocation中获取各种信息,然后组装成serviceKey:{group}/{servicePath}:{version}:{port},随后从DubboProtocol的exporterMap缓存中根据serviceKey获取对应的exporter,然后获取exporter中的invoker返回。

我们在此前学习dubbo服务导出的时候,就学习了这个exporterMap,其內部存放着serviecKey到对应的exporter的实现。现在,在服务调用的时候,就用到了这个缓存,这样一下就和之前学习的内容对应上了。

/*** DubboProtocol的方法* * @param channel HeaderExchangeChannel* @param inv 调用抽象,RpcInvocation* @return invoker*/
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {boolean isCallBackServiceInvoke;boolean isStubServiceInvoke;//端口号int port = channel.getLocalAddress().getPort();//获取path,服务接口String path = (String) inv.getObjectAttachmentWithoutConvert(PATH_KEY);//if it's stub service on client side(after enable stubevent, usually is set up onconnect or ondisconnect method)//客户端的存根服务isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getObjectAttachmentWithoutConvert(STUB_EVENT_KEY));if (isStubServiceInvoke) {//when a stub service export to local, it usually can't be exposed to portport = 0;}// if it's callback service on client side//如果是客户端的回调服务isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;if (isCallBackServiceInvoke) {path += "." + inv.getObjectAttachmentWithoutConvert(CALLBACK_SERVICE_KEY);inv.setObjectAttachment(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());}//构建服务key,{group}/{servicePath}:{version}:{port}String serviceKey = serviceKey(port,path,(String) inv.getObjectAttachmentWithoutConvert(VERSION_KEY),(String) inv.getObjectAttachmentWithoutConvert(GROUP_KEY));//从DubboProtocol的exporterMap缓存中根据serviceKey获取对应的exporterDubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);if (exporter == null) {throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " +", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + getInvocationWithoutData(inv));}//获取exporter中的invokerreturn exporter.getInvoker();
}

CallbackRegistrationInvoker#invoke过滤器链调用

CallbackRegistrationInvoker#invoke方法用于添加一个回调,它可以在RPC调用完成时触发,在这回调函数将会倒序执行filters中的过滤器。

/*** FilterChainBuilder的内部类CallbackRegistrationInvoker的方法*/
@Override
public Result invoke(Invocation invocation) throws RpcException {/** 继续调用下层invoker#invoke*/Result asyncResult = filterInvoker.invoke(invocation);//添加一个回调,它可以在RPC调用完成时触发。asyncResult.whenCompleteWithContext((r, t) -> {RuntimeException filterRuntimeException = null;//过滤器倒序执行for (int i = filters.size() - 1; i >= 0; i--) {FILTER filter = filters.get(i);try {InvocationProfilerUtils.releaseDetailProfiler(invocation);if (filter instanceof ListenableFilter) {//执行过滤器ListenableFilter listenableFilter = ((ListenableFilter) filter);Filter.Listener listener = listenableFilter.listener(invocation);try {if (listener != null) {if (t == null) {listener.onResponse(r, filterInvoker, invocation);} else {listener.onError(t, filterInvoker, invocation);}}} finally {listenableFilter.removeListener(invocation);}} else if (filter instanceof FILTER.Listener) {FILTER.Listener listener = (FILTER.Listener) filter;if (t == null) {listener.onResponse(r, filterInvoker, invocation);} else {listener.onError(t, filterInvoker, invocation);}}} catch (RuntimeException runtimeException) {LOGGER.error(String.format("Exception occurred while executing the %s filter named %s.", i, filter.getClass().getSimpleName()));if (LOGGER.isDebugEnabled()) {LOGGER.debug(String.format("Whole filter list is: %s", filters.stream().map(tmpFilter -> tmpFilter.getClass().getSimpleName()).collect(Collectors.toList())));}filterRuntimeException = runtimeException;t = runtimeException;}}if (filterRuntimeException != null) {throw filterRuntimeException;}});return asyncResult;
}/*** FilterChainBuilder#CopyOfFilterChainNode的方法*/
@Override
public Result invoke(Invocation invocation) throws RpcException {Result asyncResult;try {//进入细节分析器InvocationProfilerUtils.enterDetailProfiler(invocation, () -> "Filter " + filter.getClass().getName() + " invoke.");/** 执行过滤器操作*/asyncResult = filter.invoke(nextNode, invocation);} catch (Exception e) {InvocationProfilerUtils.releaseDetailProfiler(invocation);if (filter instanceof ListenableFilter) {ListenableFilter listenableFilter = ((ListenableFilter) filter);try {Filter.Listener listener = listenableFilter.listener(invocation);if (listener != null) {listener.onError(e, originalInvoker, invocation);}} finally {listenableFilter.removeListener(invocation);}} else if (filter instanceof FILTER.Listener) {FILTER.Listener listener = (FILTER.Listener) filter;listener.onError(e, originalInvoker, invocation);}throw e;} finally {}return asyncResult;
}

过滤器链的尽头就是真实invoker,invoke方法经过的过滤器filter如下:ContextFilter、ProfilerServerFilter、EchoFilter、ClassLoaderFilter、GenericFilter、ExceptionFilter、MonitorFilter、TimeoutFilter、TraceFilter、ClassLoaderCallbackFilter、InvokerWrapper、DelegateProviderMetaDataInvoker、AbstractProxyInvoker。

AbstractProxyInvoker#invoke业务实现接口调用

AbstractProxyInvoker#invoke方法内部将会调用doInvoke方法,该方法会调用业务接口,最终统一个返回AsyncRpcResult。

/*** AbstractProxyInvoker的方法* <p>* 执行业务接口实现** @param invocation 调用抽象*/
@Override
public Result invoke(Invocation invocation) throws RpcException {try {ProfilerEntry originEntry = null;//性能分析if (ProfilerSwitch.isEnableSimpleProfiler()) {Object fromInvocation = invocation.get(Profiler.PROFILER_KEY);if (fromInvocation instanceof ProfilerEntry) {ProfilerEntry profiler = Profiler.enter((ProfilerEntry) fromInvocation, "Receive request. Server biz impl invoke begin.");invocation.put(Profiler.PROFILER_KEY, profiler);originEntry = Profiler.setToBizProfiler(profiler);}}/** 执行doInvoke,该方法由子类实现*/Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());//性能分析if (ProfilerSwitch.isEnableSimpleProfiler()) {Object fromInvocation = invocation.get(Profiler.PROFILER_KEY);if (fromInvocation instanceof ProfilerEntry) {ProfilerEntry profiler = Profiler.release((ProfilerEntry) fromInvocation);invocation.put(Profiler.PROFILER_KEY, profiler);}}Profiler.removeBizProfiler();if (originEntry != null) {Profiler.setToBizProfiler(originEntry);}//将返回结果转换为CompletableFutureCompletableFuture<Object> future = wrapWithFuture(value, invocation);//转换为AppResponse的CompletableFutureCompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {//创建AppResponseAppResponse result = new AppResponse(invocation);if (t != null) {if (t instanceof CompletionException) {result.setException(t.getCause());} else {result.setException(t);}} else {result.setValue(obj);}return result;});//统一返回AsyncRpcResultreturn new AsyncRpcResult(appResponseFuture, invocation);} catch (InvocationTargetException e) {if (RpcContext.getServiceContext().isAsyncStarted() && !RpcContext.getServiceContext().stopAsync()) {logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e);}return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);} catch (Throwable e) {throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);}
}

doInvoke执行业务调用

到这一步,实际上以及调用到了基于JavassistProxyFactory#getInvoker或者JdkProxyFactory#getInvoker方法创建的AbstractProxyInvoker匿名实现类实例。我们在学习Dubbo服务导出的时候就说过了,这就是最底层的Invoker,其内部包含了真实的服务实现。

JavassistProxyFactory 是默认的invoker工厂。dubbo利用javassist动态创建了Class对应的Wrapper对象,动态生成的Wrapper类改写invokeMethod方法,其内部会被改写为根据接口方法名和参数直接调用ref对应名字的方法,避免通过Jdk的反射调用方法带来的性能问题。

JavassistProxyFactory#getInvoker方法如下,可以看到返回的AbstractProxyInvoker实例重写了doInvoke方法,而AbstractProxyInvoker#invoke方法内部将会调用doInvoke方法,在doInvoke方法中,内部实际调用的wrapper#invokeMethod方法,该方法中将会通过方法名字符串去调用对应真实接口实现的方法,而不是反射查找真实接口实现的方法去调用,提升了调用速度。

/*** JavassistProxyFactory的方法* <p>* 将ref、interfaceClass、url包装成一个Invoker代理对象** @param proxy 被代理的实例* @param type  代理接口Class* @param url   服务url,可能是基于injvm协议的服务url(本地导出),也可能是追加了export=url参数的注册中心url(远程导出),还可能是原始的服务url(直连导出)* @return 一个Invoker可执行体实例*/
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {try {// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'//基于javassist动态创建了Class对应的Wrapper对象,动态生成的Wrapper类改写invokeMethod方法,其内部会被改写为根据接口方法名和参数直接调用ref对应名字的方法//这样后续调用方法时,就可以避免Jdk动态代理中通过反射调用方法带来的性能问题final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);//创建一个AbstractProxyInvoker的匿名实例return new AbstractProxyInvoker<T>(proxy, type, url) {@Overrideprotected Object doInvoke(T proxy, String methodName,Class<?>[] parameterTypes,Object[] arguments) throws Throwable {//调用wrapper的invokeMethod方法return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);}};} catch (Throwable fromJavassist) {// try fall back to JDK proxy factorytry {Invoker<T> invoker = jdkProxyFactory.getInvoker(proxy, type, url);logger.error("Failed to generate invoker by Javassist failed. Fallback to use JDK proxy success. " +"Interfaces: " + type, fromJavassist);// log out errorreturn invoker;} catch (Throwable fromJdk) {logger.error("Failed to generate invoker by Javassist failed. Fallback to use JDK proxy is also failed. " +"Interfaces: " + type + " Javassist Error.", fromJavassist);logger.error("Failed to generate invoker by Javassist failed. Fallback to use JDK proxy is also failed. " +"Interfaces: " + type + " JDK Error.", fromJdk);throw fromJavassist;}}
}

总结

本次我们学习了provider接收并处理consumer远程rpc请求的流程,大概的流程如下:

服务端接受请求之后会反序列化(解码)请求参数并通过参数找到之前暴露存储的exporterMap缓存中获取对应的exporter,然后获取exporter中的invoker,而invoker内部持有真正的业务服务实现类对象,最终会调用真正的实现类对象的方法,获取结果后会组装响应并序列化并返回,这个响应的id和对应的请求的id是一样的。

对应的方法调用栈为,IO线程调用栈:

  1. NettyServerHandler#channelRead(ChannelHandlerContext, MessageEvent)
  2. AbstractPeer#received(Channel, Object)
  3. MultiMessageHandler#received(Channel, Object)
  4. HeartbeatHandler#received(Channel, Object)
  5. AllChannelHandler#received(Channel, Object) – 业务线程池执行线程任务

业务线程调用栈:

  1. ChannelEventRunnable#run()
  2. DecodeHandler#received(Channel, Object)
  3. HeaderExchangeHandler#received(Channel, Object)
  4. HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)
  5. DubboProtocol.requestHandler#reply(ExchangeChannel, Object)
  6. CallbackRegistrationInvoker#invoke(Invocation) --各种Filter
  7. InvokerWrapper#invoke(Invocation)
  8. DelegateProviderMetaDataInvoker#invoke(Invocation)
  9. AbstractProxyInvoker#invoke(Invocation)
  10. AbstractProxyInvoker#doInvoke(Invocation) --基于javassist创建的invoker
  11. wrapper#invokeMethod(Object, String, Class[], Object[])
  12. xxxServiceImpl#xxx() --业务方法调用
http://www.lryc.cn/news/609881.html

相关文章:

  • 《算法导论》第 1 章 - 算法在计算中的作用
  • Java开发时出现的问题---语言特性与基础机制陷阱
  • 从HTTP到WebSocket:打造极速实时通讯体验
  • 安全扫描:目标主机支持RSA密钥交换问题
  • 国产化低代码平台如何筑牢企业数字化安全底座
  • 消防器材检测数据集介绍-9,600 张图片 智慧安防系统 建筑施工安全监管 AI 消防巡检机器人 自动审核系统 公共场所安全监测
  • Solidity全局变量与安全实践指南
  • [论文阅读] 人工智能 + 教学 | 从代码到职业:用机器学习预测竞赛程序员的就业潜力
  • 安全扫描:目标使用过期的TLS1.0 版协议问题
  • 【乐企板式文件】不动产销售类发票已支持
  • MySQL三大日志详解(binlog、undo log、redo log)
  • 赋能未来:数字孪生驱动能源系统智能化升级
  • 【项目实践】在系统接入天气api,根据当前天气提醒,做好plan
  • Linux(centos)安全狗
  • 【芯片设计专用执行单元:PWM如何重塑能源与智能控制】
  • sqli-labs靶场less29~less35
  • 2025.08.04 移除元素
  • 【测试工程思考】测试自动化基础能力建设
  • 使用mybatis生成器生成实体类mapper和查询参数文件,实现简单增删改查。使用log4j输出日志到控制台。使用配置文件注册Bean,配置视图解析器
  • 每天学一个Linux命令(38):vi/vim
  • Excel商业智能分析报表 【销售管理分析仪】
  • 免费MCP: JSON 转 Excel MCP
  • Vim 高效使用指南
  • Flutter 事件总线 Event Bus
  • HarmonyOS 多屏适配最佳实践:基于 ArkUI 的响应式 UI 方案
  • 计算机网络:理解路由的下一跳
  • 第四十六章:AI的“瞬时记忆”与“高效聚焦”:llama.cpp的KV Cache与Attention机制
  • C++ 中 initializer_list 类型推导
  • 2.1 vue组件
  • 【AMD】编译llama.cpp实践版