当前位置: 首页 > news >正文

[spring-cloud: 负载均衡]-源码分析

获取服务列表

ServiceInstanceListSupplier

ServiceInstanceListSupplier 接口是一个提供 ServiceInstance 列表的供应者,返回一个响应式流 Flux<List<ServiceInstance>>,用于服务发现。

public interface ServiceInstanceListSupplier extends Supplier<Flux<List<ServiceInstance>>> {String getServiceId();default Flux<List<ServiceInstance>> get(Request request) {return get();}static ServiceInstanceListSupplierBuilder builder() {return new ServiceInstanceListSupplierBuilder();}}

DelegatingServiceInstanceListSupplier

DelegatingServiceInstanceListSupplier 是一个抽象类,继承自 ServiceInstanceListSupplier,它通过委托给另一个 ServiceInstanceListSupplier 实例来实现其功能,同时支持选定服务实例的回调、初始化和销毁操作。

public abstract class DelegatingServiceInstanceListSupplier implements ServiceInstanceListSupplier, SelectedInstanceCallback, InitializingBean, DisposableBean {protected final ServiceInstanceListSupplier delegate;public DelegatingServiceInstanceListSupplier(ServiceInstanceListSupplier delegate) {Assert.notNull(delegate, "delegate may not be null");this.delegate = delegate;}public ServiceInstanceListSupplier getDelegate() {return delegate;}@Overridepublic String getServiceId() {return delegate.getServiceId();}@Overridepublic void selectedServiceInstance(ServiceInstance serviceInstance) {if (delegate instanceof SelectedInstanceCallback selectedInstanceCallbackDelegate) {selectedInstanceCallbackDelegate.selectedServiceInstance(serviceInstance);}}@Overridepublic void afterPropertiesSet() throws Exception {if (delegate instanceof InitializingBean) {((InitializingBean) delegate).afterPropertiesSet();}}@Overridepublic void destroy() throws Exception {if (delegate instanceof DisposableBean) {((DisposableBean) delegate).destroy();}}}

负载均衡实现

ReactorLoadBalancer

ReactorLoadBalancer 是基于 Reactor 实现的响应式负载均衡器,通过 Mono<Response<T>> 异步选择服务实例。

public interface ReactorLoadBalancer<T> extends ReactiveLoadBalancer<T> {@SuppressWarnings("rawtypes")Mono<Response<T>> choose(Request request);default Mono<Response<T>> choose() {return choose(REQUEST);}}

ReactorServiceInstanceLoadBalancer

ReactorServiceInstanceLoadBalancer 是一个标记接口,继承自 ReactorLoadBalancer,专门用于选择 ServiceInstance 对象的负载均衡器。

// RandomLoadBalancer, RoundRobinLoadBalancer
public interface ReactorServiceInstanceLoadBalancer extends ReactorLoadBalancer<ServiceInstance> {}

核心代码逻辑

推荐阅读:[spring-cloud: @LoadBalanced & @LoadBalancerClient]-源码分析

1. BlockingLoadBalancerInterceptor

// LoadBalancerInterceptor, RetryLoadBalancerInterceptor 
public interface BlockingLoadBalancerInterceptor extends ClientHttpRequestInterceptor {}

LoadBalancerInterceptor

