日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

分布式队列编程优化篇

發(fā)布時間:2024/7/5 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 分布式队列编程优化篇 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

“分布式隊列編程”是一個系列文,之前我們已經(jīng)發(fā)布了《分布式隊列編程模型、實戰(zhàn)》,主要剖析了分布式隊列編程模型的需求來源、定義、結(jié)構(gòu)以及其變化多樣性;根據(jù)作者在新美大實際工作經(jīng)驗,給出了隊列式編程在分布式環(huán)境下的一些具體應(yīng)用。本文將重點闡述工程師運用分布式隊列編程構(gòu)架的時候,在生產(chǎn)者、分布式隊列以及消費者這三個環(huán)節(jié)的注意點以及優(yōu)化建議。

確定采用分布式隊列編程模型之后,主體架構(gòu)就算完成了,但工程師的工作還遠遠未結(jié)束。天下事必做于細,細節(jié)是一個不錯的架構(gòu)向一個優(yōu)秀的系統(tǒng)進階的關(guān)鍵因素。優(yōu)化篇選取了作者以及其同事在運用分布式隊列編程模型架構(gòu)時所碰到的典型問題和解決方案。這里些問題出現(xiàn)的頻率較高,如果你經(jīng)驗不夠,很可能會“踩坑”。希望通過這些講解,幫助讀者降低分布式隊列編程模型的使用門檻。本文將對分布式隊列編程模型的三種角色:生產(chǎn)者(Producer),分布式隊列(Queue),消費者(Consumer)分別進行優(yōu)化討論。

在分布式隊列編程中,生產(chǎn)者往往并非真正的生產(chǎn)源頭,只是整個數(shù)據(jù)流中的一個節(jié)點,這種生產(chǎn)者的操作是處理-轉(zhuǎn)發(fā)(Process-Forward)模式。

這種模式給工程師們帶來的第一個問題是吞吐量問題。這種模式下運行的生產(chǎn)者,一邊接收上游的數(shù)據(jù),一邊將處理完的數(shù)據(jù)發(fā)送給下游。本質(zhì)上,它是一個非常經(jīng)典的數(shù)學(xué)問題,其抽象模型是一些沒有蓋子的水箱,每個水箱接收來自上一個水箱的水,進行處理之后,再將水發(fā)送到下一個水箱。工程師需要預(yù)測水源的流量、每個環(huán)節(jié)水箱的處理能力、水龍頭的排水速度,最終目的是避免水溢出水箱,或者盡可能地減小溢出事件的概率。實際上流式編程框架以及其開發(fā)者花了大量的精力去處理和優(yōu)化這個問題。下文的緩存優(yōu)化和批量寫入優(yōu)化都是針對該問題的解決方案。

第二個需要考慮的問題是持久化。由于各種原因,系統(tǒng)總是會宕機。如果信息比較敏感,例如計費信息、火車票訂單信息等,工程師們需要考慮系統(tǒng)宕機所帶來的損失,找到讓損失最小化的解決方案。持久化優(yōu)化重點解決這一類問題。

緩存優(yōu)化

處于“處理-轉(zhuǎn)發(fā)”模式下運行的生產(chǎn)者往往被設(shè)計成請求驅(qū)動型的服務(wù),即每個請求都會觸發(fā)一個處理線程,線程處理完后將結(jié)果寫入分布式隊列。如果由于某種原因隊列服務(wù)不可用,或者性能惡化,隨著新請求的到來,生產(chǎn)者的處理線程就會產(chǎn)生堆積。這可能會導(dǎo)致如下兩個問題: * 系統(tǒng)可用性降低。由于每個線程都需要一定的內(nèi)存開銷,線程過多會使系統(tǒng)內(nèi)存耗盡,甚至可能產(chǎn)生雪崩效應(yīng)導(dǎo)致最終完全不可用。 * 信息丟失。為了避免系統(tǒng)崩潰,工程師可能會給請求驅(qū)動型服務(wù)設(shè)置一個處理線程池,設(shè)置最大處理線程數(shù)量。這是一種典型的降級策略,目的是為了系統(tǒng)崩潰。但是,后續(xù)的請求會因為沒有處理線程而被迫阻塞,最終可能產(chǎn)生信息丟失。例如:對于廣告計費采集,如果采集系統(tǒng)因為線程耗盡而不接收客戶端的計費行為,這些計費行為就會丟失。

緩解這類問題的思路來自于CAP理論,即通過降低一致性來提高可用性。生產(chǎn)者接收線程在收到請求之后第一時間不去處理,直接將請求緩存在內(nèi)存中(犧牲一致性),而在后臺啟動多個處理線程從緩存中讀取請求、進行處理并寫入分布式隊列。與線程所占用的內(nèi)存開銷相比,大部分的請求所占內(nèi)存幾乎可以忽略。通過在接收請求和處理請求之間增加一層內(nèi)存緩存,可以大大提高系統(tǒng)的處理吞吐量和可擴展性。這個方案本質(zhì)上是一個內(nèi)存生產(chǎn)者消費者模型。

