日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

storm调优

發布時間:2024/1/23 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 storm调优 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

storm調優

@(STORM)[storm]

本文從2個方面討論storm的調優,第一個是集群的調優,第二個是運行在集群中的拓撲的調優,這部分還包括了使用storm-kafka從kafka中讀取消息的調優。

官方的一些建議請見:http://storm.apache.org/documentation/FAQ.html
中文版:http://ifeve.com/storm-faq/

一、集群調優

1、netty的調優

netty的配置項主要包括以下幾個:

storm.messaging.netty.server_worker_threads: 1 storm.messaging.netty.client_worker_threads: 1 storm.messaging.netty.buffer_size: 5242880 #5MB buffer # Since nimbus.task.launch.secs and supervisor.worker.start.timeout.secs are 120, other workers should also wait at least that long before giving up on connecting to the other worker. The reconnection period need also be bigger than storm.zookeeper.session.timeout(default is 20s), so that we can abort the reconnection when the target worker is dead. storm.messaging.netty.max_retries: 300 storm.messaging.netty.max_wait_ms: 1000 storm.messaging.netty.min_wait_ms: 100# If the Netty messaging layer is busy(netty internal buffer not writable), the Netty client will try to batch message as more as possible up to the size of storm.messaging.netty.transfer.batch.size bytes, otherwise it will try to flush message as soon as possible to reduce latency. storm.messaging.netty.transfer.batch.size: 262144 # Sets the backlog value to specify when the channel binds to a local address storm.messaging.netty.socket.backlog: 500# By default, the Netty SASL authentication is set to false. Users can override and set it true for a specific topology. storm.messaging.netty.authentication: false

2、GC記錄打印

  • 在配置文件中開啟 GC 日志記錄;如果一切正常,日志中記錄的 major GC 應該會非常少

二、拓撲調優

1、使用組件的并行度代替線程池

在storm中,我們可以很方便的調整spout/bolt的并行度,即使啟動拓撲時設置不合理,也可以使用rebanlance命令進行動態調整。
但有些人可能會在一個spout/bolt組件的task內部啟動一個線程池,這些線程池所在的task會比其余task消耗更多的資源,因此這些task所在的worker會消耗較多的資源,有可能影響其它拓撲的正常執行。
因此,應該使用組件自身的并行度來代替線程池,因為這些并行度會被合理分配到不同的worker中去。除此之外,還可以使用CGroup等技術進行資源的控制。

2、不要在spout中處理耗時的操作

在storm中,spout是單線程的。如果nextTuple方法非常耗時,某個消息被成功執行完畢后,acker會給spout發送消息,spout若無法及時消費,則有可能導致 ack消息被丟棄,然后spout認為執行失敗了。
在jstorm中將spout分成了3個線程,分別執行nextTuple, fail, ack方法。

3、fieldsGrouping的數據均衡性

fieldsGrouping根據某個field的值進行分組,以userId為例,如果一個組件以userId的值作為分組,則具有相同userId的值會被發送到同一個task。如果某些userId的數據量特別大,會導致這接收這些數據的task負載特別高,從而導致數據均衡出現問題。
因此必須合理選擇field的值,或者更換分組策略。

4、優先使用localOrShuffleGrouping代替shuffleGrouping

localOrShuffleGrouping是指如果task發送消息給目標task時,發現同一個worker中有目標task,則優先發送到這個task;如果沒有,則進行shuffle,隨機選取一個目標task。
localOrShuffleGrouping其實是對shuffleGrouping的一個優化,因為消除了網絡開銷和序列化操作。

5、設置合理的MaxSpoutPending

另附官方建議:
- 開始時設置一個很小的 TOPOLOGY_MAX_SPOUT_PENDING(對于 trident 可以設置為 1,對于一般的 topology 可以設置為 executor 的數量),然后逐漸增大,直到數據流不再發生變化。這時你可能會發現結果大約等于 “2 × 吞吐率(每秒收到的消息數) × 端到端時延” (最小的額定容量的2倍)。

注意,此參數慎用,過大的maxspoutpending會增加某個batch fail的風險,如果不能合理處理fail(如寫磁盤),則將其設置為1以盡量降低其fail的風險。如果可以通過state來處理fail,則可選擇最優參數。

在啟用了ack的情況下,spout中有個RotatingMap來保存spout已經發送出去,但未收到ack結果的消息。RotatingMap最大的大小為p*num-task,其中num-task就是spout的task數量,而p為topology.max.spout.pending的值,也可以通過setMaxSpoutPending來指定,是指每個task最多已經發送出去但未被ack的消息數量。

若設置過小,則task的處理能力未充分應用,不能達到最佳的吞吐量。若設置過大,則消費過多的內存,還有可能spout的消息不能及時處理,從而導致fail的出現。

1、spout的Execute latency(執行nextTuple的時間)為17ms,因此理論上每秒每個spout task的最大發送速度是60個tuple。
2、一個tuple的處理時長約為200ms(topo的complete latency)
3、200ms內有大約有60*0.2=12個tuple被發送。
4、因此MaxSpoutPenging被設置為12較為合理。

小結:1/Execute latency*complete latency,即使用topo的complete latency除以Execute latency即可,但實際上不應該考慮如此極端的情況,以避免過多的fail出現,所以可以設置為上述值除以1.5左右。

默認值為1,需要改為合理的值。對trident是否適用??

//任何時刻中,一個spout task最多可以同時處理的tuple數量,即已經emite,但未acked的tuple數量。默認為1Number active = (Number) conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);if(active==null) {_maxTransactionActive = 1;} else {_maxTransactionActive = active.intValue();}

