日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

muduo网络库学习(五)服务器监听类Acceptor及Tcp连接TcpConnection的建立与关闭

發(fā)布時間:2024/4/19 编程问答 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 muduo网络库学习(五)服务器监听类Acceptor及Tcp连接TcpConnection的建立与关闭 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

通常服務(wù)器在處理客戶端連接請求時,為了不阻塞在accept函數(shù)上,會將監(jiān)聽套接字注冊到io復(fù)用函數(shù)中,當(dāng)客戶端請求連接時,監(jiān)聽套接字變?yōu)榭勺x,隨后在回調(diào)函數(shù)調(diào)用accept接收客戶端連接。muduo將這一部分封裝成了Acceptor類,用于執(zhí)行接收客戶端請求的任務(wù)。


類的定義如下,主要就是監(jiān)聽套接字變?yōu)榭勺x的回調(diào)函數(shù)

class EventLoop; class InetAddress;/// /// Acceptor of incoming TCP connections. /// /* * 對TCP socket, bind, listen, accept的封裝 * 將sockfd以Channel的形式注冊到EventLoop的Poller中,檢測到sockfd可讀時,接收客戶端*/ class Acceptor : noncopyable {public:typedef std::function<void (int sockfd, const InetAddress&)> NewConnectionCallback;Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport);~Acceptor();/* 由服務(wù)器TcpServer設(shè)置的回調(diào)函數(shù),在接收完客戶端請求后執(zhí)行,用于創(chuàng)建TcpConnection */void setNewConnectionCallback(const NewConnectionCallback& cb){ newConnectionCallback_ = cb; }bool listenning() const { return listenning_; }/* 調(diào)用listen函數(shù),轉(zhuǎn)為監(jiān)聽套接字,同時將監(jiān)聽套接字添加到Poller中 */void listen();private:/* 回調(diào)函數(shù),當(dāng)有客戶端請求連接時執(zhí)行(監(jiān)聽套接字變?yōu)榭勺x) */void handleRead();/* 事件驅(qū)動主循環(huán) */EventLoop* loop_;/* 封裝socket的一些接口 */Socket acceptSocket_;/* Channel,保存著sockfd,被添加到Poller中,等待被激活 */Channel acceptChannel_;/* * 當(dāng)有客戶端連接時首先內(nèi)部接收連接,然后調(diào)用的用戶提供的回調(diào)函數(shù)* 客戶端套接字和地址作為參數(shù)傳入*/NewConnectionCallback newConnectionCallback_;bool listenning_;/* * Tcp連接建立的流程* 1.服務(wù)器調(diào)用socket,bind,listen開啟監(jiān)聽套接字監(jiān)聽客戶端請求* 2.客戶端調(diào)用socket,connect連接到服務(wù)器* 3.第一次握手客戶端發(fā)送SYN請求分節(jié)(數(shù)據(jù)序列號)* 4.服務(wù)器接收SYN后保存在本地然后發(fā)送自己的SYN分節(jié)(數(shù)據(jù)序列號)和ACK確認(rèn)分節(jié)告知客戶端已收到* 同時開啟第二次握手* 5.客戶端接收到服務(wù)器的SYN分節(jié)和ACK確認(rèn)分節(jié)后保存在本地然后發(fā)送ACK確認(rèn)分節(jié)告知服務(wù)器已收到* 此時第二次握手完成,客戶端connect返回* 此時,tcp連接已經(jīng)建立完成,客戶端tcp狀態(tài)轉(zhuǎn)為ESTABLISHED,而在服務(wù)器端,新建的連接保存在內(nèi)核tcp* 連接的隊(duì)列中,此時服務(wù)器端監(jiān)聽套接字變?yōu)榭勺x,等待服務(wù)器調(diào)用accept函數(shù)取出這個連接* 6.服務(wù)器接收到客戶端發(fā)來的ACK確認(rèn)分節(jié),服務(wù)器端調(diào)用accept嘗試找到一個空閑的文件描述符,然后* 從內(nèi)核tcp連接隊(duì)列中取出第一個tcp連接,分配這個文件描述符用于這個tcp連接* 此時服務(wù)器端tcp轉(zhuǎn)為ESTABLISHED,三次握手完成,tcp連接建立* * 服務(wù)器啟動時占用的一個空閑文件描述符,/dev/null,作用是解決文件描述符耗盡的情況* 原理如下:* 當(dāng)服務(wù)器端文件描述符耗盡,當(dāng)客戶端再次請求連接,服務(wù)器端由于沒有可用文件描述符* 會返回-1,同時errno為EMFILE,意為描述符到達(dá)hard limit,無可用描述符,此時服務(wù)器端* accept函數(shù)在獲取一個空閑文件描述符時就已經(jīng)失敗,還沒有從內(nèi)核tcp連接隊(duì)列中取出tcp連接* 這會導(dǎo)致監(jiān)聽套接字一直可讀,因?yàn)閠cp連接隊(duì)列中一直有客戶端的連接請求* * 所以服務(wù)器在啟動時打開一個空閑描述符/dev/null(文件描述符),先站著'坑‘,當(dāng)出現(xiàn)上面* 情況accept返回-1時,服務(wù)器暫時關(guān)閉idleFd_讓出'坑',此時就會多出一個空閑描述符* 然后再次調(diào)用accept接收客戶端請求,然后close接收后的客戶端套接字,優(yōu)雅的告訴* 客戶端關(guān)閉連接,然后再將'坑'占上*/int idleFd_;};

