日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Hadoop学习笔记—4.初识MapReduce

發(fā)布時間:2025/3/21 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hadoop学习笔记—4.初识MapReduce 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一、神馬是高大上的MapReduce

MapReduce是Google的一項重要技術,它首先是一個 編程模型 ,用以進行大數(shù)據(jù)量的計算。對于大 數(shù)據(jù)量的計算,通常采用的處理手法就是并行計算。但對許多開發(fā)者來說,自己完完全全實現(xiàn)一個并行計算程序難度太大,而MapReduce就是一種簡化并行 計算的編程模型,它使得那些沒有多有多少并行計算經(jīng)驗的開發(fā)人員也可以開發(fā)并行應用程序。這也就是MapReduce的價值所在, 通過簡化編程模型,降低了開發(fā)并行應用的入門門檻 。

1.1 MapReduce是什么

Hadoop MapReduce是一個軟件框架,基于該框架能夠容易地編寫應用程序,這些應用程序能夠運行在由上千個商用機器組成的大集群上,并以一種可靠的,具有容 錯能力的方式并行地處理上TB級別的海量數(shù)據(jù)集。這個定義里面有著這些關鍵詞,一是軟件框架,二是并行處理,三是可靠且容錯,四是大規(guī)模集群,五是海量數(shù) 據(jù)集。

因此,對于MapReduce,可以簡潔地認為,它是一個軟件框架,海量數(shù)據(jù)是它的“菜”,它在大規(guī)模集群上以一種可靠且容錯的方式并行地“烹飪這道菜”。

1.2 MapReduce做什么

簡單地講,MapReduce可以做 大數(shù)據(jù)處理 。所謂大數(shù)據(jù)處理,即以價值為導向,對大數(shù)據(jù)加工、挖掘和優(yōu)化等各種處理。

MapReduce擅長處理大數(shù)據(jù),它為什么具有這種能力呢?這可由MapReduce的設計思想發(fā)覺。MapReduce的思想就是“ 分而治之 ”。

(1)Mapper負責“分”,即把復雜的任務分解為若干個“簡單的任務”來處理?!昂唵蔚娜蝿铡卑龑雍x:一是數(shù)據(jù)或計算的規(guī)模相對原任 務要大大縮小;二是就近計算原則,即任務會分配到存放著所需數(shù)據(jù)的節(jié)點上進行計算;三是這些小任務可以并行計算,彼此間幾乎沒有依賴關系。

(2)Reducer負責對map階段的結果進行匯總。至于需要多少個Reducer,用戶可以根據(jù)具體問題,通過在mapred-site.xml配置文件里設置參數(shù)mapred.reduce.tasks的值,缺省值為1。

一個比較形象的語言解釋MapReduce:

We want to count all the books in the library. You count up shelf #1, I count up shelf #2. That’s map. The more people we get, the faster it goes.

我們要數(shù)圖書館中的所有書。你數(shù)1號書架,我數(shù)2號書架。這就是“ Map ”。我們?nèi)嗽蕉?#xff0c;數(shù)書就更快。

Now we get together and add our individual counts. That’s reduce.

現(xiàn)在我們到一起,把所有人的統(tǒng)計數(shù)加在一起。這就是“ Reduce ”。

1.3 MapReduce工作機制

MapReduce的整個工作過程如上圖所示,它包含如下4個獨立的實體:

實體一: 客戶端 ,用來提交MapReduce作業(yè)。

實體二: JobTracker ,用來協(xié)調(diào)作業(yè)的運行。

實體三: TaskTracker ,用來處理作業(yè)劃分后的任務。

實體四: HDFS ,用來在其它實體間共享作業(yè)文件。

通過審閱MapReduce的工作流程圖,可以看出MapReduce整個工作過程有序地包含如下工作環(huán)節(jié):

二、Hadoop中的MapReduce框架

在Hadoop中,一個MapReduce作業(yè)通常會把輸入的數(shù)據(jù)集切分為若干獨立的數(shù)據(jù)塊,由Map任務以完全并行的方式去處理它們??蚣軙?對Map的輸出先進行排序,然后把結果輸入給Reduce任務。通常作業(yè)的輸入和輸出都會被存儲在文件系統(tǒng)中,整個框架負責任務的調(diào)度和監(jiān)控,以及重新執(zhí) 行已經(jīng)關閉的任務。

