
[Nacos] Nacos Config Center: Server-Side Source Code Analysis


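The previous article walked through the source code flow of the Nacos config center client. This one covers the source code of the Nacos config center server.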

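Server startup

First, what does the server do when it starts?

The init() method is annotated with @PostConstruct, so it runs after the ExternalDumpService bean has been instantiated.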
com.alibaba.nacos.config.server.service.dump.ExternalDumpService#init

@PostConstruct
@Override
protected void init() throws Throwable {
    dumpOperate(processor, dumpAllProcessor, dumpAllBetaProcessor, dumpAllTagProcessor);
}

dumpOperate() mainly does two things:

  1. It calls dumpConfigInfo(), which itself runs a DumpAllTask.
  2. It schedules DumpAllTask as a recurring task.

com.alibaba.nacos.config.server.service.dump.DumpService#dumpOperate

protected void dumpOperate(DumpProcessor processor, DumpAllProcessor dumpAllProcessor,
        DumpAllBetaProcessor dumpAllBetaProcessor, DumpAllTagProcessor dumpAllTagProcessor) throws NacosException {
    String dumpFileContext = "CONFIG_DUMP_TO_FILE";
    TimerContext.start(dumpFileContext);
    try {
        LogUtil.DEFAULT_LOG.warn("DumpService start");

        Runnable dumpAll = () -> dumpAllTaskMgr.addTask(DumpAllTask.TASK_ID, new DumpAllTask());
        ... ...
        try {
            // Dump the configurations:
            // run DumpAllTask once
            dumpConfigInfo(dumpAllProcessor);
            ... ...
        } catch (Exception e) {
            LogUtil.FATAL_LOG.error(
                    "Nacos Server did not start because dumpservice bean construction failure :\n" + e.toString());
            throw new NacosException(NacosException.SERVER_ERROR,
                    "Nacos Server did not start because dumpservice bean construction failure :\n" + e.getMessage(),
                    e);
        }
        if (!EnvUtil.getStandaloneMode()) {
            Runnable heartbeat = () -> {
                String heartBeatTime = TimeUtils.getCurrentTime().toString();
                // write disk
                try {
                    DiskUtil.saveHeartBeatToDisk(heartBeatTime);
                } catch (IOException e) {
                    LogUtil.FATAL_LOG.error("save heartbeat fail" + e.getMessage());
                }
            };

            ConfigExecutor.scheduleConfigTask(heartbeat, 0, 10, TimeUnit.SECONDS);

            long initialDelay = new Random().nextInt(INITIAL_DELAY_IN_MINUTE) + 10;
            LogUtil.DEFAULT_LOG.warn("initialDelay:{}", initialDelay);

            // Run DumpAllTask every 6 hours
            ConfigExecutor.scheduleConfigTask(dumpAll, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);

            ConfigExecutor.scheduleConfigTask(dumpAllBeta, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);

            ConfigExecutor.scheduleConfigTask(dumpAllTag, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);
        }

        ConfigExecutor.scheduleConfigTask(clearConfigHistory, 10, 10, TimeUnit.MINUTES);
    } finally {
        TimerContext.end(dumpFileContext, LogUtil.DUMP_LOG);
    }
}

dumpConfigInfo() in turn runs a DumpAllTask.
com.alibaba.nacos.config.server.service.dump.DumpService#dumpConfigInfo

private void dumpConfigInfo(DumpAllProcessor dumpAllProcessor) throws IOException {
    int timeStep = 6;
    Boolean isAllDump = true;
    // initial dump all
    FileInputStream fis = null;
    Timestamp heartheatLastStamp = null;
    try {
        ... ...
        if (isAllDump) {
            LogUtil.DEFAULT_LOG.info("start clear all config-info.");
            DiskUtil.clearAll();
            // Run DumpAllTask
            dumpAllProcessor.process(new DumpAllTask());
        } else {
            ... ...
        }
    } catch (IOException e) {
        LogUtil.FATAL_LOG.error("dump config fail" + e.getMessage());
        throw e;
    } finally {
        if (null != fis) {
            try {
                fis.close();
            } catch (IOException e) {
                LogUtil.DEFAULT_LOG.warn("close file failed");
            }
        }
    }
}

process() pages through every configuration in the database and calls ConfigCacheService.dump() on each one.
com.alibaba.nacos.config.server.service.dump.processor.DumpAllProcessor#process

public boolean process(NacosTask task) {
    long currentMaxId = persistService.findConfigMaxId();
    long lastMaxId = 0;
    while (lastMaxId < currentMaxId) {
        // Page through every configuration in the database
        Page<ConfigInfoWrapper> page = persistService.findAllConfigInfoFragment(lastMaxId, PAGE_SIZE);
        if (page != null && page.getPageItems() != null && !page.getPageItems().isEmpty()) {
            for (ConfigInfoWrapper cf : page.getPageItems()) {
                long id = cf.getId();
                lastMaxId = id > lastMaxId ? id : lastMaxId;
                if (cf.getDataId().equals(AggrWhitelist.AGGRIDS_METADATA)) {
                    AggrWhitelist.load(cf.getContent());
                }

                if (cf.getDataId().equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {
                    ClientIpWhiteList.load(cf.getContent());
                }

                if (cf.getDataId().equals(SwitchService.SWITCH_META_DATAID)) {
                    SwitchService.load(cf.getContent());
                }

                // Dump to a file
                boolean result = ConfigCacheService.dump(cf.getDataId(), cf.getGroup(), cf.getTenant(),
                        cf.getContent(), cf.getLastModified(), cf.getType());

                final String content = cf.getContent();
                final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
                LogUtil.DUMP_LOG.info("[dump-all-ok] {}, {}, length={}, md5={}",
                        GroupKey2.getKey(cf.getDataId(), cf.getGroup()), cf.getLastModified(), content.length(),
                        md5);
            }
            DEFAULT_LOG.info("[all-dump] {} / {}", lastMaxId, currentMaxId);
        } else {
            lastMaxId += PAGE_SIZE;
        }
    }
    return true;
}
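
The loop in process() advances by the largest id seen so far rather than by a page offset. Here is a minimal sketch of that loop shape, assuming the fragment query returns rows whose id is greater than lastMaxId. An in-memory TreeMap stands in for the database, and KeysetPagingSketch, findFragment and dumpAll are hypothetical names, not Nacos APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class KeysetPagingSketch {
    // Hypothetical stand-in for the config table: id -> content.
    private final TreeMap<Long, String> table = new TreeMap<>();

    void insert(long id, String content) {
        table.put(id, content);
    }

    long findMaxId() {
        return table.isEmpty() ? 0L : table.lastKey();
    }

    // Returns up to pageSize rows whose id is greater than lastMaxId, in id order.
    List<Map.Entry<Long, String>> findFragment(long lastMaxId, int pageSize) {
        List<Map.Entry<Long, String>> page = new ArrayList<>();
        for (Map.Entry<Long, String> row : table.tailMap(lastMaxId, false).entrySet()) {
            if (page.size() == pageSize) {
                break;
            }
            page.add(row);
        }
        return page;
    }

    // Visits every row once, page by page, and returns the contents in visit order.
    List<String> dumpAll(int pageSize) {
        List<String> dumped = new ArrayList<>();
        long currentMaxId = findMaxId();
        long lastMaxId = 0;
        while (lastMaxId < currentMaxId) {
            List<Map.Entry<Long, String>> page = findFragment(lastMaxId, pageSize);
            if (!page.isEmpty()) {
                for (Map.Entry<Long, String> row : page) {
                    lastMaxId = Math.max(lastMaxId, row.getKey());
                    dumped.add(row.getValue()); // the real code calls ConfigCacheService.dump() here
                }
            } else {
                // Guard kept from the original loop so that an empty page cannot stall it.
                lastMaxId += pageSize;
            }
        }
        return dumped;
    }
}
```

Because the cursor is the last id rather than an offset, gaps in the id sequence (deleted rows) do not cause rows to be skipped or visited twice in this sketch.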

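dump() saves a configuration from the database to local disk, one file per configuration. When a client later queries a configuration, the server reads the local file instead of querying the database (unless PropertyUtil.isDirectRead() is true, in which case nothing is written to disk).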
com.alibaba.nacos.config.server.service.ConfigCacheService#dump

public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,
        String type) {
    String groupKey = GroupKey2.getKey(dataId, group, tenant);
    CacheItem ci = makeSure(groupKey);
    ci.setType(type);
    final int lockResult = tryWriteLock(groupKey);
    assert (lockResult != 0);

    if (lockResult < 0) {
        DUMP_LOG.warn("[dump-error] write lock failed. {}", groupKey);
        return false;
    }

    try {
        final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);

        if (md5.equals(ConfigCacheService.getContentMd5(groupKey))) {
            DUMP_LOG.warn("[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "
                            + "lastModifiedNew={}", groupKey, md5, ConfigCacheService.getLastModifiedTs(groupKey),
                    lastModifiedTs);
        } else if (!PropertyUtil.isDirectRead()) {
            // Write to disk
            DiskUtil.saveToDisk(dataId, group, tenant, content);
        }
        // Update the md5 and publish a LocalDataChangeEvent
        updateMd5(groupKey, md5, lastModifiedTs);
        return true;
    } catch (IOException ioe) {
        DUMP_LOG.error("[dump-exception] save disk error. " + groupKey + ", " + ioe.toString(), ioe);
        if (ioe.getMessage() != null) {
            String errMsg = ioe.getMessage();
            if (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg) || errMsg.contains(DISK_QUATA_CN)
                    || errMsg.contains(DISK_QUATA_EN)) {
                // Protect from disk full.
                // The log message below reads "disk full, exiting".
                FATAL_LOG.error("磁盘满自杀退出", ioe);
                System.exit(0);
            }
        }
        return false;
    } finally {
        releaseWriteLock(groupKey);
    }
}
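
The key idea in dump() is that the disk write is skipped when the md5 of the incoming content equals the cached md5. A minimal sketch of that gate, assuming a plain map stands in for the CacheItem cache and one file per key under a base directory. Md5GatedDumpSketch and its methods are hypothetical, and unlike the original this dump() returns whether a write happened (the original returns true in both cases).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class Md5GatedDumpSketch {
    // groupKey -> md5 of the content last written (hypothetical stand-in for CacheItem).
    private final Map<String, String> md5Cache = new ConcurrentHashMap<>();
    private final Path baseDir;

    Md5GatedDumpSketch(Path baseDir) {
        this.baseDir = baseDir;
    }

    static String md5Hex(String content) {
        try {
            MessageDigest digest = MessageDigest.getInstance("MD5");
            byte[] hash = digest.digest(content.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 digest is not available", e);
        }
    }

    // Returns true if the file was written, false if the content was unchanged.
    boolean dump(String groupKey, String content) {
        String md5 = md5Hex(content);
        if (md5.equals(md5Cache.get(groupKey))) {
            return false; // same content as last time: skip the disk write
        }
        try {
            Files.createDirectories(baseDir);
            Files.write(baseDir.resolve(groupKey), content.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        md5Cache.put(groupKey, md5);
        return true;
    }

    // Reads back the file that dump() wrote for this key.
    String read(String groupKey) {
        try {
            return new String(Files.readAllBytes(baseDir.resolve(groupKey)), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The sketch leaves out the per-key write lock and the disk-full handling that the real method has.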

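In short, server startup mainly copies every configuration in the database to local disk.

Client queries a configuration

When the client starts, it calls /v1/cs/configs to query its configuration.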

com.alibaba.nacos.config.server.controller.ConfigController#getConfig

@GetMapping
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void getConfig(HttpServletRequest request, HttpServletResponse response,
        @RequestParam("dataId") String dataId, @RequestParam("group") String group,
        @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
        @RequestParam(value = "tag", required = false) String tag)
        throws IOException, ServletException, NacosException {
    // Entry point for reading a configuration
    // check tenant
    ParamUtils.checkTenant(tenant);
    tenant = NamespaceUtil.processNamespaceParameter(tenant);
    // check params
    ParamUtils.checkParam(dataId, group, "datumId", "content");
    ParamUtils.checkParam(tag);

    final String clientIp = RequestUtil.getRemoteIp(request);
    inner.doGetConfig(request, response, dataId, group, tenant, tag, clientIp);
}

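doGetConfig() locates the file directly and sends the file input stream to the response output stream with FileChannel.transferTo(), the JDK's zero-copy style transfer API.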
com.alibaba.nacos.config.server.controller.ConfigServletInner#doGetConfig

public String doGetConfig(HttpServletRequest request, HttpServletResponse response, String dataId, String group,
        String tenant, String tag, String clientIp) throws IOException, ServletException {
    final String groupKey = GroupKey2.getKey(dataId, group, tenant);
    String autoTag = request.getHeader("Vipserver-Tag");
    String requestIpApp = RequestUtil.getAppName(request);
    int lockResult = tryConfigReadLock(groupKey);

    final String requestIp = RequestUtil.getRemoteIp(request);
    boolean isBeta = false;
    if (lockResult > 0) {
        // LockResult > 0 means cacheItem is not null and other thread can`t delete this cacheItem
        FileInputStream fis = null;
        try {
            String md5 = Constants.NULL;
            long lastModified = 0L;
            CacheItem cacheItem = ConfigCacheService.getContentCache(groupKey);
            if (cacheItem.isBeta() && cacheItem.getIps4Beta().contains(clientIp)) {
                isBeta = true;
            }

            final String configType =
                    (null != cacheItem.getType()) ? cacheItem.getType() : FileTypeEnum.TEXT.getFileType();
            response.setHeader("Config-Type", configType);
            FileTypeEnum fileTypeEnum = FileTypeEnum.getFileTypeEnumByFileExtensionOrFileType(configType);
            String contentTypeHeader = fileTypeEnum.getContentType();
            response.setHeader(HttpHeaderConsts.CONTENT_TYPE, contentTypeHeader);

            File file = null;
            ConfigInfoBase configInfoBase = null;
            PrintWriter out = null;
            if (isBeta) {
                md5 = cacheItem.getMd54Beta();
                lastModified = cacheItem.getLastModifiedTs4Beta();
                if (PropertyUtil.isDirectRead()) {
                    configInfoBase = persistService.findConfigInfo4Beta(dataId, group, tenant);
                } else {
                    file = DiskUtil.targetBetaFile(dataId, group, tenant);
                }
                response.setHeader("isBeta", "true");
            } else {
                if (StringUtils.isBlank(tag)) {
                    if (isUseTag(cacheItem, autoTag)) {
                        ... ...
                    } else {
                        md5 = cacheItem.getMd5();
                        lastModified = cacheItem.getLastModifiedTs();
                        if (PropertyUtil.isDirectRead()) {
                            // Standalone mode: read directly from the database
                            configInfoBase = persistService.findConfigInfo(dataId, group, tenant);
                        } else {
                            // Cluster mode: read the file on disk
                            file = DiskUtil.targetFile(dataId, group, tenant);
                        }
                        ... ...
                    }
                } else {
                    ... ...
                }
            }

            response.setHeader(Constants.CONTENT_MD5, md5);

            // Disable cache.
            response.setHeader("Pragma", "no-cache");
            response.setDateHeader("Expires", 0);
            response.setHeader("Cache-Control", "no-cache,no-store");
            if (PropertyUtil.isDirectRead()) {
                response.setDateHeader("Last-Modified", lastModified);
            } else {
                fis = new FileInputStream(file);
                response.setDateHeader("Last-Modified", file.lastModified());
            }

            if (PropertyUtil.isDirectRead()) {
                out = response.getWriter();
                out.print(configInfoBase.getContent());
                out.flush();
                out.close();
            } else {
                // Zero-copy
                fis.getChannel()
                        .transferTo(0L, fis.getChannel().size(), Channels.newChannel(response.getOutputStream()));
            }

            LogUtil.PULL_CHECK_LOG.warn("{}|{}|{}|{}", groupKey, requestIp, md5, TimeUtils.getCurrentTimeStr());

            final long delayed = System.currentTimeMillis() - lastModified;

            // TODO distinguish pull-get && push-get
            /*
             Otherwise, delayed cannot be used as the basis of push delay directly,
             because the delayed value of active get requests is very large.
             */
            ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, lastModified,
                    ConfigTraceService.PULL_EVENT_OK, delayed, requestIp);
        } finally {
            releaseConfigReadLock(groupKey);
            IoUtils.closeQuietly(fis);
        }
    } else if (lockResult == 0) {
        ... ...
    } else {
        ... ...
    }

    return HttpServletResponse.SC_OK + "";
}
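
The transferTo() call is the interesting part of doGetConfig(). Here is a small sketch of the same call shape outside a servlet, assuming a ByteArrayOutputStream stands in for the response stream. TransferToSketch, send and roundTrip are hypothetical names. Whether a copy through user space is really avoided depends on the target channel, so treat this as an illustration of the API rather than a performance claim.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class TransferToSketch {
    // Streams the whole file into the given output stream and returns the byte count.
    static long send(Path file, OutputStream out) {
        try (FileChannel source = FileChannel.open(file, StandardOpenOption.READ)) {
            WritableByteChannel target = Channels.newChannel(out);
            long size = source.size();
            long position = 0;
            // transferTo may move fewer bytes than requested, so loop until done.
            while (position < size) {
                long moved = source.transferTo(position, size - position, target);
                if (moved <= 0) {
                    break; // nothing more could be moved (for example the file shrank)
                }
                position += moved;
            }
            return position;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes content to a temp file, sends it through send(), and returns what arrived.
    static String roundTrip(String content) {
        try {
            Path file = Files.createTempFile("config-sketch", ".txt");
            try {
                Files.write(file, content.getBytes(StandardCharsets.UTF_8));
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                send(file, out);
                return new String(out.toByteArray(), StandardCharsets.UTF_8);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("server.port=8080"));
    }
}
```

The loop differs from the original, which calls transferTo() once for the whole file size. The loop is a defensive choice of this sketch, since the method is allowed to transfer fewer bytes than requested.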

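Client long-polling for configuration changes

After the client has started successfully, it long-polls the HTTP endpoint /v1/cs/configs/listener to listen for configuration changes.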

com.alibaba.nacos.config.server.controller.ConfigController#listener

@PostMapping("/listener")
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void listener(HttpServletRequest request, HttpServletResponse response)
        throws ServletException, IOException {
    // Entry point for listening to configuration updates
    request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
    String probeModify = request.getParameter("Listening-Configs");
    if (StringUtils.isBlank(probeModify)) {
        throw new IllegalArgumentException("invalid probeModify");
    }

    probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);

    Map<String, String> clientMd5Map;
    try {
        clientMd5Map = MD5Util.getClientMd5Map(probeModify);
    } catch (Throwable e) {
        throw new IllegalArgumentException("invalid probeModify");
    }

    // do long-polling
    inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}

doPollingConfig() decides whether the request supports long polling, based on whether the request headers contain Long-Pulling-Timeout.
com.alibaba.nacos.config.server.controller.ConfigServletInner#doPollingConfig

public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
        Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {

    // Long polling.
    if (LongPollingService.isSupportLongPolling(request)) {
        // Long polling is supported
        longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
        return HttpServletResponse.SC_OK + "";
    }
    ... ...
}

addLongPollingClient() stores the client so that it can be found and answered later when a configuration changes.
com.alibaba.nacos.config.server.service.LongPollingService#addLongPollingClient

public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
        int probeRequestSize) {

    String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
    String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
    String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
    String tag = req.getHeader("Vipserver-Tag");
    int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);

    // Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.
    long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
    if (isFixedPolling()) {
        timeout = Math.max(10000, getFixedPollingInterval());
        // Do nothing but set fix polling timeout.
    } else {
        long start = System.currentTimeMillis();
        // Compare md5 values
        List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
        if (changedGroups.size() > 0) {
            // Something changed: respond immediately
            generateResponse(req, rsp, changedGroups);
            LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",
                    RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                    changedGroups.size());
            return;
        } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
            // An initialization request: return right away without parking it
            LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
                    RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                    changedGroups.size());
            return;
        }
    }
    String ip = RequestUtil.getRemoteIp(req);

    // Must be called by http thread, or send response.
    final AsyncContext asyncContext = req.startAsync();

    // AsyncContext.setTimeout() is incorrect, Control by oneself
    asyncContext.setTimeout(0L);

    // The md5 values match: run ClientLongPolling asynchronously
    ConfigExecutor.executeLongPolling(
            new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}
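
The timeout arithmetic at the top of addLongPollingClient() is easy to misread, so here it is in isolation. A minimal sketch: LongPollingTimeoutSketch and serverHoldMillis are hypothetical names, and the header values in the usage note are only examples.

```java
class LongPollingTimeoutSketch {
    // Lower bound applied by the original expression.
    static final long MIN_TIMEOUT_MS = 10_000L;

    // Same arithmetic as Math.max(10000, Long.parseLong(str) - delayTime) in addLongPollingClient().
    static long serverHoldMillis(String longPullingTimeoutHeader, int fixedDelayMs) {
        return Math.max(MIN_TIMEOUT_MS, Long.parseLong(longPullingTimeoutHeader) - fixedDelayMs);
    }

    public static void main(String[] args) {
        System.out.println(serverHoldMillis("30000", 500));
        System.out.println(serverHoldMillis("5000", 500));
    }
}
```

With a header value of 30000 and the 500 ms delay shown in the source, the server holds the request for at most 29500 ms, so it answers slightly before the client gives up. A header value of 5000 is raised to the 10000 ms floor.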

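ClientLongPolling.run() schedules a task that fires after a delay of timeoutTime (roughly 30 s). If the configuration does not change within that window, the task runs and responds to the client. If the configuration does change within that window, the task is cancelled.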
com.alibaba.nacos.config.server.service.LongPollingService.ClientLongPolling#run

public void run() {
    // Schedule the timeout task (about 30 s)
    asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {
        @Override
        public void run() {
            try {
                getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());

                // Delete subscriber's relations.
                allSubs.remove(ClientLongPolling.this);

                if (isFixedPolling()) {
                    ... ...
                } else {
                    LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout",
                            RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
                            "polling", clientMd5Map.size(), probeRequestSize);
                    // On timeout, respond directly
                    sendResponse(null);
                }
            } catch (Throwable t) {
                LogUtil.DEFAULT_LOG.error("long polling error:" + t.getMessage(), t.getCause());
            }
        }
    }, timeoutTime, TimeUnit.MILLISECONDS);

    // Add the client to the subscriber queue
    allSubs.add(this);
}

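sendResponse() responds to the client. It first cancels the timeout task created above, which matters when the response is triggered by a configuration change before the timeout fires.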
com.alibaba.nacos.config.server.service.LongPollingService.ClientLongPolling#sendResponse

void sendResponse(List<String> changedGroups) {

    // Cancel time out task.
    if (null != asyncTimeoutFuture) {
        // Cancel the task
        asyncTimeoutFuture.cancel(false);
    }
    generateResponse(changedGroups);
}
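
The pattern in ClientLongPolling (park the request, schedule a timeout reply, cancel the timeout if a change arrives first) can be sketched without a servlet container. This is a minimal sketch under these assumptions: a CompletableFuture stands in for the AsyncContext, and the comma-joined response body is made up for illustration (the real body comes from MD5Util.compareMd5ResultString). HeldRequestSketch and its methods are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class HeldRequestSketch {
    // Completed exactly once, either by the timeout task or by a change notification.
    private final CompletableFuture<String> response = new CompletableFuture<>();
    private volatile ScheduledFuture<?> timeoutFuture;

    // Parks the request: schedule an empty reply in case nothing changes in time.
    CompletableFuture<String> hold(ScheduledExecutorService scheduler, long timeoutMillis) {
        timeoutFuture = scheduler.schedule(() -> sendResponse(null), timeoutMillis, TimeUnit.MILLISECONDS);
        return response;
    }

    // Called by the timeout task (changedGroups == null) or by a change notification.
    void sendResponse(List<String> changedGroups) {
        ScheduledFuture<?> pending = timeoutFuture;
        if (pending != null) {
            pending.cancel(false); // harmless when the timeout task itself is the caller
        }
        response.complete(changedGroups == null ? "" : String.join(",", changedGroups));
    }

    boolean timeoutCancelled() {
        ScheduledFuture<?> pending = timeoutFuture;
        return pending != null && pending.isCancelled();
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        HeldRequestSketch request = new HeldRequestSketch();
        CompletableFuture<String> reply = request.hold(scheduler, 200);
        System.out.println("reply after timeout: '" + reply.join() + "'");
        scheduler.shutdownNow();
    }
}
```

As in the original, cancel(false) is used, so a timeout task that is already running is not interrupted. It simply finds the response already completed.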

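generateResponse() returns identifying information about the changed configurations, such as dataId and group, to the client. It does not return the configuration content itself, which the client fetches with a separate query.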
com.alibaba.nacos.config.server.service.LongPollingService.ClientLongPolling#generateResponse

void generateResponse(List<String> changedGroups) {
    if (null == changedGroups) {
        // Tell web container to send http response.
        asyncContext.complete();
        return;
    }

    HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();

    try {
        // Build the list of updated configurations and return it to the client
        final String respString = MD5Util.compareMd5ResultString(changedGroups);

        // Disable cache.
        response.setHeader("Pragma", "no-cache");
        response.setDateHeader("Expires", 0);
        response.setHeader("Cache-Control", "no-cache,no-store");
        response.setStatus(HttpServletResponse.SC_OK);
        response.getWriter().println(respString);
        asyncContext.complete();
    } catch (Exception ex) {
        PULL_LOG.error(ex.toString(), ex);
        asyncContext.complete();
    }
}

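Notifying clients of configuration changes

When a configuration is modified in the Nacos admin console, /v1/cs/configs is called to update it.

publishConfig() saves the configuration to the database and publishes a ConfigDataChangeEvent.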
com.alibaba.nacos.config.server.controller.ConfigController#publishConfig

@PostMapping
@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
        @RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,
        @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
        @RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
        @RequestParam(value = "appName", required = false) String appName,
        @RequestParam(value = "src_user", required = false) String srcUser,
        @RequestParam(value = "config_tags", required = false) String configTags,
        @RequestParam(value = "desc", required = false) String desc,
        @RequestParam(value = "use", required = false) String use,
        @RequestParam(value = "effect", required = false) String effect,
        @RequestParam(value = "type", required = false) String type,
        @RequestParam(value = "schema", required = false) String schema) throws NacosException {
    // Entry point for modifying a configuration
    final String srcIp = RequestUtil.getRemoteIp(request);
    final String requestIpApp = RequestUtil.getAppName(request);
    srcUser = RequestUtil.getSrcUserName(request);
    // check type
    if (!ConfigType.isValidType(type)) {
        type = ConfigType.getDefaultType().getType();
    }
    // check tenant
    ParamUtils.checkTenant(tenant);
    ParamUtils.checkParam(dataId, group, "datumId", content);
    ParamUtils.checkParam(tag);
    Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(10);
    MapUtils.putIfValNoNull(configAdvanceInfo, "config_tags", configTags);
    MapUtils.putIfValNoNull(configAdvanceInfo, "desc", desc);
    MapUtils.putIfValNoNull(configAdvanceInfo, "use", use);
    MapUtils.putIfValNoNull(configAdvanceInfo, "effect", effect);
    MapUtils.putIfValNoNull(configAdvanceInfo, "type", type);
    MapUtils.putIfValNoNull(configAdvanceInfo, "schema", schema);
    ParamUtils.checkParam(configAdvanceInfo);

    if (AggrWhitelist.isAggrDataId(dataId)) {
        LOGGER.warn("[aggr-conflict] {} attempt to publish single data, {}, {}", RequestUtil.getRemoteIp(request),
                dataId, group);
        throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr");
    }

    final Timestamp time = TimeUtils.getCurrentTime();
    String betaIps = request.getHeader("betaIps");
    ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
    configInfo.setType(type);
    if (StringUtils.isBlank(betaIps)) {
        if (StringUtils.isBlank(tag)) {
            // Insert into the database
            persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true);
            // Publish a ConfigDataChangeEvent
            /**
             * AsyncNotifyService subscribes to ConfigDataChangeEvent.
             * @see AsyncNotifyService#AsyncNotifyService(com.alibaba.nacos.core.cluster.ServerMemberManager)
             */
            ConfigChangePublisher.notifyConfigChange(
                    new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
        } else {
            persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true);
            ConfigChangePublisher.notifyConfigChange(
                    new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
        }
    } else {
        // beta publish
        persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, true);
        ConfigChangePublisher.notifyConfigChange(
                new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
    }
    ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(),
            InetUtils.getSelfIP(), ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
    return true;
}

AsyncNotifyService subscribes to ConfigDataChangeEvent and then submits an AsyncTask that notifies the nodes of the Nacos cluster about the configuration change.
com.alibaba.nacos.config.server.service.notify.AsyncNotifyService#AsyncNotifyService

public AsyncNotifyService(ServerMemberManager memberManager) {
    this.memberManager = memberManager;

    // Register ConfigDataChangeEvent to NotifyCenter.
    NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);

    // Register A Subscriber to subscribe ConfigDataChangeEvent.
    NotifyCenter.registerSubscriber(new Subscriber() {

        @Override
        public void onEvent(Event event) {
            // Generate ConfigDataChangeEvent concurrently
            if (event instanceof ConfigDataChangeEvent) {
                // Handle the ConfigDataChangeEvent
                ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
                long dumpTs = evt.lastModifiedTs;
                String dataId = evt.dataId;
                String group = evt.group;
                String tenant = evt.tenant;
                String tag = evt.tag;
                Collection<Member> ipList = memberManager.allMembers();

                // In fact, any type of queue here can be
                // Iterate over every node in the cluster and build a NotifySingleTask for each
                Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
                for (Member member : ipList) {
                    queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
                            evt.isBeta));
                }
                // Submit an AsyncTask, which holds the NotifySingleTask queue
                /**
                 * @see AsyncTask#run()
                 */
                ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue));
            }
        }

        @Override
        public Class<? extends Event> subscribeType() {
            return ConfigDataChangeEvent.class;
        }
    });
}

AsyncTask.run() calls the HTTP endpoint /v1/cs/communication/dataChange on every node of the Nacos cluster (including the current node) to announce the configuration change.
com.alibaba.nacos.config.server.service.notify.AsyncNotifyService.AsyncTask#run

@Override
public void run() {
    executeAsyncInvoke();
}

private void executeAsyncInvoke() {
    // Iterate over all NotifySingleTask entries
    while (!queue.isEmpty()) {
        NotifySingleTask task = queue.poll();
        String targetIp = task.getTargetIP();
        if (memberManager.hasMember(targetIp)) {
            // start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify
            boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp);
            if (unHealthNeedDelay) {
                // target ip is unhealthy, then put it in the notification list
                ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
                        task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
                        0, task.target);
                // get delay time and set fail count to the task
                asyncTaskExecute(task);
            } else {
                Header header = Header.newInstance();
                header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified()));
                header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());
                if (task.isBeta) {
                    header.addParam("isBeta", "true");
                }
                AuthHeaderUtil.addIdentityToHeader(header);
                // Call the /v1/cs/communication/dataChange endpoint
                /**
                 * @see CommunicationController#notifyConfigInfo(javax.servlet.http.HttpServletRequest, java.lang.String, java.lang.String, java.lang.String, java.lang.String)
                 */
                restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task));
            }
        }
    }
}

notifyConfigInfo() is mainly responsible for reading the changed configuration from the database and then updating the local file.
com.alibaba.nacos.config.server.controller.CommunicationController#notifyConfigInfo

@GetMapping("/dataChange")
public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId,
        @RequestParam("group") String group,
        @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
        @RequestParam(value = "tag", required = false) String tag) {
    // Entry point for configuration change notifications
    dataId = dataId.trim();
    group = group.trim();
    String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
    long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);
    String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
    String isBetaStr = request.getHeader("isBeta");
    if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {
        dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
    } else {
        // Dump the data
        dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
    }
    return true;
}

dump() in turn submits a DumpTask.
com.alibaba.nacos.config.server.service.dump.DumpService#dump(java.lang.String, java.lang.String, java.lang.String, java.lang.String, long, java.lang.String)

public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp) {
    dump(dataId, group, tenant, tag, lastModified, handleIp, false);
}

public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,
        boolean isBeta) {
    String groupKey = GroupKey2.getKey(dataId, group, tenant);
    String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag);
    // Add a DumpTask
    /**
     * @see DumpProcessor#process(com.alibaba.nacos.common.task.NacosTask)
     */
    dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
    DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);
}
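
The taskKey built in dump() identifies one pending dump per configuration variant. Below is a small sketch of how such a key behaves, assuming a task manager that keeps at most one pending task per key. That assumption is mine and is not shown in the excerpt above. DumpTaskKeySketch and its methods are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class DumpTaskKeySketch {
    // Pending tasks by key (assumed behaviour): a later task with the same key replaces the earlier one.
    private final Map<String, Long> pending = new LinkedHashMap<>();

    static String taskKey(String dataId, String group, String tenant, boolean isBeta, String tag) {
        // Same expression as in DumpService#dump; String.join renders a null tag as the text "null".
        return String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag);
    }

    void addTask(String taskKey, long lastModified) {
        pending.put(taskKey, lastModified);
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        DumpTaskKeySketch manager = new DumpTaskKeySketch();
        String key = taskKey("app.yaml", "DEFAULT_GROUP", "", false, null);
        manager.addTask(key, 1L);
        manager.addTask(key, 2L);
        System.out.println(key + " -> pending tasks: " + manager.pendingCount());
    }
}
```

Under that assumption, two quick edits of the same configuration collapse into a single pending dump, while a beta or tagged variant gets its own key.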

process() reads the changed configuration from the database and hands it to DumpConfigHandler.configDump() for processing.
com.alibaba.nacos.config.server.service.dump.processor.DumpProcessor#process

public boolean process(NacosTask task) {
    // Handle the DumpTask
    final PersistService persistService = dumpService.getPersistService();
    DumpTask dumpTask = (DumpTask) task;
    String[] pair = GroupKey2.parseKey(dumpTask.getGroupKey());
    String dataId = pair[0];
    String group = pair[1];
    String tenant = pair[2];
    long lastModified = dumpTask.getLastModified();
    String handleIp = dumpTask.getHandleIp();
    boolean isBeta = dumpTask.isBeta();
    String tag = dumpTask.getTag();

    ConfigDumpEvent.ConfigDumpEventBuilder build = ConfigDumpEvent.builder().namespaceId(tenant).dataId(dataId)
            .group(group).isBeta(isBeta).tag(tag).lastModifiedTs(lastModified).handleIp(handleIp);

    if (isBeta) {
        ... ...
    } else {
        if (StringUtils.isBlank(tag)) {
            // Query the configuration from the database
            ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant);

            build.remove(Objects.isNull(cf));
            build.content(Objects.isNull(cf) ? null : cf.getContent());
            build.type(Objects.isNull(cf) ? null : cf.getType());

            // Dump the configuration
            return DumpConfigHandler.configDump(build.build());
        } else {
            ... ...
        }
    }
}

configDump() calls ConfigCacheService.dump(), the same method the server uses at startup to save all configuration files.
com.alibaba.nacos.config.server.service.dump.DumpConfigHandler#configDump

public static boolean configDump(ConfigDumpEvent event) {
    final String dataId = event.getDataId();
    final String group = event.getGroup();
    final String namespaceId = event.getNamespaceId();
    final String content = event.getContent();
    final String type = event.getType();
    final long lastModified = event.getLastModifiedTs();
    if (event.isBeta()) {
        ... ...
    }
    if (StringUtils.isBlank(event.getTag())) {
        if (dataId.equals(AggrWhitelist.AGGRIDS_METADATA)) {
            AggrWhitelist.load(content);
        }

        if (dataId.equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {
            ClientIpWhiteList.load(content);
        }

        if (dataId.equals(SwitchService.SWITCH_META_DATAID)) {
            SwitchService.load(content);
        }

        boolean result;
        if (!event.isRemove()) {
            // Dump the data
            result = ConfigCacheService.dump(dataId, group, namespaceId, content, lastModified, type);

            if (result) {
                ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
                        ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,
                        content.length());
            }
        } else {
            ... ...
        }
        return result;
    } else {
        ... ...
    }
}

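dump() writes the new configuration to the disk file, updates the md5, and then publishes a LocalDataChangeEvent. It is the same method whose code is listed in the server startup section above.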
com.alibaba.nacos.config.server.service.ConfigCacheService#dump


updateMd5() updates the cached md5 and publishes a LocalDataChangeEvent, but only when the new md5 differs from the cached one.
com.alibaba.nacos.config.server.service.ConfigCacheService#updateMd5

public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {
    CacheItem cache = makeSure(groupKey);
    if (cache.md5 == null || !cache.md5.equals(md5)) {
        cache.md5 = md5;
        cache.lastModifiedTs = lastModifiedTs;
        // Publish a LocalDataChangeEvent
        /**
         * LongPollingService subscribes to LocalDataChangeEvent.
         * @see LongPollingService#LongPollingService()
         */
        NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));
    }
}
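
The guard in updateMd5() means subscribers only hear about real changes. A minimal sketch of that publish-on-change behaviour, assuming plain Consumer callbacks stand in for NotifyCenter subscribers. Md5ChangeNotifierSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class Md5ChangeNotifierSketch {
    private final Map<String, String> md5ByGroupKey = new ConcurrentHashMap<>();
    private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Stores the new md5 and notifies subscribers only if it differs from the cached one.
    boolean updateMd5(String groupKey, String md5) {
        String previous = md5ByGroupKey.put(groupKey, md5);
        if (md5.equals(previous)) {
            return false; // unchanged: no event
        }
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(groupKey);
        }
        return true;
    }

    public static void main(String[] args) {
        Md5ChangeNotifierSketch cache = new Md5ChangeNotifierSketch();
        List<String> events = new ArrayList<>();
        cache.subscribe(events::add);
        cache.updateMd5("demo+DEFAULT_GROUP", "md5-a");
        cache.updateMd5("demo+DEFAULT_GROUP", "md5-a");
        cache.updateMd5("demo+DEFAULT_GROUP", "md5-b");
        System.out.println("events published: " + events.size());
    }
}
```

This is also why the periodic DumpAllTask does not wake every long-polling client: re-dumping unchanged content produces the same md5 and therefore no event.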

LongPollingService subscribes to LocalDataChangeEvent and then submits a DataChangeTask.
com.alibaba.nacos.config.server.service.LongPollingService#LongPollingService

public LongPollingService() {
    allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();

    ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);

    // Register LocalDataChangeEvent to NotifyCenter.
    NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);

    // Register A Subscriber to subscribe LocalDataChangeEvent.
    NotifyCenter.registerSubscriber(new Subscriber() {

        @Override
        public void onEvent(Event event) {
            if (isFixedPolling()) {
                // Ignore.
            } else {
                // Handle the LocalDataChangeEvent
                if (event instanceof LocalDataChangeEvent) {
                    LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
                    // Submit a DataChangeTask
                    /**
                     * @see DataChangeTask#run()
                     */
                    ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
                }
            }
        }

        @Override
        public Class<? extends Event> subscribeType() {
            return LocalDataChangeEvent.class;
        }
    });
}

DataChangeTask finds the clients that are listening on this configuration and notifies them.
com.alibaba.nacos.config.server.service.LongPollingService.DataChangeTask#run

public void run() {
    try {
        ConfigCacheService.getContentBetaMd5(groupKey);
        for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
            ClientLongPolling clientSub = iter.next();
            // Find the clients listening on this configuration
            if (clientSub.clientMd5Map.containsKey(groupKey)) {
                // If published tag is not in the beta list, then it skipped.
                if (isBeta && !CollectionUtils.contains(betaIps, clientSub.ip)) {
                    continue;
                }

                // If published tag is not in the tag list, then it skipped.
                if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
                    continue;
                }

                getRetainIps().put(clientSub.ip, System.currentTimeMillis());
                iter.remove(); // Delete subscribers' relationships.
                LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance",
                        RequestUtil.getRemoteIp((HttpServletRequest) clientSub.asyncContext.getRequest()),
                        "polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
                // Tell the client that the configuration was updated
                clientSub.sendResponse(Arrays.asList(groupKey));
            }
        }
    } catch (Throwable t) {
        LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t));
    }
}
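
The matching step in DataChangeTask boils down to scanning the parked subscribers and removing those that watch the changed groupKey. Here is a minimal sketch of just that step. It leaves out the beta and tag filters, and SubscriberMatchSketch, Subscriber and onDataChange are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class SubscriberMatchSketch {
    // Hypothetical stand-in for ClientLongPolling: who is waiting, and on which keys.
    static final class Subscriber {
        final String ip;
        final Map<String, String> clientMd5Map;

        Subscriber(String ip, Map<String, String> clientMd5Map) {
            this.ip = ip;
            this.clientMd5Map = clientMd5Map;
        }
    }

    final Queue<Subscriber> allSubs = new ConcurrentLinkedQueue<>();

    // Removes every subscriber watching groupKey and returns their IPs in queue order.
    List<String> onDataChange(String groupKey) {
        List<String> notified = new ArrayList<>();
        for (Iterator<Subscriber> iter = allSubs.iterator(); iter.hasNext(); ) {
            Subscriber sub = iter.next();
            if (sub.clientMd5Map.containsKey(groupKey)) {
                iter.remove(); // the request is answered once, then the client polls again
                notified.add(sub.ip); // the real code calls clientSub.sendResponse(...) here
            }
        }
        return notified;
    }

    public static void main(String[] args) {
        SubscriberMatchSketch service = new SubscriberMatchSketch();
        service.allSubs.add(new Subscriber("10.0.0.1", Collections.singletonMap("a+DEFAULT_GROUP", "md5-a")));
        service.allSubs.add(new Subscriber("10.0.0.2", Collections.singletonMap("b+DEFAULT_GROUP", "md5-b")));
        System.out.println(service.onDataChange("a+DEFAULT_GROUP"));
    }
}
```

A subscriber is removed as soon as it is answered, which matches the long-polling contract: each parked request gets exactly one response, and the client is expected to send a new listener request afterwards.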