日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

每秒钟承载600万订单级别的无锁并行计算框架 Disruptor学习

發(fā)布時(shí)間:2023/12/10 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 每秒钟承载600万订单级别的无锁并行计算框架 Disruptor学习 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

1.來(lái)源

Disruptor是英國(guó)外匯交易公司LMAX開(kāi)發(fā)的一個(gè)高性能隊(duì)列,研發(fā)的初衷是解決內(nèi)部的內(nèi)存隊(duì)列的延遲問(wèn)題,而不是分布式隊(duì)列。基于Disruptor開(kāi)發(fā)的系統(tǒng)單線程能支撐每秒600萬(wàn)訂單,2010年在QCon演講后,獲得了業(yè)界關(guān)注。

2.應(yīng)用背景和介紹

據(jù)目前資料顯示:應(yīng)用Disruptor的知名項(xiàng)目有如下的一些:Storm, Camel, Log4j2,還有目前的美團(tuán)點(diǎn)評(píng)技術(shù)團(tuán)隊(duì)也有很多不少的應(yīng)用,或者說(shuō)有一些借鑒了它的設(shè)計(jì)機(jī)制。
Disruptor是一個(gè)高性能的線程間異步通信的框架,即在同一個(gè)JVM進(jìn)程中的多線程間消息傳遞。
?

Disruptor是英國(guó)外匯交易公司LMAX開(kāi)發(fā)的一個(gè)高性能隊(duì)列,研發(fā)的初衷是解決內(nèi)存隊(duì)列的延遲問(wèn)題。與Kafka、RabbitMQ用于服務(wù)間的消息隊(duì)列不同,disruptor一般用于線程間消息的傳遞。基于Disruptor開(kāi)發(fā)的系統(tǒng)單線程能支撐每秒600萬(wàn)訂單。

disruptor是用于一個(gè)JVM中多個(gè)線程之間的消息隊(duì)列,作用與ArrayBlockingQueue有相似之處,但是disruptor從功能、性能都遠(yuǎn)好于ArrayBlockingQueue,當(dāng)多個(gè)線程之間傳遞大量數(shù)據(jù)或?qū)π阅芤筝^高時(shí),可以考慮使用disruptor作為ArrayBlockingQueue的替代者。
?官方也對(duì)disruptor和ArrayBlockingQueue的性能在不同的應(yīng)用場(chǎng)景下做了對(duì)比,目測(cè)性能只有有5~10倍左右的提升。

隊(duì)列

隊(duì)列是屬于一種數(shù)據(jù)結(jié)構(gòu),隊(duì)列采用的FIFO(first in firstout),新元素(等待進(jìn)入隊(duì)列的元素)總是被插入到尾部,而讀取的時(shí)候總是從頭部開(kāi)始讀取。在計(jì)算中隊(duì)列一般用來(lái)做排隊(duì)(如線程池的等待排隊(duì),鎖的等待排隊(duì)),用來(lái)做解耦(生產(chǎn)者消費(fèi)者模式),異步等等

在jdk中的隊(duì)列都實(shí)現(xiàn)了java.util.Queue接口,在隊(duì)列中又分為兩類,一類是線程不安全的,ArrayDeque,LinkedList等等,還有一類都在java.util.concurrent包下屬于線程安全,而在我們真實(shí)的環(huán)境中,我們的機(jī)器都是屬于多線程,當(dāng)多線程對(duì)同一個(gè)隊(duì)列進(jìn)行排隊(duì)操作的時(shí)候,如果使用線程不安全會(huì)出現(xiàn),覆蓋數(shù)據(jù),數(shù)據(jù)丟失等無(wú)法預(yù)測(cè)的事情,所以我們這個(gè)時(shí)候只能選擇線程安全的隊(duì)列。
其次還剩下ArrayBlockingQueue,LinkedBlockingQueue兩個(gè)隊(duì)列,他們兩個(gè)都是用ReentrantLock控制的線程安全,他們兩個(gè)的區(qū)別一個(gè)是數(shù)組,一個(gè)是鏈表,在隊(duì)列中,一般獲取這個(gè)隊(duì)列元素之后緊接著會(huì)獲取下一個(gè)元素,或者一次獲取多個(gè)隊(duì)列元素都有可能,而數(shù)組在內(nèi)存中地址是連續(xù)的,在操作系統(tǒng)中會(huì)有緩存的優(yōu)化(下面也會(huì)介紹緩存行),所以訪問(wèn)的速度會(huì)略勝一籌,我們也會(huì)盡量去選擇ArrayBlockingQueue。而事實(shí)證明在很多第三方的框架中,比如早期的log4j異步,都是選擇的ArrayBlockingQueue。

