当前位置: 首页 > news >正文

Apollo架构篇 - 客户端架构

前言

本文基于 Apollo 1.8.0 版本展开分析。

客户端

使用

Apollo 支持 API 方式和 Spring 整合两种方式。

API 方式

API 方式是最简单、高效使用使用 Apollo 配置的方式,不依赖 Spring 框架即可使用。

获取命名空间的配置

// 1、获取默认的命名空间的配置
Config config = ConfigService.getAppConfig();// 2、获取properties格式的命名空间的配置
// String somePublicNamespace = "CAT";
// Config config = ConfigService.getConfig(somePublicNamespace);// 3、获取yaml/yml格式的命名空间的配置
// Config config = ConfigService.getConfig("application.yml");// 4、获取其它格式的命名空间的配置
// ConfigFile configFile = ConfigService.getConfigFile("test", ConfigFileFormat.XML);
// String content = configFile.getContent();String someKey = "someKeyFromDefaultNamespace";
String someDefaultValue = "someDefaultValueForTheKey";
String value = config.getProperty(someKey, someDefaultValue);

通过 Config 的 getProperty 方法可以获取到指定属性对应的属性值。

监听配置变化事件

Config config = ConfigService.getAppConfig();
config.addChangeListener(new ConfigChangeListener() {@Overridepubic void onChange(ConfigChangeEvent changeEvent) {System.out.println("Changes for namespace " + changeEvent.getNamespace());for (String key : changeEvent.changedKeys()) {ConfigChange change = changeEvent.getChange(key);System.out.println(String.format("Found change - key: %s, oldValue: %s, newValue: %s, changeType: %s", change.getPropertyName(), change.getOldValue(), change.getNewValue(), change.getChangeType()));}}
});

希望配置发生变化时得到通知。通过 Config 的 addChangeListener 方法添加一个 ConfigChangeListener 监听器。

Spring整合方式

可以在代码中直接使用,如:@Value("${someKeyFromApollo:someDefaultValue}")

也可以在配置文件中使用,如:spring.datasource.url:${someKeyFromApollo:someDefaultValue}

甚至可以直接托管 Spring 中的配置。如:在 Apollo 中直接配置 spring.datasource.url=jdbc:mysql://localhost:3306/somedb

支持 Spring Boot 的 @ConfigurationProperties 方式。

也支持结合 API 方式使用,如:@ApolloConfig private Config config;

基于 Spring 的配置

注入默认的命名空间的配置到 Spring 中

@Configuration
@EnableApolloConfig
public class AppConfig {}

注入多个命名空间的配置到 Spring 中

@Configuration
@EnableApolloConfig({"FX.apollo", "application.yml"})
public class AnotherAppConfig {}

注入多个命名空间的配置到 Spring 中,并且指定顺序

在 @EnableApolloConfig 注解中的 order 属性指定顺序,值越小则顺序越靠前。

@Configuration
@EnableApolloConfig(order = 2)
public class AppConfig {}@Configuration
@EnableApolloConfig(value = {"FX.apollo", "application.yml"}, order = 1)
public class AnotherAppConfig {}

基于 Spring Boot 的配置

额外支持通过 application.properties / bootstrap.properties 进行配置,该方式可以使配置在更早的阶段注入,比如使用 @ConditionalOnProperty 的场景或者有一些 spring-boot-starter 在启动阶段就需要读取配置然后做一些事情。

# 启动阶段注入application命名空间的配置
apollo.bootstrap.enabled = true

也支持注入多个命名空间的配置。

# 启动阶段注入application,FX.apollo,application.yml命名空间的配置
apollo.bootstrap.enabled = true
apollo.bootstrap.namespaces = application,FX.apollo,application.yml

可以让 Apollo 的加载顺序在日志系统之前,比如希望把日志相关的配置(logging.level.root=info 或者 logback-spring.xml 中的参数)也交给 Apollo 管理。

# 启动阶段注入application命名空间的配置
apollo.bootstrap.enabled = true
# 让 Apollo 的加载顺序在日志系统之前
apollo.bootstrap.eagerLoad.enabled = true

原理

1、客户端和服务端保持了一个长连接,从而能第一时间获得配置更新的推送。

2、客户端还会定时从服务端拉取应用的最新配置。

  • 这是一个 fallback 机制,为了防止推送机制失效导致配置不更新。
  • 客户端定时拉取会上报本地版本,所以一般情况下,对于定时拉取的操作,服务端都会返回 304 - Not Modified。
  • 定时频率默认为每 5 分钟拉取一次,客户端也可以通过运行时指定 apollo.refreshInterval 系统参数来覆盖,单位为分钟。