public class LoadBalancerInterceptor implements BlockingLoadBalancerInterceptor {private final LoadBalancerClient loadBalancer;private final LoadBalancerRequestFactory requestFactory;public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {this.loadBalancer = loadBalancer;this.requestFactory = requestFactory;}public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {// for backwards compatibilitythis(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));}// 重点!@Overridepublic ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution)throws IOException {URI originalUri = request.getURI();String serviceName = originalUri.getHost();Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);return loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));}}

2. BlockingLoadBalancerClient

ServiceInstanceChooser

ServiceInstanceChooser 接口用于通过负载均衡器选择与指定服务ID对应的服务实例,支持带请求上下文的选择。

public interface ServiceInstanceChooser {ServiceInstance choose(String serviceId);<T> ServiceInstance choose(String serviceId, Request<T> request);}

LoadBalancerClient

LoadBalancerClient 接口用于客户端负载均衡,选择服务实例并执行请求,同时提供将逻辑服务名重构为实际服务实例的 URI 的功能。

public interface LoadBalancerClient extends ServiceInstanceChooser {<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;URI reconstructURI(ServiceInstance instance, URI original);}
// BlockingLoadBalancerClientAutoConfiguration
@SuppressWarnings({ "unchecked", "rawtypes" })
public class BlockingLoadBalancerClient implements LoadBalancerClient {// org.springframework.cloud.loadbalancer.config.LoadBalancerAutoConfiguration// LoadBalancerClientFactoryprivate final ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory;public BlockingLoadBalancerClient(ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory) {this.loadBalancerClientFactory = loadBalancerClientFactory;}// 重点!@Overridepublic <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {String hint = getHint(serviceId);LoadBalancerRequestAdapter<T, TimedRequestContext> lbRequest = new LoadBalancerRequestAdapter<>(request, buildRequestContext(request, hint));Set<LoadBalancerLifecycle> supportedLifecycleProcessors = getSupportedLifecycleProcessors(serviceId);supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));// 通过 choose 方法来选择一个合适的 ServiceInstanceServiceInstance serviceInstance = choose(serviceId, lbRequest);if (serviceInstance == null) {supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, new EmptyResponse())));throw new IllegalStateException("No instances available for " + serviceId);}return execute(serviceId, serviceInstance, lbRequest);}private <T> TimedRequestContext buildRequestContext(LoadBalancerRequest<T> delegate, String hint) {if (delegate instanceof HttpRequestLoadBalancerRequest) {HttpRequest request = ((HttpRequestLoadBalancerRequest) delegate).getHttpRequest();if (request != null) {RequestData requestData = new RequestData(request);return new RequestDataContext(requestData, hint);}}return new DefaultRequestContext(delegate, hint);}// 通过生命周期钩子函数来管理负载均衡请求的开始与结束,并处理可能的异常,确保负载均衡的执行过程有序@Overridepublic <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException {if (serviceInstance == null) {throw new IllegalArgumentException("Service Instance cannot be null, serviceId: " + serviceId);}DefaultResponse defaultResponse = new DefaultResponse(serviceInstance);Set<LoadBalancerLifecycle> supportedLifecycleProcessors = getSupportedLifecycleProcessors(serviceId);Request lbRequest = request instanceof Request ? (Request) request : new DefaultRequest<>();supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, new DefaultResponse(serviceInstance)));try {T response = request.apply(serviceInstance);Object clientResponse = getClientResponse(response);supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.SUCCESS, lbRequest, defaultResponse, clientResponse)));return response;}catch (IOException iOException) {supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.FAILED, iOException, lbRequest, defaultResponse)));throw iOException;}catch (Exception exception) {supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.FAILED, exception, lbRequest, defaultResponse)));ReflectionUtils.rethrowRuntimeException(exception);}return null;}private <T> Object getClientResponse(T response) {ClientHttpResponse clientHttpResponse = null;if (response instanceof ClientHttpResponse) {clientHttpResponse = (ClientHttpResponse) response;}if (clientHttpResponse != null) {try {return new ResponseData(clientHttpResponse, null);}catch (IOException ignored) {}}return response;}private Set<LoadBalancerLifecycle> getSupportedLifecycleProcessors(String serviceId) {return LoadBalancerLifecycleValidator.getSupportedLifecycleProcessors(loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),DefaultRequestContext.class, Object.class, ServiceInstance.class);}@Overridepublic URI reconstructURI(ServiceInstance serviceInstance, URI original) {return LoadBalancerUriTools.reconstructURI(serviceInstance, original);}@Overridepublic ServiceInstance choose(String serviceId) {return choose(serviceId, REQUEST);}// 重点!通过负载均衡器同步选择一个服务实例并返回@Overridepublic <T> ServiceInstance choose(String serviceId, Request<T> request) {ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);if (loadBalancer == null) {return null;}Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();if (loadBalancerResponse == null) {return null;}return loadBalancerResponse.getServer();}private String getHint(String serviceId) {LoadBalancerProperties properties = loadBalancerClientFactory.getProperties(serviceId);String defaultHint = properties.getHint().getOrDefault("default", "default");String hintPropertyValue = properties.getHint().get(serviceId);return hintPropertyValue != null ? hintPropertyValue : defaultHint;}}

3. LoadBalancerRequestFactory

LoadBalancerRequestFactory 类用于创建封装负载均衡请求的 LoadBalancerRequest 实例,支持请求转换器和负载均衡客户端的配置。

public class LoadBalancerRequestFactory {private final LoadBalancerClient loadBalancer;private final List<LoadBalancerRequestTransformer> transformers;public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer,List<LoadBalancerRequestTransformer> transformers) {this.loadBalancer = loadBalancer;this.transformers = transformers;}public LoadBalancerRequestFactory(LoadBalancerClient loadBalancer) {this.loadBalancer = loadBalancer;transformers = new ArrayList<>();}public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body,final ClientHttpRequestExecution execution) {return new BlockingLoadBalancerRequest(loadBalancer, transformers,new BlockingLoadBalancerRequest.ClientHttpRequestData(request, body, execution));}}
LoadBalancerRequestTransformer

LoadBalancerRequestTransformer 接口允许在负载均衡过程中根据不同的服务实例自定义转换 HttpRequest,如修改请求头、URL 等,同时通过 @Order 注解控制其执行顺序。

@Order(LoadBalancerRequestTransformer.DEFAULT_ORDER)
public interface LoadBalancerRequestTransformer {int DEFAULT_ORDER = 0;HttpRequest transformRequest(HttpRequest request, ServiceInstance instance);
}

4. BlockingLoadBalancerRequest

BlockingLoadBalancerRequest 类实现了负载均衡请求接口,负责将原始 HTTP 请求封装为负载均衡请求,并支持应用请求转换器和执行负载均衡操作。

class BlockingLoadBalancerRequest implements HttpRequestLoadBalancerRequest<ClientHttpResponse> {private final LoadBalancerClient loadBalancer;private final List<LoadBalancerRequestTransformer> transformers;private final ClientHttpRequestData clientHttpRequestData;BlockingLoadBalancerRequest(LoadBalancerClient loadBalancer, List<LoadBalancerRequestTransformer> transformers,ClientHttpRequestData clientHttpRequestData) {this.loadBalancer = loadBalancer;this.transformers = transformers;this.clientHttpRequestData = clientHttpRequestData;}@Overridepublic ClientHttpResponse apply(ServiceInstance instance) throws Exception {HttpRequest serviceRequest = new ServiceRequestWrapper(clientHttpRequestData.request, instance, loadBalancer);if (this.transformers != null) {for (LoadBalancerRequestTransformer transformer : this.transformers) {serviceRequest = transformer.transformRequest(serviceRequest, instance);}}return clientHttpRequestData.execution.execute(serviceRequest, clientHttpRequestData.body);}@Overridepublic HttpRequest getHttpRequest() {return clientHttpRequestData.request;}static class ClientHttpRequestData {private final HttpRequest request;private final byte[] body;private final ClientHttpRequestExecution execution;ClientHttpRequestData(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) {this.request = request;this.body = body;this.execution = execution;}}}
http://www.lryc.cn/news/609592.html

相关文章:

  • 软件设计 VS 软件需求:了解成功软件开发外包的关键差异
  • 【数据结构入门】链表
  • Centos7.9安装Oracle11.2.0.1版本问题处理
  • Python实现Word转PDF全攻略:从入门到实战
  • 电商直播流量爆发式增长,华为云分布式流量治理与算力调度服务的应用场景剖析
  • windows内核研究(软件调试-软件断点)
  • 房屋租赁小程序租房小程序房产信息发布系统房屋租赁微信小程序源码
  • 架构师面试(三十九):微服务重构单体应用
  • 剧本杀小程序系统开发:开启沉浸式推理社交新纪元
  • 力扣1124:表现良好的最长时间段
  • 【Java】使用FreeMarker来实现Word自定义导出
  • leetcode-sql-3497分析订阅转化
  • 旧物回收小程序:开启绿色生活新篇章
  • Array容器学习
  • LeetCode 132:分割回文串 II
  • 【YOLO系列】YOLOv12详解:模型结构、损失函数、训练方法及代码实现
  • 关于Npm和Nvm的用法
  • Linux 环境 libpq加载异常导致psql 连接 PostgreSQL 库失败失败案例
  • uniapp开发微信小程序textarea在ios下有默认内边距的问题(textarea兼容问题)
  • 如何给Word和WPS文档添加密码或取消密码
  • Ethereum:拥抱开源,OpenZeppelin 未来的两大基石 Relayers 与 Monitor
  • Jwts用于创建和验证 ​​JSON Web Token(JWT)​​ 的开源库详解
  • OpenLayers 入门指南【五】:Map 容器
  • R 语言科研绘图第 67 期 --- 箱线图-显著性
  • Nestjs框架: Node.js 多环境配置策略与 dotenv 与 config 库详解
  • 政府财政行业云原生转型之路
  • Druid学习笔记 01、快速了解Druid中SqlParser实现
  • 排序算法入门:直接插入排序详解
  • 室内分布系统
  • ICCV 2025|单视频生成动态4D场景!中科大微软突破4D生成瓶颈,动画效果炸裂来袭!