日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

spark源码分析之Executor启动与任务提交篇

發布時間:2025/3/8 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spark源码分析之Executor启动与任务提交篇 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

任務提交流程

概述

在闡明了Spark的Master的啟動流程與Worker啟動流程。接下繼續執行的就是Worker上的Executor進程了,本文繼續分析整個Executor的啟動與任務提交流程

Spark-submit

提交一個任務到集群通過的是Spark-submit
通過啟動腳本的方式啟動它的主類,這里以WordCount為例子
spark-submit --class cn.apache.spark.WordCount

  • bin/spark-clas -> org.apache.spark.deploy.SparkSubmit 調用這個類的main方法
  • doRunMain方法中傳進來一個自定義spark應用程序的main方法
    class cn.apache.spark.WordCount
  • 通過反射拿到類的實例的引用mainClass = Utils.classForName(childMainClass)
  • 在通過反射調用class cn.apache.spark.WordCount的main方法
  • 我們來看SparkSubmit的main方法

    def main(args: Array[String]): Unit = {val appArgs = new SparkSubmitArguments(args)if (appArgs.verbose) {printStream.println(appArgs)}//匹配任務類型appArgs.action match {case SparkSubmitAction.SUBMIT => submit(appArgs)case SparkSubmitAction.KILL => kill(appArgs)case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)}}

    這里的類型是submit,調用submit方法

    private[spark] def submit(args: SparkSubmitArguments): Unit = {val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)def doRunMain(): Unit = {。。。。。。try {proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {override def run(): Unit = {//childMainClass這個你自己定義的App的main所在的全類名runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)}})} catch {。。。。。。 }} 。。。。。。。//掉用上面的doRunMaindoRunMain()}

    submit里調用了doRunMain(),然后調用了runMain,來看runMain

    private def runMain(。。。。。。try {//通過反射mainClass = Class.forName(childMainClass, true, loader)} catch {。。。。。。}//反射拿到面方法實例val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)if (!Modifier.isStatic(mainMethod.getModifiers)) {throw new IllegalStateException("The main method in the given main class must be static")}。。。。。。try {//調用App的main方法mainMethod.invoke(null, childArgs.toArray)} catch {case t: Throwable =>throw findCause(t)}}

    最主要的流程就在這里了,上面的代碼注釋很清楚,通過反射調用我們寫的類的main方法,大體的流程到此

    SparkSubmit時序圖

    Executor啟動流程

    SparkSubmit通過反射調用了我們程序的main方法后,就開始執行我們的代碼
    ,一個Spark程序中需要創建SparkContext對象,我們就從這個對象開始

    SparkContext的構造方法代碼很長,主要關注的地方如下

    class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {。。。。。。private[spark] def createSparkEnv(conf: SparkConf,isLocal: Boolean,listenerBus: LiveListenerBus): SparkEnv = {//通過SparkEnv來創建createDriverEnvSparkEnv.createDriverEnv(conf, isLocal, listenerBus)}//在這里調用了createSparkEnv,返回一個SparkEnv對象,這個對象里面有很多重要屬性,最重要的ActorSystemprivate[spark] val env = createSparkEnv(conf, isLocal, listenerBus)SparkEnv.set(env)//創建taskScheduler// Create and start the schedulerprivate[spark] var (schedulerBackend, taskScheduler) =SparkContext.createTaskScheduler(this, master)//創建DAGSchedulerdagScheduler = new DAGScheduler(this)//啟動TaksSchedulertaskScheduler.start()。。。。。 }

    Spark的構造方法主要干三件事,創建了一個SparkEnv,taskScheduler,dagScheduler,我們先來看createTaskScheduler里干了什么

    //通過給定的URL創建TaskSchedulerprivate def createTaskScheduler(.....//匹配URL選擇不同的方式master match {。。。。。。//這個是Spark的Standalone模式case SPARK_REGEX(sparkUrl) =>//首先創建TaskSchedulerval scheduler = new TaskSchedulerImpl(sc)val masterUrls = sparkUrl.split(",").map("spark://" + _)//很重要val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)//初始化了一個調度器,默認是FIFOscheduler.initialize(backend)(backend, scheduler)。。。。。} }

    通過master的url來匹配到Standalone模式:然后初始化了SparkDeploySchedulerBackendTaskSchedulerImpl,這兩個對象很重要,是啟動任務調度的核心,然后調用了scheduler.initialize(backend)進行初始化

    啟動TaksScheduler初始化完成,回到我們的SparkContext構造方法后面繼續調用了
    taskScheduler.start() 啟動TaksScheduler
    來看start方法

    override def start() {//調用backend的實現的start方法backend.start()if (!isLocal && conf.getBoolean("spark.speculation", false)) {logInfo("Starting speculative execution thread")import sc.env.actorSystem.dispatchersc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,SPECULATION_INTERVAL milliseconds) {Utils.tryOrExit { checkSpeculatableTasks() }}}}

    這里的backend是SparkDeploySchedulerBackend調用了它的start

    override def start() {//CoarseGrainedSchedulerBackend的start方法,在這個方法里面創建了一個DriverActorsuper.start()// The endpoint for executors to talk to us//下面是為了啟動java子進程做準備,準備一下參數val driverUrl = AkkaUtils.address(AkkaUtils.protocol(actorSystem),SparkEnv.driverActorSystemName,conf.get("spark.driver.host"),conf.get("spark.driver.port"),CoarseGrainedSchedulerBackend.ACTOR_NAME)val args = Seq("--driver-url", driverUrl,"--executor-id", "{{EXECUTOR_ID}}","--hostname", "{{HOSTNAME}}","--cores", "{{CORES}}","--app-id", "{{APP_ID}}","--worker-url", "{{WORKER_URL}}")val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions").map(Utils.splitCommandString).getOrElse(Seq.empty)val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath").map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath").map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)// When testing, expose the parent class path to the child. This is processed by// compute-classpath.{cmd,sh} and makes all needed jars available to child processes// when the assembly is built with the "*-provided" profiles enabled.val testingClassPath =if (sys.props.contains("spark.testing")) {sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq} else {Nil}// Start executors with a few necessary configs for registering with the schedulerval sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)val javaOpts = sparkJavaOpts ++ extraJavaOpts//用command拼接參數,最終會啟動org.apache.spark.executor.CoarseGrainedExecutorBackend子進程val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")//用ApplicationDescription封裝了一些重要的參數val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,appUIAddress, sc.eventLogDir, sc.eventLogCodec)//在這里面創建ClientActorclient = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)//啟動ClientActorclient.start()waitForRegistration()}

    這里是拼裝了啟動Executor的一些參數,類名+參數 封裝成ApplicationDescription。最后傳給并創建AppClient并調用它的start方法

    AppClient創建時序圖

    AppClient的start方法

    接來下關注start方法

    def start() {// Just launch an actor; it will call back into the listener.actor = actorSystem.actorOf(Props(new ClientActor))}

    在start方法里創建了與Master通信的ClientActor,然后會調用它的preStart方法向Master注冊,接下來看它的preStart

    override def preStart() {context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])try {//ClientActor向Master注冊registerWithMaster()} catch {case e: Exception =>logWarning("Failed to connect to master", e)markDisconnected()context.stop(self)}}

    最后會調用該方法向所有Master注冊

    def tryRegisterAllMasters() {for (masterAkkaUrl <- masterAkkaUrls) {logInfo("Connecting to master " + masterAkkaUrl + "...")//t通過actorSelection拿到了Master的引用val actor = context.actorSelection(masterAkkaUrl)//向Master發送異步的注冊App的消息actor ! RegisterApplication(appDescription)}}

    ClientActor發送來的注冊App的消息,ApplicationDescription,他包含了需求的資源,要求啟動的Executor類名和一些參數
    Master的Receiver

    case RegisterApplication(description) => {if (state == RecoveryState.STANDBY) {// ignore, don't send response} else {logInfo("Registering app " + description.name)//創建App sender:ClientActorval app = createApplication(description, sender)//注冊AppregisterApplication(app)logInfo("Registered app " + description.name + " with ID " + app.id)//持久化ApppersistenceEngine.addApplication(app)//向ClientActor反饋信息,告訴他app注冊成功了sender ! RegisteredApplication(app.id, masterUrl)//TODO 調度任務schedule()}}

    registerApplication(app)

    def registerApplication(app: ApplicationInfo): Unit = {val appAddress = app.driver.path.addressif (addressToApp.contains(appAddress)) {logInfo("Attempted to re-register application at same address: " + appAddress)return}//把App放到集合里面applicationMetricsSystem.registerSource(app.appSource)apps += appidToApp(app.id) = appactorToApp(app.driver) = appaddressToApp(appAddress) = appwaitingApps += app}

    Master將接受的信息保存到集合并序列化后發送一個RegisteredApplication消息通知反饋給ClientActor,接著執行schedule()方法,該方法中會遍歷workers集合,并執行launchExecutor

    def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)//記錄該worker上使用了多少資源worker.addExecutor(exec)//Master向Worker發送啟動Executor的消息worker.actor ! LaunchExecutor(masterUrl,exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)//Master向ClientActor發送消息,告訴ClientActor executor已經啟動了exec.application.driver ! ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)}

    這里Master向Worker發送啟動Executor的消息

    `worker.actor ! LaunchExecutor(masterUrl,exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)`

    application.desc里包含了Executor類的啟動信息

    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>。。。。。appDirectories(appId) = appLocalDirs//創建一個ExecutorRunner,這個很重要,保存了Executor的執行配置和參數val manager = new ExecutorRunner(appId,execId,appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),cores_,memory_,self,workerId,host,webUi.boundPort,publicAddress,sparkHome,executorDir,akkaUrl,conf,appLocalDirs, ExecutorState.LOADING)executors(appId + "/" + execId) = manager//TODO 開始啟動ExecutorRunnermanager.start()。。。。。。}}}

    Worker的Receiver接受到了啟動Executor的消息,appDesc對象保存了Command命令、Executor的實現類和參數

    manager.start()里會創建一個線程

    def start() {//啟動一個線程workerThread = new Thread("ExecutorRunner for " + fullId) {//用一個子線程來幫助Worker啟動Executor子進程override def run() { fetchAndRunExecutor() }}workerThread.start()// Shutdown hook that kills actors on shutdown.shutdownHook = new Thread() {override def run() {killProcess(Some("Worker shutting down"))}}Runtime.getRuntime.addShutdownHook(shutdownHook)}

    在線程中調用了fetchAndRunExecutor()方法,我們來看該方法

    def fetchAndRunExecutor() {try {// Launch the processval builder = CommandUtils.buildProcessBuilder(appDesc.command, memory,sparkHome.getAbsolutePath, substituteVariables)//構建命令val command = builder.command()logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))builder.directory(executorDir)builder.environment.put("SPARK_LOCAL_DIRS", appLocalDirs.mkString(","))// In case we are running this from within the Spark Shell, avoid creating a "scala"// parent process for the executor commandbuilder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")// Add webUI log urlsval baseUrl =s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")//啟動子進程process = builder.start()val header = "Spark Executor Command: %s\n%s\n\n".format(command.mkString("\"", "\" \"", "\""), "=" * 40)// Redirect its stdout and stderr to filesval stdout = new File(executorDir, "stdout")stdoutAppender = FileAppender(process.getInputStream, stdout, conf)val stderr = new File(executorDir, "stderr")Files.write(header, stderr, UTF_8)stderrAppender = FileAppender(process.getErrorStream, stderr, conf)// Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)// or with nonzero exit code//開始執行,等待結束信號val exitCode = process.waitFor()。。。。}}

    這里面進行了類名和參數的拼裝,具體拼裝過程不用關心,最終builder.start()會以SystemRuntime的方式啟動一個子進程,這個是進程的類名是CoarseGrainedExecutorBackend
    到此Executor進程就啟動起來了

    Executor創建時序圖

    Executor任務調度對象啟動

    Executor進程后,就首先要執行main方法,main的代碼如下

    //Executor進程啟動的入口def main(args: Array[String]) {。。。。//拼裝參數while (!argv.isEmpty) {argv match {case ("--driver-url") :: value :: tail =>driverUrl = valueargv = tailcase ("--executor-id") :: value :: tail =>executorId = valueargv = tailcase ("--hostname") :: value :: tail =>hostname = valueargv = tailcase ("--cores") :: value :: tail =>cores = value.toIntargv = tailcase ("--app-id") :: value :: tail =>appId = valueargv = tailcase ("--worker-url") :: value :: tail =>// Worker url is used in spark standalone mode to enforce fate-sharing with workerworkerUrl = Some(value)argv = tailcase ("--user-class-path") :: value :: tail =>userClassPath += new URL(value)argv = tailcase Nil =>case tail =>System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")printUsageAndExit()}}if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||appId == null) {printUsageAndExit()}//開始執行Executorrun(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)}

    執行了run方法

    private def run(driverUrl: String,executorId: String,hostname: String,cores: Int,appId: String,workerUrl: Option[String],userClassPath: Seq[URL]) 。。。。。//通過actorSystem創建CoarseGrainedExecutorBackend -> Actor//CoarseGrainedExecutorBackend -> DriverActor通信env.actorSystem.actorOf(Props(classOf[CoarseGrainedExecutorBackend],driverUrl, executorId, sparkHostPort, cores, userClassPath, env),name = "Executor")。。。。。。}env.actorSystem.awaitTermination()}}

    run方法中創建了CoarseGrainedExecutorBackend的Actor對象用于準備和DriverActor通信,接著會繼續調用preStart生命周期方法

    override def preStart() {logInfo("Connecting to driver: " + driverUrl)//Executor跟DriverActor建立連接driver = context.actorSelection(driverUrl)//Executor向DriverActor發送消息driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])}

    Executor向DriverActor發送注冊的消息
    driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)

    DriverActor的receiver收到消息后

    def receiveWithLogging = {//Executor發送給DriverActor的注冊消息case RegisterExecutor(executorId, hostPort, cores, logUrls) =>Utils.checkHostPort(hostPort, "Host port expected " + hostPort)if (executorDataMap.contains(executorId)) {sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)} else {logInfo("Registered executor: " + sender + " with ID " + executorId)//DriverActor向Executor發送注冊成功的消息sender ! RegisteredExecutoraddressToExecutorId(sender.path.address) = executorIdtotalCoreCount.addAndGet(cores)totalRegisteredExecutors.addAndGet(1)val (host, _) = Utils.parseHostPort(hostPort)//將Executor的信息封裝起來val data = new ExecutorData(sender, sender.path.address, host, cores, cores, logUrls)// This must be synchronized because variables mutated// in this block are read when requesting executorsCoarseGrainedSchedulerBackend.this.synchronized {//往集合添加Executor的信息對象executorDataMap.put(executorId, data)if (numPendingExecutors > 0) {numPendingExecutors -= 1logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")}}listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))//將來用來執行真正的業務邏輯makeOffers()}

    DriverActor的receiver里將Executor信息封裝到Map中保存起來,并發送反饋消息 sender ! RegisteredExecutor
    給CoarseGrainedExecutorBackend

    override def receiveWithLogging = {case RegisteredExecutor =>logInfo("Successfully registered with driver")val (hostname, _) = Utils.parseHostPort(hostPort)executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

    CoarseGrainedExecutorBackend收到消息后創建一個Executor對象用于準備任務的執行,到此Executor的創建就完成了,接下來下篇介紹任務的調度。

    總結

    以上是生活随笔為你收集整理的spark源码分析之Executor启动与任务提交篇的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。