日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

hadoop 入门实例【转】

發(fā)布時間:2023/11/29 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 hadoop 入门实例【转】 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

原文鏈接:http://www.cnblogs.com/xia520pi/archive/2012/06/04/2534533.html

1、數(shù)據(jù)去重

  ?"數(shù)據(jù)去重"主要是為了掌握和利用并行化思想來對數(shù)據(jù)進(jìn)行有意義篩選統(tǒng)計(jì)大數(shù)據(jù)集上的數(shù)據(jù)種類個數(shù)從網(wǎng)站日志中計(jì)算訪問地等這些看似龐雜的任務(wù)都會涉及數(shù)據(jù)去重。下面就進(jìn)入這個實(shí)例的MapReduce程序設(shè)計(jì)。

1.1 實(shí)例描述

  對數(shù)據(jù)文件中的數(shù)據(jù)進(jìn)行去重。數(shù)據(jù)文件中的每行都是一個數(shù)據(jù)。

  樣例輸入如下所示:

?????1)file1:

?

2012-3-1 a

2012-3-2 b

2012-3-3 c

2012-3-4 d

2012-3-5 a

2012-3-6 b

2012-3-7 c

2012-3-3 c

?

?????2)file2:

?

2012-3-1 b

2012-3-2 a

2012-3-3 b

2012-3-4 d

2012-3-5 a

2012-3-6 c

2012-3-7 d

2012-3-3 c

?

???? 樣例輸出如下所示:

?

2012-3-1 a

2012-3-1 b

2012-3-2 a

2012-3-2 b

2012-3-3 b

2012-3-3 c

2012-3-4 d

2012-3-5 a

2012-3-6 b

2012-3-6 c

2012-3-7 c

2012-3-7 d

?

1.2 設(shè)計(jì)思路

  數(shù)據(jù)去重最終目標(biāo)是讓原始數(shù)據(jù)出現(xiàn)次數(shù)超過一次數(shù)據(jù)輸出文件只出現(xiàn)一次。我們自然而然會想到將同一個數(shù)據(jù)的所有記錄都交給一臺reduce機(jī)器,無論這個數(shù)據(jù)出現(xiàn)多少次,只要在最終結(jié)果中輸出一次就可以了。具體就是reduce的輸入應(yīng)該以數(shù)據(jù)作為key,而對value-list則沒有要求。當(dāng)reduce接收到一個<key,value-list>時就直接將key復(fù)制到輸出的key中,并將value設(shè)置成空值

  在MapReduce流程中,map的輸出<key,value>經(jīng)過shuffle過程聚集成<key,value-list>后會交給reduce。所以從設(shè)計(jì)好的reduce輸入可以反推出map的輸出key應(yīng)為數(shù)據(jù),value任意。繼續(xù)反推,map輸出數(shù)據(jù)的key為數(shù)據(jù),而在這個實(shí)例中每個數(shù)據(jù)代表輸入文件中的一行內(nèi)容,所以map階段要完成的任務(wù)就是在采用Hadoop默認(rèn)的作業(yè)輸入方式之后,將value設(shè)置為key,并直接輸出(輸出中的value任意)。map中的結(jié)果經(jīng)過shuffle過程之后交給reduce。reduce階段不會管每個key有多少個value,它直接將輸入的key復(fù)制為輸出的key,并輸出就可以了(輸出中的value被設(shè)置成空了)。

1.3 程序代碼

???? 程序代碼如下所示:

?

package?com.hebut.mr;

?

import?java.io.IOException;

?

import?org.apache.hadoop.conf.Configuration;

import?org.apache.hadoop.fs.Path;

import?org.apache.hadoop.io.IntWritable;

import?org.apache.hadoop.io.Text;

import?org.apache.hadoop.mapreduce.Job;

import?org.apache.hadoop.mapreduce.Mapper;

import?org.apache.hadoop.mapreduce.Reducer;

import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import?org.apache.hadoop.util.GenericOptionsParser;

?

public?class?Dedup {

?

????//map將輸入中的value復(fù)制到輸出數(shù)據(jù)的key上,并直接輸出

????public?static?class?Map?extends?Mapper<Object,Text,Text,Text>{

????????private?static?Text?line=new?Text();//每行數(shù)據(jù)

???????

????????//實(shí)現(xiàn)map函數(shù)

????????public?void?map(Object key,Text value,Context context)

????????????????throws?IOException,InterruptedException{

????????????line=value;

??????????? context.write(line,?new?Text(""));

??????? }

???????

??? }

???

????//reduce將輸入中的key復(fù)制到輸出數(shù)據(jù)的key上,并直接輸出

????public?static?class?Reduce?extends?Reducer<Text,Text,Text,Text>{

????????//實(shí)現(xiàn)reduce函數(shù)

????????public?void?reduce(Text key,Iterable<Text> values,Context context)

????????????????throws?IOException,InterruptedException{

??????????? context.write(key,?new?Text(""));

??????? }

???????

??? }

???

????public?static?void?main(String[] args)?throws?Exception{

??????? Configuration conf =?new?Configuration();

????????//這句話很關(guān)鍵

??????? conf.set("mapred.job.tracker",?"192.168.1.2:9001");

???????

??????? String[] ioArgs=new?String[]{"dedup_in","dedup_out"};

???? String[] otherArgs =?new?GenericOptionsParser(conf, ioArgs).getRemainingArgs();

?????if?(otherArgs.length?!= 2) {

???? System.err.println("Usage: Data Deduplication <in> <out>");

???? System.exit(2);

???? }

?????

???? Job job =?new?Job(conf,?"Data Deduplication");

???? job.setJarByClass(Dedup.class);

?????

?????//設(shè)置Map、Combine和Reduce處理類

???? job.setMapperClass(Map.class);

???? job.setCombinerClass(Reduce.class);

???? job.setReducerClass(Reduce.class);

?????

?????//設(shè)置輸出類型

???? job.setOutputKeyClass(Text.class);

???? job.setOutputValueClass(Text.class);

?????

?????//設(shè)置輸入和輸出目錄

???? FileInputFormat.addInputPath(job,?new?Path(otherArgs[0]));

???? FileOutputFormat.setOutputPath(job,?new?Path(otherArgs[1]));

???? System.exit(job.waitForCompletion(true) ? 0 : 1);

???? }

}

?

1.4 代碼結(jié)果

?????1)準(zhǔn)備測試數(shù)據(jù)

???? 通過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下創(chuàng)建輸入文件"dedup_in"文件夾(備注:"dedup_out"不需要創(chuàng)建。)如圖1.4-1所示,已經(jīng)成功創(chuàng)建。

???? ????

圖1.4-1 創(chuàng)建"dedup_in"?????????????? ??????????????????? 圖1.4.2 上傳"file*.txt"

?

??? ?然后在本地建立兩個txt文件,通過Eclipse上傳到"/user/hadoop/dedup_in"文件夾中,兩個txt文件的內(nèi)容如"實(shí)例描述"那兩個文件一樣。如圖1.4-2所示,成功上傳之后。

???? 從SecureCRT遠(yuǎn)處查看"Master.Hadoop"的也能證實(shí)我們上傳的兩個文件。

?

?

??? 查看兩個文件的內(nèi)容如圖1.4-3所示:

?

圖1.4-3 文件"file*.txt"內(nèi)容

2)查看運(yùn)行結(jié)果

???? 這時我們右擊Eclipse的"DFS Locations"中"/user/hadoop"文件夾進(jìn)行刷新,這時會發(fā)現(xiàn)多出一個"dedup_out"文件夾,且里面有3個文件,然后打開雙其"part-r-00000"文件,會在Eclipse中間把內(nèi)容顯示出來。如圖1.4-4所示。

?

圖1.4-4 運(yùn)行結(jié)果

?

??? 此時,你可以對比一下和我們之前預(yù)期的結(jié)果是否一致。

2、數(shù)據(jù)排序

  "數(shù)據(jù)排序"是許多實(shí)際任務(wù)執(zhí)行時要完成的第一項(xiàng)工作,比如學(xué)生成績評比數(shù)據(jù)建立索引等。這個實(shí)例和數(shù)據(jù)去重類似,都是原始數(shù)據(jù)進(jìn)行初步處理,為進(jìn)一步的數(shù)據(jù)操作打好基礎(chǔ)。下面進(jìn)入這個示例。

2.1 實(shí)例描述

