
Hadoop Big Data: Parallelism of MapReduce Programs

  • What determines the number of reduce tasks

1. The needs of the business logic

2. The volume of data

How to set it:

job.setNumReduceTasks(5)
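For context, here is a minimal driver sketch showing where that call sits. This is an illustration, not code from the original article; ParallelismDriver is an invented name, and no mapper/reducer classes are set, so the identity defaults apply.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ParallelismDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration());
    job.setJarByClass(ParallelismDriver.class);
    // mapper/reducer classes omitted; the identity defaults pass
    // the (LongWritable, Text) records of TextInputFormat through
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // five reduce tasks -> five partitions and five output files
    // (part-r-00000 through part-r-00004)
    job.setNumReduceTasks(5);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}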

  • What determines the number of map tasks

Map tasks do not cooperate with one another; each map task runs entirely on its own, so no "global" aggregation can be performed during the map phase. The number of map tasks therefore depends entirely on the volume of data to be processed.

The mechanism:

The input data is divided into "splits".

Each split is assigned to one map task.

The default split mechanism in the MapReduce framework:

TextInputFormat.getSplits() is inherited from FileInputFormat.getSplits():

1. Define a split size: it can be tuned via parameters, and by default it equals the HDFS block size, typically 128 MB.

2. Collect the List of all files to be processed under the input directory.

3. Iterate over the file List and split each file individually:

for (file : fileList)
    cut file starting at offset 0; every 128 MB of data becomes one split

For example, a.txt (200 MB) is cut into two splits: a.txt: 0-128M and a.txt: 128M-200M.

And b.txt (80 MB) yields a single split: b.txt: 0-80M.
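To make the slicing arithmetic concrete, here is a simplified, self-contained sketch. It is an illustration only, not the actual FileInputFormat source, which additionally honors configurable min/max split sizes and a small slop factor on the final split.

import java.util.ArrayList;
import java.util.List;

public class SplitSketch {
  // Returns {start, length} pairs for one file, cut every splitSize bytes.
  static List<long[]> slice(long fileLength, long splitSize) {
    List<long[]> splits = new ArrayList<long[]>();
    long offset = 0;
    while (offset < fileLength) {
      long length = Math.min(splitSize, fileLength - offset);
      splits.add(new long[] { offset, length });
      offset += length;
    }
    return splits;
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024;
    // a.txt (200 MB) -> two splits; b.txt (80 MB) -> one split
    System.out.println(slice(200 * mb, 128 * mb).size()); // prints 2
    System.out.println(slice(80 * mb, 128 * mb).size());  // prints 1
  }
}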

  • If the data to be processed consists of a huge number of small files, this default split mechanism produces a huge number of splits, and therefore a huge number of map task processes; yet each split is tiny, so every map task handles very little data and the overall job efficiency is very low.

The general solution is to pack multiple small files into one split; this is done by subclassing InputFormat and overriding its getSplits() method.

The MapReduce framework ships with an InputFormat implementation for exactly this scenario: CombineFileInputFormat.

How data splits determine the number of map tasks

Example observation (many files, large files)

Source code walkthrough

Reading the TextInputFormat source:

isSplitable(): decides whether the data being processed can be split at all

getSplits(): plans the split metadata (the implementation lives in FileInputFormat)

TextInputFormat's split logic: each file is split independently, and the split size defaults to the block size.

However, two parameters can adjust it: mapreduce.input.fileinputformat.split.minsize and mapreduce.input.fileinputformat.split.maxsize (mapred.min.split.size / mapred.max.split.size in older releases). The effective split size is max(minSize, min(maxSize, blockSize)).
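A minimal sketch of adjusting those two knobs through the new-API FileInputFormat helpers; the sizes shown are arbitrary examples.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeTuning {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration());
    // Effective split size = max(minSize, min(maxSize, blockSize)).
    // Raise minSize above the block size to get fewer, larger splits:
    FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);
    // ...or lower maxSize below the block size to get more, smaller splits:
    // FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);
  }
}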

With a large number of small files, this split logic has a serious drawback: far too many splits, and therefore far too many map tasks.

createRecordReader(): constructs a record reader

The actual record-reading logic lives in LineRecordReader (it reads the data line by line; the byte offset at which each line starts becomes the key, and the line's content becomes the value). Its one notable subtlety:

When reading a split, LineRecordReader always discards the first line (for every split except the first) and always reads one extra line past the end of its split (for every split except the last). For example, if a line straddles the boundary between split 1 and split 2, the map task for split 1 reads past the boundary to finish that line, and the map task for split 2 skips it, so the line is processed exactly once.

  • The InputFormat class hierarchy

Introduction to InputFormat subclasses:

(1) TextInputFormat (the default input format) in detail

-- Source structure: getSplits() and the record reader

-- Why no line is ever cut in two and processed twice

  • In LineRecordReader, the first line of a split is discarded (except in the file's first split):
public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
  FileSplit split = (FileSplit) genericSplit;
  Configuration job = context.getConfiguration();
  // ...
  // open the file and seek to the start of the split
  final FileSystem fs = file.getFileSystem(job);
  fileIn = fs.open(file);
  CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
  if (null != codec) {
    // ...
  }
  // We always throw away the first record (except in the file's first split),
  // because nextKeyValue() always reads one extra line across the split
  // boundary (except in the file's last split).
  if (start != 0) {
    start += in.readLine(new Text(), 0, maxBytesToConsume(start));
  }
  this.pos = start;
}
  • In LineRecordReader, nextKeyValue() always reads one extra line across the split boundary:
public boolean nextKeyValue() throws IOException {
  if (key == null) {
    key = new LongWritable();
  }
  key.set(pos);
  if (value == null) {
    value = new Text();
  }
  int newSize = 0;
  // using <= (rather than <) makes the reader consume one extra line
  // past the end of its split
  while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
    newSize = in.readLine(value, maxLineLength,
        Math.max(maxBytesToConsume(pos), maxLineLength));
    pos += newSize;
    if (newSize < maxLineLength) {
      break;
    }
    // ...
  }
  // ...
}
  • (2) CombineTextInputFormat

  • Its split logic is completely different from TextInputFormat's:

    CombineTextInputFormat can pack multiple small files into a single split.

    This mechanism improves efficiency when processing huge numbers of small files.

    (For small files, the best approach is still to merge them before processing.)

    Approach

    CombineFileInputFormat involves three important properties:

    mapred.max.split.size: the maximum split size when blocks on the same node or the same rack are combined into one split;

    mapred.min.split.size.per.node: the minimum split size when blocks on the same node are combined into one split;

    mapred.min.split.size.per.rack: the minimum split size when blocks on the same rack are combined into one split.
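    A hypothetical configuration fragment setting these three thresholds; the byte values are arbitrary examples, and the old-style property names are the ones this article discusses (newer Hadoop releases expose mapreduce.input.fileinputformat.split.* equivalents).

import org.apache.hadoop.conf.Configuration;

public class CombineSplitConfig {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // cap every combined split at 128 MB
    conf.setLong("mapred.max.split.size", 128L * 1024 * 1024);
    // on one node, leftovers of at least 64 MB still form a split
    conf.setLong("mapred.min.split.size.per.node", 64L * 1024 * 1024);
    // on one rack, leftovers of at least 32 MB still form a split
    conf.setLong("mapred.min.split.size.per.rack", 32L * 1024 * 1024);
  }
}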

    The split-formation process:

    (1) Form splits node by node:

    a. Iterate over and accumulate the blocks on the node. Whenever the accumulated size reaches mapred.max.split.size, form a split from those blocks; continue this process until the remaining accumulated size falls below mapred.max.split.size, then go to the next step.

    b. If the remaining accumulated size is at least mapred.min.split.size.per.node, form a split from the leftover blocks; if it is smaller, leave those blocks for later processing.

    (2) Form splits rack by rack:

    a. Iterate over and accumulate the blocks on the rack (these are the blocks left over from the previous step). Whenever the accumulated size reaches mapred.max.split.size, form a split; continue until the remaining accumulated size falls below mapred.max.split.size, then go to the next step.

    b. If the remaining accumulated size is at least mapred.min.split.size.per.rack, form a split from the leftover blocks; if it is smaller, leave those blocks for later processing.

    (3) Iterate over and accumulate all leftover blocks. Whenever the accumulated size reaches mapred.max.split.size, form a split; continue until the remaining accumulated size falls below mapred.max.split.size, then go to the next step.

    (4) Whatever blocks remain form one final split.

    Core implementation

// mapping from a rack name to the list of blocks it has
HashMap<String, List<OneBlockInfo>> rackToBlocks =
    new HashMap<String, List<OneBlockInfo>>();
// mapping from a block to the nodes on which it has replicas
HashMap<OneBlockInfo, String[]> blockToNodes =
    new HashMap<OneBlockInfo, String[]>();
// mapping from a node to the list of blocks that it contains
HashMap<String, List<OneBlockInfo>> nodeToBlocks =
    new HashMap<String, List<OneBlockInfo>>();

    Before split formation begins, three important mappings must be initialized:

    rackToBlocks: rack-to-blocks, i.e. which blocks live on a given rack;

    blockToNodes: block-to-nodes, i.e. which nodes hold replicas of a given block;

    nodeToBlocks: node-to-blocks, i.e. which blocks live on a given node.

    The initialization is shown below: each Path (one input file) is turned into a OneFileInfo object, and the mappings are maintained while the OneFileInfo objects are built.

// populate all the blocks for all files
long totLength = 0;
for (int i = 0; i < paths.length; i++) {
  files[i] = new OneFileInfo(paths[i], job, rackToBlocks, blockToNodes,
                             nodeToBlocks, rackToNodes);
  totLength += files[i].getLength();
}
  • (1) Forming splits node by node; the code is as follows:
// blocks belonging to the split currently being built
ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
// nodes that the blocks of the current split live on
ArrayList<String> nodes = new ArrayList<String>();
// size of the current split so far
long curSplitSize = 0;

// process all nodes and create splits that are local to a node,
// handling the blocks of each node in turn
for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
       nodeToBlocks.entrySet().iterator(); iter.hasNext();) {
  Map.Entry<String, List<OneBlockInfo>> one = iter.next();
  nodes.add(one.getKey());
  List<OneBlockInfo> blocksInNode = one.getValue();

  // for each block, copy it into validBlocks. Delete it from blockToNodes
  // so that the same block does not appear in two different splits.
  for (OneBlockInfo oneblock : blocksInNode) {
    if (blockToNodes.containsKey(oneblock)) {
      validBlocks.add(oneblock);
      blockToNodes.remove(oneblock);
      curSplitSize += oneblock.length;

      // if the accumulated size reaches maxSize, then create this split
      if (maxSize != 0 && curSplitSize >= maxSize) {
        // create an input split and add it to the splits array
        addCreatedSplit(job, splits, nodes, validBlocks);
        curSplitSize = 0;
        validBlocks.clear();
      }
    }
  }
  // if there were any blocks left over and their combined size is at least
  // minSizeNode, combine them into one split; otherwise hand them back to
  // blockToNodes so the later same-rack pass can combine them with other
  // blocks from the same rack
  if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
    // create an input split and add it to the splits array
    addCreatedSplit(job, splits, nodes, validBlocks);
  } else {
    for (OneBlockInfo oneblock : validBlocks) {
      blockToNodes.put(oneblock, oneblock.hosts);
    }
  }
  validBlocks.clear();
  nodes.clear();
  curSplitSize = 0;
}

    (2) Forming splits rack by rack; the code is as follows:

// if blocks in a rack are below the specified minimum size, then keep them
// in 'overflow'. After the processing of all racks is complete, these
// overflow blocks will be combined into splits.
ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
ArrayList<String> racks = new ArrayList<String>();

// Process all racks over and over again until there is no more work to do.
while (blockToNodes.size() > 0) {
  // Create one split for this rack before moving over to the next rack.
  // Come back to this rack after creating a single split for each of the
  // remaining racks.
  // Process one rack location at a time; combine all possible blocks that
  // reside on this rack as one split (constrained by minimum and maximum
  // split size).

  // iterate over all racks
  for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
         rackToBlocks.entrySet().iterator(); iter.hasNext();) {
    Map.Entry<String, List<OneBlockInfo>> one = iter.next();
    racks.add(one.getKey());
    List<OneBlockInfo> blocks = one.getValue();

    // for each block, copy it into validBlocks. Delete it from
    // blockToNodes so that the same block does not appear in
    // two different splits.
    boolean createdSplit = false;
    for (OneBlockInfo oneblock : blocks) {
      if (blockToNodes.containsKey(oneblock)) {
        validBlocks.add(oneblock);
        blockToNodes.remove(oneblock);
        curSplitSize += oneblock.length;

        // if the accumulated size reaches maxSize, then create this split
        if (maxSize != 0 && curSplitSize >= maxSize) {
          // create an input split and add it to the splits array
          addCreatedSplit(job, splits, getHosts(racks), validBlocks);
          createdSplit = true;
          break;
        }
      }
    }

    // if we created a split, then just go to the next rack
    if (createdSplit) {
      curSplitSize = 0;
      validBlocks.clear();
      racks.clear();
      continue;
    }

    if (!validBlocks.isEmpty()) {
      // if the leftover blocks reach minSizeRack, form a split from them
      if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
        // if there is a minimum size specified, then create a single split;
        // otherwise, store these blocks into the overflow data structure
        addCreatedSplit(job, splits, getHosts(racks), validBlocks);
      } else {
        // There were a few blocks in this rack that remained to be processed.
        // Keep them in the 'overflow' block list; they will be combined later.
        overflowBlocks.addAll(validBlocks);
      }
    }
    curSplitSize = 0;
    validBlocks.clear();
    racks.clear();
  }
}

    (3) Iterating over and accumulating the leftover (overflow) blocks; the code is as follows:

// Process all overflow blocks
for (OneBlockInfo oneblock : overflowBlocks) {
  validBlocks.add(oneblock);
  curSplitSize += oneblock.length;

  // This might cause an existing rack location to be re-added,
  // but it should be ok.
  for (int i = 0; i < oneblock.racks.length; i++) {
    racks.add(oneblock.racks[i]);
  }

  // if the accumulated size reaches maxSize, then create this split
  if (maxSize != 0 && curSplitSize >= maxSize) {
    // create an input split and add it to the splits array
    addCreatedSplit(job, splits, getHosts(racks), validBlocks);
    curSplitSize = 0;
    validBlocks.clear();
    racks.clear();
  }
}

    (4) The remaining blocks form one final split; the code is as follows:

// Process any remaining blocks, if any.
if (!validBlocks.isEmpty()) {
  addCreatedSplit(job, splits, getHosts(racks), validBlocks);
}

    Summary

    When CombineFileInputFormat forms splits it takes data locality into account (same node, then same rack): it first combines blocks that share a node, then blocks that share a rack, and finally whatever is left over, so locality weakens step by step. Note also that CombineFileInputFormat is abstract; to use it you must implement its record-reader factory yourself (getRecordReader in the old mapred API, createRecordReader in the new mapreduce API), as sketched below.
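    As an illustration, here is a minimal subclass sketch against the new (org.apache.hadoop.mapreduce) API. SmallFilesInputFormat and SingleFileReader are invented names; per-file reading is delegated to the stock LineRecordReader.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

// Packs many small text files into combined splits; each file inside a
// combined split is still read line by line.
public class SmallFilesInputFormat extends CombineFileInputFormat<LongWritable, Text> {
  @Override
  public RecordReader<LongWritable, Text> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException {
    return new CombineFileRecordReader<LongWritable, Text>(
        (CombineFileSplit) split, context, SingleFileReader.class);
  }

  // Per-file reader: CombineFileRecordReader instantiates it reflectively,
  // so it must have exactly this (CombineFileSplit, TaskAttemptContext,
  // Integer) constructor.
  public static class SingleFileReader extends RecordReader<LongWritable, Text> {
    private final LineRecordReader delegate = new LineRecordReader();
    private final CombineFileSplit split;
    private final int index;

    public SingleFileReader(CombineFileSplit split, TaskAttemptContext context,
                            Integer index) {
      this.split = split;
      this.index = index;
    }

    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
        throws IOException, InterruptedException {
      // carve the index-th file out of the combined split
      FileSplit fileSplit = new FileSplit(split.getPath(index),
          split.getOffset(index), split.getLength(index), split.getLocations());
      delegate.initialize(fileSplit, context);
    }
    @Override public boolean nextKeyValue() throws IOException { return delegate.nextKeyValue(); }
    @Override public LongWritable getCurrentKey() { return delegate.getCurrentKey(); }
    @Override public Text getCurrentValue() { return delegate.getCurrentValue(); }
    @Override public float getProgress() throws IOException { return delegate.getProgress(); }
    @Override public void close() throws IOException { delegate.close(); }
  }
}

    In practice you would pair such a subclass with the three size properties described above so the packed splits stay close to the block size.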

    (3) SequenceFileInputFormat / SequenceFileOutputFormat

    SequenceFile is a very important data format in Hadoop.

    Internally, a SequenceFile organizes its data as K-V pairs.

    These classes read job input from, and write job output to, Hadoop sequence files; a driver sketch follows.
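    A minimal, hypothetical driver sketch wiring the two classes into a job. SeqFileJobDriver is an invented name; no mapper or reducer is set, so the identity defaults pass the (Text, IntWritable) records assumed here straight through.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SeqFileJobDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration());
    job.setJarByClass(SeqFileJobDriver.class);

    // read K-V records from sequence files instead of splitting text into lines
    job.setInputFormatClass(SequenceFileInputFormat.class);
    // write the job's output back out as a sequence file
    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    // assuming the input sequence file holds (Text, IntWritable) records
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}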
