日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

muduo网络库学习(六)缓冲区Buffer及TcpConnection的读写操作

發(fā)布時(shí)間:2024/4/19 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 muduo网络库学习(六)缓冲区Buffer及TcpConnection的读写操作 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

在tcp的通信過程中,內(nèi)核其實(shí)為tcp維護(hù)著一個(gè)緩沖區(qū)

  • 當(dāng)調(diào)用write/send時(shí),會(huì)向內(nèi)核緩沖區(qū)中寫入數(shù)據(jù),內(nèi)核和tcp協(xié)議棧負(fù)責(zé)將緩沖區(qū)中的數(shù)據(jù)發(fā)送到指定<ip,port>的目標(biāo)位置。
  • 當(dāng)有數(shù)據(jù)到達(dá)內(nèi)核的tcp緩沖區(qū)中,如果開啟了對(duì)套接字可讀事件的監(jiān)聽,那么內(nèi)核會(huì)讓套接字變?yōu)榭勺x狀態(tài),從而從poll函數(shù)中返回,調(diào)用read/recv進(jìn)行讀操作。

但是,內(nèi)核維護(hù)的tcp緩沖區(qū)通常都比較小

  • 如果調(diào)用write/send時(shí),內(nèi)核緩沖區(qū)已滿,那么阻塞io將會(huì)阻塞在io函數(shù)上直到內(nèi)核緩沖區(qū)有足夠的空間容納要寫入的數(shù)據(jù),非阻塞io將會(huì)返回錯(cuò)誤,通常是EAGAIN/EWOULDBLOCK。
  • 如果調(diào)用write/send時(shí),內(nèi)核緩沖區(qū)未滿,但是不能容納要寫入的字節(jié)數(shù),可用空間不足,那么只會(huì)寫入能寫入的那么多字節(jié)數(shù),此時(shí),仍然有一些數(shù)據(jù)沒有發(fā)送,可是這些數(shù)據(jù)還非發(fā)送不可,就出現(xiàn)緩沖區(qū)已滿的情況
  • 這就導(dǎo)致要不阻塞當(dāng)前線程,要不無法正常寫入數(shù)據(jù),而如果采用判斷返回值是否出錯(cuò)的方法,仍然是一直忙循環(huán)檢測(cè)io寫入狀態(tài),仍然是busy loop,仍然會(huì)阻塞當(dāng)前線程

而且,io多路復(fù)用分水平觸發(fā)和邊緣觸發(fā)兩種,當(dāng)內(nèi)核tcp緩沖區(qū)中一直有數(shù)據(jù)時(shí)

  • 如果是水平觸發(fā),那么套接字會(huì)一直處于可讀狀態(tài),io多路復(fù)用函數(shù)會(huì)一直認(rèn)為這個(gè)套接字被激活,也就是說如果第一次觸發(fā)后沒有將tcp緩沖區(qū)中的數(shù)據(jù)全部讀出,那么下次進(jìn)行到poll函數(shù)時(shí)會(huì)立即返回,因?yàn)樘捉幼忠恢笔强勺x的。這會(huì)導(dǎo)致了busy loop問題
  • 如果是邊緣觸發(fā),那么就只會(huì)觸發(fā)一次,即使第一次觸發(fā)沒有將所有數(shù)據(jù)都讀走,下次進(jìn)行到poll也不會(huì)再觸發(fā)套接字的可讀狀態(tài),直到下次又有一批數(shù)據(jù)送至tcp緩沖區(qū)中,才會(huì)再次觸發(fā)可讀。所以有可能存在漏讀數(shù)據(jù)的問題,萬一不會(huì)再有數(shù)據(jù)到來呢,此時(shí)tcp緩沖區(qū)中仍然有數(shù)據(jù),而應(yīng)用程序卻不知道