一個不好理解的變量是idleFd_;,它是一個文件描述符,這里是打開"/dev/null"文件后返回的描述符,用于解決服務(wù)器端描述符耗盡的情況。
如果當(dāng)服務(wù)器文件描述符耗盡后,服務(wù)器端accept還沒等從tcp連接隊(duì)列中取出連接請求就已經(jīng)失敗返回了,此時內(nèi)核tcp隊(duì)列中一直有客戶端請求,內(nèi)核會一直通知監(jiān)聽套接字,導(dǎo)致監(jiān)聽套接字一直處于可讀,在下次直接poll函數(shù)時會直接返回。
解決的辦法就是在服務(wù)器剛啟動時就預(yù)先占用一個文件描述符,通常可以是打開一個文件,這里是"/dev/null"。此時服務(wù)器就有一個空閑的文件描述符了,當(dāng)出現(xiàn)上述情況無法取得tcp連接隊(duì)列中的請求時,先關(guān)閉這個文件讓出一個文件描述符,此時調(diào)用accept函數(shù)再次接收,由于已經(jīng)有一個空閑的文件描述符了,accept會正常返回,將連接請求從tcp隊(duì)列中取出,然后優(yōu)雅的關(guān)閉這個tcp連接(調(diào)用close函數(shù)),最后再打開"/dev/null"這個文件把”坑“占住。


成員函數(shù)的實(shí)現(xiàn)也有比較重點(diǎn)的地方,首先是構(gòu)造函數(shù)

Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport): loop_(loop),acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())),acceptChannel_(loop, acceptSocket_.fd()),listenning_(false),idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC)) {assert(idleFd_ >= 0);/* * setsockopt設(shè)置套接字選項(xiàng)SO_REUSEADDR,對于端口bind,如果這個地址/端口處于TIME_WAIT,也可bind成功* int flag = 1;* setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag));*/acceptSocket_.setReuseAddr(true);/** setsockopt設(shè)置套接字選項(xiàng)SO_REUSEPORT,作用是對于多核cpu,允許在同一個<ip, port>對上運(yùn)行多個相同服務(wù)器* 內(nèi)核會采用負(fù)載均衡的的方式分配客戶端的連接請求給某一個服務(wù)器*/acceptSocket_.setReusePort(reuseport);acceptSocket_.bindAddress(listenAddr);/* Channel設(shè)置讀事件的回調(diào)函數(shù),此時還沒有開始監(jiān)聽這個Channel,需要調(diào)用Channel::enableReading() */acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this)); }

