[spring-cloud: Dynamic Refresh] - Source Code Analysis

Project repository

The examples below use eureka-a and service-c.

Manual Refresh

Dependencies

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>

Configuration

server:
  port: 8761
spring:
  application:
    name: eureka-a
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
  # https://docs.spring.io/spring-cloud-config/reference/server.html
  profiles:
    active: composite
  cloud:
    config:
      server:
        prefix: /config
        bootstrap: true
        # monitor:
        #   fixedDelay: 3000
        composite:
          - type: native
            search-locations: file:///${user.dir}/config-repo
# https://docs.spring.io/spring-cloud-netflix/reference/configprops.html
eureka:
  instance:
    hostname: localhost
  client:
    registerWithEureka: false
    fetchRegistry: false
    serviceUrl:
      defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/
  server:
    enable-self-preservation: true
# https://docs.spring.io/spring-cloud-bus/reference/quickstart.html
management:
  endpoints:
    web:
      exposure:
        include: busrefresh, busshutdown, busenv, health, info


Testing

Modify the configuration
# service-c-dev.yml
k2: v3


Request a refresh
curl -X POST http://localhost:8761/actuator/busrefresh
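
The same request can also be issued from Java. The following is a minimal sketch using the JDK's java.net.http.HttpClient (Java 11+). The class name BusRefreshClient and the helper buildRefreshRequest are hypothetical, and the base URL assumes the eureka-a instance configured above is listening on port 8761.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper, not part of Spring Cloud Bus.
class BusRefreshClient {

    // Builds an empty-bodied POST to the busrefresh actuator endpoint.
    static HttpRequest buildRefreshRequest(String baseUrl) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/actuator/busrefresh"))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        try {
            HttpResponse<String> response = client.send(
                    buildRefreshRequest("http://localhost:8761"),
                    HttpResponse.BodyHandlers.ofString());
            System.out.println("HTTP status: " + response.statusCode());
        }
        catch (IOException e) {
            // Assumption: the server may not be running when this sketch is tried.
            System.out.println("Request failed: " + e);
        }
    }
}
```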


Automatic Refresh

Dependencies

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-config-monitor</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>

Configuration

server:
  port: 8761
spring:
  application:
    name: eureka-a
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
  # https://docs.spring.io/spring-cloud-config/reference/server.html
  profiles:
    active: composite
  cloud:
    config:
      server:
        prefix: /config
        bootstrap: true
        monitor:
          fixedDelay: 3000
        composite:
          - type: native
            search-locations: file:///${user.dir}/config-repo
# https://docs.spring.io/spring-cloud-netflix/reference/configprops.html
eureka:
  instance:
    hostname: localhost
  client:
    registerWithEureka: false
    fetchRegistry: false
    serviceUrl:
      defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/
  server:
    enable-self-preservation: true
# https://docs.spring.io/spring-cloud-bus/reference/quickstart.html
# management:
#   endpoints:
#     web:
#       exposure:
#         include: busrefresh, busshutdown, busenv, health, info

Testing

# service-c-dev.yml
k2: v3


Core Code

Adding Configuration

BusEnvironmentPostProcessor

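BusEnvironmentPostProcessor sets default environment properties for Spring Cloud Bus. Unless the bus is explicitly disabled, it automatically configures the function definition, the input and output bindings, and the bus (service) ID.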

# https://docs.spring.io/spring-cloud-stream/reference/spring-cloud-stream.html

# Function definition
spring.cloud.function.definition=busConsumer

# Spring Cloud Stream input binding
spring.cloud.stream.function.bindings.busConsumer-in-0=springCloudBusInput
# The destination is the value of spring.cloud.bus.destination (default: springCloudBus)
spring.cloud.stream.bindings.springCloudBusInput.destination=springCloudBus

# Spring Cloud Stream output binding
spring.cloud.stream.bindings.springCloudBusOutput.destination=springCloudBus

# Spring Cloud Bus ID (the service ID is generated automatically)
spring.cloud.bus.id=...

public class BusEnvironmentPostProcessor implements EnvironmentPostProcessor {

    static final String DEFAULTS_PROPERTY_SOURCE_NAME = "springCloudBusDefaultProperties";

    static final String OVERRIDES_PROPERTY_SOURCE_NAME = "springCloudBusOverridesProperties";

    private static final String FN_DEF_PROP = FunctionProperties.PREFIX + ".definition";

    @Override
    public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
        if (environment.containsProperty(ConditionalOnBusEnabled.SPRING_CLOUD_BUS_ENABLED)) {
            if (Boolean.FALSE.toString()
                    .equalsIgnoreCase(environment.getProperty(ConditionalOnBusEnabled.SPRING_CLOUD_BUS_ENABLED))) {
                return;
            }
        }
        Map<String, Object> overrides = new HashMap<>();
        String definition = BusConstants.BUS_CONSUMER;
        if (environment.containsProperty(FN_DEF_PROP)) {
            String property = environment.getProperty(FN_DEF_PROP);
            if (property != null && property.contains(BusConstants.BUS_CONSUMER)) {
                // in the case that EnvironmentPostProcessor are run more than once.
                return;
            }
            definition = property + ";" + definition;
        }
        overrides.put(FN_DEF_PROP, definition);
        addOrReplace(environment.getPropertySources(), overrides, OVERRIDES_PROPERTY_SOURCE_NAME, true);

        Map<String, Object> defaults = new HashMap<>();
        defaults.put("spring.cloud.stream.function.bindings." + BusConstants.BUS_CONSUMER + "-in-0",
                BusConstants.INPUT);
        String destination = environment.getProperty(PREFIX + ".destination", BusConstants.DESTINATION);
        defaults.put("spring.cloud.stream.bindings." + BusConstants.INPUT + ".destination", destination);
        defaults.put("spring.cloud.stream.bindings." + BusConstants.OUTPUT + ".destination", destination);
        if (!environment.containsProperty(PREFIX + ".id")) {
            String unresolvedServiceId = IdUtils.getUnresolvedServiceId();
            if (StringUtils.hasText(environment.getProperty("spring.profiles.active"))) {
                unresolvedServiceId = IdUtils.getUnresolvedServiceIdWithActiveProfiles();
            }
            defaults.put(PREFIX + ".id", unresolvedServiceId);
        }
        addOrReplace(environment.getPropertySources(), defaults, DEFAULTS_PROPERTY_SOURCE_NAME, false);
    }

    public static void addOrReplace(MutablePropertySources propertySources, Map<String, Object> map,
            String propertySourceName, boolean first) {
        MapPropertySource target = null;
        if (propertySources.contains(propertySourceName)) {
            PropertySource<?> source = propertySources.get(propertySourceName);
            if (source instanceof MapPropertySource) {
                target = (MapPropertySource) source;
                for (String key : map.keySet()) {
                    if (!target.containsProperty(key)) {
                        target.getSource().put(key, map.get(key));
                    }
                }
            }
        }
        if (target == null) {
            target = new MapPropertySource(propertySourceName, map);
        }
        if (!propertySources.contains(propertySourceName)) {
            if (first) {
                propertySources.addFirst(target);
            }
            else {
                propertySources.addLast(target);
            }
        }
    }

}
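
The way postProcessEnvironment combines an existing spring.cloud.function.definition with busConsumer can be isolated into a few lines of plain Java. This is only an illustrative sketch: the class FunctionDefinitionSketch and the method merge are hypothetical names, and a null argument stands in for "the property is not set".

```java
// Hypothetical stand-alone sketch of the definition-merging rule; not Spring Cloud Bus code.
class FunctionDefinitionSketch {

    static final String BUS_CONSUMER = "busConsumer";

    // Returns the definition to write as an override,
    // or null when the post-processor would return early without changes.
    static String merge(String existing) {
        if (existing == null) {
            // No user-defined functions: busConsumer is the only one.
            return BUS_CONSUMER;
        }
        if (existing.contains(BUS_CONSUMER)) {
            // Already processed (post-processors can run more than once).
            return null;
        }
        // Append busConsumer to the user's functions, separated by ';'.
        return existing + ";" + BUS_CONSUMER;
    }

    public static void main(String[] args) {
        System.out.println(merge(null));
        System.out.println(merge("uppercase"));
        System.out.println(merge("uppercase;busConsumer"));
    }
}
```

Because the merged value is added to a property source placed first, it takes precedence over the user's original definition, while the binding defaults are added last so that user configuration can still override them.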

BusConsumer

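BusConsumer implements Consumer<RemoteApplicationEvent> and handles remote events arriving through Spring Cloud Stream. It publishes each event locally depending on where the event came from, which service it targets, and the configured conditions, and it supports event tracing and acknowledgement (ACK).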

public class BusConsumer implements Consumer<RemoteApplicationEvent> {

    private final Log log = LogFactory.getLog(getClass());

    private final ApplicationEventPublisher publisher;

    private final ServiceMatcher serviceMatcher;

    private final ObjectProvider<BusBridge> busBridge;

    private final BusProperties properties;

    private final Destination.Factory destinationFactory;

    public BusConsumer(ApplicationEventPublisher publisher, ServiceMatcher serviceMatcher,
            ObjectProvider<BusBridge> busBridge, BusProperties properties,
            Destination.Factory destinationFactory) {
        this.publisher = publisher;
        this.serviceMatcher = serviceMatcher;
        this.busBridge = busBridge;
        this.properties = properties;
        this.destinationFactory = destinationFactory;
    }

    @Override
    public void accept(RemoteApplicationEvent event) {
        if (event instanceof AckRemoteApplicationEvent) {
            if (this.properties.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event)
                    && this.publisher != null) {
                this.publisher.publishEvent(event);
            }
            // If it's an ACK we are finished processing at this point
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("Received remote event from bus: " + event);
        }

        if (this.serviceMatcher.isForSelf(event) && this.publisher != null) {
            if (!this.serviceMatcher.isFromSelf(event)) {
                this.publisher.publishEvent(event);
            }
            if (this.properties.getAck().isEnabled()) {
                AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this, this.serviceMatcher.getBusId(),
                        destinationFactory.getDestination(this.properties.getAck().getDestinationService()),
                        event.getDestinationService(), event.getId(), event.getClass());
                this.busBridge.ifAvailable(bridge -> bridge.send(ack));
                this.publisher.publishEvent(ack);
            }
        }

        if (this.properties.getTrace().isEnabled() && this.publisher != null) {
            // We are set to register sent events so publish it for local consumption,
            // irrespective of the origin
            this.publisher.publishEvent(new SentApplicationEvent(this, event.getOriginService(),
                    event.getDestinationService(), event.getId(), event.getClass()));
        }
    }

}
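
The branching in accept can be easier to follow as a decision table. The sketch below re-expresses it in plain Java, with booleans in place of the ServiceMatcher and BusProperties lookups. The class name, method name, and action labels are hypothetical, and a non-null publisher is assumed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of BusConsumer.accept's decision flow; not Spring Cloud Bus code.
class BusConsumerFlowSketch {

    static List<String> decide(boolean isAck, boolean fromSelf, boolean forSelf,
            boolean ackEnabled, boolean traceEnabled) {
        List<String> actions = new ArrayList<>();
        if (isAck) {
            // ACKs are only republished locally for tracing, and never answered.
            if (traceEnabled && !fromSelf) {
                actions.add("publish-ack-locally");
            }
            return actions;
        }
        if (forSelf) {
            // Events this instance sent itself were already handled locally.
            if (!fromSelf) {
                actions.add("publish-event-locally");
            }
            if (ackEnabled) {
                actions.add("send-ack-to-bus");
                actions.add("publish-ack-locally");
            }
        }
        if (traceEnabled) {
            // Recorded regardless of origin or destination.
            actions.add("publish-sent-event");
        }
        return actions;
    }

    public static void main(String[] args) {
        // A refresh event from another instance, targeted at this one, ACK on, trace off.
        System.out.println(decide(false, false, true, true, false));
    }
}
```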

Event Propagation

RemoteApplicationEventListener

public class RemoteApplicationEventListener implements ApplicationListener<RemoteApplicationEvent> {

    private final Log log = LogFactory.getLog(getClass());

    private final ServiceMatcher serviceMatcher;

    private final BusBridge busBridge;

    public RemoteApplicationEventListener(ServiceMatcher serviceMatcher, BusBridge busBridge) {
        this.serviceMatcher = serviceMatcher;
        this.busBridge = busBridge;
    }

    @Override
    public void onApplicationEvent(RemoteApplicationEvent event) {
        if (this.serviceMatcher.isFromSelf(event) && !(event instanceof AckRemoteApplicationEvent)) {
            if (log.isDebugEnabled()) {
                log.debug("Sending remote event on bus: " + event);
            }
            // TODO: configurable mimetype?
            this.busBridge.send(event);
        }
    }

}

public class StreamBusBridge implements BusBridge {

    private final StreamBridge streamBridge;

    private final BusProperties properties;

    public StreamBusBridge(StreamBridge streamBridge, BusProperties properties) {
        this.streamBridge = streamBridge;
        this.properties = properties;
    }

    public void send(RemoteApplicationEvent event) {
        // TODO: configurable mimetype?
        this.streamBridge.send(BusConstants.OUTPUT, MessageBuilder.withPayload(event).build());
    }

}

Refreshing Configuration

RefreshListener

public class RefreshListener implements ApplicationListener<RefreshRemoteApplicationEvent> {

    private static Log log = LogFactory.getLog(RefreshListener.class);

    private ContextRefresher contextRefresher;

    private ServiceMatcher serviceMatcher;

    public RefreshListener(ContextRefresher contextRefresher, ServiceMatcher serviceMatcher) {
        this.contextRefresher = contextRefresher;
        this.serviceMatcher = serviceMatcher;
    }

    @Override
    public void onApplicationEvent(RefreshRemoteApplicationEvent event) {
        log.info("Received remote refresh request.");
        if (serviceMatcher.isForSelf(event)) {
            Set<String> keys = this.contextRefresher.refresh();
            log.info("Keys refreshed " + keys);
        }
        else {
            log.info("Refresh not performed, the event was targeting " + event.getDestinationService());
        }
    }

}

ContextRefresher

public abstract class ContextRefresher {

    protected final Log logger = LogFactory.getLog(getClass());

    protected static final String[] DEFAULT_PROPERTY_SOURCES = new String[] {
            // order matters, if cli args aren't first, things get messy
            CommandLinePropertySource.COMMAND_LINE_PROPERTY_SOURCE_NAME, "defaultProperties" };

    protected Set<String> standardSources = new HashSet<>(
            Arrays.asList(StandardEnvironment.SYSTEM_PROPERTIES_PROPERTY_SOURCE_NAME,
                    StandardEnvironment.SYSTEM_ENVIRONMENT_PROPERTY_SOURCE_NAME,
                    StandardServletEnvironment.JNDI_PROPERTY_SOURCE_NAME,
                    StandardServletEnvironment.SERVLET_CONFIG_PROPERTY_SOURCE_NAME,
                    StandardServletEnvironment.SERVLET_CONTEXT_PROPERTY_SOURCE_NAME, "configurationProperties"));

    protected final List<String> additionalPropertySourcesToRetain;

    private ConfigurableApplicationContext context;

    private RefreshScope scope;

    // ...

    public synchronized Set<String> refresh() {
        Set<String> keys = refreshEnvironment();
        this.scope.refreshAll();
        return keys;
    }

    public synchronized Set<String> refreshEnvironment() {
        Map<String, Object> before = extract(this.context.getEnvironment().getPropertySources());
        updateEnvironment();
        Set<String> keys = changes(before, extract(this.context.getEnvironment().getPropertySources())).keySet();
        this.context.publishEvent(new EnvironmentChangeEvent(this.context, keys));
        return keys;
    }

    protected abstract void updateEnvironment();

    // Don't use ConfigurableEnvironment.merge() in case there are clashes with property
    // source names
    protected StandardEnvironment copyEnvironment(ConfigurableEnvironment input) {
        StandardEnvironment environment = new StandardEnvironment();
        MutablePropertySources capturedPropertySources = environment.getPropertySources();
        // Only copy the default property source(s) and the profiles over from the main
        // environment (everything else should be pristine, just like it was on startup).
        List<String> propertySourcesToRetain = new ArrayList<>(Arrays.asList(DEFAULT_PROPERTY_SOURCES));
        if (!CollectionUtils.isEmpty(additionalPropertySourcesToRetain)) {
            propertySourcesToRetain.addAll(additionalPropertySourcesToRetain);
        }

        for (String name : propertySourcesToRetain) {
            if (input.getPropertySources().contains(name)) {
                if (capturedPropertySources.contains(name)) {
                    capturedPropertySources.replace(name, input.getPropertySources().get(name));
                }
                else {
                    capturedPropertySources.addLast(input.getPropertySources().get(name));
                }
            }
        }
        environment.setActiveProfiles(input.getActiveProfiles());
        environment.setDefaultProfiles(input.getDefaultProfiles());
        return environment;
    }

    private Map<String, Object> changes(Map<String, Object> before, Map<String, Object> after) {
        Map<String, Object> result = new HashMap<>();
        for (String key : before.keySet()) {
            if (!after.containsKey(key)) {
                result.put(key, null);
            }
            else if (!Objects.equals(before.get(key), after.get(key))) {
                result.put(key, after.get(key));
            }
        }
        for (String key : after.keySet()) {
            if (!before.containsKey(key)) {
                result.put(key, after.get(key));
            }
        }
        return result;
    }

    private Map<String, Object> extract(MutablePropertySources propertySources) {
        Map<String, Object> result = new HashMap<>();
        List<PropertySource<?>> sources = new ArrayList<>();
        for (PropertySource<?> source : propertySources) {
            sources.add(0, source);
        }
        for (PropertySource<?> source : sources) {
            if (!this.standardSources.contains(source.getName())) {
                extract(source, result);
            }
        }
        return result;
    }

    private void extract(PropertySource<?> parent, Map<String, Object> result) {
        if (parent instanceof CompositePropertySource) {
            try {
                List<PropertySource<?>> sources = new ArrayList<>();
                for (PropertySource<?> source : ((CompositePropertySource) parent).getPropertySources()) {
                    sources.add(0, source);
                }
                for (PropertySource<?> source : sources) {
                    extract(source, result);
                }
            }
            catch (Exception e) {
                return;
            }
        }
        else if (parent instanceof EnumerablePropertySource) {
            for (String key : ((EnumerablePropertySource<?>) parent).getPropertyNames()) {
                result.put(key, parent.getProperty(key));
            }
        }
    }

    @Configuration(proxyBeanMethods = false)
    protected static class Empty {

    }

}
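
The set of "keys refreshed" that RefreshListener logs comes from the changes method, which diffs the flattened properties before and after the environment update. The sketch below copies that logic into a stand-alone class (the name ChangedKeysSketch is hypothetical) and runs it on made-up data that mirrors the k2: v3 edit from the test above; the keys k1, k3, and k4 are invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Stand-alone copy of the diff logic for illustration; the class name is hypothetical.
class ChangedKeysSketch {

    static Map<String, Object> changes(Map<String, Object> before, Map<String, Object> after) {
        Map<String, Object> result = new HashMap<>();
        for (String key : before.keySet()) {
            if (!after.containsKey(key)) {
                // Removed keys are reported with a null value.
                result.put(key, null);
            }
            else if (!Objects.equals(before.get(key), after.get(key))) {
                // Modified keys are reported with their new value.
                result.put(key, after.get(key));
            }
        }
        for (String key : after.keySet()) {
            if (!before.containsKey(key)) {
                // Newly added keys are reported with their value.
                result.put(key, after.get(key));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> before = Map.of("k1", "v1", "k2", "v2", "k3", "v3");
        Map<String, Object> after = Map.of("k1", "v1", "k2", "v3", "k4", "v4");
        // Unchanged k1 is absent from the result; k2, k3 and k4 are present.
        System.out.println(changes(before, after).keySet());
    }
}
```

Only the key set is passed on: refreshEnvironment publishes it in an EnvironmentChangeEvent, and refresh then calls scope.refreshAll().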

ConfigDataContextRefresher

public class ConfigDataContextRefresher extends ContextRefresher
        implements ApplicationListener<ContextRefreshedWithApplicationEvent> {

    private SpringApplication application;

    public ConfigDataContextRefresher(ConfigurableApplicationContext context, RefreshScope scope,
            RefreshAutoConfiguration.RefreshProperties properties) {
        super(context, scope, properties);
    }

    @Override
    public void onApplicationEvent(ContextRefreshedWithApplicationEvent event) {
        application = event.getSpringApplication();
    }

    @Override
    protected void updateEnvironment() {
        if (logger.isTraceEnabled()) {
            logger.trace("Re-processing environment to add config data");
        }
        StandardEnvironment environment = copyEnvironment(getContext().getEnvironment());
        ConfigurableBootstrapContext bootstrapContext = getContext()
                .getBeanProvider(ConfigurableBootstrapContext.class)
                .getIfAvailable(DefaultBootstrapContext::new);

        // run thru all EnvironmentPostProcessor instances. This lets things like vcap and
        // decrypt happen after refresh. The hard coded call to
        // ConfigDataEnvironmentPostProcessor.applyTo() is now automated as well.
        DeferredLogFactory logFactory = new PassthruDeferredLogFactory();
        List<String> classNames = SpringFactoriesLoader.loadFactoryNames(EnvironmentPostProcessor.class,
                getClass().getClassLoader());
        Instantiator<EnvironmentPostProcessor> instantiator = new Instantiator<>(EnvironmentPostProcessor.class,
                (parameters) -> {
                    parameters.add(DeferredLogFactory.class, logFactory);
                    parameters.add(Log.class, logFactory::getLog);
                    parameters.add(ConfigurableBootstrapContext.class, bootstrapContext);
                    parameters.add(BootstrapContext.class, bootstrapContext);
                    parameters.add(BootstrapRegistry.class, bootstrapContext);
                });
        List<EnvironmentPostProcessor> postProcessors = instantiator.instantiate(classNames);
        for (EnvironmentPostProcessor postProcessor : postProcessors) {
            postProcessor.postProcessEnvironment(environment, application);
        }

        MutablePropertySources target = getContext().getEnvironment().getPropertySources();
        String targetName = null;
        for (PropertySource<?> source : environment.getPropertySources()) {
            String name = source.getName();
            if (target.contains(name)) {
                targetName = name;
            }
            if (!this.standardSources.contains(name)) {
                if (target.contains(name)) {
                    target.replace(name, source);
                }
                else {
                    if (targetName != null) {
                        target.addAfter(targetName, source);
                        // update targetName to preserve ordering
                        targetName = name;
                    }
                    else {
                        // targetName was null so we are at the start of the list
                        target.addFirst(source);
                        targetName = name;
                    }
                }
            }
        }
    }

    static class PassthruDeferredLogFactory implements DeferredLogFactory {

        @Override
        public Log getLog(Supplier<Log> destination) {
            return destination.get();
        }

        @Override
        public Log getLog(Class<?> destination) {
            return getLog(() -> LogFactory.getLog(destination));
        }

        @Override
        public Log getLog(Log destination) {
            return getLog(() -> destination);
        }

    }

}
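
The final loop of updateEnvironment merges the freshly built property sources into the live environment while preserving precedence order. As a rough illustration, the sketch below models property sources by name only, using plain lists instead of MutablePropertySources. The class PropertySourceMergeSketch, its merge method, and the source names in main are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Name-only simulation of the merge loop; not the Spring Cloud implementation.
class PropertySourceMergeSketch {

    static List<String> merge(List<String> target, List<String> refreshed, Set<String> standard) {
        List<String> result = new ArrayList<>(target);
        String anchor = null; // plays the role of targetName
        for (String name : refreshed) {
            if (result.contains(name)) {
                anchor = name;
            }
            if (standard.contains(name)) {
                continue; // standard sources are never touched
            }
            if (result.contains(name)) {
                // replace(name, source): contents change, position stays the same
                continue;
            }
            if (anchor != null) {
                // addAfter(anchor, source): keep the order of the refreshed environment
                result.add(result.indexOf(anchor) + 1, name);
            }
            else {
                // addFirst(source): nothing seen yet, so this goes to the front
                result.add(0, name);
            }
            anchor = name;
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> live = List.of("commandLineArgs", "systemProperties", "applicationConfig");
        List<String> fresh = List.of("commandLineArgs", "configA", "systemProperties",
                "applicationConfig", "configB");
        System.out.println(merge(live, fresh, Set.of("systemProperties")));
    }
}
```

In this model a new source is inserted directly after the most recently matched or inserted source, so its precedence relative to its neighbors in the refreshed environment carries over to the live one.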