一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|JAVA教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|JavaScript|易語言|

服務器之家 - 編程語言 - JAVA教程 - 從Nacos客戶端視角來分析一下配置中心實現原理

從Nacos客戶端視角來分析一下配置中心實現原理

2021-03-16 01:27今日Java麥洛 JAVA教程

今天我們一起從Nacos客戶端視角來看看配置中心實現原理;整理這篇文章時候,也參照學習了部分大佬的博客,這里致謝!

從Nacos客戶端視角來分析一下配置中心實現原理

一 動態配置

  • 1. 環境準備
  • 2.新建配置
  • 3.導入配置
  • 4.配置客戶端
  • 5. 修改配置信息
  • 6.小結

二 配置中心原理(推還是拉)

  • 1.實例化 ConfigService
  • 2.添加 Listener
  • 3.CacheData
  • 4.觸發回調
  • 5.Md5何時變更
  • 6.拉的優勢

三 總結

Hello,大家好,我是麥洛,今天我們一起從Nacos客戶端視角來看看配置中心實現原理;整理這篇文章時候,也參照學習了部分大佬的博客,這里致謝;

在開始閱讀文章之前,有些思路我按我的理解先闡述一些,方便大家更快理清思路,不對的地方還請大家批評指正;

  1. Nacos客戶端會在在本地緩存服務端配置文件,防止服務器奔潰情況下,導致服務不可用;
  2. 本地緩存類在代碼中的體現就是我們下面提到的CacheData,我們知道對應服務端一個配置,肯定可以同時被多個客戶端所使用,當這個配置發生變更,如何去通知到每一個客戶端?
  3. 客戶端啟動之后,回去注冊監視器,監視器最終會被保存到CacheData類中CopyOnWriteArrayList listeners字段,那么,反過來,當執行監視器回調方法時,就可以找到所有客戶端
  4. 長輪詢左右主要就是刷新配置,保持服務端配置和本地緩存配置保持一致;

首先,我們來看看Nacos官網給出的Nacos地圖,我們可以清楚的看到,動態配置服務是 Nacos 的三大功能之一;

從Nacos客戶端視角來分析一下配置中心實現原理

這里借用官網的描述,一起來看看Nacos 為我們帶來什么黑科技?

動態配置服務可以讓您以中心化、外部化和動態化的方式管理所有環境的應用配置和服務配置。動態配置消除了配置變更時重新部署應用和服務的需要,讓配置管理變得更加高效和敏捷。配置中心化管理讓實現無狀態服務變得更簡單,讓服務按需彈性擴展變得更容易。

所以,有了Nacos ,可能我們以前上線打包弄錯配置文件,改配置需要重啟服務等一系列問題,都會顯著改觀

一 動態配置

 

下面我將來和大家一起來了解下 Nacos 的動態配置的能力,看看 Nacos 是如何以簡單、優雅、高效的方式管理配置,實現配置的動態變更的。

我們用一個簡單的例子來了解下 Nacos 的動態配置的功能。

1. 環境準備

首先,我們需要搭建一個Nacos 服務端,由于官網的quick-start已經對此做了詳細的解讀,我們這里就不在贅述

從Nacos客戶端視角來分析一下配置中心實現原理
  1. https://nacos.io/zh-cn/docs/quick-start.html 

安裝完成之后啟動,我們就可以訪問 Nacos 的控制臺了,如下圖所示:

從Nacos客戶端視角來分析一下配置中心實現原理

Nacos控制臺做了簡單的權限控制,默認的賬號和密碼都是 nacos。

登錄進去之后,是這樣的:

從Nacos客戶端視角來分析一下配置中心實現原理

2.新建配置

接下來我們在控制臺上創建一個簡單的配置項,如下圖所示:

從Nacos客戶端視角來分析一下配置中心實現原理

3.導入配置

Nacos支持導入配置,可以直接將配置文件壓縮包導入,這里我們以人人開源的微服務項目為例

從Nacos客戶端視角來分析一下配置中心實現原理

從Nacos客戶端視角來分析一下配置中心實現原理
從Nacos客戶端視角來分析一下配置中心實現原理

4.配置客戶端

下面我以自己搭建的子服務為例,一起來看看Nacos配置中心的使用