在jdk中提供的線程安全的隊(duì)列下面簡(jiǎn)單列舉部分隊(duì)列:\

?

?

我們可以看見(jiàn),我們無(wú)鎖的隊(duì)列是無(wú)界的,有鎖的隊(duì)列是有界的,這里就會(huì)涉及到一個(gè)問(wèn)題,我們?cè)谡嬲木€上環(huán)境中,無(wú)界的隊(duì)列,對(duì)我們系統(tǒng)的影響比較大,有可能會(huì)導(dǎo)致我們內(nèi)存直接溢出,所以我們首先得排除無(wú)界隊(duì)列,當(dāng)然并不是無(wú)界隊(duì)列就沒(méi)用了,只是在某些場(chǎng)景下得排除。其次還剩下ArrayBlockingQueue,LinkedBlockingQueue兩個(gè)隊(duì)列,他們兩個(gè)都是用ReentrantLock控制的線程安全,他們兩個(gè)的區(qū)別一個(gè)是數(shù)組,一個(gè)是鏈表。
(LinkedBlockingQueue 其實(shí)也是有界隊(duì)列,但是不設(shè)置大小時(shí)就時(shí)Integer.MAX_VALUE),ArrayBlockingQueue,LinkedBlockingQueue也有自己的弊端,就是性能比較低,為什么jdk會(huì)增加一些無(wú)鎖的隊(duì)列,其實(shí)就是為了增加性能,很苦惱,又需要無(wú)鎖,又需要有界,答案就是Disruptor

Disruptor

Disruptor是英國(guó)外匯交易公司LMAX開(kāi)發(fā)的一個(gè)高性能隊(duì)列,并且是一個(gè)開(kāi)源的并發(fā)框架,并獲得2011Duke’s程序框架創(chuàng)新獎(jiǎng)。能夠在無(wú)鎖的情況下實(shí)現(xiàn)網(wǎng)絡(luò)的Queue并發(fā)操作,基于Disruptor開(kāi)發(fā)的系統(tǒng)單線程能支撐每秒600萬(wàn)訂單。目前,包括Apache Storm、Camel、Log4j2等等知名的框架都在內(nèi)部集成了Disruptor用來(lái)替代jdk的隊(duì)列,以此來(lái)獲得高性能。

為什么這么牛逼?

在Disruptor中有三大殺器:

  • CAS
  • 消除偽共享
  • RingBuffer

?

3.1.1鎖和CAS

我們ArrayBlockingQueue為什么會(huì)被拋棄的一點(diǎn),就是因?yàn)橛昧酥亓考?jí)lock鎖,在我們加鎖過(guò)程中我們會(huì)把鎖掛起,解鎖后,又會(huì)把線程恢復(fù),這一過(guò)程會(huì)有一定的開(kāi)銷,并且我們一旦沒(méi)有獲取鎖,這個(gè)線程就只能一直等待,這個(gè)線程什么事也不能做。

CAS(compare and swap),顧名思義先比較在交換,一般是比較是否是老的值,如果是的進(jìn)行交換設(shè)置,大家熟悉樂(lè)觀鎖的人都知道CAS可以用來(lái)實(shí)現(xiàn)樂(lè)觀鎖,CAS中沒(méi)有線程的上下文切換,減少了不必要的開(kāi)銷
而我們的Disruptor也是基于CAS。

3.1.2偽共享

到了偽共享就不得不說(shuō)計(jì)算機(jī)CPU緩存,緩存大小是CPU的重要指標(biāo)之一,而且緩存的結(jié)構(gòu)和大小對(duì)CPU速度的影響非常大,CPU內(nèi)緩存的運(yùn)行頻率極高,一般是和處理器同頻運(yùn)作,工作效率遠(yuǎn)遠(yuǎn)大于系統(tǒng)內(nèi)存和硬盤。實(shí)際工作時(shí),CPU往往需要重復(fù)讀取同樣的數(shù)據(jù)塊,而緩存容量的增大,可以大幅度提升CPU內(nèi)部讀取數(shù)據(jù)的命中率,而不用再到內(nèi)存或者硬盤上尋找,以此提高系統(tǒng)性能。但是從CPU芯片面積和成本的因素來(lái)考慮,緩存都很小。

