mapreduce文本排序_MapReduce:通过数据密集型文本处理
mapreduce文本排序
自上次發布以來已經有一段時間了,因為我一直忙于Coursera提供的一些課程。 有一些非常有趣的產品,值得一看。 前一段時間,我購買了Jimmy Lin和Chris Dyer的MapReduce數據密集型處理程序 。 本書以偽代碼格式介紹了幾種關鍵的MapReduce算法。 我的目標是采用第3-6章中介紹的算法,并以Hadoop: Tom White的《權威指南》作為參考在Hadoop中實現它們。 我將假設您熟悉Hadoop和MapReduce,并且不介紹任何入門資料。 因此,讓我們從局部聚合開始進入第3章– MapReduce算法設計。本地聚集
在很高的級別上,當Mappers發出數據時,中間結果將寫入磁盤,然后通過網絡發送到Reducers以進行最終處理。 在MapReduce作業的處理中,寫入磁盤然后通過網絡傳輸數據的延遲是一項昂貴的操作。 因此,有理由認為,只要有可能,減少從映射器發送的數據量將提高MapReduce作業的速度。 本地聚合是一種用于減少數據量并提高MapReduce工作效率的技術。 局部聚合不能代替reducers,因為我們需要一種方法來使用來自不同映射器的相同鍵來收集結果。 我們將考慮實現本地聚合的3種方法:
當然,任何優化都需要權衡取舍,我們也會對此進行討論。
為了演示本地聚合,我們將使用hadoop-0.20.2-cdh3u3在MacBookPro上安裝的偽分布式集群上,在Charles Dickens的純文本版本的A Christmas Carol (從Project Gutenberg下載)上運行無處不在的字數統計作業。從Cloudera發行。 我計劃在以后的文章中在具有更實際大小的數據的EC2集群上運行相同的實驗。
合路器
組合器函數是擴展Reducer類的對象。 實際上,對于此處的示例,我們將重復使用單詞計數作業中使用的相同的reduce。 設置MapReduce作業時會指定組合器功能,如下所示:
job.setReducerClass(TokenCountReducer.class);這是化簡器代碼:
public class TokenCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int count = 0;for (IntWritable value : values) {count+= value.get();}context.write(key,new IntWritable(count));} } 組合器的工作就是按照名稱所暗示的那樣進行工作,聚合數據的最終結果是減少了數據在網絡中的混洗,從而提高了效率。 如前所述,請記住,還需要簡化器將結果與來自不同映射器的相同鍵組合在一起。 由于組合器功能是一種優化,因此Hadoop框架無法保證組合器將被調用多少次(如果有的話)。
在Mapper組合選項1中
使用組合器的第一種選擇(圖3.2,第41頁)非常簡單,對我們原始的字數映射器進行了一些修改:
public class PerDocumentMapper extends Mapper<LongWritable, Text, Text, IntWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {IntWritable writableCount = new IntWritable();Text text = new Text();Map<String,Integer> tokenMap = new HashMap<String, Integer>();StringTokenizer tokenizer = new StringTokenizer(value.toString());while(tokenizer.hasMoreElements()){String token = tokenizer.nextToken();Integer count = tokenMap.get(token);if(count == null) count = new Integer(0);count+=1;tokenMap.put(token,count);}Set<String> keys = tokenMap.keySet();for (String s : keys) {text.set(s);writableCount.set(tokenMap.get(s));context.write(text,writableCount);}} } 正如我們在這里看到的,對于遇到的每個單詞,我們不會發出計數為1的單詞,而是使用映射來跟蹤已處理的每個單詞。 然后,在處理完所有標記后,我們遍歷該映射并發出該行中遇到的每個單詞的總數。
在Mapper組合選項2中
in mapper組合的第二個選項(圖3.3,第41頁)與上述示例非常相似,但有兩個區別–創建哈希映射時和發出映射中包含的結果時。 在上面的示例中,創建了一個映射,并在每次調用map方法時將其內容轉儲到網絡上。 在此示例中,我們將使地圖成為實例變量,并將地圖的實例化移動到我們的映射器中的setUp方法。 同樣,在完成對mapper的所有調用并調用cleanUp方法之前,不會將映射的內容發送到reducers。
public class AllDocumentMapper extends Mapper<LongWritable,Text,Text,IntWritable> {private Map<String,Integer> tokenMap;@Overrideprotected void setup(Context context) throws IOException, InterruptedException {tokenMap = new HashMap<String, Integer>();}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer tokenizer = new StringTokenizer(value.toString());while(tokenizer.hasMoreElements()){String token = tokenizer.nextToken();Integer count = tokenMap.get(token);if(count == null) count = new Integer(0);count+=1;tokenMap.put(token,count);}}@Overrideprotected void cleanup(Context context) throws IOException, InterruptedException {IntWritable writableCount = new IntWritable();Text text = new Text();Set<String> keys = tokenMap.keySet();for (String s : keys) {text.set(s);writableCount.set(tokenMap.get(s));context.write(text,writableCount);}} } 從上面的代碼示例中可以看出,在對map方法的所有調用中,映射器都跟蹤唯一字數。 通過跟蹤唯一令牌及其計數,應該大大減少發送給reducer的記錄數量,這反過來又可以改善MapReduce作業的運行時間。 這樣可以達到與使用MapReduce框架提供的Combiner Function選項相同的效果,但是在這種情況下,可以確保將調用組合代碼。 但是這種方法也有一些警告。 在地圖調用之間保持狀態可能會出現問題,并且絕對違反了“地圖”功能的功能精神。 同樣,通過保持所有映射器的狀態,取決于作業中使用的數據,內存可能是另一個要解決的問題。 最終,必須權衡所有權衡以確定最佳方法。
結果
現在,讓我們看一下不同映射器的一些結果。 由于作業是在偽分布式模式下運行的,因此實際的運行時間是無關緊要的,但是我們仍然可以推斷出使用本地聚合會如何影響在實際集群上運行的MapReduce作業的效率。
每個令牌映射器:
12/09/13 21:25:32 INFO mapred.JobClient: Reduce shuffle bytes=366010 12/09/13 21:25:32 INFO mapred.JobClient: Reduce output records=7657 12/09/13 21:25:32 INFO mapred.JobClient: Spilled Records=63118 12/09/13 21:25:32 INFO mapred.JobClient: Map output bytes=302886在Mapper精簡選項1中:
12/09/13 21:28:15 INFO mapred.JobClient: Reduce shuffle bytes=354112 12/09/13 21:28:15 INFO mapred.JobClient: Reduce output records=7657 12/09/13 21:28:15 INFO mapred.JobClient: Spilled Records=60704 12/09/13 21:28:15 INFO mapred.JobClient: Map output bytes=293402在Mapper精簡選項2中:
12/09/13 21:30:49 INFO mapred.JobClient: Reduce shuffle bytes=105885 12/09/13 21:30:49 INFO mapred.JobClient: Reduce output records=7657 12/09/13 21:30:49 INFO mapred.JobClient: Spilled Records=15314 12/09/13 21:30:49 INFO mapred.JobClient: Map output bytes=90565組合器選項:
12/09/13 21:22:18 INFO mapred.JobClient: Reduce shuffle bytes=105885 12/09/13 21:22:18 INFO mapred.JobClient: Reduce output records=7657 12/09/13 21:22:18 INFO mapred.JobClient: Spilled Records=15314 12/09/13 21:22:18 INFO mapred.JobClient: Map output bytes=302886 12/09/13 21:22:18 INFO mapred.JobClient: Combine input records=31559 12/09/13 21:22:18 INFO mapred.JobClient: Combine output records=7657 不出所料,不進行合并的Mapper的結果最差,緊隨其后的是第一個映射器內合并選項(盡管如果在運行字數統計之前清除數據,這些結果本來可以做得更好)。 第二個映射器內合并選項和合并器功能實際上具有相同的結果。 重要的事實是,作為前兩個選項,兩者產生的結果都減少了2/3,減少了混洗字節。 將通過網絡發送到縮減程序的字節數量減少該數量一定會對MapReduce作業的效率產生積極影響。 這里要牢記一點,那就是合并器/映射器合并不能僅在所有MapReduce作業中使用,在這種情況下,字數計數非常適合于這種增強,但這可能并不總是正確的。
結論
正如您所看到的,在尋求提高MapReduce作業的性能時,需要認真考慮使用映射器內合并或Hadoop合并器功能的好處。 至于哪種方法,則要權衡每種方法的權衡。
相關鏈接
- Jimmy Lin和Chris Dyer 使用MapReduce進行的數據密集型處理
- Hadoop: Tom White 的權威指南
- 來自博客的源代碼
- MRUnit用于單元測試Apache Hadoop映射減少工作
- Gutenberg項目提供了大量純文本格式的書籍,非常適合在本地測試Hadoop作業。
祝您編程愉快,別忘了分享!
參考: 《 隨機編碼》博客上的JCG合作伙伴 Bill Bejeck 提供的MapReduce數據密集型文本處理功能 。
翻譯自: https://www.javacodegeeks.com/2012/09/mapreduce-working-through-data.html
mapreduce文本排序
總結
以上是生活随笔為你收集整理的mapreduce文本排序_MapReduce:通过数据密集型文本处理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 新ANTLR 4.6的重要更改
- 下一篇: JHipster入门,第2部分