首先我們需要配置一下,大家只需關注config節點配置就可以,discovery節點可以忽略

  1. cloud: 
  2.   nacos: 
  3.     discovery: 
  4.       metadata: 
  5.         management: 
  6.           context-path: ${server.servlet.context-path}/actuator 
  7.       server-addr: ${nacos-host:nacos-host}:${nacos-port:8848} 
  8.       #nacos的命名空間ID,默認是public 
  9.       namespace: ${nacos-namespace:} 
  10.       service: ets-web 
  11.     config: 
  12.       server-addr: ${spring.cloud.nacos.discovery.server-addr} 
  13.       namespace: ${spring.cloud.nacos.discovery.namespace} 
  14.       group: RENREN_CLOUD_GROUP 
  15.       file-extension: yaml 
  16.       #指定共享配置,且支持動態刷新 
  17.       extension-configs: 
  18.         - data-id: datasource.yaml 
  19.           group: ${spring.cloud.nacos.config.group
  20.           refresh: true 
  21.         - data-id: common.yaml 
  22.           group: ${spring.cloud.nacos.config.group
  23.           refresh: true 

其實extension-configs節點的配置信息對應的是下面的類

從Nacos客戶端視角來分析一下配置中心實現原理

接下來我們啟動服務,來看看控制臺日志

 從Nacos客戶端視角來分析一下配置中心實現原理

5. 修改配置信息

接下來我們在 Nacos 的控制臺上將我們的配置信息改為如下圖所示:

 從Nacos客戶端視角來分析一下配置中心實現原理

修改完配置,點擊 “發布” 按鈕后,客戶端將會收到最新的數據,如下圖所示:

從Nacos客戶端視角來分析一下配置中心實現原理

至此一個簡單的動態配置管理功能已經講完了,刪除配置和更新配置操作類似,這里不再贅述。

6.小結

通過上面的小案例,我們大概了解了Nacos動態配置的服務的使用方法,Nacos服務端將配置信息保存到其配置文件所配置的數據庫中,客戶端連接到服務端之后,根據 dataID,Group可以獲取到具體的配置信息,當服務端的配置發生變更時,客戶端會收到通知。當客戶端拿到變更后的最新配置信息后,就可以做自己的處理了,這非常有用,所有需要使用配置的場景都可以通過 Nacos 來進行管理。

二 配置中心原理(推還是拉)

 

現在我們了解了 Nacos 的動態配置服務的功能了,但是有一個問題我們需要弄明白,那就是 Nacos 客戶端是怎么實時獲取到 Nacos 服務端的最新數據的。

其實客戶端和服務端之間的數據交互,無外乎兩種情況:

  • 服務端推數據給客戶端
  • 客戶端從服務端拉數據

那到底是推還是拉呢,從 Nacos 客戶端通過 Listener 來接收最新數據的這個做法來看,感覺像是服務端推的數據,但是不能想當然,要想知道答案,最快最準確的方法就是從源碼中去尋找。

官方示例代碼

  1. try { 
  2.     // 傳遞配置 
  3.     String serverAddr = "{serverAddr}"
  4.     String dataId = "{dataId}"
  5.     String group = "{group}"
  6.     Properties properties = new Properties(); 
  7.     properties.put("serverAddr", serverAddr); 
  8.  
  9.     // 新建 configService 
  10.     ConfigService configService = NacosFactory.createConfigService(properties); 
  11.     String content = configService.getConfig(dataId, group, 5000); 
  12.     System.out.println(content); 
  13.  
  14.     // 注冊監聽器 
  15.     configService.addListener(dataId, group, new Listener() { 
  16.     @Override 
  17.     public void receiveConfigInfo(String configInfo) { 
  18.         System.out.println("recieve1:" + configInfo); 
  19.     } 
  20.     @Override 
  21.     public Executor getExecutor() { 
  22.         return null
  23.     } 
  24. }); 
  25. } catch (NacosException e) { 
  26.     // TODO  
  27.     -generated catch block 
  28.     e.printStackTrace(); 

1.實例化 ConfigService

當我們引包結束以后,會發現下面三個關于Nacos的包

從Nacos客戶端視角來分析一下配置中心實現原理

從我的理解來說,api包會調用client包的能力來和Nacos服務端進行交互.那再交互時候,主要就會用到我們接下來分析的實現了ConfigService接口的NacosConfigService 類

現在我們來看下 NacosConfigService 的構造方法,看看 ConfigService 是怎么實例化的,如下圖所示:

  1. public class NacosConfigService implements ConfigService { 
  2.      
  3.     private static final Logger LOGGER = LogUtils.logger(NacosConfigService.class); 
  4.      
  5.     private static final long POST_TIMEOUT = 3000L; 
  6.      
  7.     /** 
  8.      * http agent. 
  9.      */ 
  10.     private final HttpAgent agent; 
  11.      
  12.     /** 
  13.      * long polling.  這里是長輪詢 
  14.      */ 
  15.     private final ClientWorker worker; 
  16.      
  17.     private String namespace; 
  18.      
  19.     private final String encode; 
  20.     //省略其他代碼 

  1. //構造方法 
  2. ic NacosConfigService(Properties properties) throws NacosException { 
  3.     ValidatorUtils.checkInitParam(properties); 
  4.     String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE); 
  5.     if (StringUtils.isBlank(encodeTmp)) { 
  6.         this.encode = Constants.ENCODE; 
  7.     } else { 
  8.         this.encode = encodeTmp.trim(); 
  9.     } 
  10.     initNamespace(properties); 
  11.     //對象1 
  12.     this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties)); 
  13.     this.agent.start(); 
  14.     //對象2 
  15.     this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties); 

實例化時主要是初始化了兩個對象,他們分別是:

  • HttpAgent
  • ClientWorker

HttpAgent

其中 agent 是通過裝飾器模式實現的,ServerHttpAgent 是實際工作的類,MetricsHttpAgent 在內部也是調用了 ServerHttpAgent 的方法,另外加上了一些統計操作,所以我們只需要關心 ServerHttpAgent 的功能就可以了。

不熟悉的同學,可以看菜鳥教程對裝飾器模式的解讀

agent 實際是在 ClientWorker 中發揮能力的,而 ClientWorker 也是真正的打工人,下面我們來看下 ClientWorker 類。

ClientWorker

以下是 ClientWorker 的構造方法,如下圖所示:

  1. public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, 
  2.         final Properties properties) { 
  3.     this.agent = agent; 
  4.     this.configFilterChainManager = configFilterChainManager; 
  5.      
  6.     // Initialize the timeout parameter 
  7.      
  8.     init(properties); 
  9.     //創建了一個定時任務的線程池 
  10.     this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() { 
  11.         @Override 
  12.         public Thread newThread(Runnable r) { 
  13.             Thread t = new Thread(r); 
  14.             t.setName("com.alibaba.nacos.client.Worker." + agent.getName()); 
  15.             t.setDaemon(true); 
  16.             return t; 
  17.         } 
  18.     }); 
  19.     //創建了一個保持長輪詢的線程池 
  20.     this.executorService = Executors 
  21.             .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { 
  22.                 @Override 
  23.                 public Thread newThread(Runnable r) { 
  24.                     Thread t = new Thread(r); 
  25.                     t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName()); 
  26.                     t.setDaemon(true); 
  27.                     return t; 
  28.                 } 
  29.             }); 
  30.      
  31.     //創建了一個延遲任務線程池來每隔10ms來檢查配置信息的線程池 
  32.     this.executor.scheduleWithFixedDelay(new Runnable() { 
  33.         @Override 
  34.         public void run() { 
  35.             try { 
  36.                 checkConfigInfo(); 
  37.             } catch (Throwable e) { 
  38.                 LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e); 
  39.             } 
  40.         } 
  41.     }, 1L, 10L, TimeUnit.MILLISECONDS); 