CPU緩存可以分為一級(jí)緩存,二級(jí)緩存,如今主流CPU還有三級(jí)緩存,甚至有些CPU還有四級(jí)緩存。每一級(jí)緩存中所儲(chǔ)存的全部數(shù)據(jù)都是下一級(jí)緩存的一部分,這三種緩存的技術(shù)難度和制造成本是相對(duì)遞減的,所以其容量也是相對(duì)遞增的。

每一次你聽(tīng)見(jiàn)intel發(fā)布新的cpu什么,比如i7-7700k,8700k,都會(huì)對(duì)cpu緩存大小進(jìn)行優(yōu)化,感興趣可以自行下來(lái)搜索,這些的發(fā)布會(huì)或者發(fā)布文章。

Martin和Mike的 QConpresentation演講中給出了一些每個(gè)緩存時(shí)間:

緩存行

在cpu的多級(jí)緩存中,并不是以獨(dú)立的項(xiàng)來(lái)保存的,而是類似一種pageCahe的一種策略,以緩存行來(lái)保存,而緩存行的大小通常是64字節(jié),在Java中Long是8個(gè)字節(jié),所以可以存儲(chǔ)8個(gè)Long,舉個(gè)例子,你訪問(wèn)一個(gè)long的變量的時(shí)候,他會(huì)把幫助再加載7個(gè),我們上面說(shuō)為什么選擇數(shù)組不選擇鏈表,也就是這個(gè)原因,在數(shù)組中可以依靠緩沖行得到很快的訪問(wèn)。

?

緩存行是萬(wàn)能的嗎?NO,因?yàn)樗廊粠?lái)了一個(gè)缺點(diǎn),我在這里舉個(gè)例子說(shuō)明這個(gè)缺點(diǎn),可以想象有個(gè)數(shù)組隊(duì)列,ArrayQueue,他的數(shù)據(jù)結(jié)構(gòu)如下:

?

對(duì)于maxSize是我們一開(kāi)始就定義好的,數(shù)組的大小,對(duì)于currentIndex,是標(biāo)志我們當(dāng)前隊(duì)列的位置,這個(gè)變化比較快,可以想象你訪問(wèn)maxSize的時(shí)候,是不是把currentIndex也加載進(jìn)來(lái)了,這個(gè)時(shí)候,其他線程更新currentIndex,就會(huì)把cpu中的緩存行置位無(wú)效,請(qǐng)注意這是CPU規(guī)定的,他并不是只吧currentIndex置位無(wú)效,如果此時(shí)又繼續(xù)訪問(wèn)maxSize他依然得繼續(xù)從內(nèi)存中讀取,但是MaxSize卻是我們一開(kāi)始定義好的,我們應(yīng)該訪問(wèn)緩存即可,但是卻被我們經(jīng)常改變的currentIndex所影響。


Padding的魔法

為了解決上面緩存行出現(xiàn)的問(wèn)題,在Disruptor中采用了Padding的方式

?

其中的Value就被其他一些無(wú)用的long變量給填充了。這樣你修改Value的時(shí)候,就不會(huì)影響到其他變量的緩存行。

最后順便一提,在jdk8中提供了@Contended的注解,當(dāng)然一般來(lái)說(shuō)只允許Jdk中內(nèi)部,如果你自己使用那就得配置Jvm參數(shù) -RestricContentended = fase,將限制這個(gè)注解置位取消。很多文章分析了ConcurrentHashMap,但是都把這個(gè)注解給忽略掉了,在ConcurrentHashMap中就使用了這個(gè)注解,在ConcurrentHashMap每個(gè)桶都是單獨(dú)的用計(jì)數(shù)器去做計(jì)算,而這個(gè)計(jì)數(shù)器由于時(shí)刻都在變化,所以被用這個(gè)注解進(jìn)行填充緩存行優(yōu)化,以此來(lái)增加性能。



作者:tracy_668
鏈接:https://www.jianshu.com/p/bad7b4b44e48
來(lái)源:簡(jiǎn)書
著作權(quán)歸作者所有。商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請(qǐng)注明出處。

?

下面的例子是測(cè)試?yán)胏ache line的特性和不利用cache line的特性的效果對(duì)比.


