MapReduce程序的运行全貌
? ? ? ? 為了更詳細地探討mapper和reducer之間的關系,并揭示Hadoop的一些內部工作機理,現在我們將全景呈現WordCount是如
何執行的,序號并非完全按照上圖。
1 . 啟動
? ? ? ? 調用驅動中的Job.waitForCompletion()是所有行動的開始。該驅動程序是唯一一段運行在本地機器上的代碼,上述調用開
啟了本地主機與JobTracker的通信。請記住,JobTracker負責作業調度和執行的各個方面,所以當執行任何與作業管理相關的
任務時,它成為了我們的主要接口,JobTracker代表我們與NameNode通信,并對儲存在HDFS上的數據相關的所有交互進行
管理。
2 . 將輸入分塊
? ? ? ? 這些交互首先發生在JobTracker接受輸入數據,并確定如何將其分配給map任務的時候。回想一下,HDFS文件通常被分
成至少64MB的數據塊,JobTracker會將每個數據塊分配給一個map任務。
? ? ? ? 當然,WordCount示例涉及的數量是微不足道的,它剛好適合放在一個數據塊中。設想一個更大的以TB為單位的輸入文
件,切分模型變得更有意義。每段文件(或用MapReduce術語來講,每個split)由一個map作業處理。
? ? ? ? 一旦對各分塊完成了運算,JobTracker就會將它們和包含Mapper與Reducer類的JAR文件放置在HDFS上作業專用的目
錄,而該路徑在任務開始時將被傳遞給每個任務。
3 . 任務分配
? ? ? ? 一旦JobTracker確定了所需的map任務數,它就會檢查集群中的主機數,正在運行的TaskTracker數以及可并發執行的
map任務數(用戶自定義的配置變量)。JobTracker也會查看各個輸入數據塊在集群中的分布位置,并嘗試定義一個執行計
劃,使TaskTracker盡可能處理位于相同物理主機上的數據塊。或者即使做不到這一點,TaskTracker至少處理一個位于相同
硬件機架中的數據塊。
? ? ? ? 數據局部優化是Hadoop能高效處理巨大數據集的一個關鍵原因。默認情況下,每個數據塊會被復制到三臺不同主機。
所以,在本地處理大部分數據塊的任務/主機計劃比起初預想的可能性更高。
4 . 任務啟動
? ? ? ? 然后,每個TaskTracker開啟一個獨立的Java虛擬機來執行任務。這確實增加了啟動時間損失,但它將因錯誤運行map
或reduce任務所引發的問題與TaskTracker隔離開來,而且可以將它配置成在隨后執行的任務之間共享。
? ? ? ? 如果集群有足夠的能力一次性執行所有的map任務,它們將會被全部啟動,并獲得它們將要處理的分塊數據和作業JAR文
件。每個TaskTracker隨后將分塊復制到本地文件系統。
如果任務數超出了集群處理能力,JobTracker將維護一個掛起隊列,并在節點完成最初分配的map任務后,將掛起任務分
配給節點。
? ? ? ? 現在,我們準備查看map任務執行完畢的數據。聽起來工作量似乎很大,事實卻是如此。這也解釋了在運行任意
MapReduce作業時,為什么系統啟動及執行上述步驟會花費大量時間。
5 . 不斷監視JobTracker
? ? ? ? 現在,JobTracker執行所有的mapper和reducer。它不斷地與TaskTracker交換心跳和狀態消息,查找進度或問題的證
據。它還從整個作業執行過程的所有任務中收集指標,其中一些指標是Hadoop提供的,還有一些是map和reduce任務的開發
人員指定的,不過本例中我們沒有使用任何指標。
6 . mapper的輸入
假設輸入文件是一個極為普通的兩行文本文件。
? ? ? ? This is ?a ?test.
? ? ? ? Yes this is
? ? ? ?驅動類使用TextInputFormat指定了輸入文件的格式和結構,因此,Hadoop會把輸入文件看做以偏移量為鍵并以該行內容
為值的文本。因此mapper的兩次調用將被賦予以下輸入。
? ? ? ? 0 ?This is a test.
? ? ? ?15 ?Yes ?this ?is?
7 . mapper的執行
? ? ? ? 根據作業配置的方式,mapper接收到的鍵/值對分別是相應行在文件中多我們不關心每行文本在文件中的位置,所以
WordCountMapper類的map方法舍棄了鍵,并使用標準的Java String類的split方法將每行文本內容拆分成詞。需要注意的是,
使用正則表達式或StringTokenizer類可以更好地斷詞,但對于我們的需求,這種簡單的方法就足夠了。
? ? ? ? 然后,針對每個單獨的詞,mapper輸出由單詞本身組成的鍵和值1。
注意:
? ? ? ? 以靜態變量的形式創建IntWritable對象,并在每次調用時復用該對象。這樣做的原因是,盡管它對我們小型的輸入文件幫助不大,但處理巨大數
據集時,可能會對mapper進行成千上萬次調用。如果每次調用都為輸出的鍵和值創建一個新對象,這將消耗大量的資源,同時垃圾回收會引發更頻繁
的停滯。我們使用這個值,知道Context .write方法不會對其進行改動。
8 . mapper的輸出和reducer的輸入
? ? ? ? mapper的輸出是一系列形式為(word,1)的鍵值對。本例中,mapper的輸出為:
? ? ? ? ( This , 1 ) , ( is , 1 ) , ( a , 1 ) , ( test. , 1 ) , ( Yes , 1 ) , ( this , 1 ) , ( is , 1 )
? ? ? ? 這些從mapper輸出的鍵值對并不會直接傳給reducer。在map和reduce之間,還有一個shuffle階段,這也是許多
MapReduce奇跡發生的地方。
9 . 分塊
? ? ? ? Reduce接口的隱性保證之一是與給定鍵相關的所有值都會被提交到同一個reducer。由于一個集群中運行著多個reduce
任務,因此,每個mapper的輸出必須被分塊,使其分別傳入相應的各個reducer。這些分塊文件保存在本地節點的文件系統。
集群中的Reduce任務數并不像mapper數量一樣是動態,事實上,我們可以在作業提交階段指定reduce任務數。因此,每
個TaskTracker就知道集群中有多少個reducer,并據此得知mapper輸出應切為多少塊。
假如reducer失敗了,會給本次計算帶來什么影響呢?
? ? ? ? JobTracker會保證重新執行發生故障的reduce任務,可能是在不同的節點重新執行,因此臨時故障不是問題。更為嚴重的
問題是,數據塊中的數據敏感性缺陷或錯誤數據可能導致整個作業失敗,除非采取一些手段。
10 . 可選分塊函數
Partitioner類在org.apache.hadoop.mapreduce包中,該抽象類具有如下特征:
public abstract class Partitioner<Key, Value>{public abstract int getPartition( Key key, Value value,int numPartitions); }? ? ? ? 默認情況下,Hadoop將對輸出的鍵進行哈希運算,從而實現分塊。此功能由org.apache.hadoop.mapreduce.lib.
partition包里的HashPartitioner類實現,但某些情況下,用戶有必要提供一個自定義的partitioner子類,在該子類中實現針對具
體應用的分塊邏輯。特別是當應用標準哈希函數導致數據分布極不均勻時,自定義partitioner子類尤為必要。
11 . reducer類的輸入
? ? ? ? reducer的TaskTracker從JobTracker接收更新,這些更新指明了集群中哪些節點承載著map的輸出分塊,這些分塊將由本
地reduce任務處理。之后,TaskTracker從各個節點獲取分塊,并將它們合并為一個文件反饋給reduce任務。
12 . reducer類的執行
? ? ? ? 我們實現的WordCountReducer類很簡單。針對每個詞,該類僅對數組中的元素數目進行統計并為每個詞輸出最終的
(Word,count)鍵值對。
? ? ? ? reducer的調用次數通常小于mapper的調用次數,因此調用reducer帶來的開銷無需特別在意。
13 . reducer類的輸出
? ? ? ? ? 因此,本例中的 reducer的最終輸出集合為:
? ? ? ? ( This , 1 ),( is , 2 ) , ( a , 1 ) , ( test. , 1 ) , ( Yes , 1 ) , ( this , 1 )
? ? ? ? ?這些數據將被輸出到驅動程序指定的輸出路徑下的分塊文件中,并將使用指定的OutputFormat對其進行格式化。每個
reduce任務寫入一個以part -r-nnnnn為文件名的文件,其中nnnnn從00000開始并逐步遞增。
14 . 關機 ? ?
? ? ? ? 一旦成功完成所有任務,JobTracker向客戶端輸出作業的最終狀態,以及作業運行過程中一些比較重要的計數器集合。
完整的作業和任務歷史記錄存儲在每個節點的日志路徑中,通過JobTracker的網絡用戶接口更易于訪問,只需將瀏覽器指向
JobTracker節點的50030端口即可。
15 . 這就是MapReduce的全部
? ? ? ? 如你所見,Hadoop為每個MapReduce程序提供了大量機制,同時,Hadoop提供的框架在許多方面都進行了簡化。如前
所述,對于WordCount這樣的小程序來說,MapReduce的大部分機制并沒有多大價值,但是不要忘了,無論在本地Hadoop或
是集群上,我們可以使用相同的軟件和mapper/reducer在巨大的集群上對更大的數據集進行字數統計。那時,Hadoop所做的
大量工作使用戶能夠在如此大的數據集上進行數據分析。否則,手工實現代碼分發、代碼同步以及并行運算將付出超乎想象的
努力。
16 . 也許缺了combiner
? ? ? ? 前面講述了MapReduce程序運行的各個步驟,卻漏掉了另外一個可選步驟。在reducer獲取map方法的輸出之前,
Hadoop允許使用combiner類對map方法的輸出執行一些前期的排序操作。
為什么要有combiner?
? ? ? ? Hadoop設計的前提是,減少作業中成本較高的部分,通常指磁盤和網絡輸入輸出。mapper的輸出往往是巨大的---它的大
小通常是原始輸入數據的許多倍。Hadoop的一些配置選項可以幫助減少reducer在網絡上傳輸如此大的數據帶來的性能影響。
combiner則采取了不同的方法,它對數據進行早期聚合以減少所需傳輸的數據量。
? ? ? ? combiner沒有自己的接口,它必須具有與reducer相同的特征,因此也要繼承org.apache.hadoop.mapreduce包里的
Reduce類。這樣做的效果主要是,在map節點上對發往各個reducer的輸出執行mini-reduce操作。
? ? ? ? Hadoop不保證combiner是否被執行。有時候,它可能根本不執行,而某些時候,它可能被執行一次、兩次甚至多次,這
取決與mapper為每個reducer生成的輸出文件的大小和數量。
參自《Hadoop基礎教程》
學之,以記之。
轉載于:https://www.cnblogs.com/baalhuo/p/5762088.html
總結
以上是生活随笔為你收集整理的MapReduce程序的运行全貌的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java学习笔记之折半查找法(二分法)
- 下一篇: python_Django之模板模型