批量寫入優(yōu)化

如果生產(chǎn)者的請求過大,寫分布式隊列可能成為性能瓶頸,有如下幾個因素: * 隊列自身性能不高。 * 分布式隊列編程模型往往被應(yīng)用在跨機房的系統(tǒng)里面,跨機房的網(wǎng)絡(luò)開銷往往容易成為系統(tǒng)瓶頸。 * 消息確認機制往往會大大降低隊列的吞吐量以及響應(yīng)時間。

如果在處理請求和寫隊列之間添加一層緩存,消息寫入程序批量將消息寫入隊列,可以大大提高系統(tǒng)的吞吐量。原因如下: * 批量寫隊列可以大大減少生產(chǎn)者和分布式隊列的交互次數(shù)和消息傳輸量。特別是對于高吞吐小載荷的消息實體,批量寫可以顯著降低網(wǎng)絡(luò)傳輸量。 * 對于需要確認機制的消息,確認機制往往會大大降低隊列的吞吐量以及響應(yīng)時間,某些高敏感的消息需要多個消息中間件代理同時確認,這近一步惡化性能。在生產(chǎn)者的應(yīng)用層將多條消息批量組合成一個消息體,消息中間件就只需要對批量消息進行一次確認,這可能會數(shù)量級的提高消息傳輸性能。

持久化優(yōu)化

通過添加緩存,消費者服務(wù)的吞吐量和可用性都得到了提升。但緩存引入了一個新問題——內(nèi)存數(shù)據(jù)丟失。對于敏感數(shù)據(jù),工程師需要考慮如下兩個潛在問題: * 如果內(nèi)存中存在未處理完的請求,而某些原因?qū)е律a(chǎn)者服務(wù)宕機,內(nèi)存數(shù)據(jù)就會丟失而可能無法恢復(fù)。 * 如果分布式隊列長時間不可用,隨著請求數(shù)量的不斷增加,最終系統(tǒng)內(nèi)存可能會耗盡而崩潰,內(nèi)存的消息也可能丟失。

所以緩存中的數(shù)據(jù)需要定期被持久化到磁盤等持久層設(shè)備中,典型的持久化觸發(fā)策略主要有兩種: * 定期觸發(fā),即每隔一段時間進行一次持久化。 * 定量觸發(fā),即每當(dāng)緩存中的請求數(shù)量達到一定閾值后進行持久化。 是否需要持久化優(yōu)化,以及持久化策略應(yīng)該由請求數(shù)據(jù)的敏感度、請求量、持久化性能等因素共同決定。

分布式隊列不等同于各種開源的或者收費的消息中間件,甚至在一些場景下完全不需要使用消息中間件。但是,消息中間件產(chǎn)生的目的就是解決消息傳遞問題,這為分布式隊列編程架構(gòu)提供了很多的便利。在實際工作中,工程師們應(yīng)該將成熟的消息中間件作為隊列的首要備選方案。 本小節(jié)對消息中間件的功能、模型進行闡述,并給出一些消息中間件選型、部署的具體建議。

中間件的功能

明白一個系統(tǒng)的每個具體功能是設(shè)計和架構(gòu)一個系統(tǒng)的基礎(chǔ)。典型的消息中間件主要包含如下幾個功能: * 消息接收 * 消息分發(fā) * 消息存儲 * 消息讀取

概念模型

抽象的消息中間件模型包含如下幾個角色: * 發(fā)送者和接收者客戶端(Sender/Receiver Client),在具體實施過程中,它們一般以庫的形式嵌入到應(yīng)用程序代碼中。 * 代理服務(wù)器(Broker Server),它們是與客戶端代碼直接交互的服務(wù)端代碼。 * 消息交換機(Exchanger),接收到的消息一般需要通過消息交換機(Exchanger)分發(fā)到具體的消息隊列中。 * 消息隊列,一般是一塊內(nèi)存數(shù)據(jù)結(jié)構(gòu)或持久化數(shù)據(jù)。 概念模型如下圖:

為了提高分發(fā)性能,很多消息中間件把消息代理服務(wù)器的拓撲圖發(fā)送到發(fā)送者和接收者客戶端(Sender/Receiver Client),如此一來,發(fā)送源可以直接進行消息分發(fā)。

選型標(biāo)準(zhǔn)

要完整的描述消息中間件各個方面非常困難,大部分良好的消息中間件都有完善的文檔,這些文檔的長度遠遠超過本文的總長度。但如下幾個標(biāo)準(zhǔn)是工程師們在進行消息中間件選型時經(jīng)常需要考慮和權(quán)衡的。

