Nacos 配置中心源码讲解

时间:2023-01-23 17:55:03

目录

1. 配置中心的优点

2. 配置模型结构

3. 配置中心 Server 端实现原理

3.1 新建配置 / 发布配置

3.2 查询配置

4. 配置中心 Client 端实现原理

4.1 发布配置

4.2 查询配置

4.3 监听机制 Listener


1. 配置中心的优点

  1. 运行时动态修改系统参数配置,不用重启服务

  2. 方便运维人员修改系统参数,不直接改代码,安全性高,防止代码改坏了2

  3. 微服务多,配置统一管理

2. 配置模型结构

Nacos 配置中心源码讲解

3. 配置中心 Server 端实现原理

3.1 新建配置 / 发布配置

配置通过后台管理可以新建,当点击发布按钮时,将会调用接口 /nacos/v1/cs/configs 完成发布

Nacos 配置中心源码讲解

接口 /nacos/v1/cs/configs 定义在 com.alibaba.nacos.config.server.controller.ConfigController#publishConfig

接下来看看这个接口如何实现的

ConfigController.publishConfig

publishConfig 接口主要做的就是数据的组装、字段的非法性校验。

@RestController
@RequestMapping(Constants.CONFIG_CONTROLLER_PATH) // CONFIG_CONTROLLER_PATH = /v1/cs/configs
public class ConfigController {
    
    
    @PostMapping
    public Boolean publishConfig( HttpServletRequest request, HttpServletResponse response,
                                  @RequestParam String dataId, @RequestParam String group 
                                  // 省略其他字段                                
                                ) {
        
        // 省略非关键代码
        
        // 校验字段是否合法
        ParamUtils.checkTenant(tenant);
        ParamUtils.checkParam(dataId, group, "datumId", content);
        ParamUtils.checkParam(tag);
        
        // 从请求中组装数据到 ConfigForm 对象中
        ConfigForm configForm = new ConfigForm();
        configForm.setDataId(dataId);
        configForm.setGroup(group);
        // ... 省略 configForm 其他字段的 set 
    
    
        // 构建一个 ConfigRequestInfo 请求对象
        ConfigRequestInfo configRequestInfo = new ConfigRequestInfo(); 
        configRequestInfo.setSrcIp(RequestUtil.getRemoteIp(request));
        configRequestInfo.setRequestIpApp(RequestUtil.getAppName(request));
        configRequestInfo.setBetaIps(request.getHeader("betaIps"));
    
        String encryptedDataKey = pair.getFirst();
       
        return configOperationService.publishConfig(configForm, configRequestInfo, encryptedDataKey);
    }
    
}

ConfigOperationService.publishConfig

publishConfig 接口最后调用到了 configOperationService.publishConfig

接下来看看这个 service 如何实现,以下代码省略了 betaIps 与 tag 的分支,这里只关注单机情况的当前的主流程。

@Service
public class ConfigOperationService {
    
    private ConfigInfoPersistService configInfoPersistService;
    
    public Boolean publishConfig(ConfigForm configForm, ConfigRequestInfo configRequestInfo, String encryptedDataKey) {
        
        // 省略非关键代码
        
        Map<String, Object> configAdvanceInfo = getConfigAdvanceInfo(configForm);
       
        
        // 将 configForm 转成 与数据库对应的 ConfigInfo 对象
        ConfigInfo configInfo = new ConfigInfo(configForm.getDataId(), ...);        
        configInfo.setEncryptedDataKey(encryptedDataKey);
        
        // 调用持久化 service 执行 insertOrUpdate 插入或更新数据(因为 publishConfig 接口可以同时做新增和编辑)
        configInfoPersistService.insertOrUpdate(configRequestInfo.getSrcIp(), 
                                                configForm.getSrcUser(),
                                                configInfo, 
                                                TimeUtils.getCurrentTime(), 
                                                configAdvanceInfo, 
                                                false);
​
        return true;
    }    
}

可见,上述代码中,最后调用了 ConfigInfoPersistService.insertOrUpdate 方法。

ConfigInfoPersistService.insertOrUpdate

ConfigInfoPersistService,这个类名也能猜出它的作用了,Persist 就是持久化,这个类就是负责持久化数据。

