muduo网络库学习(七)用于创建服务器的类TcpServer
目前為止,涉及到的絕大多數(shù)操作都沒有提及線程,EventLoop,Poller,Channel,Acceptor,TcpConnection,這些對象的執(zhí)行都是在單獨線程完成,并沒有設(shè)計多線程的創(chuàng)建銷毀等。除了runInLoop函數(shù)僅僅提及了線程安全性,發(fā)現(xiàn)原來其實是有多個線程在同時運行的,也發(fā)現(xiàn)某個對象可能會暴露給其他線程,這樣不安全,為了解決不安全隱患,muduo為每個EventLoop設(shè)計了runInLoop和queueInLoop函數(shù)用來將本該在其他線程執(zhí)行的線程不安全函數(shù)放到它所屬線程執(zhí)行,從而達到線程安全。
之所以線程的存在感比較低的原因在于,muduo采用one loop per thread的設(shè)計思想,即每個線程運行一個循環(huán),這里的循環(huán)也就是事件驅(qū)動循環(huán)EventLoop。所以,EventLoop對象的loop函數(shù),包括間接引發(fā)的Poller的poll函數(shù),Channel的handleEvent函數(shù),以及TcpConnection的handle*函數(shù)都是在一個線程內(nèi)完成的。而在整個muduo體系中,有著多個這樣的EventLoop,每個EventLoop都執(zhí)行著loop,poll,handleEvent,handle*這些函數(shù)。這種設(shè)計模型也被成為Reactor + 線程池
而處于食物鏈頂層,控制著這些EventLoop的,換句話說,保存著事件驅(qū)動循環(huán)線程池的,就是TcpServer類。顧名思義,服務(wù)器類,用來創(chuàng)建一個高并發(fā)的服務(wù)器,內(nèi)部便有一個線程池,線程池中有大量的線程,每個線程運行著一個事件驅(qū)動循環(huán),即one loop per thread。
另外,TcpServer本身也是一個線程(主線程),也運行著一個EventLoop,這個事件驅(qū)動循環(huán)僅僅用來監(jiān)控客戶端的連接請求,即Acceptor對象的Channel的可讀事件。通常如果用戶添加定時器任務(wù)的話,也會由這個EventLoop監(jiān)聽
但是TcpServer的這個EventLoop不在線程池中,這一點要區(qū)分開,線程池中的線程只用來運行負責監(jiān)控TcpConnection的EventLoop的
TcpServer的成員變量主要以EventLoop,Acceptor,TcpConnection, EventLoopThreadLoop為主,其它變量主要是各種用戶提供的回調(diào)函數(shù),大多都傳給每一個TcpConnection對象
typedef std::map<string, TcpConnectionPtr> ConnectionMap;/* TcpServer所在的主線程下運行的事件驅(qū)動循環(huán),負責監(jiān)聽Acceptor的Channel */EventLoop* loop_; // the acceptor loop/* 服務(wù)器負責監(jiān)聽的本地ip和端口 */const string ipPort_;/* 服務(wù)器名字,創(chuàng)建時傳入 */const string name_;/* Acceptor對象,負責監(jiān)聽客戶端連接請求,運行在主線程的EventLoop中 */std::unique_ptr<Acceptor> acceptor_; // avoid revealing Acceptor/* 事件驅(qū)動線程池,池中每個線程運行一個EventLoop */std::shared_ptr<EventLoopThreadPool> threadPool_;/* 用戶傳入,有tcp連接到達或tcp連接關(guān)閉時調(diào)用,傳給TcpConnection */ConnectionCallback connectionCallback_;/* 用戶傳入,對端發(fā)來消息時調(diào)用,傳給TcpConnection */MessageCallback messageCallback_;/* 成功寫入內(nèi)核tcp緩沖區(qū)后調(diào)用,傳給TcpConnection */WriteCompleteCallback writeCompleteCallback_;/* 線程池初始化完成后調(diào)用,傳給EventLoopThreadPool,再傳給每個EventLoopThread */ThreadInitCallback threadInitCallback_;AtomicInt32 started_;// always in loop thread/* TcpConnection特有id,每增加一個TcpConnection,nextConnId_加一 */int nextConnId_;/* 所有的TcpConnection對象,智能指針 */ConnectionMap connections_;- 事件驅(qū)動循環(huán)EventLoop如上所說,運行在主線程,只用來監(jiān)聽客戶端連接請求(Acceptor),不負責監(jiān)聽TcpConnection
- Acceptor用來監(jiān)聽客戶端請求,在muduo網(wǎng)絡(luò)庫學習(五)服務(wù)器監(jiān)聽類Acceptor及Tcp連接TcpConnection的建立與關(guān)閉中有提及
- 事件驅(qū)動循環(huán)線程池EventLoopThreadPool用于分發(fā)線程,當新建TcpConnection時,從線程池中選出一個事件驅(qū)動循環(huán)線程負責這個TcpConnection
- connections_是std::map<string, shared_ptr<TcpConnection> >類型,在新建TcpConnection后會添加到這個map中,可以保證每個TcpConnection在添加后引用計數(shù)為1,保證生命期的正常管理。在tcp連接關(guān)閉后會從map中刪除TcpConnection,使引用計數(shù)減1,為了防止引用計數(shù)為0導(dǎo)致沒有從TcpConnection的函數(shù)中返回就已經(jīng)將TcpConnection銷毀,在removeConnectionInLoop中使用std::bind延長了TcpConnection的生命期,在muduo網(wǎng)絡(luò)庫學習(五)服務(wù)器監(jiān)聽類Acceptor及Tcp連接TcpConnection的建立與關(guān)閉中有提及
構(gòu)造函數(shù)主要任務(wù)是為Acceptor設(shè)置回調(diào)函數(shù),當有新的客戶端連接時由Acceptor接收后調(diào)用這個回調(diào)函數(shù)
TcpServer::TcpServer(EventLoop* loop,const InetAddress& listenAddr,const string& nameArg,Option option): loop_(CHECK_NOTNULL(loop)),ipPort_(listenAddr.toIpPort()),name_(nameArg),acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),threadPool_(new EventLoopThreadPool(loop, name_)),connectionCallback_(defaultConnectionCallback),messageCallback_(defaultMessageCallback),nextConnId_(1) {/* * 設(shè)置回調(diào)函數(shù),當有客戶端請求時,Acceptor接收客戶端請求,然后調(diào)用這里設(shè)置的回調(diào)函數(shù)* 回調(diào)函數(shù)用于創(chuàng)建TcpConnection連接*/acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this, _1, _2)); }回調(diào)函數(shù)newConnection用于創(chuàng)建一個TcpConnection,將其添加到connections_中,然后設(shè)置各種用戶提供的回調(diào)函數(shù)(有數(shù)據(jù)可讀時/連接關(guān)閉時/出現(xiàn)錯誤時等)
/* * Acceptor接收客戶端請求后調(diào)用的回調(diào)函數(shù)* @param sockfd: 已經(jīng)接收完成(三次握手完成)后的客戶端套接字* @param peerAddr: 客戶端地址* * Acceptor只負責接收客戶端請求* TcpServer需要生成一個TcpConnection用于管理tcp連接* * 1.TcpServer內(nèi)有一個EventLoopThreadPool,即事件循環(huán)線程池,池子中每個線程都是一個EventLoop* 2.每個EventLoop包含一個Poller用于監(jiān)聽注冊到這個EventLoop上的所有Channel* 3.當建立起一個新的TcpConnection時,這個連接會放到線程池中的某個EventLoop中* 4.TcpServer中的baseLoop只用來檢測客戶端的連接* * 從libevent的角度看就是* 1.EventLoopThreadPool是一個struct event_base的池子,池子中全是struct event_base* 2.TcpServer獨占一個event_base,這個event_base不在池子中* 3.TcpConnection會扔到這個池子中的某個event_base中*/ void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) {loop_->assertInLoopThread();/* 從事件驅(qū)動線程池中取出一個線程給TcpConnection */EventLoop* ioLoop = threadPool_->getNextLoop();/* 為TcpConnection生成獨一無二的名字 */char buf[64];snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);++nextConnId_;string connName = name_ + buf;LOG_INFO << "TcpServer::newConnection [" << name_<< "] - new connection [" << connName<< "] from " << peerAddr.toIpPort();/* * 根據(jù)sockfd獲取tcp連接在本地的<地址,端口>* getsockname(int fd, struct sockaddr*, int *size);*/InetAddress localAddr(sockets::getLocalAddr(sockfd));// FIXME poll with zero timeout to double confirm the new connection// FIXME use make_shared if necessary/* 創(chuàng)建一個新的TcpConnection代表一個Tcp連接 */TcpConnectionPtr conn(new TcpConnection(ioLoop,connName,sockfd,localAddr,peerAddr));/* 添加到所有tcp 連接的map中,鍵是tcp連接獨特的名字(服務(wù)器名+客戶端<地址,端口>) */connections_[connName] = conn;/* 為tcp連接設(shè)置回調(diào)函數(shù)(由用戶提供) */conn->setConnectionCallback(connectionCallback_);conn->setMessageCallback(messageCallback_);conn->setWriteCompleteCallback(writeCompleteCallback_);/* * 關(guān)閉回調(diào)函數(shù),由TcpServer設(shè)置,作用是將這個關(guān)閉的TcpConnection從map中刪除* 當poll返回后,發(fā)現(xiàn)被激活的原因是EPOLLHUP,此時需要關(guān)閉tcp連接* 調(diào)用Channel的CloseCallback,進而調(diào)用TcpConnection的handleClose,進而調(diào)用removeConnection*/conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe/* * 連接建立后,調(diào)用TcpConnection連接建立成功的函數(shù)* 1.新建的TcpConnection所在事件循環(huán)是在事件循環(huán)線程池中的某個線程* 2.所以TcpConnection也就屬于它所在的事件驅(qū)動循環(huán)所在的那個線程* 3.調(diào)用TcpConnection的函數(shù)時也就應(yīng)該在自己所在線程調(diào)用* 4.所以需要調(diào)用runInLoop在自己的那個事件驅(qū)動循環(huán)所在線程調(diào)用這個函數(shù)* 5.當前線程是TcpServer的主線程,不是TcpConnection的線程,如果在這個線程直接調(diào)用會阻塞監(jiān)聽客戶端請求* 6.其實這里不是因為線程不安全,即使在這個線程調(diào)用也不會出現(xiàn)線程不安全,因為TcpConnection本就是由這個線程創(chuàng)建的*/ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn)); }兩個比較重要的調(diào)用
EventLoop* ioLoop = threadPool_->getNextLoop();如下,采用round_robin算法來選擇一個線程,其實就是在線程池中挨著取每一個線程,這次取第一個,下次就取第二個…取到最后一個后再從第一個開始取
/* 從線程池中取出一個線程,挨著取 */ EventLoop* EventLoopThreadPool::getNextLoop() {baseLoop_->assertInLoopThread();assert(started_);/* 線程池所在線程,TcpServer的主線程 */EventLoop* loop = baseLoop_;/* * 如果不為空,取出一個* 如果為空,說明線程池中沒有創(chuàng)建線程,是單線程程序,返回主線程*/if (!loops_.empty()){// round-robin/* loops_保存所有的線程 */loop = loops_[next_];++next_;if (implicit_cast<size_t>(next_) >= loops_.size()){next_ = 0;}}return loop; }TcpConnection::connectEstablished在muduo網(wǎng)絡(luò)庫學習(五)服務(wù)器監(jiān)聽類Acceptor及Tcp連接TcpConnection的建立與關(guān)閉中提到過,主要做一些初始化工作,比如傳遞自己的智能指針給Channel用于Channel判斷tcp連接是否還在,將Channel添加到Poller中,調(diào)用用戶的回調(diào)函數(shù)
/* * 1.創(chuàng)建服務(wù)器(TcpServer)時,創(chuàng)建Acceptor,設(shè)置接收到客戶端請求后執(zhí)行的回調(diào)函數(shù)* 2.Acceptor創(chuàng)建監(jiān)聽套接字,將監(jiān)聽套接字綁定到一個Channel中,設(shè)置可讀回調(diào)函數(shù)為Acceptor的handleRead* 3.服務(wù)器啟動,調(diào)用Acceptor的listen函數(shù)創(chuàng)建監(jiān)聽套接字,同時將Channel添加到Poller中* 4.有客戶端請求連接,監(jiān)聽套接字可讀,Channel被激活,調(diào)用可讀回調(diào)函數(shù)(handleRead)* 5.回調(diào)函數(shù)接收客戶端請求,獲得客戶端套接字和地址,調(diào)用TcpServer提供的回調(diào)函數(shù)(newConnection)* 6.TcpServer的回調(diào)函數(shù)中創(chuàng)建TcpConnection代表這個tcp連接,設(shè)置tcp連接各種回調(diào)函數(shù)(由用戶提供給TcpServer)* 7.TcpServer讓tcp連接所屬線程調(diào)用TcpConnection的connectEstablished* 8.connectEstablished開啟對客戶端套接字的Channel的可讀監(jiān)聽,然后調(diào)用用戶提供的回調(diào)函數(shù)*/ void TcpConnection::connectEstablished() {loop_->assertInLoopThread();assert(state_ == kConnecting);setState(kConnected);/* Channel中對TcpConnection的弱引用在這里設(shè)置 */channel_->tie(shared_from_this());/* 設(shè)置對可讀事件的監(jiān)聽,同時將Channel添加到Poller中 */channel_->enableReading();/* 用戶提供的回調(diào)函數(shù),在連接建立成功后調(diào)用 */connectionCallback_(shared_from_this()); }用戶程序代碼使用TcpServer的例子大致如下
void onConnection(const muduo::net::TcpConnectionPtr& conn) {... }void onMessage(const muduo::net::TcpConnectionPtr& conn,muduo::net::Buffer* buf,muduo::Timerstamp time) {... }int main() {muduo::net::EventLoop loop;muduo::net::InetAddress listenAddr(2007);TcpServer server(&loop, listenAddr);server.setConnectionCallback(std::bind(onConnection, _1));server.setMessageCallback(std::bind(onMessage, _1, _2, _3));server.start();loop.loop();return 0; }由上面的例子可知,要想使用TcpServer,需要
TcpServer中的成員變量loop_就是這里創(chuàng)建的EventLoop,是主線程的事件驅(qū)動循環(huán)。
需要注意的是開啟服務(wù)器監(jiān)聽用到的是EventLoop::loop()函數(shù),TcpServer::start()是用來啟動內(nèi)部事件驅(qū)動線程池的
此時還沒有開啟對客戶端連接請求的監(jiān)聽,因為監(jiān)聽套接字所在的EventLoop還沒有開啟循環(huán),需要調(diào)用loop函數(shù),EventLoop在muduo網(wǎng)絡(luò)庫學習(四)事件驅(qū)動循環(huán)EventLoop中有提及
剩下兩個函數(shù)removeConnection和removeConnectionInLoop用于關(guān)閉TcpConnection連接,在muduo網(wǎng)絡(luò)庫學習(五)服務(wù)器監(jiān)聽類Acceptor及Tcp連接TcpConnection的建立與關(guān)閉中有提及。主要就是從connections_中刪除,然后調(diào)用TcpConnection::connectDestroyed函數(shù)執(zhí)行析構(gòu)前最后的清理工作,也有線程不安全的問題,都有解釋
TcpServer本身的接口比較少,也正是為了簡化用戶的操作,不需要考慮太多,只管創(chuàng)建一個TcpServer對象即可,剩下的muduo在內(nèi)部都可以自己完成
總結(jié)
以上是生活随笔為你收集整理的muduo网络库学习(七)用于创建服务器的类TcpServer的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 每天一道LeetCode-----KMP
- 下一篇: 每天一道LeetCode----位运算实