日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

MapReduce编程实践

發(fā)布時間:2023/12/13 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 MapReduce编程实践 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一、MapReduce編程思想

學些MapRedcue主要是學習它的編程思想,在MR的編程模型中,主要思想是把對數(shù)據(jù)的運算流程分成map和reduce兩個階段:

Map階段:讀取原始數(shù)據(jù),形成key-value數(shù)據(jù)(map方法)。即,負責數(shù)據(jù)的過濾分發(fā)。

Reduce階段:把map階段的key-value數(shù)據(jù)按照相同的key進行分組聚合(reduce方法)。即,數(shù)據(jù)的計算歸并

它其實是一種數(shù)據(jù)邏輯運算模型,對于這樣的運算模型,有一些成熟的具體軟件實現(xiàn),比如hadoop中的mapreduce框架、spark等,例如在hadoop的mr框架中,對map階段的具體實現(xiàn)是map task,對reduce階段的實現(xiàn)是reduce task。這些框架已經(jīng)為我們提供了一些通用功能的實現(xiàn),讓我們專注于數(shù)據(jù)處理的邏輯,而不考慮分布式的具體實現(xiàn),比如讀取文件、寫文件、數(shù)據(jù)分發(fā)等。我們要做的工作就是在這些編程框架下,來實現(xiàn)我們的具體需求。

下面我們先介紹一些map task和reduce task中的一些具體實現(xiàn):

二、MapTask和ReduceTask

2.1 Map Task

讀數(shù)據(jù):利用InputFormat組件完成數(shù)據(jù)的讀取。

    InputFormat-->TextInputFormat 讀取文本文件的具體實現(xiàn)

            -->SequenceFileInputFormat 讀取Sequence文件

            -->DBInputFormat 讀數(shù)據(jù)庫

處理數(shù)據(jù):這一階段將讀取到的數(shù)據(jù)按照規(guī)則進行處理,生成key-value形式的結果。maptask通過調用用Mapper類的map方法實現(xiàn)對數(shù)據(jù)的處理。

分區(qū):這一階段主要是把map階段產生的key-value數(shù)據(jù)進行分區(qū),以分發(fā)給不同的reduce task來處理,使用的是Partitioner類。maptask通過調用Partitioner類的getPartition()方法來決定如何劃分數(shù)據(jù)給不同的reduce task。

排序:這一階段,對key-value數(shù)據(jù)做排序。maptask會按照key對數(shù)據(jù)進行排序,排序時調用key.compareTo()方法來實現(xiàn)對key-value數(shù)據(jù)排序。

2.2 Reduce Task

讀數(shù)據(jù):這一階段通過http方式從maptask產生的數(shù)據(jù)文件中下載屬于自己的“區(qū)”的數(shù)據(jù)。由于一個區(qū)的數(shù)據(jù)可能來自多個maptask,所以reduce還要把這些分散的數(shù)據(jù)進行合并(歸并排序)

處理數(shù)據(jù):一個reduce task中,處理剛才下載到自己本地的數(shù)據(jù)。通過調用GroupingComparator的compare()方法來判斷文件中的哪些key-value屬于同一組。然后將這一組數(shù)傳給Reducer類的reduce()方法聚合一次。

