Spark in Detail (4): Spark Components and Message Communication
1. Spark Core Concepts
- Application: a Spark application written by the user. It consists of the driver program (Driver) and the Executor code that runs on multiple nodes across the cluster, and during execution it is made up of one or more jobs.
- Driver: the process that runs the Application's main function and creates the SparkContext; creating the SparkContext prepares the application's runtime environment. The SparkContext communicates with the Cluster Manager to request resources and to assign and monitor tasks; once the Executors have finished, the Driver closes the SparkContext. The SparkContext is therefore commonly used to stand for the Driver (see the minimal example after this list).
- Cluster Manager: an external service that acquires resources on the cluster. The current options are:
    - Standalone: Spark's built-in resource manager, in which the Master manages resources.
    - Hadoop YARN: the ResourceManager in YARN manages resources.
    - Mesos: the Mesos Master manages resources.
- Worker: any node in the cluster that can run Application code, similar to a NodeManager in YARN.
- Master: the master node in Spark Standalone mode, responsible for managing resources and allocating them to run Spark Applications.
- Executor: a process launched for an Application on a Worker node. It runs Tasks and keeps data in memory or on disk; each Application has its own set of Executors. In Spark on YARN mode the process is named CoarseGrainedExecutorBackend, analogous to the YARN Child in Hadoop MapReduce.
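To make these roles concrete, here is a minimal word-count application (a hypothetical example; the object name and HDFS paths are illustrative only): its main function is the Driver, creating the SparkContext prepares the runtime environment, and the RDD operations run as Tasks inside Executors on the Worker nodes.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(conf)                      // Driver-side entry point to the cluster
    val counts = sc.textFile("hdfs:///input/words.txt")  // computed by Tasks inside Executors
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    counts.saveAsTextFile("hdfs:///output/word-counts")
    sc.stop()                                            // Driver closes the SparkContext when done
  }
}
```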
2. Message Communication Principles
2.1 Spark Message Communication Architecture
在Spark中定義了通信框架接口,這些接口實(shí)現(xiàn)中調(diào)用Netty的具體方法(在Spark 2.0版本之前使用的是Akka)。在框架中以RpcEndPoint和RpcEndPointRef實(shí)現(xiàn)了Actor和ActorRef的相關(guān)動作,其中RpcEndPointRef是RpcEndPoint的引用,在消息通信中消息發(fā)送方持有引用RpcEndPointRef,它們之間的關(guān)系如下圖所示:
The framework is built with the factory design pattern, which decouples it from Netty so that other messaging libraries can be plugged in as needed.
具體的實(shí)現(xiàn)步驟如下:首先定義了RpcEnv和RpcEnvFactory兩個抽象類,在RpcEnv中定義了RPC通信框架的啟動、停止和關(guān)閉等抽象方法,在RpcEnvFactory中定義了創(chuàng)建抽象方法。然后在NettyRpcEnv和NettoyRpcEnvFactory類使用Netty對繼承的方法進(jìn)行了實(shí)現(xiàn)。
在各個模塊中的實(shí)現(xiàn),如Master和Worker等,會先使用RpcEnv的靜態(tài)方法創(chuàng)建RpcEnv實(shí)例,然后實(shí)例化Master,由于Master繼承與ThreadSafeRpcEndPoin,創(chuàng)建的Master實(shí)例是一個線程安全的終端點(diǎn),接著調(diào)用RpcEnv的啟動終端點(diǎn)方法,把Master的終端點(diǎn)和其對應(yīng)的引用注冊到RpcEnv中。在消息通信中,其他對象只需要獲取到了Master終端點(diǎn)的引用,就能發(fā)送消息給Master進(jìn)行通信。下面是Master.scala中的startRpcEnvAndEndPoint方法:
```scala
/**
 * Start the Master and return a three tuple of:
 *   (1) The Master RpcEnv
 *   (2) The web UI bound port
 *   (3) The REST server bound port, if any
 */
def startRpcEnvAndEndpoint(
    host: String,
    port: Int,
    webUiPort: Int,
    conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
  val securityMgr = new SecurityManager(conf)
  val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
  val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
    new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
  val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
  (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}
```

2.2 Spark Startup Message Communication
Spark啟動過程中主要是進(jìn)行Master和Worker之間的通信,其消息發(fā)送關(guān)系如下,首先由worker節(jié)點(diǎn)向Master發(fā)送注冊消息,然后Master處理完畢后,返回注冊成功消息或失敗消息。
其詳細(xì)過程如下:
(1) 當(dāng)Master啟動后,隨之啟動各Worker,Worker啟動時會創(chuàng)建通信環(huán)境RpcEnv和終端點(diǎn)EndPoint,并向Master發(fā)送注冊Worker的消息RegisterWorker.Worker.tryRegisterAllMasters方法如下:
// 因?yàn)镸aster可能不止一個 private def tryRegisterAllMasters(): Array[JFuture[_]] = {masterRpcAddresses.map { masterAddress =>registerMasterThreadPool.submit(new Runnable {override def run(): Unit = {try {logInfo("Connecting to master " + masterAddress + "...")// 獲取Master終端點(diǎn)的引用val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)registerWithMaster(masterEndpoint)} catch {}... }private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {// 根據(jù)Master節(jié)點(diǎn)的引用發(fā)送注冊信息masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(workerId, host, port, self, cores, memory, workerWebUiUrl)).onComplete {// 返回注冊成功或失敗的結(jié)果// This is a very fast action so we can use "ThreadUtils.sameThread"case Success(msg) =>Utils.tryLogNonFatalError {handleRegisterResponse(msg)}case Failure(e) =>logError(s"Cannot register with master: ${masterEndpoint.address}", e)System.exit(1)}(ThreadUtils.sameThread)}(2) Master收到消息后,需要對Worker發(fā)送的信息進(jìn)行驗(yàn)證、記錄。如果注冊成功,則發(fā)送RegisteredWorker消息給對應(yīng)的Worker,告訴Worker已經(jīng)完成注冊,
隨之進(jìn)行步驟3,即Worker定期發(fā)送心跳給Master;如果注冊過程中失敗,則會發(fā)送RegisterWorkerFailed消息,Woker打印出錯日志并結(jié)束Worker啟動。Master.receiverAndReply方法如下:
(3) 當(dāng)Worker接收到注冊成功后,會定時發(fā)送心跳信息Heartbeat給Master,以便Master了解Worker的實(shí)時狀態(tài)。間隔時間可以在spark.worker.timeout中設(shè)置,注意,該設(shè)置值的1/4為心跳間隔。
2.3 Spark運(yùn)行時消息通信
用戶提交應(yīng)用程序時,應(yīng)用程序的SparkContext會向Master發(fā)送注冊應(yīng)用信息,并由Master給該應(yīng)用分配Executor,Executor啟動后會向SparkContext發(fā)送注冊成功消息;
當(dāng)SparkContext的RDD觸發(fā)行動操作后,通過DAGScheduler進(jìn)行劃分stage,并將stage轉(zhuǎn)化為TaskSet,接著由TaskScheduler向注冊的Executor發(fā)送執(zhí)行消息,Executor接收到任務(wù)消息后啟動并運(yùn)行;最后當(dāng)所有任務(wù)運(yùn)行時,由Driver處理結(jié)果并回收資源。如下圖所示:
Spark啟動過程中主要是進(jìn)行Master和Worker之間的通信,其消息發(fā)送關(guān)系如下,首先由worker節(jié)點(diǎn)向Master發(fā)送注冊消息,然后Master處理完畢后,返回注冊成功消息或失敗消息。
其詳細(xì)過程如下:
(1) While the SparkContext is being created, it first instantiates a SchedulerBackend; in standalone mode the concrete class is StandaloneSchedulerBackend. During its startup two endpoints are created: the DriverEndpoint that comes from its parent class CoarseGrainedSchedulerBackend, and the ClientEndpoint of StandaloneAppClient.
The ClientEndpoint's tryRegisterAllMasters method creates the registration thread pool registerMasterThreadPool, starts registration threads in that pool, and sends the Master a RegisterApplication message to register the application:
```scala
/**
 * Register with all masters asynchronously and returns an array `Future`s for cancellation.
 */
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  // Iterate over all Masters; this for-comprehension yields a collection
  for (masterAddress <- masterRpcAddresses) yield {
    // Start a registration thread in the pool; it exits as soon as it sees the
    // registration flag registered == true
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = try {
        if (registered.get) { // private val registered = new AtomicBoolean(false)
          return
        }
        logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
        val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
        // Send the registration message
        masterRef.send(RegisterApplication(appDescription, self))
      } catch {
        case ie: InterruptedException => // Cancelled
        case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
      }
    })
  }
}
```

When the Master receives the application-registration message, its registerApplication method records the application's information and appends the application to the list of waiting applications, sends a RegisteredApplication message back to the ClientEndpoint, and calls startExecutorsOnWorkers to run the application.
(2) When StandaloneAppClient.ClientEndpoint receives the RegisteredApplication message from the Master, it sets the registration flag registered to true:
```scala
case RegisteredApplication(appId_, masterRef) =>
  appId.set(appId_)
  registered.set(true)
  master = Some(masterRef)
  listener.connected(appId.get)
```

(3) When the Master allocates resources to the application in its startExecutorsOnWorkers method, it calls allocateWorkerResourceToExecutors to launch Executors on the Workers. When a Worker receives the LaunchExecutor message from the Master, it first instantiates an ExecutorRunner; during its startup the ExecutorRunner creates a ProcessBuilder, and that builder uses the supplied command to launch a CoarseGrainedExecutorBackend process, the container in which the Executor runs. Finally the Worker sends an ExecutorStateChanged message to the Master to report that the Executor container has been created.
在ExecutorRunner創(chuàng)建中調(diào)用了fetchAndRunExecutor方法進(jìn)行實(shí)現(xiàn),在該方法中command內(nèi)容在StandaloneSchedulerBackend中定義,指定構(gòu)造Executor運(yùn)行容器CoarseGrainedExecutorBackend,
代碼如下:
(4) The Master handles the ExecutorStateChanged message sent by the Worker as follows:
```scala
case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
  // Find the app this executor belongs to, then flatMap into the app's executor cache
  val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
  execOption match {
    case Some(exec) =>
      // Update the executor's current state
      val appInfo = idToApp(appId)
      val oldState = exec.state
      exec.state = state
      if (state == ExecutorState.RUNNING) {
        assert(oldState == ExecutorState.LAUNCHING,
          s"executor $execId state transfer from $oldState to RUNNING is illegal")
        appInfo.resetRetryCount()
      }
      // Forward an ExecutorUpdated message to the Driver
      exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))
      ...
```

(5) In the onStart method of the CoarseGrainedExecutorBackend launched in step (3), a RegisterExecutor message is sent to the DriverEndpoint. The DriverEndpoint first checks whether that Executor is already registered, then allocates task resources in its makeOffers() method, and finally sends LaunchTask messages to run tasks.
(6) When the CoarseGrainedExecutorBackend receives the RegisteredExecutor message confirming successful registration, it instantiates an Executor object inside the CoarseGrainedExecutorBackend container. Once started, it sends periodic heartbeats to the Driver while waiting for task-execution messages from the DriverEndpoint.
(7) After its Executor has started, the CoarseGrainedExecutorBackend receives LaunchTask messages from the DriverEndpoint. Task execution is implemented in the Executor's launchTask method: it creates a TaskRunner (a Runnable executed on the Executor's thread pool, not a separate process) to do the processing and, on completion, a StatusUpdate message is reported back through the CoarseGrainedExecutorBackend. Task execution and result retrieval will be covered in a later article.
(8) When the TaskRunner finishes a task, it sends a StatusUpdate message to the DriverEndpoint; on receiving it, the DriverEndpoint calls TaskSchedulerImpl's statusUpdate method, which dispatches on the task's final state and, once the result is handled, assigns further tasks to that Executor. The code is as follows:
```scala
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
  var failedExecutor: Option[String] = None
  var reason: Option[ExecutorLossReason] = None
  synchronized {
    try {
      taskIdToTaskSetManager.get(tid) match {
        case Some(taskSet) =>
          if (state == TaskState.LOST) {
            // TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
            // where each executor corresponds to a single task, so mark the executor as failed.
            val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
              "taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
            if (executorIdToRunningTaskIds.contains(execId)) {
              reason = Some(SlaveLost(
                s"Task $tid was lost, so marking the executor as lost as well."))
              removeExecutor(execId, reason.get)
              failedExecutor = Some(execId)
            }
          }
          // Handle the task according to its final state
          if (TaskState.isFinished(state)) {
            cleanupTaskState(tid)
            taskSet.removeRunningTask(tid)
            if (state == TaskState.FINISHED) {
              // On success, reclaim the CPU this task used on the Executor and
              // let the scheduler assign further tasks as appropriate
              taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
            } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
              taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
            }
          }
        case None =>
          logError(
            ("Ignoring update with state %s for TID %s because its task set is gone (this is " +
              "likely the result of receiving duplicate task finished status updates) or its " +
              "executor has been marked as failed.").format(state, tid))
      }
    } catch {
      case e: Exception => logError("Exception in statusUpdate", e)
    }
  }
  // Update the DAGScheduler without holding a lock on this, since that can deadlock
  if (failedExecutor.isDefined) {
    assert(reason.isDefined)
    dagScheduler.executorLost(failedExecutor.get, reason.get)
    backend.reviveOffers()
  }
}
```