日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

你需要知道的高性能并发框架Disruptor原理

發布時間:2023/11/27 生活经验 43 豆豆
生活随笔 收集整理的這篇文章主要介紹了 你需要知道的高性能并发框架Disruptor原理 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Disruptor的小史

現在要是不知道Disruptor真的已經很outer了,Disruptor是英國外匯交易公司LMAX開發的一款開源的高性能隊列,LMAX Disruptor是一個高性能的線程間消息傳遞庫,它源于LMAX對并發性,性能和非阻塞算法的研究,如今構成了其Exchange基礎架構的核心部分。

稍后,包括Apache Storm、Camel、Log4j等在內的很多知名項目都集成了Disruptor。國內不少一線大廠技術團隊也在用,或者借鑒了其優秀的架構思想。Disruptor通過無鎖設計實現了高并發高性能,其設計思想可以擴展到分布式環境,通過無鎖設計來提升服務的高性能。

?

Martin Fowler的布道

著名的軟件設計模式專家Martin Fowler專門寫了一篇文章來推廣https://martinfowler.com/articles/lmax.html

LMAX是一個新的零售金融交易平臺。因此,它必須以低延遲處理許多交易。該系統構建于JVM平臺之上,并以業務邏輯處理器為中心,可在單個線程上處理每秒600萬個訂單。業務邏輯處理器使用事件源完全在內存中運行。業務邏輯處理器被Disruptors包圍 - Disruptors是一個并發組件,它實現了一個無需鎖定即可運行的隊列網絡。在設計過程中,團隊得出結論,使用隊列的高性能并發模型的最新方向與現代CPU設計基本不一致。

Disruptor數據結構

在原始級別,可以將Disruptor視為隊列的多播圖,其中生產者在其上放置對象,這些對象通過單獨的下游隊列發送給所有消費者以供并行使用。當你查看內部時,你會看到這個隊列網絡實際上是一個單一的數據結構 - 一個環形緩沖區

每個生產者和消費者都有一個序列計數器,用于指示它當前正在處理的緩沖區中的哪個槽。每個生產者/消費者編寫自己的序列計數器,但可以讀取其他序列計數器。通過這種方式,生產者可以讀取消費者的計數器,以確保可以在沒有計數器鎖定的情況下使用它想要寫入的插槽。類似地,消費者可以確保它只通過觀察計數器一旦另一個消費者完成消息就處理消息。

?

輸出Disruptor類似,但它們只有兩個連續的消費者用于編組和輸出。輸出事件被組織成幾個主題,因此消息只能發送給對它們感興趣的接收者。每個主題都有自己的Disruptor。

我所描述的Disruptor以一種生產者和多種消費者的風格使用,但這并不是對Disruptor設計的限制。Disruptor也可以與多個生產者一起工作,在這種情況下它仍然不需要鎖。

Disruptor設計的一個好處是,如果消費者遇到問題而落后,它可以讓消費者更容易趕上。如果解組器在插槽15上處理時出現問題并且當接收器在插槽31上時返回,則它可以從一個批次中的插槽16-30讀取數據以趕上。批量讀取來自Disruptor的數據使得滯后的消費者更容易趕上,從而減少總體延遲。

?環形緩沖區很大:輸入緩沖區有2000萬個插槽,每個輸出緩沖區有400萬個插槽。序列計數器是64位長整數,即使在環形槽縫合時也會單調增加。緩沖區設置為2的冪的大小,因此編譯器可以執行有效的模運算以從序列計數器編號映射到槽號。與系統的其他部分一樣,Disruptor在一夜之間被反彈。這種反彈主要用于擦除內存,以便在交易過程中發生昂貴的垃圾收集事件的可能性較小。(我也認為定期重啟是一個好習慣,所以你要排練如何在緊急情況下這樣做。)

?

Disruptor在哪里

https://github.com/LMAX-Exchange/disruptor/

