日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Mapreduce中maptask过程详解

發布時間:2023/11/30 编程问答 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Mapreduce中maptask过程详解 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、Maptask并行度與決定機制

  1.一個job任務的map階段的并行度默認是由該任務的大小決定的;

  2.一個split切分分配一個maprask來并行處理;

  3.默認情況下,split切分的大小等于blocksize大小;

  4.切片不是mapper類中對單詞的切片,而是對每一個處理文件的單獨切片。

  eg.? 默認情況下,一個maptask處理的文件大小為128M,比如一個400M的數據文件,就需要4個maptask并行來處理,而500M的數據文件也是需要4個maptask。

?

二、Maptask運行機制

  1.讀數據文件:執行類Driver通過InputFormat類讀取文件中的數據;

  2.mapper階段:通過文件的大小決定了maptask的數量,然后mapper進行邏輯運行(讀數據、切分、封裝);

  3.OutputCollector階段:mapper方法通過OutputCollector接口將KV對寫入到環形緩沖區中(這個過程不需要我們處理我們);

  4.溢寫階段:環形緩沖區默認的大小為100M,當環形緩沖區中數據量到達閾值的80%的時候發生溢寫,溢寫的過程中會保證數據KV對使用默認的分區和排序(HashPartitioner分區、字典排序,而環形緩沖區大小和閾值的大小都是可以通過配置來修改的);

  5.歸并排序:將溢寫的數據進行合并排序。

?

三、MR的小文件優化案例

  當許多個小文件上傳到HDFS集群上時,每個小文件都將會占用一個blocksize的大小(128M),而且在對它們進行MR計算時,一個文件就會開啟一個maptask,這樣會浪費很多的資源,下面有兩種解決方案:

  1.在文件上傳到HDFS集群前,先將文件進行合并成一個大的文件,再上傳到HDFS集群進行存儲和計算;

  2.若文件已經上傳到HDFS集群,需要直接進行計算時,

  可以再Driver類中設置輸入流之前設置InputFormatClass屬性為CombinerTextInputFormat(它的默認為TextInputFormat),

  原理是:CombineTextInputFormat類可以將多個小文件交給一個split切片,然后交給一個maptask來處理,即再Driver類中設置輸入流FileInputFormat前加入代碼:

job.setInputFormatClass(CombinerTextInputFormat.class); CombinerTextInputFormat.setMaxInputSplitSize(job,4194304); //設置切片最大值為4M CombinerTextInputFormat.setMinInputSplitSize(job,3145725); //設置切片最大值為3M

  表示大小在3M~4M的文件會被方法一個切片中,那么如果有無數的小文件,一個maptask中大概會有28~42個小文件一起處理。

?

四、自定義分區Partitioner

  在MR程序中,默認分區為HashPartitioner,以下為源碼:

public class HashPartitioner<K, V> extends Partitioner<K, V> {public HashPartitioner() {}public int getPartition(K key, V value, int numReduceTasks) {return (key.hashCode() & 2147483647) % numReduceTasks;} }

  HashPartitioner繼承了父類Partitioner,其中getPartition方法返回int值0(注釋:分區數量決定了reducetask的數量,不分區reducetask值為1,所以一直返回int值0,也就只會產生一個結果文件!!!)

  而如果我們想要進行自定義分區,就要重新定義一個分區類繼承Partitioner類:

public class FlowPartitioner extends Partitioner<Text,FlowBean> {@Overridepublic int getPartition(Text key, FlowBean value, int i) {//獲取用來分區的電話號碼前三位String phoneNum = key.toString().substring(0, 3);//設置分區邏輯int partitionNum = 4;if ("135".equals(phoneNum)){return 0;}else if ("137".equals(phoneNum)){return 1;}else if ("138".equals(phoneNum)){return 2;}else if ("139".equals(phoneNum)){return 3;}return partitionNum;} }

  我在流量統計案例中也寫了該分區類,然后再Driver類中的InputFormat類之前加入設置的自定義分區代碼:

?

job.setPartitionClass(PhoneNumPartitioner.class); job.setNumReduceTasks(5); (注意:輸出文件數量要大于partitioner分區的數量)

?

  總結:MR程序運算過程中,決定maptask個數的有塊大小(blocksize)、數據文件大小、文件輸入方式(小文件優化);而決定reducetask個數的是分區(無分區時reducetask個數為1,生成一個結果文件)。

  

  

  

?

轉載于:https://www.cnblogs.com/HelloBigTable/p/10591105.html

總結

以上是生活随笔為你收集整理的Mapreduce中maptask过程详解的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。