日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

MapReduce编程基础

發(fā)布時(shí)間:2025/3/21 编程问答 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 MapReduce编程基础 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

MapReduce編程基礎(chǔ)

1.?WordCount示例及MapReduce程序框架

2. ?MapReduce程序執(zhí)行流程

3. ?深入學(xué)習(xí)MapReduce編程(1)

4. 參考資料及代碼下載?

<1>. WordCount示例及MapReduce程序框架?

首先通過(guò)一個(gè)簡(jiǎn)單的程序來(lái)實(shí)際運(yùn)行一個(gè)MapReduce程序,然后通過(guò)這個(gè)程序我們來(lái)哦那個(gè)結(jié)一下MapReduce編程模型。

下載源程序:/Files/xuqiang/WordCount.rar,將該程序打包成wordcount.jar下面的命令,隨便寫(xiě)一個(gè)文本文件,這里是WordCountMrtrial,并上傳到hdfs上,這里的路徑是/tmp/WordCountMrtrial,運(yùn)行下面的命令:

?xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ ./bin/hadoop jar wordcount.jar WordCount /tmp/WordCountMrtrial /tmp/result

?如果該任務(wù)運(yùn)行完成之后,將在hdfs的/tmp/result目錄下生成類(lèi)似于這樣的結(jié)果:

?gentleman 11

get 12 give 8 go 6 good 9 government 16

運(yùn)行一個(gè)程序的基本上就是這樣一個(gè)過(guò)程,我們來(lái)看看具體程序:

main函數(shù)中首先生成一個(gè)Job對(duì)象,?Job job = new Job(conf, "word count");然后設(shè)置job的MapperClass,ReducerClass,設(shè)置輸入文件路徑FileInputFormat.addInputPath(job, new Path(otherArgs[0]));設(shè)置輸出文件路徑:FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));等待程序運(yùn)行完成:System.exit(job.waitForCompletion(true) ? 0 : 1);可以看出main中僅僅是啟動(dòng)了一個(gè)job,然后設(shè)置該job相關(guān)的參數(shù),具體實(shí)現(xiàn)MapReduce是mapper類(lèi)和reducer類(lèi)。

?TokenizerMapper類(lèi)中map函數(shù)將一行分割成<K2, V2>,然后IntSumReducer的reduce將<K2, list<V2>>轉(zhuǎn)換成最終結(jié)果<K3, V3>。

?

通過(guò)這個(gè)示例基本上也能總結(jié)出簡(jiǎn)單的MapReduce編程的模型:一個(gè)Mapper類(lèi),一個(gè)Reducer類(lèi),一個(gè)Driver類(lèi)。

?

<2>. MapReduce程序執(zhí)行流程?

?這里所描述的執(zhí)行流程更加注重是從程序的角度去理解,更加全面的流程可參考[這里]。

?

首先用戶指定待處理的文件,在WordCount就是文件WordCountMrtrial,這是hadoop根據(jù)設(shè)定的InputDataFormat來(lái)將輸入文件分割成一個(gè)record(key/value對(duì)),然后將這些record傳遞給map函數(shù),在WordCount示例中,對(duì)應(yīng)的record就是<line_number行號(hào), line_content該行內(nèi)容>;

然后map函數(shù)根據(jù)輸入的record,形成<K2, V2>,在WordCount示例中形成<K2, V2>就是<single_word, word_count>,例如<"a", 1>;

如果map過(guò)程完成之后,hadoop將這些生成的<K2, V2>按照K2進(jìn)行分組,形成<K2,list(V2) >,之后傳遞給reduce函數(shù),在該函數(shù)中最終得到程序的輸出結(jié)果<K3, V3>。

<3>. 深入學(xué)習(xí)MapReduce編程(1)

3.1 hadoop data types

由于在hadoop需要將key/value對(duì)序列化,然后通過(guò)網(wǎng)絡(luò)network發(fā)送到集群中的其他機(jī)器上,所以說(shuō)hadoop中的類(lèi)型需要能夠序列化。

具體而言,自定義的類(lèi)型,如果一個(gè)類(lèi)class實(shí)現(xiàn)了Writable interface的話,那么這個(gè)可以作為value類(lèi)型,如果一個(gè)class實(shí)現(xiàn)了WritableComparable<T> interface的話,那么這個(gè)class可以作為value類(lèi)型或者是key類(lèi)型。?

hadoop本身已經(jīng)實(shí)現(xiàn)了一些預(yù)定義的類(lèi)型predefined classes,并且這些類(lèi)型實(shí)現(xiàn)了WritableComparable<T>接口。

3.2 Mapper