構(gòu)造函數(shù)中為用于監(jiān)聽的套接字設(shè)置了SO_REUSEPORT和SO_REUSEADDR屬性,一個是端口重用,一個是地址重用。

  • SO_REUSEPORT,端口重用,可以使用還處于TIME_WAIT狀態(tài)的端口
  • SO_REUSEADDR,地址重用,服務(wù)器可以同時建立多個用于監(jiān)聽的socket,每個socket綁定的地址端口都相同,內(nèi)核會采用負(fù)載均衡的方法將每個將每個客戶端請求分配給某一個socket,可以很大程序的提高并發(fā)性,充分利用CPU資源

acceptChannel_用于保存這個用于監(jiān)聽的套接字,綁定回調(diào)函數(shù),在合適的時機(jī)注冊到Poller上(調(diào)用listen時)

void Acceptor::listen() {loop_->assertInLoopThread();listenning_ = true;acceptSocket_.listen();/* * 開始監(jiān)聽Channel,也就是設(shè)置fd關(guān)心的事件(EPOLLIN/EPOLLOUT等),然后添加到Poller中 * Poller中保存著所有注冊到EventLoop中的Channel*/acceptChannel_.enableReading(); }

比較重要的是事件處理函數(shù),當(dāng)監(jiān)聽套接字可讀時,調(diào)用accept接收客戶端請求,如果描述符耗盡,釋放idleFd_重新accept,然后關(guān)閉,再占用idleFd_

