Spark修炼之道(高级篇): Spark Source Code Reading, Section 6 - Task Submission


Task Submission

In the previous section on stage submission we saw that a stage is ultimately packaged into a TaskSet and submitted with taskScheduler.submitTasks:

taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))

A stage consists of a series of tasks, and these tasks are wrapped into a TaskSet. Note that in the call above, stage.firstJobId is passed as the priority parameter, which FIFO scheduling later uses to order TaskSets. The TaskSet class is defined as follows:

/**
 * A set of tasks submitted together to the low-level TaskScheduler, usually representing
 * missing partitions of a particular stage.
 */
private[spark] class TaskSet(
    val tasks: Array[Task[_]],
    val stageId: Int,
    val stageAttemptId: Int,
    val priority: Int,
    val properties: Properties) {
  val id: String = stageId + "." + stageAttemptId

  override def toString: String = "TaskSet " + id
}

The submitTasks method is declared in the TaskScheduler trait. Currently TaskScheduler has only one implementation, TaskSchedulerImpl, whose submitTasks method looks like this:

// submitTasks in TaskSchedulerImpl
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    // Create a TaskSetManager, which schedules the tasks in this TaskSet,
    // including tracking their execution and retrying failed tasks
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
    }
    // Register the TaskSetManager with the schedulableBuilder, which schedules all TaskSets,
    // i.e. the TaskSets of every stage in the DAG produced by the Spark application
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  // Ask the backend to allocate resources for the tasks
  backend.reviveOffers()
}
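For context, the schedulableBuilder used above is chosen when TaskSchedulerImpl is initialized, based on the configured scheduling mode. The following is a simplified sketch paraphrased from memory of the Spark 1.x source, so details may differ slightly from the exact release being discussed:

def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // The root scheduling pool; TaskSetManagers are added beneath it
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
    }
  }
  schedulableBuilder.buildPools()
}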

SchedulerBackend has several concrete implementations.

Here we use SparkDeploySchedulerBackend as the example. It extends CoarseGrainedSchedulerBackend and inherits its reviveOffers method, shown below:

// reviveOffers as defined in CoarseGrainedSchedulerBackend
override def reviveOffers() {
  // Send a ReviveOffers message to driverEndpoint; DriverEndpoint receives and handles it
  driverEndpoint.send(ReviveOffers)
}
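For reference, ReviveOffers and the other scheduler messages exchanged here are plain case objects and case classes declared in CoarseGrainedClusterMessages, which keeps them cheap to serialize and send over the RPC layer. The sketch below is paraphrased from memory and may not match the exact release line for line:

private[spark] object CoarseGrainedClusterMessages {
  // Internal message used on the driver side to trigger resource offers
  case object ReviveOffers extends CoarseGrainedClusterMessage
  // Sent by executors to report task state changes back to the driver
  case class StatusUpdate(executorId: String, taskId: Long, state: TaskState,
      data: SerializableBuffer) extends CoarseGrainedClusterMessage
  // Sent by the driver to ask an executor to kill a running task
  case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
    extends CoarseGrainedClusterMessage
}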

driverEndpoint is of type RpcEndpointRef:

// The driverEndpoint member of CoarseGrainedSchedulerBackend
var driverEndpoint: RpcEndpointRef = null

which is defined as follows:

// RpcEndpointRef is a reference to a remote RpcEndpoint. It is an abstract class with a
// subclass AkkaRpcEndpointRef
/**
 * A reference for a remote [[RpcEndpoint]]. [[RpcEndpointRef]] is thread-safe.
 */
private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
  extends Serializable with Logging

// The underlying implementation is based on Akka
private[akka] class AkkaRpcEndpointRef(
    @transient defaultAddress: RpcAddress,
    @transient _actorRef: => ActorRef,
    @transient conf: SparkConf,
    @transient initInConstructor: Boolean = true)
  extends RpcEndpointRef(conf) with Logging {

  lazy val actorRef = _actorRef

  override lazy val address: RpcAddress = {
    val akkaAddress = actorRef.path.address
    RpcAddress(akkaAddress.host.getOrElse(defaultAddress.host),
      akkaAddress.port.getOrElse(defaultAddress.port))
  }

  override lazy val name: String = actorRef.path.name

  private[akka] def init(): Unit = {
    // Initialize the lazy vals
    actorRef
    address
    name
  }

  if (initInConstructor) {
    init()
  }

  override def send(message: Any): Unit = {
    actorRef ! AkkaMessage(message, false)
  }

  // remaining code omitted
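Besides the fire-and-forget send used here, an RpcEndpointRef also supports a request-reply pattern. The key members it declares look roughly like the following (reconstructed from memory; exact signatures may differ across Spark versions):

  // Address and name of the remote endpoint this reference points to
  def address: RpcAddress
  def name: String
  // Fire-and-forget one-way message; this is the pattern reviveOffers uses
  def send(message: Any): Unit
  // Request-reply: returns a Future completed with the remote endpoint's answer
  def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]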

The receive method of DriverEndpoint handles the message sent by driverEndpoint.send(ReviveOffers). DriverEndpoint extends the ThreadSafeRpcEndpoint trait:

class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
  extends ThreadSafeRpcEndpoint with Logging

ThreadSafeRpcEndpoint in turn extends the RpcEndpoint trait, which declares the receive method:

/**
 * Process messages from [[RpcEndpointRef.send]] or [[RpcCallContext.reply)]]. If receiving a
 * unmatched message, [[SparkException]] will be thrown and sent to `onError`.
 */
def receive: PartialFunction[Any, Unit] = {
  case _ => throw new SparkException(self + " does not implement 'receive'")
}

DriverEndpoint overrides receive as follows:

override def receive: PartialFunction[Any, Unit] = {
  case StatusUpdate(executorId, taskId, state, data) =>
    scheduler.statusUpdate(taskId, state, data.value)
    if (TaskState.isFinished(state)) {
      executorDataMap.get(executorId) match {
        case Some(executorInfo) =>
          executorInfo.freeCores += scheduler.CPUS_PER_TASK
          makeOffers(executorId)
        case None =>
          // Ignoring the update since we don't know about the executor.
          logWarning(s"Ignored task status update ($taskId state $state) " +
            s"from unknown executor with ID $executorId")
      }
    }

  // Important: this handles the ReviveOffers message sent above
  case ReviveOffers =>
    makeOffers()

  case KillTask(taskId, executorId, interruptThread) =>
    executorDataMap.get(executorId) match {
      case Some(executorInfo) =>
        executorInfo.executorEndpoint.send(KillTask(taskId, executorId, interruptThread))
      case None =>
        // Ignoring the task kill since the executor is not registered.
        logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
    }
}
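Since receive is just a PartialFunction[Any, Unit], message dispatch boils down to ordinary pattern matching. The following self-contained sketch (plain Scala, not Spark code; the object and message names are stand-ins that mimic the ones above) illustrates the pattern:

object ReceiveSketch extends App {
  // Stand-ins for the scheduler messages used above
  case object ReviveOffers
  case class KillTask(taskId: Long, executorId: String, interruptThread: Boolean)

  // Analogous to DriverEndpoint.receive: one case per message type
  val receive: PartialFunction[Any, Unit] = {
    case ReviveOffers =>
      println("makeOffers() would run here")
    case KillTask(taskId, execId, _) =>
      println(s"would forward KillTask($taskId) to executor $execId")
  }

  // Analogous to the default behaviour in RpcEndpoint: unmatched messages raise an error
  def handle(msg: Any): Unit =
    if (receive.isDefinedAt(msg)) receive(msg)
    else throw new IllegalStateException(s"unmatched message: $msg")

  handle(ReviveOffers)
  handle(KillTask(42L, "executor-1", interruptThread = false))
}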

As the DriverEndpoint code shows, the ReviveOffers message is handled by calling makeOffers:

// Make fake resource offers on all executors
private def makeOffers() {
  // Filter out executors under killing, keeping only the executors still available
  val activeExecutors = executorDataMap.filterKeys(!executorsPendingToRemove.contains(_))
  // A WorkerOffer describes the resources currently available on one executor
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq
  // First, TaskSchedulerImpl.resourceOffers assigns resources to the tasks;
  // then CoarseGrainedSchedulerBackend.launchTasks launches them, and the tasks are
  // ultimately submitted to executors on the worker nodes
  launchTasks(scheduler.resourceOffers(workOffers))
}
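For reference, WorkerOffer is just a small value class describing the free resources of one executor. Its definition is roughly the following (quoted from memory; the exact fields may differ between versions):

private[spark] case class WorkerOffer(executorId: String, host: String, cores: Int)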

All of the logic above runs on the driver. Once launchTasks has been called, the tasks execute on the worker nodes, which completes task submission.
The resourceOffers and launchTasks methods will be analyzed in detail in later sections.