輸出結果:調用OutputFormat組件將結果key-value數(shù)據(jù)寫出去。

    Outputformat --> TextOutputFormat 寫文本文件(會把一個key-value對寫一行,分隔符為制表符\t

          --> SequenceFileOutputFormat 寫Sequence文件(直接將key-value對象序列化到文件中)

          --> DBOutputFormat?

下面介紹下利用MapReduce框架下的一般編程過程。我們要做的 工作就是把我們對數(shù)據(jù)的處理邏輯加入到框架的業(yè)務邏輯中。我們編寫的MapReduce的job客戶端主要包括三個部分,Mapper 、 Reducer和JobSubmitter,三個部分分別完成MR程序的map邏輯、reduce邏輯以及將我們編寫的job程序提交給集群。下面分別介紹這三個部分如何實現(xiàn)。

三、Hadoop中MapReduce框架下的一般編程步驟

Mapper:創(chuàng)建類,該類要實現(xiàn)Mapper父類,復寫read()方法,在方法內實現(xiàn)當前工程中的map邏輯。

Reducer:創(chuàng)建類,繼承Reducer父類,復寫reduce()方法,方法內實現(xiàn)當前工程中的reduce邏輯。

jobSubmitter:這是job在集群上實際運行的類,主要是通過main方法,封裝job相關參數(shù),并把job提交。jobsubmitter內一般包括以下操作

step1:創(chuàng)建Configuration對象,并通過創(chuàng)建的對象對集群進行配置,同時支持用戶自定義一些變量并配置。這一步有些像我們集群搭建的時候對$haoop_home/etc/hadoop/*下的一些文件進行的配置。

step2:獲得job對象,并通過job對象對我們job運行進行一些配置。例如,設置集群運行的jar文件、設置實際執(zhí)行map和reduce的類等,下面列出一些必要設置和可選設置。

Configuration conf = new Configuration(); //創(chuàng)建集群配置對象。Job job = Job.getInstance(conf);//根據(jù)配置對象獲取一個job客戶端實例。job.setJarByClass(JobSubmitter.class);//設置集群上job執(zhí)行的類job.setMapperClass(FlowCountMapper.class);//設置job執(zhí)行時使用的Mapper類job.setReducerClass(FlowCountReducer.class);//設置job執(zhí)行時使用的Reducer類job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);FileInputFormat.setInputPaths(job, new Path("I:\\hadooptest\\input"));FileOutputFormat.setOutputPath(job, new Path("I:\\hadooptest\\output_pri"));//設置maptask做數(shù)據(jù)分發(fā)時使用的分發(fā)邏輯類,如果不指定,默認使用hashparjob.setPartitionerClass(ProvincePartitioner.class);job.setNumReduceTasks(4);//自定義的分發(fā)邏輯下,可能產生n個分區(qū),所以reducetask的數(shù)量需要是nboolean res = job.waitForCompletion(true);System.exit(res ? 0:-1);

?一般實踐中,可以定義一個類,其中添加main方法對job進行提交,并在其中定義靜態(tài)內部類maper和reduce類。

四、MapReduce框架中的可自定義項

<不小心刪除以后就沒有再補充了,挺重要的。。。。補上吧。。。。>

總結,你要把bean寫到文本嗎?重寫toString方法

要傳輸嗎?實現(xiàn)Writable接口

要排序嗎?實現(xiàn)writablecompareble接口

?

遇到一些復雜的需求,需要我們自定義實現(xiàn)一些組件

2.1 自定義序列化數(shù)據(jù)類型

MapReduce框架為我們提供了基本數(shù)據(jù)類型的序列化類型,如String的Text類型,int的IntWritalbe類型,null的NullWritable類型等。但是有時候會有一些我們自定義的類型需要我們在map和reduce之間進行傳輸或者需要寫到hdfs上。hadoop提供了自己的序列化機制,實現(xiàn)自定義類型的序列化和反序列化將自定義的類實現(xiàn)hadoop提供的Writable接口。

自定義類實現(xiàn)Writable接口,實現(xiàn)readFields(in)write(out)方法。

同時,重寫toString()方法,可以自定義在寫到文件系統(tǒng)時候寫入的字段內容。

* hadoop系統(tǒng)在序列化該類的對象時要調用的方法*/@Overridepublic void write(DataOutput out) throws IOException {out.writeInt(upFlow);out.writeUTF(phone);out.writeInt(dFlow);out.writeInt(amountFlow);}/*** hadoop系統(tǒng)在反序列化該類的對象時要調用的方法*/@Overridepublic void readFields(DataInput in) throws IOException {this.upFlow = in.readInt();this.phone = in.readUTF();this.dFlow = in.readInt();this.amountFlow = in.readInt();}@Overridepublic String toString() {return this.phone + ","+this.upFlow +","+ this.dFlow +"," + this.amountFlow;} View Code

2.2 自定義排序規(guī)則

MapReduce中提供了一個排序機制,map worker 和reduce worker ,都會對數(shù)據(jù)按照key的大小來排序,所以map和reduce階段輸出的記錄都是經(jīng)過排序的(按照key排序)。我們在實踐中有時候需要對計算出來的結果進行排序,比如一個這樣的需求:計算每個頁面訪問次數(shù),并按照訪問量倒序輸出。我們可以在統(tǒng)計了每個頁面訪問次數(shù)之后進行排序,但是我們還可以直接應用MR自身的排序特性,在MR處理的時候按照我們的需求進行排序。這時候就需要我們自定義排序規(guī)則。

自定義類,實現(xiàn)WritableComparable接口,實現(xiàn)其中的compareTo()方法,在其中自定義排序的規(guī)則。同時一般還要實現(xiàn)readFields(in) 和write(out)和toString()方法。

public class PageCount implements WritableComparable<PageCount>{private String page;private int count;public void set(String page, int count) {this.page = page;this.count = count;}public String getPage() {return page;}public void setPage(String page) {this.page = page;}public int getCount() {return count;}public void setCount(int count) {this.count = count;}@Overridepublic int compareTo(PageCount o) {return o.getCount()-this.count==0?this.page.compareTo(o.getPage()):o.getCount()-this.count;}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(this.page);out.writeInt(this.count);}@Overridepublic void readFields(DataInput in) throws IOException {this.page= in.readUTF();this.count = in.readInt();}@Overridepublic String toString() {return this.page + "," + this.count;}} View Code