ConfigInfoPersistService 是一个接口代表持久化,而 ExternalConfigInfoPersistServiceImpl 为实现类,代表采用外部的持久化(MySQL)

看看如何实现:

@Service
public class ExternalConfigInfoPersistServiceImpl implements ConfigInfoPersistService {
    
    @Override
    public void insertOrUpdate(String srcIp, String srcUser, ConfigInfo configInfo, Timestamp time,
            Map<String, Object> configAdvanceInfo, boolean notify) {
        try {
            // 添加配置
            addConfigInfo(srcIp, srcUser, configInfo, time, configAdvanceInfo, notify);
        } catch (DataIntegrityViolationException ive) { // Unique constraint conflict
            updateConfigInfo(configInfo, srcIp, srcUser, time, configAdvanceInfo, notify);
        }
    }
    
    
    // 添加配置
    @Override
    public void addConfigInfo(final String srcIp, final String srcUser, final ConfigInfo configInfo,
            final Timestamp time, final Map<String, Object> configAdvanceInfo, final boolean notify) {
        
        // 添加配置
        long configId = addConfigInfoAtomic(-1, srcIp, srcUser, configInfo, time, configAdvanceInfo);
        
        // 省略非关键代码           
    }
    
    
    // addConfigInfoAtomic 完成实际的数据插入
    @Override
    public long addConfigInfoAtomic(final long configId, final String srcIp, final String srcUser,
            final ConfigInfo configInfo, final Timestamp time, Map<String, Object> configAdvanceInfo) {
 
        // 省略非关键代码    
        
        // 根据文件内容使用指定编码(UTF-8) 计算 md5 值
        final String md5Tmp = MD5Utils.md5Hex(configInfo.getContent(), Constants.ENCODE);
        
        KeyHolder keyHolder = new GeneratedKeyHolder();
        
        // 找到 ConfigInfoMapper 
        ConfigInfoMapper configInfoMapper =             mapperManager.findMapper(dataSourceService.getDataSourceType(),TableConstant.CONFIG_INFO);
        
        // 构建 SQL
        final String sql = configInfoMapper.insert(
                Arrays.asList("data_id", "group_id", "tenant_id", "app_name", "content", 
                              "md5", "src_ip", "src_user","gmt_create", "gmt_modified",
                              "c_desc", "c_use", "effect", "type", 
                              "c_schema","encrypted_data_key"));
​
        String[] returnGeneratedKeys = configInfoMapper.getPrimaryKeyGeneratedKeys();
     
            // 使用 JDBC Template 执行 SQL
            jt.update(new PreparedStatementCreator() {
                @Override
                public PreparedStatement createPreparedStatement(Connection connection) throws SQLException {
                    // 构建预编译执行模式,传入对应参数
                    PreparedStatement ps = connection.prepareStatement(sql, returnGeneratedKeys);
                    ps.setString(1, configInfo.getDataId());
                    ps.setString(2, configInfo.getGroup());
                    ps.setString(3, tenantTmp);
                    ps.setString(4, appNameTmp);
                    ps.setString(5, configInfo.getContent());
                    ps.setString(6, md5Tmp);
                    ps.setString(7, srcIp);
                    ps.setString(8, srcUser);
                    ps.setTimestamp(9, time);
                    ps.setTimestamp(10, time);
                    ps.setString(11, desc);
                    ps.setString(12, use);
                    ps.setString(13, effect);
                    ps.setString(14, type);
                    ps.setString(15, schema);
                    ps.setString(16, encryptedDataKey);
                    return ps;
                }
            }, keyHolder);
        
        
            // 生成一个 config id 返回
            Number nu = keyHolder.getKey();
            return nu.longValue();
    }    
}

可以看出,配置的发布主要流程就是向数据库添加了数据。

3.2 查询配置

在 Nacos 后台管理,点击配置详情,观察控制台浏览器发送了一条查询详情接口 GET /nacos/v1/cs/configs

该接口位置在 com.alibaba.nacos.config.server.controller.ConfigController#detailConfigInfo

@RestController
@RequestMapping(Constants.CONFIG_CONTROLLER_PATH)
public class ConfigController {
    
    private ConfigInfoPersistService configInfoPersistService;
    
