日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

MapReduce概述及工作流程

發布時間:2023/12/14 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 MapReduce概述及工作流程 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

內容

mapreduce原語(獨創)

mapreduce工作流程(重點)

MR作業提交流程(重點)

YARN RM-HA搭建(熟練)

運行自帶的wordcount(了解)

動手寫wordcount(熟練)

MapReduce原語

hadoop MapReduce框架可以讓你的應用在集群中

可靠地

容錯地

并行

處理TB級別的數據

1024TB=1PB? 1024PB=1EB? 1024EB=1ZB

MapReduce原語

?

相同key的鍵值對為一調用一次reduce方法,方法內迭代這一組數據進行計算

分組比較器

YARN:資源管理框架

???????? ResourceManager:一個 ??主

???????? NodeManager:很多,每個DataNode上有一個 ?從

???????? Container(容器):CPU、內存

公司為了營業,掙錢租老王家的寫字樓

公司相當于MR作業

MR任務相當于公司員工,員工干活,相當于MR的任務運行。

員工在辦公室干活,任務在容器運行。

每個容器同時運行一個任務

???????? 客人提出訂幾間房

?

1、一個ResourceManager主節點

2、每個DataNode上一個NodeManager從節點

3、每個運行于MapReduce的程序有一個MRAppMaster

公司的運作流程

?

1、MapReduce將輸入的數據集邏輯切片 split

2、map任務以并行方式處理切片數據

3、框架對map輸出排序,然后將數據發送給reduce

4、MapReduce的輸入輸出數據存在于同一個文件系統(HDFS)

5、框架負責任務調度、任務監控和失敗任務的重新執行

容錯地、可靠地、并行計算

1、MapReduce處理鍵值對形式的很多鍵值對輸入,生成鍵值對形式的很多鍵值對輸出

2、框架會對鍵和值序列化,因此鍵類型和值類型需要實現Writable接口??蚣軙?span style="color:#FF0000;">對鍵進行排序,因此必須實現WritableComparable接口。

3、map輸出鍵值對類型和reduce鍵值對輸入類型一致

4、map的輸入鍵值對類型和輸出鍵值對類型一般不一致

5、reduce的輸入鍵值對類型和輸出鍵值對類型一般不一致

盡管hadoop框架是java開發的,MapReduce應用不一定得java開發。

hadoop streaming允許用戶使用可執行文件的方式提供mapper和reducer,創建和執行作業。

hadoop pipes是一個跟SWIG兼容的C++ API,用于開發MapReduce應用(不基于JNI)。

mapreduce工作流程

為什么叫MapReduce:MapTask & ReduceTask

1、每個block會有map任務

2、block切分為切片,每個切片對應一個map任務,默認一個block一個切片,一個map

3、map默認按行讀取切片數據,組成鍵值對<當前行字節偏移量, "讀到的行字符串">

4、map函數對該鍵值對進行計算,輸出若干鍵值對。<key, value, partition>

???????? partition指定該鍵值對由哪個reducer進行處理

5、map輸出的kvp寫到環形緩沖區,環形緩沖區默認100MB,閾值80%,當環緩達到80%就向磁盤溢寫小文件,該小文件首先按照分區號排序,相同分區號的按key進行排序。

6、默認如果落磁盤的小文件達到了3個,則進行歸并,歸并的大文件也是按分區號排序,相同分區號按照key進行排序。只是一個歸并。

7、如果map任務處理完了,它的輸出被下載到reducer所在主機

???????? 按照HTTP GET的方式下載到reducer:

???????? reducer發送HTTP GET請求到mapper主機下載數據,該過程是洗牌shuffle

8、每個map任務都要經歷運行結束洗牌的過程

9、可以設置combinClass,先在map端對數據進行一個壓縮,比如10w個<hello,1>壓縮為1個<hello, 10w>通過網絡IO洗牌,肯定要快很多。一般情況下,combineClass就是一個reducerClass。

combinerClass的設置要求數據算法滿足結合律。

交換律

???????? 1+2=2+1

結合律????

1+2+3=(1+2)+3=1+(2+3)

????????

map1?? 5/3

map2?? 7/6?????? reduce:? 5/3+7/6+8/11 =?? reduce:(5+7+8)/(3+6+11)

map3?? 8/11

map任務結束

reeduce任務開始

9、等所有map任務都運行結束,并且洗牌結束,每個reducer獲取到它自己應得的所有數據,此時開始reducer處理過程

10、如果有時間,reduce會對洗牌獲取的數據進行歸并落磁盤