性能

性能主要有兩個方面需要考慮:吞吐量(Throughput)和響應(yīng)時間(Latency)。 不同的消息隊列中間件的吞吐量和響應(yīng)時間相差甚遠,在選型時可以去網(wǎng)上查看一些性能對比報告。 對于同一種中間件,不同的配置方式也會影響性能。主要有如下幾方面的配置: * 是否需要確認機制,即寫入隊列后,或從隊列讀取后,是否需要進行確認。確認機制對響應(yīng)時間的影響往往很大。 * 能否批處理,即消息能否批量讀取或者寫入。批量操作可以大大減少應(yīng)用程序與消息中間件的交互次數(shù)和消息傳遞量,大大提高吞吐量。 * 能否進行分區(qū)(Partition)。將某一主題消息隊列進行分區(qū),同一主題消息可以有多臺機器并行處理。這不僅僅能影響消息中間件的吞吐量,還決定著消息中間件是否具備良好的可伸縮性(Scalability)。 * 是否需要進行持久化。將消息進行持久化往往會同時影響吞吐量和響應(yīng)時間。

可靠性

可靠性主要包含:可用性、持久化、確認機制等。 高可用性的消息中間件應(yīng)該具備如下特征: * 消息中間件代理服務(wù)器(Broker)具有主從備份。即當(dāng)一臺代理服務(wù)宕機之后,備用服務(wù)器能接管相關(guān)的服務(wù)。 * 消息中間件中緩存的消息是否有備份、并持久化。 根據(jù)CAP理論,高可用、高一致性以及網(wǎng)絡(luò)分裂不可兼得。根據(jù)作者的觀察,大部分的消息中間件在面臨網(wǎng)絡(luò)分裂的情況下下,都很難保證數(shù)據(jù)的一致性以及可用性。 很多消息中間件都會提供一些可配置策略,讓使用者在可用性和一致性之間做權(quán)衡。

高可靠的消息中間件應(yīng)該確保從發(fā)送者接收到的消息不會丟失。中間件代理服務(wù)器的宕機并不是小概率事件,所以保存在內(nèi)存中的消息很容易發(fā)生丟失。大部分的消息中間件都依賴于消息的持久化去降低消息丟失損失,即將接收到的消息寫入磁盤。即使提供持久化,仍有兩個問題需要考慮: * 磁盤損壞問題。長時間來看,磁盤出問題的概率仍然存在。 * 性能問題。與操作內(nèi)存相比,磁盤I/O的操作性能要慢幾個數(shù)量級。頻繁持久化不僅會增加響應(yīng)時間,也會降低吞吐量。 解決這兩個問題的一個解決方案就是:多機確認,定期持久化。即消息被緩存在多臺機器的內(nèi)存中,只有每臺機器都確認收到消息,才跟發(fā)送者確認(很多消息中間件都會提供相應(yīng)的配置選項,讓用戶設(shè)置最少需要多少臺機器接收到消息)。由于多臺獨立機器同時出故障的概率遵循乘法法則,指數(shù)級降低,這會大大提高消息中間件的可靠性。

確認機制本質(zhì)上是通訊的握手機制(Handshaking)。如果沒有該機制,消息在傳輸過程中丟失將不會被發(fā)現(xiàn)。高敏感的消息要求選取具備確認機制的消息中間件。當(dāng)然如果沒有接收到消息中間件確認完成的指令,應(yīng)用程序需要決定如何處理。典型的做法有兩個: * 多次重試。 * 暫存到本地磁盤或其它持久化媒介。

客戶端接口所支持語言

采用現(xiàn)存消息中間件就意味著避免重復(fù)造輪子。如果某個消息中間件未能提供對應(yīng)語言的客戶端接口,則意味著極大的成本和兼容性問題。

投遞策略(Delivery policies)

投遞策略指的是一個消息會被發(fā)送幾次。主要包含三種策略:最多一次(At most Once )、最少一次(At least Once)、僅有一次(Exactly Once)。 在實際應(yīng)用中,只考慮消息中間件的投遞策略并不能保證業(yè)務(wù)的投遞策略,因為接收者在確認收到消息和處理完消息并持久化之間存在一個時間窗口。例如,即使消息中間件保證僅有一次(Exactly Once),如果接收者先確認消息,在持久化之前宕機,則該消息并未被處理。從應(yīng)用的角度,這就是最多一次(At most Once)。反之,接收者先處理消息并完成持久化,但在確認之前宕機,消息就要被再次發(fā)送,這就是最少一次(At least Once)。 如果消息投遞策略非常重要,應(yīng)用程序自身也需要仔細設(shè)計。

消費者是分布式隊列編程中真正的數(shù)據(jù)處理方,數(shù)據(jù)處理方最常見的挑戰(zhàn)包括:有序性、串行化(Serializability)、頻次控制、完整性和一致性等。

