日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Storm概念学习系列之Topology拓扑

發布時間:2025/3/15 编程问答 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Storm概念学习系列之Topology拓扑 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

?

?

  不多說,直接上干貨!

?

  ??Hadoop 上運行的是 MapReduce 作業,而在?Storm 上運行的是拓撲 Topology,這兩者之間是非常不同的。一個關鍵的區別是:一個MapReduce 作業最終會結束,而一個 Topology 拓撲會永遠運行(除非手動殺掉)

?

?

?

Topology拓撲

  從字面上解釋Topology,就是網絡拓撲,是指用傳輸介質互連各種設備的物理布局,是構成網絡的成員間特定的物理的(即真實的),或者邏輯的,即虛擬的排列方式。拓撲是一種不考慮物體的大小、形狀等物理屬性,而只使用點或者線描述多個物體實際位置與關系的抽象表示方法。拓撲不關心事物的細節,也不在乎相互的比例關系,只是以圖的形式表示一定范圍內多個物體之間的相互關系。從Storm角度考慮,它不是網絡拓撲,但是又類似于網絡拓撲的結構,所以取名Topology。
那么Storm的Topology指的是類似于網絡拓撲圖的一種虛擬結構。Storm的拓撲Topology類似于MapReduce任務,一個關鍵的區別是MapReduce任務運行一段時間后最終會完成,而Storm拓撲一直運行(直到殺掉它)。一個拓撲是由Spout和Bolt組成的圖,Spout和Bolt之間通過流分組連接起來。圖1形象地描述了Topology中的Spout和Bolt之間的關系。

              ?

                      圖1 ? ?Spout和Bolt的關系圖

?

?

?

  通過對圖1的理解可以看出,Topology是由Spout、Bolt、數據載體Tuple等構成的一定規則的網絡拓撲圖。Storm提供了TopologyBuilder類來創建Topology。打個比方,TopologyBuilder是Topology的骨架,Spout、Bolt是Topology的肉和血液。TopologyBuilder類的主要方法如圖2所示。

            

                    圖 2 ? ?TopologyBuilder類的主要方法

?

  ?TopologyBuilder實際上是封裝了Topology的Thrift接口,也就是說Topology實際上是通過Thrift定義的一個結構,TopologyBuilder將這個對象建立起來,然后Nimbus實際上運行一個Thrift服務器,用于接收用戶提交的結構。由于采用Thrift實現,所以用戶可以用其他語言建立Topology,這樣就提供了比較方便的多語言操作支持。

?

?

?

?

?

?Topology實例
  下面從一個簡單的例子開始介紹Topology的構建和定義,通過此案例能夠基本理解Storm,并且能夠構建一個簡單的Topology。本實例使用Topology來統計一個句子中單詞出現的頻率。下面詳細介紹如何設計和運行Topology,以及一些注意事項。


  1. 設計Topology結構
  在編寫代碼之前,首先要設計Topology。在理清數據處理邏輯之后,創建Topology就非常簡單了。統計單詞詞頻的Topology的大致結構如圖3所示。可以將Topology分成3個部分:一是數據源KafkaSpout,負責發送語句;二是數據處理者SplitSentenceBolt,負責切分語句;三是數據再處理者WordCountBolt,負責累加單詞的頻率

? ? ? ? ? ? ?

                   ?     圖 3 ? ? ?Topology的結構

?

?

? ? ? ? ? ? ? ? ?

? ? ? ? ?2. 設計數據流
  設計的Topology是從KafkaSpout中讀取句子,并把句子劃分成單詞,然后匯總每個單詞出現的次數,一個Bolt負責獲取句子后劃分成單詞,一個Bolt分別對應計算每一個單詞出現的次數,然后Tuple在Spout和Bolt之間傳遞,如圖3-15所示。

          

                       圖4 ? ?Topology內部數據流圖

  

          

?  3. 代碼實現
(1)構建Maven環境
  為了開發Topology,需要把Storm相關的JAR包添加到CLASSPATH中,要么手動添加所有相關的JAR包,要么使用Maven來管理所有的依賴。Storm的JAR包發布在Clojars(一個Maven庫),如果使用Maven,需要把下面的配置代碼添加在項目的pom.xml中。

<repository><id>clojars.org</id><url>http:// clojars.org/repo</url> </repository> <dependency><groupId>storm</groupId><artifactId>storm</artifactId><version>0.8.2</version><scope>test</scope> </dependency>

?

?

(2)定義Topology
  定義Topology的內部邏輯,代碼如下:

  

SpoutConf?ig kafkaConf?ig = new SpoutConf?ig(brokerHosts, "storm-sentence", "", "storm"); kafkaConf?ig.scheme = new SchemeAsMultiScheme(new StringScheme()); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(1new KafkaSpout(kafkaConf?ig), 10);// id, spout, parallelism_hint builder.setBolt(2, new SplitSentence(), 10) .shuffleGrouping(1); builder.setBolt(3, new WordCount(), 20) .f?ieldsGrouping(2, new Fields("word"));