可以看到 ClientWorker 除了將 HttpAgent 維持在自己內部,還創建了兩個線程池:

  1. final ScheduledExecutorService executor; 
  2.      
  3. final ScheduledExecutorService executorService; 
  • 第一個線程池負責與配置中心進行數據的交互,并且啟動后延遲1ms,之后每隔10ms對配置信息進行定時檢查
  • 第二個線程池則是負責保持一個長輪詢鏈接

接下來讓我們來看下 executor 每 10ms 執行的方法到底做了什么工作,如下圖所示:

  1. /** 
  2.  * groupKey -> cacheData. 
  3.  */ 
  4. private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>( 
  5.         new HashMap<String, CacheData>()); 

  1. /** 
  2.   * Check config info. 檢查配置信息 
  3.   */ 
  4.  public void checkConfigInfo() { 
  5.      // 分任務(解決大數據量的傳輸問題) 
  6.      int listenerSize = cacheMap.get().size(); 
  7.      // 向上取整為批數,分批次進行檢查 
  8.      //ParamUtil.getPerTaskConfigSize() =3000 
  9.      int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); 
  10.      if (longingTaskCount > currentLongingTaskCount) { 
  11.          for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) { 
  12.              // 要判斷任務是否在執行 這塊需要好好想想。 任務列表現在是無序的。變化過程可能有問題 
  13.              executorService.execute(new LongPollingRunnable(i)); 
  14.          } 
  15.          currentLongingTaskCount = longingTaskCount; 
  16.      } 
  17.  } 

