用Hadoop1.0.3实现KMeans算法
從理論上來講用MapReduce技術實現KMeans算法是很Natural的想法:在Mapper中逐個計算樣本點離哪個中心最近,然后Emit(樣本點所屬的簇編號,樣本點);在Reducer中屬于同一個質心的樣本點在一個鏈表中,方便我們計算新的中心,然后Emit(質心編號,質心)。但是技術上的事并沒有理論層面那么簡單。
Mapper和Reducer都要用到K個中心(我習慣稱之為質心),Mapper要讀這些質心,Reducer要寫這些質心。另外Mapper還要讀存儲樣本點的數據文件。我先后嘗試以下3種方法,只有第3種是可行的,如果你不想被我誤導,請直接跳過前兩種。
一、用一個共享變量在存儲K個質心
由于K很小,所以我們認為用一個Vector<Sample>來存儲K個質心是沒有問題的。以下代碼是錯誤的:
| class?MyJob extends Tool{ static?Vector<Sample> centers=new?Vector<Sample>(K); static?class?MyMapper extends Mapper{ //read centers } static?class?MyMapper extends Reducer{ //update centers } void?run(){ until ( convergence ){ map(); reduce(); } } |
發生這種錯誤是因為對hadoop執行流程不清楚,對數據流不清楚。簡單地說Mapper和Reducer作為MyJob的內部靜態類,它們應該是獨立的--它們不應該與MyJob有任何交互,因為Mapper和Reducer分別在Task Tracker的不同JVM中運行,而MyJob以及MyJob的內部其他類都在客戶端上運行,自然不能在不同的JVM中共享一個變量。
詳細的流程是這樣的:
首先在客戶端上,JVM加載MyJob時先初始化靜態變量,執行static塊。然后提交作業到Job Tracker。
在Job Tracker上,分配Mapper和Reducer到不同的Task Tracker上。Mapper和Reducer線程獲得了MyJob類靜態變量的初始拷貝(這份拷貝是指MyJob執行完靜態塊之后靜態變量的模樣)。
在Task Tracker上,Mapper和Reducer分別地讀寫MyJob的靜態變量的本地拷貝,但是并不影響原始的MyJob中的靜態變量的值。
二、用分布式緩存文件存儲K個質心
既然不能通過共享外部類變量的方式,那我們通過文件在map和reduce之間傳遞數據總可以吧,Mapper從文件中讀取質心,Reducer把更新后的質心再寫入這個文件。這里的問題是:如果確定要把質心放在文件中,那Mapper就需要從2個文件中讀取數據--質心文件和樣本數據文件。雖然有MutipleInputs可以指定map()的輸入文件有多個,并可以為每個輸入文件分別指定解析方式,但是MutipleInputs不能保證每條記錄從不同文件中傳給map()的順序。在我們的KMeans中,我們希望質心文件全部被讀入后再逐條讀入樣本數據。
于是乎就想到了DistributedCache,它主要用于Mapper和Reducer之間共享數據。DistributedCacheFile是緩存在本地文件,在Mapper和Reducer中都可使用本地Java I/O的方式讀取它。于是我又有了一個錯誤的思路:
| class?MyMaper{ ????Vector<Sample> centers=new?Vector<Sample>(K); ????void?setup(){ ????????//讀取cacheFile,給centers賦值 ????} ????void?map(){ ????????//計算樣本離哪個質心最近 ????} } class?MyReducer{ ????Vector<Sample> centers=new?Vector<Sample>(K); ????void?reduce(){ ????????//更新centers ????} ????void?cleanup(){ ????????//把centers寫回cacheFile ????} } |
錯因:DistributedCacheFile是只讀的,在任務運行前,TaskTracker從JobTracker文件系統復制文件到本地磁盤作為緩存,這是單向的復制,是不能寫回的。試想在分布式環境下,如果不同的mapper和reducer可以把緩存文件寫回的話,那豈不又需要一套復雜的文件共享機制,嚴重地影響hadoop執行效率。
三、用分布式緩存文件存儲樣本數據
其實DistributedCache還有一個特點,它更適合于“大文件”(各節點內存容不下)緩存在本地。僅存儲了K個質心的文件顯然是小文件,與之相比樣本數據文件才是大文件。
此時我們需要2個質心文件:一個存放上一次的質心prevCenterFile,一個存放reducer更新后的質心currCenterFile。Mapper從prevCenterFile中讀取質心,Reducer把更新后有質心寫入currCenterFile。在Driver中讀入prevCenterFile和currCenterFile,比較前后兩次的質心是否相同(或足夠地接近),如果相同則停止迭代,否則就用currCenterFile覆蓋prevCenterFile(使用fs.rename),進入下一次的迭代。
這時候Mapper就是這樣的:
| class?MyMaper{ ????Vector<Sample> centers=new?Vector<Sample>(K); ????void?map(){ ????????//逐條讀取質心,給centers賦值 ????} ????void?cleanup(){ ????????//逐行讀取cacheFile,計算每個樣本點離哪個質心最近 ????????//然后Emit(樣本點所屬的簇編號,樣本點) ????} } |
源代碼
試驗數據是在Mahout項目中作為example提供的,600個樣本點,每個樣本是一個60維的浮點向量。點擊下載
為樣本數據建立一個類Sample.java。
+ View CodeKMeans.java
+ View Code注意在Driver中創建Job實例時一定要把Configuration類型的參數傳遞進去,否則在Mapper或Reducer中調用DistributedCache.getLocalCacheFiles(context.getConfiguration());返回值就為null。因為空構造函數的Job采用的Configuration是從hadoop的配置文件中讀出來的(使用new Configuration()創建的Configuration就是從hadoop的配置文件中讀出來的),請注意在main()函數中有一句:DistributedCache.addCacheFile(dataFile.toUri(), conf);即此時的Configuration中多了一個DistributedCacheFile,所以你需要把這個Configuration傳遞給Job構造函數,如果傳遞默認的Configuration,那在Job中當然不知道DistributedCacheFile的存在了。
Further
方案三還是不如人意,質心文件是很小的(因為質心總共就沒幾個),用map()函數僅僅是來讀一個質心文件根本就沒有發揮并行的作用,而且在map()中也沒有調用context.write(),所以Mapper中做的事情可以放在Reducer的setup()中來完成,這樣就不需要Mapper了,或者說上面設計的就不是MapReduce程序,跟平常的單線程串行程序是一樣的。sigh
原文來自:博客園(華夏35度)http://www.cnblogs.com/zhangchaoyang 作者:Orisun總結
以上是生活随笔為你收集整理的用Hadoop1.0.3实现KMeans算法的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: hadoop下实现kmeans算法——一
- 下一篇: 数据分类:特征处理