    @GetMapping
    public ConfigAllInfo detailConfigInfo(String dataId, String group,String tenant) {
​
        // 参数校验
        ParamUtils.checkTenant(tenant);
        ParamUtils.checkParam(dataId, group, "datumId", "content");
        
        // 查询配置详情
        ConfigAllInfo configAllInfo = configInfoPersistService.findConfigAllInfo(dataId, group, tenant);
        
        // 省略非关键代码
        return configAllInfo;
    }
}

configInfoPersistService.findConfigAllInfo 最终调用以下方法

@Service
public class ExternalConfigInfoPersistServiceImpl implements ConfigInfoPersistService {
    
    protected JdbcTemplate jt;
    
    @Override
    public ConfigAllInfo findConfigAllInfo(final String dataId, final String group, final String tenant) {
        
        // 省略非关键代码
        
        // 获取到 mapper
        ConfigInfoMapper configInfoMapper = mapperManager.findMapper(
                                                dataSourceService.getDataSourceType(),
                                                TableConstant.CONFIG_INFO);
                
        
        // 使用 JdbcTemplate 执行 SQL 查询数据
        ConfigAllInfo configAdvance = this.jt.queryForObject(
                
            configInfoMapper.select(
                            Arrays.asList("id", "data_id", "group_id", "tenant_id", 
                                          "app_name", "content", "md5", "gmt_create",
                                        "gmt_modified", "src_user", "src_ip", "c_desc",
                                          "c_use", "effect", "type", "c_schema",
                                    "encrypted_data_key"),
                            Arrays.asList("data_id", "group_id", "tenant_id")),
            
                    new Object[] {dataId, group, tenantTmp}, CONFIG_ALL_INFO_ROW_MAPPER);
        
            return configAdvance;
    }
}

可见,查询配置详情就是从数据库查询一条记录。

4. 配置中心 Client 端实现原理

Server 端看起来很简单,复杂度其实都在 Client 端。

4.1 发布配置

我们使用官方提供的代码示例来研究,代码模块在 nacos/example

Nacos 配置中心源码讲解

客户端调用的发布配置方法其实就是向 server 端发送了一个请求进行发布。而 server 端如何处理上面已经介绍过了。

Nacos 2.x 版本 Client 是使用 RPC 发送的消息。后台管理则是使用 HTTP 接口调用的。

可见最终采用 GRPC 发送请求。

Nacos 配置中心源码讲解

4.2 查询配置

NacosClient 获取配置调用如下方法

Nacos 配置中心源码讲解

接下来,研究 getConfig 方法

public class NacosConfigService implements ConfigService {
    
    @Override
    public String getConfig(String dataId, String group, long timeoutMs) throws NacosException {
        return getConfigInner(namespace, dataId, group, timeoutMs);
    }
    
    
    private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) {
        
        // 省略非关键代码
 
        ConfigResponse cr = new ConfigResponse();
        
        cr.setDataId(dataId);
        cr.setTenant(tenant);
        cr.setGroup(group);
         
        // 读取本地故障切换文件(如果存在)
        String content = LocalConfigInfoProcessor.getFailover(worker.getAgentName(), dataId, group, tenant);
        if (content != null) {
            // 本地故障切换文件存在,就读取内容并返回
​
            cr.setContent(content);            
            String encryptedDataKey = LocalEncryptedDataKeyProcessor.getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
            cr.setEncryptedDataKey(encryptedDataKey);          
            return content;
        }
        
        // 故障切换文件不存在,发送一个 RPC 请求来查询 server 端的配置文件内容
        try {
            ConfigResponse response = worker.getServerConfig(dataId, group, tenant, timeoutMs, false);
            cr.setContent(response.getContent());
            cr.setEncryptedDataKey(response.getEncryptedDataKey());
            content = cr.getContent();
            
            return content;
        } catch (NacosException ioe) {
            if (NacosException.NO_RIGHT == ioe.getErrCode()) {
                throw ioe;
            }
        }
        
        // 走到这里代表请求 server 端失败了,那么就读取本地的快照文件获取配置内容
        content = LocalConfigInfoProcessor.getSnapshot(worker.getAgentName(), dataId, group, tenant);
        
        cr.setContent(content);
        String encryptedDataKey = LocalEncryptedDataKeyProcessor
                .getEncryptDataKeySnapshot(agent.getName(), dataId, group, tenant);
        cr.setEncryptedDataKey(encryptedDataKey);
        configFilterChainManager.doFilter(null, cr);
        content = cr.getContent();
        return content;
    }
    
}