如果一個(gè)類(lèi)想要成為一個(gè)mapper,那么該類(lèi)需要實(shí)現(xiàn)Mapper接口,同時(shí)繼承自MapReduceBase。在MapReduceBase類(lèi)中,兩個(gè)方法是特別需要注意的:

void configure( JobConf job):這個(gè)方法是在任務(wù)被運(yùn)行之前調(diào)用?

void close():在任務(wù)運(yùn)行完成之后調(diào)用

剩下的工作就是編寫(xiě)map方法,原型如下:

void map(Object key, Text value, Context context

?? ? ? ? ? ? ? ? ? ?) throws IOException, InterruptedException;

?這個(gè)方法根據(jù)<K1, V1>生成<K2, V2>,然后通過(guò)context輸出。

同樣的在hadoop中預(yù)先定義了如下的Mapper:

?

3.3 Reducer

如果一個(gè)類(lèi)想要成為Reducer的話,需要首先實(shí)現(xiàn)Reducer接口,然后需要繼承自MapReduceBase。

當(dāng)reducer接收從mapper傳遞而來(lái)的key/value對(duì),然后根據(jù)key來(lái)排序,分組,最終生成<K2, list<V2>> ,然后reducer根據(jù)<K2, list<V2>>生成<K3, V3>.

同樣在hadoop中預(yù)定義了一些Reducer:

?

3.4?Partitioner

?Partitioner的作用主要是將mapper運(yùn)行的結(jié)果“導(dǎo)向directing”到reducer。如果一個(gè)類(lèi)想要成為Partitioner,那么需要實(shí)現(xiàn)Partitioner接口,該接口繼承自JobConfigurable,定義如下:

public?interface?Partitioner<K2,?V2>?extends?JobConfigurable?{
??
/**?
???*?Get?the?paritition?number?for?a?given?key?(hence?record)?given?the?total?
???*?number?of?partitions?i.e.?number?of?reduce-tasks?for?the?job.
???*???
???*?<p>Typically?a?hash?function?on?a?all?or?a?subset?of?the?key.</p>
???*
???*?
@param?key?the?key?to?be?paritioned.
???*?
@param?value?the?entry?value.
???*?
@param?numPartitions?the?total?number?of?partitions.
???*?
@return?the?partition?number?for?the?<code>key</code>.
???
*/
??
int?getPartition(K2?key,?V2?value,?int?numPartitions);
}?

hadoop將根據(jù)方法getPartition的返回值確定將mapper的值發(fā)送到那個(gè)reducer上。返回值相同的key/value對(duì)將被“導(dǎo)向“至同一個(gè)reducer。

3.5 Input Data Format and Output Data Format

3.5.1 Input Data Format?

上面我們的假設(shè)是MapReduce程序的輸入是key/value對(duì),也就是<K1, V1>,但是實(shí)際上一般情況下MapReduce程序的輸入是big file的形式,那么如何將這個(gè)文件轉(zhuǎn)換成<K1, V1>,即file -> <K1, V1>。這就需要使用InputFormat接口了。?

下面是幾個(gè)常用InputFormat的實(shí)現(xiàn)類(lèi):

?

當(dāng)然除了使用hadoop預(yù)先定義的InputDataFormat之外,還可以自定義,這是需要實(shí)現(xiàn)InputFormat接口。該接口僅僅包含兩個(gè)方法:

?

InputSplit[] getSplits(JobConf job, int numSplits) throws ?IOException;該接口實(shí)現(xiàn)將大文件分割成小塊split。 RecordReader<K, V> getRecordReader(InputSplit split, JobConf job,?

?? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Reporter reporter) throws IOException;?

該方法輸入分割成的split,然后返回RecordReader,通過(guò)RecordReader來(lái)遍歷該split內(nèi)的record。?

3.5.2 Output Data Format

每個(gè)reducer將自己的輸出寫(xiě)入到結(jié)果文件中,這是使用output data format來(lái)配置輸出的文件的格式。hadoop預(yù)先實(shí)現(xiàn)了:

?

3.6 Streaming in Hadoop

3.6.1 執(zhí)行流程

我們知道在linux中存在所謂的“流”的概念,也就是說(shuō)我們可以使用下面的命令:

cat input.txt | RandomSample.py 10 >sampled_output.txt?

同樣在hadoop中我們也可以使用類(lèi)似的命令,顯然這樣能夠在很大程度上加快程序的開(kāi)發(fā)進(jìn)程。下面來(lái)看看hadoop中流執(zhí)行的過(guò)程:

?

hadoop streaming從標(biāo)磚輸入STDIN讀取數(shù)據(jù),默認(rèn)的情況下使用\t來(lái)分割每行,如果不存在\t的話,那么這時(shí)正行的內(nèi)容將被看作是key,而此時(shí)的value內(nèi)容為空;