??? 對輸入文件中數(shù)據(jù)進(jìn)行排序。輸入文件中的每行內(nèi)容均為一個數(shù)字即一個數(shù)據(jù)。要求在輸出中每行有兩個間隔的數(shù)字,其中,第一個代表原始數(shù)據(jù)在原始數(shù)據(jù)集中的位次第二個代表原始數(shù)據(jù)

??? 樣例輸入

????1)file1:

?

2

32

654

32

15

756

65223

?

????2)file2:

?

5956

22

650

92

?

????3)file3:

?

26

54

6

?

??? 樣例輸出

?

1??? 2

2??? 6

3??? 15

4??? 22

5??? 26

6??? 32

7??? 32

8??? 54

9??? 92

10??? 650

11??? 654

12??? 756

13??? 5956

14??? 65223

?

2.2 設(shè)計(jì)思路

  這個實(shí)例僅僅要求對輸入數(shù)據(jù)進(jìn)行排序,熟悉MapReduce過程的讀者會很快想到在MapReduce過程中就有排序,是否可以利用這個默認(rèn)的排序,而不需要自己再實(shí)現(xiàn)具體的排序呢?答案是肯定的。

  但是在使用之前首先需要了解它的默認(rèn)排序規(guī)則。它是按照key值進(jìn)行排序的,如果key為封裝int的IntWritable類型,那么MapReduce按照數(shù)字大小對key排序,如果key為封裝為String的Text類型,那么MapReduce按照字典順序對字符串排序。

  了解了這個細(xì)節(jié),我們就知道應(yīng)該使用封裝int的IntWritable型數(shù)據(jù)結(jié)構(gòu)了。也就是在map中將讀入的數(shù)據(jù)轉(zhuǎn)化成IntWritable型,然后作為key值輸出(value任意)。reduce拿到<key,value-list>之后,將輸入的key作為value輸出,并根據(jù)value-list元素個數(shù)決定輸出的次數(shù)。輸出的key(即代碼中的linenum)是一個全局變量,它統(tǒng)計(jì)當(dāng)前key的位次。需要注意的是這個程序中沒有配置Combiner,也就是在MapReduce過程中不使用Combiner。這主要是因?yàn)槭褂胢ap和reduce就已經(jīng)能夠完成任務(wù)了。

2.3 程序代碼

??? 程序代碼如下所示:

?

package?com.hebut.mr;

?

import?java.io.IOException;

?

import?org.apache.hadoop.conf.Configuration;

import?org.apache.hadoop.fs.Path;

import?org.apache.hadoop.io.IntWritable;

import?org.apache.hadoop.io.Text;

import?org.apache.hadoop.mapreduce.Job;

import?org.apache.hadoop.mapreduce.Mapper;

import?org.apache.hadoop.mapreduce.Reducer;

import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import?org.apache.hadoop.util.GenericOptionsParser;

?

public?class?Sort {

?

????//map將輸入中的value化成IntWritable類型,作為輸出的key

????public?static?class?Map?extends

        Mapper<Object,Text,IntWritable,IntWritable>{

????????private?static?IntWritable?data=new?IntWritable();

???????

????????//實(shí)現(xiàn)map函數(shù)

????????public?void?map(Object key,Text value,Context context)

????????????????throws?IOException,InterruptedException{

??????????? String line=value.toString();

????????????data.set(Integer.parseInt(line));

??????????? context.write(data,?new?IntWritable(1));

??????? }

???????

??? }

???

????//reduce將輸入中的key復(fù)制到輸出數(shù)據(jù)的key上,

????//然后根據(jù)輸入的value-list中元素的個數(shù)決定key的輸出次數(shù)

????//用全局linenum來代表key的位次

????public?static?class?Reduce?extends

??????????? Reducer<IntWritable,IntWritable,IntWritable,IntWritable>{

???????

????????private?static?IntWritable?linenum?=?new?IntWritable(1);

???????

????????//實(shí)現(xiàn)reduce函數(shù)

????????public?void?reduce(IntWritable key,Iterable<IntWritable> values,Context context)

????????????????throws?IOException,InterruptedException{

????????????for(IntWritable?val:values){

??????????????? context.write(linenum, key);

????????????????linenum?=?new?IntWritable(linenum.get()+1);

??????????? }

???????????

??????? }

?

??? }

???

????public?static?void?main(String[] args)?throws?Exception{

??????? Configuration conf =?new?Configuration();

????????//這句話很關(guān)鍵

??????? conf.set("mapred.job.tracker",?"192.168.1.2:9001");

???????

??????? String[] ioArgs=new?String[]{"sort_in","sort_out"};

???? String[] otherArgs =?new?GenericOptionsParser(conf, ioArgs).getRemainingArgs();

?????if?(otherArgs.length?!= 2) {

???? System.err.println("Usage: Data Sort <in> <out>");

???????? System.exit(2);

???? }

?????

???? Job job =?new?Job(conf,?"Data Sort");

???? job.setJarByClass(Sort.class);

?????

?????//設(shè)置Map和Reduce處理類

???? job.setMapperClass(Map.class);

???? job.setReducerClass(Reduce.class);

?????

?????//設(shè)置輸出類型

???? job.setOutputKeyClass(IntWritable.class);

???? job.setOutputValueClass(IntWritable.class);

?????

?????//設(shè)置輸入和輸出目錄

???? FileInputFormat.addInputPath(job,?new?Path(otherArgs[0]));

???? FileOutputFormat.setOutputPath(job,?new?Path(otherArgs[1]));

???? System.exit(job.waitForCompletion(true) ? 0 : 1);

???? }

}

?

2.4 代碼結(jié)果

1)準(zhǔn)備測試數(shù)據(jù)

??? 通過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下創(chuàng)建輸入文件"sort_in"文件夾(備注:"sort_out"不需要創(chuàng)建。)如圖2.4-1所示,已經(jīng)成功創(chuàng)建。

??????????????

圖2.4-1 創(chuàng)建"sort_in"????????????????????????????????????????????????? 圖2.4.2 上傳"file*.txt"

?

??? 然后在本地建立三個txt文件,通過Eclipse上傳到"/user/hadoop/sort_in"文件夾中,三個txt文件的內(nèi)容如"實(shí)例描述"那三個文件一樣。如圖2.4-2所示,成功上傳之后。

??? 從SecureCRT遠(yuǎn)處查看"Master.Hadoop"的也能證實(shí)我們上傳的三個文件。

?

?

查看兩個文件的內(nèi)容如圖2.4-3所示:

?

圖2.4-3 文件"file*.txt"內(nèi)容

2)查看運(yùn)行結(jié)果

??? 這時我們右擊Eclipse的"DFS Locations"中"/user/hadoop"文件夾進(jìn)行刷新,這時會發(fā)現(xiàn)多出一個"sort_out"文件夾,且里面有3個文件,然后打開雙其"part-r-00000"文件,會在Eclipse中間把內(nèi)容顯示出來。如圖2.4-4所示。

?

圖2.4-4 運(yùn)行結(jié)果

3、平均成績

??? "平均成績"主要目的還是在重溫經(jīng)典"WordCount"例子,可以說是在基礎(chǔ)上的微變化版,該實(shí)例主要就是實(shí)現(xiàn)一個計(jì)算學(xué)生平均成績的例子。

3.1 實(shí)例描述

  對輸入文件中數(shù)據(jù)進(jìn)行就算學(xué)生平均成績。輸入文件中的每行內(nèi)容均為一個學(xué)生姓名和他相應(yīng)的成績,如果有多門學(xué)科,則每門學(xué)科為一個文件。要求在輸出中每行有兩個間隔的數(shù)據(jù),其中,第一個代表學(xué)生的姓名第二個代表其平均成績

??? 樣本輸入

????1)math:

?

張三??? 88

李四??? 99

王五??? 66

趙六??? 77

?

????2)china:

?

張三??? 78

李四??? 89

王五??? 96

趙六??? 67

?

????3)english:

?

張三??? 80

李四??? 82

王五??? 84

趙六??? 86

?

??? 樣本輸出

?

張三??? 82

李四??? 90

王五??? 82

趙六??? 76

?

3.2 設(shè)計(jì)思路

??? 計(jì)算學(xué)生平均成績是一個仿"WordCount"例子,用來重溫一下開發(fā)MapReduce程序的流程。程序包括兩部分的內(nèi)容:Map部分和Reduce部分,分別實(shí)現(xiàn)了map和reduce的功能。

