压箱底总结:流系统端到端一致性对比
點(diǎn)擊上方“朱小廝的博客”,選擇“設(shè)為星標(biāo)”
回復(fù)”666“獲取公眾號(hào)專屬資料
分布式最難的2個(gè)問(wèn)題:
1、Exactly Once Message processing。
2、保證消息處理順序。
我們今天著重來(lái)討論一下:
為什么很難;
怎么解。
前言
這篇文章可以說(shuō)是作者壓箱底兒的知識(shí)總結(jié)(之一,畢竟作者學(xué)的東西很雜 ╮( ̄▽ ̄"")╭ )了,斷斷續(xù)續(xù)寫了將近三個(gè)月, 耗費(fèi)了大量的精力。
本來(lái)的目的只是想對(duì)比一下各個(gè)state of art的流系統(tǒng)有什么不同, 但是寫出來(lái)之后只是亂糟糟的羅列數(shù)據(jù)和資料,像這樣列數(shù)據(jù)一樣,“介紹下這個(gè)framework這樣實(shí)現(xiàn)的,所以有這樣的特性”,“那個(gè)是那樣的”.... blablabla... 使得文章只停留于表層,這樣寫并不是一個(gè)好的筆記。
我想記錄的是更本質(zhì)和更精髓的一些東西:我想更深入的探討一個(gè)分布式系統(tǒng)的設(shè)計(jì)是被什么現(xiàn)實(shí)和本質(zhì)問(wèn)題所逼迫的結(jié)果, 2個(gè)不同的設(shè)計(jì)到底是在哪個(gè)本質(zhì)問(wèn)題上分道揚(yáng)鑣才造成了系統(tǒng)設(shè)計(jì)如此的不同。
追尋問(wèn)題和解法的前因后果,讓未來(lái)的自己一眼掃過(guò)去之后能夠自然而然的回憶和理解出: "嗯,對(duì),在這種現(xiàn)實(shí)限制下,要達(dá)到這種效果就必須這么做", 我想從亂糟糟的觀察到的混亂表象中,抽象出流系統(tǒng)所面對(duì)的問(wèn)題和解決方案的本質(zhì),這就是本文的目的,和文章中除開引用文獻(xiàn)之外,作者所貢獻(xiàn)的一些自己的思考。
就作者學(xué)習(xí)流系統(tǒng)的感受來(lái)看, 流系統(tǒng)有2個(gè)難點(diǎn):
第一是end to end consistency,或者說(shuō)exactly once msg processing;
第二是event time based window操作。
本來(lái)作者想用一篇文章同時(shí)概括和比較這2點(diǎn),無(wú)奈第一點(diǎn)寫完, 文章已經(jīng)長(zhǎng)度爆炸。于是分開2篇,此為上篇, 著重于從分布式系統(tǒng)的本質(zhì)問(wèn)題出發(fā), 從最底層的各種"不可能", 和它們的解(比如:consensus協(xié)議)開始, 一層一層的遞進(jìn)到高層的流系統(tǒng)中, 如何實(shí)現(xiàn)容錯(cuò)場(chǎng)景下的end to end consistency,或者說(shuō)exactly once msg processing。
目錄
流系統(tǒng)的具體對(duì)比在“9.流系統(tǒng)的EOMP”這一節(jié), 前邊都是準(zhǔn)備知識(shí)... -_-
1、一些術(shù)語(yǔ)
2、圣光(廣告)不會(huì)告訴你的事
3、幾個(gè)事實(shí)
4、Liveness和Safety的取舍
5、絕望中的曙光
6、Zombie Fencing
7、三節(jié)點(diǎn)間的EOMP
8、加入節(jié)點(diǎn)狀態(tài)的三節(jié)點(diǎn)間的EOMP
9、流系統(tǒng)的EOMP
10、異步增量checkpointing
11、系統(tǒng)內(nèi)與系統(tǒng)外
12、Latency, 冪等和non-deterministic
13、REFERENCE
一些術(shù)語(yǔ)
1、端到端一致性end to end Consistency
一致性其實(shí)就是業(yè)務(wù)正確性, 在不同的業(yè)務(wù)場(chǎng)景有不同的意思, 在"流系統(tǒng)中間件"這個(gè)業(yè)務(wù)領(lǐng)域, 端到端的一致性就代表Exact once msg processing, 一個(gè)消息只被處理一次,造成一次效果。
注意: 這里的"一個(gè)消息"代表"邏輯上的一個(gè)", 即application對(duì)中間件的期待就是把此消息作為一個(gè)來(lái)處理, 而不是指消息本身的值相等。比如要求計(jì)數(shù)+1的一個(gè)消息, 消息本身的內(nèi)容可能一模一樣, 但是application發(fā)來(lái)2次相同消息的"本意"就是要計(jì)數(shù)兩次, 那么中間件就應(yīng)該處理兩次, 如果application由于超時(shí)重發(fā)了本意只想讓中間件處理一次的+1操作, 那么中間件就應(yīng)該處理一次。
中間件怎么能區(qū)分application的"本意"來(lái)決策到底處理一次還是多次, 是end to end consistency的關(guān)鍵。
2、EOMP
由于Exactly once msg processing太經(jīng)常出現(xiàn), 我們用EOMP來(lái)代替簡(jiǎn)寫一下。
3、容錯(cuò)failure tolerance
為了方便討論,后邊談到failure, 我們指的都是crash failure, 你可以想象是任何可以造成“把機(jī)器砸了然后任何本地狀態(tài)丟失(比如硬盤損壞)一樣效果的情況出現(xiàn)"。
在今天的虛擬云時(shí)代,這其實(shí)很常見,比如container或者虛擬機(jī)被resource manager突然kill掉回收了, 那么即使物理機(jī)其實(shí)沒有問(wèn)題, 你的application的邏輯節(jié)點(diǎn)也是被完全銷毀的樣子。
容錯(cuò)在end to end Consistency的語(yǔ)義下,是指在機(jī)器掛了,網(wǎng)絡(luò)鏈接斷開...等情況下,系統(tǒng)的運(yùn)算結(jié)果和沒有任何failure發(fā)生時(shí)是一摸一樣的。
3、Effective once msg processing(應(yīng)該翻成有效一次性處理?)
后邊我們可以看到, 保證字面上的Exact once msg processing(即整個(gè)系統(tǒng)在物理意義上真的只對(duì)消息處理一次), 這在需要考慮容錯(cuò)的情況下是不可能做到的。
Effective once msg processing是一個(gè)更恰當(dāng)?shù)男稳?#xff0c;而所有號(hào)稱可以做到EOMP的系統(tǒng),其實(shí)都只是能做到Effective once msg processing。即:中間件, 或者說(shuō)流處理framework可能在failure發(fā)生的情況下處理了多次同一個(gè)消息,但是最終的系統(tǒng)計(jì)算結(jié)果和沒有任何failure時(shí), 一個(gè)消息真的只處理了一次時(shí)計(jì)算的結(jié)果相等。這和冪等息息相關(guān)。
4、冪等Idempotent
一個(gè)相同的操作, 無(wú)論重復(fù)多少次, 造成的效果都和只操作一次相等。比如更新一個(gè)keyValue, 無(wú)論你update多少次, 只要key和value不變,那么效果是一樣的。再比如更新計(jì)數(shù)器處理一次消息就計(jì)數(shù)器+1, 這個(gè)操作本身不冪等, 同一個(gè)消息被中間件重"發(fā)+收"兩次就會(huì)造成計(jì)數(shù)器統(tǒng)計(jì)兩次。
而如果我們的消息有id, 那么更新計(jì)數(shù)器的邏輯修改為, 把處理過(guò)的消息的id全記錄起來(lái), 接到消息先查重, 然后才更新計(jì)數(shù)器, 那么這個(gè)"更新計(jì)數(shù)器的邏輯"就變成冪等操作了。
把本不冪等的操作轉(zhuǎn)化為冪等操作是end to end consistency的關(guān)鍵之一。
5、確定性計(jì)算deterministic
和冪等有些類似, 不過(guò)是針對(duì)一個(gè)計(jì)算,相同的input必得到相同的output, 則是一個(gè)確定性(deterministic)。
比如從一個(gè)msg里計(jì)算出一個(gè)key和一個(gè)value, 如果對(duì)同一個(gè)消息運(yùn)算無(wú)數(shù)次得到的key和value都相同, 那么這個(gè)計(jì)算就是deterministic的;而如果key里加上一個(gè)當(dāng)前的時(shí)鐘的字符串表示, 那么這個(gè)計(jì)算就不是確定性的, 因?yàn)槿绻匦掠?jì)算一次這個(gè)msg, 得到的是完全不同的key。
注意1: 非確定性計(jì)算一般會(huì)導(dǎo)致不冪等的操作, 比如我們?nèi)绻焉线吚永锏膋eyvalue存在數(shù)據(jù)庫(kù)里, 重復(fù)處理多少次同一個(gè)msg, 我們就會(huì)重復(fù)的插入多少條數(shù)據(jù)(因?yàn)閗ey里的時(shí)間戳字符串不同)。
注意2: 非確定性計(jì)算并非必然導(dǎo)致不冪等的操作,比如這個(gè)時(shí)間戳沒有添加在key里而是添加在value里, 且key總是相同的, 那么這個(gè)計(jì)算還是"非確定性"計(jì)算。但是當(dāng)我們存數(shù)據(jù)的時(shí)候先查重才存keyvalue, 那么無(wú)論我們重復(fù)處理多少次同一個(gè)msg, 我們也只會(huì)成功存入第一個(gè)keyValue, 之后的keyValue都會(huì)被過(guò)濾掉。
支持非確定業(yè)務(wù)計(jì)算的同時(shí), 還能在容錯(cuò)的情況下達(dá)成端到端一致性, 是流系統(tǒng)的大難題, 甚至我們今天會(huì)提到的幾個(gè)state of art的流系統(tǒng)都未必完全支持。(好吧Spark說(shuō)的就是你)
圣光(廣告)不會(huì)告訴你的事
分布式系統(tǒng)最tricky的問(wèn)題就是, 問(wèn)題看起來(lái)很普通很簡(jiǎn)單。一些問(wèn)題總是看起來(lái)有簡(jiǎn)單直接的解法,而一個(gè)"簡(jiǎn)單解"被人查出問(wèn)題時(shí),也總是看起來(lái)可以很簡(jiǎn)單的就可以把這個(gè)挑出的edge case很簡(jiǎn)單的解決掉。
然而我們會(huì)立刻發(fā)現(xiàn)解決這個(gè)edge case而引入的新步驟會(huì)引發(fā)新的問(wèn)題... 如此循環(huán), 直到"簡(jiǎn)單"疊加到"無(wú)法解決的復(fù)雜"。
由于人們對(duì)這些問(wèn)題的"預(yù)期是簡(jiǎn)單的", 所以很多書, online doc, 都大大簡(jiǎn)化了對(duì)問(wèn)題的描述和對(duì)問(wèn)題的分析。最普遍的是對(duì)failure recovery的介紹, 一般只會(huì)簡(jiǎn)單的寫"failure發(fā)生時(shí), 系統(tǒng)會(huì)怎么recovery", 但是完全不提怎么檢測(cè)failure和“根本不可能完美檢測(cè)到failure”這個(gè)分布式系統(tǒng)的基本事實(shí), 從而給了讀者“failure可以完美檢測(cè)”的錯(cuò)覺。
這是因?yàn)橐粊?lái)說(shuō)清楚各種edge case會(huì)大大增加文檔的復(fù)雜性, 另外一點(diǎn)是寫了讀者可能也看不明白, 還有就是廣告效應(yīng), 比如真正字面意義的exactly once msg processing是不存在的, 但是所有其他做到effective once msg processing的系統(tǒng)都說(shuō)自己可以支持exactly once, 那自己也得打這個(gè)廣告不是。
還有就是語(yǔ)焉不詳, 比如某stream系統(tǒng)說(shuō)自己可以實(shí)現(xiàn)exactly once msg delivery, 別看delivery和processing好像差不多, 這里邊的用詞藝術(shù)就有意思了,delivery是指消息只在stream里出現(xiàn)一次, 但是在stream里只出現(xiàn)一次的消息卻無(wú)法保證只被consume一次確根本不提。
再比如某serverless產(chǎn)品處理某stream的消息, 描述是保證舊的消息沒有處理之前不會(huì)處理新消息, 你會(huì)想, 簡(jiǎn)單描述成保證消息按順序處理不是一樣么? 其實(shí)差大了去了, 前者并沒有屏蔽掉舊消息突然replay, 覆蓋掉新消息的處理結(jié)果的edge case。而這個(gè)事實(shí)甚至顛覆了很多使用這個(gè)服務(wù)的Sr. SDE的對(duì)其的認(rèn)知。
沒有理解分布式系統(tǒng)的幾個(gè)簡(jiǎn)單的本質(zhì)問(wèn)題之前, 你讀文檔的理解很有可能和文檔真正精準(zhǔn)定義的事實(shí)不符。且讀者對(duì)“系統(tǒng)保證”的理解, 往往會(huì)由于文檔"藝術(shù)"定義的誤導(dǎo), 而過(guò)多的假設(shè)系統(tǒng)保證的"強(qiáng)", 直到被坑了去尋根問(wèn)底, 才會(huì)收到"你誤讀了文檔的哪里的詳細(xì)解釋"。這是分布式系統(tǒng)"最難的地方在最普通的地方"的直接結(jié)果之一。
個(gè)人認(rèn)為最好的辦法就是去理解分布式系統(tǒng)軟件算法所能達(dá)到的上限=>關(guān)于各種impossibility的結(jié)論的論文,然后去學(xué)習(xí)克服他們的方法的論文。
這樣, 我們才能從各種簡(jiǎn)化了的 tutorials里, 從API中, 從各種云服務(wù), 框架的廣告詞背后, 發(fā)現(xiàn)“圣光不會(huì)告訴你的事", 和"這個(gè)世界的真相";(從廣告和online doc天花亂墜的描述中看到分布式系統(tǒng)設(shè)計(jì)真正的取舍, 這是區(qū)分API調(diào)包俠和分布式系統(tǒng)專家的分水嶺之一)。
而不是“簡(jiǎn)單的信了它們的邪”。而下邊,就是學(xué)習(xí)分布式系統(tǒng),你所需要了解的最重要事實(shí)中, 和end to end consistency相關(guān)的幾個(gè)。
幾個(gè)事實(shí)
1、不存在完美的failure detector
很多關(guān)于分布式系統(tǒng)的書上都會(huì)說(shuō),當(dāng)failure發(fā)生時(shí)系統(tǒng)應(yīng)該怎么做來(lái)容錯(cuò), 就好像可以準(zhǔn)確的檢測(cè)到failure一樣。然而事實(shí)是, 在目前互聯(lián)網(wǎng)的物理實(shí)現(xiàn)上(share nothing architecture, 只靠網(wǎng)絡(luò)互聯(lián),不直接共享其他比如內(nèi)存物理硬盤等),我們無(wú)法準(zhǔn)確的檢測(cè)到failure。
簡(jiǎn)單來(lái)說(shuō),就是當(dāng)我們發(fā)現(xiàn)一個(gè)node無(wú)反應(yīng)的時(shí)候,比如ping它,給它發(fā)消息、request、查詢,都沒有反應(yīng),我們無(wú)法知道這到底是對(duì)方已經(jīng)停止工作了,還是只是處理的很慢而已。
無(wú)法制造完美的failure detector,即使在今天也是分布式系統(tǒng)的基礎(chǔ)事實(shí)。本文無(wú)意在基礎(chǔ)事實(shí)上多費(fèi)唇舌, 無(wú)法接受此事實(shí)者可以去翻相關(guān)論文。╮( ̄▽ ̄"")╭
Essentially, the impossibility results for Consensus and Atomic Broadcast stem from the inherent difficulty of determining whether a process has actually crashed or is>The fundamental reason why Consensus cannot be solved in completely asynchronous systems is the fact that, in such systems, it is impossible to reliably distinguish a process that has crashed from>A fundamental problem in distributed systems is that network partitions (split brain scenarios) and machine crashes are indistinguishable for the observer, i.e. a node can observe that there is a problem with another node, but it cannot tell if it has crashed and will never be available again or if there is a network issue that might or might not heal again after a while. Temporary and permanent failures are indistinguishable because decisions must be made in finite time, and there always exists a temporary failure that lasts longer than the time limit for the decision…
A third type of problem is if a process is unresponsive, e.g. because of overload, CPU starvation or long garbage collection pauses. This is also indistinguishable from network partitions and crashes. The only signal we have for decision is “no reply in given time for heartbeats” and this means that phenomena causing delays or lost heartbeats are indistinguishable from each other and must be handled in the same way.[29]
2、不存在完美的failure detector, 所導(dǎo)致的幾個(gè)顛覆你認(rèn)知的問(wèn)題
1)分布式共識(shí)問(wèn)題"Consensus"在"不存在完美的Failure Detector的情況下"不可解, 這又叫做FLP impossibility[36]。
可以說(shuō)是上世紀(jì),奠定分布式系統(tǒng)研究基石方向的一篇論文。即: 在理論上, 在分布式環(huán)境里(更準(zhǔn)確說(shuō)應(yīng)該是異步環(huán)境里), 在最多可能出現(xiàn)一個(gè)crash failure的強(qiáng)假設(shè)下, 不存在任何一個(gè)算法可以保證系統(tǒng)里的所有的"正常"節(jié)點(diǎn)對(duì)某一信息有共識(shí)。
對(duì)于"共識(shí)"你可以理解為一個(gè)數(shù)據(jù)一摸一樣的備份在多個(gè)節(jié)點(diǎn)上。(那么paxos, raft這種consensus協(xié)議是怎么回事呢? 稍后會(huì)解釋)
2)在分布式環(huán)境, 連分配只增序列號(hào)這件事都很難(即不同的進(jìn)程去向一個(gè)系統(tǒng)申請(qǐng)序列號(hào), 從0開始不斷增加, 保證process得到的序列號(hào)不能重復(fù))。
因?yàn)楸举|(zhì)上這是一個(gè)consensus問(wèn)題, 后邊可以看到, 能夠分配高可用性的global序列號(hào)(epoch id), 是解決zombie leader/master/processor的問(wèn)題時(shí)的一大助力。
3)在保證liveness的情況下(即檢測(cè)到失敗就在另外的機(jī)器重啟邏輯節(jié)點(diǎn)), 無(wú)法保證系統(tǒng)中的Singleton角色“在同一時(shí)間點(diǎn)”只有一個(gè)。
比如在有l(wèi)eader概念的分布式系統(tǒng)里, 要求任意時(shí)間點(diǎn)只有一個(gè)leader做決定, 比如HBase需要只能有一個(gè)Region Server負(fù)責(zé)某region的寫操作; 再比如kafka或者Kinesis[22]里需要只能有一個(gè)partition processor接受一個(gè)stream partition的信息并且采取行動(dòng)。
而事實(shí)是, 任何云服務(wù)和現(xiàn)有實(shí)現(xiàn), 都無(wú)法在物理上保證“在同一時(shí)間點(diǎn)”, 真的只有一個(gè)這樣的邏輯角色存在于機(jī)群中; 這就牽涉到一個(gè)概念=> Zombie Process。
4)Zombie process
由于沒有完美的failure detector, 所以即使幾率再低, 只要時(shí)間夠長(zhǎng), 需要failure detection的用例夠多, 系統(tǒng)不可避免會(huì)錯(cuò)誤的判斷把一個(gè)并沒有真正crash掉的process當(dāng)作死掉了。
而如果系統(tǒng)需要保持高可用性,需要在檢測(cè)到crash的時(shí)候,在新的機(jī)器上啟動(dòng)此process繼續(xù)處理,那么當(dāng)failure detector出錯(cuò),則會(huì)發(fā)生新老process共同工作的問(wèn)題,此時(shí),這個(gè)老的process就是zombie process。
嚴(yán)重注意,在分布式系統(tǒng)里,我們需要單一責(zé)任的一個(gè)節(jié)點(diǎn)/processor/role來(lái)做決策或者處理信息時(shí),我們要么不保護(hù)系統(tǒng)的高可用性(機(jī)器掛了就停止服務(wù)),要么解決zombie process會(huì)帶來(lái)的問(wèn)題。
高可用性的系統(tǒng)中, zombie無(wú)法消除。這關(guān)系到分布式系統(tǒng)設(shè)計(jì)里的一個(gè)核心問(wèn)題:liveness和safety的取舍。
Liveness和Safety的取舍
1、在缺乏完美的failure detector的情況下, 對(duì)方遲遲不回信息(ping它也不回), 不發(fā)heartbeat, 那么本機(jī)只有2個(gè)選擇:?
認(rèn)為對(duì)方還沒有crash, 持續(xù)等待;
認(rèn)為其crash掉了, 進(jìn)行failover處理。
選擇1傷害系統(tǒng)的liveness, 因?yàn)槿绻麑?duì)方真的掛了,我們會(huì)無(wú)限等待下去, 系統(tǒng)或者計(jì)算就無(wú)法進(jìn)行下去。選擇2傷害系統(tǒng)的safety, 因?yàn)槿绻麑?duì)方其實(shí)沒有crash, 那我們就需要處理可能出現(xiàn)的重發(fā)去重, 或者zombie問(wèn)題, 即系統(tǒng)的邏輯節(jié)點(diǎn)的“角色唯一性“就會(huì)被破壞掉了。
越好的liveness要求越快的響應(yīng)速度, 而“100%的safety“的意義, 則因系統(tǒng)的具體功能的不同而不同, 但一般都要求系統(tǒng)做決定要小心謹(jǐn)慎, 不能放過(guò)一個(gè)edge case, 窮盡所有必要的檢查來(lái)保證"系統(tǒng)不允許出現(xiàn)的行為絕對(duì)不會(huì)發(fā)生"。在consensus的語(yǔ)義下來(lái)說(shuō), safety就是絕對(duì)不能向外發(fā)出不一致的決定(比如向A說(shuō)決定是X, 后來(lái)向B說(shuō)決定是Y)。
可以看到, 系統(tǒng)的edge case越多, safety越難保證, 而edge cases的全集只是可能發(fā)生的情況的集合, 而某一次運(yùn)行只會(huì)發(fā)生一種情況(且大概率是正常情況)。
如果系統(tǒng)不檢查最難分辨最耗時(shí)的幾種小概率發(fā)生的edge case, 那么系統(tǒng)大概率(甚至極大概率)也可以完美運(yùn)轉(zhuǎn)毫無(wú)問(wèn)題幾個(gè)月, 運(yùn)氣好甚至幾年。這樣降低了系統(tǒng)的safety(不再是100%), 但是提高了系統(tǒng)的響應(yīng)速度(由于是概率上會(huì)出問(wèn)題, 所以即使降低了safety保證, 也不是說(shuō)就一定會(huì)出問(wèn)題, 只是你把系統(tǒng)的正確性交給了運(yùn)氣和命運(yùn))。
而如果系統(tǒng)保證檢查所有的edge case, 但是系統(tǒng)99.9999%的概率都不會(huì)進(jìn)入一些edge cases, 那么這些檢查就會(huì)阻礙正常情況的運(yùn)算速度。Liveness和Safety, 這是分布式系統(tǒng)設(shè)計(jì)的最基本取舍之一。
而FLP則干脆說(shuō): 在分布式consensus這個(gè)問(wèn)題里, 如果你想要獲得100%的系統(tǒng)safety, 那么你絕對(duì)無(wú)法保證系統(tǒng)liveness, 即:系統(tǒng)總是存在活鎖的可能性, 算法設(shè)計(jì)只能減小這個(gè)可能性, 而無(wú)法絕對(duì)消除它。
2、更多的safety VS. liveness 取舍的例子:
Kubernetes StatefulSet, 簡(jiǎn)單說(shuō)是可以給容器(pod/container)指定一個(gè)名字的, 且保證全cluster總是只有一個(gè)容器可以有這個(gè)名字, 這樣application就可以通過(guò)這個(gè)保證來(lái)指定機(jī)群中的邏輯角色, 且用這個(gè)邏輯容器中保存一些狀態(tài)。(一般的replicaSet會(huì)load balance連接或請(qǐng)求到背后不同的節(jié)點(diǎn), 你的一個(gè)請(qǐng)求要求在server本地存一些狀態(tài), 下一個(gè)請(qǐng)求未必還會(huì)到同一個(gè)server)
When a stateful pod instance dies (or the node it’s running on fails), the pod instance needs to be resurrected on another node, but the new instance needs to get the same name, network identity, and state as the one it’s replacing. This is what happens when the pods are managed through a StatefulSet. [37]
Kubernetes StatefulSet在liveness和safety里選擇了safety, 當(dāng)statefulSet所在的的物理節(jié)點(diǎn)"掛了"之后, kubernetes默認(rèn)不會(huì)重啟這個(gè)pod到其他節(jié)點(diǎn)去, 因?yàn)樗鼰o(wú)法確定這個(gè)物理節(jié)點(diǎn)到底死沒死, 為了保證safety它選擇放棄了liveness, 即系統(tǒng)無(wú)法自愈, StatefulSet提供所提供的服務(wù)不可用, 直到靠人干預(yù)來(lái)解決問(wèn)題。
([38] P305: 10.5. Understanding how StatefulSets deal with node failures)?
Node fail cause daemon of Kubelet could not tell state of pod on the node….StatefulSet guarantees that there will never be two pods running with the same identity and storage...
Akka Cluster也做了相同的選擇, 在cluster membership管理中,有一個(gè)auto-downing的配置, 如果你打開它, 那么cluster就會(huì)完全相信Akka的failure detection而自動(dòng)把unreachable的機(jī)器從cluster中刪去, 這意味著一些在這個(gè)unreachable節(jié)點(diǎn)上的Actor會(huì)自動(dòng)在其他節(jié)點(diǎn)重啟。
Akka Cluster的文檔中, auto-downing是強(qiáng)烈不推薦使用的[38], 這是由于Akka Cluster提供的很多feature要求角色的絕對(duì)單一性, 比如singleton role這個(gè)功能, 在保證“cluster里只有這一個(gè)節(jié)點(diǎn)扮演這個(gè)actor"(safety), 和保證"cluster里總要有一個(gè)節(jié)點(diǎn)扮演這個(gè)actor"(liveness) 中, 選擇了safety, 即保證at most one actor存在于cluster中, 一旦次actor的節(jié)點(diǎn)變成unreachable(比如機(jī)器真的掛了), 那么Akka也無(wú)能為力, 只能傻等這個(gè)節(jié)點(diǎn)回來(lái)或者人來(lái)干預(yù)決策:
The default setting in Akka Cluster is to not remove unreachable nodes automatically and the recommendation is that the decision of what to do should be taken by a human operator or an external monitoring system. [29]
一個(gè)商用的Akka拓展(Akka Split Brain Resolver)提供了一些智能點(diǎn)的解決方案(基于quorum), 有興趣的同學(xué)可以看引用文檔[29]。
This is a feature of the Lightbend Reactive Platform. that is exclusively available for Lightbend customers.[29]
3、為什么Kubernetes和Akka不能同時(shí)保證safety和liveness呢?
這是因?yàn)檫@兩個(gè)作為比較底層的平臺(tái), 他們需要對(duì)上層提供非常大的自由性, 而不能限制上層的活動(dòng)。比如kubernetes沒有規(guī)定用戶不能在pod上跑某種程序, Akka也沒有規(guī)定用戶不能寫某種actor的code。
這樣, 在不限制自己處理能力的同時(shí)要保證任何行為都看起來(lái)exactly happen once(因?yàn)檎Z(yǔ)義上singleton節(jié)點(diǎn)只有一個(gè), 那么就不能讓用戶寫的任意單線程程序出現(xiàn)多節(jié)點(diǎn)平行執(zhí)行的外部效果), 而這對(duì)中間件來(lái)說(shuō)是不可能的, 這就引出了另外一篇論文: end to end argument[27], 作者已經(jīng)寫過(guò)一篇文章詳細(xì)介紹end to end argument(阿萊克西斯:End to End Argument(可能是最重要的系統(tǒng)設(shè)計(jì)論文)), 這里不在贅述。
后邊我們可以看到Flink, Spark等流系統(tǒng)為了保證exactly once msg processing需要怎樣和end to end argument 搏斗。
4、可以同時(shí)保證safety和liveness么?
取決于具體情況下對(duì)safety和liveness的具體要求, 在流處理的情況下, 至少本文提到的4種流系統(tǒng)都給出了自己的解。請(qǐng)耐心往下閱讀。
絕望中的曙光
1、可解也不可解的分布式consensus
由于異步環(huán)境下, 釘死了我們不可能有一個(gè)完美不犯錯(cuò)的failure detector。這篇著名的論文Unreliable Failure Detectors for Reliable Distributed Systems[30] 詳細(xì)描述了即使我們用一個(gè)不準(zhǔn)確的failure detector, 也可以解決consensus的方法。
但是它并沒有推翻FLP impossibility的結(jié)論:Consensus還是并非絕對(duì)可解。但是, 如果我們對(duì)需要consensus的計(jì)算加一個(gè)限制,則Consensus可解。
這個(gè)限制是: 計(jì)算和通訊只需要在"安全時(shí)間"內(nèi)完成即可, 對(duì)[30]提供的算法來(lái)講, 提供consensus的系統(tǒng)需要在這段時(shí)間內(nèi)"正確識(shí)別crash"即可,也就是說(shuō)(1)識(shí)別出真正掛掉的node, 和(2)不要懷疑正確的node。
怎么理解呢, 這兩個(gè)看似對(duì)立的概念:?
(1)consensus的有解(比如paxos協(xié)議)是對(duì)的;
(2)consensus的無(wú)解證明:FLP impossibility也是對(duì)的。
要準(zhǔn)確且簡(jiǎn)單的解釋為什么它們都是對(duì)的有點(diǎn)難, 推薦還是看論文。但是用比喻來(lái)解釋的話, 根據(jù)[30], Consensus算法可以看作這樣一個(gè)東西, 當(dāng)系統(tǒng)出現(xiàn)crash, failure detector判斷錯(cuò)誤,或者網(wǎng)絡(luò)突然延遲...等時(shí)候, 算法會(huì)進(jìn)入某種循環(huán)而不會(huì)輕易作出決定。
for example, there is a crash that no process ever detects, and all correct processes are repeatedly (and forever) falsely suspected — the application may lose liveness but not safety [31]
而只要滿足必要的條件時(shí)(計(jì)算和通訊只需要在"安全時(shí)間"內(nèi)完成), 系統(tǒng)則可以跳出循環(huán)讓機(jī)群達(dá)成一致[30,31]。
(1) There is a time after which every process that crashes is always suspected by some correct process.?
(2) There is a time after which some correct process is never suspected by any correct process.?
The two properties of <>W0 state that eventually something must hold forever; this may appear too strong a requirement to implement in practice. However, when solving a problem that “terminates,” such as Consensus, it is not really required that the properties hold forever, but merely that they hold for a sufficiently long time, that is, long enough for the algorithm that uses the failure detector to achieve its goal.
而FLP impossibility則可以理解為挑刺兒的說(shuō), 那這個(gè)條件永遠(yuǎn)無(wú)法出現(xiàn)呢? 你的算法就活鎖了呀(丟失liveness)。
幸運(yùn)的是, 在現(xiàn)實(shí)世界, 我們總可以對(duì)消息傳遞和處理來(lái)估計(jì)一個(gè)上限, 你可以理解為,只要消息處理總是在這個(gè)上限之內(nèi)完成,那么consensus總是可以實(shí)現(xiàn), 而消息處理的時(shí)間即使偶爾超過(guò)了這個(gè)上限, 我們的consensus協(xié)議也會(huì)進(jìn)入安全循環(huán)自我保護(hù), 從而不會(huì)破壞系統(tǒng)的safety, 而系統(tǒng)總是可以再次回歸平穩(wěn)(處理時(shí)間回歸上限之內(nèi))。
而FLP則是像說(shuō): 你無(wú)法證明系統(tǒng)總是可以回歸平穩(wěn) (確實(shí)無(wú)法證明, 因?yàn)镕LP的前提是異步模型, 而我們的真實(shí)世界更像是介于異步和同步模型之間的半同步模型, 我們只能說(shuō)極大概率系統(tǒng)可以"回歸平穩(wěn)", 而無(wú)法證明它的絕對(duì)保證; =>可以絕對(duì)保證"上限"的模型一般稱為同步模型)。
其實(shí)用paxos來(lái)模擬出FLP的活鎖的例子也很簡(jiǎn)單, 你把節(jié)點(diǎn)間對(duì)leader的heartbeat timeout時(shí)間設(shè)為0.001ms, 那么所有的節(jié)點(diǎn)都會(huì)忙著說(shuō)服別的其他節(jié)點(diǎn)自己才是leader(因?yàn)樘痰谋;顣r(shí)間, 除了自己, 節(jié)點(diǎn)總是會(huì)認(rèn)為其他的任意節(jié)點(diǎn)是leader時(shí), leader死了), 那么系統(tǒng)就會(huì)進(jìn)入活鎖, 永遠(yuǎn)無(wú)法前進(jìn)達(dá)成cluster內(nèi)的consensus, 系統(tǒng)喪失liveness。
Zombie Fencing
即使consensus問(wèn)題解決了, zombie節(jié)點(diǎn)也還是大問(wèn)題, kubernetes和Akka可以選擇避開zombie, 損失liveness。
然而對(duì)于絕大多數(shù)分布式系統(tǒng)來(lái)說(shuō), 是必須直面zombie節(jié)點(diǎn)這個(gè)問(wèn)題的,比如各種分布式系統(tǒng)的master節(jié)點(diǎn), 如果master掛了整個(gè)系統(tǒng)不在另外的機(jī)器重啟master,整個(gè)系統(tǒng)就可能變?yōu)椴豢捎谩?/p>
再比如kafka和Kinesis的單一partition只能有一個(gè)consumer, 如果這個(gè)msg consumer掛了不自動(dòng)重啟, 對(duì)消息的處理就會(huì)完全停止。
zombie是最容易被忽視的問(wèn)題, 比如, 即使我們有了paxos, raft, zookeeper這種consensus工具可以幫我們做leader election, 也不要以為你的系統(tǒng)中不會(huì)同時(shí)有2個(gè)leader做決策了。
這是因?yàn)橄纫淮膌eader可能突然失去任何對(duì)外通信,或者cpu資源被其他進(jìn)程吃光, 或者各種edge case影響, 使得其他節(jié)點(diǎn)無(wú)法和其通信, 新的leader被選出, 而老的leader其實(shí)還沒死, 如果老的leader在失去cpu之前的最后一件事是去寫只有l(wèi)eader才能寫的數(shù)據(jù)庫(kù), 那么當(dāng)它突然獲得cpu時(shí)間且網(wǎng)絡(luò)恢復(fù)正常, 那么這個(gè)以為自己還是leader的zombie leader就會(huì)出乎意料的去寫數(shù)據(jù)庫(kù)。
這曾經(jīng)是HBase的一個(gè)重大bug[39, Leader Election and External Resources P105]。
Apache HBase, one of the early adopters of ZooKeeper, ran into this problem in the field. HBase has region servers that manage regions of a database table. The data is stored on a distributed file system, HDFS, and a region server has exclusive access to the files that correspond to its region. HBase ensures that only one region server is active at a time for a particular region by using leader election through ZooKeeper...?
The region server is written in Java and has a large memory footprint. When available memory starts getting low, Java starts periodically running garbage collection to find memory no longer in use and free it for future allocations. Unfortunately, when collecting lots of memory, a long stop-the-world garbage collection cycle will occasionally run, pausing the process for extended periods of time. The HBase community found that sometimes this stop-the-world time period would be tens of seconds, which would cause ZooKeeper to consider the region server as dead. When the garbage collection finished and the region server continued processing, sometimes the first thing it would do would be to make changes to the distributed file system. This would end up corrupting data being managed by the new region server that had replaced the region server that had been given up for dead.
(解釋不動(dòng)了, 大家看英文吧...)
其實(shí)對(duì)付zombie已經(jīng)是分布式系統(tǒng)的共識(shí)了,也有很多標(biāo)準(zhǔn)的解法,以至于各個(gè)論文都不會(huì)太仔細(xì)的去描述, 這里簡(jiǎn)單介紹幾種方法:
zombie fencing設(shè)計(jì)的關(guān)鍵點(diǎn)在于如何阻止已經(jīng)“成為zombie的自己”搞亂正常的“下一代的自己”的狀態(tài)。畢竟無(wú)論是zombie還是新的要取代可能死掉的上一代的"下一代", 大家跑的都是相同的代碼邏輯,也就是說(shuō)這同一段代碼, zombie來(lái)跑就"不能過(guò):"(比如不能對(duì)系統(tǒng)的狀態(tài)造成影響), 但是"下一代"來(lái)跑, 就可以正常工作。這一般需要滿足以下幾點(diǎn):
1)如何正確區(qū)分“正常的下一代”(由于懷疑當(dāng)前的節(jié)點(diǎn)已經(jīng)死掉了,所以重新創(chuàng)建和啟動(dòng)的新一代邏輯節(jié)點(diǎn))和“zombie”(懷疑錯(cuò)誤,當(dāng)前節(jié)點(diǎn)并沒有死掉,但是新一代節(jié)點(diǎn)已經(jīng)創(chuàng)建并啟動(dòng),當(dāng)前節(jié)點(diǎn)成為大家都以為死掉但是還活著的zombie)一般需要一個(gè)多機(jī)復(fù)制且穩(wěn)定自增的epoch number來(lái)確定新老邏輯節(jié)點(diǎn)。
這個(gè)epoch number要在分布式環(huán)境中穩(wěn)定自增,一般只能通過(guò)consensus協(xié)議來(lái)實(shí)現(xiàn)。否則要分配新一代epoch number時(shí),由于管理epoch number的機(jī)群的failover造成分配了一個(gè)老的epoch number給新啟動(dòng)的“下一代”,那么zombie反而會(huì)有更大的epoch number,這就會(huì)造成整個(gè)系統(tǒng)的狀態(tài)混亂。
怎樣的混亂在介紹完zombie fencing之后就顯而易見了(因?yàn)樗衅渌?jié)點(diǎn)都以為zombie死掉了, 把所有的最新操作和狀態(tài)發(fā)給新節(jié)點(diǎn),但是新節(jié)點(diǎn)卻有一個(gè)比zombie還小的epoch number, 從而被zombie fence掉, 而不是自己可以fence zombie)
2)會(huì)被zombie影響的系統(tǒng)需要特殊設(shè)計(jì)使得:當(dāng)“新一代”注冊(cè)后就拒絕“老一代的任何請(qǐng)求”,特別是寫入請(qǐng)求。也就是具體的利用epoch number的zombie fencing的實(shí)現(xiàn); 一般需要具體情況具體分析。
如果被影響的系統(tǒng)是自己的一個(gè)microservice,那么可以隨意設(shè)計(jì)協(xié)議來(lái)驗(yàn)證一個(gè)請(qǐng)求所攜帶的epoch number是不是最新的。而當(dāng)這個(gè)被影響的系統(tǒng)是一個(gè)外部系統(tǒng), 比如是業(yè)務(wù)系統(tǒng)需要用到的一個(gè)數(shù)據(jù)庫(kù),由于你沒法改數(shù)據(jù)庫(kù)的代碼和數(shù)據(jù)庫(kù)client與server之間的協(xié)議, 那么就要利數(shù)據(jù)庫(kù)所提供的功能或者說(shuō)它的協(xié)議來(lái)設(shè)計(jì)application層級(jí)的zombie fencing協(xié)議。
比如對(duì)提供test and set,compare and swap的kv數(shù)據(jù)庫(kù)來(lái)說(shuō),application設(shè)計(jì)自己的業(yè)務(wù)表時(shí),要求所有的表都必須有一個(gè)epoch字段,而所有的寫入都必須用test and set操作來(lái)檢測(cè)當(dāng)前epoch字段是否比要寫入的請(qǐng)求的epoch字段大或相等, 否則拒絕寫入。這樣, 只有"下一代"可以更改zombie寫入的數(shù)據(jù), 而zombie永遠(yuǎn)無(wú)法更改"下一代"插入或者更新過(guò)的數(shù)據(jù)。
另一方面,很多時(shí)候"下一代"需要讀取上一代的信息,繼承上一代的數(shù)據(jù),然后繼續(xù)上一代的工作。那么如果剛讀取完數(shù)據(jù),zombie就改變了數(shù)據(jù),那么"下一代"對(duì)于當(dāng)前系統(tǒng)狀態(tài)的判斷就會(huì)出差錯(cuò)。
一個(gè)general的解決的方法也很簡(jiǎn)單,要讀先寫,“下一代”開始工作前, 如果要先讀入數(shù)據(jù)了解“系統(tǒng)的當(dāng)前狀態(tài)”,必須先改變數(shù)據(jù)的epoch number為自己的epoch number(當(dāng)然要遵從只增更改原則test and set,如果發(fā)現(xiàn)當(dāng)前數(shù)據(jù)的epoch number已經(jīng)比自己的epoch number還大了,則說(shuō)明自己也已經(jīng)是zombie了,更新的"下下一代"已經(jīng)開始工作), 更改數(shù)據(jù)的epoch number成功之后,再讀入數(shù)據(jù),就可以保證比自己老的zombie絕對(duì)不可能再更改這個(gè)數(shù)據(jù),而現(xiàn)在讀入的數(shù)據(jù)可以體現(xiàn)系統(tǒng)的最新狀態(tài),從而完成對(duì)"老一代"數(shù)據(jù)的繼承。
而在增加epoch number之前所有被寫入的數(shù)據(jù)。這里即使是"新一代"啟動(dòng)之后, 讀取系統(tǒng)狀態(tài)之前被zombie寫入的數(shù)據(jù), 都可以看做老一代的合法數(shù)據(jù),只要被新一代開始工作前繼承讀入即可。我們所要避免的是"新一代" 所讀取的事實(shí)被zombie所更改。而不是在物理時(shí)間的意義上在"新一代"啟動(dòng)時(shí)就立刻阻止zombie的所有系統(tǒng)改動(dòng)。
zombie fencing的設(shè)計(jì)取決于分布式系統(tǒng)的具體情況,比如業(yè)務(wù)邏輯可能更改的數(shù)據(jù)范圍可能是幾百萬(wàn)幾千萬(wàn)的數(shù)據(jù)記錄,那么這也意味著zombie可能會(huì)修改的數(shù)據(jù)范圍非常大,那么要求"下一代"在開始工作前更改所有數(shù)據(jù)的epoch number就很不現(xiàn)實(shí)。
對(duì)于zombie的影響的耐受性也會(huì)影響zombie fencing的設(shè)計(jì),比如如果"下一代"只需要自己所接觸的有限數(shù)據(jù)在特定時(shí)刻之后不被zombie影響就能正確工作, 那么只要在"下一代"需要接觸特定數(shù)據(jù)時(shí)才更改此數(shù)據(jù)的epoch number來(lái)屏蔽zombie即可,那么即使業(yè)務(wù)可能修改的數(shù)據(jù)范圍很大,簡(jiǎn)單的更改數(shù)據(jù)的epoch number也還是可以接受的解決方案。
最糟糕的情況,如果"zombie"可能會(huì)插入新的數(shù)據(jù), 而"下一代"的正常工作需要不能有非法的新數(shù)據(jù)插入(比如下一代開始工作前先統(tǒng)計(jì)所有資源的個(gè)數(shù),然后開始基于這個(gè)事實(shí)和"只有自己才能更改資源"的假設(shè),作出各種決策。
而此時(shí)zombie突然插入了一條新資源記錄或者資源使用記錄...),如果"新一代"完全無(wú)法預(yù)測(cè)zombie會(huì)插入什么記錄,要阻止zombie隨意插入數(shù)據(jù),“新一代”就只能在利用predicate lock來(lái)防止新紀(jì)錄插入,且不說(shuō)很多數(shù)據(jù)庫(kù)根本不支持“鎖住不存在的數(shù)據(jù)”的predicate lock,就算支持此功能的數(shù)據(jù)庫(kù)也很有可能是使用表級(jí)鎖來(lái)鎖住整張表。
如果數(shù)據(jù)表設(shè)計(jì)成了需要共享給多個(gè)節(jié)點(diǎn)使用(比如一張資源表,不同的singleton worker負(fù)責(zé)維護(hù)不同的資源范圍),那么表級(jí)鎖就會(huì)妨礙其他worker的工作。
zombie fencing的設(shè)計(jì)在于如何引入簡(jiǎn)單的fencing點(diǎn), 對(duì)"新一代"暢通無(wú)阻,但是卻可以阻止zombie的異常活動(dòng), 如果協(xié)議設(shè)計(jì)使得"新一代"可以很容易制造這個(gè)fencing點(diǎn), 則"新一代"在啟動(dòng)或者需要的時(shí)候加入fencing點(diǎn)即可。
比如前邊說(shuō)的任何數(shù)據(jù)都要附帶一個(gè)epoch值,任何數(shù)據(jù)寫入都要用test and set來(lái)對(duì)比數(shù)據(jù)的當(dāng)前epoch值和請(qǐng)求的epoch值。
對(duì)于上文的隨機(jī)插入的業(yè)務(wù)需求, 可以要求業(yè)務(wù)邏輯插入任何數(shù)據(jù)之前,先在一個(gè)注冊(cè)表的屬于自己epoch的一行里記錄自己要寫的數(shù)據(jù)的id, 且在記錄的時(shí)候用test and set來(lái)檢測(cè)自己這一行數(shù)據(jù)的active值是否被更改為disable了。
這樣就相當(dāng)于引入了一個(gè)更簡(jiǎn)單的fencing點(diǎn),因?yàn)?#34;下一代"只要在注冊(cè)表里把所有上一代的記錄寫為disable, 就可以阻止zombie的未來(lái)任何活動(dòng),但是此時(shí)無(wú)法阻止zombie的最后一個(gè)注冊(cè)的數(shù)據(jù)插入, 但是"下一代"可以簡(jiǎn)單的讀注冊(cè)表得知這個(gè)數(shù)據(jù)的id, 從而對(duì)這個(gè)"最后的zombie寫入"采取相應(yīng)的策略(繼承,刪除, 甚至fencing, 比如這個(gè)id并不存在,那么無(wú)法得知是zombie真的在寫之前死了所以永遠(yuǎn)不會(huì)插入這個(gè)記錄了,還是zombie只是卡了, 那么"下一代"可以用自己的epoch和zombie注冊(cè)的id先插入一條記錄來(lái)占位,這樣無(wú)論zombie是真的死了還是卡了,都無(wú)法再寫入這個(gè)數(shù)據(jù)了)。這樣,我們就引入了一個(gè)連數(shù)據(jù)插入都可以fencing的fencing點(diǎn)。
Zombie fencing一般都是以上這些套路, 用consensus協(xié)議確定epoch number區(qū)分"下一代"還是zombie,這個(gè)epoch number一般也可以稱為fencing token, 通過(guò)把fencing token分發(fā)給需要拒絕zombie的service,把fencing token和需要保護(hù)的數(shù)據(jù)(防止被zombie修改)存在一起。
所以一般論文[7, 26]里只會(huì)簡(jiǎn)單的提到epoch或者sequencer等概念,基本都是zombie fencing的fencing token。
三節(jié)點(diǎn)間的EOMP
三點(diǎn)為 (上游/input提供端)=> (當(dāng)前計(jì)算節(jié)點(diǎn)/計(jì)算結(jié)果發(fā)送端) =>(下游/計(jì)算接收端)
如果我們考慮必須保證系統(tǒng)的高可用性,即檢測(cè)到任意process的failure,都會(huì)由一個(gè)(絕對(duì)不死)的高可用的JobManager或者M(jìn)asterNode,來(lái)重啟(可能在另外的node)這個(gè)process, 所以我們定義這種即使所在的host掛掉, 也會(huì)不斷重新在其他host上重啟的process為邏輯process。這時(shí)我們要面臨幾種可能造成inconsistent的情況:
"計(jì)算接受端"沒有成功ack"計(jì)算結(jié)果發(fā)送端"的消息,一般表現(xiàn)為發(fā)送端的等待ack 超時(shí)。根據(jù)之前的討論,接收端有可能把消息處理完畢了(ack的消息丟失,或者剛處理完消息還沒發(fā)ack就掛了…等情況),也可能沒有處理完畢(沒接到或剛接到消息就掛了…等情況)。
這種情況發(fā)送端可以重發(fā)信息, 而發(fā)送端是需要“上游input提供端”提供某種數(shù)據(jù)然后進(jìn)行某種計(jì)算后產(chǎn)生的這個(gè)消息/計(jì)算結(jié)果(設(shè)為outputA), 那么"計(jì)算結(jié)果發(fā)送端"有兩個(gè)策略:
策略1: 利用存儲(chǔ)計(jì)算結(jié)果來(lái)盡量避免重算
要實(shí)現(xiàn)上下游exact once processing,需要實(shí)現(xiàn)4個(gè)條件:
結(jié)果高可用;
下游去重;
上游可以replay;
記錄上游進(jìn)度。
1)要求結(jié)果高可用, 應(yīng)對(duì)timeout時(shí), “下游計(jì)算接收端”其實(shí)并沒有成功處理"計(jì)算結(jié)果發(fā)送端"的計(jì)算結(jié)果的情況(比如下游掛了), 這時(shí)"計(jì)算結(jié)果發(fā)送端"可以把計(jì)算的結(jié)果存儲(chǔ)在高可用的DataStore里(比如DynamoDB,Spanner…或者自己維護(hù)的多備份數(shù)據(jù)庫(kù))。
這樣超時(shí)只要重發(fā)這個(gè)計(jì)算結(jié)果即可, 自己甚至可以開始去做別的事情, 比如處理和計(jì)算下一個(gè)來(lái)自“上游/input提供端“的event, 而已經(jīng)被“下游計(jì)算接收端”ack的"計(jì)算結(jié)果"則可以清理,一般由異步的garbage collection清理掉。
注意, 由于存在存儲(chǔ)失敗的可能性, 或者剛計(jì)算完結(jié)果還沒來(lái)得及存儲(chǔ)就掛掉重啟的可能,我們無(wú)法真的保證避免重算,詳見:無(wú)法避免的重算的例子。
2)下游去重,應(yīng)對(duì)timeout時(shí)下游其實(shí)已經(jīng)處理完畢消息的情況
①一般的解決方案:當(dāng)邏輯接收端不固定, 比如發(fā)送端要根據(jù)計(jì)算出來(lái)的outputA的某key字段把不同的key發(fā)送給負(fù)責(zé)不同key range(也就是partition)的多個(gè)"下游計(jì)算接收端"。
只需要一個(gè)sequenceId就可以實(shí)現(xiàn)接收端的消息去重。接收端和發(fā)送端各維護(hù)一個(gè)partition level的sequenceId即可。這樣當(dāng)發(fā)送端收到當(dāng)前message sequenceId(假設(shè)為n)的Ack才發(fā)下一個(gè)sequenceId為n+1的信息,否則就無(wú)限重試。而接收端則根據(jù)收到的消息的id是不是已經(jīng)處理過(guò)的最大id+1來(lái)判斷是這是不是下一個(gè)message。
②Google MillWheel的特例:Google MillWheel做出了一個(gè)很有意思的選擇,發(fā)送端完全不維護(hù)sequenceId,而是為每一個(gè)發(fā)出的message生成一個(gè)全局唯一的id,下游則需要記住"所有"見過(guò)的id來(lái)去重,但是這樣會(huì)造成大量查詢io和存儲(chǔ)cost,所以需要另外的方案來(lái)解決性能和下游沒有無(wú)限的存儲(chǔ)所以"不可能記住所有id"的問(wèn)題。這個(gè)例子比較特殊,有興趣的同學(xué)可以查閱[4,7]
③要求觸發(fā)本次計(jì)算的“上游input提供端”可以replay input event,否則剛接到event還沒計(jì)算就掛掉重啟, 則event丟失。
無(wú)法避免的重算:任何時(shí)候計(jì)算沒完成,或者計(jì)算完成后但是成功儲(chǔ)存前(a.結(jié)果高可用的需求), 計(jì)算節(jié)點(diǎn)fail掉重啟, 我們都需要replay上次計(jì)算過(guò)的input event,所以由于計(jì)算結(jié)果都還沒存成功,所以從物理上講, 此時(shí)我們還是重算了的; 所以即使我們采用把計(jì)算結(jié)果記錄下來(lái)的策略, 我們無(wú)法從物理意義上真正避免重算, 我們避免的是有多個(gè)"重復(fù)的"成功計(jì)算結(jié)果提交給下游。
而當(dāng)計(jì)算不是deterministic的, 這多個(gè)“重復(fù)的”計(jì)算結(jié)果可能是不同的值發(fā)送給不同的下游((比如按照計(jì)算結(jié)果的key發(fā)送給下游不同的partition)。那么下游就會(huì)處理同一個(gè)event所產(chǎn)生的本應(yīng)只有一個(gè)的計(jì)算結(jié)果兩次,且由于非確定性計(jì)算的原因,這兩個(gè)計(jì)算結(jié)果不一樣。這就會(huì)造成event不是EOMP的問(wèn)題。(不僅在物理上計(jì)算了2次, 在效果上也影響了2次下游的計(jì)算, 打破的effective process once的要求)
④要求記錄event處理的進(jìn)度, 并保證儲(chǔ)存計(jì)算結(jié)果不出現(xiàn)重復(fù)。記錄event處理的進(jìn)度, 使得trigger本次計(jì)算的"event"可以被屏蔽(比如, ack“上游input提供端”, 告知其input event處理完畢, 可以發(fā)下一個(gè)了), 來(lái)避免計(jì)算的re-trigger; 這要求以下策略2選一:
記錄event處理的進(jìn)度, 和把計(jì)算結(jié)果存在高可用存儲(chǔ)里的操作是一個(gè)原子操作, 要么一起成功, 要么一起失敗; 這種策略可以保證當(dāng)計(jì)算結(jié)果儲(chǔ)存下來(lái), 此計(jì)算不會(huì)replay了;
或者存儲(chǔ)計(jì)算結(jié)果是一個(gè)冪等操作,那么可以先存計(jì)算結(jié)果,再記錄event處理進(jìn)度,一旦計(jì)算計(jì)算結(jié)果成功但是記錄event處理進(jìn)度失敗,重新計(jì)算上游的同一個(gè)event并儲(chǔ)存計(jì)算結(jié)果也不會(huì)引起問(wèn)題。
否則要么計(jì)算沒存event就被屏蔽掉了, 要么多次計(jì)算結(jié)果存儲(chǔ)在DataStore里造成下游的重復(fù)信息。注意, 此時(shí)下游是無(wú)法分辨這是重復(fù)信息的, 因?yàn)檫@是datastore里的"2條的記錄", 將會(huì)獲得不同的message id。
冪等和end2end argument: 所以實(shí)現(xiàn)原子操作就不需要冪等了么? 是也不是, 在業(yè)務(wù)層是的, 比如要實(shí)現(xiàn)業(yè)務(wù)層的冪等,我們可以在存計(jì)算結(jié)果到datastore里的時(shí)候把一個(gè)與觸發(fā)本次計(jì)算的event的唯一id記錄在一起,這樣我們每次存的時(shí)候就可以使用樂觀鎖的方式test-and-set, 來(lái)保證如果這個(gè)id在數(shù)據(jù)庫(kù)里沒有才插入。(取決于業(yè)務(wù),我們也可以用這個(gè)id當(dāng)主key來(lái),那么即使多次寫入同樣的內(nèi)容也沒關(guān)系=>要求計(jì)算是deterministic的)?
如果我們保證觸發(fā)計(jì)算的event的"屏蔽"和計(jì)算結(jié)果的儲(chǔ)存是一個(gè)原子操作,那么我們就不需要上邊這種復(fù)雜的存儲(chǔ)策略,因?yàn)橐坏┯?jì)算結(jié)果存儲(chǔ)成功,觸發(fā)計(jì)算的event必定被"屏蔽"掉了, 而如果沒存儲(chǔ)成功, 則event一定會(huì)replay來(lái)重試。
然而在傳輸層卻不是的,比如儲(chǔ)存數(shù)據(jù)庫(kù)的tcp有可能丟包重發(fā),依靠tcp的傳輸層id自動(dòng)去重,實(shí)現(xiàn)tcp的冪等。
策略2: 完全依賴重算。
高可靠重發(fā)的問(wèn)題是,所有信息都必須先記錄在高可用性的DataStore里, 相對(duì)于重新計(jì)算,重發(fā)需要的網(wǎng)絡(luò)IO, 存儲(chǔ),狀態(tài)管理的cost是很高的。
而如果觸發(fā)計(jì)算的event可以replay的話(其實(shí)不管重算還是不重算,為了防止“剛接到event, 計(jì)算節(jié)點(diǎn)就掛掉的情況”, 我們都要求event可以replay), 我們就可以選擇重算然后重發(fā)來(lái)代替存儲(chǔ)計(jì)算結(jié)果的重發(fā);重算需要2個(gè)條件:
計(jì)算需要是 deterministic 的,用完全一樣的數(shù)據(jù),必須算出完全相同的結(jié)果,否則,當(dāng)計(jì)算結(jié)果所需要發(fā)送的邏輯下游是由計(jì)算結(jié)果所決定的情況下(比如按照計(jì)算結(jié)果的key發(fā)送給下游不同的partition) 那么non-deterministic的重算有可能把計(jì)算結(jié)果發(fā)給不同的下游,這樣如果重算發(fā)生時(shí),下游(假設(shè)是節(jié)點(diǎn)A)其實(shí)已經(jīng)成功處理完畢重算前上游發(fā)送的信息, 只是ACK丟失, 那么重算的結(jié)果卻發(fā)送給了另外一個(gè)(節(jié)點(diǎn)B), 那么就會(huì)造成一個(gè)event造成了2個(gè)下游effect的結(jié)果, 引起一個(gè)event造成2次下游影響的結(jié)果, 違反EOMP的原則;
重算之前, 狀態(tài)需要rollback到?jīng)]有計(jì)算之前, 否則會(huì)影響需要狀態(tài)的計(jì)算的結(jié)果正確性,如果狀態(tài)更新非冪等,本次計(jì)算所做的狀態(tài)更新也會(huì)更新多次;詳見"加入節(jié)點(diǎn)狀態(tài)的三節(jié)點(diǎn)間的EOMP"。
(在多節(jié)點(diǎn)流計(jì)算里,要求上游可以重發(fā),意味著上游把計(jì)算結(jié)果存下來(lái)了,或者上游可以重算,如果上游需要重算,那么上游需要上游的上游重發(fā),那么上游的上游可以用儲(chǔ)存的結(jié)果重發(fā)或者重算。。。以此類推)
(2種策略其實(shí)都有可能造成重算,也都對(duì)event replay有需求。為什么還要浪費(fèi)資源去存儲(chǔ)計(jì)算結(jié)果呢?這里邊的重要區(qū)別是,當(dāng)儲(chǔ)存結(jié)束時(shí),對(duì)觸發(fā)本次計(jì)算的上游event的依賴結(jié)束了,而不穩(wěn)定的下游不會(huì)造成額外的重算, 和對(duì)上游, 上游的上游....計(jì)算的"鏈?zhǔn)椒磻?yīng)", 詳見流的EOMP中的討論)
加入節(jié)點(diǎn)狀態(tài)的三節(jié)點(diǎn)間的EOMP
帶狀態(tài)的計(jì)算, 比如流計(jì)算的某中間節(jié)點(diǎn)需要統(tǒng)計(jì)總共都收到多少信息了, 每次從上游收到新信息, 都把自己統(tǒng)計(jì)的當(dāng)前歷史信息總數(shù)更新并發(fā)往下游節(jié)點(diǎn), 那么這個(gè)"系統(tǒng)的歷史信息"就是這個(gè)"統(tǒng)計(jì)消息總數(shù)"的邏輯節(jié)點(diǎn)的狀態(tài)。
由于狀態(tài)也需要高保活,所以它也一定需要儲(chǔ)存在遠(yuǎn)端dataStore里, 這樣儲(chǔ)存狀態(tài)的遠(yuǎn)端datastore就相當(dāng)于一個(gè)特殊的下游。不同點(diǎn)在于, 當(dāng)采用策略2:重算, 而不存儲(chǔ)中間計(jì)算結(jié)果的話, 重算時(shí)則需要datastore可以把它所記錄的狀態(tài)rollback到最初剛開始處理此event的那個(gè)點(diǎn)。
這里我們只能rollback, 而不能只是依靠?jī)绲葋?lái)保證“狀態(tài)的更新是exactly once”的原因是, 節(jié)點(diǎn)在處理任意消息時(shí)的狀態(tài)也和當(dāng)前信息的數(shù)據(jù)一樣是本次計(jì)算的input, 而更新后的狀態(tài)則是本次消息處理的output, 如果重算時(shí)不rollback節(jié)點(diǎn)的狀態(tài), 那么我們就會(huì)用一個(gè)被本消息"影響過(guò)"的狀態(tài)來(lái)進(jìn)行計(jì)算, 而這是會(huì)違反exactly once msg processing語(yǔ)義的。
比如節(jié)點(diǎn)的本地狀態(tài)是上次收到的信息的數(shù)據(jù)上記錄的時(shí)間戳, 節(jié)點(diǎn)的運(yùn)算是計(jì)算2個(gè)event數(shù)據(jù)之間的時(shí)間戳差距。假設(shè)eventA發(fā)生在時(shí)刻0, eventB發(fā)生在時(shí)刻10, 那么eventB引發(fā)的計(jì)算應(yīng)往下游發(fā)送10, 并把節(jié)點(diǎn)的本地狀態(tài)更新為10, 此時(shí)如果eventB的這個(gè)計(jì)算需要重算, 但是我們不rollback狀態(tài)10回到0的話, eventB重算所得的結(jié)果就會(huì)變成0。
注意: 由于state更新也是處理event的"下游", 那么計(jì)算過(guò)程中的所有狀態(tài)更新都可以算作“計(jì)算結(jié)果”的一部分, 所以當(dāng)我們需要儲(chǔ)存計(jì)算結(jié)果時(shí),則需要把:
狀態(tài)更新儲(chǔ)存回高可靠的statestore里;
記錄event處理進(jìn)度;
把計(jì)算結(jié)果存在高可用存儲(chǔ)里。
這3個(gè)操作作為一個(gè)原子操作(以后我們稱之為"原子完成"來(lái)省略篇幅); 而任何時(shí)候需要重算的話, 狀態(tài)必須恢復(fù)到處理event之前的樣子。
加入state,我們需要把(d. 要求記錄event處理的進(jìn)度, 并保證儲(chǔ)存計(jì)算結(jié)果不出現(xiàn)重復(fù), 更改為 (d+. 要求記錄event處理的進(jìn)度, 并保證儲(chǔ)存計(jì)算結(jié)果和state的更新不出現(xiàn)重復(fù)。
并加入要求(e. state需要在replay 上游event的時(shí)候rollback到處理event之前時(shí)的狀態(tài)。
這些要求稍有抽象,讓我們看一下流系統(tǒng)一般怎么達(dá)成這些要求。
流系統(tǒng)的EOMP
考慮一個(gè)多節(jié)點(diǎn)的流系統(tǒng),如果我們把上游所發(fā)來(lái)的計(jì)算結(jié)果當(dāng)成前邊所說(shuō)的“觸發(fā)計(jì)算的event”,而自己的發(fā)給下游的計(jì)算結(jié)果msg作為觸發(fā)下游計(jì)算的event。那么我們就可以用上邊的模型保證兩兩節(jié)點(diǎn)之間的exact once msg processing,從而最終實(shí)現(xiàn)端到端的exact once msg processing; 這就是Google MillWheel(DataFlow) 和Kafka Stream的解決方案。
他們都選擇把每個(gè)節(jié)點(diǎn)的計(jì)算結(jié)果儲(chǔ)存起來(lái),并保證即使non-deterministic的計(jì)算, 也只有一次的計(jì)算會(huì)起作用, 而不會(huì)出現(xiàn)(策略2-1中提到的non-deterministic造成的不一致)。他們的區(qū)別是:
如何實(shí)現(xiàn)state和;
如何實(shí)現(xiàn)接收端去重;
如何實(shí)現(xiàn)“原子完成”
1、Google MillWheel(DataFlow)
1)每個(gè)節(jié)點(diǎn)維護(hù)一個(gè)用來(lái)去重的"已處理msgId"集, 從上游收到新msg之后, 檢查去重 (b.下游去重)
2)開始計(jì)算, 所有的狀態(tài)更新寫在本地, 由于一個(gè)狀態(tài)只有一個(gè)更新者(本計(jì)算), 所以可以在本地維護(hù)一個(gè)狀態(tài)的view, 所有的更新只更新本地的view而暫時(shí)不commit到"remote的高可用DataStore", MillWheel用的BigTable。
3)計(jì)算完畢后, (1).所有的要發(fā)送的計(jì)算結(jié)果,(有一些可能是在計(jì)算過(guò)程中產(chǎn)生并要求發(fā)送的, 都會(huì)cache起來(lái)), (2)所有的state的所有更新, (3) 引發(fā)計(jì)算的msgId, 會(huì)用一個(gè)atomic write寫在BigTable里。(a.要求結(jié)果高可用, d+.要求記錄event處理的進(jìn)度, 并保證儲(chǔ)存計(jì)算結(jié)果和state的更新不出現(xiàn)重復(fù))
4)當(dāng)commit成功之后, ACK上游, 而由于上游也采用commit計(jì)算結(jié)果到BigTable里的策略,且只有當(dāng)自己(這里)發(fā)出的消息ACK之后, 才會(huì)允許 garbage collection回收計(jì)算結(jié)果占用的存儲(chǔ), 所以在收到ACK之前, 上游的計(jì)算結(jié)果, 也就是當(dāng)前計(jì)算所需要的msg, 都可以重發(fā),直至本節(jié)點(diǎn)計(jì)算成功且commit結(jié)果 (c. 要求觸發(fā)本次計(jì)算的event可以replay)
5)一旦計(jì)算過(guò)程中failure發(fā)生(比如機(jī)器掛了), 會(huì)在另外的host上重啟本process節(jié)點(diǎn),從BigTable恢復(fù)本地state和"用來(lái)去重的已處理msgId集", 由于上次計(jì)算的結(jié)果還沒有commit, 所以滿足(e. state需要在replay event的時(shí)候rollback到處理event之前時(shí)的狀態(tài))
5)新啟動(dòng)的運(yùn)算節(jié)點(diǎn)在load本地狀態(tài)之前先用自己的sequencer廢掉現(xiàn)存的sequencer, 這樣BigTable就可以block zombie計(jì)算節(jié)點(diǎn)的寫。
2、Kafka Stream
Kafka Stream是建立在kafka分布式隊(duì)列功能上個(gè)一個(gè)library, 所以在介紹kafka Stream之前, 我們先來(lái)講一下Kafka
3、簡(jiǎn)單介紹Kafka Topic
Kafka的topic可以看作一個(gè)partition的queue, 通過(guò)發(fā)給topic時(shí)指定partition(或者用一個(gè)partitioner 比如按key做hash來(lái)指定使用那個(gè)partition), 不同的key的消息可以發(fā)送到不同的partition, 相同key的message則可以保證發(fā)送到同一個(gè)partition去, topic里的信息可以靠一個(gè)確定的index來(lái)訪問(wèn), 就好像一個(gè)數(shù)據(jù)庫(kù)一樣,所以只要在data retention到期之前,consumer都可以用同一個(gè)index來(lái)訪問(wèn)之前已經(jīng)訪問(wèn)過(guò)的數(shù)據(jù)。
4、Kafka Transactional Messaging
前邊說(shuō)過(guò), Kafka Stream是建立在kafka分布式隊(duì)列功能上個(gè)一個(gè)library, 主要依靠kafka的Transactional Messaging來(lái)實(shí)現(xiàn)end2end exactly once msg processing。
Transactional Messaging是指用戶可以通過(guò)類似以下code來(lái)定義哪些對(duì)kafka topic的寫屬于一個(gè)transaction, 并進(jìn)一步保證tx的atomic和Isolation。
producer.initTransactions();
?try {?
? ? // called? right before sending any records?
? ? producer.beginTransaction();
? ? //sending some messages...
? ? // when done? sending, commit the transaction?
? ? producer.commitTransaction();
} catch? (Exception e) {
? ? ?producer.close();
} catch? (KafkaException e) {
? ? producer.abortTransaction();??
}?
Kafka transaction保證了, beginTransactions之后的, 所有往不同Kafka topic里發(fā)送的消息, 要么在commitTransaction之后才能被read-committed的consumer看到, 要么由于close或者failure而全部作廢, 從而不為read-committed的consumer所見。
而kafka stream通過(guò)用kafka本身的分布式queue的功能來(lái)實(shí)現(xiàn)了state和記錄處理event進(jìn)度的功能,因?yàn)?#xff1a;
所有的要發(fā)送的計(jì)算結(jié)果(由于可以允許計(jì)算發(fā)不同消息給多個(gè)下游,所以可能發(fā)給不同的topic和partition);
記錄input event stream的消費(fèi)進(jìn)度;
所有的state的所有更新。
這3點(diǎn), Kafka Stream都是用寫消息到kafka topic里實(shí)現(xiàn)的。
1)自不必說(shuō),本來(lái)就是往topic里寫數(shù)據(jù)。
2)其實(shí)是寫當(dāng)前consume position的topic;。(注意Kafka Stream的上下游消息傳遞考的是一個(gè)中間隱藏的Kafka topic, 上游往這個(gè)topic寫, 下游從這個(gè)topic讀, 上游不需要下游的ack,只要往topic里寫成功即可, 也不需要管下游已經(jīng)處理到那里了。
而下游則需要維護(hù)自己"處理到那里了"這個(gè)信息,儲(chǔ)存在consume position的topic, 這樣比如機(jī)器掛掉需要在另外的host上重啟計(jì)算節(jié)點(diǎn),則計(jì)算節(jié)點(diǎn)可以從記錄consume position的topic里讀出自己處理到那里然后從失敗的地方重洗開始。
3)其實(shí)是寫一個(gè)內(nèi)部隱藏的state的change log的topic,和一個(gè)本地key value表(也就是本計(jì)算節(jié)點(diǎn)的state)。failover的時(shí)候, 之前的"本地"表丟失沒關(guān)系, 可以沖change log里恢復(fù)出失敗前確定commit的所有state。
(1)(2)(3)的topic都只是普通的Kafka topic。只不過(guò)(2)(3)由Kafka Stream自動(dòng)創(chuàng)建和維護(hù)(一部分用來(lái)支持高層API的(1)也是自動(dòng)創(chuàng)建)
開始計(jì)算時(shí), 在從上游的topic里拿msg之前, Kafka Stream會(huì)啟動(dòng)一個(gè)tx, 然后開始才開始計(jì)算, 此時(shí)tx coordinator會(huì)分配一個(gè)新的epoch id給這個(gè)producer并且以后跟tx coordinator通訊都要附帶這個(gè)epochId;
Kafka Stream的計(jì)算節(jié)點(diǎn)的上游信息都來(lái)自kafka topic的分布式partition queue, 且只接收commit之后的record, 在queue里的record都有確定的某種sequenceId, 所以只要計(jì)算節(jié)點(diǎn)記錄好自己當(dāng)前處理的sequenceId, 處理完一個(gè)信息就更新自己的sequenceId到下一個(gè)record, 且commit到可靠dataStore里, 就絕對(duì)不會(huì)重復(fù)處理上游event, 而只要沒有commit這個(gè)位置則可以無(wú)數(shù)次replay當(dāng)前的record; (b.下游去重, c. 要求觸發(fā)本次計(jì)算的event可以replay);
在tx內(nèi)部,每從上游topic里讀一條信息就寫一條信息到記錄consume position的topic里, 每一個(gè)state的更改都會(huì)更新到本地的state(是一張表)里,且同時(shí)寫在隱藏的changelog里; 計(jì)算過(guò)程中需要往下游發(fā)信息則寫與下游聯(lián)系的topic;
計(jì)算結(jié)束后, commit本次的tx, 由Kafka Transactional Messaging來(lái)保證本次tx里發(fā)生的所有(1)往下游發(fā)的消息, (2) 記錄input event stream的消費(fèi)進(jìn)度,(3)所有的state的所有更新是一個(gè)原子操作, 由于結(jié)果都成功寫入kafka topic,所以達(dá)到計(jì)算結(jié)果的高可用性 (a.要求結(jié)果高可用, d+.要求記錄event處理的進(jìn)度, 并保證儲(chǔ)存計(jì)算結(jié)果和state的更新不出現(xiàn)重復(fù));
計(jì)算過(guò)程中出現(xiàn)failure(比如機(jī)器掛了), 那么當(dāng)計(jì)算重啟,會(huì)重新運(yùn)行initTransactions來(lái)注冊(cè)tx, 此時(shí)tx coordinator會(huì)分配一個(gè)新的epoch id給此producer, 并且從此以后拒絕老的epoch id的任何commit信息來(lái)防止zombie的計(jì)算節(jié)點(diǎn); tx coordinator同時(shí)roll back(如果上一個(gè)tx已經(jīng)在prepare_commit狀態(tài), 繼續(xù)完成transaction, 具體看下邊Transactional Messaging這個(gè)章節(jié)); 如果rollback,那么input的處理進(jìn)度, 狀態(tài)的更改和往下游發(fā)送的信息都會(huì)rollback, 那么計(jì)算可以重新開始,就好像沒有上次fail的失敗一樣; 如果上一個(gè)tx已經(jīng)prepare_commit, 那么完成所有信息的commit; 此時(shí)當(dāng)initTransactions返回,當(dāng)前計(jì)算會(huì)接著上一個(gè)tx完成的進(jìn)度繼續(xù)計(jì)算;(e. state需要在replay event的時(shí)候rollback到處理event之前時(shí)的狀態(tài))
Idempotent producer
冪等producer主要解決這么一個(gè)問(wèn)題: Kafka的消息producer, 也就是往Kafka發(fā)消息的client 如果不冪等, 那么因?yàn)镵afka的接受消息的broker和producer之間在什么是“重復(fù)信息”上沒有共識(shí)的話,則broker無(wú)法分辨兩個(gè)前后一模一樣的消息, 到底是producer的本意就是要發(fā)兩次,還是由于producer的重發(fā)(比如:producer在收到broker的"接受成功"的ack之前就掛了,所以不知道之前的消息有沒有成功被broker接收, 因此重啟后重發(fā)了此信息)。此時(shí)broker只能選擇接受消息,這就造成了同一個(gè)消息的多次接受。
同時(shí)我們也要解決zombie producer的問(wèn)題: 如果我們保證producer高可用, 重啟我們認(rèn)為fail掉的producer, 那么其實(shí)沒死的zombie producer的信息則會(huì)造成,重復(fù)且亂序的發(fā)布消息。(由于zombie的存在, 會(huì)有2個(gè)producer同時(shí)發(fā)布我們以為只有一個(gè)producer會(huì)按順序發(fā)布的消息,這樣就無(wú)法保證順序: 比如zombie在發(fā)送A, B, C...的時(shí)候, 新啟動(dòng)的producer也開始發(fā)送A, B, C... )
Kafka的解法:
10用一個(gè)producer指定的固定不變的transactional.id(非自增id,叫producerName可能更好)來(lái)識(shí)別可能會(huì)在不同機(jī)器上重啟的同一個(gè)logical producer; 相當(dāng)于給producer起了一個(gè)logical name。
2)注冊(cè)transaction.id來(lái)開始session, 而在session里此tx發(fā)來(lái)的消息都可以通過(guò)維護(hù)一個(gè)sequenceid來(lái)dedup。
3)非正常結(jié)束tx的話, 比如機(jī)器掛了, producer重啟, 那么就會(huì)再次注冊(cè)自己的transaction.id, 則標(biāo)志前一個(gè)session失效, 而所有屬于上一個(gè)session的信息全部作廢(具體看下一節(jié)Atomic and Isolation), 這樣就可以做到producer的zombie fencing
(Further, since each new instance of a producer is assigned a new, unique, PID, we can only guarantee idempotent production within a single producer session ---- Idempotent Producer Guarantees [26])
Atomic and Isolation
1)Producer Zombie fencing: 注冊(cè)transaction.id會(huì)申請(qǐng)高可靠epoch id, broker和tx coordinator可以依此fencing zombie的任何寫操作 (e.g. tx coordinator關(guān)閉tx);. Zombie fencing in https://www.confluent.io/blog/transactions-apache-kafka/
2)多個(gè)Tx coordinator跑在kafka broker里, 寫是按照tx.id hash給不同的Tx coordinator, 每一個(gè)tx coordintor負(fù)責(zé)subset的transactionlog的partition。
這樣保證同一個(gè)logic produce啟動(dòng)的tx必定連接同一個(gè)tx coordinator。tx coordinator保證所有的狀態(tài)都在的高可用高一致性的寫在tx log里。(且用queue zombie fencing來(lái)保護(hù)自己的狀態(tài)一致性, Discussion on Coordinator Fencing in [26]) (Discussion on Transaction-based Fencing, => 如果zombie不跟coordinator再聯(lián)系,那么可以一直跟broker發(fā)垃圾信息... P39in [26])
3)Producer注冊(cè)新的tx之后,在給任意topic的任意partition發(fā)消息之前,先跟tx coordinator注冊(cè)這個(gè)partition。
4)當(dāng)寫完畢,producer給tx coordinator發(fā)commit,tx coordinator執(zhí)行2PC,在transaction log里寫prepare_commit, 這樣就一定會(huì)commit了,因?yàn)閜roducer 通知commit就代表所有的寫已經(jīng)寫成功了, 這一步其實(shí)只是把決定記下來(lái)。
5)Tx coordinator聯(lián)系所有的注冊(cè)過(guò)的topic的partition,寫一個(gè)commit marker進(jìn)去。
6)當(dāng)所有的marker寫完,在transaction log里記錄commit complete。
7)注意:當(dāng)在第一步tx coordinator在發(fā)現(xiàn)新的重復(fù)transaction.id來(lái)注冊(cè)時(shí),會(huì)檢查有沒有相同的transaction.id下未關(guān)閉的tx,有的話發(fā)起rollback,先在transaction log里記下rollback的決定,然后聯(lián)系所有的注冊(cè)過(guò)的topic的partition, 寫入一個(gè)ABORT marker。
而如果此tx的狀態(tài)已經(jīng)時(shí)prepare_commit了,那么有可能tx coordinator在下邊第6步聯(lián)系所有partition來(lái)commit中間掛掉了,那么要接著完成這個(gè)commit過(guò)程;即roll forward而不是roll back。
8)Read_commit等級(jí)的consumer需要等待transaction有結(jié)果,consumer library讀到任何與Transactional Messaging相關(guān)的信息,就開始進(jìn)入cache階段,并不會(huì)運(yùn)行任何consumer端的計(jì)算,只有當(dāng)讀到commit mark,則把cache住的record依次交給consumer端的計(jì)算,而當(dāng)讀到ABORT mark,則把相關(guān)tx的record全部filter掉。注意: pending的tx會(huì)block所有Read_commit等級(jí)的consumer對(duì)topic的讀。
在保證兩兩節(jié)點(diǎn)之間的EOMP來(lái)實(shí)現(xiàn)整個(gè)流的EOMP的模型里,如果我們某一個(gè)或多個(gè)節(jié)點(diǎn)的狀態(tài)和計(jì)算結(jié)果根本不記錄在高可用DataStore里,我們還是可以實(shí)現(xiàn)EOMP, 我們只需要(1)replay這個(gè)節(jié)點(diǎn)的上游來(lái)重算這個(gè)節(jié)點(diǎn)的狀態(tài)和發(fā)給下游的計(jì)算結(jié)果, (2)下游去重。
如果上游也沒計(jì)算結(jié)果記, 那么replay上游的上游即可, 如果上游的上游也沒記....一直追溯到記錄了計(jì)算結(jié)果的上游節(jié)點(diǎn)即可。
如果一直都沒有failure,那么比起Dataflow和Kafka Stream那種記錄所有計(jì)算結(jié)果的模型 我們少記錄一些額外的計(jì)算結(jié)果和狀態(tài)就減少了很多系統(tǒng)負(fù)載; 這就是重算與記錄計(jì)算結(jié)果模型的結(jié)合。
重算與記錄計(jì)算結(jié)果的結(jié)合
考慮 A,B,C, D 4個(gè)節(jié)點(diǎn), A的計(jì)算結(jié)果傳給B, 而B則把一部分計(jì)算結(jié)果給C一部分給D, 如果B沒有記錄自己的output, 則Cfail掉之后需要replay上游的input,這就需要B的一些重算來(lái)重新制造C所需要的input, 即使B的input(即A)記錄了所有的計(jì)算結(jié)果, 我們還需要"恰巧可以產(chǎn)生這些歷史計(jì)算結(jié)果的"B的歷史狀態(tài),才能重算出C所需要的input。(所以B必須保存歷史狀態(tài)或者用某種方法重建自己的歷史狀態(tài)才能保證可以重算C所需要的input)
如果C的狀態(tài)也丟失了, 那么對(duì)上游的負(fù)擔(dān)則更重些, B需要重新計(jì)算來(lái)提供所有的歷史計(jì)算結(jié)果(即C的所有歷史input)來(lái)讓C重建自己的歷史狀態(tài)。
可以看到, 任意一個(gè)節(jié)點(diǎn)的某狀態(tài)S(n+1)是:
上一個(gè)歷史狀態(tài)S(n), 和;
從歷史狀態(tài)S0建立開始所接收到的信息M(n)。
同時(shí)作為輸入而得到的輸出; 而這個(gè)過(guò)程中又會(huì)向下游發(fā)出一些計(jì)算結(jié)果O(n+1)。
所以M(n) + S(n) => S(n+1) + O(n+1), 當(dāng)下游crash重啟需要O(n+1)時(shí), 我們則有2種選擇:
1、記錄O(n+1);
2、記錄O(n+1)但是記住, O(n+1)是根據(jù)什么數(shù)據(jù)生成的。
1是記錄計(jì)算結(jié)果, 2是重算。兩者并用的好處在于, 1可以異步batch進(jìn)行而不需要節(jié)點(diǎn)必須等待O(n+1)記錄成功才往下游發(fā)送O(n+1)。而2保證了當(dāng)1還沒有完成時(shí), 系統(tǒng)也有足夠的信息可以重建O(n+1)。
這是一個(gè)鏈?zhǔn)椒磻?yīng), 當(dāng)重算需要M(n)和S(n)時(shí), 而如果M(n)并沒有存則需要上游重算M(n), 上游還沒存這些重算M(n)的信息則需要replay上游的上游來(lái)重算這些信息,這就是所謂的鏈?zhǔn)椒磻?yīng)...。最極端的情況是什么都沒存,那么需要從頭開始跑我們的stream程序。
可以看到, 如果沒有存中間計(jì)算結(jié)果或者狀態(tài), 那么當(dāng)這個(gè)數(shù)據(jù)被下游重算需要的時(shí)候, 需要我們重算這個(gè)數(shù)據(jù), 這就會(huì)產(chǎn)生對(duì)上游的計(jì)算結(jié)果或者狀態(tài)的需求, 這就要求我們?nèi)绻淮嫦逻@些數(shù)據(jù), 我們就需要記住計(jì)算這個(gè)數(shù)據(jù)的數(shù)據(jù)依賴圖, 所以要么把"中間"數(shù)據(jù)和狀態(tài)存起來(lái)待用, 要么記住他們的數(shù)據(jù)依賴圖。
而這些記錄的中間結(jié)果只有當(dāng)對(duì)其的所有依賴從計(jì)算圖中消失時(shí), 我們才可以垃圾回收/刪除這些數(shù)據(jù)(比如所有基于某狀態(tài)的計(jì)算結(jié)果都已經(jīng)存下來(lái)了, 那么這個(gè)狀態(tài)的數(shù)據(jù)就可以刪除, 再比如某計(jì)算結(jié)果所引發(fā)的下游計(jì)算結(jié)果和狀態(tài)都已經(jīng)存下來(lái)了, 那么此計(jì)算結(jié)果的數(shù)據(jù)就可以刪除了),從而不會(huì)造成儲(chǔ)存數(shù)據(jù)爆炸。
這, 也就是Spark Streaming的解法。
Spark
Spark有三種Stream...
(1)快要被deprecate掉的DStreaming [10, 14]
(2)新一代為了彌補(bǔ)和Flink之間差距的, 支持event time的Structural Streaming(可惜還是有很多不足, 具體的不同和哪里有不足, 要留到對(duì)比各個(gè)系統(tǒng)對(duì)event time和windows操作的支持的對(duì)比, 也就是下篇來(lái)詳細(xì)描述了) [12,13]
(3)實(shí)驗(yàn)中的Continuous Streaming(Spark Continuous Processing) [11, 20]
(3)還在實(shí)驗(yàn)狀態(tài), 基本上是把底層都改掉來(lái)使用了和Flink相同的Chamdy-Lamport算法[20], 但是貌似還有很多問(wèn)題需要解決所以目前不支持EOMP, 這里不多聊了。
根據(jù)Structural Streaming的論文[12], (2)和(1)使用了相似的方法來(lái)保證EOMP, 但是其實(shí)作者發(fā)現(xiàn)(2)比起(1)還是有一些性能上的改進(jìn)[21],但是總體原則還是和(1)類似的利用一個(gè)重算關(guān)系圖lineage來(lái)維護(hù)各個(gè)狀態(tài)計(jì)算結(jié)果的依賴關(guān)系, 通過(guò)異步的checkpoint來(lái)截?cái)鄉(xiāng)ineage也就是各個(gè)節(jié)點(diǎn)狀態(tài)和計(jì)算結(jié)果復(fù)雜的關(guān)系(比如一個(gè)數(shù)據(jù)如果已經(jīng)checkpoint了, 那么它所依賴的所有狀態(tài)和計(jì)算結(jié)果都可以在關(guān)系圖里刪去, 因?yàn)閞eplay如果依賴于這個(gè)數(shù)據(jù), 那么使用它的checkpoint即可, 而不需要知道這個(gè)數(shù)據(jù)是怎么算出來(lái)的, 如果還沒checkpoint成功, 則需要根據(jù)數(shù)據(jù)依賴圖來(lái)重算這個(gè)數(shù)據(jù)), 像這樣利用checkpoint, 就可以防止lineage無(wú)限增長(zhǎng)。
但是維護(hù)關(guān)系圖需要利用micro-batch來(lái)平衡"關(guān)系維護(hù)"造成的cost, 否則每一條信息的process都產(chǎn)生一個(gè)新狀態(tài)和新計(jì)算結(jié)果的話, 關(guān)系圖會(huì)爆炸式增長(zhǎng)(用micro-batch, 可能1000條信息會(huì)積累起來(lái)當(dāng)作"一個(gè)信息"發(fā)給下游, 只需要在關(guān)系圖里記錄一個(gè)batch-id即可, 而不是1000個(gè)msg id, 對(duì)與狀態(tài)來(lái)說(shuō)也是這樣,處理1000個(gè)msg之前的狀態(tài)分配一個(gè)id, 處理這1000個(gè)信息之后的狀態(tài)一個(gè)id, 而不需要記錄1000個(gè)狀態(tài)id, 同時(shí)他們之間的聯(lián)系線也從1000條降低為1條。這樣就大大減小了關(guān)系圖維護(hù)的負(fù)擔(dān)。
但這樣造成的結(jié)果是micro-batch會(huì)造成很高的端到端處理的latency, 因?yàn)閙icro-batch里的第一條信息要等待micro-batch里的最后一條信息來(lái)了之后才能一起傳給下游。
而這個(gè)等待是疊加的,當(dāng)stream的層數(shù)越深,每一層的micro-batch的第一條信息都需要等待最后一條信息被處理完畢,相比在每一層都毫不等待,micro-batch造成的額外latency就會(huì)疊加式的增高。
注意, Spark Structured Stream提供了一種continuous mode[11,12,13,20]來(lái)替代micro-batch,解決了latency的問(wèn)題,但是目前支持的operator很少,且不能做到exact once msg processing, 這里不多加討論了(不過(guò)將來(lái)有望做成和flink一樣的模式, 畢竟也用的Chandy-Lamport Distributed Snapshot algorithm) : Note that any time you switch to continuous mode, you will get at-least-once fault-tolerance guarantees.[13]
spark的micro-batch會(huì)造成嚴(yán)重的latency問(wèn)題, 而Dataflow和Kafka Stream的方案要求記錄每一個(gè)計(jì)算結(jié)果, 則會(huì)在大大增加系統(tǒng)負(fù)擔(dān)的同時(shí)也會(huì)有不小的latency附加。那么有沒有一種方法可以不記錄所有中間計(jì)算結(jié)果, 并且也不使用micro-batch呢?
我們來(lái)看看flink的藝術(shù);
Flink
如果我們不儲(chǔ)存流系統(tǒng)中間節(jié)點(diǎn)的計(jì)算結(jié)果在高可用DataStore里, 也不想維護(hù)復(fù)雜的數(shù)據(jù)依賴圖(需要micro-batch的根源), 那么當(dāng)一個(gè)節(jié)點(diǎn)fail掉需要replay上游的input的時(shí)候,上游就必定需要replay自己的上游,且自己的狀態(tài)要rollback到?jīng)]有接收這些要replay的消息之前的狀態(tài);對(duì)上游的上游就有相同的要求,那么最終所有節(jié)點(diǎn)的上游最終會(huì)歸向數(shù)據(jù)源節(jié)點(diǎn),并要求"重新replay"。總而言之2個(gè)要求:
數(shù)據(jù)源節(jié)點(diǎn)可以replay, 并產(chǎn)生層層的蝴蝶效應(yīng)的"每個(gè)節(jié)點(diǎn)對(duì)上游要求的replay";
所有的計(jì)算節(jié)點(diǎn)的狀態(tài),要恢復(fù)到?jīng)]有接收到"上游所replay的消息"之前的樣子(所以replay后可以回到現(xiàn)在的狀態(tài),且重新生成下游所需要的input, 即當(dāng)前節(jié)點(diǎn)在處理這些replay消息時(shí)產(chǎn)生的計(jì)算結(jié)果集)。
全局一致點(diǎn)和全局一致狀態(tài)集
為了方便討論,我們定義2里所提到的global的狀態(tài)為一個(gè)全局穩(wěn)定點(diǎn); 顯然,如果我們一條消息一條消息的處理,數(shù)據(jù)源節(jié)點(diǎn)等待直到所有流節(jié)點(diǎn)處理完這條消息所產(chǎn)生的蝴蝶效應(yīng)信息之后,才發(fā)出下一個(gè)消息B0,那么在消息B要發(fā)出但是沒發(fā)出之前,所有的節(jié)點(diǎn)的狀態(tài)就滿足我們對(duì)全局穩(wěn)定點(diǎn)的需要。
比如當(dāng)我們持續(xù)處理B1,B2,B3...B100, 這時(shí)一個(gè)節(jié)點(diǎn)fail掉了,那么我們只要流系統(tǒng)的所有節(jié)點(diǎn)rollback他們的狀態(tài)到發(fā)出B0前的"全局穩(wěn)定點(diǎn)", 整個(gè)系統(tǒng)的計(jì)算和狀態(tài)就會(huì)干凈的回道任何節(jié)點(diǎn)都不曾被任何B0-B100所影響的狀態(tài), 那么此時(shí)從數(shù)據(jù)源節(jié)點(diǎn)replay B0, B1, B2... B100 成功, 這些消息就"exactly once process"掉了。所以,我們找到了第一個(gè)不需要micro-batch, 也不需要記錄中間節(jié)點(diǎn)計(jì)算結(jié)果,就能實(shí)現(xiàn)EOMP的方法:
每n條信息, 或者每一段時(shí)間, 數(shù)據(jù)源節(jié)點(diǎn)(或者流系統(tǒng)的第一個(gè)入口節(jié)點(diǎn))停止向下游發(fā)送任何信息, 直到所有節(jié)點(diǎn)報(bào)告說(shuō)有關(guān)這條信息的所有派生信息(由于這條信息引起的第一個(gè)計(jì)算節(jié)點(diǎn)的計(jì)算結(jié)果會(huì)發(fā)送給它的下游, 下游的計(jì)算結(jié)果又會(huì)發(fā)送給它的下游...等等這些都是派生信息)都已經(jīng)處理完畢, 此時(shí)把所有節(jié)點(diǎn)的狀態(tài)checkpointing在高可用DataStore里, 建立一個(gè)全局穩(wěn)定狀態(tài)集(由流系統(tǒng)中每個(gè)計(jì)算節(jié)點(diǎn)各自的全局穩(wěn)定狀態(tài)所組成), 數(shù)據(jù)源才開始繼續(xù)發(fā)送信息...這樣, 任意的節(jié)點(diǎn)fail掉, 我們只要在別的機(jī)器上重啟這個(gè)計(jì)算節(jié)點(diǎn)并download之前checkpoint的狀態(tài),流系統(tǒng)的所有節(jié)點(diǎn)也rollback到上一個(gè)全局穩(wěn)定狀態(tài)即可, 由于數(shù)據(jù)源發(fā)送數(shù)據(jù)的進(jìn)度也屬于全局穩(wěn)定狀態(tài)集中的一員, 所以當(dāng)數(shù)據(jù)源rollback自己的狀態(tài),則可以開始replay 全局穩(wěn)定點(diǎn)checkpoint之后才發(fā)送的信息,而此時(shí)所有節(jié)點(diǎn)都已經(jīng)rollback到一個(gè)"從沒見過(guò)這些信息和它們的派生信息"的狀態(tài)了,整個(gè)系統(tǒng)就好像從來(lái)沒有見過(guò)這些信息一樣, 從而實(shí)現(xiàn)即使failure發(fā)生,我們的系統(tǒng)也可以實(shí)現(xiàn)EOMP。
更進(jìn)一步, 我們來(lái)看如何不停住數(shù)據(jù)源的信息接收,我們所需要處理的問(wèn)題。
1)任意時(shí)間點(diǎn)的全局狀態(tài),都不是全局穩(wěn)定點(diǎn): 如果所有節(jié)點(diǎn)都不等待后續(xù)節(jié)點(diǎn)有沒有處理完信息, 那么任意時(shí)間點(diǎn), 在流的中間節(jié)點(diǎn)建立全局穩(wěn)定狀態(tài)的時(shí)候 ,流上游的節(jié)點(diǎn)已經(jīng)開始處理新的信息, 它們的全局穩(wěn)定狀態(tài)早已被新的信息所影響了, 而下游可能還沒收到建立全局穩(wěn)定狀態(tài)所需要的信息。
2)隨意指定的全局穩(wěn)定狀態(tài)集可能根本不存在, 比如數(shù)據(jù)源連續(xù)給A和B發(fā)出x,y兩條信息, 而A和B則需要把計(jì)算結(jié)果都發(fā)送給C,如果我們想定義全局穩(wěn)定狀態(tài)為所有節(jié)點(diǎn)"處理完x相關(guān)的消息之后", 但是"處理完y相關(guān)的信息之前"的狀態(tài)。
那么考慮這樣一個(gè)運(yùn)行順序: A處理完x向C發(fā)出x-A, B處理完x, y后向C發(fā)出x-B, y-B, 然而由于網(wǎng)絡(luò)和處理速度的因素, C在還沒有收到x-A的情況下就處理完了x-B, y-B, 所以C的一個(gè)"干凈"的從未被y信息影響的狀態(tài),但包含了所有需要的x信息的穩(wěn)定狀態(tài), 在C的狀態(tài)變遷過(guò)程中是從來(lái)不存在的 (即, 處理完x-A和x-B,但是沒有處理y-B時(shí)的狀態(tài))。
問(wèn)題1意味著我們不能用物理時(shí)間來(lái)建立全局一致狀態(tài)集, 那么既然流的不同節(jié)點(diǎn)接收到數(shù)據(jù)源任意消息x的派生消息的時(shí)間不同, 那么只要我們能讓所有節(jié)點(diǎn)分清哪些是x的消息和派生消息, 哪些是x之后的消息和派生消息, 所有節(jié)點(diǎn)就可以在處理完x的派生消息之后把本地狀態(tài)復(fù)制一份儲(chǔ)存在高可用DataStore里, 作為全局一致狀態(tài)集的一員。
問(wèn)題2意味著即使允許計(jì)算節(jié)點(diǎn)連續(xù)處理input而不必等待所有下游建立好全局一致狀態(tài)才發(fā)下一個(gè)計(jì)算結(jié)果, 計(jì)算節(jié)點(diǎn)也不能盲目的不加考慮的處理上游信息, 我們要使得計(jì)算節(jié)點(diǎn)的狀態(tài)變遷過(guò)程中, 至少全局一致狀態(tài)是可以出現(xiàn)的。
Flink的解法就是由一個(gè)高可用的coordinator連續(xù)發(fā)出不同的stage barrier(比如先給所有src發(fā)1,然后1分鐘后發(fā)2,2分鐘后發(fā)3..... 如此增長(zhǎng)), 夾雜在發(fā)給數(shù)據(jù)源發(fā)出的數(shù)據(jù)流里, 所有的節(jié)點(diǎn)都必須忠實(shí)的轉(zhuǎn)發(fā)這個(gè)stage barrier, 這樣所有的節(jié)點(diǎn)的:
input都分為了接收到某barrier(設(shè)為barrier-a)之前的信息和收到barrier-a之后的信息,;
所有的發(fā)給下游的計(jì)算結(jié)果也分為自己發(fā)出barrier-a之前的信息和發(fā)出barrier-a之后的信息;
所有的狀態(tài)變遷也分為,用所有接收到barrier-a之前的信息, 所建立的狀態(tài), 和收到barrier-a之后被新的信息影響了的狀態(tài)。
那么如果所有節(jié)點(diǎn)都遵循2個(gè)原則:
只用"接收到barrier-a之前的所有信息", 來(lái)建立自己的本地狀態(tài),并備份在高可用DataStore里;
只使用"接收到barrier-a之前的所有信息"來(lái)計(jì)算結(jié)果并發(fā)送給下游之后, 才轉(zhuǎn)發(fā)barrier-a; 然后才開始處理"接收到barrier-a之后的信息"; 這樣就保證了自己在往下游發(fā)送barrier-a之前所發(fā)的所有計(jì)算結(jié)果, 都沒有被自己所收到的barrier-a之后的新消息所影響(自己發(fā)送的barrier-a之前的計(jì)算結(jié)果只和自己接收的barrier-a前的input集合相關(guān))。
而當(dāng)所有的節(jié)點(diǎn)都保證"自己發(fā)送的barrier-a之前的計(jì)算結(jié)果只和自己接收的barrier-a前的input集合相關(guān)", barrier-a就成了系統(tǒng)系統(tǒng)的分隔點(diǎn),而所有節(jié)點(diǎn)遵循原則-1所建立的本地狀態(tài)備份, 也絕對(duì)沒有被數(shù)據(jù)源發(fā)出的在barrier-a之后的信息和它們的派生信息所影響; 而這些所有本地狀態(tài)備份的全集,則組成了全局一致狀態(tài)集。
一個(gè)細(xì)節(jié), 當(dāng)一個(gè)節(jié)點(diǎn)只有一個(gè)input channel的時(shí)候, 只要按順序處理input信息即可; 而當(dāng)一個(gè)節(jié)點(diǎn)有多于一個(gè)input channel的時(shí)候, 一個(gè)input channel的barrier-a已經(jīng)接收到, 但是其他channel的barrier-a還沒有收到怎么辦呢?
從收到barrier-a的channel接收新的信息并處理可行么? 顯然不行, 這樣違反了原則-1, 因?yàn)?#34;barrier-a之前的信息全集"還沒有湊齊(其他channel的barrier-a還沒有收到), 此時(shí)如果處理了任何屬于barrier-a后的"新"信息, 我們就再也無(wú)法在狀態(tài)變遷中得到一個(gè)"干凈"不受barrier-a后的"新"信息所影響的狀態(tài)了, 這意味著我們必須block 這個(gè)已經(jīng)收到barrier-a的channel;
我們可以向下游轉(zhuǎn)發(fā)barrier-a么? 顯然也不行, 這樣違反了原則-2, 理由相同, 我們還沒有收到"barrier-a之前的信息全集", 而從其他channel收到barrier-a之前還收到其他信息的話, 它們所產(chǎn)生的計(jì)算結(jié)果也必須在轉(zhuǎn)發(fā)barrier-a之前發(fā)送。
由1,2就很清楚可以推理出flink的算法了:
收到任意input channel 的barrier-a之后, block此channel;
收到所有input channel的barrier-a之后, 把當(dāng)前狀態(tài)checkpoint并備份到高可用的DataStore里; (這里可以做到異步checkpoint并不會(huì)影響latency, 詳細(xì)介紹看后邊的異步checkpointing這一節(jié));
收到所有input channel的barrier-a之后, 并且處理完所有此前收到的信息并向下游發(fā)送計(jì)算結(jié)果完畢后, 向所有和自己相連的下游轉(zhuǎn)發(fā)barrier-a;
當(dāng)所有節(jié)點(diǎn)都備份完成,我們就得到了一個(gè)全局一致狀態(tài)集, 或者說(shuō)全局一致狀態(tài)快照; 系統(tǒng)的穩(wěn)定點(diǎn)就進(jìn)步到了barrier-a, 如果下一個(gè)barrier是barrier-b, 那么在得到barrier-b的全局一致狀態(tài)集之前, 如果系統(tǒng)出現(xiàn)failure, 我們就可以通過(guò)重啟所有計(jì)算節(jié)點(diǎn)的方式, 讓所有節(jié)點(diǎn)reload barrier-a所記錄的狀態(tài)集, 從而實(shí)現(xiàn)把所有節(jié)點(diǎn)的狀態(tài)rollback到"上一個(gè)全局一致"的狀態(tài), 使得流系統(tǒng)可以重置到好像根本沒有看到過(guò)任何barrier-a到barrier-b之間的信息的一樣, 然后重跑這段信息;
通過(guò)干凈的rollback了可能造成的重復(fù)處理的痕跡, 使得所有信息的效果都只發(fā)生了一次, 所以我們得到了一個(gè)端到端的EOMP系統(tǒng)。
異步checkpoint可以使得, checkpoint本身不會(huì)block流本身的計(jì)算,增量checkpoint避免了,每次一點(diǎn)小變動(dòng)都需要checkpoint全部的state,可以節(jié)省計(jì)算機(jī)資源(比如網(wǎng)絡(luò)壓力)
flink和spark這種需要checkpoint的系統(tǒng)都可以做到異步增量checkpoint, 且這個(gè)技術(shù)也很成熟了, 本文只選flink的方法[35]來(lái)簡(jiǎn)單說(shuō)明一下 , Spark的可以看[21]
1、Flink的異步增量checkpointing
Flink使用RocksDB 作為本地狀態(tài)儲(chǔ)存, RocksDB本質(zhì)上就是一個(gè)LSM tree, 對(duì)狀態(tài)的寫會(huì)寫在內(nèi)存的memtable, 一般是一個(gè)linked hashmap, 寫到一定大小就存到硬盤里變成sstable(sorted-string-table), 不再更改。
此后會(huì)開一個(gè)新的memtable來(lái)接受新的寫。這樣會(huì)按歷史時(shí)間來(lái)生成很多小文件, 讀的時(shí)候先讀memtable,如果里邊有想要的key對(duì)應(yīng)的value,必定是最新的,否則按歷史時(shí)間順來(lái)查sstable(sstable有自己的cache, 所以未必需要讀硬盤)。
對(duì)于flink來(lái)說(shuō), 當(dāng)需要checkpoint的時(shí)候, 只需要把當(dāng)時(shí)的memtable寫在硬盤里即可, 這是唯一一個(gè)需要block住當(dāng)前計(jì)算的操作, 此后也只需要把從上個(gè)checkpoint開始, 新生成的sstable異步發(fā)送到高可用的遠(yuǎn)程文件系統(tǒng)即可(比如S3, HDFS)。這樣就做到了異步(發(fā)送到高可用datastore是異步執(zhí)行的),和增量(只發(fā)送新增文件)。
注意, 由于太多的小文件的sstable會(huì)造成讀的性能問(wèn)題, 所以RocksDB需要異步的compact這些小文件到一個(gè)大文件, 對(duì)此flink也需要做出一些應(yīng)對(duì), 詳見[35], 例子給的非常清楚,這里不再贅述。
系統(tǒng)內(nèi)與系統(tǒng)外以上的討論都是關(guān)于中間件內(nèi)部如何實(shí)現(xiàn)EOMP, 但是由于end to end argument的影響, 中間件提供的保證再多, 沒有source的支持, 它也無(wú)法區(qū)分source(流系統(tǒng)的event來(lái)源)發(fā)來(lái)的2個(gè)內(nèi)容一樣的event, 到底是"同一個(gè)"信息的重發(fā), 還是"本意"就是想要中間件處理兩次的兩個(gè)"不同"event; 對(duì)sink(流系統(tǒng)計(jì)算結(jié)果的去處)來(lái)說(shuō),由于failure造成的重算,zombie的存在, 則需要sink能夠"融入"到流系統(tǒng)的EOMP體系中去。
對(duì)于source的要求基本就是重發(fā)和對(duì)消息提供能區(qū)分到底是不是一個(gè)event的eventId,一般就是Kafka那樣就OK, 比較簡(jiǎn)單就不多討論了; 這里著重聊一下sink; Sink主要有兩種手段來(lái)配合流系統(tǒng)中間件的EOMP, 冪等和2階段提交(2PC)
1、冪等Sink
最簡(jiǎn)單的來(lái)配合流系統(tǒng)EOMP的策略就是冪等, 由于是外部系統(tǒng), 所以重用我們的"兩節(jié)點(diǎn)EOMP模型"基本不可能, 因?yàn)榛静豢赡苡靡粋€(gè)tx來(lái)把要寫外部系統(tǒng)的操作和記錄已經(jīng)處理過(guò)這個(gè)操作用一個(gè)原子tx來(lái)commit, 這也是流系統(tǒng)為什么要支持2PC的原因。
由于冪等保證對(duì)同一個(gè)計(jì)算結(jié)果寫多次和寫一次一樣, 所以無(wú)論是什么流系統(tǒng), 無(wú)論系統(tǒng)是重算型, 還是記錄計(jì)算結(jié)果來(lái)避免重算型, 冪等的sink都可以很好的支持; 所以Dataflow/Spark/Kafka Stream都是靠?jī)绲鹊膕ink來(lái)實(shí)現(xiàn)EOMP。
冪等的問(wèn)題在于無(wú)法應(yīng)對(duì)需要重算, 且計(jì)算可以是non-deterministic的情況, 詳見: 后邊(Latency, 冪等和non-deterministic)一節(jié)的討論; 這也是Spark Streaming, 使用冪等sink的Flink無(wú)法支持non-deterministic計(jì)算的本質(zhì)原因。
相比之下, dataflow總是記錄計(jì)算結(jié)果來(lái)避免重算(即使重算也只會(huì)有一次重算的結(jié)果會(huì)影響下游), Kafka Stream支持tx可以保證只有一次計(jì)算結(jié)果可以被commit到Kafka Stream里, 如果sink也只讀committed上游kafka stream, 則可以保證即使計(jì)算是non-deterministic的, 也只會(huì)有唯一commit的計(jì)算結(jié)果被讀到(其他的計(jì)算結(jié)果沒有commit marker而被Kafka data comsume API忽略)從而影響sink的外部系統(tǒng)。
而Flink的2PC sink也做到了重算會(huì)直接導(dǎo)致sink的外部系統(tǒng)可以配合flink的global rollback, 所以只會(huì)有一次的計(jì)算結(jié)果被外部系統(tǒng)接受(commit)。
所以Spark Stream在4個(gè)流系統(tǒng)里, 是唯一一個(gè)完全無(wú)法支持non-deterministic計(jì)算的流系統(tǒng)。
2、Flink獨(dú)特的2PC Sink
2PC對(duì)很多熟悉數(shù)據(jù)庫(kù)的人來(lái)說(shuō)應(yīng)該是臭名昭著了, 這是很復(fù)雜和很容易造成問(wèn)題而需要極力避免的東西、但是時(shí)代在變化, 2PC在新時(shí)代也有了彌補(bǔ)自己?jiǎn)栴}的很多解法了,這里簡(jiǎn)單介紹一下。
2PC協(xié)議由一個(gè)coordinator,和很多參與2PC的異構(gòu)系統(tǒng)組成,發(fā)起2PC的時(shí)候 coodinator要求所有人pre-commit,這是2PC的第一個(gè)P(phase),如果所有tx參與者都可以pre-commit并告知coordinator,則coordinator告訴所有人commit,否則告訴所有人abort,這是2PC的第二個(gè)P(phrase)
2PC最大的問(wèn)題是它是一個(gè)blocking協(xié)議,blocking的點(diǎn)在于當(dāng)coordinator和某一個(gè)2PC的參與者A掛了,其他參與者無(wú)法作出任何決定,只能等待coordinator或者死掉的那個(gè)參與者A上線,因?yàn)檫@時(shí)所有其他參與者都無(wú)法判斷以下兩種情況到底那種發(fā)生了,從而無(wú)法決定到底是commit還是abort。
coodinator已經(jīng)收到了所有人的pre-commit并告知參與者A commit,A commit后就掛了;
A并不能pre-commit,但是coodinator在告訴所有人需要abort之前就掛了。
在情況1. 所有其他參與者都應(yīng)該commit,在情況2,所有其他參與者都應(yīng)該abort;由于無(wú)法辨別到底是情況1. 還是2. 所有其他參與者必須block等待,這對(duì)很多數(shù)據(jù)庫(kù)來(lái)說(shuō)意味著為此tx加的鎖都不能放掉,從而影響數(shù)據(jù)庫(kù)的其他不參與2PC的操作,甚至鎖死整個(gè)數(shù)據(jù)庫(kù)。而如果coordinator或者參與者A無(wú)法再上線或者狀態(tài)丟失,則需要非常復(fù)雜的人工操作來(lái)解決其他參與者應(yīng)該如何決策的問(wèn)題。
雖然2PC有各種問(wèn)題, 但是在consensus協(xié)議早已經(jīng)成功分布式系統(tǒng)的基石, 各種開源和標(biāo)準(zhǔn)實(shí)現(xiàn)可以被輕松獲得的今天, 用consensus協(xié)議來(lái)彌補(bǔ)2PC的問(wèn)題已經(jīng)成為一個(gè)"已經(jīng)解決的問(wèn)題", 如[25]4.2 The Paxos Commit Algorithm 中所說(shuō):
We could make that fault-tolerant by simply using a consensus algorithm to choose the committed/aborted decision, letting the TM be the client that proposes the consensus value…
解決2PC問(wèn)題的關(guān)鍵在于保持coordinator狀態(tài)的高可用性, 那么只要coordinator保證把commit或者abort的決定記錄在一個(gè)consensus cluster里即可,比如etcd或者zookeeper,這樣coordintor死了,重啟從consensus cluster里恢復(fù)狀態(tài)重新告知所有參與者到底應(yīng)該commit還是abort即可; 這也是為什么各種流行的分布式系統(tǒng)實(shí)現(xiàn)分布式tx都是用2PC的原因, 比如dynamoDB, Spanner, Flink, Kafka...
3、Flink的2PC Sink
2PC的第一個(gè)P的關(guān)鍵在于所有tx參與者在不知道其他參與者狀態(tài)的情況下,承諾未來(lái)一定可以前進(jìn)commit成功或者干凈的回退abort。當(dāng)前的tx參與者準(zhǔn)備好了,且同意commit,2PC的第二個(gè)P的關(guān)鍵點(diǎn)在于整體系統(tǒng)的”唯一決定”統(tǒng)一的推進(jìn)或者回退各個(gè)參與者的狀態(tài)。
而Flink的global state其實(shí)可以看做一個(gè)2PC,當(dāng)一個(gè)節(jié)點(diǎn)收到所有的上游的barrier-n時(shí),這個(gè)“契機(jī)”可以看做收到了coordinator的可不可以precommit的問(wèn)詢,而當(dāng)localstate已經(jīng)在remote 存好之后,當(dāng)前節(jié)點(diǎn)就可以告訴coordinator它準(zhǔn)備好了,這可以看做回復(fù)precommit(如果此節(jié)點(diǎn)在發(fā)給precommit)。
而當(dāng)所有的節(jié)點(diǎn)都通知coordinator“準(zhǔn)備好了”之后,coordinator就可以記錄下barrier-n的global state完整checkpoint的這個(gè)事實(shí),這相當(dāng)于一個(gè)不需要發(fā)給tx參與者的commit。
這是由于當(dāng)failover的時(shí)候,是由coordinator告訴所有節(jié)點(diǎn)應(yīng)該從哪個(gè)checkpoint點(diǎn)來(lái)恢復(fù)本地狀態(tài),所以各個(gè)節(jié)點(diǎn)的localstate到底是commit了還是rollback了,可以完全由“有沒有記錄下barrier-n的global state完整checkpoint成功”這個(gè)metadata推算出來(lái),所以也就不需要單獨(dú)給各個(gè)節(jié)點(diǎn)發(fā)commit/abort信息來(lái)讓各個(gè)節(jié)點(diǎn)commit或者abort了。
當(dāng)系統(tǒng)狀態(tài)只涉及到flink的內(nèi)部狀態(tài)時(shí)(flink提供的stateApi所提供的statestore), 如果一個(gè)某節(jié)點(diǎn)X在回復(fù)precommit之后掛了,coordinator還是可以選擇commit,因?yàn)榻M成global state的節(jié)點(diǎn)X的local state已經(jīng)完整的存儲(chǔ)在remote的datastore里了。
但是如果涉及到外部狀態(tài),比如sink需要把計(jì)算結(jié)果存儲(chǔ)到一個(gè)非flink控制的數(shù)據(jù)庫(kù)中去時(shí),flink的sink節(jié)點(diǎn)就相當(dāng)于這個(gè)外部數(shù)據(jù)庫(kù)的client,需要連接外部數(shù)據(jù)庫(kù)并把數(shù)據(jù)存入外部數(shù)據(jù)庫(kù);要使得外部數(shù)據(jù)庫(kù)的狀態(tài)和flink的狀態(tài)保持一致,則需要sink把外部數(shù)據(jù)庫(kù)的狀態(tài)引入到flink global state的2PC里,而coordinator在決定commit或者abort的時(shí)候,必須通知sink來(lái)執(zhí)行外部狀態(tài)的commit或者abort,因?yàn)閏oordinator是不知道外部狀態(tài)到底是什么,也無(wú)法簡(jiǎn)單的用通知sink從不同的globalstate點(diǎn)恢復(fù)來(lái)代替2PC的commit/abort通知。
同時(shí)sink收到barrier-n時(shí),sink要保證外部數(shù)據(jù)庫(kù)里與barrier-(n-1)到barrier-n之間信息相關(guān)的數(shù)據(jù)更改,處于一種“在任何情況下都一定可以commit成功,但是還沒有真的commit,所以外部數(shù)據(jù)庫(kù)的消費(fèi)者不可見這些狀態(tài),且可以rollback的,可進(jìn)可退的狀態(tài)”。[40]給出了如何用文件實(shí)現(xiàn)的一個(gè)例子;我這里給出一個(gè)如何使用支持transaction的數(shù)據(jù)庫(kù)的例子。
首先為了避免產(chǎn)生歧義, 我們定義:
1)flink-precommit ack為 barrier-n流到各個(gè)節(jié)點(diǎn)(包括sink), 各個(gè)節(jié)點(diǎn)完成local snapshot checkpoint后發(fā)給coordinator的ack, sink則是完成“某個(gè)操作”后發(fā)給coordinator的ack, 這個(gè)操作需要把外部系統(tǒng)(比如數(shù)據(jù)庫(kù))置于一種, 保證任何情況下都可以服從coordinator的最終決定的狀態(tài), 一個(gè)既可以commit(如果coordinator最終決定commit), 又可以rollback(如果coordinator決定abort)的狀態(tài), 且數(shù)據(jù)不為外部系統(tǒng)的consumer所見。
2)定義flink-commit為coordinator收到所有人的pre-commit ack后的的最終commit決定。
3)定義db-commit就是普通的外部數(shù)據(jù)庫(kù)的commit。
①當(dāng)程序開始,sink立刻開一個(gè)外部數(shù)據(jù)庫(kù)的transaction,當(dāng)sink收到上游的所有的barrier-1,則立刻db-commit當(dāng)前transaction然后回復(fù)coordinator flink-precommit成功(flink-precommit ack),因?yàn)榇藭r(shí)如果不db-commit,一旦回復(fù)coordinator flink-precommit之后,這個(gè)sink掛了,那么外部數(shù)據(jù)庫(kù)一般就會(huì)自動(dòng)rollback;此時(shí)就算sink在其他機(jī)器上重啟,我們也丟失了所有要最終flink-commit的數(shù)據(jù); 而如果這個(gè)sink的crash是發(fā)生在coordinator收到所有節(jié)點(diǎn)的flink-precommit ack并最終決定flink-commit之后, 所有其他節(jié)點(diǎn)(比如另外一個(gè)sink)的狀態(tài)可能都commit了(所以無(wú)法簡(jiǎn)單rollback); 而只有此sink的所有數(shù)據(jù)都無(wú)法恢復(fù), 這就破壞了global consistency。
②但是上邊我們?cè)趂link-precommit階段就db-commit了外部數(shù)據(jù)庫(kù)的transaction; 這時(shí)會(huì)有兩個(gè)問(wèn)題:?
第一, 我們暴露了只是應(yīng)該precommit的數(shù)據(jù)(這些數(shù)據(jù)不應(yīng)被數(shù)據(jù)庫(kù)的外部consumer所見);
第二, 如果有一個(gè)其他節(jié)點(diǎn)不同意commit而發(fā)給coordinator abort的決定, 那么coordinator則會(huì)決定abort, 所以我們的sink則需要服從rollback的決定, 但是我們已經(jīng)db-commit了的數(shù)據(jù), 而一般數(shù)據(jù)庫(kù)都不支持rollback已經(jīng)commit的數(shù)據(jù), 這就造成了問(wèn)題。
為了解決這兩個(gè)問(wèn)題, 這時(shí)我們需要設(shè)計(jì)一個(gè)和外部數(shù)據(jù)庫(kù)的數(shù)據(jù)消費(fèi)者的數(shù)據(jù)“屏蔽協(xié)議”。比如利用一個(gè)字段來(lái)表示當(dāng)前數(shù)據(jù)只是“precommit”,所有的外部數(shù)據(jù)庫(kù)的讀寫者都應(yīng)該忽略這些數(shù)據(jù)(而只有當(dāng)這個(gè)字段是committed才能讀寫)。
這樣當(dāng)flink的coordinator通知flink-commit時(shí),我們用另外一個(gè)外部數(shù)據(jù)庫(kù)的tx來(lái)把所有涉及到的precommit的數(shù)據(jù)的這個(gè)字段改為committed即可, 這就解決了第一個(gè)問(wèn)題。對(duì)于第二個(gè)問(wèn)題來(lái)說(shuō), 如果最終flink coordinator決定abort, 我們把此字段設(shè)為abort并利用一個(gè)異步垃圾回收的程序把所有標(biāo)記為abort的數(shù)據(jù)清理掉即可。
③這樣設(shè)計(jì)的關(guān)鍵是, 即使sink precommit ack之后掛了, 我們要flink-commit的數(shù)據(jù)也不會(huì)丟。所以其實(shí)flink-precommit ack時(shí), sink把數(shù)據(jù)寫在任何其他可以保證數(shù)據(jù)高可用的地方都行(只要sink fail掉重啟之后還能找到它), 未必需要是同一個(gè)數(shù)據(jù)庫(kù)的同一個(gè)表。如果采取這種策略, 那么在flink-commit時(shí)則需要重新把要db-commit的數(shù)據(jù)從存的地方讀出來(lái), 然后重新寫入到真正要寫的數(shù)據(jù)庫(kù)并db-commit。
④flink提供了一個(gè)TwoPhaseCommitSinkFunction,[40]里有詳細(xì)描述如何簡(jiǎn)單的extends這個(gè)interface來(lái)實(shí)現(xiàn)一個(gè)可以和flink的global consistency配合的sink節(jié)點(diǎn)的邏輯,本文不再贅述。
需要注意的一點(diǎn)是,當(dāng)sink收到coordinator的flink-commit指令之后,運(yùn)行sink的db-commit邏輯,在外部數(shù)據(jù)庫(kù)的db-commit更改完畢(比如把要commit的數(shù)據(jù)的status的值從precommit改為committed)后,但是flink記住sink已經(jīng)完成commit之前(flink在跑完sink的commit函數(shù)后會(huì)記住這個(gè)sinki已經(jīng)commit了, 所以不再重復(fù)call sink的commit, 否則flink就會(huì)一直重試commit), 此時(shí),一旦sink掛了,那么在另外的機(jī)器重啟的sink,flink無(wú)法得知外部數(shù)據(jù)庫(kù)已經(jīng)commit成功了,所以flink會(huì)再次重試commit函數(shù)來(lái)嘗試commit。從而造成重復(fù)commit,這也是[40]中提到的commit必須設(shè)計(jì)為冪等操作的原因。
注意1: 可以使用2PC作為sink的關(guān)鍵是, 你的sink可以保證在ack pre-commit之后, 保證無(wú)論任何情況都可以成功commit; 這不是說(shuō)你的sink所連接的外部系統(tǒng)支持tx就可以的, 需要application設(shè)計(jì)者根據(jù)情況具體設(shè)計(jì)。[1]的P213頁(yè), 就描述了sink是用kafka transaction記錄計(jì)算結(jié)果到kafka,但是即使用了transaction也可能丟數(shù)據(jù)的一種edge case。而[41] Kafka 0.11 and newer=>Caveats 里也有提到。
丟失數(shù)據(jù)的原因就在于, kafka sink的默認(rèn)實(shí)現(xiàn):FlinkKafkaProducer011, 在precommit的時(shí)候沒有真的commit數(shù)據(jù), 因此當(dāng)kafka sink fail掉沒有及時(shí)重啟, 一旦kafka tx超時(shí), 所有tx里的數(shù)據(jù)都會(huì)丟失, 而此時(shí)如果coordinator已經(jīng)決定commit就絕不會(huì)再重發(fā)數(shù)據(jù)(source也已經(jīng)commit發(fā)出的消息的index),從而kafka sink的此次tx的所有數(shù)據(jù)永久丟失。
這里提供的DB版本的sink實(shí)現(xiàn)思路, 在precommit階段就commit數(shù)據(jù), 來(lái)保證“無(wú)論如何數(shù)據(jù)都不會(huì)丟”, 但是用app level的flag屏蔽外部可見; 這樣做的原因就是為了克服類似kafka sink的這種缺陷。
注意2: 使用2PC Sink的Flink應(yīng)該是可以應(yīng)對(duì)non-deterministic計(jì)算的, 因?yàn)橐坏ゝailure發(fā)生, 所有之前的狀態(tài)和對(duì)sink的寫入都會(huì)被rollback; 但是這樣的話, Flink在sink端就變成了micro-batch模型, batch大小取決于發(fā)barrier的頻率; 但是即使這樣, 由于只有sink需要聚集一個(gè)batch才能做一次2PC, 但是中間節(jié)點(diǎn)往下游發(fā)送計(jì)算結(jié)果還是即算即發(fā)的, 所以比起Spark這種所有中間計(jì)算都是micro-batch,micro-batch造成的額外latency會(huì)疊加式的增高的模型, 端到端的latency應(yīng)該還是會(huì)要小一些。
Latency, 冪等和non-deterministic利用冪等的sink可以做到實(shí)時(shí)記錄計(jì)算結(jié)果, 達(dá)到最小的end to end latency。因?yàn)閟ink根本不需要等待barrier, 來(lái)一條計(jì)算結(jié)果就向外部系統(tǒng)commit一條記錄就好, 而由冪等保證了就算整個(gè)系統(tǒng)開始重算, 在sink端也會(huì)表現(xiàn)出每個(gè)source端的event只產(chǎn)生了一次效果的結(jié)果。
但是冪等是很難克服non-deterministic計(jì)算的。因?yàn)閚on-deterministic計(jì)算使得同一個(gè)source發(fā)出的event引起千變?nèi)f化的"蝴蝶效應(yīng)" (比如第一次計(jì)算event生成的Key是A, 第二次重算生成的Key是B, 如果下一個(gè)節(jié)點(diǎn)是partitionByKey, 那么這里的2次計(jì)算結(jié)果就會(huì)發(fā)送給了完全不同的下游節(jié)點(diǎn), 考慮幾百次不確定計(jì)算引起的不同蝴蝶效應(yīng), 等計(jì)算結(jié)果到達(dá)各個(gè)sink節(jié)時(shí), 計(jì)算的key和value甚至結(jié)果的個(gè)數(shù)和在sink節(jié)點(diǎn)的distribution都完全不同了, 那么sink也就完全無(wú)法利用冪等來(lái)屏蔽掉同一個(gè)event replay所造成的"蝴蝶效應(yīng)"了)
相比之下, 如果整個(gè)流系統(tǒng)的計(jì)算都是確定性的, 那么無(wú)論在source端replay多少次同一個(gè)event, 它所產(chǎn)生的"蝴蝶效應(yīng)"在sink端也必定相同, 則application設(shè)計(jì)者則可以很容易設(shè)計(jì)出冪等操作來(lái)屏蔽掉重復(fù)的計(jì)算結(jié)果。
如果業(yè)務(wù)里無(wú)法去除non-determnistic的計(jì)算, 那么你只能選擇Google Dataflow, KafkaStream,或者Flink+2PCSink; 而只支持冪等的Spark和利用冪等sink的Flink無(wú)法支持non-determnistic的業(yè)務(wù)計(jì)算。
REFERENCEStream Processing with Apache Flink: Fundamentals, Implementation, and Operation of Streaming Applications
Designing Data-Intensive Applications: The Big Ideas Behind Reliable, Scalable, and Maintainable Systems
Lightweight Asynchronous Snapshots for Distributed Dataflows
Streaming Systems: The What, Where, When, and How of Large-Scale Data Processing
Kafka Streams in Action: Real-time apps and microservices with the Kafka Streams API
The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing
MillWheel: Fault-Tolerant Stream Processing at Internet Scale
Distributed Snapshots: Determining Global States of Distributed Systems (Chandy-Lamport)
State Management in Apache Flink R Consistent Stateful Distributed Stream Processing
Discretized Streams: Fault-Tolerant Streaming Computation at Scale
Continuous Processing in Structured Streaming Design Sketch
Structured Streaming: A Declarative API for Real-Time Applications in Apache Spark
Structured Streaming Programming Guide 2.4.3
Spark Streaming Programming Guide2.4.3
Watermarks, Tables, Event Time, and the Dataflow Model
Kafka Streams’ Take on Watermarks and Triggers
Streams Architecture Kafka
Enabling Exactly Once in Kafka Streams
Transactions in Apache Kafka
Introducing Low-latency Continuous Processing Mode in Structured Streaming in Apache Spark 2.3
State Management in Spark Structured Streaming
Process Large DynamoDB Streams Using Multiple Amazon Kinesis Client Library (KCL) Workers
Big Data: Principles and best practices of scalable realtime data systems
Making Sense of Stream Processing
Consensus on Transaction Commit
Exactly Once Delivery and Transactional Messaging in Kafka=>docs.google.com/documen
End-to-End Arguments in System Design
Transactional Messaging in Kafka
Akka Split Brain Resolver
Unreliable Failure Detectors for Reliable Distributed Systems
The Weakest Failure Detector for Solving Consensus
Exactly once Semantics are Possible: Here’s How Kafka Does it
24/7 Spark Streaming on YARN in Production
Monitoring Back Pressure (flink)
Managing Large State in Apache Flink: An Intro to Incremental Checkpointing
Impossibility of Distributed Consensus with One Faulty Process (AKA, FLP impossibility)
Kubernetes in Action
Akka:Auto-Downing(DO NOT USE)
ZooKeeper: Distributed Process Coordination
An Overview of End-to-End Exactly-Once Processing in Apache Flink
Kafka producers and fault tolerance
想知道更多?掃描下面的二維碼關(guān)注我
加技術(shù)群入口(備注:Tech):
免費(fèi)星球入口:
免費(fèi)資料入口:后臺(tái)回復(fù)“666”
朕已閱?
總結(jié)
以上是生活随笔為你收集整理的压箱底总结:流系统端到端一致性对比的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 下次遇到嚣张的候选人就先这么问:系统变慢
- 下一篇: java信息管理系统总结_java实现科