然后調(diào)用mapper程序,輸出<K2, V2>;

之后,調(diào)用Partitioner來(lái)將<K2, V2>輸出到對(duì)應(yīng)的reducer上;

reducer根據(jù)輸入的<K2, list(V2)> 得到最終結(jié)果<K3, V3>并輸出到STDOUT上。?

3.6.2 簡(jiǎn)單示例程序?

下面我們假設(shè)需要做這樣一個(gè)工作,輸入一個(gè)文件,文件中每行是一個(gè)數(shù)字,然后得到該文件中的數(shù)字的最大值(當(dāng)然這里可以使用streaming中自帶的Aggregate)。 首先我們編寫(xiě)一個(gè)python文件(如果對(duì)python不是很熟悉,看看[這里]):

3.6.2.1 準(zhǔn)備數(shù)據(jù)

?xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ echo "http://www.cs.brandeis.edu" >url1

?xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ echo "http://www.nytimes.com" >url2

上傳到hdfs上:

?xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -mkdir urls

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -put url1 urls/ xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -put url2 urls/

3.6.2.2 編寫(xiě)mapper?multifetch.py

#!/usr/bin/env?python

import?sys,?urllib,?re

title_re?
=?re.compile("<title>(.*?)</title>",
????????re.MULTILINE?
|?re.DOTALL?|?re.IGNORECASE)

for?line?in?sys.stdin:
????
#?We?assume?that?we?are?fed?a?series?of?URLs,?one?per?line
????url?=?line.strip()
????
#?Fetch?the?content?and?output?the?title?(pairs?are?tab-delimited)
????match?=?title_re.search(urllib.urlopen(url).read())
????
if?match:
????????
print?url,?"\t",?match.group(1).strip()

該文件的主要作用是給定一個(gè)url,然后輸出該url代表的html頁(yè)面的title部分。

在本地測(cè)試一下該程序:

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ echo "http://www.cs.brandeis.edu" >urls xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ echo "http://www.nytimes.com" >>urls

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ sudo chmod u+x ./multifetch.py?

?xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ cat urls | ./multifetch.py 將輸出:

?http://www.cs.brandeis.edu? Computer Science Department | Brandeis University

http://www.nytimes.com The New York Times - Breaking News, World News &amp; Multimedia

3.6.2.3 編寫(xiě)reducer reducer.py

編寫(xiě)reducer.py文件?

?#!/usr/bin/env?python from?operator?import?itemgetter
import?sys

for?line?in?sys.stdin:
????line?
=?line.strip()
????
print?line

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ chmod u+x ./reducer.py??

現(xiàn)在我們的mapper和reducer已經(jīng)準(zhǔn)備好了,那么首先在本地上運(yùn)行測(cè)試一下程序的功能,下面的命令模擬在hadoop上運(yùn)行的過(guò)程:

首先mapper從stdin讀取數(shù)據(jù),這里是一行;

然后讀取該行的內(nèi)容作為一個(gè)url,然后得到該url代表的html的title的內(nèi)容,輸出<url, url-title-content>;

調(diào)用sort命令將mapper輸出排序;

將排序完成的結(jié)果交給reducer,這里的reducer僅僅是將結(jié)果輸出。?

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$?cat?urls?|?./multifetch.py?|?sort?|?./reducer.py?
http:
//www.cs.brandeis.edu?????Computer?Science?Department?|?Brandeis?University
http://www.nytimes.com?????The?New?York?Times?-?Breaking?News,?World?News?&amp;?Multimedia??

顯然程序能夠正確?

3.6.2.4 在hadoop streaming上運(yùn)行

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$?bin/hadoop?jar?./mapred/contrib/streaming/hadoop-0.21.0-streaming.jar?\
>?-mapper?/home/xuqiang/hadoop/src/hadoop-0.21.0/multifetch.py?\
>?-reducer?/home/xuqiang/hadoop/src/hadoop-0.21.0/reducer.py?\
>?-input?urls/*?\
>?-output?titles?

?程序運(yùn)行完成之后,查看運(yùn)行結(jié)果:

?xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -cat titles/part-00000

<4>. 參考資料及代碼下載

http://pages.cs.brandeis.edu/~cs147a/lab/hadoop-example/?

http://hadoop.apache.org/mapreduce/docs/r0.21.0/streaming.html#Hadoop+Streaming?

<Hadoop In Action>


出處:http://www.cnblogs.com/xuqiang/archive/2011/06/05/2071935.html

《新程序員》:云原生和全面數(shù)字化實(shí)踐50位技術(shù)專(zhuān)家共同創(chuàng)作,文字、視頻、音頻交互閱讀

總結(jié)

以上是生活随笔為你收集整理的MapReduce编程基础的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。