這里主要是先去拿緩存中 Map

現在我們來看看 LongPollingRunnable 做了什么,主要分為兩部分,

  • 第一部分是檢查本地的配置信息,
  • 第二部分是獲取服務端的配置信息然后更新到本地。

1.本地檢查

首先取出與該 taskId 相關的 CacheData,然后對 CacheData 進行檢查,包括本地配置檢查和緩存數據的 md5 檢查,本地檢查主要是做一個故障容錯,當服務端掛掉后,Nacos 客戶端可以從本地的文件系統中獲取相關的配置信息,如下圖所示:

  1. public void run() { 
  2.              
  3.             List<CacheData> cacheDatas = new ArrayList<CacheData>(); 
  4.             List<String> inInitializingCacheList = new ArrayList<String>(); 
  5.             try { 
  6.                 // 
  7.                 for (CacheData cacheData : cacheMap.get().values()) { 
  8.                     if (cacheData.getTaskId() == taskId) { 
  9.                         cacheDatas.add(cacheData); 
  10.                         try { 
  11.                             //執行檢查本地配置 
  12.                             checkLocalConfig(cacheData); 
  13.                             if (cacheData.isUseLocalConfigInfo()) { 
  14.                                 //緩存數據的md5的檢查 
  15.                                 cacheData.checkListenerMd5(); 
  16.                             } 
  17.                         } catch (Exception e) { 
  18.                             LOGGER.error("get local config info error", e); 
  19.                         } 
  20.                     } 
  21.                 } 
  22.                
  23.         } 

  1. //檢查本地配置 
  2.      
  3. private void checkLocalConfig(CacheData cacheData) { 
  4.         final String dataId = cacheData.dataId; 
  5.         final String group = cacheData.group
  6.         final String tenant = cacheData.tenant; 
  7.     //本地緩存文件 
  8.         File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant); 
  9.         //不使用本地配置,但是持久化文件存在,需要讀取文件加載至內存 
  10.         if (!cacheData.isUseLocalConfigInfo() && path.exists()) { 
  11.             String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant); 
  12.             final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE); 
  13.             cacheData.setUseLocalConfigInfo(true); 
  14.             cacheData.setLocalConfigInfoVersion(path.lastModified()); 
  15.             cacheData.setContent(content); 
  16.              
  17.             LOGGER.warn( 
  18.                     "[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}"
  19.                     agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content)); 
  20.             return
  21.         } 
  22.          
  23.        // 有 -> 沒有。不通知業務監聽器,從server拿到配置后通知。 
  24.         //使用本地配置,但是持久化文件不存在  
  25.         if (cacheData.isUseLocalConfigInfo() && !path.exists()) { 
  26.             cacheData.setUseLocalConfigInfo(false); 
  27.             LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(), 
  28.                     dataId, group, tenant); 
  29.             return
  30.         } 
  31.          
  32.         // 有變更 
  33.         //使用本地配置,持久化文件存在,緩存跟文件最后修改時間不一致 
  34.         if (cacheData.isUseLocalConfigInfo() && path.exists() && cacheData.getLocalConfigInfoVersion() != path 
  35.                 .lastModified()) { 
  36.             String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant); 
  37.             final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE); 
  38.             cacheData.setUseLocalConfigInfo(true); 
  39.             cacheData.setLocalConfigInfoVersion(path.lastModified()); 
  40.             cacheData.setContent(content); 
  41.             LOGGER.warn( 
  42.                     "[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}"
  43.                     agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content)); 
  44.         } 
  45.     } 

本地檢查主要是通過是否使用本地配置,繼而尋找持久化緩存文件,再通過判斷文件的最后修改事件與本地緩存的版本是否一致來判斷是否由變更