所以,設(shè)計(jì)應(yīng)用層自己的緩沖區(qū)是很有必要的,也就是由應(yīng)用程序來管理緩沖區(qū)問題

  • 應(yīng)用層緩沖區(qū)通常很大,也可以初始很小,但可以通過動(dòng)態(tài)調(diào)整改變大小(vector)
  • 應(yīng)用層緩沖區(qū)需要有讀/寫兩個(gè)(緩沖區(qū)類只有一個(gè),既可被用作讀緩沖區(qū),也可被用作寫緩沖區(qū))
  • 當(dāng)用戶想要調(diào)用write/send寫入數(shù)據(jù)給對(duì)端,如果數(shù)據(jù)可以全部寫入,那么寫入就好了。如果寫入了部分?jǐn)?shù)據(jù)或者根本一點(diǎn)數(shù)據(jù)都寫不進(jìn)去,此時(shí)表明內(nèi)核緩沖區(qū)已滿,為了不阻塞當(dāng)前線程,應(yīng)用層寫緩沖區(qū)會(huì)接管這些數(shù)據(jù),等到內(nèi)核緩沖區(qū)可以寫入的時(shí)候自動(dòng)幫用戶寫入。
  • 當(dāng)有數(shù)據(jù)到達(dá)內(nèi)核緩沖區(qū),應(yīng)用層的讀緩沖區(qū)會(huì)自動(dòng)將這些數(shù)據(jù)讀到自己那里,當(dāng)用戶調(diào)用read/recv想要讀取數(shù)據(jù)時(shí),應(yīng)用層讀緩沖區(qū)將已經(jīng)從內(nèi)核緩沖區(qū)取出的數(shù)據(jù)返回給用戶,實(shí)際上就是用戶從應(yīng)用層讀緩沖區(qū)讀取數(shù)據(jù)
  • 應(yīng)用層緩沖區(qū)對(duì)用戶而言是隱藏的,用戶可能根本不知道有應(yīng)用層緩沖區(qū)的存在,只需讀/取數(shù)據(jù),而且也不會(huì)阻塞當(dāng)前線程

緩沖區(qū)Buffer的設(shè)計(jì)

muduo應(yīng)用層緩沖區(qū)的設(shè)計(jì)采用std::vector數(shù)據(jù)結(jié)構(gòu),一方面內(nèi)存是連續(xù)的方便管理,另一方面,vector自帶的增長模式足以應(yīng)對(duì)動(dòng)態(tài)調(diào)整大小的任務(wù)
緩沖區(qū)Buffer的定義如下,只列出了一些重要部分

注釋中寫明了緩沖區(qū)的設(shè)計(jì)方法,主要就是利用兩個(gè)指針readerIndex,writerIndex分別記錄著緩沖區(qū)中數(shù)據(jù)的起點(diǎn)和終點(diǎn),寫入數(shù)據(jù)的時(shí)候追加到writeIndex后面,讀出數(shù)據(jù)時(shí)從readerIndex開始讀。在readerIndex前面預(yù)留了幾個(gè)字節(jié)大小的空間,方便日后為數(shù)據(jù)追加頭部信息。緩沖區(qū)在使用的過程中會(huì)動(dòng)態(tài)調(diào)整readerIndex和writerIndex的位置,初始緩沖區(qū)為空,readerIndex == writerIndex
緩沖區(qū)默認(rèn)大小為1KB,頭部預(yù)留空間為8 bytes,如果使用過程中發(fā)現(xiàn)緩沖區(qū)大小不夠,會(huì)增加緩沖區(qū)大小,方法見readFd函數(shù)