????Map處理的是一個純文本文件,文件中存放的數(shù)據(jù)時每一行表示一個學(xué)生的姓名和他相應(yīng)一科成績。Mapper處理的數(shù)據(jù)是由InputFormat分解過的數(shù)據(jù)集,其中InputFormat的作用是將數(shù)據(jù)集切割成小數(shù)據(jù)集InputSplit,每一個InputSlit將由一個Mapper負(fù)責(zé)處理。此外,InputFormat中還提供了一個RecordReader的實(shí)現(xiàn),并將一個InputSplit解析成<key,value>對提供給了map函數(shù)。InputFormat的默認(rèn)值是TextInputFormat,它針對文本文件,按行將文本切割成InputSlit,并用LineRecordReader將InputSplit解析成<key,value>對,key是行在文本中的位置,value是文件中的一行。

??? Map的結(jié)果會通過partion分發(fā)到Reducer,Reducer做完Reduce操作后,將通過以格式OutputFormat輸出。

??? Mapper最終處理的結(jié)果對<key,value>,會送到Reducer中進(jìn)行合并,合并的時候,有相同key的鍵/值對則送到同一個Reducer上。Reducer是所有用戶定制Reducer類地基礎(chǔ),它的輸入是key和這個key對應(yīng)的所有value的一個迭代器,同時還有Reducer的上下文。Reduce的結(jié)果由Reducer.Context的write方法輸出到文件中。

3.3 程序代碼

??? 程序代碼如下所示:

?

package?com.hebut.mr;

?

import?java.io.IOException;

import?java.util.Iterator;

import?java.util.StringTokenizer;

?

import?org.apache.hadoop.conf.Configuration;

import?org.apache.hadoop.fs.Path;

import?org.apache.hadoop.io.IntWritable;

import?org.apache.hadoop.io.LongWritable;

import?org.apache.hadoop.io.Text;

import?org.apache.hadoop.mapreduce.Job;

import?org.apache.hadoop.mapreduce.Mapper;

import?org.apache.hadoop.mapreduce.Reducer;

import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import?org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import?org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import?org.apache.hadoop.util.GenericOptionsParser;

?

public?class?Score {

?

????public?static?class?Map?extends

??????????? Mapper<LongWritable, Text, Text, IntWritable> {

?

????????//?實(shí)現(xiàn)map函數(shù)

????????public?void?map(LongWritable?key, Text value, Context context)

????????????????throws?IOException, InterruptedException {

????????????//?將輸入的純文本文件的數(shù)據(jù)轉(zhuǎn)化成String

??????????? String line = value.toString();

?

????????????//?將輸入的數(shù)據(jù)首先按行進(jìn)行分割

????????????StringTokenizer tokenizerArticle =?new?StringTokenizer(line,?"\n");

?

????????????//?分別對每一行進(jìn)行處理

????????????while?(tokenizerArticle.hasMoreElements()) {

????????????????//?每行按空格劃分

??????????????? StringTokenizer tokenizerLine =?new?StringTokenizer(tokenizerArticle.nextToken());

?

??????????????? String strName = tokenizerLine.nextToken();//?學(xué)生姓名部分

??????????????? String strScore = tokenizerLine.nextToken();//?成績部分

?

??????????????? Text name =?new?Text(strName);

????????????????int?scoreInt = Integer.parseInt(strScore);

????????????????//?輸出姓名和成績

??????????????? context.write(name,?new?IntWritable(scoreInt));

??????????? }

??????? }

?

??? }

?

????public?static?class?Reduce?extends

??????????? Reducer<Text, IntWritable, Text, IntWritable> {

????????//?實(shí)現(xiàn)reduce函數(shù)

????????public?void?reduce(Text key, Iterable<IntWritable> values,

??????????????? Context context)?throws?IOException, InterruptedException {

?

????????????int?sum = 0;

????????????int?count = 0;

?

??????????? Iterator<IntWritable> iterator = values.iterator();

????????????while?(iterator.hasNext()) {

??????????????? sum += iterator.next().get();//?計(jì)算總分

??????????????? count++;//?統(tǒng)計(jì)總的科目數(shù)

??????????? }

?

????????????int?average = (int) sum / count;//?計(jì)算平均成績

??????????? context.write(key,?new?IntWritable(average));

??????? }

?

??? }

?

????public?static?void?main(String[] args)?throws?Exception {

??????? Configuration conf =?new?Configuration();

????????//?這句話很關(guān)鍵

??????? conf.set("mapred.job.tracker",?"192.168.1.2:9001");

?

??????? String[] ioArgs =?new?String[] {?"score_in",?"score_out"?};

??????? String[] otherArgs =?new?GenericOptionsParser(conf, ioArgs).getRemainingArgs();

????????if?(otherArgs.length?!= 2) {

??????????? System.err.println("Usage: Score Average <in> <out>");

??????????? System.exit(2);

??????? }

?

??????? Job job =?new?Job(conf,?"Score Average");

??????? job.setJarByClass(Score.class);

?

????????//?設(shè)置Map、Combine和Reduce處理類

??????? job.setMapperClass(Map.class);

??????? job.setCombinerClass(Reduce.class);

??????? job.setReducerClass(Reduce.class);

?

????????//?設(shè)置輸出類型

??????? job.setOutputKeyClass(Text.class);

??????? job.setOutputValueClass(IntWritable.class);

?

????????//?將輸入的數(shù)據(jù)集分割成小數(shù)據(jù)塊splites,提供一個RecordReder的實(shí)現(xiàn)

??????? job.setInputFormatClass(TextInputFormat.class);

????????//?提供一個RecordWriter的實(shí)現(xiàn),負(fù)責(zé)數(shù)據(jù)輸出

??????? job.setOutputFormatClass(TextOutputFormat.class);

?

????????//?設(shè)置輸入和輸出目錄

??????? FileInputFormat.addInputPath(job,?new?Path(otherArgs[0]));

??????? FileOutputFormat.setOutputPath(job,?new?Path(otherArgs[1]));

??????? System.exit(job.waitForCompletion(true) ? 0 : 1);

??? }

}

?

3.4 代碼結(jié)果

1)準(zhǔn)備測試數(shù)據(jù)

??? 通過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下創(chuàng)建輸入文件"score_in"文件夾(備注:"score_out"不需要創(chuàng)建。)如圖3.4-1所示,已經(jīng)成功創(chuàng)建。

?

????? ???? ?

圖3.4-1 創(chuàng)建"score_in"????????????????????????????????????????????????????? ?圖3.4.2 上傳三門分?jǐn)?shù)

?

??? 然后在本地建立三個txt文件,通過Eclipse上傳到"/user/hadoop/score_in"文件夾中,三個txt文件的內(nèi)容如"實(shí)例描述"那三個文件一樣。如圖3.4-2所示,成功上傳之后。

????備注:文本文件的編碼為"UTF-8",默認(rèn)為"ANSI",可以另存為時選擇,不然中文會出現(xiàn)亂碼

??? 從SecureCRT遠(yuǎn)處查看"Master.Hadoop"的也能證實(shí)我們上傳的三個文件。

?

?

查看三個文件的內(nèi)容如圖3.4-3所示:

?

圖3.4.3 三門成績的內(nèi)容

2)查看運(yùn)行結(jié)果

??? 這時我們右擊Eclipse的"DFS Locations"中"/user/hadoop"文件夾進(jìn)行刷新,這時會發(fā)現(xiàn)多出一個"score_out"文件夾,且里面有3個文件,然后打開雙其"part-r-00000"文件,會在Eclipse中間把內(nèi)容顯示出來。如圖3.4-4所示。

?

圖3.4-4 運(yùn)行結(jié)果

4、單表關(guān)聯(lián)

??? 前面的實(shí)例都是在數(shù)據(jù)上進(jìn)行一些簡單的處理,為進(jìn)一步的操作打基礎(chǔ)。"單表關(guān)聯(lián)"這個實(shí)例要求給出的數(shù)據(jù)尋找關(guān)心的數(shù)據(jù),它是對原始數(shù)據(jù)所包含信息的挖掘。下面進(jìn)入這個實(shí)例。

4.1 實(shí)例描述

??? 實(shí)例中給出child-parent(孩子——父母)表,要求輸出grandchild-grandparent(孫子——爺奶)表。

??? 樣例輸入如下所示。

????file:

?

child??????? parent

Tom??????? Lucy

Tom??????? Jack

Jone??????? Lucy

Jone??????? Jack

Lucy??????? Mary

Lucy??????? Ben

Jack??????? Alice

