日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

基于TableStore构建简易海量Topic消息队列

發(fā)布時(shí)間:2024/8/23 编程问答 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 基于TableStore构建简易海量Topic消息队列 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

摘要: 前言 消息隊(duì)列,通常有兩種場景,一種是發(fā)布者訂閱模式,一種是生產(chǎn)者消費(fèi)者模式。發(fā)布者訂閱模式,即發(fā)布者生產(chǎn)消息放入隊(duì)列,多個(gè)監(jiān)聽的消費(fèi)者都會收到同一份消息,也就是每個(gè)消費(fèi)者收到的消息是一樣的。生產(chǎn)者消費(fèi)者模式,生產(chǎn)者生產(chǎn)消息放入隊(duì)列,多個(gè)消費(fèi)者同時(shí)監(jiān)聽隊(duì)列,誰先搶到消息就會從隊(duì)列中取走消息,最終每個(gè)消息只會有一個(gè)消費(fèi)者擁有。

前言
消息隊(duì)列,通常有兩種場景,一種是發(fā)布者訂閱模式,一種是生產(chǎn)者消費(fèi)者模式。發(fā)布者訂閱模式,即發(fā)布者生產(chǎn)消息放入隊(duì)列,多個(gè)監(jiān)聽的消費(fèi)者都會收到同一份消息,也就是每個(gè)消費(fèi)者收到的消息是一樣的。生產(chǎn)者消費(fèi)者模式,生產(chǎn)者生產(chǎn)消息放入隊(duì)列,多個(gè)消費(fèi)者同時(shí)監(jiān)聽隊(duì)列,誰先搶到消息就會從隊(duì)列中取走消息,最終每個(gè)消息只會有一個(gè)消費(fèi)者擁有。

在大數(shù)據(jù)時(shí)代,傳統(tǒng)的生產(chǎn)者消費(fèi)者隊(duì)列模式中的Topic數(shù)目可能從少量的幾個(gè)變?yōu)楹A縯opic。例如要實(shí)現(xiàn)一個(gè)全網(wǎng)爬蟲抓取任務(wù)調(diào)度系統(tǒng),每個(gè)大型的門戶,SNS都會成為一個(gè)topic。在topic內(nèi)部也會有海量的子網(wǎng)頁需要抓取。在實(shí)現(xiàn)這樣的一個(gè)任務(wù)分發(fā)調(diào)度系統(tǒng)時(shí)可能會遇到以下一些問題:

海量的topic,意味著我們可能會有海量的隊(duì)列。針對爬蟲場景,根據(jù)網(wǎng)頁類型,一類網(wǎng)站對應(yīng)到一個(gè)任務(wù)隊(duì)列,不同的任務(wù)隊(duì)列會有自己的生產(chǎn)者和消費(fèi)者。
生產(chǎn)者和消費(fèi)者會有多個(gè),在業(yè)務(wù)峰值期間,產(chǎn)生較大并發(fā)訪問,消息總量也是海量。針對爬蟲任務(wù)消息總量可能就是全網(wǎng)的網(wǎng)頁地址數(shù)量。
任務(wù)可能會有優(yōu)先級,為了實(shí)現(xiàn)優(yōu)先級高的任務(wù)優(yōu)先調(diào)度,我們可能會在一個(gè)topic下再細(xì)分子隊(duì)列。
消息消費(fèi)不能丟失,如果是作為任務(wù)的調(diào)度消息,我們的消息丟失失零容忍的。
消費(fèi)者模式中如果消費(fèi)者因?yàn)榉N種原因處理失敗或者超時(shí),需要支持消息被重新調(diào)度。
在保證消息一定會被處理的前提下,我們也要避免少量消息因?yàn)楦鞣N原因處理堆積,而影響整個(gè)系統(tǒng)的吞吐。因?yàn)橄⒆x區(qū)往往是輕量級,消息的處理是資源密集型。我們不希望因?yàn)橄⒆x區(qū)堆積導(dǎo)致處理資源閑置。
解決方案
基于TableStore(表格存儲)的跨分區(qū)高并發(fā),主鍵自增列這個(gè)特性又很好的適配到我們的隊(duì)列特性。支持海量,不同分區(qū)鍵下使用各自的自增主鍵,可以很好的實(shí)現(xiàn)海量隊(duì)列。具體我們給出如下方案:

需要設(shè)計(jì)以下表:

任務(wù)消息表
消息消費(fèi)checkpoint表
全量消息表
在介紹表設(shè)計(jì)之前,先做一些名詞解釋。

