【源码分析】storm拓扑运行全流程源码分析
【源碼分析】storm拓?fù)溥\(yùn)行全流程源碼分析
@(STORM)[storm]
- 源碼分析storm拓?fù)溥\(yùn)行全流程源碼分析
- 一拓?fù)涮峤涣鞒?ul>
- 一stormpy
- 1storm jar
- 2def jar
- 3exec_storm_class
- 4get_classpath
- 二拓?fù)涮峤恢?ul>
- 1用戶代碼調(diào)用submitTopology
- 2StormSubmittersubmitTopologyWithProgressBar
- 3StormSubmittersubmitTopology
- 1加載配置
- 2使用NimbusClient提交拓?fù)?/li>
- 一概述
(一)storm.py
在這部分,請尤其注意classpath的設(shè)置。
依次將下列內(nèi)容加入classpath中:
詳見下面的分析。
1、storm jar
用戶可以通過storm jar命令向storm集群提交一個拓?fù)?#xff0c;如:
/home/hadoop/storm/bin/storm jar storm-starter-topologies-0.9.4.jar storm.starter.WordCountTopology word-count其實(shí),storm執(zhí)行的是bin/目錄下的storm.py文件
2、def jar
jar函數(shù)只有一行,就是執(zhí)行exec_storm_class函數(shù)。
def jar(jarfile, klass, *args):exec_storm_class(klass,jvmtype="-client",extrajars=[jarfile, USER_CONF_DIR, STORM_BIN_DIR],args=args,daemon=False,jvmopts=JAR_JVM_OPTS + ["-Dstorm.jar=" + jarfile])其中的幾個變量為:
USER_CONF_DIR = os.path.expanduser("~" + os.sep + ".storm") STORM_BIN_DIR = os.path.join(STORM_DIR, "bin")因此用戶jar包,~/.storm及$STORM_HOME/bin目錄下的jar包會被自動加載到classpath中。
3、exec_storm_class
def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False, daemon=True, daemonName=""):global CONFFILEstorm_log_dir = confvalue("storm.log.dir",[CLUSTER_CONF_DIR])if(storm_log_dir == None or storm_log_dir == "nil"):storm_log_dir = os.path.join(STORM_DIR, "logs")all_args = [JAVA_CMD, jvmtype,"-Ddaemon.name=" + daemonName,get_config_opts(),"-Dstorm.home=" + STORM_DIR,"-Dstorm.log.dir=" + storm_log_dir,"-Djava.library.path=" + confvalue("java.library.path", extrajars, daemon),"-Dstorm.conf.file=" + CONFFILE,"-cp", get_classpath(extrajars, daemon),] + jvmopts + [klass] + list(args)print("Running: " + " ".join(all_args))if fork:os.spawnvp(os.P_WAIT, JAVA_CMD, all_args)elif is_windows():# handling whitespaces in JAVA_CMDsub.call(all_args)else:os.execvp(JAVA_CMD, all_args)可以看出,最后就是運(yùn)行一條java命令,主類是用戶main函數(shù)的類。
看一下classpath的設(shè)置。
4、get_classpath
def get_classpath(extrajars, daemon=True):ret = get_jars_full(STORM_DIR)ret.extend(get_jars_full(STORM_DIR + "/lib"))ret.extend(get_jars_full(STORM_DIR + "/extlib"))if daemon:ret.extend(get_jars_full(STORM_DIR + "/extlib-daemon"))if STORM_EXT_CLASSPATH != None:for path in STORM_EXT_CLASSPATH.split(os.pathsep):ret.extend(get_jars_full(path))if daemon and STORM_EXT_CLASSPATH_DAEMON != None:for path in STORM_EXT_CLASSPATH_DAEMON.split(os.pathsep):ret.extend(get_jars_full(path))ret.extend(extrajars)return normclasspath(os.pathsep.join(ret))依次將下列內(nèi)容加入classpath中:
"-Dstorm.jar=" + jarfile \$STORM_HOME\$STORM_HOME/lib\$STORM_HOME/extlib用戶代碼的jar包~/.storm\$STORM_HOME/bin1、用戶代碼調(diào)用submitTopology
用戶一般通過StormSubmitter.submitTopology提交拓?fù)?/p> if (args != null && args.length > 0) {conf.setNumWorkers(3);StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());}
這里使用了submitTopologyWithProgressBar,只是在submitTopology的基礎(chǔ)上增加了一些進(jìn)度信息,見下面代碼。
2、StormSubmitter.submitTopologyWithProgressBar
public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {// show a progress bar so we know we're not stuck (especially on slow connections)submitTopology(name, stormConf, topology, opts, new StormSubmitter.ProgressListener() {@Overridepublic void onStart(String srcFile, String targetFile, long totalBytes) {System.out.printf("Start uploading file '%s' to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);}@Overridepublic void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes) {int length = 50;int p = (int)((length * bytesUploaded) / totalBytes);String progress = StringUtils.repeat("=", p);String todo = StringUtils.repeat(" ", length - p);System.out.printf("\r[%s%s] %d / %d", progress, todo, bytesUploaded, totalBytes);}@Overridepublic void onCompleted(String srcFile, String targetFile, long totalBytes) {System.out.printf("\nFile '%s' uploaded to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);}});}本質(zhì)上就是調(diào)用submitTopology方法,同時在start, progress和complete階段輸出一些信息。
3、StormSubmitter.submitTopology
@SuppressWarnings("unchecked") public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts,ProgressListener progressListener) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {submitTopologyAs(name, stormConf, topology, opts, progressListener, null); }StormSubmitter.submitTopology其實(shí)就是調(diào)用StormSubmitter.submitTopologyAs。下面我們詳細(xì)分析一下StormSubmitter.submitTopologyAs
(二)提交拓?fù)渲?#xff1a;StormSubmitter.submitTopologyAs
1、加載配置
在submitTopologyAs中,第一件事就是將拓?fù)涞呐渲眉虞d到一個HashMap中
if(!Utils.isValidConf(stormConf)) {throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");}stormConf = new HashMap(stormConf);stormConf.putAll(Utils.readCommandLineOpts());Map conf = Utils.readStormConfig();conf.putAll(stormConf);stormConf.putAll(prepareZookeeperAuthentication(conf));上述代碼完成了以下功能:
(1)檢查拓?fù)鋫鬟M(jìn)來的conf是否有效,是否能json化,然后將其轉(zhuǎn)換為HashMap。這里的conf是用戶在建立拓?fù)鋾r通過以下類似代碼傳進(jìn)來的:
(2)將命令行中的參數(shù)加載進(jìn)stormConf中
(3)調(diào)用readStormConfig,加載配置文件中的內(nèi)容:
先加載defaults.yaml, 然后再加載storm.yaml
(4)最后,加載zk認(rèn)證相關(guān)信息。
(5)除此之外,還可以組件中覆蓋getComponentConfiguration方法以修改其組件的配置。
(6)最后,還可以使用spoutDeclare與boltDeclare設(shè)置外部組件。
注意,這里有conf和stormConf2個變量,conf才是全部的配置,stormConf不包括defaults.yaml和storm.yaml。先將用戶配置加載到stormConf,然后將defaults.yaml和storm.yaml回到conf,最后將stormConf加載到conf.
二、 拓?fù)溥\(yùn)行流程
(一)概述
拓?fù)鋽?shù)據(jù)流如下:
1、Spout讀取或者產(chǎn)生數(shù)據(jù)
2、通過netty/ZMQ將數(shù)據(jù)從所在的worker發(fā)送到下一個Executor所在的worker(如果下一個Executor與spout的executor在同一個worker,則直接發(fā)送到自身worker內(nèi)部的Disruptor Queue)
3、worker根據(jù)TaskId將消息放入Executor的輸入Disruptor Queue中
4、Executor處理完數(shù)據(jù)后,將其放到自身的輸出Disruptor Queue中
5、然后Executor還會啟動一個線程將輸出Disruptor Queue中的內(nèi)容通過netty發(fā)送到其它worker中,或者直接發(fā)送至其它Executor相對應(yīng)的輸入Disruptor Queue(源executor與目標(biāo)executor在同一個worker的情況)。
6、如此循環(huán)3~5步驟,直至所有executor都處理完成數(shù)據(jù)。
executor的執(zhí)行方式是一個典型的生產(chǎn)者消費(fèi)者模式
總結(jié)
以上是生活随笔為你收集整理的【源码分析】storm拓扑运行全流程源码分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: storm目录结构及在zk中的目录结构
- 下一篇: trident API指南