日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

UDT 最新源码分析(五) -- 网络数据收发

發布時間:2023/12/14 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 UDT 最新源码分析(五) -- 网络数据收发 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

UDT 最新源碼分析 -- 網絡數據收發

  • 從接口實現看 UDT 網絡收發
    • UDT 發送 send / sendmsg / sendfile
    • UDT 接收 recv /recvmsg /recvfile
  • 從內部實現看 UDT 網絡收發
    • UDT 發送工作線程
    • UDT 接收工作線程

從接口實現看 UDT 網絡收發

從對外的接口實現方法來看,網絡收發過程實際上是對 m_pSndBuffer 和 m_pRcvBuffer 進行操作,而實際的網絡收發涉及到系統調度,算法實現等問題。簡單來看看代碼。

UDT 發送 send / sendmsg / sendfile

以 send 為例,外部接口調用send 其實并不是直接發送到網絡,而是將數據加入發送的 buffer 中,后續再通過調度將數據發送到網絡中去。send 僅僅針對流傳輸模式而言,其他模式不可調用此函數。對于數據包模式,應該調用 sendmsg。

CUDT::send(UDTSOCKET u, const char* buf, int len, int)
-> CUDT::send(const char* data, int len)

int CUDT::send(const char* data, int len) {...if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()) //buffer 已滿{// 檢查buffer狀態,等待滿足條件被觸發。// 檢查網絡連接狀態,以及UDT 是否關閉等狀態。}int size = (m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize; //最大可用if (size > len)size = len; //size 為本次需要填充的字節,最大為可用容量// record total time used for sendingif (0 == m_pSndBuffer->getCurrBufSize())m_llSndDurationCounter = CTimer::getTime();// insert the user buffer into the sending listm_pSndBuffer->addBuffer(data, size); //重點代碼,發送過程其實只是放入buffer// insert this socket to snd list if it is not on the list yetm_pSndQueue->m_pSndUList->update(this, false);if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize()){// write is not available any mores_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false);}return size; }

sendmsg 與 send 函數有非常多代碼一致,核心代碼基本上沒有變化。sendfile 中 addbuffer 變成 addBufferFromFile,其余基本沒變化。

UDT 接收 recv /recvmsg /recvfile

從接口調用 recv 實際上只是從接收緩沖中取出數據,在獲取數據會檢查當前是否流模式,如果沒數據,或啟動條件喚醒和定時等待等,也會檢查網絡連接是否正常。