每個(gè)任務(wù)消息,我們假設(shè)已有一個(gè)唯一的id。
任務(wù)優(yōu)先級,我們假設(shè)優(yōu)先級范圍是固定并且已經(jīng)知道,如果任務(wù)優(yōu)先級過多,可以分層,例如優(yōu)先級1~100的映射到層級1。這里如果我們的任務(wù)沒有優(yōu)先級,那可以根據(jù)任務(wù)數(shù)據(jù)量級做一個(gè)簡單的分桶,然后輪訓(xùn)抓取每個(gè)分桶中的任務(wù)。
兩個(gè)游標(biāo),對應(yīng)到每個(gè)topic的每個(gè)優(yōu)先層級,我們需要記錄2個(gè)游標(biāo)位移點(diǎn)。一個(gè)是抓取掃描游標(biāo),一個(gè)是完成游標(biāo)。掃描游標(biāo)的定義是指當(dāng)前任務(wù)當(dāng)前優(yōu)先層級下,被掃描到的最大位移位置。完成位移點(diǎn)表示改任務(wù)當(dāng)前優(yōu)先層級下,最大的抓取完成位移點(diǎn),之前的任務(wù)都已經(jīng)完成抓取。
表設(shè)計(jì)
任務(wù)消息表

這里,每一個(gè)子任務(wù)都會被插入這張表,任務(wù)可能由不同的爬蟲端抓取后產(chǎn)生子任務(wù),在子任務(wù)產(chǎn)生的同時(shí),任務(wù)的訪問地址,訪問優(yōu)先級已經(jīng)被固定。我們根據(jù)一個(gè)分層算法進(jìn)行映射。所以主鍵前三列已經(jīng)確定,插入TableStore(表格存儲)后,id會自增生成,用于后續(xù)消費(fèi)者讀任務(wù)用。

消息消費(fèi)checkpoint表
這張表用于消息消費(fèi)的checkpoint。下面會結(jié)合schema具體說下checkpoint的內(nèi)容。


這張表屬性列上會有兩列,一列用來表示抓取掃描位移點(diǎn),一列記錄完成位移點(diǎn)。這里checkpoint的記錄需要使用條件更新,即我們只會確保原來值小于待更新的值才會更新。

全量消息表
我們用全量消息表存放我們的消息id以及對應(yīng)屬性,一個(gè)消息任務(wù)是否重復(fù)處理也通過這張表做判斷。

在全網(wǎng)信息表中,有一列屬性用來表示任務(wù)處理狀態(tài),消費(fèi)者在拿到任務(wù)id時(shí)需要條件更新這張表對應(yīng)的這個(gè)key,對應(yīng)行不存在可以直接插入。如果已經(jīng)存在,需要先讀狀態(tài)為非結(jié)束狀態(tài),版本為讀到版本情況下再做更新。更新成功者意味著當(dāng)前id的任務(wù)被這個(gè)消費(fèi)者搶占。其中行不存在表示第一次爬取,如果存在非結(jié)束狀態(tài),表示之前的任務(wù)可能已經(jīng)失敗。

任務(wù)消費(fèi)處理流程
下面我們用爬蟲抓取全網(wǎng)網(wǎng)頁做為例子來看下具體如何基于TableStore(表格存儲)做消息隊(duì)列并最終實(shí)現(xiàn)任務(wù)的分發(fā):


這張圖展現(xiàn)了我們的整個(gè)爬蟲框架,爬蟲具體流程如下