?

?

  聲明的Topology的Spout是從Kafka中讀取句子,Spout用setSpout方法插入一個獨特的ID到Topology中。Topology中的每個節點必須給予一個ID,ID是由其他Bolt用于訂閱該節點的輸出流,KafkaSpout在Topology中的ID為1。
setBolt用于在Topology中插入Bolt。在Topology中定義的第一個Bolt是切割句子的Bolt,該Bolt(即SplitSentence)將句子流轉成單詞流;setBolt的最后一個參數是Bolt的并行量,因為SplitSentence是10個并發,所以在Storm集群中有10個線程并行執行。當Topology遇到性能瓶頸時,可以通過增加Bolt并行數量來解決。setBolt方法返回一個對象,用來定義Bolt的輸入。例如,SplitSentence約定使用組件ID為1的輸出流,1是指已經定義的KafkaSpout。SplitSentence會消耗KafkaSpout發出的每一個元組。
  SplitSentence的關鍵方法是execute,它將句子拆分成單詞,并發出每個單詞作為新的元組。另一個重要的方法是declareOutputFields,其中聲明了Bolt輸出元組的架構,這個方法聲明它發出一個域為“word”的元組。
SplitSentence對句子中的每個單詞發射一個新的Tuple,WordCount在內存中維護每個單詞出現次數的映射,WordCount每收到一個單詞,都會更新內存中的統計狀態。

?

?

  SplitSentence的實現代碼如下:

public class SplitSentence implements IBasicBolt{public void prepare(Map conf, TopologyContext context) {}public void execute(Tuple tuple, BasicOutputCollector collector) {String sentence = tuple.getString(0);for(String word: sentence.split(" ")) {collector.emit(new Values(word));}}public void cleanup() {}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}}

?

?

?

  WordCount的實現代碼如下:

?

public class WordCount implements IBasicBolt {private Map<String, Integer> _counts = new HashMap<String, Integer>();public void prepare(Map conf, TopologyContext context) {}public void execute(Tuple tuple, BasicOutputCollector collector) {String word = tuple.getString(0);int count;if(_counts.containsKey(word)) {count = _counts.get(word);} else {count = 0;}count++;_counts.put(word, count);collector.emit(new Values(word, count));}public void cleanup() {}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word", "count"));} }

?

?

?

  4. Topology運行
  Topology運行有兩種模式:本地模式和分布式模式。這兩種模式的接口區別很大,使用場景也不相同。另外,下面還將介紹Topology的運行流程、方法調用過程以及并行度等。
  1. Topology運行模式
  Topology的運行模式可以分為本地模式和分布式模式,模式可以在配置文件中和代碼中設置。
  (1)本地模式
  Storm用一個進程中的線程來模擬所有的Spout和Bolt。本地模式對開發和測試來說比較有用。storm-starter中的Topology是以本地模式運行的,可以看到Topology中的每一個組件發射的消息。示例代碼如下:

?

Config conf = new Conf?ig(); conf.setDebug(true); conf.setNumWorkers(2); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", conf, builder.createTopology()); Utils.sleep(10000); cluster.killTopology("test"); cluster.shutdown();

  首先,這段代碼通過定義一個LocalCluster對象來定義一個進程內的集群。提交Topology給這個虛擬的集群和提交Topology給分布式集群相同。通過調用submitTopology方法來提交Topology,共有3個參數:要運行的Topology的名稱、一個配置對象,以及要運行的Topology本身。
  Topology是以名稱來唯一區別的,可以用這個名稱來殺掉該Topology,而且必須顯式地殺掉,否則它會一直運行。

?


conf對象可以配置內容很多,下面兩個是最常見的:
  TOPOLOGY_WORKERS (setNumWorkers):定義希望集群分配多少個工作進程來執行這個Topology。Topology中的每個組件都需要線程來執行。每個組件到底用多少個線程是通過setBolt和setSpout來指定的。這些線程都運行在工作進程中。每一個工作進程包含一些節點的一些工作線程。例如,指定300個線程,60個進程,那么每個工作進程中要執行6個線程,而這6個線程可能屬于不同的組件(Spout或Bolt)。可以調整每個組件的并行度以及這些線程所在的進程數量來調整Topology的性能。
  TOPOLOGY_DEBUG (setDebug):當它設置為true時,Storm會記錄下每個組件發射的每條消息。這在本地環境調試Topology時很有用,但是在生產環境中如果這么做,則會影響性能。

?

?

?(2)分布式模式
  Storm由若干節點組成。提交Topology給Nimbus時,也會提交Topology代碼。Nimbus負責分發代碼和給Topolgoy分配工作進程。如果一個工作進程掛掉了,Nimbus節點會將其重新分配到其他節點。分布式模式提交拓撲的代碼如下:

StormSubmitter.submitTopology(topologyName, topologyConf?ig, builder.createTopology());

?

?  在Storm代碼編寫完成之后,需要打包成JAR包放到Nimbus中運行。在打包時,不需要把依賴的JAR都打進去,否則運行時會出現重復的配置文件錯誤導致Topology無法運行,因為在Topology運行之前,會加載本地的storm.yaml配置文件。
  在Nimbus運行的命令如下。

storm jar StormTopology.jar mainclass args

?

?

?

