日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

MapReduce中map并行度优化及源码分析

發布時間:2023/12/20 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 MapReduce中map并行度优化及源码分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

mapTask并行度的決定機制

  一個job的map階段并行度由客戶端在提交job時決定,而客戶端對map階段并行度的規劃的基本邏輯為:將待處理數據執行邏輯切片(即按照一個特定切片大小,將待處理數據劃分成邏輯上的多個split),然后每一個split分配一個mapTask并行實例處理。

FileInputFormat切片機制

原文和作者一起討論:http://www.cnblogs.com/intsmaze/p/6733968.html

1、默認切片定義在InputFormat類中的getSplit()方法

2、FileInputFormat中默認的切片機制:

a)?簡單地按照文件的內容長度進行切片

b)?切片大小,默認等于hdfs的block大小

c)?切片時不考慮數據集整體,而是逐個針對每一個文件單獨切片

比如待處理數據有兩個文件:

file1.txt 260M file2.txt 10M

經過FileInputFormat的切片機制運算后,形成的切片信息如下: ?

file1.txt.split1-- 0~128 file1.txt.split2-- 128~260 //如果剩余的文件長度/切片長度<=1.1則會將剩余文件的長度并未一個切片 file2.txt.split1-- 0~10M

3、FileInputFormat中切片的大小的參數配置

通過分析源碼,在FileInputFormat中,計算切片大小的邏輯:Math.max(minSize, Math.min(maxSize, blockSize)); 切片主要由這幾個值來運算決定。

minsize:默認值:1 配置參數: mapreduce.input.fileinputformat.split.minsize maxsize:默認值:Long.MAXValue 配置參數:mapreduce.input.fileinputformat.split.maxsizeblocksize:值為hdfs的對應文件的blocksize

配置讀取目錄下文件數量的線程數:public static final String LIST_STATUS_NUM_THREADS =
????? "mapreduce.input.fileinputformat.list-status.num-threads";

因此,默認情況下,Math.max(minSize,?Math.min(maxSize,?blockSize));切片大小=blocksize

maxsize(切片最大值):參數如果調得比blocksize小,則會讓切片變小。

minsize(切片最小值):參數調的比blockSize大,則可以讓切片變得比blocksize還大。

選擇并發數的影響因素:

1、運算節點的硬件配置

2、運算任務的類型:CPU密集型還是IO密集型

3、運算任務的數據量

3、hadoop2.6.4源碼解析

org.apache.hadoop.mapreduce.JobSubmitter類

//得到job的map任務的并行數量private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,Path jobSubmitDir) throws IOException,InterruptedException, ClassNotFoundException {JobConf jConf = (JobConf)job.getConfiguration();int maps;if (jConf.getUseNewMapper()) {maps = writeNewSplits(job, jobSubmitDir);} else {maps = writeOldSplits(jConf, jobSubmitDir);}return maps;}@SuppressWarnings("unchecked")private <T extends InputSplit>int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,InterruptedException, ClassNotFoundException {Configuration conf = job.getConfiguration();InputFormat<?, ?> input =ReflectionUtils.newInstance(job.getInputFormatClass(), conf); List<InputSplit> splits = input.getSplits(job);T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);// sort the splits into order based on size, so that the biggest// go firstArrays.sort(array, new SplitComparator());JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);return array.length;}

?

切片計算邏輯,關注紅色字體代碼即可。

public List<InputSplit> getSplits(JobContext job) throws IOException {Stopwatch sw = new Stopwatch().start();
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));long maxSize = getMaxSplitSize(job);// generate splitsList<InputSplit> splits = new ArrayList<InputSplit>(); List<FileStatus> files = listStatus(job);
   //遍歷文件,對每一個文件進行如下處理:獲得文件的blocksize,獲取文件的長度,得到切片信息(spilt 文件路徑,切片編號,偏移量范圍)
