日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

ESFramework介绍之(23)―― AgileTcp

發(fā)布時間:2023/11/30 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ESFramework介绍之(23)―― AgileTcp 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

??? 前面已經(jīng)介紹了ITcp接口,而AgileTcp就是ESFramework給出的ITcp的參考實現(xiàn)。在之前,我曾經(jīng)講解過模擬完成端口的Tcp組件實現(xiàn)和異步Tcp組件實現(xiàn),在它們的基礎(chǔ)之上,我更改了處理策略,而形成了AgileTcp,目的是更清晰的結(jié)構(gòu)、更高的效率。這個策略會在后面講到。

??? Tcp組件主要控制著系統(tǒng)與終端用戶的所有消息的進(jìn)出,ITcp接口描述了這個組件的外貌,告訴外部如何使用Tcp組件、如何與Tcp組件交互。而從實現(xiàn)的角度來看,我們必須理清Tcp組件的職責(zé):
(1)?管理所有已經(jīng)建立的Tcp連接
(2)?管理與每個連接相對應(yīng)接收緩沖區(qū)
(3)?管理所有的工作者線程
(4)?處理長度大于接收緩沖區(qū)的消息

??? 我們來看看如何滿足這些職責(zé)。
??? 由于每個連接都對應(yīng)著一個接收緩沖區(qū),所以可以將它們封裝在一起形成ContextKey(連接上下文):
???

ContextKey
??? public?class?ContextKey
????{????????
????????
private?byte[]??buffer?;??????????//封裝接收緩沖區(qū)
????????private?ISafeNetworkStream?netStream?=?null?;????????????
????????
private?volatile?bool??????isDataManaging?=?false?;
????????
????????
public?ContextKey(ISafeNetworkStream?net_Stream?,int?buffSize)
????????{
????????????
this.netStream?=?net_Stream?;????????????
????????????
this.buffer????=?new?byte[buffSize]?;????????????
????????}

????????
#region?NetStream??
????????
public?ISafeNetworkStream?NetStream
????????{
????????????
get
????????????{
????????????????
return?this.netStream?;
????????????}
????????}????????????

????????
public?byte[]?Buffer
????????{
????????????
get
????????????{
????????????????
return?this.buffer?;
????????????}????????????
????????}????????

????????
public?bool?IsDataManaging
????????{
????????????
get
????????????{
????????????????
return?this.isDataManaging?;
????????????}
????????????
set
????????????{
????????????????
this.isDataManaging?=?value?;
????????????}
????????}

????????
private?bool?firstMessageExist?=?false?;
????????
public??bool?FirstMessageExist?
????????{
????????????
get
????????????{
????????????????
return?this.firstMessageExist?;
????????????}
????????????
set
????????????{
????????????????
this.firstMessageExist?=?value?;
????????????}
????????}
????????
#endregion????????????
????}????

??? ContextKey中封裝的是ISafeNetworkStream而不是NetworkStream,原因可參見這里。
??? IsDataManaging屬性表明工作線程是否正在處理本連接對應(yīng)的緩沖區(qū)中的數(shù)據(jù),FirstMessageExist屬性用于標(biāo)志接收到的第一條消息,因為系統(tǒng)可能需要對接收到的第一條消息做一些特殊的處理。

??? 任何時刻,可能都有成千上萬個連接存活著;任何時刻,都可能有新的連接建立、或已有的連接被釋放。所有這些ContextKey對象需要被管理起來,這就是上下文管理器IContextKeyManager:

??? public?interface?IContextKeyManager
????{
????????
void?InsertContextKey(ContextKey?context_key)?;
????????
void?DisposeAllContextKey()?;????????
????????
void?RemoveContextKey(int?streamHashCode)?;
????????ISafeNetworkStream?GetNetStream(
int?streamHashCode)?;

????????
int????????????ConnectionCount?{get?;}
????????ICollection?ContextKeyList{
get?;}

????????
event?CbSimpleInt?StreamCountChanged?;????????
????}????

??? 說到上下文管理器,先要講講如何標(biāo)志每個不同的上下文了?使用連接字,連接字是Tcp連接的Hashcode,它們與連接一一對應(yīng),并且不會重復(fù)。所以在源碼中你經(jīng)常會看到名為“streamHashCode”的參數(shù)和變量。由于Tcp連接與streamHashCode一一對應(yīng),所以GetNetStream方法的實現(xiàn)就非常簡單。不知道你是否記得,RoundedMessage類中有個ConnectID字段,它就是連接字,與streamHashCode意義一樣。根據(jù)此字段,你可以清楚的知道這個RoundedMessage來自于哪個連接。

