Disruptor编程指南
Disruptor編程指南
@(STORM)[storm, java]
完整代碼請見:https://github.com/lujinhong/lujinhong-commons/tree/master/lujinhong-commons-java/src/main/java/com/lujinhong/commons/java/disruptor
部分內(nèi)容參考自:http://www.cnblogs.com/haiq/p/4112689.html
- Disruptor編程指南
- 一什么是 Disruptor
- 二Disruptor 的核心概念
- 三編程指南
- 1創(chuàng)建事件
- 2創(chuàng)建事件工廠
- 3創(chuàng)建事件處理器
- 4主類
- 四其它內(nèi)容
- 1等待策略
- 2關(guān)于消息發(fā)布
一、什么是 Disruptor
從功能上來看,Disruptor 是實現(xiàn)了“隊列”的功能,而且是一個有界隊列。那么它的應(yīng)用場景自然就是“生產(chǎn)者-消費者”模型的應(yīng)用場合了。
可以拿 JDK 的 BlockingQueue 做一個簡單對比,以便更好地認識 Disruptor 是什么。
我們知道 BlockingQueue 是一個 FIFO 隊列,生產(chǎn)者(Producer)往隊列里發(fā)布(publish)一項事件(或稱之為“消息”也可以)時,消費者(Consumer)能獲得通知;如果沒有事件時,消費者被堵塞,直到生產(chǎn)者發(fā)布了新的事件。
這些都是 Disruptor 能做到的,與之不同的是,Disruptor 能做更多:
同一個“事件”可以有多個消費者,消費者之間既可以并行處理,也可以相互依賴形成處理的先后次序(形成一個依賴圖); 預(yù)分配用于存儲事件內(nèi)容的內(nèi)存空間; 針對極高的性能目標而實現(xiàn)的極度優(yōu)化和無鎖的設(shè)計;以上的描述雖然簡單地指出了 Disruptor 是什么,但對于它“能做什么”還不是那么直截了當(dāng)。一般性地來說,當(dāng)你需要在兩個獨立的處理過程(兩個線程)之間交換數(shù)據(jù)時,就可以使用 Disruptor 。當(dāng)然使用隊列(如上面提到的 BlockingQueue)也可以,只不過 Disruptor 做得更好。
拿隊列來作比較的做法弱化了對 Disruptor 有多強大的認識,如果想要對此有更多的了解,可以仔細看看 Disruptor 在其東家 LMAX 交易平臺(也是實現(xiàn)者) 是如何作為核心架構(gòu)來使用的,這方面就不做詳述了,問度娘或谷哥都能找到。
二、Disruptor 的核心概念
先從了解 Disruptor 的核心概念開始,來了解它是如何運作的。下面介紹的概念模型,既是領(lǐng)域?qū)ο?#xff0c;也是映射到代碼實現(xiàn)上的核心對象。
Ring Buffer
如其名,環(huán)形的緩沖區(qū)。曾經(jīng) RingBuffer 是 Disruptor 中的最主要的對象,但從3.0版本開始,其職責(zé)被簡化為僅僅負責(zé)對通過 Disruptor 進行交換的數(shù)據(jù)(事件)進行存儲和更新。在一些更高級的應(yīng)用場景中,Ring Buffer 可以由用戶的自定義實現(xiàn)來完全替代。
Sequence Disruptor
通過順序遞增的序號來編號管理通過其進行交換的數(shù)據(jù)(事件),對數(shù)據(jù)(事件)的處理過程總是沿著序號逐個遞增處理。一個 Sequence 用于跟蹤標識某個特定的事件處理者( RingBuffer/Consumer )的處理進度。雖然一個 AtomicLong 也可以用于標識進度,但定義 Sequence 來負責(zé)該問題還有另一個目的,那就是防止不同的 Sequence 之間的CPU緩存?zhèn)喂蚕?Flase Sharing)問題。
(注:這是 Disruptor 實現(xiàn)高性能的關(guān)鍵點之一,網(wǎng)上關(guān)于偽共享問題的介紹已經(jīng)汗牛充棟,在此不再贅述)。
Sequencer
Sequencer 是 Disruptor 的真正核心。此接口有兩個實現(xiàn)類 SingleProducerSequencer、MultiProducerSequencer ,它們定義在生產(chǎn)者和消費者之間快速、正確地傳遞數(shù)據(jù)的并發(fā)算法。
Sequence Barrier
用于保持對RingBuffer的 main published Sequence 和Consumer依賴的其它Consumer的 Sequence 的引用。 Sequence Barrier 還定義了決定 Consumer 是否還有可處理的事件的邏輯。
Wait Strategy
定義 Consumer 如何進行等待下一個事件的策略。 (注:Disruptor 定義了多種不同的策略,針對不同的場景,提供了不一樣的性能表現(xiàn))
Event
在 Disruptor 的語義中,生產(chǎn)者和消費者之間進行交換的數(shù)據(jù)被稱為事件(Event)。它不是一個被 Disruptor 定義的特定類型,而是由 Disruptor 的使用者定義并指定。
EventProcessor
EventProcessor 持有特定消費者(Consumer)的 Sequence,并提供用于調(diào)用事件處理實現(xiàn)的事件循環(huán)(Event Loop)。
EventHandler
Disruptor 定義的事件處理接口,由用戶實現(xiàn),用于處理事件,是 Consumer 的真正實現(xiàn)。
Producer
即生產(chǎn)者,只是泛指調(diào)用 Disruptor 發(fā)布事件的用戶代碼,Disruptor 沒有定義特定接口或類型。
三、編程指南
1、創(chuàng)建事件
在disruptor中,事件即是我們常說的消息。
public class LongEvent {private long _value;public void set(long value){_value = value;}public long getValue(){return _value;} }2、創(chuàng)建事件工廠
public class LongEventFactory implements EventFactory<LongEvent>{@Overridepublic LongEvent newInstance() {return new LongEvent();} }3、創(chuàng)建事件處理器
public class LongEventHandler implements EventHandler<LongEvent> {@Overridepublic void onEvent(LongEvent event, long sequent, boolean endOfBatch) throws Exception {System.out.println("Dealing " + event.getValue());} }4、主類
主類只要寫成2件事:
(1)啟動disruptor,并指定處理事件的handler
(2)向Disruptor中發(fā)布事件
四、其它內(nèi)容
1、等待策略
Disruptor 定義了 com.lmax.disruptor.WaitStrategy 接口用于抽象 Consumer 如何等待新事件,這是策略模式的應(yīng)用。
Disruptor 提供了多個 WaitStrategy 的實現(xiàn),每種策略都具有不同性能和優(yōu)缺點,根據(jù)實際運行環(huán)境的 CPU 的硬件特點選擇恰當(dāng)?shù)牟呗?#xff0c;并配合特定的 JVM 的配置參數(shù),能夠?qū)崿F(xiàn)不同的性能提升。
例如,BlockingWaitStrategy、SleepingWaitStrategy、YieldingWaitStrategy 等,其中,
BlockingWaitStrategy 是最低效的策略,但其對CPU的消耗最小并且在各種不同部署環(huán)境中能提供更加一致的性能表現(xiàn);
SleepingWaitStrategy 的性能表現(xiàn)跟 BlockingWaitStrategy 差不多,對 CPU 的消耗也類似,但其對生產(chǎn)者線程的影響最小,適合用于異步日志類似的場景;
YieldingWaitStrategy 的性能是最好的,適合用于低延遲的系統(tǒng)。在要求極高性能且事件處理線數(shù)小于 CPU 邏輯核心數(shù)的場景中,推薦使用此策略;例如,CPU開啟超線程的特性。
2、關(guān)于消息發(fā)布
此外,Disruptor 要求 RingBuffer.publish 必須得到調(diào)用的潛臺詞就是,如果發(fā)生異常也一樣要調(diào)用 publish ,那么,很顯然這個時候需要調(diào)用者在事件處理的實現(xiàn)上來判斷事件攜帶的數(shù)據(jù)是否是正確的或者完整的,這是實現(xiàn)者應(yīng)該要注意的事情。
此外,Disruptor 要求 RingBuffer.publish 必須得到調(diào)用的潛臺詞就是,如果發(fā)生異常也一樣要調(diào)用 publish ,那么,很顯然這個時候需要調(diào)用者在事件處理的實現(xiàn)上來判斷事件攜帶的數(shù)據(jù)是否是正確的或者完整的,這是實現(xiàn)者應(yīng)該要注意的事情。
Disruptor 還提供另外一種形式的調(diào)用來簡化以上操作,并確保 publish 總是得到調(diào)用。
static class Translator implements EventTranslatorOneArg<LongEvent, Long>{@Overridepublic void translateTo(LongEvent event, long sequence, Long data) {event.set(data);} }public static Translator TRANSLATOR = new Translator();public static void publishEvent2(Disruptor<LongEvent> disruptor) {// 發(fā)布事件;RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();long data = getEventData();//獲取要通過事件傳遞的業(yè)務(wù)數(shù)據(jù);ringBuffer.publishEvent(TRANSLATOR, data); }總結(jié)
以上是生活随笔為你收集整理的Disruptor编程指南的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RDD, DataFrame or Da
- 下一篇: UncaughtExceptionHan