muduo网络库学习(六)缓冲区Buffer及TcpConnection的读写操作
在tcp的通信過程中,內(nèi)核其實(shí)為tcp維護(hù)著一個(gè)緩沖區(qū)
- 當(dāng)調(diào)用write/send時(shí),會(huì)向內(nèi)核緩沖區(qū)中寫入數(shù)據(jù),內(nèi)核和tcp協(xié)議棧負(fù)責(zé)將緩沖區(qū)中的數(shù)據(jù)發(fā)送到指定<ip,port>的目標(biāo)位置。
- 當(dāng)有數(shù)據(jù)到達(dá)內(nèi)核的tcp緩沖區(qū)中,如果開啟了對(duì)套接字可讀事件的監(jiān)聽,那么內(nèi)核會(huì)讓套接字變?yōu)榭勺x狀態(tài),從而從poll函數(shù)中返回,調(diào)用read/recv進(jìn)行讀操作。
但是,內(nèi)核維護(hù)的tcp緩沖區(qū)通常都比較小
- 如果調(diào)用write/send時(shí),內(nèi)核緩沖區(qū)已滿,那么阻塞io將會(huì)阻塞在io函數(shù)上直到內(nèi)核緩沖區(qū)有足夠的空間容納要寫入的數(shù)據(jù),非阻塞io將會(huì)返回錯(cuò)誤,通常是EAGAIN/EWOULDBLOCK。
- 如果調(diào)用write/send時(shí),內(nèi)核緩沖區(qū)未滿,但是不能容納要寫入的字節(jié)數(shù),可用空間不足,那么只會(huì)寫入能寫入的那么多字節(jié)數(shù),此時(shí),仍然有一些數(shù)據(jù)沒有發(fā)送,可是這些數(shù)據(jù)還非發(fā)送不可,就出現(xiàn)緩沖區(qū)已滿的情況
- 這就導(dǎo)致要不阻塞當(dāng)前線程,要不無法正常寫入數(shù)據(jù),而如果采用判斷返回值是否出錯(cuò)的方法,仍然是一直忙循環(huán)檢測(cè)io寫入狀態(tài),仍然是busy loop,仍然會(huì)阻塞當(dāng)前線程
而且,io多路復(fù)用分水平觸發(fā)和邊緣觸發(fā)兩種,當(dāng)內(nèi)核tcp緩沖區(qū)中一直有數(shù)據(jù)時(shí)
- 如果是水平觸發(fā),那么套接字會(huì)一直處于可讀狀態(tài),io多路復(fù)用函數(shù)會(huì)一直認(rèn)為這個(gè)套接字被激活,也就是說如果第一次觸發(fā)后沒有將tcp緩沖區(qū)中的數(shù)據(jù)全部讀出,那么下次進(jìn)行到poll函數(shù)時(shí)會(huì)立即返回,因?yàn)樘捉幼忠恢笔强勺x的。這會(huì)導(dǎo)致了busy loop問題
- 如果是邊緣觸發(fā),那么就只會(huì)觸發(fā)一次,即使第一次觸發(fā)沒有將所有數(shù)據(jù)都讀走,下次進(jìn)行到poll也不會(huì)再觸發(fā)套接字的可讀狀態(tài),直到下次又有一批數(shù)據(jù)送至tcp緩沖區(qū)中,才會(huì)再次觸發(fā)可讀。所以有可能存在漏讀數(shù)據(jù)的問題,萬一不會(huì)再有數(shù)據(jù)到來呢,此時(shí)tcp緩沖區(qū)中仍然有數(shù)據(jù),而應(yīng)用程序卻不知道
所以,設(shè)計(jì)應(yīng)用層自己的緩沖區(qū)是很有必要的,也就是由應(yīng)用程序來管理緩沖區(qū)問題
- 應(yīng)用層緩沖區(qū)通常很大,也可以初始很小,但可以通過動(dòng)態(tài)調(diào)整改變大小(vector)
- 應(yīng)用層緩沖區(qū)需要有讀/寫兩個(gè)(緩沖區(qū)類只有一個(gè),既可被用作讀緩沖區(qū),也可被用作寫緩沖區(qū))
- 當(dāng)用戶想要調(diào)用write/send寫入數(shù)據(jù)給對(duì)端,如果數(shù)據(jù)可以全部寫入,那么寫入就好了。如果寫入了部分?jǐn)?shù)據(jù)或者根本一點(diǎn)數(shù)據(jù)都寫不進(jìn)去,此時(shí)表明內(nèi)核緩沖區(qū)已滿,為了不阻塞當(dāng)前線程,應(yīng)用層寫緩沖區(qū)會(huì)接管這些數(shù)據(jù),等到內(nèi)核緩沖區(qū)可以寫入的時(shí)候自動(dòng)幫用戶寫入。
- 當(dāng)有數(shù)據(jù)到達(dá)內(nèi)核緩沖區(qū),應(yīng)用層的讀緩沖區(qū)會(huì)自動(dòng)將這些數(shù)據(jù)讀到自己那里,當(dāng)用戶調(diào)用read/recv想要讀取數(shù)據(jù)時(shí),應(yīng)用層讀緩沖區(qū)將已經(jīng)從內(nèi)核緩沖區(qū)取出的數(shù)據(jù)返回給用戶,實(shí)際上就是用戶從應(yīng)用層讀緩沖區(qū)讀取數(shù)據(jù)
- 應(yīng)用層緩沖區(qū)對(duì)用戶而言是隱藏的,用戶可能根本不知道有應(yīng)用層緩沖區(qū)的存在,只需讀/取數(shù)據(jù),而且也不會(huì)阻塞當(dāng)前線程
緩沖區(qū)Buffer的設(shè)計(jì)
muduo應(yīng)用層緩沖區(qū)的設(shè)計(jì)采用std::vector數(shù)據(jù)結(jié)構(gòu),一方面內(nèi)存是連續(xù)的方便管理,另一方面,vector自帶的增長模式足以應(yīng)對(duì)動(dòng)態(tài)調(diào)整大小的任務(wù)
緩沖區(qū)Buffer的定義如下,只列出了一些重要部分
注釋中寫明了緩沖區(qū)的設(shè)計(jì)方法,主要就是利用兩個(gè)指針readerIndex,writerIndex分別記錄著緩沖區(qū)中數(shù)據(jù)的起點(diǎn)和終點(diǎn),寫入數(shù)據(jù)的時(shí)候追加到writeIndex后面,讀出數(shù)據(jù)時(shí)從readerIndex開始讀。在readerIndex前面預(yù)留了幾個(gè)字節(jié)大小的空間,方便日后為數(shù)據(jù)追加頭部信息。緩沖區(qū)在使用的過程中會(huì)動(dòng)態(tài)調(diào)整readerIndex和writerIndex的位置,初始緩沖區(qū)為空,readerIndex == writerIndex
緩沖區(qū)默認(rèn)大小為1KB,頭部預(yù)留空間為8 bytes,如果使用過程中發(fā)現(xiàn)緩沖區(qū)大小不夠,會(huì)增加緩沖區(qū)大小,方法見readFd函數(shù)
TcpConnection的讀操作
當(dāng)Poller檢測(cè)到套接字的Channel處于可讀狀態(tài)時(shí),會(huì)調(diào)用Channel的回調(diào)函數(shù),回調(diào)函數(shù)中根據(jù)不同激活原因調(diào)用不同的函數(shù),這些函數(shù)都由TcpConnection在創(chuàng)建Channel之初提供,當(dāng)可讀時(shí),調(diào)用TcpConnection的可讀函數(shù)handleRead,而在這個(gè)函數(shù)中,讀緩沖區(qū)就會(huì)從內(nèi)核的tcp緩沖區(qū)讀取數(shù)據(jù)
注意這個(gè)是TcpConnection的函數(shù)
在TcpConnection的handleRead函數(shù)中,讀緩沖區(qū)讀取數(shù)據(jù),調(diào)用readFd函數(shù),readFd函數(shù)是將數(shù)據(jù)從內(nèi)核tcp緩沖區(qū)中讀出,存放到自己的讀緩沖區(qū)中,也是緩沖區(qū)最重要的函數(shù),其中用到了readv(分散讀)/writev(集中寫)系統(tǒng)調(diào)用解決緩沖區(qū)大小不足的問題
/** 從tcp緩沖區(qū)(sockfd)中讀取數(shù)據(jù),存放到應(yīng)用層緩沖區(qū)中* 兩種情況* 1.應(yīng)用層緩沖區(qū)足以容納所有數(shù)據(jù)* 直接讀取到buffer_中* 2.應(yīng)用層緩沖區(qū)不夠* 開辟一段棧空間(128k)大小,使用分散讀(readv)系統(tǒng)調(diào)用讀取數(shù)據(jù)* 然后為buffer_開辟更大的空間,存放讀到棧區(qū)的那部分?jǐn)?shù)據(jù)* * 為什么不在Buffer構(gòu)造時(shí)就開辟足夠大的緩沖區(qū)* 1.每個(gè)tcp連接都有輸入/輸出緩沖區(qū),如果連接過多則內(nèi)存消耗會(huì)很大* 2.防止客戶端與服務(wù)器端數(shù)據(jù)交互比較少,造成緩沖區(qū)的浪費(fèi)* 3.當(dāng)緩沖區(qū)大小不足時(shí),利用vector內(nèi)存增長的優(yōu)勢(shì),擴(kuò)充緩沖區(qū)* * 為什么不在讀數(shù)據(jù)之前判斷一下應(yīng)用層緩沖區(qū)是否可以容納內(nèi)核緩沖區(qū)的全部數(shù)據(jù)* 1.采用這種方式就會(huì)調(diào)用一次recv,傳入MSG_PEEK,即recv(sockfd,, extrabuf, sizeof(extrabuf), MSG_PEEK)* 可根據(jù)返回值判斷緩沖區(qū)還有多少數(shù)據(jù)沒有接收,然后再調(diào)用一次recv從內(nèi)核沖讀取數(shù)據(jù)* 2.但是這樣會(huì)執(zhí)行兩次系統(tǒng)調(diào)用,得不償失,盡量使用一次系統(tǒng)調(diào)用就將所有數(shù)據(jù)讀出,這就需要一個(gè)很大的空間* * struct iovec* 1.iov_base,存放數(shù)據(jù)的緩沖區(qū)起始位置,寫時(shí)往這個(gè)位置寫入iov_len個(gè)字節(jié),讀時(shí)從這個(gè)位置讀出iov_len個(gè)字節(jié)* 2.iov_len,要讀入多少數(shù)據(jù)從內(nèi)核緩沖區(qū)/要寫入多少數(shù)據(jù)到內(nèi)核緩沖區(qū)* * readv(int fd, const struct iovec *iov, int iovcnt);分散讀* writev(int fd, const struct iovec *iov, int iovcnt);集中寫*/ ssize_t Buffer::readFd(int fd, int* savedErrno) {// saved an ioctl()/FIONREAD call to tell how much to read/* 開辟的棧空間,128k */char extrabuf[65536];/* readv用到的數(shù)據(jù)結(jié)構(gòu),定義如上 */struct iovec vec[2];/* 緩沖區(qū)接口,返回緩沖區(qū)還可以寫入多少字節(jié) */const size_t writable = writableBytes();/* 定義兩塊內(nèi)存,一塊是讀緩沖區(qū),一塊是棧空間 */vec[0].iov_base = begin()+writerIndex_;vec[0].iov_len = writable;vec[1].iov_base = extrabuf;vec[1].iov_len = sizeof extrabuf;// when there is enough space in this buffer, don't read into extrabuf.// when extrabuf is used, we read 128k-1 bytes at most./* 如果應(yīng)用層讀緩沖區(qū)足夠大(大于128k,初始時(shí)才1k -.-),就不需要往棧區(qū)寫數(shù)據(jù)了 */const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1;/* 分散讀,返回讀取的字節(jié)數(shù) */const ssize_t n = sockets::readv(fd, vec, iovcnt);if (n < 0){*savedErrno = errno;}/* * 讀取的字節(jié)數(shù)比較少,讀緩沖區(qū)足以容納* 因?yàn)樽x緩沖區(qū)是readv的第一塊內(nèi)存,所以率先向這塊內(nèi)存寫數(shù)據(jù)*/else if (implicit_cast<size_t>(n) <= writable){writerIndex_ += n;}else{/* * 將棧空間的數(shù)據(jù)追加到緩沖區(qū)末尾 * 因?yàn)樽x緩沖區(qū)已經(jīng)寫滿了,所以writerIndex指針就指向緩沖區(qū)的末尾*/writerIndex_ = buffer_.size();append(extrabuf, n - writable);}// if (n == writable + sizeof extrabuf)// {// goto line_30;// }return n; }如果讀緩沖區(qū)大小不夠,其他數(shù)據(jù)就會(huì)寫入到棧空間,接下來需要將棧空間的數(shù)據(jù)追加到緩沖區(qū)的末尾,使用append函數(shù)
void append(const char* /*restrict*/ data, size_t len){/* 確保有足夠的空間容納len大小的數(shù)據(jù) */ensureWritableBytes(len);/* 將數(shù)據(jù)copy到writerIndex后面,beginWrite返回的就是writerIndex位置的地址(writerIndex是下標(biāo)) */std::copy(data, data+len, beginWrite());/* 寫完數(shù)據(jù),更新writerIndex */hasWritten(len);}函數(shù)首先調(diào)用ensureWritableBytes函數(shù)確保讀緩沖區(qū)有足夠的空間,如果沒有,就需要調(diào)用resize函數(shù)重新設(shè)置空間大小(std::vector的內(nèi)存增長就體現(xiàn)在這里,因?yàn)閏apacity和size通常不同,所以如果resize設(shè)置的大小沒有超過capacity,那么空間仍然足夠,不會(huì)重新開辟內(nèi)存,將數(shù)據(jù)拷貝到新內(nèi)存上)
void ensureWritableBytes(size_t len){/* 返回剩余可用空間大小,如果不足len,開辟新空間(調(diào)用resize) */if (writableBytes() < len){makeSpace(len);}assert(writableBytes() >= len);}如果空間不夠,就需要調(diào)整空間大小
void makeSpace(size_t len){/* * 在多次從緩沖區(qū)讀數(shù)據(jù)后,readerIndex會(huì)后移很多,導(dǎo)致預(yù)留空間變大* 在增大空間之前,先判斷調(diào)整預(yù)留空間的大小后能否容納要求的數(shù)據(jù)* 如果可以,則將預(yù)留空間縮小為8字節(jié)(默認(rèn)的預(yù)留空間大小)* 如果不可以,那么就只能增加空間*/if (writableBytes() + prependableBytes() < len + kCheapPrepend){// FIXME: move readable data/* writerIndex代表當(dāng)前緩沖區(qū)已使用的大小,調(diào)整只需調(diào)整到恰好滿足len大小即可 */buffer_.resize(writerIndex_+len);}else{/* 通過縮小預(yù)留空間大小可以容納len個(gè)數(shù)據(jù),就縮小預(yù)留空間 */// move readable data to the front, make space inside bufferassert(kCheapPrepend < readerIndex_);/* 返回緩沖區(qū)數(shù)據(jù)個(gè)數(shù),writerIndex - readerIndex */size_t readable = readableBytes();/* 將所有數(shù)據(jù)前移 */std::copy(begin()+readerIndex_,begin()+writerIndex_,begin()+kCheapPrepend);/* 更新兩個(gè)指針(下標(biāo)) */readerIndex_ = kCheapPrepend;writerIndex_ = readerIndex_ + readable;assert(readable == readableBytes());}}此時(shí)應(yīng)用層讀緩沖區(qū)從內(nèi)核中讀取數(shù)據(jù)完成,在用戶可讀的回調(diào)函數(shù)中(在readFd函數(shù)執(zhí)行完調(diào)用),用戶可以調(diào)用Buffer的接口從緩沖區(qū)中讀取數(shù)據(jù),程序示例如下
這是用戶提供給TcpServer的可讀時(shí)的回調(diào)函數(shù),又由TcpServer提供給TcpConnection,當(dāng)TcpConnection的讀緩沖區(qū)執(zhí)行完readFd返回后,會(huì)執(zhí)行用戶的回調(diào)函數(shù),圖片程序來自muduo的測(cè)試用例。
可以看到
- buf->readableBytes()返回緩沖區(qū)中可讀字節(jié)數(shù)
- conn->name()返回TcpConnection的名字(由TcpServer設(shè)置)
- receiveTime是poll函數(shù)返回的時(shí)間,一直作為參數(shù)傳到Channel,TcpConnection,onMessage
- buf->retrieveAsString()讀取緩沖區(qū)所有數(shù)據(jù)
這兩個(gè)函數(shù)從讀緩沖區(qū)中讀取數(shù)據(jù),一個(gè)是全讀,一個(gè)是讀取指定字節(jié)個(gè)數(shù)的數(shù)據(jù),讀完之后,緩沖區(qū)需要調(diào)整readerIndex位置以指向新的數(shù)據(jù)起點(diǎn)
/* 調(diào)整readerIndex,后移len */void retrieve(size_t len){assert(len <= readableBytes());/* * 如果調(diào)整后仍然有數(shù)據(jù),就將readerIndex增加len* 如果已經(jīng)將數(shù)據(jù)全部讀完(len >= readableBytes),那么就初始化readerIndex/writerIndex位置*/if (len < readableBytes()){readerIndex_ += len;}else{retrieveAll();}}如果數(shù)據(jù)全部被用戶讀出,就重新調(diào)整readerIndex/writerIndex位置
/* 初始化readerIndex/writerIndex位置,通常在用戶將數(shù)據(jù)全部讀出之后執(zhí)行 */void retrieveAll(){readerIndex_ = kCheapPrepend;writerIndex_ = kCheapPrepend;}TcpConnection的寫操作
發(fā)送數(shù)據(jù)使用的是寫緩沖區(qū),當(dāng)內(nèi)核tcp緩沖區(qū)空間不足時(shí),會(huì)把數(shù)據(jù)寫到寫緩沖區(qū),由寫緩沖區(qū)在合適的時(shí)機(jī)寫入內(nèi)核tcp緩沖區(qū),合適的時(shí)機(jī)指內(nèi)核tcp緩沖區(qū)有多余空間時(shí)。
但是怎樣才能直到內(nèi)核tcp緩沖區(qū)有多余的空間呢,通過監(jiān)聽可寫事件即可。
但是如果內(nèi)核tcp緩沖區(qū)一直不滿,那么就一直可寫,就會(huì)一直觸發(fā)poll,導(dǎo)致busy loop,所以muduo只有在需要的時(shí)候才會(huì)檢測(cè)內(nèi)核tcp緩沖區(qū)的可寫事件,即只有當(dāng)tcp緩沖區(qū)已滿,但是寫緩沖區(qū)中有數(shù)據(jù)等待寫入tcp緩沖區(qū)時(shí)才會(huì)監(jiān)聽。
不同于讀取數(shù)據(jù)的是,發(fā)送數(shù)據(jù)使用的是TcpConnection提供的接口,而不是直接向Buffer中寫。
/* 幾個(gè)重載的send函數(shù),用于用戶想要發(fā)送數(shù)據(jù)到對(duì)端 */ void TcpConnection::send(const void* data, int len) {send(StringPiece(static_cast<const char*>(data), len)); }void TcpConnection::send(const StringPiece& message) {if (state_ == kConnected){/* * 如果當(dāng)前線程和TcpConnection所屬線程相同,直接在當(dāng)前線程發(fā)送* 否則,需要使用std::bind綁定函數(shù)和對(duì)象,并添加到自己所在線程的事件循環(huán)中*/if (loop_->isInLoopThread()){sendInLoop(message);}else{/* 可以直接在bind中綁定函數(shù) ? */void (TcpConnection::*fp)(const StringPiece& message) = &TcpConnection::sendInLoop;loop_->runInLoop(std::bind(fp,this, // FIXMEmessage.as_string()));//std::forward<string>(message)));}} }send函數(shù)調(diào)用sendInLoop函數(shù),保證在TcpConnection所屬線程發(fā)送數(shù)據(jù)
- 發(fā)送時(shí)會(huì)先判斷寫緩沖區(qū)是否已經(jīng)有數(shù)據(jù)存在,如果有,就不能直接向tcp緩沖區(qū)寫了,因?yàn)閿?shù)據(jù)要有順序的發(fā)送,所以需要追加到寫緩沖區(qū)中
- 如果寫緩沖區(qū)中沒有數(shù)據(jù),就可以嘗試向tcp緩沖區(qū)寫數(shù)據(jù),如果全部寫入,當(dāng)然很happy,但是如果只寫入一部分或者一點(diǎn)也沒寫進(jìn)去(tcp緩沖區(qū)已滿),就需要添加到寫緩沖區(qū)中,同時(shí)開啟對(duì)tcp緩沖區(qū)(其實(shí)就是用于通信的套接字)的可寫事件的監(jiān)聽,等待tcp緩沖區(qū)可寫
如果tcp緩沖區(qū)不足以全部容納數(shù)據(jù),就會(huì)開啟對(duì)可寫事件的監(jiān)聽,當(dāng)tcp緩沖區(qū)可寫,就調(diào)用Channel的回調(diào)函數(shù),這個(gè)回調(diào)函數(shù)也是在TcpConnection構(gòu)造函數(shù)中傳給Channel的
channel_->setWriteCallback(std::bind(&TcpConnection::handleWrite, this)); /* 當(dāng)tcp緩沖區(qū)可寫時(shí)調(diào)用 */ void TcpConnection::handleWrite() {loop_->assertInLoopThread();if (channel_->isWriting()){/* 嘗試寫入寫緩沖區(qū)的所有數(shù)據(jù),返回實(shí)際寫入的字節(jié)數(shù)(tcp緩沖區(qū)很有可能仍然不能容納所有數(shù)據(jù)) */ssize_t n = sockets::write(channel_->fd(),outputBuffer_.peek(),outputBuffer_.readableBytes());if (n > 0){/* 調(diào)整寫緩沖區(qū)的readerIndex */outputBuffer_.retrieve(n);if (outputBuffer_.readableBytes() == 0){/* 全部寫到tcp緩沖區(qū)中,關(guān)閉對(duì)可寫事件的監(jiān)聽 */channel_->disableWriting();/* 如果有寫入完成時(shí)的回調(diào)函數(shù)(用戶提供,則等待函數(shù)結(jié)束后調(diào)用 */if (writeCompleteCallback_){loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));}/* * 如果連接正在關(guān)閉(通常關(guān)閉讀端),那么關(guān)閉寫端,但是是在已經(jīng)寫完的前提下* 如果還有數(shù)據(jù)沒有寫完,不能關(guān)閉,要在寫完再關(guān) */if (state_ == kDisconnecting){shutdownInLoop();}}}else{LOG_SYSERR << "TcpConnection::handleWrite";// if (state_ == kDisconnecting)// {// shutdownInLoop();// }}}else{LOG_TRACE << "Connection fd = " << channel_->fd()<< " is down, no more writing";} }這里的細(xì)節(jié)問題就是如果想要關(guān)閉連接,那么通常是先關(guān)閉讀端,等到將寫緩沖區(qū)所有數(shù)據(jù)都寫到tcp緩沖區(qū)后,再關(guān)閉寫端,否則這些數(shù)據(jù)就不能發(fā)送給對(duì)端了
muduo沒有提供close函數(shù),關(guān)閉是分兩步進(jìn)行的(使用shutdown而不適用close),這樣更容易控制
handleWrite函數(shù)中調(diào)用的shutdownInLoop函數(shù)如下,用于關(guān)閉寫端
至此發(fā)送數(shù)據(jù)的操作完成,所以數(shù)據(jù)都在tcp緩沖區(qū)中等待著或正在運(yùn)往對(duì)端(客戶端)
總結(jié)
以上是生活随笔為你收集整理的muduo网络库学习(六)缓冲区Buffer及TcpConnection的读写操作的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 每天一道LeetCode-----删除序
- 下一篇: 每天一道LeetCode-----KMP