整体流程图

Nacos 配置中心源码讲解

故障切换文件是什么?

该文件由用户手动维护,Nacos 不负责创建。

该功能设计可发现,当我们为 NacosClient 所在的服务器的文件系统中创建一个故障切换文件后,NacosClient 将从该文件中读取配置项,而不再请求服务端数据。

根据源码注释可知,该功能可用于当 NacosServer 关闭的同一时间,客户端需要同时更改配置(因为启动需要时间,这个时间段直接读取本地的故障切换文件)

本地快照文件是什么

本地快照文件由每次 RPC 请求远程 Server 文件获取到返回结果时,将结果存储起来到一个快照文件。

后续 Server 访问不通了,就使用本地快照文件。流程见下图中的红色区域。

Nacos 配置中心源码讲解

存储快照的原理就是 将配置的内容写入到本地指定文件中。

com.alibaba.nacos.client.config.impl.LocalConfigInfoProcessor#saveSnapshot

Nacos 配置中心源码讲解

4.3 监听机制 Listener

监听机制代表 NacosClient 监听某个配置,当配置内容发生变更时,NacosClient 能够知道并拿到最新的配置内容。

NacosClient 使用监听功能的代码如下:

Nacos 配置中心源码讲解

接下来研究下该功能如何实现。

入口是:添加监听器 addListener

public class NacosConfigService implements ConfigService {
    
    private final ClientWorker worker;
    
    @Override
    public void addListener(String dataId, String group, Listener listener) throws NacosException {
        worker.addTenantListeners(dataId, group, Arrays.asList(listener));
    }
}  

看出添加监听器调用了 ClientWorker.addListener

ClientWorker.addListener 源码如下:

public class ClientWorker {
    
    private ConfigTransportClient agent;
    
    public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) {
       
        // 根据 dataId,group 包装为 CacheData 放入缓存
        CacheData cache = addCacheDataIfAbsent(dataId, group, agent.getTenant());
        
        synchronized (cache) {
            for (Listener listener : listeners) {
                // 添加监听器
                cache.addListener(listener);
            }
            // 通知监听器读取最新配置
            agent.notifyListenConfig();
        }   
        
        // 省略非关键代码
    }
}   

该方法主要完成两点

  1. 构建一个 CacheData 对象,放入缓存

  2. 通知监听器读取最新配置

我们分开来看

1. 构建一个 CacheData 对象,放入缓存

完成这个功能的方法为 ClientWorker.addCacheDataIfAbsent(),下面单独研究这个方法:

/**
 * groupKey -> cacheData.
 */
private final AtomicReference<Map<String, CacheData>> cacheMap =
    															new AtomicReference<>(new HashMap<>());

public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) {
    
     	// 先从缓存中获取
        CacheData cache = getCache(dataId, group, tenant); // cacheMap.get(key)
        if (null != cache) {
            // 能够获取到直接返回
            return cache;
        }
    	
    	// 下面开始构建 CacheData 
    
        // 构建一个 Key
        String key = GroupKey.getKeyTenant(dataId, group, tenant);
        
		CacheData cache = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
		int taskId = cacheMap.get().size() / (int) ParamUtil.getPerTaskConfigSize();
		cache.setTaskId(taskId);
              
		
    	// 将此次构建的 CacheData 放入缓存 Map
    	Map<String, CacheData> copy = new HashMap<>(this.cacheMap.get());
		copy.put(key, cache);
		cacheMap.set(copy);
       
        return cache;
}

再来看看 CacheData 的字段

public class CacheData {
      
    // 配置的 dataId
    public final String dataId;
    // 配置 group 
    public final String group;
    
    // 当前"配置" 的监听器列表
    private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;
    
    // 配置内容
    private volatile String content;

	// 配置内容 MD5 值
    private volatile String md5;
    
    // 本地缓存修改时间
    private volatile AtomicLong lastModifiedTs = new AtomicLong(0);
    
