Hadoop入门(十)Mapreduce高级shuffle之Sort和Group
一、排序分組概述
MapReduce中排序和分組在哪里被執(zhí)行
第3步中需要對不同分區(qū)中的數(shù)據(jù)進(jìn)行排序和分組,默認(rèn)情況按照key進(jìn)行排序和分組
?
二、排序
在Hadoop默認(rèn)的排序算法中,只會針對key值進(jìn)行排序
任務(wù):
數(shù)據(jù)文件中,如果按照第一列升序排列,
當(dāng)?shù)谝涣邢嗤瑫r(shí),第二列升序排列
如果當(dāng)?shù)谝涣邢嗤瑫r(shí),求出第二列的最小值
自定義排序
1.封裝一個(gè)自定義類型作為key的新類型:將第一列與第二列都作為key
WritableComparable接口
定義:
public interface WritableComparable<T> extends Writable, Comparable<T> { }自定義類型MyNewKey實(shí)現(xiàn)了WritableComparable的接口,該接口中有一個(gè)compareTo()方法,當(dāng)對key進(jìn)行比較時(shí)會調(diào)用該方法,而我們將其改為了我們自己定義的比較規(guī)則,從而實(shí)現(xiàn)我們想要的效果
?
2.改寫最初的MapReduce方法函數(shù)
public static class MyMapper extendsMapper<LongWritable, Text, MyNewKey, LongWritable> {protected void map(LongWritable key,Text value,Mapper<LongWritable, Text, MyNewKey, LongWritable>.Context context)throws java.io.IOException, InterruptedException {String[] spilted = value.toString().split("\t");long firstNum = Long.parseLong(spilted[0]);long secondNum = Long.parseLong(spilted[1]);// 使用新的類型作為key參與排序MyNewKey newKey = new MyNewKey(firstNum, secondNum);context.write(newKey, new LongWritable(secondNum));};} public static class MyReducer extendsReducer<MyNewKey, LongWritable, LongWritable, LongWritable> {protected void reduce(MyNewKey key,java.lang.Iterable<LongWritable> values,Reducer<MyNewKey, LongWritable, LongWritable, LongWritable>.Context context)throws java.io.IOException, InterruptedException {context.write(new LongWritable(key.firstNum), new LongWritable(key.secondNum));};}?
三、分組
在Hadoop中的默認(rèn)分組規(guī)則中,也是基于Key進(jìn)行的,會將相同key的value放到一個(gè)集合中去
目的:求出第一列相同時(shí)第二列的最小值
上面的例子看分組,因?yàn)槲覀冏远x了一個(gè)新的key,它是以兩列數(shù)據(jù)作為key的,因此這6行數(shù)據(jù)中每個(gè)key都不相同產(chǎn)生6組
它們是:1 1,2 1,2 2,3 1,3 2,3 3。
而實(shí)際上只可以分為3組,分別是1,2,3。現(xiàn)在首先改寫一下reduce函數(shù)代碼
public static class MyReducer extendsReducer<MyNewKey, LongWritable, LongWritable, LongWritable> {protected void reduce(MyNewKey key,java.lang.Iterable<LongWritable> values,Reducer<MyNewKey, LongWritable, LongWritable, LongWritable>.Context context)throws java.io.IOException, InterruptedException {long min = Long.MAX_VALUE;for (LongWritable number : values) {long temp = number.get();if (temp < min) {min = temp;}}context.write(new LongWritable(key.firstNum), new LongWritable(min));};}自定義分組
為了針對新的key類型作分組,我們也需要自定義一下分組規(guī)則:
private static class MyGroupingComparator implementsRawComparator<MyNewKey> {/** 基本分組規(guī)則:按第一列firstNum進(jìn)行分組*/@Overridepublic int compare(MyNewKey key1, MyNewKey key2) {return (int) (key1.firstNum - key2.firstNum);}/** @param b1 表示第一個(gè)參與比較的字節(jié)數(shù)組*?* @param s1 表示第一個(gè)參與比較的字節(jié)數(shù)組的起始位置*?* @param l1 表示第一個(gè)參與比較的字節(jié)數(shù)組的偏移量*?* @param b2 表示第二個(gè)參與比較的字節(jié)數(shù)組*?* @param s2 表示第二個(gè)參與比較的字節(jié)數(shù)組的起始位置*?* @param l2 表示第二個(gè)參與比較的字節(jié)數(shù)組的偏移量*/@Overridepublic int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);}}自定義了一個(gè)分組比較器MyGroupingComparator,該類實(shí)現(xiàn)了RawComparator接口,而RawComparator接口又實(shí)現(xiàn)了Comparator接口,這兩個(gè)接口的定義:
public interface RawComparator<T> extends Comparator<T> {public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2); } public interface Comparator<T> {int compare(T o1, T o2);boolean equals(Object obj); }分組實(shí)現(xiàn)步驟:
1.MyGroupingComparator實(shí)現(xiàn)這兩個(gè)接口
RawComparator中的compare()方法是基于字節(jié)的比較,
Comparator中的compare()方法是基于對象的比較
由于在MyNewKey中有兩個(gè)long類型,每個(gè)long類型又占8個(gè)字節(jié)。這里因?yàn)楸容^的是第一列數(shù)字,所以讀取的偏移量為8字節(jié)。
2.添加對分組規(guī)則的設(shè)置:
// 設(shè)置自定義分組規(guī)則
? ?job.setGroupingComparatorClass(MyGroupingComparator.class);
3. 運(yùn)行結(jié)果:
?
?
總結(jié)
以上是生活随笔為你收集整理的Hadoop入门(十)Mapreduce高级shuffle之Sort和Group的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 杂牌电源对电脑影响大吗杂牌电源对电脑的影
- 下一篇: Hadoop入门(十四)Mapreduc