Jack??????? Jesse

Terry??????? Alice

Terry??????? Jesse

Philip??????? Terry

Philip??????? Alma

Mark??????? Terry

Mark??????? Alma

?

??? 家族樹狀關(guān)系譜:

?

?

圖4.2-1 家族譜

??? 樣例輸出如下所示。

????file:

?

grandchild??????? grandparent

Tom???????????   Alice

Tom???????????   Jesse

Jone???????????   Alice

Jone???????????   Jesse

Tom???????????   Mary

Tom???????????   Ben

Jone???????????   Mary

Jone???????????   Ben

Philip??????????  ? Alice

Philip???????????   Jesse

Mark???????????   Alice

Mark???????????   Jesse

?

4.2 設(shè)計(jì)思路

?????? 分析這個實(shí)例,顯然需要進(jìn)行單表連接,連接的是左表parent列和右表child列,且左表右表同一個表

  連接結(jié)果除去連接的兩列就是所需要的結(jié)果——"grandchild--grandparent"表。要用MapReduce解決這個實(shí)例,首先應(yīng)該考慮如何實(shí)現(xiàn)自連接其次就是連接列設(shè)置最后結(jié)果整理

????? 考慮到MapReduce的shuffle過程會將相同的key會連接在一起,所以可以將map結(jié)果的key設(shè)置成待連接,然后列中相同的值就自然會連接在一起了。再與最開始的分析聯(lián)系起來:

  要連接的是左表的parent列和右表的child列,且左表和右表是同一個表,所以在map階段讀入數(shù)據(jù)分割childparent之后,會將parent設(shè)置成keychild設(shè)置成value進(jìn)行輸出,并作為左表;再將同一對childparent中的child設(shè)置成keyparent設(shè)置成value進(jìn)行輸出,作為右表。為了區(qū)分輸出中的左右表,需要在輸出的value加上左右表信息,比如在value的String最開始處加上字符1表示左表,加上字符2表示右表。這樣在map的結(jié)果中就形成了左表和右表,然后在shuffle過程中完成連接。reduce接收到連接的結(jié)果,其中每個key的value-list就包含了"grandchild--grandparent"關(guān)系。取出每個key的value-list進(jìn)行解析,將左表中的child放入一個數(shù)組右表中的parent放入一個數(shù)組,然后對兩個數(shù)組求笛卡爾積就是最后的結(jié)果了。

4.3 程序代碼

??? 程序代碼如下所示。

?

package?com.hebut.mr;

?

import?java.io.IOException;

import?java.util.*;

?

import?org.apache.hadoop.conf.Configuration;

import?org.apache.hadoop.fs.Path;

import?org.apache.hadoop.io.IntWritable;

import?org.apache.hadoop.io.Text;

import?org.apache.hadoop.mapreduce.Job;

import?org.apache.hadoop.mapreduce.Mapper;

import?org.apache.hadoop.mapreduce.Reducer;

import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import?org.apache.hadoop.util.GenericOptionsParser;

?

public?class?STjoin {

?

????public?static?int?time?= 0;

?

????/*

???? * map將輸出分割child和parent,然后正序輸出一次作為右表,

???? *?反序輸出一次作為左表,需要注意的是在輸出的value中必須

???? *?加上左右表的區(qū)別標(biāo)識。

???? */

????public?static?class?Map?extends?Mapper<Object, Text, Text, Text> {

?

????????//?實(shí)現(xiàn)map函數(shù)

????????public?void?map(Object key, Text value, Context context)

????????????????throws?IOException, InterruptedException {

??????????? String childname =?new?String();//?孩子名稱

??????????? String parentname =?new?String();//?父母名稱

??????????? String relationtype =?new?String();//?左右表標(biāo)識

?

????????????//?輸入的一行預(yù)處理文本

??????????? StringTokenizer itr=new?StringTokenizer(value.toString());

??????????? String[] values=new?String[2];

????????????int?i=0;

????????????while(itr.hasMoreTokens()){

??????????????? values[i]=itr.nextToken();

??????????????? i++;

??????????? }

???????????

????????????if?(values[0].compareTo("child") != 0) {

??????????????? childname = values[0];

??????????????? parentname = values[1];

?

????????????????//?輸出左表

??????????????? relationtype =?"1";

??????????????? context.write(new?Text(values[1]),?new?Text(relationtype +

????????????????????????"+"+ childname +?"+"?+ parentname));

?

????????????????//?輸出右表

??????????????? relationtype =?"2";

??????????????? context.write(new?Text(values[0]),?new?Text(relationtype +

????????????????????????"+"+ childname +?"+"?+ parentname));

??????????? }

??????? }

?

??? }

?

????public?static?class?Reduce?extends?Reducer<Text, Text, Text, Text> {

?

????????//?實(shí)現(xiàn)reduce函數(shù)

????????public?void?reduce(Text key, Iterable<Text> values, Context context)

????????????????throws?IOException, InterruptedException {

?

????????????//?輸出表頭

????????????if?(0 ==?time) {

????????????????context.write(new?Text("grandchild"),?new?Text("grandparent"));

????????????????time++;

??????????? }

?

????????????int?grandchildnum = 0;

??????????? String[] grandchild =?new?String[10];

????????????int?grandparentnum = 0;

??????????? String[] grandparent =?new?String[10];

?

????????????Iterator?ite = values.iterator();

????????????while?(ite.hasNext()) {

??????????????? String record = ite.next().toString();

????????????????int?len = record.length();

????????????????int?i = 2;

????????????????if?(0 == len) {

????????????????????continue;

??????????????? }

?

????????????????//?取得左右表標(biāo)識

????????????????char?relationtype = record.charAt(0);

????????????????//?定義孩子和父母變量

??????????????? String childname =?new?String();

??????????????? String parentname =?new?String();

?

????????????????//?獲取value-list中value的child

????????????????while?(record.charAt(i) !=?'+') {

??????????????????? childname += record.charAt(i);

??????????????????? i++;

??????????????? }

?

??????????????? i = i + 1;

?

????????????????//?獲取value-list中value的parent

????????????????while?(i < len) {

??????????????????? parentname += record.charAt(i);

??????????????????? i++;

??????????????? }

?

????????????????//?左表,取出child放入grandchildren

????????????????if?('1'?== relationtype) {

??????????????????? grandchild[grandchildnum] = childname;

??????????????????? grandchildnum++;

??????????????? }

?

????????????????//?右表,取出parent放入grandparent

????????????????if?('2'?== relationtype) {

??????????????????? grandparent[grandparentnum] = parentname;

??????????????????? grandparentnum++;

??????????????? }

??????????? }

?

????????????//?grandchild和grandparent數(shù)組求笛卡爾兒積

????????????if?(0 != grandchildnum && 0 != grandparentnum) {

????????????????for?(int?m = 0; m < grandchildnum; m++) {

????????????????????for?(int?n = 0; n < grandparentnum; n++) {

????????????????????????//?輸出結(jié)果

??????????????????????? context.write(new?Text(grandchild[m]),?new?Text(grandparent[n]));

??????????????????? }

??????????????? }

??????????? }

??????? }

??? }

?

????public?static?void?main(String[] args)?throws?Exception {

??????? Configuration conf =?new?Configuration();

????????//?這句話很關(guān)鍵

??????? conf.set("mapred.job.tracker",?"192.168.1.2:9001");

?

??????? String[] ioArgs =?new?String[] {?"STjoin_in",?"STjoin_out"?};

??????? String[] otherArgs =?new?GenericOptionsParser(conf, ioArgs).getRemainingArgs();

????????if?(otherArgs.length?!= 2) {

??????????? System.err.println("Usage: Single Table Join <in> <out>");

??????????? System.exit(2);

??????? }

?

??????? Job job =?new?Job(conf,?"Single Table Join");

??????? job.setJarByClass(STjoin.class);

?

????????//?設(shè)置Map和Reduce處理類

??????? job.setMapperClass(Map.class);

??????? job.setReducerClass(Reduce.class);

?

????????//?設(shè)置輸出類型

??????? job.setOutputKeyClass(Text.class);

??????? job.setOutputValueClass(Text.class);

?

????????//?設(shè)置輸入和輸出目錄

??????? FileInputFormat.addInputPath(job,?new?Path(otherArgs[0]));

??????? FileOutputFormat.setOutputPath(job,?new?Path(otherArgs[1]));

??????? System.exit(job.waitForCompletion(true) ? 0 : 1);

??? }

}

