mapreduce框架详解
開(kāi)始聊mapreduce,mapreduce是hadoop的計(jì)算框架,我學(xué)hadoop是從hive開(kāi)始入手,再到hdfs,當(dāng)我學(xué)習(xí)hdfs時(shí)候,就感覺(jué)到hdfs和mapreduce關(guān)系的緊密。這個(gè)可能是我做技術(shù)研究的思路有關(guān),我開(kāi)始學(xué)習(xí)某一套技術(shù)總是想著這套技術(shù)到底能干什么,只有當(dāng)我真正理解了這套技術(shù)解決了什么問(wèn)題時(shí)候,我后續(xù)的學(xué)習(xí)就能逐步的加快,而學(xué)習(xí)hdfs時(shí)候我就發(fā)現(xiàn),要理解hadoop框架的意義,hdfs和mapreduce是密不可分,所以當(dāng)我寫(xiě)分布式文件系統(tǒng)時(shí)候,總是感覺(jué)自己的理解膚淺,今天我開(kāi)始寫(xiě)mapreduce了,今天寫(xiě)文章時(shí)候比上周要進(jìn)步多,不過(guò)到底能不能寫(xiě)好本文了,只有試試再說(shuō)了。
Mapreduce初析
Mapreduce是一個(gè)計(jì)算框架,既然是做計(jì)算的框架,那么表現(xiàn)形式就是有個(gè)輸入(input),mapreduce操作這個(gè)輸入(input),通過(guò)本身定義好的計(jì)算模型,得到一個(gè)輸出(output),這個(gè)輸出就是我們所需要的結(jié)果。
我們要學(xué)習(xí)的就是這個(gè)計(jì)算模型的運(yùn)行規(guī)則。在運(yùn)行一個(gè)mapreduce計(jì)算任務(wù)時(shí)候,任務(wù)過(guò)程被分為兩個(gè)階段:map階段和reduce階段,每個(gè)階段都是用鍵值對(duì)(key/value)作為輸入(input)和輸出(output)。而程序員要做的就是定義好這兩個(gè)階段的函數(shù):map函數(shù)和reduce函數(shù)。
Mapreduce的基礎(chǔ)實(shí)例
講解mapreduce運(yùn)行原理前,首先我們看看mapreduce里的hello world實(shí)例WordCount,這個(gè)實(shí)例在任何一個(gè)版本的hadoop安裝程序里都會(huì)有,大家很容易找到,這里我還是貼出代碼,便于我后面的講解,代碼如下:
| /** ?* Licensed to the Apache Software Foundation (ASF) under one ?* or more contributor license agreements.? See the NOTICE file ?* distributed with this work for additional information ?* regarding copyright ownership.? The ASF licenses this file ?* to you under the Apache License, Version 2.0 (the ?* "License"); you may not use this file except in compliance ?* with the License.? You may obtain a copy of the License at ?* ?*???? http://www.apache.org/licenses/LICENSE-2.0 ?* ?* Unless required by applicable law or agreed to in writing, software ?* distributed under the License is distributed on an "AS IS" BASIS, ?* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ?* See the License for the specific language governing permissions and ?* limitations under the License. ?*/ package?org.apache.hadoop.examples; import?java.io.IOException; import?java.util.StringTokenizer; import?org.apache.hadoop.conf.Configuration; import?org.apache.hadoop.fs.Path; import?org.apache.hadoop.io.IntWritable; import?org.apache.hadoop.io.Text; import?org.apache.hadoop.mapreduce.Job; import?org.apache.hadoop.mapreduce.Mapper; import?org.apache.hadoop.mapreduce.Reducer; import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import?org.apache.hadoop.util.GenericOptionsParser; public?class?WordCount { ??public?static?class?TokenizerMapper ???????extends?Mapper<Object, Text, Text, IntWritable>{ ????? ????private?final?static?IntWritable one = new?IntWritable(1); ????private?Text word = new?Text(); ??????? ????public?void?map(Object key, Text value, Context context ????????????????????) throws?IOException, InterruptedException { ??????StringTokenizer itr = new?StringTokenizer(value.toString()); ??????while?(itr.hasMoreTokens()) { ????????word.set(itr.nextToken()); ????????context.write(word, one); ??????} ????} ??} ??? ??public?static?class?IntSumReducer ???????extends?Reducer<Text,IntWritable,Text,IntWritable> { ????private?IntWritable result = new?IntWritable(); ????public?void?reduce(Text key, Iterable<IntWritable> values, ???????????????????????Context context ???????????????????????) throws?IOException, InterruptedException { ??????int?sum = 0; ??????for?(IntWritable val : values) { ????????sum += val.get(); ??????} ??????result.set(sum); ??????context.write(key, result); ????} ??} ??public?static?void?main(String[] args) throws?Exception { ????Configuration conf = new?Configuration(); ????String[] otherArgs = new?GenericOptionsParser(conf, args).getRemainingArgs(); ????if?(otherArgs.length != 2) { ??????System.err.println("Usage: wordcount <in> <out>"); ??????System.exit(2); ????} ????Job job = new?Job(conf, "word count"); ????job.setJarByClass(WordCount.class); ????job.setMapperClass(TokenizerMapper.class); ????job.setCombinerClass(IntSumReducer.class); ????job.setReducerClass(IntSumReducer.class); ????job.setOutputKeyClass(Text.class); ????job.setOutputValueClass(IntWritable.class); ????FileInputFormat.addInputPath(job, new?Path(otherArgs[0])); ????FileOutputFormat.setOutputPath(job, new?Path(otherArgs[1])); ????System.exit(job.waitForCompletion(true) ? 0?: 1); ??} } |
?
如何運(yùn)行它,這里不做累述了,大伙可以百度下,網(wǎng)上這方面的資料很多。這里的實(shí)例代碼是使用新的api,大家可能在很多書(shū)籍里看到講解mapreduce的WordCount實(shí)例都是老版本的api,這里我不給出老版本的api,因?yàn)槔习姹镜腶pi不太建議使用了,大家做開(kāi)發(fā)最好使用新版本的api,新版本api和舊版本api有區(qū)別在哪里:
其他還有很多區(qū)別,都是說(shuō)明新版本api的優(yōu)勢(shì),因?yàn)槲姨岢褂眯掳鎍pi,這里就不講這些,因?yàn)闆](méi)必要再用舊版本,因此這種比較也沒(méi)啥意義了。
下面我對(duì)代碼做簡(jiǎn)單的講解,大家看到要寫(xiě)一個(gè)mapreduce程序,我們的實(shí)現(xiàn)一個(gè)map函數(shù)和reduce函數(shù)。我們看看map的方法:
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {…}這里有三個(gè)參數(shù),前面兩個(gè)Object key, Text value就是輸入的key和value,第三個(gè)參數(shù)Context context這是可以記錄輸入的key和value,例如:context.write(word, one);此外context還會(huì)記錄map運(yùn)算的狀態(tài)。
對(duì)于reduce函數(shù)的方法:
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {…}reduce函數(shù)的輸入也是一個(gè)key/value的形式,不過(guò)它的value是一個(gè)迭代器的形式Iterable<IntWritable> values,也就是說(shuō)reduce的輸入是一個(gè)key對(duì)應(yīng)一組的值的value,reduce也有context和map的context作用一致。
至于計(jì)算的邏輯就是程序員自己去實(shí)現(xiàn)了。
下面就是main函數(shù)的調(diào)用了,這個(gè)我要詳細(xì)講述下,首先是:
Configuration conf = new Configuration();運(yùn)行mapreduce程序前都要初始化Configuration,該類(lèi)主要是讀取mapreduce系統(tǒng)配置信息,這些信息包括hdfs還有mapreduce,也就是安裝hadoop時(shí)候的配置文件例如:core-site.xml、hdfs-site.xml和mapred-site.xml等等文件里的信息,有些童鞋不理解為啥要這么做,這個(gè)是沒(méi)有深入思考mapreduce計(jì)算框架造成,我們程序員開(kāi)發(fā)mapreduce時(shí)候只是在填空,在map函數(shù)和reduce函數(shù)里編寫(xiě)實(shí)際進(jìn)行的業(yè)務(wù)邏輯,其它的工作都是交給mapreduce框架自己操作的,但是至少我們要告訴它怎么操作啊,比如hdfs在哪里啊,mapreduce的jobstracker在哪里啊,而這些信息就在conf包下的配置文件里。
接下來(lái)的代碼是:
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: wordcount <in> <out>");System.exit(2);}If的語(yǔ)句好理解,就是運(yùn)行WordCount程序時(shí)候一定是兩個(gè)參數(shù),如果不是就會(huì)報(bào)錯(cuò)退出。至于第一句里的GenericOptionsParser類(lèi),它是用來(lái)解釋常用hadoop命令,并根據(jù)需要為Configuration對(duì)象設(shè)置相應(yīng)的值,其實(shí)平時(shí)開(kāi)發(fā)里我們不太常用它,而是讓類(lèi)實(shí)現(xiàn)Tool接口,然后再main函數(shù)里使用ToolRunner運(yùn)行程序,而ToolRunner內(nèi)部會(huì)調(diào)用GenericOptionsParser。
接下來(lái)的代碼是:
Job job = new Job(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);第一行就是在構(gòu)建一個(gè)job,在mapreduce框架里一個(gè)mapreduce任務(wù)也叫mapreduce作業(yè)也叫做一個(gè)mapreduce的job,而具體的map和reduce運(yùn)算就是task了,這里我們構(gòu)建一個(gè)job,構(gòu)建時(shí)候有兩個(gè)參數(shù),一個(gè)是conf這個(gè)就不累述了,一個(gè)是這個(gè)job的名稱(chēng)。
第二行就是裝載程序員編寫(xiě)好的計(jì)算程序,例如我們的程序類(lèi)名就是WordCount了。這里我要做下糾正,雖然我們編寫(xiě)mapreduce程序只需要實(shí)現(xiàn)map函數(shù)和reduce函數(shù),但是實(shí)際開(kāi)發(fā)我們要實(shí)現(xiàn)三個(gè)類(lèi),第三個(gè)類(lèi)是為了配置mapreduce如何運(yùn)行map和reduce函數(shù),準(zhǔn)確的說(shuō)就是構(gòu)建一個(gè)mapreduce能執(zhí)行的job了,例如WordCount類(lèi)。
第三行和第五行就是裝載map函數(shù)和reduce函數(shù)實(shí)現(xiàn)類(lèi)了,這里多了個(gè)第四行,這個(gè)是裝載Combiner類(lèi),這個(gè)我后面講mapreduce運(yùn)行機(jī)制時(shí)候會(huì)講述,其實(shí)本例去掉第四行也沒(méi)有關(guān)系,但是使用了第四行理論上運(yùn)行效率會(huì)更好。
接下來(lái)的代碼:
job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);這個(gè)是定義輸出的key/value的類(lèi)型,也就是最終存儲(chǔ)在hdfs上結(jié)果文件的key/value的類(lèi)型。
最后的代碼是:
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);第一行就是構(gòu)建輸入的數(shù)據(jù)文件,第二行是構(gòu)建輸出的數(shù)據(jù)文件,最后一行如果job運(yùn)行成功了,我們的程序就會(huì)正常退出。FileInputFormat和FileOutputFormat是很有學(xué)問(wèn)的,我會(huì)在下面的mapreduce運(yùn)行機(jī)制里講解到它們。
好了,mapreduce里的hello word程序講解完畢,我這個(gè)講解是從新辦api進(jìn)行,這套講解在網(wǎng)絡(luò)上還是比較少的,應(yīng)該很具有代表性的。
Mapreduce運(yùn)行機(jī)制
下面我要講講mapreduce的運(yùn)行機(jī)制了,前不久我為公司出了一套hadoop面試題,里面就問(wèn)道了mapreduce運(yùn)行機(jī)制,出題時(shí)候我發(fā)現(xiàn)這個(gè)問(wèn)題我自己似乎也將不太清楚,因此最近幾天惡補(bǔ)了下,希望在本文里能說(shuō)清楚這個(gè)問(wèn)題。
下面我貼出幾張圖,這些圖都是我在百度圖片里找到的比較好的圖片:
圖片一:
圖片二:
圖片三:
圖片四:
圖片五:
圖片六:
我現(xiàn)在學(xué)習(xí)技術(shù)很喜歡看圖,每次有了新理解就會(huì)去看看圖,每次都會(huì)有新的發(fā)現(xiàn)。
談mapreduce運(yùn)行機(jī)制,可以從很多不同的角度來(lái)描述,比如說(shuō)從mapreduce運(yùn)行流程來(lái)講解,也可以從計(jì)算模型的邏輯流程來(lái)進(jìn)行講解,也許有些深入理解了mapreduce運(yùn)行機(jī)制還會(huì)從更好的角度來(lái)描述,但是將mapreduce運(yùn)行機(jī)制有些東西是避免不了的,就是一個(gè)個(gè)參入的實(shí)例對(duì)象,一個(gè)就是計(jì)算模型的邏輯定義階段,我這里講解不從什么流程出發(fā),就從這些一個(gè)個(gè)牽涉的對(duì)象,不管是物理實(shí)體還是邏輯實(shí)體。
首先講講物理實(shí)體,參入mapreduce作業(yè)執(zhí)行涉及4個(gè)獨(dú)立的實(shí)體:
那么mapreduce到底是如何運(yùn)行的呢?
?
首先是客戶(hù)端要編寫(xiě)好mapreduce程序,配置好mapreduce的作業(yè)也就是job,接下來(lái)就是提交job了,提交job是提交到JobTracker上的,這個(gè)時(shí)候JobTracker就會(huì)構(gòu)建這個(gè)job,具體就是分配一個(gè)新的job任務(wù)的ID值,接下來(lái)它會(huì)做檢查操作,這個(gè)檢查就是確定輸出目錄是否存在,如果存在那么job就不能正常運(yùn)行下去,JobTracker會(huì)拋出錯(cuò)誤給客戶(hù)端,接下來(lái)還要檢查輸入目錄是否存在,如果不存在同樣拋出錯(cuò)誤,如果存在JobTracker會(huì)根據(jù)輸入計(jì)算輸入分片(Input Split),如果分片計(jì)算不出來(lái)也會(huì)拋出錯(cuò)誤,至于輸入分片我后面會(huì)做講解的,這些都做好了JobTracker就會(huì)配置Job需要的資源了。分配好資源后,JobTracker就會(huì)初始化作業(yè),初始化主要做的是將Job放入一個(gè)內(nèi)部的隊(duì)列,讓配置好的作業(yè)調(diào)度器能調(diào)度到這個(gè)作業(yè),作業(yè)調(diào)度器會(huì)初始化這個(gè)job,初始化就是創(chuàng)建一個(gè)正在運(yùn)行的job對(duì)象(封裝任務(wù)和記錄信息),以便JobTracker跟蹤job的狀態(tài)和進(jìn)程。初始化完畢后,作業(yè)調(diào)度器會(huì)獲取輸入分片信息(input split),每個(gè)分片創(chuàng)建一個(gè)map任務(wù)。接下來(lái)就是任務(wù)分配了,這個(gè)時(shí)候tasktracker會(huì)運(yùn)行一個(gè)簡(jiǎn)單的循環(huán)機(jī)制定期發(fā)送心跳給jobtracker,心跳間隔是5秒,程序員可以配置這個(gè)時(shí)間,心跳就是jobtracker和tasktracker溝通的橋梁,通過(guò)心跳,jobtracker可以監(jiān)控tasktracker是否存活,也可以獲取tasktracker處理的狀態(tài)和問(wèn)題,同時(shí)tasktracker也可以通過(guò)心跳里的返回值獲取jobtracker給它的操作指令。任務(wù)分配好后就是執(zhí)行任務(wù)了。在任務(wù)執(zhí)行時(shí)候jobtracker可以通過(guò)心跳機(jī)制監(jiān)控tasktracker的狀態(tài)和進(jìn)度,同時(shí)也能計(jì)算出整個(gè)job的狀態(tài)和進(jìn)度,而tasktracker也可以本地監(jiān)控自己的狀態(tài)和進(jìn)度。當(dāng)jobtracker獲得了最后一個(gè)完成指定任務(wù)的tasktracker操作成功的通知時(shí)候,jobtracker會(huì)把整個(gè)job狀態(tài)置為成功,然后當(dāng)客戶(hù)端查詢(xún)job運(yùn)行狀態(tài)時(shí)候(注意:這個(gè)是異步操作),客戶(hù)端會(huì)查到j(luò)ob完成的通知的。如果job中途失敗,mapreduce也會(huì)有相應(yīng)機(jī)制處理,一般而言如果不是程序員程序本身有bug,mapreduce錯(cuò)誤處理機(jī)制都能保證提交的job能正常完成。
?
下面我從邏輯實(shí)體的角度講解mapreduce運(yùn)行機(jī)制,這些按照時(shí)間順序包括:輸入分片(input split)、map階段、combiner階段、shuffle階段和reduce階段。
Mapreduce的相關(guān)問(wèn)題
這里我要談?wù)勎覍W(xué)習(xí)mapreduce思考的一些問(wèn)題,都是我自己想出解釋的問(wèn)題,但是某些問(wèn)題到底對(duì)不對(duì),就要廣大童鞋幫我確認(rèn)了。
好了,文章寫(xiě)完了,呵呵,這篇我自己感覺(jué)寫(xiě)的不錯(cuò),是目前hadoop系列文章里寫(xiě)的最好的,我后面會(huì)再接再厲的。加油!!!
總結(jié)
以上是生活随笔為你收集整理的mapreduce框架详解的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Hadoop源码分析-Text
- 下一篇: 算法竞赛入门经典读书笔记(二)7.1简单