muduo网络库学习(五)服务器监听类Acceptor及Tcp连接TcpConnection的建立与关闭
通常服務(wù)器在處理客戶端連接請求時,為了不阻塞在accept函數(shù)上,會將監(jiān)聽套接字注冊到io復(fù)用函數(shù)中,當(dāng)客戶端請求連接時,監(jiān)聽套接字變?yōu)榭勺x,隨后在回調(diào)函數(shù)調(diào)用accept接收客戶端連接。muduo將這一部分封裝成了Acceptor類,用于執(zhí)行接收客戶端請求的任務(wù)。
類的定義如下,主要就是監(jiān)聽套接字變?yōu)榭勺x的回調(diào)函數(shù)
class EventLoop; class InetAddress;/// /// Acceptor of incoming TCP connections. /// /* * 對TCP socket, bind, listen, accept的封裝 * 將sockfd以Channel的形式注冊到EventLoop的Poller中,檢測到sockfd可讀時,接收客戶端*/ class Acceptor : noncopyable {public:typedef std::function<void (int sockfd, const InetAddress&)> NewConnectionCallback;Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport);~Acceptor();/* 由服務(wù)器TcpServer設(shè)置的回調(diào)函數(shù),在接收完客戶端請求后執(zhí)行,用于創(chuàng)建TcpConnection */void setNewConnectionCallback(const NewConnectionCallback& cb){ newConnectionCallback_ = cb; }bool listenning() const { return listenning_; }/* 調(diào)用listen函數(shù),轉(zhuǎn)為監(jiān)聽套接字,同時將監(jiān)聽套接字添加到Poller中 */void listen();private:/* 回調(diào)函數(shù),當(dāng)有客戶端請求連接時執(zhí)行(監(jiān)聽套接字變?yōu)榭勺x) */void handleRead();/* 事件驅(qū)動主循環(huán) */EventLoop* loop_;/* 封裝socket的一些接口 */Socket acceptSocket_;/* Channel,保存著sockfd,被添加到Poller中,等待被激活 */Channel acceptChannel_;/* * 當(dāng)有客戶端連接時首先內(nèi)部接收連接,然后調(diào)用的用戶提供的回調(diào)函數(shù)* 客戶端套接字和地址作為參數(shù)傳入*/NewConnectionCallback newConnectionCallback_;bool listenning_;/* * Tcp連接建立的流程* 1.服務(wù)器調(diào)用socket,bind,listen開啟監(jiān)聽套接字監(jiān)聽客戶端請求* 2.客戶端調(diào)用socket,connect連接到服務(wù)器* 3.第一次握手客戶端發(fā)送SYN請求分節(jié)(數(shù)據(jù)序列號)* 4.服務(wù)器接收SYN后保存在本地然后發(fā)送自己的SYN分節(jié)(數(shù)據(jù)序列號)和ACK確認(rèn)分節(jié)告知客戶端已收到* 同時開啟第二次握手* 5.客戶端接收到服務(wù)器的SYN分節(jié)和ACK確認(rèn)分節(jié)后保存在本地然后發(fā)送ACK確認(rèn)分節(jié)告知服務(wù)器已收到* 此時第二次握手完成,客戶端connect返回* 此時,tcp連接已經(jīng)建立完成,客戶端tcp狀態(tài)轉(zhuǎn)為ESTABLISHED,而在服務(wù)器端,新建的連接保存在內(nèi)核tcp* 連接的隊(duì)列中,此時服務(wù)器端監(jiān)聽套接字變?yōu)榭勺x,等待服務(wù)器調(diào)用accept函數(shù)取出這個連接* 6.服務(wù)器接收到客戶端發(fā)來的ACK確認(rèn)分節(jié),服務(wù)器端調(diào)用accept嘗試找到一個空閑的文件描述符,然后* 從內(nèi)核tcp連接隊(duì)列中取出第一個tcp連接,分配這個文件描述符用于這個tcp連接* 此時服務(wù)器端tcp轉(zhuǎn)為ESTABLISHED,三次握手完成,tcp連接建立* * 服務(wù)器啟動時占用的一個空閑文件描述符,/dev/null,作用是解決文件描述符耗盡的情況* 原理如下:* 當(dāng)服務(wù)器端文件描述符耗盡,當(dāng)客戶端再次請求連接,服務(wù)器端由于沒有可用文件描述符* 會返回-1,同時errno為EMFILE,意為描述符到達(dá)hard limit,無可用描述符,此時服務(wù)器端* accept函數(shù)在獲取一個空閑文件描述符時就已經(jīng)失敗,還沒有從內(nèi)核tcp連接隊(duì)列中取出tcp連接* 這會導(dǎo)致監(jiān)聽套接字一直可讀,因?yàn)閠cp連接隊(duì)列中一直有客戶端的連接請求* * 所以服務(wù)器在啟動時打開一個空閑描述符/dev/null(文件描述符),先站著'坑‘,當(dāng)出現(xiàn)上面* 情況accept返回-1時,服務(wù)器暫時關(guān)閉idleFd_讓出'坑',此時就會多出一個空閑描述符* 然后再次調(diào)用accept接收客戶端請求,然后close接收后的客戶端套接字,優(yōu)雅的告訴* 客戶端關(guān)閉連接,然后再將'坑'占上*/int idleFd_;};一個不好理解的變量是idleFd_;,它是一個文件描述符,這里是打開"/dev/null"文件后返回的描述符,用于解決服務(wù)器端描述符耗盡的情況。
如果當(dāng)服務(wù)器文件描述符耗盡后,服務(wù)器端accept還沒等從tcp連接隊(duì)列中取出連接請求就已經(jīng)失敗返回了,此時內(nèi)核tcp隊(duì)列中一直有客戶端請求,內(nèi)核會一直通知監(jiān)聽套接字,導(dǎo)致監(jiān)聽套接字一直處于可讀,在下次直接poll函數(shù)時會直接返回。
解決的辦法就是在服務(wù)器剛啟動時就預(yù)先占用一個文件描述符,通常可以是打開一個文件,這里是"/dev/null"。此時服務(wù)器就有一個空閑的文件描述符了,當(dāng)出現(xiàn)上述情況無法取得tcp連接隊(duì)列中的請求時,先關(guān)閉這個文件讓出一個文件描述符,此時調(diào)用accept函數(shù)再次接收,由于已經(jīng)有一個空閑的文件描述符了,accept會正常返回,將連接請求從tcp隊(duì)列中取出,然后優(yōu)雅的關(guān)閉這個tcp連接(調(diào)用close函數(shù)),最后再打開"/dev/null"這個文件把”坑“占住。
成員函數(shù)的實(shí)現(xiàn)也有比較重點(diǎn)的地方,首先是構(gòu)造函數(shù)
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport): loop_(loop),acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())),acceptChannel_(loop, acceptSocket_.fd()),listenning_(false),idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC)) {assert(idleFd_ >= 0);/* * setsockopt設(shè)置套接字選項(xiàng)SO_REUSEADDR,對于端口bind,如果這個地址/端口處于TIME_WAIT,也可bind成功* int flag = 1;* setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag));*/acceptSocket_.setReuseAddr(true);/** setsockopt設(shè)置套接字選項(xiàng)SO_REUSEPORT,作用是對于多核cpu,允許在同一個<ip, port>對上運(yùn)行多個相同服務(wù)器* 內(nèi)核會采用負(fù)載均衡的的方式分配客戶端的連接請求給某一個服務(wù)器*/acceptSocket_.setReusePort(reuseport);acceptSocket_.bindAddress(listenAddr);/* Channel設(shè)置讀事件的回調(diào)函數(shù),此時還沒有開始監(jiān)聽這個Channel,需要調(diào)用Channel::enableReading() */acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this)); }構(gòu)造函數(shù)中為用于監(jiān)聽的套接字設(shè)置了SO_REUSEPORT和SO_REUSEADDR屬性,一個是端口重用,一個是地址重用。
- SO_REUSEPORT,端口重用,可以使用還處于TIME_WAIT狀態(tài)的端口
- SO_REUSEADDR,地址重用,服務(wù)器可以同時建立多個用于監(jiān)聽的socket,每個socket綁定的地址端口都相同,內(nèi)核會采用負(fù)載均衡的方法將每個將每個客戶端請求分配給某一個socket,可以很大程序的提高并發(fā)性,充分利用CPU資源
acceptChannel_用于保存這個用于監(jiān)聽的套接字,綁定回調(diào)函數(shù),在合適的時機(jī)注冊到Poller上(調(diào)用listen時)
void Acceptor::listen() {loop_->assertInLoopThread();listenning_ = true;acceptSocket_.listen();/* * 開始監(jiān)聽Channel,也就是設(shè)置fd關(guān)心的事件(EPOLLIN/EPOLLOUT等),然后添加到Poller中 * Poller中保存著所有注冊到EventLoop中的Channel*/acceptChannel_.enableReading(); }比較重要的是事件處理函數(shù),當(dāng)監(jiān)聽套接字可讀時,調(diào)用accept接收客戶端請求,如果描述符耗盡,釋放idleFd_重新accept,然后關(guān)閉,再占用idleFd_
/** 當(dāng)有客戶端嘗試連接服務(wù)器時,監(jiān)聽套接字變?yōu)榭勺x,epoll_wait/poll返回* EventLoop處理激活隊(duì)列中的Channel,調(diào)用對應(yīng)的回調(diào)函數(shù)* 監(jiān)聽套接字的Channel的回調(diào)函數(shù)是handleRead(),用于接收客戶端請求*/ void Acceptor::handleRead() {loop_->assertInLoopThread();InetAddress peerAddr;//FIXME loop until no moreint connfd = acceptSocket_.accept(&peerAddr);if (connfd >= 0){// string hostport = peerAddr.toIpPort();// LOG_TRACE << "Accepts of " << hostport;/* * 如果設(shè)置了回調(diào)函數(shù),那么就調(diào)用,參數(shù)是客戶端套接字和地址/端口* 否則就關(guān)閉連接,因?yàn)椴]有要處理客戶端的意思* * 這個回調(diào)函數(shù)是TcpServer中的newConnection,用于創(chuàng)建一個TcpConnection連接*/if (newConnectionCallback_){newConnectionCallback_(connfd, peerAddr);}else{sockets::close(connfd);}}else{LOG_SYSERR << "in Acceptor::handleRead";// Read the section named "The special problem of// accept()ing when you can't" in libev's doc.// By Marc Lehmann, author of libev.// /* 解決服務(wù)器端描述符耗盡的情況,原因見.h文件 */if (errno == EMFILE){::close(idleFd_);idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);::close(idleFd_);idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);}} }對文件描述符耗盡的處理比較重要,以前沒怎么接觸過
回調(diào)函數(shù)中調(diào)用的newConnectionCallback_函數(shù)是在Acceptor創(chuàng)建之初由TcpServer設(shè)置的(TcpServer表示服務(wù)器,內(nèi)有一個監(jiān)聽類Acceptor),這個函數(shù)主要用于初始化一個TcpConnection,一個TcpConnection對象代表著一個tcp連接
TcpConnection的定義主要都是寫set*函數(shù),成員變量比較多,但是重要的就
- 事件驅(qū)動循環(huán)loop_
- 用于tcp通信的socket_
- 用于監(jiān)聽sockfd的channel_
- 輸入輸出緩沖區(qū)inputBuffer_/outputBuffer_
- 由TcpServer提供的各種回調(diào)函數(shù)
首先是構(gòu)造函數(shù)的實(shí)現(xiàn),主要是為Channel提供各種回調(diào)函數(shù)
/* * 構(gòu)造函數(shù),設(shè)置當(dāng)fd就緒時調(diào)用的回調(diào)函數(shù)* Channel代表一個對fd事件的監(jiān)聽*/ TcpConnection::TcpConnection(EventLoop* loop,const string& nameArg,int sockfd,const InetAddress& localAddr,const InetAddress& peerAddr): loop_(CHECK_NOTNULL(loop)),name_(nameArg),state_(kConnecting),reading_(true),socket_(new Socket(sockfd)),channel_(new Channel(loop, sockfd)),localAddr_(localAddr),peerAddr_(peerAddr),highWaterMark_(64*1024*1024) {/* 設(shè)置各種回調(diào)函數(shù) */channel_->setReadCallback(std::bind(&TcpConnection::handleRead, this, _1));channel_->setWriteCallback(std::bind(&TcpConnection::handleWrite, this));channel_->setCloseCallback(std::bind(&TcpConnection::handleClose, this));channel_->setErrorCallback(std::bind(&TcpConnection::handleError, this));LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this<< " fd=" << sockfd;/** 設(shè)置KEEP-ALIVE屬性,如果客戶端很久沒有和服務(wù)器通訊,tcp會自動判斷客戶端是否還處于連接(類似心跳包)* * int setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &sockopt, static_cast<socklen_t>(sizeof(sockopt)));*/socket_->setKeepAlive(true); }回調(diào)函數(shù)的設(shè)置對應(yīng)了Channel的hanleEvent函數(shù)中根據(jù)不同激活原因調(diào)用不同回調(diào)函數(shù)(handleEvent調(diào)用handleEventWithGuard)。
另外,handleEvent中的tie_是對TcpConnection的弱引用(在后面設(shè)置),因?yàn)榛卣{(diào)函數(shù)都是TcpConnection的,所以在調(diào)用之前需要確保TcpConnection沒有被銷毀,所以將tie_提升為shared_ptr判斷TcpConnection是否還存在,之后再調(diào)用TcpConnection的一系列回調(diào)函數(shù)
當(dāng)TcpServer創(chuàng)建完TcpConnection后,會設(shè)置各種會調(diào)用書,然后調(diào)用TcpConnection的connectEstablished函數(shù),主要用于將Channel添加到Poller中,同時調(diào)用用戶提供的連接建立成功后的回調(diào)函數(shù)
TcpServer創(chuàng)建并設(shè)置TcpConnection的部分,可以看到,TcpServer會將用戶提供的所有回調(diào)函數(shù)都傳給TcpConnection,然后執(zhí)行TcpConnection的connectEstablished函數(shù),這個函數(shù)的執(zhí)行要放到它所屬的那個事件驅(qū)動循環(huán)線程做,不要阻塞TcpServer線程(這個地方不是為了線程安全性考慮,因?yàn)門cpConnection本身就是在TcpServer線程創(chuàng)建的,暴露給TcpServer線程很正常,而且TcpServer中也記錄著所有創(chuàng)建的TcpConnection,這里的主要目的是不阻塞TcpServer線程,讓它繼續(xù)監(jiān)聽客戶端請求)
connectEstablished函數(shù)如下
/* * 1.創(chuàng)建服務(wù)器(TcpServer)時,創(chuàng)建Acceptor,設(shè)置接收到客戶端請求后執(zhí)行的回調(diào)函數(shù)* 2.Acceptor創(chuàng)建監(jiān)聽套接字,將監(jiān)聽套接字綁定到一個Channel中,設(shè)置可讀回調(diào)函數(shù)為Acceptor的handleRead* 3.服務(wù)器啟動,調(diào)用Acceptor的listen函數(shù)創(chuàng)建監(jiān)聽套接字,同時將Channel添加到Poller中* 4.有客戶端請求連接,監(jiān)聽套接字可讀,Channel被激活,調(diào)用可讀回調(diào)函數(shù)(handleRead)* 5.回調(diào)函數(shù)接收客戶端請求,獲得客戶端套接字和地址,調(diào)用TcpServer提供的回調(diào)函數(shù)(newConnection)* 6.TcpServer的回調(diào)函數(shù)中創(chuàng)建TcpConnection代表這個tcp連接,設(shè)置tcp連接各種回調(diào)函數(shù)(由用戶提供給TcpServer)* 7.TcpServer讓tcp連接所屬線程調(diào)用TcpConnection的connectEstablished* 8.connectEstablished開啟對客戶端套接字的Channel的可讀監(jiān)聽,然后調(diào)用用戶提供的回調(diào)函數(shù)*/ void TcpConnection::connectEstablished() {loop_->assertInLoopThread();assert(state_ == kConnecting);setState(kConnected);/* Channel中對TcpConnection的弱引用在這里設(shè)置 */channel_->tie(shared_from_this());/* 設(shè)置對可讀事件的監(jiān)聽,同時將Channel添加到Poller中 */channel_->enableReading();/* 用戶提供的回調(diào)函數(shù),在連接建立成功后調(diào)用 */connectionCallback_(shared_from_this()); }至此tcp連接建立完成,在用戶提供的回調(diào)函數(shù)中,傳入的參數(shù)便是這個TcpConnection的shared_ptr,用戶可以使用TcpConnection::send操作向客戶端發(fā)送消息(放到后面)
有連接的建立就有連接的關(guān)閉,當(dāng)客戶端主動關(guān)閉(調(diào)用close)時,服務(wù)器端對應(yīng)的Channel被激活,激活原因?yàn)镋POLLHUP,表示連接已關(guān)閉,此時會調(diào)用TcpConnection的回調(diào)函數(shù)handleClose,在這個函數(shù)中,TcpConnection處理執(zhí)行各種關(guān)閉動作,包括
- 將Channel從Poller中移除
- 調(diào)用TcpServer提供的關(guān)閉回調(diào)函數(shù),將自己從TcpServer的tcp連接map中移除
- 調(diào)用客戶提供的關(guān)閉回調(diào)函數(shù)(如果有的話)
connectionCallback_是由用戶提供的,連接建立/關(guān)閉時調(diào)用,主要調(diào)用closeCallback_函數(shù)(由TcpServer)提供
這個函數(shù)主要就存在線程不安全的問題,原因就是此時的線程是TcpConnection所在線程
函數(shù)執(zhí)行順序?yàn)?#xff1a;
EventLoop::loop->Poller::poll->Channel::handleEvent->TcpConnection::handleClose->TcpServer::removeConnection
此時就將TcpServer暴露給其他線程,導(dǎo)致線程不安全的問題
為了減輕線程不安全帶來的危險(xiǎn),盡量將線程不安全的函數(shù)縮短,muduo中使用runInLoop直接將要調(diào)用的函數(shù)放到自己線程執(zhí)行,轉(zhuǎn)換到線程安全,所以這部分只有這一條語句是線程不安全的
removeConnectionInLoop函數(shù)如下
/* * 這個函數(shù)是線程安全的,因?yàn)槭怯蒚cpServer所在事件驅(qū)動循環(huán)調(diào)用的*/ void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn) {loop_->assertInLoopThread();LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_<< "] - connection " << conn->name();size_t n = connections_.erase(conn->name());(void)n;assert(n == 1);EventLoop* ioLoop = conn->getLoop();/* * 為什么不能用runInLoop, why? *//* * std::bind綁定函數(shù)指針,注意是值綁定,也就是說conn會復(fù)制一份到bind上* 這就會延長TcpConnection生命期,否則* 1.此時對于TcpConnection的引用計(jì)數(shù)為2,參數(shù)一個,connections_中一個* 2.connections_刪除掉TcpConnection后,引用計(jì)數(shù)為1* 3.removeConnectionInLoop返回,上層函數(shù)handleClose返回,引用計(jì)數(shù)為0,會被析構(gòu)* 4.bind會值綁定,conn復(fù)制一份,TcpConnection引用計(jì)數(shù)加1,就不會導(dǎo)致TcpConnection被析構(gòu)*/ioLoop->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn)); }比較重要的地方是TcpConnection生命期的問題,注釋中也有提及。因?yàn)閙uduo中對象使用智能指針shared_ptr存儲的,所以只有當(dāng)shard_ptr的引用計(jì)數(shù)為0時才會析構(gòu)它保存的對象。對于TcpConnection而言,它的引用計(jì)數(shù)在
所以如果在第7步不使用std::bind增加TcpConnection生命期的話,TcpConnection可能在handleClose函數(shù)返回后就銷毀了,根本不能執(zhí)行TcpConnection::connectDestroyed函數(shù)
總結(jié)
以上是生活随笔為你收集整理的muduo网络库学习(五)服务器监听类Acceptor及Tcp连接TcpConnection的建立与关闭的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 每天一道LeetCode-----将链表
- 下一篇: 每天一道LeetCode-----删除序