不同的爬蟲端會根據(jù)自身爬取進(jìn)度定時(shí)從TableStore的爬蟲任務(wù)表進(jìn)行拉取爬蟲任務(wù),這里一般我們單線程GetRange訪問TableStore,我們認(rèn)為這里的任務(wù)讀區(qū)速率會遠(yuǎn)大于抓取消費(fèi)者的速度,從TableStore讀區(qū)到的任務(wù)數(shù)據(jù)進(jìn)入爬蟲內(nèi)存隊(duì)列,然后進(jìn)行下一輪任務(wù)消息讀區(qū)。直到當(dāng)前內(nèi)存隊(duì)列滿后等待下輪喚醒繼續(xù)抓取,如果有特殊需求可以并發(fā)拉取不同優(yōu)先級。
初始對于每個(gè)任務(wù)的各個(gè)priority,他們的默認(rèn)checkpoint都對應(yīng)于TableStore的一個(gè)flag即Inf_Min,也就是第一行。
GetRange拉取到當(dāng)前任務(wù)各優(yōu)先級抓取任務(wù)后(例如我們可以設(shè)置從優(yōu)先級高到低,一次最多200條,抓夠200條進(jìn)行一次任務(wù)搶占),爬蟲會先根據(jù)具體優(yōu)先級排序,然后按照優(yōu)先級從高到低嘗試更新網(wǎng)頁信息表,進(jìn)行爬取任務(wù)搶占,搶占成功后,該任務(wù)會被放進(jìn)爬蟲的內(nèi)存任務(wù)隊(duì)列給抓取線程使用。搶占成功同時(shí)我們也會更新一下爬蟲任務(wù)表中的狀態(tài),和當(dāng)前的時(shí)間,表示任務(wù)最新的更新時(shí)間,后續(xù)的任務(wù)狀態(tài)檢驗(yàn)線程會查看任務(wù)是否已經(jīng)過期需要重新處理。注意這里假如有一個(gè)爬蟲線程比較leg,是上一輪搶占任務(wù)后卡了很久才嘗試更新這個(gè)時(shí)間,也沒有問題。這種小概率的leg可能會帶來重復(fù)抓取,但是不會影響數(shù)據(jù)的一致性。并且我們可以在內(nèi)存中記錄下每一步的時(shí)間,如果我們發(fā)現(xiàn)每一步內(nèi)存中的時(shí)間超時(shí)也可以結(jié)束當(dāng)前任務(wù),進(jìn)一步減少小概率的重復(fù)抓取。
當(dāng)一輪的任務(wù)全部填充后,我們會根據(jù)當(dāng)前拿到的最大任務(wù)表id+1(即爬蟲任務(wù)表第三個(gè)主鍵,也就是自增主鍵)進(jìn)行嘗試當(dāng)前任務(wù)對應(yīng)優(yōu)先級checkpoint表的更新(這里更新頻率可以根據(jù)業(yè)務(wù)自由決定),更新的原則是新的id要大于等于當(dāng)前id。如果更新成功后,可以使用當(dāng)前更新值繼續(xù)拉取,如果更新失敗,意味著有另一個(gè)爬蟲已經(jīng)取得更新的任務(wù),需要重新讀一下checkpoint表獲得最新的checkpoint id值,從該id繼續(xù)拉取。
除了任務(wù)抓取線程以為,每個(gè)爬蟲端可以有一個(gè)頻率更低的任務(wù)進(jìn)行任務(wù)完成掃描,這個(gè)任務(wù)用來最新的完成任務(wù)游標(biāo)。掃描中g(shù)etrange的最大值為當(dāng)前拉取的起始位置,掃描的邏輯分以下幾種:

掃描到該行已經(jīng)更新為完成,此時(shí)游標(biāo)可以直接下移
掃描到任務(wù)還是initial狀態(tài),一個(gè)任務(wù)沒有被任何人設(shè)置為running,切被拉去過,原因是這個(gè)任務(wù)是一個(gè)重復(fù)抓取的任務(wù),此時(shí)可以去url表中檢查這個(gè)url是否存在,存在直接跳過。
掃描到任務(wù)是running,不超時(shí)認(rèn)為任務(wù)還在執(zhí)行,結(jié)束當(dāng)輪掃描。如果檢查時(shí)間戳超時(shí),檢查url表,如果內(nèi)容已經(jīng)存在,則有可能是更新狀態(tài)回任務(wù)表失敗,游標(biāo)繼續(xù)下移。如果內(nèi)容也不存在,一種簡單做法是直接在表對應(yīng)優(yōu)先級中put一個(gè)新任務(wù),唯一的問題是如果是并發(fā)檢查可能會產(chǎn)生重復(fù)的任務(wù)(重復(fù)任務(wù)通過url去重也可以解決)。另一種做法也是通過搶任務(wù)一樣更新url表,更新成功者可以新建任務(wù)下移坐標(biāo)。其余的人停止掃描,更新checkpoint為當(dāng)前位置。更新成功者可以繼續(xù)下移掃描直至尾部或者任務(wù)正常進(jìn)行位置,然后更新checkpoint。
爬蟲抓取每個(gè)任務(wù)完成后,會更新全網(wǎng)url表中的狀態(tài)以及對應(yīng)爬蟲任務(wù)表中的狀態(tài),其中全網(wǎng)url的狀態(tài)用來給后續(xù)抓取任務(wù)去重使用,爬蟲任務(wù)表中的狀態(tài)給上面步驟5的完成游標(biāo)掃描線程使用判斷一個(gè)任務(wù)是否已經(jīng)完成。
整個(gè)寫入子任務(wù)和讀取我們可以抽象出下面這張圖