總結:

實現(xiàn)Writable接口,是為了bean能夠傳輸,能夠寫到文件系統(tǒng)中。

實現(xiàn)WritableComparable還為了bean能夠按照你定義的規(guī)則進行排序。

2.2 自定義分區(qū)規(guī)則

我們知道,map計算出來的結果會分發(fā)給不同的reduce任務去進一步處理。MR中提供了一個默認的數(shù)據(jù)分發(fā)規(guī)則,會按照map的輸出中的key的hashcode,然后模除reduce task的數(shù)量,模除的結果就是數(shù)據(jù)的分區(qū)。我們可以通過自定義map數(shù)據(jù)分發(fā)給reduce的規(guī)則,實現(xiàn)把數(shù)據(jù)按照自己的需求記錄到不同的數(shù)據(jù)中。比如實現(xiàn)這樣的需求,有一個通話記錄的文件,按照歸屬地分別存儲數(shù)據(jù)。

?自定義類,繼承Partitioner父類(類的泛型為MapTask的輸出的key,value的類型),重寫?getPartition(<>key, <>value, int numPartitions)?方法,在其中自定義分區(qū)的規(guī)則,方法返回計算出來的分區(qū)數(shù)。MapTask每處理一行數(shù)據(jù)都會調用getPartition方法。因此最好不要在方法中創(chuàng)建可以給很多數(shù)據(jù)行共同使用的對象。在jobsubmitter中,設置maptask在做數(shù)據(jù)分區(qū)時使用的分區(qū)邏輯類,?job.setPartitonerClass(your.class)?,同時注意設置reduceTask的任務數(shù)量為我們在分區(qū)邏輯中定義的規(guī)則下回產生的分區(qū)數(shù)量,?job.setNumReduceTasks(numOfPartition);?