理解Disruptor是什么的最好方法是將它與目前很好理解和非常相似的東西進行比較。可以把Disruptor類比成Java的阻塞隊列BlockingQueue。像隊列一樣,Disruptor的目的是在同一進程內的線程之間移動數據(例如消息或事件)。但是,Disruptor提供了一些將其與隊列區分開來的關鍵功能。他們是:

1)具有消費者依賴關系圖的消費者多播事件。

2)為事件預分配內存。

3)可選擇無鎖模式。

?

Disruptor核心概念

在我們理解Disruptor是如何工作之前,需要先理解一些Disruptor團隊定義的術語。

  • Ring Buffer環形緩沖區:環形緩沖區通常被認為是Disruptor的核心,但是從3.0開始,環形緩沖區僅負責存儲和更新通過Disruptor的數據(事件)。對于一些高級用例,可以完全由用戶替換。
  • Sequence序列:Disruptor使用Sequences作為識別特定組件所在位置的方法。每個消費者(EventProcessor)都像Disruptor本身一樣維護一個Sequence。大多數并發代碼依賴于這些Sequence值的移動,因此Sequence支持AtomicLong的許多當前功能。事實上,兩者之間唯一真正的區別是序列包含額外的功能,以防止序列和其他值之間的共享錯誤。
  • Sequencer:Sequencer是Disruptor真正的核心。該接口的兩個實現(單生成者,多生產者)實現了所有并發算法,用于在生產者和消費者之間快速而又正確地傳遞數據。
  • Sequence Barrier序列屏障:序列屏障由序列發生器產生,包含對序列發生器中主要發布的序列和任何依賴性消費者的序列的引用。它包含確定是否有任何可供消費者處理的事件的邏輯。
  • Wait Strategy等待策略:等待策略確定消費者如何等待生產者將事件放入Disruptor。有關可選鎖定的部分中提供了更多詳細信息。
  • Event事件:從生產者傳遞給消費者的數據單位。事件沒有特定的代碼表示,因為它完全由用戶定義。
  • EventProcessor:用于處理來自Disruptor的事件的主事件循環,并具有消費者序列的所有權。有一個名為?BatchEventProcessor的表示,它包含事件循環的有效實現,并將回調到使用的提供的EventHandler接口實現。
  • EventHandler:由用戶實現并代表Disruptor的使用者的接口。
  • 生產者:這是調用Disruptor以將事件排入隊列的用戶代碼。這個概念在代碼中也沒有表示。

為了將這些元素置于上下文中,下面是LMAX如何在其高性能核心服務(例如交換)中使用Disruptor的示例。

圖1.具有一組依賴消費者的Disruptor

多播事件

這是普通隊列和Disruptor之間最大的行為差異。當您有多個消費者在同一個Disruptor上監聽時,所有事件都會發布給所有消費者,而不是一個事件只發送給單一消費者隊列。Disruptor的行為旨在用于需要對同一數據進行獨立多個并行操作的情況。

來自LMAX的規范示例是我們有三個操作,即日志記錄(將輸入數據寫入持久性日志文件),復制(將輸入數據發送到另一臺機器以確保存在數據的遠程副本)和業務邏輯(真正的處理工作)。

Executor風格的事件處理,通過在同一處并行處理不同的事件來找到比例,也可以使用WorkerPool。請注意,它是在現有的Disruptor類之上進行的,并且不會使用相同的第一類支持進行處理,因此它可能不是實現該特定目標的最有效方法。

查看圖1可以看到有3個事件處理程序(JournalConsumer,ReplicationConsumer和ApplicationConsumer)監聽Disruptor,這些事件處理程序中的每一個都將按相同的順序接收Disruptor中可用的所有消息,允許每個消費者的工作并行進行。

?

消費者依賴圖

為了支持并行處理行為的實際應用,有必要支持消費者之間的協調。返回參考上述示例,必須防止業務邏輯消費者在日志記錄和復制消費者完成其任務之前取得進展。我們稱這個概念為門控,或者更準確地說,這種行為的超集特征稱為門控(concept gating)

