日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

如何编写Hadoop调度器

發布時間:2025/3/21 编程问答 16 豆豆
生活随笔 收集整理的這篇文章主要介紹了 如何编写Hadoop调度器 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
1. 編寫目的

在Hadoop中,調度器是一個可插拔的模塊,用戶可以根據自己的實際應用要求設計調度器,然后在配置文件中指定相應的調度器,這樣,當Hadoop集群啟動時,便會加載該調度器。當前Hadoop自帶了幾種調度器,分別是FIFO(默認調度器),Capacity Scheduler和FairScheduler,通常境況下,這些調度器很難滿足公司復雜的應用需求,因而往往需要開發自己的調度器。本文介紹了Hadoop調度器的基本編寫方法。

2. Hadoop調度框架

Hadoop的調度器是在JobTracker中加載和調用的,用戶可以在配置文件mapred-site.xml中的mapred.jobtracker.taskScheduler屬性中指定調度器。本節分析了Hadoop調度器的調度框架,實際上分析了兩個重要類:TaskScheduler和JobTracker的關系。

(1) TaskScheduler

如果用戶要編寫自己的調度器,需要繼承抽象類TaskScheduler,該類的接口如下:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 abstract class TaskScheduler implements Configurable { protected Configuration conf; //配置文件 protected TaskTrackerManager taskTrackerManager; //一般會設為JobTracker public Configuration getConf() { ??return conf; } public void setConf(Configuration conf) { ??this.conf = conf; } public synchronized void setTaskTrackerManager( TaskTrackerManager taskTrackerManager) { ??this.taskTrackerManager = taskTrackerManager; } public void start() throws IOException { //初始化函數,如加載配置文件等 ??// do nothing } public void terminate() throws IOException { //結束函數 // do nothing } //最重要的函數,為該taskTracker分配合適的task public abstract List<Task> assignTasks(TaskTrackerStatus taskTracker) throws IOException; ??//根據隊列名字獲job列表 public abstract Collection<JobInProgress> getJobs(String queueName); }

(2) JobTracker

JobTracker是Hadoop最核心的組件,它監控整個集群中的作業運行情況并對資源進行管理和調度。

每個TaskTracker每個3s(默認值,可配置)通過heartbeat向JobTracker匯報自己管理的機器的一些基本信息,包括內存使用量,內存剩余量,正在運行的task,空閑的slot數目等,一旦JobTracker發現該TaskTracker出現了空閑的slot,便會調用調度器中的AssignTasks方法為該TaskTracker分配task。

下面分析JobTracker調用TaskScheduler的具體流程:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 …… private final TaskScheduler taskScheduler; //聲明調度器對象 …… public static JobTracker startTracker(JobConf conf, String identifier) { ??……. ??result = new JobTracker(conf, identifier); ??result.taskScheduler.setTaskTrackerManager(result); //設置調度器的manager ??…… } //創建調度器 JobTracker(JobConf conf, String identifier) { ??…… ??// Create the scheduler ??Class<? extends TaskScheduler> schedulerClass ??= conf.getClass("mapred.jobtracker.taskScheduler", ????JobQueueTaskScheduler.class, TaskScheduler.class); ??taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf); ??….. } //run forever public void offerService() { ??…… ??taskScheduler.start(); //啟動調度器 ??…… } 。。。。。 HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId) { ??……. ??// Check for new tasks to be executed on the tasktracker ??if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) { ????…… ????//使用調度器,為該taskTracker分配作業 ????tasks = taskScheduler.assignTasks(taskTrackerStatus); ????…… ??} }

從上面的分析可以知道,Scheduler和JobTracker之間會相互包含(實際上是組合模式),Scheduler中要包含JobTracker(實際上就是TaskTrackerManager)對象,以便獲取整個Hadoop集群的一些信息,如slot總數,QueueManager對象,添加JobInProgressListener以便增加或刪除job時,通知Scheduler;JobTracker中要包含Scheduler對象,以便可以對每個TaskTracker分配task。

3. 編寫Hadoop調度器

假設我們要編寫一個新的調度器,為MyHadoopScheduler,需要進行以下工作:

(1) 用戶需要自己實現的類

@ MyHadoopSchedulerConf:配置文件管理類,讀取你自己的配置文件,并保存到合適的數據結構中,一般而言,這個類應該支持動態加載配置文件。

@ MyHadoopSchedulerListener:編寫自己的JobInProgressListener,并調用JobTracker的addJobInProgressListener(),將之加到系統的Listener隊列中,以便系統中添加或刪除job后,JobTracker可立刻告訴調度器。

@ MyHadoopScheduler:調度器的核心實現算法

(2) 用戶要用到的系統類

@?JobTracker:JobTracker在startTracker函數中,會將MyHadoopScheduler的taskTrackerManager賦值為JobTracker對象,這樣,在MyHadoopScheduler中,可調用Jobracker中的所有public方法和成員變量,常用的有:

$ getClusterStatus():獲取集群的狀態,如tasktracker列表,map slot總數,reduce slot總數,當前正在運行的map/reduce task總數等

$ getQueueManager():如果MyHadoopScheduler支持多隊列,那么需要使用該方法獲取QueueManager對象,通過該對象,會用可以獲取系統的所有隊列名稱,每個隊列的ACL(Access Control List),具體參考:http://hadoop.apache.org/common/docs/current/service_level_auth.html

$ killJob:可以調用該函數殺死某個job

$ killTask:如果調度器支持資源搶占,可調用該函數 殺死某個task以便進行資源搶占。

@?JobInprogress:用戶向Hadoop中提交一個job后,Hadoop會為該job創建一個叫JobInProgress的對象,該對象中包含了job相關的基本信息,且它會伴隨某個job的一生(與job共存亡)。該對象中包含的job信息有:該job包含的所有task的信息(如:正在運行的task列表,已經完成的task列表,尚未運行的task列表等),作業的優先級,作業的提交時間,開始運行時間,運行結束時間等信息。

在JobInprogress的task列表中,每個task以對象TaskInProgress的形式保存,該對象中包含了每個task的基本信息,包括:task要處理的數據split,task創建時間,task開始執行時間,task結束時間等信息。這些信息肯定會在調度器中使用。

@?JobConf

每個作業的運行參數和配置選項被保存到一個JobConf對象中,該對象包含了配置文件mapred-site.xml,core-site.xml和hdfs-site.xml設置的選項和該作業的特有屬性(用戶名,InputFormat,Mapper等),一般是以key/value的形式保存,比如:想獲取當前用戶名,可以這樣:

1 2 3 4 5 JobConf conf; ……. String username = conf.get("user.name");

用戶也可以通過該對象傳遞一些自己定義的全局屬性,如用戶自己定義了一個屬性叫mapred.job.deadline(作業的deadline時間),用戶可以在提交作業時設定該值:

hadoop jar hadoop-examples.jar wordcount -files cachefile.txt \

-D mapred.job.deadline=100000 \

input output

然后在調度器中這樣獲取該屬性的值:

1 2 3 4 5 JobConf conf; ……. int deadline=conf.getInt("mapred.job.deadline", -1); //獲取mapred.job.deadline屬性,如果沒有設置,則返回-1

4. 總結

調度器是Hadoop的中樞,其重要性可想而知。用戶如果要設計Hadoop調度器,需要對Hadoop的整個框架有比較深入的理解,同時需閱讀一些很重要的類(如JobTracker和JobInprogress等)的源碼,以便利用這些類完成你的調度算法。

Hadoop目前自帶了三個比較常用的調度器,分別為JobQueueTaskScheduler (FIFO,但隊列調度器),Capacity Scheduler(多隊列多用戶調度器)和Fair Scheduler(多隊列多用戶調度器),它們是你學習Hadoop調度器的最好資料。

5. 參考資料

(1) Hadoop-0.20.2源代碼

原創文章,轉載請注明:?轉載自董的博客

本文鏈接地址:?http://dongxicheng.org/mapreduce/how-to-write-hadoop-schedulers/

總結

以上是生活随笔為你收集整理的如何编写Hadoop调度器的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。