日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Hadoop入门(六)Mapreduce

發(fā)布時間:2023/12/3 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hadoop入门(六)Mapreduce 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一、Mapreduce概述

MapReduce是一個編程模型,用以進行大數(shù)據(jù)量的計算

?

二、Hadoop MapReduce

(1)MapReduce是什么

Hadoop MapReduce是一個軟件框架,基于該框架能夠容易地編寫應(yīng)用程序,這些應(yīng)用程序能夠運行在由上千個商用機器組成的大集群上,并以一種可靠的,具有容錯能力的方式并行地處理上TB級別的海量數(shù)據(jù)集

Mapreduce的特點:

  • 軟件框架
  • 并行處理
  • 可靠且容錯
  • 大規(guī)模集群
  • 海量數(shù)據(jù)集
    ?
  • (2)MapReduce做什么

    MapReduce的思想就是“分而治之”

    1)Mapper負責(zé)“分”
    把復(fù)雜的任務(wù)分解為若干個“簡單的任務(wù)”來處理。“簡單的任務(wù)”包含三層含義:

  • 數(shù)據(jù)或計算的規(guī)模相對原任務(wù)要大大縮小
  • 就近計算原則,任務(wù)會分配到存放著所需數(shù)據(jù)的節(jié)點上進行計算
  • 這些小任務(wù)可以并行計算彼此間幾乎沒有依賴關(guān)系
  • 2)Reducer負責(zé)對map階段的結(jié)果進行匯總
    至于需要多少個Reducer,可以根據(jù)具體問題,
    通過在mapred-site.xml配置文件里設(shè)置參數(shù)mapred.reduce.tasks的值,缺省值為1。
    ?

    三、MapReduce工作機制

    作業(yè)執(zhí)行涉及4個獨立的實體

  • 客戶端,用來提交MapReduce作業(yè)
  • JobTracker,用來協(xié)調(diào)作業(yè)的運行
  • TaskTracker,用來處理作業(yè)劃分后的任務(wù)
  • HDFS,用來在其它實體間共享作業(yè)文件
  • mapreduce作業(yè)工作流程圖

    Mapreduce作業(yè)的4個對象

  • 客戶端(client):編寫mapreduce程序,配置作業(yè),提交作業(yè),這就是程序員完成的工作;
  • JobTracker:初始化作業(yè),分配作業(yè),與TaskTracker通信,協(xié)調(diào)整個作業(yè)的執(zhí)行;
  • TaskTracker:保持與JobTracker的通信,在分配的數(shù)據(jù)片段上執(zhí)行Map或Reduce任務(wù),TaskTracker和JobTracker的不同有個很重要的方面,就是在執(zhí)行任務(wù)時候TaskTracker可以有n多個,JobTracker則只會有一個(JobTracker只能有一個就和hdfs里namenode一樣存在單點故障,我會在后面的mapreduce的相關(guān)問題里講到這個問題的)
  • Hdfs:保存作業(yè)的數(shù)據(jù)、配置信息等等,最后的結(jié)果也是保存在hdfs上面
  • ?

    mapreduce運行步驟1

     首先是客戶端要編寫好mapreduce程序,配置好mapreduce的作業(yè)也就是job,
    接下來就是提交job了,提交job是提交到JobTracker上的,這個時候JobTracker就會構(gòu)建這個job,具體就是分配一個新的job任務(wù)的ID值

    接下來它會做檢查操作,這個檢查就是確定輸出目錄是否存在,如果存在那么job就不能正常運行下去,JobTracker會拋出錯誤給客戶端,接下來還要檢查輸入目錄是否存在,如果不存在同樣拋出錯誤,如果存在JobTracker會根據(jù)輸入計算輸入分片(Input Split),如果分片計算不出來也會拋出錯誤,至于輸入分片我后面會做講解的,這些都做好了JobTracker就會配置Job需要的資源了。

    分配好資源后,JobTracker就會初始化作業(yè),初始化主要做的是將Job放入一個內(nèi)部的隊列,讓配置好的作業(yè)調(diào)度器能調(diào)度到這個作業(yè),作業(yè)調(diào)度器會初始化這個job,初始化就是創(chuàng)建一個正在運行的job對象(封裝任務(wù)和記錄信息),以便JobTracker跟蹤job的狀態(tài)和進程。

    mapreduce運行步驟2

    初始化完畢后,作業(yè)調(diào)度器會獲取輸入分片信息(input split),每個分片創(chuàng)建一個map任務(wù)。

    接下來就是任務(wù)分配了,這個時候tasktracker會運行一個簡單的循環(huán)機制定期發(fā)送心跳給jobtracker,心跳間隔是5秒,程序員可以配置這個時間,心跳就是jobtracker和tasktracker溝通的橋梁,通過心跳,jobtracker可以監(jiān)控tasktracker是否存活,也可以獲取tasktracker處理的狀態(tài)和問題,同時tasktracker也可以通過心跳里的返回值獲取jobtracker給它的操作指令。

    任務(wù)分配好后就是執(zhí)行任務(wù)了。在任務(wù)執(zhí)行時候jobtracker可以通過心跳機制監(jiān)控tasktracker的狀態(tài)和進度,同時也能計算出整個job的狀態(tài)和進度,而tasktracker也可以本地監(jiān)控自己的狀態(tài)和進度。當(dāng)jobtracker獲得了最后一個完成指定任務(wù)的tasktracker操作成功的通知時候,jobtracker會把整個job狀態(tài)置為成功,然后當(dāng)客戶端查詢job運行狀態(tài)時候(注意:這個是異步操作),客戶端會查到j(luò)ob完成的通知的。如果job中途失敗,mapreduce也會有相應(yīng)機制處理,一般而言如果不是程序員程序本身有bug,mapreduce錯誤處理機制都能保證提交的job能正常完成。

    ?

    四、mapreduce運行機制

    在Hadoop中,一個MapReduce作業(yè)會把輸入的數(shù)據(jù)集切分為若干獨立的數(shù)據(jù)塊,由Map任務(wù)以完全并行的方式處理。框架會對Map的輸出先進行排序,然后把結(jié)果輸入給Reduce任務(wù)。

    作業(yè)的輸入和輸出都會被存儲在文件系統(tǒng)中,整個框架負責(zé)任務(wù)的調(diào)度和監(jiān)控,以及重新執(zhí)行已經(jīng)關(guān)閉的任務(wù)。MapReduce框架和分布式文件系統(tǒng)是運行在一組相同的節(jié)點,計算節(jié)點和存儲節(jié)點都是在一起的

    (1) MapReduce的輸入輸出

    一個MapReduce作業(yè)的輸入和輸出類型: 會有三組<key,value>鍵值對類型的存在

    ?

    (2)Mapreduce作業(yè)的處理流程

    ?

    按照時間順序包括:
    輸入分片(input split)
    map階段
    shuffle階段:map shuffle(partition、sort/group、combiner、partition? merge)和reduce?shuffle(copy、merge(sort/group))
    reduce階段

    1)輸入分片(input split)

    在進行map計算之前,mapreduce會根據(jù)輸入文件計算輸入分片(input split),每個輸入分片(input split)針對一個map任務(wù)
    輸入分片(input split)存儲的并非數(shù)據(jù)本身,而是一個分片長度和一個記錄數(shù)據(jù)的位置的數(shù)組,輸入分片(input split)往往和hdfs的block(塊)關(guān)系很密切

    ?? ?假如我們設(shè)定hdfs的塊的大小是64mb,如果我們輸入有三個文件,大小分別是3mb、65mb和127mb,那么mapreduce會把3mb文件分為一個輸入分片(input split),65mb則是兩個輸入分片(input split)而127mb也是兩個輸入分片(input split)
    ? ? 即我們?nèi)绻趍ap計算前做輸入分片調(diào)整,例如合并小文件,那么就會有5個map任務(wù)將執(zhí)行,而且每個map執(zhí)行的數(shù)據(jù)大小不均,這個也是mapreduce優(yōu)化計算的一個關(guān)鍵點。

    2)map階段
    程序員編寫好的map函數(shù)了,因此map函數(shù)效率相對好控制,而且一般map操作都是本地化操作也就是在數(shù)據(jù)存儲節(jié)點上進行;

    3)combiner階段

    combiner階段是程序員可以選擇的,combiner其實也是一種reduce操作,因此我們看見WordCount類里是用reduce進行加載的。

    Combiner是一個本地化的reduce操作,它是map運算的后續(xù)操作,主要是在map計算出中間文件前做一個簡單的合并重復(fù)key值的操作,例如我們對文件里的單詞頻率做統(tǒng)計,map計算時候如果碰到一個hadoop的單詞就會記錄為1,但是這篇文章里hadoop可能會出現(xiàn)n多次,那么map輸出文件冗余就會很多,因此在reduce計算前對相同的key做一個合并操作,那么文件會變小,這樣就提高了寬帶的傳輸效率,畢竟hadoop計算力寬帶資源往往是計算的瓶頸也是最為寶貴的資源,但是combiner操作是有風(fēng)險的,使用它的原則是combiner的輸入不會影響到reduce計算的最終輸入,
    例如:

    如果計算只是求總數(shù),最大值,最小值可以使用combiner,但是做平均值計算使用combiner的話,最終的reduce計算結(jié)果就會出錯。

    4)shuffle階段

    將map的輸出作為reduce的輸入的過程就是shuffle了。

    5)reduce階段

    和map函數(shù)一樣也是程序員編寫的,最終結(jié)果是存儲在hdfs上的。

    ?

    五、Mapreduce框架的相關(guān)問題

    jobtracker的單點故障

    jobtracker和hdfs的namenode一樣也存在單點故障,單點故障一直是hadoop被人詬病的大問題,為什么hadoop的做的文件系統(tǒng)和mapreduce計算框架都是高容錯的,但是最重要的管理節(jié)點的故障機制卻如此不好,我認為主要是namenode和jobtracker在實際運行中都是在內(nèi)存操作,而做到內(nèi)存的容錯就比較復(fù)雜了,只有當(dāng)內(nèi)存數(shù)據(jù)被持久化后容錯才好做,namenode和jobtracker都可以備份自己持久化的文件,但是這個持久化都會有延遲,因此真的出故障,任然不能整體恢復(fù),另外hadoop框架里包含zookeeper框架,zookeeper可以結(jié)合jobtracker,用幾臺機器同時部署jobtracker,保證一臺出故障,有一臺馬上能補充上,不過這種方式也沒法恢復(fù)正在跑的mapreduce任務(wù)。

    ?

    六、Mapreduce的單詞計數(shù)實例

    public class WordCount {public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {public void map(Object key, Text value, Context context) throws IOException, InterruptedException {}}public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {}}public static void main(String[] args) throws Exception {//…?} }

    (1)Map

    ?public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{??private final static IntWritable one = new IntWritable(1);private Text word = new Text();???????public void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());while (itr.hasMoreTokens()) {word.set(itr.nextToken());context.write(word, one);}}}

    map的方法

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {…}
    這里有三個參數(shù),前面兩個Object key, Text value就是輸入的key和value,第三個參數(shù)Context context這是可以記錄輸入的key和value

    例如:context.write(word, one);此外context還會記錄map運算的狀態(tài)。

    (2)reduce

    ?public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {private IntWritable result = new IntWritable();public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}result.set(sum);context.write(key, result);}}

    reduce函數(shù)的方法

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {…}

      reduce函數(shù)的輸入也是一個key/value的形式,
    不過它的value是一個迭代器的形式Iterable<IntWritable> values,

    也就是說reduce的輸入是一個key對應(yīng)一組的值的value,reduce也有context和map的context作用一致。

    (3)main函數(shù)

    public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: wordcount <in> <out>");System.exit(2);}Job job = new Job(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);} Configuration conf = new Configuration();

    運行mapreduce程序前都要初始化Configuration,該類主要是讀取mapreduce系統(tǒng)配置信息,這些信息包括hdfs還有mapreduce,也就是安裝hadoop時候的配置文件例如:core-site.xml、hdfs-site.xml和mapred-site.xml等等文件里的信息,有些童鞋不理解為啥要這么做,這個是沒有深入思考mapreduce計算框架造成,我們程序員開發(fā)mapreduce時候只是在填空,在map函數(shù)和reduce函數(shù)里編寫實際進行的業(yè)務(wù)邏輯,其它的工作都是交給mapreduce框架自己操作的,但是至少我們要告訴它怎么操作啊,比如hdfs在哪里啊,mapreduce的jobstracker在哪里啊,而這些信息就在conf包下的配置文件里。

    ? ? String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.err.println("Usage: wordcount <in> <out>");System.exit(2);}

    If的語句好理解,就是運行WordCount程序時候一定是兩個參數(shù),如果不是就會報錯退出。至于第一句里的GenericOptionsParser類,它是用來解釋常用hadoop命令,并根據(jù)需要為Configuration對象設(shè)置相應(yīng)的值,其實平時開發(fā)里我們不太常用它,而是讓類實現(xiàn)Tool接口,然后再main函數(shù)里使用ToolRunner運行程序,而ToolRunner內(nèi)部會調(diào)用GenericOptionsParser。

    ? ? Job job = new Job(conf, "word count");job.setJarByClass(WordCount.class);job.setMapperClass(TokenizerMapper.class);job.setCombinerClass(IntSumReducer.class);job.setReducerClass(IntSumReducer.class);

    第一行就是在構(gòu)建一個job,在mapreduce框架里一個mapreduce任務(wù)也叫mapreduce作業(yè)也叫做一個mapreduce的job,而具體的map和reduce運算就是task了,這里我們構(gòu)建一個job,構(gòu)建時候有兩個參數(shù),一個是conf這個就不累述了,一個是這個job的名稱。
      
    第二行就是裝載程序員編寫好的計算程序,例如我們的程序類名就是WordCount了。這里我要做下糾正,雖然我們編寫mapreduce程序只需要實現(xiàn)map函數(shù)和reduce函數(shù),但是實際開發(fā)我們要實現(xiàn)三個類,第三個類是為了配置mapreduce如何運行map和reduce函數(shù),準確的說就是構(gòu)建一個mapreduce能執(zhí)行的job了,例如WordCount類。
      
    第三行和第五行就是裝載map函數(shù)和reduce函數(shù)實現(xiàn)類了,這里多了個第四行,這個是裝載Combiner類,這個我后面講mapreduce運行機制時候會講述,其實本例去掉第四行也沒有關(guān)系,但是使用了第四行理論上運行效率會更好。

    ? ? job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);

    這個是定義輸出的key/value的類型,也就是最終存儲在hdfs上結(jié)果文件的key/value的類型。?

    ? FileInputFormat.addInputPath(job, new Path(otherArgs[0]));FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);

    第一行就是構(gòu)建輸入的數(shù)據(jù)文件,
    第二行是構(gòu)建輸出的數(shù)據(jù)文件,
    最后一行如果job運行成功了,我們的程序就會正常退出。FileInputFormat和FileOutputFormat可以設(shè)置輸入輸出文件路徑,
    mapreduce計算時候,輸入文件必須存在,要不直Mr任務(wù)直接退出。輸出一般是一個文件夾,而且該文件夾不能存在。

    ?

    ?

    總結(jié)

    以上是生活随笔為你收集整理的Hadoop入门(六)Mapreduce的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。