挑戰(zhàn)

有序性

在很多場景下,如何保證隊列信息的有序處理是一個棘手的問題。如下圖,假定分布式隊列保證請求嚴(yán)格有序,請求ri2和ri1都是針對同一數(shù)據(jù)記錄的不同狀態(tài),ri2的狀態(tài)比ri1的狀態(tài)新。T1、T2、T3和T4代表各個操作發(fā)生的時間,并且 T1 < T2 < T3 < T4(”<“代表早于)。 采用多消費者架構(gòu),這兩條記錄被兩個消費者(Consumer1和Consumer2)處理后更新到數(shù)據(jù)庫里面。Consumer1雖然先讀取ri1但是卻后寫入數(shù)據(jù)庫,這就導(dǎo)致,新的狀態(tài)被老的狀態(tài)覆蓋,所以多消費者不保證數(shù)據(jù)的有序性。

串行化

很多場景下,串行化是數(shù)據(jù)處理的一個基本需求,這是保證數(shù)據(jù)完整性、可恢復(fù)性、事務(wù)原子性等的基礎(chǔ)。為了在并行計算系統(tǒng)里實現(xiàn)串行化,一系列的相關(guān)理論和實踐算法被提出。對于分布式隊列編程架構(gòu),要在在多臺消費者實現(xiàn)串行化非常復(fù)雜,無異于重復(fù)造輪子。

頻次控制

有時候,消費者的消費頻次需要被控制,可能的原因包括: * 費用問題。如果每次消費所引起的操作都需要收費,而同一個請求消息在隊列中保存多份,不進行頻次控制,就會導(dǎo)致無謂的浪費。 * 性能問題。每次消費可能會引起對其他服務(wù)的調(diào)用,被調(diào)用服務(wù)希望對調(diào)用量有所控制,對同一個請求消息的多次訪問就需要有所控制。

完整性和一致性

完整性和一致性是所有多線程和多進程的代碼都面臨的問題。在多線程或者多進程的系統(tǒng)中考慮完整性和一致性往往會大大地增加代碼的復(fù)雜度和系統(tǒng)出錯的概率。

單例服務(wù)優(yōu)化

幾乎所有串行化理論真正解決的問題只有一個:性能。 所以,在性能允許的前提下,對于消費者角色,建議采用單實例部署。通過單實例部署,有序性、串行化、完整性和一致性問題自動獲得了解決。另外,單實例部署的消費者擁有全部所需信息,它可以在頻次控制上采取很多優(yōu)化策略。

天下沒有免費的午餐。同樣,單實例部署并非沒有代價,它意味著系統(tǒng)可用性的降低,很多時候,這是無法接受的。解決可用性問題的最直接的思路就是冗余(Redundancy)。最常用的冗余方案是Master-slave架構(gòu),不過大部分的Master-slave架構(gòu)都是Active/active模式,即主從服務(wù)器都提供服務(wù)。例如,數(shù)據(jù)庫的Master-slave架構(gòu)就是主從服務(wù)器都提供讀服務(wù),只有主服務(wù)器提供寫服務(wù)。大部分基于負載均衡設(shè)計的Master-slave集群中,主服務(wù)器和從服務(wù)器同時提供相同的服務(wù)。這顯然不滿足單例服務(wù)優(yōu)化需求。有序性和串行化需要Active/passive架構(gòu),即在某一時刻只有主實例提供服務(wù),其他的從服務(wù)等待主實例失效。這是典型的領(lǐng)導(dǎo)人選舉架構(gòu),即只有獲得領(lǐng)導(dǎo)權(quán)的實例才能充當(dāng)實際消費者,其他實例都在等待下一次選舉。采用領(lǐng)導(dǎo)人選舉的Active/passive架構(gòu)可以大大緩解純粹的單實例部署所帶來的可用性問題。

令人遺憾的是,除非工程師們自己在消費者實例里面實現(xiàn)Paxos等算法,并在每次消息處理之前都執(zhí)行領(lǐng)導(dǎo)人選舉。否則,理論上講,沒有方法可以保障在同一個時刻只有一個領(lǐng)導(dǎo)者。而對每個消息都執(zhí)行一次領(lǐng)導(dǎo)人選舉,顯然性能不可行。實際工作中,最容易出現(xiàn)的問題時機發(fā)生在領(lǐng)導(dǎo)人交接過程中,即前任領(lǐng)導(dǎo)人實例變成輔助實例,新部署實例開始承擔(dān)領(lǐng)導(dǎo)人角色。為了平穩(wěn)過渡,這兩者之間需要有一定的通訊機制,但是,無論是網(wǎng)絡(luò)分區(qū)(Network partition)還是原領(lǐng)導(dǎo)人服務(wù)崩潰都會使這種通訊機制變的不可能。