門控發生在兩個地方。

首先,我們需要確保生產者不會超過消費者。這是通過調用RingBuffer.addGatingConsumers()將相關的使用者添加到Disruptor來處理的。

其次,先前提到的情況是通過從必須首先完成其處理的組件構造包含序列的SequenceBarrier來實現的。

參考圖1有3個消費者正在收聽來自Ring Buffer的事件。此示例中有一個依賴關系圖。ApplicationConsumer依賴于JournalConsumer和ReplicationConsumer。這意味著JournalConsumer和ReplicationConsumer可以彼此并行的運行。從ApplicationConsumer的SequenceBarrier到JournalConsumer和ReplicationConsumer的序列的連接可以看到依賴關系。

值得注意的是Sequencer與下游消費者之間的關系。它的一個作用是確保發布不包裝Ring Buffer。要做到這一點,下游消費者中沒有一個可能具有低于環形緩沖區序列的序列,而不是環形緩沖區的大小。但是,使用依賴關系圖可以進行優化。由于ApplicationConsumers Sequence保證小于或等于JournalConsumer和ReplicationConsumer(這是該依賴關系所確保的),因此Sequencer只需要查看ApplicationConsumer的Sequence。在更一般的意義上,Sequencer只需要知道作為依賴關系樹中葉節點的使用者的序列。

?

事件預分配

Disruptor的設計目標之一是能在低延遲環境中使用。在低延遲系統中,必須減少或移除內存分配。在基于Java的系統中,目的是減少由于垃圾收集GC導致的系統停頓(在低延遲C / C ++系統中,由于存在于內存分配器上的爭用,大量內存分配也存在問題)。

為了支持這一點,用戶可以提前分配Disruptor中事件所需的存儲空間。在構造期間,EventFactory由用戶提供,并將在Disruptor的Ring Buffer中為每個條目調用。將新數據發布到Disruptor時,API將允許用戶獲取構造的對象,以便他們可以調用方法或更新該存儲對象上的字段。Disruptor保證這些操作只要正確實現就是并發安全的。

?

可選擇無鎖

低延遲期望推動的另一個關鍵實現細節是廣泛使用無鎖算法來實現Disruptor。

所有內存可見性和正確性保證都是使用內存屏障(memory barriers)和CAS操作實現的。只有一個用例需要實際鎖定并且在BlockingWaitStrategy中。這僅僅是為了使用條件,以便在等待新事件到達時停放消耗線程。許多低延遲系統將使用忙等待來避免使用條件可能引起的性能抖動,但是在系統忙等待操作的數量可能導致性能顯著下降,尤其是在CPU資源嚴重受限的情況下。例如,虛擬化環境中的Web服務器。

?

獲得Disruptor的Jar

Disruptor jar文件可從Maven,可以從那里集成到您選擇的依賴管理器中。

<dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.4.2</version>
</dependency>

為了開始使用Disruptor,我們將考慮一個非常簡單的例子,一個將生產者傳遞給消費者的Long值,消費者只需打印出該值。

首先,我們將定義攜??帶數據的事件。

public class LongEvent
{private long value;public void set(long value){this.value = value;}
}

為了讓Disruptor為我們預先分配這些事件,我們需要一個將執行構造的EventFactory

import com.lmax.disruptor.EventFactory;public class LongEventFactory implements EventFactory<LongEvent>
{public LongEvent newInstance(){return new LongEvent();}
}

一旦我們定義了事件,我們需要創建一個處理這些事件的消費者。在我們的例子中,我們要做的就是從控制臺中打印出值。

import com.lmax.disruptor.EventHandler;public class LongEventHandler implements EventHandler<LongEvent>
{public void onEvent(LongEvent event, long sequence, boolean endOfBatch){System.out.println("Event: " + event);}
}