/// A buffer class modeled after org.jboss.netty.buffer.ChannelBuffer /// /// @code /// +-------------------+------------------+------------------+ /// | prependable bytes | readable bytes | writable bytes | /// | | (CONTENT) | | /// +-------------------+------------------+------------------+ /// | | | | /// 0 <= readerIndex <= writerIndex <= size /// @endcode/** * 緩沖區(qū)的設(shè)計(jì)方法,muduo采用vector連續(xù)內(nèi)存作為緩沖區(qū),libevent則是分塊內(nèi)存* 1.相比之下,采用vector連續(xù)內(nèi)存更容易管理,同時(shí)利用std::vector自帶的內(nèi)存* 增長方式,可以減少擴(kuò)充的次數(shù)(capacity和size一般不同)* 2.記錄緩沖區(qū)數(shù)據(jù)起始位置和結(jié)束位置,寫入時(shí)寫到已有數(shù)據(jù)的后面,讀出時(shí)從* 數(shù)據(jù)起始位置讀出* 3.起始/結(jié)束位置如上圖的readerIndex/writeIndex,其中readerIndex為緩沖區(qū)* 數(shù)據(jù)的起始索引下標(biāo),writeIndex為結(jié)束位置下標(biāo)。采用下標(biāo)而不是迭代器的* 原因是刪除(erase)數(shù)據(jù)時(shí)迭代器可能失效* 4.開頭部分(readerIndex以前)是預(yù)留空間,通常只有幾個(gè)字節(jié)的大小,可以用來* 寫入數(shù)據(jù)的長度,解決粘包問題* 5.讀出和寫入數(shù)據(jù)時(shí)會(huì)動(dòng)態(tài)調(diào)整readerIndex/writeIndex,如果沒有數(shù)據(jù),二者* 相等*/ class Buffer : public muduo::copyable {public:static const size_t kCheapPrepend = 8;static const size_t kInitialSize = 1024;explicit Buffer(size_t initialSize = kInitialSize): buffer_(kCheapPrepend + initialSize),readerIndex_(kCheapPrepend),writerIndex_(kCheapPrepend){assert(readableBytes() == 0);assert(writableBytes() == initialSize);assert(prependableBytes() == kCheapPrepend);}/* 可讀的數(shù)據(jù)就是起始位置和結(jié)束位置中間的部分 */size_t readableBytes() const{ return writerIndex_ - readerIndex_; }size_t writableBytes() const{ return buffer_.size() - writerIndex_; }size_t prependableBytes() const{ return readerIndex_; }/* 返回?cái)?shù)據(jù)起始位置 */const char* peek() const{ return begin() + readerIndex_; }/// Read data directly into buffer.////// It may implement with readv(2)/// @return result of read(2), @c errno is saved/* 從套接字(內(nèi)核tcp緩沖區(qū))中讀取數(shù)據(jù)放到讀緩沖區(qū)中 */ssize_t readFd(int fd, int* savedErrno);private:char* begin(){ return &*buffer_.begin(); }const char* begin() const{ return &*buffer_.begin(); }private:/* 緩沖區(qū) */std::vector<char> buffer_;/* 數(shù)據(jù)起始點(diǎn) */size_t readerIndex_;/* 數(shù)據(jù)結(jié)束點(diǎn) */size_t writerIndex_;/* \r\n */static const char kCRLF[]; };

TcpConnection的讀操作

當(dāng)Poller檢測(cè)到套接字的Channel處于可讀狀態(tài)時(shí),會(huì)調(diào)用Channel的回調(diào)函數(shù),回調(diào)函數(shù)中根據(jù)不同激活原因調(diào)用不同的函數(shù),這些函數(shù)都由TcpConnection在創(chuàng)建Channel之初提供,當(dāng)可讀時(shí),調(diào)用TcpConnection的可讀函數(shù)handleRead,而在這個(gè)函數(shù)中,讀緩沖區(qū)就會(huì)從內(nèi)核的tcp緩沖區(qū)讀取數(shù)據(jù)
注意這個(gè)是TcpConnection的函數(shù)

/** 1.TcpConnection構(gòu)造時(shí),創(chuàng)建一個(gè)監(jiān)聽服務(wù)器/客戶端連接的fd的Channel,設(shè)置各種回調(diào)函數(shù)* 2.TcpServer設(shè)置各種回調(diào)函數(shù)(可讀等),然后調(diào)用connectEstablished,將Channel添加到Poller中* 3.EventLoop繼續(xù)監(jiān)聽事件,調(diào)用Poller* 4.poll返回,處理激活的Channel,調(diào)用Channel的handleEvent* 5.hanleEvent根據(jù)激活事件的類型(可讀/可寫/掛起/錯(cuò)誤)調(diào)用不同的處理函數(shù)* 6.若可讀,調(diào)用hanleRead,TcpConnection中的讀緩沖區(qū)將內(nèi)核tcp緩沖區(qū)的數(shù)據(jù)全部讀出* 7.調(diào)用用戶提供的當(dāng)可讀時(shí)執(zhí)行的回調(diào)函數(shù),用戶可直接從應(yīng)用層緩沖區(qū)讀數(shù)據(jù)*/ void TcpConnection::handleRead(Timestamp receiveTime) {loop_->assertInLoopThread();int savedErrno = 0;/* 讀緩沖區(qū)從內(nèi)核tcp中讀取數(shù)據(jù) */ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);if (n > 0){/* 如果成功讀取數(shù)據(jù),調(diào)用用戶提供的可讀時(shí)回調(diào)函數(shù) */messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);}else if (n == 0){/* 如果返回0,說明對(duì)端已經(jīng)close連接,處理close事件,關(guān)閉tcp連接 */handleClose();}else{/* 出錯(cuò) */errno = savedErrno;LOG_SYSERR << "TcpConnection::handleRead";handleError();} }

