日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Disruptor编程指南

發(fā)布時間:2024/1/23 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Disruptor编程指南 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Disruptor編程指南

@(STORM)[storm, java]

完整代碼請見:https://github.com/lujinhong/lujinhong-commons/tree/master/lujinhong-commons-java/src/main/java/com/lujinhong/commons/java/disruptor
部分內(nèi)容參考自:http://www.cnblogs.com/haiq/p/4112689.html

  • Disruptor編程指南
    • 一什么是 Disruptor
    • 二Disruptor 的核心概念
    • 三編程指南
      • 1創(chuàng)建事件
      • 2創(chuàng)建事件工廠
      • 3創(chuàng)建事件處理器
      • 4主類
    • 四其它內(nèi)容
      • 1等待策略
      • 2關(guān)于消息發(fā)布

一、什么是 Disruptor

從功能上來看,Disruptor 是實現(xiàn)了“隊列”的功能,而且是一個有界隊列。那么它的應(yīng)用場景自然就是“生產(chǎn)者-消費者”模型的應(yīng)用場合了。

可以拿 JDK 的 BlockingQueue 做一個簡單對比,以便更好地認識 Disruptor 是什么。

我們知道 BlockingQueue 是一個 FIFO 隊列,生產(chǎn)者(Producer)往隊列里發(fā)布(publish)一項事件(或稱之為“消息”也可以)時,消費者(Consumer)能獲得通知;如果沒有事件時,消費者被堵塞,直到生產(chǎn)者發(fā)布了新的事件。

這些都是 Disruptor 能做到的,與之不同的是,Disruptor 能做更多:

同一個“事件”可以有多個消費者,消費者之間既可以并行處理,也可以相互依賴形成處理的先后次序(形成一個依賴圖); 預(yù)分配用于存儲事件內(nèi)容的內(nèi)存空間; 針對極高的性能目標而實現(xiàn)的極度優(yōu)化和無鎖的設(shè)計;

以上的描述雖然簡單地指出了 Disruptor 是什么,但對于它“能做什么”還不是那么直截了當(dāng)。一般性地來說,當(dāng)你需要在兩個獨立的處理過程(兩個線程)之間交換數(shù)據(jù)時,就可以使用 Disruptor 。當(dāng)然使用隊列(如上面提到的 BlockingQueue)也可以,只不過 Disruptor 做得更好。

拿隊列來作比較的做法弱化了對 Disruptor 有多強大的認識,如果想要對此有更多的了解,可以仔細看看 Disruptor 在其東家 LMAX 交易平臺(也是實現(xiàn)者) 是如何作為核心架構(gòu)來使用的,這方面就不做詳述了,問度娘或谷哥都能找到。

二、Disruptor 的核心概念

先從了解 Disruptor 的核心概念開始,來了解它是如何運作的。下面介紹的概念模型,既是領(lǐng)域?qū)ο?#xff0c;也是映射到代碼實現(xiàn)上的核心對象。

Ring Buffer
如其名,環(huán)形的緩沖區(qū)。曾經(jīng) RingBuffer 是 Disruptor 中的最主要的對象,但從3.0版本開始,其職責(zé)被簡化為僅僅負責(zé)對通過 Disruptor 進行交換的數(shù)據(jù)(事件)進行存儲和更新。在一些更高級的應(yīng)用場景中,Ring Buffer 可以由用戶的自定義實現(xiàn)來完全替代。
Sequence Disruptor
通過順序遞增的序號來編號管理通過其進行交換的數(shù)據(jù)(事件),對數(shù)據(jù)(事件)的處理過程總是沿著序號逐個遞增處理。一個 Sequence 用于跟蹤標識某個特定的事件處理者( RingBuffer/Consumer )的處理進度。雖然一個 AtomicLong 也可以用于標識進度,但定義 Sequence 來負責(zé)該問題還有另一個目的,那就是防止不同的 Sequence 之間的CPU緩存?zhèn)喂蚕?Flase Sharing)問題。
(注:這是 Disruptor 實現(xiàn)高性能的關(guān)鍵點之一,網(wǎng)上關(guān)于偽共享問題的介紹已經(jīng)汗牛充棟,在此不再贅述)。
Sequencer
Sequencer 是 Disruptor 的真正核心。此接口有兩個實現(xiàn)類 SingleProducerSequencer、MultiProducerSequencer ,它們定義在生產(chǎn)者和消費者之間快速、正確地傳遞數(shù)據(jù)的并發(fā)算法。
Sequence Barrier
用于保持對RingBuffer的 main published Sequence 和Consumer依賴的其它Consumer的 Sequence 的引用。 Sequence Barrier 還定義了決定 Consumer 是否還有可處理的事件的邏輯。
Wait Strategy
定義 Consumer 如何進行等待下一個事件的策略。 (注:Disruptor 定義了多種不同的策略,針對不同的場景,提供了不一樣的性能表現(xiàn))
Event
在 Disruptor 的語義中,生產(chǎn)者和消費者之間進行交換的數(shù)據(jù)被稱為事件(Event)。它不是一個被 Disruptor 定義的特定類型,而是由 Disruptor 的使用者定義并指定。
EventProcessor
EventProcessor 持有特定消費者(Consumer)的 Sequence,并提供用于調(diào)用事件處理實現(xiàn)的事件循環(huán)(Event Loop)。
EventHandler
Disruptor 定義的事件處理接口,由用戶實現(xiàn),用于處理事件,是 Consumer 的真正實現(xiàn)。
Producer
即生產(chǎn)者,只是泛指調(diào)用 Disruptor 發(fā)布事件的用戶代碼,Disruptor 沒有定義特定接口或類型。

