日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

hadoop临时文件 jar包_hadoop之Mapper/reducer源码分析之二

發(fā)布時間:2025/3/11 编程问答 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 hadoop临时文件 jar包_hadoop之Mapper/reducer源码分析之二 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

若當(dāng)前JobClient (0.22 hadoop) 運行在YARN.則job提交任務(wù)運行在YARNRunner

Hadoop Yarn 框架原理及運作機制

主要步驟

  • 作業(yè)提交
  • 作業(yè)初始化
  • 資源申請與任務(wù)分配
  • 任務(wù)執(zhí)行

具體步驟

在運行作業(yè)之前,Resource Manager和Node Manager都已經(jīng)啟動,所以在上圖中,Resource Manager進程和Node Manager進程不需要啟動

  • 1. 客戶端進程通過runJob(實際中一般使用waitForCompletion提交作業(yè))在客戶端提交Map Reduce作業(yè)(在Yarn中,作業(yè)一般稱為Application應(yīng)用程序)
  • 2. 客戶端向Resource Manager申請應(yīng)用程序ID(application id),作為本次作業(yè)的唯一標(biāo)識
  • 3. 客戶端程序?qū)⒆鳂I(yè)相關(guān)的文件(通常是指作業(yè)本身的jar包以及這個jar包依賴的第三方的jar),保存到HDFS上。也就是說Yarn based MR通過HDFS共享程序的jar包,供Task進程讀取
  • 4. 客戶端通過runJob向ResourceManager提交應(yīng)用程序
  • 5.a/5.b. Resource Manager收到來自客戶端的提交作業(yè)請求后,將請求轉(zhuǎn)發(fā)給作業(yè)調(diào)度組件(Scheduler),Scheduler分配一個Container,然后Resource Manager在這個Container中啟動Application Master進程,并交由Node Manager對Application Master進程進行管理
  • 6. Application Master初始化作業(yè)(應(yīng)用程序),初始化動作包括創(chuàng)建監(jiān)聽對象以監(jiān)聽作業(yè)的執(zhí)行情況,包括監(jiān)聽任務(wù)匯報的任務(wù)執(zhí)行進度以及是否完成(不同的計算框架為集成到Y(jié)ARN資源調(diào)度框架中,都要提供不同的ApplicationMaster,比如Spark、Storm框架為了運行在Yarn之上,它們都提供了ApplicationMaster)
  • 7. Application Master根據(jù)作業(yè)代碼中指定的數(shù)據(jù)地址(數(shù)據(jù)源一般來自HDFS)進行數(shù)據(jù)分片,以確定Mapper任務(wù)數(shù),具體每個Mapper任務(wù)發(fā)往哪個計算節(jié)點,Hadoop會考慮數(shù)據(jù)本地性,本地數(shù)據(jù)本地性、本機架數(shù)據(jù)本地性以及最后跨機架數(shù)據(jù)本地性)。同時還會計算Reduce任務(wù)數(shù),Reduce任務(wù)數(shù)是在程序代碼中指定的,通過job.setNumReduceTask顯式指定的
  • 8.如下幾點是Application Master向Resource Manager申請資源的細(xì)節(jié)
  • 8.1 Application Master根據(jù)數(shù)據(jù)分片確定的Mapper任務(wù)數(shù)以及Reducer任務(wù)數(shù)向Resource Manager申請計算資源(計算資源主要指的是內(nèi)存和CPU,在Hadoop Yarn中,使用Container這個概念來描述計算單位,即計算資源是以Container為單位的,一個Container包含一定數(shù)量的內(nèi)存和CPU內(nèi)核數(shù))。
  • 8.2 Application Master是通過向Resource Manager發(fā)送Heart Beat心跳包進行資源申請的,申請時,請求中還會攜帶任務(wù)的數(shù)據(jù)本地性等信息,使得Resource Manager在分配資源時,不同的Task能夠分配到的計算資源盡可能滿足數(shù)據(jù)本地性
  • 8.3 Application Master向Resource Manager資源申請時,還會攜帶內(nèi)存數(shù)量信息,默認(rèn)情況下,Map任務(wù)和Reduce任務(wù)都會分陪1G內(nèi)存,這個值是可以通過參數(shù)mapreduce.map.memory.mb and mapreduce.reduce.memory.mb進行修改。

5. YARNRunner