??? 關(guān)于工作者線程,很幸運的是,我們可以直接使用.NET提供的后臺線程池,而不必要再去手動管理,這可以省卻一些麻煩。當(dāng)然你也可以使用ThreadPool類,甚至你可以從頭開始實現(xiàn)自己的線程池組件,這也是不困難的。
????
??? 我經(jīng)常被問到,接收緩沖區(qū)應(yīng)該開辟多大?這取決于你的應(yīng)用,但是有一點是錯不了的――緩沖區(qū)的大小至少要大于消息頭Header的大小,否則麻煩就多了。根據(jù)我的經(jīng)驗,一般緩沖區(qū)的大小至少應(yīng)該能容納所有接收消息中的60%-80%。對于大于緩沖區(qū)大小的消息,ESFramework采用的策略是使用緩沖區(qū)池IBufferPool。

??? public?interface?IBufferPool
????{
????????
byte[]?RentBuffer(int?minSize)?;
????????
void???GivebackBuffer(byte[]?buffer)?;
????}

??? 通過上面的介紹我們已經(jīng)知道如何滿足Tcp組件的職責(zé),現(xiàn)在我們來看看更細(xì)的實現(xiàn)策略:
(1)?使用Checker線程。
??? 使用Checker線程是AgileTcp組件的區(qū)別于模擬完成端口的Tcp組件實現(xiàn)和異步Tcp組件的主要特色。當(dāng)AgileTcp啟動時,Checker線程也隨之啟動,這個線程的主要工作就是檢查已經(jīng)存在的每個連接上是否有數(shù)據(jù)要接收(還記得Select網(wǎng)絡(luò)模型),這可以通過NetworkStream.DataAvailable屬性知道。如果發(fā)現(xiàn)某個連接上有待接收的數(shù)據(jù),就將其放到工作者線程中去處理,并設(shè)置前面提到的ContextKey.IsDataManaging屬性,然后再判斷下個連接,如此循環(huán)下去。

??????? private?void?TaskChecker()
????????{
????????????
while(!?this.stop)
????????????{
????????????????
foreach(ContextKey?key?in?this.contextKeyManager.ContextKeyList)
????????????????{
????????????????????
if(this.stop)
????????????????????{
????????????????????????
break?;
????????????????????}

????????????????????
if((!?key.IsDataManaging)?&&?key.NetStream.DataAvailable)
????????????????????{????????????????????????
????????????????????????key.IsDataManaging?
=?true?;????
????????????????????????CbContextKey?cb?
=?new?CbContextKey(this.DataManaging)?;
????????????????????????cb.BeginInvoke(key?,
null?,null?)?;
????????????????????}????????????????????
????????????????}

????????????????System.Threading.Thread.Sleep(
50)?;
????????????}
????????}

(2)?將消息頭的解析置于Tcp組件之中
??? 將消息頭解析置于Tcp組件之中這個方案我層考慮了非常久,原因是,這會破壞Tcp組件的單純性,使得Tcp組件與協(xié)議(Contract)有所關(guān)聯(lián)。最終采用這個策略的第一個理由是清晰,第二個理由是效率。清晰在于簡化了ContextKey結(jié)構(gòu),避免了使用消息分裂器這樣復(fù)雜的算法組件(如果大家看過我以前關(guān)于通信方案的文章,一定會得到這樣的答案)。效率在于,當(dāng)在此解析了Header之后,后面所有的處理器都可以使用這個Header對象了,而不用在自己去解析。這也是NetMessage類中有個Header字段的原因。

(3)?針對于某個連接,只有當(dāng)上一個消息處理完并將回復(fù)發(fā)送后(如果有回復(fù)的話),才會去接收下一個消息。
??? 這個策略會使很多事情變得簡單,而且不會影響任何有用的特性。由于不會在處理消息的時候去接收下一個消息,所以可以直接處理接收緩沖區(qū)中的數(shù)據(jù),而不需要將數(shù)據(jù)從接收緩沖區(qū)拷貝到另外的地方去處理。這又對效率提高有所幫助。

??? 綜上所述,我們可以總結(jié)工作者線程要做的事情:首先,從連接上接收MessageHeaderSize個字節(jié),解析成Header,然后在接收Header. MessageBodyLength個字節(jié),即是Body,接著構(gòu)造成RoundedMessage對象交給消息分配器去處理,最后將得到的處理結(jié)果發(fā)送出去。代碼如下所示:

DataManaging
??????? private?void?DataManaging(ContextKey?key)
????????{????
????????????
int?streamHashCode?=?key.NetStream.GetHashCode()?;????
????????????
int?headerLen?=?this.contractHelper.MessageHeaderLength?;
????????????
????????????
while((key.NetStream.DataAvailable)?&&?(!?this.stop))
????????????{
????????????????
byte[]?rentBuff?=?null?;//每次分派的消息中,最多有一個rentBuff

????????????????
try
????????????????{
????????????????????
#region?構(gòu)造?RoundedMessage
????????????????????NetHelper.RecieveData(key.NetStream?,key.Buffer?,
0?,headerLen)?;
????????????????????IMessageHeader?header?
=?this.contractHelper.ParseMessageHeader(key.Buffer?,0)?;????
????????????????????
if(!?this.contractHelper.ValidateMessageToken(header))
????????????????????{
????????????????????????
this.DisposeOneConnection(streamHashCode?,DisconnectedCause.MessageTokenInvalid)?;
????????????????????????
return?;
????????????????????}

????????????????????RoundedMessage?requestMsg?
=?new?RoundedMessage()?;
????????????????????requestMsg.ConnectID??????
=?streamHashCode?;
????????????????????requestMsg.Header?????????
=?header?;
????????????????????
????????????????????
if(!?key.FirstMessageExist)
????????????????????{
????????????????????????requestMsg.IsFirstMessage?
=?true?;
????????????????????????key.FirstMessageExist?????
=?true?;
????????????????????}

????????????????????
if((headerLen?+?header.MessageBodyLength)?>?this.maxMessageSize)
????????????????????{
????????????????????????
this.DisposeOneConnection(streamHashCode?,DisconnectedCause.MessageSizeOverflow)?;
????????????????????????
return?;
????????????????????}
????????????????
????????????????????
if(header.MessageBodyLength?>0?)
????????????????????{
????????????????????????
if((header.MessageBodyLength?+?headerLen)?<=?this.recieveBuffSize)
????????????????????????{
????????????????????????????NetHelper.RecieveData(key.NetStream?,key.Buffer?,
0?,header.MessageBodyLength)?;
????????????????????????????requestMsg.Body?
=?key.Buffer?;????????????????????????????
????????????????????????}
????????????????????????
else
????????????????????????{????????????????????????
????????????????????????????rentBuff?
=?this.bufferPool.RentBuffer(header.MessageBodyLength)?;????????????????????????

????????????????????????????NetHelper.RecieveData(key.NetStream?,rentBuff?,
0?,header.MessageBodyLength)?;
????????????????????????????requestMsg.Body?
=?rentBuff?;????????????????????????????
????????????????????????}
????????????????????}
????????????????????
#endregion????????????????????
????????????????
????????????????????
bool?closeConnection?=?false?;
????????????????????NetMessage?resMsg?
=?this.tcpStreamDispatcher.DealRequestData(requestMsg?,ref?closeConnection)?;

????????????????????
if(rentBuff?!=?null)
????????????????????{
????????????????????????
this.bufferPool.GivebackBuffer(rentBuff)?;
????????????????????}

????????????????????
if(closeConnection)
????????????????????{
????????????????????????
this.DisposeOneConnection(streamHashCode?,DisconnectedCause.OtherCause)?;
????????????????????????
return?;
????????????????????}

????????????????????
if((resMsg?!=?null)?&&(!?this.stop))
????????????????????{????????????????????
????????????????????????
byte[]?bRes?=?resMsg.ToStream()?;
????????????????????????key.NetStream.Write(bRes?,
0?,bRes.Length)?;

????????????????????????
if(this.ServiceCommitted?!=?null)
????????????????????????{????????????????????????????????
????????????????????????????
this.ServiceCommitted(streamHashCode?,resMsg)?;
????????????????????????}
????????????????????}
????????????????}
????????????????
catch(Exception?ee)
????????????????{
????????????????????
if(ee?is?System.IO.IOException)?//正在讀寫流的時候,連接斷開
????????????????????{
????????????????????????
this.DisposeOneConnection(streamHashCode?,DisconnectedCause.NetworkError)?;
????????????????????????
break?;
????????????????????}
????????????????????
else
????????????????????{
????????????????????????
this.esbLogger.Log(ee.Message?,"ESFramework.Network.Tcp.AgileTcp"?,ErrorLevel.Standard)?;
????????????????????}

????????????????????ee?
=?ee?;????????????????????
????????????????}

????????????}

????????????key.IsDataManaging?
=?false?;
????????}

??? AgileTcp組件的主要原理差不多就這些了,這種實現(xiàn)有個缺點,不知大家發(fā)現(xiàn)沒有。那就是當(dāng)客戶端主動斷開連接或掉線時,AgileTcp組件可能感受不到(除非對應(yīng)的連接上正在發(fā)送或接收數(shù)據(jù),此時會拋出異常),因為當(dāng)連接斷開時,key.NetStream.DataAvailable不會拋出異常,而是仍然返回false。這是個問題,幸好有補救的辦法,一是要求客戶端下線的時候給服務(wù)器發(fā)送Logoff消息,二是使用定時掉線檢查器(IUserOnLineChecker)。當(dāng)服務(wù)器檢查或發(fā)現(xiàn)某用戶下線時,即可調(diào)用ITcpClientsController.DisposeOneConnection方法來釋放對應(yīng)的連接和Context。(你應(yīng)該還記得ITcp接口是從ITcpClientsController繼承的)。關(guān)于這個問題,你有更好的解決辦法嗎?
??? 感謝關(guān)注!

上一篇文章:ESFramework介紹之(21)-- Tcp組件接口ITcp介紹

轉(zhuǎn)到??:ESFramework 可復(fù)用的通信框架(序)



轉(zhuǎn)載于:https://www.cnblogs.com/zhuweisky/archive/2006/04/13/374025.html

總結(jié)

以上是生活随笔為你收集整理的ESFramework介绍之(23)―― AgileTcp的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。