???????? 如果沒有時間,也歸并,只是可能不落磁盤,直接交給reduce方法進行迭代處理了。

???????? 洗牌獲取到的數據也可能不落磁盤,此時歸并的鍵值對來源可能是磁盤的和內存的一個混合。

11、reduce按照key進行分組,每個分組調用一次reduce方法,該方法迭代計算,將結果寫到HDFS輸出。

當一個map任務計算結束,所有的reduce需要使用http get請求獲取各自分區編號的數據,當所有map任務結束后,開始reduce計算階段。

blk按照設置進行切片,一個切片對應一個map任務,map按行讀取切片內容,以鍵值對的形式發給map方法(<"偏移量", “zifuchuan”>)

當map對當前簡直對計算完成,要寫到環形緩沖區,在寫之前要計算該鍵值對的分區編號

默認情況下,key的hash值對reduce個數取模。

當環形緩沖區大小達到到80%的時候,需要向磁盤溢寫數據,在溢寫的時候需要對鍵值對按照分區排序,分區內按照key的字典序排序(快排排序)

溢寫的小文件如果達到3個,則進行歸并,歸并為大文件,大文件也是按照分區排序,分區內按照key的字典序排序。

當一個map任務處理完它的切片的數據,此時所有的reduce任務到該map的機器以http get請求獲取各自編號分區的數據,下載到reduce本地

reduce獲取到map的數據后,如果有時間,也會進行歸并

并不能保證此時所有的map都計算結束了。

只有當所有的map計算結束,同時reduce獲取到所有的數據之后,才開始進行reduce計算。

按照原語,相同key的鍵值對為一組,調用一次reduce方法,方法內迭代這組數據計算,結果輸出到HDFS中。

mapreduce是一套分布式計算的流程、框架

數單詞游戲:

?

?

getFileBlockLocations(new Path(), offset, len);

?

reduce從map端拉取數據的過程稱為洗牌shuffle

通過網絡拉取,慢!!!

要對map端數據進行壓縮:

Combiner:

<hello, 1>? 1000萬個 ????<hello, 1000萬>

但是不能保證combiner什么時候都能用:

需要計算滿足結合律:(A+B)+C=A+(B+C)

job.setCombinerClass(MyReducer.class)

8/9

4/7?????? REDUCE: (8+4+2)/(9+7+11)

2/11

也不能保證combiner什么時候都用得上:

環形緩沖區小文件歸并,進行combiner,如果不歸并,沒有combiner過程。

reducer通過HTTP按照分區號獲取map輸出文件的數據。map端有一個HTTP服務處理該reducer的HTTP請求。該HTTP服務最大線程數由mapreduce.shuffle.Max.threads屬性指定。這個屬性指定nodemanager的線程數而不是對map任務指定線程數(該數字在多個不同的任務之間共享),因為nodemanager上有可能運行了好幾個map任務。默認值是0,表示最大線程數是服務器處理器核心數的兩倍。

map輸出文件位于運行map任務的本地磁盤。一個reduce任務需要從集群中多個map任務獲取指定分區的數據。多個map任務有可能是在不同時間完成的,每當一個map任務運行完,reduce就從該map任務獲取指定分區數據。reduce任務會以多線程的方式從多個map任務并行獲取指定分區數據。默認線程數是5,可以通過mapreduce.reduce.shuffle.parallelcopies屬性指定。

reducer拷貝map的輸出如果很小,則放在內存中(mapreduce.reduce.shuffle.input.buffer.percent指定堆空間百分比)否則拷貝到磁盤。當內存緩沖區數據大小達到閾值(mapreduce.reduce.shuffle.merge.percent

)或map輸出文件個數達到閾值(mapreduce.reduce.merge.inmem.threshold

),就發生文件合并溢寫到磁盤上。如果指定combiner,此處也會進行combine。

二次排序(先了解)

在map階段按照key對鍵值對進行排序,對值不排序。如果相對value進行排序,就需要二次排序。

需求:查找每年的最高氣溫

數據格式:年份為key,每天的氣溫是value

所謂二次排序:

1、新的key應該是輸入的key和value的組合

2、按照復合key進行比較排序

3、分區比較器分組比較器只對復合key中的原生key進行分區和分組

總結

Map:

1、根據業務需求處理數據并映射為KV模型

2、并行分布式

3、計算向數據移動

Reduce:

1、數據全量/分量加工

2、Reducer中可以包含不同的key???? 分區的范圍大于分組

