日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

UDT 最新源码分析(二) -- 开始与终止

發布時間:2023/12/14 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 UDT 最新源码分析(二) -- 开始与终止 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

UDT 最新源碼分析 -- 開始與終止

  • UDT 開始與終止
    • 開始流程
    • 終止流程

UDT 開始與終止

開始流程

UDT:: startup -> CUDT::startup -> CUDTUnited::startup

初始化UDT庫,多次調用時,調用計數增加,但實際上僅僅初始化一次。對于windows下,還需要初始化網絡庫WSAStartup。建立 garbageCollect 線程,用于清理失效socket,并刪除分發器。

int CUDTUnited::startup() {CGuard gcinit(m_InitLock); //獲取初始鎖if (m_iInstanceCount++ > 0) //實例被初始化的計數器,僅初始化一次return 0;if (m_bGCStatus) //garbageCollect狀態,保證線程不會被建立多個return true;m_bClosing = false; //CUDTUnited狀態, 此狀態下 garbageCollect內 while循環不退出pthread_mutex_init(&m_GCStopLock, NULL);pthread_cond_init(&m_GCStopCond, NULL);pthread_create(&m_GCThread, NULL, garbageCollect, this); //garbageCollect 線程創建m_bGCStatus = true;return 0; }

再來看 garbageCollect:

void* CUDTUnited::garbageCollect(void* p) {CUDTUnited* self = (CUDTUnited*)p; //獲取CUDTUnited實例,類型轉化CGuard gcguard(self->m_GCStopLock);//資源清理的鎖while (!self->m_bClosing) //初始 m_bClosing = false,無限循環{//當UDT協議判斷某一個UDT SOCKET的狀態不正確時,會將其狀態設置為BROKEN,并在這個函數中進行處理self->checkBrokenSockets();//睡眠等待,等待下一次可以清理出現BROKEN狀態的UDT SOCKET。睡眠時間 timeout: 1spthread_cond_timedwait(&self->m_GCStopCond, &self->m_GCStopLock, &timeout);}//remove all sockets and multiplexers. m_bClosing = true, 清理目前依舊殘余的資源 CGuard::enterCS(self->m_ControlLock);//遍歷 m_Sockets 中的 UDT socket,設置為關閉,存入 m_ClosedSockets map,下一步處理連接記錄。//查找 Listener, 將 該socket 在Listener中的 m_pQueuedSockets 與 m_pAcceptSockets中記錄刪除for (map<UDTSOCKET, CUDTSocket*>::iterator i = self->m_Sockets.begin(); i != self->m_Sockets.end(); ++ i){i->second->m_pUDT->m_bBroken = true; //將CUDT* 的狀態設置為 BROKEN,后續進行處理i->second->m_pUDT->close(); //關閉i->second->m_Status = CLOSED;//設置狀態為關閉i->second->m_TimeStamp = CTimer::getTime(); //調整最后一次操作 UDT socket 的時間self->m_ClosedSockets[i->first] = i->second;//將當前描述連接的CUDT*保存至 m_ClosedSockets// remove from listener's queuemap<UDTSOCKET, CUDTSocket*>::iterator ls = self->m_Sockets.find(i->second->m_ListenSocket);if (ls == self->m_Sockets.end()) //如果沒有找到Listener, m_ClosedSockets中繼續查找{ls = self->m_ClosedSockets.find(i->second->m_ListenSocket);if (ls == self->m_ClosedSockets.end()) //如果沒有找到,就不再處理continue;}CGuard::enterCS(ls->second->m_AcceptLock);//獲取Listener中的鎖ls->second->m_pQueuedSockets->erase(i->second->m_SocketID);//清理接收連接但還未接受的排隊 UDT socketls->second->m_pAcceptSockets->erase(i->second->m_SocketID);//清理已完成連接的隊列中UDT socketCGuard::leaveCS(ls->second->m_AcceptLock);}self->m_Sockets.clear();//最后刪除m_Sockets中的所有sockets//最后再次遍歷待關閉的socket隊列for (map<UDTSOCKET, CUDTSocket*>::iterator j = self->m_ClosedSockets.begin(); j != self->m_ClosedSockets.end(); ++ j){j->second->m_TimeStamp = 0;//設置狀態為 0}CGuard::leaveCS(self->m_ControlLock);while (true){//之前只是將即將清理的UDT socket 狀態設置為BROKEN,此時對BROKEN狀態的socket進行清理self->checkBrokenSockets();CGuard::enterCS(self->m_ControlLock);bool empty = self->m_ClosedSockets.empty(); //判斷待關閉socket是否已經全部清理CGuard::leaveCS(self->m_ControlLock);if (empty)//如果為empty,就可以直接退出break;CTimer::sleep();//不行的話,再歇一會,再次進行處理}return NULL; }

上面代碼中出現了兩次 checkBrokenSockets,該方法用于清理處于BROKEN狀態的 UDT socket。

  • 檢查所有的 UDT socket
    • 如果不是處于broken狀態,查找下一個,如果處于broken狀態:
      • 如果 socket 處于 LISTENING 狀態,須再等待3s,以防止有客戶端正在連接
      • 如果 recvbuffer 存在數據,且 brokencount 計數器仍大于0,繼續等待更長的時間
      • 否則:
        • 設置socket為CLOSED狀態,更新 m_TimeStamp 為當前時間, 開啟移除定時器。
        • 將當前socket加入待關閉 tbc vector, socket加入臨時存儲closed socket的 m_ClosedSockets map.
        • 查找 Listener, 從m_pQueuedSockets 和 m_pAcceptSockets 移除接收到的socket 連接 socket。
  • 對于移入臨時存儲待關閉的 m_ClosedSockets map,檢查每個socket,
    • 如果 m_ullLingerExpiration > 0,表示在發送緩沖中存在數據時,GC設置了延遲關閉的時間
      • 如果發送緩沖不存在,或為空,或設置關閉時間已經超時:
        • 將m_ullLingerExpiration設置為0;設置UDT socket為關閉狀態,在下一次GC中將被回收。更新關閉時間為當前時間。
    • 否則:
      • 如果標記關閉時間已經超過1s,且socket對應接收隊列信息節點已經被刪除,或者節點不存在list中
      • 將UDT socket加入tbr 移除隊列
  • 遍歷tbc,從 m_Sockets中刪除。遍歷tbr, 將這些超時的 socket 移除。
void CUDTUnited::checkBrokenSockets() {CGuard cg(m_ControlLock);//獲取GC鎖// set of sockets To Be Closed and To Be Removedvector<UDTSOCKET> tbc; //收集處于Closed狀態的 UDT SOCKETvector<UDTSOCKET> tbr; //收集處于Removed狀態的 UDT SOCKETfor (map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.begin(); i != m_Sockets.end(); ++ i){// check broken connectionif (i->second->m_pUDT->m_bBroken) //如果處于BROKEN狀態{if (i->second->m_Status == LISTENING) //如果是LISTENING UDT SOCKET{// for a listening socket, it should wait an extra 3 seconds in case a client is connectingif (CTimer::getTime() - i->second->m_TimeStamp < 3000000)continue;}else if ((i->second->m_pUDT->m_pRcvBuffer != NULL) && (i->second->m_pUDT->m_pRcvBuffer->getRcvDataSize() > 0) && (i->second->m_pUDT->m_iBrokenCounter -- > 0)){// if there is still data in the receiver buffer, wait longercontinue;//如果緩沖區中依舊有數據,應該等待更長的時間}//close broken connections and start removal timeri->second->m_Status = CLOSED; //將狀態設置為CLOSEDi->second->m_TimeStamp = CTimer::getTime(); //設置UDT SOCKET的關閉時間tbc.push_back(i->first);//將這個UDT SOCKET添加進Closed Array,稍后處理m_ClosedSockets[i->first] = i->second;//將這個UDT SOCKET添加進CLOSED UDT SOCKET MAP// remove from listener's queuemap<UDTSOCKET, CUDTSocket*>::iterator ls = m_Sockets.find(i->second->m_ListenSocket);if (ls == m_Sockets.end()){ls = m_ClosedSockets.find(i->second->m_ListenSocket);if (ls == m_ClosedSockets.end())continue;}CGuard::enterCS(ls->second->m_AcceptLock);ls->second->m_pQueuedSockets->erase(i->second->m_SocketID);ls->second->m_pAcceptSockets->erase(i->second->m_SocketID);CGuard::leaveCS(ls->second->m_AcceptLock);}}for (map<UDTSOCKET, CUDTSocket*>::iterator j = m_ClosedSockets.begin(); j != m_ClosedSockets.end(); ++ j){if (j->second->m_pUDT->m_ullLingerExpiration > 0) //如果還沒有到等待關閉時間{// asynchronous close: if ((NULL == j->second->m_pUDT->m_pSndBuffer) || (0 == j->second->m_pUDT->m_pSndBuffer->getCurrBufSize()) || (j->second->m_pUDT->m_ullLingerExpiration <= CTimer::getTime())){ //如果發送緩沖區為空,接收緩沖區為空或者等待關閉時間小于0,調整狀態為CLOSED直接關閉j->second->m_pUDT->m_ullLingerExpiration = 0;j->second->m_pUDT->m_bClosing = true; //更新Closed狀態,由下一次啟動的GC線程回收j->second->m_TimeStamp = CTimer::getTime();//更新關閉的時間}}// timeout 1 second to destroy a socket AND it has been removed from RcvUListif ((CTimer::getTime() - j->second->m_TimeStamp > 1000000) && ((NULL == j->second->m_pUDT->m_pRNode) || !j->second->m_pUDT->m_pRNode->m_bOnList)){tbr.push_back(j->first);}}// move closed sockets to the ClosedSockets structurefor (vector<UDTSOCKET>::iterator k = tbc.begin(); k != tbc.end(); ++ k)m_Sockets.erase(*k); // remove those timeout socketsfor (vector<UDTSOCKET>::iterator l = tbr.begin(); l != tbr.end(); ++ l)removeSocket(*l); }

如果 m_ClosedSockets 中UDT socket 已經超過超過1s, 就會放入 tbr。遍歷 tbr, 移除所有socket,更新相關資源。這里用到了removeSocket 方法。

  • 如果UDT socket 是一個 Listener,將所有收到的但未接受的socket 關閉,設置為 BROKEN 狀態,等待移除。
  • 刪除 m_PeerRec 中記錄的連接。
  • 刪除 UDT socket, 檢查對應的復用器,計數器減一,如果已經為0, 則關閉channel,清理復用器內的資源。
void CUDTUnited::removeSocket(const UDTSOCKET u) {map<UDTSOCKET, CUDTSocket*>::iterator i = m_ClosedSockets.find(u);// invalid socket IDif (i == m_ClosedSockets.end())return;// 由于是多個UDT實例共享一個資源復用器,銷毀時要減少引用計數// decrease multiplexer reference count, and remove it if necessaryconst int mid = i->second->m_iMuxID;if (NULL != i->second->m_pQueuedSockets){CGuard::enterCS(i->second->m_AcceptLock);// if it is a listener, close all un-accepted sockets in its queue and remove them laterfor (set<UDTSOCKET>::iterator q = i->second->m_pQueuedSockets->begin(); q != i->second->m_pQueuedSockets->end(); ++ q){m_Sockets[*q]->m_pUDT->m_bBroken = true; //1. 將CUDT*的狀態設置為BROKENm_Sockets[*q]->m_pUDT->close(); //2. 調用CUDT中的close()m_Sockets[*q]->m_TimeStamp = CTimer::getTime();//3. 更新UDT SOCKET的關閉時間m_Sockets[*q]->m_Status = CLOSED; //4. 將UDT SOCKET設置為Closedm_ClosedSockets[*q] = m_Sockets[*q]; //5. 在Closed Array中添加當前UDT SOCKET,在GC線程中進行處理m_Sockets.erase(*q); //6. 從全局的MAP中刪除}CGuard::leaveCS(i->second->m_AcceptLock);}// remove from peer recmap<int64_t, set<UDTSOCKET> >::iterator j = m_PeerRec.find((i->second->m_PeerID << 30) + i->second->m_iISN);if (j != m_PeerRec.end()){j->second.erase(u);if (j->second.empty())m_PeerRec.erase(j);}// delete this onei->second->m_pUDT->close();delete i->second;m_ClosedSockets.erase(i);map<int, CMultiplexer>::iterator m;m = m_mMultiplexer.find(mid);if (m == m_mMultiplexer.end())//如果這個資源復用器不存在,直接返回{//something is wrong!!!return;}m->second.m_iRefCount --;//否則的話,減少這個資源復用器的引用計數if (0 == m->second.m_iRefCount){m->second.m_pChannel->close(); //與UDP SOCKET關聯的Channel直接關閉delete m->second.m_pSndQueue; //清理資源復用器的資源delete m->second.m_pRcvQueue;delete m->second.m_pTimer;delete m->second.m_pChannel;m_mMultiplexer.erase(m);} }

終止流程

  • UDT::cleanup -> CUDT::cleanup -> CUDTUnited::cleanup

終止流程與開始流程相反,在計數器清零以后才會真正退出,每次調用,計數減一。修改 m_bClosing 與 m_bGCStatus 狀態,終止垃圾回收線程。

int CUDTUnited::cleanup() {CGuard gcinit(m_InitLock);if (--m_iInstanceCount > 0) //計數器減一return 0;if (!m_bGCStatus) return 0;m_bClosing = true;pthread_cond_signal(&m_GCStopCond);pthread_join(m_GCThread, NULL);pthread_mutex_destroy(&m_GCStopLock);pthread_cond_destroy(&m_GCStopCond);m_bGCStatus = false;return 0; }

對UDT庫的初始化與終止流程分析結束,在例程中,基本上都是顯示調用這兩個接口,但是在 appclient.cpp 中,通過構造函數與析構函數隱式完成。兩種方式均可,可根據需要自選。

struct UDTUpDown{UDTUpDown(){// use this function to initialize the UDT libraryUDT::startup();}~UDTUpDown(){// use this function to release the UDT libraryUDT::cleanup();} };

總結

以上是生活随笔為你收集整理的UDT 最新源码分析(二) -- 开始与终止的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。