日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kerberos体系下的应用(yarn,spark on yarn)

發布時間:2024/1/17 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kerberos体系下的应用(yarn,spark on yarn) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

kerberos 介紹

閱讀本文之前建議先預讀下面這篇博客
kerberos認證原理---講的非常細致,易懂

Kerberos實際上一個基于Ticket的認證方式。Client想要獲取Server端的資源,先得通過Server的認證;而認證的先決條件是ClientServer提供從KDC獲得的一個有ServerMaster Key進行加密的Session Ticket(Session Key + Client Info)

image.png

?

大體邏輯流程如下:

  • Client向KDC申請TGT(Ticket Granting Ticket)。
  • Client通過獲得TGT向KDC申請用于訪問Server的Ticket。
  • Client最終向為了Server對自己的認證向其提交Ticket。

kerberos 中的幾個概念

  • Principals

簡單的說, principals 類似于多用戶系統中的用戶名,每個server都對應一個 principals

principals由幾個部分的字符串構成。
例如:

component1 / component2 @ REALM

  • @ 后面是一個 principals 必不可少的部分 REALM,為大寫英文字符。
  • @ 前面是一個 principals 的具體身份,它可能由多個部分組成,使用/ 分割。
  • reborn@EXAMPLE.COM
    代表的是一個屬于EXAMPLE.COM領域的用戶reborn
    這類principals 我們稱之為 User Principals。
    還有一類我們稱之為 Service Principals。 它代表的不是具體的user,而是服務:
    yarn/ctum2f0302002.idc.xxx-group.net@IDC.XXX-GROUP.NET
    比如上面的這個, / 前面的部分為 yarn,說明它代表的是 yarn的服務,/ 后面的部分則是DNS域名,@后面的則是每個principals都必須有的 REALM

    上面所提及的 Client通過獲得TGT向KDC申請用于訪問Server的Ticket 就是通過 Service Principals 來向KDC 來申請Ticket的。

    • Keys 和 KeyTab

    每個 principals 都有自己的 Master key 用來證明自己就是 principals的擁有者。同時 在 ClientKDCServerTGTTicket加密。具體方式可才考開篇的 博客鏈接。
    一般來說,User Principals的 key是用戶密碼,Service Principals的key是隨機數串,他們都分別被存放在 KDC中一份,keytab 文件中一份。

    keytab文件就是一個密碼本,除非對該用戶重新生成keytab,否則這個文件是不會過期的,使用該keytab即可以登錄相應的principals

    獲取TGT

    從上面的概念上大家可以看出,為了訪問有kerberos認證的服務,作為Client首先要先向KDC發起請求獲取TGT 得到 KDC的授權,才繼而才能申請對 service 的Ticket。

    • kerberos client 的安裝
      Client 所在的機器環境必須是 kerberos client 環境,具體的安裝操作,網上有很多 �Installing Kerberos ,在安裝的過程中,最普遍出現的問題就是默認的加解密方式 jce不支持,解決方式網上也有java-jce-hadoop-kerberos 要么改變加解密方式,要么給jre打補丁

    • 使用命令行來獲取TGT環境
      這里列出幾個簡單的常用的命令:

      • kinit: 用來獲取TGT的命令, 可以使用密碼來向KDC申請,也可以直接使用keytab
      kinit wanghuan70 Password for wanghuan70@IDC.XXX-GROUP.NET: kinit -kt wanghuan70.keytab wanghuan70
      • kdestroy: 用來銷毀當前的tgt情況
      • klist: 用來展示當前的tgt情況

      如果當前還沒有申請TGT:

      klist klist: Credentials cache file '/tmp/krb5cc_2124' not found

      如果已經通過 kinit 申請過了TGT:

      -sh-4.2$ klist Ticket cache: FILE:/tmp/krb5cc_2124 Default principal: wanghuan70@IDC.XXX-GROUP.NETValid starting Expires Service principal 08/03/2017 09:31:52 08/11/2017 09:31:52 krbtgt/IDC.XXX- GROUP.NET@IDC.XXX-GROUP.NETrenew until 08/10/2017 09:31:52

      klist 中的信息展示的很詳細了,標明Client principal為 wanghuan70@IDC.XXX-GROUP.NET
      Service principal為 krbtgt/IDC.XXX- GROUP.NET@IDC.XXX-GROUP.NET
      這個 Service principal 實際上是 上圖中的 Tickt Granting Service(TGS)的principal。
      TGT是有時效性的,超過過期日期就不可以再使用,但是可以在 renew時間之前 使用

      klist -r

      來刷新。

    • 在代碼中登錄
      首先要保證的是運行代碼的機器上是有kerberos client 環境

      /*** User and group information for Hadoop.* This class wraps around a JAAS Subject and provides methods to determine the* user's username and groups. It supports both the Windows, Unix and Kerberos * login modules.*/ @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce", "HBase", "Hive", "Oozie"}) @InterfaceStability.Evolving public class UserGroupInformation {

      hadoop-common 的工程下提供了如上的 UserGroupInformation 用于用戶認證。我們在代碼中只需要調用 其中的api即可,簡單舉例子,我們想用 wanghuan70@IDC.XXX-GROUP.NET 這個 principal 來執行后續的代碼, 只需要調用如下api:

      UserGroupInformation.setConfiguration(configuration);System.setProperty("sun.security.krb5.debug", "true");UserGroupInformation.loginUserFromKeytab("wanghuan70", "/home/wanghuan70/wanghuan70.keytab");

      該api會改變當前環境下的tgt。
      如果我們想只對部分代碼使用另一個principal來執行,那么可以使用如下api,然后調用doAs執行:

      ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI("hbase" , "hbase.keytab");ugi.doAs(new PrivilegedExceptionAction<Void>() {@Overridepublic Void run() throws Exception {try {Connection connection = ConnectionFactory.createConnection(conf);Admin admin = connection.getAdmin();HTableDescriptor[] tables = admin.listTables();for (HTableDescriptor descriptor : tables) {HTable hTable = new HTable();hTable.setTableName(descriptor.getTableName().getNameAsString( ));Collection<HColumnDescriptor> families = descriptor.getFamilies();for (HColumnDescriptor family : families) {hTable.addFamily(family.getNameAsString());}hTables.addhTable(hTable);}} catch (Exception ex) {logger.info("list hbase table internal failed: %s", ex);throw new Exception(ex);}return null;}});

    在業務系統中訪問需要kerberos認證的服務

    這個比較簡單,如上節例子中可以看到,只需要改變當前進程環境下的tgt即可,可以使用 命令行也可以在代碼中實現。該部分暫時不討論 tgt的過期問題,后續會擴展分析。

    編寫yarn application提交到kerberos認證的集群中

    這類業務可能比較少,因為各種框架都自行實現了 xxx on yarn的代碼,比如 spark on yarn、flink on yarn。但是也有一些熱門的框架還沒有來得及實現on yarn。 如 tf on yarn,storm on datax on yarn ,datax on yarn或者apache twill。我們可以自己動手去完成一個 yarn application的工程,繼而可以推測 其他框架的on yarn是怎么去實現的。
    官網的參考文件如下:
    Hadoop: Writing YARN Applications
    YARN應用開發流程
    上述文章已經很詳細的講解了如何編寫 yarn application,我們再這里不再累述,而我們的關注點在于提交到kerberos認證的集群

    image.png

    在上面這個圖大概的描述了我們的 yarn application的邏輯流程,這里需要注意的是:

    • Client Node 需要使用 ApplicationClientProtocol(Client-RM之間的協議) 將應用提交給 RM。
    • AM 需要使用 ApplicationMasterProtocol(AM-RM之間的協議)向RM申請資源。
    • AM需要使用 ContainerManagementProtocol(AM-NM之間的協議)向NM發起啟動container的命令

    也就是說這三次的rpc通訊,我們的應用是需要與Yarn進行通訊的,在kerberos認證的系統中,換句話說,我們需要與yarn service 進行通訊的Ticket

    • Client Node 需要使用ClientNamenodeProtocol(DFSClient-HDFS協議)將應用需要的資源上傳到HDFS上;
    • AM (可能的操作)需要使用ClientNamenodeProtocol(DFSClient-HDFS協議)將資源下載下來;
    • Container 需要使用ClientNamenodeProtocol(DFSClient-HDFS協議)將資源下載下來;

    也就是說這三次的rpc通訊,我們的應用是需要與HDFS進行通訊的,在kerberos認證的系統中,換句話說,我們需要與hdfs service 進行通訊的Ticket

    還有一個問題需要注意的是,在應用中,我們發起RPC通訊 可能在不同的機器上這個時候如何進行構造相同的環境是我們這里需要表述的東西;

    • 從上面的鏈接我們可以知道,Client是如何提交Application到RM,代碼可如下:

      ApplicationId submitApplication(YarnClientApplication app,String appName,ContainerLaunchContext launchContext,Resource resource,String queue) throws Exception {ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();appContext.setApplicationName(appName);appContext.setApplicationTags(new HashSet<String>());appContext.setAMContainerSpec(launchContext);appContext.setResource(resource);appContext.setQueue(queue);return yarnClient.submitApplication(appContext);}

      Client調用 YarnClientApplication 向RM提交 ApplicationSubmissionContext
      這里包含了

      • 應用的名稱
      • 所依賴的資源
      • 提交的隊列
      • 還有一個重要的東西 ContainerLaunchContext 它是什么東西呢。
      /** * <p><code>ContainerLaunchContext</code> represents all of the information * needed by the <code>NodeManager</code> to launch a container.</p> * * <p>It includes details such as: * <ul> * <li>{@link ContainerId} of the container.</li> * <li>{@link Resource} allocated to the container.</li> * <li>User to whom the container is allocated.</li> * <li>Security tokens (if security is enabled).</li> * <li> * {@link LocalResource} necessary for running the container such * as binaries, jar, shared-objects, side-files etc. * </li> * <li>Optional, application-specific binary service data.</li> * <li>Environment variables for the launched process.</li> * <li>Command to launch the container.</li> * </ul> * </p> * * @see ContainerManagementProtocol#startContainers(org.apache.hadoop .yarn.api.protocolrecords.StartContainersRequest) */

      我們的ApplucationMaster 本身上也是在Container里面執行的,所以也有這個上下文,構造函數如下:

      public static ContainerLaunchContext newInstance(Map<String, LocalResource> localResources,Map<String, String> environment, List<String> commands,Map<String, ByteBuffer> serviceData, ByteBuffer tokens,Map<ApplicationAccessType, String> acls) {ContainerLaunchContext container =Records.newRecord(ContainerLaunchContext.class);container.setLocalResources(localResources);container.setEnvironment(environment);container.setCommands(commands);container.setServiceData(serviceData);container.setTokens(tokens);container.setApplicationACLs(acls);return container; }

      可以從構造函數來看到我們在設置Container中的環境、資源、執行命令等之外,還添加了 ByteBuffer tokens

      * Set security tokens needed by this container.* @param tokens security tokens */ @Public @Stable public abstract void setTokens(ByteBuffer tokens);

      沒錯! 這個tokens就是我們傳遞給container里面的安全信息。

      kerberos 和 Delegation token的關系需要說明一下,我們使用kerberos通過認證后,可以獲取一個帶有時效的委托token,如果我們把這個信息儲存起來,在token沒過期之前,使用這個token就可以直接連接服務,而無需再走kerberos那一套授權流程了。

      那這個值,我們Client是從哪里獲取并賦予給container的呢?

      /*** setup security token given current user* @return the ByeBuffer containing the security tokens* @throws IOException*/private ByteBuffer setupTokens(FileSystem fs) throws IOException {DataOutputBuffer buffer = new DataOutputBuffer();String loc = System.getenv().get("HADOOP_TOKEN_FILE_LOCATION");if ((loc != null && loc.trim().length() > 0)|| (!UserGroupInformation.isSecurityEnabled())) {this.credentials.writeTokenStorageToStream(buffer);} else {// Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduceCredentials credentials = new Credentials();String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);if (tokenRenewer == null || tokenRenewer.length() == 0) {throw new IOException("Can't get Master Kerberos principal for the RM to use as renewer");}// For now, only getting tokens for the default file-system.final org.apache.hadoop.security.token.Token<?> tokens[] = fs.addDelegationTokens(tokenRenewer, credentials);if (tokens != null) {for (org.apache.hadoop.security.token.Token<?> token : tokens) {LOG.info("Got dt for " + fs.getUri() + "; " + token);}}credentials.writeTokenStorageToStream(buffer);}return ByteBuffer.wrap(buffer.getData(), 0, buffer.getLength());}

      不同的 xx on yarn可能代碼寫法不同,但是,思路都是一致的:

      /*** Obtain all delegation tokens used by this FileSystem that are not* already present in the given Credentials. Existing tokens will neither* be verified as valid nor having the given renewer. Missing tokens will* be acquired and added to the given Credentials.* * Default Impl: works for simple fs with its own token* and also for an embedded fs whose tokens are those of its* children file system (i.e. the embedded fs has not tokens of its* own).* * @param renewer the user allowed to renew the delegation tokens* @param credentials cache in which to add new delegation tokens* @return list of new delegation tokens* @throws IOException*/ @InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce" }) public Token<?>[] addDelegationTokens( 17/08/03 15:48:49 INFO client.LaunchCluster: tokenRenewer is yarn/_HOST@IDC.WANDA-GROUP.NET 17/08/03 15:48:49 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 762341 for wanghuan70 on ha-hdfs:nn-idc 17/08/03 15:48:49 INFO client.LaunchCluster: Got dt for hdfs://nn- idc; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:nn-idc, Ident: (HDFS_DELEGATION_TOKEN token 762341 for wanghuan70) 17/08/03 15:48:49 WARN token.Token: Cannot find class for token kind kms-dt 17/08/03 15:48:49 INFO client.LaunchCluster: Got dt for hdfs://nn- idc; Kind: kms-dt, Service: 10.214.129.150:16000, Ident: 00 0a 77 61 6e 67 68 75 61 6e 37 30 04 79 61 72 6e 00 8a 01 5d a7 14 e1 22 8a 01 5d cb 21 65 22 8d 0d d1 8e 8f d7

      我們這里是生成了訪問hdfs的Token HDFS_DELEGATION_TOKEN 以及 在hdfs上的 KMS的token,
      這里我們可以注意到,在上面的分析中,我們的AM也要去連接RM和NM,但是為什么這里沒有去生成Token呢。我們可以看一下AM里面的 ** UserGroupInformation**的狀態,我們通過在我們的 ApplicationMaster的啟動類中,加入如下代碼:

      LOG.info("isSecurityEnabled: {}", UserGroupInformation.getCurrentUser().isSecurityEnabled());LOG.info("isLoginKeytabBased: {}", UserGroupInformation.getCurrentUser().isLoginKeytabBased());LOG.info("isLoginTicketBased: {}", UserGroupInformation.getCurrentUser().isLoginTicketBased());LOG.info("userName: {}", UserGroupInformation.getCurrentUser().getUserName());for (org.apache.hadoop.security.token.Token<? extends TokenIdentifier> token : UserGroupInformation.getCurrentUser().getTokens()) {LOG.info("Token kind is " + token.getKind().toString()+ " and the token's service name is " + token.getService());}

      讓我們來看下AM端的日志:

    image.png

    可以看到 AM端的 初始UserGroupInformation是不帶要tgt的, 也就是說,沒辦法進行kerberos認證流程,AM端不管是與yarn還是 hdfs的通訊都應該是使用Token的。在圖片中Token列表中,我們看到出現了一個 名字叫 YARN_AM_RM_TOKEN ,這個并不是我們Client加進去的,但是可以確信的是AM使用該token與RM進行通訊,這個token哪里來的呢?

    帶著這個疑問,我們需要從Client開始扒拉一下代碼了,在client端我們使用 YarnClient 將我們的啟動的信息提交給了RM,這個YarnClient是經過kerberos認證的連接,那么我們可以看下RM端是怎么來處理這個 啟動ApplicationMaster請求的。我們提交給RM的是一個名叫ApplicationSubmissionContext, RM要從中創建出ContainerLaunchContext

    image.png

    這RM端的createAMContainerLaunchContext中,我們查到了我們的疑問之處,這里出現了

    // Finalize the containersetupTokens(container, containerID);

    進去看看這個方法做了什么?:

    image.png

    image.png

    我們看到了我們想要的東西,container中新的tokens除了我們老的ContainerLaunchContext中我們從client傳遞過來的tokens,還額外添加了AMRMToken,到了這里我們解決了我們上面的一個疑問:

    AM和RM通訊是使用Token來認證的,這個AMRMToken是RM端啟動am的container的時候加塞進來的。

    現在整理一下我們邏輯,啟動之后AM使用** YARN_AM_RM_TOKEN來和RM通訊,使用 HDFS_DELEGATION_TOKEN**來和hdfs filesystem通訊,那么,AM是怎么通知NN來啟動自己的 excutor的呢?不妨再看一下代碼。

    image.png

    上面的圖很明了了,nmTokens由RM提供給AM,在AM創建NMClientAsync的時候,

    image.png


    從單例 NMTokenCache 中獲取到 nmTokens來進行連接NN。

    ?

    到此,AM中的認證問題,我們已經整明白了,那邊由AM,啟動的其他的container的認證呢?,其實套路是一樣的!

    LOG.info("Launching a new container."+ ", containerId=" + container.getId()+ ", containerNode=" + container.getNodeId().getHost()+ ":" + container.getNodeId().getPort()+ ", containerNodeURI=" + container.getNodeHttpAddress()+ ", containerResourceMemory="+ container.getResource().getMemory()+ ", containerResourceVirtualCores="+ container.getResource().getVirtualCores()+ ", command: " + command);ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(localResources, env, Lists.newArrayList(command), null, setupTokens(), null);appMaster.addContainer(container);appMaster.getNMClientAsync().startContainerAsync(container, ctx);

    只需要把AM中的token做傳遞即可。

    長任務在kerberos系統中執行,以spark為例子

    什么是長任務? 就是long-running services,長時間運行的任務,可能是流也可以不是。
    那么為什么,要把長任務單獨來說呢,因為從上述的yarn應用的描述,我們知道,am和excutor中使用的是token來訪問hdfs和rm 的,token是有時效性的我們是知道的,那么,長時間運行,token肯定會失效,如果token失效的話,肯定就不能訪問hdfs了。所以這個就是 long-running 任務需要考慮的東西。
    spark on yarn模式,分為兩種: 一種是 yarn client模式,一種是yarn cluster模式。一般來說業務上都會使用yarn cluster模式來執行,但是隨著分析類工具的推出,比如zeppelin,jupter的使用, 常駐的yarn client 所以這兩種模式都很重要。為了把事情講清楚,我們兩種方式分類來說明,本章節源碼(1.6.0)通讀可以較多。

    yarn clientyarn cluter 說到底都是yarn application,那么client 和 cluster的區別到底區別在哪呢?-- Spark Driver是在本地運行還是在AM中來執行

    擴展閱讀
    過往記憶
    kerberos_and_hadoop

    yarn cluster 模式

    image.png

    • spark 使用SparkSubmitAction來提交作業

      image.png

      • prepareSubmitEnvironment 根據 master(YARN/STANDALONE/MESOS/LOCAL)和deployMode(CLIENT/CLUSTER)來得到我們需要執行的Class入口
      • runMain 通過反射執行childMainClass中的main函數,因為這里是 cluster模式,所在這里執行的并不是用戶的代碼,而是org.apache.spark.deploy.yarn.Client
    • Client里面執行的是編譯一個yarn application必要的步驟:

      image.png

      • 建立 yarnClient 用于和RM通信
      • 向RM申請一個newApp
      • 創建am需要的containerContext
      • 創建ApplicationSubmissionContext并提交,amClass為
      org.apache.spark.deploy.yarn.ApplicationMaster
      • Client完成;
    • ApplicationMaster

      • 啟動用戶的代碼線程:

      image.png

      • 當SparkContext、Driver初始化完成的時候,通過amClient向ResourceManager注冊ApplicationMaster

      image.png

      • 上面的邏輯是yarn application必須的步驟,我們注意來看看spark 如何來處理 token失效的:
      // If the credentials file config is present, we must periodically renew tokens. So create// a new AMDelegationTokenRenewerif (sparkConf.contains("spark.yarn.credentials.file")) {delegationTokenRenewerOption = Some(new AMDelegationTokenRenewer(sparkConf, yarnConf))// If a principal and keytab have been set, use that to create new credentials for executors// periodicallydelegationTokenRenewerOption.foreach(_.scheduleLoginFromKeytab())}

      1.如果用戶在提交應用的過程中,使用 --keytab 參數上傳了kerberos認證文件的話,AM里面會啟動一個線程專門用來處理,我們可以看看 AMDelegationTokenRenewer 里面都做了什么:

      private[spark] def scheduleLoginFromKeytab(): Unit = {val principal = sparkConf.get("spark.yarn.principal")val keytab = sparkConf.get("spark.yarn.keytab")/*** Schedule re-login and creation of new tokens. If tokens have already expired, this method* will synchronously create new ones.*/def scheduleRenewal(runnable: Runnable): Unit = {val credentials = UserGroupInformation.getCurrentUser.getCredentialsval renewalInterval = hadoopUtil.getTimeFromNowToRenewal(sparkConf, 0.75, credentials)// Run now!if (renewalInterval <= 0) {logInfo("HDFS tokens have expired, creating new tokens now.")runnable.run()} else {logInfo(s"Scheduling login from keytab in $renewalInterval millis.")delegationTokenRenewer.schedule(runnable, renewalInterval, TimeUnit.MILLISECONDS)}}// This thread periodically runs on the driver to update the delegation tokens on HDFS.val driverTokenRenewerRunnable =new Runnable {override def run(): Unit = {try {writeNewTokensToHDFS(principal, keytab)cleanupOldFiles()} catch {case e: Exception =>// Log the error and try to write new tokens back in an hourlogWarning("Failed to write out new credentials to HDFS, will try again in an " +"hour! If this happens too often tasks will fail.", e)delegationTokenRenewer.schedule(this, 1, TimeUnit.HOURS)return}scheduleRenewal(this)}}// Schedule update of credentials. This handles the case of updating the tokens right now// as well, since the renenwal interval will be 0, and the thread will get scheduled// immediately.scheduleRenewal(driverTokenRenewerRunnable) } private def writeNewTokensToHDFS(principal: String, keytab: String): Unit = {// Keytab is copied by YARN to the working directory of the AM, so full path is// not needed.// HACK:// HDFS will not issue new delegation tokens, if the Credentials object// passed in already has tokens for that FS even if the tokens are expired (it really only// checks if there are tokens for the service, and not if they are valid). So the only real// way to get new tokens is to make sure a different Credentials object is used each time to// get new tokens and then the new tokens are copied over the the current user's Credentials.// So:// - we login as a different user and get the UGI// - use that UGI to get the tokens (see doAs block below)// - copy the tokens over to the current user's credentials (this will overwrite the tokens// in the current user's Credentials object for this FS).// The login to KDC happens each time new tokens are required, but this is rare enough to not// have to worry about (like once every day or so). This makes this code clearer than having// to login and then relogin every time (the HDFS API may not relogin since we don't use this// UGI directly for HDFS communication.logInfo(s"Attempting to login to KDC using principal: $principal")val keytabLoggedInUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)logInfo("Successfully logged into KDC.")val tempCreds = keytabLoggedInUGI.getCredentialsval credentialsPath = new Path(credentialsFile)val dst = credentialsPath.getParentkeytabLoggedInUGI.doAs(new PrivilegedExceptionAction[Void] {// Get a copy of the credentialsoverride def run(): Void = {val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dsthadoopUtil.obtainTokensForNamenodes(nns, freshHadoopConf, tempCreds)null}})// Add the temp credentials back to the original ones.UserGroupInformation.getCurrentUser.addCredentials(tempCreds)val remoteFs = FileSystem.get(freshHadoopConf)// If lastCredentialsFileSuffix is 0, then the AM is either started or restarted. If the AM// was restarted, then the lastCredentialsFileSuffix might be > 0, so find the newest file// and update the lastCredentialsFileSuffix.if (lastCredentialsFileSuffix == 0) {hadoopUtil.listFilesSorted(remoteFs, credentialsPath.getParent,credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION).lastOption.foreach { status =>lastCredentialsFileSuffix = hadoopUtil.getSuffixForCredentialsPath(status.getPath)}}val nextSuffix = lastCredentialsFileSuffix + 1val tokenPathStr =credentialsFile + SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM + nextSuffixval tokenPath = new Path(tokenPathStr)val tempTokenPath = new Path(tokenPathStr + SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)logInfo("Writing out delegation tokens to " + tempTokenPath.toString)val credentials = UserGroupInformation.getCurrentUser.getCredentialscredentials.writeTokenStorageFile(tempTokenPath, freshHadoopConf)logInfo(s"Delegation Tokens written out successfully. Renaming file to $tokenPathStr")remoteFs.rename(tempTokenPath, tokenPath)logInfo("Delegation token file rename complete.")lastCredentialsFileSuffix = nextSuffix }

      代碼很長,邏輯可以概括為如下:
      1.根據token時效判斷是否需要進行token刷新行為;
      2.使用hdfs上的keytab獲取新的tgt -- keytabLoggedInUGI
      3.在新的UserGroupInformation下,重新獲取新的 HDFS_DELEGATION_TOKEN 加到當前的 UserGroupInformation中,這里大家留意一下
      freshHadoopConf

      image.png


      我們后面緊接著會具體講 如何與hdfs通訊的時候分析一下https://issues.apache.org/jira/browse/HDFS-9276
      4.將新的token信息更新到hdfs目錄下。

      ?

      • Excutor的啟動的類為org.apache.spark.executor.org.apache.spark.executor

        image.png

        ?

        如果需要刷新token,excutor會啟動一個更新token程序

      def updateCredentialsIfRequired(): Unit = {try {val credentialsFilePath = new Path(credentialsFile)val remoteFs = FileSystem.get(freshHadoopConf)SparkHadoopUtil.get.listFilesSorted(remoteFs, credentialsFilePath.getParent,credentialsFilePath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION).lastOption.foreach { credentialsStatus =>val suffix = SparkHadoopUtil.get.getSuffixForCredentialsPath(credentialsStatus.getPath)if (suffix > lastCredentialsFileSuffix) {logInfo("Reading new delegation tokens from " + credentialsStatus.getPath)val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentialsStatus.getPath)lastCredentialsFileSuffix = suffixUserGroupInformation.getCurrentUser.addCredentials(newCredentials)logInfo("Tokens updated from credentials file.")} else {// Check every hour to see if new credentials arrived.logInfo("Updated delegation tokens were expected, but the driver has not updated the " +"tokens yet, will check again in an hour.")delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)return}}val timeFromNowToRenewal =SparkHadoopUtil.get.getTimeFromNowToRenewal(sparkConf, 0.8, UserGroupInformation.getCurrentUser.getCredentials)if (timeFromNowToRenewal <= 0) {executorUpdaterRunnable.run()} else {logInfo(s"Scheduling token refresh from HDFS in $timeFromNowToRenewal millis.")delegationTokenRenewer.schedule(executorUpdaterRunnable, timeFromNowToRenewal, TimeUnit.MILLISECONDS)}} catch {// Since the file may get deleted while we are reading it, catch the Exception and come// back in an hour to try againcase NonFatal(e) =>logWarning("Error while trying to update credentials, will try again in 1 hour", e)delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)} }

      邏輯也很明了:

    • 從 hdfs相應目錄讀取由AM寫入的token文件信息;
    • 更新到自己的ugi中;
      這里也需要 對
    • ?

      image.png


      也和上述https://issues.apache.org/jira/browse/HDFS-9276有關。

      ?


    至此,實際上啟動的過程大概就是這樣,那么現在我們需要對我們關心的問題來具體分析:

    • 我們的應用是怎么連接到hdfs的?
      在hadoop api中提供 FileSystem 接口用于與各種文件系統進行連接,HDFS也不除外,其具體類為DistributedFileSystem,進入這個類,可以看到連接hdfs的客戶端

      image.png

      DEBUG [2017-07-28 13:24:46,255] ({main} DFSClient.java[<init>]:455) - dfs.client.use.legacy.blockreader.local = false DEBUG [2017-07-28 13:24:46,255] ({main} DFSClient.java[<init>]:458) - dfs.client.read.shortcircuit = false DEBUG [2017-07-28 13:24:46,256] ({main} DFSClient.java[<init>]:461) - dfs.client.domain.socket.data.traffic = false DEBUG [2017-07-28 13:24:46,256] ({main} DFSClient.java[<init>]:464) - dfs.domain.socket.path = /var/run/hdfs- sockets/dn DEBUG [2017-07-28 13:24:46,282] ({main} HAUtil.java[cloneDelegationTokenForLogicalUri]:329) - No HA service delegation token found for logical URI hdfs://nn-idc DEBUG [2017-07-28 13:24:46,282] ({main} DFSClient.java[<init>]:455) - dfs.client.use.legacy.blockreader.local = false DEBUG [2017-07-28 13:24:46,282] ({main} DFSClient.java[<init>]:458) - dfs.client.read.shortcircuit = false DEBUG [2017-07-28 13:24:46,283] ({main} DFSClient.java[<init>]:461) - dfs.client.domain.socket.data.traffic = false DEBUG [2017-07-28 13:24:46,283] ({main} DFSClient.java[<init>]:464) - dfs.domain.socket.path = /var/run/hdfs- sockets/dn DEBUG [2017-07-28 13:24:46,285] ({main} RetryUtils.java[getDefaultRetryPolicy]:75) - multipleLinearRandomRetry = null DEBUG [2017-07-28 13:24:46,290] ({main} ClientCache.java[getClient]:63) - getting client out of cache: org.apache.hadoop.ipc.Client@416b681c DEBUG [2017-07-28 13:24:46,514] ({main} NativeCodeLoader.java[<clinit>]:46) - Trying to load the custom-built native-hadoop library... DEBUG [2017-07-28 13:24:46,515] ({main} NativeCodeLoader.java[<clinit>]:50) - Loaded the native-hadoop library DEBUG [2017-07-28 13:24:46,520] ({Thread-36} DomainSocketWatcher.java[run]:453) - org.apache.hadoop.net.unix.DomainSocketWatcher$2@dbe5911: starting with interruptCheckPeriodMs = 60000 DEBUG [2017-07-28 13:24:46,524] ({main} DomainSocketFactory.java[<init>]:110) - Both short-circuit local reads and UNIX domain socket are disabled. DEBUG [2017-07-28 13:24:46,530] ({main} DataTransferSaslUtil.java[getSaslPropertiesResolver]:183) - DataTransferProtocol not using SaslPropertiesResolver, no QOP found in configuration for dfs.data.transfer.protection DEBUG [2017-07-28 13:24:46,534] ({main} Logging.scala[logDebug]:62) - delegation token renewer is: yarn/ctum2f0302002.idc.xxx-group.net@IDC.XXX-GROUP.NETINFO [2017-07-28 13:24:46,535] ({main} Logging.scala[logInfo]:58) - getting token for namenode: hdfs://nn- idc/user/wanghuan70/.sparkStaging/application_1499341382704_7 8490 DEBUG [2017-07-28 13:24:46,537] ({main} Client.java[<init>]:434) - The ping interval is 60000 ms. DEBUG [2017-07-28 13:24:46,537] ({main} Client.java[setupIOstreams]:704) - Connecting to ctum2f0302002.idc.xxx-group.net/10.214.128.51:8020 DEBUG [2017-07-28 13:24:46,538] ({main} UserGroupInformation.java[logPrivilegedAction]:1715) - PrivilegedAction as:wanghuan70@IDC.XXX-GROUP.NET (auth:KERBEROS) from:org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Cli ent.java:725) DEBUG [2017-07-28 13:24:46,539] ({main} SaslRpcClient.java[sendSaslMessage]:457) - Sending sasl message state: NEGOTIATEDEBUG [2017-07-28 13:24:46,541] ({main} SaslRpcClient.java[saslConnect]:389) - Received SASL message state: NEGOTIATE auths {method: "TOKEN"mechanism: "DIGEST-MD5"protocol: ""serverId: "default"challenge: "realm=\"default\",nonce=\"FsxK1F2sX0QvIYFTYdwpNFYlB+uCuXr x7se1tCAa\",qop=\"auth\",charset=utf-8,algorithm=md5-sess" } auths {method: "KERBEROS"mechanism: "GSSAPI"protocol: "hdfs"serverId: "ctum2f0302002.idc.xxx-group.net" }DEBUG [2017-07-28 13:24:46,541] ({main} SaslRpcClient.java[getServerToken]:264) - Get token info proto:interface org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB info:@org.apache.hadoop.security.token.TokenInfo(value=class org.apache.hadoop.hdfs.security.token.delegation.DelegationToken Selector) DEBUG [2017-07-28 13:24:46,542] ({main} SaslRpcClient.java[getServerPrincipal]:291) - Get kerberos info proto:interface org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB info:@org.apache.hadoop.security.KerberosInfo(clientPrincipal=, serverPrincipal=dfs.namenode.kerberos.principal) DEBUG [2017-07-28 13:24:46,545] ({main} SaslRpcClient.java[createSaslClient]:236) - RPC Server's Kerberos principal name for protocol=org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProto colPB is hdfs/ctum2f0302002.idc.wanda-group.net@IDC.XXX- GROUP.NETDEBUG [2017-07-28 13:24:46,546] ({main} SaslRpcClient.java[createSaslClient]:247) - Creating SASL GSSAPI(KERBEROS) client to authenticate to service at ctum2f0302002.idc.wanda-group.net DEBUG [2017-07-28 13:24:46,547] ({main} SaslRpcClient.java[selectSaslClient]:176) - Use KERBEROS authentication for protocol ClientNamenodeProtocolPB DEBUG [2017-07-28 13:24:46,564] ({main} SaslRpcClient.java[sendSaslMessage]:457) - Sending sasl message state: INITIATE

      這里摘錄了部分debug日志,這樣就很好的邏輯描述清楚了

    • DFSClient 通過 ClientNamenodeProtocolPB協議來和namenode建立聯系。底層RPC在簡歷連接的時候如果有token則使用token進行建立連接,如果沒有token再進行kerberos認證后建立連接。
    • ?

      image.png


      在dfsclient中使用 DelegationTokenSelector來選取即id為 HDFS_DELEGATION_TOKEN的token。在我們沒有使用

      ?

      YarnSparkHadoopUtil.get.obtainTokensForNamenodes(nns, hadoopConf, credentials)

      當前的UGI中是不能使用token進行連接的。
      在初始化 DFSClient 中,使用的 dfs.client.failover.proxy.provider 是 org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider 在構造過程中會調用

      // The client may have a delegation token set for the logical// URI of the cluster. Clone this token to apply to each of the// underlying IPC addresses so that the IPC code can find it.HAUtil.cloneDelegationTokenForLogicalUri(ugi, uri, addressesOfNns);

      這里的作用在 HA mode下很重要,在HA mode的形式下,我們使用 obtainTokensForNamenodes 生成的 token的 service name 為 ha-hdfs:nn-idc

      DFSClient.java[getDelegationToken]:1066) - Created HDFS_DELEGATION_TOKEN token 735040 for wanghuan70 on ha-hdfs:nn-idc

      但是呢,在rpc連接的時候,使用的host或者ip加port的 service name來尋找 token的,換句話說,即時我們獲取了token,saslRPC在連接的時候也找不到,這里就是使用 HAUtil.java[cloneDelegationTokenForLogicalUri]:329) 將 service name為ha-hdfs:nn-idc 拷貝成和 ip對應的token,這樣
      saslRPC才可以順利使用token。但是要注意的是 只有在DFSClient初始化過程中,才會進行這個token的拷貝。 可是呢,

      ?

      image.png


      在獲取 FileSystem的時候,默認的情況下,這個實例會被cache的,也就是說,DFSClient就不會初始化了,我們更新的token就不會使用 HAUtil.java[cloneDelegationTokenForLogicalUri]:329) 將 service name為ha-hdfs:nn-idc 拷貝成和 ip對應的token,這樣即使這樣
      saslRPC使用仍然是老token,就會過期掉,這就是 https://issues.apache.org/jira/browse/HDFS-9276的描述的問題。針對這個問題,hadoop版本升級后可以修復,還有一個方法就是,如果不cache的話,就會調用 DFSClient 初始化方法,所以,我們可以設置這個默認參數為 true

      ?

    • spark的excutor并不一定一開始就是給定的,是動態的增加的,也就是說一個長應用的AM可能在很長的一段時間內都會和 RM通訊,我們回顧一下上面的內容,我們知道AMRMToken是RM在啟動AM的時候下發的,而且,我們在刷新機制中,僅僅刷新了HDFS_DELEGATION_TOKEN,那邊怎么來處理AMRMToken過期呢,這spark里面其實并沒有在對此做處理,為什么呢?

    建立的saslRPC連接只有空閑時間超過10s中,連接才會被關閉,如果我們的AM保持著對RM的心跳,也就不需要重新與RM建立連接(讀者可以推演一下RM發生準備切換的情景)。

    yarn client 模式

    image.png

    這里只講一下和 yarn cluster的不同之處:

    • 因為Spark Driver是在本地執行,所以在使用SparkSubmit提交的時候 runMain 通過反射執行childMainClass中的main函數,這里的childMainClass 是用戶的代碼。

    • SparkContext生成的過程,根據提交方式,使用YarnClientSchedulerBackend來調度

      image.png

    • 因為用戶的代碼已經本地啟動了,那么啟動的AM里面執行什么呢?
      什么業務代碼都不執行,只負責向RM申請資源。

    • Driver 因為需要獲悉application的執行情況,啟動了一個監控線程,每1s鐘向RM咨詢一次狀態,也不需要刷新token

      image.png


    我們上面所說的 hdfs的token刷新都是在用戶使用 --keytab的方式提交的,如果不是以這種方式提交的長任務,token肯定會失效,會報錯。



    作者:PunyGod
    鏈接:https://www.jianshu.com/p/ae5a3f39a9af
    來源:簡書
    簡書著作權歸作者所有,任何形式的轉載都請聯系作者獲得授權并注明出處。

    總結

    以上是生活随笔為你收集整理的kerberos体系下的应用(yarn,spark on yarn)的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。