?

4.4 代碼結(jié)果

1)準(zhǔn)備測試數(shù)據(jù)

??? 通過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下創(chuàng)建輸入文件"STjoin_in"文件夾(備注:"STjoin_out"不需要創(chuàng)建。)如圖4.4-1所示,已經(jīng)成功創(chuàng)建。

?

???????????????? ?

圖4.4-1 創(chuàng)建"STjoin_in"?????????????????????????????????????? 圖4.4.2 上傳"child-parent"表

?

??? 然后在本地建立一個txt文件,通過Eclipse上傳到"/user/hadoop/STjoin_in"文件夾中,一個txt文件的內(nèi)容如"實(shí)例描述"那個文件一樣。如圖4.4-2所示,成功上傳之后。

??? 從SecureCRT遠(yuǎn)處查看"Master.Hadoop"的也能證實(shí)我們上傳的文件,顯示其內(nèi)容如圖4.4-3所示:

?

圖4.4-3 表"child-parent"內(nèi)容

????2)運(yùn)行詳解

????(1)Map處理:

??? map函數(shù)輸出結(jié)果如下所示。

?

child??????? parent????????????????àà??????????????????? 忽略此行

Tom??????? Lucy???????????????????àà??????????????? <Lucy,1+Tom+Lucy>

???????????????????????????????????????????         <Tom,2+Tom+Lucy >

Tom??????? Jack????????????????????àà??????????????? <Jack,1+Tom+Jack>

???????????????????????????????????????????         <Tom,2+Tom+Jack>

Jone??????? Lucy???????????????  àà??????????????? <Lucy,1+Jone+Lucy>

???????????????????????????????????????????         <Jone,2+Jone+Lucy>

Jone??????? Jack????????????????????àà??????????????? <Jack,1+Jone+Jack>

???????????????????????????????????????????         <Jone,2+Jone+Jack>

Lucy??????? Mary???????????????????àà??????????????? <Mary,1+Lucy+Mary>

???????????????????????????????????????????         <Lucy,2+Lucy+Mary>

Lucy??????? Ben????????????????????àà??????????????? <Ben,1+Lucy+Ben>

???????????????????????????????????????????          <Lucy,2+Lucy+Ben>

Jack??????? Alice????????????????????àà??????????????? <Alice,1+Jack+Alice>

???????????????????????????????????????????           <Jack,2+Jack+Alice>

Jack??????? Jesse???????????????????àà??????????????? <Jesse,1+Jack+Jesse>

???????????????????????????????????????????           <Jack,2+Jack+Jesse>

Terry??????? Alice???????????????????àà??????????????? <Alice,1+Terry+Alice>

???????????????????????????????????????????           <Terry,2+Terry+Alice>

Terry??????? Jesse??????????????????àà??????????????? <Jesse,1+Terry+Jesse>

???????????????????????????????????????????           <Terry,2+Terry+Jesse>

Philip??????? Terry??????????????????àà??????????????? <Terry,1+Philip+Terry>

???????????????????????????????????????????           <Philip,2+Philip+Terry>

Philip??????? Alma???????????????????àà??????????????? <Alma,1+Philip+Alma>

???????????????????????????????????????????           <Philip,2+Philip+Alma>

Mark??????? Terry???????????????????àà??????????????? <Terry,1+Mark+Terry>

???????????????????????????????????????????           <Mark,2+Mark+Terry>

Mark??????? Alma???????????????  àà??????????????? <Alma,1+Mark+Alma>

???????????????????????????????????????????           <Mark,2+Mark+Alma>

?

????(2)Shuffle處理

??? 在shuffle過程中完成連接。

?

map函數(shù)輸出

排序結(jié)果

shuffle連接

<Lucy,1+Tom+Lucy>

<Tom,2+Tom+Lucy>

<Jack,1+Tom+Jack>

<Tom,2+Tom+Jack>

<Lucy,1+Jone+Lucy>

<Jone,2+Jone+Lucy>

<Jack,1+Jone+Jack>

<Jone,2+Jone+Jack>

<Mary,1+Lucy+Mary>

<Lucy,2+Lucy+Mary>

<Ben,1+Lucy+Ben>

<Lucy,2+Lucy+Ben>

<Alice,1+Jack+Alice>

<Jack,2+Jack+Alice>

<Jesse,1+Jack+Jesse>

<Jack,2+Jack+Jesse>

<Alice,1+Terry+Alice>

<Terry,2+Terry+Alice>

<Jesse,1+Terry+Jesse>

<Terry,2+Terry+Jesse>

<Terry,1+Philip+Terry>

<Philip,2+Philip+Terry>

<Alma,1+Philip+Alma>

<Philip,2+Philip+Alma>

<Terry,1+Mark+Terry>

<Mark,2+Mark+Terry>

<Alma,1+Mark+Alma>

<Mark,2+Mark+Alma>

<Alice,1+Jack+Alice>

<Alice,1+Terry+Alice>

<Alma,1+Philip+Alma>

<Alma,1+Mark+Alma>

<Ben,1+Lucy+Ben>

<Jack,1+Tom+Jack>

<Jack,1+Jone+Jack>

<Jack,2+Jack+Alice>

<Jack,2+Jack+Jesse>

<Jesse,1+Jack+Jesse>

<Jesse,1+Terry+Jesse>

<Jone,2+Jone+Lucy>

<Jone,2+Jone+Jack>

<Lucy,1+Tom+Lucy>

<Lucy,1+Jone+Lucy>

<Lucy,2+Lucy+Mary>

<Lucy,2+Lucy+Ben>

<Mary,1+Lucy+Mary>

<Mark,2+Mark+Terry>

<Mark,2+Mark+Alma>

<Philip,2+Philip+Terry>

<Philip,2+Philip+Alma>

<Terry,2+Terry+Alice>

<Terry,2+Terry+Jesse>

<Terry,1+Philip+Terry>

<Terry,1+Mark+Terry>

<Tom,2+Tom+Lucy>

<Tom,2+Tom+Jack>

<Alice,1+Jack+Alice,

??????? 1+Terry+Alice?,

??????? 1+Philip+Alma,

??????? 1+Mark+Alma >

<Ben,1+Lucy+Ben>

<Jack,1+Tom+Jack,

??????? 1+Jone+Jack,

??????? 2+Jack+Alice,

??????? 2+Jack+Jesse >

<Jesse,1+Jack+Jesse,

??????? 1+Terry+Jesse >

<Jone,2+Jone+Lucy,

??????? 2+Jone+Jack>

<Lucy,1+Tom+Lucy,

??????? 1+Jone+Lucy,

??????? 2+Lucy+Mary,

??????? 2+Lucy+Ben>

<Mary,1+Lucy+Mary,

??????? 2+Mark+Terry,

??????? 2+Mark+Alma>

<Philip,2+Philip+Terry,

??????? 2+Philip+Alma>

<Terry,2+Terry+Alice,

??????? 2+Terry+Jesse,

??????? 1+Philip+Terry,

??????? 1+Mark+Terry>

<Tom,2+Tom+Lucy,

??????? 2+Tom+Jack>

?

????(3)Reduce處理

????首先由語句"0 != grandchildnum && 0 != grandparentnum"得知,只要在"value-list"中沒有左表或者右表,則不會做處理,可以根據(jù)這條規(guī)則去除無效shuffle連接

?

無效的shuffle連接

有效的shuffle連接

<Alice,1+Jack+Alice,

??????? 1+Terry+Alice?,

??????? 1+Philip+Alma,

??????? 1+Mark+Alma >

<Ben,1+Lucy+Ben>

<Jesse,1+Jack+Jesse,

??????? 1+Terry+Jesse >

<Jone,2+Jone+Lucy,

??????? 2+Jone+Jack>

<Mary,1+Lucy+Mary,

??????? 2+Mark+Terry,

??????? 2+Mark+Alma>

<Philip,2+Philip+Terry,

??????? 2+Philip+Alma>

<Tom,2+Tom+Lucy,

??????? 2+Tom+Jack>

<Jack,1+Tom+Jack,

??????? 1+Jone+Jack,

??????? 2+Jack+Alice,

??????? 2+Jack+Jesse >

<Lucy,1+Tom+Lucy,

??????? 1+Jone+Lucy,

??????? 2+Lucy+Mary,

??????? 2+Lucy+Ben>

<Terry,2+Terry+Alice,

??????? 2+Terry+Jesse,

??????? 1+Philip+Terry,

??????? 1+Mark+Terry>