/*** 本類是提供給MapTask用的* MapTask通過這個類的getPartition方法,來計算它所產生的每一對kv數(shù)據(jù)該分發(fā)給哪一個reduce task* @author ThinkPad**/ public class ProvincePartitioner extends Partitioner<Text, FlowBean>{static HashMap<String,Integer> codeMap = new HashMap<>();static{codeMap.put("135", 0);codeMap.put("136", 1);codeMap.put("137", 2);codeMap.put("138", 3);codeMap.put("139", 4);}@Overridepublic int getPartition(Text key, FlowBean value, int numPartitions) {Integer code = codeMap.get(key.toString().substring(0, 3));return code==null?5:code;}} Partitioner public class JobSubmitter {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(JobSubmitter.class);job.setMapperClass(FlowCountMapper.class);job.setReducerClass(FlowCountReducer.class);// 設置參數(shù):maptask在做數(shù)據(jù)分區(qū)時,用哪個分區(qū)邏輯類 (如果不指定,它會用默認的HashPartitioner)job.setPartitionerClass(ProvincePartitioner.class);// 由于我們的ProvincePartitioner可能會產生6種分區(qū)號,所以,需要有6個reduce task來接收job.setNumReduceTasks(6);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);FileInputFormat.setInputPaths(job, new Path("F:\\mrdata\\flow\\input"));FileOutputFormat.setOutputPath(job, new Path("F:\\mrdata\\flow\\province-output"));job.waitForCompletion(true);}} JobSubmitter

2.3 自定義分組規(guī)則

MapTask每調用一次map就會產生一個k-v,多次調用后,生成多個k-v,具有相同key的的記錄稱為一組,會存入一個partition中,注意一個patition可以包含多個組。

?

一個ReduceTask處理一個partition,在處理的時候 ,按照key的順序進行。調用一次reduce會聚合一組數(shù)據(jù),就是reduce方法中傳入的一個Itetor。為了確認一個分區(qū)中的兩條記錄是不是同一個組,會調用一個工具類GroupingCompatator的compare(01,02)方法,用來判斷兩個key是否相同,如果兩個key相等,則為同一組。利用這樣的機制,我們可以自定義一個分組規(guī)則。

自定義類,實現(xiàn)?WritableComparator?類實現(xiàn)?compare?方法,在其中告知MapTask如何判斷兩個 記錄是不是屬于同一個組。調用父類構造函數(shù),指定比較的類。

public class OrderIdGroupingComparator extends WritableComparator {pbulic OrderIdGroupingComparator(){//通過構造函數(shù)指定要比較的類super(OrderBean.class, true);// }@Overridepublic int compare(WritableComparable a, WritableComparable b) {//參數(shù)中將來會傳入我們自定義的繼承了WritableComparable的bean,把a、b向下轉型為我們自定義類型的bean,才能比較a和bOrderBean o1 = (OrderBean)a;OrderBean o2 = (OrderBean)b;return o1.getOrderId().compareTo(o2.getOrderID);//id相同就是同一組 } } View Code

在jobSubmiter中指定分組規(guī)則,

job.setGroupingComparatorClass(OrderIdGroupingComparator.class);

注意:關于區(qū)分分區(qū)和分組:

分區(qū)比分組的范圍更加大。分區(qū)是指,在map task結束之后,中間結果數(shù)據(jù)會被分給哪些reduce task,而分組是指,同一個分區(qū)中(即一個reduce task處理的數(shù)據(jù)中)數(shù)據(jù)的分組。在默認的計算分區(qū)的方法中,不同key的hash code對reduce task取模計算出來的結果可能相同,這樣的數(shù)據(jù)會被分到同一個分區(qū);這一個分區(qū)中的key的haashcode不同,這樣就在一個區(qū)中分了不同組。

那么什么時候使用分區(qū),什么時候使用分組呢?

再如在計算每個訂單中總金額最大的3筆中的案例中,可以考慮進行倒序排序,然后取前三;按照id進行倒序排序嗎?不現(xiàn)實,因為訂單id太多,不可能啟動那么多的reduce task。那么就要把多個訂單的數(shù)據(jù)存儲到第一個分區(qū)中,同時保證同一個訂單的數(shù)據(jù)全部在一個分區(qū)中,這時候,就需要自定義分區(qū)規(guī)則(保證同一訂單中的數(shù)據(jù)在同一個分區(qū)),但是又要分組排序,所以這時候就需要自定義分組規(guī)則(保證該分區(qū)中同一訂單在一組,不同訂單在不同組)

2.3自定義MapTask的局部聚合規(guī)則

默認情況下,map計算的結果逐條保存到磁盤中,傳輸給reduce之后也是分條的記錄,這樣可能造成一個問題就是如果某個分區(qū)下的數(shù)據(jù)較多,而有的分區(qū)下數(shù)據(jù)較少,就導致出現(xiàn)reduce task之間任務量差距較大,即出現(xiàn)數(shù)據(jù)傾斜的情況。一個解決辦法是在形成map結果文件的時候進行一次局部聚合。

使用Combiner組件可以實現(xiàn)在每個MapTask中對數(shù)據(jù)進行一次局部聚合。這個局部聚合的邏輯其實和Reducer的邏輯是一樣的,都是對map計算出的kv數(shù)據(jù)進行聚合,只不過如果是maptask來調用我們定義的Reducer實現(xiàn)類,則聚合的是當前這個maptask運行的結果,如果是reducetask來調用我們定義的Reducer實現(xiàn)類,則聚合的是全部maptask的運行結果。

定義類局部聚合類XXCombationer,繼承Rducer復寫reduce方法,在方法中實現(xiàn)具體的聚合邏輯;在jobSubmitter的job中設置mapTask端的局部聚合類為我們定義的類?job.setCombinerClass(XXCombiner.class)?。

?

2.4 控制輸入輸出格式。。。

?

?

五、MR程序的調試、執(zhí)行方式

?

5.1 提交到linux運行

?

5.2 Win本地執(zhí)行

?

轉載于:https://www.cnblogs.com/Jing-Wang/p/10886890.html

創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅持創(chuàng)作打卡瓜分現(xiàn)金大獎

總結

以上是生活随笔為你收集整理的MapReduce编程实践的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內容還不錯,歡迎將生活随笔推薦給好友。