日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

使用secondary sort实现数据关联 完整示例代码

發布時間:2025/3/21 编程问答 17 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用secondary sort实现数据关联 完整示例代码 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

需要進行兩種數據的關聯, 費了好大勁, 最后才使用secondary sort解決

?

其中有幾個地方需要注意:

1 readFields 和 writeFields 的寫法需要完全一致, 否則寫入和讀取的數據就會錯亂。 (在此耽擱良久)

2 要override 成員函數hashCode(), 否則“相同”key的數據不一定在唯一的機器上

3 要定義groupbyComparator

?

在reduce操作之前,

------1 會對數據進行partition: RecordKey.hashCode, 該函數會被partitioner調用, 使得數據分配到不同的reduce機器上。 【因此對于我們的需求需要override】

------2 會使用groupbyComparator對數據進行分組:Comparator.compare()

------3 會排序: RecordKey.compareTo()

------4 要設置Comparator

?

?

ps: 自定義了partitioner,Job.setPartitionerClass(class), 但是沒起作用。 【0.19.1??? 0.19.2】, 就是在此耽擱良久, 后來才嘗試使用secondary sort

?

package com.qq.dma;public class testsecondarysort extends Configured implements Tool {public static class RecordKey implements WritableComparable<RecordKey>{String md5;String type;@Overridepublic void readFields(DataInput arg0) throws IOException {// TODO Auto-generated method stubString s = arg0.readLine();String t[] = s.split("/t");System.out.println("t.length: " + t.length + "<br>");System.out.println("content: " + s + "<br>");if(t.length == 2){md5 = t[0];type = t[1];}else{md5 = "0";type = "0";}}@Overridepublic void write(DataOutput arg0) throws IOException {// TODO Auto-generated method stubarg0.writeBytes(md5 + "/t" + type + "/n");}@Overridepublic int compareTo(RecordKey o) {System.out.println("recordkey compareto called <br>");// TODO Auto-generated method stubint r = md5.compareTo(o.md5);if(r != 0){if(r < 0)return -1;else if(r > 0)return 1;}else{ //按第2字段逆序r = type.compareTo(o.type);if(r < 0)return 1;else if(r > 0)return -1;}return 0;}public int hashCode(){return md5.hashCode();}}public class PkFkComparator extends WritableComparator {public PkFkComparator(){super(RecordKey.class);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {RecordKey key1 = (RecordKey)a;RecordKey key2 = (RecordKey)b;System.out.println("Comparator call compare");int r = key1.md5.compareTo(key2.md5);if(r == 0)return 0;else if(r > 0)return 1;else return -1;}}public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, RecordKey, Text> {StringBuffer newresult = new StringBuffer("");public void configure(JobConf job) {System.out.println("configure invoked/n");super.configure(job);{}}public void map(LongWritable arg0, Text arg1, OutputCollector<RecordKey, Text> arg2, Reporter arg3) throws IOException {newresult.setLength(0);String all[] = arg1.toString().split("/t");//if(all.length == 3){RecordKey k = new RecordKey();k.md5 = all[0];k.type = all[1];arg2.collect(k, new Text(all[2]));System.out.println("map: " + arg1.toString() + "<br>");}}}public static class Reduce extends MapReduceBase implements Reducer<RecordKey, Text, Text, Text> {private final String FieldSeperator = "/t";public static int reducetimes = 0;String md5 = "";//Overridepublic void close() {}//Overridepublic void configure(JobConf job) {super.configure(job);}public void reduce(RecordKey arg1, Iterator<Text> arg2, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { System.out.println( "the " + ++reducetimes + " times <br>"); while(arg2.hasNext()){output.collect(new Text(arg1.md5 + "/t" + arg1.type), arg2.next());}}}static int printUsage() {ToolRunner.printGenericCommandUsage(System.out);return -1;}public int run(String[] args) throws Exception {JobConf conf = new JobConf(getConf(), testsecondarysort.class);conf.setOutputKeyClass(Text.class);conf.setOutputValueClass(Text.class);conf.setMapOutputKeyClass(RecordKey.class);conf.setOutputValueGroupingComparator(PkFkComparator.class);conf.setOutputValueClass(Text.class);conf.setMapperClass(Map.class);conf.setReducerClass(Reduce.class);conf.setInputFormat(TextInputFormat.class);conf.setOutputFormat(TextOutputFormat.class);List<String> other_args = new ArrayList<String>();for (int i = 0; i < args.length; ++i) {try {if ("-m".equals(args[i])) {conf.setNumMapTasks(Integer.parseInt(args[++i]));} else if ("-r".equals(args[i])) {conf.setNumReduceTasks(Integer.parseInt(args[++i]));} else {other_args.add(args[i]);}} catch (NumberFormatException except) {return printUsage();} catch (ArrayIndexOutOfBoundsException except) {return printUsage();}}FileInputFormat.setInputPaths(conf, other_args.get(0));FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));JobClient.runJob(conf);return 0;}public static void main(String[] args) throws Exception {int res = ToolRunner.run(new Configuration(), new testsecondarysort(), args);System.exit(res);} }

總結

以上是生活随笔為你收集整理的使用secondary sort实现数据关联 完整示例代码的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 天天干天天做 | 黑料av在线| 精射女上司 | 99久久婷婷国产精品综合 | 久久精品香蕉视频 | 国产一区视频在线 | 久久av一区二区三 | 丰满肉肉bbwwbbww| 天天操天天干天天干 | 操人视频免费 | 成 人 黄 色 片 在线播放 | 精品视频三区 | 性xxxx视频 | 丁香婷婷网 | 麻豆亚洲一区 | 日本免费a级片 | 亚洲xx网站 | av不卡在线免费观看 | 9.1在线观看免费 | 一二三区在线视频 | 91污网站 | 我和岳m愉情xxxⅹ视频 | 青草国产视频 | 国产精品国产精品国产 | 欧美激情图| 永久在线观看 | aa视频在线 | 香蕉大人久久国产成人av | www.成人在线观看 | 波多野结衣视频免费 | 福利精品在线 | 九一福利视频 | 国产乡下妇女做爰视频 | 日本韩国欧美一区二区 | 一级免费视频 | 性生交大片免费看女人按摩 | 在线免费观看麻豆 | 国产成人精品免费视频 | 国产福利99 | 日韩毛片免费看 | 人人人超碰| 亚洲人午夜射精精品日韩 | 无码gogo大胆啪啪艺术 | 成人免费大片黄在线播放 | 伊人久久视频 | 成人黄色一区二区三区 | 国产乱子伦视频一区二区三区 | 操她视频在线观看 | 久久久香蕉网 | 久久无毛 | 秋霞电影网一区二区 | 怡红院综合网 | 欧美做受高潮 | 青草综合 | 8x8ⅹ8成人免费视频观看 | aa片在线观看视频在线播放 | 男女性杂交内射妇女bbwxz | 欧美福利在线视频 | 欧美国产日韩在线 | 我不卡一区二区 | 中文在线字幕观看 | 激情片网站 | 日韩国产欧美在线观看 | 日韩午夜激情视频 | 妺妺窝人体色www在线小说 | 国产免费片 | 午夜在线观看一区 | 久久人人爽人人爽人人片av免费 | 特级a级片| 亚洲乱码国产乱码精品精98午夜 | 欧美一区二区三区在线免费观看 | 亚洲精品女人 | 老熟女毛茸茸 | 黑鬼大战白妞高潮喷白浆 | 91精品国产乱码久久久张津瑜 | 东方av正在进入 | 久久久久久久偷拍 | 国产欧美日韩精品在线 | 97超碰福利 | 国产成人精品无码播放 | 亚洲熟女乱综合一区二区 | 自拍偷拍中文字幕 | 天堂中文av| 百合sm惩罚室羞辱调教 | 91精品国产综合久久精品 | 国产 第1190页| 色妇网 | 日本特黄特色aaa大片免费 | 日韩第一页在线 | 欧美一卡二卡在线观看 | 日韩欧美偷拍 | 欧美精品网站 | 亚洲成人av综合 | 91网在线看 | 国产嘿咻视频 | 91人人澡人人爽人人精品 | 国产成人精品久久久 | 在线看一区二区 | 激情一区二区三区 |