??? 然后根據(jù)下面語句進(jìn)一步對有效的shuffle連接做處理。

?

// 左表,取出child放入grandchildren

if ('1' == relationtype) {

??? grandchild[grandchildnum] = childname;

??? grandchildnum++;

}

?

// 右表,取出parent放入grandparent

if ('2' == relationtype) {

??? grandparent[grandparentnum] = parentname;

??? grandparentnum++;

}

?

??? 針對一條數(shù)據(jù)進(jìn)行分析:

?

<Jack,1+Tom+Jack,

??????? 1+Jone+Jack,

??????? 2+Jack+Alice,

??????? 2+Jack+Jesse >

?

????分析結(jié)果左表用"字符1"表示,右表用"字符2"表示,上面的<key,value-list>中的"key"表示左表與右表連接鍵。而"value-list"表示以"key"連接左表與右表相關(guān)數(shù)據(jù)

??? 根據(jù)上面針對左表與右表不同的處理規(guī)則,取得兩個數(shù)組的數(shù)據(jù)如下所示:

?

grandchild

Tom、Jone(grandchild[grandchildnum] = childname;)

grandparent

Alice、Jesse(grandparent[grandparentnum] = parentname;)

????

??? 然后根據(jù)下面語句進(jìn)行處理。

?

for (int m = 0; m < grandchildnum; m++) {

??? for (int n = 0; n < grandparentnum; n++) {

??????? context.write(new Text(grandchild[m]), new Text(grandparent[n]));

??? }

}

?

??

?

處理結(jié)果如下面所示:

?

Tom??????? Jesse

Tom??????? Alice

Jone??????? Jesse

Jone??????? Alice?

??? 其他的有效shuffle連接處理都是如此

3)查看運(yùn)行結(jié)果

??? 這時我們右擊Eclipse的"DFS Locations"中"/user/hadoop"文件夾進(jìn)行刷新,這時會發(fā)現(xiàn)多出一個"STjoin_out"文件夾,且里面有3個文件,然后打開雙其"part-r-00000"文件,會在Eclipse中間把內(nèi)容顯示出來。如圖4.4-4所示。

?

圖4.4-4 運(yùn)行結(jié)果

5、多表關(guān)聯(lián)

????多表關(guān)聯(lián)和單表關(guān)聯(lián)類似,它也是通過對原始數(shù)據(jù)進(jìn)行一定的處理,從其中挖掘出關(guān)心的信息。下面進(jìn)入這個實(shí)例。

5.1 實(shí)例描述

??? 輸入是兩個文件,一個代表工廠表,包含工廠名列和地址編號列;另一個代表地址表,包含地址名列和地址編號列。要求從輸入數(shù)據(jù)中找出工廠名地址名對應(yīng)關(guān)系,輸出"工廠名——地址名"表。

??? 樣例輸入如下所示。

????1)factory:

?

factoryname???????????????     addressed

Beijing Red Star???????????????     1

Shenzhen Thunder???????????     3

Guangzhou Honda???????????     2

Beijing Rising??????????????????     1

Guangzhou Development Bank??????2

Tencent???????????????         3

Back of Beijing???????????????      1

?

????2)address:

?

addressID??? addressname

1???????     Beijing

2???????     Guangzhou

3???????     Shenzhen

4???????     Xian

?

??? 樣例輸出如下所示。

?

factoryname???????????????????     addressname

Back of Beijing???????????????????     ? Beijing

Beijing Red Star???????????????????     Beijing

Beijing Rising???????????????????       Beijing

Guangzhou Development Bank??????????Guangzhou

Guangzhou Honda???????????????     Guangzhou

Shenzhen Thunder???????????????     Shenzhen

Tencent???????????????????         Shenzhen

?

5.2 設(shè)計(jì)思路

??? 多表關(guān)聯(lián)和單表關(guān)聯(lián)相似,都類似于數(shù)據(jù)庫中的自然連接。相比單表關(guān)聯(lián),多表關(guān)聯(lián)的左右表和連接列更加清楚。所以可以采用和單表關(guān)聯(lián)的相同處理方式,map識別出輸入的行屬于哪個表之后,對其進(jìn)行分割,將連接的列值保存在key中,另一列和左右表標(biāo)識保存在value中,然后輸出。reduce拿到連接結(jié)果之后,解析value內(nèi)容,根據(jù)標(biāo)志將左右表內(nèi)容分開存放,然后求笛卡爾積,最后直接輸出。

??? 這個實(shí)例的具體分析參考單表關(guān)聯(lián)實(shí)例。下面給出代碼。

5.3 程序代碼

??? 程序代碼如下所示:

?

package?com.hebut.mr;

?

import?java.io.IOException;

import?java.util.*;

?

import?org.apache.hadoop.conf.Configuration;

import?org.apache.hadoop.fs.Path;

import?org.apache.hadoop.io.IntWritable;

import?org.apache.hadoop.io.Text;

import?org.apache.hadoop.mapreduce.Job;

import?org.apache.hadoop.mapreduce.Mapper;

import?org.apache.hadoop.mapreduce.Reducer;

import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import?org.apache.hadoop.util.GenericOptionsParser;

?

public?class?MTjoin {

?

????public?static?int?time?= 0;

?

????/*

???? *?在map中先區(qū)分輸入行屬于左表還是右表,然后對兩列值進(jìn)行分割,

???? *?保存連接列在key值,剩余列和左右表標(biāo)志在value中,最后輸出

???? */

????public?static?class?Map?extends?Mapper<Object, Text, Text, Text> {

?

????????//?實(shí)現(xiàn)map函數(shù)

????????public?void?map(Object key, Text value, Context context)

????????????????throws?IOException, InterruptedException {

??????????? String line = value.toString();//?每行文件

??????????? String relationtype =?new?String();//?左右表標(biāo)識

?

????????????//?輸入文件首行,不處理

????????????if?(line.contains("factoryname") ==?true

??????????????????? || line.contains("addressed") ==?true) {

????????????????return;

??????????? }

?

????????????//?輸入的一行預(yù)處理文本

??????????? StringTokenizer itr =?new?StringTokenizer(line);

??????????? String mapkey =?new?String();

??????????? String mapvalue =?new?String();

????????????int?i = 0;

????????????while?(itr.hasMoreTokens()) {

????????????????//?先讀取一個單詞

??????????????? String token = itr.nextToken();

????????????????//?判斷該地址ID就把存到"values[0]"

????????????????if?(token.charAt(0) >=?'0'?&& token.charAt(0) <=?'9') {

??????????????????? mapkey = token;

????????????????????if?(i > 0) {

??????????????????????? relationtype =?"1";

??????????????????? }?else?{

??????????????????????? relationtype =?"2";

??????????????????? }

????????????????????continue;

??????????????? }

?

????????????????//?存工廠名

??????????????? mapvalue += token +?" ";

??????????????? i++;

??????????? }

?

????????????//?輸出左右表

??????????? context.write(new?Text(mapkey),?new?Text(relationtype +?"+"+ mapvalue));

??????? }

??? }

?

????/*

???? * reduce解析map輸出,將value中數(shù)據(jù)按照左右表分別保存,

  *?然后求出笛卡爾積,并輸出。

???? */

????public?static?class?Reduce?extends?Reducer<Text, Text, Text, Text> {

?

????????//?實(shí)現(xiàn)reduce函數(shù)

????????public?void?reduce(Text key, Iterable<Text> values, Context context)

????????????????throws?IOException, InterruptedException {

?

????????????//?輸出表頭

????????????if?(0 ==?time) {

????????????????context.write(new?Text("factoryname"),?new?Text("addressname"));

????????????????time++;

??????????? }

?

????????????int?factorynum = 0;

??????????? String[] factory =?new?String[10];

????????????int?addressnum = 0;

??????????? String[]?address?=?new?String[10];

?

????????????Iterator?ite = values.iterator();

????????????while?(ite.hasNext()) {

??????????????? String record = ite.next().toString();

????????????????int?len = record.length();

????????????????int?i = 2;

????????????????if?(0 == len) {

????????????????????continue;

??????????????? }

?

????????????????//?取得左右表標(biāo)識

????????????????char?relationtype = record.charAt(0);

?

????????????????//?左表

????????????????if?('1'?== relationtype) {

??????????????????? factory[factorynum] = record.substring(i);

??????????????????? factorynum++;

??????????????? }

?

????????????????//?右表

????????????????if?('2'?== relationtype) {

????????????????????address[addressnum] = record.substring(i);

??????????????????? addressnum++;

??????????????? }

??????????? }

?

????????????//?求笛卡爾積

????????????if?(0 != factorynum && 0 != addressnum) {

????????????????for?(int?m = 0; m < factorynum; m++) {

????????????????????for?(int?n = 0; n < addressnum; n++) {

????????????????????????//?輸出結(jié)果

??????????????????????? context.write(new?Text(factory[m]),

????????????????????????????????new?Text(address[n]));

??????????????????? }

??????????????? }

??????????? }

?

??????? }

??? }

?

????public?static?void?main(String[] args)?throws?Exception {

??????? Configuration conf =?new?Configuration();

????????//?這句話很關(guān)鍵

??????? conf.set("mapred.job.tracker",?"192.168.1.2:9001");

?

??????? String[] ioArgs =?new?String[] {?"MTjoin_in",?"MTjoin_out"?};

??????? String[] otherArgs =?new?GenericOptionsParser(conf, ioArgs).getRemainingArgs();

????????if?(otherArgs.length?!= 2) {

??????????? System.err.println("Usage: Multiple Table Join <in> <out>");

??????????? System.exit(2);

??????? }

?

??????? Job job =?new?Job(conf,?"Multiple Table Join");

??????? job.setJarByClass(MTjoin.class);

?

????????//?設(shè)置Map和Reduce處理類

??????? job.setMapperClass(Map.class);

??????? job.setReducerClass(Reduce.class);

?

????????//?設(shè)置輸出類型

??????? job.setOutputKeyClass(Text.class);

??????? job.setOutputValueClass(Text.class);

?

????????//?設(shè)置輸入和輸出目錄

??????? FileInputFormat.addInputPath(job,?new?Path(otherArgs[0]));

??????? FileOutputFormat.setOutputPath(job,?new?Path(otherArgs[1]));

??????? System.exit(job.waitForCompletion(true) ? 0 : 1);

??? }

}