新任務(wù)會根據(jù)優(yōu)先級并發(fā)寫入不同的隊(duì)列,其中圖中編號就對應(yīng)表格存儲中的自增列,用戶按照上面設(shè)計(jì)表結(jié)構(gòu)的話,不需要自己處理并發(fā)寫入的編號,表格存儲服務(wù)端會保證唯一且自增,即新任務(wù)在對應(yīng)隊(duì)列末尾。爬蟲讀取任務(wù)的游標(biāo)就是圖中紅色,藍(lán)色對應(yīng)完成的任務(wù)列表。兩個(gè)游標(biāo)在響應(yīng)優(yōu)先級下獨(dú)立維護(hù)。

下面我們舉個(gè)例子,如果一個(gè)爬蟲任務(wù)拉取線程假如設(shè)置一次拉2個(gè)任務(wù)為例,

我們的爬蟲任務(wù)表會從上面切換成下圖,task1 priority=3的掃描游標(biāo)更新到了10011,priority=2的掃描游標(biāo)更新到10006。也就意味著掃描優(yōu)先級3的下次會從10011開始,優(yōu)先級2的會從10006開始。

并發(fā)處理
多爬蟲拉取任務(wù)有重復(fù),這部分我們通過條件更新大表決定了同一個(gè)網(wǎng)頁不會同時(shí)被抓取。
多爬蟲條件更新checkpoint表決定了我們整個(gè)拉取任務(wù)不會漏過當(dāng)前拉到的一批任務(wù),如果checkpoint更新如果條件失敗任務(wù)繼續(xù)進(jìn)行,其他類型可重試錯(cuò)誤會繼續(xù)重試(例如服務(wù)短時(shí)間不可用,leg等。)這里只有可能導(dǎo)致其他爬蟲喚醒后拉到重復(fù)數(shù)據(jù),但是抓取因?yàn)閾屨际∫膊粫貜?fù)拉取,并且新喚醒的客戶端也會更新更大的游標(biāo),保證系統(tǒng)不會因?yàn)橐粋€(gè)客戶端leg而任務(wù)掃描游標(biāo)滯后。
任務(wù)判定完成邏輯我們可以做分布式互斥,同時(shí)只有一個(gè)進(jìn)程在判斷。也可以在判斷任務(wù)失敗的時(shí)候進(jìn)行條件更新原表,更新成功后再新插入一條新任務(wù)。
總結(jié)
最后我們再來看下整個(gè)設(shè)計(jì)中幾個(gè)關(guān)鍵的問題是否滿足

海量topic,TableStore(表格存儲)天然的以一個(gè)分區(qū)鍵做為一個(gè)隊(duì)列的能力使得我們可以很容易的實(shí)現(xiàn)海量的隊(duì)列,數(shù)量級可以在億級別甚至更多。
優(yōu)先級,優(yōu)先級對應(yīng)一個(gè)主鍵列,依照優(yōu)先級進(jìn)行分層優(yōu)先級高的會被優(yōu)先getrange獲得。
系統(tǒng)吞吐,整個(gè)系統(tǒng)中兩個(gè)游標(biāo)的設(shè)計(jì),使得我們?nèi)蝿?wù)掃描游標(biāo)每輪掃描后都會快速向下走,長尾任務(wù)不會阻礙對新任務(wù)的掃描。另一方面我們?nèi)蝿?wù)會在url大表上做搶占,避免不必要的重復(fù)抓取。
子任務(wù)不丟失,自增列的保證了新任務(wù)會用更大的值即排在當(dāng)前隊(duì)列末尾。另外有一個(gè)完成掃描線程,會確保新任務(wù)全部完成后才會更新,這個(gè)游標(biāo)代表了最后整個(gè)任務(wù)是否完成。這個(gè)游標(biāo)也保證了任務(wù)不會丟失。這個(gè)任務(wù)會對長尾的任務(wù)重新建一個(gè)任務(wù)并插入隊(duì)列,新任務(wù)會被新爬蟲端重新觸發(fā),也避免了因?yàn)橐粋€(gè)客戶端卡住而餓死的問題。

總結(jié)

以上是生活随笔為你收集整理的基于TableStore构建简易海量Topic消息队列的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。