通常,MapReduce框架和分布式文件系統(tǒng)是運行在一組相同的節(jié)點上,也就是說,計算節(jié)點和存儲節(jié)點通常都是在一起的。這種配置允許框架在那些已經(jīng)存好數(shù)據(jù)的節(jié)點上高效地調(diào)度任務,這可以使得整個集群的網(wǎng)絡帶寬被非常高效地利用。

2.1 MapReduce框架的組成

(1)JobTracker

JobTracker負責調(diào)度構成一個作業(yè)的所有任務,這些任務分布在不同的TaskTracker上(由上圖的JobTracker可以看到 2 assign map 和 3 assign reduce)。你可以將其理解為公司的項目經(jīng)理,項目經(jīng)理接受項目需求,并劃分具體的任務給下面的開發(fā)工程師。

(2)TaskTracker

TaskTracker負責執(zhí)行由JobTracker指派的任務,這里我們就可以將其理解為開發(fā)工程師,完成項目經(jīng)理安排的開發(fā)任務即可。

2.2 MapReduce的輸入輸出

MapReduce框架運轉在 <key,value> 鍵值對上,也就是說,框架把作業(yè)的輸入看成是一組<key,value>鍵值對,同樣也產(chǎn)生一組<key,value>鍵值對作為作業(yè)的輸出,這兩組鍵值對有可能是不同的。

一個MapReduce作業(yè)的輸入和輸出類型如下圖所示:可以看出在整個流程中,會有三組<key,value>鍵值對類型的存在。

2.3 MapReduce的處理流程

這里以WordCount單詞計數(shù)為例,介紹map和reduce兩個階段需要進行哪些處理。單詞計數(shù)主要完成的功能是:統(tǒng)計一系列文本文件中每個單詞出現(xiàn)的次數(shù),如圖所示:

(1)map任務處理

(2)reduce任務處理

三、第一個MapReduce程序:WordCount

WordCount單詞計數(shù)是最簡單也是最能體現(xiàn)MapReduce思想的程序之一,該程序完整的代碼可以在Hadoop安裝包的src/examples目錄下找到。

WordCount單詞計數(shù)主要完成的功能是: 統(tǒng)計一系列文本文件中每個單詞出現(xiàn)的次數(shù)

3.1 初始化一個words.txt文件并上傳HDFS

首先在Linux中通過Vim編輯一個簡單的words.txt,其內(nèi)容很簡單如下所示:

Hello Edison Chou Hello Hadoop RPC Hello Wncud Chou Hello Hadoop MapReduce Hello Dick Gu

通過Shell命令將其上傳到一個指定目錄中,這里指定為:/testdir/input

3.2 自定義Map函數(shù)

在Hadoop 中,?map 函數(shù)位于內(nèi)置類org.apache.hadoop.mapreduce. Mapper <KEYIN,VALUEIN, KEYOUT, VALUEOUT>中,reduce 函數(shù)位于內(nèi)置類org.apache.hadoop. mapreduce. Reducer <KEYIN, VALUEIN, KEYOUT, VALUEOUT>中。

我們要做的就是 覆蓋map 函數(shù)和reduce 函數(shù) ,首先我們來覆蓋map函數(shù):繼承Mapper類并重寫map方法

/*** @author Edison Chou* @version 1.0* @param KEYIN* →k1 表示每一行的起始位置(偏移量offset)* @param VALUEIN* →v1 表示每一行的文本內(nèi)容* @param KEYOUT* →k2 表示每一行中的每個單詞* @param VALUEOUT* →v2 表示每一行中的每個單詞的出現(xiàn)次數(shù),固定值為1*/ public static class MyMapper extendsMapper<LongWritable, Text, Text, LongWritable> {protected void map(LongWritable key, Text value,Mapper<LongWritable, Text, Text, LongWritable>.Context context)throws java.io.IOException, InterruptedException {String[] spilted = value.toString().split(" ");for (String word : spilted) {context.write(new Text(word), new LongWritable(1L));}}; }

