生活随笔
收集整理的這篇文章主要介紹了
Hadoop之MapReduce入门
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
Hadoop之MapReduce概述
目錄
MapReduce定義MapReduce優缺點MapReduce核心思想MapReduce進程MapReduce編程規范MapReduce案例實操
1. MapReduce定義
Mapreduce是一個分布式運算程序的編程框架,是用戶開發“基于hadoop的數據分析應用”的核心框架。Mapreduce核心功能是將用戶編寫的業務邏輯代碼和自帶默認組件整合成一個完整的分布式運算程序,并發運行在一個hadoop集群上。
2. MapReduce優缺點
優點
MapReduce 易于編程
它簡單的實現一些接口,就可以完成一個分布式程序,這個分布式程序可以分布到大量廉價的PC機器上運行。也就是說你寫一個分布式程序,跟寫一個簡單的串行程序是一模一樣的。就是因為這個特點使得MapReduce編程變得非常流行。良好的擴展性
當你的計算資源不能得到滿足的時候,你可以通過簡單的增加機器來擴展它的計算能力。高容錯性
MapReduce設計的初衷就是使程序能夠部署在廉價的PC機器上,這就要求它具有很高的容錯性。比如其中一臺機器掛了,它可以把上面的計算任務轉移到另外一個節點上運行,不至于這個任務運行失敗,而且這個過程不需要人工參與,而完全是由Hadoop內部完成的。適合PB級以上海量數據的離線處理
這里加紅字體離線處理,說明它適合離線處理而不適合在線處理。比如像毫秒級別的返回一個結果,MapReduce很難做到。
缺點
MapReduce不擅長做實時計算、流式計算、DAG(有向圖)計算。
實時計算
MapReduce無法像Mysql一樣,在毫秒或者秒級內返回結果。流式計算
流式計算的輸入數據是動態的,而MapReduce的輸入數據集是靜態的,不能動態變化。這是因為MapReduce自身的設計特點決定了數據源必須是靜態的。DAG(有向圖)計算
多個應用程序存在依賴關系,后一個應用程序的輸入為前一個的輸出。在這種情況下,MapReduce并不是不能做,而是使用后,每個MapReduce作業的輸出結果都會寫入到磁盤,會造成大量的磁盤IO,導致性能非常的低下。
3. MapReduce核心思想
分布式的運算程序往往需要分成至少2個階段。
第一個階段的maptask并發實例,完全并行運行,互不相干。
第二個階段的reduce task并發實例互不相干,但是他們的數據依賴于上一個階段的所有maptask并發實例的輸出。
MapReduce編程模型只能包含一個map階段和一個reduce階段,如果用戶的業務邏輯非常復雜,那就只能多個mapreduce程序,串行運行。
4. MapReduce進程
一個完整的mapreduce程序在分布式運行時有三類實例進程:
MrAppMaster:負責整個程序的過程調度及狀態協調。MapTask:負責map階段的整個數據處理流程。ReduceTask:負責reduce階段的整個數據處理流程。
5. MapReduce編程規范
用戶編寫的程序分成三個部分:Mapper、Reducer和Driver。
Mapper階段(k-long首字母的偏移量,v-string 記錄一行的數據)
用戶自定義的Mapper要繼承自己的父類Mapper的輸入數據是KV對的形式(KV的類型可自定義)Mapper中的業務邏輯寫在map()方法中Mapper的輸出數據是KV對的形式(KV的類型可自定義)map()方法(maptask進程)對每一個<K,V>調用一次 Reducer階段
用戶自定義的Reducer要繼承自己的父類Reducer的輸入數據類型對應Mapper的輸出數據類型,也是KVReducer的業務邏輯寫在reduce()方法中Reducetask進程對每一組相同k的<k,v>組調用一次reduce()方法 Driver階段
相當于yarn集群的客戶端,用于提交我們整個程序到yarn集群,提交的是封裝了mapreduce程序相關運行參數的job對象
6. MapReduce案例實操
WordCount案例實操
需求
在給定的文本文件中統計輸出每一個單詞出現的總次數數據準備(hello.txt)
hello world
atguigu atguigu
hadoop
spark
hello world
atguigu atguigu
hadoop
spark
hello world
atguigu atguigu
hadoop
spark
分析
按照mapreduce編程規范,分別編寫Mapper,Reducer,Driver,如下圖
4. 導入相應的依賴坐標+日志添加
<dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>RELEASE</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.8.2</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.7.2</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.7.2</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>2.7.2</version></dependency>
</dependencies>
在項目的src/main/resources目錄下,新建一個文件,命名為“log4j.properties”,在文件中填入
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
編寫程序 編寫mapper類
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{Text k = new Text();IntWritable v = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {// 1 獲取一行String line = value.toString();// 2 切割String[] words = line.split(" ");// 3 輸出for (String word : words) {k.set(word);context.write(k, v);}}
}
編寫reducer類
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{int sum;
IntWritable v = new IntWritable();@Overrideprotected void reduce(Text key, Iterable<IntWritable> value,Context context) throws IOException, InterruptedException {// 1 累加求和sum = 0;for (IntWritable count : value) {sum += count.get();}// 2 輸出v.set(sum);context.write(key,v);}
}
編寫驅動類
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class WordcountDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 1 獲取配置信息以及封裝任務Configuration configuration = new Configuration();Job job = Job.getInstance(configuration);// 2 設置jar加載路徑job.setJarByClass(WordcountDriver.class);// 3 設置map和reduce類job.setMapperClass(WordcountMapper.class);job.setReducerClass(WordcountReducer.class);// 4 設置map輸出job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);// 5 設置Reduce輸出job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 6 設置輸入和輸出路徑FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 7 提交boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}
總結
以上是生活随笔為你收集整理的Hadoop之MapReduce入门的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。