java amqp_AMQP协议
當(dāng)前各種應(yīng)用大量使用異步消息模型,并隨之產(chǎn)生眾多消息中間件產(chǎn)品及協(xié)議,標(biāo)準(zhǔn)的不一致使應(yīng)用與中間件之間的耦合限制產(chǎn)品的選擇,并增加維護(hù)成本。AMQP是一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)協(xié)議,基于此協(xié)議的客戶端與消息中間件可傳遞消息,并不受客戶端/中間件不同產(chǎn)品,不同開發(fā)語言等條件的限制。
當(dāng)然這種降低耦合的機(jī)制是基于與上層產(chǎn)品,語言無關(guān)的協(xié)議。AMQP協(xié)議是一種二進(jìn)制協(xié)議,提供客戶端應(yīng)用與消息中間件之間異步、安全、高效地交互。從整體來看,AMQP協(xié)議可劃分為三層:
這種分層架構(gòu)類似于OSI網(wǎng)絡(luò)協(xié)議,可替換各層實(shí)現(xiàn)而不影響與其它層的交互。AMQP定義了合適的服務(wù)器端域模型,用于規(guī)范服務(wù)器的行為(AMQP服務(wù)器端可稱為broker)。在這里Model層決定這些基本域模型所產(chǎn)生的行為,這種行為在AMQP中用”command”表示,在后文中會(huì)著重來分析這些域模型。Session層定義客戶端與broker之間的通信(通信雙方都是一個(gè)peer,可互稱做partner),為command的可靠傳輸提供保障。Transport層專注于數(shù)據(jù)傳送,并與Session保持交互,接受上層的數(shù)據(jù),組裝成二進(jìn)制流,傳送到receiver后再解析數(shù)據(jù),交付給Session層。Session層需要Transport層完成網(wǎng)絡(luò)異常情況的匯報(bào),順序傳送command等工作。
上面是對(duì)AMQP協(xié)議的大致說明。下面會(huì)以我們對(duì)消息服務(wù)的需求來理解AMQP所提供的域模型。消息中間件的主要功能是消息的路由(Routing)和緩存(Buffering)。在AMQP中提供類似功能的兩種域模型:Exchange 和 Message queue。
Exchange接收消息生產(chǎn)者(Message Producer)發(fā)送的消息根據(jù)不同的路由算法將消息發(fā)送往Message queue。Message queue會(huì)在消息不能被正常消費(fèi)時(shí)緩存這些消息,具體的緩存策略由實(shí)現(xiàn)者決定,當(dāng)message queue與消息消費(fèi)者(Message consumer)之間的連接通暢時(shí),Message queue有將消息轉(zhuǎn)發(fā)到consumer的責(zé)任。
Message是當(dāng)前模型中所操縱的基本單位,它由Producer產(chǎn)生,經(jīng)過Broker被Consumer所消費(fèi)。它的基本結(jié)構(gòu)有兩部分: Header和Body。Header是由Producer添加上的各種屬性的集合,這些屬性有控制Message是否可被緩存,接收的queue是哪個(gè),優(yōu)先級(jí)是多少等。Body是真正需要傳送的數(shù)據(jù),它是對(duì)Broker不可見的二進(jìn)制數(shù)據(jù)流,在傳輸過程中不應(yīng)該受到影響。
一個(gè)broker中會(huì)存在多個(gè)Message queue,Exchange怎樣知道它要把消息發(fā)送到哪個(gè)Message queue中去呢? 這就是上圖中所展示Binding的作用。Message queue的創(chuàng)建是由client application控制的,在創(chuàng)建Message queue后需要確定它來接收并保存哪個(gè)Exchange路由的結(jié)果。Binding是用來關(guān)聯(lián)Exchange與Message queue的域模型。Client application控制Exchange與某個(gè)特定Message queue關(guān)聯(lián),并將這個(gè)queue接受哪種消息的條件綁定到Exchange,這個(gè)條件也叫Binding key或是 Criteria。
在與多個(gè)Message queue關(guān)聯(lián)后,Exchange中就會(huì)存在一個(gè)路由表,這個(gè)表中存儲(chǔ)著每個(gè)Message queue所需要消息的限制條件。Exchange就會(huì)檢查它接受到的每個(gè)Message的Header及Body信息,來決定將Message路由到哪個(gè)queue中去。Message的Header中應(yīng)該有個(gè)屬性叫Routing Key,它由Message發(fā)送者產(chǎn)生,提供給Exchange路由這條Message的標(biāo)準(zhǔn)。Exchange根據(jù)不同路由算法有不同有Exchange Type。比如有Direct類似,需要Binding key等于Routing key;也有Binding key與Routing key符合一個(gè)模式關(guān)系;也有根據(jù)Message包含的某些屬性來判斷。一些基礎(chǔ)的路由算法由AMQP所提供,client application也可以自定義各種自己的擴(kuò)展路由算法。那么一個(gè)Message的處理流程類似于這樣:
在這里有個(gè)新名詞需要介紹: Virtual Host。一個(gè)Virtual Host可持有一些Exchange和Message queue。它是一個(gè)虛擬概念,一個(gè)Virtual Host可以是一臺(tái)服務(wù)器,也可以是由多臺(tái)服務(wù)器組成的集群。同步擴(kuò)展下,Exchange與Message queue的部署也可以是一臺(tái)或是多臺(tái)服務(wù)器上。
Message的產(chǎn)生者和消費(fèi)者可能是同一個(gè)應(yīng)用。整個(gè)AMQP定義的就是Client application與Broker之間的交互。在粗略介紹完AMQP的域模型后,可以關(guān)注下Client是怎樣與Broker建立起連接的。
在AMQP中,Client application想要與Broker溝通,就需要建立起與Broker的connection,這種connection其實(shí)是與Virtual Host相關(guān)聯(lián)的,也就是說,connection是建立在client與Virtual Host之間。可以在一個(gè)connection上并發(fā)運(yùn)行多個(gè)channel,每個(gè)channel執(zhí)行與Broker的通信,我們前面提供的session就是依附于channel上的。
這里的Session可以有多種定義,既可以表示AMQP內(nèi)部提供的command分發(fā)機(jī)制,也可以說是在宏觀上區(qū)別與域模型的接口。正常理解就是我們平時(shí)所說的交互context,主要作用就是在網(wǎng)絡(luò)上可靠地傳遞每一個(gè)command。在AMQP的設(shè)計(jì)中,應(yīng)當(dāng)是借鑒了TCP的各種設(shè)計(jì),用于保證這種可靠性。
在Session層,為上層所需要交互的每個(gè)command分配一個(gè)惟一標(biāo)識(shí)符(可以是一個(gè)UUID),是為了在傳輸過程中可以對(duì)command做校驗(yàn)和重傳。Command發(fā)送端也需要記錄每個(gè)發(fā)送出去的command到Replay Buffer,以期得到接收方的回饋,保證這個(gè)command被接收方明確地接收或是已執(zhí)行這個(gè)command。對(duì)于超時(shí)沒有收到反饋的command,發(fā)送方再次重傳。如果接收方已明確地回饋信息想要告知command發(fā)送方但這條信息在中途丟失或是其它問題發(fā)送方?jīng)]有收到,那么發(fā)送方不斷重傳會(huì)對(duì)接收方產(chǎn)生影響,為了降低這種影響,command接收方設(shè)置一個(gè)過濾器Idempotency Barrier,來攔截那些已接收過的command。?關(guān)于這種重傳及確認(rèn)機(jī)制,可以參考下TCP的相關(guān)設(shè)計(jì)。
上面大致介紹了AMQP的域模型及連接機(jī)制中的確認(rèn)及重傳模型,不涉及AMQP的詳細(xì)二進(jìn)制規(guī)范。
總結(jié)
以上是生活随笔為你收集整理的java amqp_AMQP协议的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java 的tree_Java Tree
- 下一篇: 过滤器java面试_过滤器监听器面试题都