MapReduce流程(WordCount案例实现)
文章目錄
- 1 MapReduce概述
- 設計構思
- 實例進程
- 實例進程分類
- 完整執行過程
- 總結
- 2 MapReduce編程規范
- Map階段2個步驟
- Shuffle階段4個步驟
- Reduce階段2個步驟
- 3.實現WordCount案例
- 3.1準備工作
- 3.2Map代碼編寫
- 3.3Reduce代碼編寫
- 3.4任務類編寫
- 4.MapReduce運行模式
- 4.1 集群運行模式
- 4.2 本地運行模式
1 MapReduce概述
設計構思
MapReduce是一個分布式運算程序的編程框架,核心功能是將用戶編寫的業務邏輯代碼和自帶默認組件整合成一個完整的分布式運算程序,并發運行在Hadoop集群上。
MapReduce設計并提供了統一的計算框架,為程序員隱藏了絕大多數系統層面的處理細節。為程序員提供一個抽象和高層的編程接口和框架。程序員僅需要關心其應用層的具體計算問題,僅需編寫少量的處理應用本身計算問題的程序代碼。如何具體完成這個并行計算任務所相關的諸多系統層細節被隱藏起來,交給計算框架去處理:
Map和Reduce為程序員提供了一個清晰的操作接口抽象描述。MapReduce中定義了如下的Map和Reduce兩個抽象的編程接口,由用戶去編程實現.Map和Reduce,MapReduce處理的數據類型是<key,value>鍵值對。
以WordCount為例:
-
Map: ( k1 ; v1 ) →[ ( k2 ; v2 ) ]
-
Shuffle過程(不需要我們寫)
-
Reduce:( k2 ; [ v2 ] )→[ ( k3 , v3 ) ]
實例進程
實例進程分類
一個完整的mapreduce程序在分布式運行時有三類實例進程:
完整執行過程
Client提交計算任務
啟動AppMaster進程
AppMaster請求分配資源
ResourceManager回復資源列表
AppMaster要求NodeManager分配資源
NodeManager執行具體的計算任務
NodeManager將計算狀態和結果匯報給AppMaster
AppMaster匯報計算結果
總結
ResourceManager分配任務
NodeManager實際執行任務
2 MapReduce編程規范
Map階段2個步驟
Shuffle階段4個步驟
Reduce階段2個步驟
3.實現WordCount案例
3.1準備工作
1)在hadoop集群上上傳文件至hdfs
#新建一個文件 vim wordcount.txt#在文件中寫入實例數據 按i鍵進入文件寫入模式,寫入后按esc,輸入:wq!保存更改 hello,world,hadoop hive,hello,tom love,hadoop hdfs#上傳到HDFS ## 在HDFS文件系統中增加目錄wordcount hdfs dfs -mkdir /wordcount/ ## 將剛剛創建的文件上傳至該文件夾,登錄web界面(50070)可以看到成功上傳 hdfs dfs -put wordcount.txt /wordcount2)導入依賴pom.xml
注意查看自己的hadoop版本
<dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.6.1</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>2.6.1</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.6.1</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency> </dependencies>3)測試hadoop連接環境
可以使用BigData插件測試,參照文章https://blog.csdn.net/weixin_44155966/article/details/108820920
3.2Map代碼編寫
package com.hunan.MapReduce;import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/* 四個泛型解釋: KEYIN:K1的類型 偏移量 VALUEIN:V1的類型 每行字符串KEYOUT:K2的類型 單詞 VALUEOUT:V2的類型 固定值1*/ //hadoop自己的類型 public class WordCountMapper extends Mapper<LongWritable,Text, Text,LongWritable> {//map方法就是將K1,V1轉為K2,V2/*參數:key : K1 行偏移量value : V1 每一行的文本數據context: 表示上下文對象,將各個流程連在一起*//*如何將K1,V1轉換為K2,V2K1 V10 hello,world,hadoop19 hive,hello,tom----------------------K2 V2hello 1world 1hadoop 1hive 1hello 1tom 1*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {Text text = new Text();LongWritable longWritable = new LongWritable();//1.將一行的文本數據進行拆分String[] split = value.toString().split(",");//2.遍歷數組,組裝K2,V2for (String s : split) {//3.將K2,V2寫入上下文中//context.write(new Text(s), new LongWritable(1));text.set(s);longWritable.set(1);context.write(text,longWritable);}} }3.3Reduce代碼編寫
package com.hunan.MapReduce;import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/* 四個泛型解釋: KEYIN:K2的類型 單詞 VALUEIN:V2的類型 1KEYOUT:K3的類型 單詞 VALUEOUT:V3的類型 個數*/ public class WordCountReducer extends Reducer<Text, LongWritable,Text,LongWritable> {//map方法就是將新K2,V2轉為K3,V3/*參數:key: 新K2values: 集合 新V2context:上下文對象*//*如何將新K2,V2轉為K3,V3新K2 新V2hello <1,1>world <1>hadoop <1,1,1>---------------K3 V3hello 2world 1hadoop 3*/@Overrideprotected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {long count=0;//1.遍歷集合,將集合中數字相加,得到V3for (LongWritable value : values) {count+=value.get();}//2.將K2和V3寫入上下文中context.write(key,new LongWritable(count));} }3.4任務類編寫
package com.hunan.MapReduce;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner;public class JobMain extends Configured implements Tool {//該方法用于指定一個job任務,從提交到結果保存的整個任務public int run(String[] args) throws Exception {//1.創建一個job任務對象Job job = Job.getInstance(super.getConf(), "WordConut");//2.配置job任務對象(八個步驟)//第一步:指定文件讀取方式和讀取路徑job.setInputFormatClass(TextInputFormat.class);//TextInputFormat.addInputPath(job,new Path("hdfs://master:9000/wordcount"));TextInputFormat.addInputPath(job,new Path("file:///C:\\wordcount"));//第二步:指定Map階段的處理方式和數據類型job.setMapperClass(WordCountMapper.class);job.setMapOutputKeyClass(Text.class);//設置Map階段K2的類型job.setMapOutputKeyClass(LongWritable.class);//設置Map階段V2的類型//第三,四,五,六步 采用默認的shuffle階段處理//第七步:指定ruduce階段的處理方式和數據類型job.setReducerClass(WordCountReducer.class);job.setOutputKeyClass(Text.class);//設置Reduce階段K2的類型job.setMapOutputKeyClass(LongWritable.class);//設置Reduce階段V2的類型//第八步:設置輸出類型和輸出路徑job.setOutputFormatClass(TextOutputFormat.class);//TextOutputFormat.setOutputPath(job,new Path("hdfs://master:9000/wordcount_out"));TextOutputFormat.setOutputPath(job,new Path("file:///D:\\wordcount_output"));//等待任務結束boolean b = job.waitForCompletion(true);return b? 0 : 1;}public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();//configuration.set("mapred.job.tracker", "192.168.60.101:9000");//啟動job任務int run = ToolRunner.run(configuration, new JobMain(), args);System.exit(run);} }4.MapReduce運行模式
4.1 集群運行模式
將MapReduce程序提交給Yarn集群,分發到很多的節點上并發執行
處理的數據和輸出結果應該位于HDFS文件系統
提交集群的實現步驟:將程序打成JAR包(雙擊右側maven-Lifecycle-package),并上傳至節點,然后在集群上用hadoop命令啟動。
兩個參數分別為jar包名,main方法的路徑
4.2 本地運行模式
總結
以上是生活随笔為你收集整理的MapReduce流程(WordCount案例实现)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 统计学习方法的三要素
- 下一篇: Jupyter notebook 多行注