日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Hadoop生态圈(二十一)- MapReduce编程基础

發布時間:2024/1/1 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hadoop生态圈(二十一)- MapReduce编程基础 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

  • 1. MapReduce Partition、Combiner
    • 1.1 MapReduce Partition分區
      • 1.1.1 默認情況下MR輸出文件個數
      • 1.1.2 修改reducetask個數
      • 1.1.3 數據分區概念
      • 1.1.4 默認分區規則
      • 1.1.5 Partition注意事項
    • 1.2 MapReduce Combiner規約
      • 1.2.1 數據規約的含義
      • 1.2.2 MapReduce弊端
      • 1.2.3 Combiner組件概念
      • 1.2.4 Combiner組件使用
      • 1.2.5 Combiner使用注意事項
  • 2. MapReduce編程指南
    • 2.1 編程技巧
    • 2.2 MapReduce執行流程圖
      • 2.2.1 執行流程圖
      • 2.2.2 Map階段執行過程
      • 2.2.3 Redue階段執行過程
    • 2.3 key的重要性體現
  • 3. 案例:美國新冠疫情COVID-19統計
    • 3.1 MapReduce自定義對象序列化
      • 3.1.1 需求
      • 3.1.2 分析
      • 3.1.3 代碼實現
        • 3.1.3.1 自定義JavaBean
        • 3.1.3.2 Mapper類
        • 3.1.3.3 Reducer類
        • 3.1.3.4 程序驅動類
      • 3.1.4 代碼執行結果
    • 3.2 MapReduce自定義排序
      • 3.2.1 需求
      • 3.2.2 分析
      • 3.2.3 代碼實現
        • 3.2.3.1 自定義JavaBean
        • 3.2.3.2 Mapper類
        • 3.2.3.3 Reducer類
        • 3.2.3.4 驅動程序類
      • 3.2.4 代碼執行結果
    • 3.3 MapReduce自定義分區
      • 3.3.1 需求
      • 3.3.2 分析
      • 3.3.3 代碼實現
        • 3.3.3.1 自定義JavaBean
        • 3.3.3.2 自定義分區器
        • 3.3.3.3 Mapper類
        • 3.3.3.4 Reducer類
        • 3.3.3.5 驅動程序類
      • 3.3.4 代碼執行結果
      • 3.3.5 分區個數和reducetask個數的關系
    • 3.4 MapReduce自定義分組
      • 3.4.1 分組概念和默認分組規則
      • 3.4.2 自定義分組規則
      • 3.4.3 需求
      • 3.4.4 分析
      • 3.4.5 代碼實現
        • 3.4.5.1 自定義對象
        • 3.4.5.2 Mapper類
        • 3.4.5.3 Reducer類
        • 3.4.5.4 自定義分組
        • 3.4.5.5 驅動程序類
      • 3.4.6 代碼執行結果
    • 3.5 自定義分組擴展:topN問題
      • 3.5.1 需求
      • 3.5.2 分析
      • 3.5.3 代碼實現
        • 3.5.3.1 自定義對象、自定義分組類
        • 3.5.3.2 Mapper類
        • 3.5.3.3 Reducer類
        • 3.5.3.4 程序驅動類
      • 3.5.4 代碼執行結果

原文地址:https://program-park.github.io/2022/02/04/hadoop_25/

1. MapReduce Partition、Combiner

1.1 MapReduce Partition分區

1.1.1 默認情況下MR輸出文件個數

??在默認情況下,不管 map 階段有多少個并發執行 task,到 reduce 階段,所有的結果都將有一個 reduce 來處理,并且最終結果輸出到一個文件中。
??此時,MapReduce 的執行流程如下所示:

1.1.2 修改reducetask個數

??在 MapReduce 程序的驅動類中,通過 job 提供的方法,可以修改 reducetask 的個數。

??默認情況下不設置,reducetask 個數為 1,結果輸出到一個文件中。
??使用 api 修改 reducetask 個數之后,輸出結果文件的個數和reducetask個數對應。比如設置為 6 個,此時的輸出結果如下所示:

??此時,MapReduce 的執行流程如下所示:

