日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

【源码分析】storm拓扑运行全流程源码分析

發(fā)布時間:2024/1/23 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【源码分析】storm拓扑运行全流程源码分析 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

【源碼分析】storm拓?fù)溥\(yùn)行全流程源碼分析

@(STORM)[storm]

  • 源碼分析storm拓?fù)溥\(yùn)行全流程源碼分析
  • 一拓?fù)涮峤涣鞒?ul>
  • 一stormpy
    • 1storm jar
    • 2def jar
    • 3exec_storm_class
    • 4get_classpath
  • 二拓?fù)涮峤恢?ul>
  • 1用戶代碼調(diào)用submitTopology
  • 2StormSubmittersubmitTopologyWithProgressBar
  • 3StormSubmittersubmitTopology
  • 二提交拓?fù)渲tormSubmittersubmitTopologyAs
    • 1加載配置
    • 2使用NimbusClient提交拓?fù)?/li>
  • 二 拓?fù)溥\(yùn)行流程
    • 一概述
  • (一)storm.py

    在這部分,請尤其注意classpath的設(shè)置。
    依次將下列內(nèi)容加入classpath中:

    \$STORM_HOME\$STORM_HOME/lib\$STORM_HOME/extlib用戶代碼的jar包~/.storm\$STORM_HOME/bin

    詳見下面的分析。

    1、storm jar

    用戶可以通過storm jar命令向storm集群提交一個拓?fù)?#xff0c;如:

    /home/hadoop/storm/bin/storm jar storm-starter-topologies-0.9.4.jar storm.starter.WordCountTopology word-count

    其實(shí),storm執(zhí)行的是bin/目錄下的storm.py文件

    2、def jar

    jar函數(shù)只有一行,就是執(zhí)行exec_storm_class函數(shù)。

    def jar(jarfile, klass, *args):exec_storm_class(klass,jvmtype="-client",extrajars=[jarfile, USER_CONF_DIR, STORM_BIN_DIR],args=args,daemon=False,jvmopts=JAR_JVM_OPTS + ["-Dstorm.jar=" + jarfile])

    其中的幾個變量為:

    USER_CONF_DIR = os.path.expanduser("~" + os.sep + ".storm") STORM_BIN_DIR = os.path.join(STORM_DIR, "bin")

    因此用戶jar包,~/.storm及$STORM_HOME/bin目錄下的jar包會被自動加載到classpath中。

    3、exec_storm_class

    def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False, daemon=True, daemonName=""):global CONFFILEstorm_log_dir = confvalue("storm.log.dir",[CLUSTER_CONF_DIR])if(storm_log_dir == None or storm_log_dir == "nil"):storm_log_dir = os.path.join(STORM_DIR, "logs")all_args = [JAVA_CMD, jvmtype,"-Ddaemon.name=" + daemonName,get_config_opts(),"-Dstorm.home=" + STORM_DIR,"-Dstorm.log.dir=" + storm_log_dir,"-Djava.library.path=" + confvalue("java.library.path", extrajars, daemon),"-Dstorm.conf.file=" + CONFFILE,"-cp", get_classpath(extrajars, daemon),] + jvmopts + [klass] + list(args)print("Running: " + " ".join(all_args))if fork:os.spawnvp(os.P_WAIT, JAVA_CMD, all_args)elif is_windows():# handling whitespaces in JAVA_CMDsub.call(all_args)else:os.execvp(JAVA_CMD, all_args)

    可以看出,最后就是運(yùn)行一條java命令,主類是用戶main函數(shù)的類。
    看一下classpath的設(shè)置。

    4、get_classpath

    def get_classpath(extrajars, daemon=True):ret = get_jars_full(STORM_DIR)ret.extend(get_jars_full(STORM_DIR + "/lib"))ret.extend(get_jars_full(STORM_DIR + "/extlib"))if daemon:ret.extend(get_jars_full(STORM_DIR + "/extlib-daemon"))if STORM_EXT_CLASSPATH != None:for path in STORM_EXT_CLASSPATH.split(os.pathsep):ret.extend(get_jars_full(path))if daemon and STORM_EXT_CLASSPATH_DAEMON != None:for path in STORM_EXT_CLASSPATH_DAEMON.split(os.pathsep):ret.extend(get_jars_full(path))ret.extend(extrajars)return normclasspath(os.pathsep.join(ret))

    依次將下列內(nèi)容加入classpath中:

    "-Dstorm.jar=" + jarfile \$STORM_HOME\$STORM_HOME/lib\$STORM_HOME/extlib用戶代碼的jar包~/.storm\$STORM_HOME/bin

    1、用戶代碼調(diào)用submitTopology

    用戶一般通過StormSubmitter.submitTopology提交拓?fù)?/p> if (args != null && args.length > 0) {conf.setNumWorkers(3);StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());}

    這里使用了submitTopologyWithProgressBar,只是在submitTopology的基礎(chǔ)上增加了一些進(jìn)度信息,見下面代碼。

    2、StormSubmitter.submitTopologyWithProgressBar

    public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {// show a progress bar so we know we're not stuck (especially on slow connections)submitTopology(name, stormConf, topology, opts, new StormSubmitter.ProgressListener() {@Overridepublic void onStart(String srcFile, String targetFile, long totalBytes) {System.out.printf("Start uploading file '%s' to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);}@Overridepublic void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes) {int length = 50;int p = (int)((length * bytesUploaded) / totalBytes);String progress = StringUtils.repeat("=", p);String todo = StringUtils.repeat(" ", length - p);System.out.printf("\r[%s%s] %d / %d", progress, todo, bytesUploaded, totalBytes);}@Overridepublic void onCompleted(String srcFile, String targetFile, long totalBytes) {System.out.printf("\nFile '%s' uploaded to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);}});}

    本質(zhì)上就是調(diào)用submitTopology方法,同時在start, progress和complete階段輸出一些信息。

    3、StormSubmitter.submitTopology

    @SuppressWarnings("unchecked") public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts,ProgressListener progressListener) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {submitTopologyAs(name, stormConf, topology, opts, progressListener, null); }

    StormSubmitter.submitTopology其實(shí)就是調(diào)用StormSubmitter.submitTopologyAs。下面我們詳細(xì)分析一下StormSubmitter.submitTopologyAs

    (二)提交拓?fù)渲?#xff1a;StormSubmitter.submitTopologyAs

    1、加載配置

    在submitTopologyAs中,第一件事就是將拓?fù)涞呐渲眉虞d到一個HashMap中

    if(!Utils.isValidConf(stormConf)) {throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");}stormConf = new HashMap(stormConf);stormConf.putAll(Utils.readCommandLineOpts());Map conf = Utils.readStormConfig();conf.putAll(stormConf);stormConf.putAll(prepareZookeeperAuthentication(conf));

    上述代碼完成了以下功能:
    (1)檢查拓?fù)鋫鬟M(jìn)來的conf是否有效,是否能json化,然后將其轉(zhuǎn)換為HashMap。這里的conf是用戶在建立拓?fù)鋾r通過以下類似代碼傳進(jìn)來的:

    Config config = new Config();config.put(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS, 200);config.setNumWorkers(topoNumWorker);config.setMaxTaskParallelism(20);config.put(Config.NIMBUS_HOST, nimbusHost);config.put(Config.NIMBUS_THRIFT_PORT, 6627);config.put(Config.STORM_ZOOKEEPER_PORT, 2181);config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(zk));config.put(Config.TOPOLOGY_NAME, topologyName);

    (2)將命令行中的參數(shù)加載進(jìn)stormConf中
    (3)調(diào)用readStormConfig,加載配置文件中的內(nèi)容:

    public static Map readStormConfig() {Map ret = readDefaultConfig();String confFile = System.getProperty("storm.conf.file");Map storm;if (confFile==null || confFile.equals("")) {storm = findAndReadConfigFile("storm.yaml", false);} else {storm = findAndReadConfigFile(confFile, true);}ret.putAll(storm);ret.putAll(readCommandLineOpts());return ret; }

    先加載defaults.yaml, 然后再加載storm.yaml

    (4)最后,加載zk認(rèn)證相關(guān)信息。
    (5)除此之外,還可以組件中覆蓋getComponentConfiguration方法以修改其組件的配置。
    (6)最后,還可以使用spoutDeclare與boltDeclare設(shè)置外部組件。

    注意,這里有conf和stormConf2個變量,conf才是全部的配置,stormConf不包括defaults.yaml和storm.yaml。先將用戶配置加載到stormConf,然后將defaults.yaml和storm.yaml回到conf,最后將stormConf加載到conf.

    二、 拓?fù)溥\(yùn)行流程

    (一)概述

    拓?fù)鋽?shù)據(jù)流如下:
    1、Spout讀取或者產(chǎn)生數(shù)據(jù)
    2、通過netty/ZMQ將數(shù)據(jù)從所在的worker發(fā)送到下一個Executor所在的worker(如果下一個Executor與spout的executor在同一個worker,則直接發(fā)送到自身worker內(nèi)部的Disruptor Queue)
    3、worker根據(jù)TaskId將消息放入Executor的輸入Disruptor Queue中
    4、Executor處理完數(shù)據(jù)后,將其放到自身的輸出Disruptor Queue中
    5、然后Executor還會啟動一個線程將輸出Disruptor Queue中的內(nèi)容通過netty發(fā)送到其它worker中,或者直接發(fā)送至其它Executor相對應(yīng)的輸入Disruptor Queue(源executor與目標(biāo)executor在同一個worker的情況)。
    6、如此循環(huán)3~5步驟,直至所有executor都處理完成數(shù)據(jù)。

    executor的執(zhí)行方式是一個典型的生產(chǎn)者消費(fèi)者模式

    總結(jié)

    以上是生活随笔為你收集整理的【源码分析】storm拓扑运行全流程源码分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。