

Map-Reduce Process Analysis


1. Client

The Map-Reduce process begins with a client submitting a job.

Jobs are submitted through the static method JobClient.runJob(JobConf):

public static RunningJob runJob(JobConf job) throws IOException {
  // First create a JobClient object
  JobClient jc = new JobClient(job);
  ……
  // Call submitJob to submit the job
  running = jc.submitJob(job);
  JobID jobId = running.getID();
  ……
  while (true) {
    // Loop, polling the job's status and printing it to the client console
  }
  return running;
}
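For context, a minimal driver that reaches this code path through the old org.apache.hadoop.mapred API might look like the sketch below. It is an illustrative example, not code from this article; the job name and paths are placeholders, and IdentityMapper/IdentityReducer are used only because they ship with the old API:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class MyJobDriver {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(MyJobDriver.class);
    conf.setJobName("my-job");                    // placeholder name
    conf.setOutputKeyClass(LongWritable.class);
    conf.setOutputValueClass(Text.class);
    conf.setMapperClass(IdentityMapper.class);    // pass-through example classes
    conf.setReducerClass(IdentityReducer.class);
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    // Blocks until the job completes, printing progress to the console
    JobClient.runJob(conf);
  }
}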

JobClient's submitJob method is implemented as follows:

public RunningJob submitJob(JobConf job) throws FileNotFoundException,
                                InvalidJobConfException, IOException {
  // Obtain a new job id from the JobTracker
  JobID jobId = jobSubmitClient.getNewJobId();
  // Prepare to write everything the job needs to run into HDFS:
  // - the jar containing the job's code, packaged as job.jar
  // - the input split information, written to job.split
  // - the job's configuration, written to job.xml
  Path submitJobDir = new Path(getSystemDir(), jobId.toString());
  Path submitJarFile = new Path(submitJobDir, "job.jar");
  Path submitSplitFile = new Path(submitJobDir, "job.split");
  // Upload the jars given by the -libjars command-line option to HDFS
  configureCommandLineOptions(job, submitJobDir, submitJarFile);
  Path submitJobFile = new Path(submitJobDir, "job.xml");
  ……
  // Compute the input splits according to the input format; the default type is FileSplit
  InputSplit[] splits =
    job.getInputFormat().getSplits(job, job.getNumMapTasks());

  // Create an output stream and write the input split information to the job.split file
  FSDataOutputStream out = FileSystem.create(fs,
      submitSplitFile, new FsPermission(JOB_FILE_PERMISSION));
  try {
    // job.split contains: a split file header, the split file version, the
    // number of splits, and then each input split in turn.
    // For each input split it writes: the split class name (FileSplit by
    // default), the split size, the split contents (for a FileSplit: the file
    // name and the split's start offset within the file), and the split's
    // locations (i.e. which DataNodes hold it).
    writeSplitsFile(splits, out);
  } finally {
    out.close();
  }
  job.set("mapred.job.split.file", submitSplitFile.toString());
  // Set the number of map tasks to the number of splits
  job.setNumMapTasks(splits.length);
  // Write the job configuration to the job.xml file
  out = FileSystem.create(fs, submitJobFile,
      new FsPermission(JOB_FILE_PERMISSION));
  try {
    job.writeXml(out);
  } finally {
    out.close();
  }
  // Finally call the JobTracker to actually submit the job
  JobStatus status = jobSubmitClient.submitJob(jobId);
  ……
}

2. JobTracker

The JobTracker runs as a separate JVM. Its main function does two main things:

  • calls the static method startTracker(new JobConf()) to create a JobTracker object
  • calls JobTracker.offerService() to start serving requests

The JobTracker's constructor creates a taskScheduler member variable that schedules jobs; the default is JobQueueTaskScheduler, which schedules jobs in FIFO order.
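The scheduler is pluggable. On Hadoop 0.20/1.x it is selected by the mapred.jobtracker.taskScheduler property in the JobTracker's configuration; for example, swapping in the contrib FairScheduler (assuming its jar is on the JobTracker's classpath) would look roughly like this:

<!-- mapred-site.xml on the JobTracker: replace the default FIFO
     JobQueueTaskScheduler with, e.g., the contrib FairScheduler -->
<property>
  <name>mapred.jobtracker.taskScheduler</name>
  <value>org.apache.hadoop.mapred.FairScheduler</value>
</property>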

In offerService, taskScheduler.start() is called, which registers two listeners with the JobTracker (which acts as the taskScheduler's taskTrackerManager):

  • JobQueueJobInProgressListener jobQueueJobInProgressListener, which monitors jobs' run state
  • EagerTaskInitializationListener eagerTaskInitializationListener, which initializes jobs

EagerTaskInitializationListener owns a thread, JobInitThread, that keeps taking JobInProgress objects off jobInitQueue and calls each object's initTasks function to initialize the job; a simplified sketch of this pattern follows.
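The article describes JobInitThread only in prose, so here is a simplified, illustrative sketch of the producer-consumer pattern it implements. This is not the actual Hadoop source; JobInProgress is reduced to the single method that matters here:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class EagerInitSketch {
  interface JobInProgress { void initTasks() throws IOException; }

  private final List<JobInProgress> jobInitQueue = new ArrayList<JobInProgress>();

  // jobAdded: enqueue the job and wake the init thread
  public void jobAdded(JobInProgress job) {
    synchronized (jobInitQueue) {
      jobInitQueue.add(job);
      jobInitQueue.notifyAll();
    }
  }

  // JobInitThread: drain the queue, initializing each job's tasks in turn
  class JobInitThread implements Runnable {
    public void run() {
      while (true) {
        JobInProgress job;
        synchronized (jobInitQueue) {
          while (jobInitQueue.isEmpty()) {
            try { jobInitQueue.wait(); } catch (InterruptedException e) { return; }
          }
          job = jobInitQueue.remove(0);
        }
        try {
          job.initTasks();  // runs outside the lock, since it may do HDFS I/O
        } catch (IOException e) {
          // the real JobTracker would fail the job here
        }
      }
    }
  }
}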

In the previous section, the client called JobTracker.submitJob. That function first creates a JobInProgress object and then calls addJob, which contains the following logic:

synchronized (jobs) {
  synchronized (taskScheduler) {
    jobs.put(job.getProfile().getJobID(), job);
    // Call jobAdded on every listener registered with the JobTracker
    for (JobInProgressListener listener : jobInProgressListeners) {
      listener.jobAdded(job);
    }
  }
}

EagerTaskInitializationListener's jobAdded simply adds the JobInProgress object to jobInitQueue, which triggers the job's initialization. The initialization itself is done by JobInProgress's initTasks function:

public synchronized void initTasks() throws IOException {
  ……
  // Read the job.split file from HDFS to reconstruct the input splits
  String jobFile = profile.getJobFile();
  Path sysDir = new Path(this.jobtracker.getSystemDir());
  FileSystem fs = sysDir.getFileSystem(conf);
  DataInputStream splitFile =
    fs.open(new Path(conf.get("mapred.job.split.file")));
  JobClient.RawSplit[] splits;
  try {
    splits = JobClient.readSplitFile(splitFile);
  } finally {
    splitFile.close();
  }
  // The number of map tasks equals the number of input splits
  numMapTasks = splits.length;
  // Create one TaskInProgress per map task, each handling one input split
  maps = new TaskInProgress[numMapTasks];
  for(int i=0; i < numMapTasks; ++i) {
    inputLength += splits[i].getDataLength();
    maps[i] = new TaskInProgress(jobId, jobFile,
                                 splits[i],
                                 jobtracker, conf, this, i);
  }
  // Put the map tasks into nonRunningMapCache, a Map<Node, List<TaskInProgress>>:
  // a map task will preferentially be assigned to a Node that holds its input
  // split. nonRunningMapCache is consulted when the JobTracker assigns map
  // tasks to TaskTrackers.
  if (numMapTasks > 0) {
    nonRunningMapCache = createCache(splits, maxLevel);
  }

  // Create the reduce tasks
  this.reduces = new TaskInProgress[numReduceTasks];
  for (int i = 0; i < numReduceTasks; i++) {
    reduces[i] = new TaskInProgress(jobId, jobFile,
                                    numMapTasks, i,
                                    jobtracker, conf, this);
    // Put the reduce tasks into nonRunningReduces, which is consulted when the
    // JobTracker assigns reduce tasks to TaskTrackers.
    nonRunningReduces.add(reduces[i]);
  }

  // Create two cleanup tasks, one for the map side and one for the reduce side
  cleanup = new TaskInProgress[2];
  cleanup[0] = new TaskInProgress(jobId, jobFile, splits[0],
          jobtracker, conf, this, numMapTasks);
  cleanup[0].setJobCleanupTask();
  cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                     numReduceTasks, jobtracker, conf, this);
  cleanup[1].setJobCleanupTask();
  // Create two setup tasks, one for the map side and one for the reduce side
  setup = new TaskInProgress[2];
  setup[0] = new TaskInProgress(jobId, jobFile, splits[0],
          jobtracker, conf, this, numMapTasks + 1 );
  setup[0].setJobSetupTask();
  setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                     numReduceTasks + 1, jobtracker, conf, this);
  setup[1].setJobSetupTask();
  tasksInited.set(true); // initialization complete
  ……
}

3. TaskTracker

The TaskTracker also runs as a separate JVM. Its main function essentially calls new TaskTracker(conf).run(), and run in turn mainly calls:

State offerService() throws Exception {
  long lastHeartbeat = 0;
  // The TaskTracker's service loop runs for as long as the process lives
  while (running && !shuttingDown) {
      ……
      long now = System.currentTimeMillis();
      // Send a heartbeat to the JobTracker at regular intervals
      long waitTime = heartbeatInterval - (now - lastHeartbeat);
      if (waitTime > 0) {
        synchronized(finishedCount) {
          if (finishedCount[0] == 0) {
            finishedCount.wait(waitTime);
          }
          finishedCount[0] = 0;
        }
      }
      ……
      // Send the heartbeat to the JobTracker and get the response
      HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
      ……
      // Extract from the response the actions this TaskTracker should perform
      TaskTrackerAction[] actions = heartbeatResponse.getActions();
      ……
      if (actions != null){
        for(TaskTrackerAction action: actions) {
          if (action instanceof LaunchTaskAction) {
            // For a new task to launch, add the action to the task queue
            addToTaskQueue((LaunchTaskAction)action);
          } else if (action instanceof CommitTaskAction) {
            CommitTaskAction commitAction = (CommitTaskAction)action;
            if (!commitResponses.contains(commitAction.getTaskID())) {
              commitResponses.add(commitAction.getTaskID());
            }
          } else {
            tasksToCleanup.put(action);
          }
        }
      }
  }
  return State.NORMAL;
}

The main logic of transmitHeartBeat is as follows:

private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
  // Every so often, include counter statistics for the JobTracker in the heartbeat
  boolean sendCounters;
  if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {
    sendCounters = true;
    previousUpdate = now;
  }
  else {
    sendCounters = false;
  }
  ……
  // Report this TaskTracker's current status to the JobTracker
  if (status == null) {
    synchronized (this) {
      status = new TaskTrackerStatus(taskTrackerName, localHostname,
                                     httpPort,
                                     cloneAndResetRunningTaskStatuses(
                                       sendCounters),
                                     failures,
                                     maxCurrentMapTasks,
                                     maxCurrentReduceTasks);
    }
  }
  ……
  // The TaskTracker asks the JobTracker for a new task when either:
  // - the number of map tasks it is running is below its map slot limit, or
  // - the number of reduce tasks it is running is below its reduce slot limit
  boolean askForNewTask;
  long localMinSpaceStart;
  synchronized (this) {
    askForNewTask = (status.countMapTasks() < maxCurrentMapTasks ||
                     status.countReduceTasks() < maxCurrentReduceTasks) &&
                    acceptNewTasks;
    localMinSpaceStart = minSpaceStart;
  }
  ……
  // Send the heartbeat to the JobTracker; this is an RPC call
  HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
                                                            justStarted, askForNewTask,
                                                            heartbeatResponseId);
  ……
  return heartbeatResponse;
}

4. JobTracker

When the heartbeat RPC arrives at the JobTracker, its heartbeat(TaskTrackerStatus status, boolean initialContact, boolean acceptNewTasks, short responseId) method is invoked:

public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
                                                boolean initialContact, boolean acceptNewTasks, short responseId)
  throws IOException {
  ……
  String trackerName = status.getTrackerName();
  ……
  short newResponseId = (short)(responseId + 1);
  ……
  HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
  List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
  // If the TaskTracker has asked the JobTracker for a task to run
  if (acceptNewTasks) {
    TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
    if (taskTrackerStatus == null) {
      LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
    } else {
      // Setup and cleanup tasks have the highest priority
      List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
      if (tasks == null ) {
        // Otherwise let the task scheduler assign tasks
        tasks = taskScheduler.assignTasks(taskTrackerStatus);
      }
      if (tasks != null) {
        for (Task task : tasks) {
          // Put the tasks into the actions list to return to the TaskTracker
          expireLaunchingTasks.addNewTask(task.getTaskID());
          actions.add(new LaunchTaskAction(task));
        }
      }
    }
  }
  ……
  int nextInterval = getNextHeartbeatInterval();
  response.setHeartbeatInterval(nextInterval);
  response.setActions(
                      actions.toArray(new TaskTrackerAction[actions.size()]));
  ……
  return response;
}

The default task scheduler is JobQueueTaskScheduler; its assignTasks is as follows:

public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
    throws IOException {
  ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
  int numTaskTrackers = clusterStatus.getTaskTrackers();
  Collection<JobInProgress> jobQueue = jobQueueJobInProgressListener.getJobQueue();
  int maxCurrentMapTasks = taskTracker.getMaxMapTasks();
  int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();
  int numMaps = taskTracker.countMapTasks();
  int numReduces = taskTracker.countReduceTasks();
  // Compute the remaining map and reduce load across all running jobs
  int remainingReduceLoad = 0;
  int remainingMapLoad = 0;
  synchronized (jobQueue) {
    for (JobInProgress job : jobQueue) {
      if (job.getStatus().getRunState() == JobStatus.RUNNING) {
        int totalMapTasks = job.desiredMaps();
        int totalReduceTasks = job.desiredReduces();
        remainingMapLoad += (totalMapTasks - job.finishedMaps());
        remainingReduceLoad += (totalReduceTasks - job.finishedReduces());
      }
    }
  }
  // Compute the average load each TaskTracker should carry: the remaining
  // load divided by the number of TaskTrackers
  int maxMapLoad = 0;
  int maxReduceLoad = 0;
  if (numTaskTrackers > 0) {
    maxMapLoad = Math.min(maxCurrentMapTasks,
                          (int) Math.ceil((double) remainingMapLoad /
                                          numTaskTrackers));
    maxReduceLoad = Math.min(maxCurrentReduceTasks,
                             (int) Math.ceil((double) remainingReduceLoad
                                             / numTaskTrackers));
  }
  ……

  // Maps take priority over reduces: if this TaskTracker is running fewer map
  // tasks than the average load, assign it a map task
  if (numMaps < maxMapLoad) {
    int totalNeededMaps = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING) {
          continue;
        }
        Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers,
            taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          return Collections.singletonList(t);
        }
        ……
      }
    }
  }
  // Only after the map tasks are assigned are reduce tasks considered
  if (numReduces < maxReduceLoad) {
    int totalNeededReduces = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING ||
            job.numReduceTasks == 0) {
          continue;
        }
        Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers,
            taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          return Collections.singletonList(t);
        }
        ……
      }
    }
  }
  return null;
}

From the code above we can see that JobInProgress.obtainNewMapTask assigns map tasks; it mainly calls findNewMapTask, which looks up a TaskInProgress in nonRunningMapCache starting from the Node the TaskTracker runs on. JobInProgress.obtainNewReduceTask assigns reduce tasks; it mainly calls findNewReduceTask, which takes a TaskInProgress from nonRunningReduces. A simplified sketch of the locality lookup follows.
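The locality search in findNewMapTask is only summarized above, so here is a simplified, illustrative sketch of the idea (not the actual Hadoop source; Node and TaskInProgress are reduced to stand-ins): start at the TaskTracker's own node and walk up the topology until a cached task is found.

import java.util.List;
import java.util.Map;

class LocalitySketch {
  interface Node { Node getParent(); }          // stand-in for a topology node
  static class TaskInProgress { }               // stand-in for the real class

  // Walk node -> rack -> ... looking for an unassigned TaskInProgress whose
  // input split lives near the requesting TaskTracker.
  static TaskInProgress findNewMapTask(Node trackerNode, int maxLevel,
      Map<Node, List<TaskInProgress>> nonRunningMapCache) {
    Node key = trackerNode;
    for (int level = 0; level < maxLevel && key != null; ++level) {
      List<TaskInProgress> cached = nonRunningMapCache.get(key);
      if (cached != null && !cached.isEmpty()) {
        // node-local at level 0, rack-local at level 1, and so on
        return cached.remove(cached.size() - 1);
      }
      key = key.getParent();                    // widen the search scope
    }
    return null;  // the caller would fall back to a non-local task
  }
}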

5. TaskTracker

After the TaskTracker sends a heartbeat, the response from the JobTracker contains the assigned tasks as LaunchTaskActions. Each is enqueued via addToTaskQueue: map tasks go into mapLauncher and reduce tasks into reduceLauncher (both of type TaskLauncher):

private void addToTaskQueue(LaunchTaskAction action) {
  if (action.getTask().isMapTask()) {
    mapLauncher.addToTaskQueue(action);
  } else {
    reduceLauncher.addToTaskQueue(action);
  }
}

TaskLauncher is a thread. Its run function takes a TaskInProgress off the queue above and calls startNewTask(TaskInProgress tip) to launch the task, which in turn mainly calls localizeJob(TaskInProgress tip):

private void localizeJob(TaskInProgress tip) throws IOException {
  // The first step is to copy the task's files from HDFS to the TaskTracker's
  // local file system: job.split, job.xml, and job.jar
  Path localJarFile = null;
  Task t = tip.getTask();
  JobID jobId = t.getJobID();
  Path jobFile = new Path(t.getJobFile());
  ……
  Path localJobFile = lDirAlloc.getLocalPathForWrite(
                                  getLocalJobDir(jobId.toString())
                                  + Path.SEPARATOR + "job.xml",
                                  jobFileSize, fConf);
  RunningJob rjob = addTaskToJob(jobId, tip);
  synchronized (rjob) {
    if (!rjob.localized) {
      FileSystem localFs = FileSystem.getLocal(fConf);
      Path jobDir = localJobFile.getParent();
      ……
      // Copy the job file to the local file system
      systemFS.copyToLocalFile(jobFile, localJobFile);
      JobConf localJobConf = new JobConf(localJobFile);
      Path workDir = lDirAlloc.getLocalPathForWrite(
                       (getLocalJobDir(jobId.toString())
                       + Path.SEPARATOR + "work"), fConf);
      if (!localFs.mkdirs(workDir)) {
        throw new IOException("Mkdirs failed to create "
                    + workDir.toString());
      }
      System.setProperty("job.local.dir", workDir.toString());
      localJobConf.set("job.local.dir", workDir.toString());
      // copy Jar file to the local FS and unjar it.
      String jarFile = localJobConf.getJar();
      long jarFileSize = -1;
      if (jarFile != null) {
        Path jarFilePath = new Path(jarFile);
        localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
                                   getLocalJobDir(jobId.toString())
                                   + Path.SEPARATOR + "jars",
                                   5 * jarFileSize, fConf), "job.jar");
        if (!localFs.mkdirs(localJarFile.getParent())) {
          throw new IOException("Mkdirs failed to create jars directory ");
        }
        // Copy job.jar to the local file system
        systemFS.copyToLocalFile(jarFilePath, localJarFile);
        localJobConf.setJar(localJarFile.toString());
        // Write the job's configuration out as job.xml
        OutputStream out = localFs.create(localJobFile);
        try {
          localJobConf.writeXml(out);
        } finally {
          out.close();
        }
        // Unpack job.jar
        RunJar.unJar(new File(localJarFile.toString()),
                     new File(localJarFile.getParent().toString()));
      }
      rjob.localized = true;
      rjob.jobConf = localJobConf;
    }
  }
  // Actually launch the task
  launchTaskForJob(tip, new JobConf(rjob.jobConf));
}

Once all the resources the task needs have been copied to the local machine, launchTaskForJob is called, which in turn calls TaskInProgress's launchTask function:

public synchronized void launchTask() throws IOException {
    ……
    // Create the task's working directory
    localizeTask(task);
    if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
      this.taskStatus.setRunState(TaskStatus.State.RUNNING);
    }
    // Create and start a TaskRunner: a MapTaskRunner for a MapTask,
    // a ReduceTaskRunner for a ReduceTask
    this.runner = task.createRunner(TaskTracker.this, this);
    this.runner.start();
    this.taskStatus.setStartTime(System.currentTimeMillis());
}

TaskRunner is a thread; its run function is as follows:

public final void run() {
    ……
    TaskAttemptID taskid = t.getTaskID();
    LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
    File jobCacheDir = null;
    if (conf.getJar() != null) {
      jobCacheDir = new File(
                        new Path(conf.getJar()).getParent().toString());
    }
    File workDir = new File(lDirAlloc.getLocalPathToRead(
                              TaskTracker.getLocalTaskDir(
                                t.getJobID().toString(),
                                t.getTaskID().toString(),
                                t.isTaskCleanupTask())
                              + Path.SEPARATOR + MRConstants.WORKDIR,
                              conf).toString());
    FileSystem fileSystem;
    Path localPath;
    ……
    // Assemble the classpath
    String baseDir;
    String sep = System.getProperty("path.separator");
    StringBuffer classPath = new StringBuffer();
    // start with same classpath as parent process
    classPath.append(System.getProperty("java.class.path"));
    classPath.append(sep);
    if (!workDir.mkdirs()) {
      if (!workDir.isDirectory()) {
        LOG.fatal("Mkdirs failed to create " + workDir.toString());
      }
    }
    String jar = conf.getJar();
    if (jar != null) {
      // if the job has a jar, add its unpacked contents to the classpath
      File[] libs = new File(jobCacheDir, "lib").listFiles();
      if (libs != null) {
        for (int i = 0; i < libs.length; i++) {
          classPath.append(sep);            // add libs from jar to classpath
          classPath.append(libs[i]);
        }
      }
      classPath.append(sep);
      classPath.append(new File(jobCacheDir, "classes"));
      classPath.append(sep);
      classPath.append(jobCacheDir);
    }
    ……
    classPath.append(sep);
    classPath.append(workDir);
    // Assemble the java command line and its arguments
    Vector<String> vargs = new Vector<String>(8);
    File jvm =
      new File(new File(System.getProperty("java.home"), "bin"), "java");
    vargs.add(jvm.toString());
    String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
    javaOpts = javaOpts.replace("@taskid@", taskid.toString());
    String [] javaOptsSplit = javaOpts.split(" ");
    String libraryPath = System.getProperty("java.library.path");
    if (libraryPath == null) {
      libraryPath = workDir.getAbsolutePath();
    } else {
      libraryPath += sep + workDir;
    }
    boolean hasUserLDPath = false;
    for(int i=0; i<javaOptsSplit.length ;i++) {
      if(javaOptsSplit[i].startsWith("-Djava.library.path=")) {
        javaOptsSplit[i] += sep + libraryPath;
        hasUserLDPath = true;
        break;
      }
    }
    if(!hasUserLDPath) {
      vargs.add("-Djava.library.path=" + libraryPath);
    }
    for (int i = 0; i < javaOptsSplit.length; i++) {
      vargs.add(javaOptsSplit[i]);
    }
    // Add the child process's temporary directory
    String tmp = conf.get("mapred.child.tmp", "./tmp");
    Path tmpDir = new Path(tmp);
    if (!tmpDir.isAbsolute()) {
      tmpDir = new Path(workDir.toString(), tmp);
    }
    FileSystem localFs = FileSystem.getLocal(conf);
    if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {
      throw new IOException("Mkdirs failed to create " + tmpDir.toString());
    }
    vargs.add("-Djava.io.tmpdir=" + tmpDir.toString());
    // Add classpath.
    vargs.add("-classpath");
    vargs.add(classPath.toString());
    // Log directory settings
    long logSize = TaskLog.getTaskLogLength(conf);
    vargs.add("-Dhadoop.log.dir=" +
        new File(System.getProperty("hadoop.log.dir")
        ).getAbsolutePath());
    vargs.add("-Dhadoop.root.logger=INFO,TLA");
    vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
    vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
    // The main class of the child process that runs map and reduce tasks is Child
    vargs.add(Child.class.getName());  // main of Child
    ……
    // Launch the child process
    jvmManager.launchJvm(this,
        jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize,
            workDir, env, pidFile, conf));
}

6. Child

The actual map and reduce tasks run inside the Child process. The main logic of Child's main function is as follows:

while (true) {
  // Obtain a JvmTask object from the TaskTracker over the network
  JvmTask myTask = umbilical.getTask(jvmId);
  ……
  idleLoopCount = 0;
  task = myTask.getTask();
  taskid = task.getTaskID();
  isCleanup = task.isTaskCleanupTask();
  JobConf job = new JobConf(task.getJobFile());
  TaskRunner.setupWorkDir(job);
  numTasksToExecute = job.getNumTasksToExecutePerJvm();
  task.setConf(job);
  defaultConf.addResource(new Path(task.getJobFile()));
  ……
  // Run the task
  task.run(job, umbilical);             // run the task
  if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) {
    break;
  }
}

6.1 MapTask

If the task is a MapTask, its run function is as follows:

public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
  throws IOException {
  // The reporter communicates with the TaskTracker to report progress
  final Reporter reporter = getReporter(umbilical);
  startCommunicationThread(umbilical);
  initialize(job, reporter);
  ……
  // Set up the map task's output collector
  int numReduceTasks = conf.getNumReduceTasks();
  MapOutputCollector collector = null;
  if (numReduceTasks > 0) {
    collector = new MapOutputBuffer(umbilical, job, reporter);
  } else {
    collector = new DirectMapOutputCollector(umbilical, job, reporter);
  }
  // Read the input split and, from its information, create a RecordReader
  // to read the data
  instantiatedSplit = (InputSplit)
      ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
  DataInputBuffer splitBuffer = new DataInputBuffer();
  splitBuffer.reset(split.getBytes(), 0, split.getLength());
  instantiatedSplit.readFields(splitBuffer);
  if (instantiatedSplit instanceof FileSplit) {
    FileSplit fileSplit = (FileSplit) instantiatedSplit;
    job.set("map.input.file", fileSplit.getPath().toString());
    job.setLong("map.input.start", fileSplit.getStart());
    job.setLong("map.input.length", fileSplit.getLength());
  }
  RecordReader rawIn =                  // open input
    job.getInputFormat().getRecordReader(instantiatedSplit, job, reporter);
  RecordReader in = isSkipping() ?
      new SkippingRecordReader(rawIn, getCounters(), umbilical) :
      new TrackedRecordReader(rawIn, getCounters());
  job.setBoolean("mapred.skip.on", isSkipping());
  // For a map task, create a MapRunnable; the default is MapRunner
  MapRunnable runner =
    ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
  try {
    // MapRunner's run function reads records from the RecordReader one by
    // one and calls the Mapper's map function on each
    runner.run(in, collector, reporter);
    collector.flush();
  } finally {
    in.close();                               // close input
    collector.close();
  }
  done(umbilical);
}

MapRunner's run function reads records from the RecordReader one by one and calls the Mapper's map function on each:

public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                Reporter reporter)
  throws IOException {
  try {
    K1 key = input.createKey();
    V1 value = input.createValue();
    while (input.next(key, value)) {
      mapper.map(key, value, output, reporter);
      if(incrProcCount) {
        reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
            SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
      }
    }
  } finally {
    mapper.close();
  }
}
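For reference, a user-supplied Mapper invoked by this loop might look like the classic word-count mapper below. This is an illustrative example, not code from this article:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Old-API word-count Mapper: mapper.map(key, value, output, reporter) in
// MapRunner.run above dispatches to a map method like this one.
public class WordCountMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {
  private final static IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  public void map(LongWritable key, Text value,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      output.collect(word, ONE);   // lands in MapOutputBuffer.collect below
    }
  }
}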

All map output is collected into the MapOutputBuffer, whose collect function is as follows:

public synchronized void collect(K key, V value)
    throws IOException {
  reporter.progress();
  ……
  // As this shows, the buffer is a ring (circular) data structure
  final int kvnext = (kvindex + 1) % kvoffsets.length;
  spillLock.lock();
  try {
    boolean kvfull;
    do {
      // The ring is full when the next free slot would reach the start position
      kvfull = kvnext == kvstart;
      // Check whether the buffer has reached the threshold at which its
      // contents should be written to disk
      final boolean kvsoftlimit = ((kvnext > kvend)
          ? kvnext - kvend > softRecordLimit
          : kvend - kvnext <= kvoffsets.length - softRecordLimit);
      // Once the threshold is reached, start writing the buffer to disk as a
      // spill file. startSpill mainly notifies a background thread,
      // SpillThread, whose run() calls sortAndSpill() to sort, combine, and
      // write to disk.
      if (kvstart == kvend && kvsoftlimit) {
        startSpill();
      }
      // If the buffer is completely full, wait until the spill finishes
      if (kvfull) {
          while (kvstart != kvend) {
            reporter.progress();
            spillDone.await();
          }
      }
    } while (kvfull);
  } finally {
    spillLock.unlock();
  }
  try {
    // If the buffer is not full, serialize the key and value into the buffer
    int keystart = bufindex;
    keySerializer.serialize(key);
    final int valstart = bufindex;
    valSerializer.serialize(value);
    int valend = bb.markRecord();
    // Call the configured partitioner to obtain the partition id for this
    // key/value pair
    final int partition = partitioner.getPartition(key, value, partitions);
    mapOutputRecordCounter.increment(1);
    mapOutputByteCounter.increment(valend >= keystart
        ? valend - keystart
        : (bufvoid - keystart) + valend);
    // Record the partition id and the key/value offsets within the buffer
    // in the index arrays
    int ind = kvindex * ACCTSIZE;
    kvoffsets[kvindex] = ind;
    kvindices[ind + PARTITION] = partition;
    kvindices[ind + KEYSTART] = keystart;
    kvindices[ind + VALSTART] = valstart;
    kvindex = kvnext;
  } catch (MapBufferTooSmallException e) {
    LOG.info("Record too large for in-memory buffer: " + e.getMessage());
    spillSingleRecord(key, value);
    mapOutputRecordCounter.increment(1);
    return;
  }
}

The in-memory buffer layout was shown in a figure in the original post, which is not reproduced here (see the analyses by several Hadoop experts at http://blog.csdn.net/HEYUTAO007/archive/2010/07/10/5725379.aspx and http://caibinbupt.javaeye.com/).

kvoffsets exists so that the records can be sorted before being spilled to disk; a reconstructed sketch of these structures follows.
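Below is a reconstructed sketch of MapOutputBuffer's accounting structures, standing in for the missing figure. It is not the actual Hadoop source: the field layout is inferred from the collect code above (the ACCTSIZE/PARTITION/KEYSTART/VALSTART constants), and the io.sort.* names are the standard 0.20/1.x configuration knobs.

public class MapOutputBufferLayout {
  // Three ints of accounting data per record in kvindices
  static final int ACCTSIZE = 3;
  static final int PARTITION = 0;  // partition id from the Partitioner
  static final int KEYSTART = 1;   // key's start offset in kvbuffer
  static final int VALSTART = 2;   // value's start offset in kvbuffer

  byte[] kvbuffer;   // ring buffer of serialized key/value bytes (io.sort.mb)
  int[] kvoffsets;   // per-record pointer into kvindices; this is the array
                     // that gets sorted before a spill
  int[] kvindices;   // ACCTSIZE ints per record, indexed as above

  // kvstart/kvend delimit records being spilled; kvindex is the next free slot
  int kvstart, kvend, kvindex;

  // A spill starts once the record count crosses the soft limit, e.g.
  // softRecordLimit ~ (int)(kvoffsets.length * 0.8)  // io.sort.spill.percent
}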

As noted above, sortAndSpill is the function that writes the in-memory buffer to a spill file on disk:

private void sortAndSpill() throws IOException {
  ……
  FSDataOutputStream out = null;
  FSDataOutputStream indexOut = null;
  IFileOutputStream indexChecksumOut = null;
  // Create the spill file on disk
  Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
                                numSpills, size);
  out = rfs.create(filename);
  ……
  final int endPosition = (kvend > kvstart)
    ? kvend
    : kvoffsets.length + kvend;
  // Sort the buffered records, primarily by partition
  sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
  int spindex = kvstart;
  InMemValBytes value = new InMemValBytes();
  // Write the records to the file one partition at a time
  for (int i = 0; i < partitions; ++i) {
    IFile.Writer<K, V> writer = null;
    long segmentStart = out.getPos();
    writer = new Writer<K, V>(job, out, keyClass, valClass, codec);
    // If no combiner is configured, write the records directly
    if (null == combinerClass) {
        ……
        writer.append(key, value);
        ++spindex;
     }
     else {
        ……
        // If a combiner is configured, run it first: combineAndSpill calls
        // combiner.reduce(…) before writing to the file
        combineAndSpill(kvIter, combineInputCounter);
     }
  }
  ……
}

When the map phase ends, MapOutputBuffer's flush function is called. It too invokes sortAndSpill to write what remains in the buffer to disk, and then calls mergeParts to merge the multiple spill files on disk:

private void mergeParts() throws IOException {
    ……
    // For each partition
    for (int parts = 0; parts < partitions; parts++){
      //create the segments to be merged
      List<Segment<K, V>> segmentList =
        new ArrayList<Segment<K, V>>(numSpills);
      TaskAttemptID mapId = getTaskID();
      // Collect this partition's segment from each spill file in turn
      for(int i = 0; i < numSpills; i++) {
        final IndexRecord indexRecord =
          getIndexInformation(mapId, i, parts);
        long segmentOffset = indexRecord.startOffset;
        long segmentLength = indexRecord.partLength;
        Segment<K, V> s =
          new Segment<K, V>(job, rfs, filename[i], segmentOffset,
                            segmentLength, codec, true);
        segmentList.add(i, s);
      }
      // Merge the segments belonging to the same partition
      RawKeyValueIterator kvIter =
        Merger.merge(job, rfs,
                     keyClass, valClass,
                     segmentList, job.getInt("io.sort.factor", 100),
                     new Path(getTaskID().toString()),
                     job.getOutputKeyComparator(), reporter);
      // Write the merged segment to the final output file
      long segmentStart = finalOut.getPos();
      Writer<K, V> writer =
          new Writer<K, V>(job, finalOut, keyClass, valClass, codec);
      if (null == combinerClass || numSpills < minSpillsForCombine) {
        Merger.writeFile(kvIter, writer, reporter, job);
      } else {
        combineCollector.setWriter(writer);
        combineAndSpill(kvIter, combineInputCounter);
      }
      ……
    }
}

6.2 ReduceTask

ReduceTask's run function is as follows:

public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
  throws IOException {
  job.setBoolean("mapred.skip.on", isSkipping());
  // A reduce task has three phases: copy, sort, reduce
  if (isMapOrReduce()) {
    copyPhase = getProgress().addPhase("copy");
    sortPhase  = getProgress().addPhase("sort");
    reducePhase = getProgress().addPhase("reduce");
  }
  startCommunicationThread(umbilical);
  final Reporter reporter = getReporter(umbilical);
  initialize(job, reporter);
  // Copy phase: ReduceCopier's fetchOutputs function fetches the map outputs.
  // It creates several MapOutputCopier threads, whose copyOutput does the
  // actual copying.
  boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
  if (!isLocal) {
    reduceCopier = new ReduceCopier(umbilical, job);
    if (!reduceCopier.fetchOutputs()) {
        ……
    }
  }
  copyPhase.complete();
  // Sort phase: merge the fetched map outputs until the number of files drops
  // below io.sort.factor, and return an Iterator over the key-value pairs
  setPhase(TaskStatus.Phase.SORT);
  statusUpdate(umbilical);
  final FileSystem rfs = FileSystem.getLocal(job).getRaw();
  RawKeyValueIterator rIter = isLocal
    ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
        job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
        !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
        new Path(getTaskID().toString()), job.getOutputKeyComparator(),
        reporter)
    : reduceCopier.createKVIterator(job, rfs, reporter);
  mapOutputFilesOnDisk.clear();
  sortPhase.complete();
  // Reduce phase
  setPhase(TaskStatus.Phase.REDUCE);
  ……
  Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass(), job);
  Class keyClass = job.getMapOutputKeyClass();
  Class valClass = job.getMapOutputValueClass();
  ReduceValuesIterator values = isSkipping() ?
     new SkippingReduceValuesIterator(rIter,
          job.getOutputValueGroupingComparator(), keyClass, valClass,
          job, reporter, umbilical) :
      new ReduceValuesIterator(rIter,
      job.getOutputValueGroupingComparator(), keyClass, valClass,
      job, reporter);
  // Read out each key and its list of values, and call the Reducer's
  // reduce function on them
  while (values.more()) {
    reduceInputKeyCounter.increment(1);
    reducer.reduce(values.getKey(), values, collector, reporter);
    values.nextKey();
    values.informReduceProgress();
  }
  reducer.close();
  out.close(reporter);
  done(umbilical);
}
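As with the Mapper shown earlier, the reducer.reduce call in this loop dispatches to a user class. An illustrative old-API word-count Reducer (example code, not from this article):

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Old-API word-count Reducer: reducer.reduce(key, values, collector,
// reporter) in ReduceTask.run above dispatches to a method like this one.
public class WordCountReducer extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {
  public void reduce(Text key, Iterator<IntWritable> values,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    int sum = 0;
    while (values.hasNext()) {   // all values grouped under this key
      sum += values.next().get();
    }
    output.collect(key, new IntWritable(sum));
  }
}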

7. Summary

The original post closed with a figure summarizing the whole Map-Reduce flow. The figure is not reproduced here, but in outline: the client writes job.jar, job.split, and job.xml into HDFS and submits the job to the JobTracker; the JobTracker initializes the job into map and reduce TaskInProgress objects and hands tasks out through heartbeat responses; each TaskTracker localizes the job's files and launches a Child JVM per task; a map task collects its output in a ring buffer that is sorted, spilled, and merged per partition; a reduce task copies the map outputs, merge-sorts them, and runs the Reducer over each key's values.