?

什么是偽共享

ArrayBlockingQueue有三個(gè)成員變量:

這三個(gè)變量很容易放到一個(gè)緩存行中, 但是之間修改沒(méi)有太多的關(guān)聯(lián). 所以每次修改, 都會(huì)使之前緩存的數(shù)據(jù)失效, 從而不能完全達(dá)到共享的效果.

如上圖所示, 當(dāng)生產(chǎn)者線程put一個(gè)元素到ArrayBlockingQueue時(shí), putIndex會(huì)修改, 從而導(dǎo)致消費(fèi)者線程的緩存中的緩存行無(wú)效, 需要從主存中重新讀取.

這種無(wú)法充分使用緩存行特性的現(xiàn)象, 稱為偽共享

3.1.3RingBuffer

ringbuffer到底是什么
它是一個(gè)環(huán)(首尾相接的環(huán)),你可以把它用做在不同上下文(線程)間傳遞數(shù)據(jù)的buffer。


?

基本來(lái)說(shuō),ringbuffer擁有一個(gè)序號(hào),這個(gè)序號(hào)指向數(shù)組中下一個(gè)可用的元素。(如下圖右邊的圖片表示序號(hào),這個(gè)序號(hào)指向數(shù)組的索引4的位置

?

?

隨著你不停地填充這個(gè)buffer(可能也會(huì)有相應(yīng)的讀取),這個(gè)序號(hào)會(huì)一直增長(zhǎng),直到繞過(guò)這個(gè)環(huán)。

?

要找到數(shù)組中當(dāng)前序號(hào)指向的元素,可以通過(guò)sequence & (array length-1) = array index,比如一共有8槽,3&(8-1)=3,HashMap就是用這個(gè)方式來(lái)定位數(shù)組元素的,這種方式比取模的速度更快。

常用的隊(duì)列之間的區(qū)別

  • 沒(méi)有尾指針。只維護(hù)了一個(gè)指向下一個(gè)可用位置的序號(hào)。
  • 不刪除buffer中的數(shù)據(jù),也就是說(shuō)這些數(shù)據(jù)一直存放在buffer中,直到新的數(shù)據(jù)覆蓋他們

ringbuffer采用這種數(shù)據(jù)結(jié)構(gòu)原因

  • 因?yàn)樗菙?shù)組,所以要比鏈表快,數(shù)組內(nèi)元素的內(nèi)存地址的連續(xù)性存儲(chǔ)的。這是對(duì)CPU緩存友好的—也就是說(shuō),在硬件級(jí)別,數(shù)組中的元素是會(huì)被預(yù)加載的,因此在ringbuffer當(dāng)中,cpu無(wú)需時(shí)不時(shí)去主存加載數(shù)組中的下一個(gè)元素。因?yàn)橹灰粋€(gè)元素被加載到緩存行,其他相鄰的幾個(gè)元素也會(huì)被加載進(jìn)同一個(gè)緩存行。
  • 其次,你可以為數(shù)組預(yù)先分配內(nèi)存,使得數(shù)組對(duì)象一直存在(除非程序終止)。這就意味著不需要花大量的時(shí)間用于垃圾回收。此外,不像鏈表那樣,需要為每一個(gè)添加到其上面的對(duì)象創(chuàng)造節(jié)點(diǎn)對(duì)象—對(duì)應(yīng)的,當(dāng)刪除節(jié)點(diǎn)時(shí),需要執(zhí)行相應(yīng)的內(nèi)存清理操作。


如何從Ringbuffer讀取

消費(fèi)者(Consumer)是一個(gè)想從Ring Buffer里讀取數(shù)據(jù)的線程,它可以訪問(wèn)ConsumerBarrier對(duì)象——這個(gè)對(duì)象由RingBuffer創(chuàng)建并且代表消費(fèi)者與RingBuffer進(jìn)行交互。就像Ring Buffer顯然需要一個(gè)序號(hào)才能找到下一個(gè)可用節(jié)點(diǎn)一樣,消費(fèi)者也需要知道它將要處理的序號(hào)——每個(gè)消費(fèi)者都需要找到下一個(gè)它要訪問(wèn)的序號(hào)。在上面的例子中,消費(fèi)者處理完了Ring Buffer里序號(hào)8之前(包括8)的所有數(shù)據(jù),那么它期待訪問(wèn)的下一個(gè)序號(hào)是9。