/** 當(dāng)有客戶端嘗試連接服務(wù)器時,監(jiān)聽套接字變?yōu)榭勺x,epoll_wait/poll返回* EventLoop處理激活隊(duì)列中的Channel,調(diào)用對應(yīng)的回調(diào)函數(shù)* 監(jiān)聽套接字的Channel的回調(diào)函數(shù)是handleRead(),用于接收客戶端請求*/ void Acceptor::handleRead() {loop_->assertInLoopThread();InetAddress peerAddr;//FIXME loop until no moreint connfd = acceptSocket_.accept(&peerAddr);if (connfd >= 0){// string hostport = peerAddr.toIpPort();// LOG_TRACE << "Accepts of " << hostport;/* * 如果設(shè)置了回調(diào)函數(shù),那么就調(diào)用,參數(shù)是客戶端套接字和地址/端口* 否則就關(guān)閉連接,因?yàn)椴]有要處理客戶端的意思* * 這個回調(diào)函數(shù)是TcpServer中的newConnection,用于創(chuàng)建一個TcpConnection連接*/if (newConnectionCallback_){newConnectionCallback_(connfd, peerAddr);}else{sockets::close(connfd);}}else{LOG_SYSERR << "in Acceptor::handleRead";// Read the section named "The special problem of// accept()ing when you can't" in libev's doc.// By Marc Lehmann, author of libev.// /* 解決服務(wù)器端描述符耗盡的情況,原因見.h文件 */if (errno == EMFILE){::close(idleFd_);idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);::close(idleFd_);idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);}} }

對文件描述符耗盡的處理比較重要,以前沒怎么接觸過


回調(diào)函數(shù)中調(diào)用的newConnectionCallback_函數(shù)是在Acceptor創(chuàng)建之初由TcpServer設(shè)置的(TcpServer表示服務(wù)器,內(nèi)有一個監(jiān)聽類Acceptor),這個函數(shù)主要用于初始化一個TcpConnection,一個TcpConnection對象代表著一個tcp連接

TcpConnection的定義主要都是寫set*函數(shù),成員變量比較多,但是重要的就

  • 事件驅(qū)動循環(huán)loop_
  • 用于tcp通信的socket_
  • 用于監(jiān)聽sockfd的channel_
  • 輸入輸出緩沖區(qū)inputBuffer_/outputBuffer_
  • 由TcpServer提供的各種回調(diào)函數(shù)
/* 事件驅(qū)動循環(huán) */EventLoop* loop_;/* 每個tcp連接有一個獨(dú)一無二的名字,建立連接時由TcpServer傳入 */const string name_;StateE state_; // FIXME: use atomic variablebool reading_;// we don't expose those classes to client./* 用于tcp連接的套接字,以及用于監(jiān)聽套接字的Channel */std::unique_ptr<Socket> socket_;std::unique_ptr<Channel> channel_;/* 本地<地址,端口>,客戶端<地址,端口>,由TcpServer傳入 */const InetAddress localAddr_;const InetAddress peerAddr_;/* 連接建立后/關(guān)閉后的回調(diào)函數(shù),通常是由用戶提供給TcpServer,然后TcpServer提供給TcpConnection */ConnectionCallback connectionCallback_;/* 當(dāng)tcp連接有消息通信時執(zhí)行的回調(diào)函數(shù),也是由用戶提供 */MessageCallback messageCallback_;/* * 寫入tcp緩沖區(qū)之后的回調(diào)函數(shù)* 通常是tcp緩沖區(qū)滿然后添加到應(yīng)用層緩沖區(qū)后,由應(yīng)用層緩沖區(qū)寫入內(nèi)核tcp緩沖區(qū)* 后執(zhí)行,一般用戶不關(guān)系這部分*/WriteCompleteCallback writeCompleteCallback_;/* 高水位回調(diào),設(shè)定緩沖區(qū)接收大小,如果應(yīng)用層緩沖區(qū)堆積的數(shù)據(jù)大于某個給定值時調(diào)用 */HighWaterMarkCallback highWaterMarkCallback_;/* * tcp連接關(guān)閉時調(diào)用的回調(diào)函數(shù),由TcpServer設(shè)置,用于TcpServer將這個要關(guān)閉的TcpConnection從* 保存著所有TcpConnection的map中刪除* 這個回調(diào)函數(shù)和TcpConnection自己的handleClose不同,后者是提供給Channel的,函數(shù)中會使用到* closeCallback_*/CloseCallback closeCallback_;/* 高水位值 */size_t highWaterMark_;/* 輸入輸出緩沖區(qū) */Buffer inputBuffer_;Buffer outputBuffer_; // FIXME: use list<Buffer> as output buffer.boost::any context_;// FIXME: creationTime_, lastReceiveTime_// bytesReceived_, bytesSent_

首先是構(gòu)造函數(shù)的實(shí)現(xiàn),主要是為Channel提供各種回調(diào)函數(shù)

/* * 構(gòu)造函數(shù),設(shè)置當(dāng)fd就緒時調(diào)用的回調(diào)函數(shù)* Channel代表一個對fd事件的監(jiān)聽*/ TcpConnection::TcpConnection(EventLoop* loop,const string& nameArg,int sockfd,const InetAddress& localAddr,const InetAddress& peerAddr): loop_(CHECK_NOTNULL(loop)),name_(nameArg),state_(kConnecting),reading_(true),socket_(new Socket(sockfd)),channel_(new Channel(loop, sockfd)),localAddr_(localAddr),peerAddr_(peerAddr),highWaterMark_(64*1024*1024) {/* 設(shè)置各種回調(diào)函數(shù) */channel_->setReadCallback(std::bind(&TcpConnection::handleRead, this, _1));channel_->setWriteCallback(std::bind(&TcpConnection::handleWrite, this));channel_->setCloseCallback(std::bind(&TcpConnection::handleClose, this));channel_->setErrorCallback(std::bind(&TcpConnection::handleError, this));LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this<< " fd=" << sockfd;/** 設(shè)置KEEP-ALIVE屬性,如果客戶端很久沒有和服務(wù)器通訊,tcp會自動判斷客戶端是否還處于連接(類似心跳包)* * int setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &sockopt, static_cast<socklen_t>(sizeof(sockopt)));*/socket_->setKeepAlive(true); }

回調(diào)函數(shù)的設(shè)置對應(yīng)了Channel的hanleEvent函數(shù)中根據(jù)不同激活原因調(diào)用不同回調(diào)函數(shù)(handleEvent調(diào)用handleEventWithGuard)。
另外,handleEvent中的tie_是對TcpConnection的弱引用(在后面設(shè)置),因?yàn)榛卣{(diào)函數(shù)都是TcpConnection的,所以在調(diào)用之前需要確保TcpConnection沒有被銷毀,所以將tie_提升為shared_ptr判斷TcpConnection是否還存在,之后再調(diào)用TcpConnection的一系列回調(diào)函數(shù)

/** 根據(jù)fd激活事件的不同,調(diào)用不同的fd的回調(diào)函數(shù)*/ void Channel::handleEvent(Timestamp receiveTime) {/* * RAII,對象管理資源* weak_ptr使用lock提升成shared_ptr,此時引用計(jì)數(shù)加一* 函數(shù)返回,棧空間對象銷毀,提升的shared_ptr guard銷毀,引用計(jì)數(shù)減一*/std::shared_ptr<void> guard;if (tied_){guard = tie_.lock();if (guard){handleEventWithGuard(receiveTime);}}else{handleEventWithGuard(receiveTime);} }/** 根據(jù)被激活事件的不同,調(diào)用不同的回調(diào)函數(shù)*/ void Channel::handleEventWithGuard(Timestamp receiveTime) {eventHandling_ = true;LOG_TRACE << reventsToString();if ((revents_ & POLLHUP) && !(revents_ & POLLIN)){if (logHup_){LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLHUP";}if (closeCallback_) closeCallback_();}if (revents_ & POLLNVAL){LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLNVAL";}if (revents_ & (POLLERR | POLLNVAL)){if (errorCallback_) errorCallback_();}if (revents_ & (POLLIN | POLLPRI | POLLRDHUP)){if (readCallback_) readCallback_(receiveTime);}if (revents_ & POLLOUT){if (writeCallback_) writeCallback_();}eventHandling_ = false; }

當(dāng)TcpServer創(chuàng)建完TcpConnection后,會設(shè)置各種會調(diào)用書,然后調(diào)用TcpConnection的connectEstablished函數(shù),主要用于將Channel添加到Poller中,同時調(diào)用用戶提供的連接建立成功后的回調(diào)函數(shù)
TcpServer創(chuàng)建并設(shè)置TcpConnection的部分,可以看到,TcpServer會將用戶提供的所有回調(diào)函數(shù)都傳給TcpConnection,然后執(zhí)行TcpConnection的connectEstablished函數(shù),這個函數(shù)的執(zhí)行要放到它所屬的那個事件驅(qū)動循環(huán)線程做,不要阻塞TcpServer線程(這個地方不是為了線程安全性考慮,因?yàn)門cpConnection本身就是在TcpServer線程創(chuàng)建的,暴露給TcpServer線程很正常,而且TcpServer中也記錄著所有創(chuàng)建的TcpConnection,這里的主要目的是不阻塞TcpServer線程,讓它繼續(xù)監(jiān)聽客戶端請求)

/* ... *//* 從事件驅(qū)動線程池中取出一個線程給TcpConnection */EventLoop* ioLoop = threadPool_->getNextLoop();/* ... *//* 創(chuàng)建一個新的TcpConnection代表一個Tcp連接 */TcpConnectionPtr conn(new TcpConnection(ioLoop,connName,sockfd,localAddr,peerAddr));/* 添加到所有tcp連接的map中,鍵是tcp連接獨(dú)特的名字(服務(wù)器名+客戶端<地址,端口>) */connections_[connName] = conn;/* 為tcp連接設(shè)置回調(diào)函數(shù)(由用戶提供) */conn->setConnectionCallback(connectionCallback_);conn->setMessageCallback(messageCallback_);conn->setWriteCompleteCallback(writeCompleteCallback_);/* * 關(guān)閉回調(diào)函數(shù),由TcpServer設(shè)置,作用是將這個關(guān)閉的TcpConnection從map中刪除* 當(dāng)poll返回后,發(fā)現(xiàn)被激活的原因是EPOLLHUP,此時需要關(guān)閉tcp連接* 調(diào)用Channel的CloseCallback,進(jìn)而調(diào)用TcpConnection的handleClose,進(jìn)而調(diào)用removeConnection*/conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe/* * 連接建立后,調(diào)用TcpConnection連接建立成功的回調(diào)函數(shù),這個函數(shù)會調(diào)用用戶提供的回調(diào)函數(shù)* 1.新建的TcpConnection所在事件循環(huán)是在事件循環(huán)線程池中的某個線程* 2.所以TcpConnection也就屬于它所在的事件驅(qū)動循環(huán)所在的那個線程* 3.調(diào)用TcpConnection的函數(shù)時也就應(yīng)該在自己所在線程調(diào)用* 4.所以需要調(diào)用runInLoop在自己的那個事件驅(qū)動循環(huán)所在線程調(diào)用這個函數(shù)*/ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));