對于完整性和一致性要求很高的系統(tǒng),我們需要在選舉制度和交接制度這兩塊進行優(yōu)化。

領(lǐng)導(dǎo)人選舉架構(gòu)

典型的領(lǐng)導(dǎo)人選舉算法有Paxos、ZAB( ZooKeeper Atomic Broadcast protocol)。為了避免重復(fù)造輪子,建議采用ZooKeeper的分布式鎖來實現(xiàn)領(lǐng)導(dǎo)人選舉。典型的ZooKeeper實現(xiàn)算法如下(摘自參考資料[4]):

Let ELECTION be a path of choice of the application. To volunteer to be a leader:

1.Create znode z with path “ELECTION/guid-n_” with both SEQUENCE and EPHEMERAL flags; 2.Let C be the children of “ELECTION”, and i be the sequence number of z; 3.Watch for changes on “ELECTION/guid-n_j”, where j is the largest sequence number such that j < i and n_j is a znode in C;

Upon receiving a notification of znode deletion:

1.Let C be the new set of children of ELECTION; 2.If z is the smallest node in C, then execute leader procedure; 3.Otherwise, watch for changes on “ELECTION/guid-n_j”, where j is the largest sequence number such that j < i and n_j is a znode in C;

領(lǐng)導(dǎo)人交接架構(gòu)

領(lǐng)導(dǎo)人選舉的整個過程發(fā)生在ZooKeeper集群中,各個消費者實例在這場選舉中只充當(dāng)被告知者角色(Learner)。領(lǐng)導(dǎo)人選舉算法,只能保證最終只有一個Leader被選舉出來,并不保障被告知者對Leader的理解是完全一致的。本質(zhì)上,上文的架構(gòu)里,選舉的結(jié)果是作為令牌(Token)傳遞給消費者實例,消費者將自身的ID與令牌進行對比,如果相等,則開始執(zhí)行消費操作。所以當(dāng)發(fā)生領(lǐng)導(dǎo)人換屆的情況,不同的Learner獲知新Leader的時間并不同。例如,前任Leader如果因為網(wǎng)絡(luò)問題與ZooKeeper集群斷開,前任Leader只能在超時后才能判斷自己是否不再承擔(dān)Leader角色了,而新的Leader可能在這之前已經(jīng)產(chǎn)生。另一方面,即使前任Leader和新Leader同時接收到新Leader選舉結(jié)果,某些業(yè)務(wù)的完整性要求迫使前任Leader仍然完成當(dāng)前未完成的工作。以上的講解非常抽象,生活中卻給了一些更加具體的例子。眾所周知,美國總統(tǒng)候選人在選舉結(jié)束后并不直接擔(dān)任美國總統(tǒng),從選舉到最終承擔(dān)總統(tǒng)角色需要一個過渡期。對于新當(dāng)選Leader的候選人而言,過渡期間稱之為加冕階段(Inauguration)。對于即將卸任的Leader,過渡期稱為交接階段(HandOver)。所以一個基于領(lǐng)導(dǎo)人選舉的消費者從加冕到卸任經(jīng)歷三個階段:Inauguration、Execution、HandOver。在加冕階段,新領(lǐng)導(dǎo)需要進行一些初始化操作。Execution階段是真正的隊列消息處理階段。在交接階段,前任領(lǐng)導(dǎo)需要進行一些清理操作。

類似的,為了解決領(lǐng)導(dǎo)人交接問題,所有的消費者從代碼實現(xiàn)的角度都需要實現(xiàn)類似ILeaderCareer接口。這個接口包含三個方發(fā)inaugurate(),handOver()和execute()。某個部署實例(Learner)在得知自己承擔(dān)領(lǐng)導(dǎo)人角色后,需要調(diào)用inaugurate()方法,進行加冕。主要的消費邏輯通過不停的執(zhí)行execute()實現(xiàn),當(dāng)確認自己不再承擔(dān)領(lǐng)導(dǎo)人之后,執(zhí)行handOver()進行交接。

public interface ILeaderCareer {public void inaugurate();public void handOver();public boolean execute(); }

如果承擔(dān)領(lǐng)導(dǎo)人角色的消費者,在執(zhí)行execute()階段得知自己將要下臺,根據(jù)消息處理的原子性,該領(lǐng)導(dǎo)人可以決定是否提前終止操作。如果整個消息處理是一個原子性事務(wù),直接終止該操作可以快速實現(xiàn)領(lǐng)導(dǎo)人換屆。否則,前任領(lǐng)導(dǎo)必須完成當(dāng)前消息處理后,才進入交接階段。這意味著新的領(lǐng)導(dǎo)人,在inaugurate()階段需要進行一定時間的等待。

排重優(yōu)化

