Hadoop Big Data -- MapReduce Job Parallelism
-
How the number of reduce tasks is determined
1. Business-logic requirements
2. The volume of data
How to set it:
job.setNumReduceTasks(5)
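To show where this call sits, here is a minimal, self-contained driver sketch (the class name, identity mapper/reducer and command-line paths are illustrative, not from the original article):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ParallelismDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "parallelism-demo");
        job.setJarByClass(ParallelismDriver.class);
        // identity mapper/reducer, only to keep the sketch self-contained
        job.setMapperClass(Mapper.class);
        job.setReducerClass(Reducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        // reduce-side parallelism: 5 reduce tasks -> part-r-00000 ... part-r-00004
        job.setNumReduceTasks(5);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}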
-
How the number of map tasks is determined
Map tasks do not cooperate with one another; each map task works on its own, so no "global" aggregation can be done inside the map phase. The number of map tasks therefore depends entirely on the volume of data being processed.
Mechanism:
The input data is divided into "splits";
each split is assigned to one map task.
The default split mechanism in the MapReduce framework:
TextInputFormat.getSplits() is inherited from FileInputFormat.getSplits()
1. Determine a split size: it can be tuned through parameters; by default it equals the HDFS block size, typically 128 MB.
2. Get the list of all files to be processed under the input directory.
3. Iterate over the file list and split each file individually:
   for (file : fileList)
     slice the file starting from offset 0, producing a new split every 128 MB.
     For example, a.txt (200 MB) is cut into two splits: a.txt: 0-128 MB and a.txt: 128-200 MB;
     b.txt (80 MB) yields a single split: b.txt: 0-80 MB.
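The split-size choice and the per-file slicing can be sketched roughly as follows (a simplified model of FileInputFormat's behaviour, not the real code; the real implementation also applies a ~10% slack so a trailing sliver is not turned into its own split):

import java.util.ArrayList;
import java.util.List;

// Simplified model of FileInputFormat's per-file split planning (illustrative only).
public class SplitPlanner {
    // Effective split size: the block size clamped by the configurable min/max parameters.
    static long splitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }
    // Cut one file into (offset, length) pairs, one pair per future map task.
    static List<long[]> planFile(long fileLength, long splitSize) {
        List<long[]> splits = new ArrayList<>();
        long offset = 0;
        while (offset < fileLength) {
            long length = Math.min(splitSize, fileLength - offset);
            splits.add(new long[]{offset, length});
            offset += length;
        }
        return splits;
    }
    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long size = splitSize(128 * mb, 1, Long.MAX_VALUE); // default: split size == block size
        // a.txt (200 MB) -> two splits; b.txt (80 MB) -> one split
        System.out.println(planFile(200 * mb, size).size() + " splits for a.txt");
        System.out.println(planFile(80 * mb, size).size() + " splits for b.txt");
    }
}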
- If the input consists of a large number of small files, this default split mechanism produces a large number of splits and therefore a very large number of map task processes; each split is tiny, each map task processes very little data, and overall efficiency is poor.
The general solution is to pack several small files into one split. This is done by subclassing InputFormat and overriding its getSplits() method;
the MapReduce framework already ships with an InputFormat implementation for exactly this scenario: CombineFileInputFormat.
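For plain-text small files, its concrete subclass CombineTextInputFormat can be dropped into a driver directly. A hedged sketch (the 4 MB cap is purely an illustrative value; setMaxInputSplitSize is the static helper inherited from FileInputFormat):

import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

// In the driver, after the Job has been created:
// pack many small files into few splits instead of one split per file
job.setInputFormatClass(CombineTextInputFormat.class);
// upper bound on the size of a combined split, in bytes (4 MB here, illustrative)
CombineTextInputFormat.setMaxInputSplitSize(job, 4L * 1024 * 1024);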
數(shù)據(jù)切片與map任務數(shù)的機制
示例觀察(多文件,大文件)
源碼跟蹤
TextInputFormat源碼閱讀
isSplitable() 判斷要處理的數(shù)據(jù)是否可以做切片
getSplit()? 規(guī)劃切片信息(實現(xiàn)在FileInputFormat類中)
----TextInputformat切片邏輯: 對每一個文件單獨切片;切片大小默認等于blocksize
但是有兩個參數(shù)可以調整:
如果是大量小文件,這種切片邏輯會有重大弊端:切片數(shù)量太多,maptask太多
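Assuming the current driver API, a sketch of turning those two knobs (older releases call the properties mapred.min.split.size / mapred.max.split.size):

// Continuing a driver sketch: shrink or grow splits without touching the HDFS block size.
// Effective split size = max(minSize, min(maxSize, blockSize)).
FileInputFormat.setMinInputSplitSize(job, 1L);                // mapreduce.input.fileinputformat.split.minsize
FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024); // mapreduce.input.fileinputformat.split.maxsize -> ~64 MB splits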
createRecordReader(): builds a record reader
The actual reading is implemented in LineRecordReader (it reads the data line by line, using each line's start offset as the key and the line's content as the value). Its special behaviour is that,
when reading a split, LineRecordReader always skips the first line (for every split except a file's first split) and always reads one extra line across the split boundary (for every split except a file's last split).
-
The InputFormat class hierarchy
Overview of InputFormat subclasses:
(1) TextInputFormat (the default input format class), in detail
-- source structure: getSplits() and the record reader
-- why a line is never cut in half between two map tasks:
- in LineRecordReader, the first line of a (non-first) split is ignored;
- in LineRecordReader, nextKeyValue() always reads one extra line across the split boundary.
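A toy, self-contained illustration of these two rules (not Hadoop code): each simulated reader skips its first line unless its split starts at offset 0, and keeps reading any line that starts at or before its split end, so no line is processed twice or cut in half:

import java.nio.charset.StandardCharsets;

// Simulates LineRecordReader's boundary handling over an in-memory "file".
public class SplitLineDemo {
    public static void main(String[] args) {
        byte[] data = "alpha\nbravo\ncharlie\ndelta\n".getBytes(StandardCharsets.UTF_8);
        int splitSize = 10; // boundary 10 cuts "bravo" in half; boundary 20 falls exactly on a line start
        for (int start = 0; start < data.length; start += splitSize) {
            int end = Math.min(start + splitSize, data.length);
            System.out.println("split [" + start + ", " + end + "):");
            int pos = start;
            if (start != 0) {
                pos = skipLine(data, pos);      // rule 1: a non-first split ignores its (possibly partial) first line
            }
            while (pos <= end) {                // rule 2: a line that *starts* inside the split belongs to it,
                int next = skipLine(data, pos); //         even if it runs past the boundary
                if (next == pos) break;         // end of file
                System.out.println("  " + new String(data, pos, next - pos, StandardCharsets.UTF_8).trim());
                pos = next;
            }
        }
    }
    // Returns the offset just past the next '\n' (or the end of the data).
    private static int skipLine(byte[] data, int pos) {
        while (pos < data.length && data[pos] != '\n') pos++;
        return Math.min(pos + 1, data.length);
    }
}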
(2) CombineTextInputFormat, in detail
Its split logic is completely different from TextInputFormat's:
CombineTextInputFormat can pack multiple small files into one split,
a mechanism that improves efficiency when processing huge numbers of small files.
(For small files, the best strategy is still to merge them first and then process the merged files.)
Approach
CombineFileInputFormat involves three important properties:
mapred.max.split.size: the maximum size of a split formed from blocks on the same node or the same rack;
mapred.min.split.size.per.node: the minimum size of a split formed from blocks on the same node;
mapred.min.split.size.per.rack: the minimum size of a split formed from blocks on the same rack.
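A hedged driver-side sketch of setting these three properties (the mapred.* names above come from the old API; current releases use the mapreduce.input.fileinputformat.split.* names shown in the comments; the byte values are illustrative):

// In the driver:
Configuration conf = job.getConfiguration();
// maximum size of a combined split (new name: mapreduce.input.fileinputformat.split.maxsize)
conf.setLong("mapred.max.split.size", 128L * 1024 * 1024);
// per-node minimum (new name: mapreduce.input.fileinputformat.split.minsize.per.node)
conf.setLong("mapred.min.split.size.per.node", 64L * 1024 * 1024);
// per-rack minimum (new name: mapreduce.input.fileinputformat.split.minsize.per.rack)
conf.setLong("mapred.min.split.size.per.rack", 64L * 1024 * 1024);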
How splits are formed:
(1) Form splits node by node:
a. Walk through and accumulate the blocks on a node; whenever the accumulated size reaches mapred.max.split.size, turn those blocks into one split. Repeat until the remaining accumulated size is smaller than mapred.max.split.size, then go to the next step.
b. If the remaining accumulated size is at least mapred.min.split.size.per.node, turn the remaining blocks into one split; otherwise leave them for later processing.
(2) Form splits rack by rack:
a. Walk through and accumulate the blocks on a rack (these are the blocks left over from the previous step); whenever the accumulated size reaches mapred.max.split.size, turn those blocks into one split. Repeat until the remaining accumulated size is smaller than mapred.max.split.size, then go to the next step.
b. If the remaining accumulated size is at least mapred.min.split.size.per.rack, turn the remaining blocks into one split; otherwise leave them for later processing.
(3) Walk through and accumulate the leftover blocks; whenever the accumulated size reaches mapred.max.split.size, turn those blocks into one split. Repeat until the remaining accumulated size is smaller than mapred.max.split.size, then go to the next step.
(4) The remaining blocks form one final split.
Core implementation
Before split formation begins, three important mappings have to be initialized:

// mapping from a rack name to the list of blocks it has
HashMap<String, List<OneBlockInfo>> rackToBlocks = new HashMap<String, List<OneBlockInfo>>();
// mapping from a block to the nodes on which it has replicas
HashMap<OneBlockInfo, String[]> blockToNodes = new HashMap<OneBlockInfo, String[]>();
// mapping from a node to the list of blocks that it contains
HashMap<String, List<OneBlockInfo>> nodeToBlocks = new HashMap<String, List<OneBlockInfo>>();
rackToBlocks: the rack-to-blocks mapping, i.e. which blocks sit on a given rack;
blockToNodes: the block-to-nodes mapping, i.e. which nodes hold the replicas of a given block;
nodeToBlocks: the node-to-blocks mapping, i.e. which blocks sit on a given node.
The initialization is shown in the code below: each file (Path) is wrapped in a OneFileInfo object, and the three mappings are maintained while the OneFileInfo objects are built.
// populate all the blocks for all files
long totLength = 0;
for (int i = 0; i < paths.length; i++) {
  files[i] = new OneFileInfo(paths[i], job, rackToBlocks, blockToNodes, nodeToBlocks, rackToNodes);
  totLength += files[i].getLength();
}

(2) Form splits rack by rack; the code is as follows (the per-node code of step (1) is not excerpted here):
// if blocks in a rack are below the specified minimum size, then keep them
// in 'overflow'. After the processing of all racks is complete, these overflow
// blocks will be combined into splits.
// overflowBlocks holds the blocks left over after the same-rack pass
ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
ArrayList<String> racks = new ArrayList<String>();

// Process all racks over and over again until there is no more work to do.
while (blockToNodes.size() > 0) {
  // Create one split for this rack before moving over to the next rack.
  // Come back to this rack after creating a single split for each of the
  // remaining racks.
  // Process one rack location at a time. Combine all possible blocks that
  // reside on this rack as one split (constrained by minimum and maximum
  // split size).

  // iterate over all racks, one at a time
  for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
         rackToBlocks.entrySet().iterator(); iter.hasNext();) {
    Map.Entry<String, List<OneBlockInfo>> one = iter.next();
    racks.add(one.getKey());
    List<OneBlockInfo> blocks = one.getValue();

    // for each block, copy it into validBlocks. Delete it from
    // blockToNodes so that the same block does not appear in
    // two different splits.
    boolean createdSplit = false;
    // handle each block of this rack in turn
    for (OneBlockInfo oneblock : blocks) {
      if (blockToNodes.containsKey(oneblock)) {
        validBlocks.add(oneblock);
        blockToNodes.remove(oneblock);
        curSplitSize += oneblock.length;

        // if the accumulated size reaches maxSize, these blocks form one split
        if (maxSize != 0 && curSplitSize >= maxSize) {
          // create an input split and add it to the splits array
          addCreatedSplit(job, splits, getHosts(racks), validBlocks);
          createdSplit = true;
          break;
        }
      }
    }

    // if we created a split, then just go on to the next rack
    if (createdSplit) {
      curSplitSize = 0;
      validBlocks.clear();
      racks.clear();
      continue;
    }

    if (!validBlocks.isEmpty()) {
      // if the leftover blocks add up to at least minSizeRack, they form one split
      if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
        // if there is a minimum size specified, then create a single split;
        // otherwise, store these blocks into the overflow data structure
        addCreatedSplit(job, splits, getHosts(racks), validBlocks);
      } else {
        // There were a few blocks in this rack that remained to be processed.
        // Keep them in the 'overflow' block list; they will be combined later.
        overflowBlocks.addAll(validBlocks);
      }
    }
    curSplitSize = 0;
    validBlocks.clear();
    racks.clear();
  }
}

(3) Walk through and accumulate the leftover blocks; the code is as follows:
// Process all overflow blocks
for (OneBlockInfo oneblock : overflowBlocks) {
  validBlocks.add(oneblock);
  curSplitSize += oneblock.length;

  // This might cause an existing rack location to be re-added,
  // but it should be ok.
  for (int i = 0; i < oneblock.racks.length; i++) {
    racks.add(oneblock.racks[i]);
  }

  // if the accumulated size reaches maxSize, these blocks form one split
  if (maxSize != 0 && curSplitSize >= maxSize) {
    // create an input split and add it to the splits array
    addCreatedSplit(job, splits, getHosts(racks), validBlocks);
    curSplitSize = 0;
    validBlocks.clear();
    racks.clear();
  }
}

(4) The remaining blocks form one final split; the code is as follows:
// Process any remaining blocks, if any.
if (!validBlocks.isEmpty()) {
  addCreatedSplit(job, splits, getHosts(racks), validBlocks);
}

Summary
When forming splits, CombineFileInputFormat takes data locality into account (same node first, then same rack): it first combines blocks that sit on the same node, then blocks on the same rack, and finally whatever is left over, so locality weakens step by step. Also note that CombineFileInputFormat is abstract; to use it you have to implement its getRecordReader method (createRecordReader in the new API) yourself.
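A hedged sketch of such a subclass for plain-text small files, modeled on how the framework's own CombineTextInputFormat is written (the class names SmallFilesInputFormat and TextReaderWrapper are made up for this example):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReaderWrapper;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class SmallFilesInputFormat extends CombineFileInputFormat<LongWritable, Text> {
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException {
        // CombineFileRecordReader walks the chunks of the combined split and creates
        // one TextReaderWrapper per chunk via reflection.
        return new CombineFileRecordReader<LongWritable, Text>(
                (CombineFileSplit) split, context, TextReaderWrapper.class);
    }

    // Wraps a plain line reader (via TextInputFormat) around one chunk of the combined split.
    public static class TextReaderWrapper extends CombineFileRecordReaderWrapper<LongWritable, Text> {
        public TextReaderWrapper(CombineFileSplit split, TaskAttemptContext context, Integer idx)
                throws IOException, InterruptedException {
            super(new TextInputFormat(), split, context, idx);
        }
    }
}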
(3) SequenceFileInputFormat / SequenceFileOutputFormat
SequenceFile is a very important file format in Hadoop;
inside a SequenceFile the data is organized as key-value (K-V) pairs.
These classes read input from / write output to Hadoop sequence files.
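A hedged driver-side sketch of a job that reads one SequenceFile and writes another (the key/value types Text / IntWritable are only an example and must match what the input file was written with):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

// In the driver:
// the mapper then receives the file's own K-V types (here Text / IntWritable)
// instead of the <offset, line> pairs produced by TextInputFormat
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);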