Hadoop学习之MapReduce
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Hadoop學習之MapReduce
目錄
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Hadoop學習之MapReduce
1 MapReduce簡介
1.1 什么是MapReduce
1.2?MapReduce的作用
1.3?MapReduce的運行方式
2?MapReduce的運行機制
2.1 相關進程
2.2?MapReduce的編程套路
2.3?MapTask的并行度
2.4?切片及其源碼解讀
2.5?ReduceTask的并行度與分區
3?hadoop中的自定義類型
4 MapReduce中的排序
4 MapReduce中的分組
5 MapReduce中的Combiner
6 MapReduce中的Shuffle
7 MapReduce的join
7.1 reduce join
7.2 map join
1 MapReduce簡介
1.1 什么是MapReduce
MapReduce是一個分布式運算程序的編程框架,它的核心功能是將用戶編寫的業務邏輯代碼和自帶默認組件整合成一個完整的的分布式運算程序,并發運行在一個hadoop集群上。
1.2?MapReduce的作用
海量數據在單機上處理的話磁盤受限,內存受限,計算能力受限是無法勝任的,將單機版程序擴展到集群來分布式運行,將極大增加程序的復雜度和開發難度,而MapReduce 框架,使得開發人員可以將絕大部分工作集中在業務邏輯的開發上,而將分布式計算中的復雜性交由框架來處理。
1.3?MapReduce的運行方式
(1)打jar包運行:①將代碼打成jar包上傳到linux服務器;②使用hadoop命令提交到yarn集群運行;③處理的數據文件和結果文件位于hdfs文件系統
(2)本地運行(程序沒有被提交到集群上):①windows 本地安裝 hadoop,并且配置環境變量,hadoop 必須是?windows 平臺上編譯的對應版本,否則將安裝目錄的 lib 和 bin 目錄替換成 windows 平臺編譯版本;②將hadoop.dll 放置在 c:/windows/system32 文件夾中;③winutils.exe 放置在$HADOOP_HOME/bin 目錄中;④在?eclipse 配置 hadoop 安裝目錄, windows -->?preferences -->Hadoop Map/Reduce --> Hadoop Installation Direcotry
(3)本地運行(將代碼提交到集群上):需要修改配置 ,修改源碼 ,不適用這里就不介紹了。
2?MapReduce的運行機制
2.1 相關進程
一個完整的 MapReduce 程序在分布式運行時有兩類實例進程:
1、MRAppMaster(MapReduce Application Master):負責整個程序的過程調度及狀態協調
2、Yarnchild:負責 map 階段的整個數據處理流程(對應 MapTask階段并發任務),與reduce 階段的整個數據處理流程 (對應ReduceTask:階段匯總任務)
注意: MapTask 和 ReduceTask 的進程都是 YarnChild,并不是說這 MapTask 和 ReduceTask 就跑在同一個 YarnChild 進行里,每個MapTask 和 ReduceTask都對應一個YarnChild進程。
2.2?MapReduce的編程套路
(1)根據客戶指定的 InputFormat 來獲取 RecordReader 讀取數據,形成輸入 KV 對;
(2)將輸入 KV 對傳遞給客戶定義的 map()方法,做邏輯運算,并將寫出;
(3)將map()方法輸出的 KV 進行shuffle處理,溢寫,分區,排序分組等一系列操作;
(4)Reducetask 進程啟動之后,從若干臺 maptask 運行所在機器上獲取到若干個 maptask 輸出結果文件,并在本地重新歸并排序, 然后按照相同 key 的 KV 為一個組,調用客戶定義的 reduce()方法進行邏輯運算,并收集運算輸出的結果 KV。
(5)調用客戶指定的 OutputFormat 將結果數據輸出到外部存儲
2.3?MapTask的并行度
MapTask:運行Mapper端邏輯的任務
并行度:有多少個MapTask一起運行
MapTask并行度:Mapper端邏輯在進行運行的時候,需要拆分成多少個task(任務),這個task是job運行的最小單位,不可拆分,一個task只能在一個節點上運行。
maptask 的并行度決定 map 階段的任務處理并發度,那么每一個任務對應一部分原始數據,那么這個數據因該多大呢?
hdfs的默認存儲的塊大小是128M,假設一個任務對應一個的數據量是1G(hdfs上8個塊,可能分散存儲到不同的節點),那么在獲取數據的時候,就必然會面臨跨節點傳輸問題,計算效率是非常低的。
假設一個任務對應的數據量是100M,要處理一個200M的文件(分為兩個塊block1和block2),那么task1處理的是block1塊的0-100M,task2處理的是block1塊的100-128M和block2塊的129-200M,仍然會造成跨節點數據傳輸問題 降低計算的效率。
綜上所述,一個任務對應的數據 最合理的應該就是和hdfs數據存儲的塊的大小一致 128M 。
注意:每一個任務只會被分配到每一個節點的一小部分資源,一個節點上可以執行多個任務(maptask|reducetask), ?一個任務只能在一個節點執行。
一個 job 的 map 階段并行度由客戶端在提交 job 時決定,客戶端對 map 階段并行度的規劃的基本邏輯為: 將待處理數據執行邏輯切片(即按照一個特定切片大小,將待處理數據劃分成邏輯上的多 個 split),然后每一個 split 分配一個 mapTask 并行實例處理。
?
2.4?切片及其源碼解讀
決定maptask并行度的的邏輯切片規劃描述文件,是由FileInputFormat實現類的getSplits()方法完成的。 該方法返回的是 List,InputSplit 封裝了每一個邏輯切片的信息,包括長度和位置信息,而 getSplits()方法返回一組 InputSplit。
源碼實現如下:
切片肯定是在map之前運行的,map之前的類是FileInputFormat文件加載,我們進入FileInputFormat類中,這個類中肯定會有切片相關的方法,getSplits方法就是進行邏輯切片的方法。首先看它的返回值:它的返回值是List集合,泛型是InputSplit,InputSplit對應的就是每一個邏輯切片對象,有一個邏輯切片就封裝成這個對象,所以List集合就是封裝所有切片對象的集合,所以返回值就是所有切片的集合。然后看它的參數JobContext job,是貫穿整個Job的上下文對象
因為返回值是一個list集合,所以這個方法里面肯定有把切片放到集合的過程。添加切片到集合的方法如下圖,其中splitSize為核心參數,既然這邊有用了這個參數,那么前面肯定有對這個參數的賦值,往前走找到最早對這個splitSize賦值的語句。
splitSize由computeSplitSize方法得來,這個方法有3個參數為blockSize, minSize, maxSize。blockSize是配置文件中塊的大小,默認128M。往前找minSize
minSize通過調用Math包下的max方法得到getFormatMinSplitSize()方法的返回值和getMinSplitSize(job)方法返回值的較大值。
進入getFormatMinSplitSize()方法,得知返回值為1
進入getMinSplitSize(job)方法
SPLIT_MINSIZE屬性如下
由于自己沒有配置過這個屬性,所以進入mapred-default.xml配置文件查找
從mapred-default.xml中可知mapreduce.input.fileinputformat.split.minsize默認為0,所以SPLIT_MINSIZE為0,那么job.getConfiguration().getLong(SPLIT_MINSIZE, 1L)方法返回0。那么Math.max(getFormatMinSplitSize(), getMinSplitSize(job))的返回值是1即minSize為1。
回到下圖中,我們已經知道了minSize
接下來查看maxSize的大小,進入到getMaxSplitSize方法,分析方法如上。
在core-site.xml配置文件找不到這個屬性,則返回Long的最大值,所以getMaxSplitSize的返回值為Long的最大值,即maxSize為Long的最大值
現在回到computeSplitSize方法,這三個參數都明確了blockSize為128M,minSize為0,maxSize為Long的最大值
進入到computeSplitSize方法的具體實現
Math.min(maxSize, blockSize)返回128M,所以Math.max(minSize, Math.min(maxSize, blockSize))返回128M。所以最終默認splitSize的大小等于blockSize的大小。
修改切片大小:
由上述可知:minSize對應mapreduce.input.fileinputformat.split.minsize
maxSize對應mapreduce.input.fileinputformat.split.maxsize
如果想要splitSize>blockSzie(128M),那就修改minSize
如果想要splitSize<blockSzie,那就修改maxSize
修改方式:①修改mapred-site.xml:不建議,這么改就寫死了,對所有任務生效,注意:單位是M
②代碼中修改,注意size是字節單位,提倡這個方法
?
知道了切片大小后,重新讀取getSplits方法。
getSplits方法中創建存放inputSplit的結果集后獲取輸入路徑下的所有文件,循環遍歷這些文件,如果文件的長度不為0的話,判斷是分布式文件系統還是本地文件系統,如果是分布式文件系統,獲取當前文件的分塊信息。然后再判斷當前文件是否可切分,如果可以切分,獲得邏輯切片大小和塊大小,并定義一個當前文件剩余大小的屬性,進行循環切分,條件是當前文件剩余大小除以邏輯切片大小大于1.1的話,所以一個文件的最后一個切片最大為128*1.1M,如果,最后一個切片大小為12M-128*1.1M的話會跨節點訪問,但是這樣還是比重新啟動一個maptask的效率高。
?
2.5?ReduceTask的并行度與分區
2.5.1 ReduceTask的并行度
ReduceTask并行度:運行reducer邏輯的任務的并行運行的個數。
ReduceTask并行度的并發數與 Map Task 的并發數不同, Map Task 由切片數決定不同,Reducetask 數量的決定是直接手動設置的。如 job.setNumReduceTasks(3);默認值是 1見下圖,手動設置為3,表示運行3 個 reduceTask,如果設置為 0,表示不運行 reduceTask 任務,就是沒有 reducer 階段,只有 mapper 階段。
每一個reducetask對應一個 yarnchild。reducetas是?reduce 端運行任務的最小單位 ,一個reducetask只能運行在一個節點 ?一個節點上可以運行多個reducetask的
注意:①如果數據分布不均勻,就有可能在 reduce 階段產生數據傾斜;②reducetask 數量并不是任意設置,需要跟進業務邏輯需求進行設計,有些情況下,需要計算全局匯總結果,就只能有 1 個 reducetask
2.5.2 分區
分區簡單理解就是為了給reducetask做數據準備的 ,所以有幾個reducetask 在shuffle中就會分幾個分區
如果啟動多個reducetask,是不會造成同一個組的數據分散到不同的reducetask中。因為對map輸出的結果,在shuffle過程中進行數據分發的時候 ?有一個分發策略(分區算法),既可以按照reducetask的個數將數據分成對應的份數,又不會將map輸出的相同的key 進行分發到不同的reducetask中。那么默認的分區算法是怎樣的呢?
Partitioner的實現類如下:
當map端調用context.write(…)的時候,實際上是調用collector.collect(…)
方法中用到了numPartitions參數,說明前面已經對他賦值了,再往上找,找到如下代碼:
當numPartitions的個數大于1時,partitioner分區器通過反射得到,進入getPartitionerClass方法
由這方法可知當用戶寫了分區的自定義方法,那么通過反射即可實例化自定義類,否則使用系統自帶的類。即默認為HashPartitioner。
所以默認的分區算法如下:
所以這個分區算法為:mapkey .hash & Integer_max % 分區個數(reducetask的個數)。
經過分區之后 ,不同的分區的數據內部進行排序分組,最終這個數據給不同的reducetask進行處理。
2.5.3 自定義分區:
默認分區算法缺點:?沒有辦法進行制定對應的數據,到對應的分區中,所以如果要實現這個需求就需要自定義分區算法。
自定義分區步驟:
? ? ? ? ? ?①自定義一個類繼承,Partitioner
? ? ? ? ? ?②重寫getPartition()方法
? ? ? ? ? ?③在job中 ?制定自定義分區類
? ? ? ? ? ?? ? ? ? ? ?job.setPartitionerClass(MyPartition.class)
? ? ? ? ? ?④指定reducetask的個數,不指定默認值運行一個
? ? ? ? ? ?? ? ? ? ? ?job.setNumReduceTasks(3)? ?
注意:①自定義分區中分區的個數和reducetask的個數要一致;②分區編號一定 要和reducetask的編號對應,reducetask的編號從0開始順序遞增;③雖然自定義分區中,分區編號是可以自己定義返回值的,不一定要順序遞增,但是出于性能考慮 ?分區編號最好是順序遞增的;④reducetask設置和分區個數相同,否則必然有reducetask在執行空跑。
import java.util.HashMap; import java.util.Map;import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Partitioner; /*** @Description 根據課程名稱course進行分區* @author refuel* @version v1.0*/ public class CoursePartition extends Partitioner<CourseInfo, NullWritable> {Map<String,Integer> map = new HashMap<>();int number = 0; @Overridepublic int getPartition(CourseInfo key, NullWritable value, int numPartitions) { String course = key.getCourse();if(map.containsKey(course)) {return map.get(course);}else {map.put(course, number);return number++;} } } #驅動類中 job.setPartitionerClass(CoursePartition.class); job.setNumReduceTasks(4);3?hadoop中的自定義類型
在Hadoop中已經有一些內置的自定義類型如下:
| BooleanWritable | 布爾型數值 |
| ByteWritable | 單字節數值 |
| DoubleWritable | 雙字節數值 |
| FloatWritable | 浮點數 |
| IntWritable | 整型數 |
| LongWritable | 長整型數 |
| Text | 使用UTF8格式存儲的文本 |
| NullWritable | 當<key, value>中的key或value為空時使用 |
| ? | ? |
當這些內置的數據類型不能滿足我們的需求時,就需要自定義類型。自定義的類型必須具備序列化和反序列的能力,當另個進程通訊時,這些數據都會以二進制序列的形式在網絡上傳送,如果我們需要將Java對象進行傳輸的時候,也應該先將對象進行序列化,而java中的Serializable序列化會將類結構也序列化,不便于高效的進行數據傳輸,而這些在hadoop中是可以不需要,所以hadoop實現了一個自己的序列化接口Writable接口。
hadoop中的自定義類步驟:
? ? ? ? ? ?①實現Writable接口;
? ? ? ? ? ?②重新write(對象----》二進制 ?序列化)和readFields(? 二進制---》對象 ? 反序列化)方法。
注意:一定給無參構造并重寫toString
當自定義類型放在map輸出的key,必須同時具備排序能力Comparable接口和具備序列化反序列能力Writable接口,因為map輸出的key必須具備排序的能力。
hadoop中的自定義類作為map輸出的key步驟:
? ? ? ? ? ?①實現WritableComparable接口
? ? ? ? ? ?②重新compareTo()方法用來指定排序規則。重新write(對象----》二進制 ?序列化)和readFields(? 二進制---》對象 ? 反序列化)方法。
?
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;import org.apache.hadoop.io.WritableComparable; /*** @Description 不是作為map的key普通的自定義類型* @author refuel* @version v1.0*/ public class CourseInfo implements Writable {private String course;private int number;private double avg;public String getCourse() {return course;}public void setCourse(String course) {this.course = course;}public int getNumber() {return number;}public void setNumber(int number) {this.number = number;}public double getAvg() {return avg;}public void setAvg(double avg) {this.avg = avg;}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(course);out.writeInt(number);out.writeDouble(avg);}@Overridepublic void readFields(DataInput in) throws IOException {this.course=in.readUTF();this.number=in.readInt();this.avg=in.readDouble();}@Overridepublic String toString() {return course +"\t" + number + "\t" + avg;}}?
4 MapReduce中的排序
MapReduce的排序發生在shuffle過程,默認安裝map輸出的key進行排序,如果map的key為Text類型則默認按照字典順序升序排序,如果為數值類型默認按照值從小到大排序。
默認類型只能進行單一元素的全排序,因為默認類型的排序方法compareTo方法已經定義好了,如果無法滿足需求就需要自定義類型進行排序,自定義類型必須實現WritableComparable接口(使其具備排序能力,序列化能力和反序列化能力)。
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;import org.apache.hadoop.io.WritableComparable; /*** @Description 作為map輸出的key的自定義類型* @author refuel* @version v1.0*/ public class Person implements WritableComparable<Person> {private String name;private String gender;private Integer age;private String dept;public Person() {super();}public Person(String name, String gender, Integer age, String dept) {super();this.name = name;this.gender = gender;this.age = age;this.dept = dept;}public void setPerson(String name, String gender, Integer age, String dept) {this.name = name;this.gender = gender;this.age = age;this.dept = dept;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getGender() {return gender;}public void setGender(String gender) {this.gender = gender;}public Integer getAge() {return age;}public void setAge(Integer age) {this.age = age;}public String getDept() {return dept;}public void setDept(String dept) {this.dept = dept;}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(name);out.writeUTF(gender);out.writeInt(age);out.writeUTF(dept);}@Overridepublic void readFields(DataInput in) throws IOException {this.name= in.readUTF();this.gender = in.readUTF();this.age = in.readInt();this.dept = in.readUTF();}@Overridepublic int compareTo(Person o) {int deptStatus = o.getDept().compareTo(this.getDept());if(deptStatus==0) {int genderStatus = o.getGender().compareTo(this.getGender());if(genderStatus==0) {return o.getAge()-this.getAge();}return genderStatus;}return deptStatus;}}4 MapReduce中的分組
分組和分區類似,也是用來劃分數據集的,只不過更加細粒度,根據Map<key,value>中的key進行分組。在同一個分區中,相同key的值記錄是屬于同一個分組的,相當于groupby key的功能。
默認情況下,當map的key使用hadoop中的默認類型的時候,將mapkey 相同的分到一組;當map的key使用的是自定義類型的時候 ,是按照排序的字段進行分組的。因為分組在本質上也是一個比較的過程,分組默認調用的類是WritableComparator,用于比較的方法為compare(),源碼如下:
觀察源碼可知,底層調用map的key的comparaTo方法。
注意:排序關注的是大小,分組關注的是是否相等
所以默認類型中,map的key的comparaTo方法按照整個字符串或整個值進行比較大小,將整個串或值完全相同的分到一組。而自定義類型中,map的key的comparaTo方法返回0時才是同一組,即將map的key的所有的比較屬性都相同的分到一組。
默認情況下分組字段和排序的字段完全一致,當排序和分組規則不一致,就不能使用默認分組了,必須自定義分組了。
自定義分組步驟:
? ? ? ? ? ?①自定義一個類繼承WritableComparator接口,注意要在無參構造器中調用父類的構造方法并傳入自定義的map的key的類。
? ? ? ? ? ?②重寫compare分組方法
? ? ? ? ? ?③job中指定
注意:
①分組發生在排序之后的,compare比較的時候只會比較相鄰的,想要得到需要的分組,必須在排序階段將需要分組的數據排到一起。
②雖然在排序階段將需要分組的數據排到一起,是一定需要自定義分組,不然會將排序字段也作為分組的條件之一。如下例子中是將相同部門相同性別的分為同一組,按照年齡排序,如果僅在排序階段將需要分組的數據排到一起,那么就變成了相同部門相同性別相同年齡才為一組
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;import org.apache.hadoop.io.WritableComparable; /*** @Description 作為map輸出的key的自定義類型* @author refuel* @version v1.0*/ public class Person implements WritableComparable<Person> {private String name;private String gender;private Integer age;private String dept;public Person() {super();}public Person(String name, String gender, Integer age, String dept) {super();this.name = name;this.gender = gender;this.age = age;this.dept = dept;}public void setPerson(String name, String gender, Integer age, String dept) {this.name = name;this.gender = gender;this.age = age;this.dept = dept;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getGender() {return gender;}public void setGender(String gender) {this.gender = gender;}public Integer getAge() {return age;}public void setAge(Integer age) {this.age = age;}public String getDept() {return dept;}public void setDept(String dept) {this.dept = dept;}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(name);out.writeUTF(gender);out.writeInt(age);out.writeUTF(dept);}@Overridepublic void readFields(DataInput in) throws IOException {this.name= in.readUTF();this.gender = in.readUTF();this.age = in.readInt();this.dept = in.readUTF();}@Overridepublic int compareTo(Person o) {int deptStatus = o.getDept().compareTo(this.getDept());if(deptStatus==0) {int genderStatus = o.getGender().compareTo(this.getGender());if(genderStatus==0) {return o.getAge()-this.getAge();}return genderStatus;}return deptStatus;}} import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator;public class DeptGrouping extends WritableComparator {public DeptGrouping() {super(Person.class,true);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {Person p1 = (Person)a;Person p2 = (Person)b;int deptStatus = p1.getDept().compareTo(p2.getDept());if(deptStatus==0) {return p1.getGender().compareTo(p2.getGender());}return deptStatus;}} #驅動類中添加自定義分組類 job.setGroupingComparatorClass(DeptGrouping.class);5 MapReduce中的Combiner
Combiner:局部聚合組件,是優化組件 ,接受數據來自maptask,輸出數據給reducetask,默認不是shuffle的組件。
作用:減少shuffle過程的數據量,減少reduce端接受的數據量,提升性能
工作過程:對每一個maptask的輸出結果做一個局部聚合,聚合操作邏輯取決于reduce端的操作邏輯
Combiner實現步驟:
? ? ? ? ? ?①自定義一個類繼承 Reducer <maptask輸出,reducetask的輸入>,注意前兩個泛型== 后兩個泛型
? ? ? ? ? ?②重寫 reduce 方法
? ? ? ? ? ?③在job中指定
一般情況下,在實際開發過程中,當Reducer的代碼中 前兩個泛型== 后兩個泛型,? ?可以使用Reducer的代替Combiner的代碼 ,如果泛型不一致 不可以替代的。
注意:combiner之所以沒有默認在shuffle中是因為使用場景受限
| 適用場景 | 不適用場景 |
| max ,min,?sum等操作 | avg直接求平均 |
雖然combiner不適合直接求平均值,但是可以在combiner中統計sum和count,在reduce端求平均值這樣間接求得。
?
6 MapReduce中的Shuffle
MapReduce 中,mapper 階段處理的數據如何傳遞給 reducer 階段,的流程就叫 Shuffle(數據混洗),Shuffle是 MapReduce 框架中 最關鍵的一個流程。
Shuffle的核心機制包括:數據分區,排序,局部聚合,緩存,拉取,再合并,排序等。
MapReduce 執行流程圖如下:
MapReduce 執行流程解讀:(Shuffle流程為2-7步)
(1)每一個文件以block形式存儲在HDFS上,默認128M存3份,運行時每個maptask處理一個split,默認情況下一個split的大小與block塊的大小一樣,有多少個 block 就有多少個 map 任務,所以不同大小的文件會有不同個map任務進行并行計算;
(2)每個 maptask處理完輸入的 split 后會把結果寫入到內存的一個環形緩沖區,環形緩沖區的默認大小為 100M,閾值為0.8,當緩沖區的大小使用超過閥值,一個后臺的線程就會啟動把緩沖區中的數據溢寫 (spill)到本地磁盤中(mapred-site.xml:mapreduce.cluster.local.dir),將數據從內存中寫出到磁盤的過程就是溢寫,同 Mapper 繼續時向環形緩沖區中寫入數據;
(3)環形緩沖區的數據在溢寫到磁盤前會先按照分區編號進行排序,每個分區中的都數據會有后臺線程根據 map 任務的輸出結果 key進行內排序(字典順序、自然順序或自定義順序comparator),如果有combiner,它會在溢寫到磁盤之前對排好序的輸出數據上運行,最后在本地生成分好區且排好序的小文件; 如果預留20%寫滿了,但是80%的數據還沒有溢寫完成,整個數組處于阻塞狀態,阻塞到80%的空間釋放再次啟動,到閥值后會向本地磁盤新建一個溢寫文件;
(4)每個maptask完成之前,會把本地磁盤的所有溢寫文件不斷合并成得到一個結果文件,合并得到的結果文件會根據小溢寫文件的分區而分區,每個分區的數據會再次根據 key進行排序,得到的結果文件是分好區且排好序的,可以合并成一個文件的溢寫文件數量默認為 10(mapred-site.xml:mapreduce.task.io.sort.factor);這個結果文件的分區存在一個映射關系, 比如 0~1024 字節內容為 0 號分區內容,1025~2048?字節內容為 1 號分區內容等等;
(5)當有一個maptask完成就會啟動reducetask,reduce 任務進入fetch(復制)階段,reduce 任務通過 http 協議(hadoop內置了netty容器)把所有Mapper結果文件的對應的分區數據復制過來。Reducer可以并行復制Mapper的結果,默認線程數為 5個(mapred-site.xml:mapreduce.reduce.shuffle.parallelcopies)。Reducer 個數由 mapred-site.xml 的 mapreduce.job.reduces 配置決定, 或者初始化 job 時調用 Job.setNumReduceTasks(int);Reducer 中的一個線程定期向 MRAppMaster詢問Mapper輸出結果文件位置,mapper結束后會向MRAppMaster匯報信息; 從而 Reducer得知Mapper狀態,得到map結果文件目錄;
由于 Reducer可能會失敗,所有 Reducer 復制完成 map 結果文件后,NodeManager 并沒有在第一個 map結果文件復制完成后刪除它,直到作業完成后 MRAppMaster 通知 NodeManager 進行刪除;
(6)fetch(復制)階段完成后,Reducer 進入 Merge 階段,循環地合并 map 結果文件,并按map的key排序,合并因子默認為 10(mapred-site.xml:mapreduce.task.io.sort.factor),經過不斷地 Merge 后 得到一個“最終文件”,可能存儲在磁盤也可能存在內存中;
(7)在執行進入到reducer類中,會對合并并排序的最終文件進行分組;
(8)經過合并,排序,分組后的最終文件輸入到 reduce 進行計算,計算結果輸入到 HDFS。
?
環形緩沖區詳解:
Mapper任務執行完后的數據會通過MapOutputBuffer提交到一個kvbuffer緩沖區中,這個緩沖區的數據是專門存儲map端輸出數據的,它是一個環形緩沖區,大小可通過配置mapreduce.task.io.sort.mb來配置這個緩沖區的大小,默認是100MB。kvbuffer本質上是一個byte數組,模擬的環形數據結構,環形緩沖區適用于寫入和讀取的內容保持在順序的情況下,要不然就不能均勻的向前推進。
在Hadoop中數據要排序有個非常良好的策略,就是不移動數據本身,而是為每個數據建立一個元數據kvmeta,在排序的時候,直接對元數據進行排序,然后順序讀寫元數據即可。淫威每條元數據的大小是固定的4*4byte,即4個整數,所以讀取非常方便。
元數據:描述數據的數據,這的元數據描述的是原始數據在數組中存儲的位置,其主要構成如下:
? ? ? ? ? ?①分區編號partitioner,通過getPartition()得到
? ? ? ? ? ?②原始數據的key的起始下標
? ? ? ? ? ?③原始數據的value的起始下標
? ? ? ? ? ?④原始數據的value的長度
一條元數據的長度是固定的4*4byte,即4個整數,所有的元數據合并在一起就是一個元數據塊,相當于一個數組,可以通過KV對的元數據,再按照其元數據的指引就可找到這個KV對的K和V,還可以知道這個KV對屬于哪個Partition。
讀取配置文件,如下圖
根據上圖配置文件可知環形緩沖區的大小為100*1024*1024字節,將數據寫出到磁盤的閾值默認0.8。
緩存區的數據(數組的數據)達到一定的閾值,默認0.8時,開始寫入磁盤,將數據從內存中寫出到磁盤的過程就是溢寫。預留20%是為了在溢寫過程中持續的接受maptask的輸出數據,若這20%寫滿了,但是80%的數據還沒有溢寫完成,整個數組處于阻塞狀態,阻塞到80%的空間釋放。
注意:字節數組的整個寫入過程是收尾相連的,這是按環形緩沖區使用的,所以往里寫入內容時一旦超過終點就又“翻折”到緩沖區的起點,反之亦然,當80%的數據溢寫完成,元數據會做一個位置的調整,和原始數據形成一個背對背的結構,新的equator就形成了。
?
7 MapReduce的join
7.1 reduce join
多表關聯發生在reduce端,需要map端能夠同時讀取兩個數據,兩個表的關聯鍵相同的數據必須分到一組,所以map端輸出的key為相同的關聯鍵。
案例如下:
import java.io.IOException; import java.util.HashMap; import java.util.Map;import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; /*** @Description reduce join的map端實現* @author refuel* @version v1.0*/ public class ReduceJoinMap extends Mapper<LongWritable,Text,Text,Text> {Text k = new Text();Text v = new Text();String filename;int count =0;Map<String,Integer> map = new HashMap<>();@Overrideprotected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)throws IOException, InterruptedException {FileSplit fileSplit = (FileSplit)context.getInputSplit();filename = fileSplit.getPath().getName();}@Overrideprotected void map(LongWritable key,Text value,Mapper<LongWritable,Text,Text,Text>.Context context) throws IOException, InterruptedException {if("movies.dat".equals(filename)) {String[] mwords = value.toString().split("::");k.set(mwords[0]);v.set("M"+mwords[1]+"\t"+mwords[2]);context.write(k, v);}else {String[] rwords = value.toString().split("::");k.set(rwords[1]);v.set("R"+rwords[0]+"\t"+rwords[1]+"\t"+rwords[2]+"\t"+rwords[3]);context.write(k, v);}} }import java.io.IOException; import java.util.ArrayList; import java.util.List;import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /*** @Description reduce join的reduce端實現* @author refuel* @version v1.0*/ public class ReduceJoinReduce extends Reducer<Text,Text, Text, NullWritable> {List<String> mlist=new ArrayList<String>();List<String> rlist=new ArrayList<String>();Text k = new Text();@Overrideprotected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, NullWritable>.Context context)throws IOException, InterruptedException {mlist.clear();rlist.clear();for(Text value:values) {String line = value.toString();if(line.startsWith("M")) {mlist.add(line);}else {rlist.add(line);}}for(String m:mlist) {for(String r:rlist) {k.set(r+"\t"+m);context.write(k, NullWritable.get());}}}}package com.refuel.homework.mapreduce.day09_3;import java.io.IOException;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/*** @Description reduce join的驅動類實現* @author refuel* @version v1.0*/ public class ReduceJoinDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(ReduceJoinDriver.class);job.setMapperClass(ReduceJoinMap.class);job.setReducerClass(ReduceJoinReduce.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);FileSystem fs = FileSystem.get(conf);Path path = new Path("E:\\test\\mapreduce\\out10");if(fs.exists(path)) {fs.delete(path, true);}FileInputFormat.setInputPaths(job, new Path("E:\\test\\mapreduce\\movies.dat"),new Path("E:\\test\\mapreduce\\ratings.dat"));FileOutputFormat.setOutputPath(job, path);boolean result = job.waitForCompletion(true);System.exit(result?0:1);}}reduce join有可能缺陷問題:數據傾斜。MapReduce中數據傾斜的本質其實是分區中數據分配極大不均勻。
當mapreduce 程序中有reduce 有可能產生數據傾斜,combiner是一種的數據傾斜的處理方式,但是不能絕對避免,避免數據傾斜需要調整分區算法
7.2 map join
多表關聯發生在map端。整個join過程在map端發生,只需要maptask不需要reducetask?
優勢: 有效避免join 數據傾斜;劣勢:緩存中的表不可過大,適合處理大小表關聯
實現思路:map() 一行調用一次,在map端只讀取一個文件(大文件),另一個文件(小文件)加載在每一個maptask運行節點的內存中,每當map端進行讀取一個文件的一行 ?就去內存中 找是否可以匹配另一個文件
實現步驟:
? ? ? ? ? ?①將小文件加載到本地緩存(磁盤)中job.addCacheFile(uri); ?將制定的路徑的文件,加載到每一個maptask的運行節點上;
? ? ? ? ? ?②setup方法中定義一個流,定義一個集合,流開始讀取,放在集合中;
? ? ? ? ? ?③map方法中讀取大文件,每次讀取一行,和內存集合中的數據做關聯,關聯完成寫出hdfs 。
注意:注意:只有maptask 時候,需要將reducetask設置0,job.setNumReduceTasks(0),否則默認運行一個Reducer 。?
案例如下:
import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.IOException; import java.net.URI; import java.util.HashMap; import java.util.Map;import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;/*** @Description map join的map端實現* @author refuel* @version v1.0*/ public class MapJoinMap extends Mapper<LongWritable,Text,Text,NullWritable> {Text k = new Text();Map<String,String> map;@Overrideprotected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context)throws IOException, InterruptedException {Path[] localCacheFiles = context.getLocalCacheFiles();String cacheFile=localCacheFiles[0].toString();BufferedReader br = new BufferedReader(new FileReader(cacheFile));map = new HashMap<>();String line =null;while((line= br.readLine())!=null) {String[] word = line.split("::");map.put(word[0],word[1]+"\t"+word[2]);}}@Overrideprotected void map(LongWritable key,Text value,Mapper<LongWritable,Text,Text,NullWritable>.Context context) throws IOException, InterruptedException {String[] words = value.toString().split("::");if(map.containsKey(words[1])) {k.set(words[0]+"\t"+words[1]+"\t"+words[2]+"\t"+words[3]+"\t"+map.get(words[1]));context.write(k, NullWritable.get());}} }import java.io.IOException; import java.net.URI; import java.net.URISyntaxException;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/*** @Description map join的驅動類* @author refuel* @version v1.0*/ public class MapJoinDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(MapJoinDriver.class);job.setMapperClass(MapJoinMap.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);job.addCacheFile(new URI(args[0]));job.setNumReduceTasks(0); FileSystem fs = FileSystem.get(conf);Path path = new Path(args[2]);if(fs.exists(path)) {fs.delete(path, true);}FileInputFormat.addInputPath(job, new Path(args[1]));FileOutputFormat.setOutputPath(job, path);boolean result = job.waitForCompletion(true);System.exit(result?0:1);}}?
總結
以上是生活随笔為你收集整理的Hadoop学习之MapReduce的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: php 管理员表和用户表,求discuz
- 下一篇: Log4j框架配置文件