[Source Code Analysis] How Nacos Configuration Hot Reload Works

Getting Started

  1. Use @RefreshScope together with @Value to refresh values dynamically.
@RestController
@RefreshScope
public class TestController {

    @Value("${cls.name}")
    private String clsName;
}
  2. Use @ConfigurationProperties and inject the bean with @Autowired where it is needed.
@Data
@ConfigurationProperties(prefix = "redis")
@Component
public class RedisProperties {

    private String userName;
    private String password;
    private String url;
}

Source Code Analysis

CacheData

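CacheData#checkListenerMd5 compares the MD5 of the current config content with the MD5 each listener last received. When they differ, the listener is notified. The key step is the call to listener.receiveConfigInfo(contentTmp).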

    void checkListenerMd5() {
        for (ManagerListenerWrap wrap : listeners) {
            if (!md5.equals(wrap.lastCallMd5)) {
                safeNotifyListener(dataId, group, content, type, md5, encryptedDataKey, wrap);
            }
        }
    }

    private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
            final String md5, final String encryptedDataKey, final ManagerListenerWrap listenerWrap) {
        final Listener listener = listenerWrap.listener;

        Runnable job = new Runnable() {
            @Override
            public void run() {
                ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
                ClassLoader appClassLoader = listener.getClass().getClassLoader();
                try {
                    if (listener instanceof AbstractSharedListener) {
                        AbstractSharedListener adapter = (AbstractSharedListener) listener;
                        adapter.fillContext(dataId, group);
                        LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
                    }
                    // Before running the callback, switch the thread's context classloader to the
                    // specific webapp's classloader, so that SPI calls made inside the callback do not
                    // fail or resolve the wrong implementation (only an issue in multi-app deployments).
                    Thread.currentThread().setContextClassLoader(appClassLoader);

                    ConfigResponse cr = new ConfigResponse();
                    cr.setDataId(dataId);
                    cr.setGroup(group);
                    cr.setContent(content);
                    cr.setEncryptedDataKey(encryptedDataKey);
                    configFilterChainManager.doFilter(null, cr);
                    String contentTmp = cr.getContent();
                    listener.receiveConfigInfo(contentTmp);

                    // compare lastContent and content
                    if (listener instanceof AbstractConfigChangeListener) {
                        Map data = ConfigChangeHandler.getInstance()
                                .parseChangeData(listenerWrap.lastContent, content, type);
                        ConfigChangeEvent event = new ConfigChangeEvent(data);
                        ((AbstractConfigChangeListener) listener).receiveConfigChange(event);
                        listenerWrap.lastContent = content;
                    }

                    listenerWrap.lastCallMd5 = md5;
                    LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5,
                            listener);
                } catch (NacosException ex) {
                    LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}",
                            name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());
                } catch (Throwable t) {
                    LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId,
                            group, md5, listener, t.getCause());
                } finally {
                    Thread.currentThread().setContextClassLoader(myClassLoader);
                }
            }
        };

        final long startNotify = System.currentTimeMillis();
        try {
            if (null != listener.getExecutor()) {
                listener.getExecutor().execute(job);
            } else {
                job.run();
            }
        } catch (Throwable t) {
            LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId,
                    group, md5, listener, t.getCause());
        }
        final long finishNotify = System.currentTimeMillis();
        LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
                name, (finishNotify - startNotify), dataId, group, md5, listener);
    }
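
The MD5 comparison at the heart of checkListenerMd5 can be illustrated with a small, self-contained sketch. Md5CacheSketch and its members are hypothetical names, not Nacos classes, and the sketch assumes that a newly added listener starts out in sync with the current content. It only shows the idea: a listener is called when the content MD5 differs from the MD5 it last saw, and it is not called again until the content changes.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for CacheData; not the Nacos implementation.
class Md5CacheSketch {

    // Pairs a listener with the MD5 of the content it was last notified about.
    private static final class ListenerWrap {
        final Consumer<String> listener;
        String lastCallMd5;

        ListenerWrap(Consumer<String> listener, String lastCallMd5) {
            this.listener = listener;
            this.lastCallMd5 = lastCallMd5;
        }
    }

    private final List<ListenerWrap> listeners = new ArrayList<>();
    private String content = "";
    private String md5 = md5Hex("");

    void addListener(Consumer<String> listener) {
        // Assumption of this sketch: a new listener is in sync with the current content.
        listeners.add(new ListenerWrap(listener, md5));
    }

    void setContent(String newContent) {
        this.content = newContent;
        this.md5 = md5Hex(newContent);
    }

    // Notifies every listener whose last seen MD5 differs; returns how many were notified.
    int checkListenerMd5() {
        int notified = 0;
        for (ListenerWrap wrap : listeners) {
            if (!md5.equals(wrap.lastCallMd5)) {
                wrap.listener.accept(content);
                wrap.lastCallMd5 = md5;
                notified++;
            }
        }
        return notified;
    }

    static String md5Hex(String value) {
        try {
            MessageDigest digest = MessageDigest.getInstance("MD5");
            byte[] hash = digest.digest(value.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : hash) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }
}
```

Recording the last seen MD5 per listener, rather than per config entry, is what lets several listeners on the same dataId each be notified exactly once per change.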

NacosContextRefresher

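NacosContextRefresher#onApplicationEvent registers the Nacos listeners when the application is ready (on ApplicationReadyEvent). Each registered listener overrides the innerReceive method.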

    public void onApplicationEvent(ApplicationReadyEvent event) {
        // many Spring context
        if (this.ready.compareAndSet(false, true)) {
            this.registerNacosListenersForApplications();
        }
    }

    private void registerNacosListenersForApplications() {
        if (isRefreshEnabled()) {
            for (NacosPropertySource propertySource : NacosPropertySourceRepository.getAll()) {
                if (!propertySource.isRefreshable()) {
                    continue;
                }
                String dataId = propertySource.getDataId();
                registerNacosListener(propertySource.getGroup(), dataId);
            }
        }
    }

    private void registerNacosListener(final String groupKey, final String dataKey) {
        String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
        Listener listener = listenerMap.computeIfAbsent(key,
                lst -> new AbstractSharedListener() {
                    @Override
                    public void innerReceive(String dataId, String group, String configInfo) {
                        refreshCountIncrement();
                        nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
                        // todo feature: support single refresh for listening
                        applicationContext.publishEvent(
                                new RefreshEvent(this, null, "Refresh Nacos config"));
                        if (log.isDebugEnabled()) {
                            log.debug(String.format(
                                    "Refresh Nacos config group=%s,dataId=%s,configInfo=%s",
                                    group, dataId, configInfo));
                        }
                    }
                });
        try {
            configService.addListener(dataKey, groupKey, listener);
        }
        catch (NacosException e) {
            log.warn(String.format(
                    "register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey,
                    groupKey), e);
        }
    }

When AbstractSharedListener runs receiveConfigInfo, it delegates to innerReceive, passing along the dataId and group that fillContext stored earlier.

public abstract class AbstractSharedListener implements Listener {

    private volatile String dataId;
    private volatile String group;

    public AbstractSharedListener() {
    }

    public final void fillContext(String dataId, String group) {
        this.dataId = dataId;
        this.group = group;
    }

    public final void receiveConfigInfo(String configInfo) {
        this.innerReceive(this.dataId, this.group, configInfo);
    }

    public Executor getExecutor() {
        return null;
    }

    public abstract void innerReceive(String var1, String var2, String var3);
}
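
The call order (fillContext first, then receiveConfigInfo, which ends up in innerReceive) is a template-method pattern. The following miniature shows it in isolation. SharedListenerSketch and RecordingListener are hypothetical names used only for illustration, and the list of strings stands in for publishing a RefreshEvent.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of the shared-listener pattern; not the Nacos class.
abstract class SharedListenerSketch {

    private volatile String dataId;
    private volatile String group;

    // Called by the notifier before the callback so the listener knows which config changed.
    final void fillContext(String dataId, String group) {
        this.dataId = dataId;
        this.group = group;
    }

    // Fixed entry point: forwards the content together with the stored context.
    final void receiveConfigInfo(String configInfo) {
        innerReceive(this.dataId, this.group, configInfo);
    }

    abstract void innerReceive(String dataId, String group, String configInfo);
}

// Records what it receives; a real listener would publish a refresh event here.
class RecordingListener extends SharedListenerSketch {

    final List<String> events = new ArrayList<>();

    @Override
    void innerReceive(String dataId, String group, String configInfo) {
        events.add(group + "/" + dataId + " -> " + configInfo);
    }
}
```

Because the context lives in fields of the listener, one listener instance can serve notifications for different configs as long as fillContext is called right before each callback.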

RefreshEventListener

When RefreshEventListener receives a RefreshEvent, it calls this.refresh.refresh().

    public void onApplicationEvent(ApplicationEvent event) {
        if (event instanceof ApplicationReadyEvent) {
            this.handle((ApplicationReadyEvent) event);
        } else if (event instanceof RefreshEvent) {
            this.handle((RefreshEvent) event);
        }
    }

    public void handle(RefreshEvent event) {
        if (this.ready.get()) {
            log.debug("Event received " + event.getEventDesc());
            Set<String> keys = this.refresh.refresh();
            log.info("Refresh keys changed: " + keys);
        }
    }

ContextRefresher

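ContextRefresher#refresh first calls refreshEnvironment, which publishes an EnvironmentChangeEvent. Inside it, ContextRefresher#addConfigFilesToEnvironment starts a temporary Spring application to load the configuration once more and copies the changed property sources into the current environment. After that, refresh calls this.scope.refreshAll().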

    public synchronized Set<String> refresh() {
        Set<String> keys = refreshEnvironment();
        this.scope.refreshAll();
        return keys;
    }

    public synchronized Set<String> refreshEnvironment() {
        Map<String, Object> before = extract(this.context.getEnvironment().getPropertySources());
        addConfigFilesToEnvironment();
        Set<String> keys = changes(before,
                extract(this.context.getEnvironment().getPropertySources())).keySet();
        this.context.publishEvent(new EnvironmentChangeEvent(this.context, keys));
        return keys;
    }

    ConfigurableApplicationContext addConfigFilesToEnvironment() {
        ConfigurableApplicationContext capture = null;
        try {
            StandardEnvironment environment = copyEnvironment(this.context.getEnvironment());
            SpringApplicationBuilder builder = new SpringApplicationBuilder(Empty.class)
                    .bannerMode(Mode.OFF).web(WebApplicationType.NONE).environment(environment);
            // Just the listeners that affect the environment (e.g. excluding logging
            // listener because it has side effects)
            builder.application().setListeners(Arrays.asList(new BootstrapApplicationListener(),
                    new ConfigFileApplicationListener()));
            capture = builder.run();
            if (environment.getPropertySources().contains(REFRESH_ARGS_PROPERTY_SOURCE)) {
                environment.getPropertySources().remove(REFRESH_ARGS_PROPERTY_SOURCE);
            }
            MutablePropertySources target = this.context.getEnvironment().getPropertySources();
            String targetName = null;
            for (PropertySource<?> source : environment.getPropertySources()) {
                String name = source.getName();
                if (target.contains(name)) {
                    targetName = name;
                }
                if (!this.standardSources.contains(name)) {
                    if (target.contains(name)) {
                        target.replace(name, source);
                    }
                    else {
                        if (targetName != null) {
                            target.addAfter(targetName, source);
                            // update targetName to preserve ordering
                            targetName = name;
                        }
                        else {
                            // targetName was null so we are at the start of the list
                            target.addFirst(source);
                            targetName = name;
                        }
                    }
                }
            }
        }
        finally {
            ConfigurableApplicationContext closeable = capture;
            while (closeable != null) {
                try {
                    closeable.close();
                }
                catch (Exception e) {
                    // Ignore;
                }
                if (closeable.getParent() instanceof ConfigurableApplicationContext) {
                    closeable = (ConfigurableApplicationContext) closeable.getParent();
                }
                else {
                    break;
                }
            }
        }
        return capture;
    }
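
refreshEnvironment takes a snapshot of the properties before reloading, reloads, and then computes which keys changed. The body of changes(...) is not shown above, so the following is only a minimal sketch of such a diff under the assumption that a key counts as changed when it was removed, added, or now has a different value. EnvironmentDiffSketch and changedKeys are hypothetical names.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper illustrating a before/after property diff; not Spring Cloud code.
class EnvironmentDiffSketch {

    // Returns the keys that were removed, added, or whose value differs between the snapshots.
    static Set<String> changedKeys(Map<String, Object> before, Map<String, Object> after) {
        Set<String> keys = new TreeSet<>();
        for (Map.Entry<String, Object> entry : before.entrySet()) {
            String key = entry.getKey();
            if (!after.containsKey(key) || !Objects.equals(entry.getValue(), after.get(key))) {
                keys.add(key); // removed or modified
            }
        }
        for (String key : after.keySet()) {
            if (!before.containsKey(key)) {
                keys.add(key); // newly added
            }
        }
        return keys;
    }
}
```

The resulting key set is what travels in the EnvironmentChangeEvent and what RefreshEventListener logs as the changed keys.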

RefreshScope#refreshAll destroys the cached beans and then publishes a RefreshScopeRefreshedEvent.

    @ManagedOperation(description = "Dispose of the current instance of all beans "
            + "in this scope and force a refresh on next method execution.")
    public void refreshAll() {
        super.destroy();
        this.context.publishEvent(new RefreshScopeRefreshedEvent());
    }

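GenericScope#destroy() clears the GenericScope cache. The snippet below is the single-bean overload destroy(String name); the no-argument destroy() called by refreshAll applies the same cleanup to every cached wrapper.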

    protected boolean destroy(String name) {
        GenericScope.BeanLifecycleWrapper wrapper = this.cache.remove(name);
        if (wrapper != null) {
            Lock lock = ((ReadWriteLock) this.locks.get(wrapper.getName())).writeLock();
            lock.lock();
            try {
                wrapper.destroy();
            } finally {
                lock.unlock();
            }
            this.errors.remove(name);
            return true;
        } else {
            return false;
        }
    }

Obtaining the Bean

@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Scope("refresh")
@Documented
public @interface RefreshScope {

    /**
     * @see Scope#proxyMode()
     * @return proxy mode
     */
    ScopedProxyMode proxyMode() default ScopedProxyMode.TARGET_CLASS;

}

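AbstractBeanFactory#doGetBean obtains the bean. If the class is annotated with @RefreshScope, the bean definition carries the "refresh" scope, so the factory looks up the matching Scope and calls scope.get to obtain the bean.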

    protected <T> T doGetBean(String name, @Nullable Class<T> requiredType, @Nullable Object[] args,
            boolean typeCheckOnly) throws BeansException {
        String beanName = transformedBeanName(name);
        Object bean;

        // Eagerly check singleton cache for manually registered singletons.
        Object sharedInstance = getSingleton(beanName);
        if (sharedInstance != null && args == null) {
            //...
        }
        else {
            // ...
            try {
                // ...
                // Create bean instance.
                if (mbd.isSingleton()) {
                    sharedInstance = getSingleton(beanName, () -> {
                        try {
                            return createBean(beanName, mbd, args);
                        }
                        catch (BeansException ex) {
                            // Explicitly remove instance from singleton cache: It might have been put there
                            // eagerly by the creation process, to allow for circular reference resolution.
                            // Also remove any beans that received a temporary reference to the bean.
                            destroySingleton(beanName);
                            throw ex;
                        }
                    });
                    bean = getObjectForBeanInstance(sharedInstance, name, beanName, mbd);
                }
                else if (mbd.isPrototype()) {
                    // It's a prototype -> create a new instance.
                    Object prototypeInstance = null;
                    try {
                        beforePrototypeCreation(beanName);
                        prototypeInstance = createBean(beanName, mbd, args);
                    }
                    finally {
                        afterPrototypeCreation(beanName);
                    }
                    bean = getObjectForBeanInstance(prototypeInstance, name, beanName, mbd);
                }
                else {
                    String scopeName = mbd.getScope();
                    if (!StringUtils.hasLength(scopeName)) {
                        throw new IllegalStateException("No scope name defined for bean ´" + beanName + "'");
                    }
                    Scope scope = this.scopes.get(scopeName);
                    if (scope == null) {
                        throw new IllegalStateException("No Scope registered for scope name '" + scopeName + "'");
                    }
                    try {
                        Object scopedInstance = scope.get(beanName, () -> {
                            beforePrototypeCreation(beanName);
                            try {
                                return createBean(beanName, mbd, args);
                            }
                            finally {
                                afterPrototypeCreation(beanName);
                            }
                        });
                        bean = getObjectForBeanInstance(scopedInstance, name, beanName, mbd);
                    }
                    catch (IllegalStateException ex) {
                        throw new BeanCreationException(beanName,
                                "Scope '" + scopeName + "' is not active for the current thread; consider " +
                                "defining a scoped proxy for this bean if you intend to refer to it from a singleton",
                                ex);
                    }
                }
            }
            catch (BeansException ex) {
                cleanupAfterBeanCreationFailure(beanName);
                throw ex;
            }
        }

        // Check if required type matches the type of the actual bean instance.
        if (requiredType != null && !requiredType.isInstance(bean)) {
            try {
                T convertedBean = getTypeConverter().convertIfNecessary(bean, requiredType);
                if (convertedBean == null) {
                    throw new BeanNotOfRequiredTypeException(name, requiredType, bean.getClass());
                }
                return convertedBean;
            }
            catch (TypeMismatchException ex) {
                if (logger.isTraceEnabled()) {
                    logger.trace("Failed to convert bean '" + name + "' to required type '" +
                            ClassUtils.getQualifiedName(requiredType) + "'", ex);
                }
                throw new BeanNotOfRequiredTypeException(name, requiredType, bean.getClass());
            }
        }
        return (T) bean;
    }

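GenericScope#get looks the bean up in the scope cache: it puts a new BeanLifecycleWrapper into the cache, gets back whichever wrapper is actually cached under that name, and returns that wrapper's bean.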

    public Object get(String name, ObjectFactory<?> objectFactory) {
        GenericScope.BeanLifecycleWrapper value = this.cache.put(name,
                new GenericScope.BeanLifecycleWrapper(name, objectFactory));
        this.locks.putIfAbsent(name, new ReentrantReadWriteLock());
        try {
            return value.getBean();
        } catch (RuntimeException var5) {
            this.errors.put(name, var5);
            throw var5;
        }
    }

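StandardScopeCache#put: if there is no existing value for the name, the new value is stored and returned; if a value already exists, the existing value is returned and the new one is discarded.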

    public Object put(String name, Object value) {
        Object result = this.cache.putIfAbsent(name, value);
        return result != null ? result : value;
    }
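
The "first writer wins" behavior of put is easy to miss because the method name suggests an overwrite. A small sketch with a plain ConcurrentHashMap makes it visible. ScopeCachePutSketch is a hypothetical class that mirrors only the put logic shown above, plus a remove method to imitate a cache eviction.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical miniature of a scope cache; mirrors only the put-if-absent behavior.
class ScopeCachePutSketch {

    private final ConcurrentMap<String, Object> cache = new ConcurrentHashMap<>();

    // Stores the value only when the name is not cached yet; always returns the cached value.
    Object put(String name, Object value) {
        Object existing = this.cache.putIfAbsent(name, value);
        return existing != null ? existing : value;
    }

    // Evicts the entry so that the next put stores a fresh value.
    Object remove(String name) {
        return this.cache.remove(name);
    }
}
```

This is why a refresh has to remove entries from the cache: as long as an entry exists, every put for that name hands back the old wrapper.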

GenericScope.BeanLifecycleWrapper#getBean calls objectFactory.getObject() when the bean has not been created yet, which is the step that actually creates the bean.

        public Object getBean() {
            if (this.bean == null) {
                synchronized (this.name) {
                    if (this.bean == null) {
                        this.bean = this.objectFactory.getObject();
                    }
                }
            }
            return this.bean;
        }
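
Put together, the scope behaves like a cache of lazily created beans that can be emptied on demand. The sketch below is a simplified illustration under stated assumptions: RefreshScopeSketch, Wrapper, and the Supplier-based factory are hypothetical stand-ins, and locking, destruction callbacks, and error tracking are left out.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Hypothetical, simplified model of a refreshable scope; not the Spring Cloud implementation.
class RefreshScopeSketch {

    // Holds a factory and creates the bean at most once, on first access.
    private static final class Wrapper {
        private final Supplier<Object> factory;
        private volatile Object bean;

        Wrapper(Supplier<Object> factory) {
            this.factory = factory;
        }

        Object getBean() {
            if (this.bean == null) {
                synchronized (this) {
                    if (this.bean == null) {
                        this.bean = this.factory.get();
                    }
                }
            }
            return this.bean;
        }
    }

    private final ConcurrentMap<String, Wrapper> cache = new ConcurrentHashMap<>();

    // Returns the cached bean, creating it through the factory only if nothing is cached.
    Object get(String name, Supplier<Object> factory) {
        Wrapper candidate = new Wrapper(factory);
        Wrapper existing = this.cache.putIfAbsent(name, candidate);
        return (existing != null ? existing : candidate).getBean();
    }

    // Drops all cached beans; the next get() re-runs the factory and sees current config.
    void refreshAll() {
        this.cache.clear();
    }
}
```

With this model, a config change followed by refreshAll has no visible effect until the bean is requested again, at which point the factory reads the updated values.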

ConfigurationPropertiesRebinderAutoConfiguration

ConfigurationPropertiesRebinderAutoConfiguration registers two beans: ConfigurationPropertiesBeans and ConfigurationPropertiesRebinder.

@Configuration(proxyBeanMethods = false)
@ConditionalOnBean(ConfigurationPropertiesBindingPostProcessor.class)
public class ConfigurationPropertiesRebinderAutoConfiguration
        implements ApplicationContextAware, SmartInitializingSingleton {

    private ApplicationContext context;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        this.context = applicationContext;
    }

    @Bean
    @ConditionalOnMissingBean(search = SearchStrategy.CURRENT)
    public static ConfigurationPropertiesBeans configurationPropertiesBeans() {
        return new ConfigurationPropertiesBeans();
    }

    @Bean
    @ConditionalOnMissingBean(search = SearchStrategy.CURRENT)
    public ConfigurationPropertiesRebinder configurationPropertiesRebinder(
            ConfigurationPropertiesBeans beans) {
        ConfigurationPropertiesRebinder rebinder = new ConfigurationPropertiesRebinder(beans);
        return rebinder;
    }
}

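ConfigurationPropertiesBeans implements BeanPostProcessor, so postProcessBeforeInitialization runs during startup. It looks for beans annotated with @ConfigurationProperties (skipping refresh-scoped beans), wraps each one in a ConfigurationPropertiesBean, and stores it in the beans field of ConfigurationPropertiesBeans.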

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName)
            throws BeansException {
        if (isRefreshScoped(beanName)) {
            return bean;
        }
        ConfigurationPropertiesBean propertiesBean = ConfigurationPropertiesBean
                .get(this.applicationContext, bean, beanName);
        if (propertiesBean != null) {
            this.beans.put(beanName, propertiesBean);
        }
        return bean;
    }

    public static ConfigurationPropertiesBean get(ApplicationContext applicationContext, Object bean,
            String beanName) {
        Method factoryMethod = findFactoryMethod(applicationContext, beanName);
        return create(beanName, bean, bean.getClass(), factoryMethod);
    }

    private static ConfigurationPropertiesBean create(String name, Object instance, Class<?> type,
            Method factory) {
        ConfigurationProperties annotation = findAnnotation(instance, type, factory,
                ConfigurationProperties.class);
        if (annotation == null) {
            return null;
        }
        Validated validated = findAnnotation(instance, type, factory, Validated.class);
        Annotation[] annotations = (validated != null) ? new Annotation[] { annotation, validated }
                : new Annotation[] { annotation };
        ResolvableType bindType = (factory != null) ? ResolvableType.forMethodReturnType(factory)
                : ResolvableType.forClass(type);
        Bindable<Object> bindTarget = Bindable.of(bindType).withAnnotations(annotations);
        if (instance != null) {
            bindTarget = bindTarget.withExistingValue(instance);
        }
        return new ConfigurationPropertiesBean(name, instance, annotation, bindTarget);
    }

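ConfigurationPropertiesRebinder implements ApplicationListener<EnvironmentChangeEvent>. When it receives an EnvironmentChangeEvent, it destroys each registered @ConfigurationProperties bean and then re-initializes the same instance, so the bean picks up the new property values.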

    public void onApplicationEvent(EnvironmentChangeEvent event) {
        if (this.applicationContext.equals(event.getSource())
                || event.getKeys().equals(event.getSource())) {
            this.rebind();
        }
    }

    @ManagedOperation
    public void rebind() {
        this.errors.clear();
        for (String name : this.beans.getBeanNames()) {
            rebind(name);
        }
    }

    @ManagedOperation
    public boolean rebind(String name) {
        if (!this.beans.getBeanNames().contains(name)) {
            return false;
        }
        if (this.applicationContext != null) {
            try {
                Object bean = this.applicationContext.getBean(name);
                if (AopUtils.isAopProxy(bean)) {
                    bean = ProxyUtils.getTargetObject(bean);
                }
                if (bean != null) {
                    // TODO: determine a more general approach to fix this.
                    // see https://github.com/spring-cloud/spring-cloud-commons/issues/571
                    if (getNeverRefreshable().contains(bean.getClass().getName())) {
                        return false; // ignore
                    }
                    this.applicationContext.getAutowireCapableBeanFactory().destroyBean(bean);
                    this.applicationContext.getAutowireCapableBeanFactory().initializeBean(bean, name);
                    return true;
                }
            }
            catch (RuntimeException e) {
                this.errors.put(name, e);
                throw e;
            }
            catch (Exception e) {
                this.errors.put(name, e);
                throw new IllegalStateException("Cannot rebind to " + name, e);
            }
        }
        return false;
    }
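
Note that rebind works on the existing bean instance rather than replacing it, which is why objects that already hold a reference to the properties bean see the new values. The sketch below illustrates only that in-place idea. RedisPropertiesSketch and RebinderSketch are hypothetical classes, and the map stands in for the Spring Environment; the real binding is done by Spring's bean post-processors, not by hand-written assignments.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical properties holder, loosely modelled on the RedisProperties example.
class RedisPropertiesSketch {
    String userName;
    String url;
}

// Hypothetical rebinder: re-applies current environment values to an existing instance.
class RebinderSketch {

    // Stand-in for the Spring Environment in this sketch.
    private final Map<String, String> environment = new HashMap<>();

    void setProperty(String key, String value) {
        this.environment.put(key, value);
    }

    // Overwrites the fields of the SAME object, so existing references observe the update.
    void rebind(RedisPropertiesSketch bean) {
        bean.userName = this.environment.get("redis.userName");
        bean.url = this.environment.get("redis.url");
    }
}
```

This differs from the refresh scope, where the old instance is dropped and a new one is created on the next access.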

Summary

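  • All beans annotated with @RefreshScope are kept in the GenericScope cache. RefreshScope#refreshAll clears that cache, and the next time Spring obtains one of these beans it is created again and stored back in the cache.
  • Beans annotated with @ConfigurationProperties are destroyed and re-initialized when an EnvironmentChangeEvent is received.
  • If you want every @Value field to support hot reload, take a look at how the dynamic-config-starter project implements it.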