Mapper 類,有四個泛型,分別是KEYIN、VALUEIN、KEYOUT、VALUEOUT,前面兩個KEYIN、VALUEIN 指的是map 函數(shù)輸入的參數(shù)key、value 的類型;后面兩個KEYOUT、VALUEOUT 指的是map 函數(shù)輸出的key、value 的類型;

從代碼中可以看出,在Mapper類和Reducer類中都使用了Hadoop自帶的基本數(shù)據(jù)類型,例如String對應Text,long對應 LongWritable,int對應IntWritable。這是因為HDFS涉及到序列化的問題,Hadoop的基本數(shù)據(jù)類型都實現(xiàn)了一個 Writable接口,而實現(xiàn)了這個接口的類型都支持序列化。

這里的map函數(shù)中通過空格符號來分割文本內(nèi)容,并對其進行記錄;

3.3 自定義Reduce函數(shù)

現(xiàn)在我們來覆蓋reduce函數(shù):繼承Reducer類并重寫reduce方法

/*** @author Edison Chou* @version 1.0* @param KEYIN* →k2 表示每一行中的每個單詞* @param VALUEIN* →v2 表示每一行中的每個單詞的出現(xiàn)次數(shù),固定值為1* @param KEYOUT* →k3 表示每一行中的每個單詞* @param VALUEOUT* →v3 表示每一行中的每個單詞的出現(xiàn)次數(shù)之和*/ public static class MyReducer extendsReducer<Text, LongWritable, Text, LongWritable> {protected void reduce(Text key,java.lang.Iterable<LongWritable> values,Reducer<Text, LongWritable, Text, LongWritable>.Context context)throws java.io.IOException, InterruptedException {long count = 0L;for (LongWritable value : values) {count += value.get();}context.write(key, new LongWritable(count));}; }

Reducer 類,也有四個泛型,同理,分別指的是reduce 函數(shù)輸入的key、value類型(這里輸入的key、value類型通常和map的輸出key、value類型保持一致)和輸出的key、value 類型。

這里的reduce函數(shù)主要是將傳入的<k2,v2>進行最后的合并統(tǒng)計,形成最后的統(tǒng)計結果。

3.4 設置Main函數(shù)

(1)設定輸入目錄,當然也可以作為參數(shù)傳入

public static final String INPUT_PATH = "hdfs://hadoop-master:9000/testdir/input/words.txt";

(2)設定輸出目錄( 輸出目錄需要是空目錄 ),當然也可以作為參數(shù)傳入

public static final String OUTPUT_PATH = "hdfs://hadoop-master:9000/testdir/output/wordcount";

(3)Main函數(shù)的主要代碼

public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); // 0.0:首先刪除輸出路徑的已有生成文件 FileSystem fs = FileSystem.get(new URI(INPUT_PATH), conf); Path outPath = new Path(OUTPUT_PATH); if (fs.exists(outPath)) { fs.delete(outPath, true); } Job job = new Job(conf, "WordCount"); job.setJarByClass(MyWordCountJob.class); // 1.0:指定輸入目錄 FileInputFormat.setInputPaths(job, new Path(INPUT_PATH)); // 1.1:指定對輸入數(shù)據(jù)進行格式化處理的類(可以省略) job.setInputFormatClass(TextInputFormat.class); // 1.2:指定自定義的Mapper類 job.setMapperClass(MyMapper.class); // 1.3:指定map輸出的<K,V>類型(如果<k3,v3>的類型與<k2,v2>的類型一致則可以省略) job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); // 1.4:分區(qū)(可以省略) job.setPartitionerClass(HashPartitioner.class); // 1.5:設置要運行的Reducer的數(shù)量(可以省略) job.setNumReduceTasks(1); // 1.6:指定自定義的Reducer類 job.setReducerClass(MyReducer.class); // 1.7:指定reduce輸出的<K,V>類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); // 1.8:指定輸出目錄 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); // 1.9:指定對輸出數(shù)據(jù)進行格式化處理的類(可以省略) job.setOutputFormatClass(TextOutputFormat.class); // 2.0:提交作業(yè) boolean success = job.waitForCompletion(true); if (success) { System.out.println("Success"); System.exit(0); } else { System.out.println("Failed"); System.exit(1); } }

在Main函數(shù)中,主要做了三件事:一是指定輸入、輸出目錄;二是指定自定義的Mapper類和Reducer類;三是提交作業(yè);匆匆看下來,代碼有點多,但有些其實是可以省略的。

