日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Hadoop学习笔记—11.MapReduce中的排序和分组

發(fā)布時間:2024/4/17 编程问答 38 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hadoop学习笔记—11.MapReduce中的排序和分组 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Hadoop學習筆記—11.MapReduce中的排序和分組

一、寫在之前的

1.1 回顧Map階段四大步驟

  首先,我們回顧一下在MapReduce中,排序和分組在哪里被執(zhí)行:

  從上圖中可以清楚地看出,在Step1.4也就是第四步中,需要對不同分區(qū)中的數據進行排序和分組,默認情況下,是按照key進行排序和分組。

1.2 實驗場景數據文件

  在一些特定的數據文件中,不一定都是類似于WordCount單次統計這種規(guī)范的數據,比如下面這類數據,它雖然只有兩列,但是卻有一定的實踐意義。

3 3 3 2 3 1 2 2 2 1 1 1

  (1)如果按照第一列升序排列,當第一列相同時,第二列升序排列,結果如下所示

1 1 2 1 2 2 3 1 3 2 3 3

  (2)如果當第一列相同時,求出第二列的最小值,結果如下所示

3 1 2 1 1 1

  接著,我們會針對這個數據文件,進行排序和分組的實踐嘗試,以求達到結果所示的效果。

二、初步探索排序

2.1 默認的排序

  在Hadoop默認的排序算法中,只會針對key值進行排序,我們最初的代碼如下(這里只展示了map和reduce函數):

public class MySortJob extends Configured implements Tool {public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, LongWritable> { protected void map( LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, LongWritable>.Context context) throws java.io.IOException, InterruptedException { String[] spilted = value.toString().split("\t"); long firstNum = Long.parseLong(spilted[0]); long secondNum = Long.parseLong(spilted[1]); context.write(new LongWritable(firstNum), new LongWritable( secondNum)); }; } public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { protected void reduce( LongWritable key, java.lang.Iterable<LongWritable> values, Reducer<LongWritable, LongWritable, LongWritable, LongWritable>.Context context) throws java.io.IOException, InterruptedException { for (LongWritable value : values) { context.write(key, value); } }; } }

  這里我們將第一列作為了key,第二列作為了value。

  可以查看一下運行后的結果,如下所示:

1 1 2 2 2 1 3 3 3 2 3 1

  從運行結果來看,并沒有達到我們最初的目的,于是,我們需要拋棄默認的排序規(guī)則,因此我們要自定義排序。

2.2 自定義排序

  (1)封裝一個自定義類型作為key的新類型:將第一列與第二列都作為key

private static class MyNewKey implements WritableComparable<MyNewKey> {long firstNum; long secondNum; public MyNewKey() { } public MyNewKey(long first, long second) { firstNum = first; secondNum = second; } @Override public void write(DataOutput out) throws IOException { out.writeLong(firstNum); out.writeLong(secondNum); } @Override public void readFields(DataInput in) throws IOException { firstNum = in.readLong(); secondNum = in.readLong(); } /* * 當key進行排序時會調用以下這個compreTo方法 */ @Override public int compareTo(MyNewKey anotherKey) { long min = firstNum - anotherKey.firstNum; if (min != 0) { // 說明第一列不相等,則返回兩數之間小的數 return (int) min; } else { return (int) (secondNum - anotherKey.secondNum); } } }

PS:這里為什么需要封裝一個新類型呢?因為原來只有key參與排序,現在將第一個數和第二個數都參與排序,作為一個新的key。

  (2)改寫最初的MapReduce方法函數代碼:(只展示了map和reduce函數,還需要修改map和reduce輸出的類型設置)

public static class MyMapper extendsMapper<LongWritable, Text, MyNewKey, LongWritable> { protected void map( LongWritable key, Text value, Mapper<LongWritable, Text, MyNewKey, LongWritable>.Context context) throws java.io.IOException, InterruptedException { String[] spilted = value.toString().split("\t"); long firstNum = Long.parseLong(spilted[0]); long secondNum = Long.parseLong(spilted[1]); // 使用新的類型作為key參與排序 MyNewKey newKey = new MyNewKey(firstNum, secondNum); context.write(newKey, new LongWritable(secondNum)); }; } public static class MyReducer extends Reducer<MyNewKey, LongWritable, LongWritable, LongWritable> { protected void reduce( MyNewKey key, java.lang.Iterable<LongWritable> values, Reducer<MyNewKey, LongWritable, LongWritable, LongWritable>.Context context) throws java.io.IOException, InterruptedException { context.write(new LongWritable(key.firstNum), new LongWritable( key.secondNum)); }; }

  從上面的代碼中我們可以發(fā)現,新類型MyNewKey實現了一個叫做WritableComparable的接口,該接口中有一個compareTo()方法,當對key進行比較時會調用該方法,而我們將其改為了我們自己定義的比較規(guī)則,從而實現我們想要的效果。

