日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Mapreduce的排序、全排序以及二次排序

發(fā)布時(shí)間:2025/3/11 编程问答 20 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Mapreduce的排序、全排序以及二次排序 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

一:背景

Hadoop中雖然有自動(dòng)排序和分組,由于自帶的排序是按照Key進(jìn)行排序的,有些時(shí)候,我們希望同時(shí)對Key和Value進(jìn)行排序。自帶的排序功能就無法滿足我們了,還好Hadoop提供了一些組件可以讓開發(fā)人員進(jìn)行二次排序。

?

二:技術(shù)實(shí)現(xiàn)

我們先來看案例需求

#需求1: 首先按照第一列數(shù)字升序排列,當(dāng)?shù)谝涣袛?shù)字相同時(shí),第二列數(shù)字也升序排列(列之間用制表符\t隔開)

?

  • 3 3

  • 3 2

  • 3 1

  • 2 2

  • 2 1

  • 1 1

  • MapReduce計(jì)算之后的結(jié)果應(yīng)該是:

    ?

    ?

  • 1 1

  • 2 1

  • 2 2

  • 3 1

  • 3 2

  • 3 3


  • #需求2:第一列不相等時(shí),第一列按降序排列,當(dāng)?shù)谝涣邢嗟葧r(shí),第二列按升序排列

    ?

    ?

  • 3 3

  • 3 2

  • 3 1

  • 2 2

  • 2 1

  • 1 1

  • MapReduce計(jì)算之后的結(jié)果應(yīng)該是:

  • 3 1

  • 3 2

  • 3 3

  • 2 1

  • 2 2

  • 1 1



  • 下面是實(shí)現(xiàn)代碼,實(shí)現(xiàn)兩種需求的關(guān)鍵是compareTo()方法的實(shí)現(xiàn)不同:

    ?

    public class SecondSortTest {// 定義輸入路徑private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/data";// 定義輸出路徑private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";public static void main(String[] args) {try {// 創(chuàng)建配置信息Configuration conf = new Configuration();/**********************************************///對Map端輸出進(jìn)行壓縮//conf.setBoolean("mapred.compress.map.output", true);//設(shè)置map端輸出使用的壓縮類//conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);//對reduce端輸出進(jìn)行壓縮//conf.setBoolean("mapred.output.compress", true);//設(shè)置reduce端輸出使用的壓縮類//conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);// 添加配置文件(我們可以在編程的時(shí)候動(dòng)態(tài)配置信息,而不需要手動(dòng)去改變集群)/** conf.addResource("classpath://hadoop/core-site.xml");* conf.addResource("classpath://hadoop/hdfs-site.xml");* conf.addResource("classpath://hadoop/hdfs-site.xml");*/// 創(chuàng)建文件系統(tǒng)FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);// 如果輸出目錄存在,我們就刪除if (fileSystem.exists(new Path(OUT_PATH))) {fileSystem.delete(new Path(OUT_PATH), true);}// 創(chuàng)建任務(wù)Job job = new Job(conf, SecondSortTest.class.getName());//1.1 設(shè)置輸入目錄和設(shè)置輸入數(shù)據(jù)格式化的類FileInputFormat.setInputPaths(job, INPUT_PATH);job.setInputFormatClass(TextInputFormat.class);//1.2 設(shè)置自定義Mapper類和設(shè)置map函數(shù)輸出數(shù)據(jù)的key和value的類型job.setMapperClass(MySecondSortMapper.class);job.setMapOutputKeyClass(CombineKey.class);job.setMapOutputValueClass(LongWritable.class);//1.3 設(shè)置分區(qū)和reduce數(shù)量(reduce的數(shù)量,和分區(qū)的數(shù)量對應(yīng),因?yàn)榉謪^(qū)為一個(gè),所以reduce的數(shù)量也是一個(gè))job.setPartitionerClass(HashPartitioner.class);job.setNumReduceTasks(1);//1.4 排序、分組//1.5 歸約//2.1 Shuffle把數(shù)據(jù)從Map端拷貝到Reduce端。//2.2 指定Reducer類和輸出key和value的類型job.setReducerClass(MySecondSortReducer.class);job.setOutputKeyClass(LongWritable.class);job.setOutputValueClass(LongWritable.class);//2.3 指定輸出的路徑和設(shè)置輸出的格式化類FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));job.setOutputFormatClass(TextOutputFormat.class);// 提交作業(yè) 退出System.exit(job.waitForCompletion(true) ? 0 : 1);} catch (Exception e) {e.printStackTrace();}}public static class MySecondSortMapper extends Mapper<LongWritable, Text, CombineKey, LongWritable>{//定義聯(lián)合的keyprivate CombineKey combineKey = new CombineKey();protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, CombineKey, LongWritable>.Context context) throws IOException,InterruptedException {//對輸入的value進(jìn)行切分String[] splits = value.toString().split("\t");//設(shè)置聯(lián)合的keycombineKey.setComKey(Long.parseLong(splits[0]));combineKey.setComVal(Long.parseLong(splits[1]));//通過context寫出去context.write(combineKey, new LongWritable(Long.parseLong(splits[1])));}}public static class MySecondSortReducer extends Reducer<CombineKey, LongWritable, LongWritable, LongWritable>{@Overrideprotected void reduce(CombineKey combineKey, Iterable<LongWritable> values, Reducer<CombineKey, LongWritable, LongWritable, LongWritable>.Context context)throws IOException, InterruptedException {//因?yàn)檩斎氲腃ombineKey已經(jīng)排好序了,所有我們只要獲取其中的兩個(gè)成員變量寫出去就可以了。values在這個(gè)例子中沒有什么作用context.write(new LongWritable(combineKey.getComKey()), new LongWritable(combineKey.getComVal()));}}}/*** 重新組合成一個(gè)key,實(shí)現(xiàn)二次排序* @author 廖*民* time : 2015年1月18日下午7:27:52* @version*/class CombineKey implements WritableComparable<CombineKey>{public long comKey;public long comVal;//必須提供無參構(gòu)造函數(shù),否則hadoop反射機(jī)制會(huì)出錯(cuò)public CombineKey() {}//有參構(gòu)造函數(shù)public CombineKey(long comKey, long comVal) {this.comKey = comKey;this.comVal = comVal;}public long getComKey() {return comKey;}public void setComKey(long comKey) {this.comKey = comKey;}public long getComVal() {return comVal;}public void setComVal(long comVal) {this.comVal = comVal;}public void write(DataOutput out) throws IOException {out.writeLong(comKey);out.writeLong(comVal);}public void readFields(DataInput in) throws IOException {this.comKey = in.readLong();this.comVal = in.readLong();}/*** 這個(gè)方法一定要實(shí)現(xiàn)* java里面排序默認(rèn)是小的放在前面,即返回負(fù)數(shù)的放在前面,這樣就是所謂的升序排列* 我們在下面的方法中直接返回一個(gè)差值,也就相當(dāng)于會(huì)升序排列。* 如果我們要實(shí)現(xiàn)降序排列,那么我們就可以返回一個(gè)正數(shù)*//*public int compareTo(CombineKey o) {//第一列不相同時(shí)按升序排列,當(dāng)?shù)谝涣邢嗤瑫r(shí)第二列按升序排列l(wèi)ong minus = this.comKey - o.comKey;//如果第一個(gè)值不相等時(shí),我們就先對第一列進(jìn)行排序if (minus != 0){return (int) minus;}//如果第一列相等時(shí),我們就對第二列進(jìn)行排序return (int) (this.comVal - o.comVal);}*//*** 為了實(shí)現(xiàn)第一列不同時(shí)按降序排序,第一列相同時(shí)第二列按升序排列* 第一列:降序,當(dāng)?shù)谝涣邢嗤瑫r(shí),第二列:升序* 為了實(shí)現(xiàn)降序,*/public int compareTo(CombineKey o) {//如果a-b<0即,a小于b,按這樣 的思路應(yīng)該是升序排列,我們可以返回一個(gè)相反數(shù)使其降序long tmp = this.comKey - o.comKey;//如果第一個(gè)值不相等時(shí),我們就先對第一列進(jìn)行排序if (tmp != 0){return (int) (-tmp);}//如果第一列相等時(shí),我們就對第二列進(jìn)行升序排列return (int) (this.comVal - o.comVal);}@Overridepublic int hashCode() {final int prime = 31;int result = 1;result = prime * result + (int) (comKey ^ (comKey >>> 32));return result;}@Overridepublic boolean equals(Object obj) {if (this == obj)return true;if (obj == null)return false;if (getClass() != obj.getClass())return false;CombineKey other = (CombineKey) obj;if (comKey != other.comKey)return false;return true;}}

    ?

    總結(jié)

    以上是生活随笔為你收集整理的Mapreduce的排序、全排序以及二次排序的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。