6、避免出現fail

bolt在處理消息時,worker 的日志中出現Failing message

原因:可能是因為Topology 的消息處理超時所致。一個常見的原因是supervisor的負載太高(如網絡、磁盤IO等),不能及時的處理消息,從而導致fail。

解決方法:提交Topology 時設置適當的消息超時時間,比默認消息超時時間(30
秒)更長。比如:

conf.setMessageTimeoutSecs(60);

或者:

config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS,60);

也可以在storm.yaml中修改這個參數:

topology.message.timeout.secs: 30

因為Config.java中定義了:

public static final String TOPOLOGY_MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs";

一個場景:拓撲中進行大量的磁盤IO輸出,當負載過高時,機器不能在超時時間內處理消息,從而消息fail掉,導致重傳。而由于已經寫入磁盤的IO無法清除,所以重傳時再寫入磁盤會導致數據重復。

7、batch interval

  • 將 trident 的 batch interval 配置為你的集群的端到端時延的 50% 左右

8、Coordinator 是什么,為什么會有很多 Coordinator?

Trident spout 實際上是通過 Storm 的 bolt 運行的。MasterBatchCoordinator(MBC)封裝了 Trident 拓撲的 spout,它負責整合 Trident 中的 batch,這一點對于你所使用的任何類型的 spout 而言都是一樣的。Trident 的 batch 就是在 MBC 向各個 spout-coordinator 分發種子 tuple 的過程中生成的。Spout-coordinator bolt 知道你所定義的 spout 是如何互相協作的 —— 實際上,在使用 Kafka 的情況下,各個 spout 就是通過 spout-coordinator 來獲取 pull 消息所需要的 partition 和 offset 信息的。

在 spout 的 metadata 記錄中能夠存儲什么信息?
只能存儲少量靜態數據,而且是越少越好(盡管你確實可以向其中存儲更多的信息,不過我們不推薦這樣做)。

emitPartitionBatchNew 函數是多久調用一次的?
由于在 Trident 中 MBC 才是實際運行的 spout,一個 batch 中的所有 tuple 都是 MBC 生成的 tuple 樹的節點。也就是說,Storm 的 “max spout pending” 參數實際上定義的是可以并發運行的 batch 數量。MBC 在滿足以下兩個條件下會發送出一個新的 batch:首先,掛起的 tuple 數需要小于 “max pending” 參數;其次,距離上一個 batch 的發送已經過去了至少一個trident batch interval 的間隔時間。

如果沒有數據發送,Trident 會降低發送頻率嗎?
是的,Storm 中有一個可選的 “spout 等待策略”,默認配置是 sleep 一段指定的配置時間。

topology.spout.wait.strategy: "org.apache.storm.spout.SleepSpoutWaitStrategy" topology.sleep.spout.wait.strategy.time.ms: 1

Trident batch interval 參數有什么用?
你知道 486 時代的計算機上面為什么有個 trubo button 嗎?這個參數的作用和這個按鈕有點像。

實際上,trident batch interval 有兩個用處。首先,它可以用于減緩 spout 從遠程數據源獲取數據的速度,但這不會影響數據處理的效率。例如,對于一個從給定的 S3 存儲區中讀取批量上傳文件并按行發送數據的 spout,我們就不希望它經常觸發 S3 的閾值,因為文件要隔幾分鐘才會上傳一次,而且每個 batch 也需要花費一定的時間來執行。

另一個用處是限制啟動期間或者突發數據負載情況下內部消息隊列的負載壓力。如果 spout 突然活躍起來,并向系統中擠入了 10 個 batch 的記錄,那么可能會有從 batch7 開始的大量不緊急的 tuple 堵塞住傳輸緩沖區,并且阻塞了從 batch3 中的 tuple(甚至可能包含 batch3 中的部分舊 tuple)的 commit 過程#。對于這種情況,我們的解決方法就是將 trident batch interval 設置為正常的端到端處理時延的一半左右 —— 也就是說如果需要花費 600 ms 的時間處理一個 batch,那么就可以每 300 ms 處理一個 batch。

注意,這個 300 ms 僅僅是一個上限值,而不是額外增加的延時時間,如果你的 batch 需要花費 258 ms 來運行,那么 Trident 就只會延時等待 42 ms。

9、fetch Size

在讀取streaming數據流時,需要將某個時間點前的數據掛掉,然后從這個時間點開始計算指標。
問題來了,如果指定每個批次的大小很大的話,數據丟得很快,很快就可以進入處理邏輯,但是這批數據由于非常大,后面的bolt處理起來就會有問題,甚至出現OOM。
如果指定批次很小的話話,則需要很長時間才能把數據丟完,可能半天以上,基本不能接受。

解決辦法:
默認情況下,每個kafka的數據文件大小為1G,因此丟棄數據時,每個分區均需要丟1G以上的數據。
減小這些topic的文件大小,如64M,則每次只需要丟掉100G左右的數據。

bin/kafka-topics.sh –create –zookeeper 10.120.69.44:2181/kafka –topic streaming_g18_sdc –partitions 20 –replication-factor 2 –config segment.bytes=67108864

10、設置拓撲的jvm

可以在拓撲級別指定Jvm參數,覆蓋storm.yaml中的配置:

config.put(Config.TOPOLOGY_WORKER_CHILDOPTS,"-Xmx2048m -Xms2048m -Xmn384m -XX:PermSize=128m -XX:+UseConcMarkSweepGC");

總結

以上是生活随笔為你收集整理的storm调优的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。