
[Nacos] Source Code Analysis: How the Nacos Config Center Client Picks Up Configuration Updates


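The previous article covered how a service pulls its configuration from the remote Nacos server at startup. This one looks at how the client learns of configuration changes on the Nacos server in near real time. The first step is registering a listener.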

Registering listeners

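NacosContextRefresher listens for the ApplicationReadyEvent that is published once the application has started, and registers the configuration listeners when that event arrives.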

com.alibaba.cloud.nacos.refresh.NacosContextRefresher#onApplicationEvent

public void onApplicationEvent(ApplicationReadyEvent event) {
    // many Spring context
    if (this.ready.compareAndSet(false, true)) {
        this.registerNacosListenersForApplications();
    }
}
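
The compareAndSet(false, true) guard is what keeps registration from running more than once when the event fires several times. A minimal sketch of that pattern follows; OneShotRegistrar and its counter are hypothetical names used only for illustration, not part of Spring Cloud Alibaba.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for NacosContextRefresher; only the one-shot guard is modelled.
class OneShotRegistrar {
    private final AtomicBoolean ready = new AtomicBoolean(false);
    private final AtomicInteger registrations = new AtomicInteger();

    // May be invoked several times, e.g. once per Spring context.
    void onReady() {
        // compareAndSet(false, true) succeeds for exactly one caller.
        if (ready.compareAndSet(false, true)) {
            registrations.incrementAndGet(); // stands in for registerNacosListenersForApplications()
        }
    }

    int registrations() {
        return registrations.get();
    }

    public static void main(String[] args) {
        OneShotRegistrar registrar = new OneShotRegistrar();
        registrar.onReady();
        registrar.onReady();
        System.out.println(registrar.registrations()); // prints 1
    }
}
```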

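registerNacosListenersForApplications() first checks whether automatic refresh is enabled and registers listeners only if it is. As described in the previous article, the configurations that were pulled are cached in NacosPropertySourceRepository. Here they are all read back from that cache and a listener is registered for each one in a loop. A configuration whose refresh field is set to false in the configuration file gets no listener.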
com.alibaba.cloud.nacos.refresh.NacosContextRefresher#registerNacosListenersForApplications

private void registerNacosListenersForApplications() {
    if (isRefreshEnabled()) {
        for (NacosPropertySource propertySource : NacosPropertySourceRepository.getAll()) {
            if (!propertySource.isRefreshable()) {
                continue;
            }
            String dataId = propertySource.getDataId();
            registerNacosListener(propertySource.getGroup(), dataId);
        }
    }
}

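As the code shows, listeners are registered per dataId + groupId + namespace combination, and the listener is called back whenever that configuration is later updated.

The listener logic has three main steps:

  1. Increment REFRESH_COUNT, which is used by the loadNacosPropertySource() method seen earlier
  2. Add a refresh record to NacosRefreshHistory#records
  3. Publish a RefreshEvent. This event is provided by Spring Cloud and exists mainly to trigger an environment refresh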

com.alibaba.cloud.nacos.refresh.NacosContextRefresher#registerNacosListener

private void registerNacosListener(final String groupKey, final String dataKey) {
    String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
    Listener listener = listenerMap.computeIfAbsent(key,
            lst -> new AbstractSharedListener() {
                @Override
                public void innerReceive(String dataId, String group,
                        String configInfo) {
                    refreshCountIncrement();
                    nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
                    // Publish the RefreshEvent
                    // todo feature: support single refresh for listening
                    applicationContext.publishEvent(
                            new RefreshEvent(this, null, "Refresh Nacos config"));
                    if (log.isDebugEnabled()) {
                        log.debug(String.format(
                                "Refresh Nacos config group=%s,dataId=%s,configInfo=%s",
                                group, dataId, configInfo));
                    }
                }
            });
    try {
        configService.addListener(dataKey, groupKey, listener);
    }
    catch (NacosException e) {
        log.warn(String.format(
                "register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey,
                groupKey), e);
    }
}
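
The three listener steps and the one-listener-per-key registration can be sketched in plain Java as below. This is a simplified model under stated assumptions: ConfigListener, the key format and the in-memory lists are hypothetical stand-ins, not the Nacos or Spring Cloud types.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified model of the callback; none of these types are the Nacos API.
class RefreshListenerSketch {
    interface ConfigListener {
        void receive(String dataId, String group, String content);
    }

    final Map<String, ConfigListener> listenerMap = new ConcurrentHashMap<>();
    final AtomicLong refreshCount = new AtomicLong();
    final List<String> history = new CopyOnWriteArrayList<>();
    final List<String> publishedEvents = new CopyOnWriteArrayList<>();

    ConfigListener register(String group, String dataId) {
        String key = dataId + "," + group; // assumed key format, for illustration only
        // computeIfAbsent guarantees one listener instance per key.
        return listenerMap.computeIfAbsent(key, k -> (d, g, content) -> {
            refreshCount.incrementAndGet();              // step 1: bump the refresh counter
            history.add(d + "@" + g);                    // step 2: record the refresh
            publishedEvents.add("Refresh Nacos config"); // step 3: stand-in for publishEvent(...)
        });
    }

    public static void main(String[] args) {
        RefreshListenerSketch sketch = new RefreshListenerSketch();
        ConfigListener first = sketch.register("DEFAULT_GROUP", "app.yaml");
        ConfigListener second = sketch.register("DEFAULT_GROUP", "app.yaml");
        first.receive("app.yaml", "DEFAULT_GROUP", "a=1");
        System.out.println(first == second);           // prints true
        System.out.println(sketch.refreshCount.get()); // prints 1
    }
}
```

Registering the same dataId and group twice returns the same listener, so a configuration change triggers one callback rather than one per registration attempt.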

Listener registration is in turn delegated to ConfigService.
com.alibaba.nacos.client.config.NacosConfigService#addListener

public void addListener(String dataId, String group, Listener listener) throws NacosException {
    worker.addTenantListeners(dataId, group, Arrays.asList(listener));
}

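The registration itself is handled in ClientWorker, which creates a CacheData object. CacheData exists mainly to manage listeners and is one of the most important classes in this flow.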
com.alibaba.nacos.client.config.impl.ClientWorker#addTenantListeners

public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners)
        throws NacosException {
    group = blank2defaultGroup(group);
    String tenant = agent.getTenant();
    CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
    for (Listener listener : listeners) {
        cache.addListener(listener);
    }
}

The key fields of CacheData are as follows:

// Intercepts and processes configuration content; can be used for encryption and decryption
private final ConfigFilterChainManager configFilterChainManager;

public final String dataId;

public final String group;

public final String tenant;

// Listeners interested in this configuration
private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;

// Used to detect whether this configuration has changed
private volatile String md5;

/**
 * whether use local config.
 */
private volatile boolean isUseLocalConfig = false;

/**
 * last modify time.
 */
private volatile long localConfigLastModified;

// The configuration content
private volatile String content;

addCacheDataIfAbsent() stores the newly created CacheData in a Map held by ClientWorker (cacheMap), where the long-polling tasks use it later.

com.alibaba.nacos.client.config.impl.ClientWorker#addCacheDataIfAbsent

public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {
    String key = GroupKey.getKeyTenant(dataId, group, tenant);
    CacheData cacheData = cacheMap.get(key);
    if (cacheData != null) {
        return cacheData;
    }

    cacheData = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
    // multiple listeners on the same dataid+group and race condition
    CacheData lastCacheData = cacheMap.putIfAbsent(key, cacheData);
    if (lastCacheData == null) {
        //fix issue # 1317
        if (enableRemoteSyncConfig) {
            ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L);
            cacheData.setContent(response.getContent());
        }
        int taskId = cacheMap.size() / (int) ParamUtil.getPerTaskConfigSize();
        cacheData.setTaskId(taskId);
        lastCacheData = cacheData;
    }

    // reset so that server not hang this check
    lastCacheData.setInitializing(true);

    LOGGER.info("[{}] [subscribe] {}", agent.getName(), key);
    MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.size());

    return lastCacheData;
}
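
Two ideas in this method are worth isolating: the get / putIfAbsent sequence that tolerates concurrent registration of the same key, and the integer division that assigns each new entry to a long-polling task. The sketch below models both with hypothetical names (CacheRegistrySketch, Entry); the per-task size is a constructor argument here, whereas the client reads it from ParamUtil.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical miniature of ClientWorker's cacheMap handling.
class CacheRegistrySketch {
    static final class Entry {
        final String key;
        volatile int taskId;

        Entry(String key) {
            this.key = key;
        }
    }

    private final ConcurrentMap<String, Entry> cacheMap = new ConcurrentHashMap<>();
    private final int perTaskConfigSize;

    CacheRegistrySketch(int perTaskConfigSize) {
        this.perTaskConfigSize = perTaskConfigSize;
    }

    Entry addIfAbsent(String key) {
        Entry existing = cacheMap.get(key);
        if (existing != null) {
            return existing; // fast path: already registered
        }
        Entry fresh = new Entry(key);
        Entry raced = cacheMap.putIfAbsent(key, fresh);
        if (raced != null) {
            return raced; // another thread inserted first; discard ours
        }
        // Integer division of the map size assigns the entry to a task bucket.
        fresh.taskId = cacheMap.size() / perTaskConfigSize;
        return fresh;
    }

    public static void main(String[] args) {
        CacheRegistrySketch registry = new CacheRegistrySketch(3);
        for (int i = 1; i <= 6; i++) {
            System.out.println("key" + i + " -> task " + registry.addIfAbsent("key" + i).taskId);
        }
    }
}
```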

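At this point, once the service has started, a listener has been registered for every configuration that needs hot updates. Each listener watches for changes to the remote configuration and handles them.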

Fetching updated configuration

ClientWorker is created in the constructor of the ConfigService implementation, NacosConfigService.

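The ClientWorker constructor creates two thread pools. executor runs the configuration check repeatedly, with a fixed delay of 10 ms between the end of one run and the start of the next. executorService is used mainly to run the long-polling requests.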

com.alibaba.nacos.client.config.impl.ClientWorker#ClientWorker

public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
        final Properties properties) {
    this.agent = agent;
    this.configFilterChainManager = configFilterChainManager;

    // Initialize the timeout parameter
    init(properties);

    this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
            t.setDaemon(true);
            return t;
        }
    });

    this.executorService = Executors
            .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                    t.setDaemon(true);
                    return t;
                }
            });

    this.executor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                // Check the configuration
                checkConfigInfo();
            } catch (Throwable e) {
                LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
            }
        }
    }, 1L, 10L, TimeUnit.MILLISECONDS);
}
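
The scheduling setup can be tried in isolation. The sketch below uses the same scheduleWithFixedDelay arguments (1 ms initial delay, 10 ms between runs) on a single daemon thread; FixedDelayCheckSketch, the thread name and the latch are hypothetical and only make the example observable. Catching Throwable inside the task matters because a scheduled task that throws has its subsequent executions suppressed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class FixedDelayCheckSketch {
    // Runs a periodic "check" until it has executed the requested number of times.
    static boolean runChecks(int times) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, r -> {
            Thread t = new Thread(r);
            t.setName("sketch.Worker"); // hypothetical thread name
            t.setDaemon(true);          // daemon threads do not block JVM exit
            return t;
        });
        CountDownLatch latch = new CountDownLatch(times);
        executor.scheduleWithFixedDelay(() -> {
            try {
                latch.countDown(); // stands in for checkConfigInfo()
            } catch (Throwable e) {
                // Swallow and log in real code, so later runs are not suppressed.
            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);
        try {
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runChecks(3));
    }
}
```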

checkConfigInfo() is responsible for submitting the long-polling tasks.
com.alibaba.nacos.client.config.impl.ClientWorker#checkConfigInfo

public void checkConfigInfo() {
    // Dispatch tasks.
    int listenerSize = cacheMap.size();
    // Round up the longingTaskCount.
    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
            // The task list is no order.So it maybe has issues when changing.
            executorService.execute(new LongPollingRunnable(i));
        }
        currentLongingTaskCount = longingTaskCount;
    }
}
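
The number of long-polling tasks is the listener count divided by the per-task size, rounded up. A small worked example of that arithmetic follows; the value 3000 is an assumption about the default per-task size and is not taken from the code above. The divisor has to be a floating-point value, otherwise integer division would truncate before Math.ceil runs.

```java
class LongPollingTaskCountSketch {
    // Mirrors (int) Math.ceil(listenerSize / perTaskConfigSize) with a double divisor.
    static int taskCount(int listenerSize, double perTaskConfigSize) {
        return (int) Math.ceil(listenerSize / perTaskConfigSize);
    }

    public static void main(String[] args) {
        double perTask = 3000; // assumed default, for illustration only
        System.out.println(taskCount(0, perTask));    // prints 0
        System.out.println(taskCount(1, perTask));    // prints 1
        System.out.println(taskCount(3000, perTask)); // prints 1
        System.out.println(taskCount(3001, perTask)); // prints 2
    }
}
```

Because tasks are only ever added when the count grows, removing listeners does not shrink the number of running long-polling tasks in this method.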

The long-polling task runs as follows.
com.alibaba.nacos.client.config.impl.ClientWorker.LongPollingRunnable#run

public void run() {
    List<CacheData> cacheDatas = new ArrayList<CacheData>();
    List<String> inInitializingCacheList = new ArrayList<String>();
    try {
        // check failover config
        for (CacheData cacheData : cacheMap.values()) {
            if (cacheData.getTaskId() == taskId) {
                cacheDatas.add(cacheData);
                try {
                    checkLocalConfig(cacheData);
                    if (cacheData.isUseLocalConfigInfo()) {
                        cacheData.checkListenerMd5();
                    }
                } catch (Exception e) {
                    LOGGER.error("get local config info error", e);
                }
            }
        }

        // check server config
        // Check the configuration against the server
        List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
        if (!CollectionUtils.isEmpty(changedGroupKeys)) {
            LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
        }

        for (String groupKey : changedGroupKeys) {
            String[] key = GroupKey.parseKey(groupKey);
            String dataId = key[0];
            String group = key[1];
            String tenant = null;
            if (key.length == 3) {
                tenant = key[2];
            }
            try {
                // Fetch the latest configuration from the server by dataId
                ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L);
                CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
                cache.setContent(response.getContent());
                cache.setEncryptedDataKey(response.getEncryptedDataKey());
                if (null != response.getConfigType()) {
                    cache.setType(response.getConfigType());
                }
                LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                        agent.getName(), dataId, group, tenant, cache.getMd5(),
                        ContentUtils.truncateContent(response.getContent()), response.getConfigType());
            } catch (NacosException ioe) {
                String message = String
                        .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                                agent.getName(), dataId, group, tenant);
                LOGGER.error(message, ioe);
            }
        }
        for (CacheData cacheData : cacheDatas) {
            if (!cacheData.isInitializing() || inInitializingCacheList
                    .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                // Check whether the MD5 changed, and notify listeners if it did
                cacheData.checkListenerMd5();
                cacheData.setInitializing(false);
            }
        }
        inInitializingCacheList.clear();

        executorService.execute(this);

    } catch (Throwable e) {
        // If the rotation training task is abnormal, the next execution time of the task will be punished
        LOGGER.error("longPolling error : ", e);
        executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
    }
}
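
The end of run() shows how the loop is kept alive: after a successful round the task resubmits itself immediately, and after a failure it reschedules itself with a penalty delay. Below is a minimal sketch of that pattern. SelfReschedulingSketch, the simulated failure in round 2 and the stop condition are all hypothetical and exist only so the demo terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a task that resubmits itself, backing off after a failure.
class SelfReschedulingSketch implements Runnable {
    final ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    final AtomicInteger rounds = new AtomicInteger();
    final AtomicInteger penalties = new AtomicInteger();
    final CountDownLatch finished = new CountDownLatch(1);
    final int totalRounds;
    final long penaltyMs;

    SelfReschedulingSketch(int totalRounds, long penaltyMs) {
        this.totalRounds = totalRounds;
        this.penaltyMs = penaltyMs;
    }

    @Override
    public void run() {
        try {
            int round = rounds.incrementAndGet();
            if (round == 2) {
                throw new IllegalStateException("simulated long-polling failure");
            }
            if (round >= totalRounds) {
                finished.countDown(); // demo only: lets the example stop
                return;
            }
            pool.execute(this); // success: start the next round immediately
        } catch (Throwable e) {
            penalties.incrementAndGet();
            pool.schedule(this, penaltyMs, TimeUnit.MILLISECONDS); // failure: wait first
        }
    }

    static int[] demo() {
        SelfReschedulingSketch task = new SelfReschedulingSketch(4, 20L);
        task.pool.execute(task);
        try {
            task.finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            task.pool.shutdownNow();
        }
        return new int[] {task.rounds.get(), task.penalties.get()};
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println("rounds=" + result[0] + ", penalties=" + result[1]);
    }
}
```

The penalty delay keeps a failing server from being hammered by back-to-back retries, while the success path adds no delay because the long-polling request itself already blocks on the server side.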

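checkUpdateDataIds() concatenates the dataId, group, MD5 and (when set) tenant of every CacheData handled by this task into a single string in a defined format, builds a long-polling request from it, and sends it to the server. The Long-Pulling-Timeout defaults to 30 s. If nothing has changed on the server, the server holds the request until it times out. If something has changed, it returns immediately with the list of changed dataIds.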
com.alibaba.nacos.client.config.impl.ClientWorker#checkUpdateDataIds

List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws Exception {
    StringBuilder sb = new StringBuilder();
    for (CacheData cacheData : cacheDatas) {
        if (!cacheData.isUseLocalConfigInfo()) {
            sb.append(cacheData.dataId).append(WORD_SEPARATOR);
            sb.append(cacheData.group).append(WORD_SEPARATOR);
            if (StringUtils.isBlank(cacheData.tenant)) {
                sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
            } else {
                sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
                sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
            }
            if (cacheData.isInitializing()) {
                // It updates when cacheData occurs in cacheMap by first time.
                // Record the cacheData that still needs initializing
                inInitializingCacheList
                        .add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));
            }
        }
    }
    boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
    // Check against the server-side configuration
    return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
}
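
The format of the probe string is easier to see with concrete values. The sketch below rebuilds one entry the same way as the loop above. The separator values (character code 2 between fields, character code 1 between entries) are an assumption about the client's constants and are not shown in the code above; ProbeStringSketch and readable() are hypothetical helpers.

```java
class ProbeStringSketch {
    // Assumed separator values; the real constants live in the Nacos client.
    static final String WORD_SEPARATOR = Character.toString((char) 2);
    static final String LINE_SEPARATOR = Character.toString((char) 1);

    // Builds one entry: dataId, group, md5 and, if present, tenant.
    static String entry(String dataId, String group, String md5, String tenant) {
        StringBuilder sb = new StringBuilder();
        sb.append(dataId).append(WORD_SEPARATOR);
        sb.append(group).append(WORD_SEPARATOR);
        if (tenant == null || tenant.trim().isEmpty()) {
            sb.append(md5).append(LINE_SEPARATOR);
        } else {
            sb.append(md5).append(WORD_SEPARATOR);
            sb.append(tenant).append(LINE_SEPARATOR);
        }
        return sb.toString();
    }

    // Replaces the control characters with visible ones for printing.
    static String readable(String probe) {
        return probe.replace((char) 2, '|').replace((char) 1, ';');
    }

    public static void main(String[] args) {
        String probe = entry("app.yaml", "DEFAULT_GROUP", "abc123", null)
                + entry("db.yaml", "DEFAULT_GROUP", "def456", "dev");
        System.out.println(readable(probe));
        // prints app.yaml|DEFAULT_GROUP|abc123;db.yaml|DEFAULT_GROUP|def456|dev;
    }
}
```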

checkUpdateConfigStr() calls the HTTP endpoint /v1/cs/configs/listener.
com.alibaba.nacos.client.config.impl.ClientWorker#checkUpdateConfigStr

List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {
    Map<String, String> params = new HashMap<String, String>(2);
    params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
    Map<String, String> headers = new HashMap<String, String>(2);
    // Use long polling
    headers.put("Long-Pulling-Timeout", "" + timeout);

    // told server do not hang me up if new initializing cacheData added in
    if (isInitializingCacheList) {
        headers.put("Long-Pulling-Timeout-No-Hangup", "true");
    }

    if (StringUtils.isBlank(probeUpdateString)) {
        return Collections.emptyList();
    }

    try {
        // In order to prevent the server from handling the delay of the client's long task,
        // increase the client's read timeout to avoid this problem.
        long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
        // Call the remote listener endpoint
        HttpRestResult<String> result = agent
                .httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(),
                        readTimeoutMs);

        if (result.ok()) {
            setHealthServer(true);
            return parseUpdateDataIdResponse(result.getData());
        } else {
            setHealthServer(false);
            LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(),
                    result.getCode());
        }
    } catch (Exception e) {
        setHealthServer(false);
        LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);
        throw e;
    }
    return Collections.emptyList();
}
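
The client-side read timeout is deliberately longer than the long-polling timeout sent to the server: it is the timeout plus half of it again (timeout >> 1). With the 30 s default mentioned earlier that gives 45 s. A tiny worked example, using the hypothetical helper ReadTimeoutSketch:

```java
class ReadTimeoutSketch {
    // Same arithmetic as the client: the timeout plus half of it again.
    static long readTimeoutMs(long timeoutMs) {
        return timeoutMs + (timeoutMs >> 1);
    }

    public static void main(String[] args) {
        System.out.println(readTimeoutMs(30_000L)); // prints 45000
    }
}
```

The extra margin means a server that answers right at the end of its hold period is not cut off by the client's own socket timeout.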

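checkListenerMd5() compares the current MD5 of the CacheData with the MD5 each listener was last called with, and calls safeNotifyListener() for every listener where the two differ.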
com.alibaba.nacos.client.config.impl.CacheData#checkListenerMd5

void checkListenerMd5() {
    for (ManagerListenerWrap wrap : listeners) {
        if (!md5.equals(wrap.lastCallMd5)) {
            // The configuration changed: notify the listener
            safeNotifyListener(dataId, group, content, type, md5, encryptedDataKey, wrap);
        }
    }
}
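
Change detection by MD5 can be illustrated with the JDK's MessageDigest. This is a simplified sketch: Md5ChangeSketch keeps a single lastCallMd5 and a notification counter, whereas the client keeps one lastCallMd5 per listener wrapper, and the way the client computes its MD5 string is not shown in this article.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

class Md5ChangeSketch {
    String lastCallMd5 = md5Hex(""); // MD5 the (single) listener was last notified with
    int notifications;

    // Hex-encoded MD5 of the content, left-padded to 32 characters.
    static String md5Hex(String content) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(content.getBytes(StandardCharsets.UTF_8));
            return String.format("%032x", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Notifies only when the MD5 differs from the one used in the last notification.
    void onContent(String content) {
        String md5 = md5Hex(content);
        if (!md5.equals(lastCallMd5)) {
            notifications++;   // stands in for safeNotifyListener(...)
            lastCallMd5 = md5; // remember what the listener has seen
        }
    }

    public static void main(String[] args) {
        Md5ChangeSketch sketch = new Md5ChangeSketch();
        sketch.onContent("a=1");
        sketch.onContent("a=1"); // unchanged content: no notification
        sketch.onContent("a=2");
        System.out.println(sketch.notifications); // prints 2
    }
}
```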

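safeNotifyListener() calls the listener's receiveConfigInfo() method and then updates the fields in the listener wrapper: lastCallMd5 always, and lastContent when the listener is an AbstractConfigChangeListener.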
com.alibaba.nacos.client.config.impl.CacheData#safeNotifyListener

private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
        final String md5, final String encryptedDataKey, final ManagerListenerWrap listenerWrap) {
    final Listener listener = listenerWrap.listener;

    Runnable job = new Runnable() {
        @Override
        public void run() {
            ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
            ClassLoader appClassLoader = listener.getClass().getClassLoader();
            try {
                if (listener instanceof AbstractSharedListener) {
                    AbstractSharedListener adapter = (AbstractSharedListener) listener;
                    adapter.fillContext(dataId, group);
                    LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
                }
                // Before running the callback, set the thread's context classloader to the webapp's own
                // classloader, so that SPI calls inside the callback do not fail or resolve incorrectly
                // (this only matters in multi-application deployments).
                Thread.currentThread().setContextClassLoader(appClassLoader);

                ConfigResponse cr = new ConfigResponse();
                cr.setDataId(dataId);
                cr.setGroup(group);
                cr.setContent(content);
                cr.setEncryptedDataKey(encryptedDataKey);
                configFilterChainManager.doFilter(null, cr);
                String contentTmp = cr.getContent();
                /**
                 * @see com.alibaba.cloud.nacos.refresh.NacosContextRefresher#registerNacosListener(java.lang.String, java.lang.String)
                 */
                listener.receiveConfigInfo(contentTmp);

                // compare lastContent and content
                if (listener instanceof AbstractConfigChangeListener) {
                    Map data = ConfigChangeHandler.getInstance()
                            .parseChangeData(listenerWrap.lastContent, content, type);
                    ConfigChangeEvent event = new ConfigChangeEvent(data);
                    ((AbstractConfigChangeListener) listener).receiveConfigChange(event);
                    listenerWrap.lastContent = content;
                }

                listenerWrap.lastCallMd5 = md5;
                LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5,
                        listener);
            } catch (NacosException ex) {
                LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}",
                        name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());
            } catch (Throwable t) {
                LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId,
                        group, md5, listener, t.getCause());
            } finally {
                Thread.currentThread().setContextClassLoader(myClassLoader);
            }
        }
    };

    final long startNotify = System.currentTimeMillis();
    try {
        if (null != listener.getExecutor()) {
            listener.getExecutor().execute(job);
        } else {
            job.run();
        }
    } catch (Throwable t) {
        LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId,
                group, md5, listener, t.getCause());
    }
    final long finishNotify = System.currentTimeMillis();
    LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
            name, (finishNotify - startNotify), dataId, group, md5, listener);
}
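
Two details of this method generalize well: the context classloader is swapped for the duration of the callback and restored in finally, and the callback runs on the listener's own executor when it has one, otherwise on the calling thread. The sketch below isolates both. SafeNotifySketch and its parameters are hypothetical; the real method takes the classloader from the listener's class.

```java
import java.util.concurrent.Executor;

class SafeNotifySketch {
    // Runs the callback with callbackLoader as context classloader, then restores the old one.
    static void notifyListener(Runnable callback, ClassLoader callbackLoader, Executor executorOrNull) {
        Runnable job = () -> {
            Thread current = Thread.currentThread();
            ClassLoader original = current.getContextClassLoader();
            try {
                current.setContextClassLoader(callbackLoader);
                callback.run();
            } catch (Throwable t) {
                // A failing listener must not break the polling loop.
                System.err.println("notify-error: " + t);
            } finally {
                current.setContextClassLoader(original);
            }
        };
        if (executorOrNull != null) {
            executorOrNull.execute(job); // listener supplied its own executor
        } else {
            job.run();                   // otherwise run on the calling thread
        }
    }

    static boolean demo() {
        Thread current = Thread.currentThread();
        ClassLoader before = current.getContextClassLoader();
        ClassLoader custom = new ClassLoader(before) { };
        ClassLoader[] seen = new ClassLoader[1];
        notifyListener(() -> seen[0] = current.getContextClassLoader(), custom, null);
        notifyListener(() -> { throw new IllegalStateException("listener failed"); }, custom, null);
        // Inside the callback the custom loader was active; afterwards the original is back.
        return seen[0] == custom && current.getContextClassLoader() == before;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```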

What the listener does was covered above: it mainly publishes a RefreshEvent. That is the end of the Nacos part of the flow. The RefreshEvent itself is handled by Spring Cloud classes.

Handling the RefreshEvent

The RefreshEvent is handled by RefreshEventListener.

org.springframework.cloud.endpoint.event.RefreshEventListener#onApplicationEvent

public void onApplicationEvent(ApplicationEvent event) {
    if (event instanceof ApplicationReadyEvent) {
        handle((ApplicationReadyEvent) event);
    }
    else if (event instanceof RefreshEvent) {
        handle((RefreshEvent) event);
    }
}

The work is delegated to ContextRefresher, which refreshes the configuration in the container.
org.springframework.cloud.endpoint.event.RefreshEventListener#handle(org.springframework.cloud.endpoint.event.RefreshEvent)

public void handle(RefreshEvent event) {
    if (this.ready.get()) { // don't handle events before app is ready
        log.debug("Event received " + event.getEventDesc());
        Set<String> keys = this.refresh.refresh();
        log.info("Refresh keys changed: " + keys);
    }
}

org.springframework.cloud.context.refresh.ContextRefresher#refresh

public synchronized Set<String> refresh() {
    Set<String> keys = refreshEnvironment();
    this.scope.refreshAll();
    return keys;
}

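refreshEnvironment() refreshes the Spring environment. The actual refresh is done by addConfigFilesToEnvironment(): the idea is to create a new Spring container and then copy the environment information from that new container into the existing Spring environment. Once all changed configuration keys have been collected, an EnvironmentChangeEvent is published.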

org.springframework.cloud.context.refresh.ContextRefresher#refreshEnvironment

public synchronized Set<String> refreshEnvironment() {
    Map<String, Object> before = extract(this.context.getEnvironment().getPropertySources());
    addConfigFilesToEnvironment();
    Set<String> keys = changes(before,
            extract(this.context.getEnvironment().getPropertySources())).keySet();
    this.context.publishEvent(new EnvironmentChangeEvent(this.context, keys));
    return keys;
}

/* For testing. */ ConfigurableApplicationContext addConfigFilesToEnvironment() {
    ConfigurableApplicationContext capture = null;
    try {
        StandardEnvironment environment = copyEnvironment(this.context.getEnvironment());
        SpringApplicationBuilder builder = new SpringApplicationBuilder(Empty.class)
                .bannerMode(Mode.OFF).web(WebApplicationType.NONE)
                .environment(environment);
        // Just the listeners that affect the environment (e.g. excluding logging
        // listener because it has side effects)
        builder.application()
                .setListeners(Arrays.asList(new BootstrapApplicationListener(),
                        new ConfigFileApplicationListener()));
        capture = builder.run();
        if (environment.getPropertySources().contains(REFRESH_ARGS_PROPERTY_SOURCE)) {
            environment.getPropertySources().remove(REFRESH_ARGS_PROPERTY_SOURCE);
        }
        MutablePropertySources target = this.context.getEnvironment().getPropertySources();
        String targetName = null;
        for (PropertySource<?> source : environment.getPropertySources()) {
            String name = source.getName();
            if (target.contains(name)) {
                targetName = name;
            }
            if (!this.standardSources.contains(name)) {
                if (target.contains(name)) {
                    target.replace(name, source);
                }
                else {
                    if (targetName != null) {
                        target.addAfter(targetName, source);
                        // update targetName to preserve ordering
                        targetName = name;
                    }
                    else {
                        // targetName was null so we are at the start of the list
                        target.addFirst(source);
                        targetName = name;
                    }
                }
            }
        }
    }
    finally {
        ConfigurableApplicationContext closeable = capture;
        while (closeable != null) {
            try {
                closeable.close();
            }
            catch (Exception e) {
                // Ignore;
            }
            if (closeable.getParent() instanceof ConfigurableApplicationContext) {
                closeable = (ConfigurableApplicationContext) closeable.getParent();
            }
            else {
                break;
            }
        }
    }
    return capture;
}
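
The set of changed keys comes from comparing a snapshot of the properties taken before the refresh with one taken after. The body of changes() is not shown in this article, so the following is only a sketch of the idea under that assumption: a key counts as changed if it was removed, added, or now has a different value. ChangedKeysSketch is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

class ChangedKeysSketch {
    // Returns keys that were removed, added, or whose value differs between the snapshots.
    static Set<String> changedKeys(Map<String, Object> before, Map<String, Object> after) {
        Set<String> keys = new TreeSet<>();
        for (Map.Entry<String, Object> e : before.entrySet()) {
            String key = e.getKey();
            if (!after.containsKey(key) || !Objects.equals(e.getValue(), after.get(key))) {
                keys.add(key); // removed or modified
            }
        }
        for (String key : after.keySet()) {
            if (!before.containsKey(key)) {
                keys.add(key); // newly added
            }
        }
        return keys;
    }

    static Set<String> demo() {
        Map<String, Object> before = new HashMap<>();
        before.put("a", 1);
        before.put("b", 2);
        before.put("c", 3);
        Map<String, Object> after = new HashMap<>();
        after.put("a", 1);
        after.put("b", 20);
        after.put("d", 4);
        return changedKeys(before, after);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [b, c, d]
    }
}
```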

org.springframework.cloud.context.scope.refresh.RefreshScope#refreshAll

public void refreshAll() {
    super.destroy();
    this.context.publishEvent(new RefreshScopeRefreshedEvent());
}

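For a property injected with @Value to be hot-updated, the bean must also carry @RefreshScope. A bean annotated with @RefreshScope has the scope RefreshScope. Such beans are not kept in the Spring container's first-level (singleton) cache but in the cache property of the GenericScope object. When the configuration changes, the objects held in cache are cleared, so the bean is recreated the next time it is used and reads the latest configuration from the Environment.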
org.springframework.cloud.context.scope.GenericScope#destroy()

public void destroy() {
    List<Throwable> errors = new ArrayList<Throwable>();
    // Clear the cache
    Collection<BeanLifecycleWrapper> wrappers = this.cache.clear();
    for (BeanLifecycleWrapper wrapper : wrappers) {
        try {
            Lock lock = this.locks.get(wrapper.getName()).writeLock();
            lock.lock();
            try {
                wrapper.destroy();
            }
            finally {
                lock.unlock();
            }
        }
        catch (RuntimeException e) {
            errors.add(e);
        }
    }
    if (!errors.isEmpty()) {
        throw wrapIfNecessary(errors.get(0));
    }
    this.errors.clear();
}
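
The clear-then-recreate behaviour can be modelled without Spring. In the sketch below, a scoped cache hands out the same bean until refreshAll() empties it, after which the next lookup rebuilds the bean from the current environment values. RefreshScopeSketch, the map-based environment and the string "bean" are hypothetical simplifications; the real GenericScope also destroys the old beans under a lock and works through proxies.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

class RefreshScopeSketch {
    private final Map<String, Object> cache = new ConcurrentHashMap<>();

    // Returns the cached bean, creating it on first use or after a refresh.
    @SuppressWarnings("unchecked")
    <T> T get(String name, Supplier<T> factory) {
        return (T) cache.computeIfAbsent(name, n -> factory.get());
    }

    // Drops every cached bean so the next lookup recreates it.
    void refreshAll() {
        cache.clear();
    }

    static String[] demo() {
        Map<String, String> environment = new ConcurrentHashMap<>();
        environment.put("greeting", "hello");
        RefreshScopeSketch scope = new RefreshScopeSketch();
        Supplier<String> factory = () -> "bean(" + environment.get("greeting") + ")";

        String first = scope.get("greeter", factory);
        environment.put("greeting", "hi");            // configuration changes
        String stale = scope.get("greeter", factory); // still the cached instance
        scope.refreshAll();                           // what a refresh triggers
        String fresh = scope.get("greeter", factory); // recreated with the new value
        return new String[] {first, stale, fresh};
    }

    public static void main(String[] args) {
        for (String value : demo()) {
            System.out.println(value);
        }
    }
}
```

This is also why a bean without @RefreshScope keeps its old @Value fields: nothing evicts it, so it is never rebuilt against the updated Environment.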