頻次控制是一個經(jīng)典問題。對于分布式隊列編程架構(gòu),相同請求重復(fù)出現(xiàn)在隊列的情況并不少見。如果相同請求在隊列中重復(fù)太多,排重優(yōu)化就顯得很必要。分布式緩存更新是一個典型例子,所有請求都被發(fā)送到隊列中用于緩存更新。如果請求符合典型的高斯分布,在一段時間內(nèi)會出現(xiàn)大量重復(fù)的請求,而同時多線程更新同一請求緩存顯然沒有太大的意義。 排重優(yōu)化是一個算法,其本質(zhì)是基于狀態(tài)機的編程,整個講解通過模型、構(gòu)思和實施三個步驟完成。

模型

進行排重優(yōu)化的前提是大量重復(fù)的請求。在模型這一小節(jié),我們首先闡述重復(fù)度模型、以及不同重復(fù)度所導(dǎo)致的消費模型,最后基于這兩個模型去講解排重狀態(tài)機。

重復(fù)度模型

首先我們給出最小重復(fù)長度的概念。同一請求最小重復(fù)長度:同一請求在隊列中的重復(fù)出現(xiàn)的最小間距。例如,請求ri第一次出現(xiàn)在位置3,第二次出現(xiàn)在10,最小重復(fù)長度等于7。 是否需要進行排重優(yōu)化取決于隊列中請求的重復(fù)度。由于不同請求之間并不存在重復(fù)的問題,不失一般性,這里的模型只考了單個請求的重復(fù)度,重復(fù)度分為三個類:無重復(fù)、稀疏重復(fù)、高重復(fù)。 無重復(fù):在整個請求過程,沒有任何一個請求出現(xiàn)一次以上。 稀疏重復(fù):主要的請求最小重復(fù)長度大于消費隊列長度。 高重復(fù):大量請求最小重復(fù)長度小于消費隊列長度。 對于不同的重復(fù)度,會有不同的消費模型。

無重復(fù)消費模型

在整個隊列處理過程中,所有的請求都不相同,如下圖:

稀疏重復(fù)消費模型

當(dāng)同一請求最小重復(fù)長度大于消費者隊列長度,如下圖。假定有3個消費者,Consumer1將會處理r1,Consumer2將會處理r2,Consumer3將會處理r3,如果每個請求處理的時間嚴(yán)格相等,Consumer1在處理完r1之后,接著處理r4,Consumer2將會處理r2之后會處理r1。雖然r1被再次處理,但是任何時刻,只有這一個消費者在處理r1,不會出現(xiàn)多個消費者同時處理同一請求的場景。

高重復(fù)消費模型

如下圖,仍然假定有3個消費者,隊列中前面4個請求都是r1,它會同時被3個消費者線程處理:

顯然,對于無重復(fù)和稀疏重復(fù)的分布式隊列,排重優(yōu)化并不會帶來額外的好處。排重優(yōu)化所針對的對象是高重復(fù)消費模型,特別是對于并行處理消費者比較多的情況,重復(fù)處理同一請求,資源消耗極大。

排重狀態(tài)機