三、編程指南

1、創(chuàng)建事件

在disruptor中,事件即是我們常說的消息。

public class LongEvent {private long _value;public void set(long value){_value = value;}public long getValue(){return _value;} }

2、創(chuàng)建事件工廠

public class LongEventFactory implements EventFactory<LongEvent>{@Overridepublic LongEvent newInstance() {return new LongEvent();} }

3、創(chuàng)建事件處理器

public class LongEventHandler implements EventHandler<LongEvent> {@Overridepublic void onEvent(LongEvent event, long sequent, boolean endOfBatch) throws Exception {System.out.println("Dealing " + event.getValue());} }

4、主類

主類只要寫成2件事:
(1)啟動disruptor,并指定處理事件的handler
(2)向Disruptor中發(fā)布事件

public class DisruptorDemo {private static Random random = new Random();public static void main(String[] args) {// 一、啟動disruptor,并指定處理事件的handlerEventFactory<LongEvent> eventFactory = new LongEventFactory();ExecutorService executor = Executors.newSingleThreadExecutor();int ringBufferSize = 1024 * 1024; // RingBuffer 大小,必須是 2 的 N 次方;Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executor,ProducerType.SINGLE, new YieldingWaitStrategy());//指定使用哪個handler來處理事件EventHandler<LongEvent> eventHandler = new LongEventHandler();disruptor.handleEventsWith(eventHandler);disruptor.start();// 二、向Disruptor中發(fā)布事件RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();for (int i = 0; i < 100; i++) {long sequence = ringBuffer.next();// 請求下一個事件序號;try {LongEvent event = ringBuffer.get(sequence);// 獲取該序號對應(yīng)的事件對象;long data = getEventData();// 獲取要通過事件傳遞的業(yè)務(wù)數(shù)據(jù);event.set(data);} finally {ringBuffer.publish(sequence);// 發(fā)布事件;}}disruptor.shutdown();// 關(guān)閉 disruptor,方法會堵塞,直至所有的事件都得到處理;executor.shutdown();// 關(guān)閉 disruptor 使用的線程池;如果需要的話,必須手動關(guān)閉, disruptor 在// shutdown 時不會自動關(guān)閉;}private static long getEventData() {return random.nextLong();}}

四、其它內(nèi)容

1、等待策略

Disruptor 定義了 com.lmax.disruptor.WaitStrategy 接口用于抽象 Consumer 如何等待新事件,這是策略模式的應(yīng)用。
Disruptor 提供了多個 WaitStrategy 的實現(xiàn),每種策略都具有不同性能和優(yōu)缺點,根據(jù)實際運行環(huán)境的 CPU 的硬件特點選擇恰當(dāng)?shù)牟呗?#xff0c;并配合特定的 JVM 的配置參數(shù),能夠?qū)崿F(xiàn)不同的性能提升。
例如,BlockingWaitStrategy、SleepingWaitStrategy、YieldingWaitStrategy 等,其中,
BlockingWaitStrategy 是最低效的策略,但其對CPU的消耗最小并且在各種不同部署環(huán)境中能提供更加一致的性能表現(xiàn);
SleepingWaitStrategy 的性能表現(xiàn)跟 BlockingWaitStrategy 差不多,對 CPU 的消耗也類似,但其對生產(chǎn)者線程的影響最小,適合用于異步日志類似的場景;
YieldingWaitStrategy 的性能是最好的,適合用于低延遲的系統(tǒng)。在要求極高性能且事件處理線數(shù)小于 CPU 邏輯核心數(shù)的場景中,推薦使用此策略;例如,CPU開啟超線程的特性。

2、關(guān)于消息發(fā)布

此外,Disruptor 要求 RingBuffer.publish 必須得到調(diào)用的潛臺詞就是,如果發(fā)生異常也一樣要調(diào)用 publish ,那么,很顯然這個時候需要調(diào)用者在事件處理的實現(xiàn)上來判斷事件攜帶的數(shù)據(jù)是否是正確的或者完整的,這是實現(xiàn)者應(yīng)該要注意的事情。

此外,Disruptor 要求 RingBuffer.publish 必須得到調(diào)用的潛臺詞就是,如果發(fā)生異常也一樣要調(diào)用 publish ,那么,很顯然這個時候需要調(diào)用者在事件處理的實現(xiàn)上來判斷事件攜帶的數(shù)據(jù)是否是正確的或者完整的,這是實現(xiàn)者應(yīng)該要注意的事情。

Disruptor 還提供另外一種形式的調(diào)用來簡化以上操作,并確保 publish 總是得到調(diào)用。

static class Translator implements EventTranslatorOneArg<LongEvent, Long>{@Overridepublic void translateTo(LongEvent event, long sequence, Long data) {event.set(data);} }public static Translator TRANSLATOR = new Translator();public static void publishEvent2(Disruptor<LongEvent> disruptor) {// 發(fā)布事件;RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();long data = getEventData();//獲取要通過事件傳遞的業(yè)務(wù)數(shù)據(jù);ringBuffer.publishEvent(TRANSLATOR, data); }

總結(jié)

以上是生活随笔為你收集整理的Disruptor编程指南的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。