通過跟蹤 checkLocalConfig 方法,可以看到 Nacos 將緩存配置信息保存在了

  1. ~/nacos/config/fixed-{address}_8848_nacos/snapshot/DEFAULT_GROUP/{dataId} 

這個文件中,我們看下這個文件中保存的內容,如下圖所示:

從Nacos客戶端視角來分析一下配置中心實現原理

2.服務端檢查

然后通過 checkUpdateDataIds() 方法從服務端獲取值變化的 dataId 列表,

通過 getServerConfig 方法,根據 dataId 到服務端獲取最新的配置信息,接著將最新的配置信息保存到 CacheData 中。

最后調用 CacheData 的 checkListenerMd5 方法,可以看到該方法在第一部分也被調用過,我們需要重點關注一下。

  1. // 檢查服務器配置 
  2.   List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList); 
  3.   if (!CollectionUtils.isEmpty(changedGroupKeys)) { 
  4.       LOGGER.info("get changedGroupKeys:" + changedGroupKeys); 
  5.   } 
  6.    
  7.   for (String groupKey : changedGroupKeys) { 
  8.       String[] key = GroupKey.parseKey(groupKey); 
  9.       String dataId = key[0]; 
  10.       String group = key[1]; 
  11.       String tenant = null
  12.       if (key.length == 3) { 
  13.           tenant = key[2]; 
  14.       } 
  15.       try { 
  16.           //從服務器端獲取相關id的最新配置 
  17.           String[] ct = getServerConfig(dataId, group, tenant, 3000L); 
  18.           CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant)); 
  19.           cache.setContent(ct[0]); 
  20.           if (null != ct[1]) { 
  21.               cache.setType(ct[1]); 
  22.           } 
  23.           LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}"
  24.                   agent.getName(), dataId, group, tenant, cache.getMd5(), 
  25.                   ContentUtils.truncateContent(ct[0]), ct[1]); 
  26.       } catch (NacosException ioe) { 
  27.           String message = String 
  28.                   .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s"
  29.                           agent.getName(), dataId, group, tenant); 
  30.           LOGGER.error(message, ioe); 
  31.       } 
  32.   } 
  33.   for (CacheData cacheData : cacheDatas) { 
  34.       if (!cacheData.isInitializing() || inInitializingCacheList 
  35.               .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) { 
  36.           //校驗MD5值 
  37.           cacheData.checkListenerMd5(); 
  38.           cacheData.setInitializing(false); 
  39.       } 
  40.   } 
  41.   inInitializingCacheList.clear(); 
  42.    
  43.   executorService.execute(this); 
  44.    
  45. catch (Throwable e) { 
  46.    
  47.   // If the rotation training task is abnormal, the next execution time of the task will be punished 
  48.   LOGGER.error("longPolling error : ", e); 
  49.   executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); 

這里大家也發現,當客戶端從服務器拉去配置文件之后,會將配置文件在本地進行緩存,所以,一般會優先使用本地配置,如果本地文件不存在或者內容為空,則再通過 HTTP GET 方法從遠端拉取配置,并保存到本地緩存中

  1. private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException { 
  2.       group = null2defaultGroup(group); 
  3.       ParamUtils.checkKeyParam(dataId, group); 
  4.       ConfigResponse cr = new ConfigResponse(); 
  5.        
  6.       cr.setDataId(dataId); 
  7.       cr.setTenant(tenant); 
  8.       cr.setGroup(group); 
  9.        
  10.       // 優先使用本地配置 
  11.       String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant); 
  12.       if (content != null) { 
  13.           LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}", agent.getName(), 
  14.                   dataId, group, tenant, ContentUtils.truncateContent(content)); 
  15.           cr.setContent(content); 
  16.           configFilterChainManager.doFilter(null, cr); 
  17.           content = cr.getContent(); 
  18.           return content; 
  19.       } 
  20.        
  21.       try { 
  22.           String[] ct = worker.getServerConfig(dataId, group, tenant, timeoutMs); 
  23.           cr.setContent(ct[0]); 
  24.            
  25.           configFilterChainManager.doFilter(null, cr); 
  26.           content = cr.getContent(); 
  27.            
  28.           return content; 
  29.       } catch (NacosException ioe) { 
  30.           if (NacosException.NO_RIGHT == ioe.getErrCode()) { 
  31.               throw ioe; 
  32.           } 
  33.           LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}"
  34.                   agent.getName(), dataId, group, tenant, ioe.toString()); 
  35.       } 
  36.        
  37.       LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}", agent.getName(), 
  38.               dataId, group, tenant, ContentUtils.truncateContent(content)); 
  39.       content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant); 
  40.       cr.setContent(content); 
  41.       configFilterChainManager.doFilter(null, cr); 
  42.       content = cr.getContent(); 
  43.       return content; 
  44.   } 