for (FileStatus file: files) {Path path = file.getPath();long length = file.getLen();if (length != 0) {BlockLocation[] blkLocations;if (file instanceof LocatedFileStatus) {blkLocations = ((LocatedFileStatus) file).getBlockLocations();} else {FileSystem fs = path.getFileSystem(job.getConfiguration());blkLocations = fs.getFileBlockLocations(file, 0, length);}if (isSplitable(job, path)) {long blockSize = file.getBlockSize();long splitSize = computeSplitSize(blockSize, minSize, maxSize);long bytesRemaining = length;while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);splits.add(makeSplit(path, length-bytesRemaining, splitSize,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));bytesRemaining -= splitSize;}if (bytesRemaining != 0) {int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));}} else { // not splitablesplits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),blkLocations[0].getCachedHosts()));}} else { //Create empty hosts array for zero length filessplits.add(makeSplit(path, 0, length, new String[0]));}}// Save the number of input files for metrics/loadgen job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());sw.stop();if (LOG.isDebugEnabled()) {LOG.debug("Total # of splits generated by getSplits: " + splits.size()+ ", TimeTaken: " + sw.elapsedMillis());}return splits;}

?

public static final String SPLIT_MINSIZE = "mapreduce.input.fileinputformat.split.minsize";public static final String SPLIT_MAXSIZE = "mapreduce.input.fileinputformat.split.maxsize";long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));//保證切分的文件長度最小不得小于1字節protected long getFormatMinSplitSize() {return 1;}//如果沒有在conf中設置SPLIT_MINSIZE參數,則取默認值1字節。public static long getMinSplitSize(JobContext job) {return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);}//得到切片文件的最大長度long maxSize = getMaxSplitSize(job);//如果沒有在conf中設置SPLIT_MAXSIZE參數,則去默認值Long.MAX_VALUE字節。public static long getMaxSplitSize(JobContext context) {return context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE);}//讀取指定目錄下的所有文件的信息List<FileStatus> files = listStatus(job);//如果沒有指定開啟幾個線程讀取,則默認一個線程去讀文件信息,因為存在目錄下有上億個文件的情況,所以有需要開啟多個線程加快讀取。int numThreads = job.getConfiguration().getInt(LIST_STATUS_NUM_THREADS,DEFAULT_LIST_STATUS_NUM_THREADS);public static final String LIST_STATUS_NUM_THREADS ="mapreduce.input.fileinputformat.list-status.num-threads";public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;//計算切片文件的邏輯大小long splitSize = computeSplitSize(blockSize, minSize, maxSize);protected long computeSplitSize(long blockSize, long minSize,long maxSize) {return Math.max(minSize, Math.min(maxSize, blockSize));}private static final double SPLIT_SLOP = 1.1; // 10% slop//判斷剩余文件與切片大小的比是否為1.1.while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);splits.add(makeSplit(path, length-bytesRemaining, splitSize,blkLocations[blkIndex].getHosts(),blkLocations[blkIndex].getCachedHosts()));bytesRemaining -= splitSize;}

map并行度

  如果job的每個map或者reduce的task的運行時間都只有30-40秒鐘(最好每個map的執行時間最少不低于一分鐘),那么就減少該job的map或者reduce數。每一個task的啟動和加入到調度器中進行調度,這個中間的過程可能都要花費幾秒鐘,所以如果每個task都非常快就跑完了,就會在task的開始和結束的時候浪費太多的時間。

  配置task的JVM重用可以改善該問題:   (mapred.job.reuse.jvm.num.tasks,默認是1,表示一個JVM上最多可以順序執行的task數目(屬于同一個Job)是1。也就是說一個task啟一個JVM)。

小文件的場景下,默認的切片機制會造成大量的maptask處理很少量的數據,效率低下:

解決方案:

  推薦:把小文件存入hdfs之前進行預處理,先合并為大文件后再上傳。

  折中:寫程序對hdfs上小文件進行合并再跑job處理。

  補救措施:如果大量的小文件已經存在hdfs上了,使用combineInputFormate組件,它可以將眾多的小文件從邏輯上規劃到一個切片中,這樣多個小文件就可以交給一個maptask操作了。

轉載于:https://www.cnblogs.com/intsmaze/p/6733968.html

總結

以上是生活随笔為你收集整理的MapReduce中map并行度优化及源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。