在线实时大数据平台Storm集群组件学习
Hadoop常用于離線的復雜的大數(shù)據(jù)處理,Spark常用于離線的快速(輕量級)的大數(shù)據(jù)處理, Storm常用于在線的實時的大數(shù)據(jù)處理;這句話一定程度上反應了三套大數(shù)據(jù)平臺的鮮明特征。Storm是一套實時、在線、分布式的大數(shù)據(jù)處理平臺。
1)Nimbus和Supervisor
Storm集群中包含兩類節(jié)點:主控節(jié)點(Master Node)和工作節(jié)點(Work Node),角色定位如下:
主控節(jié)點(MasterNode)上運行一個被稱為Nimbus的后臺程序,它負責在Storm集群內(nèi)分發(fā)代碼,分配任務給工作機器,并且負責監(jiān)控集群運行狀態(tài)。Nimbus的作用類似于Hadoop中JobTracker的角色。
每個工作節(jié)點(WorkNode)上運行一個被稱為Supervisor的后臺程序。Supervisor負責監(jiān)聽從Nimbus分配給它執(zhí)行的任務,據(jù)此啟動或停止執(zhí)行任務的工作進程。每一個工作進程執(zhí)行一個Topology的子集;一個運行中的Topology由分布在不同工作節(jié)點上的多個工作進程組成。
Nimbus和Supervisor節(jié)點之間所有的協(xié)調(diào)工作是通過Zookeeper集群來實現(xiàn)的。此外,Nimbus和Supervisor進程都是快速失敗(fail-fast)和無狀態(tài)(stateless)的;Storm集群所有的狀態(tài)要么在Zookeeper集群中,要么存儲在本地磁盤上。這意味著你可以用kill -9來殺死Nimbus和Supervisor進程,它們在重啟后可以繼續(xù)工作。這個設計使得Storm集群擁有不可思議的穩(wěn)定性。
2)Topology
一個topology是spouts和bolts組成的圖, 通過streamgroupings將圖中的spouts和bolts連接起來,如下圖:
一個topology會一直運行直到你手動kill掉,Storm自動重新分配執(zhí)行失敗的任務, 并且Storm可以保證你不會有數(shù)據(jù)丟失(如果開啟了高可靠性的話)。如果一些機器意外停機它上面的所有任務會被轉(zhuǎn)移到其他機器上。
運行一個topology很簡單。首先,把你所有的代碼以及所依賴的jar打進一個jar包。然后運行類似下面的這個命令:
storm jarall-my-code.jar backtype.storm.MyTopology arg1 arg2
這個命令會運行主類: backtype.strom.MyTopology, 參數(shù)是arg1, arg2。這個類的main函數(shù)定義這個topology并且把它提交給Nimbus。storm jar負責連接到Nimbus并且上傳jar包。Topology的定義是一個Thrift結(jié)構(gòu),并且Nimbus就是一個Thrift服務,可以提交由任何語言創(chuàng)建的topology。
3)Stream
消息流stream是storm里的關鍵抽象。一個消息流是一個沒有邊界的tuple序列, 而這些tuple序列會以一種分布式的方式并行地創(chuàng)建和處理。通過對stream中tuple序列中每個字段命名來定義stream。在默認的情況下,tuple的字段類型可以是:integer,long,short, byte,string,double,float,boolean和bytearray。你也可以自定義類型(只要實現(xiàn)相應的序列化器)。
每個消息流在定義的時候會被分配給一個id,因為單向消息流使用的相當普遍, OutputFieldsDeclarer定義了一些方法讓你可以定義一個stream而不用指定這個id。在這種情況下這個stream會分配個值為‘default’默認的id 。
Storm提供的最基本的處理stream的原語是spout和bolt。你可以實現(xiàn)spout和bolt提供的接口來處理你的業(yè)務邏輯。
4)Spouts
消息源spout是Storm里面一個topology里面的消息生產(chǎn)者。一般來說消息源會從一個外部源讀取數(shù)據(jù)并且向topology里面發(fā)出消息:tuple。Spout可以是可靠的也可以是不可靠的。如果這個tuple沒有被storm成功處理,可靠的消息源spouts可以重新發(fā)射一個tuple, 但是不可靠的消息源spouts一旦發(fā)出一個tuple就不能重發(fā)了。
消息源可以發(fā)射多條消息流stream。使用OutputFieldsDeclarer.declareStream來定義多個stream,然后使用SpoutOutputCollector來發(fā)射指定的stream。
Spout類里面最重要的方法是nextTuple。要么發(fā)射一個新的tuple到topology里面或者簡單的返回如果已經(jīng)沒有新的tuple。要注意的是nextTuple方法不能阻塞,因為storm在同一個線程上面調(diào)用所有消息源spout的方法。
另外兩個比較重要的spout方法是ack和fail。storm在檢測到一個tuple被整個topology成功處理的時候調(diào)用ack,否則調(diào)用fail。storm只對可靠的spout調(diào)用ack和fail。
5)Bolts
所有的消息處理邏輯被封裝在bolts里面。Bolts可以做很多事情:過濾,聚合,查詢數(shù)據(jù)庫等等。
Bolts可以簡單的做消息流的傳遞。復雜的消息流處理往往需要很多步驟,從而也就需要經(jīng)過很多bolts。比如算出一堆圖片里面被轉(zhuǎn)發(fā)最多的圖片就至少需要兩步:第一步算出每個圖片的轉(zhuǎn)發(fā)數(shù)量。第二步找出轉(zhuǎn)發(fā)最多的前10個圖片。(如果要把這個過程做得更具有擴展性那么可能需要更多的步驟)。
Bolts可以發(fā)射多條消息流, 使用OutputFieldsDeclarer.declareStream定義stream,使用OutputCollector.emit來選擇要發(fā)射的stream。
Bolts的主要方法是execute,它以一個tuple作為輸入,bolts使用OutputCollector來發(fā)射tuple,bolts必須要為它處理的每一個tuple調(diào)用OutputCollector的ack方法,以通知Storm這個tuple被處理完成了,從而通知這個tuple的發(fā)射者spouts。 一般的流程是: bolts處理一個輸入tuple,? 發(fā)射0個或者多個tuple, 然后調(diào)用ack通知storm自己已經(jīng)處理過這個tuple了。storm提供了一個IBasicBolt會自動調(diào)用ack。
6)Stream groupings
義一個topology的其中一步是定義每個bolt接收什么樣的流作為輸入。stream grouping就是用來定義一個stream應該如何分配數(shù)據(jù)給bolts上面的多個tasks。
Storm里面有7種類型的stream grouping
ü?? ShuffleGrouping: 隨機分組,隨機派發(fā)stream里面的tuple,保證每個bolt接收到的tuple數(shù)目大致相同。
ü?? FieldsGrouping:按字段分組,比如按userid來分組, 具有同樣userid的tuple會被分到相同的Bolts里的一個task, 而不同的userid則會被分配到不同的bolts里的task。
ü?? All Grouping:廣播發(fā)送,對于每一個tuple,所有的bolts都會收到。
ü?? GlobalGrouping:全局分組,這個tuple被分配到storm中的一個bolt的其中一個task。再具體一點就是分配給id值最低的那個task。
ü?? Non Grouping:不分組,這個分組的意思是說stream不關心到底誰會收到它的tuple。目前這種分組和Shuffle grouping是一樣的效果, 有一點不同的是storm會把這個bolt放到這個bolt的訂閱者同一個線程里面去執(zhí)行。
ü?? DirectGrouping: 直接分組,這是一種比較特別的分組方法,用這種分組意味著消息的發(fā)送者指定由消息接收者的哪個task處理這個消息。 只有被聲明為Direct Stream的消息流可以聲明這種分組方法。而且這種消息tuple必須使用emitDirect方法來發(fā)射。消息處理者可以通過TopologyContext來獲取處理它的消息的task的id (OutputCollector.emit方法也會返回task的id)。
ü?? Local orshuffle grouping:如果目標bolt有一個或者多個task在同一個工作進程中,tuple將會被隨機發(fā)生給這些tasks。否則,和普通的Shuffle Grouping行為一致。
7)Tasks和Workers
每一個spout和bolt會被當作很多task在整個集群里執(zhí)行。每一個executor對應到一個線程,在這個線程上運行多個task,而stream grouping則是定義怎么從一堆task發(fā)射tuple到另外一堆task。你可以調(diào)用TopologyBuilder類的setSpout和setBolt來設置并行度(也就是有多少個task)。
一個topology可能會在一個或者多個worker(工作進程)里面執(zhí)行,每個worker是一個物理JVM并且執(zhí)行整個topology的一部分。比如,對于并行度是300的topology來說,如果我們使用50個工作進程來執(zhí)行,那么每個工作進程會處理其中的6個tasks。Storm會盡量均勻的工作分配給所有的worker。
8)Configuration
Storm里面有一堆參數(shù)可以配置來調(diào)整Nimbus, Supervisor以及正在運行的topology的行為,一些配置是系統(tǒng)級別的,一些配置是topology級別的。default.yaml里面有所有的默認配置。你可以通過定義個storm.yaml在你的classpath里來覆蓋這些默認配置。并且你也可以在代碼里面設置一些topology相關的配置信息(使用StormSubmitter)。總結(jié)
以上是生活随笔為你收集整理的在线实时大数据平台Storm集群组件学习的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 在线实时大数据平台Storm单机部署
- 下一篇: 在线实时大数据平台Storm开发之wor