日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java多个mapreduce_一个简单的MapReduce示例(多个MapReduce任务处理)

發布時間:2024/9/30 编程问答 18 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java多个mapreduce_一个简单的MapReduce示例(多个MapReduce任务处理) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、需求

有一個列表,只有兩列:id、pro,記錄了id與pro的對應關系,但是在同一個id下,pro有可能是重復的。

現在需要寫一個程序,統計一下每個id下有多少個不重復的pro。

為了寫一個完整的示例,我使用了多job!

二、文件目錄

|- OutCount //單Job的,本次試驗沒有使用到,這里寫出來供參考

|-OutCount2|-OutCountMapper|-OutCountMapper2|-OutCountReduce|- OutCountReduce2

三、樣本數據(部分)

2,10000088379

9,10000088379

6,10000088379

1,10000088379

8,10000088379

0,10000088379

1,10000088379

4,10000091621

3,10000091621

2,10000091621

0,10000091621

6,10000091621

2,10000091621

0,10000091621

0,10000091621

9,10000091621

2,10000091621

四、Java代碼

1、OutCountMapper.java

importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Mapper;importjava.io.IOException;/*** created by wangjunfu on 2017-05-25.

* 4個泛型中,前兩個是指定mapper輸入數據的類型,KEYIN是輸入的key的類型,VALUEIN是輸入的value的類型

* map 和 reduce 的數據輸入輸出都是以 key-value對的形式封裝的

* 默認情況下,Map框架傳遞給我們的mapper的輸入數據中,key是要處理的文本中一行的起始偏移量(選用LongWritable),value是這一行的內容(VALUEIN選用Text)

* 在wordcount中,經過mapper處理數據后,得到的是這樣的結果,所以KEYOUT選用Text,VAULEOUT選用IntWritable*/

public class OutCountMapper extends Mapper{//MapReduce框架每讀一行數據就調用一次map方法

public void map(LongWritable key, Text value, Context context) throwsIOException, InterruptedException {//數據格式:uid skuid

String oneline = value.toString().replace(',', '_').trim();//去重思路:Map的key具有數據去重的功能,以整個數據作為key發送出去, value為null

context.write(new Text(oneline), new Text(""));/*// 這里需要說明一下,我們現在的樣本是標準的,一行一個樣本。

// 有的情況下一行多個,那就需要進行分割。

// 對這一行的文本按特定分隔符切分

String[] words = oneline.split("\t");

for (String word : words) {

// 遍歷這個單詞數組,輸出為key-value形式 key:單詞 value : 1

context.write(new Text(word), new IntWritable(1));

}*/}

}

2、OutCountReduce.java

importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Reducer;importjava.io.IOException;/*** created by wangjunfu on 2017-05-25.

* 經過mapper處理后的數據會被reducer拉取過來,所以reducer的KEYIN、VALUEIN和mapper的KEYOUT、VALUEOUT一致

* 經過reducer處理后的數據格式為,所以KEYOUT為Text,VALUEOUT為IntWritable*/

public class OutCountReduce extends Reducer{//當mapper框架將相同的key的數據處理完成后,reducer框架會將mapper框架輸出的數據變成。//例如,在wordcount中會將mapper框架輸出的所有變為,即這里的,然后將作為reduce函數的輸入//這個將在下面reduce2 中得到體現

public void reduce(Text key, Iterable values, Context context) throwsIOException, InterruptedException {

context.write(key,new Text(""));

}

}

3、OutCountMapper2.java

importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Mapper;importjava.io.IOException;/*** created by wangjunfu on 2017-05-27.

* 將原始數據作為map輸出的key設置為int類型。map會自動的根據key進行排序*/

public class OutCountMapper2 extends Mapper{private final static IntWritable one = new IntWritable(1);public void map(LongWritable key, Text value, Context context) throwsIOException, InterruptedException {//數據格式:uid_skuid

String oneline =value.toString();//將這條數據中的uid 發出去, value為計算one

context.write(new Text(oneline.split("_")[0]), one);

}

}

4、OutCountReduce2.java

importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Reducer;importjava.io.IOException;importjava.util.Iterator;/*** created by wangjunfu on 2017-05-27.

* 按統計數排序:將values作為次序key,將map排序好的key作為value輸出*/

public class OutCountReduce2 extends Reducer{public void reduce(Text key, Iterable values, Context context) throwsIOException, InterruptedException {int sum = 0;//迭代器,訪問容器中的元素,為容器而生

Iterator itr =values.iterator();while(itr.hasNext()) {

sum+=itr.next().get();

}/*// 這種遍歷也可以

// 遍歷v2的list,進行累加求和

for (IntWritable v2 : itr) {

sum = v2.get();

}*/

//按統計數排序:將values作為次序key,將map排序好的key作為value輸出//context.write(new IntWritable(sum), key);//需要再起一個 map-reduce

context.write(key, newIntWritable(sum));

}

}

5、OutCount2.java

importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapred.JobConf;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;importorg.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/*** 需求:給定一個列表uid skuid,求出uid下不重復的skuid數據;然后再按統計大小排序。

* 涉及到多job 處理。

* created by wangjunfu on 2017-05-27.*/

public classOutCount2 {public static void main(String[] args) throwsException {

JobConf conf= new JobConf(OutCount.class);//第一個job的配置

Job job1 = new Job(conf, "Join1");

job1.setJarByClass(OutCount.class);

job1.setMapperClass(OutCountMapper.class);

job1.setReducerClass(OutCountReduce.class);

job1.setMapOutputKeyClass(Text.class); //map階段的輸出的key

job1.setMapOutputValueClass(Text.class); //map階段的輸出的value

job1.setOutputKeyClass(Text.class); //reduce階段的輸出的key

job1.setOutputValueClass(Text.class); //reduce階段的輸出的value//job-1 加入控制容器

ControlledJob ctrljob1 = newControlledJob(conf);

ctrljob1.setJob(job1);//job-1 的輸入輸出文件路徑

FileInputFormat.addInputPath(job1, new Path(args[0]));

FileOutputFormat.setOutputPath(job1,new Path(args[1]));//第二個job的配置

Job job2 = new Job(conf, "Join2");

job2.setJarByClass(OutCount.class); //設置job所在的類在哪個jar包

job2.setMapperClass(OutCountMapper2.class); //指定job所用的mappe類

job2.setReducerClass(OutCountReduce2.class); //指定job所用的reducer類//指定mapper輸出類型和reducer輸出類型//由于在wordcount中mapper和reducer的輸出類型一致,//所以使用setOutputKeyClass和setOutputValueClass方法可以同時設定mapper和reducer的輸出類型//如果mapper和reducer的輸出類型不一致時,可以使用setMapOutputKeyClass和setMapOutputValueClass單獨設置mapper的輸出類型

job2.setMapOutputKeyClass(Text.class); //map階段的輸出的key

job2.setMapOutputValueClass(IntWritable.class); //map階段的輸出的value

job2.setOutputKeyClass(Text.class); //reduce階段的輸出的key

job2.setOutputValueClass(IntWritable.class); //reduce階段的輸出的value//job-2 加入控制容器

ControlledJob ctrljob2 = newControlledJob(conf);

ctrljob2.setJob(job2);//設置多個作業直接的依賴關系//job-2 的啟動,依賴于job-1作業的完成

ctrljob2.addDependingJob(ctrljob1);//輸入路徑是上一個作業的輸出路徑,因此這里填args[1],要和上面對應好

FileInputFormat.addInputPath(job2, new Path(args[1]));//輸出路徑從新傳入一個參數,這里需要注意,因為我們最后的輸出文件一定要是沒有出現過得//因此我們在這里new Path(args[2])因為args[2]在上面沒有用過,只要和上面不同就可以了

FileOutputFormat.setOutputPath(job2, new Path(args[2]));//主的控制容器,控制上面的總的兩個子作業

JobControl jobCtrl = new JobControl("myOutCount");//添加到總的JobControl里,進行控制

jobCtrl.addJob(ctrljob1);

jobCtrl.addJob(ctrljob2);//在線程啟動,記住一定要有這個

Thread t = newThread(jobCtrl);

t.start();while (true) {if(jobCtrl.allFinished()) {//如果作業成功完成,就打印成功作業的信息

System.out.println(jobCtrl.getSuccessfulJobList());

jobCtrl.stop();break;

}

}

}

}

6、OutCount.java

單Job的,本次試驗沒有使用到,這里寫出來供參考

importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.util.GenericOptionsParser;/*** 需求:給定一個列表uid skuid,求出uid下不重復的skuid數據;然后再按統計大小排序。

* 涉及到多job 處理。

* created by wangjunfu on 2017-05-25.*/

public classOutCount {public static void main(String[] args) throwsException {

Configuration conf= new Configuration(); //指定作業執行規范

String[] otherArgs = newGenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {

System.err.println("Usage:wordcount ");

System.exit(2);

}

Job job= new Job(conf, "word count"); //指定job名稱,及運行對象

job.setJarByClass(OutCount.class);

job.setMapperClass(OutCountMapper.class); //指定map函數

job.setCombinerClass(OutCountReduce.class); //是否需要conbiner整合

job.setReducerClass(OutCountReduce.class); //指定reduce函數

job.setOutputKeyClass(Text.class); //輸出key格式

job.setOutputValueClass(IntWritable.class); //輸出value格式

org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new Path(otherArgs[0])); //處理文件路徑

org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); //結果輸出路徑//將job提交給集群運行

System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}

五、結果

11 0

11 1

7 2

10 3

10 4

9 5

10 6

7 7

13 8

9 9

總結

以上是生活随笔為你收集整理的java多个mapreduce_一个简单的MapReduce示例(多个MapReduce任务处理)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。