1.1.3 數據分區概念

??當 MapReduce 中有多個reducetask執行的時候,此時maptask的輸出就會面臨一個問題:究竟將自己的輸出數據交給哪一個reducetask來處理,這就是所謂的數據分區(partition)問題。

1.1.4 默認分區規則

??MapReduce 默認分區規則是HashPartitioner。跟 map 輸出的數據 key 有關。

??當然用戶也可以自己自定義分區規則。

1.1.5 Partition注意事項

  • reducetask個數的改變導致了數據分區的產生,而不是有數據分區導致了 reducetask 個數改變。
  • 數據分區的核心是分區規則。即如何分配數據給各個 reducetask。
  • 默認的規則可以保證只要map階段輸出的key一樣,數據就一定可以分區到同一個reducetask,但是不能保證數據平均分區。
  • reducetask 個數的改變還會導致輸出結果文件不再是一個整體,而是輸出到多個文件中。

1.2 MapReduce Combiner規約

1.2.1 數據規約的含義

??數據規約是指在盡可能保持數據原貌的前提下,最大限度地精簡數據量。

1.2.2 MapReduce弊端

  • MapReduce 是一種具有兩個執行階段的分布式計算程序,Map 階段和 Reduce 階段之間會涉及到跨網絡數據傳遞。
  • 每一個 MapTask 都可能會產生大量的本地輸出,這就導致跨網絡傳輸數據量變大,網絡 IO 性能低。

??比如 WordCount 單詞統計案例,假如文件中有 1000 個單詞,其中 999 個為 hello,這將產生 999 個 <hello,1>的鍵值對在網絡中傳遞,性能及其低下。

1.2.3 Combiner組件概念

  • Combiner中文叫做數據規約,是 MapReduce 的一種優化手段。
  • Combiner 的作用就是對map端的輸出先做一次合并,以減少在map和reduce節點之間的數據傳輸量。

1.2.4 Combiner組件使用

  • combiner 是 MapReduce 程序中 Mapper 和 Reducer 之外的一種組件,默認情況下不啟用。
  • combiner本質就是Reducer,combiner 和 reducer的區別在于運行的位置:
    • combiner 是在每一個 maptask 所在的節點運行,是局部聚合;
    • Reducer是對所有 maptask 的輸出結果計算,是全局聚合;
  • 具體實現步驟:
    • 自定義一個 CustomCombiner 繼承 Reducer,重寫 reduce 方法;
    • 在 job 中設置:job.setCombinerClass(CustomCombiner.class);

1.2.5 Combiner使用注意事項

  • Combiner 能夠應用的前提是不能影響最終的業務邏輯,而且,Combiner 的輸出 kv 應該跟 reducer 的輸入 kv 類型要對應起來。
  • 下述場景禁止使用Combiner,不僅優化了數據量,還改變了最終的結果:
    • 業務和數據個數相關的;
    • 業務和整體排序相關的;
  • Combiner 組件不是禁用,而是慎用。用的好提升程序性能,用不好,改變程序結果且不易發現。

2. MapReduce編程指南

2.1 編程技巧

  • MapReduce執行流程了然于心,能夠知道數據在 MapReduce 中的流轉過程。
  • 業務需求解讀準確,即需要明白做什么。
  • 牢牢把握住key的選擇,因為 MapReduce 很多行為跟key相關, 比如:排序、分區、分組。
  • 學會自定義組件修改默認行為,當默認的行為不滿足業務需求,可以嘗試自定義規則。
  • 通過畫圖梳理業務執行流程,確定每個階段的數據類型。

2.2 MapReduce執行流程圖

2.2.1 執行流程圖

