Nacos源码集群数据同步
生活随笔
收集整理的這篇文章主要介紹了
Nacos源码集群数据同步
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
在DistroConsistencyServiceImpl的put方法中分為兩步:
?
其中的onPut方法已經(jīng)分析過了。
下面的distroProtocol.sync()就是集群同步的邏輯了。
DistroProtocol類的sync方法如下:
public void sync(DistroKey distroKey, DataOperation action, long delay) {// 遍歷 Nacos 集群中除自己以外的其它節(jié)點for (Member each : memberManager.allMembersWithoutSelf()) {DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),each.getAddress());// 定義一個Distro的同步任務DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);// 交給線程池去執(zhí)行distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());}} }其中同步的任務封裝為一個DistroDelayTask對象。
交給了distroTaskEngineHolder.getDelayTaskExecuteEngine()執(zhí)行,這行代碼的返回值是:
NacosDelayTaskExecuteEngine,這個類維護了一個線程池,并且接收任務,執(zhí)行任務。
執(zhí)行任務的方法為processTasks()方法:
protected void processTasks() {Collection<Object> keys = getAllTaskKeys();for (Object taskKey : keys) {AbstractDelayTask task = removeTask(taskKey);if (null == task) {continue;}NacosTaskProcessor processor = getProcessor(taskKey);if (null == processor) {getEngineLog().error("processor not found for task, so discarded. " + task);continue;}try {// 嘗試執(zhí)行同步任務,如果失敗會重試if (!processor.process(task)) {retryFailedTask(taskKey, task);}} catch (Throwable e) {getEngineLog().error("Nacos task execute error : " + e.toString(), e);retryFailedTask(taskKey, task);}} }可以看出來基于Distro模式的同步是異步進行的,并且失敗時會將任務重新入隊并充實,因此不保證同步結(jié)果的強一致性,屬于AP模式的一致性策略。
總結(jié)
以上是生活随笔為你收集整理的Nacos源码集群数据同步的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Nacos源码覆盖实例列表
- 下一篇: Nacos服务端流程图