【橘子分布式】gRPC(番外篇-客户端重试机制)
一、简介
在客户端和服务端的交互模式中,我们不免的遇到一类问题。那就是客户端访问的时候服务端可能并非是异常了,只是因为网络延迟或者抖动,可能操作客户端暂时访问不上服务端的情况,如果出现了此类情况的话。那么可以让客户端重试连接。尽可能的避免,因为网络问题,影响通信。从而保证了系统的可靠性。
grpc同样是支持这个能力的,我们可以在客户端通过重试来保证请求的稳定性。
接下来我们就在服务端模拟网络异常来学习一下grpc的客户端的重试机制。
二、代码实现
1、api模块
我们需要在api模块提供一些能力。我们引入gosn用来解析json能力。
<dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.10.1</version>
</dependency>
并且提供解析类路径下的文件的工具类。
package com.levi.utils;import com.google.gson.Gson;import java.io.FileNotFoundException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;public class FileUtil {/*** 从classpath下读取json文件,封装为map* @param fileName* @return* @throws Exception*/public static Map<String, ?> getJsonMap(String fileName) throws Exception {// 通过 ClassLoader 拿到 classpath 下资源的 URLvar url = Thread.currentThread().getContextClassLoader().getResource(fileName);if (url == null) {throw new FileNotFoundException("Resource not found: " + fileName);}// 如果资源在文件系统里(IDE 或解压目录),可以用 Files.readString// 如果将来打成 JAR,则会走 jar:file:...!/ 协议,Files 读不到,需改用 InputStreamString json;if ("file".equalsIgnoreCase(url.getProtocol())) {json = Files.readString(Path.of(url.toURI()));} else {// JAR 内资源,用 InputStream 读try (var in = url.openStream()) {json = new String(in.readAllBytes(), StandardCharsets.UTF_8);}}// 解析 JSONreturn new Gson().fromJson(json, Map.class);}
}
然后我们再次来回顾一下我们在api模块中的proto文件。
syntax = "proto3";package com.levi;option java_multiple_files = false;
option java_package = "com.levi";
option java_outer_classname = "HelloProto";message HelloRequest{string name = 1;
}message HelloRespnose{string result = 1;
}service HelloService{// 普通方法rpc hello(HelloRequest) returns (HelloRespnose);
}
我们以一个简单的一元调用来测试一下重试机制。
2、服务端
我们在服务端修改一下hello方法来模拟网络异常。
@Slf4j
public class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase {private Random random = new Random();@Overridepublic void hello(HelloProto.HelloRequest request, StreamObserver<HelloProto.HelloRespnose> responseObserver) {var randNum = random.nextInt(100);log.info("生成的随机数为: {}", randNum);/** 模拟服务端网络异常的情况,以一个100以内的随机数。当这个数小于33的时候我们就返回一个异常,差不多三分之一的概率* 这个异常是grpc内置的Status一堆异常。只有这种异常才能被客户端重试所感知。我们这里选择UNAVAILABLE异常* 来模拟网络异常。*/if (randNum < 33) {log.info("模拟服务端网络异常的情况,这里通过返回对应的异常来模拟");// 触发error事件,客户端就会感知到,等于是给客户端返回一个异常通知responseObserver.onError(Status.UNAVAILABLE.augmentDescription("您的网络异常,请检查......").asRuntimeException());}else {String name = request.getName();System.out.println("接收到客户端的参数name = " + name);responseObserver.onNext(HelloProto.HelloRespnose.newBuilder().setResult("this is server result").build());responseObserver.onCompleted();}}
}
我们制造三分之一的概率来模拟网络异常,至此服务端就到此为止了。我们接下来来看客户端的开发。
3、客户端
在客户端这里我们要定义重试的策略机制。
首先我们需要定义一个json文件,放在哪里都可以,名字叫啥都行,我们放在resources类路径下面,叫做service_config.json 。
{"methodConfig": [{"name": [{"service": "com.levi.HelloService","method": "hello"}],"retryPolicy": {"maxAttempts": 5,"initialBackoff": "0.5s","maxBackoff": "30s","backoffMultiplier": 2,"retryableStatusCodes": ["UNAVAILABLE"]}}]
}
下面我们来解释一下这个文件的内容。
methodConfig:最外层的大数组key,这个是固定的,没啥说的。
内置一个数组,里面的每一个元素都是一个重试机制,你有几个就配几个。我们来看我们配置的这个。name里面配置你要重试的服务类和你要重试的方法,只有配置了才会有重试。
我们配置的是HelloService类里面的hello方法,注意这里我们配置的服务类是全路径
com.levi.HelloService,这个包名是com.levi,他取自于你的proto定义文件里面的,我们来看下当初的定义。
package com.levi;
option java_package = "com.levi";
我们看到我们有两处定义了com.levi,我们这里只是碰巧了一样,其实你定义的时候是可以不一样的。
那么问题来了,我们在重试配置里用的是哪个呢,我们来想一下,option java_package是给java的包用的,而这个重试机制是不分语言都有的grpc内置的能力。所以我们这里不能用特定的东西,要用通用的跨语言的东西。
所以记住,我们的重试文件里面用的是package这个逻辑包管理后面定义的路径。而不是option后面的。这个同样适用于其他的通用的配置,不过实际开发我们还是把package和java_package 配置为一样的。避免有误用。retryPolicy:你配置的这个方法的重试的策略,单独对这个方法生效的策略。maxAttempts : 包括第一次正常调用在内,最多尝试的次数,我们这里是 3 次。也就是说,如果第一 次失败,后面最多再重试 2 次。initialBackoff : 第一次重试前等待多久。支持后缀:ms / s / m / h,这里是 0.5 秒。maxBackoff : 不论指数回退算出来多久,都不能超过这个配置,我们这里是30 秒。backoffMultiplier : 指数回退系数。每次失败后,等待时间 = 前一次等待时间 × 系数。计算公式:next = min(initialBackoff * (multiplier ^ (attempt-1)), maxBackoff)retryableStatusCodes : ["UNAVAILABLE"],只有当服务端返回的 Status.Code 等于 UNAVAILABLE时才触发重试。如果想重试更多错误,可以往数组里加Status那些异常,看你想怎么设计了,例如 ["UNAVAILABLE", "INTERNAL","RESOURCE_EXHAUSTED"]。
至此我们就完成了这个重试的客户端配置。然后我们来修改客户端调用的代码,把这个配置交给grpc的客户端,使其生效。
package com.levi;import com.levi.utils.FileUtil;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;import java.util.Map;
import java.util.concurrent.TimeUnit;public class GrpcClient {public static void main(String[] args) throws Exception {ManagedChannel managedChannel = null;try {// 读取配置文件,封装为map返回Map<String, ?> serviceConfig = FileUtil.getJsonMap("service_config.json");managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000)// 注册重试策略配置.defaultServiceConfig(serviceConfig)// 开启重试.enableRetry().usePlaintext().build();HelloServiceGrpc.HelloServiceBlockingStub helloServiceBlockingStub = HelloServiceGrpc.newBlockingStub(managedChannel);HelloProto.HelloRespnose helloRespnose = helloServiceBlockingStub.hello(HelloProto.HelloRequest.newBuilder().setName("levi").build());System.out.println("接收到的服务端响应为: " + helloRespnose.getResult());// 客户端等待一下,不然直接关闭就无法重试了。managedChannel.awaitTermination(10, TimeUnit.SECONDS);} catch (Exception e) {e.printStackTrace();} finally {if (managedChannel != null) {managedChannel.shutdown();}}}
}
此时我们就完成了重试的配置,然后我们来启动服务端,测试一下。
第一次测试:
服务端没有异常,正常返回。服务端也正常。
在经历了多次测试之后,我们可以看到客户端返回异常。
此时我们来观察服务端的日志。
此时我们可以看到他发起重试了,因为我把代码改了一下为了提高异常的概率,所以你观察到的随机数可能有点不一样。总之我们是可以看到他会发起重试的。而且我们配置的策略也是可以生效的。没有问题。
至此我们就完成了客户端重试的操作实现。