2.2.2 Map階段執行過程

  • 第一階段是把輸入目錄下文件按照一定的標準逐個進行邏輯切片,形成切片規劃。默認情況下,Split size=Block size。每一個切片由一個 MapTask 處理(getSplits)。
  • 第二階段是對切片中的數據按照一定的規則解析成<key,value>對。默認規則是把每一行文本內容解析成鍵值對。key 是每一行的起始位置(單位是字節),value 是本行的文本內容(TextInputFormat)。
  • 第三階段是調用 Mapper 類中的 map 方法。上階段中每解析出來的一個<k,v>,調用一次map方法。每次調用 map 方法會輸出零個或多個鍵值對。
  • 第四階段是按照一定的規則對第三階段輸出的鍵值對進行分區。默認是只有一個區。分區的數量就是 Reducer 任務運行的數量。默認只有一個 Reducer 任務。
  • 第五階段是對每個分區中的鍵值對進行排序。首先,按照鍵進行排序,對于鍵相同的鍵值對,按照值進行排序。比如三個鍵值對 <2,2>、<1,3>、<2,1>,鍵和值分別是整數。那么排序后的結果是 <1,3>、<2,1>、<2,2>。如果有第六階段,那么進入第六階段;如果沒有,直接輸出到文件中。
  • 第六階段是對數據進行局部聚合處理,也就是 combiner 處理。鍵相等的鍵值對會調用一次 reduce 方法。經過這一階段,數據量會減少。本階段默認是沒有的。

2.2.3 Redue階段執行過程

  • 第一階段是 Reducer 任務會主動從 Mapper 任務復制其輸出的鍵值對。Mapper 任務可能會有很多,因此 Reducer 會復制多個 Mapper 的輸出。
  • 第二階段是把復制到 Reducer 本地數據,全部進行合并,即把分散的數據合并成一個大的數據。再對合并后的數據排序。
  • 第三階段是對排序后的鍵值對調用 reduce 方法。鍵相等的鍵值對調用一次 reduce 方法,每次調用會產生零個或者多個鍵值對。最后把這些輸出的鍵值對寫入到 HDFS 文件中。

2.3 key的重要性體現

  • 在 MapReduce 編程中,核心是牢牢把握住每個階段的輸入輸出key是什么。
  • 因為 MapReduce 中很多默認行為都跟 key 相關。
    • 排序:key 的字典序a-z 正序
    • 分區:key.hashcode % reducetask 個數
    • 分組:key 相同的分為一組
  • 最重要的是,如果覺得默認的行為不滿足業務需求,MapReduce 還支持自定義排序、分區、分組的規則,這將使得編程更加靈活和方便。

3. 案例:美國新冠疫情COVID-19統計

??現有美國 2021-1-28 號,各個縣 county 的新冠疫情累計案例信息,包括確診病例和死亡病例,數據格式如下所示:

2021-01-28,Juneau City and Borough,Alaska,02110,1108,3 2021-01-28,Kenai Peninsula Borough,Alaska,02122,3866,18 2021-01-28,Ketchikan Gateway Borough,Alaska,02130,272,1 2021-01-28,Kodiak Island Borough,Alaska,02150,1021,5 2021-01-28,Kusilvak Census Area,Alaska,02158,1099,3 2021-01-28,Lake and Peninsula Borough,Alaska,02164,5,0 2021-01-28,Matanuska-Susitna Borough,Alaska,02170,7406,27 2021-01-28,Nome Census Area,Alaska,02180,307,0 2021-01-28,North Slope Borough,Alaska,02185,973,3 2021-01-28,Northwest Arctic Borough,Alaska,02188,567,1 2021-01-28,Petersburg Borough,Alaska,02195,43,0

??字段含義如下:date(日期),county(縣),state(州),fips(縣編碼code),cases(累計確診病例),deaths(累計死亡病例)。
??完整數據集鏈接:https://pan.baidu.com/s/1AdWWprwEdeyfELOY7YP6ug,提取碼:6666

3.1 MapReduce自定義對象序列化

3.1.1 需求

??統計美國 2021-1-28,每個州 state 累積確診案例數、累計死亡案例數。