(4)完整代碼如下所示

View Code

3.5 運行吧小DEMO

(1)調(diào)試查看控制臺狀態(tài)信息

(2)通過Shell命令查看統(tǒng)計結果

四、使用ToolRunner類改寫WordCount

Hadoop有個ToolRunner類,它是個好東西,簡單好用。無論在《Hadoop權威指南》還是Hadoop項目源碼自帶的example,都推薦使用ToolRunner。

4.1 最初的寫法

下面我們看下src/example目錄下WordCount.java文件,它的代碼結構是這樣的:

public class WordCount {// 略...public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();// 略...Job job = new Job(conf, "word count");// 略...System.exit(job.waitForCompletion(true) ? 0 : 1);} }

WordCount.java中使用到了GenericOptionsParser這個類,它的作用是 將命令行中參數(shù)自動設置到變量conf中 。舉個例子,比如我希望通過命令行設置reduce task數(shù)量,就這么寫:

bin/hadoop jar MyJob.jar com.xxx.MyJobDriver -Dmapred.reduce.tasks=5

上面這樣就可以了,不需要將其硬編碼到java代碼中,很輕松就可以將參數(shù)與代碼分離開。

4.2 加入ToolRunner的寫法

至此,我們還沒有說到ToolRunner,上面的代碼我們使用了GenericOptionsParser幫我們解析命令行參數(shù),編寫 ToolRunner的程序員更懶,它將 GenericOptionsParser調(diào)用隱藏到自身run方法,被自動執(zhí)行了,修改后的代碼變成了這樣:

public class WordCount extends Configured implements Tool { @Override public int run(String[] arg0) throws Exception { Job job = new Job(getConf(), "word count"); // 略... System.exit(job.waitForCompletion(true) ? 0 : 1); return 0; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new WordCount(), args); System.exit(res); } }

看看這段代碼上有什么不同:

(1)讓WordCount 繼承Configured并實現(xiàn)Tool接口 。

(2) 重寫Tool接口的run方法 ,run方法不是static類型,這很好。

(3)在WordCount中我們將 通過getConf()獲取Configuration對象 。

可以看出,通過簡單的幾步,就可以實現(xiàn)代碼與配置隔離、上傳文件到DistributeCache等功能。修改MapReduce參數(shù)不需要修改java代碼、打包、部署,提高工作效率。

4.3 重寫WordCount程序

public class MyJob extends Configured implements Tool {public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws java.io.IOException, InterruptedException { ...... } };}public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { protected void reduce(Text key, java.lang.Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws java.io.IOException, InterruptedException { ...... };}// 輸入文件路徑public static final String INPUT_PATH = "hdfs://hadoop-master:9000/testdir/input/words.txt";// 輸出文件路徑public static final String OUTPUT_PATH = "hdfs://hadoop-master:9000/testdir/output/wordcount";@Overridepublic int run(String[] args) throws Exception { // 首先刪除輸出路徑的已有生成文件 FileSystem fs = FileSystem.get(new URI(INPUT_PATH), getConf()); Path outPath = new Path(OUTPUT_PATH); if (fs.exists(outPath)) { fs.delete(outPath, true); } Job job = new Job(getConf(), "WordCount"); // 設置輸入目錄 FileInputFormat.setInputPaths(job, new Path(INPUT_PATH)); // 設置自定義Mapper job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); // 設置自定義Reducer job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); // 設置輸出目錄 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); System.exit(job.waitForCompletion(true) ? 0 : 1); return 0;}public static void main(String[] args) { Configuration conf = new Configuration(); try { int res = ToolRunner.run(conf, new MyJob(), args); System.exit(res); } catch (Exception e) { e.printStackTrace(); }} }

推薦閱讀:

  • Hadoop學習筆記—4.初識MapReduce ?
  • Hadoop學習筆記—3.Hadoop RPC機制 ?
  • Hadoop學習筆記—2.不怕故障的海量 ?
  • Hadoop學習筆記—1.基礎概論與環(huán)境

作者:周旭龍

出處:http://edisonchou.cnblogs.com/

總結

以上是生活随笔為你收集整理的Hadoop学习笔记—4.初识MapReduce的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。