  2. Topology運行流程
  在Topology的運行流程中,有幾點需要特別說明。
  1)提交Topology后,Storm會把代碼先存放到Nimbus節點的inbox目錄下;之后,把當前Storm運行的配置生成一個stormconf.ser文件放到Nimbus節點的stormdist目錄中,此目錄中同時還有序列化之后的Topology代碼文件。
  2)在設定Topology關聯的Spout和Bolt時,可以同時設置當前Spout和Bolt的Executor和Task數量。在默認情況下,一個Topology的Task總和與Executor的總和一致。之后,系統根據Worker的數量,盡量將這些Task平均分配到不同的Worker上執行。Worker在哪個Supervisor節點上運行是由Storm本身決定的。
  3)在任務分配好之后,Nimbus節點將任務的信息提交到ZooKeeper集群,同時在ZooKeeper集群中有Workerbeats,這里存儲了當前Topology所有Worker進程的心跳信息。
  4)Supervisor節點不斷輪詢ZooKeeper集群,在ZooKeeper的assignments中保存了所有Topology的任務分配信息、代碼存儲目錄、任務之間的關聯關系等,Supervisor通過輪詢此節點的內容來領取自己的任務,啟動Worker進程運行。
  5)一個Topology運行之后,不斷通過Spout來發送流,通過Bolt來不斷處理接收到的流,流是無界的。最后一步會不間斷地執行,除非手動結束該Topology。

?


  3. Topology的方法調用流程
  Topology中的流處理時,調用方法的過程如圖3-16所示。
  Topology方法調用的過程有如下一些要點:
  1)每個組件(Spout或者Bolt)的構造方法和declareOutputFields方法都只被調用一次。
  2)open方法和prepare方法被調用多次。在入口函數中設定的setSpout或者setBolt中的并行度參數是指Executor的數量,是負責運行組件中的Task的線程數量,此數量是多少,上述兩個方法就會被調用多少次,在每個Executor運行時調用一次。
  3)nextTuple方法和execute方法是一直運行的,nextTuple方法不斷發射Tuple,Bolt的execute不斷接收Tuple進行處理。只有這樣不斷地運行,才會產生無界的Tuple流,體現實時性。這類似于Java線程的run方法。
  4)提交一個Topology之后,Storm創建Spout/Bolt實例并進行序列化。之后,將序列化的組件發送給所有任務所在的節點(即Supervisor節點),在每一個任務上反序列化組件。
  5)Spout和Bolt之間、Bolt和Bolt之間的通信,通過ZeroMQ的消息隊列實現。
  6)圖3-16沒有列出ack和fail方法,在一個Tuple成功處理之后,需要調用ack方法來標記成功,否則調用fail方法標記失敗,重新處理該Tuple。

    ?       ?

?                        圖5 ? ?Topology流處理過程圖      

?    

?

  4. Topology并行度
  在Topology的執行單元中,有幾個和并行度相關的概念。
  (1)Worker
  每個Worker都屬于一個特定的Topology,每個Supervisor節點的Worker可以有多個,每個Worker使用一個單獨的端口,Worker對Topology中的每個組件運行一個或者多個Executor線程來提供Task的執行服務。
  (2)Executor
  Executor是產生于Worker進程內部的線程,會執行同一個組件的一個或者多個Task。
  (3)Task
  實際的數據處理由Task完成。在Topology的生命周期中,每個組件的Task數量不會變化,而Executor的數量卻不一定。Executor數量小于等于Task的數量,在默認情況下,二者是相等的。
  在運行一個Topology時,可以根據具體的情況來設置不同數量的Worker、Task、Executor,設置的位置也可以在多個地方。
  1)Worker設置:可以設置yaml中的topology.workers屬性。在代碼中通過Conf?ig的setNumWorkers方法設定。
  2)Executor設置:通過Topology的入口類中的setBolt、setSpout方法的最后一個參數指定,如果不指定,則使用默認值1。
  3)Task設置:在默認情況下,和executor數量一致。在代碼中通過TopologyBuilder的setNumTasks方法設定具體某個組件的Task數量。

?

  5. 終止Topology
  在Nimbus啟動的節點上,使用下面的命令來終止一個Topology的運行。
  storm kill topologyName
  執行kill之后,通過UI界面查看Topology狀態,其先變成KILLED狀態,清理完本地目錄和ZooKeeper集群中與當前Topology相關的信息之后,此Topology將徹底消失。

?

  6.Topology跟蹤
  提交Topology后,可以在Storm UI界面查看整個Topology運行的過程。

?

?

?

?

   

         

         

?

?

  如下

?

總結

以上是生活随笔為你收集整理的Storm概念学习系列之Topology拓扑的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。