    private int taskId;
  
    private String type;
    
    // 省略部分字段,省略部分 setter getter
    
    
	public void setContent(String content) {
        this.content = content;
        // 每次设置配置内容都会重新计算 它的MD5 值
        this.md5 = getMd5String();
    }
	public String getMd5String() {
        return (null == content) ? Constants.NULL : MD5Utils.md5Hex(content, Constants.ENCODE);
    }

2. 通知监听器读取最新配置

在添加监听器代码的最后调用了 agent.notifyListenConfig(); 代码如下

public class ClientWorker implements Closeable {
   	        
   
	public class ConfigRpcTransportClient extends ConfigTransportClient {
           
		private final BlockingQueue<Object> listenExecutebell = new ArrayBlockingQueue<>(1);

        private Object bellItem = new Object();
           
		@Override
		public void notifyListenConfig() {
			listenExecutebell.offer(bellItem);
		}
	}
       
}

可见源码,只是向 ArrayBlockingQueue 中加入了一个 数据。加入的这个 bellItem 对象只是一个占位符,对象无实际意义。

再来看看 ArrayBlockingQueue 取数据的地方,(还是在当前类)

public class ClientWorker implements Closeable {
   	
   
	public class ConfigRpcTransportClient extends ConfigTransportClient {
		     
		private final BlockingQueue<Object> listenExecutebell = new ArrayBlockingQueue<>(1);
        
        @Override
        public void startInternal() {
            // executor 是 ScheduledExecutorService executor
            executor.schedule(() -> {
                while (
                        // 线程池没有关闭
                        !executor.isShutdown()
                        // 并且 线程池没有执行完毕:有任务正在执行
                        && !executor.isTerminated()) {
                    
                    
                        // 从 listenExecutebell 取出一个数据,取到数据立即返回,没有的话等待5秒再返回
                        listenExecutebell.poll(5L, TimeUnit.SECONDS);
                       
                        
                        // 执行配置监听
                        executeConfigListen();
                   
                }
            }, 0L, TimeUnit.MILLISECONDS);
            
        }
    }
}

可见这段代码,其实最返回值没有接收,返回值其实不重要,也就是队列中的数据内容本身不重要。

重要的是有数据就够了,那么这段代码代表什么意思呢?

这里相当于一个不断无限循环,如果有数据了就理解执行后面的 执行配置监听,没有数据了就每隔5秒执行一次。

其实,这里将队列用作了一种时间控制手段,然后向队列插入一条数据代表着立即执行一次后面的方法。否则就走正常的每隔5秒执行一次。

看看 executeConfigListen 干了什么吧

代码有点长,所以我将代码做了一些精简,省略一些次重要的代码。这个方法要做的事情也有点多,先总结一下:

该方法做的事情就是 读取到最新的配置内容,如有变更,就回调当前客户端的 监听器。

public class ConfigRpcTransportClient extends ConfigTransportClient {
    
	@Override
	public void executeConfigListen() {
		// 有监听组
		// taskId -> List<CacheData>: 存储的是  不需要同步数据 或者 需要同步数据但是 不需要全部同步数据
		Map<String, List<CacheData>> listenCachesMap = new HashMap<>(16);
  
		// 是否需要全量同步(5 分钟全量同步一次)
		boolean needAllSync = System.currentTimeMillis() - lastAllSyncTime >= ALL_SYNC_INTERNAL;
            
		// 遍历当前全部 Cache
		for (CacheData cache : cacheMap.get().values()) {
			// 判断该 CacheData 是否需要检查, 如果 isSyncWithServer == false,必定进行检查
			
			if (cache.isSyncWithServer()) { // 本来是 false 什么时候变成 true 了
				// 检查 CacheData.MD5 与 Listenter.MD5 比对,如果不等于的话直接通知
				cache.checkListenerMd5();
				
                if (!needAllSync) {
					// 不需要全部数据同步,直接循环下一个
					// 上面已经同步了单个
        
					// 是否需要全量同步, 如果未达到全量同步时间或距离上次全量同步小于五分钟,则跳过这个 CacheData:本次的 CacheData 无需更换
					continue;
				}
			}
                    
			// 没同步过数据下来,   或者 需要全量同步的下来
                    
            // 维护 listenCachesMap
            List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId()));
            // 将 CacheData 标记起来
            cacheDatas.add(cache);
		}
            