在TcpConnection的handleRead函數(shù)中,讀緩沖區(qū)讀取數(shù)據(jù),調(diào)用readFd函數(shù),readFd函數(shù)是將數(shù)據(jù)從內(nèi)核tcp緩沖區(qū)中讀出,存放到自己的讀緩沖區(qū)中,也是緩沖區(qū)最重要的函數(shù),其中用到了readv(分散讀)/writev(集中寫)系統(tǒng)調(diào)用解決緩沖區(qū)大小不足的問題

/** 從tcp緩沖區(qū)(sockfd)中讀取數(shù)據(jù),存放到應(yīng)用層緩沖區(qū)中* 兩種情況* 1.應(yīng)用層緩沖區(qū)足以容納所有數(shù)據(jù)* 直接讀取到buffer_中* 2.應(yīng)用層緩沖區(qū)不夠* 開辟一段棧空間(128k)大小,使用分散讀(readv)系統(tǒng)調(diào)用讀取數(shù)據(jù)* 然后為buffer_開辟更大的空間,存放讀到棧區(qū)的那部分?jǐn)?shù)據(jù)* * 為什么不在Buffer構(gòu)造時(shí)就開辟足夠大的緩沖區(qū)* 1.每個(gè)tcp連接都有輸入/輸出緩沖區(qū),如果連接過多則內(nèi)存消耗會(huì)很大* 2.防止客戶端與服務(wù)器端數(shù)據(jù)交互比較少,造成緩沖區(qū)的浪費(fèi)* 3.當(dāng)緩沖區(qū)大小不足時(shí),利用vector內(nèi)存增長的優(yōu)勢(shì),擴(kuò)充緩沖區(qū)* * 為什么不在讀數(shù)據(jù)之前判斷一下應(yīng)用層緩沖區(qū)是否可以容納內(nèi)核緩沖區(qū)的全部數(shù)據(jù)* 1.采用這種方式就會(huì)調(diào)用一次recv,傳入MSG_PEEK,即recv(sockfd,, extrabuf, sizeof(extrabuf), MSG_PEEK)* 可根據(jù)返回值判斷緩沖區(qū)還有多少數(shù)據(jù)沒有接收,然后再調(diào)用一次recv從內(nèi)核沖讀取數(shù)據(jù)* 2.但是這樣會(huì)執(zhí)行兩次系統(tǒng)調(diào)用,得不償失,盡量使用一次系統(tǒng)調(diào)用就將所有數(shù)據(jù)讀出,這就需要一個(gè)很大的空間* * struct iovec* 1.iov_base,存放數(shù)據(jù)的緩沖區(qū)起始位置,寫時(shí)往這個(gè)位置寫入iov_len個(gè)字節(jié),讀時(shí)從這個(gè)位置讀出iov_len個(gè)字節(jié)* 2.iov_len,要讀入多少數(shù)據(jù)從內(nèi)核緩沖區(qū)/要寫入多少數(shù)據(jù)到內(nèi)核緩沖區(qū)* * readv(int fd, const struct iovec *iov, int iovcnt);分散讀* writev(int fd, const struct iovec *iov, int iovcnt);集中寫*/ ssize_t Buffer::readFd(int fd, int* savedErrno) {// saved an ioctl()/FIONREAD call to tell how much to read/* 開辟的棧空間,128k */char extrabuf[65536];/* readv用到的數(shù)據(jù)結(jié)構(gòu),定義如上 */struct iovec vec[2];/* 緩沖區(qū)接口,返回緩沖區(qū)還可以寫入多少字節(jié) */const size_t writable = writableBytes();/* 定義兩塊內(nèi)存,一塊是讀緩沖區(qū),一塊是棧空間 */vec[0].iov_base = begin()+writerIndex_;vec[0].iov_len = writable;vec[1].iov_base = extrabuf;vec[1].iov_len = sizeof extrabuf;// when there is enough space in this buffer, don't read into extrabuf.// when extrabuf is used, we read 128k-1 bytes at most./* 如果應(yīng)用層讀緩沖區(qū)足夠大(大于128k,初始時(shí)才1k -.-),就不需要往棧區(qū)寫數(shù)據(jù)了 */const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1;/* 分散讀,返回讀取的字節(jié)數(shù) */const ssize_t n = sockets::readv(fd, vec, iovcnt);if (n < 0){*savedErrno = errno;}/* * 讀取的字節(jié)數(shù)比較少,讀緩沖區(qū)足以容納* 因?yàn)樽x緩沖區(qū)是readv的第一塊內(nèi)存,所以率先向這塊內(nèi)存寫數(shù)據(jù)*/else if (implicit_cast<size_t>(n) <= writable){writerIndex_ += n;}else{/* * 將棧空間的數(shù)據(jù)追加到緩沖區(qū)末尾 * 因?yàn)樽x緩沖區(qū)已經(jīng)寫滿了,所以writerIndex指針就指向緩沖區(qū)的末尾*/writerIndex_ = buffer_.size();append(extrabuf, n - writable);}// if (n == writable + sizeof extrabuf)// {// goto line_30;// }return n; }

如果讀緩沖區(qū)大小不夠,其他數(shù)據(jù)就會(huì)寫入到棧空間,接下來需要將棧空間的數(shù)據(jù)追加到緩沖區(qū)的末尾,使用append函數(shù)

void append(const char* /*restrict*/ data, size_t len){/* 確保有足夠的空間容納len大小的數(shù)據(jù) */ensureWritableBytes(len);/* 將數(shù)據(jù)copy到writerIndex后面,beginWrite返回的就是writerIndex位置的地址(writerIndex是下標(biāo)) */std::copy(data, data+len, beginWrite());/* 寫完數(shù)據(jù),更新writerIndex */hasWritten(len);}

函數(shù)首先調(diào)用ensureWritableBytes函數(shù)確保讀緩沖區(qū)有足夠的空間,如果沒有,就需要調(diào)用resize函數(shù)重新設(shè)置空間大小(std::vector的內(nèi)存增長就體現(xiàn)在這里,因?yàn)閏apacity和size通常不同,所以如果resize設(shè)置的大小沒有超過capacity,那么空間仍然足夠,不會(huì)重新開辟內(nèi)存,將數(shù)據(jù)拷貝到新內(nèi)存上)

