Introduction to Storm Principles
@(STORM)[storm, big data]
- Introduction to Storm Principles
  - I. Principles
    - Why use Storm
    - 1. Use cases
    - 2. Cluster concepts
    - 3. Topology concepts
  - II. Configuration
  - III. Parallelism
    - (1) The four dimensions along which parallelism can be set
    - (2) How to set parallelism
    - (3) Example
    - (4) Adjusting parallelism dynamically
  - IV. Grouping
  - V. Reliability
    - (1) Spout
    - (2) Bolt
I. Principles
Why use Storm?
**Apache Storm is a free and open source distributed realtime computation system. Storm makes it easy to reliably process unbounded streams of data, doing for realtime processing what Hadoop did for batch processing.** Storm is simple, can be used with any programming language, and is a lot of fun to use!
Storm has many use cases: realtime analytics, online machine learning, continuous computation, distributed RPC, ETL, and more. Storm is fast: a benchmark clocked it at over a million tuples processed per second per node. It is scalable, fault-tolerant, guarantees your data will be processed, and is easy to set up and operate.
**Storm integrates with the queueing and database technologies you already use.** A Storm topology consumes streams of data and processes those streams in arbitrarily complex ways, repartitioning the streams between each stage of the computation however needed. Read more in the tutorial.
Storm is a distributed real-time computation system.
1. Use cases
Stream processing: Storm can process a continuous stream of incoming messages and write the results to some store.
Distributed RPC: because Storm's processing components are distributed and processing latency is extremely low, it can also serve as a general-purpose distributed RPC framework. In fact, a search engine is itself a distributed RPC system.
2. Cluster concepts
(1) Nimbus: responsible for resource allocation and task scheduling.
(2) Supervisor: accepts tasks assigned by Nimbus, and starts and stops the worker processes it manages.
(3) Worker: a process that runs the actual processing-component logic.
(4) Task: each spout/bolt thread within a worker is called a task. Since Storm 0.8, a task no longer corresponds to a physical thread; tasks of the same spout/bolt may share a physical thread, which is called an executor.
3. Topology concepts
(1) Topology: a real-time application running in Storm, so called because the message flows between components form a logical topology.
(2) Spout: the component that produces source data streams in a topology. A spout usually reads data from an external source and turns it into the topology's internal source data. A spout is an active role: its interface has a nextTuple() method that the Storm framework calls continuously, and the user simply generates source data inside it.
(3) Bolt: the component that receives data and processes it in a topology. A bolt can filter, apply functions, aggregate, write to a database, and perform any other operation. A bolt is a passive role: its interface has an execute(Tuple input) method that is called whenever a message arrives, and the user performs whatever processing is needed inside it. (A minimal sketch of a spout and a bolt follows this list.)
(4) Tuple: the basic unit of message passing. Conceptually it would be a key-value map, but since the field names of tuples passed between components are declared in advance, a tuple only needs to carry its values in order, so it is effectively a value list.
(5) Stream: an unbounded sequence of tuples forms a stream.
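A minimal, self-contained sketch of the spout and bolt contracts described above, using the old backtype.storm package names that the configuration below also uses; the class and field names (SentenceSpout, PrintBolt) are illustrative, not from the original article:

```java
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Spout: active role -- the framework keeps calling nextTuple()
public class SentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;                     // handle used to emit tuples into the topology
    }

    @Override
    public void nextTuple() {
        this.collector.emit(new Values("hello storm")); // emit one tuple per call
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));       // field names are declared up front, so a
                                                        // tuple only carries an ordered value list
    }
}

// Bolt: passive role -- execute() is called for every incoming tuple
class PrintBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        System.out.println(input.getStringByField("sentence"));
        this.collector.ack(input);                      // acknowledge processing (see section V)
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // this bolt emits nothing downstream
    }
}
```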
II. Configuration
The complete default configuration, defaults.yaml, is shown at the end of this section; to override any value, set it in storm.yaml. The important parameters are:
1. storm.zookeeper.servers: specifies which ZooKeeper cluster to use.
2. nimbus.host: specifies which machine runs Nimbus, e.g.

```yaml
nimbus.host: "gdc-nn01-test"
```

3. supervisor.slots.ports: specifies the ports on which a supervisor runs workers. Each port runs at most one worker, so the number of ports configured determines how many slots (i.e. how many workers) each supervisor has, e.g.

```yaml
supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703
storm.local.dir: "/home/hadoop/storm/data"
```

4. JVM settings, e.g.
```yaml
worker.childopts: "-Xmx4096m"
supervisor.childopts: "-Xmx4096m"
nimbus.childopts: "-Xmx3072m"
```

Besides these there are also ui.childopts and logviewer.childopts.
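Putting the items above together, a minimal storm.yaml for a small cluster might look like the following sketch; the ZooKeeper hostnames are placeholders, not values from the original article:

```yaml
# minimal storm.yaml sketch; zk01-zk03 are placeholder hostnames
storm.zookeeper.servers:
  - "zk01"
  - "zk02"
  - "zk03"
nimbus.host: "gdc-nn01-test"
storm.local.dir: "/home/hadoop/storm/data"
supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703
```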
For reference, the complete defaults.yaml:
```yaml
########### These all have default values as shown
########### Additional configuration goes into storm.yaml

java.library.path: "/usr/local/lib:/opt/local/lib:/usr/lib"

### storm.* configs are general configurations
# the local dir is where jars are kept
storm.local.dir: "storm-local"
storm.zookeeper.servers:
  - "localhost"
storm.zookeeper.port: 2181
storm.zookeeper.root: "/storm"
storm.zookeeper.session.timeout: 20000
storm.zookeeper.connection.timeout: 15000
storm.zookeeper.retry.times: 5
storm.zookeeper.retry.interval: 1000
storm.zookeeper.retry.intervalceiling.millis: 30000
storm.cluster.mode: "distributed" # can be distributed or local
storm.local.mode.zmq: false
storm.thrift.transport: "backtype.storm.security.auth.SimpleTransportPlugin"
storm.messaging.transport: "backtype.storm.messaging.netty.Context"
storm.meta.serialization.delegate: "backtype.storm.serialization.DefaultSerializationDelegate"

### nimbus.* configs are for the master
nimbus.host: "localhost"
nimbus.thrift.port: 6627
nimbus.thrift.max_buffer_size: 1048576
nimbus.childopts: "-Xmx1024m"
nimbus.task.timeout.secs: 30
nimbus.supervisor.timeout.secs: 60
nimbus.monitor.freq.secs: 10
nimbus.cleanup.inbox.freq.secs: 600
nimbus.inbox.jar.expiration.secs: 3600
nimbus.task.launch.secs: 120
nimbus.reassign: true
nimbus.file.copy.expiration.secs: 600
nimbus.topology.validator: "backtype.storm.nimbus.DefaultTopologyValidator"

### ui.* configs are for the master
ui.port: 8080
ui.childopts: "-Xmx768m"

logviewer.port: 8000
logviewer.childopts: "-Xmx128m"
logviewer.appender.name: "A1"

drpc.port: 3772
drpc.worker.threads: 64
drpc.queue.size: 128
drpc.invocations.port: 3773
drpc.request.timeout.secs: 600
drpc.childopts: "-Xmx768m"

transactional.zookeeper.root: "/transactional"
transactional.zookeeper.servers: null
transactional.zookeeper.port: null

### supervisor.* configs are for node supervisors
# Define the amount of workers that can be run on this machine. Each worker is assigned a port to use for communication
supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703
supervisor.childopts: "-Xmx256m"
# how long supervisor will wait to ensure that a worker process is started
supervisor.worker.start.timeout.secs: 120
# how long between heartbeats until supervisor considers that worker dead and tries to restart it
supervisor.worker.timeout.secs: 30
# how frequently the supervisor checks on the status of the processes it's monitoring and restarts if necessary
supervisor.monitor.frequency.secs: 3
# how frequently the supervisor heartbeats to the cluster state (for nimbus)
supervisor.heartbeat.frequency.secs: 5
supervisor.enable: true

### worker.* configs are for task workers
worker.childopts: "-Xmx768m"
worker.heartbeat.frequency.secs: 1

# control how many worker receiver threads we need per worker
topology.worker.receiver.thread.count: 1

task.heartbeat.frequency.secs: 3
task.refresh.poll.secs: 10

zmq.threads: 1
zmq.linger.millis: 5000
zmq.hwm: 0

storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 5242880 # 5MB buffer
# Since nimbus.task.launch.secs and supervisor.worker.start.timeout.secs are 120, other workers should also wait at least that long before giving up on connecting to the other worker. The reconnection period need also be bigger than storm.zookeeper.session.timeout(default is 20s), so that we can abort the reconnection when the target worker is dead.
storm.messaging.netty.max_retries: 30
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100
# If the Netty messaging layer is busy(netty internal buffer not writable), the Netty client will try to batch message as more as possible up to the size of storm.messaging.netty.transfer.batch.size bytes, otherwise it will try to flush message as soon as possible to reduce latency.
storm.messaging.netty.transfer.batch.size: 262144
# We check with this interval that whether the Netty channel is writable and try to write pending messages if it is.
storm.messaging.netty.flush.check.interval.ms: 10

### topology.* configs are for specific executing storms
topology.enable.message.timeouts: true
topology.debug: false
topology.workers: 1
topology.acker.executors: null
topology.tasks: null
# maximum amount of time a message has to complete before it's considered failed
topology.message.timeout.secs: 30
topology.multilang.serializer: "backtype.storm.multilang.JsonSerializer"
topology.skip.missing.kryo.registrations: false
topology.max.task.parallelism: null
topology.max.spout.pending: null
topology.state.synchronization.timeout.secs: 60
topology.stats.sample.rate: 0.05
topology.builtin.metrics.bucket.size.secs: 60
topology.fall.back.on.java.serialization: true
topology.worker.childopts: null
topology.executor.receive.buffer.size: 1024 # batched
topology.executor.send.buffer.size: 1024 # individual messages
topology.receiver.buffer.size: 8 # setting it too high causes a lot of problems (heartbeat thread gets starved, throughput plummets)
topology.transfer.buffer.size: 1024 # batched
topology.tick.tuple.freq.secs: null
topology.worker.shared.thread.pool.size: 4
topology.disruptor.wait.strategy: "com.lmax.disruptor.BlockingWaitStrategy"
topology.spout.wait.strategy: "backtype.storm.spout.SleepSpoutWaitStrategy"
topology.sleep.spout.wait.strategy.time.ms: 1
topology.error.throttle.interval.secs: 10
topology.max.error.report.per.interval: 5
topology.kryo.factory: "backtype.storm.serialization.DefaultKryoFactory"
topology.tuple.serializer: "backtype.storm.serialization.types.ListDelegateSerializer"
topology.trident.batch.emit.interval.millis: 500
topology.classpath: null
topology.environment: null

dev.zookeeper.path: "/tmp/dev-storm-zookeeper"
```

III. Parallelism
(1) The parallelism of a Storm topology can be set along the following four dimensions:
1. node (server): the number of supervisor machines in the Storm cluster.
2. worker (JVM process): the total number of worker processes in the topology; they are distributed evenly across the nodes.
3. executor (thread): the total number of threads for a given spout or bolt; these threads are distributed evenly across the workers.
4. task (spout/bolt instance): a task is an instance of a spout or bolt, whose nextTuple() and execute() methods are invoked by executor threads. Unless explicitly configured, Storm assigns one task per executor; if more tasks are configured, a single thread hosts several spout/bolt instances.
Note: all of the above are totals that get distributed evenly across their hosts; they are not per-host settings. See the example below.
A further note on executors vs. tasks:
The number of tasks is the number of spout objects that get created, that each have their own distinct sets of tuples that are emitted, need to be acked, etc. The number of executors is the number of OS threads (potentially across more than 1 machine) that get created to service these spout objects. Usually there is 1 executor for each task, but you may want to create more tasks than executors if you think you will want to rebalance in the future.
(2) How to set parallelism
1. node: buy more machines and add them to the cluster...
2. worker: Config#setNumWorkers(), or the TOPOLOGY_WORKERS config option
3. executor: the last argument of TopologyBuilder#setSpout()/#setBolt()
4. task: ComponentConfigurationDeclarer#setNumTasks()
(3) Example

```java
// build the topology
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);   // 5 executors
builder.setBolt("filter-bolt", new FilterBolt(), 3)
       .shuffleGrouping("kafka-reader");                          // 3 executors
builder.setBolt("log-splitter", new LogSplitterBolt(), 3)
       .shuffleGrouping("filter-bolt");                           // 3 executors
builder.setBolt("hdfs-bolt", hdfsBolt, 2)
       .shuffleGrouping("log-splitter");                          // 2 executors

// submit the topology
Config conf = new Config();
conf.put(Config.NIMBUS_HOST, nimbusHost);
conf.setNumWorkers(3);                                            // 3 worker processes
StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
```

1. conf.setNumWorkers(3) sets the number of worker processes to 3; assuming the cluster has 3 nodes, each node runs one worker.
2. The executor counts are:
spout:5
filter-bolt:3
log-splitter:3
hdfs-bolt:2
In total there are 13 executors, and they are distributed randomly across the workers.
Note: this example reads its source messages from Kafka, and the topic has 5 partitions in Kafka, so the spout's executor count is set to 5 here.
3. The example does not set the task count explicitly, so the default of one task per executor applies. To set it, use ComponentConfigurationDeclarer#setNumTasks(), as in the sketch below; the 5 tasks are then distributed among the 3 executors.
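A minimal sketch of setting the task count, continuing the example above; which bolt receives the 5 tasks is not stated in the original, so using log-splitter here is an assumption:

```java
// assumption: give log-splitter 5 tasks while keeping its 3 executors,
// so some executor threads host more than one bolt instance
builder.setBolt("log-splitter", new LogSplitterBolt(), 3)
       .setNumTasks(5)
       .shuffleGrouping("filter-bolt");
```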
(4) Adjusting parallelism dynamically
There are two ways to adjust the parallelism of a Storm topology:
1. Kill the topology -> modify the code -> recompile -> resubmit
2. Adjust it dynamically
The first method is very inconvenient: sometimes a topology cannot simply be killed, and if you add a few machines, do you really want to kill every topology and modify its code?
Storm therefore supports dynamic adjustment, which itself can be done in two ways:
1. Via the UI: open the topology's page and click rebalance; the topology's status changes to rebalancing. This only redistributes the existing processes and threads across the machines, so it is suitable when machines are added or removed; it cannot change the number of workers or executors.
2. Via the CLI: storm rebalance
For example, to set the topology's worker count to 7 and the executor counts of filter-bolt and hdfs-bolt to 6 and 8 respectively:
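A sketch of the corresponding command; the topology name mytopo is a placeholder:

```
storm rebalance mytopo -n 7 -e filter-bolt=6 -e hdfs-bolt=8
```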
While this runs, the topology's status shows rebalancing; once the adjustment completes, the worker counts on the three machines are 3, 2 and 2.
IV. Grouping
Storm uses groupings to control how data flows between components: a grouping specifies which stream each bolt consumes and how it consumes it.
Storm ships with seven built-in groupings and also provides CustomStreamGrouping for implementing custom ones. (A wiring sketch follows the list below.)
1. Shuffle grouping (shuffleGrouping)
Tuples are distributed randomly across the bolt's tasks, so each task receives roughly the same number of tuples.
2. Fields grouping (fieldsGrouping)
Tuples are grouped by the specified field: tuples with the same value of that field always go to the same task, while different values may go to different tasks.
3. All grouping (allGrouping), also called broadcast grouping
Every tuple is sent to every task; use with care.
4. Global grouping (globalGrouping)
All tuples are sent to a single task, the one with the lowest task ID. Setting parallelism on such a component is pointless, and this grouping can easily become a bottleneck.
5. None grouping (noneGrouping)
Reserved for future use; currently equivalent to shuffle grouping.
6. Direct grouping (directGrouping)
The emitting component decides which task receives each tuple by calling emitDirect(); it can only be used on streams that were declared as direct streams.
7. Local or shuffle grouping (localOrShuffleGrouping)
If the receiving bolt has one or more tasks in the same worker process, tuples are sent to those tasks first; otherwise it behaves like shuffle grouping. Compared with shuffle grouping, this reduces network transfer and therefore improves performance.
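A sketch of how these groupings are declared when wiring a topology, continuing the example from section III; the extra bolts (WordCountBolt, ReportBolt, AuditBolt) are hypothetical and only illustrate the API calls:

```java
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);
builder.setBolt("splitter", new LogSplitterBolt(), 4)
       .shuffleGrouping("kafka-reader");                    // random, roughly even distribution
builder.setBolt("counter", new WordCountBolt(), 4)
       .fieldsGrouping("splitter", new Fields("word"));     // same "word" value -> same task
builder.setBolt("reporter", new ReportBolt(), 1)
       .globalGrouping("counter");                          // everything goes to the lowest-id task
builder.setBolt("auditor", new AuditBolt(), 2)
       .allGrouping("counter");                             // broadcast: every task gets every tuple
```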
V. Reliability
Reliability: a message emitted by a spout must be acked by every node in its tuple tree; otherwise it will keep being re-emitted.
There are two causes of re-emission:
(1) fail() is called;
(2) the tuple times out without a response.
For a complete reliability example, see the chapter 1 v4 code of Storm Blueprints (or p. 22), or the example on p. 102 of 從零開始學storm.
The key steps are as follows:
(1) Spout
1. Create a map that records the id and content of each emitted tuple; this is the list of tuples awaiting acknowledgement.

```java
private ConcurrentHashMap<UUID, Values> pending;
```

2. When emitting a tuple, pass an extra argument that identifies the tuple, and put the tuple into the map so it can wait for acknowledgement.

```java
UUID msgId = UUID.randomUUID();
this.pending.put(msgId, values);
this.collector.emit(values, msgId);
```

3. Define the ack and fail methods.
The ack method removes the tuple from the map; the fail method re-emits it:

```java
this.collector.emit(this.pending.get(msgId), msgId);
```

Tuples that receive no response are re-emitted after a timeout.
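A sketch of the two methods in the spout, following the pending-map approach described above:

```java
@Override
public void ack(Object msgId) {
    // the tuple tree completed successfully: stop tracking this tuple
    this.pending.remove(msgId);
}

@Override
public void fail(Object msgId) {
    // the tuple failed or timed out: re-emit it with the same id
    this.collector.emit(this.pending.get(msgId), msgId);
}
```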
(2) Bolt
Every bolt that processes the tuple needs to add the following:
1. When emitting, pass an extra anchor argument that references the input tuple.
2. Acknowledge that the received tuple has been processed:
```java
this.collector.ack(tuple);
```
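A sketch of what a reliable bolt's execute() might look like, combining the anchored emit and the ack (the field access and transformation are illustrative):

```java
@Override
public void execute(Tuple input) {
    String line = input.getString(0);
    // anchored emit: the new tuple is tied to 'input' in the tuple tree
    collector.emit(input, new Values(line.toUpperCase()));
    // acknowledge the input tuple once it has been fully processed
    collector.ack(input);
}
```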