排重優(yōu)化的主要對象是高重復(fù)的隊列,多個消費者線程或進程同時處理同一個冪等請求只會浪費計算資源并延遲其他待請求處理。所以,排重狀態(tài)機的一個目標(biāo)是處理唯一性,即:同一時刻,同一個請求只有一個消費者處理。如果消費者獲取一條請求消息,但發(fā)現(xiàn)其他消費者正在處理該消息,則當(dāng)前消費者應(yīng)該處于等待狀態(tài)。如果對同一請求,有一個消費者在處理,一個消費者在等待,而同一請求再次被消費者讀取,再次等待則沒有意義。所以,狀態(tài)機的第二個目標(biāo)是等待唯一性,即:同一時刻,同一個請求最多只有一個消費者處于等待狀態(tài)。總上述,狀態(tài)機的目標(biāo)是:處理唯一性和等待唯一性。我們把正在處理的請求稱為頭部請求,正在等待的請求稱為尾部請求。 由于狀態(tài)機的處理單元是請求,所以需要針對每一個請求建立一個排重狀態(tài)機。基于以上要求,我們設(shè)計的排重狀態(tài)機包含4個狀態(tài)Init,Process,Block,Decline。各個狀態(tài)之間轉(zhuǎn)化過程如下圖:

  • 狀態(tài)機創(chuàng)建時處于Init狀態(tài)。
  • 對Init狀態(tài)進行Enqueue操作,即接收一個請求,開始處理(稱為頭部請求),狀態(tài)機進入Process狀態(tài)。
  • 狀態(tài)機處于Process狀態(tài),表明當(dāng)前有消費者正在處理頭部請求。此時,如果進行Dequeue操作,即頭部請求處理完成,返回Init狀態(tài)。如果進行Enqueue操作,即另一個消費者準(zhǔn)備處理同一個請求,狀態(tài)機進入Block狀態(tài)(該請求稱為尾部請求)。
  • 狀態(tài)機處于Block狀態(tài),表明頭部請求正在處理,尾部請求處于阻塞狀態(tài)。此時,進行Dequeue操作,即頭部請求處理完成,返回Process狀態(tài),并且尾部請求變成頭部請求,原尾部請求消費者結(jié)束阻塞狀態(tài),開始處理。進行Enqueue操作,表明一個新的消費者準(zhǔn)備處理同一個請求,狀態(tài)機進入Decline狀態(tài)。
  • 狀態(tài)機進入Decline狀態(tài),根據(jù)等待唯一性目標(biāo),處理最新請求的消費者將被拋棄該消息,狀態(tài)機自動轉(zhuǎn)換回Block狀態(tài)。
  • 構(gòu)思

    狀態(tài)機描述的是針對單個請求操作所引起狀態(tài)變化,排重優(yōu)化需要解決隊列中所有請求的排重問題,需要對所有請求的狀態(tài)機進行管理。這里只考慮單虛擬機內(nèi)部對所有請求狀態(tài)機的管理,對于跨虛擬機的管理可以采用類似的方法。對于多狀態(tài)機管理主要包含三個方面:一致性問題、完整性問題和請求緩存驅(qū)逐問題。

    一致性問題

    一致性在這里要求同一請求的不同消費者只會操作一個狀態(tài)機。由于每個請求都產(chǎn)生一個狀態(tài)機,系統(tǒng)將會包含大量的狀態(tài)機。為了兼顧性能和一致性,我們采用ConcurrentHashMap保存所有的狀態(tài)機。用ConcurrentHashMap而不是對整個狀態(tài)機隊列進行加鎖,可以提高并行處理能力,使得系統(tǒng)可以同時操作不同狀態(tài)機。為了避免處理同一請求的多消費者線程同時對ConcurrentHashMap進行插入所導(dǎo)致狀態(tài)機不一致問題,我們利用了ConcurrentHashMap的putIfAbsent()方法。代碼方案如下,key2Status用于存儲所有的狀態(tài)機。消費者在處理請求之前,從狀態(tài)機隊列中讀取排重狀態(tài)機TrafficAutomate。如果沒有找到,則創(chuàng)建一個新的狀態(tài)機,并通過putIfAbsent()方法插入到狀態(tài)機隊列中。

    private ConcurrentHashMap<T, TrafficAutomate> key2Status = new ConcurrentHashMap(); TrafficAutomate trafficAutomate = key2Status.get(key); if(trafficAutomate == null) {trafficAutomate = new TrafficAutomate();TrafficAutomate oldAutomate = key2Status.putIfAbsent(key, trafficAutomate);if(oldAutomate != null){trafficAutomate = oldAutomate;} }

    完整性問題

    完整性要求保障狀態(tài)機Init,Process,Block,Decline四種狀態(tài)正確、狀態(tài)之間的轉(zhuǎn)換也正確。由于狀態(tài)機的操作非常輕量級,兼顧完整性和降低代碼復(fù)雜度,我們對狀態(tài)機的所有方法進行加鎖。

    請求緩存驅(qū)逐問題(Cache Eviction)

    如果不同請求的數(shù)量太多,內(nèi)存永久保存所有請求的狀態(tài)機的內(nèi)存開銷太大。所以,某些狀態(tài)機需要在恰當(dāng)?shù)臅r候被驅(qū)逐出內(nèi)存。這里有兩個思路: * 當(dāng)狀態(tài)機返回Init狀態(tài)時,清除出隊列。 * 啟動一個后臺線程,定時掃描狀態(tài)機隊列,采用LRU等標(biāo)準(zhǔn)緩存清除機制。

    標(biāo)識問題

    每個請求對應(yīng)于一個狀態(tài)機,不同的狀態(tài)機采用不同的請求進行識別。 對于同一狀態(tài)機的不同消費者,在單虛擬機方案中,我們采用線程id進行標(biāo)識。

    實施

    排重優(yōu)化的主要功能都是通過排重狀態(tài)機(TrafficAutomate)和狀態(tài)機隊列(QueueCoordinator)來實施的。排重狀態(tài)機描述的是針對單個請求的排重問題,狀態(tài)機隊列解決所有請求狀態(tài)機的排重問題。

    狀態(tài)機實施(TrafficAutomate)

    根據(jù)狀態(tài)機模型,其主要操作為enQueue和deQueue,其狀態(tài)由頭部請求和尾部請求的狀態(tài)共同決定,所以需要定義兩個變量為head和tail,用于表示頭部請求和尾部請求。為了確保多線程操作下狀態(tài)機的完整性(Integraty),所有的操作都將加上鎖。

    enQueue操作

    當(dāng)一個消費者執(zhí)行enQueue操作時:如果此時尾部請求不為空,根據(jù)等待唯一性要求,返回DECLINE,當(dāng)前消費者應(yīng)該拋棄該請求;如果頭部請求為空,返回ACCPET,當(dāng)前消費者應(yīng)該立刻處理該消息;否則,返回BLOCK,該消費者應(yīng)該等待,并不停的查看狀態(tài)機的狀態(tài),一直到頭部請求處理完成。enQueue代碼如下:

    synchronized ActionEnum enQueue(long id) { if(tail != INIT_QUEUE_ID){return DECLINE;}if(head == INIT_QUEUE_ID){head = id;return ACCEPT;}else{tail = id;return BLOCK;} }
    deQueue操作

    對于deQueue操作,首先將尾部請求賦值給頭部請求,并將尾部請求置為無效。deQueue代碼如下:

    synchronized boolean deQueue(long id) {head = tail;tail = INIT_QUEUE_ID;return true; }

    狀態(tài)機隊列實施(QueueCoordinator)

    接口定義

    狀態(tài)機隊列集中管理所有請求的排重狀態(tài)機,所以其操作和單個狀態(tài)機一樣,即enQueue和deQueuqe接口。這兩個接口的實現(xiàn)需要識別特定請求的狀態(tài)機,所以它們的入?yún)?yīng)該是請求。為了兼容不同類型的請求消息,我們采用了Java泛型編程。接口定義如下:

    public interface QueueCoordinator<T> {public boolean enQueue(T key);public void deQueue(T key);}
    enQueue操作

    enQueue操作過程如下: 首先,根據(jù)傳入的請求key值,獲取狀態(tài)機, 如果不存在則創(chuàng)建一個新的狀態(tài)機,并保存在ConcurrentHashMap中。 接下來,獲取線程id作為該消費者的唯一標(biāo)識,并對對應(yīng)狀態(tài)機進行enQueue操作。 如果狀態(tài)機返回值為ACCEPT或者DECLINE,返回業(yè)務(wù)層處理代碼,ACCEPT意味著業(yè)務(wù)層需要處理該消息,DECLINE表示業(yè)務(wù)層可以拋棄當(dāng)前消息。如果狀態(tài)機返回值為Block,則該線程保持等待狀態(tài)。 異常處理。在某些情況下,頭部請求線程可能由于異常,未能對狀態(tài)機進行deQueue操作(作為組件提供方,不能假定所有的規(guī)范被使用方實施)。為了避免處于阻塞狀態(tài)的消費者無期限地等待,建議對狀態(tài)機設(shè)置安全超時時限。超過了一定時間后,狀態(tài)機強制清空頭部請求,返回到業(yè)務(wù)層,業(yè)務(wù)層開始處理該請求。 代碼如下:

    public boolean enQueue(T key) {_loggingStastic();TrafficAutomate trafficAutomate = key2Status.get(key);if(trafficAutomate == null){trafficAutomate = new TrafficAutomate();TrafficAutomate oldAutomate = key2Status.putIfAbsent(key, trafficAutomate);if(oldAutomate != null){trafficAutomate = oldAutomate;}}long threadId = Thread.currentThread().getId();ActionEnum action = trafficAutomate.enQueue(threadId);if(action == DECLINE){return false;}else if (action == ACCEPT){return true;}//Blocking status means some other thread are working on this key, so just wait till timeoutlong start = System.currentTimeMillis();long span = 0;do {_nonExceptionSleep(NAP_TIME_IN_MILL);if(trafficAutomate.isHead(threadId)){return true;}span = System.currentTimeMillis() - start;}while(span <= timeout);//remove head so that it won't block the queue for too longtrafficAutomate.evictHeadByForce(threadId);return true; }
    deQueue操作

    deQueue操作首先從ConcurrentHashMap獲取改請求所對應(yīng)的狀態(tài)機,接著獲取該線程的線程id,對狀態(tài)機進行deQueue操作。 enQueue代碼如下:

    public void deQueue(T key) {TrafficAutomate trafficAutomate = key2Status.get(key);if(trafficAutomate == null){logger.error("key {} doesn't exist ", key);return;}long threadId = Thread.currentThread().getId();trafficAutomate.deQueue(threadId); }

    源代碼

    完整源代碼可以在QueueCoordinator獲取。

    [1] Rabbit MQ, Highly Available Queues. [2] IBM Knowledge Center, Introduction to message queuing. [3] Wikipedia, Serializability. [4] Hadoop, ZooKeeper Recipes and Solutions. [5] Apache Kafka. [6] Lamport L, Paxos Made Simple.

    創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅持創(chuàng)作打卡瓜分現(xiàn)金大獎

    總結(jié)

    以上是生活随笔為你收集整理的分布式队列编程优化篇的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。