connectEstablished函數(shù)如下

/* * 1.創(chuàng)建服務(wù)器(TcpServer)時,創(chuàng)建Acceptor,設(shè)置接收到客戶端請求后執(zhí)行的回調(diào)函數(shù)* 2.Acceptor創(chuàng)建監(jiān)聽套接字,將監(jiān)聽套接字綁定到一個Channel中,設(shè)置可讀回調(diào)函數(shù)為Acceptor的handleRead* 3.服務(wù)器啟動,調(diào)用Acceptor的listen函數(shù)創(chuàng)建監(jiān)聽套接字,同時將Channel添加到Poller中* 4.有客戶端請求連接,監(jiān)聽套接字可讀,Channel被激活,調(diào)用可讀回調(diào)函數(shù)(handleRead)* 5.回調(diào)函數(shù)接收客戶端請求,獲得客戶端套接字和地址,調(diào)用TcpServer提供的回調(diào)函數(shù)(newConnection)* 6.TcpServer的回調(diào)函數(shù)中創(chuàng)建TcpConnection代表這個tcp連接,設(shè)置tcp連接各種回調(diào)函數(shù)(由用戶提供給TcpServer)* 7.TcpServer讓tcp連接所屬線程調(diào)用TcpConnection的connectEstablished* 8.connectEstablished開啟對客戶端套接字的Channel的可讀監(jiān)聽,然后調(diào)用用戶提供的回調(diào)函數(shù)*/ void TcpConnection::connectEstablished() {loop_->assertInLoopThread();assert(state_ == kConnecting);setState(kConnected);/* Channel中對TcpConnection的弱引用在這里設(shè)置 */channel_->tie(shared_from_this());/* 設(shè)置對可讀事件的監(jiān)聽,同時將Channel添加到Poller中 */channel_->enableReading();/* 用戶提供的回調(diào)函數(shù),在連接建立成功后調(diào)用 */connectionCallback_(shared_from_this()); }