int CUDT::recv(char* data, int len) {... if (0 == m_pRcvBuffer->getRcvDataSize()) // buffer 為空{... //等待條件滿足或者超時}int res = m_pRcvBuffer->readBuffer(data, len);if (m_pRcvBuffer->getRcvDataSize() <= 0){// read is not available any mores_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false); //刪除}if ((res <= 0) && (m_iRcvTimeOut >= 0))throw CUDTException(6, 3, 0);return res; }

從內部實現看 UDT 網絡收發

從接口上可以看到,發送接收僅僅是將數據與buffer進行交互,看不到數據真正進行發送接收的地方。那么在內部究竟如何實現的呢?在以前的文章分析中已經提到過發送接收工作線程的概念,在這里再次看看,代碼參考 queue.cpp。

初始化的地方如下,通過調用 m_pSndQueue 和 m_pRcvQueue 調用 init 實現 worker 線程創建:

void CUDTUnited::updateMux(CUDTSocket* s, const sockaddr* addr, const UDPSOCKET* udpsock) {...CMultiplexer m;m.m_iID = s->m_SocketID;m.m_pChannel = new CChannel(s->m_pUDT->m_iIPversion);m.m_pChannel->setSndBufSize(s->m_pUDT->m_iUDPSndBufSize);m.m_pChannel->setRcvBufSize(s->m_pUDT->m_iUDPRcvBufSize);try{if (NULL != udpsock)m.m_pChannel->open(*udpsock);elsem.m_pChannel->open(addr);}catch (CUDTException& e){m.m_pChannel->close();delete m.m_pChannel;throw e;}m.m_pTimer = new CTimer;m.m_pSndQueue = new CSndQueue;m.m_pSndQueue->init(m.m_pChannel, m.m_pTimer);m.m_pRcvQueue = new CRcvQueue;m.m_pRcvQueue->init(32, s->m_pUDT->m_iPayloadSize, m.m_iIPversion, 1024, m.m_pChannel, m.m_pTimer);m_mMultiplexer[m.m_iID] = m; }

UDT 發送工作線程

發送線程中主要的變量有 m_pSndUList, m_pChannel, m_pTimer。線程的工作就是不停的檢查 m_pSndUList 中的UDT 實例,取出包,通過 m_pChannel 發送出去。如果取出的包時發現未到發送時間,則通過 m_pTimer sleep 剩余的時間再發送。

創建線程如下所示:

void CSndQueue::init(CChannel* c, CTimer* t) {m_pChannel = c;m_pTimer = t;m_pSndUList = new CSndUList;m_pSndUList->m_pWindowLock = &m_WindowLock;m_pSndUList->m_pWindowCond = &m_WindowCond;m_pSndUList->m_pTimer = m_pTimer;#ifndef WIN32if (0 != pthread_create(&m_WorkerThread, NULL, CSndQueue::worker, this)){m_WorkerThread = 0;throw CUDTException(3, 1);}#elseDWORD threadID;m_WorkerThread = CreateThread(NULL, 0, CSndQueue::worker, this, 0, &threadID);if (NULL == m_WorkerThread)throw CUDTException(3, 1);#endif }

根據前面的描述,接下來理解發送工作線程運行過程。getNextProcTime 實際上就是獲取 m_pHeap[0] 的 m_llTimeStamp。這個時間就是即將要發送的數據的時間。sleepto 等待時間到達。pop 則是初始化 CPacket,然后再發送。如果 ts <= 0,代表當前并無數據需要發送,需要繼續等待。

#ifndef WIN32void* CSndQueue::worker(void* param) #elseDWORD WINAPI CSndQueue::worker(LPVOID param) #endif {CSndQueue* self = (CSndQueue*)param;while (!self->m_bClosing){uint64_t ts = self->m_pSndUList->getNextProcTime(); //獲取下一次發送時間if (ts > 0){// wait until next processing time of the first socket on the listuint64_t currtime;CTimer::rdtsc(currtime);if (currtime < ts) //時間未到self->m_pTimer->sleepto(ts); //sleep, 控制包與包之間的發送間隔// it is time to send the next pktsockaddr* addr;CPacket pkt;if (self->m_pSndUList->pop(addr, pkt) < 0)continue;self->m_pChannel->sendto(addr, pkt);}else{// wait here if there is no sockets with data to be sent#ifndef WIN32pthread_mutex_lock(&self->m_WindowLock);if (!self->m_bClosing && (self->m_pSndUList->m_iLastEntry < 0))pthread_cond_wait(&self->m_WindowCond, &self->m_WindowLock);pthread_mutex_unlock(&self->m_WindowLock);#elseWaitForSingleObject(self->m_WindowCond, INFINITE);#endif}}#ifndef WIN32return NULL;#elseSetEvent(self->m_ExitCond);return 0;#endif }

Retrieve the next packet and peer address from the first entry, and reschedule it in the queue.
在線程循環塊內,出現了 pop 方法。這個方法取出 m_pHeap 中的根節點,檢查時間戳,若時間已到,在堆中刪除該節點,進入 packData。

int CSndUList::pop(sockaddr*& addr, CPacket& pkt) {CGuard listguard(m_ListLock);if (-1 == m_iLastEntry) //m_pHeap中為空return -1;// no pop until the next schedulled timeuint64_t ts;CTimer::rdtsc(ts);if (ts < m_pHeap[0]->m_llTimeStamp)return -1;CUDT* u = m_pHeap[0]->m_pUDT;remove_(u);if (!u->m_bConnected || u->m_bBroken)return -1;// pack a packet from the socketif (u->packData(pkt, ts) <= 0)return -1;addr = u->m_pPeerAddr;// insert a new entry, ts is the next processing timeif (ts > 0)insert_(ts, u);return 1; }

m_pHeap 是一個以節點時間為參考建立的最小堆。所有的插入與刪除操作均為堆的操作,需要注意的是,孩子節點與根節點的對應關系。對于根節點 q 來說,左孩子序號為 2 * q + 1, 右孩子為 2 * q + 2,這也是代碼中的 p 節點值。

首先看刪除某節點的操作:

void CSndUList::remove_(const CUDT* u) {CSNode* n = u->m_pSNode;if (n->m_iHeapLoc >= 0){// remove the node from heap 最后節點與被刪節點交換m_pHeap[n->m_iHeapLoc] = m_pHeap[m_iLastEntry];m_iLastEntry --;m_pHeap[n->m_iHeapLoc]->m_iHeapLoc = n->m_iHeapLoc;int q = n->m_iHeapLoc; //被刪位置上新節點int p = q * 2 + 1; //左孩子序號while (p <= m_iLastEntry) // 存在左孩子節點{// 存在右孩子,且左孩子時間戳大于右孩子時間戳,則修改當前孩子為右孩子if ((p + 1 <= m_iLastEntry) && (m_pHeap[p]->m_llTimeStamp > m_pHeap[p + 1]->m_llTimeStamp))p ++;// 如果根節點時間戳大于孩子中最小時間戳節點,則交換,并置當前節點為新的根節點的左孩子if (m_pHeap[q]->m_llTimeStamp > m_pHeap[p]->m_llTimeStamp){CSNode* t = m_pHeap[p];m_pHeap[p] = m_pHeap[q];m_pHeap[p]->m_iHeapLoc = p;m_pHeap[q] = t;m_pHeap[q]->m_iHeapLoc = q;q = p;p = q * 2 + 1;}elsebreak;}n->m_iHeapLoc = -1;}// the only event has been deleted, wake up immediatelyif (0 == m_iLastEntry)m_pTimer->interrupt(); }

對于插入操作,只要記住節點序號關系,就很容易看明白了。父節點 p 為孩子節點 (q-1)/2。 如果還不明白,只能去復習一下堆的數據結構相關知識。

void CSndUList::insert_(int64_t ts, const CUDT* u) {CSNode* n = u->m_pSNode;// do not insert repeated nodeif (n->m_iHeapLoc >= 0) return;//插入增加到最后節點m_iLastEntry ++; m_pHeap[m_iLastEntry] = n;n->m_llTimeStamp = ts;//開始調整int q = m_iLastEntry;int p = q;while (p != 0){p = (q - 1) >> 1; //父節點if (m_pHeap[p]->m_llTimeStamp > m_pHeap[q]->m_llTimeStamp){CSNode* t = m_pHeap[p];m_pHeap[p] = m_pHeap[q];m_pHeap[q] = t;t->m_iHeapLoc = q;q = p;}elsebreak;}n->m_iHeapLoc = q;// an earlier event has been inserted, wake up sending workerif (n->m_iHeapLoc == 0)m_pTimer->interrupt();// first entry, activate the sending queueif (0 == m_iLastEntry){#ifndef WIN32pthread_mutex_lock(m_pWindowLock);pthread_cond_signal(m_pWindowCond); //喚醒線程pthread_mutex_unlock(m_pWindowLock);#elseSetEvent(*m_pWindowCond);#endif} }

在發送線程中還有一個 packData 方法,處理了兩類 packet 的讀取,一是丟失的 packet,二是正常的順序傳輸的包。處理過程:

  • 獲取 entertime, 更新 m_ullTimeDiff, 即記錄當前發包對應目標時間的差值,會影響到下一次發包的目標時間。UDT 以此使得發包的時間間隔始終控制在算法之中。

    在 UDT 中,在開始的時候會初始化一個發包時間間隔 m_ullInterval ,這個值表示期望的發送時間間隔。初始化如下所示:

    m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);

    m_ullInterval 并不是一個固定的值,而是根據網絡狀態進行調整。比如在 processCtrl 中 收到包類型為 4 時,就會改變。但是查找代碼可以發現,當前udt 版本不再執行 sendCtrl(4),代碼詳見包類型為6 時,代碼已經被注釋。但是無用代碼并未刪除,如下所示。

    // One way packet delay is increasing, so decrease the sending rate m_ullInterval = (uint64_t)ceil(m_ullInterval * 1.125);

    在擁塞控制中 CCUpdate 改變 m_ullInterva 值:

    m_ullInterval = (uint64_t) (m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);
    m_dCongestionWindow = m_pCC->m_dCWndSize;
    if (m_llMaxBW <= 0)
    ??return;
    const double minSP = 1000000.0 / (double(m_llMaxBW) / m_iMSS) * m_ullCPUFrequency;
    if (m_ullInterval < minSP)
    ??m_ullInterval = minSP;

    在UDT中,包發送會有一個隨著網絡狀況調整的一個發送周期,也就是 m_ullInterva 值。在每一次發送包時,都會根據 m_ullInterval值計算下一次包發送的理想時間間隔,并修改 m_ullTargetTime 值。

  • 檢查是否丟包。

    • 如果丟包,就將 packet.m_iSeqNo 賦值為丟包的序號值。然后計算 offset。m_iSndLastDataAck 是在接收到最后一個 ack 時更新的序號,之前的所有包都被確認。如果 offset < 0, 表示上次確認序號大于丟包序號,即有包未收到但是被確認,可能出現錯誤。讀取數據如果失敗,就會發送丟棄請求,并更新 m_iSndCurrSeqNo。

    • 如果沒有丟包,則發送一個新包。根據流窗口與擁塞窗口更新 cwnd 值。若發送包序號在窗口范圍內,則 readData 并且更新本地和 ccc中 m_iSndCurrSeqNo,更新 m_iSeqNo,檢查是否需要發送包對探測。

    • 更新 packet 與 cc,更新 ts, m_ullTargetTime。包將在 worker 中被發送

  • int CUDT::packData(CPacket& packet, uint64_t& ts) {int payload = 0;bool probe = false;uint64_t entertime;CTimer::rdtsc(entertime);if ((0 != m_ullTargetTime) && (entertime > m_ullTargetTime))m_ullTimeDiff += entertime - m_ullTargetTime;// Loss retransmission always has higher priority.if ((packet.m_iSeqNo = m_pSndLossList->getLostSeq()) >= 0) //發現丟包,可能超時或者 NACK回應消息{// protect m_iSndLastDataAck from updating by ACK processingCGuard ackguard(m_AckLock);int offset = CSeqNo::seqoff(m_iSndLastDataAck, packet.m_iSeqNo); if (offset < 0)return 0;int msglen;//重新取數據payload = m_pSndBuffer->readData(&(packet.m_pcData), offset, packet.m_iMsgNo, msglen);if (-1 == payload){int32_t seqpair[2];seqpair[0] = packet.m_iSeqNo;seqpair[1] = CSeqNo::incseq(seqpair[0], msglen);sendCtrl(7, &packet.m_iMsgNo, seqpair, 8);// only one msg drop request is necessarym_pSndLossList->remove(seqpair[1]);// skip all dropped packetsif (CSeqNo::seqcmp(m_iSndCurrSeqNo, CSeqNo::incseq(seqpair[1])) < 0)m_iSndCurrSeqNo = CSeqNo::incseq(seqpair[1]);return 0;}else if (0 == payload)return 0;++ m_iTraceRetrans;++ m_iRetransTotal;}else{// If no loss, pack a new packet.// check congestion/flow window limitint cwnd = (m_iFlowWindowSize < (int)m_dCongestionWindow) ? m_iFlowWindowSize : (int)m_dCongestionWindow;if (cwnd >= CSeqNo::seqlen(m_iSndLastAck, CSeqNo::incseq(m_iSndCurrSeqNo))){if (0 != (payload = m_pSndBuffer->readData(&(packet.m_pcData), packet.m_iMsgNo))){m_iSndCurrSeqNo = CSeqNo::incseq(m_iSndCurrSeqNo);m_pCC->setSndCurrSeqNo(m_iSndCurrSeqNo);packet.m_iSeqNo = m_iSndCurrSeqNo;// every 16 (0xF) packets, a packet pair is sentif (0 == (packet.m_iSeqNo & 0xF))probe = true;}else{m_ullTargetTime = 0;m_ullTimeDiff = 0;ts = 0;return 0;}}else{m_ullTargetTime = 0;m_ullTimeDiff = 0;ts = 0;return 0;}}packet.m_iTimeStamp = int(CTimer::getTime() - m_StartTime);packet.m_iID = m_PeerID;packet.setLength(payload);m_pCC->onPktSent(&packet);//m_pSndTimeWindow->onPktSent(packet.m_iTimeStamp);++ m_llTraceSent;++ m_llSentTotal;if (probe){// sends out probing packet pairts = entertime;probe = false;}else{#ifndef NO_BUSY_WAITINGts = entertime + m_ullInterval;#elseif (m_ullTimeDiff >= m_ullInterval){ts = entertime;m_ullTimeDiff -= m_ullInterval;}else{ts = entertime + m_ullInterval - m_ullTimeDiff;m_ullTimeDiff = 0;}#endif}m_ullTargetTime = ts;return payload; }

    UDT 接收工作線程

    接收工作線程的主要工作同樣在 while 循環中完成。首先檢查是否有新的 socket 到來,如果有,則不斷加入 m_pRcvUList,同時添加到 m_pHash 中。然后再 m_UnitQueue 中查找是否存在可用的存儲塊,在此過程中如果發現已經數量太多會自動擴容。不斷的通過 recvfrom 接收包。

    如果是連接請求, 將被送給 listening socket 或者 rendezvous sockets,對應將進入 listen 或者 connect 操作。否則, 根據 getFlag 判斷,進入 processData 或者 processCtrl。這也是接收數據被處理的核心函數。最后將這個 UDT實例 放入 m_pRcvUList 最后。

    #ifndef WIN32void* CRcvQueue::worker(void* param) #elseDWORD WINAPI CRcvQueue::worker(LPVOID param) #endif {CRcvQueue* self = (CRcvQueue*)param;sockaddr* addr = (AF_INET == self->m_UnitQueue.m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6;CUDT* u = NULL;int32_t id;while (!self->m_bClosing){#ifdef NO_BUSY_WAITINGself->m_pTimer->tick();#endif// check waiting list, if new socket, insert it to the listwhile (self->ifNewEntry()){CUDT* ne = self->getNewEntry();if (NULL != ne){self->m_pRcvUList->insert(ne);self->m_pHash->insert(ne->m_SocketID, ne);}}// find next available slot for incoming packetCUnit* unit = self->m_UnitQueue.getNextAvailUnit();if (NULL == unit){// no space, skip this packetCPacket temp;temp.m_pcData = new char[self->m_iPayloadSize];temp.setLength(self->m_iPayloadSize);self->m_pChannel->recvfrom(addr, temp);delete [] temp.m_pcData;goto TIMER_CHECK;}unit->m_Packet.setLength(self->m_iPayloadSize);// reading next incoming packet, recvfrom returns -1 is nothing has been receivedif (self->m_pChannel->recvfrom(addr, unit->m_Packet) < 0)goto TIMER_CHECK;id = unit->m_Packet.m_iID;// ID 0 is for connection request, which should be passed to the listening socket or rendezvous socketsif (0 == id){if (NULL != self->m_pListener)self->m_pListener->listen(addr, unit->m_Packet);else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id))){// asynchronous connect: call connect here// otherwise wait for the UDT socket to retrieve this packetif (!u->m_bSynRecving)u->connect(unit->m_Packet);elseself->storePkt(id, unit->m_Packet.clone());}}else if (id > 0){if (NULL != (u = self->m_pHash->lookup(id))){if (CIPAddress::ipcmp(addr, u->m_pPeerAddr, u->m_iIPversion)){if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing){if (0 == unit->m_Packet.getFlag())u->processData(unit);elseu->processCtrl(unit->m_Packet);u->checkTimers();self->m_pRcvUList->update(u);}}}else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id))){if (!u->m_bSynRecving)u->connect(unit->m_Packet);elseself->storePkt(id, unit->m_Packet.clone());}}TIMER_CHECK:// take care of the timing event for all UDT socketsuint64_t currtime;CTimer::rdtsc(currtime);CRNode* ul = self->m_pRcvUList->m_pUList;uint64_t ctime = currtime - 100000 * CTimer::getCPUFrequency();while ((NULL != ul) && (ul->m_llTimeStamp < ctime)){CUDT* u = ul->m_pUDT;if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing){u->checkTimers();self->m_pRcvUList->update(u);}else{// the socket must be removed from Hash table first, then RcvUListself->m_pHash->remove(u->m_SocketID);self->m_pRcvUList->remove(u);u->m_pRNode->m_bOnList = false;}ul = self->m_pRcvUList->m_pUList;}// Check connection requests status for all sockets in the RendezvousQueue.self->m_pRendezvousQueue->updateConnStatus();}if (AF_INET == self->m_UnitQueue.m_iIPversion)delete (sockaddr_in*)addr;elsedelete (sockaddr_in6*)addr;#ifndef WIN32return NULL;#elseSetEvent(self->m_ExitCond);return 0;#endif }

    checkTimers 會更新 cc 參數,并發送 ack 包,檢查連接是否中斷。在代碼中,NAK 定時器不再生效,僅僅依靠發送方的超時機制。檢測如果16個超時 且 總時間達到閾值才會認為連接掛掉。超時也會導致擁塞控制算法進行調整。

    void CUDT::checkTimers() {// update CC parametersCCUpdate(); //更新發包時間間隔和擁塞窗口uint64_t currtime;CTimer::rdtsc(currtime);if ((currtime > m_ullNextACKTime) || ((m_pCC->m_iACKInterval > 0) && (m_pCC->m_iACKInterval <= m_iPktCount))){// ACK timer expired or ACK interval is reachedsendCtrl(2); //ackCTimer::rdtsc(currtime);if (m_pCC->m_iACKPeriod > 0) //更新 m_ullNextACKTimem_ullNextACKTime = currtime + m_pCC->m_iACKPeriod * m_ullCPUFrequency;elsem_ullNextACKTime = currtime + m_ullACKInt;m_iPktCount = 0;m_iLightACKCount = 1;}else if (m_iSelfClockInterval * m_iLightACKCount <= m_iPktCount){//send a "light" ACKsendCtrl(2, NULL, NULL, 4);++ m_iLightACKCount;}// we are not sending back repeated NAK anymore and rely on the sender's EXP for retransmission//if ((m_pRcvLossList->getLossLength() > 0) && (currtime > m_ullNextNAKTime))//{ // // NAK timer expired, and there is loss to be reported.// sendCtrl(3);//// CTimer::rdtsc(currtime);// m_ullNextNAKTime = currtime + m_ullNAKInt;//} //不再觸發 NAK 定時器,僅僅依靠發送方的重傳超時,應該是為了減少誤丟包識別。uint64_t next_exp_time;if (m_pCC->m_bUserDefinedRTO)next_exp_time = m_ullLastRspTime + m_pCC->m_iRTO * m_ullCPUFrequency;else{uint64_t exp_int = (m_iEXPCount * (m_iRTT + 4 * m_iRTTVar) + m_iSYNInterval) * m_ullCPUFrequency;if (exp_int < m_iEXPCount * m_ullMinExpInt)exp_int = m_iEXPCount * m_ullMinExpInt;next_exp_time = m_ullLastRspTime + exp_int;}if (currtime > next_exp_time){// Haven't receive any information from the peer, is it dead?!// timeout: at least 16 expirations and must be greater than 10 secondsif ((m_iEXPCount > 16) && (currtime - m_ullLastRspTime > 5000000 * m_ullCPUFrequency)){// Connection is broken. // UDT does not signal any information about this instead of to stop quietly.// Application will detect this when it calls any UDT methods next time.m_bClosing = true;m_bBroken = true;m_iBrokenCounter = 30;// update snd U list to remove this socketm_pSndQueue->m_pSndUList->update(this);releaseSynch();// app can call any UDT API to learn the connection_broken errors_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN | UDT_EPOLL_OUT | UDT_EPOLL_ERR, true);CTimer::triggerEvent();return;}// sender: Insert all the packets sent after last received acknowledgement into the sender loss list.// recver: Send out a keep-alive packetif (m_pSndBuffer->getCurrBufSize() > 0){if ((CSeqNo::incseq(m_iSndCurrSeqNo) != m_iSndLastAck) && (m_pSndLossList->getLossLength() == 0)){// resend all unacknowledged packets on timeout, but only if there is no packet in the loss listint32_t csn = m_iSndCurrSeqNo;int num = m_pSndLossList->insert(m_iSndLastAck, csn); m_iTraceSndLoss += num;m_iSndLossTotal += num;}m_pCC->onTimeout();CCUpdate();// immediately restart transmissionm_pSndQueue->m_pSndUList->update(this);}else{sendCtrl(1); //keep-live 包}++ m_iEXPCount; //增加,如果到達16 次,進入超時處理,如果收到確認,則重置為0。// Reset last response time since we just sent a heart-beat.m_ullLastRspTime = currtime;} }

    再回頭看數據處理部分 processData 。

    int CUDT::processData(CUnit* unit) {CPacket& packet = unit->m_Packet;// Just heard from the peer, reset the expiration count.m_iEXPCount = 1; //有收到數據,重置 EXPuint64_t currtime;CTimer::rdtsc(currtime);m_ullLastRspTime = currtime; //更新 m_ullLastRspTimem_pCC->onPktReceived(&packet); //未找到函數的實現++ m_iPktCount;// update time information, 記錄包到達的時間以及上一包時間m_pRcvTimeWindow->onPktArrival(); //記錄的目的用于計算包的到達速率,然后將計算的速率通過ACK反饋回去// check if it is probing packet pair, 用于估計鏈路容量,將計算的容量通過ACK反饋回去if (0 == (packet.m_iSeqNo & 0xF))//檢查是否為包對m_pRcvTimeWindow->probe1Arrival(); //記錄包對中第一個包的到達時間else if (1 == (packet.m_iSeqNo & 0xF))m_pRcvTimeWindow->probe2Arrival(); // 記錄探測包對的時間間隔++ m_llTraceRecv;++ m_llRecvTotal;int32_t offset = CSeqNo::seqoff(m_iRcvLastAck, packet.m_iSeqNo);if ((offset < 0) || (offset >= m_pRcvBuffer->getAvailBufSize()))return -1;if (m_pRcvBuffer->addData(unit, offset) < 0)//將數據包加入到 m_pRcvBufferreturn -1;// Loss detection.if (CSeqNo::seqcmp(packet.m_iSeqNo, CSeqNo::incseq(m_iRcvCurrSeqNo)) > 0){// If loss found, insert them to the receiver loss listm_pRcvLossList->insert(CSeqNo::incseq(m_iRcvCurrSeqNo), CSeqNo::decseq(packet.m_iSeqNo));// pack loss list for NAKint32_t lossdata[2];lossdata[0] = CSeqNo::incseq(m_iRcvCurrSeqNo) | 0x80000000;lossdata[1] = CSeqNo::decseq(packet.m_iSeqNo);// Generate loss report immediately.sendCtrl(3, NULL, lossdata, (CSeqNo::incseq(m_iRcvCurrSeqNo) == CSeqNo::decseq(packet.m_iSeqNo)) ? 1 : 2);int loss = CSeqNo::seqlen(m_iRcvCurrSeqNo, packet.m_iSeqNo) - 2;m_iTraceRcvLoss += loss;m_iRcvLossTotal += loss;}// This is not a regular fixed size packet... //an irregular sized packet usually indicates the end of a message, so send an ACK immediately if (packet.getLength() != m_iPayloadSize)CTimer::rdtsc(m_ullNextACKTime); // Update the current largest sequence number that has been received.// Or it is a retransmitted packet, remove it from receiver loss list.if (CSeqNo::seqcmp(packet.m_iSeqNo, m_iRcvCurrSeqNo) > 0)m_iRcvCurrSeqNo = packet.m_iSeqNo;elsem_pRcvLossList->remove(packet.m_iSeqNo);return 0; }

    接下來看控制消息的處理。這部分的內容可以參考 UDT 最新協議分析.

  • ACK 處理
    • 如果是一個輕量級 ACK,更新 m_iFlowWindowSize 和 m_iSndLastAck, 終止處理。
    • 否則:
      • 使用相同的 ACK 序號返回一個 ACK2 作為確認的確認。更新 m_ullSndLastAck2Time, m_iFlowWindowSize, m_iSndLastDataAck 和 m_iSndLastAck。\
      • 更新發送丟失鏈表,移除已經被確認的所有包序號。\
      • 更新RTT與RTTVar。更新ACK和NAK周期為 4 * RTT + RTTVar + SYN。\
      • 更新發送端緩沖,釋放已經被確認的緩沖。\
      • 更新包到達速率為:A = (A * 7 + a) / 8,其中a為ACK中攜帶的相應值。更新鏈路容量估計值:B = (B * 7 + b) / 8,其中b為ACK中攜帶的相應值。
      • 更新發包間隔 m_ullInterval。
    void CUDT::processCtrl(CPacket& ctrlpkt) {...case 2: //010 - Acknowledgement{int32_t ack;// process a lite ACKif (4 == ctrlpkt.getLength()){ack = *(int32_t *)ctrlpkt.m_pcData;if (CSeqNo::seqcmp(ack, m_iSndLastAck) >= 0){m_iFlowWindowSize -= CSeqNo::seqoff(m_iSndLastAck, ack); //更新 m_iFlowWindowSizem_iSndLastAck = ack;}break;}// read ACK seq. no.ack = ctrlpkt.getAckSeqNo();// send ACK acknowledgement// number of ACK2 can be much less than number of ACKuint64_t now = CTimer::getTime();if ((currtime - m_ullSndLastAck2Time > (uint64_t)m_iSYNInterval) || (ack == m_iSndLastAck2)){sendCtrl(6, &ack); // ack of ack, 對確認包的二次確認m_iSndLastAck2 = ack;m_ullSndLastAck2Time = now;}// Got data ACKack = *(int32_t *)ctrlpkt.m_pcData;// check the validation of the ackif (CSeqNo::seqcmp(ack, CSeqNo::incseq(m_iSndCurrSeqNo)) > 0){//this should not happen: attack or bug . 不應該大于最大發送序號m_bBroken = true;m_iBrokenCounter = 0;break;}if (CSeqNo::seqcmp(ack, m_iSndLastAck) >= 0) //新的數據的 ack{// Update Flow Window Size, must update before and together with m_iSndLastAckm_iFlowWindowSize = *((int32_t *)ctrlpkt.m_pcData + 3);m_iSndLastAck = ack;}// protect packet retransmissionCGuard::enterCS(m_AckLock);int offset = CSeqNo::seqoff(m_iSndLastDataAck, ack);if (offset <= 0){// discard it if it is a repeated ACKCGuard::leaveCS(m_AckLock);break;}// acknowledge the sending bufferm_pSndBuffer->ackData(offset); //僅修改 m_pFirstBlock 指針 和 m_iCount// record total time used for sendingm_llSndDuration += currtime - m_llSndDurationCounter;m_llSndDurationTotal += currtime - m_llSndDurationCounter;m_llSndDurationCounter = currtime;// update sending variablesm_iSndLastDataAck = ack;m_pSndLossList->remove(CSeqNo::decseq(m_iSndLastDataAck));//重點函數,后續介紹CGuard::leaveCS(m_AckLock);#ifndef WIN32pthread_mutex_lock(&m_SendBlockLock);if (m_bSynSending)pthread_cond_signal(&m_SendBlockCond);pthread_mutex_unlock(&m_SendBlockLock);#elseif (m_bSynSending)SetEvent(m_SendBlockCond);#endif// acknowledde any waiting epolls to writes_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);// insert this socket to snd list if it is not on the list yetm_pSndQueue->m_pSndUList->update(this, false);// Update RTT//m_iRTT = *((int32_t *)ctrlpkt.m_pcData + 1);//m_iRTTVar = *((int32_t *)ctrlpkt.m_pcData + 2);int rtt = *((int32_t *)ctrlpkt.m_pcData + 1);m_iRTTVar = (m_iRTTVar * 3 + abs(rtt - m_iRTT)) >> 2;m_iRTT = (m_iRTT * 7 + rtt) >> 3;m_pCC->setRTT(m_iRTT); //更新cc m_iRTTif (ctrlpkt.getLength() > 16){// Update Estimated Bandwidth and packet delivery rateif (*((int32_t *)ctrlpkt.m_pcData + 4) > 0)m_iDeliveryRate = (m_iDeliveryRate * 7 + *((int32_t *)ctrlpkt.m_pcData + 4)) >> 3;if (*((int32_t *)ctrlpkt.m_pcData + 5) > 0)m_iBandwidth = (m_iBandwidth * 7 + *((int32_t *)ctrlpkt.m_pcData + 5)) >> 3;m_pCC->setRcvRate(m_iDeliveryRate); //更新cc m_iRcvRatem_pCC->setBandwidth(m_iBandwidth); //更新cc m_iBandwidth}m_pCC->onACK(ack); //更新cc m_dPktSndPeriod,進一步影響發包間隔計算 m_ullIntervalCCUpdate(); //重新計算 m_ullInterval++ m_iRecvACK;++ m_iRecvACKTotal;break;}... }
  • ACK2 處理
    • acknowledge 根據ACK2中的ACK序號,在ACK歷史窗口中找到關聯的ACK,根據ACK2到達時間和ACK離開時間,計算rtt。
    • 計算新的 RTT = (RTT * 7 + rtt) / 8,更新RTTVar = (RTTVar * 3 + abs(RTT - rtt)) / 4,更新cc中 rtt。
    • 更新被確認的最大ACK序號。
    void CUDT::processCtrl(CPacket& ctrlpkt) {case 6: //110 - Acknowledgement of Acknowledgement{int32_t ack;int rtt = -1;// update RTTrtt = m_pACKWindow->acknowledge(ctrlpkt.getAckSeqNo(), ack);if (rtt <= 0)break;//if increasing delay detected...// sendCtrl(4);// RTT EWMAm_iRTTVar = (m_iRTTVar * 3 + abs(rtt - m_iRTT)) >> 2;m_iRTT = (m_iRTT * 7 + rtt) >> 3;m_pCC->setRTT(m_iRTT); // update last ACK that has been received by the senderif (CSeqNo::seqcmp(ack, m_iRcvLastAckAck) > 0)m_iRcvLastAckAck = ack;break;} }
  • NAK 處理
    • 將 NAK 中攜帶的所有序號添加到發送丟失鏈表中。通過碼率控制更新 SND 周期。重置 EXP 時間變量。更新 m_pSndUList,等待重傳。
    void CUDT::processCtrl(CPacket& ctrlpkt) {case 3: //011 - Loss Report{int32_t* losslist = (int32_t *)(ctrlpkt.m_pcData);m_pCC->onLoss(losslist, ctrlpkt.getLength() / 4);//擁塞控制丟包處理,比如停止慢啟動,更新參數。CCUpdate();bool secure = true;// decode loss list message and insert loss into the sender loss listfor (int i = 0, n = (int)(ctrlpkt.getLength() / 4); i < n; ++ i){if (0 != (losslist[i] & 0x80000000)){if ((CSeqNo::seqcmp(losslist[i] & 0x7FFFFFFF, losslist[i + 1]) > 0) || (CSeqNo::seqcmp(losslist[i + 1], m_iSndCurrSeqNo) > 0)){// seq_a must not be greater than seq_b; seq_b must not be greater than the most recent sent seqsecure = false;break;}int num = 0;if (CSeqNo::seqcmp(losslist[i] & 0x7FFFFFFF, m_iSndLastAck) >= 0)num = m_pSndLossList->insert(losslist[i] & 0x7FFFFFFF, losslist[i + 1]);else if (CSeqNo::seqcmp(losslist[i + 1], m_iSndLastAck) >= 0)num = m_pSndLossList->insert(m_iSndLastAck, losslist[i + 1]);m_iTraceSndLoss += num;m_iSndLossTotal += num;++ i;}else if (CSeqNo::seqcmp(losslist[i], m_iSndLastAck) >= 0){if (CSeqNo::seqcmp(losslist[i], m_iSndCurrSeqNo) > 0){//seq_a must not be greater than the most recent sent seqsecure = false;break;}int num = m_pSndLossList->insert(losslist[i], losslist[i]);m_iTraceSndLoss += num;m_iSndLossTotal += num;}}if (!secure){//this should not happen: attack or bugm_bBroken = true;m_iBrokenCounter = 0;break;}// the lost packet (retransmission) should be sent out immediatelym_pSndQueue->m_pSndUList->update(this);++ m_iRecvNAK;++ m_iRecvNAKTotal;break;} }
  • Handshake 處理
  • case 0: //000 - Handshake{CHandShake req;req.deserialize(ctrlpkt.m_pcData, ctrlpkt.getLength());if ((req.m_iReqType > 0) || (m_bRendezvous && (req.m_iReqType != -2))){// The peer side has not received the handshake message, so it keeps querying// resend the handshake packetCHandShake initdata;initdata.m_iISN = m_iISN;initdata.m_iMSS = m_iMSS;initdata.m_iFlightFlagSize = m_iFlightFlagSize;initdata.m_iReqType = (!m_bRendezvous) ? -1 : -2;initdata.m_iID = m_SocketID;char* hs = new char [m_iPayloadSize];int hs_size = m_iPayloadSize;initdata.serialize(hs, hs_size);sendCtrl(0, NULL, hs, hs_size);delete [] hs;}break;} }
  • Msg drop request 處理
    • 在接收緩沖中標記所有屬于同一個消息的包,使得不再可讀。 在接收丟失鏈表中移除所有對應的包。
    case 7: //111 - Msg drop requestm_pRcvBuffer->dropMsg(ctrlpkt.getMsgSeq());m_pRcvLossList->remove(*(int32_t*)ctrlpkt.m_pcData, *(int32_t*)(ctrlpkt.m_pcData + 4));// move forward with current recv seq no.if ((CSeqNo::seqcmp(*(int32_t*)ctrlpkt.m_pcData, CSeqNo::incseq(m_iRcvCurrSeqNo)) <= 0)&& (CSeqNo::seqcmp(*(int32_t*)(ctrlpkt.m_pcData + 4), m_iRcvCurrSeqNo) > 0)){m_iRcvCurrSeqNo = *(int32_t*)(ctrlpkt.m_pcData + 4);}break; }

    總結

    以上是生活随笔為你收集整理的UDT 最新源码分析(五) -- 网络数据收发的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。