2.添加 Listener

好了現在我們可以為 ConfigService 來添加一個 Listener 了,最終是調用了 ClientWorker 的 addTenantListeners 方法,如下圖所示:

  1. /** 
  2.  * Add listeners for tenant. 
  3.  * 
  4.  * @param dataId    dataId of data 
  5.  * @param group     group of data 
  6.  * @param listeners listeners 
  7.  * @throws NacosException nacos exception 
  8.  */ 
  9. public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) 
  10.         throws NacosException { 
  11.     //設置默認組 
  12.     group = null2defaultGroup(group); 
  13.     String tenant = agent.getTenant(); 
  14.     CacheData cache = addCacheDataIfAbsent(dataId, group, tenant); 
  15.     for (Listener listener : listeners) { 
  16.         cache.addListener(listener); 
  17.     } 

該方法分為兩個部分,首先根據 dataId,group 和tenant獲取一個 CacheData 對象,然后將當前要添加的 listener 對象添加到 CacheData 中去。

接下來,我們要重點關注下 CacheData 類了。

3.本地緩存CacheData

首先讓我們來看一下 CacheData 中的成員變量,如下圖所示:

  1. private final String name
  2.  
  3. private final ConfigFilterChainManager configFilterChainManager; 
  4.  
  5. public final String dataId; 
  6.  
  7. public final String group
  8.  
  9. public final String tenant; 
  10. //監聽器 
  11.  
  12. private final CopyOnWriteArrayList<ManagerListenerWrap> listeners; 
  13.  
  14. private volatile String md5; 
  15.  
  16. /** 
  17.  * whether use local config. 
  18.  */ 
  19. private volatile boolean isUseLocalConfig = false
  20.  
  21. /** 
  22.  * last modify time
  23.  */ 
  24. private volatile long localConfigLastModified; 
  25.  
  26. private volatile String content; 
  27.  
  28. private int taskId; 
  29.  
  30. private volatile boolean isInitializing = true
  31.  
  32. private String type; 

我們可以看到,成員變量包括tenant ,dataId,group,content,taskId等,還有兩個值得我們關注的:

  • listeners
  • md5

listeners 是該 CacheData 所關聯的所有 listener,不過不是保存的原始的 Listener對象,而是包裝后的 ManagerListenerWrap 對象,該對象除了持有 Listener 對象,還持有了一個 lastCallMd5 和lastContent屬性。

  1. private static class ManagerListenerWrap { 
  2.       
  3.      final Listener listener; 
  4.       
  5.      //關注 
  6.      String lastCallMd5 = CacheData.getMd5String(null); 
  7.       
  8.      String lastContent = null
  9.       
  10.      ManagerListenerWrap(Listener listener) { 
  11.          this.listener = listener; 
  12.      } 
  13.       
  14.      ManagerListenerWrap(Listener listener, String md5) { 
  15.          this.listener = listener; 
  16.          this.lastCallMd5 = md5; 
  17.      } 
  18.       
  19.      ManagerListenerWrap(Listener listener, String md5, String lastContent) { 
  20.          this.listener = listener; 
  21.          this.lastCallMd5 = md5; 
  22.          this.lastContent = lastContent; 
  23.      } 
  24.       
  25.  } 

另外一個屬性 md5 就是根據當前對象的 content 計算出來的 md5 值。

4.觸發監聽器回調

現在我們對 ConfigService 有了大致的了解了,現在剩下最后一個重要的問題還沒有答案,那就是 ConfigService 的 Listener 是在什么時候觸發回調方法 receiveConfigInfo 的。

現在讓我們回過頭來想一下,在 ClientWorker 中的定時任務中,啟動了一個長輪詢的任務:LongPollingRunnable,該任務多次執行了 cacheData.checkListenerMd5() 方法,那現在就讓我們來看下這個方法到底做了些什么,如下圖所示:

  1. void checkListenerMd5() { 
  2.     for (ManagerListenerWrap wrap : listeners) { 
  3.         if (!md5.equals(wrap.lastCallMd5)) { 
  4.             safeNotifyListener(dataId, group, content, type, md5, wrap); 
  5.         } 
  6.     } 

到這里應該就比較清晰了,該方法會檢查 CacheData 當前的 md5 與 CacheData 持有的所有 Listener 中保存的 md5 的值是否一致,如果不一致,就執行一個安全的監聽器的通知方法:safeNotifyListener,通知什么呢?我們可以大膽的猜一下,應該是通知 Listener 的使用者,該 Listener 所關注的配置信息已經發生改變了。現在讓我們來看一下 safeNotifyListener 方法,如下圖所示:

  1. private void safeNotifyListener(final String dataId, final String group, final String content, final String type, 
  2.           final String md5, final ManagerListenerWrap listenerWrap) { 
  3.       final Listener listener = listenerWrap.listener; 
  4.        
  5.       Runnable job = new Runnable() { 
  6.           @Override 
  7.           public void run() { 
  8.               ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader(); 
  9.               ClassLoader appClassLoader = listener.getClass().getClassLoader(); 
  10.               try { 
  11.                   if (listener instanceof AbstractSharedListener) { 
  12.                       AbstractSharedListener adapter = (AbstractSharedListener) listener; 
  13.                       adapter.fillContext(dataId, group); 
  14.                       LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}"name, dataId, group, md5); 
  15.                   } 
  16.                   // 執行回調之前先將線程classloader設置為具體webapp的classloader,以免回調方法中調用spi接口是出現異常或錯用(多應用部署才會有該問題)。 
  17.                   Thread.currentThread().setContextClassLoader(appClassLoader); 
  18.                    
  19.                   ConfigResponse cr = new ConfigResponse(); 
  20.                   cr.setDataId(dataId); 
  21.                   cr.setGroup(group); 
  22.                   cr.setContent(content); 
  23.                    
  24.                   //重點關注,在這里調用 
  25.                   //重點關注,在這里調用 
  26.                   //重點關注,在這里調用 
  27.                    
  28.                   configFilterChainManager.doFilter(null, cr); 
  29.                   String contentTmp = cr.getContent(); 
  30.                   listener.receiveConfigInfo(contentTmp); 
  31.                    
  32.                    
  33.                    
  34.                    
  35.                   // compare lastContent and content 
  36.                   if (listener instanceof AbstractConfigChangeListener) { 
  37.                       Map data = ConfigChangeHandler.getInstance() 
  38.                               .parseChangeData(listenerWrap.lastContent, content, type); 
  39.                       ConfigChangeEvent event = new ConfigChangeEvent(data); 
  40.                       ((AbstractConfigChangeListener) listener).receiveConfigChange(event); 
  41.                       listenerWrap.lastContent = content; 
  42.                   } 
  43.                    
  44.                   listenerWrap.lastCallMd5 = md5; 
  45.                   LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} "name, dataId, group, md5, 
  46.                           listener); 
  47.               } catch (NacosException ex) { 
  48.                   LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}"
  49.                           name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg()); 
  50.               } catch (Throwable t) { 
  51.                   LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}"name, dataId, 
  52.                           group, md5, listener, t.getCause()); 
  53.               } finally { 
  54.                   Thread.currentThread().setContextClassLoader(myClassLoader); 
  55.               } 
  56.           } 
  57.       }; 
  58.        
  59.       final long startNotify = System.currentTimeMillis(); 
  60.       try { 
  61.           if (null != listener.getExecutor()) { 
  62.               listener.getExecutor().execute(job); 
  63.           } else { 
  64.               job.run(); 
  65.           } 
  66.       } catch (Throwable t) { 
  67.           LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}"name, dataId, 
  68.                   group, md5, listener, t.getCause()); 
  69.       } 
  70.       final long finishNotify = System.currentTimeMillis(); 
  71.       LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} "
  72.               name, (finishNotify - startNotify), dataId, group, md5, listener); 
  73.   } 