3、相同分區的Key匯聚到一個Reducer中

4、相同的Key調用一次reduce方法

5、排序和比較實現key的匯聚

K,V使用自定義數據類型 ????????? MyKey:WritableComparable

MyValue:Writable

1、節省開發成本,提高程序自由度

2、框架會對鍵和值序列化,因此鍵類型和值類型需要實現Writable接口。

3、框架會對鍵進行排序,因此必須實現WritableComparable接口。

作業:

  • mapreduce處理過程,自己的語言寫
  • java API操作HDFS
  • MR作業提交流程

    YARN

    ResourceManager管理集群中所有的資源

    通過NodeManager管理

    NodeManager通過Container管理資源

    ???????? Container包裝資源:CPU/內存/IO

    MapReduce作業

    AppMaster? 調度

    ???????? RM申請資源

    MapTask

    ReduceTask

    客戶端:

    ???????? RM客戶端:用于申請資源

    ???????? AM客戶端:用于跟AppMaster交互

    YARN:解耦資源與計算

    ResourceManager

    主,核心

    集群節點資源管理

    NodeManager

    與RM匯報資源

    管理Container生命周期

    計算框架中的資源都以Container表示

    Container:【由節點NM管理,CPU,MEM,I/O大小,啟動命令】

    ???????? 內存:1024MB

    ???????? CPU1個虛擬核心 vcore

    默認NodeManager啟動線程監控Container大小,超出申請資源額度,kill

    支持Linux內核的Cgroup

    MR :

    AppMaster? 擁有 RM客戶端

    作業為單位,避免單點故障,負載到不同的節點

    創建Task,需要和RM申請資源Container

    Task-Container

    ???????? Map任務

    ???????? Reduce任務

    Client:

    RM-Client:請求資源創建AM

    AM-Client:與AM交互

    YARN:Yet Another Resource Negotiator;

    Hadoop 2.0新引入的資源管理系統,直接從MRv1演化而來的;

    核心思想:將MRv1中JobTracker的資源管理和任務調度兩個功能分開,分別由ResourceManagerApplicationMaster進程實現

    ResourceManager:負責整個集群的資源管理和調度

    ApplicationMaster:負責應用程序相關的事務,比如任務調度、任務監控和容錯等

    YARN的引入,使得多個計算框架可運行在一個集群中

    每個應用程序對應一個ApplicationMaster

    目前多個計算框架可以運行在YARN上,比如MapReduce、Spark、Storm等

    MapReduce On YARN:MRv2

    將MapReduce作業直接運行在YARN上,而不是由JobTracker和TaskTracker構建的MRv1系統中

    基本功能模塊

    YARN:負責資源管理和調度

    MRAppMaster:負責任務切分、任務調度、任務監控和容錯等

    MapTask/ReduceTask:任務驅動引擎,與MRv1一致

    每個MapRduce作業對應一個MRAppMaster

    MRAppMaster任務調度

    YARN將資源分配給MRAppMaster

    MRAppMaster進一步將資源分配給內部的任務

    MRAppMaster容錯

    失敗后,由YARN重新啟動

    任務失敗后,MRAppMaster重新申請資源

    ResourceManager掛怎么辦?RM-HA

    流程

    1、客戶端,提交MapReduce作業

    2、YARN的資源管理器(Resource Manager),協調集群中計算資源的分配

    3、YARN的節點管理器(Node Manager),啟動并監控集群中的計算容器

    4、MapReduce的Application Master,協調MapReduce作業中任務的運行。Application Master和MapReduce任務運行于容器中,這些容器由resourcemanager調度,由nodemanager管理。

    5、分布式文件系統(一般是HDFS),在組件之間共享作業數據。

    Job對象的submit方法創建了一個內部的JobSubmitter實例并調用該實例的submitJobInternal方法。一旦提交了作業,waitForCompletion方法每秒鐘輪詢作業的執行進度,如果進度發生了變化,則向控制臺報告進度。當作業成功完成,展示作業計數器的數據。否則展示作業失敗的錯誤日志信息。

    客戶端:JobSubmitter實現的作業提交的過程有如下幾個步驟:

    1、向resourcemanager申請一個新的application ID,用于MapReduce作業的ID

    2、檢查作業的輸出。如果沒有指定輸出或者輸出路徑已經存在,則不提交作業,MapReduce程序拋異常

    3、計算作業的輸入切片。如果不能計算切片(比如輸入路徑不存在等),不提交作業,MR程序拋異常。

    4、拷貝執行作業需要的資源到共享文件系統的以作業ID命名的目錄中,這些資源包括作業的jar配置文件計算好的輸入切片。作業的jar包有一個很高的副本數量(mapreduce.client.submit.file.replication指定,默認值是10),這樣當nodemanager如果運行作業中的任務,會有很多副本可以訪問。

    5、調用resourcemanager的submitApplication方法提交作業

    1、YARN為請求分配一個容器,resourcemanager通過容器所在節點上的nodemanager在該容器中啟動application master進程。

    2、MapReduce作業的application master是一個java app,主入口類是MRAppMaster。從HDFS抽取客戶端計算好的輸入切片,為每一個切片創建一個map任務對象,以及一定數量的reduce任務對象.

    application master會為作業中所有的map任務以及reduce任務向resourcemanager請求容器。為map任務的請求會首先進行并且相對于reduce任務請求有更高的優先級。當map任務完成率達到了5%之后才會為reduce任務發送容器請求。

    appmasterhdfs抽取客戶端上傳的信息,計算好map對象和reduce對象,首先向resourcemanagermap任務申請資源,當map任務完成5%之后為reduce任務申請資源

    reduce任務可以運行于集群中的任意位置,而map任務會有本地讀取數據的限制。移動計算而不是數據。數據本地。次之為機架本地。

    請求會指定每個任務需要的內存和cpu資源。默認情況下為每個map任務或reduce任務分配1024MB的內存和一個虛擬核心。這些值對于每個作業都是可以配置的:mapreduce.map.memory.mb,

    mapreduce.reduce.memory.mb

    mapreduce.map.cpu.vcores

    以及mapreduce.reduce.cpu.vcores。

    一旦resourcemanager在一個節點上的一個容器中為一個任務分配了資源,application masternodemanager通信,啟動容器。任務通過一個java app來執行,該app的主入口類是YarnChild。在它可以開始任務的執行之前,它要本地化任務需要的資源,包括jar包,配置文件,以及分布式緩存中存儲的其他共享文件。最后,它開始運行map任務或者reduce任務。

    當作業的最后一個任務完成并通知application master,AppMaster就更改作業的狀態為”successfully”。作業就打印信息告知客戶端,客戶端waitForCompletion方法返回。此時也會在控制臺打印作業的統計信息和計數器的信息

    作業完成,application master所在容器和任務所在容器銷毀工作狀態(中間的輸出結果刪除)。作業的信息被作業歷史服務器存檔以備以后查詢使用。

    YARN RM-HA搭建

    mapred-site.xml

    local/classic/yarn

    指定mr作業運行的框架:要么本地運行,要么使用MRv1,要么使用yarn

    <property>

    ?? <name>mapreduce.framework.name</name>

    ?? <value>yarn</value>

    </property>

    yarn-site.xml

    <!-- 讓yarn的容器支持mapreduce的洗牌,開啟shuffle服務 -->

    <property>

    ?? <name>yarn.nodemanager.aux-services</name>

    ?? <value>mapreduce_shuffle</value>

    </property>

    <!-- 啟用resourcemanager的HA -->

    <property>

    ?? <name>yarn.resourcemanager.ha.enabled</name>

    ?? <value>true</value>

    ?</property>

    <!-- 給兩個resourcemanager組成的HA命名 -->

    ?<property>

    ?? <name>yarn.resourcemanager.cluster-id</name>

    ?? <value>cluster1</value>

    ?</property>

    <!-- RM HA的兩個resourcemanager的名字 -->

    ?<property>

    ?? <name>yarn.resourcemanager.ha.rm-ids</name>

    ?? <value>rm1,rm2</value>

    ?</property>

    <!-- 指定rm1的reourcemanager進程所在的主機名稱 -->

    ?<property>

    ?? <name>yarn.resourcemanager.hostname.rm1</name>

    ?? <value>node3</value>

    ?</property>

    <!-- 指定rm2的reourcemanager進程所在的主機名稱 -->

    ?<property>

    ?? <name>yarn.resourcemanager.hostname.rm2</name>

    ?? <value>node4</value>

    ?</property>

    <!-- 指定zookeeper集群的各個節點地址和端口號 -->

    ?<property>

    ?? <name>yarn.resourcemanager.zk-address</name>

    ?? <value>node2:2181,node3:2181,node4:2181</value>

    ?</property>

    將配置文件在四臺服務器同步

    scp?? node[234]:`pwd`

    首先啟動HDFS

    start-ha.sh

    #!/bin/bash

    for node in node2 node3 node4

    do

    ?? ssh $node "source /etc/profile; zkServer.sh start"

    done

    sleep 1

    start-dfs.sh

    echo "--------------node1-jps----------------"

    jps

    for node in node2 node3 node4

    do

    ? echo "---------------$node-jps-------------------"

    ? ssh $node "source /etc/profile; jps"

    done

    在node3或node4上執行命令:

    start-yarn.sh

    在node4或者node3上執行命令:

    yarn-daemon.sh ?start ?resourcemanager

    停止:

    ???????? 在node3或者node4上執行:

    ???????? stop-yarn.sh

    ???????? 在node4或者node3上執行:

    ???????? yarn-deamon.sh? stop? resourcemanager

    http://node3:8088

    http://node4:8088

    訪問resourcemanager的web頁面

    運行自帶的wordcount

    運行的命令:

    cd $HADOOP_HOME

    cd share/hadoop/mapreduce

    hadoop jar hadoop-mapreduce-examples-2.6.5.jar wordcount? /input? /output

    *input:是hdfs文件系統中數據所在的目錄

    *ouput:是hdfs中不存在的目錄,mr程序運行的結果會輸出到該目錄

    輸出目錄內容:

    -rw-r--r--?? 3 root supergroup????????? 0 2017-07-02 02:49 /mr/test/output/_SUCCESS

    -rw-r--r--?? 3 root supergroup???????? 49 2017-07-02 02:49 /mr/test/output/part-r-00000

    /_SUCCESS:是信號/標志文件

    /part-r-00000:是reduce輸出的數據文件

    r:reduce的意思,00000是對應的reduce編號,多個reduce會有多個數據文件

    啟動腳本和停止腳本:

    start-hdfs-ha-rm-ha.sh

    #!/bin/bash

    for node in node2 node3 node4

    do

    ?? ssh $node "source /etc/profile; zkServer.sh start"

    done

    sleep 1

    start-dfs.sh

    ssh node3 ". /etc/profile; start-yarn.sh"

    ssh node4 ". /etc/profile; yarn-daemon.sh start resourcemanager"

    echo "--------------node1-jps----------------"

    jps

    for node in node2 node3 node4

    do

    ? echo "---------------$node-jps-------------------"

    ? ssh $node "source /etc/profile; jps"

    done

    stop-hdfs-ha-rm-ha.sh

    #!/bin/bash

    ssh node4 ". /etc/profile; stop-yarn.sh"

    ssh node3 ". /etc/profile; yarn-daemon.sh stop resourcemanager"

    stop-dfs.sh

    for node in node2 node3 node4

    do

    ? ssh $node "source /etc/profile; zkServer.sh stop"

    done

    echo "-------------node1-jps-----------------"

    jps

    for node in node2 node3 node4

    do

    ? echo "---------------$node-jps-----------------"

    ? ssh $node "source /etc/profile; jps"

    done

    動手寫wordcount

    1、新建eclipse的java項目

    2、添加hadoop的jar包依賴

    121個jar包

    $HADOOP_HOME/share/hadoop/{common,common/lib,hdfs,hdfs/lib,mapreduce,mapreduce/lib,tools/lib,yarn,yarn/lib}.jar

    3、添加hadoop的配置文件到類路徑

    從集群拷貝這四個文件到當前項目類路徑

    core-site.xml

    hdfs-site.xml

    mapred-site.xml

    yarn-site.xml

    4、編寫Mapper、Reducer以及MainClass

    wordcount

    WCMapper.java

    package com.bjsxt.mr.wordcount;

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Mapper;

    public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    ????????

    ???????? private Text outKey = new Text();

    ???????? private LongWritable outValue = new LongWritable();

    ????????

    ???????? @Override

    ???????? protected void map(LongWritable key, Text value, Context context)

    ??????????????????????????? throws IOException, InterruptedException {

    ??????????????????

    ?????????????????? Thread.sleep(9999999999L);

    ??????????????????

    ?????????????????? //一句話? hello bjsxt 1

    ?????????????????? String line = value.toString();

    ?????????????????? //將一句話按照空格隔開為單個單詞

    ?????????????????? // {"hello", "bjsxt", "1"}

    ?????????????????? String[] words = line.split(" ");

    ??????????????????

    ?????????????????? for (String word : words) {

    ??????????????????????????? outKey.set(word);

    ??????????????????????????? outValue.set(1);

    ??????????????????????????? // <"hello", 1>

    ??????????????????????????? // <"bjsxt", 1>

    ??????????????????????????? // <"1", 1>

    ??????????????????????????? context.write(outKey, outValue);

    ?????????????????? }

    ??????????????????

    ???????? }

    }

    WCReducer.java

    package com.bjsxt.mr.wordcount;

    import java.io.IOException;

    import java.util.Iterator;

    import org.apache.hadoop.io.LongWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Reducer;

    public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    ????????

    ???????? private LongWritable outValue = new LongWritable();

    ????????

    ???????? @Override

    ???????? protected void reduce(Text key, Iterable<LongWritable> values, Context context)

    ??????????????????????????? throws IOException, InterruptedException {

    ?????????????????? // key表示的單詞出現的次數,總數

    ?????????????????? long sum = 0;

    ?????????????????? // 獲取values的迭代器,用于遍歷

    ?????????????????? Iterator<LongWritable> itera = values.iterator();

    ??????????????????

    ?????????????????? //<"zhangsan", 1>

    ?????????????????? //<"zhangsan-0", 1>

    ?????????????????? //<"zhangsan-1", 1>

    ?????????????????? //<"zhangsan-2", 1>

    ?????????????????? //<"zhangsan-3", 1>

    ?????????????????? //<"zhangsan-4", 1>

    ??????????????????

    ?????????????????? while (itera.hasNext()) {

    ??????????????????????????? // 獲取該值

    ??????????????????????????? LongWritable val = itera.next();

    ??????????????????????????? // 將該值轉換為long類型

    ??????????????????????????? long num = val.get();

    ??????????????????????????? // 逐個求和

    ??????????????????????????? sum += num;

    ?????????????????? }

    ?????????????????? // 將總數封裝為LongWritable類型對象

    ?????????????????? outValue.set(sum);

    ?????????????????? // 輸出到HDFS

    ?????????????????? context.write(key, outValue);

    ??????????????????

    ???????? }

    ????????

    }

    MainClass.java

    package com.bjsxt.mr.wordcount;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.fs.Path;

    import org.apache.hadoop.io.LongWritable;

    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Job;

    import org.apache.hadoop.mapreduce.JobContext;

    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MainClass {

    ????????

    ???????? public static void main(String[] args) throws Exception {

    ??????????????????

    ?????????????????? if (args == null || args.length != 2) {

    ??????????????????????????? System.out.println("Usage : yarn jar wc.jar com.bjsxt.mr.wordcount.MainClass <input path> <output path>");

    ??????????????????????????? System.exit(1);

    ?????????????????? }

    ??????????????????

    ?????????????????? Configuration conf = new Configuration(true);

    ??????????????????

    ?????????????????? Job job = Job.getInstance(conf);

    ?????????????????? //設置主入口程序

    ?????????????????? job.setJarByClass(MainClass.class);

    ?????????????????? // 設置作業名稱,該名稱可以在UI上看到

    ?????????????????? job.setJobName("我的數單詞");

    ??????????????????

    //?????????????? Path inputPath = new Path("/mr/wc/input/hello.txt");

    ?????????????????? Path inputPath = new Path(args[0]);

    ?????????????????? //設置輸入路徑

    ?????????????????? FileInputFormat.addInputPath(job, inputPath);

    ??????????????????

    //?????????????? Path outputPath = new Path("/mr/wc/output");

    ?????????????????? Path outputPath = new Path(args[1]);

    ?????????????????? //指定輸出路徑,該路徑一定不能存在

    ?????????????????? FileOutputFormat.setOutputPath(job, outputPath);

    ??????????????????

    ?????????????????? //指定mapper類

    ?????????????????? job.setMapperClass(WCMapper.class);

    ?????????????????? //指定reducer類

    ?????????????????? job.setReducerClass(WCReducer.class);

    ??????????????????

    ?????????????????? //map輸出鍵值對的key類型

    ?????????????????? job.setMapOutputKeyClass(Text.class);

    ?????????????????? //map端輸出鍵值對的value類型

    ?????????????????? job.setMapOutputValueClass(LongWritable.class);

    ??????????????????

    ?????????????????? //提交作業

    ?????????????????? job.waitForCompletion(true);

    ???????? }

    ????????

    }

    5、打包

    只打包三個類就可以。

    6、上傳

    7、運行

    yarn?? jar?? </path/to/your/jar.jar>? /<inputpath>? /<outputpath>

    總結

    以上是生活随笔為你收集整理的MapReduce概述及工作流程的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。