05 MapReduce Application Examples 01

Published: 2023/11/30
1. Word count

To some extent, word counting reflects the original purpose of MapReduce: analyzing log files.

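// Imports shared by the three classes below (each public class normally lives in its own file).
// StringUtils is assumed to be org.apache.hadoop.util.StringUtils; the original listing does not show its import.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Called once for every line read from the file split:
    // the key is the position (offset) of the line, the value is the content of the line.
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] words = StringUtils.split(value.toString(), ' ');
        for (String w : words) {
            context.write(new Text(w), new IntWritable(1));
        }
    }
}

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    // Called once per group. All records in a group share the same key and may carry several values.
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable i : values) {
            sum = sum + i.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

public class RunJob {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // config.set("fs.defaultFS", "hdfs://node1:8020");
        // config.set("yarn.resourcemanager.hostname", "node1");
        // config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJarByClass(RunJob.class);
            job.setJobName("wc");
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path("/usr/input/"));
            // The output directory must not exist when the job starts, so remove any previous run.
            Path outpath = new Path("/usr/output/wc");
            if (fs.exists(outpath)) {
                fs.delete(outpath, true);
            }
            FileOutputFormat.setOutputPath(job, outpath);
            boolean f = job.waitForCompletion(true);
            if (f) {
                System.out.println("job completed!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}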
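To see what the job computes without a cluster, the map, shuffle and reduce steps can be imitated in memory. The following is only a sketch: the class `WordCountSketch` and its methods are hypothetical names, no Hadoop classes are involved, and a `TreeMap` stands in for the grouping and sorting that the framework performs between map and reduce.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** In-memory imitation of the word-count job (hypothetical helper, not a Hadoop API). */
class WordCountSketch {

    /** "Map" step: emit one (word, 1) pair for every word on the line. */
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String w : line.split(" ")) {
            if (!w.isEmpty()) {
                pairs.add(new AbstractMap.SimpleEntry<>(w, 1));
            }
        }
        return pairs;
    }

    /** "Shuffle" and "reduce" steps: group the pairs by key and sum the values of each group. */
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> sums = new TreeMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Integer> pair : map(line)) {
                sums.merge(pair.getKey(), pair.getValue(), Integer::sum);
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        // Made-up input lines; prints {hadoop=1, hello=2, world=1}
        System.out.println(count(Arrays.asList("hello world", "hello hadoop")));
    }
}
```

The real job differs in that map calls run in parallel on separate splits and each reducer only sees the keys of its own partition.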

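2. Data deduplication

The goal is that any record appearing more than once in the raw data appears exactly once in the output file.

The natural approach is to send all copies of the same record to the same reduce task: however many times the record occurs, the reducer writes it to the final result only once.

Only small changes to the word count program are needed.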

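import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DedupMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // The whole line becomes the output key, so identical lines end up in the same reduce group.
        // (The input key is only the line offset and cannot be written as a Text key.)
        context.write(value, NullWritable.get());
    }
}

public class DedupReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    // The value type must be NullWritable to match the map output; with any other parameter type
    // this method would not override reduce() and the default identity reducer would run instead.
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // Write each distinct key once, regardless of how many values the group holds.
        context.write(key, NullWritable.get());
    }
}

public class RunJob {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // config.set("fs.defaultFS", "hdfs://node1:8020");
        // config.set("yarn.resourcemanager.hostname", "node1");
        // Environment-specific jar location from the original listing; adjust or remove as needed.
        config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJarByClass(RunJob.class);
            job.setJobName("dedup");
            job.setMapperClass(DedupMapper.class);
            job.setReducerClass(DedupReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
            FileInputFormat.addInputPath(job, new Path("/usr/input/"));
            Path outpath = new Path("/usr/output/dedup");
            if (fs.exists(outpath)) {
                fs.delete(outpath, true);
            }
            FileOutputFormat.setOutputPath(job, outpath);
            boolean f = job.waitForCompletion(true);
            if (f) {
                System.out.println("job completed!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}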
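The effect of using the whole line as the key can be checked with a few lines of plain Java. This is a sketch under stated assumptions: `DedupSketch` is a hypothetical class, the input lines are made up, and a sorted set plays the role of the shuffle phase, which groups equal keys and hands each group to the reducer once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

/** In-memory imitation of the deduplication job (hypothetical helper, not a Hadoop API). */
class DedupSketch {

    /**
     * Each line acts as a key. The TreeSet groups equal keys and keeps them sorted,
     * and emitting every key once corresponds to the single write in the reducer.
     */
    static List<String> dedup(List<String> lines) {
        return new ArrayList<>(new TreeSet<>(lines));
    }

    public static void main(String[] args) {
        // Made-up records; the first and third are identical.
        List<String> lines = Arrays.asList("2012-3-1 a", "2012-3-2 b", "2012-3-1 a");
        System.out.println(dedup(lines)); // prints [2012-3-1 a, 2012-3-2 b]
    }
}
```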



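3. Sorting

Sort the contents of the input files.

Every line of an input file holds one number, which is one data item.

Each output line must contain two numbers separated by a space: the second number is the original value and the first is its rank.

Sample input: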

file1:

2

32

654

32

15

765

65223

file2:

5956

22

650

92

file3:

26

54

6

Sample output:

1 2

2 6

3 15

4 22

5 26

6 32

7 32

8 54

9 92

10 650

11 654

12 765

13 5956

14 65223


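Design approach:

The sort that MapReduce performs by default on map output keys can be reused, so no sorting code has to be written.

Key points:

1. The data to be sorted is used as the key of the map output.

2. The Partitioner class has to be overridden to guarantee global order. Take the quotient of the maximum input value and the number of partitions as the step between boundaries, so the boundaries are 1x, 2x, ... up to (numPartitions-1)x that quotient. Every key in one partition is then no larger than any key in the next partition, and because each partition is sorted internally, the concatenated output is globally ordered.

3. The reducer receives <key, value-list> and writes the input key as the output value once for every element of value-list.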
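The boundary rule in point 2 can be checked with a small standalone calculation. This is a sketch, not the job's partitioner: `RangePartitionSketch` and `partitionFor` are hypothetical names, and placing keys above the last boundary in the last partition is an assumption of this sketch, needed because integer division drops the remainder of maxNumber / numPartitions.

```java
/** Standalone check of the range-partition boundaries (hypothetical helper, not a Hadoop API). */
class RangePartitionSketch {

    /** Returns the partition index for a key, using boundaries at 1x, 2x, ... of maxNumber / numPartitions. */
    static int partitionFor(int key, int maxNumber, int numPartitions) {
        int bound = maxNumber / numPartitions; // step between boundaries (integer division)
        for (int i = 0; i < numPartitions - 1; i++) {
            if (key <= (i + 1) * bound) {
                return i;
            }
        }
        // Everything above the last boundary goes to the last partition (assumption of this sketch).
        return numPartitions - 1;
    }

    public static void main(String[] args) {
        int max = 65223;    // largest value in the sample input
        int partitions = 5; // same number as the reduce tasks in the listing
        int[] keys = {2, 654, 5956, 13045, 30000, 65223};
        for (int k : keys) {
            System.out.println(k + " -> partition " + partitionFor(k, max, partitions));
        }
    }
}
```

With a maximum of 65223 and 5 partitions the step is 13044, so the boundaries are 13044, 26088, 39132 and 52176. Almost all sample values fall into partition 0, which shows that equal-width ranges only balance the load when the data is spread evenly.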

package hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Sort {

    public static class SortMapper extends Mapper<Object, Text, IntWritable, NullWritable> {
        private NullWritable nw = NullWritable.get();

        @Override
        protected void map(Object key, Text value, Mapper<Object, Text, IntWritable, NullWritable>.Context context)
                throws IOException, InterruptedException {
            // The number itself is the key, so the framework sorts it during the shuffle.
            context.write(new IntWritable(Integer.parseInt(value.toString().trim())), nw);
        }
    }

    public static class SortReducer extends Reducer<IntWritable, NullWritable, IntWritable, IntWritable> {
        // The counter is local to each reduce task. With more than one reduce task the rank restarts
        // at 1 in every output file; the global ranks of the sample output need a single reduce task.
        private IntWritable counter = new IntWritable(1);

        @Override
        protected void reduce(IntWritable key, Iterable<NullWritable> values,
                Reducer<IntWritable, NullWritable, IntWritable, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // One output line per occurrence of the key.
            for (NullWritable nw : values) {
                context.write(counter, key);
                counter = new IntWritable(counter.get() + 1);
            }
        }
    }

    public static class SortPartitioner extends Partitioner<IntWritable, NullWritable> {
        // numPartitions equals the number of reduce tasks
        @Override
        public int getPartition(IntWritable key, NullWritable value, int numPartitions) {
            int maxNumber = 65223;
            int bound = maxNumber / numPartitions;
            int keyNumber = key.get();
            for (int i = 0; i < numPartitions; ++i) {
                if (keyNumber <= (i + 1) * bound)
                    return i;
            }
            // Integer division drops the remainder, so keys just below maxNumber can exceed
            // numPartitions * bound. They belong to the last partition; returning 0 would break the order.
            return numPartitions - 1;
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Sort.class);
        job.setJobName("sort");
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(5);
        job.setPartitionerClass(SortPartitioner.class);
        String inputFile = "/home/jinzhao/dataset/input";
        String outputFile = "/home/jinzhao/dataset/output";
        FileInputFormat.setInputPaths(job, new Path(inputFile));
        Path output = new Path(outputFile);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output))
            fs.delete(output, true);
        FileOutputFormat.setOutputPath(job, output);
        job.waitForCompletion(true);
    }
}

4. Single-table join

Sample input:

file:

child parent

Tom Lucy

Tom Jack

Jone Lucy

Jone Jack

Lucy Mary

Lucy Ben

Jack Alice

Jack Jesse

Terry Alice

Terry Jesse

Philip Terry

Philip Alma

Mark Terry

Mark Alma

Sample output:

file:

grandchild grandparent

Tom Alice

Tom Jesse

Jone Alice

Jone Jesse

Tom Mary

Tom Ben

Jone Mary

Jone Ben

Philip Alice

Philip Jesse

Mark Alice

Mark Jesse


package hadoop;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class stlink {
    // Used to write the header line once. A static flag is only shared within one JVM,
    // so this relies on the job running with a single reduce task.
    private static boolean flag = true;

    public static class stlinkMapper extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Mapper<Object, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String[] names = value.toString().trim().split("\t");
            // Skip the header line; emit every pair twice, keyed once by the child and once by the parent.
            if (names[0].compareTo("child") != 0) {
                context.write(new Text(names[0]), new Text("parent:" + names[1]));
                context.write(new Text(names[1]), new Text("child:" + names[0]));
            }
        }
    }

    public static class stlinkReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            if (flag) {
                context.write(new Text("grandchild"), new Text("grandparent"));
                flag = false;
            }
            // For the person in the key, collect their children and their parents.
            List<String> children = new ArrayList<String>();
            List<String> parents = new ArrayList<String>();
            for (Text t : values) {
                String[] kv = t.toString().split(":");
                if (kv[0].compareTo("child") == 0)
                    children.add(kv[1]);
                else
                    parents.add(kv[1]);
            }
            // Every child of the key is a grandchild of every parent of the key.
            for (String c : children)
                for (String p : parents)
                    context.write(new Text(c), new Text(p));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job stlinkJob = Job.getInstance(conf);
        stlinkJob.setJarByClass(stlink.class);
        stlinkJob.setJobName("single table link");
        stlinkJob.setMapperClass(stlinkMapper.class);
        stlinkJob.setReducerClass(stlinkReducer.class);
        stlinkJob.setOutputKeyClass(Text.class);
        stlinkJob.setOutputValueClass(Text.class);
        stlinkJob.setMapOutputKeyClass(Text.class);
        stlinkJob.setMapOutputValueClass(Text.class);
        Path input = new Path("/home/jinzhao/dataset/input");
        Path output = new Path("/home/jinzhao/dataset/output");
        FileInputFormat.setInputPaths(stlinkJob, input);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output))
            fs.delete(output, true);
        FileOutputFormat.setOutputPath(stlinkJob, output);
        stlinkJob.waitForCompletion(true);
    }
}
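The self-join performed by the listing above can be followed in memory on part of the sample data. The sketch below is illustrative only: `SingleTableJoinSketch` and `grandPairs` are hypothetical names, and two maps replace the "child:" and "parent:" tags that the mapper attaches to its output values.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** In-memory imitation of the single-table join (hypothetical helper, not a Hadoop API). */
class SingleTableJoinSketch {

    /** Takes (child, parent) rows and returns "grandchild grandparent" strings. */
    static List<String> grandPairs(String[][] childParent) {
        // For every person: the list of their children and the list of their parents.
        Map<String, List<String>> childrenOf = new TreeMap<>();
        Map<String, List<String>> parentsOf = new TreeMap<>();
        for (String[] row : childParent) {
            String child = row[0];
            String parent = row[1];
            parentsOf.computeIfAbsent(child, k -> new ArrayList<>()).add(parent);
            childrenOf.computeIfAbsent(parent, k -> new ArrayList<>()).add(child);
        }
        // A person who has both children and parents links the two generations.
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : childrenOf.entrySet()) {
            List<String> parents = parentsOf.getOrDefault(e.getKey(), Collections.<String>emptyList());
            for (String c : e.getValue()) {
                for (String p : parents) {
                    result.add(c + " " + p);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // A subset of the sample input.
        String[][] rows = {
            {"Tom", "Lucy"}, {"Jone", "Lucy"}, {"Lucy", "Mary"}, {"Lucy", "Ben"}
        };
        for (String pair : grandPairs(rows)) {
            System.out.println(pair);
        }
    }
}
```

Here Lucy is the only person with both children and parents, so the four printed pairs are Tom and Jone combined with Mary and Ben.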

5. Multi-table join

Sample input:

factory:

factoryname addressID

Beijing Red Star 1

Shenzhen Thunder 3

Guangzhou Honda 2

Beijing Rising 1

Guangzhou Development Bank 2

Tencent 3

Bank of Beijing 1

address:

1 Beijing

2 Guangzhou

3 Shenzhen

4 Xian

package hadoop;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class mtlink {
    // Used to write the header line once; only reliable with a single reduce task.
    private static boolean flag = true;

    public static class mtlinkMapper extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Mapper<Object, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String str = value.toString();
            // Skip the header lines of both tables.
            if (str.contains("factoryname") || str.contains("addressname"))
                return;
            String[] infos = str.trim().split(" ");
            // A line starting with a digit comes from the address table (ID first);
            // any other line comes from the factory table (ID last). The ID is the join key.
            if (infos[0].charAt(0) >= '0' && infos[0].charAt(0) <= '9')
                context.write(new Text(infos[0]), new Text("right:" + strCombine(infos, "right")));
            else
                context.write(new Text(infos[infos.length - 1]), new Text("left:" + strCombine(infos, "left")));
        }

        // Rebuilds the name from the tokens, leaving out the ID token.
        private String strCombine(String[] strs, String direction) {
            StringBuilder sb = new StringBuilder();
            if (direction.compareTo("right") == 0)
                for (int i = 1; i < strs.length; ++i)
                    sb.append(strs[i] + " ");
            else
                for (int i = 0; i < strs.length - 1; ++i)
                    sb.append(strs[i] + " ");
            return sb.toString().trim();
        }
    }

    public static class mtlinkReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            if (flag) {
                context.write(new Text("factoryname"), new Text("addressname"));
                flag = false;
            }
            List<String> companies = new ArrayList<String>();
            // Placeholder used when no address row exists for this ID.
            String place = "huoxing";
            for (Text t : values) {
                String[] kv = t.toString().trim().split(":");
                if (kv[0].compareTo("right") == 0)
                    place = kv[1];
                else
                    companies.add(kv[1]);
            }
            for (String s : companies)
                context.write(new Text(s), new Text(place));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job mtlinkJob = Job.getInstance(conf);
        mtlinkJob.setJarByClass(mtlink.class);
        mtlinkJob.setJobName("multiple tables link");
        mtlinkJob.setMapperClass(mtlinkMapper.class);
        mtlinkJob.setReducerClass(mtlinkReducer.class);
        mtlinkJob.setOutputKeyClass(Text.class);
        mtlinkJob.setOutputValueClass(Text.class);
        Path input = new Path("/home/jinzhao/dataset/input");
        Path output = new Path("/home/jinzhao/dataset/output");
        FileInputFormat.setInputPaths(mtlinkJob, input);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output))
            fs.delete(output, true);
        FileOutputFormat.setOutputPath(mtlinkJob, output);
        mtlinkJob.waitForCompletion(true);
    }
}
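The tagging and grouping done by the job above can also be traced in plain Java. This is a sketch with hypothetical names (`MultiTableJoinSketch`, `tag`, `join`). It keeps the "left:" and "right:" tags of the listing, but unlike the listing it drops factories whose address ID has no match instead of writing a placeholder, which is a choice made for this sketch only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** In-memory imitation of the two-table join (hypothetical helper, not a Hadoop API). */
class MultiTableJoinSketch {

    /** "Map" step: returns {joinKey, taggedValue} for one input line of either table. */
    static String[] tag(String line) {
        String[] t = line.trim().split(" ");
        if (Character.isDigit(t[0].charAt(0))) {
            // Address row: "<id> <address name>"
            return new String[] {t[0], "right:" + String.join(" ", Arrays.copyOfRange(t, 1, t.length))};
        }
        // Factory row: "<factory name> <id>"
        return new String[] {t[t.length - 1], "left:" + String.join(" ", Arrays.copyOfRange(t, 0, t.length - 1))};
    }

    /** "Shuffle" and "reduce" steps: group by address ID, then pair each factory with its address. */
    static List<String> join(List<String> lines) {
        Map<String, List<String>> factories = new TreeMap<>();
        Map<String, String> places = new TreeMap<>();
        for (String line : lines) {
            String[] kv = tag(line);
            if (kv[1].startsWith("right:")) {
                places.put(kv[0], kv[1].substring("right:".length()));
            } else {
                factories.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1].substring("left:".length()));
            }
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : factories.entrySet()) {
            String place = places.get(e.getKey());
            if (place == null) {
                continue; // no matching address row: skipped in this sketch
            }
            for (String factory : e.getValue()) {
                out.add(factory + "\t" + place);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // A subset of the sample input, both tables mixed as the mapper would see them.
        List<String> lines = Arrays.asList(
                "Beijing Red Star 1", "Tencent 3", "1 Beijing", "3 Shenzhen", "4 Xian");
        for (String row : join(lines)) {
            System.out.println(row);
        }
    }
}
```

Address 4 (Xian) has no factory in this subset, so it produces no output row, just as a reduce group without "left" records writes nothing in the job.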