至此tcp連接建立完成,在用戶提供的回調(diào)函數(shù)中,傳入的參數(shù)便是這個TcpConnection的shared_ptr,用戶可以使用TcpConnection::send操作向客戶端發(fā)送消息(放到后面)


有連接的建立就有連接的關(guān)閉,當(dāng)客戶端主動關(guān)閉(調(diào)用close)時,服務(wù)器端對應(yīng)的Channel被激活,激活原因?yàn)镋POLLHUP,表示連接已關(guān)閉,此時會調(diào)用TcpConnection的回調(diào)函數(shù)handleClose,在這個函數(shù)中,TcpConnection處理執(zhí)行各種關(guān)閉動作,包括

  • 將Channel從Poller中移除
  • 調(diào)用TcpServer提供的關(guān)閉回調(diào)函數(shù),將自己從TcpServer的tcp連接map中移除
  • 調(diào)用客戶提供的關(guān)閉回調(diào)函數(shù)(如果有的話)
void TcpConnection::handleClose() {loop_->assertInLoopThread();LOG_TRACE << "fd = " << channel_->fd() << " state = " << stateToString();assert(state_ == kConnected || state_ == kDisconnecting);// we don't close fd, leave it to dtor, so we can find leaks easily.setState(kDisconnected);channel_->disableAll();/* 此時當(dāng)前的TcpConnection的引用計(jì)數(shù)為2,一個是guardThis,另一個在TcpServer的connections_中 */TcpConnectionPtr guardThis(shared_from_this());connectionCallback_(guardThis);// must be the last line/* * closeCallback返回后,TcpServer的connections_(tcp連接map)已經(jīng)將TcpConnection刪除,引用計(jì)數(shù)變?yōu)?* 此時如果函數(shù)返回,guardThis也會被銷毀,引用計(jì)數(shù)變?yōu)?,這個TcpConnection就會被銷毀* 所以在TcpServer::removeConnectionInLoop使用bind將TcpConnection生命期延長,引用計(jì)數(shù)加一,變?yōu)?* 就算guardThis銷毀,引用計(jì)數(shù)仍然有1個* 等到調(diào)用完connectDestroyed后,bind綁定的TcpConnection也會被銷毀,引用計(jì)數(shù)為0,TcpConnection析構(gòu)*/closeCallback_(guardThis); }

connectionCallback_是由用戶提供的,連接建立/關(guān)閉時調(diào)用,主要調(diào)用closeCallback_函數(shù)(由TcpServer)提供
這個函數(shù)主要就存在線程不安全的問題,原因就是此時的線程是TcpConnection所在線程
函數(shù)執(zhí)行順序?yàn)?#xff1a;
EventLoop::loop->Poller::poll->Channel::handleEvent->TcpConnection::handleClose->TcpServer::removeConnection
此時就將TcpServer暴露給其他線程,導(dǎo)致線程不安全的問題
為了減輕線程不安全帶來的危險(xiǎn),盡量將線程不安全的函數(shù)縮短,muduo中使用runInLoop直接將要調(diào)用的函數(shù)放到自己線程執(zhí)行,轉(zhuǎn)換到線程安全,所以這部分只有這一條語句是線程不安全的

