【MapReduce】 MR初识
文章目錄
- 一、定義
- 二、優(yōu)缺點
- 三、MR核心編程思想 --- 案例WordCount
- 四、MR進程
- 五、MR編程規(guī)范
- 六、wordCount案例
- ① 創(chuàng)建工程
- ▲創(chuàng)建Maven項目
- ▲pom.xml文件添加依賴
- ▲配置日志文件
- ② 代碼實現(xiàn)
- ③ 本地運行
- ④ 集群運行
- ▲環(huán)境配置 --- 打jar包Maven依賴
- ▲生成jar包
- ▲具體執(zhí)行
- ▲查看結(jié)果
一、定義
1.MR是一個分布式運算程序的編程框架,是用戶開發(fā)“基于Hadoop的數(shù)據(jù)分析應用”的核心框架。
2.MR的核心功能是將用戶編寫的業(yè)務邏輯代碼和自帶默認組件合成一個完整的分布式運算程序,并發(fā)運行在一個Hadoop集群上。
返回頂部
二、優(yōu)缺點
?優(yōu)點:
- 易于編程 : 簡單的實現(xiàn)一些接口就可以完成分布式程序 ---- 與簡單的串行程序一樣。
* 良好的擴展性 : 可以通過增加機器提高運行。
* 高容錯性:可以將故障機器上的計算任務轉(zhuǎn)移至另一個節(jié)點上運行,完全有Hadoop內(nèi)部自動完成。
* 適合PB級以上海量數(shù)據(jù)的離線處理
? 缺點:
- 不擅長實時計算:無法像mysql一樣,在毫秒或者秒級內(nèi)返回結(jié)果。
* 不擅長流式計算:流式計算的輸入數(shù)據(jù)是動態(tài)的,而MR的數(shù)據(jù)數(shù)據(jù)集是靜態(tài)的,不能動態(tài)變化。
* 不擅長DAG有向圖的計算:多個應用程序之間存在依賴關系,后一個應用程序的輸入為前一個的輸出。此情況下,MR可以做,但是會造成大量的磁盤IO,性能低下。
返回頂部
三、MR核心編程思想 — 案例WordCount
-
MR運算程序分為兩個部分:Map 、 Reduce。
-
Map數(shù)據(jù)的劃分:按照塊級劃分,默認每128M為一塊。
-
每一塊數(shù)據(jù)集對應啟動一個MapTask,實行完全并行計算,互不干擾。
? Task處理時,讀取數(shù)據(jù)并按行處理,按空格切分
? 形成(K,V)鍵值對的形式
? 將所有的鍵值對按照單詞首字母,分成兩個分區(qū)溢寫到磁盤 -
Reduce階段的并發(fā)ReduceTask,完全互不相犯。但是數(shù)據(jù)來源于上一個階段所有MapTask并發(fā)實例的輸出。
-
MR編程模型只能包含一個M端和一個R端,若業(yè)務邏輯復雜,可采用多個MR程序串行運行。
返回頂部
四、MR進程
一個完整的MR程序在分布式運行時有三類實例進程:
■ MrAppMaster :負責成個程序的過程調(diào)度及狀態(tài)協(xié)調(diào) (準備一些資源 ~ 內(nèi)存~ CPU~ 開啟后續(xù)進程等等,job級別)
■ MapTask : 負責Map階段的整個數(shù)據(jù)處理流程 — 分
■ ReduceTask : 負責Reduce階段的整個數(shù)據(jù)處理流程 — 合
返回頂部
五、MR編程規(guī)范
三部分:Map 、 Reduce 、 Driver
-
Map階段
- 用戶自定義的Mapper要繼承自己的父類
- Mapper的輸入、輸出數(shù)據(jù)都是KV對的形式,且類型可自定義
- Mapper中的業(yè)務邏輯寫在map()方法中
- map()方法(MapTask進程)對每一個<K,V>只調(diào)用一次 — 對輸入行級數(shù)據(jù)只調(diào)用一次,處理后輸出 -
Reduce階段
- 用戶自定義的Reducer要繼承自己的父類
- Reducer的輸入數(shù)據(jù)類型對應Mapper的輸出數(shù)據(jù)類型,也是KV對(Reducer的數(shù)入,就是Mapper的輸出)
- Reducer中的業(yè)務邏輯寫在Reducer()方法中
- ReduceTask進程對每一組相同 K 的 <K,V> 組調(diào)用一次reduce()方法 -
Driver階段
- 相當于Yarn集群的客戶端,用于提交我們整個程序到Y(jié)arn集群,提交的是封裝了MR程序相關運行參數(shù)的Job對象
返回頂部
六、wordCount案例
① 創(chuàng)建工程
▲創(chuàng)建Maven項目
返回頂部
▲pom.xml文件添加依賴
<dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>RELEASE</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.8.2</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.7.2</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.7.2</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>2.7.2</version></dependency> </dependencies>返回頂部
▲配置日志文件
在項目的src/main/resources目錄下,新建一個文件,命名為“l(fā)og4j.properties”,在文件中填入。
log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n返回頂部
② 代碼實現(xiàn)
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** Mapper 階段* KEYIN 輸入數(shù)據(jù)的key類型* VALUEIN 輸入數(shù)據(jù)的value類型* KEYOUT 輸出數(shù)據(jù)的key類型* VALUEOUT 輸出數(shù)據(jù)的value類型*/ public class wordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {// 創(chuàng)建對象Text k = new Text();IntWritable v = new IntWritable();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 1.獲取一行數(shù)據(jù)// atguigu atguiguString line = value.toString();// 2.切分String[] words = line.split(" ");// 3.循環(huán)寫出for (String word:words){// 設置鍵 atguiguk.set(word);// 設置詞頻為 1 , 也可以在上面創(chuàng)建對象時默認為1v.set(1);// 生成鍵值對 (atguigu,1)context.write(k,v);}} } import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException;/*** Reducer 階段* KEYIN ,VALUEIN Reducer階段輸入(Mapper階段輸出)數(shù)據(jù)的類型* KEYOUT 最終輸出數(shù)據(jù)的key類型* VALUEOUT 最終輸出數(shù)據(jù)的value類型*/ public class wordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {IntWritable v = new IntWritable();@Override// Iterable<IntWritable> values 對key的value值進行迭代實現(xiàn)詞頻統(tǒng)計protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {// atguigu,1// atguigu,1// 1.累加求和int sum = 0;for (IntWritable value:values){// value是IntWritable類型數(shù)據(jù),通過get轉(zhuǎn)為int型,才好計算sum += value.get();}// 2.寫出結(jié)果v.set(sum);context.write(key,v);} } import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException;public class wordCountDriver {public static void main(String[] args) {Configuration conf = new Configuration();Job job = null;try {// 1.獲取job對象job = Job.getInstance(conf);// 2.設置jar存儲位置job.setJarByClass(wordCountDriver.class);// 3.關聯(lián)map、reduce類job.setMapperClass(wordCountMapper.class);job.setReducerClass(wordCountReducer.class);// 4.設置Mapper階段輸出數(shù)據(jù)的key、value類型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);// 5.設置Reducer階段輸出數(shù)據(jù)的key、value類型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 6.設置輸入、出路徑 // FileInputFormat.setInputPaths(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第一章_MR概述\\dataset\\")); // FileOutputFormat.setOutputPath(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第一章_MR概述\\output\\"));// 打jar包FileInputFormat.setInputPaths(job,new Path(args[0]));FileOutputFormat.setOutputPath(job,new Path(args[1]));// 7.提交jobjob.waitForCompletion(true);} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();} catch (ClassNotFoundException e) {e.printStackTrace();}} }返回頂部
③ 本地運行
- 設置輸入輸出路徑為本地路徑
- 運行結(jié)果
返回頂部
④ 集群運行
▲環(huán)境配置 — 打jar包Maven依賴
<plugin><artifactId>maven-assembly-plugin </artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest><mainClass>第一章_MR概述.wordCountDriver</mainClass> // 這里要對應自己的主類路徑進行修改</manifest></archive></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin>返回頂部
▲生成jar包
返回頂部
▲具體執(zhí)行
- 上傳文件到hdfs,以及通過Xshell上傳jar包至虛擬機~
- 輸出文件不需要創(chuàng)建,在執(zhí)行jar包時會自動創(chuàng)建,如不刪掉,會報文件夾已存在的異常。所以在上面刪除了output文件夾~
- 執(zhí)行jar包運行命令
- hadoop jar jar包名 主類 輸入文件路徑 輸出文件路徑
返回頂部
▲查看結(jié)果
返回頂部
<!-- 這里是以后會用的一些jar的總配置,僅供參考!!! --> <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.zyx</groupId><artifactId>MR</artifactId><version>1.0-SNAPSHOT</version><dependencies><!-- 測試 --><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>RELEASE</version></dependency><!-- 日志 --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.8.2</version></dependency><!-- hadoop --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.7.2</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.7.2</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>2.7.2</version></dependency><!-- jdbc --> <!-- <dependency>--> <!-- <groupId>mysql</groupId>--> <!-- <artifactId>mysql-connector-java</artifactId>--> <!-- <version>5.1.40</version>--> <!-- </dependency>--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.19</version></dependency><!-- 分詞器 --><dependency><groupId>com.janeluo</groupId><artifactId>ikanalyzer</artifactId><version>2012_u6</version></dependency><!-- fastJson 依賴 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.6</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.6.0</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding><compilerArguments><extdirs>libs</extdirs><!-- rt包沒有打到項目中去 --><verbose /><!-- C:/Program Files/Java/jdk1.8.0_201 是我本地安裝的jdk家目錄,rt.jar等jar 我在 jdk家目錄下的 /jre/lib/ 目錄中有發(fā)現(xiàn)存在,你們需要注意確認自己的實際情況,Windows分隔符英文分號,linux分隔符英文冒號 --><bootclasspath>G:/Projects/jdk1.8.0_202/jre/lib/rt.jar;G:/Projects/jdk1.8.0_202/jre/lib/jce.jar;G:/Projects/jdk1.8.0_202/jre/lib/jsse.jar</bootclasspath></compilerArguments></configuration></plugin> <!-- <plugin>--> <!-- <artifactId>maven-compiler-plugin</artifactId>--> <!-- <version>2.3.2</version>--> <!-- <configuration>--> <!-- <source>1.8</source>--> <!-- <target>1.8</target>--> <!-- </configuration>--> <!-- </plugin>--><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><archive><manifest><mainClass>通信數(shù)據(jù)處理.DriverTest</mainClass></manifest></archive><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><!--下面是為了使用 mvn package命令,如果不加則使用mvn assembly--><executions><execution><id>make-assemble</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build> </project>
總結(jié)
以上是生活随笔為你收集整理的【MapReduce】 MR初识的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。