?

5.4 代碼結(jié)果

1)準(zhǔn)備測試數(shù)據(jù)

??? 通過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下創(chuàng)建輸入文件"MTjoin_in"文件夾(備注:"MTjoin_out"不需要創(chuàng)建。)如圖5.4-1所示,已經(jīng)成功創(chuàng)建。

?

???????? ?????? ?

圖5.4-1 創(chuàng)建"MTjoin_in"??????????????????????????????????????????????????????????? ?圖5.4.2 上傳兩個數(shù)據(jù)表

?

??? 然后在本地建立兩個txt文件,通過Eclipse上傳到"/user/hadoop/MTjoin_in"文件夾中,兩個txt文件的內(nèi)容如"實(shí)例描述"那兩個文件一樣。如圖5.4-2所示,成功上傳之后。

??? 從SecureCRT遠(yuǎn)處查看"Master.Hadoop"的也能證實(shí)我們上傳的兩個文件。

?

圖5.4.3 兩個數(shù)據(jù)表的內(nèi)容

2)查看運(yùn)行結(jié)果

??? 這時我們右擊Eclipse的"DFS Locations"中"/user/hadoop"文件夾進(jìn)行刷新,這時會發(fā)現(xiàn)多出一個"MTjoin_out"文件夾,且里面有3個文件,然后打開雙其"part-r-00000"文件,會在Eclipse中間把內(nèi)容顯示出來。如圖5.4-4所示。

?

圖5.4-4 運(yùn)行結(jié)果

6、倒排索引

??? "倒排索引"是文檔檢索系統(tǒng)最常用數(shù)據(jù)結(jié)構(gòu),被廣泛地應(yīng)用于全文搜索引擎。它主要是用來存儲某個單詞(或詞組)在一個文檔或一組文檔中的存儲位置映射,即提供了一種根據(jù)內(nèi)容來查找文檔方式。由于不是根據(jù)文檔來確定文檔所包含的內(nèi)容,而是進(jìn)行相反的操作,因而稱為倒排索引(Inverted Index)。

6.1 實(shí)例描述

??? 通常情況下,倒排索引由一個單詞(或詞組)以及相關(guān)的文檔列表組成,文檔列表中的文檔或者是標(biāo)識文檔的ID號,或者是指文檔所在位置的URL,如圖6.1-1所示。

?

圖6.1-1 倒排索引結(jié)構(gòu)

??? 從圖6.1-1可以看出,單詞1出現(xiàn)在{文檔1,文檔4,文檔13,……}中,單詞2出現(xiàn)在{文檔3,文檔5,文檔15,……}中,而單詞3出現(xiàn)在{文檔1,文檔8,文檔20,……}中。在實(shí)際應(yīng)用中,還需要每個文檔添加一個權(quán)值,用來指出每個文檔與搜索內(nèi)容的相關(guān)度,如圖6.1-2所示。

?

?

圖6.1-2 添加權(quán)重的倒排索引

??? 最常用的是使用詞頻作為權(quán)重,即記錄單詞在文檔中出現(xiàn)的次數(shù)。以英文為例,如圖6.1-3所示,索引文件中的"MapReduce"一行表示:"MapReduce"這個單詞在文本T0中出現(xiàn)過1次,T1中出現(xiàn)過1次,T2中出現(xiàn)過2次。當(dāng)搜索條件為"MapReduce"、"is"、"Simple"時,對應(yīng)的集合為:{T0,T1,T2}∩{T0,T1}∩{T0,T1}={T0,T1},即文檔T0和T1包含了所要索引的單詞,而且只有T0是連續(xù)的。

?

?

圖6.1-3 倒排索引示例

??? 更復(fù)雜的權(quán)重還可能要記錄單詞在多少個文檔中出現(xiàn)過,以實(shí)現(xiàn)TF-IDF(Term Frequency-Inverse Document Frequency)算法,或者考慮單詞在文檔中的位置信息(單詞是否出現(xiàn)在標(biāo)題中,反映了單詞在文檔中的重要性)等。

??? 樣例輸入如下所示。

????1)file1:

?

MapReduce is simple

?

????2)file2:

?

MapReduce is powerful is simple

?

????3)file3:

?

Hello MapReduce bye MapReduce

?

??? 樣例輸出如下所示。

?

MapReduce????? file1.txt:1;file2.txt:1;file3.txt:2;

is???????     file1.txt:1;file2.txt:2;

simple???????  ? file1.txt:1;file2.txt:1;

powerful???   file2.txt:1;

Hello???????   file3.txt:1;

bye???????  ?? file3.txt:1;

?

6.2 設(shè)計(jì)思路

??? 實(shí)現(xiàn)"倒排索引"只要關(guān)注的信息為:單詞文檔URL詞頻,如圖3-11所示。但是在實(shí)現(xiàn)過程中,索引文件的格式與圖6.1-3會略有所不同,以避免重寫OutPutFormat類。下面根據(jù)MapReduce的處理過程給出倒排索引設(shè)計(jì)思路

????1)Map過程

??? 首先使用默認(rèn)的TextInputFormat類對輸入文件進(jìn)行處理,得到文本中每行偏移量及其內(nèi)容。顯然,Map過程首先必須分析輸入的<key,value>對,得到倒排索引中需要的三個信息:單詞、文檔URL和詞頻,如圖6.2-1所示。

?

圖6.2-1 Map過程輸入/輸出

?

  這里存在兩個問題第一,<key,value>對只能有兩個值,在不使用Hadoop自定義數(shù)據(jù)類型的情況下,需要根據(jù)情況將其中兩個值合并成一個值,作為key或value值;第二,通過一個Reduce過程無法同時完成詞頻統(tǒng)計(jì)生成文檔列表,所以必須增加一個Combine過程完成詞頻統(tǒng)計(jì)

??? 這里講單詞和URL組成key值(如"MapReduce:file1.txt"),將詞頻作為value,這樣做的好處是可以利用MapReduce框架自帶的Map端排序,將同一文檔相同單詞詞頻組成列表,傳遞給Combine過程,實(shí)現(xiàn)類似于WordCount的功能。

????2)Combine過程

??? 經(jīng)過map方法處理后,Combine過程將key值相同的value值累加,得到一個單詞在文檔在文檔中的詞頻,如圖6.2-2所示。如果直接將圖6.2-2所示的輸出作為Reduce過程的輸入,在Shuffle過程時將面臨一個問題:所有具有相同單詞的記錄(由單詞、URL和詞頻組成)應(yīng)該交由同一個Reducer處理,但當(dāng)前的key值無法保證這一點(diǎn),所以必須修改key值和value值。這次將單詞作為key值,URL和詞頻組value值(如"file1.txt:1")。這樣做的好處是可以利用MapReduce框架默認(rèn)的HashPartitioner類完成Shuffle過程,將相同單詞所有記錄發(fā)送給同一個Reducer進(jìn)行處理