		boolean hasChangedKeys = false;
 
                
		for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {
			String taskId = entry.getKey();
                    
			// GroupKey.getKeyTenant -> 上一次修改时间
			Map<String, Long> timestampMap = new HashMap<>(listenCachesMap.size() * 2);
                    
			List<CacheData> listenCaches = entry.getValue();
			for (CacheData cacheData : listenCaches) {
				timestampMap.put(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant), cacheData.getLastModifiedTs().longValue());
            }
                    
			// 构建 RPC 请求对象
            ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches);
            // 获取一个 RPC Client
			RpcClient rpcClient = ensureRpcClient(taskId);

            // 发送请求
            ConfigChangeBatchListenResponse configChangeBatchListenResponse =  	(ConfigChangeBatchListenResponse) requestProxy(rpcClient, configChangeListenRequest);
            
     		// 已改变的的配置
			Set<String> changeKeys = new HashSet<>();
                            
                           
			if (!CollectionUtils.isEmpty(configChangeBatchListenResponse.getChangedConfigs())) {
				
                // 已有变更
				hasChangedKeys = true;
                                
				for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : configChangeBatchListenResponse.getChangedConfigs()) {
                                    
					String changeKey = GroupKey.getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(), changeConfig.getTenant());
					
                    changeKeys.add(changeKey);
                                    
					// 通知改变 刷新内容并且检查、触发 Listener 回调
					refreshContentAndCheck(changeKey);
				}
                                
			}
                            
                            
			for (CacheData cacheData : listenCaches) {
				String groupKey = GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.getTenant());
				
                if (!changeKeys.contains(groupKey)) {
					// 返回来要改变的配置 没有 包含当前的配置
					// 上一次修改时间
					Long previousTimesStamp = timestampMap.get(groupKey);
					if (previousTimesStamp != null &&
! cacheData.getLastModifiedTs().compareAndSet(previousTimesStamp, System.currentTimeMillis())) {
                                                
						// 修改失败了, 两个时间不等于, 说明别的地方已经同步过了, 比如 接收到服务端的消息推送
						continue;
					}
                                            
					// 两个时间等于、修改成功了,修改为了当前时间,开始同步数据
					cacheData.setSyncWithServer(true);     
				}
                                
				cacheData.setInitializing(false);
			}         
		}
  
        // If it has changed keys,notify re-sync md5.
  		if (hasChangedKeys) {
        	notifyListenConfig();
		}
	}
}

代码有点长,这里将它分为两大部分来看

1. 找出需要监听的配置

最终目标就是将需要监听的配置放到 listenCachesMap 中

满足什么样的规则才能放进去呢?

Nacos 每隔 5 分钟进行一次全量数据同步,就是全部配置(CacheData) 都去远程获取一次最新数据。

否则当 CacheData 首次添加监听器、接收到服务端的配置变更推送、最后一个监听器被移除的时候,就去获取一次最新数据。

也就意味着,配置如果没变更的话,就 5 分钟去远程获取一次数据,否则立即从远程服务获取一次数据。

listenCachesMap 代表什么呢,这意味着本次需要检查的 需要监听的 配置。代表这些配置可能发生变更。所以把他们先收集起来。

再来看看,什么样的配置会放进去?

  1. 就是上面提到的(首次添加监听器、接收到服务端的配置变更推送、最后一个监听器被移除的时候,就去获取一次最新数据) 这三种方式,只要有一项触发了就会放入。

  2. 不使用本地配置的放入(这项可以忽略掉,主要是第一条)

2. 获取最新的配置

上一步获取到了 listenCachesMap , 这一步就开始对这个 map 进行处理。

怎么处理呢?

此时会发送一个 RPC 请求,参数是这些 CacheData ,目的是向服务器查询,发送过去的这些 CacheData 的内容是否发生了变更?服务器接收到请求如何判断是否变更呢?其实就是比较客户端的 Content 和 服务端存的 Content 是不是一致的。Content 也许内容很多,直接比较效率不高,所以比较的其实是 Content 的 md5 值。

