日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Hadoop之Shuffle机制详解

發布時間:2024/2/28 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hadoop之Shuffle机制详解 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Hadoop之Shuffle機制詳解


目錄

  • Shuffle機制
  • Partition分區
  • WritableComparable排序
  • Combiner合并
  • GroupingComparator分組(輔助排序)

  • 1. Shuffle機制

    Mapreduce確保每個reducer的輸入都是按key排序的。系統執行排序的過程(即將mapper輸出作為輸入傳給reducer)稱為shuffle,如下圖所示


    2. Partition分區

    問題引出:要求將統計結果按照條件輸出到不同文件中(分區)。比如:將統計結果按照手機歸屬地不同省份輸出到不同文件中(分區)

  • 默認partition分區
  • public class HashPartitioner<K, V> extends Partitioner<K, V> {public int getPartition(K key, V value, int numReduceTasks) {return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;} } 默認分區是根據key的hashCode對reduceTasks個數取模得到的。用戶沒法控制哪個key存儲到哪個分區。
  • 自定義Partitioner步驟
  • 自定義類繼承Partitioner,重寫getPartition()方法
  • public class ProvincePartitioner extends Partitioner<Text, FlowBean> {@Overridepublic int getPartition(Text key, FlowBean value, int numPartitions) {// 1 獲取電話號碼的前三位String preNum = key.toString().substring(0, 3);int partition = 4;// 2 判斷是哪個省if ("136".equals(preNum)) {partition = 0;}else if ("137".equals(preNum)) {partition = 1;}else if ("138".equals(preNum)) {partition = 2;}else if ("139".equals(preNum)) {partition = 3;}return partition;} }
  • 在job驅動中,設置自定義partitioner:
  • job.setPartitionerClass(CustomPartitioner.class);
  • 自定義partition后,要根據自定義partitioner的邏輯設置相應數量的reduce task
  • job.setNumReduceTasks(5);

    注意

  • 如果reduceTask的數量> getPartition的結果數,則會多產生幾個空的輸出文件part-r-000xx;

  • 如果1<reduceTask的數量<getPartition的結果數,則有一部分分區數據無處安放,會Exception;

  • 如果reduceTask的數量=1,則不管mapTask端輸出多少個分區文件,最終結果都交給這一個reduceTask,最終也就只會產生一個結果文件 part-r-00000;

    例如:假設自定義分區數為5,則
    (1)job.setNumReduceTasks(1);會正常運行,只不過會產生一個輸出文件
    (2)job.setNumReduceTasks(2);會報錯
    (3)job.setNumReduceTasks(6);大于5,程序會正常運行,會產生空文件


  • 3. WritableComparable排序

    排序是MapReduce框架中最重要的操作之一。Map Task和Reduce Task均會對數據(按照key)進行排序。該操作屬于Hadoop的默認行為。任何應用程序中的數據均會被排序,而不管邏輯上是否需要。默認排序是按照字典順序排序,且實現該排序的方法是快速排序。

    對于Map Task,它會將處理的結果暫時放到一個緩沖區中,當緩沖區使用率達到一定閾值后,再對緩沖區中的數據進行一次排序,并將這些有序數據寫到磁盤上,而當數據處理完畢后,它會對磁盤上所有文件進行一次合并,以將這些文件合并成一個大的有序文件。

    對于Reduce Task,它從每個Map Task上遠程拷貝相應的數據文件,如果文件大小超過一定閾值,則放到磁盤上,否則放到內存中。如果磁盤上文件數目達到一定閾值,則進行一次合并以生成一個更大文件;如果內存中文件大小或者數目超過一定閾值,則進行一次合并后將數據寫到磁盤上。當所有數據拷貝完畢后,Reduce Task統一對內存和磁盤上的所有數據進行一次合并。

    每個階段的默認排序

  • 排序的分類

  • 部分排序:區內排序——環形緩沖區
    MapReduce根據輸入記錄的鍵對數據集排序。保證輸出的每個文件內部排序。
  • 全排序:
    如何用Hadoop產生一個全局排序的文件?最簡單的方法是使用一個分區。但該方法在處理大型文件時效率極低,因為一臺機器必須處理所有輸出文件,從而完全喪失了MapReduce所提供的并行架構。
    替代方案:首先創建一系列排好序的文件;其次,串聯這些文件;最后,生成一個全局排序的文件。主要思路是使用一個分區來描述輸出的全局排序。例如:可以為上述文件創建3個分區,在第一分區中,記錄的單詞首字母a-g,第二分區記錄單詞首字母h-n, 第三分區記錄單詞首字母o-z。
  • 輔助排序:(GroupingComparator分組)
    Mapreduce框架在記錄到達reducer之前按鍵對記錄排序,但鍵所對應的值并沒有被排序。甚至在不同的執行輪次中,這些值的排序也不固定,因為它們來自不同的map任務且這些map任務在不同輪次中完成時間各不相同。一般來說,大多數MapReduce程序會避免讓reduce函數依賴于值的排序。但是,有時也需要通過特定的方法對鍵進行排序和分組等以實現對值的排序。
  • 二次排序:
    在自定義排序過程中,如果compareTo中的判斷條件為兩個即為二次排序。
  • 自定義排序WritableComparable
    (1)原理分析
    bean對象實現WritableComparable接口重寫compareTo方法,就可以實現排序

  • @Override public int compareTo(FlowBean o) {// 倒序排列,從大到小return this.sumFlow > o.getSumFlow() ? -1 : 1; }

    4. Combiner合并

  • combiner是MR程序中Mapper和Reducer之外的一種組件。

  • combiner組件的父類就是Reducer。

  • combiner和reducer的區別在于運行的位置:

  • Combiner是在每一個maptask所在的節點運行;
  • Reducer是接收全局所有Mapper的輸出結果;
  • combiner的意義就是對每一個maptask的輸出進行局部匯總,以減小網絡傳輸量。

  • combiner能夠應用的前提是不能影響最終的業務邏輯,而且,combiner的輸出kv應該跟reducer的輸入kv類型要對應起來。

  • 自定義Combiner實現步驟:

  • 自定義一個combiner繼承Reducer,重寫reduce方法
  • public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {// 1 匯總操作int count = 0;for(IntWritable v :values){count += v.get();}// 2 寫出context.write(key, new IntWritable(count));} }
  • 在job驅動類中設置:
  • job.setCombinerClass(WordcountCombiner.class);

    5. GroupingComparator分組(輔助排序)

    對reduce階段的數據根據某一個或幾個字段進行分組。

    總結

    以上是生活随笔為你收集整理的Hadoop之Shuffle机制详解的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。