void ensureWritableBytes(size_t len){/* 返回剩余可用空間大小,如果不足len,開辟新空間(調(diào)用resize) */if (writableBytes() < len){makeSpace(len);}assert(writableBytes() >= len);}

如果空間不夠,就需要調(diào)整空間大小

void makeSpace(size_t len){/* * 在多次從緩沖區(qū)讀數(shù)據(jù)后,readerIndex會(huì)后移很多,導(dǎo)致預(yù)留空間變大* 在增大空間之前,先判斷調(diào)整預(yù)留空間的大小后能否容納要求的數(shù)據(jù)* 如果可以,則將預(yù)留空間縮小為8字節(jié)(默認(rèn)的預(yù)留空間大小)* 如果不可以,那么就只能增加空間*/if (writableBytes() + prependableBytes() < len + kCheapPrepend){// FIXME: move readable data/* writerIndex代表當(dāng)前緩沖區(qū)已使用的大小,調(diào)整只需調(diào)整到恰好滿足len大小即可 */buffer_.resize(writerIndex_+len);}else{/* 通過縮小預(yù)留空間大小可以容納len個(gè)數(shù)據(jù),就縮小預(yù)留空間 */// move readable data to the front, make space inside bufferassert(kCheapPrepend < readerIndex_);/* 返回緩沖區(qū)數(shù)據(jù)個(gè)數(shù),writerIndex - readerIndex */size_t readable = readableBytes();/* 將所有數(shù)據(jù)前移 */std::copy(begin()+readerIndex_,begin()+writerIndex_,begin()+kCheapPrepend);/* 更新兩個(gè)指針(下標(biāo)) */readerIndex_ = kCheapPrepend;writerIndex_ = readerIndex_ + readable;assert(readable == readableBytes());}}

此時(shí)應(yīng)用層讀緩沖區(qū)從內(nèi)核中讀取數(shù)據(jù)完成,在用戶可讀的回調(diào)函數(shù)中(在readFd函數(shù)執(zhí)行完調(diào)用),用戶可以調(diào)用Buffer的接口從緩沖區(qū)中讀取數(shù)據(jù),程序示例如下

這是用戶提供給TcpServer的可讀時(shí)的回調(diào)函數(shù),又由TcpServer提供給TcpConnection,當(dāng)TcpConnection的讀緩沖區(qū)執(zhí)行完readFd返回后,會(huì)執(zhí)行用戶的回調(diào)函數(shù),圖片程序來自muduo的測(cè)試用例。

可以看到

  • buf->readableBytes()返回緩沖區(qū)中可讀字節(jié)數(shù)
  • conn->name()返回TcpConnection的名字(由TcpServer設(shè)置)
  • receiveTime是poll函數(shù)返回的時(shí)間,一直作為參數(shù)傳到Channel,TcpConnection,onMessage
  • buf->retrieveAsString()讀取緩沖區(qū)所有數(shù)據(jù)
/* 從緩沖區(qū)中讀取所有數(shù)據(jù) */string retrieveAllAsString(){return retrieveAsString(readableBytes());}/* 從緩沖區(qū)中讀取len個(gè)字節(jié)的數(shù)據(jù) */string retrieveAsString(size_t len){assert(len <= readableBytes());/* peek返回?cái)?shù)據(jù)的起點(diǎn) *//* 調(diào)用string(const char* s, size_type n);構(gòu)造函數(shù),初始化為從地址s開始的n個(gè)字節(jié) */string result(peek(), len);/* 調(diào)整緩沖區(qū),即改變r(jià)eaderIndex的位置,后移len */retrieve(len);return result;}

這兩個(gè)函數(shù)從讀緩沖區(qū)中讀取數(shù)據(jù),一個(gè)是全讀,一個(gè)是讀取指定字節(jié)個(gè)數(shù)的數(shù)據(jù),讀完之后,緩沖區(qū)需要調(diào)整readerIndex位置以指向新的數(shù)據(jù)起點(diǎn)

/* 調(diào)整readerIndex,后移len */void retrieve(size_t len){assert(len <= readableBytes());/* * 如果調(diào)整后仍然有數(shù)據(jù),就將readerIndex增加len* 如果已經(jīng)將數(shù)據(jù)全部讀完(len >= readableBytes),那么就初始化readerIndex/writerIndex位置*/if (len < readableBytes()){readerIndex_ += len;}else{retrieveAll();}}

如果數(shù)據(jù)全部被用戶讀出,就重新調(diào)整readerIndex/writerIndex位置

/* 初始化readerIndex/writerIndex位置,通常在用戶將數(shù)據(jù)全部讀出之后執(zhí)行 */void retrieveAll(){readerIndex_ = kCheapPrepend;writerIndex_ = kCheapPrepend;}

TcpConnection的寫操作

發(fā)送數(shù)據(jù)使用的是寫緩沖區(qū),當(dāng)內(nèi)核tcp緩沖區(qū)空間不足時(shí),會(huì)把數(shù)據(jù)寫到寫緩沖區(qū),由寫緩沖區(qū)在合適的時(shí)機(jī)寫入內(nèi)核tcp緩沖區(qū),合適的時(shí)機(jī)指內(nèi)核tcp緩沖區(qū)有多余空間時(shí)。
但是怎樣才能直到內(nèi)核tcp緩沖區(qū)有多余的空間呢,通過監(jiān)聽可寫事件即可。
但是如果內(nèi)核tcp緩沖區(qū)一直不滿,那么就一直可寫,就會(huì)一直觸發(fā)poll,導(dǎo)致busy loop,所以muduo只有在需要的時(shí)候才會(huì)檢測(cè)內(nèi)核tcp緩沖區(qū)的可寫事件,即只有當(dāng)tcp緩沖區(qū)已滿,但是寫緩沖區(qū)中有數(shù)據(jù)等待寫入tcp緩沖區(qū)時(shí)才會(huì)監(jiān)聽。

不同于讀取數(shù)據(jù)的是,發(fā)送數(shù)據(jù)使用的是TcpConnection提供的接口,而不是直接向Buffer中寫。

/* 幾個(gè)重載的send函數(shù),用于用戶想要發(fā)送數(shù)據(jù)到對(duì)端 */ void TcpConnection::send(const void* data, int len) {send(StringPiece(static_cast<const char*>(data), len)); }void TcpConnection::send(const StringPiece& message) {if (state_ == kConnected){/* * 如果當(dāng)前線程和TcpConnection所屬線程相同,直接在當(dāng)前線程發(fā)送* 否則,需要使用std::bind綁定函數(shù)和對(duì)象,并添加到自己所在線程的事件循環(huán)中*/if (loop_->isInLoopThread()){sendInLoop(message);}else{/* 可以直接在bind中綁定函數(shù) ? */void (TcpConnection::*fp)(const StringPiece& message) = &TcpConnection::sendInLoop;loop_->runInLoop(std::bind(fp,this, // FIXMEmessage.as_string()));//std::forward<string>(message)));}} }

send函數(shù)調(diào)用sendInLoop函數(shù),保證在TcpConnection所屬線程發(fā)送數(shù)據(jù)

  • 發(fā)送時(shí)會(huì)先判斷寫緩沖區(qū)是否已經(jīng)有數(shù)據(jù)存在,如果有,就不能直接向tcp緩沖區(qū)寫了,因?yàn)閿?shù)據(jù)要有順序的發(fā)送,所以需要追加到寫緩沖區(qū)中
  • 如果寫緩沖區(qū)中沒有數(shù)據(jù),就可以嘗試向tcp緩沖區(qū)寫數(shù)據(jù),如果全部寫入,當(dāng)然很happy,但是如果只寫入一部分或者一點(diǎn)也沒寫進(jìn)去(tcp緩沖區(qū)已滿),就需要添加到寫緩沖區(qū)中,同時(shí)開啟對(duì)tcp緩沖區(qū)(其實(shí)就是用于通信的套接字)的可寫事件的監(jiān)聽,等待tcp緩沖區(qū)可寫
void TcpConnection::sendInLoop(const StringPiece& message) {sendInLoop(message.data(), message.size()); }/* * 寫入數(shù)據(jù)* 1.如果Channel沒有監(jiān)聽可寫事件且輸出緩沖區(qū)為空,說明之前沒有出現(xiàn)內(nèi)核緩沖區(qū)滿的情況,直接寫進(jìn)內(nèi)核* 2.如果寫入內(nèi)核出錯(cuò),且出錯(cuò)信息(errno)是EWOULDBLOCK,說明內(nèi)核緩沖區(qū)滿,將剩余部分添加到應(yīng)用層輸出緩沖區(qū)* 3.如果之前輸出緩沖區(qū)為空,那么就沒有監(jiān)聽內(nèi)核緩沖區(qū)(fd)可寫事件,開始監(jiān)聽*/ void TcpConnection::sendInLoop(const void* data, size_t len) {loop_->assertInLoopThread();/* 寫入tcp緩沖區(qū)的字節(jié)數(shù) */ssize_t nwrote = 0;/* 沒有寫入tcp緩沖區(qū)的字節(jié)數(shù) */size_t remaining = len;/* 調(diào)用write時(shí)是否出錯(cuò) */bool faultError = false;/* 當(dāng)前TcpConnection狀態(tài),TcpConnection有四種狀態(tài),kDisconnected表示已經(jīng)斷開連接,不能再寫了,直接返回 */if (state_ == kDisconnected){LOG_WARN << "disconnected, give up writing";return;}// if no thing in output queue, try writing directly/* 如果輸出緩沖區(qū)有數(shù)據(jù),就不能嘗試發(fā)送數(shù)據(jù)了,否則數(shù)據(jù)會(huì)亂,應(yīng)該直接寫到緩沖區(qū)中 */if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0){/* 讀取函數(shù) */nwrote = sockets::write(channel_->fd(), data, len);if (nwrote >= 0){/* 寫入了一些數(shù)據(jù) */remaining = len - nwrote;/* * 完全寫入tcp緩沖區(qū),且用戶有提供寫數(shù)據(jù)的回調(diào)函數(shù),等待執(zhí)行完后調(diào)用* 因?yàn)楫?dāng)前TcpConnection和EventLoop所在同一個(gè)線程,* 而且此時(shí)EventLoop通常處在正在處理激活Channel的過程中(當(dāng)前函數(shù)有可能也是在這個(gè)過程)* 所以等待這個(gè)函數(shù)執(zhí)行完再調(diào)用回調(diào)函數(shù)*/if (remaining == 0 && writeCompleteCallback_){loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));}}else // nwrote < 0{/* 一點(diǎn)也沒寫進(jìn)去* 如果錯(cuò)誤為EWOULDBLOCK,表明tcp緩沖區(qū)已滿*/nwrote = 0;if (errno != EWOULDBLOCK){/* EPIPE表示客戶端已經(jīng)關(guān)閉了連接,服務(wù)器仍然嘗試寫入,就會(huì)出現(xiàn)EPIPE */LOG_SYSERR << "TcpConnection::sendInLoop";if (errno == EPIPE || errno == ECONNRESET) // FIXME: any others?{faultError = true;}}}}assert(remaining <= len);/* 沒出錯(cuò),且仍有一些數(shù)據(jù)沒有寫到tcp緩沖區(qū)中,那么就添加到寫緩沖區(qū)中 */if (!faultError && remaining > 0){/* 獲取寫緩沖區(qū)數(shù)據(jù)總量 */size_t oldLen = outputBuffer_.readableBytes();/* 到達(dá)高水位,調(diào)用回調(diào)函數(shù),這個(gè)函數(shù)沒有設(shè)置? */if (oldLen + remaining >= highWaterMark_&& oldLen < highWaterMark_&& highWaterMarkCallback_){loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));}/* 把沒有寫完的數(shù)據(jù)追加到輸出緩沖區(qū)中,然后開啟對(duì)可寫事件的監(jiān)聽(如果之前沒開的話) */outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);if (!channel_->isWriting()){channel_->enableWriting();}} }

如果tcp緩沖區(qū)不足以全部容納數(shù)據(jù),就會(huì)開啟對(duì)可寫事件的監(jiān)聽,當(dāng)tcp緩沖區(qū)可寫,就調(diào)用Channel的回調(diào)函數(shù),這個(gè)回調(diào)函數(shù)也是在TcpConnection構(gòu)造函數(shù)中傳給Channel的

channel_->setWriteCallback(std::bind(&TcpConnection::handleWrite, this)); /* 當(dāng)tcp緩沖區(qū)可寫時(shí)調(diào)用 */ void TcpConnection::handleWrite() {loop_->assertInLoopThread();if (channel_->isWriting()){/* 嘗試寫入寫緩沖區(qū)的所有數(shù)據(jù),返回實(shí)際寫入的字節(jié)數(shù)(tcp緩沖區(qū)很有可能仍然不能容納所有數(shù)據(jù)) */ssize_t n = sockets::write(channel_->fd(),outputBuffer_.peek(),outputBuffer_.readableBytes());if (n > 0){/* 調(diào)整寫緩沖區(qū)的readerIndex */outputBuffer_.retrieve(n);if (outputBuffer_.readableBytes() == 0){/* 全部寫到tcp緩沖區(qū)中,關(guān)閉對(duì)可寫事件的監(jiān)聽 */channel_->disableWriting();/* 如果有寫入完成時(shí)的回調(diào)函數(shù)(用戶提供,則等待函數(shù)結(jié)束后調(diào)用 */if (writeCompleteCallback_){loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));}/* * 如果連接正在關(guān)閉(通常關(guān)閉讀端),那么關(guān)閉寫端,但是是在已經(jīng)寫完的前提下* 如果還有數(shù)據(jù)沒有寫完,不能關(guān)閉,要在寫完再關(guān) */if (state_ == kDisconnecting){shutdownInLoop();}}}else{LOG_SYSERR << "TcpConnection::handleWrite";// if (state_ == kDisconnecting)// {// shutdownInLoop();// }}}else{LOG_TRACE << "Connection fd = " << channel_->fd()<< " is down, no more writing";} }

這里的細(xì)節(jié)問題就是如果想要關(guān)閉連接,那么通常是先關(guān)閉讀端,等到將寫緩沖區(qū)所有數(shù)據(jù)都寫到tcp緩沖區(qū)后,再關(guān)閉寫端,否則這些數(shù)據(jù)就不能發(fā)送給對(duì)端了
muduo沒有提供close函數(shù),關(guān)閉是分兩步進(jìn)行的(使用shutdown而不適用close),這樣更容易控制
handleWrite函數(shù)中調(diào)用的shutdownInLoop函數(shù)如下,用于關(guān)閉寫端

void TcpConnection::shutdownInLoop() {loop_->assertInLoopThread();if (!channel_->isWriting()){// we are not writingsocket_->shutdownWrite();} }

至此發(fā)送數(shù)據(jù)的操作完成,所以數(shù)據(jù)都在tcp緩沖區(qū)中等待著或正在運(yùn)往對(duì)端(客戶端)

總結(jié)

以上是生活随笔為你收集整理的muduo网络库学习(六)缓冲区Buffer及TcpConnection的读写操作的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。