可以看到在 safeNotifyListener 方法中,重點關注下紅框中的三行代碼:獲取最新的配置信息,調用 Listener 的回調方法,將最新的配置信息作為參數傳入,這樣 Listener 的使用者就能接收到變更后的配置信息了,最后更新 ListenerWrap 的 md5 值。和我們猜測的一樣, Listener 的回調方法就是在該方法中觸發的。

5.Md5何時變更

那 CacheData 的 md5 值是何時發生改變的呢?我們可以回想一下,在上面的 LongPollingRunnable 所執行的任務中,在獲取服務端發生變更的配置信息時,將最新的 content 數據寫入了 CacheData 中,我們可以看下該方法如下:

  1. public void setContent(String content) { 
  2.      this.content = content; 
  3.      this.md5 = getMd5String(this.content); 
  4.  } 

可以看到是在長輪詢的任務中,當服務端配置信息發生變更時,客戶端將最新的數據獲取下來之后,保存在了 CacheData 中,同時更新了該 CacheData 的 md5 值,所以當下次執行 checkListenerMd5 方法時,就會發現當前 listener 所持有的 md5 值已經和 CacheData 的 md5 值不一樣了,也就意味著服務端的配置信息發生改變了,這時就需要將最新的數據通知給 Listener 的持有者。