?

?

圖6.2-2 Combine過程輸入/輸出

????3)Reduce過程

??? 經(jīng)過上述兩個過程后,Reduce過程只需將相同key值的value值組合成倒排索引文件所需的格式即可,剩下的事情就可以直接交給MapReduce框架進(jìn)行處理了。如圖6.2-3所示。索引文件的內(nèi)容除分隔符外與圖6.1-3解釋相同。

????4)需要解決的問題

??? 本實(shí)例設(shè)計(jì)的倒排索引在文件數(shù)目沒有限制,但是單詞文件不宜過大(具體值與默認(rèn)HDFS塊大小及相關(guān)配置有關(guān)),要保證每個文件對應(yīng)一個split。否則,由于Reduce過程沒有進(jìn)一步統(tǒng)計(jì)詞頻,最終結(jié)果可能出現(xiàn)詞頻未統(tǒng)計(jì)完全單詞。可以通過重寫InputFormat類將每個文件為一個split,避免上述情況。或者執(zhí)行兩次MapReduce第一次MapReduce用于統(tǒng)計(jì)詞頻第二次MapReduce用于生成倒排索引。除此之外,還可以利用復(fù)合鍵值對等實(shí)現(xiàn)包含更多信息的倒排索引。

?

?

圖6.2-3 Reduce過程輸入/輸出

6.3 程序代碼

  程序代碼如下所示:

?

package?com.hebut.mr;

?

import?java.io.IOException;

import?java.util.StringTokenizer;

?

import?org.apache.hadoop.conf.Configuration;

import?org.apache.hadoop.fs.Path;

import?org.apache.hadoop.io.IntWritable;

import?org.apache.hadoop.io.Text;

import?org.apache.hadoop.mapreduce.Job;

import?org.apache.hadoop.mapreduce.Mapper;

import?org.apache.hadoop.mapreduce.Reducer;

import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import?org.apache.hadoop.mapreduce.lib.input.FileSplit;

import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import?org.apache.hadoop.util.GenericOptionsParser;

?

public?class?InvertedIndex {

?

????public?static?class?Map?extends?Mapper<Object, Text, Text, Text> {

?

????????private?Text?keyInfo?=?new?Text();?//?存儲單詞和URL組合

????????private?Text?valueInfo?=?new?Text();?//?存儲詞頻

????????private?FileSplit?split;?//?存儲Split對象

?

????????//?實(shí)現(xiàn)map函數(shù)

????????public?void?map(Object key, Text value, Context context)

????????????????throws?IOException, InterruptedException {

?

????????????//?獲得<key,value>對所屬的FileSplit對象

????????????split?= (FileSplit) context.getInputSplit();

?

??????????? StringTokenizer itr =?new?StringTokenizer(value.toString());

?

????????????while?(itr.hasMoreTokens()) {

????????????????// key值由單詞和URL組成,如"MapReduce:file1.txt"

????????????????//?獲取文件的完整路徑

????????????????//?keyInfo.set(itr.nextToken()+":"+split.getPath().toString());

????????????????//?這里為了好看,只獲取文件的名稱。

????????????????int?splitIndex =?split.getPath().toString().indexOf("file");

????????????????keyInfo.set(itr.nextToken() +?":"

??????????????????? +?split.getPath().toString().substring(splitIndex));

????????????????//?詞頻初始化為1

????????????????valueInfo.set("1");

?

??????????????? context.write(keyInfo,?valueInfo);

??????????? }

??????? }

??? }

?

????public?static?class?Combine?extends?Reducer<Text, Text, Text, Text> {

?

????????private?Text?info?=?new?Text();

?

????????//?實(shí)現(xiàn)reduce函數(shù)

????????public?void?reduce(Text key, Iterable<Text> values, Context context)

????????????????throws?IOException, InterruptedException {

?

????????????//?統(tǒng)計(jì)詞頻

????????????int?sum = 0;

????????????for?(Text value : values) {

??????????????? sum += Integer.parseInt(value.toString());

??????????? }

?

????????????int?splitIndex = key.toString().indexOf(":");

????????????//?重新設(shè)置value值由URL和詞頻組成

????????????info.set(key.toString().substring(splitIndex + 1) +?":"?+ sum);

????????????//?重新設(shè)置key值為單詞

??????????? key.set(key.toString().substring(0, splitIndex));

?

??????????? context.write(key,?info);

??????? }

??? }

?

????public?static?class?Reduce?extends?Reducer<Text, Text, Text, Text> {

?

????????private?Text?result?=?new?Text();

?

????????//?實(shí)現(xiàn)reduce函數(shù)

????????public?void?reduce(Text key, Iterable<Text> values, Context context)

????????????????throws?IOException, InterruptedException {

?

????????????//?生成文檔列表

??????????? String fileList =?new?String();

????????????for?(Text value : values) {

??????????????? fileList += value.toString() +?";";

??????????? }

?

????????????result.set(fileList);

?

??????????? context.write(key,?result);

??????? }

??? }

?

????public?static?void?main(String[] args)?throws?Exception {

??????? Configuration conf =?new?Configuration();

????????//?這句話很關(guān)鍵

??????? conf.set("mapred.job.tracker",?"192.168.1.2:9001");

?

??????? String[] ioArgs =?new?String[] {?"index_in",?"index_out"?};

??????? String[] otherArgs =?new?GenericOptionsParser(conf, ioArgs)

??????????????? .getRemainingArgs();

????????if?(otherArgs.length?!= 2) {

??????????? System.err.println("Usage: Inverted Index <in> <out>");

??????????? System.exit(2);

??????? }

?

??????? Job job =?new?Job(conf,?"Inverted Index");

??????? job.setJarByClass(InvertedIndex.class);

?

????????//?設(shè)置Map、Combine和Reduce處理類

??????? job.setMapperClass(Map.class);

??????? job.setCombinerClass(Combine.class);

??????? job.setReducerClass(Reduce.class);

?

????????//?設(shè)置Map輸出類型

??????? job.setMapOutputKeyClass(Text.class);

??????? job.setMapOutputValueClass(Text.class);

?

????????//?設(shè)置Reduce輸出類型

??????? job.setOutputKeyClass(Text.class);

??????? job.setOutputValueClass(Text.class);

?

????????//?設(shè)置輸入和輸出目錄

??????? FileInputFormat.addInputPath(job,?new?Path(otherArgs[0]));

??????? FileOutputFormat.setOutputPath(job,?new?Path(otherArgs[1]));

??????? System.exit(job.waitForCompletion(true) ? 0 : 1);

??? }

}

?

6.4 代碼結(jié)果

1)準(zhǔn)備測試數(shù)據(jù)

??? 通過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下創(chuàng)建輸入文件"index_in"文件夾(備注:"index_out"不需要創(chuàng)建。)如圖6.4-1所示,已經(jīng)成功創(chuàng)建。

?

????????????????

圖6.4-1 創(chuàng)建"index_in"???????????????????????????????????????????? 圖6.4.2 上傳"file*.txt"

?

??? 然后在本地建立三個txt文件,通過Eclipse上傳到"/user/hadoop/index_in"文件夾中,三個txt文件的內(nèi)容如"實(shí)例描述"那三個文件一樣。如圖6.4-2所示,成功上傳之后。

??? 從SecureCRT遠(yuǎn)處查看"Master.Hadoop"的也能證實(shí)我們上傳的三個文件。

?

圖6.4.3 三個"file*.txt"的內(nèi)容

2)查看運(yùn)行結(jié)果

??? 這時我們右擊Eclipse的"DFS Locations"中"/user/hadoop"文件夾進(jìn)行刷新,這時會發(fā)現(xiàn)多出一個"index_out"文件夾,且里面有3個文件,然后打開雙其"part-r-00000"文件,會在Eclipse中間把內(nèi)容顯示出來。如圖6.4-4所示。

?

圖6.4-4 運(yùn)行結(jié)果

?

?

  文章下載地址:http://files.cnblogs.com/xia520pi/HadoopCluster_Vol.9.rar

總結(jié)

以上是生活随笔為你收集整理的hadoop 入门实例【转】的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。