void TcpServer::removeConnection(const TcpConnectionPtr& conn) {// FIXME: unsafe/* * 在TcpConnection所在的事件驅(qū)動循環(huán)所在的線程執(zhí)行刪除工作* 因?yàn)樾枰僮鱐cpServer::connections_,就需要傳TcpServer的this指針到TcpConnection所在線程* 會導(dǎo)致將TcpServer暴露給TcpConnection線程,也不具有線程安全性* * TcpConnection所在線程:在創(chuàng)建時從事件驅(qū)動循環(huán)線程池中選擇的某個事件驅(qū)動循環(huán)線程* TcpServer所在線程:事件驅(qū)動循環(huán)線程池所在線程,不在線程池中* * 1.調(diào)用這個函數(shù)的線程是TcpConnection所在線程,因?yàn)樗患せ?#xff0c;然后調(diào)用回調(diào)函數(shù),都是在自己線程執(zhí)行的* 2.而removeConnection的調(diào)用者TcpServer的this指針如今在TcpConnection所在線程* 3.如果這個線程把this指針delele了,或者改了什么東西,那么TcpServer所在線程就會出錯* 4.所以不安全* * 為什么不在TcpServer所在線程執(zhí)行以滿足線程安全性(TcpConnection就是由TcpServer所在線程創(chuàng)建的)* 1.只有TcpConnection自己知道自己什么時候需要關(guān)閉,TcpServer哪里會知道* 2.一旦需要關(guān)閉,就必定需要將自己從TcpServer的connections_中移除,還是暴露了TcpServer* 3.這里僅僅讓一條語句變?yōu)榫€程不安全的,然后直接用TcpServer所在線程調(diào)用刪除操作轉(zhuǎn)為線程安全*/loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn)); }

removeConnectionInLoop函數(shù)如下

/* * 這個函數(shù)是線程安全的,因?yàn)槭怯蒚cpServer所在事件驅(qū)動循環(huán)調(diào)用的*/ void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn) {loop_->assertInLoopThread();LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_<< "] - connection " << conn->name();size_t n = connections_.erase(conn->name());(void)n;assert(n == 1);EventLoop* ioLoop = conn->getLoop();/* * 為什么不能用runInLoop, why? *//* * std::bind綁定函數(shù)指針,注意是值綁定,也就是說conn會復(fù)制一份到bind上* 這就會延長TcpConnection生命期,否則* 1.此時對于TcpConnection的引用計(jì)數(shù)為2,參數(shù)一個,connections_中一個* 2.connections_刪除掉TcpConnection后,引用計(jì)數(shù)為1* 3.removeConnectionInLoop返回,上層函數(shù)handleClose返回,引用計(jì)數(shù)為0,會被析構(gòu)* 4.bind會值綁定,conn復(fù)制一份,TcpConnection引用計(jì)數(shù)加1,就不會導(dǎo)致TcpConnection被析構(gòu)*/ioLoop->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn)); }