3.1.2 分析

  • 自定義對象CovidCountBean,用于封裝每個縣的確診病例數和死亡病例數。
  • 注意需要實現Hadoop的序列化機制。
  • 以州state作為map階段輸出的key,以 CovidCountBean 作為 value,這樣經過 MapReduce 的默認排序分組規則,屬于同一個州的數據就會變成一組進行 reduce 處理,進行累加即可得出每個州累計確診病例。
  • 3.1.3 代碼實現

    3.1.3.1 自定義JavaBean

    public class CovidCountBean implements Writable{private long cases;//確診病例數private long deaths;//死亡病例數public CovidCountBean() {}public CovidCountBean(long cases, long deaths) {this.cases = cases;this.deaths = deaths;}public void set(long cases, long deaths) {this.cases = cases;this.deaths = deaths;}public long getCases() {return cases;}public void setCases(long cases) {this.cases = cases;}public long getDeaths() {return deaths;}public void setDeaths(long deaths) {this.deaths = deaths;}/*** 序列化方法*/@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(cases);out.writeLong(deaths);}/*** 反序列化方法 注意順序*/@Overridepublic void readFields(DataInput in) throws IOException {this.cases = in.readLong();this.deaths =in.readLong();}@Overridepublic String toString() {return cases +"\t"+ deaths;} }

    3.1.3.2 Mapper類

    public class CovidSumMapper extends Mapper<LongWritable, Text, Text, CovidCountBean> {Text outKey = new Text();CovidCountBean outValue = new CovidCountBean();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] fields = value.toString().split(",");//州outKey.set(fields[2]);//Covid數據 確診病例 死亡病例outValue.set(Long.parseLong(fields[fields.length-2]),Long.parseLong(fields[fields.length-1]));//map輸出結果context.write(outKey,outValue);} }

    3.1.3.3 Reducer類

    public class CovidSumReducer extends Reducer<Text, CovidCountBean,Text,CovidCountBean> {CovidCountBean outValue = new CovidCountBean();@Overrideprotected void reduce(Text key, Iterable<CovidCountBean> values, Context context) throws IOException, InterruptedException {long totalCases = 0;long totalDeaths =0;//累加統計for (CovidCountBean value : values) {totalCases += value.getCases();totalDeaths +=value.getDeaths();}outValue.set(totalCases,totalDeaths);context.write(key,outValue);} }

    3.1.3.4 程序驅動類

    public class CovidSumDriver {public static void main(String[] args) throws Exception{//配置文件對象Configuration conf = new Configuration();// 創建作業實例Job job = Job.getInstance(conf, CovidSumDriver.class.getSimpleName());// 設置作業驅動類job.setJarByClass(CovidSumDriver.class);// 設置作業mapper reducer類job.setMapperClass(CovidSumMapper.class);job.setReducerClass(CovidSumReducer.class);// 設置作業mapper階段輸出key value數據類型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(CovidCountBean.class);//設置作業reducer階段輸出key value數據類型 也就是程序最終輸出數據類型job.setOutputKeyClass(Text.class);job.setOutputValueClass(CovidCountBean.class);// 配置作業的輸入數據路徑FileInputFormat.addInputPath(job, new Path(args[0]));// 配置作業的輸出數據路徑FileOutputFormat.setOutputPath(job, new Path(args[1]));//判斷輸出路徑是否存在 如果存在刪除FileSystem fs = FileSystem.get(conf);if(fs.exists(new Path(args[1]))){fs.delete(new Path(args[1]),true);}// 提交作業并等待執行完成boolean resultFlag = job.waitForCompletion(true);//程序退出System.exit(resultFlag ? 0 :1);} }

    3.1.4 代碼執行結果

    3.2 MapReduce自定義排序

    3.2.1 需求

    ??統計美國 2021-01-28,每個州state的累積確證案例數、累積死亡案例數。
    ??將美國 2021-01-28,每個州state的確證案例數進行倒序排序。

    3.2.2 分析

    ??如果你的需求中需要根據某個屬性進行排序 ,不妨把這個屬性作為 key。因為 MapReduce 中key有默認排序行為的。但是需要進行如下考慮:

    • 如果你的需求是正序,并且數據類型是 Hadoop 封裝好的基本類型。這種情況下不需要任何修改,直接使用基本類型作為 key 即可。因為 Hadoop 封裝好的類型已經實現了排序規則。
      • 比如,LongWritable 類型:
    • 如果你的需求是倒序,或者數據類型是自定義對象。需要重寫排序規則。需要對象實現Comparable接口,重寫ComparTo方法。

    3.2.3 代碼實現

    3.2.3.1 自定義JavaBean

    public class CovidCountBean implements WritableComparable<CovidCountBean> {private long cases;//確診病例數private long deaths;//死亡病例數public CovidCountBean() {}public CovidCountBean(long cases, long deaths) {this.cases = cases;this.deaths = deaths;}public void set(long cases, long deaths) {this.cases = cases;this.deaths = deaths;}public long getCases() {return cases;}public void setCases(long cases) {this.cases = cases;}public long getDeaths() {return deaths;}public void setDeaths(long deaths) {this.deaths = deaths;}/*** 序列化方法*/@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(cases);out.writeLong(deaths);}/*** 反序列化方法 注意順序*/@Overridepublic void readFields(DataInput in) throws IOException {this.cases = in.readLong();this.deaths =in.readLong();}@Overridepublic String toString() {return cases +"\t"+ deaths;}/*** 排序比較器 本業務中根據確診案例數倒序排序*/@Overridepublic int compareTo(CovidCountBean o) {return this.cases - o.getCases()> 0 ? -1:(this.cases - o.getCases() < 0 ? 1 : 0);} }

    3.2.3.2 Mapper類

    public class CovidSortSumMapper extends Mapper<LongWritable, Text, CovidCountBean,Text> {CovidCountBean outKey = new CovidCountBean();Text outValue = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] fields = value.toString().split("\t");outKey.set(Long.parseLong(fields[1]),Long.parseLong(fields[2]));outValue.set(fields[0]);context.write(outKey,outValue);} }

    3.2.3.3 Reducer類

    public class CovidSortSumReducer extends Reducer<CovidCountBean, Text,Text,CovidCountBean> {@Overrideprotected void reduce(CovidCountBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {Text outKey = values.iterator().next();context.write(outKey,key);} }

    3.2.3.4 驅動程序類

    public class CovidSortSumDriver {public static void main(String[] args) throws Exception{//配置文件對象Configuration conf = new Configuration();// 創建作業實例Job job = Job.getInstance(conf, CovidSortSumDriver.class.getSimpleName());// 設置作業驅動類job.setJarByClass(CovidSortSumDriver.class);// 設置作業mapper reducer類job.setMapperClass(CovidSortSumMapper.class);job.setReducerClass(CovidSortSumReducer.class);// 設置作業mapper階段輸出key value數據類型job.setMapOutputKeyClass(CovidCountBean.class);job.setMapOutputValueClass(Text.class);//設置作業reducer階段輸出key value數據類型 也就是程序最終輸出數據類型job.setOutputKeyClass(Text.class);job.setOutputValueClass(CovidCountBean.class);// 配置作業的輸入數據路徑FileInputFormat.addInputPath(job, new Path(args[0]));// 配置作業的輸出數據路徑FileOutputFormat.setOutputPath(job, new Path(args[1]));//判斷輸出路徑是否存在 如果存在刪除FileSystem fs = FileSystem.get(conf);if(fs.exists(new Path(args[1]))){fs.delete(new Path(args[1]),true);}// 提交作業并等待執行完成boolean resultFlag = job.waitForCompletion(true);//程序退出System.exit(resultFlag ? 0 :1);} }

    3.2.4 代碼執行結果

    3.3 MapReduce自定義分區

    3.3.1 需求

    ??將美國每個州的疫情數據輸出到各自不同的文件中,即一個州的數據在一個結果文件中。

    3.3.2 分析

    ??輸出到不同文件中表示 reducetask 有多個,而 reducetask 默認只有1個,可以通過job.setNumReduceTasks(N)設置。當有多個 reducetask 意味著數據分區,默認分區規則是hashPartitioner,默認分區規則符合業務需求的話,就直接使用;不符合,再自定義分區。

    3.3.3 代碼實現

    3.3.3.1 自定義JavaBean

    public class CovidCountBean implements WritableComparable<CovidCountBean> {private long cases;//確診病例數private long deaths;//死亡病例數public CovidCountBean() {}public CovidCountBean(long cases, long deaths) {this.cases = cases;this.deaths = deaths;}public void set(long cases, long deaths) {this.cases = cases;this.deaths = deaths;}public long getCases() {return cases;}public void setCases(long cases) {this.cases = cases;}public long getDeaths() {return deaths;}public void setDeaths(long deaths) {this.deaths = deaths;}/*** 序列化方法*/@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(cases);out.writeLong(deaths);}/*** 反序列化方法 注意順序*/@Overridepublic void readFields(DataInput in) throws IOException {this.cases = in.readLong();this.deaths =in.readLong();}@Overridepublic String toString() {return cases +"\t"+ deaths;}/*** 排序比較器 本業務中根據確診案例數倒序排序*/@Overridepublic int compareTo(CovidCountBean o) {return this.cases - o.getCases()> 0 ? -1:(this.cases - o.getCases() < 0 ? 1 : 0);} }

    3.3.3.2 自定義分區器

    public class StatePartitioner extends Partitioner<Text, Text> {//模擬美國各州數據字典 實際中可以從redis中快速查詢 如果數據不大也可以使用數據集合保存public static HashMap<String, Integer> stateMap = new HashMap<String, Integer>();static{stateMap.put("Alabama", 0);stateMap.put("Arkansas", 1);stateMap.put("California", 2);stateMap.put("Florida", 3);stateMap.put("Indiana", 4);}@Overridepublic int getPartition(Text key, Text value, int numPartitions) {Integer code = stateMap.get(key.toString());if (code != null) {return code;}return 5;} }

    3.3.3.3 Mapper類

    public class CovidPartitionMapper extends Mapper<LongWritable, Text,Text, Text> {Text outKey = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] splits = value.toString().split(",");//以州作為輸出的keyoutKey.set(splits[2]);context.write(outKey,value);} }

    3.3.3.4 Reducer類

    public class CovidPartitionReducer extends Reducer<Text,Text,Text, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {for (Text value : values) {context.write(value,NullWritable.get());}} }

    3.3.3.5 驅動程序類

    public class CovidPartitionDriver {public static void main(String[] args) throws Exception{//配置文件對象Configuration conf = new Configuration();// 創建作業實例Job job = Job.getInstance(conf, CovidPartitionDriver.class.getSimpleName());// 設置作業驅動類job.setJarByClass(CovidPartitionDriver.class);// 設置作業mapper reducer類job.setMapperClass(CovidPartitionMapper.class);job.setReducerClass(CovidPartitionReducer.class);// 設置作業mapper階段輸出key value數據類型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);//設置作業reducer階段輸出key value數據類型 也就是程序最終輸出數據類型job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);//todo 設置reducetask個數 和自定義分區器job.setNumReduceTasks(6);job.setPartitionerClass(StatePartitioner.class);// 配置作業的輸入數據路徑FileInputFormat.addInputPath(job, new Path(args[0]));// 配置作業的輸出數據路徑FileOutputFormat.setOutputPath(job, new Path(args[1]));//判斷輸出路徑是否存在 如果存在刪除FileSystem fs = FileSystem.get(conf);if(fs.exists(new Path(args[1]))){fs.delete(new Path(args[1]),true);}// 提交作業并等待執行完成boolean resultFlag = job.waitForCompletion(true);//程序退出System.exit(resultFlag ? 0 :1);} }

    3.3.4 代碼執行結果


    3.3.5 分區個數和reducetask個數的關系

    ??正常情況下:分區的個數 = reducetask個數

    • 分區的個數 > reducetask個數
      • 程序執行報錯
    • 分區的個數 < reducetask個數
      • 有空文件產生

    3.4 MapReduce自定義分組

    3.4.1 分組概念和默認分組規則

    • 分組在發生在 reduce 階段,決定了同一個reduce中哪些數據將組成一組去調用reduce方法處理。
    • 默認分組規則是:key相同的就會分為一組(前后兩個 key 直接比較是否相等)。
    • 需要注意的是,在 reduce 階段進行分組之前,因為進行數據排序行為,因此排序+分組將會使得key一樣的數據一定被分到同一組,一組去調用reduce方法處理。

    3.4.2 自定義分組規則

    • 寫類繼承WritableComparator,重寫Compare方法。
    • 只要Compare方法返回為 0,MapReduce框架在分組的時候就會認為前后兩個相等,分為一組。
    • 在 job 對象中進行設置才能讓自己的重寫分組類生效:
      job.setGroupingComparatorClass(xxxx.class);

    3.4.3 需求

    ??找出美國 2021-01-28,每個州 state 的確診案例數最多的縣 county 是哪一個。該問題也是俗稱的 TopN 問題。

    3.4.4 分析

    • 在 ma p階段將 “州state和累計確診病例數cases” 作為 key 輸出;
    • 重寫對象的排序規則,首先根據州的正序排序,如果州相等,按照確診病例數cases倒序排序,發送到 reduce;
    • 在 reduce 端利用自定義分組規則,將州state相同的分為一組,然后取第一個即是最大值;

    3.4.5 代碼實現

    3.4.5.1 自定義對象

    public class CovidBean implements WritableComparable<CovidBean> {private String state;//州private String county;//縣private long cases;//確診病例public CovidBean() {}public CovidBean(String state, String county, long cases) {this.state = state;this.county = county;this.cases = cases;}public void set (String state, String county, long cases) {this.state = state;this.county = county;this.cases = cases;}public String getState() {return state;}public void setState(String state) {this.state = state;}public String getCounty() {return county;}public void setCounty(String county) {this.county = county;}public long getCases() {return cases;}public void setCases(long cases) {this.cases = cases;}@Overridepublic String toString() {return "CovidBean{" +"state='" + state + '\'' +", county='" + county + '\'' +", cases=" + cases +'}';}//todo 排序規則 根據州state正序進行排序 如果州相同 則根據確診數量cases倒序排序@Overridepublic int compareTo(CovidBean o) {int result ;int i = state.compareTo(o.getState());if ( i > 0) {result =1;} else if (i <0 ) {result = -1;} else {// 確診病例數倒序排序result = cases > o.getCases() ? -1 : 1;}return result;}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(state);out.writeUTF(county);out.writeLong(cases);}@Overridepublic void readFields(DataInput in) throws IOException {this.state =in.readUTF();this.county =in.readUTF();this.cases =in.readLong();} }

    3.4.5.2 Mapper類

    public class CovidTop1Mapper extends Mapper<LongWritable, Text, CovidBean, NullWritable> {CovidBean outKey = new CovidBean();NullWritable outValue = NullWritable.get();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] fields = value.toString().split(",");//封裝數據: 州 縣 確診病例outKey.set(fields[2],fields[1],Long.parseLong(fields[4]));context.write(outKey,outValue);} }

    3.4.5.3 Reducer類

    public class CovidTop1Reducer extends Reducer<CovidBean, NullWritable,CovidBean,NullWritable> {@Overrideprotected void reduce(CovidBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {//不遍歷迭代器,此時key就是分組中的第一個key 也就是該州確診病例數最多的縣對應的數據context.write(key,NullWritable.get());} }

    3.4.5.4 自定義分組

    public class CovidGroupingComparator extends WritableComparator {protected CovidGroupingComparator(){super(CovidBean.class,true);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {CovidBean aBean = (CovidBean) a;CovidBean bBean = (CovidBean) b;return aBean.getState().compareTo(bBean.getState());} }

    3.4.5.5 驅動程序類

    public class CovidTop1Driver {public static void main(String[] args) throws Exception{//配置文件對象Configuration conf = new Configuration();// 創建作業實例Job job = Job.getInstance(conf, CovidTop1Driver.class.getSimpleName());// 設置作業驅動類job.setJarByClass(CovidTop1Driver.class);// 設置作業mapper reducer類job.setMapperClass(CovidTop1Mapper.class);job.setReducerClass(CovidTop1Reducer.class);// 設置作業mapper階段輸出key value數據類型job.setMapOutputKeyClass(CovidBean.class);job.setMapOutputValueClass(NullWritable.class);//設置作業reducer階段輸出key value數據類型 也就是程序最終輸出數據類型job.setOutputKeyClass(CovidBean.class);job.setOutputValueClass(NullWritable.class);//todo 設置自定義分組job.setGroupingComparatorClass(CovidGroupingComparator.class);// 配置作業的輸入數據路徑FileInputFormat.addInputPath(job, new Path(args[0]));// 配置作業的輸出數據路徑FileOutputFormat.setOutputPath(job, new Path(args[1]));//判斷輸出路徑是否存在 如果存在刪除FileSystem fs = FileSystem.get(conf);if(fs.exists(new Path(args[1]))){fs.delete(new Path(args[1]),true);}// 提交作業并等待執行完成boolean resultFlag = job.waitForCompletion(true);//程序退出System.exit(resultFlag ? 0 :1);} }

    3.4.6 代碼執行結果

    3.5 自定義分組擴展:topN問題

    3.5.1 需求

    ??找出美國 2021-01-28,每個州 state 的確診案例數最多的縣 county 前 3 個。(Top3 問題)

    3.5.2 分析

    • 在 map 階段將 “州state和累計確診病例數cases” 作為 key 輸出;
    • 重寫對象的排序規則,首先根據州的正序排序,如果州相等,按照確診病例數cases倒序排序,發送到 reduce;
    • 在 reduce 端利用自定義分組規則,將州state相同的分為一組,然后遍歷取值,取出每組中的前 3 個即可。

    ??為了驗證驗證結果方便,可以在輸出的時候以 cases 作為 value,實際上為空即可,value 并無實際意義。

    3.5.3 代碼實現

    3.5.3.1 自定義對象、自定義分組類

    ??這兩個和上述的 Top1 一樣,此處就不再重復編寫。可以直接使用。

    3.5.3.2 Mapper類

    public class CovidTopNMapper extends Mapper<LongWritable, Text, CovidBean,LongWritable> {CovidBean outKey = new CovidBean();LongWritable outValue = new LongWritable();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] fields = value.toString().split(",");//封裝數據: 州 縣 確診病例outKey.set(fields[2],fields[1],Long.parseLong(fields[4]));outValue.set(Long.parseLong(fields[4]));context.write(outKey,outValue);} }

    3.5.3.3 Reducer類

    public class CovidTopNReducer extends Reducer<CovidBean, LongWritable,CovidBean,LongWritable> {@Overrideprotected void reduce(CovidBean key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {int num =0;for (LongWritable value : values) {if(num < 3 ){ //輸出每個州最多的前3個context.write(key,value);num++;}else{return;}}} }

    3.5.3.4 程序驅動類

    public class CovidTopNDriver {public static void main(String[] args) throws Exception{//配置文件對象Configuration conf = new Configuration();// 創建作業實例Job job = Job.getInstance(conf, CovidTopNDriver.class.getSimpleName());// 設置作業驅動類job.setJarByClass(CovidTopNDriver.class);// 設置作業mapper reducer類job.setMapperClass(CovidTopNMapper.class);job.setReducerClass(CovidTopNReducer.class);// 設置作業mapper階段輸出key value數據類型job.setMapOutputKeyClass(CovidBean.class);job.setMapOutputValueClass(LongWritable.class);//設置作業reducer階段輸出key value數據類型 也就是程序最終輸出數據類型job.setOutputKeyClass(CovidBean.class);job.setOutputValueClass(LongWritable.class);//todo 設置自定義分組job.setGroupingComparatorClass(CovidGroupingComparator.class);// 配置作業的輸入數據路徑FileInputFormat.addInputPath(job, new Path(args[0]));// 配置作業的輸出數據路徑FileOutputFormat.setOutputPath(job, new Path(args[1]));//判斷輸出路徑是否存在 如果存在刪除FileSystem fs = FileSystem.get(conf);if(fs.exists(new Path(args[1]))){fs.delete(new Path(args[1]),true);}// 提交作業并等待執行完成boolean resultFlag = job.waitForCompletion(true);//程序退出System.exit(resultFlag ? 0 :1);} }

    3.5.4 代碼執行結果

    總結

    以上是生活随笔為你收集整理的Hadoop生态圈(二十一)- MapReduce编程基础的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。