Yarn ResourceManager High Availability
2019獨角獸企業重金招聘Python工程師標準>>>
在 Hadoop 生態中(Hadoop2.x及以后版本), JobTracker 和 TaskTracker 演變為 Yarn 作為 Hadoop 的資源管理器。 同時, MapReduce、Spark、Flink、等計算框架也支持 Yarn 來調度, 因此, Yarn 高可用極為重要。 關于 Yarn 相關內容, 詳情查看Apache Hadoop YARN Architecture, 這里對 Yarn ResourceManager 的 HA 做一個簡單介紹。 文章部分來自官方文檔。
ResourceManager HA 的目的是當 Active RM 無法工作時, Standby RM 能夠階梯正在服務的 Active RM, 防止集群出現不可用狀態。
體系結構
ResourceManager HA 通過 Active/Standby 體系結構實現, 即在任意時刻, 都有一個 RM(ResourceManager) 處于 Active 狀態, 一個或多個 RM 處于 Standby 狀態, 如果 Active RM 發生故障, 由 Standby 的 RM 接管 Active RM 的工作。 啟動自動故障轉移時, 通過 Admin 接口或集成故障轉移器將 Standby RM 轉換為 Active RM。
手動轉換和故障轉移
不啟動自動故障轉移時, 管理員必須手動將其中一個 RM 轉換成 Active 狀態。 要從另一個 RM 進行故障轉移到另一個 RM, 需要先將 Active-RM 轉換成 Standby, 并將備用的 RM 轉換成 Active。 這些操作可以通過 Yarn admin client 完成。
自動故障轉移
RM 可以選擇基于 zookeeper 的 ActiveStandbyElector 來決定那個 RM 是 Active 的, 當 Active 的 RM 停止或無響應時, 自動選擇一個 Standby RM 作為 Active RM 來接管。 RM 的 HA 不需要單獨的守護進程(如 HDFS 的 ZKFC), 而是由嵌入到 RM 中的 ActiveStandbyElector 充當故障檢測器和 leader elector。
RM 故障轉移中的 Client、ApplicationMaster、NodeManager
當存在多個 RM 時, Client 和 所有節點的配置(yarn-site.xml)中需要列出所有的 RM, Client、AM(ApplicationMaster)、NodeManager 將循環嘗試連接 RM, 知道連接到 Active 的 RM。 如果 Active 的 RM 停止了, 將繼續輪詢, 直到能連接到新的 Active 的 RM 為止。 可以通過實現 org.apache.hadoop.yarn.client.RMFailoverProxyProvider 或者 配置 yarn.client.failover-proxy-provider 來實現。
Active-RM 狀態恢復
啟用 ResourceManager 后, 將 RM 狀態轉換成 Active 狀態需要加載 RM 內部狀態, 并根據 RM Restart 特性盡可能從之前停止的位置繼續執行。 對于之前提交給 RM 托管的 Application, 都會生成一個新的 Application。 Application 可以定期 CheckPoint, 以免丟失任何 work。 狀態存儲必須在 Active & Standby 的 RM 中可見, 目前有兩種用于持久化的 RMStateStore 實現: FileSystemRMStateStore 和 ZKRMStateStore。 ZKRMStateStore 隱式允許任何時間任何節點對單個 RM 進行寫訪問, 因此官方推薦使用 ZKRMStateStore。 在使用 ZKRMStateStore 時, 不需要單獨的隔離機制來處理潛在的腦裂情況。 在這種情況下, 多個 RM 可以潛在地承擔活動角色。 在使用 ZKRMStateStore 時, 官方建議不要在 zookeeper 集群中設置 zookeeper.DigestAuthenticationProvider, 同時 zookeeper 管理員用戶不能有 Yarn 的 application/user 的憑證信息。
配置清單
| yarn.resourcemanager.zk-address | zk-quorum的地址。同時用于狀態存儲和leader選舉。 |
| yarn.resourcemanager.ha.enabled | Enable RM HA. |
| yarn.resourcemanager.ha.rm-ids | RM 的邏輯 ID, 比如 rm1, rm2 |
| yarn.resourcemanager.hostname.rm-id | 為每個 rm-id 聲明一個對應的主機名, 也可以聲明 RM 的服務地址來替換。 |
| yarn.resourcemanager.address.rm-id | 為每個 rm-id 聲明一個對應的 RM 服務地址, 也可以聲明 rm 對應主機名來替換。 |
| yarn.resourcemanager.scheduler.address.rm-id | For each rm-id, specify scheduler host:port for ApplicationMasters to obtain resources. If set, overrides the hostname set inv yarn.resourcemanager.hostname.rm-id. |
| yarn.resourcemanager.resource-tracker.address.rm-id | For each rm-id, specify host:port for NodeManagers to connect. If set, overrides the hostname set in yarn.resourcemanager.hostname.rm-id. |
| yarn.resourcemanager.admin.address.rm-id | For each rm-id, specify host:port for administrative commands. If set, overrides the hostname set in yarn.resourcemanager.hostname.rm-id. |
| yarn.resourcemanager.webapp.address.rm-id | For each rm-id, specify host:port of the RM web application corresponds to. You do not need this if you set yarn.http.policy to HTTPS_ONLY. If set, overrides the hostname set in yarn.resourcemanager.hostname.rm-id. |
| yarn.resourcemanager.webapp.https.address.rm-id | For each rm-id, specify host:port of the RM https web application corresponds to. You do not need this if you set yarn.http.policy to HTTP_ONLY. If set, overrides the hostname set in yarn.resourcemanager.hostname.rm-id. |
| yarn.resourcemanager.ha.id | Identifies the RM in the ensemble. This is optional; however, if set, admins have to ensure that all the RMs have their own IDs in the config. |
| yarn.resourcemanager.ha.automatic-failover.enabled | 啟動自動故障轉移, 啟用 RM HA 后默認開啟。 |
| yarn.resourcemanager.ha.automatic-failover.embedded | 啟用后, 通過內置的 leader 選舉來選 Active RM。 啟用 RM HA 時默認開啟。 |
| yarn.resourcemanager.cluster-id | 集群標識, 確保 RM 不會接管另一個集群(即不會成為其他集群的 Active RM)。 |
| yarn.client.failover-proxy-provider | Client、AM、NM 連接 Active RM 故障轉移的類。 |
| yarn.client.failover-max-attempts | FailoverProxyProvider 嘗試故障轉移的最大次數。 |
| yarn.client.failover-sleep-base-ms | 故障轉移之間計算延遲的 sleep 毫秒數。 |
| yarn.client.failover-sleep-max-ms | 故障轉移之間的 sleep 最大毫秒數。 |
| yarn.client.failover-retries | 每次連接 RM 的重試次數。 |
| yarn.client.failover-retries-on-socket-timeouts | 每次連接 RM 的 socket 超時重試次數。 |
可以根據以上配置項對 RM HA 進行優化。
簡單文件 sample:
<property><name>yarn.resourcemanager.ha.enabled</name><value>true</value> </property> <property><name>yarn.resourcemanager.cluster-id</name><value>cluster1</value> </property> <property><name>yarn.resourcemanager.ha.rm-ids</name><value>rm1,rm2</value> </property> <property><name>yarn.resourcemanager.hostname.rm1</name><value>master1</value> </property> <property><name>yarn.resourcemanager.hostname.rm2</name><value>master2</value> </property> <property><name>yarn.resourcemanager.webapp.address.rm1</name><value>master1:8088</value> </property> <property><name>yarn.resourcemanager.webapp.address.rm2</name><value>master2:8088</value> </property> <property><name>yarn.resourcemanager.zk-address</name><value>zk1:2181,zk2:2181,zk3:2181</value> </property>切換 Active RM
# 獲取 active 狀態 yarn rmadmin -getServiceState rm1 yarn rmadmin -getServiceState rm2# 切換 rm1 到 active 狀態 yarn rmadmin -transitionToActive rm1RM HA & ZK
RM HA 中使用 ZK 的地方是 ZKRMStateStore 和 ZKFailoverController。
ZKRMStateStore
ZKRMStateStore 繼承了抽象類 RMStateStore, 用來存儲 RM 的狀態。
RMStateStore 中包含對 RMState, RMDTSecretManagerState, ApplicationStaateData, ApplicationAttemptStateData 的 store, load, remove, update 操作。
在 RM 啟動時, 會加載上述幾種狀態(RMStateStore#loadState(), 見 ResourceManager#RMActiveServices#serviceStart()):
protected void serviceStart() throws Exception {RMStateStore rmStore = rmContext.getStateStore();// The state store needs to start irrespective of recoveryEnabled as apps// need events to move to further states.rmStore.start();pauseMonitor.start();if(recoveryEnabled) {try {LOG.info("Recovery started");rmStore.checkVersion();if (rmContext.isWorkPreservingRecoveryEnabled()) {rmContext.setEpoch(rmStore.getAndIncrementEpoch());}// 加載上一次的 RMStateRMState state = rmStore.loadState();recover(state);LOG.info("Recovery ended");} catch (Exception e) {// the Exception from loadState() needs to be handled for// HA and we need to give up master status if we got fencedLOG.error("Failed to load/recover state", e);throw e;}}super.serviceStart(); }ZK 中存儲 RM 狀態目錄結構如下:
ROOT_DIR_PATH |--- VERSION_INFO |--- EPOCH_NODE # RM 重啟的元信息 |--- RM_ZK_FENCING_LOCK |--- RM_APP_ROOT | |----- (#ApplicationId1) | | |----- (#ApplicationAttemptIds) | | | |----- (#ApplicationId2) | | |----- (#ApplicationAttemptIds) | .... | |--- RM_DT_SECRET_MANAGER_ROOT|----- RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME|----- RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME| |----- Token_1| |----- Token_2| ....||----- RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME| |----- Key_1| |----- Key_2.... |--- AMRMTOKEN_SECRET_MANAGER_ROOT|----- currentMasterKey|----- nextMasterKey |-- RESERVATION_SYSTEM_ROOT|------PLAN_1| |------ RESERVATION_1| |------ RESERVATION_2| ....|------PLAN_2....存儲的信息主要包 Application 和 SECRET_MANAGER (安全與權限相關) 的信息。
實現隔離
ZKRMStoreStateStore#startInternal() 會隔離相關路徑、ACL、OPS。
private synchronized void fence() throws Exception {if (LOG.isTraceEnabled()) {logRootNodeAcls("Before fencing\n");}curatorFramework.setACL().withACL(zkRootNodeAcl).forPath(zkRootNodePath);delete(fencingNodePath);if (LOG.isTraceEnabled()) {logRootNodeAcls("After fencing\n");} }還原狀態
RMActiveServices 在啟動時 (serviceStart) 會調用 RMStateStore#loadState() 方法加載已經存儲的 RM 狀態。 RM HA 啟動后默認進入 Standby 狀態, 通過手動或者配置自動選舉的方式選擇 Active, 此時 RM 會加載已經存儲的狀態并還原。
ActiveStandbyElector
ActiveStandbyElector 主要負責完成自動的主被選舉(NameNode/ResourceManager), 內部封裝 ZK 的處理邏輯, 一旦主被選舉完成, 回調進行切換主備。
實現分析
- 創建鎖節點
如果目前還沒有進行過主備選舉的話, 那么相應的 ActiveStandbyElector 就會發起一次主備選舉, Zookeeper 的寫一致性會保證最終只會有一個 ActiveStandbyElector 創建成功, 創建成功的 ActiveStandbyElector 對應的 RM 切換成 Active RM, 創建失敗的 ActiveStandbyElector 對應的 RM 為 Stabdby RM, ActiveStandbyElector 回調 EmbeddedElectorService 的方法將對應的 RM 切換為相應的 RM。
- 注冊 Watcher 監聽
注冊 Watcher 的實現在 org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore 中, 關注點為 org.apache.zookeeper.Watcher.Event.EventType#NodeDeleted 和 org.apache.zookeeper.Watcher.Event.EventType#NodeDataChanged 的事件, 詳見 org.apache.hadoop.ha.ActiveStandbyElector#processWatchEvent()。 具體實現如下:
/*** interface implementation of Zookeeper watch events (connection and node),* 監控對應 ZNode 的 change 或 delete 事件。* proxied by {@link WatcherWithClientRef}.*/ synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {Event.EventType eventType = event.getType();if (isStaleClient(zk)) return;LOG.debug("Watcher event type: " + eventType + " with state:"+ event.getState() + " for path:" + event.getPath()+ " connectionState: " + zkConnectionState+ " for " + this);if (eventType == Event.EventType.None) {// the connection state has changedswitch (event.getState()) {case SyncConnected:LOG.info("Session connected.");// if the listener was asked to move to safe state then it needs to// be undoneConnectionState prevConnectionState = zkConnectionState;zkConnectionState = ConnectionState.CONNECTED;if (prevConnectionState == ConnectionState.DISCONNECTED &&wantToBeInElection) {monitorActiveStatus();}break;case Disconnected:LOG.info("Session disconnected. Entering neutral mode...");// ask the app to move to safe state because zookeeper connection// is not active and we dont know our statezkConnectionState = ConnectionState.DISCONNECTED;enterNeutralMode();break;case Expired:// the connection got terminated because of session timeout// call listener to reconnectLOG.info("Session expired. Entering neutral mode and rejoining...");enterNeutralMode();reJoinElection(0);break;case SaslAuthenticated:LOG.info("Successfully authenticated to ZooKeeper using SASL.");break;default:fatalError("Unexpected Zookeeper watch event state: "+ event.getState());break;}return; } /*** Watcher implementation which forward events to the ZKRMStateStore This* hides the ZK methods of the store from its public interface*/ private final class ForwardingWatcher implements Watcher {private ZooKeeper watchedZkClient;public ForwardingWatcher(ZooKeeper client) {this.watchedZkClient = client;}@Overridepublic void process(WatchedEvent event) {try {ZKRMStateStore.this.processWatchEvent(watchedZkClient, event);} catch (Throwable t) {LOG.error("Failed to process watcher event " + event + ": "+ StringUtils.stringifyException(t));}} }- 自動觸發主備選舉
監控到對應的 ZNode 被刪除的事件, 作出相應的操作:
switch (eventType) {case NodeDeleted:if (state == State.ACTIVE) {enterNeutralMode();}joinElectionInternal();break;case NodeDataChanged:monitorActiveStatus();break;default:LOG.debug("Unexpected node event: " + eventType + " for path: " + path);monitorActiveStatus(); } private void enterNeutralMode() {if (state != State.NEUTRAL) {LOG.debug("Entering neutral mode for " + this);state = State.NEUTRAL;appClient.enterNeutralMode();} }- 防止腦裂
Zookeeper 在工程實踐中經常發生 Zookeeper Client 假死, 導致 Zookeeper Client 到 Zookeeper Server 的心跳不能正常發出, 超過 Zookeeper Session Timeout 后, Zookeeper Server 會認為 Client 的 Session 已經過期而關閉 Session。 假死 可能引發分布式系統常說的雙主或腦裂(brain-split)現象。 導致 Zookeeper Client 假死 的原因可能是 ZK Client 正在進行 Full GC 或 Client 所在機器負載過高等。 Zookeeper 社區針對這種問題的解決方法是隔離, 將舊的 Active RM 隔離起來, 使其不能對外提供服務。
為實現隔離, ActiveStandbyElector 會創建一個 fencing 節點, 在 RM 中是 RM_ZK_FENCING_LOCK, 其實現類似于 ZKFC。 暫未去模擬腦裂的場景。
對 ActiveStandbyElector 主被選舉狀態變化的處理
當 ActiveStandbyElector 的貯備選舉狀態發生變化時, 會調用 EmbeddedElectorService 中注冊的回調函數進行相應的處理。
- 如果 ActiveStandbyElector 選主成功, 那么 ActiveStandbyElector 對應的 RM 成為 Active RM, ActiveStandbyElector 會回調 EmbeddedElectorService 的 becomeActive 方法。
- 如果 ActiveStandbyElector 選主失敗, 那么 EmbeddedElectorService 對應的 RM 成為 Standby RM, ActiveStandbyElector 會回調 EmbeddedElectorService 對應的 becomeStandby 方法。
轉載于:https://my.oschina.net/u/3034870/blog/3038612
總結
以上是生活随笔為你收集整理的Yarn ResourceManager High Availability的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python json模块使用详情
- 下一篇: 数据处理踩过的坑(不断更新):