@Override public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException, InterruptedException { addHistoryToken(ts); // Construct necessary information to start the MR AM ApplicationSubmissionContext appContext = createApplicationSubmissionContext(conf, jobSubmitDir, ts); // Submit to ResourceManager try { ApplicationId applicationId = resMgrDelegate.submitApplication(appContext); ApplicationReport appMaster = resMgrDelegate .getApplicationReport(applicationId); String diagnostics = (appMaster == null ? "application report is null" : appMaster.getDiagnostics()); if (appMaster == null || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) { throw new IOException("Failed to run job : " + diagnostics); } return clientCache.getClient(jobId).getJobStatus(jobId); } catch (YarnException e) { throw new IOException(e); } }

調(diào)用YarnClient的submitApplication()方法,其實現(xiàn)如下: 

6. YarnClientImpl

@Override public ApplicationId submitApplication(ApplicationSubmissionContext appContext) throws YarnException, IOException { ApplicationId applicationId = appContext.getApplicationId(); if (applicationId == null) { throw new ApplicationIdNotProvidedException( "ApplicationId is not provided in ApplicationSubmissionContext"); } SubmitApplicationRequest request = Records.newRecord(SubmitApplicationRequest.class); request.setApplicationSubmissionContext(appContext); // Automatically add the timeline DT into the CLC // Only when the security and the timeline service are both enabled if (isSecurityEnabled() && timelineServiceEnabled) { addTimelineDelegationToken(appContext.getAMContainerSpec()); } //TODO: YARN-1763:Handle RM failovers during the submitApplication call. rmClient.submitApplication(request); int pollCount = 0; long startTime = System.currentTimeMillis(); EnumSet waitingStates = EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING, YarnApplicationState.SUBMITTED); EnumSet failToSubmitStates = EnumSet.of(YarnApplicationState.FAILED, YarnApplicationState.KILLED); while (true) { try { ApplicationReport appReport = getApplicationReport(applicationId); YarnApplicationState state = appReport.getYarnApplicationState(); if (!waitingStates.contains(state)) { if(failToSubmitStates.contains(state)) { throw new YarnException("Failed to submit " + applicationId + " to YARN : " + appReport.getDiagnostics()); } LOG.info("Submitted application " + applicationId); break; } long elapsedMillis = System.currentTimeMillis() - startTime; if (enforceAsyncAPITimeout() && elapsedMillis >= asyncApiPollTimeoutMillis) { throw new YarnException("Timed out while waiting for application " + applicationId + " to be submitted successfully"); } // Notify the client through the log every 10 poll, in case the client // is blocked here too long. if (++pollCount % 10 == 0) { LOG.info("Application submission is not finished, " + "submitted application " + applicationId + " is still in " + state); } try { Thread.sleep(submitPollIntervalMillis); } catch (InterruptedException ie) { LOG.error("Interrupted while waiting for application " + applicationId + " to be successfully submitted."); } } catch (ApplicationNotFoundException ex) { // FailOver or RM restart happens before RMStateStore saves // ApplicationState LOG.info("Re-submit application " + applicationId + "with the " + "same ApplicationSubmissionContext"); rmClient.submitApplication(request); } } return applicationId; }

7. ClientRMService

ClientRMService是resource manager的客戶端接口。這個模塊處理從客戶端到resource mananger的rpc接口。

@Override public SubmitApplicationResponse submitApplication( SubmitApplicationRequest request) throws YarnException { ApplicationSubmissionContext submissionContext = request .getApplicationSubmissionContext(); ApplicationId applicationId = submissionContext.getApplicationId(); // ApplicationSubmissionContext needs to be validated for safety - only // those fields that are independent of the RM's configuration will be // checked here, those that are dependent on RM configuration are validated // in RMAppManager. String user = null; try { // Safety user = UserGroupInformation.getCurrentUser().getShortUserName(); } catch (IOException ie) { LOG.warn("Unable to get the current user.

總結(jié)

以上是生活随笔為你收集整理的hadoop临时文件 jar包_hadoop之Mapper/reducer源码分析之二的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 原创av| 一区二区欧美精品 | 91天堂在线视频 | 五月99久久婷婷国产综合亚洲 | 欧美性免费 | 国产美女精品久久久 | 在线观看国产一区二区三区 | 欧美一级片在线 | 国产尤物视频 | 麻豆精品国产传媒mv男同 | 91视频色 | 男人天堂网址 | 黄片毛片在线看 | 自拍视频在线播放 | 艳妇臀荡乳欲伦交换在线播放 | 自拍偷拍欧美视频 | 久久91亚洲 | 亚洲一区二区日韩 | a天堂资源 | 色骚综合 | 自拍偷拍 亚洲 | 欧美日韩不卡一区二区三区 | 丰满秘书被猛烈进入高清播放在 | 亚洲一区二区三区高清视频 | 夜夜春影院 | 国产精品美女www爽爽爽 | 久久久国产精品黄毛片 | 午夜激情国产 | 一区二区三区成人 | 国产精品久久久毛片 | 人妻在线一区二区三区 | 成人91免费 | 久久免费网| 中国黄色一级毛片 | 久久中文字幕av | 男人天堂亚洲天堂 | 韩国久久久 | 玉丸(双性调教) | 91影院在线观看 | 国产一区不卡视频 | 亚日韩一区 | 男人添女人下部高潮全视频 | av福利网 | 就要操就要日 | 亚洲精品国产一区 | 少妇久久久久久被弄高潮 | 神马午夜电影一区二区三区在线观看 | 欧美浓毛大泬视频 | 重囗味sm一区二区三区 | 国产一级片 | 久久精品噜噜噜成人 | 天堂8在线 | 国产乱码精品一品二品 | 女女h百合无遮羞羞漫画软件 | 久久精品一区二 | 欧美福利精品 | 91插插插插插插插插 | 欧洲做受高潮欧美裸体艺术 | 亚洲欧美自拍一区 | 射久久久 | 九九在线精品视频 | 上原亚衣在线观看 | 欧美亚洲精品一区二区 | 亚洲精品国产无码 | 亚洲午夜av久久乱码 | 国产极品探花 | 免费在线观看一区二区 | 特级西西444www大胆免费看 | 久久久96人妻无码精品 | 三级网站在线播放 | 老司机深夜福利在线观看 | 久久久久久久久久久久久av | 精品第一页 | 亚洲午夜国产 | 国产又粗又硬又黄的视频 | 精品国产乱 | 久草免费看 | 精品人妻一区二区三区含羞草 | 久久大综合 | 欧美精品在线第一页 | 亚洲乱仑 | 四虎中文字幕 | www插插 | 毛片网站网址 | 亚洲在线播放 | 精品免费国产一区二区三区四区 | 国产免费福利 | 色呦呦网站入口 | 国产欧美精品一区二区在线播放 | 亚洲av中文无码乱人伦在线观看 | 亚洲色图自拍 | 久久久久久在线观看 | 久草国产在线视频 | 欧美亚洲一级 | 中文字幕乱伦视频 | 久久久无码精品亚洲无少妇 | 中国爆后菊女人的视频 | 国产乱子伦农村叉叉叉 | 国产成人在线一区二区 |