  其實,這個WritableComparable還實現了兩個接口,我們看看其定義:

public interface WritableComparable<T> extends Writable, Comparable<T> { }

  Writable接口是為了實現序列化,而Comparable則是為了實現比較。

  (3)現在看看運行結果:

1 1 2 1 2 2 3 1 3 2 3 3

  運行結果與預期的已經一致,自定義排序生效!

三、初步探索分組

3.1 默認的分組

  在Hadoop中的默認分組規(guī)則中,也是基于Key進行的,會將相同key的value放到一個集合中去。這里以上面的例子繼續(xù)看看分組,因為 我們自定義了一個新的key,它是以兩列數據作為key的,因此這6行數據中每個key都不相同,也就是說會產生6組,它們是:1 1,2 1,2 2,3 1,3 2,3 3。而實際上只可以分為3組,分別是1,2,3。

  現在首先改寫一下reduce函數代碼,目的是求出第一列相同時第二列的最小值,看看它會有怎么樣的分組:

public static class MyReducer extendsReducer<MyNewKey, LongWritable, LongWritable, LongWritable> { protected void reduce( MyNewKey key, java.lang.Iterable<LongWritable> values, Reducer<MyNewKey, LongWritable, LongWritable, LongWritable>.Context context) throws java.io.IOException, InterruptedException { long min = Long.MAX_VALUE; for (LongWritable number : values) { long temp = number.get(); if (temp < min) { min = temp; } } context.write(new LongWritable(key.firstNum), new LongWritable(min)); }; }

  其運行結果為:

1 1 2 1 2 2 3 1 3 2 3 3

  但是我們預期的結果為:

#當第一列相同時,求出第二列的最小值 3 3 3 2 3 1 2 2 2 1 1 1 ------------------- #預期結果應該是 3 1 2 1 1 1

3.2 自定義分組

  為了針對新的key類型作分組,我們也需要自定義一下分組規(guī)則:

  (1)編寫一個新的分組比較類型用于我們的分組:

private static class MyGroupingComparator implementsRawComparator<MyNewKey> { /* * 基本分組規(guī)則:按第一列firstNum進行分組 */ @Override public int compare(MyNewKey key1, MyNewKey key2) { return (int) (key1.firstNum - key2.firstNum); } /* * @param b1 表示第一個參與比較的字節(jié)數組 * * @param s1 表示第一個參與比較的字節(jié)數組的起始位置 * * @param l1 表示第一個參與比較的字節(jié)數組的偏移量 * * @param b2 表示第二個參與比較的字節(jié)數組 * * @param s2 表示第二個參與比較的字節(jié)數組的起始位置 * * @param l2 表示第二個參與比較的字節(jié)數組的偏移量 */ @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8); } }

  從代碼中我們可以知道,我們自定義了一個分組比較器MyGroupingComparator,該類實現了RawComparator接口,而RawComparator接口又實現了Comparator接口,下面看看這兩個接口的定義:

  首先是RawComparator接口的定義:

public interface RawComparator<T> extends Comparator<T> {public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2); }

  其次是Comparator接口的定義:

public interface Comparator<T> {int compare(T o1, T o2);boolean equals(Object obj); }

  在MyGroupingComparator中分別對這兩個接口中的定義進行了實現,RawComparator中的compare()方法是基于字節(jié)的比較,Comparator中的compare()方法是基于對象的比較。

  在基于字節(jié)的比較方法中,有六個參數,一下子眼花了:

Params:

* @param arg0 表示第一個參與比較的字節(jié)數組
* @param arg1 表示第一個參與比較的字節(jié)數組的起始位置
* @param arg2 表示第一個參與比較的字節(jié)數組的偏移量
*?
* @param arg3 表示第二個參與比較的字節(jié)數組
* @param arg4 表示第二個參與比較的字節(jié)數組的起始位置
* @param arg5 表示第二個參與比較的字節(jié)數組的偏移量

  由于在MyNewKey中有兩個long類型,每個long類型又占8個字節(jié)。這里因為比較的是第一列數字,所以讀取的偏移量為8字節(jié)。

  (2)添加對分組規(guī)則的設置:

  // 設置自定義分組規(guī)則job.setGroupingComparatorClass(MyGroupingComparator.class);

  (3)現在看看運行結果:

參考資料

(1)吳超,《深入淺出Hadoop》:http://www.superwu.cn/

(2)Suddenly,《Hadoop日記Day18-MapReduce排序和分組》:http://www.cnblogs.com/sunddenly/p/4009751.html

?

作者:周旭龍

出處:http://edisonchou.cnblogs.com/

本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接。

轉載于:https://www.cnblogs.com/1130136248wlxk/p/4975111.html

總結

以上是生活随笔為你收集整理的Hadoop学习笔记—11.MapReduce中的排序和分组的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。