我們需要這些事件的來源,為了舉例,我將假設數據來自某種I/O設備,例如網絡或ByteBuffer形式的文件。

?

使用翻譯器發布

使用Disruptor的3.0版本,添加了更豐富的Lambda風格的API,以幫助開發人員將這種復雜性封裝在Ring Buffer中,因此3.0之后發布消息的首選方法是通過API的Event Publisher / Event Translator部分。例如

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.EventTranslatorOneArg;public class LongEventProducerWithTranslator
{private final RingBuffer<LongEvent> ringBuffer;public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer){this.ringBuffer = ringBuffer;}private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =new EventTranslatorOneArg<LongEvent, ByteBuffer>(){public void translateTo(LongEvent event, long sequence, ByteBuffer bb){event.set(bb.getLong(0));}};public void onData(ByteBuffer bb){ringBuffer.publishEvent(TRANSLATOR, bb);}
}

?

這種方法的另一個優點是翻譯器代碼可以被拉入一個單獨的類中,并可以輕松地單獨進行單元測試。

Disruptor提供了許多不同的接口(EventTranslator,EventTranslatorOneArg,EventTranslatorTwoArg等),可以實現這些接口來提供翻譯。原因是允許轉換器被表示為靜態類或非實例捕獲lambda(non-capturing lambda)(當Java 8 rolls around)作為轉換方法的參數通過Ring Buffer上的調用傳遞給轉換器。

?

使用舊版API發布

我們也可以使用更“原始”的方法。

