Hadoop Source Code Analysis (4)
2021SC@SDUSC
A Brief Overview of This Week's Work
Last week we analyzed the core code of org.apache.hadoop.mapreduce.Cluster; this week we continue with the mapreduce package. Building on our initial understanding of the Cluster class, we now turn to the closely related org.apache.hadoop.mapreduce.ClusterMetrics.
Analysis of org.apache.hadoop.mapreduce.ClusterMetrics
Let us start with the fields and constructors of ClusterMetrics.
```java
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ClusterMetrics implements Writable {
  private int runningMaps;
  private int runningReduces;
  private int occupiedMapSlots;
  private int occupiedReduceSlots;
  private int reservedMapSlots;
  private int reservedReduceSlots;
  private int totalMapSlots;
  private int totalReduceSlots;
  private int totalJobSubmissions;
  private int numTrackers;
  private int numBlacklistedTrackers;
  private int numGraylistedTrackers;
  private int numDecommissionedTrackers;

  public ClusterMetrics() {
  }

  public ClusterMetrics(int runningMaps, int runningReduces,
      int occupiedMapSlots, int occupiedReduceSlots, int reservedMapSlots,
      int reservedReduceSlots, int mapSlots, int reduceSlots,
      int totalJobSubmissions, int numTrackers, int numBlacklistedTrackers,
      int numDecommissionedNodes) {
    this(runningMaps, runningReduces, occupiedMapSlots, occupiedReduceSlots,
        reservedMapSlots, reservedReduceSlots, mapSlots, reduceSlots,
        totalJobSubmissions, numTrackers, numBlacklistedTrackers, 0,
        numDecommissionedNodes);
  }

  public ClusterMetrics(int runningMaps, int runningReduces,
      int occupiedMapSlots, int occupiedReduceSlots, int reservedMapSlots,
      int reservedReduceSlots, int mapSlots, int reduceSlots,
      int totalJobSubmissions, int numTrackers, int numBlacklistedTrackers,
      int numGraylistedTrackers, int numDecommissionedNodes) {
    this.runningMaps = runningMaps;
    this.runningReduces = runningReduces;
    this.occupiedMapSlots = occupiedMapSlots;
    this.occupiedReduceSlots = occupiedReduceSlots;
    this.reservedMapSlots = reservedMapSlots;
    this.reservedReduceSlots = reservedReduceSlots;
    this.totalMapSlots = mapSlots;
    this.totalReduceSlots = reduceSlots;
    this.totalJobSubmissions = totalJobSubmissions;
    this.numTrackers = numTrackers;
    this.numBlacklistedTrackers = numBlacklistedTrackers;
    this.numGraylistedTrackers = numGraylistedTrackers;
    this.numDecommissionedTrackers = numDecommissionedNodes;
  }
```
As the fields show, ClusterMetrics defines many cluster-related variables, for example:
The size of the cluster.
The number of blacklisted and decommissioned trackers.
The slot capacity of the cluster.
The number of currently occupied/reserved map and reduce slots.
The number of currently running map and reduce tasks.
The number of job submissions.
It is therefore easy to infer that ClusterMetrics mainly records status information about the cluster and exposes it to users.
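Note that the class implements Writable, so a metrics snapshot can be shipped over Hadoop RPC. As a quick illustration, the round-trip sketch below serializes and deserializes a ClusterMetrics object by hand; the argument order follows the 13-argument constructor shown above, the numbers are made up, and getRunningMaps() is one of the simple getters the class exposes (verify the exact getter names against your Hadoop version):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import org.apache.hadoop.mapreduce.ClusterMetrics;

public class ClusterMetricsRoundTrip {
  public static void main(String[] args) throws Exception {
    // Made-up numbers, following the 13-int constructor above:
    ClusterMetrics before = new ClusterMetrics(
        4, 2,     // runningMaps, runningReduces
        4, 2,     // occupiedMapSlots, occupiedReduceSlots
        0, 0,     // reservedMapSlots, reservedReduceSlots
        8, 4,     // mapSlots, reduceSlots
        10,       // totalJobSubmissions
        3, 0, 0,  // numTrackers, numBlacklistedTrackers, numGraylistedTrackers
        0);       // numDecommissionedNodes

    // Writable contract: write() and readFields() (de)serialize the fields
    // in a fixed order, which is what allows the object to travel over RPC.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    before.write(new DataOutputStream(bytes));

    ClusterMetrics after = new ClusterMetrics();
    after.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println("running maps after round trip: " + after.getRunningMaps());
  }
}
```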
The official API documentation also describes the ClusterMetrics class:
"Status information on the current state of the Map-Reduce cluster." In other words, it records the concrete state of the Map-Reduce cluster. Since last week's post already covered the Cluster class itself, we will not repeat that material here.
The documentation further notes: "Clients can query for the latest ClusterMetrics, via Cluster.getClusterStatus()". That is, calling getClusterStatus() returns the corresponding ClusterMetrics snapshot.
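Putting that together, a minimal client-side sketch might look like the following (it assumes a Configuration pointing at a running cluster; the getter names are as I recall them from the API, so treat them as assumptions to check against your Hadoop version):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.ClusterMetrics;

public class MetricsProbe {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Cluster cluster = new Cluster(conf);                // connects to the cluster described by conf
    ClusterMetrics metrics = cluster.getClusterStatus(); // latest metrics snapshot
    System.out.println("Running maps:   " + metrics.getRunningMaps());
    System.out.println("Map slot total: " + metrics.getMapSlotCapacity());
    System.out.println("Trackers:       " + metrics.getTaskTrackerCount());
    cluster.close();
  }
}
```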
That is as far as we will take ClusterMetrics; next we continue with other classes in the mapreduce package.
Source Code Analysis of org.apache.hadoop.mapreduce.Counters
Counters is one of the most important classes in mapreduce.
A counter (Counter) is the facility through which a MapReduce application reports its statistics; Mapper and Reducer implementations can use counters to report them.
MapReduce counters give us a window into the runtime details of a MapReduce job, and MapReduce ships with many default counters out of the box.
Counters are global counters, defined either by the MapReduce framework or by the application. Each Counter can be of any Enum type, and the counters of a particular Enum type are grouped together.
In the map/reduce method, the user can fetch a predefined counter via context.getCounter(Enum<?> counterName) and then count with counter.increment(long incr).
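As a quick illustration, the sketch below shows both the enum-based form and the string-based overload context.getCounter(String group, String name); the RecordQuality enum and the "MyStats" group name are invented for this example, not part of Hadoop:

```java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class QualityMapper extends Mapper<LongWritable, Text, NullWritable, NullWritable> {

  // Hypothetical application-defined enum; any Enum type works as a counter name.
  public enum RecordQuality { GOOD, BAD }

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    if (value.toString().isEmpty()) {
      context.getCounter(RecordQuality.BAD).increment(1);
    } else {
      context.getCounter(RecordQuality.GOOD).increment(1);
    }
    // String-based variant: the group and counter are created on first use.
    context.getCounter("MyStats", "BytesSeen").increment(value.getLength());
  }
}
```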
The MapReduce framework itself provides many built-in counters, such as File System Counters, Job Counters, Map-Reduce Framework, File Input Format Counters, and File Output Format Counters. They can be seen in the console output printed after an MR job finishes. (In my run, the SecondarySort Counters group was a custom counter group, not one that ships with MapReduce.)
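Since the original screenshot is not reproduced here, the shape of that console section is sketched below from memory, with all values elided; treat it as illustrative rather than as output captured from a real run:

```
File System Counters
    FILE: Number of bytes read=...
    HDFS: Number of bytes read=...
Job Counters
    Launched map tasks=...
    Launched reduce tasks=...
Map-Reduce Framework
    Map input records=...
    Map output records=...
    Reduce output records=...
File Input Format Counters
    Bytes Read=...
File Output Format Counters
    Bytes Written=...
```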
We first attach the source of the entire Counters class, to support the analysis that follows.
```java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.counters.Limits;
import org.apache.hadoop.mapreduce.counters.GenericCounter;
import org.apache.hadoop.mapreduce.counters.AbstractCounterGroup;
import org.apache.hadoop.mapreduce.counters.CounterGroupBase;
import org.apache.hadoop.mapreduce.counters.FileSystemCounterGroup;
import org.apache.hadoop.mapreduce.counters.AbstractCounters;
import org.apache.hadoop.mapreduce.counters.CounterGroupFactory;
import org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup;

/**
 * <p><code>Counters</code> holds per job/task counters, defined either by the
 * Map-Reduce framework or applications. Each <code>Counter</code> can be of
 * any {@link Enum} type.</p>
 *
 * <p><code>Counters</code> are bunched into {@link CounterGroup}s, each
 * comprising of counters from a particular <code>Enum</code> class.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Counters extends AbstractCounters<Counter, CounterGroup> {

  // Mix framework group implementation into CounterGroup interface
  private static class FrameworkGroupImpl<T extends Enum<T>>
      extends FrameworkCounterGroup<T, Counter> implements CounterGroup {

    FrameworkGroupImpl(Class<T> cls) {
      super(cls);
    }

    @Override
    protected FrameworkCounter<T> newCounter(T key) {
      return new FrameworkCounter<T>(key, getName());
    }

    @Override
    public CounterGroupBase<Counter> getUnderlyingGroup() {
      return this;
    }
  }

  // Mix generic group implementation into CounterGroup interface
  // and provide some mandatory group factory methods.
  private static class GenericGroup extends AbstractCounterGroup<Counter>
      implements CounterGroup {

    GenericGroup(String name, String displayName, Limits limits) {
      super(name, displayName, limits);
    }

    @Override
    protected Counter newCounter(String name, String displayName, long value) {
      return new GenericCounter(name, displayName, value);
    }

    @Override
    protected Counter newCounter() {
      return new GenericCounter();
    }

    @Override
    public CounterGroupBase<Counter> getUnderlyingGroup() {
      return this;
    }
  }

  // Mix file system group implementation into the CounterGroup interface
  private static class FileSystemGroup extends FileSystemCounterGroup<Counter>
      implements CounterGroup {

    @Override
    protected Counter newCounter(String scheme, FileSystemCounter key) {
      return new FSCounter(scheme, key);
    }

    @Override
    public CounterGroupBase<Counter> getUnderlyingGroup() {
      return this;
    }
  }

  /**
   * Provide factory methods for counter group factory implementation.
   * See also the GroupFactory in
   * {@link org.apache.hadoop.mapred.Counters mapred.Counters}
   */
  private static class GroupFactory
      extends CounterGroupFactory<Counter, CounterGroup> {

    @Override
    protected <T extends Enum<T>>
    FrameworkGroupFactory<CounterGroup>
    newFrameworkGroupFactory(final Class<T> cls) {
      return new FrameworkGroupFactory<CounterGroup>() {
        @Override public CounterGroup newGroup(String name) {
          return new FrameworkGroupImpl<T>(cls); // impl in this package
        }
      };
    }

    @Override
    protected CounterGroup newGenericGroup(String name, String displayName,
                                           Limits limits) {
      return new GenericGroup(name, displayName, limits);
    }

    @Override
    protected CounterGroup newFileSystemGroup() {
      return new FileSystemGroup();
    }
  }

  private static final GroupFactory groupFactory = new GroupFactory();

  /**
   * Default constructor
   */
  public Counters() {
    super(groupFactory);
  }

  /**
   * Construct the Counters object from the another counters object
   * @param <C> the type of counter
   * @param <G> the type of counter group
   * @param counters the old counters object
   */
  public <C extends Counter, G extends CounterGroupBase<C>>
  Counters(AbstractCounters<C, G> counters) {
    super(counters, groupFactory);
  }
}
```
So what exactly is the Counters class for? Let us first look at the official definition of Class Counters:
As the Javadoc states, Counters holds the per job/task counters, defined either by the Map-Reduce framework or by applications, and each Counter can be of any Enum type.
Counters are bunched into CounterGroups, each comprising the counters from one particular Enum class.
We can also see that Counters extends the AbstractCounters class.
Opening the AbstractCounters source shows that it is an abstract class providing the common implementation for the Counters containers in both the mapred and mapreduce packages.
The functions it provides are easy to follow: they mainly make counters convenient to use, covering basics such as constructing counters, totalling the counters within a group, and returning counter names.
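One consequence worth noting: AbstractCounters is iterable over its counter groups, and each group is iterable over its counters, so dumping everything a job counted is a two-level loop. A minimal sketch, assuming a completed Job object is passed in:

```java
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;

public class CounterDump {
  /** Print every counter of a completed job, group by group. */
  public static void dump(Job job) throws Exception {
    Counters counters = job.getCounters();
    for (CounterGroup group : counters) {      // one group per Enum class / group name
      System.out.println("Group: " + group.getDisplayName());
      for (Counter counter : group) {          // the counters inside that group
        System.out.println("  " + counter.getDisplayName() + "=" + counter.getValue());
      }
    }
  }
}
```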
Analysis of the Counters Constructors
Counters provides two constructors, one parameterized and one no-argument:
The parameterized constructor relies on the copy support in AbstractCounters mentioned above to build a new counters object from another counters object. It takes a single argument, the old counters object, typed AbstractCounters<C, G>, where C is the type of the counters and G is the type of the counter groups.
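A small usage sketch of this copy constructor (the "snapshot" framing here is just an illustration of one reason you might copy counters):

```java
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;

public class CounterSnapshot {
  /** Take an independent copy of a job's counters via the copy constructor. */
  public static Counters snapshot(Job job) throws Exception {
    // job.getCounters() returns a Counters, which is an
    // AbstractCounters<Counter, CounterGroup>, so it satisfies <C, G>.
    return new Counters(job.getCounters());
  }
}
```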
We mentioned counter groups above, but what exactly is a counter group? After consulting the relevant material, I learned the following.
A Counter has the notion of a "group", which collects all values belonging to the same logical scope. The default counters provided by a MapReduce job fall into three groups (a sketch of how to look these up in today's API follows the list):
Map-Reduce Framework
Map input records, Map skipped records, Map input bytes, Map output records, Map output bytes, Combine input records, Combine output records, Reduce input records, Reduce input groups, Reduce output records, Reduce skipped groups, Reduce skipped records, Spilled records
File Systems
FileSystem bytes read, FileSystem bytes written
Job Counters
Launched map tasks, Launched reduce tasks, Failed map tasks, Failed reduce tasks, Data-local map tasks, Rack-local map tasks, Other local map tasks
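In the current mapreduce API these default groups are backed by framework enums such as TaskCounter (the "Map-Reduce Framework" group), JobCounter ("Job Counters") and FileSystemCounter. Below is a minimal lookup sketch; the enum constants are quoted from memory of the org.apache.hadoop.mapreduce package, so verify them against your Hadoop version:

```java
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.TaskCounter;

public class BuiltinCounters {
  public static void print(Job job) throws Exception {
    Counters counters = job.getCounters();
    // "Map-Reduce Framework" group -> TaskCounter enum
    long mapIn = counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
    // "Job Counters" group -> JobCounter enum
    long launchedMaps = counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS).getValue();
    System.out.println("Map input records:  " + mapIn);
    System.out.println("Launched map tasks: " + launchedMaps);
  }
}
```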
Using Counters in Practice
Having covered the concept and constructors of counters, we would like to get a better feel for how they are actually used.
For example, a user might want a quick count of the number of lines in a file, together with the number of error records among them.
To use this feature, the user code creates an object called a Counter and increments its value at the appropriate points in the Map and Reduce functions.
These Counter values are periodically propagated from the individual worker machines to the master (piggybacked on the ping responses).
The master accumulates the Counter values from successfully completed Map and Reduce tasks and, once the MapReduce operation finishes, returns them to the user code.
The current Counter values are also displayed on the master's status page, so the user can watch the progress of the live computation.
When accumulating Counter values, the master checks for duplicate contributions from the same Map or Reduce task to avoid double counting.
The code below uses counters to achieve exactly this goal.
```java
package com.shockang.study.bigdata.mapreduce.counter;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MyMapperWithCounter extends Mapper<LongWritable, Text, LongWritable, IntWritable> {

    /**
     * Define an enum type for our counters.
     */
    public static enum FileRecorder {
        ErrorRecorder,
        TotalRecorder
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        if ("error".equals(value.toString())) {
            /**
             * Increment the error-record counter.
             */
            context.getCounter(FileRecorder.ErrorRecorder).increment(1);
        }
        /**
         * Increment the total-record counter.
         */
        context.getCounter(FileRecorder.TotalRecorder).increment(1);
    }
}
```
And the driver class that submits the job and reads the counter values back:
```java
package com.shockang.study.bigdata.mapreduce.counter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobMain {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        /**
         * Use NLineInputFormat to split a small file, so as to simulate
         * the distributed processing of a large file.
         */
        configuration.setInt("mapreduce.input.lineinputformat.linespermap", 5);
        Job job = new Job(configuration, "counter-job"); // Job.getInstance(...) is the non-deprecated alternative
        job.setInputFormatClass(NLineInputFormat.class);
        job.setJarByClass(JobMain.class);
        job.setMapperClass(MyMapperWithCounter.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputDir = new Path(args[1]);
        FileSystem fs = FileSystem.get(configuration);
        if (fs.exists(outputDir)) {
            fs.delete(outputDir, true);
        }
        FileOutputFormat.setOutputPath(job, outputDir);
        if (job.waitForCompletion(true)) {
            System.out.println("Error num:" + job.getCounters().findCounter(
                    MyMapperWithCounter.FileRecorder.ErrorRecorder).getValue());
            System.out.println("Total num:" + job.getCounters().findCounter(
                    MyMapperWithCounter.FileRecorder.TotalRecorder).getValue());
        }
    }
}
```
Summary
This time we first analyzed the ClusterMetrics class, rounding out our understanding of the Cluster class, and then began our analysis of Counters, a core class of mapreduce: we examined its purpose and its constructors, and deepened our understanding by writing a simple custom counter that tallies both the error records and the total records in a file.