当前位置: 首页 > news >正文

Eureka 学习笔记3:EurekaHttpClient

版本 awsVersion = ‘1.11.277’

EurekaTransport 用于客户端和服务端之间进行通信,封装了以下接口的实现:

  1. ClosableResolver 接口实现
  2. TransportClientFactory 接口实现
  3. EurekaHttpClient 接口实现及其对应的 EurekaHttpClientFactory 接口实现
private static final class EurekaTransport {// 服务端集群解析器,用于获取服务端列表private ClosableResolver bootstrapResolver;// 底层工厂,用于创建具体通信协议的实现private TransportClientFactory transportClientFactory;// EurekaHttpClient实现,用于注册实例、取消注册、续约private EurekaHttpClient registrationClient;// 上层工厂,用于创建registrationClientprivate EurekaHttpClientFactory registrationClientFactory;// EurekaHttpClient实现,用于全量拉取、增量拉取服务列表private EurekaHttpClient queryClient;// 上层工厂,用于创建queryClientprivate EurekaHttpClientFactory queryClientFactory;
}

EurekaHttpClientFactory 是上层工厂接口(A top level factory interface),用于创建 EurekaHttpClientDecorator 对象(to create http clients for application/eurekaClient use)。

public interface EurekaHttpClientFactory {EurekaHttpClient newClient();void shutdown();
}

例如 DiscoveryClient # scheduleServerEndpointTask 方法:

newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory(eurekaTransport.bootstrapResolver,eurekaTransport.transportClientFactory,transportConfig);
newRegistrationClient = newRegistrationClientFactory.newClient();

TransportClientFactory 是底层工厂接口(A low level client factory interface),用于创建 JerseyApplicationClient(Eureka 原生实现)、Jersey2ApplicationClient(Eureka 原生实现)、RestTemplateEurekaHttpClient(SpringCloud 实现)等具体协议相关的实现。底层工厂创建的对象不建议直接使用(Not advised to be used by top level consumers),需要经过上层工厂加工。

public interface TransportClientFactory {EurekaHttpClient newClient(EurekaEndpoint serviceUrl);void shutdown();
}

例如 DiscoveryClient # scheduleServerEndpointTask 方法:

TransportClientFactories transportClientFactories = argsTransportClientFactories == null? new Jersey1TransportClientFactories(): argsTransportClientFactories;

EurekaHttpClient 接口,用于客户端和服务端之间进行通信,封装了注册、取消注册、续约、拉取服务列表等一系列操作。

public interface EurekaHttpClient {// 注册实例(服务注册)EurekaHttpResponse<Void> register(InstanceInfo info);// 取消注册(服务下线)EurekaHttpResponse<Void> cancel(String appName, String id);// 发送心跳(服务续约)EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus);// 更新实例的服务状态InstanceStatus(服务状态更新)EurekaHttpResponse<Void> statusUpdate(String appName, String id, InstanceStatus newStatus, InstanceInfo info);// 删除实例的覆盖状态,overriddenStatus将变成UNKNOWNEurekaHttpResponse<Void> deleteStatusOverride(String appName, String id, InstanceInfo info);// 获取指定区域regions的注册表(全量获取)EurekaHttpResponse<Applications> getApplications(String... regions);// 获取指定区域regions的注册列表(增量获取)EurekaHttpResponse<Applications> getDelta(String... regions);EurekaHttpResponse<Applications> getVip(String vipAddress, String... regions);EurekaHttpResponse<Applications> getSecureVip(String secureVipAddress, String... regions);// 获取指定应用的实例列表EurekaHttpResponse<Application> getApplication(String appName);// 获取指定实例EurekaHttpResponse<InstanceInfo> getInstance(String appName, String id);// 获取指定实例EurekaHttpResponse<InstanceInfo> getInstance(String id);void shutdown();
}

EurekaHttpClient
HttpReplicationClient 用于服务端和服务端之间进行通信,在 EurekaHttpClient 的基础上封装了更新服务端实例状态、批量同步数据等操作。

public interface HttpReplicationClient extends EurekaHttpClient {// 更新服务端实例的状态EurekaHttpResponse<Void> statusUpdate(String asgName, ASGStatus newStatus);// 向其他服务端批量同步数据EurekaHttpResponse<ReplicationListResponse> submitBatchUpdates(ReplicationList replicationList);
}

Eureka 服务端集群节点是对等节点(peerNode),对等节点之间进行数据同步会产生循环问题和数据冲突问题。

  • 对于循环问题,Eureka 使用 Http 请求头 x-netflix-discovery-replication 表示是否是服务端同步数据操作,如果是服务端同步数据,就不会再继续向其他服务端进行同步数据。
// com.netflix.eureka.resources.InstanceResource
@PUT
public Response renewLease(@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,@QueryParam("overriddenstatus") String overriddenStatus,@QueryParam("status") String status,@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {boolean isFromReplicaNode = "true".equals(isReplication);boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);// ...
}// com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl
private void replicateToPeers(Action action,String appName,String id,InstanceInfo info /* optional */,InstanceStatus newStatus /* optional */,boolean isReplication) {// ...// If it is a replication already,// do not replicate again as this will create a poison replicationif (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {return;}// ...
}
  • 对于数据冲突问题,Eureka 通过比较 ReplicationInstance 类的 lastDirtyTimestamp 属性解决。lastDirtyTimestamp 是服务实例 InstanceInfo 的属性,表示服务实例最近一次变更时间。