比較重要的地方是TcpConnection生命期的問題,注釋中也有提及。因?yàn)閙uduo中對象使用智能指針shared_ptr存儲的,所以只有當(dāng)shard_ptr的引用計(jì)數(shù)為0時才會析構(gòu)它保存的對象。對于TcpConnection而言,它的引用計(jì)數(shù)在

  • 建立之初保存在TcpServer的connections_中,這個connections_是一個map,鍵是字符串類型,值就是shared_ptr<TcpConnection>類型,所以建立之初,TcpConnection對象的引用計(jì)數(shù)為1
  • 創(chuàng)建TcpConnection后,TcpConnection為內(nèi)部用于監(jiān)聽文件描述符sockfd的Channel傳遞保存著自己指針的shared_ptr,但是Channel中的智能指針是以weak_ptr的形式存在的(tie_),不增加引用計(jì)數(shù),所以此時仍然是1
  • 在TcpConnection處于連接創(chuàng)建的過程中未有shared_ptr的創(chuàng)建和銷毀,所以仍然是1
  • 客戶端主動關(guān)閉連接后,服務(wù)器端的TcpConnection在handleClose函數(shù)中又創(chuàng)建了一個shared_ptr引用自身的局部變量,此時引用計(jì)數(shù)加一,變?yōu)?
  • 緊接著調(diào)用TcpServer::removeConnectionInLoop函數(shù),因?yàn)閭魅氲膮?shù)是引用類型,不存在賦值,所以引用計(jì)數(shù)仍為2
  • 在removeConnectionInLoop函數(shù)中,TcpServer將TcpConnection從自己的connections_(保存所有tcp連接的map)中刪除,此時引用計(jì)數(shù)減一,變?yōu)?
  • TcpServer通過std::bind綁定函數(shù)指針,將TcpConnection::connectDestroyed函數(shù)和TcpConnection對象綁定并放到EventLoop中等待調(diào)用,因?yàn)閟td::bind只能是賦值操作,所以引用計(jì)數(shù)加一,變?yōu)?
  • 返回到handleClose函數(shù)中,函數(shù)返回,局部變量銷毀,引用計(jì)數(shù)減一,變?yōu)?
  • EventLoop從poll中返回,執(zhí)行TcpConnection::connectDestroyed函數(shù),做最后的清理工作,函數(shù)返回,綁定到這個函數(shù)上的TcpConnection對象指針也跟著銷毀,引用計(jì)數(shù)減一,變?yōu)?
  • 開始調(diào)用TcpConnection析構(gòu)函數(shù),TcpConnection銷毀
  • 所以如果在第7步不使用std::bind增加TcpConnection生命期的話,TcpConnection可能在handleClose函數(shù)返回后就銷毀了,根本不能執(zhí)行TcpConnection::connectDestroyed函數(shù)

    總結(jié)

    以上是生活随笔為你收集整理的muduo网络库学习(五)服务器监听类Acceptor及Tcp连接TcpConnection的建立与关闭的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。