3、客户端从服务端获取到应用的最新配置后,会保存在内存中。

4、客户端会把从服务端获取到的配置在本地文件系统缓存一份。在遇到服务不可用,或者网络不通的时候,依然能从本地恢复配置。

5、应用程序可以从 Apollo 客户端获取最新的配置、订阅配置更新通知。

源码分析

RemoteConfigRepository

在 ConfigService 的 getConfig 方法中会调用 RemoteConfigRepository 的构造器方法。

先来看 RemoteConfigRepository 构造器方法。

public RemoteConfigRepository(String namespace) {m_namespace = namespace;m_configCache = new AtomicReference<>();m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);m_httpUtil = ApolloInjector.getInstance(HttpUtil.class);m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);m_longPollServiceDto = new AtomicReference<>();m_remoteMessages = new AtomicReference<>();m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());m_configNeedForceRefresh = new AtomicBoolean(true);m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),m_configUtil.getOnErrorRetryInterval() * 8);// 客户端从Apollo配置中心服务端获取到应用的最新配置后,会保存在内存中,同时持久化到磁盘中// 应用程序可以从客户端获取最新的配置、订阅配置更新通知this.trySync();// 客户端定时调度处理this.schedulePeriodicRefresh();// 客户端长轮询处理this.scheduleLongPollingRefresh();
}

1、trySync 方法

首先看下 trySync 方法的处理逻辑。

protected boolean trySync() {try {sync();return true;} catch (Throwable ex) {Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));logger.warn("Sync config failed, will retry. Repository {}, reason: {}", this.getClass(), ExceptionUtil.getDetailMessage(ex));}return false;
}

进入 sync 方法内部一窥究竟。

@Override
protected synchronized void sync() {Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");try {// 从 m_configCache 缓存中获取本地配置ApolloConfig previous = m_configCache.get();// 从服务端加载远程配置ApolloConfig current = loadApolloConfig();// 如果本地配置与远程配置不一致,即远程配置发生了变化if (previous != current) {logger.debug("Remote Config refreshed!");// 更新 m_configCache 缓存m_configCache.set(current);// 回调所有 RepositoryChangeListener 的 onRepositoryChange 方法this.fireRepositoryChange(m_namespace, this.getConfig());}if (current != null) {Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),current.getReleaseKey());}transaction.setStatus(Transaction.SUCCESS);} catch (Throwable ex) {transaction.setStatus(ex);throw ex;} finally {transaction.complete();}
}

不难看出 sync 方法实际上对应原理 3 - 客户端从Apollo配置中心服务端获取到应用的最新配置后,会保存在内存中。

2、schedulePeriodicRefresh 方法

初始延迟 5 分钟,之后每隔 5 分钟重复调度一次 trySync 方法。

private void schedulePeriodicRefresh() {logger.debug("Schedule periodic refresh with interval: {} {}",m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit());// 默认初始延迟5分钟,之后每隔5分钟重复调度一次m_executorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {Tracer.logEvent("Apollo.ConfigService", String.format("periodicRefresh: %s", m_namespace));logger.debug("refresh config for namespace: {}", m_namespace);trySync();Tracer.logEvent("Apollo.Client.Version", Apollo.VERSION);}}, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),m_configUtil.getRefreshIntervalTimeUnit());
}

不难看出 schedulePeriodicRefresh 方法实际上对应原理 2 - 客户端还会定时从服务端拉取应用的最新配置。

3、scheduleLongPollingRefresh 方法

客户端向服务端发起长轮询请求。实际上对应原理 1 - 客户端和服务端保持了一个长连接,从而能第一时间获得配置更新的推送。

private void scheduleLongPollingRefresh() {// 交给 RemoteConfigLongPollService 处理remoteConfigLongPollService.submit(m_namespace, this);
}

接着看下 RemoteConfigLongPollService 的 submit 方法是如何处理的。

public boolean submit(String namespace, RemoteConfigRepository remoteConfigRepository) {// 更新 m_longPollNamespaces 缓存boolean added = m_longPollNamespaces.put(namespace, remoteConfigRepository);// 更新 m_notifications 缓存m_notifications.putIfAbsent(namespace, INIT_NOTIFICATION_ID);if (!m_longPollStarted.get()) {// 执行 startLongPolling 方法startLongPolling();}return added;
}

接着看下 startLongPolling 方法的处理逻辑。

