日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Hadoop中Partition解析

發(fā)布時間:2025/3/17 编程问答 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hadoop中Partition解析 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

1.解析Partition

Map的結(jié)果,會通過partition分發(fā)到Reducer上,Reducer做完Reduce操作后,通過OutputFormat,進行輸出,下面我們就來分析參與這個過程的類。

Mapper的結(jié)果,可能送到Combiner做合并,Combiner在系統(tǒng)中并沒有自己的基類,而是用Reducer作為Combiner的基類,他們對外的功能是一樣的,只是使用的位置和使用時的上下文不太一樣而已。Mapper最終處理的鍵值對<key, value>,是需要送到Reducer去合并的,合并的時候,有相同key的鍵/值對會送到同一個Reducer那。哪個key到哪個Reducer的分配過程,是由Partitioner規(guī)定的。它只有一個方法,

[java]?view plaincopyprint?
  • getPartition(Text?key,?Text?value,?int?numPartitions)??
  • 輸入是Map的結(jié)果對<key, value>和Reducer的數(shù)目,輸出則是分配的Reducer(整數(shù)編號)。就是指定Mappr輸出的鍵值對到哪一個reducer上去。系統(tǒng)缺省的Partitioner是HashPartitioner,它以key的Hash值對Reducer的數(shù)目取模,得到對應(yīng)的Reducer。這樣保證如果有相同的key值,肯定被分配到同一個reducre上。如果有N個reducer,編號就為0,1,2,3……(N-1)。

    Reducer是所有用戶定制Reducer類的基類,和Mapper類似,它也有setup,reduce,cleanup和run方法,其中setup和cleanup含義和Mapper相同,reduce是真正合并Mapper結(jié)果的地方,它的輸入是key和這個key對應(yīng)的所有value的一個迭代器,同時還包括Reducer的上下文。系統(tǒng)中定義了兩個非常簡單的Reducer,IntSumReducer和LongSumReducer,分別用于對整形/長整型的value求和。

    Reduce的結(jié)果,通過Reducer.Context的方法collect輸出到文件中,和輸入類似,Hadoop引入了OutputFormat。OutputFormat依賴兩個輔助接口:RecordWriter和OutputCommitter,來處理輸出。RecordWriter提供了write方法,用于輸出<key, value>和close方法,用于關(guān)閉對應(yīng)的輸出。OutputCommitter提供了一系列方法,用戶通過實現(xiàn)這些方法,可以定制OutputFormat生存期某些階段需要的特殊操作。我們在TaskInputOutputContext中討論過這些方法(明顯,TaskInputOutputContext是OutputFormat和Reducer間的橋梁)。OutputFormat和RecordWriter分別對應(yīng)著InputFormat和RecordReader,系統(tǒng)提供了空輸出NullOutputFormat(什么結(jié)果都不輸出,NullOutputFormat.RecordWriter只是示例,系統(tǒng)中沒有定義),LazyOutputFormat(沒在類圖中出現(xiàn),不分析),FilterOutputFormat(不分析)和基于文件FileOutputFormat的SequenceFileOutputFormat和TextOutputFormat輸出。

    基于文件的輸出FileOutputFormat利用了一些配置項配合工作,包括:
    mapred.output.compress:是否壓縮;
    mapred.output.compression.codec:壓縮方法;
    mapred.output.dir:輸出路徑;
    mapred.work.output.dir:輸出工作路徑。
    FileOutputFormat還依賴于FileOutputCommitter,通過FileOutputCommitter提供一些和Job,Task相關(guān)的臨時文件管理功能。如FileOutputCommitter的setupJob,會在輸出路徑下創(chuàng)建一個名為_temporary的臨時目錄,cleanupJob則會刪除這個目錄。
    SequenceFileOutputFormat輸出和TextOutputFormat輸出分別對應(yīng)輸入的SequenceFileInputFormat和TextInputFormat。

    2.代碼實例

    [java]?view plaincopyprint?
  • package?org.apache.hadoop.examples;??
  • ??
  • import?java.io.IOException;??
  • import?java.util.*;??
  • import?org.apache.hadoop.fs.Path;??
  • import?org.apache.hadoop.conf.*;??
  • import?org.apache.hadoop.io.*;??
  • import?org.apache.hadoop.mapred.*;??
  • import?org.apache.hadoop.util.*;??
  • ??
  • /**?
  • ?*?輸入文本,以tab間隔?
  • ?*?kaka????1???????28?
  • ?*?hua?????0???????26?
  • ?*?chao????1?
  • ?*?tao?????1???????22?
  • ?*?mao?????0???????29??????22?
  • ?*?*/??
  • ??
  • //Partitioner函數(shù)的使用??
  • ??
  • public?class?MyPartitioner?{??
  • ????//?Map函數(shù)??
  • ????public?static?class?MyMap?extends?MapReduceBase?implements??
  • ????????????Mapper<LongWritable,?Text,?Text,?Text>?{??
  • ????????public?void?map(LongWritable?key,?Text?value,??
  • ????????????????OutputCollector<Text,?Text>?output,?Reporter?reporter)??
  • ????????????????throws?IOException?{??
  • ????????????String[]?arr_value?=?value.toString().split("\t");??
  • ????????????//測試輸出??
  • //??????????for(int?i=0;i<arr_value.length;i++)??
  • //??????????{??
  • //??????????????System.out.print(arr_value[i]+"\t");??
  • //??????????}??
  • //??????????System.out.print(arr_value.length);??
  • //??????????System.out.println();?????????
  • ????????????Text?word1?=?new?Text();??
  • ????????????Text?word2?=?new?Text();??
  • ????????????if?(arr_value.length?>?3)?{??
  • ????????????????word1.set("long");??
  • ????????????????word2.set(value);??
  • ????????????}?else?if?(arr_value.length?<?3)?{??
  • ????????????????word1.set("short");??
  • ????????????????word2.set(value);??
  • ????????????}?else?{??
  • ????????????????word1.set("right");??
  • ????????????????word2.set(value);??
  • ????????????}??
  • ????????????output.collect(word1,?word2);??
  • ????????}??
  • ????}??
  • ??????
  • ????public?static?class?MyReduce?extends?MapReduceBase?implements??
  • ????????????Reducer<Text,?Text,?Text,?Text>?{??
  • ????????public?void?reduce(Text?key,?Iterator<Text>?values,??
  • ????????????????OutputCollector<Text,?Text>?output,?Reporter?reporter)??
  • ????????????????throws?IOException?{??
  • ????????????int?sum?=?0;??
  • ????????????System.out.println(key);??
  • ????????????while?(values.hasNext())?{??
  • ????????????????output.collect(key,?new?Text(values.next().getBytes()));??????
  • ????????????}??
  • ????????}??
  • ????}??
  • ??
  • ????//?接口Partitioner繼承JobConfigurable,所以這里有兩個override方法??
  • ????public?static?class?MyPartitionerPar?implements?Partitioner<Text,?Text>?{??
  • ????????/**?
  • ?????????*?getPartition()方法的?
  • ?????????*?輸入?yún)?shù):鍵/值對<key,value>與reducer數(shù)量numPartitions?
  • ?????????*?輸出參數(shù):分配的Reducer編號,這里是result?
  • ?????????*?*/??
  • ????????@Override??
  • ????????public?int?getPartition(Text?key,?Text?value,?int?numPartitions)?{??
  • ????????????//?TODO?Auto-generated?method?stub??
  • ????????????int?result?=?0;??
  • ????????????System.out.println("numPartitions--"?+?numPartitions);??
  • ????????????if?(key.toString().equals("long"))?{??
  • ????????????????result?=?0?%?numPartitions;??
  • ????????????}?else?if?(key.toString().equals("short"))?{??
  • ????????????????result?=?1?%?numPartitions;??
  • ????????????}?else?if?(key.toString().equals("right"))?{??
  • ????????????????result?=?2?%?numPartitions;??
  • ????????????}??
  • ????????????System.out.println("result--"?+?result);??
  • ????????????return?result;??
  • ????????}??
  • ??????????
  • ????????@Override??
  • ????????public?void?configure(JobConf?arg0)???
  • ????????{??
  • ????????????//?TODO?Auto-generated?method?stub??
  • ????????}??
  • ????}??
  • ??
  • ????//輸入?yún)?shù):/home/hadoop/input/PartitionerExample?/home/hadoop/output/Partitioner??
  • ????public?static?void?main(String[]?args)?throws?Exception?{??
  • ????????JobConf?conf?=?new?JobConf(MyPartitioner.class);??
  • ????????conf.setJobName("MyPartitioner");??
  • ??????????
  • ????????//控制reducer數(shù)量,因為要分3個區(qū),所以這里設(shè)定了3個reducer??
  • ????????conf.setNumReduceTasks(3);??
  • ??
  • ????????conf.setMapOutputKeyClass(Text.class);??
  • ????????conf.setMapOutputValueClass(Text.class);??
  • ??
  • ????????//設(shè)定分區(qū)類??
  • ????????conf.setPartitionerClass(MyPartitionerPar.class);??
  • ??
  • ????????conf.setOutputKeyClass(Text.class);??
  • ????????conf.setOutputValueClass(Text.class);??
  • ??
  • ????????//設(shè)定mapper和reducer類??
  • ????????conf.setMapperClass(MyMap.class);??
  • ????????conf.setReducerClass(MyReduce.class);??
  • ??
  • ????????conf.setInputFormat(TextInputFormat.class);??
  • ????????conf.setOutputFormat(TextOutputFormat.class);??
  • ??
  • ????????FileInputFormat.setInputPaths(conf,?new?Path(args[0]));??
  • ????????FileOutputFormat.setOutputPath(conf,?new?Path(args[1]));??
  • ??
  • ????????JobClient.runJob(conf);??
  • ????}??
  • }??



  • 本文轉(zhuǎn)自xwdreamer博客園博客,原文鏈接:http://www.cnblogs.com/xwdreamer/archive/2011/10/27/2296943.html,如需轉(zhuǎn)載請自行聯(lián)系原作者

    總結(jié)

    以上是生活随笔為你收集整理的Hadoop中Partition解析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。