消費(fèi)者可以調(diào)用ConsumerBarrier對(duì)象的waitFor()方法,傳遞它所需要的下一個(gè)序號(hào).

?

final long availableSeq = consumerBarrier.waitFor(nextSequence);

ConsumerBarrier返回RingBuffer的最大可訪問(wèn)序號(hào)——在上面的例子中是12。ConsumerBarrier有一個(gè)WaitStrategy方法來(lái)決定它如何等待這個(gè)序號(hào).

接下來(lái)

接下來(lái),消費(fèi)者會(huì)一直逛來(lái)逛去,等待更多數(shù)據(jù)被寫入 Ring Buffer。并且,寫入數(shù)據(jù)后消費(fèi)者會(huì)收到通知——節(jié)點(diǎn) 9,10,11 和 12 已寫入。現(xiàn)在序號(hào) 12 到了,消費(fèi)者可以指示 ConsumerBarrier 去拿這些序號(hào)里的數(shù)據(jù)了。



?

在Disruptor中采用了數(shù)組的方式保存了我們的數(shù)據(jù),上面我們也介紹了采用數(shù)組保存我們?cè)L問(wèn)時(shí)很好的利用緩存,但是在Disruptor中進(jìn)一步選擇采用了環(huán)形數(shù)組進(jìn)行保存數(shù)據(jù),也就是RingBuffer。在這里先說(shuō)明一下環(huán)形數(shù)組并不是真正的環(huán)形數(shù)組,在RingBuffer中是采用取余的方式進(jìn)行訪問(wèn)的,比如數(shù)組大小為 10,0訪問(wèn)的是數(shù)組下標(biāo)為0這個(gè)位置,其實(shí)10,20等訪問(wèn)的也是數(shù)組的下標(biāo)為0的這個(gè)位置。

實(shí)際上,在這些框架中取余并不是使用%運(yùn)算,都是使用的&與運(yùn)算,這就要求你設(shè)置的大小一般是2的N次方也就是,10,100,1000等等,這樣減去1的話就是,1,11,111,就能很好的使用index & (size -1),這樣利用位運(yùn)算就增加了訪問(wèn)速度。
如果在Disruptor中你不用2的N次方進(jìn)行大小設(shè)置,他會(huì)拋出buffersize必須為2的N次方異常。


  • Producer會(huì)向這個(gè)RingBuffer中填充元素,填充元素的流程是首先從RingBuffer讀取下一個(gè)Sequence,之后在這個(gè)Sequence位置的槽填充數(shù)據(jù),之后發(fā)布。
  • Consumer消費(fèi)RingBuffer中的數(shù)據(jù),通過(guò)SequenceBarrier來(lái)協(xié)調(diào)不同的Consumer的消費(fèi)先后順序,以及獲取下一個(gè)消費(fèi)位置Sequence。
  • Producer在RingBuffer寫滿時(shí),會(huì)從頭開(kāi)始繼續(xù)寫替換掉以前的數(shù)據(jù)。但是如果有SequenceBarrier指向下一個(gè)位置,則不會(huì)覆蓋這個(gè)位置,阻塞到這個(gè)位置被消費(fèi)完成。Consumer同理,在所有Barrier被消費(fèi)完之后,會(huì)阻塞到有新的數(shù)據(jù)進(jìn)來(lái)。

Disruptor的設(shè)計(jì)方案

Disruptor通過(guò)以下設(shè)計(jì)來(lái)解決隊(duì)列速度慢的問(wèn)題:

  • 環(huán)形數(shù)組結(jié)構(gòu)
    為了避免垃圾回收, 采用數(shù)組而非鏈表. 同時(shí), 數(shù)組對(duì)處理器的緩存機(jī)制更加友好.
  • 元素位置定位
    數(shù)組長(zhǎng)度2^n, 通過(guò)位運(yùn)算, 加快定位的速度. 下標(biāo)采取遞增的形式. 不用擔(dān)心index溢出的問(wèn)題. index是long類型, 即使100萬(wàn)QPS的處理速度, 也需要30萬(wàn)年才能用完.
  • 無(wú)鎖設(shè)計(jì)
    每個(gè)生產(chǎn)者或者消費(fèi)者線程, 會(huì)先申請(qǐng)可以操作的元素在數(shù)組中的位置, 申請(qǐng)到之后, 直接在該位置寫入或者讀取數(shù)據(jù).