import com.lmax.disruptor.RingBuffer;public class LongEventProducer
{private final RingBuffer<LongEvent> ringBuffer;public LongEventProducer(RingBuffer<LongEvent> ringBuffer){this.ringBuffer = ringBuffer;}public void onData(ByteBuffer bb){long sequence = ringBuffer.next();  // Grab the next sequencetry{LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor// for the sequenceevent.set(bb.getLong(0));  // Fill with data}finally{ringBuffer.publish(sequence);}}
}

顯而易見的是,事件發布變得比使用簡單隊列更復雜。這是由于對事件預分配的需求。它需要(在最低級別)消息發布的兩階段方法,即聲明環形緩沖區中的插槽然后發布可用數據。還必須將發布包裝在try/finally塊中。如果我們在Ring Buffer中聲明一個插槽(調用RingBuffer.next()),那么我們必須發布這個序列。如果不這樣做可能會導致Disruptor狀態的變壞。具體而言,在多生產者的情況下,這將導致消費者停滯并且在沒有重啟的情況下無法恢復。因此,建議使用EventTranslator API。

最后一步是將整個事物連接在一起。可以手動連接所有組件,但是它可能有點復雜,因此提供DSL以簡化構造。一些更復雜的選項不能通過DSL獲得,但它適用于大多數情況。

import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;public class LongEventMain
{public static void main(String[] args) throws Exception{// The factory for the eventLongEventFactory factory = new LongEventFactory();// Specify the size of the ring buffer, must be power of 2.int bufferSize = 1024;// Construct the DisruptorDisruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);// Connect the handlerdisruptor.handleEventsWith(new LongEventHandler());// Start the Disruptor, starts all threads runningdisruptor.start();// Get the ring buffer from the Disruptor to be used for publishing.RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();LongEventProducer producer = new LongEventProducer(ringBuffer);ByteBuffer bb = ByteBuffer.allocate(8);for (long l = 0; true; l++){bb.putLong(0, l);producer.onData(bb);Thread.sleep(1000);}}
}

使用Java 8

Disruptor API的設計影響之一是Java 8將依賴功能接口的概念作為Java Lambdas的類型聲明。Disruptor API中的大多數接口定義符合功能接口的要求,因此可以使用Lambda而不是自定義類,這可以減少所需的boiler place。

import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;public class LongEventMain
{public static void main(String[] args) throws Exception{// Specify the size of the ring buffer, must be power of 2.int bufferSize = 1024;// Construct the DisruptorDisruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);// Connect the handlerdisruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event));// Start the Disruptor, starts all threads runningdisruptor.start();// Get the ring buffer from the Disruptor to be used for publishing.RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();ByteBuffer bb = ByteBuffer.allocate(8);for (long l = 0; true; l++){bb.putLong(0, l);ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);Thread.sleep(1000);}}
}

注意不再需要許多類(例如處理程序,翻譯器)。還要注意lambda如何publishEvent()僅用于引用傳入的參數。

如果我們要將該代碼編寫為:

ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{bb.putLong(0, l);ringBuffer.publishEvent((event, sequence) -> event.set(bb.getLong(0)));Thread.sleep(1000);
}

這將創建一個捕獲lambda,這意味著它需要實例化一個對象來保存ByteBuffer bb變量,因為它將lambda傳遞給publishEvent()調用。這將產生額外的(不必要的)垃圾,因此如果要求低GC壓力,則應首選將參數傳遞給lambda的調用。

給那個方法引用可以用來代替匿名lamdbas,可以用這種方式重寫這個例子。

import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;public class LongEventMain
{public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch){System.out.println(event);}public static void translate(LongEvent event, long sequence, ByteBuffer buffer){event.set(buffer.getLong(0));}public static void main(String[] args) throws Exception{// Specify the size of the ring buffer, must be power of 2.int bufferSize = 1024;// Construct the DisruptorDisruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);// Connect the handlerdisruptor.handleEventsWith(LongEventMain::handleEvent);// Start the Disruptor, starts all threads runningdisruptor.start();// Get the ring buffer from the Disruptor to be used for publishing.RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();ByteBuffer bb = ByteBuffer.allocate(8);for (long l = 0; true; l++){bb.putLong(0, l);ringBuffer.publishEvent(LongEventMain::translate, bb);Thread.sleep(1000);}}
}

Disruptor通過精巧的無鎖設計實現了在高并發情形下的高性能。

在美團內部,很多高并發場景借鑒了Disruptor的設計,減少競爭的強度。其設計思想可以擴展到分布式場景,通過無鎖設計,來提升服務性能。


/*** @description disruptor代碼樣例。每10ms向disruptor中插入一個元素,消費者讀取數據,并打印到終端*/
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;import java.util.concurrent.ThreadFactory;public class DisruptorMain
{public static void main(String[] args) throws Exception{// 隊列中的元素class Element {private int value;public int get(){return value;}public void set(int value){this.value= value;}}// 生產者的線程工廠ThreadFactory threadFactory = new ThreadFactory(){@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "simpleThread");}};// RingBuffer生產工廠,初始化RingBuffer的時候使用EventFactory<Element> factory = new EventFactory<Element>() {@Overridepublic Element newInstance() {return new Element();}};// 處理Event的handlerEventHandler<Element> handler = new EventHandler<Element>(){@Overridepublic void onEvent(Element element, long sequence, boolean endOfBatch){System.out.println("Element: " + element.get());}};// 阻塞策略BlockingWaitStrategy strategy = new BlockingWaitStrategy();// 指定RingBuffer的大小int bufferSize = 16;// 創建disruptor,采用單生產者模式Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);// 設置EventHandlerdisruptor.handleEventsWith(handler);// 啟動disruptor的線程disruptor.start();RingBuffer<Element> ringBuffer = disruptor.getRingBuffer();for (int l = 0; true; l++){// 獲取下一個可用位置的下標long sequence = ringBuffer.next();try{// 返回可用位置的元素Element event = ringBuffer.get(sequence);// 設置該位置元素的值event.set(l);}finally{ringBuffer.publish(sequence);}Thread.sleep(10);}}
}

輸出截圖:

參考:

https://martinfowler.com/articles/lmax.html

https://tech.meituan.com/2016/11/18/disruptor.html?

總結

以上是生活随笔為你收集整理的你需要知道的高性能并发框架Disruptor原理的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。