hadoop 学习笔记:mapreduce框架详解
開始聊mapreduce,mapreduce是hadoop的計算框架,我學hadoop是從hive開始入手,再到hdfs,當我學習hdfs時候,就感覺到hdfs和mapreduce關系的緊密。這個可能是我做技術研究的思路有關,我開始學習某一套技術總是想著這套技術到底能干什么,只有當我真正理解了這套技術解決了什么問題時候,我后續的學習就能逐步的加快,而學習hdfs時候我就發現,要理解hadoop框架的意義,hdfs和mapreduce是密不可分,所以當我寫分布式文件系統時候,總是感覺自己的理解膚淺,今天我開始寫mapreduce了,今天寫文章時候比上周要進步多,不過到底能不能寫好本文了,只有試試再說了。
Mapreduce初析
Mapreduce是一個計算框架,既然是做計算的框架,那么表現形式就是有個輸入(input),mapreduce操作這個輸入(input),通過本身定義好的計算模型,得到一個輸出(output),這個輸出就是我們所需要的結果。
我們要學習的就是這個計算模型的運行規則。在運行一個mapreduce計算任務時候,任務過程被分為兩個階段:map階段和reduce階段,每個階段都是用鍵值對(key/value)作為輸入(input)和輸出(output)。而程序員要做的就是定義好這兩個階段的函數:map函數和reduce函數。
Mapreduce的基礎實例
講解mapreduce運行原理前,首先我們看看mapreduce里的hello world實例WordCount,這個實例在任何一個版本的hadoop安裝程序里都會有,大家很容易找到,這里我還是貼出代碼,便于我后面的講解,代碼如下:
/*** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/ package org.apache.hadoop.examples;import java.io.IOException; import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser;public class WordCount {public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{private final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(word, one);}}}public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: wordcount <in> <out>");System.exit(2);}Job job = new Job(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);} }如何運行它,這里不做累述了,大伙可以百度下,網上這方面的資料很多。這里的實例代碼是使用新的api,大家可能在很多書籍里看到講解mapreduce的WordCount實例都是老版本的api,這里我不給出老版本的api,因為老版本的api不太建議使用了,大家做開發最好使用新版本的api,新版本api和舊版本api有區別在哪里:
- 新的api放在:org.apache.hadoop.mapreduce,舊版api放在:org.apache.hadoop.mapred
- 新版api使用虛類,而舊版的使用的是接口,虛類更加利于擴展,這個是一個經驗,大家可以好好學習下hadoop的這個經驗。
其他還有很多區別,都是說明新版本api的優勢,因為我提倡使用新版api,這里就不講這些,因為沒必要再用舊版本,因此這種比較也沒啥意義了。
下面我對代碼做簡單的講解,大家看到要寫一個mapreduce程序,我們的實現一個map函數和reduce函數。我們看看map的方法:
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {…}這里有三個參數,前面兩個Object key, Text value就是輸入的key和value,第三個參數Context context這是可以記錄輸入的key和value,例如:context.write(word, one);此外context還會記錄map運算的狀態。
對于reduce函數的方法:
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {…}reduce函數的輸入也是一個key/value的形式,不過它的value是一個迭代器的形式Iterable values,也就是說reduce的輸入是一個key對應一組的值的value,reduce也有context和map的context作用一致。
至于計算的邏輯就是程序員自己去實現了。
下面就是main函數的調用了,這個我要詳細講述下,首先是:
Configuration conf = new Configuration();運行mapreduce程序前都要初始化Configuration,該類主要是讀取mapreduce系統配置信息,這些信息包括hdfs還有mapreduce,也就是安裝hadoop時候的配置文件例如:core-site.xml、hdfs-site.xml和mapred-site.xml等等文件里的信息,有些童鞋不理解為啥要這么做,這個是沒有深入思考mapreduce計算框架造成,我們程序員開發mapreduce時候只是在填空,在map函數和reduce函數里編寫實際進行的業務邏輯,其它的工作都是交給mapreduce框架自己操作的,但是至少我們要告訴它怎么操作啊,比如hdfs在哪里啊,mapreduce的jobstracker在哪里啊,而這些信息就在conf包下的配置文件里。
接下來的代碼是:
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: wordcount <in> <out>");System.exit(2);}If的語句好理解,就是運行WordCount程序時候一定是兩個參數,如果不是就會報錯退出。至于第一句里的GenericOptionsParser類,它是用來解釋常用hadoop命令,并根據需要為Configuration對象設置相應的值,其實平時開發里我們不太常用它,而是讓類實現Tool接口,然后再main函數里使用ToolRunner運行程序,而ToolRunner內部會調用GenericOptionsParser。
接下來的代碼是:
Job job = new Job(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);第一行就是在構建一個job,在mapreduce框架里一個mapreduce任務也叫mapreduce作業也叫做一個mapreduce的job,而具體的map和reduce運算就是task了,這里我們構建一個job,構建時候有兩個參數,一個是conf這個就不累述了,一個是這個job的名稱。
第二行就是裝載程序員編寫好的計算程序,例如我們的程序類名就是WordCount了。這里我要做下糾正,雖然我們編寫mapreduce程序只需要實現map函數和reduce函數,但是實際開發我們要實現三個類,第三個類是為了配置mapreduce如何運行map和reduce函數,準確的說就是構建一個mapreduce能執行的job了,例如WordCount類。
第三行和第五行就是裝載map函數和reduce函數實現類了,這里多了個第四行,這個是裝載Combiner類,這個我后面講mapreduce運行機制時候會講述,其實本例去掉第四行也沒有關系,但是使用了第四行理論上運行效率會更好。
接下來的代碼:
job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);這個是定義輸出的key/value的類型,也就是最終存儲在hdfs上結果文件的key/value的類型。
最后的代碼是:
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);第一行就是構建輸入的數據文件,第二行是構建輸出的數據文件,最后一行如果job運行成功了,我們的程序就會正常退出。FileInputFormat和FileOutputFormat是很有學問的,我會在下面的mapreduce運行機制里講解到它們。
好了,mapreduce里的hello word程序講解完畢,我這個講解是從新辦api進行,這套講解在網絡上還是比較少的,應該很具有代表性的。
Mapreduce運行機制
下面我要講講mapreduce的運行機制了,前不久我為公司出了一套hadoop面試題,里面就問道了mapreduce運行機制,出題時候我發現這個問題我自己似乎也將不太清楚,因此最近幾天惡補了下,希望在本文里能說清楚這個問題。
下面我貼出幾張圖,這些圖都是我在百度圖片里找到的比較好的圖片:
圖片一:
圖片二:
圖片三:
圖片四:
圖片五:
圖片六:
??我現在學習技術很喜歡看圖,每次有了新理解就會去看看圖,每次都會有新的發現。
談mapreduce運行機制,可以從很多不同的角度來描述,比如說從mapreduce運行流程來講解,也可以從計算模型的邏輯流程來進行講解,也許有些深入理解了mapreduce運行機制還會從更好的角度來描述,但是將mapreduce運行機制有些東西是避免不了的,就是一個個參入的實例對象,一個就是計算模型的邏輯定義階段,我這里講解不從什么流程出發,就從這些一個個牽涉的對象,不管是物理實體還是邏輯實體。
首先講講物理實體,參入mapreduce作業執行涉及4個獨立的實體:
那么mapreduce到底是如何運行的呢?
首先是客戶端要編寫好mapreduce程序,配置好mapreduce的作業也就是job,接下來就是提交job了,提交job是提交到JobTracker上的,這個時候JobTracker就會構建這個job,具體就是分配一個新的job任務的ID值,接下來它會做檢查操作,這個檢查就是確定輸出目錄是否存在,如果存在那么job就不能正常運行下去,JobTracker會拋出錯誤給客戶端,接下來還要檢查輸入目錄是否存在,如果不存在同樣拋出錯誤,如果存在JobTracker會根據輸入計算輸入分片(Input Split),如果分片計算不出來也會拋出錯誤,至于輸入分片我后面會做講解的,這些都做好了JobTracker就會配置Job需要的資源了。分配好資源后,JobTracker就會初始化作業,初始化主要做的是將Job放入一個內部的隊列,讓配置好的作業調度器能調度到這個作業,作業調度器會初始化這個job,初始化就是創建一個正在運行的job對象(封裝任務和記錄信息),以便JobTracker跟蹤job的狀態和進程。初始化完畢后,作業調度器會獲取輸入分片信息(input split),每個分片創建一個map任務。接下來就是任務分配了,這個時候tasktracker會運行一個簡單的循環機制定期發送心跳給jobtracker,心跳間隔是5秒,程序員可以配置這個時間,心跳就是jobtracker和tasktracker溝通的橋梁,通過心跳,jobtracker可以監控tasktracker是否存活,也可以獲取tasktracker處理的狀態和問題,同時tasktracker也可以通過心跳里的返回值獲取jobtracker給它的操作指令。任務分配好后就是執行任務了。在任務執行時候jobtracker可以通過心跳機制監控tasktracker的狀態和進度,同時也能計算出整個job的狀態和進度,而tasktracker也可以本地監控自己的狀態和進度。當jobtracker獲得了最后一個完成指定任務的tasktracker操作成功的通知時候,jobtracker會把整個job狀態置為成功,然后當客戶端查詢job運行狀態時候(注意:這個是異步操作),客戶端會查到job完成的通知的。如果job中途失敗,mapreduce也會有相應機制處理,一般而言如果不是程序員程序本身有bug,mapreduce錯誤處理機制都能保證提交的job能正常完成。
下面我從邏輯實體的角度講解mapreduce運行機制,這些按照時間順序包括:輸入分片(input split)、map階段、combiner階段、shuffle階段和reduce階段。
reduce階段:和map函數一樣也是程序員編寫的,最終結果是存儲在hdfs上的。
Mapreduce的相關問題
這里我要談談我學習mapreduce思考的一些問題,都是我自己想出解釋的問題,但是某些問題到底對不對,就要廣大童鞋幫我確認了。
好了,文章寫完了,呵呵,這篇我自己感覺寫的不錯,是目前hadoop系列文章里寫的最好的,我后面會再接再厲的。加油!!!
轉載于:https://www.cnblogs.com/treasure716/p/9669895.html
與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的hadoop 学习笔记:mapreduce框架详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 灯鹭的简单开放,促进网站一举多赢
- 下一篇: celery delay 没反应