Strom程序的并发机制,配置并行度(代码实现)、动态改变并行度,local or shuffle分组,分组的概念以及分组类型
1、Storm程序的并發(fā)機(jī)制
1.1、概念
? Workers (JVMs): 在一個(gè)物理節(jié)點(diǎn)上可以運(yùn)行一個(gè)或多個(gè)獨(dú)立的JVM 進(jìn)程。一個(gè)Topology可以包含一個(gè)或多個(gè)worker(并行的跑在不同的物理機(jī)上), 所以worker process就是執(zhí)行一個(gè)topology的子集, 并且worker只能對(duì)應(yīng)于一個(gè)topology
? Executors (threads): 在一個(gè)worker JVM進(jìn)程中運(yùn)行著多個(gè)Java線程。一個(gè)executor線程可以執(zhí)行一個(gè)或多個(gè)tasks。但一般默認(rèn)每個(gè)executor只執(zhí)行一個(gè)task。一個(gè)worker可以包含一個(gè)或多個(gè)executor, 每個(gè)component (spout或bolt)至少對(duì)應(yīng)于一個(gè)executor, 所以可以說(shuō)executor執(zhí)行一個(gè)compenent的子集, 同時(shí)一個(gè)executor只能對(duì)應(yīng)于一個(gè)component。
? Tasks(bolt/spout instances): Task就是具體的處理邏輯對(duì)象,每一個(gè)Spout和Bolt會(huì)被當(dāng)作很多task在整個(gè)集群里面執(zhí)行。每一個(gè)task對(duì)應(yīng)到一個(gè)線程,而stream grouping則是定義怎么從一堆task發(fā)射tuple到另外一堆task。你可以調(diào)用TopologyBuilder.setSpout和TopBuilder.setBolt來(lái)設(shè)置并行度 — 也就是有多少個(gè)task。
1.2、配置并行度
? 對(duì)于并發(fā)度的配置, 在storm里面可以在多個(gè)地方進(jìn)行配置, 優(yōu)先級(jí)為:
defaults.yaml < storm.yaml < topology-specific configuration < internal component-specific configuration < external component-specific configuration? worker processes的數(shù)目, 可以通過(guò)配置文件和代碼中配置, worker就是執(zhí)行進(jìn)程, 所以考慮并發(fā)的效果, 數(shù)目至少應(yīng)該大亍machines的數(shù)目
? executor的數(shù)目, component的并發(fā)線程數(shù),只能在代碼中配置(通過(guò)setBolt和setSpout的參數(shù)), 例如, setBolt(“green-bolt”, new GreenBolt(), 2)
? tasks的數(shù)目, 可以不配置, 默認(rèn)和executor1:1, 也可以通過(guò)setNumTasks()配置
Topology的worker數(shù)通過(guò)config設(shè)置,即執(zhí)行該topology的worker(java)進(jìn)程數(shù)。它可以通過(guò) storm rebalance 命令任意調(diào)整。
3個(gè)組件的并發(fā)度加起來(lái)是10,就是說(shuō)拓?fù)湟还灿?0個(gè)executor,一共有2個(gè)worker,每個(gè)worker產(chǎn)生10 / 2 = 5條線程。
綠色的bolt配置成2個(gè)executor和4個(gè)task。為此每個(gè)executor為這個(gè)bolt運(yùn)行2個(gè)task。
? 動(dòng)態(tài)的改變并行度
Storm支持在不 restart topology 的情況下, 動(dòng)態(tài)的改變(增減) worker processes 的數(shù)目和 executors 的數(shù)目, 稱為rebalancing. 通過(guò)Storm web UI,或者通過(guò)storm rebalance命令實(shí)現(xiàn):
local or shuffle 分組
說(shuō)明:
1.如果strom程序中的并行度總數(shù)太多,放在一個(gè)JVM中運(yùn)行比較緩慢。
2.會(huì)考慮使用多個(gè)JVM來(lái)運(yùn)行程序。
3.每個(gè)JVM中都有spout線程和邏輯執(zhí)行線程。
4.為了減少JVM之間數(shù)據(jù)傳輸?shù)拈_(kāi)銷(xiāo),設(shè)計(jì)一個(gè)數(shù)據(jù)分發(fā)的策略
如果數(shù)據(jù)分發(fā)沒(méi)有嚴(yán)格的業(yè)務(wù)含義,考慮spout或者上游的數(shù)據(jù)只發(fā)給當(dāng)前JVM中的下游bolt.
如果一個(gè)stormTopology任務(wù)想在多個(gè)JVM中運(yùn)行,如何設(shè)置?
Config config = new Config();
config.setNumWorkers(2);
如果不指定numworkers的數(shù)量,默認(rèn)是在一個(gè)worker中運(yùn)行。
分組的概念
說(shuō)明:
Strom里面有7種類型的stream grouping
1.Shuffle Grouping:隨機(jī)分組,隨機(jī)派發(fā)stream里面的tuple,保證每個(gè)bolt接收到的tuple數(shù)目大致相同。
2.Fields Grouping:按字段分組,比如按userid來(lái)分組,具有同樣userid的tuple會(huì)被分到相同的Bolts里的task,而不同的userId則會(huì)被分配到不同的bolts里的task.
3.All Grouping:廣播發(fā)送,對(duì)于每一個(gè)tuple,所有的bolts都會(huì)收到。
4.Global Grouping:全局分組,這個(gè)tuple被分配到storm中的一個(gè)bolt的其中一個(gè)task。再具體一點(diǎn)就是分配給id值最低的那個(gè)task.
5.Non Grouping:不分組,這stream grouping個(gè)分組的意思是說(shuō)stream不關(guān)心到底誰(shuí)會(huì)收到它的tuple。目前這種分組和Shuffle grouping是一樣的效果,有一點(diǎn)不同的是storm會(huì)把這個(gè)bolt放在這個(gè)bolt的訂閱者同一個(gè)線程里面去執(zhí)行。
6.Direct Grouping:直接分組,這是一種比較特別的分組方法,用這種分組意味著消息的發(fā)送者指定由消息接收者的哪個(gè)task處理這個(gè)消息。只有被聲明Direct Stream的消息流可以聲明這種分組方法。而且這種消息tuple必須使用emitDirect方法來(lái)發(fā)射。消息處理者可以通過(guò)TopologyContext來(lái)獲取處理它的消息的task的id(OutputCollector.emit方法也會(huì)返回task的id)
7.Local or shuffle grouping:如果目標(biāo)bolt有一個(gè)或者多個(gè)task在同一個(gè)工作進(jìn)程中,tuple將會(huì)被隨機(jī)發(fā)給這些tasks。否則,和普通的Shuffle Grouping行為一致。
總結(jié)
以上是生活随笔為你收集整理的Strom程序的并发机制,配置并行度(代码实现)、动态改变并行度,local or shuffle分组,分组的概念以及分组类型的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 买房怎样贷款合适
- 下一篇: Strom+Kafka + redis实