下面忽略數(shù)組的環(huán)形結(jié)構(gòu), 介紹一下如何實(shí)現(xiàn)無(wú)鎖設(shè)計(jì). 整個(gè)過(guò)程通過(guò)原子變量CAS, 保證操作的線程安全.

一個(gè)生產(chǎn)者

生產(chǎn)者單線程寫數(shù)據(jù)的流程比較簡(jiǎn)單:

  • 申請(qǐng)寫入m個(gè)元素;
  • 若是有m個(gè)元素可以寫入, 則返回最大的序列號(hào). 這兒主要判斷是否會(huì)覆蓋未讀的元素
  • 若是返回的正確, 則生產(chǎn)者開(kāi)始寫入元素.

  • 多個(gè)生產(chǎn)者
    多個(gè)生產(chǎn)者的情況下, 會(huì)遇到“如何防止多個(gè)線程重復(fù)寫同一個(gè)元素”的問(wèn)題. Disruptor的解決方法是, 每個(gè)線程獲取不同的一段數(shù)組空間進(jìn)行操作. 這個(gè)通過(guò)CAS很容易達(dá)到. 只需要在分配元素的時(shí)候, 通過(guò)CAS判斷一下這段空間是否已經(jīng)分配出去即可.

    但是會(huì)遇到一個(gè)新問(wèn)題: 如何防止讀取的時(shí)候, 讀到還未寫的元素. Disruptor在多個(gè)生產(chǎn)者的情況下, 引入了一個(gè)與Ring Buffer大小相同的buffer: available Buffer. 當(dāng)某個(gè)位置寫入成功的時(shí)候, 便把a(bǔ)vailble Buffer相應(yīng)的位置置位, 標(biāo)記為寫入成功. 讀取的時(shí)候, 會(huì)遍歷available Buffer, 來(lái)判斷元素是否已經(jīng)就緒.

    讀數(shù)據(jù)
    生產(chǎn)者多線程寫入的情況會(huì)復(fù)雜很多:

  • 申請(qǐng)讀取到序號(hào)n;
  • 若writer cursor >= n, 這時(shí)仍然無(wú)法確定連續(xù)可讀的最大下標(biāo). 從reader cursor開(kāi)始讀取available Buffer, 一直查到第一個(gè)不可用的元素, 然后返回最大連續(xù)可讀元素的位置;
  • 消費(fèi)者讀取元素.
  • 如下圖所示, 讀線程讀到下標(biāo)為2的元素, 三個(gè)線程Writer1/Writer2/Writer3正在向RingBuffer相應(yīng)位置寫數(shù)據(jù), 寫線程被分配到的最大元素下標(biāo)是11.
    讀線程申請(qǐng)讀取到下標(biāo)從3到11的元素, 判斷writer cursor>=11. 然后開(kāi)始讀取availableBuffer, 從3開(kāi)始, 往后讀取, 發(fā)現(xiàn)下標(biāo)為7的元素沒(méi)有生產(chǎn)成功, 于是WaitFor(11)返回6.

    然后, 消費(fèi)者讀取下標(biāo)從3到6共計(jì)4個(gè)元素.



    ?

    ?

    寫數(shù)據(jù)
    多個(gè)生產(chǎn)者寫入的時(shí)候:

  • 申請(qǐng)寫入m個(gè)元素;
  • 若是有m個(gè)元素可以寫入, 則返回最大的序列號(hào). 每個(gè)生產(chǎn)者會(huì)被分配一段獨(dú)享的空間;
  • 生產(chǎn)者寫入元素, 寫入元素的同時(shí)設(shè)置available Buffer里面相應(yīng)的位置, 以標(biāo)記自己哪些位置是已經(jīng)寫入成功的.
    如下圖所示, Writer1和Writer2兩個(gè)線程寫入數(shù)組, 都申請(qǐng)可寫的數(shù)組空間. Writer1被分配了下標(biāo)3到下表5的空間, Writer2被分配了下標(biāo)6到下標(biāo)9的空間.
  • Writer1寫入下標(biāo)3位置的元素, 同時(shí)把a(bǔ)vailable Buffer相應(yīng)位置置位, 標(biāo)記已經(jīng)寫入成功, 往后移一位, 開(kāi)始寫下標(biāo)4位置的元素. Writer2同樣的方式. 最終都寫入完成.



    ?

    防止不同生產(chǎn)者對(duì)同一段空間寫入的代碼, 如下所示:

    通過(guò)do/while循環(huán)的條件cursor.compareAndSet(current, next), 來(lái)判斷每次申請(qǐng)的空間是否已經(jīng)被其他生產(chǎn)者占據(jù). 假如已經(jīng)被占據(jù), 該函數(shù)會(huì)返回失敗, While循環(huán)重新執(zhí)行, 申請(qǐng)寫入空間.

    消費(fèi)者的流程與生產(chǎn)者非常類似, 這兒就不多描述了. Disruptor通過(guò)精巧的無(wú)鎖設(shè)計(jì)實(shí)現(xiàn)了在高并發(fā)情形下的高性能.



    3.2Disruptor怎么使用

    package concurrent;import sun.misc.Contended;import java.util.concurrent.ThreadFactory;import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType;/*** @Description:* @Created on 2019-10-04*/ public class DisruptorTest {public static void main(String[] args) throws Exception {// 隊(duì)列中的元素class Element {@Contendedprivate String value;public String getValue() {return value;}public void setValue(String value) {this.value = value;}}// 生產(chǎn)者的線程工廠ThreadFactory threadFactory = new ThreadFactory() {int i = 0;@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "simpleThread" + String.valueOf(i++));}};// RingBuffer生產(chǎn)工廠,初始化RingBuffer的時(shí)候使用EventFactory<Element> factory = new EventFactory<Element>() {@Overridepublic Element newInstance() {return new Element();}};// 處理Event的handlerEventHandler<Element> handler = new EventHandler<Element>() {@Overridepublic void onEvent(Element element, long sequence, boolean endOfBatch) throws InterruptedException {System.out.println("Element: " + Thread.currentThread().getName() + ": " + element.getValue() + ": " + sequence); // Thread.sleep(10000000);}};// 阻塞策略BlockingWaitStrategy strategy = new BlockingWaitStrategy();// 指定RingBuffer的大小int bufferSize = 8;// 創(chuàng)建disruptor,采用單生產(chǎn)者模式Disruptor<Element> disruptor = new Disruptor(factory, bufferSize, threadFactory, ProducerType.SINGLE, strategy);// 設(shè)置EventHandlerdisruptor.handleEventsWith(handler);// 啟動(dòng)disruptor的線程disruptor.start();for (int i = 0; i < 10; i++) {disruptor.publishEvent((element, sequence) -> {System.out.println("之前的數(shù)據(jù)" + element.getValue() + "當(dāng)前的sequence" + sequence);element.setValue("我是第" + sequence + "個(gè)");});}} }

    ?

    在Disruptor中有幾個(gè)比較關(guān)鍵的:

    • ThreadFactory:這是一個(gè)線程工廠,用于我們Disruptor中生產(chǎn)、消費(fèi)的時(shí)候需要的線程。
    • EventFactory:事件工廠,用于產(chǎn)生我們隊(duì)列元素的工廠。在Disruptor中,他會(huì)在初始化的時(shí)候直接填充滿RingBuffer,一次到位。
    • EventHandler:用于處理Event的handler,這里一個(gè)EventHandler可以看做是一個(gè)消費(fèi)者,但是多個(gè)EventHandler他們都是獨(dú)立消費(fèi)的隊(duì)列。
    • WorkHandler:也是用于處理Event的handler,和上面區(qū)別在于,多個(gè)消費(fèi)者都是共享同一個(gè)隊(duì)列。
    • WaitStrategy:等待策略,在Disruptor中有多種策略,來(lái)決定消費(fèi)者在消費(fèi)時(shí),如果沒(méi)有數(shù)據(jù)采取的策略是什么?下面簡(jiǎn)單列舉一下Disruptor中的部分策略
  • BlockingWaitStrategy:通過(guò)線程阻塞的方式,等待生產(chǎn)者喚醒,被喚醒后,再循環(huán)檢查依賴的sequence是否已經(jīng)消費(fèi)。
  • BusySpinWaitStrategy:線程一直自旋等待,可能比較耗cpu
  • YieldingWaitStrategy:嘗試100次,然后Thread.yield()讓出cpu

  • ?

    總結(jié)

    以上是生活随笔為你收集整理的每秒钟承载600万订单级别的无锁并行计算框架 Disruptor学习的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。