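Secondary Sort and the Overall MapReduce Workflow

Published: 2025/3/15 · Programming Q&A · 豆豆

This article, collected and organized by 生活随笔, introduces secondary sort and the overall MapReduce workflow, and is shared here for reference.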

I. Secondary Sort (GroupingComparator)

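Requirement: an order data file holds one tab-separated record per line: order id, product id and product price. Sort the records by order id in ascending order and by price in descending order, produce one output file per order id, and keep only one record in each output file: the most expensive product of that order.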
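Stripped of Hadoop, the required result amounts to: sort by order id ascending and price descending, then keep the first record of every order-id group. The plain-Java sketch below (Java 16+ for records) illustrates just that; the class name, the `Order` record and the sample records are made up for illustration and are not part of the original job.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SecondarySortSketch {
    // Hypothetical stand-in for one input line: orderId, productId, price.
    record Order(int orderId, String productId, double price) {}

    // Sort rule: orderId ascending, then price descending (arguments swapped).
    static final Comparator<Order> SORT = (a, b) -> {
        int byId = Integer.compare(a.orderId(), b.orderId());
        return byId != 0 ? byId : Double.compare(b.price(), a.price());
    };

    // Grouping rule: records with the same orderId form one group.
    static final Comparator<Order> GROUP = Comparator.comparingInt(Order::orderId);

    // Sort everything, then keep the first record of each group.
    static List<Order> mostExpensivePerOrder(List<Order> input) {
        List<Order> sorted = new ArrayList<>(input);
        sorted.sort(SORT);
        List<Order> result = new ArrayList<>();
        Order groupStart = null;
        for (Order o : sorted) {
            if (groupStart == null || GROUP.compare(groupStart, o) != 0) {
                groupStart = o; // a new orderId begins here
                result.add(o);  // its first record carries the highest price
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up sample records, in no particular order.
        List<Order> input = List.of(
                new Order(1, "p01", 222.8),
                new Order(2, "p05", 722.4),
                new Order(1, "p02", 33.8),
                new Order(3, "p06", 232.8),
                new Order(3, "p03", 33.8),
                new Order(2, "p03", 522.8),
                new Order(2, "p04", 122.4));
        for (Order o : mostExpensivePerOrder(input)) {
            System.out.println(o.orderId() + "\t" + o.price());
        }
    }
}
```

With these sample records it prints one line per order: order 1 with 222.8, order 2 with 722.4 and order 3 with 232.8. In the MapReduce job the sort rule corresponds to `OrderBean.compareTo`, the "new group" test corresponds to the grouping comparator, and the reducer only has to write the first key of each group.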

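Approach: 1. Wrap the order record in an OrderBean class that implements the WritableComparable interface;

     2. Write a custom Mapper class: fix its input and output types and implement the business logic;

     3. Write a custom Partitioner that returns a different partition number for each order id;

     4. Write a custom Reducer class;

     5. Write the grouping comparator class OrderGroupingComparator, which extends WritableComparator, defines a no-argument constructor and overrides the compare method;

     6. Write the Driver class.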

The code is as follows:

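```java
// OrderBean.java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * @author: PrincessHug
 * @date: 2019/3/25, 21:42
 * @Blog: https://www.cnblogs.com/HelloBigTable/
 */
public class OrderBean implements WritableComparable<OrderBean> {
    private int orderId;
    private double orderPrice;

    // Hadoop needs a no-arg constructor to create keys during deserialization
    public OrderBean() {
    }

    public OrderBean(int orderId, double orderPrice) {
        this.orderId = orderId;
        this.orderPrice = orderPrice;
    }

    public int getOrderId() { return orderId; }

    public void setOrderId(int orderId) { this.orderId = orderId; }

    public double getOrderPrice() { return orderPrice; }

    public void setOrderPrice(double orderPrice) { this.orderPrice = orderPrice; }

    @Override
    public String toString() {
        return orderId + "\t" + orderPrice;
    }

    // Sort key: orderId ascending, then orderPrice descending
    // (arguments swapped in Double.compare); equal keys compare as 0.
    @Override
    public int compareTo(OrderBean o) {
        int rs = Integer.compare(this.orderId, o.getOrderId());
        if (rs == 0) {
            rs = Double.compare(o.getOrderPrice(), this.orderPrice);
        }
        return rs;
    }

    // Field order in write() must match readFields()
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(orderId);
        out.writeDouble(orderPrice);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        orderId = in.readInt();
        orderPrice = in.readDouble();
    }
}
```

```java
// OrderMapper.java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Read the line: orderId \t productId \t price
        String line = value.toString();
        // Split it into fields
        String[] fields = line.split("\t");
        // Wrap orderId and price in the composite key
        int orderId = Integer.parseInt(fields[0]);
        double orderPrice = Double.parseDouble(fields[2]);
        OrderBean orderBean = new OrderBean(orderId, orderPrice);
        // Emit; everything needed is in the key, so the value is empty
        context.write(orderBean, NullWritable.get());
    }
}
```

```java
// OrderPartitioner.java
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class OrderPartitioner extends Partitioner<OrderBean, NullWritable> {
    @Override
    public int getPartition(OrderBean orderBean, NullWritable nullWritable, int numPartitions) {
        // numPartitions is the number of reduce tasks; masking the sign bit keeps the result non-negative
        return (orderBean.getOrderId() & Integer.MAX_VALUE) % numPartitions;
    }
}
```

```java
// OrderReducer.java
import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // Called once per orderId group; before the values are iterated, key holds
        // the group's first (most expensive) record, so writing it once is enough
        context.write(key, NullWritable.get());
    }
}
```

```java
// OrderGroupingComparator.java
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderGroupingComparator extends WritableComparator {
    // super() must register OrderBean as the compared class; true = create key instances
    protected OrderGroupingComparator() {
        super(OrderBean.class, true);
    }

    // Keys with the same orderId belong to one group, i.e. one reduce() call
    @Override
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean aBean = (OrderBean) a;
        OrderBean bBean = (OrderBean) b;
        return Integer.compare(aBean.getOrderId(), bBean.getOrderId());
    }
}
```

```java
// OrderDriver.java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OrderDriver {
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        // Configuration and Job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // Jar that contains the job classes
        job.setJarByClass(OrderDriver.class);
        // Mapper and Reducer classes
        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);
        // Mapper output types
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        // Reducer (final) output types
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);
        // Grouping comparator (secondary sort)
        job.setGroupingComparatorClass(OrderGroupingComparator.class);
        // Partitioner
        job.setPartitionerClass(OrderPartitioner.class);
        // Number of reduce tasks, i.e. number of output files
        job.setNumReduceTasks(3);
        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path("G:\\mapreduce\\order\\in"));
        FileOutputFormat.setOutputPath(job, new Path("G:\\mapreduce\\order\\out"));
        // Submit the job and wait for it to finish
        if (job.waitForCompletion(true)) {
            System.out.println("Job completed!");
        } else {
            System.out.println("Job failed!");
        }
    }
}
```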

Since I have typed this code many times, I did not add many comments; please bear with me!
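Why do three reduce tasks give one output file per order here? The partition arithmetic of OrderPartitioner can be run on its own. This is a minimal plain-Java sketch; `OrderPartitionSketch` and `partitionFor` are hypothetical names, and the order ids are illustrative.

```java
public class OrderPartitionSketch {
    // Same arithmetic as OrderPartitioner.getPartition: clear the sign bit so the
    // result is never negative, then take the remainder by the reduce-task count.
    static int partitionFor(int orderId, int numReduceTasks) {
        return (orderId & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int numReduceTasks = 3; // as in job.setNumReduceTasks(3)
        for (int orderId : new int[] {1, 2, 3, 4, -7}) {
            System.out.println("orderId " + orderId + " -> partition "
                    + partitionFor(orderId, numReduceTasks));
        }
    }
}
```

Orders 1, 2 and 3 land in partitions 1, 2 and 0, so each gets its own file. An order id 4 would share partition 1 with order 1, so "one output file per order id" only holds when the reduce-task count and the ids line up this way.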


II. The Overall MapReduce Workflow

1. Start with a 200 MB text file; the data to be processed is first handed to the client, which submits the job.

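2. The client computes the input splits and submits the split information to YARN. With the default 128 MB block size, a 200 MB file yields two splits (128 MB and 72 MB), so YARN (through the job's MRAppMaster) launches 2 map tasks, one per split.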

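3. By default the job reads the file with TextInputFormat (a subclass of FileInputFormat), which feeds each map task one line at a time: the key is the line's byte offset (LongWritable) and the value is the line's text (Text).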

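4. The map task runs the business logic in map() and writes each output record through the Context (context.write(), a TaskInputOutputContext) into the circular buffer; the partitioner is called at this point to assign each record its partition number.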

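5. The circular buffer is simply a region of memory, 100 MB by default (mapreduce.task.io.sort.mb). When it is 80% full (mapreduce.map.sort.spill.percent), its contents are spilled to disk by a background thread while the map task keeps writing into the remaining space.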

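6. Each spill is sorted before it is written: records are ordered by partition and, within a partition, by key, so a map task that spills several times produces several partitioned, sorted spill files. (This is part of the shuffle mechanism, explained in detail in the next post.)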

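7. Optionally, a Combiner pre-aggregates the sorted data of each partition before it is written to local disk. When the map task finishes, its spill files are merge-sorted into a single partitioned, sorted output file.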

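8. Each reduce task copies its partition of every map task's output from the map side's local disks, then merges these files with a merge sort. Copying may start once a fraction of the map tasks has finished (mapreduce.job.reduce.slowstart.completedmaps, 0.05 by default), but reduce() is not called until all map tasks are done. This is where the grouping comparator from Part I comes into play: it decides which consecutive sorted keys form one group.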

9. The Reducer processes one group of records per reduce() call, and the default TextOutputFormat writes the results to the output files.
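The split count in step 2 can be reproduced with a simplified model of the split loop in Hadoop's FileInputFormat.getSplits: split size = max(minSize, min(maxSize, blockSize)), and the last piece may be up to 1.1 times the split size (SPLIT_SLOP). The sketch assumes the Hadoop 2.x/3.x default 128 MB block size and ignores block locations and non-splittable (e.g. compressed) files; `SplitSizeSketch` is a hypothetical class name.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSizeSketch {
    // Slack factor: the last split may be up to 10% larger than splitSize.
    static final double SPLIT_SLOP = 1.1;

    // splitSize = max(minSize, min(maxSize, blockSize))
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Cut a file of `length` bytes into split lengths.
    static List<Long> splits(long length, long splitSize) {
        List<Long> result = new ArrayList<>();
        long remaining = length;
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            result.add(splitSize);
            remaining -= splitSize;
        }
        if (remaining != 0) {
            result.add(remaining); // the tail becomes the last split
        }
        return result;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // Assumed defaults: 128 MB block, minSize 1, maxSize Long.MAX_VALUE
        long splitSize = computeSplitSize(128 * mb, 1L, Long.MAX_VALUE);
        for (long fileMb : new long[] {200, 130}) {
            List<Long> s = splits(fileMb * mb, splitSize);
            StringBuilder sizes = new StringBuilder();
            for (long len : s) {
                sizes.append(len / mb).append(" MB ");
            }
            System.out.println(fileMb + " MB file -> " + s.size() + " split(s): "
                    + sizes.toString().trim());
        }
    }
}
```

It prints `200 MB file -> 2 split(s): 128 MB 72 MB` and `130 MB file -> 1 split(s): 130 MB`: thanks to the slack factor, a file only slightly larger than one block is not cut into a tiny second split.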
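The map side of steps 4 to 7 can also be modeled as a toy: each record is tagged with a partition when written, the buffer is spilled once it reaches 80% of its capacity, each spill is sorted by partition and then key, and the spills are finally merge-sorted into one output. This is a simplified sketch, not Hadoop's MapOutputBuffer: capacity is counted in records instead of bytes, spilling is synchronous instead of on a background thread, there is no Combiner, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class SpillSketch {
    // One buffered map-output record; the partition is fixed at write time.
    record Kv(int partition, String key) {}

    // Spills are sorted by partition first, then by key.
    static final Comparator<Kv> BY_PARTITION_THEN_KEY =
            Comparator.comparingInt(Kv::partition).thenComparing(Kv::key);

    final int numPartitions;
    final int spillThreshold;                        // e.g. 80% of capacity
    final List<Kv> buffer = new ArrayList<>();       // stands in for the circular buffer
    final List<List<Kv>> spills = new ArrayList<>(); // stands in for spill files on disk

    SpillSketch(int capacity, double spillPercent, int numPartitions) {
        this.numPartitions = numPartitions;
        this.spillThreshold = (int) (capacity * spillPercent);
    }

    // Like context.write(): assign a partition, buffer the record, spill at the threshold.
    void write(String key) {
        int partition = (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        buffer.add(new Kv(partition, key));
        if (buffer.size() >= spillThreshold) {
            spill();
        }
    }

    // Sort the buffered records and move them into a new "spill file".
    void spill() {
        if (buffer.isEmpty()) {
            return;
        }
        List<Kv> run = new ArrayList<>(buffer);
        run.sort(BY_PARTITION_THEN_KEY);
        spills.add(run);
        buffer.clear();
    }

    // Flush the remainder, then k-way merge all sorted spills into one sorted output.
    List<Kv> close() {
        spill();
        // Heap entries are {spill index, position within that spill}.
        PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> BY_PARTITION_THEN_KEY.compare(
                spills.get(a[0]).get(a[1]), spills.get(b[0]).get(b[1])));
        for (int i = 0; i < spills.size(); i++) {
            heap.add(new int[] {i, 0});
        }
        List<Kv> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            List<Kv> run = spills.get(top[0]);
            merged.add(run.get(top[1]));
            if (top[1] + 1 < run.size()) {
                heap.add(new int[] {top[0], top[1] + 1});
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // Toy setting: capacity 5 records, spill at 80%, i.e. every 4 records; 3 partitions.
        SpillSketch out = new SpillSketch(5, 0.8, 3);
        for (String w : new String[] {"d", "b", "a", "c", "b", "e", "a", "d", "c", "f"}) {
            out.write(w);
        }
        List<Kv> merged = out.close();
        System.out.println(out.spills.size() + " spills");
        for (Kv kv : merged) {
            System.out.println(kv.partition() + "\t" + kv.key());
        }
    }
}
```

With these settings the ten writes trigger spills after the 4th and 8th records plus a final spill on close, so it reports 3 spills; the merged output lists partition 0 first, then 1, then 2, each sorted by key, which is the layout a reduce task expects to fetch its partition from.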


Reposted from: https://www.cnblogs.com/HelloBigTable/p/10617937.html
