日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

day08 MapReduce

發布時間:2024/4/17 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 day08 MapReduce 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

PS:?HDFS對于MapReduce來說,HDFS就是一個就是一個客戶端。

PS:?離線就是?寫sql,sparkh還是寫sql

1. MAPREDUCE原理篇(1)

Mapreduce是一個分布式運算程序的編程框架,是用戶開發“基于hadoop的數據分析應用”的核心框架;

Mapreduce核心功能是將用戶編寫的業務邏輯代碼和自帶默認組件整合成一個完整的分布式運算程序,并發運行在一個hadoop集群上;

?

PS: 上圖為mapreduce的設計思想,以wordcount為例子,首先把一個任務分我 兩個階段。 第一map階段負責業務邏輯,第二數據的合并(比如按a-h合并,然后第二個安裝i-k)。中間還有還多負責的業務
管理邏輯,由mr application master來管理,協調調度。
PS: MapReduce程序都是依賴于HDFS,以流的形式讀取

----------------------------------------------------------------------------------------------------------------

1.1 為什么要MAPREDUCE

(1)海量數據在單機上處理因為硬件資源限制,無法勝任

(2)而一旦將單機版程序擴展到集群來分布式運行,將極大增加程序的復雜度和開發難度

(3)引入mapreduce框架后,開發人員可以將絕大部分工作集中在業務邏輯的開發上,而將分布式計算中的復雜性交由框架來處理

-----------------------------------------------------------

設想一個海量數據場景下的wordcount需求:

單機版:內存受限,磁盤受限,運算能力受限

分布式:

1、文件分布式存儲(HDFS)

2、運算邏輯需要至少分成2個階段(一個階段獨立并發,一個階段匯聚)

3、運算程序如何分發

4、程序如何分配運算任務(切片)

5、兩階段的程序如何啟動?如何協調?

6、整個程序運行過程中的監控?容錯?重試?

?

  可見在程序由單機版擴成分布式時,會引入大量的復雜工作。為了提高開發效率,可以將分布式程序中的公共功能封裝成框架,讓開發人員可以將精力集中于業務邏輯。

而mapreduce就是這樣一個分布式程序的通用框架,其應對以上問題的整體結構如下:

1、MRAppMaster(mapreduce application master)

2、MapTask

3、ReduceTask

?

1.2 MAPREDUCE框架結構及核心運行機制

1.2.1 結構

一個完整的mapreduce程序在分布式運行時有三類實例進程

1、MRAppMaster:負責整個程序的過程調度及狀態協調

2、mapTask:負責map階段的整個數據處理流程

3、ReduceTask:負責reduce階段的整個數據處理流程

-----------------------------------------------------------------------------

PS:寫程序代碼

1.配置項目,common和hdfs如之前配置

2.配置mapreduce

3.配置yarn。PS:

yarn Apache Hadoop YARN (Yet Another Resource Negotiator,另一種資源協調者)是一種新的 Hadoop 資源管理器,
它是一個通用資源管理系統,可為上層應用提供統一的資源管理和調度,它的引入為集群在利用率、資源統一管理和數據共享等方面帶來了巨大好處。
------------------
YARN的基本思想是將JobTracker的兩個主要功能(資源管理和作業調度/監控)分離,主要方法是創建一個全局的ResourceManager(RM)
和若干個針對應用程序的ApplicationMaster(AM)。這里的應用程序是指傳統的MapReduce作業或作業的DAG(有向無環圖)。

?

-------------------------------------------------------------------

