muduo网络库学习(四)事件驱动循环EventLoop
muduo的設計采用高并發(fā)服務器框架中的one loop per thread模式,即一個線程一個事件循環(huán)。
這里的loop,其實就是muduo中的EventLoop,所以到目前為止,不管是Poller,Channel還是TimerQueue都僅僅是單線程下的任務,因為這些都依賴于EventLoop。這每一個EventLoop,其實也就是一個Reactor模型。
而多線程體現(xiàn)在EventLoop的上層,即在EventLoop上層有一個線程池,線程池中每一個線程運行一個EventLoop,也就是Reactor + 線程池的設計模式
梳理一下
- 每個muduo網(wǎng)絡庫有一個事件驅(qū)動循環(huán)線程池EventLoopThreadPool
- 每個線程池中有多個事件驅(qū)動線程EventLoopThread
- 每個線程運行一個EventLoop事件循環(huán)
- 每個EventLoop事件循環(huán)包含一個io復用Poller,一個計時器隊列TimerQueue
- 每個Poller監(jiān)聽多個Channel,TimerQueue其實也是一個Channel
- 每個Channel對應一個fd,在Channel被激活后調(diào)用回調(diào)函數(shù)
- 每個回調(diào)函數(shù)是在EventLoop所在線程執(zhí)行
- 所有激活的Channel回調(diào)結束后EventLoop繼續(xù)讓Poller監(jiān)聽
所以調(diào)用回調(diào)函數(shù)的過程中是同步的,如果回調(diào)函數(shù)執(zhí)行時間很長,那么這個EventLoop所在線程就會等待很久之后才會再次調(diào)用poll。
整個muduo網(wǎng)絡庫實際上是由Reactor + 線程池實現(xiàn)的,線程池中每一個線程都是一個Reactor模型。在處理大并發(fā)的服務器任務上有很大優(yōu)勢。
簡化的關系圖如下,EventLoop只涉及Poller,Channel(簡單涉及TcpConnection)和TimerQueue。
- 白色三角,繼承
- 黑色菱形,聚合
一個事件驅(qū)動循環(huán)EventLoop其實就是一個Reactor模型,是一個單線程任務。主要包含io復用函數(shù)Poller,定時器隊列TimerQueue以及激活隊列。其他的就是一些輔助變量
typedef std::vector<Channel*> ChannelList;bool looping_; /* atomic */std::atomic<bool> quit_;bool eventHandling_; /* atomic */bool callingPendingFunctors_; /* atomic */int64_t iteration_;/* 創(chuàng)建時保存當前事件循環(huán)所在線程,用于之后運行時判斷使用EventLoop的線程是否是EventLoop所屬的線程 */const pid_t threadId_;/* poll返回的時間,用于計算從激活到調(diào)用回調(diào)函數(shù)的延遲 */Timestamp pollReturnTime_;/* io多路復用 */std::unique_ptr<Poller> poller_;/* 定時器隊列 */std::unique_ptr<TimerQueue> timerQueue_;/* 喚醒當前線程的描述符 */int wakeupFd_;// unlike in TimerQueue, which is an internal class,// we don't expose Channel to client./* * 用于喚醒當前線程,因為當前線程主要阻塞在poll函數(shù)上* 所以喚醒的方法就是手動激活這個wakeupChannel_,即寫入幾個字節(jié)讓Channel變?yōu)榭勺x* 注: 這個Channel也注冊到Poller中*/std::unique_ptr<Channel> wakeupChannel_;boost::any context_;// scratch variables/* * 激活隊列,poll函數(shù)在返回前將所有激活的Channel添加到激活隊列中* 在當前事件循環(huán)中的所有Channel在Poller中*/ChannelList activeChannels_;/* 當前執(zhí)行回調(diào)函數(shù)的Channel */Channel* currentActiveChannel_;/* * queueInLoop添加函數(shù)時給pendingFunctors_上鎖,防止多個線程同時添加* * mutable,突破const限制,在被const聲明的函數(shù)仍然可以更改這個變量*/mutable MutexLock mutex_;/* * 等待在當前線程調(diào)用的回調(diào)函數(shù),* 原因是本來屬于當前線程的回調(diào)函數(shù)會被其他線程調(diào)用時,應該把這個回調(diào)函數(shù)添加到它屬于的線程中* 等待它屬于的線程被喚醒后調(diào)用,以滿足線程安全性* * TcpServer::removeConnection是個例子* 當關閉一個TcpConnection時,需要調(diào)用TcpServer::removeConnection,但是這個函數(shù)屬于TcpServer,* 然而TcpServer和TcpConnection不屬于同一個線程,這就容易將TcpServer暴露給其他線程,* 萬一其他線程析構了TcpServer怎么辦(線程不安全)* 所以會調(diào)用EventLoop::runInLoop,如果要調(diào)用的函數(shù)屬于當前線程,直接調(diào)用* 否則,就添加到這個隊列中,等待當前線程被喚醒*/std::vector<Functor> pendingFunctors_; // @GuardedBy mutex_最后一個變量std::vector<Functor> pendingFunctors_;比較不好理解,它是一個任務容器,存放的是將要執(zhí)行的回調(diào)函數(shù)。
準備這么一個容器的原因在于
- 某個對象(通常是Channel或者TcpConnection)可能被另一個線程使用(這個線程不是這個對象所在線程),此時這個對象就等于暴露給其他線程了。這是非常不安全的,萬一這個線程不小心析構了這個對象,而這個對象所屬的那個線程正要訪問這個對象(例如調(diào)用這個對象的接口),這個線程就會崩潰,因為它訪問了一個本不存在的對象(已經(jīng)被析構)。
- 為了解決這個問題,就需要盡量將對這個對象的操作移到它所屬的那個線程執(zhí)行(這里是調(diào)用這個對象的接口)以滿足線程安全性。又因為每個對象都有它所屬的事件驅(qū)動循環(huán)EventLoop,這個EventLoop通常阻塞在poll上。可以保證的是EventLoop阻塞的線程就是它所屬的那個線程,所以調(diào)用poll的線程就是這個對象所屬的線程。這就 可以讓poll返回后再執(zhí)行想要調(diào)用的函數(shù),但是需要手動喚醒poll,否則一直阻塞在那里會耽誤函數(shù)的執(zhí)行。
runInLoop和queueInLoop函數(shù)執(zhí)行的就是上述操作
/** 1.如果事件循環(huán)不屬于當前這個線程,就不能直接調(diào)用回調(diào)函數(shù),應該回到自己所在線程調(diào)用* 2.此時需要先添加到自己的隊列中存起來,然后喚醒自己所在線程的io復用函數(shù)(poll)* 3.喚醒方法是采用eventfd,這個eventfd只有8字節(jié)的緩沖區(qū),向eventfd中寫入數(shù)據(jù)另poll返回* 4.返回后會調(diào)用在隊列中的函數(shù),見EventLoop* * 舉例說明什么時候會出現(xiàn)事件驅(qū)動循環(huán)不屬于當前線程的情況* 1.客戶端close連接,服務器端某個Channel被激活,原因為EPOLLHUP* 2.Channel調(diào)用回調(diào)函數(shù),即TcpConnection的handleClose* 3.handleClose調(diào)用TcpServer為它提供的回調(diào)函數(shù)removeConnection* 4.此時執(zhí)行的是TcpServer的removeConnection函數(shù),* 解釋 * 1.因為TcpServer所在線程和TcpConnection所在的不是同一個線程* 2.這就導致將TcpServer暴露給了TcpConnection所在線程* 3.因為TcpServer需要將這個關閉的TcpConnection從tcp map中刪除* 就需要調(diào)用自己的另一個函數(shù)removeConnectionInLoop* 4.為了實現(xiàn)線程安全性,也就是為了讓removeConnectionInLoop在TcpServer自己所在線程執(zhí)行* 需要先把這個函數(shù)添加到隊列中存起來,等到回到自己的線程在執(zhí)行* 5.runInLoop中的queueInLoop就是將這個函數(shù)存起來* 6.而此時調(diào)用runInLoop的仍然是TcpConnection所在線程* 7.因為自始至終,removeConnection這個函數(shù)都還沒有結束* * 如果調(diào)用runInLoop所在線程和事件驅(qū)動循環(huán)線程是一個線程,那么直接調(diào)用回調(diào)函數(shù)就行了* * 在TcpServer所在線程中,EventLoop明明阻塞在poll上,這里為什么可以對它進行修改* 1.線程相當于一個人可以同時做兩件事情,一個EventLoop同時調(diào)用兩個函數(shù)就很正常了* 2.其實函數(shù)調(diào)用都是通過函數(shù)地址調(diào)用的,既然EventLoop可讀,就一定直到內(nèi)部函數(shù)的地址,自然可以調(diào)用* 3.而更改成員函數(shù),通過地址訪問,進而修改,也是可以的*/ void EventLoop::runInLoop(Functor cb) {if (isInLoopThread()){cb();}else{queueInLoop(std::move(cb));} }當然了,如果這個對象所屬線程和當前線程相同,就沒有線程安全性的問題,直接調(diào)用即可。否則,就需要添加到pendingFunctors_中,這正是queueInLoop的功效
/** 由runInLoop調(diào)用,也可直接調(diào)用,作用* 1.將相應的回調(diào)函數(shù)存在事件驅(qū)動循環(huán)的隊列中,等待回到自己線程再調(diào)用它* 2.激活自己線程的事件驅(qū)動循環(huán)*/ void EventLoop::queueInLoop(Functor cb) {{MutexLockGuard lock(mutex_);pendingFunctors_.push_back(std::move(cb));}if (!isInLoopThread() || callingPendingFunctors_){wakeup();} }此處需要上鎖保護pendingFunctors_以防止多個線程同時向它添加函數(shù)。這里的鎖體現(xiàn)了RAII方法,大括號是語句塊,把里面的變量作為臨時變量處理
因為EventLoop通常阻塞在poll上,所以添加到pendingFunctors_后需要手動喚醒它,不然它一直阻塞在poll,會耽誤了函數(shù)的執(zhí)行。喚醒的方法是使用eventfd
函數(shù)用于創(chuàng)建一個eventfd文件描述符,這個描述符可用于進程/線程間的等待/喚醒。原因是內(nèi)核只為eventfd維護一個uint64_t類型的計數(shù)器,大小應該在64位。
參數(shù)initval是這個計數(shù)器的初值
flags是一些標志,可以是下面幾個的或運算結果
- EFD_NONBLOCK,非阻塞
- EFD_CLOEXEC,設置close-on-exec屬性,調(diào)用exec時會自動close
- …
eventfd也可以使用write/read等io函數(shù)進行讀寫,區(qū)別是
write每次只能寫入8字節(jié)大小的數(shù)據(jù),內(nèi)核會將這8字節(jié)大小的數(shù)值加到計數(shù)器上
read一次性讀取這個計數(shù)器的值,并把緩沖區(qū)初始化為0。如果調(diào)用read時這個計數(shù)器值就是0,那么非阻塞時會返回EAGAIN,阻塞時會等待計數(shù)器的值變?yōu)榉?
可以把這個eventfd添加到poll中,在需要喚醒時寫入8字節(jié)數(shù)據(jù),此時poll返回,執(zhí)行回調(diào)函數(shù),然后執(zhí)行在pendingFunctors_中的函數(shù)。
loop函數(shù)是EventLoop的事件驅(qū)動循環(huán),所有的Reactor模型的loop函數(shù)都差不多。執(zhí)行的就是poll和回調(diào)函數(shù)的回調(diào),以及pendingFunctors_中函數(shù)的調(diào)用
/* * 事件驅(qū)動主循環(huán)* * 1.每個TcpServer對應一個事件驅(qū)動循環(huán)線程池* 2.每個事件驅(qū)動循環(huán)線程池對應多個事件驅(qū)動循環(huán)線程* 3.每個事件驅(qū)動循環(huán)線程對應一個事件驅(qū)動主循環(huán)* 4.每個事件驅(qū)動主循環(huán)對應一個io多路復用函數(shù)* 5.每個io多路復用函數(shù)監(jiān)聽多個Channel* 6.每個Channel對應一個fd,也就對應一個TcpConnection或者監(jiān)聽套接字* 7.在poll返回后處理激活隊列中Channel的過程是同步的,也就是一個一個調(diào)用回調(diào)函數(shù)* 8.調(diào)用回調(diào)函數(shù)的線程和事件驅(qū)動主循環(huán)所在線程是同一個,也就是同步執(zhí)行回調(diào)函數(shù)* 9.線程池用在事件驅(qū)動循環(huán)上層,也就是事件驅(qū)動循環(huán)是線程池中的一個線程*/ void EventLoop::loop() {assert(!looping_);assertInLoopThread();looping_ = true;quit_ = false; // FIXME: what if someone calls quit() before loop() ?LOG_TRACE << "EventLoop " << this << " start looping";while (!quit_){/* 清空激活隊列 */activeChannels_.clear();/* epoll_wait返回后會將所有就緒的Channel添加到激活隊列activeChannel中 */pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);++iteration_;if (Logger::logLevel() <= Logger::TRACE){printActiveChannels();}// TODO sort channel by priorityeventHandling_ = true;/* 執(zhí)行所有在激活隊列中的Channel的回調(diào)函數(shù) */for (Channel* channel : activeChannels_){currentActiveChannel_ = channel;currentActiveChannel_->handleEvent(pollReturnTime_);}currentActiveChannel_ = NULL;eventHandling_ = false;/* 執(zhí)行pendingFunctors_中的所有函數(shù) */doPendingFunctors();}LOG_TRACE << "EventLoop " << this << " stop looping";looping_ = false; }Reactor模式的loop函數(shù)大多一個樣子
muduo中多了處理pendingFunctors_中的函數(shù),在自己的線程調(diào)用自己的函數(shù)是安全的
Channel的回調(diào)函數(shù)就是根據(jù)被激活原因調(diào)用不同的回調(diào)函數(shù),這些回調(diào)函數(shù)是在TcpConnection創(chuàng)建之初被設置的。
簡單說一下Channel和TcpConnection的關系
- 每個TcpConnection對象代表一個tcp連接,所以TcpConnection中需要保存用于服務器/客戶端通信的套接字,這個套接字就記錄在Channel中
- TcpConnection在創(chuàng)建之初會為Channel設置回調(diào)函數(shù),如果套接字可讀/可寫/錯誤/關閉等就會執(zhí)行TcpConnection中的函數(shù)
- TcpConnection在確定連接已經(jīng)建立后會向Poller注冊自己的Channel
Channel的handleEvent如下
tie_是TcpConnection的弱引用,在調(diào)用TcpConnection的函數(shù)之前判斷它是否還存在,如果被析構了,那么提升的shared_ptr會是null
具體可以參考 muduo網(wǎng)絡庫學習(二)對套接字和監(jiān)聽事件的封裝Channel
EventLoop沒有特別處理定時器任務,原因是定時器任務TimerQueue也被轉換成一個文件描述符添加到Poller中,所以時間一到timerfd變?yōu)榭勺x,poll就會返回,就會調(diào)用回調(diào)函數(shù)。EventLoop只提供了runAt/runAfter/runEveny三個接口用于設置定時任務。這些在 muduo網(wǎng)絡庫學習(三)定時器TimerQueue的設計中有提及
/* * 定時器功能,由用戶調(diào)用runAt并提供當事件到了執(zhí)行的回調(diào)函數(shù)* 時間在Timestamp設置,絕對時間,單位是微秒*/ TimerId EventLoop::runAt(Timestamp time, TimerCallback cb) {/* std::move,移動語義,避免拷貝 */return timerQueue_->addTimer(std::move(cb), time, 0.0); }/** 如上,單位是微秒,相對時間*/ TimerId EventLoop::runAfter(double delay, TimerCallback cb) {Timestamp time(addTime(Timestamp::now(), delay));return runAt(time, std::move(cb)); }/** 每隔多少微秒調(diào)用一次*/ TimerId EventLoop::runEvery(double interval, TimerCallback cb) {Timestamp time(addTime(Timestamp::now(), interval));return timerQueue_->addTimer(std::move(cb), time, interval); }幾個C++方面的知識點
- std::move,移動語義,避免拷貝
- RAII,以鎖為例,構造時上鎖,析構時解鎖(函數(shù)返回時局部變量析構)
- 花括號語句塊
- std::unique_ptr,智能指針,不允許拷貝和賦值,獨一無二
- std::shared_ptr,智能指針,可以拷貝賦值,存在引用計數(shù)
- std::weak_ptr,弱引用,不增加引用計數(shù),必要時可通過lock函數(shù)提升為shared_ptr
總結
以上是生活随笔為你收集整理的muduo网络库学习(四)事件驱动循环EventLoop的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: muduo网络库学习(三)定时器Time
- 下一篇: 每天一道LeetCode-----合并两