例如 Server A 向 Server B 同步数据:

  1. 如果 Server A 的数据比 Server B 的新,Server B 返回 Status.NOT_FOUND,Server A 重新将该服务实例注册到 Server B
  2. 如果 Server A 的数据比 Server B 的旧,Server B 返回 Status.CONFLICT,要求 Server A 同步 Server B 的数据
// com.netflix.eureka.resources.InstanceResource # renewLease
private Response validateDirtyTimestamp(Long lastDirtyTimestamp,boolean isReplication) {InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id, false);if (appInfo != null) {if ((lastDirtyTimestamp != null) && (!lastDirtyTimestamp.equals(appInfo.getLastDirtyTimestamp()))) {Object[] args = {id,appInfo.getLastDirtyTimestamp(),lastDirtyTimestamp,isReplication};if (lastDirtyTimestamp > appInfo.getLastDirtyTimestamp()) {logger.debug("Time to sync, " + "since the last dirty timestamp differs - " + "ReplicationInstance id : {}," + "Registry: {} Incoming: {} Replication: {}",args);return Response.status(Status.NOT_FOUND).build();} else if (appInfo.getLastDirtyTimestamp() > lastDirtyTimestamp) {// In the case of replication, // send the current instance info in the registry // for the replicating node to sync itself with this one.if (isReplication) {logger.debug("Time to sync, " + "since the last dirty timestamp differs - " + "ReplicationInstance id : {}," + "Registry: {} Incoming: {} Replication: {}",args);return Response.status(Status.CONFLICT).entity(appInfo).build();} else {return Response.ok().build();}}} // end if ((lastDirtyTimestamp != null)}// end if (appInfo != null)return Response.ok().build();
}

抽象类 EurekaHttpClientDecorator 使用装饰者模式为 EurekaHttpClient 添加新的功能。

EurekaHttpClientDecorator

  • SessionedEurekaHttpClient 为 EurekaHttpClient 设置随机的会话时间,超过会话时间则调用 EurekaHttpClientFactory 的 newClient 方法创建一个新的 EurekaHttpClient 执行请求。随机的会话时间在配置 sessionedClientReconnectIntervalSeconds0.5-1.5 之间。

  • RetryableEurekaHttpClient 为 EurekaHttpClient 提供重试功能,默认重试 3 次。当服务端响应异常时,将服务端添加到 quarantineSet 中,并从 ClusterResolver 解析得到的服务端列表中移除 quarantineSet ,选择其他的服务端创建一个新的 EurekaHttpClient 执行请求,超过重试次数则抛出 TransportException 异常。

  • RedirectingEurekaHttpClient 为 EurekaHttpClient 提供重定向功能,默认可重定向次数 10 次。当服务端响应 302 时,获取响应中的 targetUrl,创建一个新的 EurekaHttpClient 执行请求,超过重定向次数则抛出 TransportException 异常。

  • MetricsCollectingEurekaHttpClient 为 EurekaHttpClient 提供指标收集功能,用于集成 Netflix Servo 监控组件,统计各类型请求的耗时以及不同响应码和异常的计数。

http://www.lryc.cn/news/105601.html

相关文章:

  • Android Framework 之 启动流程
  • Qt、C/C++环境中内嵌LUA脚本、实现LUA函数的调用执行
  • 超详细 | 模拟退火算法及其MATLAB实现
  • 在线餐饮油烟实时监测系统的设计与实现
  • 7-2 凯撒密码 (20分)
  • LeetCode_贪心算法_中等_763.划分字母区间
  • 【算法提高:动态规划】1.5 状态压缩DP TODO
  • 建网站一般使用Windows还是liunx好?
  • NodeJs后端项目使用docker打包部署
  • ARM单片机中断处理过程解析
  • 关于SEDEX会员与平台的相关问题汇总
  • 解读Spring-context的property-placeholder
  • 【Rust】枚举类型创建单链表以及常见的链表操作方法
  • Excel 两列数据中相同的数据进行同行显示
  • Windows本地安装配置Qcadoo MES系统
  • 涛思数据与拾贝云达成战略合作,携手赋能工业数字化转型
  • nginx 配置多域名多站点 Ubuntu
  • Docker实践:使用Docker搭建个人开发环境(极简版)
  • SQL从三个表中根据时间分别查询并汇总数量一行展示
  • 同样是跨端框架,React会不会被VUE取代?
  • Excel·VBA定量装箱、凑数值金额、组合求和问题
  • 通过Jmeter压测存储过程
  • Spring笔记之Spring对IoC的实现
  • 【eNSP】Telnet远程登录
  • SOP/详解*和**/python数据结构(iter,list,tuple,dict)/ 解包
  • 使用webdriver-manager解决浏览器与驱动不匹配所带来自动化无法执行的问题
  • 【vue】Vue中debugger报错 unexpected ‘debugger’ statement no-debugger
  • 课题方向a
  • 【Matter】基于Ubuntu 22.04 交叉编译chip-tool
  • Qt/C++音视频开发50-不同ffmpeg版本之间的差异处理