Spark Source Code Analysis: Worker
There are several ways to start a Worker in Spark, but all of them ultimately invoke the org.apache.spark.deploy.worker.Worker class. When starting a Worker node you can pass many parameters: memory, cores, work directory, and so on. If you don't know how to pass them, don't worry — just ask for help:
```bash
[wyp@iteblog spark]$ ./bin/spark-class org.apache.spark.deploy.worker.Worker -h
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Usage: Worker [options] <master>

Master must be a URL of the form spark://hostname:port

Options:
  -c CORES, --cores CORES  Number of cores to use
  -m MEM, --memory MEM     Amount of memory to use (e.g. 1000M, 2G)
  -d DIR, --work-dir DIR   Directory to run apps in (default: SPARK_HOME/work)
  -i HOST, --ip IP         Hostname to listen on (deprecated, please use --host or -h)
  -h HOST, --host HOST     Hostname to listen on
  -p PORT, --port PORT     Port to listen on (default: random)
  --webui-port PORT        Port for web UI (default: 8081)
```
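For example, a Worker could be launched against a hypothetical master like this (the master URL, core count, memory size, and work directory below are placeholder values, not ones from this article):

```bash
# Placeholder values throughout; substitute your own master URL and resources.
./bin/spark-class org.apache.spark.deploy.worker.Worker \
  --cores 4 \
  --memory 4G \
  --work-dir /tmp/spark-work \
  spark://master-host:7077
```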
As the help output above shows, Worker startup supports as many as seven parameters! Typing each of them every time would be tedious. Fortunately, we don't have to: when a Worker node starts, it first reads the configuration in conf/spark-env.sh, and all of these parameters are parsed by the WorkerArguments class inside Worker. If you don't set the memory, the Worker is given all of the memory on the machine it starts on, minus 1 GB reserved in advance for the operating system. The implementation looks like this:
```scala
def inferDefaultMemory(): Int = {
  val ibmVendor = System.getProperty("java.vendor").contains("IBM")
  var totalMb = 0
  try {
    val bean = ManagementFactory.getOperatingSystemMXBean()
    if (ibmVendor) {
      val beanClass = Class.forName("com.ibm.lang.management.OperatingSystemMXBean")
      val method = beanClass.getDeclaredMethod("getTotalPhysicalMemory")
      totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
    } else {
      val beanClass = Class.forName("com.sun.management.OperatingSystemMXBean")
      val method = beanClass.getDeclaredMethod("getTotalPhysicalMemorySize")
      totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
    }
  } catch {
    case e: Exception => {
      totalMb = 2 * 1024
      System.out.println("Failed to get total physical memory. Using " + totalMb + " MB")
    }
  }
  // Leave out 1 GB for the operating system, but don't return a negative memory size
  math.max(totalMb - 1024, 512)
}
```
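If you want to check what this yields on your own machine, a small standalone probe using the same MXBean reflection trick might look like the following (HotSpot/OpenJDK path only; the IBM branch is omitted):

```scala
import java.lang.management.ManagementFactory

// Standalone probe: query total physical memory the same way
// inferDefaultMemory does on HotSpot JVMs, then apply the Worker's
// "total minus 1 GB, floored at 512 MB" default.
object MemoryProbe {
  def main(args: Array[String]): Unit = {
    val bean = ManagementFactory.getOperatingSystemMXBean()
    val beanClass = Class.forName("com.sun.management.OperatingSystemMXBean")
    val method = beanClass.getDeclaredMethod("getTotalPhysicalMemorySize")
    val totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
    println(s"Total physical memory: $totalMb MB")
    println(s"Worker default memory: ${math.max(totalMb - 1024, 512)} MB")
  }
}
```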
Likewise, if you don't set cores, Spark passes in all of the available cores on the machine. Once the arguments are parsed, the preStart function runs and performs the startup work: asserting that the Worker has not already registered with the Master, creating the work directory, starting the Worker's web UI, registering with the Master, and so on:
```scala
override def preStart() {
  assert(!registered)
  logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
    host, port, cores, Utils.megabytesToString(memory)))
  logInfo("Spark home: " + sparkHome)
  createWorkDir()
  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
  webUi.bind()
  registerWithMaster()
  metricsSystem.registerSource(workerSource)
  metricsSystem.start()
}
```
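One of those steps, the work-directory creation, can be sketched roughly as follows. This is a simplified reconstruction for illustration, not the verbatim source: the real method is a no-argument member of Worker that uses its fields and logger.

```scala
import java.io.File

// Sketch of the work-directory step in preStart: use the --work-dir
// argument if given, otherwise default to SPARK_HOME/work, and bail out
// if the directory cannot be created.
def createWorkDir(workDirPath: Option[String], sparkHome: String): File = {
  val workDir = workDirPath.map(new File(_)).getOrElse(new File(sparkHome, "work"))
  try {
    workDir.mkdirs()
    if (!workDir.exists() || !workDir.isDirectory) {
      System.err.println("Failed to create work directory " + workDir)
      System.exit(1)
    }
  } catch {
    case e: Exception =>
      System.err.println("Failed to create work directory " + workDir)
      System.exit(1)
  }
  workDir
}
```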
The timeout for a Worker registering with the Master is 20 seconds. If registration does not succeed within those 20 seconds, the Worker retries, up to 3 times; once the retry count reaches 3, the Worker fails to start, and at that point you should check your network environment or whether something is wrong with the Master.
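A simplified sketch of this register-and-retry loop, using Akka's scheduler, might look like the following. The constant names, tryRegisterAllMasters, and the surrounding fields are assumptions based on the description above, not the verbatim Spark source:

```scala
import scala.concurrent.duration._
import akka.actor.{Actor, Cancellable}

// Sketch only: retry registration every 20 seconds, giving up after the
// third failed attempt, as described above.
trait RegistrationRetry { self: Actor =>
  import context.dispatcher

  val REGISTRATION_TIMEOUT = 20.seconds
  val REGISTRATION_RETRIES = 3

  @volatile var registered = false                        // flipped once the Master answers
  var registrationRetryTimer: Option[Cancellable] = None

  def tryRegisterAllMasters(): Unit                       // sends RegisterWorker to each master

  def registerWithMaster(): Unit = {
    tryRegisterAllMasters()
    var retries = 0
    registrationRetryTimer = Some {
      context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
        retries += 1
        if (registered) {
          registrationRetryTimer.foreach(_.cancel())
        } else if (retries >= REGISTRATION_RETRIES) {
          System.err.println("All masters are unresponsive! Giving up.")
          System.exit(1)
        } else {
          tryRegisterAllMasters()
        }
      }
    }
  }
}
```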
While it is running, the Worker is driven by many kinds of events, such as RegisteredWorker, SendHeartbeat, WorkDirCleanup, and MasterChanged, and it responds to each one differently. For example, when a job needs to run on it, the Worker launches one or more ExecutorRunner instances. The details can be found in the receiveWithLogging function:
```scala
override def receiveWithLogging = {
  case RegisteredWorker(masterUrl, masterWebUiUrl) => ...
  case SendHeartbeat => ...
  case WorkDirCleanup => ...
  case MasterChanged(masterUrl, masterWebUiUrl) => ...
  case Heartbeat => ...
  case RegisterWorkerFailed(message) => ...
  case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) => ...
  case ExecutorStateChanged(appId, execId, state, message, exitStatus) => ...
  case KillExecutor(masterUrl, appId, execId) => ...
  case LaunchDriver(driverId, driverDesc) => ...
  case KillDriver(driverId) => ...
  case DriverStateChanged(driverId, state, exception) => ...
  case x: DisassociatedEvent if x.remoteAddress == masterAddress => ...
  case RequestWorkerState => ...
}
```
The code above has been trimmed down (all of the handler bodies are omitted). Note that the receiveWithLogging method itself is inherited from ActorLogReceive.
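To see what that trait contributes, here is a rough sketch of the pattern (a reconstruction for illustration, not the verbatim Spark source): it overrides Akka's receive with a wrapper that logs every incoming message before delegating to receiveWithLogging.

```scala
import akka.actor.{Actor, ActorLogging}

// Rough reconstruction of the ActorLogReceive idea: wrap the real
// handler so each incoming message is logged before it is processed.
trait ActorLogReceive { self: Actor with ActorLogging =>

  override def receive: Actor.Receive = new Actor.Receive {
    private val handler = receiveWithLogging
    override def isDefinedAt(o: Any): Boolean = handler.isDefinedAt(o)
    override def apply(o: Any): Unit = {
      log.debug(s"[actor] received message $o")
      handler.apply(o)
    }
  }

  // Subclasses such as Worker put their message handling here.
  def receiveWithLogging: Actor.Receive
}
```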
當(dāng)Worker節(jié)點(diǎn)Stop的時(shí)候,將會(huì)執(zhí)行postStop函數(shù),如下:
```scala
override def postStop() {
  metricsSystem.report()
  registrationRetryTimer.foreach(_.cancel())
  executors.values.foreach(_.kill())
  drivers.values.foreach(_.kill())
  webUi.stop()
  metricsSystem.stop()
}
```
總結(jié)