private void startLongPolling() {if (!m_longPollStarted.compareAndSet(false, true)) {return;}try {final String appId = m_configUtil.getAppId();final String cluster = m_configUtil.getCluster();final String dataCenter = m_configUtil.getDataCenter();final String secret = m_configUtil.getAccessKeySecret();// 默认 2000 毫秒final long longPollingInitialDelayInMills = m_configUtil.getLongPollingInitialDelayInMills();m_longPollingService.submit(new Runnable() {@Overridepublic void run() {if (longPollingInitialDelayInMills > 0) {try {logger.debug("Long polling will start in {} ms.", longPollingInitialDelayInMills);TimeUnit.MILLISECONDS.sleep(longPollingInitialDelayInMills);} catch (InterruptedException e) {//ignore}}// 执行 doLongPollingRefresh 方法doLongPollingRefresh(appId, cluster, dataCenter, secret);}});} catch (Throwable ex) {m_longPollStarted.set(false);ApolloConfigException exception =new ApolloConfigException("Schedule long polling refresh failed", ex);Tracer.logError(exception);logger.warn(ExceptionUtil.getDetailMessage(exception));}
}

接着看下 doLongPollingRefresh(…) 方法的处理逻辑。

private void doLongPollingRefresh(String appId, String cluster, String dataCenter, String secret) {final Random random = new Random();ServiceDTO lastServiceDto = null;while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {// 限流判断if (!m_longPollRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {}}Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "pollNotification");String url = null;try {if (lastServiceDto == null) {// 向服务端发起一个 /services/config 的 GET 请求,从响应体中得到 ServiceDTO 列表List<ServiceDTO> configServices = getConfigServices();// 采用随机策略从 ServiceDTO 列表中选出一个 ServiceDTO 实例lastServiceDto = configServices.get(random.nextInt(configServices.size()));}// 组装请求的 URLurl = assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter,m_notifications);logger.debug("Long polling from {}", url);HttpRequest request = new HttpRequest(url);// 默认读操作的超时时间为 90 秒request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);if (!StringUtils.isBlank(secret)) {Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);request.setHeaders(headers);}transaction.addData("Url", url);// 1、底层采用 HttpURLConnection 向服务端发起一个 /notifications/v2 的 GET 请求final HttpResponse<List<ApolloConfigNotification>> response =m_httpUtil.doGet(request, m_responseType);logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);if (response.getStatusCode() == 200 && response.getBody() != null) {// 2、更新客户端本地 m_notifications 缓存updateNotifications(response.getBody());// 3、更新客户端本地 m_remoteNotificationMessages 缓存updateRemoteNotifications(response.getBody());transaction.addData("Result", response.getBody().toString());// 4、对比客户端本地缓存与服务端返回的配置信息,如果发生变化,则更新本地缓存并触发监听器回调notify(lastServiceDto, response.getBody());}if (response.getStatusCode() == 304 && random.nextBoolean()) {lastServiceDto = null;}m_longPollFailSchedulePolicyInSecond.success();transaction.addData("StatusCode", response.getStatusCode());transaction.setStatus(Transaction.SUCCESS);} catch (Throwable ex) {lastServiceDto = null;Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));transaction.setStatus(ex);long sleepTimeInSecond = m_longPollFailSchedulePolicyInSecond.fail();logger.warn("Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespaces: {}, long polling url: {}, reason: {}",sleepTimeInSecond, appId, cluster, assembleNamespaces(), url, ExceptionUtil.getDetailMessage(ex));try {TimeUnit.SECONDS.sleep(sleepTimeInSecond);} catch (InterruptedException ie) {//ignore}} finally {transaction.complete();}}
}

默认的监听器是 LocalFileConfigRepository。

LocalFileConfigRepository

看下 onRepositoryChange 方法,它是如何处理的。

@Override
public void onRepositoryChange(String namespace, Properties newProperties) {if (newProperties.equals(m_fileProperties)) {return;}Properties newFileProperties = propertiesFactory.getPropertiesInstance();newFileProperties.putAll(newProperties);// 1、将配置持久化到磁盘中updateFileProperties(newFileProperties, m_upstream.getSourceType());// 2、触发监听器回调this.fireRepositoryChange(namespace, newProperties);
}

1、updateFileProperties 方法

private synchronized void updateFileProperties(Properties newProperties, ConfigSourceType sourceType) {this.m_sourceType = sourceType;if (newProperties.equals(m_fileProperties)) {return;}this.m_fileProperties = newProperties;persistLocalCacheFile(m_baseDir, m_namespace);
}

进入 persistLocalCacheFile 方法一窥究竟。

void persistLocalCacheFile(File baseDir, String namespace) {if (baseDir == null) {return;}// 文件名为 ${appId} + ${cluster} + ${namespace}.propertiesFile file = assembleLocalCacheFile(baseDir, namespace);OutputStream out = null;Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "persistLocalConfigFile");transaction.addData("LocalConfigFile", file.getAbsolutePath());try {out = new FileOutputStream(file);// 底层调用 Properties 的 store 方法将配置持久化到磁盘中m_fileProperties.store(out, "Persisted by DefaultConfig");transaction.setStatus(Transaction.SUCCESS);} catch (IOException ex) {ApolloConfigException exception =new ApolloConfigException(String.format("Persist local cache file %s failed", file.getAbsolutePath()), ex);Tracer.logError(exception);transaction.setStatus(exception);logger.warn("Persist local cache file {} failed, reason: {}.", file.getAbsolutePath(),ExceptionUtil.getDetailMessage(ex));} finally {if (out != null) {try {out.close();} catch (IOException ex) {//ignore}}transaction.complete();}
}

简单看下 Properties 的 store 方法是如何持久化的。

public void store(OutputStream out, String comments)throws IOException
{store0(new BufferedWriter(new OutputStreamWriter(out, "8859_1")),comments,true);
}private void store0(BufferedWriter bw, String comments, boolean escUnicode)throws IOException
{if (comments != null) {writeComments(bw, comments);}bw.write("#" + new Date().toString());bw.newLine();synchronized (this) {for (Enumeration<?> e = keys(); e.hasMoreElements();) {String key = (String)e.nextElement();String val = (String)get(key);key = saveConvert(key, true, escUnicode);val = saveConvert(val, false, escUnicode);bw.write(key + "=" + val);bw.newLine();}}bw.flush();
}

可以看出底层使用 BufferedWriter 的 write 方法将数据写入到磁盘中。

而本地磁盘文件的路径在 findLocalCacheDir 方法中进行定义。

LocalFileConfigRepository 构造器方法中会调用该方法。

private File findLocalCacheDir() {try {String defaultCacheDir = m_configUtil.getDefaultLocalCacheDir();Path path = Paths.get(defaultCacheDir);if (!Files.exists(path)) {Files.createDirectories(path);}if (Files.exists(path) && Files.isWritable(path)) {// 一种是通过参数指定的文件路径return new File(defaultCacheDir, CONFIG_DIR);}} catch (Throwable ex) {//ignore}// 另一种是应用程序的上下文路径return new File(ClassLoaderUtil.getClassPath(), CONFIG_DIR);
}

简单看下第一种通过参数指定的文件路径。

public String getDefaultLocalCacheDir() {String cacheRoot = getCustomizedCacheRoot();if (!Strings.isNullOrEmpty(cacheRoot)) {return cacheRoot + File.separator + getAppId();}cacheRoot = isOSWindows() ? "C:\\opt\\data\\%s" : "/opt/data/%s";return String.format(cacheRoot, getAppId());
}

进入 getCustomizedCacheRoot 方法一窥究竟。

private String getCustomizedCacheRoot() {// 1. Get from System PropertyString cacheRoot = System.getProperty("apollo.cacheDir");if (Strings.isNullOrEmpty(cacheRoot)) {// 2. Get from OS environment variablecacheRoot = System.getenv("APOLLO_CACHEDIR");}if (Strings.isNullOrEmpty(cacheRoot)) {// 3. Get from server.propertiescacheRoot = Foundation.server().getProperty("apollo.cacheDir", null);}if (Strings.isNullOrEmpty(cacheRoot)) {// 4. Get from app.propertiescacheRoot = Foundation.app().getProperty("apollo.cacheDir", null);}return cacheRoot;
}

2、fireRepositoryChange 方法

实际上对应原理 5 - 应用程序可以从 Apollo 客户端获取最新的配置、订阅配置更新通知。

protected void fireRepositoryChange(String namespace, Properties newProperties) {for (RepositoryChangeListener listener : m_listeners) {try {listener.onRepositoryChange(namespace, newProperties);} catch (Throwable ex) {Tracer.logError(ex);logger.error("Failed to invoke repository change listener {}", listener.getClass(), ex);}}
}

默认的监听器是 DefaultConfig。

DefaultConfig

看下 DefaultConfig 是如何回调处理的。

@Override
public synchronized void onRepositoryChange(String namespace, Properties newProperties) {if (newProperties.equals(m_configProperties.get())) {return;}ConfigSourceType sourceType = m_configRepository.getSourceType();Properties newConfigProperties = propertiesFactory.getPropertiesInstance();newConfigProperties.putAll(newProperties);Map<String, ConfigChange> actualChanges = updateAndCalcConfigChanges(newConfigProperties, sourceType);if (actualChanges.isEmpty()) {return;}// 监听器回调处理this.fireConfigChange(new ConfigChangeEvent(m_namespace, actualChanges));Tracer.logEvent("Apollo.Client.ConfigChanges", m_namespace);
}

进入 fireConfigChange 方法一窥究竟。

protected void fireConfigChange(final ConfigChangeEvent changeEvent) {for (final ConfigChangeListener listener : m_listeners) {if (!isConfigChangeListenerInterested(listener, changeEvent)) {continue;}m_executorService.submit(new Runnable() {@Overridepublic void run() {String listenerName = listener.getClass().getName();Transaction transaction = Tracer.newTransaction("Apollo.ConfigChangeListener", listenerName);try {// 触发监听器回调listener.onChange(changeEvent);transaction.setStatus(Transaction.SUCCESS);} catch (Throwable ex) {transaction.setStatus(ex);Tracer.logError(ex);logger.error("Failed to invoke config change listener {}", listenerName, ex);} finally {transaction.complete();}}});}

监听器默认是 AutoUpdateConfigChangeListener。

AutoUpdateConfigChangeListener

看下 AutoUpdateConfigChangeListener 是如何回调处理的。

@Override
public void onChange(ConfigChangeEvent changeEvent) {// 获取发生变化的属性集合Set<String> keys = changeEvent.changedKeys();if (CollectionUtils.isEmpty(keys)) {return;}for (String key : keys) {Collection<SpringValue> targetValues = springValueRegistry.get(beanFactory, key);if (targetValues == null || targetValues.isEmpty()) {continue;}// 更新应用程序使用到的属性对应的属性值for (SpringValue val : targetValues) {updateSpringValue(val);}}
}

进入 updateSpringValue 方法一窥究竟。

private void updateSpringValue(SpringValue springValue) {try {// 获取经过解析后的属性值Object value = resolvePropertyValue(springValue);// 更新应用程序使用到的属性对应的属性值springValue.update(value);// 日志打印logger.info("Auto update apollo changed value successfully, new value: {}, {}", value,springValue);} catch (Throwable ex) {logger.error("Auto update apollo changed value failed, {}", springValue.toString(), ex);}
}

SpringValue

简单看下,如何更新应用程序使用到的属性对应的属性值的。

public void update(Object newVal) throws IllegalAccessException, InvocationTargetException {if (isField()) {injectField(newVal);} else {injectMethod(newVal);}
}private void injectField(Object newVal) throws IllegalAccessException {Object bean = beanRef.get();if (bean == null) {return;}boolean accessible = field.isAccessible();field.setAccessible(true);// 底层使用 Field 的 set 方法更新属性值field.set(bean, newVal);field.setAccessible(accessible);
}private void injectMethod(Object newVal)throws InvocationTargetException, IllegalAccessException {Object bean = beanRef.get();if (bean == null) {return;}// 底层使用 Method 的 invoke 方法更新属性值methodParameter.getMethod().invoke(bean, newVal);
}
http://www.lryc.cn/news/369.html

相关文章:

  • JVM调优最全面的成长 :参数详解+垃圾算法+示例展示+类文件到源码+面试问题
  • linux驱动常用函数
  • Flowable进阶学习(九)数据对象DataObject、租户Tenant、接收任务ReceiveTask
  • C语言实现五子棋(n子棋)
  • OpenStack云平台搭建(2) | 安装Keystone
  • 基于javaFX的固定资产管理系统
  • 板子登录和挂载问题记录
  • 二、Linux文件 - Open函数讲解实战
  • 源码分析Spring解决循环依赖的过程
  • LabVIEW中加载.NET 2.0,3.0和3.5程序集
  • Fluent Python 笔记 第 2 章 序列构成的数组
  • 句子扩充法
  • Java并发编程概述
  • Java常见数据结构的排序与遍历(包括数组,List,Map)
  • 数据结构|绪论
  • 内网渗透(十二)之内网信息收集-内网端口扫描和发现
  • RabbitMq相关面试题
  • 树莓派开机自启动Python脚本或者应用程序
  • 全国青少年编程等级考试scratch四级真题2022年9月(含题库答题软件账号)
  • NodeJS与npm版本不一致时降级npm的方法
  • 《C++ Primer Plus》第16章:string类和标准模板库(8)
  • Linux安装达梦8数据库
  • [数据库]初识数据库
  • Redis的缓存雪崩、击穿、穿透和解决方案
  • 52000000
  • 内网资源探测
  • Java后端内部面试题(前一部分)
  • 关于如何抄引擎源码
  • 差分模拟信号转单端输出电路设计
  • Java中的clone方法