package cn.itcast.bigdata.mr.wcdemo;import java.io.IOException;import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;/*** KEYIN: 默認情況下,是mr框架所讀到的一行文本的起始偏移量,Long,* 但是在hadoop中有自己的更精簡的序列化接口,所以不直接用Long,而用LongWritable* * VALUEIN:默認情況下,是mr框架所讀到的一行文本的內容,String,同上,用Text* * KEYOUT:是用戶自定義邏輯處理完成之后輸出數據中的key,在此處是單詞,String,同上,用Text* VALUEOUT:是用戶自定義邏輯處理完成之后輸出數據中的value,在此處是單詞次數,Integer,同上,用IntWritable* * @author**/public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{/*** map階段的業務邏輯就寫在自定義的map()方法中* maptask會對每一行輸入數據調用一次我們自定義的map()方法*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//將maptask傳給我們的文本內容先轉換成StringString line = value.toString();//根據空格將這一行切分成單詞String[] words = line.split(" ");//將單詞輸出為<單詞,1>for(String word:words){//將單詞作為key,將次數1作為value,以便于后續的數據分發,可以根據單詞分發,以便于相同單詞會到相同的reduce taskcontext.write(new Text(word), new IntWritable(1));}}} package cn.itcast.bigdata.mr.wcdemo;import java.io.IOException;import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;/*** KEYIN, VALUEIN 對應 mapper輸出的KEYOUT,VALUEOUT類型對應* * KEYOUT, VALUEOUT 是自定義reduce邏輯處理結果的輸出數據類型* KEYOUT是單詞* VLAUEOUT是總次數* @author**/ public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{/* 到這里的時候,數據已經按照一定的特性(規律如 統計a-h) 分組好了,所以入參的時候是一個同樣的key,values是Itrater專門用來迭代累加; 然后再執行第二組 hello ,1,...指導統計完
* <angelababy,1><angelababy,1><angelababy,1><angelababy,1><angelababy,1>* <hello,1><hello,1><hello,1><hello,1><hello,1><hello,1>* <banana,1><banana,1><banana,1><banana,1><banana,1><banana,1>* 入參key,是一組相同單詞kv對的key*/@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int count=0;/*Iterator<IntWritable> iterator = values.iterator();while(iterator.hasNext()){count += iterator.next().get();}*/for(IntWritable value:values){count += value.get();}context.write(key, new IntWritable(count));}} package cn.itcast.bigdata.mr.wcdemo;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/*** 相當于一個yarn集群的客戶端* 需要在此封裝我們的mr程序的相關運行參數,指定jar包* 最后提交給yarn* @author**/ public class WordcountDriver {public static void main(String[] args) throws Exception {if (args == null || args.length == 0) {args = new String[2];args[0] = "hdfs://192.168.8.10:9000/wordcount/input/wordcount.txt";args[1] = "hdfs://192.168.8.10:9000/wordcount/output";}Configuration conf = new Configuration();//設置的沒有用! ?????? 因為在linux下允許,所以這些注釋了 // conf.set("HADOOP_USER_NAME", "hadoop"); // conf.set("dfs.permissions.enabled", "false");// ---因為linux和windows操作系統的環境變量不同,所以不能直接運行在windows上,現在打包到linux系統上,那么,就不用注釋了(因為集群上已經配置過了)/*conf.set("mapreduce.framework.name", "yarn");conf.set("yarn.resoucemanager.hostname", "mini1");*/Job job = Job.getInstance(conf);/*job.setJar("/home/hadoop/wc.jar");*///指定本程序的jar包所在的本地路徑job.setJarByClass(WordcountDriver.class);//指定本業務job要使用的mapper/Reducer業務類job.setMapperClass(WordcountMapper.class);job.setReducerClass(WordcountReducer.class);//指定mapper輸出數據的kv類型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);//指定最終輸出的數據的kv類型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//指定job的輸入原始文件所在目錄FileInputFormat.setInputPaths(job, new Path(args[0]));//指定job的輸出結果所在目錄FileOutputFormat.setOutputPath(job, new Path(args[1]));//將job中配置的相關參數,以及job所用的java類所在的jar包,提交給yarn去運行/*job.submit();*/boolean res = job.waitForCompletion(true);System.exit(res?0:1);} }

PS:打包文件

?hadoop jar wordcount.jar cn.itcast.bigdata.mr.wcdemo.WordcountDriver /wordcount/input /wordcount/output1

PS:?紅色命令? 其實就是?調用?jar命令 ,只是把關聯的jar鏈接上了

?PS:在界面端也有一個專門界面。

PS:這個文件可以part-r-0000可以自定義文件的個數,現在把所有的文件都放在part-r-0000中。

PS:程序先啟動的時候先啟動Mr application master,然后再啟動map task 、reducetask

?

------------------------------------------------------------------------------------WordCount程序運行流程分析------------------------------------------------

?

PS:上圖應該這么看,首先待處理文件。當submit()的時候,會看到文件的規模,進行map()劃分(啟動maptask的多少,這個文件是job.split),然后job.split、wc.jar、job。xml提交到云端的YARN進行管理。
然后yarn啟動mr appmaster,啟動相應的Node Manager進行管理。
執行map()任務,他是通過InputFormat組件以流的形式讀入一定范圍的數據進來,每一個任務執行完了,接著通過另一個組件outputController會生成分區且排序的結果文件,
再進入reducetask()任務, 每傳過來maptask可以根據分區號進行分區,也就是輸出的part-r-0000X的變化的值。
在mapreduce線程中,執行業務邏輯,通過outputFormat輸出,最后生成文件(讀取多個的文件,輸出會進行文件)。

PS:? 上圖為輸出的應用結果,? ?key為單詞字符,value是每個單詞的個數

?---------------------------------

?

?-----------------------------------------MapReduce?應用編譯執行

?

1. PS:
start-dfs.sh //啟動應用
start-yarn.sh //啟動yarn管理器

?--------------------------------------------另一個程序練習

/*
PS: 打印什么內容是 由key 和 輸出 value對象的toString方法所決定的。
*/

?

?

PS:文件在hdfs上,用jar包進行編譯執行。

?

?

--------------------------切片和并行度的概念------------

1.3.1 mapTask并行度的決定機制

一個job的map階段并行度由客戶端在提交job時決定

而客戶端對map階段并行度的規劃的基本邏輯為:

將待處理數據執行邏輯切片(即按照一個特定切片大小,將待處理數據劃分成邏輯上的多個split),然后每一個split分配一個mapTask并行實例處理

Ps:由HDFS讀取到 日志文件后,根據文件block的大小進行并行度的確定。通常大文件直接有文件塊決定,決定時機是在 waitForCompletion這個函數執行時,會生成一個分塊的文件。
然后再mapreduce執行時,MRAppMaster就可以讀取到信息進行調度操作了。

PS:上圖為提交數據的完成的信息,里面有包含分塊的信息。所有配置參數,這些參數可以讓MRAppMaster讀取數據

?PS :客戶端提交MR程序job的流程

?

?-------------------------------------------------------------------------------------

按照不同號碼分區

package cn.itcast.bigdata.mr.provinceflow;import java.util.HashMap;import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner;/*** K2 V2 對應的是map輸出kv的類型, * @author* 返回的是分區地址的hashcode*/ public class ProvincePartitioner extends Partitioner<Text, FlowBean>{public static HashMap<String, Integer> proviceDict = new HashMap<String, Integer>();static{proviceDict.put("136", 0);proviceDict.put("137", 1);proviceDict.put("138", 2);proviceDict.put("139", 3);}@Overridepublic int getPartition(Text key, FlowBean value, int numPartitions) {String prefix = key.toString().substring(0, 3);Integer provinceId = proviceDict.get(prefix);return provinceId==null?4:provinceId;} } package cn.itcast.bigdata.mr.provinceflow;import java.io.IOException;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FlowCount {static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean>{@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString(); //將一行內容轉成stringString[] fields = line.split("\t"); //切分字段String phoneNbr = fields[1]; //取出手機號long upFlow = Long.parseLong(fields[fields.length-3]); //取出上行流量下行流量long dFlow = Long.parseLong(fields[fields.length-2]);context.write(new Text(phoneNbr), new FlowBean(upFlow, dFlow));}}static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean>{//<183323,bean1><183323,bean2><183323,bean3><183323,bean4>....... @Overrideprotected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {long sum_upFlow = 0;long sum_dFlow = 0;//遍歷所有bean,將其中的上行流量,下行流量分別累加for(FlowBean bean: values){sum_upFlow += bean.getUpFlow();sum_dFlow += bean.getdFlow();}FlowBean resultBean = new FlowBean(sum_upFlow, sum_dFlow);context.write(key, resultBean);}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();/*conf.set("mapreduce.framework.name", "yarn");conf.set("yarn.resoucemanager.hostname", "mini1");*/Job job = Job.getInstance(conf);/*job.setJar("/home/hadoop/wc.jar");*///指定本程序的jar包所在的本地路徑job.setJarByClass(FlowCount.class);//指定本業務job要使用的mapper/Reducer業務類job.setMapperClass(FlowCountMapper.class);job.setReducerClass(FlowCountReducer.class);//指定我們自定義的數據分區器job.setPartitionerClass(ProvincePartitioner.class);//同時指定相應“分區”數量的reducetaskjob.setNumReduceTasks(5); //數量必須和分區相對應,可以為1 可以大于分區個數。單數少于1到分區數會報錯。//指定mapper輸出數據的kv類型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);//指定最終輸出的數據的kv類型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//指定job的輸入原始文件所在目錄FileInputFormat.setInputPaths(job, new Path(args[0]));//指定job的輸出結果所在目錄FileOutputFormat.setOutputPath(job, new Path(args[1]));//將job中配置的相關參數,以及job所用的java類所在的jar包,提交給yarn去運行/*job.submit();*/boolean res = job.waitForCompletion(true);System.exit(res?0:1);}}

----------------------------作業:對以下數據進行排序,按照流量排序,這個是在之前生成數據的基礎上做的

PS:不要想著程序一下子全部完成,也可以分步驟解決問題、

package cn.itcast.bigdata.mr.flowsum;import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;import org.apache.hadoop.io.WritableComparable;public class FlowBean implements WritableComparable<FlowBean>{private long upFlow;private long dFlow;private long sumFlow;//反序列化時,需要反射調用空參構造函數,所以要顯示定義一個public FlowBean(){}public FlowBean(long upFlow, long dFlow) {this.upFlow = upFlow;this.dFlow = dFlow;this.sumFlow = upFlow + dFlow;}public void set(long upFlow, long dFlow) {this.upFlow = upFlow;this.dFlow = dFlow;this.sumFlow = upFlow + dFlow;}public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getdFlow() {return dFlow;}public void setdFlow(long dFlow) {this.dFlow = dFlow;}public long getSumFlow() {return sumFlow;}public void setSumFlow(long sumFlow) {this.sumFlow = sumFlow;}/*** 序列化方法*/@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(dFlow);out.writeLong(sumFlow);}/*** 反序列化方法* 注意:反序列化的順序跟序列化的順序完全一致*/@Overridepublic void readFields(DataInput in) throws IOException {upFlow = in.readLong();dFlow = in.readLong();sumFlow = in.readLong();}@Overridepublic String toString() { return upFlow + "\t" + dFlow + "\t" + sumFlow;}@Overridepublic int compareTo(FlowBean o) {return this.sumFlow>o.getSumFlow()?-1:1; //從大到小, 當前對象和要比較的對象比, 如果當前對象大, 返回-1, 交換他們的位置(自己的理解) }} package cn.itcast.bigdata.mr.flowsum;import java.io.IOException;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import cn.itcast.bigdata.mr.flowsum.FlowCount.FlowCountMapper; import cn.itcast.bigdata.mr.flowsum.FlowCount.FlowCountReducer;/*** 13480253104 180 180 360 13502468823 7335 110349 117684 13560436666 1116 954* 2070* * @author* */ public class FlowCountSort {static class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {FlowBean bean = new FlowBean();//!!!雖然這是一個對象,但是是序列化,所以不用擔心數據操作Text v = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 拿到的是上一個統計程序的輸出結果,已經是各手機號的總流量信息String line = value.toString();String[] fields = line.split("\t");String phoneNbr = fields[0];long upFlow = Long.parseLong(fields[1]);long dFlow = Long.parseLong(fields[2]);bean.set(upFlow, dFlow);v.set(phoneNbr);context.write(bean, v);}}/*** 根據key來掉, 傳過來的是對象, 每個對象都是不一樣的, 所以每個對象都調用一次reduce方法* @author: 張政* @date: 2016年4月11日 下午7:08:18* @package_name: day07.sample*/static class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {// <bean(),phonenbr> @Overrideprotected void reduce(FlowBean bean, Iterable<Text> values, Context context) throws IOException, InterruptedException {context.write(values.iterator().next(), bean);}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();/*conf.set("mapreduce.framework.name", "yarn");conf.set("yarn.resoucemanager.hostname", "mini1");*/Job job = Job.getInstance(conf);/*job.setJar("/home/hadoop/wc.jar");*///指定本程序的jar包所在的本地路徑job.setJarByClass(FlowCountSort.class);//指定本業務job要使用的mapper/Reducer業務類job.setMapperClass(FlowCountSortMapper.class);job.setReducerClass(FlowCountSortReducer.class);//指定mapper輸出數據的kv類型job.setMapOutputKeyClass(FlowBean.class);job.setMapOutputValueClass(Text.class);//指定最終輸出的數據的kv類型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//指定job的輸入原始文件所在目錄FileInputFormat.setInputPaths(job, new Path(args[0]));//指定job的輸出結果所在目錄 Path outPath = new Path(args[1]);/*FileSystem fs = FileSystem.get(conf);if(fs.exists(outPath)){fs.delete(outPath, true);}*/FileOutputFormat.setOutputPath(job, outPath);//將job中配置的相關參數,以及job所用的java類所在的jar包,提交給yarn去運行/*job.submit();*/boolean res = job.waitForCompletion(true);System.exit(res?0:1);}}

?

?

?

3.1.4 詳細流程示意圖? ? ? ? --------------這個圖非常重要

?------------------------Yarn拿到程序怎樣啟動,這是今天關心的問題---------------------------------

PS:

轉載于:https://www.cnblogs.com/bee-home/p/7908169.html

總結

以上是生活随笔為你收集整理的day08 MapReduce的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。