消息队列:生产者/消费者模式
1.什么是生產(chǎn)者消費(fèi)者模式
? ? ? ?生產(chǎn)者消費(fèi)者模式是通過(guò)一個(gè)容器來(lái)解決生產(chǎn)者和消費(fèi)者的強(qiáng)耦合問(wèn)題。生產(chǎn)者和消費(fèi)者彼此之間不直接通訊,而通過(guò)阻塞隊(duì)列來(lái)進(jìn)行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費(fèi)者處理,直接扔給阻塞隊(duì)列,消費(fèi)者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊(duì)列里取,阻塞隊(duì)列就相當(dāng)于一個(gè)緩沖區(qū),平衡了生產(chǎn)者和消費(fèi)者的處理能力。
? ? ? ?這個(gè)阻塞隊(duì)列就是用來(lái)給生產(chǎn)者和消費(fèi)者解耦的。縱觀大多數(shù)設(shè)計(jì)模式,都會(huì)找一個(gè)第三者出來(lái)進(jìn)行解耦,如工廠模式的第三者是工廠類(lèi),模板模式的第三者是模板類(lèi)。在學(xué)習(xí)一些設(shè)計(jì)模式的過(guò)程中,如果先找到這個(gè)模式的第三者,能幫助我們快速熟悉一個(gè)設(shè)計(jì)模式。
2.生產(chǎn)消費(fèi)者模型
? ? ? ?生產(chǎn)者消費(fèi)者模型具體來(lái)講,就是在一個(gè)系統(tǒng)中,存在生產(chǎn)者和消費(fèi)者兩種角色,他們通過(guò)內(nèi)存緩沖區(qū)進(jìn)行通信,生產(chǎn)者生產(chǎn)消費(fèi)者需要的資料,消費(fèi)者把資料做成產(chǎn)品。生產(chǎn)消費(fèi)者模式如下圖。
? ? ? ?在日益發(fā)展的服務(wù)類(lèi)型中,譬如注冊(cè)用戶這種服務(wù),它可能解耦成好幾種獨(dú)立的服務(wù)(賬號(hào)驗(yàn)證,郵箱驗(yàn)證碼,手機(jī)短信碼等)。它們作為消費(fèi)者,等待用戶輸入數(shù)據(jù),在前臺(tái)數(shù)據(jù)提交之后會(huì)經(jīng)過(guò)分解并發(fā)送到各個(gè)服務(wù)所在的url,分發(fā)的那個(gè)角色就相當(dāng)于生產(chǎn)者。消費(fèi)者在獲取數(shù)據(jù)時(shí)候有可能一次不能處理完,那么它們各自有一個(gè)請(qǐng)求隊(duì)列,那就是內(nèi)存緩沖區(qū)了。做這項(xiàng)工作的框架叫做消息隊(duì)列。
3.生產(chǎn)者消費(fèi)者模型的實(shí)現(xiàn)
生產(chǎn)者是一堆線程,消費(fèi)者是另一堆線程,內(nèi)存緩沖區(qū)可以使用List數(shù)組隊(duì)列,數(shù)據(jù)類(lèi)型只需要定義一個(gè)簡(jiǎn)單的類(lèi)就好。關(guān)鍵是如何處理多線程之間的協(xié)作。這其實(shí)也是多線程通信的一個(gè)范例。
在這個(gè)模型中,最關(guān)鍵就是內(nèi)存緩沖區(qū)為空的時(shí)候消費(fèi)者必須等待,而內(nèi)存緩沖區(qū)滿的時(shí)候,生產(chǎn)者必須等待。其他時(shí)候可以是個(gè)動(dòng)態(tài)平衡。值得注意的是多線程對(duì)臨界區(qū)資源的操作時(shí)候必須保證在讀寫(xiě)中只能存在一個(gè)線程,所以需要設(shè)計(jì)鎖的策略。
4.為什么要使用生產(chǎn)者和消費(fèi)者模式
? ? ? ?在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費(fèi)者就是消費(fèi)數(shù)據(jù)的線程。在多線程開(kāi)發(fā)當(dāng)中,如果生產(chǎn)者處理速度很快,而消費(fèi)者處理速度很慢,那么生產(chǎn)者就必須等待消費(fèi)者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費(fèi)者的處理能力大于生產(chǎn)者,那么消費(fèi)者就必須等待生產(chǎn)者。為了解決這種生產(chǎn)消費(fèi)能力不均衡的問(wèn)題,所以便有了生產(chǎn)者和消費(fèi)者模式。
為了不至于太抽象,我們舉一個(gè)寄信的例子(雖說(shuō)這年頭寄信已經(jīng)不時(shí)興,但這個(gè)例子還是比較貼切的)。假設(shè)你要寄一封平信,大致過(guò)程如下:
??? 1、你把信寫(xiě)好——相當(dāng)于生產(chǎn)者制造數(shù)據(jù)
??? 2、你把信放入郵筒——相當(dāng)于生產(chǎn)者把數(shù)據(jù)放入緩沖區(qū)
??? 3、郵遞員把信從郵筒取出——相當(dāng)于消費(fèi)者把數(shù)據(jù)取出緩沖區(qū)
??? 4、郵遞員把信拿去郵局做相應(yīng)的處理——相當(dāng)于消費(fèi)者處理數(shù)據(jù)
4.1優(yōu)點(diǎn)
- 解耦
??? 假設(shè)生產(chǎn)者和消費(fèi)者分別是兩個(gè)類(lèi)。如果讓生產(chǎn)者直接調(diào)用消費(fèi)者的某個(gè)方法,那么生產(chǎn)者對(duì)于消費(fèi)者就會(huì)產(chǎn)生依賴(lài)(也就是耦合)。將來(lái)如果消費(fèi)者的代碼發(fā)生變化,可能會(huì)影響到生產(chǎn)者。而如果兩者都依賴(lài)于某個(gè)緩沖區(qū),兩者之間不直接依賴(lài),耦合也就相應(yīng)降低了。
? ? ? ?接著上述的例子,如果不使用郵筒(也就是緩沖區(qū)),你必須得把信直接交給郵遞員。有同學(xué)會(huì)說(shuō),直接給郵遞員不是挺簡(jiǎn)單的嘛?其實(shí)不簡(jiǎn)單,你必須得認(rèn)識(shí)誰(shuí)是郵遞員,才能把信給他(光憑身上穿的制服,萬(wàn)一有人假冒,就慘了)。這就產(chǎn)生和你和郵遞員之間的依賴(lài)(相當(dāng)于生產(chǎn)者和消費(fèi)者的強(qiáng)耦合)。萬(wàn)一哪天郵遞員換人了,你還要重新認(rèn)識(shí)一下(相當(dāng)于消費(fèi)者變化導(dǎo)致修改生產(chǎn)者代碼)。而郵筒相對(duì)來(lái)說(shuō)比較固定,你依賴(lài)它的成本就比較低(相當(dāng)于和緩沖區(qū)之間的弱耦合)。
- 支持并發(fā)(concurrency)
? ? ? ?生產(chǎn)者直接調(diào)用消費(fèi)者的某個(gè)方法,還有另一個(gè)弊端。由于函數(shù)調(diào)用是同步的(或者叫阻塞的),在消費(fèi)者的方法沒(méi)有返回之前,生產(chǎn)者只好一直等在那邊。萬(wàn)一消費(fèi)者處理數(shù)據(jù)很慢,生產(chǎn)者就會(huì)白白糟蹋大好時(shí)光。
? ? ? ?使用了生產(chǎn)者/消費(fèi)者模式之后,生產(chǎn)者和消費(fèi)者可以是兩個(gè)獨(dú)立的并發(fā)主體(常見(jiàn)并發(fā)類(lèi)型有進(jìn)程和線程兩種)。生產(chǎn)者把制造出來(lái)的數(shù)據(jù)往緩沖區(qū)一丟,就可以再去生產(chǎn)下一個(gè)數(shù)據(jù)。基本上不用依賴(lài)消費(fèi)者的處理速度。
- 支持忙閑不均
? ? ? ?緩沖區(qū)還有另一個(gè)好處。如果制造數(shù)據(jù)的速度時(shí)快時(shí)慢,緩沖區(qū)的好處就體現(xiàn)出來(lái)了。當(dāng)數(shù)據(jù)制造快的時(shí)候,消費(fèi)者來(lái)不及處理,未處理的數(shù)據(jù)可以暫時(shí)存在緩沖區(qū)中。等生產(chǎn)者的制造速度慢下來(lái),消費(fèi)者再慢慢處理掉。
? ? ? ?為了充分復(fù)用,我們?cè)倌眉男诺睦觼?lái)說(shuō)事。假設(shè)郵遞員一次只能帶走1000封信。萬(wàn)一某次碰上情人節(jié)(也可能是圣誕節(jié))送賀卡,需要寄出去的信超過(guò)1000封,這時(shí)候郵筒這個(gè)緩沖區(qū)就派上用場(chǎng)了。郵遞員把來(lái)不及帶走的信暫存在郵筒中,等下次過(guò)來(lái)時(shí)再拿走。
5.多生產(chǎn)者和多消費(fèi)者場(chǎng)景
? ? ? ? 在多核時(shí)代,多線程并發(fā)處理速度比單線程處理速度更快,所以我們可以使用多個(gè)線程來(lái)生產(chǎn)數(shù)據(jù),同樣可以使用多個(gè)消費(fèi)線程來(lái)消費(fèi)數(shù)據(jù)。而更復(fù)雜的情況是,消費(fèi)者消費(fèi)的數(shù)據(jù),有可能需要繼續(xù)處理,于是消費(fèi)者處理完數(shù)據(jù)之后,它又要作為生產(chǎn)者把數(shù)據(jù)放在新的隊(duì)列里,交給其他消費(fèi)者繼續(xù)處理。如下圖:
6.線程池與生產(chǎn)消費(fèi)者模式
? ? ? ?Java中的線程池類(lèi)其實(shí)就是一種生產(chǎn)者和消費(fèi)者模式的實(shí)現(xiàn)方式,但是我覺(jué)得其實(shí)現(xiàn)方式更加高明。生產(chǎn)者把任務(wù)丟給線程池,線程池創(chuàng)建線程并處理任務(wù),如果將要運(yùn)行的任務(wù)數(shù)大于線程池的基本線程數(shù)就把任務(wù)扔到阻塞隊(duì)列里,這種做法比只使用一個(gè)阻塞隊(duì)列來(lái)實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者模式顯然要高明很多,因?yàn)橄M(fèi)者能夠處理直接就處理掉了,這樣速度更快,而生產(chǎn)者先存,消費(fèi)者再取這種方式顯然慢一些。
? ? ? ?我們的系統(tǒng)也可以使用線程池來(lái)實(shí)現(xiàn)多生產(chǎn)者消費(fèi)者模式。比如創(chuàng)建N個(gè)不同規(guī)模的Java線程池來(lái)處理不同性質(zhì)的任務(wù),比如線程池1將數(shù)據(jù)讀到內(nèi)存之后,交給線程池2里的線程繼續(xù)處理壓縮數(shù)據(jù)。線程池1主要處理IO密集型任務(wù),線程池2主要處理CPU密集型任務(wù)。
7.內(nèi)存緩沖區(qū)
最傳統(tǒng)、最常見(jiàn)的方式:隊(duì)列(FIFO)作緩沖。
7.1 線程方式
并發(fā)線程中使用隊(duì)列的優(yōu)缺點(diǎn)
- 內(nèi)存分配的性能
? ? ? ?在線程方式下,生產(chǎn)者和消費(fèi)者各自是一個(gè)線程。生產(chǎn)者把數(shù)據(jù)寫(xiě)入隊(duì)列頭(以下簡(jiǎn)稱(chēng)push),消費(fèi)者從隊(duì)列尾部讀出數(shù)據(jù)(以下簡(jiǎn)稱(chēng)pop)。當(dāng)隊(duì)列為空,消費(fèi)者就稍息(稍事休息);當(dāng)隊(duì)列滿(達(dá)到最大長(zhǎng)度),生產(chǎn)者就稍息。整個(gè)流程并不復(fù)雜。
? ? ? ?上述過(guò)程會(huì)有一個(gè)主要的問(wèn)題是關(guān)于內(nèi)存分配的性能開(kāi)銷(xiāo)。對(duì)于常見(jiàn)的隊(duì)列實(shí)現(xiàn):在每次push時(shí),可能涉及到堆內(nèi)存的分配;在每次pop時(shí),可能涉及堆內(nèi)存的釋放。假如生產(chǎn)者和消費(fèi)者都很勤快,頻繁地push、pop,那內(nèi)存分配的開(kāi)銷(xiāo)就很可觀了。對(duì)于內(nèi)存分配的開(kāi)銷(xiāo),可查找Java性能優(yōu)化相關(guān)知識(shí)。
? ? ? ? 解決辦法:環(huán)形緩沖區(qū)。
- 同步和互斥的性能
? ? ? ?另外,由于兩個(gè)線程共用一個(gè)隊(duì)列,自然就會(huì)涉及到線程間諸如同步、互斥、死鎖等等。這會(huì)兒要細(xì)談的是,同步和互斥的性能開(kāi)銷(xiāo)。在很多場(chǎng)合中,諸如信號(hào)量、互斥量等的使用也是有不小的開(kāi)銷(xiāo)的(某些情況下,也可能導(dǎo)致用戶態(tài)/核心態(tài)切換)。如果像剛才所說(shuō),生產(chǎn)者和消費(fèi)者都很勤快,那這些開(kāi)銷(xiāo)也不容小覷。
? ? ? ? 解決辦法:雙緩沖區(qū)。
- 適用于隊(duì)列的場(chǎng)合
? ? ? ?由于隊(duì)列是很常見(jiàn)的數(shù)據(jù)結(jié)構(gòu),大部分編程語(yǔ)言都內(nèi)置了隊(duì)列的支持,有些語(yǔ)言甚至提供了線程安全的隊(duì)列(比如JDK 1.5引入的ArrayBlockingQueue)。因此,開(kāi)發(fā)人員可以撿現(xiàn)成,避免了重新發(fā)明輪子。
? ? ? ?所以,假如你的數(shù)據(jù)流量不是很大,采用隊(duì)列緩沖區(qū)的好處還是很明顯的:邏輯清晰、代碼簡(jiǎn)單、維護(hù)方便。比較符合KISS原則。
7.2 進(jìn)程方式
? ? ? ?跨進(jìn)程的生產(chǎn)者/消費(fèi)者模式,非常依賴(lài)于具體的進(jìn)程間通訊(IPC)方式。而IPC的種類(lèi)很多。下面介紹比較常用的跨平臺(tái)、且編程語(yǔ)言支持較多的IPC方式。
-
匿名管道
? ? ? ?感覺(jué)管道是最像隊(duì)列的IPC類(lèi)型。生產(chǎn)者進(jìn)程在管道的寫(xiě)端放入數(shù)據(jù);消費(fèi)者進(jìn)程在管道的讀端取出數(shù)據(jù)。整個(gè)的效果和線程中使用隊(duì)列非常類(lèi)似,區(qū)別在于使用管道就無(wú)需操心線程安全、內(nèi)存分配等瑣事(操作系統(tǒng)暗中都幫你搞定了)。
? ? ? ?管道又分命名管道和匿名管道兩種,今天主要聊匿名管道。因?yàn)槊艿涝诓煌牟僮飨到y(tǒng)下差異較大(比如Win32和POSIX,在命名管道的API接口和功能實(shí)現(xiàn)上都有較大差異;有些平臺(tái)不支持命名管道,比如Windows CE)。除了操作系統(tǒng)的問(wèn)題,對(duì)于有些編程語(yǔ)言(比如Java)來(lái)說(shuō),命名管道是無(wú)法使用的。
? ? ? ?其實(shí)匿名管道在不同平臺(tái)上的API接口,也是有差異的(比如Win32的CreatePipe和POSIX的pipe,用法就很不一樣)。但是我們可以僅使用標(biāo)準(zhǔn)輸入和標(biāo)準(zhǔn)輸出(以下簡(jiǎn)稱(chēng)stdio)來(lái)進(jìn)行數(shù)據(jù)的流入流出。然后利用shell的管道符把生產(chǎn)者進(jìn)程和消費(fèi)者進(jìn)程關(guān)聯(lián)起來(lái)。實(shí)際上,很多操作系統(tǒng)(尤其是POSIX風(fēng)格的)自帶的命令都充分利用了這個(gè)特性來(lái)實(shí)現(xiàn)數(shù)據(jù)的傳輸(比如more、grep等),如此優(yōu)點(diǎn):
? ? ? ?1、基本上所有操作系統(tǒng)都支持在shell方式下使用管道符。因此很容易實(shí)現(xiàn)跨平臺(tái)。
? ? ? ?2、大部分編程語(yǔ)言都能夠操作stdio,因此跨編程語(yǔ)言也就容易實(shí)現(xiàn)。
? ? ? ?3、管道方式省卻了線程安全方面的瑣事。有利于降低開(kāi)發(fā)、調(diào)試成本。
? ? ? ?當(dāng)然,這種方式也有自身的缺點(diǎn):
? ? ? ?1、生產(chǎn)者進(jìn)程和消費(fèi)者進(jìn)程必須得在同一臺(tái)主機(jī)上,無(wú)法跨機(jī)器通訊。這個(gè)缺點(diǎn)比較明顯。
? ? ? ?2、在一對(duì)一的情況下,這種方式挺合用。但如果要擴(kuò)展到一對(duì)多或者多對(duì)一,那就有點(diǎn)棘手了。所以這種方式的擴(kuò)展性要打個(gè)折扣。假如今后要考慮類(lèi)似的擴(kuò)展,這個(gè)缺點(diǎn)就比較明顯。
? ? ? ? 3、由于管道是shell創(chuàng)建的,對(duì)于兩邊的進(jìn)程不可見(jiàn)(程序看到的只是stdio)。在某些情況下,導(dǎo)致程序不便于對(duì)管道進(jìn)行操縱(比如調(diào)整管道緩沖區(qū)尺寸)。這個(gè)缺點(diǎn)不太明顯。
? ? ? ? 4、最后,這種方式只能單向傳數(shù)據(jù)。好在大多數(shù)情況下,消費(fèi)者進(jìn)程不需要傳數(shù)據(jù)給生產(chǎn)者進(jìn)程。萬(wàn)一你確實(shí)需要信息反饋(從消費(fèi)者到生產(chǎn)者),那就費(fèi)勁了。可能得考慮換種IPC方式。
? ? ? ?注意事項(xiàng):
? ? ? ?1、對(duì)stdio進(jìn)行讀寫(xiě)操作是以阻塞方式進(jìn)行。比如管道中沒(méi)有數(shù)據(jù),消費(fèi)者進(jìn)程的讀操作就會(huì)一直停在哪兒,直到管道中重新有數(shù)據(jù)。
? ? ? ?2、由于stdio內(nèi)部帶有自己的緩沖區(qū)(這緩沖區(qū)和管道緩沖區(qū)是兩碼事),有時(shí)會(huì)導(dǎo)致一些不太爽的現(xiàn)象(比如生產(chǎn)者進(jìn)程輸出了數(shù)據(jù),但消費(fèi)者進(jìn)程沒(méi)有立即讀到)。
-
SOCKET(TCP方式)
? ? ? ?基于TCP方式的SOCKET通訊是又一個(gè)類(lèi)似于隊(duì)列的IPC方式。它同樣保證了數(shù)據(jù)的順序到達(dá);同樣有緩沖的機(jī)制。而且跨平臺(tái)和跨語(yǔ)言,和剛才介紹的shell管道符方式類(lèi)似。
? ? ? ? SOCKET相比shell管道符的方式,主要有如下幾個(gè)優(yōu)點(diǎn):
??? 1、SOCKET方式可以跨機(jī)器(便于實(shí)現(xiàn)分布式)。這是主要優(yōu)點(diǎn)。
??? 2、SOCKET方式便于將來(lái)擴(kuò)展成為多對(duì)一或者一對(duì)多。這也是主要優(yōu)點(diǎn)。
??? 3、SOCKET可以設(shè)置阻塞和非阻塞方法,用起來(lái)比較靈活。這是次要優(yōu)點(diǎn)。
??? 4、SOCKET支持雙向通訊,有利于消費(fèi)者反饋信息。
? ? ? ?當(dāng)然有利就有弊。相對(duì)于上述shell管道的方式,使用SOCKET在編程上會(huì)更復(fù)雜一些。好在前人已經(jīng)做了大量的工作,可借助于這些第三方的庫(kù)和框架,比如C++的ACE庫(kù)、Python的Twisted。
? ? ? ?雖然TCP在很多方面比UDP可靠,但鑒于跨機(jī)器通訊先天的不可預(yù)料性,可以在生產(chǎn)者進(jìn)程和消費(fèi)者進(jìn)程內(nèi)部各自再引入基于線程的"生產(chǎn)者/消費(fèi)者模式",如下圖:
這么做的關(guān)鍵點(diǎn)在于把代碼分為兩部分:生產(chǎn)線程和消費(fèi)線程屬于和業(yè)務(wù)邏輯相關(guān)的代碼(和通訊邏輯無(wú)關(guān));發(fā)送線程和接收線程屬于通訊相關(guān)的代碼(和業(yè)務(wù)邏輯無(wú)關(guān))。
??? 這樣的好處是很明顯的,具體如下:
??? 1、能夠應(yīng)對(duì)暫時(shí)性的網(wǎng)絡(luò)故障。并且在網(wǎng)絡(luò)故障解除后,能夠繼續(xù)工作。
??? 2、網(wǎng)絡(luò)故障的應(yīng)對(duì)處理方式(比如斷開(kāi)后的嘗試重連),只影響發(fā)送和接收線程,不會(huì)影響生產(chǎn)線程和消費(fèi)線程(業(yè)務(wù)邏輯部分)。
??? 3、具體的SOCKET方式(阻塞和非阻塞)只影響發(fā)送和接收線程,不影響生產(chǎn)線程和消費(fèi)線程(業(yè)務(wù)邏輯部分)。
??? 4、不依賴(lài)TCP自身的發(fā)送緩沖區(qū)和接收緩沖區(qū)。(默認(rèn)的TCP緩沖區(qū)的大小可能無(wú)法滿足實(shí)際要求)
??? 5、業(yè)務(wù)邏輯的變化(比如業(yè)務(wù)需求變更)不影響發(fā)送線程和接收線程。
??? 針對(duì)上述的最后一條,如果整個(gè)業(yè)務(wù)系統(tǒng)中有多個(gè)進(jìn)程是采用上述的模式,那或許可以重構(gòu):在業(yè)務(wù)邏輯代碼和通訊邏輯代碼之間,把業(yè)務(wù)邏輯無(wú)關(guān)的部分封裝成一個(gè)通訊中間件。
7.3 環(huán)形緩沖區(qū)
使用場(chǎng)景:當(dāng)存儲(chǔ)空間(不僅包括內(nèi)存,還可能包括諸如硬盤(pán)之類(lèi)的存儲(chǔ)介質(zhì))的分配/釋放非常頻繁并且確實(shí)產(chǎn)生了明顯的影響,才應(yīng)該考慮環(huán)形緩沖區(qū)的使用。否則的話,還是選用最基本、最簡(jiǎn)單的隊(duì)列緩沖區(qū)。
-
環(huán)形緩沖區(qū) vs 隊(duì)列緩沖區(qū)
? ? 1.外部接口相似
? ? 普通的隊(duì)列有一個(gè)寫(xiě)入端和一個(gè)讀出端。隊(duì)列為空的時(shí)候,讀出端無(wú)法讀取數(shù)據(jù);當(dāng)隊(duì)列滿(達(dá)到最大尺寸)時(shí),寫(xiě)入端無(wú)法寫(xiě)入數(shù)據(jù)。
??? 對(duì)于使用者來(lái)講,環(huán)形緩沖區(qū)和隊(duì)列緩沖區(qū)是一樣的。它也有一個(gè)寫(xiě)入端(用于push)和一個(gè)讀出端(用于pop),也有緩沖區(qū)“滿”和“空”的狀態(tài)。所以,從隊(duì)列緩沖區(qū)切換到環(huán)形緩沖區(qū),對(duì)于使用者來(lái)說(shuō)能比較平滑地過(guò)渡。
? ? 2.內(nèi)部結(jié)構(gòu)迥異
??? 雖然兩者的對(duì)外接口差不多,但是內(nèi)部結(jié)構(gòu)和運(yùn)作機(jī)制有很大差別。重點(diǎn)介紹一下環(huán)形緩沖區(qū)的內(nèi)部結(jié)構(gòu)。
? ? 可以把環(huán)形緩沖區(qū)的讀出端(以下簡(jiǎn)稱(chēng)R)和寫(xiě)入端(以下簡(jiǎn)稱(chēng)W)想象成是兩個(gè)人在體育場(chǎng)跑道上追逐(R追W)。當(dāng)R追上W的時(shí)候,就是緩沖區(qū)為空;當(dāng)W追上R的時(shí)候(W比R多跑一圈),就是緩沖區(qū)滿。
??? 為了形象起見(jiàn),如下:
?從上圖可以看出,環(huán)形緩沖區(qū)所有的push和pop操作都是在一個(gè)固定的存儲(chǔ)空間內(nèi)進(jìn)行。而隊(duì)列緩沖區(qū)在push的時(shí)候,可能會(huì)分配存儲(chǔ)空間用于存儲(chǔ)新元素;在pop時(shí),可能會(huì)釋放廢棄元素的存儲(chǔ)空間。所以環(huán)形方式相比隊(duì)列方式,少掉了對(duì)于緩沖區(qū)元素所用存儲(chǔ)空間的分配、釋放。這是環(huán)形緩沖區(qū)的一個(gè)主要優(yōu)勢(shì)。
-
環(huán)形緩沖區(qū)的實(shí)現(xiàn)
? ? ? ?1.數(shù)組方式 vs 鏈表方式
? ? ? ?環(huán)形緩沖區(qū)的內(nèi)部實(shí)現(xiàn),即可基于數(shù)組(此處的數(shù)組,泛指連續(xù)存儲(chǔ)空間)實(shí)現(xiàn),也可基于鏈表實(shí)現(xiàn)。
? ? ? ?數(shù)組在物理存儲(chǔ)上是一維的連續(xù)線性結(jié)構(gòu),可以在初始化時(shí),把存儲(chǔ)空間一次性分配好,這是數(shù)組方式的優(yōu)點(diǎn)。但是要使用數(shù)組來(lái)模擬環(huán),你必須在邏輯上把數(shù)組的頭和尾相連。在順序遍歷數(shù)組時(shí),對(duì)尾部元素(最后一個(gè)元素)要作一下特殊處理。訪問(wèn)尾部元素的下一個(gè)元素時(shí),要重新回到頭部元素(第0個(gè)元素)。如下圖所示:
? ? ? ? 使用鏈表的方式,正好和數(shù)組相反:鏈表省去了頭尾相連的特殊處理。但是鏈表在初始化的時(shí)候比較繁瑣,而且在有些場(chǎng)合(比如跨進(jìn)程的IPC)不太方便使用。
? ? ? ?2.讀寫(xiě)操作
? ? ? ?環(huán)形緩沖區(qū)要維護(hù)兩個(gè)索引,分別對(duì)應(yīng)寫(xiě)入端(W)和讀取端(R)。寫(xiě)入(push)的時(shí)候,先確保環(huán)沒(méi)滿,然后把數(shù)據(jù)復(fù)制到W所對(duì)應(yīng)的元素,最后W指向下一個(gè)元素;讀取(pop)的時(shí)候,先確保環(huán)沒(méi)空,然后返回R對(duì)應(yīng)的元素,最后R指向下一個(gè)元素。
? ? ? ?3.判斷“空”和“滿”
? ? ? ?上述的操作并不復(fù)雜,不過(guò)有一個(gè)小小的麻煩:空環(huán)和滿環(huán)的時(shí)候,R和W都指向同一個(gè)位置!這樣就無(wú)法判斷到底是“空”還是“滿”。大體上有兩種方法可以解決該問(wèn)題。
? ? ? ?辦法1:始終保持一個(gè)元素不用
? ? ? ?當(dāng)空環(huán)的時(shí)候,R和W重疊。當(dāng)W比R跑得快,追到距離R還有一個(gè)元素間隔的時(shí)候,就認(rèn)為環(huán)已經(jīng)滿。當(dāng)環(huán)內(nèi)元素占用的存儲(chǔ)空間較大的時(shí)候,這種辦法顯得很土(浪費(fèi)空間)。
? ? ? ?辦法2:維護(hù)額外變量
? ? ? ?如果不喜歡上述辦法,還可以采用額外的變量來(lái)解決。比如可以用一個(gè)整數(shù)記錄當(dāng)前環(huán)中已經(jīng)保存的元素個(gè)數(shù)(該整數(shù)>=0)。當(dāng)R和W重疊的時(shí)候,通過(guò)該變量就可以知道是“空”還是“滿”。
? ? ? 4.元素的存儲(chǔ)
? ? ? ?由于環(huán)形緩沖區(qū)本身就是要降低存儲(chǔ)空間分配的開(kāi)銷(xiāo),因此緩沖區(qū)中元素的類(lèi)型要選好。盡量存儲(chǔ)值類(lèi)型的數(shù)據(jù),而不要存儲(chǔ)指針(引用)類(lèi)型的數(shù)據(jù)。因?yàn)橹羔橆?lèi)型的數(shù)據(jù)又會(huì)引起存儲(chǔ)空間(比如堆內(nèi)存)的分配和釋放,使得環(huán)形緩沖區(qū)的效果打折扣。
-
應(yīng)用場(chǎng)合
? ? ? ?如果所使用的編程語(yǔ)言和開(kāi)發(fā)庫(kù)中帶有現(xiàn)成的、成熟的環(huán)形緩沖區(qū),建議使用現(xiàn)成的庫(kù),不要重新制造輪子;確實(shí)找不到現(xiàn)成的,才考慮自己實(shí)現(xiàn)。
? ? ? 1.用于并發(fā)線程
? ? ? ?和線程中的隊(duì)列緩沖區(qū)類(lèi)似,線程中的環(huán)形緩沖區(qū)也要考慮線程安全的問(wèn)題。除非使用的環(huán)形緩沖區(qū)的庫(kù)已經(jīng)實(shí)現(xiàn)了線程安全,否則還是得自己動(dòng)手搞定。線程方式下的環(huán)形緩沖區(qū)用得比較多,相關(guān)的網(wǎng)上資料也多,下面就大致介紹幾個(gè)。
? ? ? ?對(duì)于C++的程序員,強(qiáng)烈推薦使用boost提供的circular_buffer模板,該模板最開(kāi)始是在boost 1.35版本中引入的。鑒于boost在C++社區(qū)中的地位,大伙兒應(yīng)該可以放心使用該模板。
? ? ? ?對(duì)于C程序員,可以去看看開(kāi)源項(xiàng)目circbuf,不過(guò)該項(xiàng)目是GPL協(xié)議的,不太爽;而且活躍度不太高;而且只有一個(gè)開(kāi)發(fā)人員。大伙兒慎用!建議只拿它當(dāng)參考。
? ? ? ?對(duì)于C#程序員,可以參考CodeProject上的一個(gè)示例。
? ? ? ?2.用于并發(fā)進(jìn)程
? ? ? ?進(jìn)程間的環(huán)形緩沖區(qū),似乎少有現(xiàn)成的庫(kù)可用。
? ? ? ?適用于進(jìn)程間環(huán)形緩沖的IPC類(lèi)型,常見(jiàn)的有共享內(nèi)存和文件。在這兩種方式上進(jìn)行環(huán)形緩沖,通常都采用數(shù)組的方式實(shí)現(xiàn)。程序事先分配好一個(gè)固定長(zhǎng)度的存儲(chǔ)空間,然后具體的讀寫(xiě)操作、判斷“空”和“滿”、元素存儲(chǔ)等細(xì)節(jié)就可參照前面所說(shuō)的來(lái)進(jìn)行。
? ? ? ?共享內(nèi)存方式的性能很好,適用于數(shù)據(jù)流量很大的場(chǎng)景。但是有些語(yǔ)言(比如Java)對(duì)于共享內(nèi)存不支持。因此,該方式在多語(yǔ)言協(xié)同開(kāi)發(fā)的系統(tǒng)中,會(huì)有一定的局限性。
? ? ? ? 而文件方式在編程語(yǔ)言方面支持很好,幾乎所有編程語(yǔ)言都支持操作文件。但它可能會(huì)受限于磁盤(pán)讀寫(xiě)(Disk I/O)的性能。所以文件方式不太適合于快速數(shù)據(jù)傳輸;但是對(duì)于某些“數(shù)據(jù)單元”很大的場(chǎng)合,文件方式是值得考慮的。
? ? ? ? 對(duì)于進(jìn)程間的環(huán)形緩沖區(qū),同樣要考慮好進(jìn)程間的同步、互斥等問(wèn)題。
8.生產(chǎn)者消費(fèi)者模式三種實(shí)現(xiàn)方式代碼示例
8.1?synchronized、wait和notify ?
package producerConsumer; //wait 和 notify public class ProducerConsumerWithWaitNofity {public static void main(String[] args) {Resource resource = new Resource();//生產(chǎn)者線程ProducerThread p1 = new ProducerThread(resource);ProducerThread p2 = new ProducerThread(resource);ProducerThread p3 = new ProducerThread(resource);//消費(fèi)者線程ConsumerThread c1 = new ConsumerThread(resource);//ConsumerThread c2 = new ConsumerThread(resource);//ConsumerThread c3 = new ConsumerThread(resource);p1.start();p2.start();p3.start();c1.start();//c2.start();//c3.start();}} /*** 公共資源類(lèi)* @author **/ class Resource{//重要//當(dāng)前資源數(shù)量private int num = 0;//資源池中允許存放的資源數(shù)目private int size = 10;/*** 從資源池中取走資源*/public synchronized void remove(){if(num > 0){num--;System.out.println("消費(fèi)者" + Thread.currentThread().getName() +"消耗一件資源," + "當(dāng)前線程池有" + num + "個(gè)");notifyAll();//通知生產(chǎn)者生產(chǎn)資源}else{try {//如果沒(méi)有資源,則消費(fèi)者進(jìn)入等待狀態(tài)wait();System.out.println("消費(fèi)者" + Thread.currentThread().getName() + "線程進(jìn)入等待狀態(tài)");} catch (InterruptedException e) {e.printStackTrace();}}}/*** 向資源池中添加資源*/public synchronized void add(){if(num < size){num++;System.out.println(Thread.currentThread().getName() + "生產(chǎn)一件資源,當(dāng)前資源池有" + num + "個(gè)");//通知等待的消費(fèi)者notifyAll();}else{//如果當(dāng)前資源池中有10件資源try{wait();//生產(chǎn)者進(jìn)入等待狀態(tài),并釋放鎖System.out.println(Thread.currentThread().getName()+"線程進(jìn)入等待");}catch(InterruptedException e){e.printStackTrace();}}} } /*** 消費(fèi)者線程*/ class ConsumerThread extends Thread{private Resource resource;public ConsumerThread(Resource resource){this.resource = resource;}@Overridepublic void run() {while(true){try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}resource.remove();}} } /*** 生產(chǎn)者線程*/ class ProducerThread extends Thread{private Resource resource;public ProducerThread(Resource resource){this.resource = resource;}@Overridepublic void run() {//不斷地生產(chǎn)資源while(true){try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}resource.add();}}}8.2?lock和condition的await、signalAll??
package producerConsumer;import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /*** 使用Lock 和 Condition解決生產(chǎn)者消費(fèi)者問(wèn)題* @author tangzhijing**/ public class LockCondition {public static void main(String[] args) {Lock lock = new ReentrantLock();Condition producerCondition = lock.newCondition();Condition consumerCondition = lock.newCondition();Resource2 resource = new Resource2(lock,producerCondition,consumerCondition);//生產(chǎn)者線程ProducerThread2 producer1 = new ProducerThread2(resource);//消費(fèi)者線程ConsumerThread2 consumer1 = new ConsumerThread2(resource);ConsumerThread2 consumer2 = new ConsumerThread2(resource);ConsumerThread2 consumer3 = new ConsumerThread2(resource);producer1.start();consumer1.start();consumer2.start();consumer3.start();} } /*** 消費(fèi)者線程*/ class ConsumerThread2 extends Thread{private Resource2 resource;public ConsumerThread2(Resource2 resource){this.resource = resource;//setName("消費(fèi)者");}public void run(){while(true){try {Thread.sleep((long) (1000 * Math.random()));} catch (InterruptedException e) {e.printStackTrace();}resource.remove();}} } /*** 生產(chǎn)者線程* @author tangzhijing**/ class ProducerThread2 extends Thread{private Resource2 resource;public ProducerThread2(Resource2 resource){this.resource = resource;setName("生產(chǎn)者");}public void run(){while(true){try {Thread.sleep((long) (1000 * Math.random()));} catch (InterruptedException e) {e.printStackTrace();}resource.add();}} } /*** 公共資源類(lèi)* @author tangzhijing**/ class Resource2{private int num = 0;//當(dāng)前資源數(shù)量private int size = 10;//資源池中允許存放的資源數(shù)目private Lock lock;private Condition producerCondition;private Condition consumerCondition;public Resource2(Lock lock, Condition producerCondition, Condition consumerCondition) {this.lock = lock;this.producerCondition = producerCondition;this.consumerCondition = consumerCondition;}/*** 向資源池中添加資源*/public void add(){lock.lock();try{if(num < size){num++;System.out.println(Thread.currentThread().getName() + "生產(chǎn)一件資源,當(dāng)前資源池有" + num + "個(gè)");//喚醒等待的消費(fèi)者consumerCondition.signalAll();}else{//讓生產(chǎn)者線程等待try {producerCondition.await();System.out.println(Thread.currentThread().getName() + "線程進(jìn)入等待");} catch (InterruptedException e) {e.printStackTrace();}}}finally{lock.unlock();}}/*** 從資源池中取走資源*/public void remove(){lock.lock();try{if(num > 0){num--;System.out.println("消費(fèi)者" + Thread.currentThread().getName() + "消耗一件資源," + "當(dāng)前資源池有" + num + "個(gè)");producerCondition.signalAll();//喚醒等待的生產(chǎn)者}else{try {consumerCondition.await();System.out.println(Thread.currentThread().getName() + "線程進(jìn)入等待");} catch (InterruptedException e) {e.printStackTrace();}//讓消費(fèi)者等待}}finally{lock.unlock();}}}8.3?lock和condition的await、signalAll
package producerConsumer;import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue;//使用阻塞隊(duì)列BlockingQueue解決生產(chǎn)者消費(fèi)者 public class BlockingQueueConsumerProducer {public static void main(String[] args) {Resource3 resource = new Resource3();//生產(chǎn)者線程ProducerThread3 p = new ProducerThread3(resource);//多個(gè)消費(fèi)者ConsumerThread3 c1 = new ConsumerThread3(resource);ConsumerThread3 c2 = new ConsumerThread3(resource);ConsumerThread3 c3 = new ConsumerThread3(resource);p.start();c1.start();c2.start();c3.start();} } /*** 消費(fèi)者線程* @author tangzhijing**/ class ConsumerThread3 extends Thread {private Resource3 resource3;public ConsumerThread3(Resource3 resource) {this.resource3 = resource;//setName("消費(fèi)者");}public void run() {while (true) {try {Thread.sleep((long) (1000 * Math.random()));} catch (InterruptedException e) {e.printStackTrace();}resource3.remove();}} } /*** 生產(chǎn)者線程* @author tangzhijing**/ class ProducerThread3 extends Thread{private Resource3 resource3;public ProducerThread3(Resource3 resource) {this.resource3 = resource;//setName("生產(chǎn)者");}public void run() {while (true) {try {Thread.sleep((long) (1000 * Math.random()));} catch (InterruptedException e) {e.printStackTrace();}resource3.add();}} } class Resource3{private BlockingQueue resourceQueue = new LinkedBlockingQueue(10);/*** 向資源池中添加資源*/public void add(){try {resourceQueue.put(1);System.out.println("生產(chǎn)者" + Thread.currentThread().getName()+ "生產(chǎn)一件資源," + "當(dāng)前資源池有" + resourceQueue.size() + "個(gè)資源");} catch (InterruptedException e) {e.printStackTrace();}}/*** 向資源池中移除資源*/public void remove(){try {resourceQueue.take();System.out.println("消費(fèi)者" + Thread.currentThread().getName() + "消耗一件資源," + "當(dāng)前資源池有" + resourceQueue.size() + "個(gè)資源");} catch (InterruptedException e) {e.printStackTrace();}} }參考文章:
1.https://blog.csdn.net/u011109589/article/details/80519863
2.https://www.cnblogs.com/chentingk/p/6497107.html
3.http://ifeve.com/producers-and-consumers-mode/(并發(fā)編程網(wǎng)--創(chuàng)始人:方騰飛)
4.https://www.cnblogs.com/fankongkong/p/7339848.html
總結(jié)
以上是生活随笔為你收集整理的消息队列:生产者/消费者模式的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 计算机图像处理的未来发展,探讨计算机图像
- 下一篇: 半导体鼻祖:仙童半导体的故事