至此配置中心的完整流程已經分析完畢了,可以發現,Nacos 并不是通過推的方式將服務端最新的配置信息發送給客戶端的,而是客戶端維護了一個長輪詢的任務,定時去拉取發生變更的配置信息,然后將最新的數據推送給 Listener 的持有者。

6.為什么要拉?

客戶端拉取服務端的數據與服務端推送數據給客戶端相比,優勢在哪呢,為什么 Nacos 不設計成主動推送數據,而是要客戶端去拉取呢?如果用推的方式,服務端需要維持與客戶端的長連接,這樣的話需要耗費大量的資源,并且還需要考慮連接的有效性,例如需要通過心跳來維持兩者之間的連接。而用拉取的方式,客戶端只需要通過一個無狀態的 http 請求即可獲取到服務端的數據。

三 總結

 

 從Nacos客戶端視角來分析一下配置中心實現原理

現在,我們來簡單復盤一下Nacos客戶端視角下的配置中心實現原理

首先我們假設Nacos服務端一切正常,Nacos客戶端啟動以后

第一步是根據我們配置的服務端信息,新建 ConfigService 實例,它的實現就是我們文中提到的NacosConfigService;

第二步可以通過相應的接口獲取配置和注冊配置監聽器,

考慮到服務端故障的問題,客戶端將最新數據獲取后會保存在本地的 緩存文件中,以后會優先從文件中獲取配置信息的值,如果獲取不到,會直接從服務器拉去,并保存到緩存中;

其實真正干活的就是ClientWorker類;客戶端是通過一個定時的長輪詢來檢查自己監聽的配置項的數據的,一旦服務端的數據發生變化時,會從服務端獲取到dataID的列表,

客戶端根據dataID列表從服務端獲取到最新的數據,并將最新的數據保存在一個 CacheData 對象中,在輪詢過程中,如果決定使用本地配置,就會比較當前CacheData 的MD5值是否和所有監聽者所持有的MD5值相等,如果不相等,,此時就會對該 CacheData 所綁定的 Listener 觸發 receiveConfigInfo 回調,來通知使用者此配置信息已經變更;

原文地址:https://mp.weixin.qq.com/s/qmT-SsYr6yPmqEtN-4XAoQ

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 91在线 一区 二区三区 | 日韩免费一级片 | 亚洲精品有码在线观看 | 欧美性4khd720 | 精品国产麻豆AV无码 | 99er在线观看| 亚洲好骚综合 | 日本视频观看 | 国产精品第 | a在线观看欧美在线观看 | 国产一级在线观看视频 | 999久久久免费精品国产牛牛 | 日韩性事| 福利国模私拍视频在线观看 | 欧美美女一区二区三区 | 国产成人精品视频一区 | 精品视频 久久久 | 欧美国产精品久久 | 亚洲欧美日韩综合一区久久 | 成人猫咪maomiav永久网址 | 吃胸膜奶视频456 | 欧美精品久久一区二区三区 | 国产精品亚洲片在线va | 热剧库| 四虎最新网址在线观看 | 特级非洲黑人一级毛片 | 91国在线观看 | 国产原创一区二区 | 国产第一页在线视频 | 亚州vs欧州vs日 | 视频在线观看一区二区三区 | 国产成人精品免费2021 | 欧美怡红院视频一区二区三区 | 欧美一级片免费 | 午夜深情在线观看免费 | jizz女16处 | 欧美人交性视频在线香蕉 | 欧美久草在线 | 精品精品国产自在久久高清 | 国产重口老太伦 | 天天色国产 |