Nacos Server 端的比较代码

// com.alibaba.nacos.config.server.service.ConfigCacheService#isUptodate(java.lang.String, java.lang.String, java.lang.String, java.lang.String)

// NacosServer 端的比较代码 , 参数 md5 是 NacosClient 传过来的
public static boolean isUptodate(String groupKey, String md5, String ip, String tag) {
    // 获取 Server 端的 md5 
	String serverMd5 = ConfigCacheService.getContentMd5(groupKey, ip, tag);
    // 与客户端的比较
	return StringUtils.equals(md5, serverMd5);
}

NacosServer 比较完后,会将 md5 不同的 CacheData 返回给客户端,也就是告诉 Nacos Client 哪些 配置与 NacosClient 传来的 是不一样的,也就是说,Nacos Client 不同的这些配置是旧的,然后将 不同的返回给 NacosClient。

再来会到 NacosClient ,NacosClient 拿到结果后,那就得需要处理这些不同的 CacheData 了,怎么处理呢?

for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : configChangeBatchListenResponse
                                        .getChangedConfigs()) {
                                    
	String changeKey = GroupKey.getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(), changeConfig.getTenant());
    changeKeys.add(changeKey);
                                    
    // 通知改变 刷新内容并且检查、触发 Listener 回调
    refreshContentAndCheck(changeKey);
}

最终调用了 refreshContentAndCheck

看看其实现源码:

public class ClientWorker implements Closeable {
	
    // 省略部分代码,精简化部分代码
    
    private void refreshContentAndCheck(String groupKey) {
        CacheData cache = cacheMap.get().get(groupKey);
      
		refreshContentAndCheck(cache, true);  
    }
    
    private void refreshContentAndCheck(CacheData cacheData, boolean notify) {
 		
        // 从 Server 端获取最新的配置
        ConfigResponse response = getServerConfig(cacheData.dataId, cacheData.group, cacheData.tenant, 3000L,notify);
        
        // 更新本地的 CacheData
		cacheData.setContent(response.getContent());
         
		// 检查监听器的 MD5
        cacheData.checkListenerMd5();
    }
}

这个方法要做的事情主要就是从服务端获取最新的数据并刷新本地的旧数据,然后通知当前 CacheData 的监听器。

接下来,看看最后调用 cacheData.checkListenerMd5() 如何实现

public class CacheData {
    
	void checkListenerMd5() {
        for (ManagerListenerWrap wrap : listeners) {
            // 比较 监听器的 MD5 是否与最新的 MD5 一致
            if (!md5.equals(wrap.lastCallMd5)) {
                // MD5 不一致 -》 通知监听器,发生了改变。
                safeNotifyListener(dataId, group, content, type, md5, encryptedDataKey, wrap);
            }
        }
    }      
    
    // 通知当前 CacheData 的监听器
    // 此方法经过我很大的简化,去除了一些次重要代码,但不影响主流程,目的是更容易的看到本节主题(如何回调到监听器)流程
   
	private void safeNotifyListener(final String dataId, final String group, final String content, 										final String type,final String md5, 
                                    final String encryptedDataKey, 
                                    final ManagerListenerWrap listenerWrap) {
        
        final Listener listener = listenerWrap.listener;
        
        Runnable job = () -> {

			// 监听回调
			listener.receiveConfigInfo(content);
         
            // 更新监听器最新的 MD5 值
			listenerWrap.lastCallMd5 = md5;
  
        };

		// 执行任务
		listener.getExecutor().execute(job);
    }
}

listener.receiveConfigInfo(content); 这个回调方法就是示例中的回调

Nacos 配置中心源码讲解

至此,整个流程完毕了但没有完全完毕。先将上述流程画个图总结下

Nacos 配置中心源码讲解

看出来执行上图中的配置监听,是通过队列机制触发的,事实上,不止添加监听器会向队列中加数据,还有其他几种方式:

Nacos 配置中心源码讲解

完整的通知变更来源有以下5点

  1. RPC 连接建立成功

  2. 服务端推送变更

  3. 内容发生变更

  4. 移除监听器

  5. 添加监听器

本篇篇幅太多,所以 服务务端推送变更实现原理 放在下一篇