日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark源码分析之Worker

發(fā)布時(shí)間:2024/1/23 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark源码分析之Worker 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
Spark支持三種模式的部署:YARN、Standalone以及Mesos。 Worker節(jié)點(diǎn)是Spark的工作節(jié)點(diǎn),用于執(zhí)行提交的作業(yè)。我們先從Worker節(jié)點(diǎn)的啟動(dòng)開始介紹。
  Spark中Worker的啟動(dòng)有多種方式,但是最終調(diào)用的都是org.apache.spark.deploy.worker.Worker類,啟動(dòng)Worker節(jié)點(diǎn)的時(shí)候可以傳很多的參數(shù):內(nèi)存、核、工作目錄等。如果你不知道如何傳遞,沒關(guān)系,help一下即可:
[wyp@iteblogspark]$ ./bin/spark-classorg.apache.spark.deploy.worker.Worker -h Spark assembly has been built with Hive, including Datanucleus jars on classpath Usage: Worker [options] <master> Master must be a URL of the form spark://hostname:port Options: ??-c CORES, --cores CORES? Number of cores to use ??-m MEM, --memory MEM???? Amount of memory to use (e.g. 1000M, 2G) ??-d DIR, --work-dir DIR?? Directory to run apps in (default: SPARK_HOME/work) ??-i HOST, --ip IP???????? Hostname to listen on (deprecated, please use --host or -h) ??-h HOST, --host HOST???? Hostname to listen on ??-p PORT, --port PORT???? Port to listen on (default: random) ??--webui-port PORT??????? Portfor web UI (default:8081)

  從上面的輸出我們可以看出Worker的啟動(dòng)支持多達(dá)7個(gè)參數(shù)!這樣每個(gè)都這樣輸入豈不是很麻煩?其實(shí),我們不用擔(dān)心,Worker節(jié)點(diǎn)啟動(dòng)地時(shí)候?qū)⑾茸x取conf/spark-env.sh里面的配置,這些參數(shù)配置的解析都是由Worker中的WorkerArguments類進(jìn)行解析的。如果你沒有設(shè)置內(nèi)存,那么將會(huì)把Worker啟動(dòng)所在機(jī)器的所有內(nèi)存(會(huì)預(yù)先留下1G內(nèi)存給操作系統(tǒng))分給Worker,具體的代碼實(shí)現(xiàn)如下:

def inferDefaultMemory(): Int = { ????valibmVendor = System.getProperty("java.vendor").contains("IBM") ????vartotalMb = 0 ????try{ ??????valbean = ManagementFactory.getOperatingSystemMXBean() ??????if(ibmVendor) { ????????valbeanClass = Class.forName("com.ibm.lang.management.OperatingSystemMXBean") ????????valmethod = beanClass.getDeclaredMethod("getTotalPhysicalMemory") ????????totalMb= (method.invoke(bean).asInstanceOf[Long] /1024 / 1024).toInt ??????}else { ????????valbeanClass = Class.forName("com.sun.management.OperatingSystemMXBean") ????????valmethod = beanClass.getDeclaredMethod("getTotalPhysicalMemorySize") ????????totalMb= (method.invoke(bean).asInstanceOf[Long] /1024 / 1024).toInt ??????} ????}catch { ??????casee: Exception => { ????????totalMb= 2*1024 ????????System.out.println("Failed to get total physical memory. Using "+ totalMb + " MB") ??????} ????} ????// Leave out 1 GB for the operating system, but don't return a negative memory size ????math.max(totalMb -1024, 512) ??}

  同樣,如果你沒設(shè)置cores,那么Spark將會(huì)獲取你機(jī)器的所有可用的核作為參數(shù)傳進(jìn)去。解析完參數(shù)之后,將運(yùn)行preStart函數(shù),進(jìn)行一些啟動(dòng)相關(guān)的操作,比如判斷是否已經(jīng)向Master注冊(cè)過,創(chuàng)建工作目錄,啟動(dòng)Worker的WEB UI,向Master進(jìn)行注冊(cè)等操作,如下:

overridedef preStart() { ??assert(!registered) ??logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format( ????host, port, cores, Utils.megabytesToString(memory))) ??logInfo("Spark home: "+ sparkHome) ??createWorkDir() ??context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) ??webUi= newWorkerWebUI(this, workDir, Some(webUiPort)) ??webUi.bind() ??registerWithMaster() ??metricsSystem.registerSource(workerSource) ??metricsSystem.start() }

  Worker向Master注冊(cè)的超時(shí)時(shí)間為20秒,如果在這20秒內(nèi)沒有成功地向Master注冊(cè),那么將會(huì)進(jìn)行重試,重試的次數(shù)為3,如過重試的次數(shù)大于等于3,那么將無法啟動(dòng)Worker,這時(shí)候,你就該看看你的網(wǎng)絡(luò)環(huán)境或者你的Master是否存在問題了。
Worker在運(yùn)行的過程中將會(huì)觸發(fā)許多的事件, 比如:RegisteredWorker、SendHeartbeat、WorkDirCleanup以及MasterChanged等等,收到不同的事件,Worker進(jìn)行不同的操作。比如,如果需要運(yùn)行一個(gè)作業(yè),Worker將會(huì)啟動(dòng)一個(gè)或多個(gè)ExecutorRunner,具體的代碼可參見receiveWithLogging函數(shù):

overridedef receiveWithLogging= { ????caseRegisteredWorker(masterUrl, masterWebUiUrl) => ????caseSendHeartbeat => ????caseWorkDirCleanup => ????caseMasterChanged(masterUrl, masterWebUiUrl) => ????caseHeartbeat => ????? ????caseRegisterWorkerFailed(message) => ????? ????caseLaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_)=> ?????? ????caseExecutorStateChanged(appId, execId, state, message, exitStatus)=> ??????? ????caseKillExecutor(masterUrl, appId, execId) => ??????? ????caseLaunchDriver(driverId, driverDesc) => { ?????? ????caseKillDriver(driverId) => { ????caseDriverStateChanged(driverId, state, exception) => { ?????? ????casex: DisassociatedEvent if x.remoteAddress == masterAddress => ???? ????caseRequestWorkerState => { ??}

  上面的代碼是經(jīng)過處理的,其實(shí)receiveWithLogging 方法是從ActorLogReceive繼承下來的。
  當(dāng)Worker節(jié)點(diǎn)Stop的時(shí)候,將會(huì)執(zhí)行postStop函數(shù),如下:

overridedef postStop() { ??metricsSystem.report() ??registrationRetryTimer.foreach(_.cancel()) ??executors.values.foreach(_.kill()) ??drivers.values.foreach(_.kill()) ??webUi.stop() ??metricsSystem.stop() }
  殺掉所有還未執(zhí)行完的executors、drivers等,操作。這方法也是從Actor繼承下來的。

總結(jié)

以上是生活随笔為你收集整理的Spark源码分析之Worker的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。