日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

【hadoop】20.MapReduce-InputFormat数据切片机制

發(fā)布時間:2025/3/20 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【hadoop】20.MapReduce-InputFormat数据切片机制 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

2019獨角獸企業(yè)重金招聘Python工程師標準>>>

簡介

通過本章節(jié),您可以學習到:

  • Job的提交流程
  • InputFormat數據切片的機制
  • 1、Job提交流程源碼分析

    1)job提交流程源碼詳解 waitForCompletion() submit(); // 1建立連接 connect(); // 1)創(chuàng)建提交job的代理 new Cluster(getConfiguration());// (1)判斷是本地yarn還是遠程initialize(jobTrackAddr, conf); // 2 提交job submitter.submitJobInternal(Job.this, cluster)// 1)創(chuàng)建給集群提交數據的Stag路徑Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);// 2)獲取jobid ,并創(chuàng)建job路徑JobID jobId = submitClient.getNewJobID();// 3)拷貝jar包到集群 copyAndConfigureFiles(job, submitJobDir); rUploader.uploadFiles(job, jobSubmitDir); // 4)計算切片,生成切片規(guī)劃文件 writeSplits(job, submitJobDir);maps = writeNewSplits(job, jobSubmitDir);input.getSplits(job); // 5)向Stag路徑寫xml配置文件 writeConf(conf, submitJobFile);conf.writeXml(out); // 6)提交job,返回提交狀態(tài) status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

    注意以上代碼只是大過程的提取,并不是連續(xù)的某處的代碼。要了解詳細的過程,可以通過編譯器打斷點了解。

    2、InputFomat數據切片機制

    2.1、FileInputFormat圖解分析

    紅色劃分是均分方式,這種方式比較低下。

    而當前采用的是藍色方式,以一個塊為一個切片。大致流程如下:

  • 找到你數據輸入的目錄。
  • 開始遍歷處理(規(guī)劃切片)目錄下的每一個文件
  • 循環(huán)執(zhí)行4-6步驟,直接遍歷完所有輸入文件。
  • 遍歷第一個文件test1.file
    • 獲取文件大小fs.sizeOf(ss.txt);
    • 計算切片大小computeSliteSize(Math.max(minSize,Math.max(maxSize,blocksize)))=blocksize;
    • 默認情況下,切片大小=blocksize
  • 開始切片,形成第1個切片:test1.file—0:128M;第2個切片test1.file—128:256M 第3個切片test1.file—256M:300M(每次切片時,都要判斷切完剩下的部分是否大于塊的1.1倍,不大于1.1倍就劃分一塊切片)
  • 將切片信息寫到一個切片規(guī)劃文件中。
    • 整個切片的核心過程在getSplit()方法中完成。需要注意的是數據切片只是在邏輯上對輸入數據進行分片,并不會再磁盤上將其切分成分片進行存儲。InputSplit只記錄了分片的元數據信息,比如起始位置、長度以及所在的節(jié)點列表等。
  • 提交切片規(guī)劃文件到yarn上,yarn上的MrAppMaster就可以根據切片規(guī)劃文件計算開啟maptask個數。
  • block是HDFS上物理上存儲的存儲的數據,切片是對數據邏輯上的劃分。

    2.2、FileInputFormat中默認的切片機制

    通過以下的學習,我們可以總結出以下三個結論:

    • 切片過程只是簡單地按照文件的內容長度進行切片
    • 切片大小默認等于block大小
    • 切片時不考慮數據集整體,而是逐個針對每一個文件單獨切片

    舉個例子加入我們有以下兩個文件

    file1.txt 320M file2.txt 10M

    經過FileInputFormat的切片機制運算后,默認配置下形成的切片信息如下:

    file1.txt.split1-- 0~128 file1.txt.split2-- 128~256 file1.txt.split3-- 256~320 file2.txt.split1-- 0~10M

    2.3、FileInputFormat切片大小的參數配置

    通過分析源碼org.apache.hadoop.mapreduce.lib.input.FileInputFormat,我們先來看看他的父類InputFormat

    // // Source code recreated from a .class file by IntelliJ IDEA // (powered by Fernflower decompiler) //package org.apache.hadoop.mapreduce;import java.io.IOException; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable;@Public @Stable public abstract class InputFormat<K, V> {public InputFormat() {}public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException;public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException; }

    父類規(guī)定了兩個抽象方法getSplits以及RecordReader。

    再來看看FileInputFormat計算分片大小的相關代碼:

    public List<InputSplit> getSplits(JobContext job) throws IOException {StopWatch sw = (new StopWatch()).start();long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));long maxSize = getMaxSplitSize(job);List<InputSplit> splits = new ArrayList();List<FileStatus> files = this.listStatus(job);Iterator var9 = files.iterator();while(true) {while(true) {while(var9.hasNext()) {FileStatus file = (FileStatus)var9.next();Path path = file.getPath();long length = file.getLen();if (length != 0L) {BlockLocation[] blkLocations;if (file instanceof LocatedFileStatus) {blkLocations = ((LocatedFileStatus)file).getBlockLocations();} else {FileSystem fs = path.getFileSystem(job.getConfiguration());blkLocations = fs.getFileBlockLocations(file, 0L, length);}if (this.isSplitable(job, path)) {long blockSize = file.getBlockSize();long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);long bytesRemaining;int blkIndex;for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));}if (bytesRemaining != 0L) {blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));}} else {splits.add(this.makeSplit(path, 0L, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts()));}} else {splits.add(this.makeSplit(path, 0L, length, new String[0]));}}job.getConfiguration().setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.size());sw.stop();if (LOG.isDebugEnabled()) {LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));}return splits;}}}

    從中我們可以了解到,計算分片大小的邏輯為

    // 初始化值 long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); ... // 計算分片大小long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);...protected long computeSplitSize(long blockSize, long minSize, long maxSize) {return Math.max(minSize, Math.min(maxSize, blockSize));}... // minSize默認值為1Lprotected long getFormatMinSplitSize() {return 1L;}

    也就說,切片主要由這幾個值來運算決定

    mapreduce.input.fileinputformat.split.minsize=1 默認值為1 mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默認值Long.MAXValue

    因此,默認情況下,切片大小=blocksize。我們不難得到,要想修改分片的大小,完全可以通過配置文件的mapreduce.input.fileinputformat.split.minsize以及mapreduce.input.fileinputformat.split.maxsize進行配置:

    • mapreduce.input.fileinputformat.split.maxsize(切片最大值):參數如果調得比blocksize小,則會讓切片變小。 mapreduce.input.fileinputformat.split.minsize (切片最小值):參數調的比blockSize大,則可以讓切片變得比blocksize還大。

    2.4、繼承樹

    FileInputFormat有多個底層實現,2.7版本的jdk具有如下的繼承樹

    默認情況下Job任務使用的是

    2.5、獲取切片信息API

    // 根據文件類型獲取切片信息 FileSplit inputSplit = (FileSplit) context.getInputSplit(); // 獲取切片的文件名稱 String name = inputSplit.getPath().getName();

    3、CombineTextInputFormat切片機制

    默認情況下TextInputformat對任務的切片機制是按文件規(guī)劃切片,不管文件多小,都會是一個單獨的切片,都會交給一個maptask,這樣如果有大量小文件,就會產生大量的maptask,處理效率極其低下。最好的辦法,在數據處理系統的最前端(預處理/采集),將小文件先合并成大文件,再上傳到HDFS做后續(xù)分析。

    如果已經是大量小文件在HDFS中了,可以使用另一種InputFormat來做切片(CombineTextInputFormat),它的切片邏輯跟TextFileInputFormat不同:它可以將多個小文件從邏輯上規(guī)劃到一個切片中,這樣,多個小文件就可以交給一個maptask。

    優(yōu)先滿足最小切片大小,不超過最大切片大小

    CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m

    舉例:0.5m+1m+0.3m+5m=2m + 4.8m=2m + 4m + 0.8m

    如果不設置InputFormat,它默認用的是TextInputFormat.class,因此我們需要手動指定InputFormat類型,在執(zhí)行job之前指定:

    job.setInputFormatClass(CombineTextInputFormat.class) CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m

    通過此設置之后,分片會變得更少一些,不會像之前一樣,一個文件形成一個分片(文件過小的情況尤其浪費)。

    轉載于:https://my.oschina.net/u/3091870/blog/3000619

    總結

    以上是生活随笔為你收集整理的【hadoop】20.MapReduce-InputFormat数据切片机制的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。