日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Hadoop入门(十)Mapreduce高级shuffle之Sort和Group

發(fā)布時(shí)間:2023/12/3 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hadoop入门(十)Mapreduce高级shuffle之Sort和Group 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

一、排序分組概述

MapReduce中排序和分組在哪里被執(zhí)行

第3步中需要對不同分區(qū)中的數(shù)據(jù)進(jìn)行排序和分組,默認(rèn)情況按照key進(jìn)行排序和分組

?

二、排序

在Hadoop默認(rèn)的排序算法中,只會針對key值進(jìn)行排序

任務(wù):
數(shù)據(jù)文件中,如果按照第一列升序排列,
當(dāng)?shù)谝涣邢嗤瑫r(shí),第二列升序排列
如果當(dāng)?shù)谝涣邢嗤瑫r(shí),求出第二列的最小值

自定義排序

1.封裝一個(gè)自定義類型作為key的新類型:將第一列與第二列都作為key

WritableComparable接口

定義:

public interface WritableComparable<T> extends Writable, Comparable<T> { }

自定義類型MyNewKey實(shí)現(xiàn)了WritableComparable的接口,該接口中有一個(gè)compareTo()方法,當(dāng)對key進(jìn)行比較時(shí)會調(diào)用該方法,而我們將其改為了我們自己定義的比較規(guī)則,從而實(shí)現(xiàn)我們想要的效果
?

private static class MyNewKey implements WritableComparable<MyNewKey> {long firstNum;long secondNum;public MyNewKey() {}public MyNewKey(long first, long second) {firstNum = first;secondNum = second;}@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(firstNum);out.writeLong(secondNum);}@Overridepublic void readFields(DataInput in) throws IOException {firstNum = in.readLong();secondNum = in.readLong();}/** 當(dāng)key進(jìn)行排序時(shí)會調(diào)用以下這個(gè)compreTo方法*/@Overridepublic int compareTo(MyNewKey anotherKey) {long min = firstNum - anotherKey.firstNum;if (min != 0) {// 說明第一列不相等,則返回兩數(shù)之間小的數(shù)return (int) min;} else {return (int) (secondNum - anotherKey.secondNum);}}}

2.改寫最初的MapReduce方法函數(shù)

public static class MyMapper extendsMapper<LongWritable, Text, MyNewKey, LongWritable> {protected void map(LongWritable key,Text value,Mapper<LongWritable, Text, MyNewKey, LongWritable>.Context context)throws java.io.IOException, InterruptedException {String[] spilted = value.toString().split("\t");long firstNum = Long.parseLong(spilted[0]);long secondNum = Long.parseLong(spilted[1]);// 使用新的類型作為key參與排序MyNewKey newKey = new MyNewKey(firstNum, secondNum);context.write(newKey, new LongWritable(secondNum));};} public static class MyReducer extendsReducer<MyNewKey, LongWritable, LongWritable, LongWritable> {protected void reduce(MyNewKey key,java.lang.Iterable<LongWritable> values,Reducer<MyNewKey, LongWritable, LongWritable, LongWritable>.Context context)throws java.io.IOException, InterruptedException {context.write(new LongWritable(key.firstNum), new LongWritable(key.secondNum));};}

?

三、分組

在Hadoop中的默認(rèn)分組規(guī)則中,也是基于Key進(jìn)行的,會將相同key的value放到一個(gè)集合中去

目的:求出第一列相同時(shí)第二列的最小值
上面的例子看分組,因?yàn)槲覀冏远x了一個(gè)新的key,它是以兩列數(shù)據(jù)作為key的,因此這6行數(shù)據(jù)中每個(gè)key都不相同產(chǎn)生6組
它們是:1 1,2 1,2 2,3 1,3 2,3 3。

而實(shí)際上只可以分為3組,分別是1,2,3。現(xiàn)在首先改寫一下reduce函數(shù)代碼

public static class MyReducer extendsReducer<MyNewKey, LongWritable, LongWritable, LongWritable> {protected void reduce(MyNewKey key,java.lang.Iterable<LongWritable> values,Reducer<MyNewKey, LongWritable, LongWritable, LongWritable>.Context context)throws java.io.IOException, InterruptedException {long min = Long.MAX_VALUE;for (LongWritable number : values) {long temp = number.get();if (temp < min) {min = temp;}}context.write(new LongWritable(key.firstNum), new LongWritable(min));};}

自定義分組

為了針對新的key類型作分組,我們也需要自定義一下分組規(guī)則:

private static class MyGroupingComparator implementsRawComparator<MyNewKey> {/** 基本分組規(guī)則:按第一列firstNum進(jìn)行分組*/@Overridepublic int compare(MyNewKey key1, MyNewKey key2) {return (int) (key1.firstNum - key2.firstNum);}/** @param b1 表示第一個(gè)參與比較的字節(jié)數(shù)組*?* @param s1 表示第一個(gè)參與比較的字節(jié)數(shù)組的起始位置*?* @param l1 表示第一個(gè)參與比較的字節(jié)數(shù)組的偏移量*?* @param b2 表示第二個(gè)參與比較的字節(jié)數(shù)組*?* @param s2 表示第二個(gè)參與比較的字節(jié)數(shù)組的起始位置*?* @param l2 表示第二個(gè)參與比較的字節(jié)數(shù)組的偏移量*/@Overridepublic int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);}}

自定義了一個(gè)分組比較器MyGroupingComparator,該類實(shí)現(xiàn)了RawComparator接口,而RawComparator接口又實(shí)現(xiàn)了Comparator接口,這兩個(gè)接口的定義:

public interface RawComparator<T> extends Comparator<T> {public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2); } public interface Comparator<T> {int compare(T o1, T o2);boolean equals(Object obj); }

分組實(shí)現(xiàn)步驟:

1.MyGroupingComparator實(shí)現(xiàn)這兩個(gè)接口
RawComparator中的compare()方法是基于字節(jié)的比較,
Comparator中的compare()方法是基于對象的比較

由于在MyNewKey中有兩個(gè)long類型,每個(gè)long類型又占8個(gè)字節(jié)。這里因?yàn)楸容^的是第一列數(shù)字,所以讀取的偏移量為8字節(jié)。

2.添加對分組規(guī)則的設(shè)置:
  // 設(shè)置自定義分組規(guī)則
? ?job.setGroupingComparatorClass(MyGroupingComparator.class);

3. 運(yùn)行結(jié)果:

?

?

總結(jié)

以上是生活随笔為你收集整理的Hadoop入门(十)Mapreduce高级shuffle之Sort和Group的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。