日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

muduo网络库学习(四)事件驱动循环EventLoop

發(fā)布時間:2024/4/19 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 muduo网络库学习(四)事件驱动循环EventLoop 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

muduo的設計采用高并發(fā)服務器框架中的one loop per thread模式,即一個線程一個事件循環(huán)。
這里的loop,其實就是muduo中的EventLoop,所以到目前為止,不管是Poller,Channel還是TimerQueue都僅僅是單線程下的任務,因為這些都依賴于EventLoop。這每一個EventLoop,其實也就是一個Reactor模型。
而多線程體現(xiàn)在EventLoop的上層,即在EventLoop上層有一個線程池,線程池中每一個線程運行一個EventLoop,也就是Reactor + 線程池的設計模式


梳理一下

  • 每個muduo網(wǎng)絡庫有一個事件驅(qū)動循環(huán)線程池EventLoopThreadPool
  • 每個線程池中有多個事件驅(qū)動線程EventLoopThread
  • 每個線程運行一個EventLoop事件循環(huán)
  • 每個EventLoop事件循環(huán)包含一個io復用Poller,一個計時器隊列TimerQueue
  • 每個Poller監(jiān)聽多個Channel,TimerQueue其實也是一個Channel
  • 每個Channel對應一個fd,在Channel被激活后調(diào)用回調(diào)函數(shù)
  • 每個回調(diào)函數(shù)是在EventLoop所在線程執(zhí)行
  • 所有激活的Channel回調(diào)結束后EventLoop繼續(xù)讓Poller監(jiān)聽

所以調(diào)用回調(diào)函數(shù)的過程中是同步的,如果回調(diào)函數(shù)執(zhí)行時間很長,那么這個EventLoop所在線程就會等待很久之后才會再次調(diào)用poll。

整個muduo網(wǎng)絡庫實際上是由Reactor + 線程池實現(xiàn)的,線程池中每一個線程都是一個Reactor模型。在處理大并發(fā)的服務器任務上有很大優(yōu)勢。

簡化的關系圖如下,EventLoop只涉及Poller,Channel(簡單涉及TcpConnection)和TimerQueue。

  • 白色三角,繼承
  • 黑色菱形,聚合


一個事件驅(qū)動循環(huán)EventLoop其實就是一個Reactor模型,是一個單線程任務。主要包含io復用函數(shù)Poller,定時器隊列TimerQueue以及激活隊列。其他的就是一些輔助變量

typedef std::vector<Channel*> ChannelList;bool looping_; /* atomic */std::atomic<bool> quit_;bool eventHandling_; /* atomic */bool callingPendingFunctors_; /* atomic */int64_t iteration_;/* 創(chuàng)建時保存當前事件循環(huán)所在線程,用于之后運行時判斷使用EventLoop的線程是否是EventLoop所屬的線程 */const pid_t threadId_;/* poll返回的時間,用于計算從激活到調(diào)用回調(diào)函數(shù)的延遲 */Timestamp pollReturnTime_;/* io多路復用 */std::unique_ptr<Poller> poller_;/* 定時器隊列 */std::unique_ptr<TimerQueue> timerQueue_;/* 喚醒當前線程的描述符 */int wakeupFd_;// unlike in TimerQueue, which is an internal class,// we don't expose Channel to client./* * 用于喚醒當前線程,因為當前線程主要阻塞在poll函數(shù)上* 所以喚醒的方法就是手動激活這個wakeupChannel_,即寫入幾個字節(jié)讓Channel變?yōu)榭勺x* 注: 這個Channel也注冊到Poller中*/std::unique_ptr<Channel> wakeupChannel_;boost::any context_;// scratch variables/* * 激活隊列,poll函數(shù)在返回前將所有激活的Channel添加到激活隊列中* 在當前事件循環(huán)中的所有Channel在Poller中*/ChannelList activeChannels_;/* 當前執(zhí)行回調(diào)函數(shù)的Channel */Channel* currentActiveChannel_;/* * queueInLoop添加函數(shù)時給pendingFunctors_上鎖,防止多個線程同時添加* * mutable,突破const限制,在被const聲明的函數(shù)仍然可以更改這個變量*/mutable MutexLock mutex_;/* * 等待在當前線程調(diào)用的回調(diào)函數(shù),* 原因是本來屬于當前線程的回調(diào)函數(shù)會被其他線程調(diào)用時,應該把這個回調(diào)函數(shù)添加到它屬于的線程中* 等待它屬于的線程被喚醒后調(diào)用,以滿足線程安全性* * TcpServer::removeConnection是個例子* 當關閉一個TcpConnection時,需要調(diào)用TcpServer::removeConnection,但是這個函數(shù)屬于TcpServer,* 然而TcpServer和TcpConnection不屬于同一個線程,這就容易將TcpServer暴露給其他線程,* 萬一其他線程析構了TcpServer怎么辦(線程不安全)* 所以會調(diào)用EventLoop::runInLoop,如果要調(diào)用的函數(shù)屬于當前線程,直接調(diào)用* 否則,就添加到這個隊列中,等待當前線程被喚醒*/std::vector<Functor> pendingFunctors_; // @GuardedBy mutex_

最后一個變量std::vector<Functor> pendingFunctors_;比較不好理解,它是一個任務容器,存放的是將要執(zhí)行的回調(diào)函數(shù)。

準備這么一個容器的原因在于

  • 某個對象(通常是Channel或者TcpConnection)可能被另一個線程使用(這個線程不是這個對象所在線程),此時這個對象就等于暴露給其他線程了。這是非常不安全的,萬一這個線程不小心析構了這個對象,而這個對象所屬的那個線程正要訪問這個對象(例如調(diào)用這個對象的接口),這個線程就會崩潰,因為它訪問了一個本不存在的對象(已經(jīng)被析構)。
  • 為了解決這個問題,就需要盡量將對這個對象的操作移到它所屬的那個線程執(zhí)行(這里是調(diào)用這個對象的接口)以滿足線程安全性。又因為每個對象都有它所屬的事件驅(qū)動循環(huán)EventLoop,這個EventLoop通常阻塞在poll上。可以保證的是EventLoop阻塞的線程就是它所屬的那個線程,所以調(diào)用poll的線程就是這個對象所屬的線程。這就 可以讓poll返回后再執(zhí)行想要調(diào)用的函數(shù),但是需要手動喚醒poll,否則一直阻塞在那里會耽誤函數(shù)的執(zhí)行。

runInLoop和queueInLoop函數(shù)執(zhí)行的就是上述操作

/** 1.如果事件循環(huán)不屬于當前這個線程,就不能直接調(diào)用回調(diào)函數(shù),應該回到自己所在線程調(diào)用* 2.此時需要先添加到自己的隊列中存起來,然后喚醒自己所在線程的io復用函數(shù)(poll)* 3.喚醒方法是采用eventfd,這個eventfd只有8字節(jié)的緩沖區(qū),向eventfd中寫入數(shù)據(jù)另poll返回* 4.返回后會調(diào)用在隊列中的函數(shù),見EventLoop* * 舉例說明什么時候會出現(xiàn)事件驅(qū)動循環(huán)不屬于當前線程的情況* 1.客戶端close連接,服務器端某個Channel被激活,原因為EPOLLHUP* 2.Channel調(diào)用回調(diào)函數(shù),即TcpConnection的handleClose* 3.handleClose調(diào)用TcpServer為它提供的回調(diào)函數(shù)removeConnection* 4.此時執(zhí)行的是TcpServer的removeConnection函數(shù),* 解釋 * 1.因為TcpServer所在線程和TcpConnection所在的不是同一個線程* 2.這就導致將TcpServer暴露給了TcpConnection所在線程* 3.因為TcpServer需要將這個關閉的TcpConnection從tcp map中刪除* 就需要調(diào)用自己的另一個函數(shù)removeConnectionInLoop* 4.為了實現(xiàn)線程安全性,也就是為了讓removeConnectionInLoop在TcpServer自己所在線程執(zhí)行* 需要先把這個函數(shù)添加到隊列中存起來,等到回到自己的線程在執(zhí)行* 5.runInLoop中的queueInLoop就是將這個函數(shù)存起來* 6.而此時調(diào)用runInLoop的仍然是TcpConnection所在線程* 7.因為自始至終,removeConnection這個函數(shù)都還沒有結束* * 如果調(diào)用runInLoop所在線程和事件驅(qū)動循環(huán)線程是一個線程,那么直接調(diào)用回調(diào)函數(shù)就行了* * 在TcpServer所在線程中,EventLoop明明阻塞在poll上,這里為什么可以對它進行修改* 1.線程相當于一個人可以同時做兩件事情,一個EventLoop同時調(diào)用兩個函數(shù)就很正常了* 2.其實函數(shù)調(diào)用都是通過函數(shù)地址調(diào)用的,既然EventLoop可讀,就一定直到內(nèi)部函數(shù)的地址,自然可以調(diào)用* 3.而更改成員函數(shù),通過地址訪問,進而修改,也是可以的*/ void EventLoop::runInLoop(Functor cb) {if (isInLoopThread()){cb();}else{queueInLoop(std::move(cb));} }

當然了,如果這個對象所屬線程和當前線程相同,就沒有線程安全性的問題,直接調(diào)用即可。否則,就需要添加到pendingFunctors_中,這正是queueInLoop的功效

/** 由runInLoop調(diào)用,也可直接調(diào)用,作用* 1.將相應的回調(diào)函數(shù)存在事件驅(qū)動循環(huán)的隊列中,等待回到自己線程再調(diào)用它* 2.激活自己線程的事件驅(qū)動循環(huán)*/ void EventLoop::queueInLoop(Functor cb) {{MutexLockGuard lock(mutex_);pendingFunctors_.push_back(std::move(cb));}if (!isInLoopThread() || callingPendingFunctors_){wakeup();} }

此處需要上鎖保護pendingFunctors_以防止多個線程同時向它添加函數(shù)。這里的鎖體現(xiàn)了RAII方法,大括號是語句塊,把里面的變量作為臨時變量處理
因為EventLoop通常阻塞在poll上,所以添加到pendingFunctors_后需要手動喚醒它,不然它一直阻塞在poll,會耽誤了函數(shù)的執(zhí)行。喚醒的方法是使用eventfd

#include <sys/eventfd.h> int eventfd(unsigned int initval, int flags);

函數(shù)用于創(chuàng)建一個eventfd文件描述符,這個描述符可用于進程/線程間的等待/喚醒。原因是內(nèi)核只為eventfd維護一個uint64_t類型的計數(shù)器,大小應該在64位。
參數(shù)initval是這個計數(shù)器的初值
flags是一些標志,可以是下面幾個的或運算結果

  • EFD_NONBLOCK,非阻塞
  • EFD_CLOEXEC,設置close-on-exec屬性,調(diào)用exec時會自動close

eventfd也可以使用write/read等io函數(shù)進行讀寫,區(qū)別是
write每次只能寫入8字節(jié)大小的數(shù)據(jù),內(nèi)核會將這8字節(jié)大小的數(shù)值加到計數(shù)器上
read一次性讀取這個計數(shù)器的值,并把緩沖區(qū)初始化為0。如果調(diào)用read時這個計數(shù)器值就是0,那么非阻塞時會返回EAGAIN,阻塞時會等待計數(shù)器的值變?yōu)榉?

可以把這個eventfd添加到poll中,在需要喚醒時寫入8字節(jié)數(shù)據(jù),此時poll返回,執(zhí)行回調(diào)函數(shù),然后執(zhí)行在pendingFunctors_中的函數(shù)。


loop函數(shù)是EventLoop的事件驅(qū)動循環(huán),所有的Reactor模型的loop函數(shù)都差不多。執(zhí)行的就是poll和回調(diào)函數(shù)的回調(diào),以及pendingFunctors_中函數(shù)的調(diào)用

/* * 事件驅(qū)動主循環(huán)* * 1.每個TcpServer對應一個事件驅(qū)動循環(huán)線程池* 2.每個事件驅(qū)動循環(huán)線程池對應多個事件驅(qū)動循環(huán)線程* 3.每個事件驅(qū)動循環(huán)線程對應一個事件驅(qū)動主循環(huán)* 4.每個事件驅(qū)動主循環(huán)對應一個io多路復用函數(shù)* 5.每個io多路復用函數(shù)監(jiān)聽多個Channel* 6.每個Channel對應一個fd,也就對應一個TcpConnection或者監(jiān)聽套接字* 7.在poll返回后處理激活隊列中Channel的過程是同步的,也就是一個一個調(diào)用回調(diào)函數(shù)* 8.調(diào)用回調(diào)函數(shù)的線程和事件驅(qū)動主循環(huán)所在線程是同一個,也就是同步執(zhí)行回調(diào)函數(shù)* 9.線程池用在事件驅(qū)動循環(huán)上層,也就是事件驅(qū)動循環(huán)是線程池中的一個線程*/ void EventLoop::loop() {assert(!looping_);assertInLoopThread();looping_ = true;quit_ = false; // FIXME: what if someone calls quit() before loop() ?LOG_TRACE << "EventLoop " << this << " start looping";while (!quit_){/* 清空激活隊列 */activeChannels_.clear();/* epoll_wait返回后會將所有就緒的Channel添加到激活隊列activeChannel中 */pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);++iteration_;if (Logger::logLevel() <= Logger::TRACE){printActiveChannels();}// TODO sort channel by priorityeventHandling_ = true;/* 執(zhí)行所有在激活隊列中的Channel的回調(diào)函數(shù) */for (Channel* channel : activeChannels_){currentActiveChannel_ = channel;currentActiveChannel_->handleEvent(pollReturnTime_);}currentActiveChannel_ = NULL;eventHandling_ = false;/* 執(zhí)行pendingFunctors_中的所有函數(shù) */doPendingFunctors();}LOG_TRACE << "EventLoop " << this << " stop looping";looping_ = false; }

Reactor模式的loop函數(shù)大多一個樣子

  • 調(diào)用poll監(jiān)聽事件,返回之前將激活的Channel添加到激活隊列中
  • 處理激活隊列中的Channel,調(diào)用回調(diào)函數(shù)
  • muduo中多了處理pendingFunctors_中的函數(shù),在自己的線程調(diào)用自己的函數(shù)是安全的


    Channel的回調(diào)函數(shù)就是根據(jù)被激活原因調(diào)用不同的回調(diào)函數(shù),這些回調(diào)函數(shù)是在TcpConnection創(chuàng)建之初被設置的。

    簡單說一下Channel和TcpConnection的關系

    • 每個TcpConnection對象代表一個tcp連接,所以TcpConnection中需要保存用于服務器/客戶端通信的套接字,這個套接字就記錄在Channel中
    • TcpConnection在創(chuàng)建之初會為Channel設置回調(diào)函數(shù),如果套接字可讀/可寫/錯誤/關閉等就會執(zhí)行TcpConnection中的函數(shù)
    • TcpConnection在確定連接已經(jīng)建立后會向Poller注冊自己的Channel
    /* TcpConnection.cc */channel_->setReadCallback(std::bind(&TcpConnection::handleRead, this, _1));channel_->setWriteCallback(std::bind(&TcpConnection::handleWrite, this));channel_->setCloseCallback(std::bind(&TcpConnection::handleClose, this));channel_->setErrorCallback(std::bind(&TcpConnection::handleError, this));

    Channel的handleEvent如下
    tie_是TcpConnection的弱引用,在調(diào)用TcpConnection的函數(shù)之前判斷它是否還存在,如果被析構了,那么提升的shared_ptr會是null
    具體可以參考 muduo網(wǎng)絡庫學習(二)對套接字和監(jiān)聽事件的封裝Channel

    /** 根據(jù)fd激活事件的不同,調(diào)用不同的fd的回調(diào)函數(shù)*/ void Channel::handleEvent(Timestamp receiveTime) {/* * RAII,對象管理資源* weak_ptr使用lock提升成shared_ptr,此時引用計數(shù)加一* 函數(shù)返回,棧空間對象銷毀,提升的shared_ptr guard銷毀,引用計數(shù)減一*/std::shared_ptr<void> guard;if (tied_){guard = tie_.lock();if (guard){handleEventWithGuard(receiveTime);}}else{handleEventWithGuard(receiveTime);} }/** 根據(jù)被激活事件的不同,調(diào)用不同的回調(diào)函數(shù)*/ void Channel::handleEventWithGuard(Timestamp receiveTime) {eventHandling_ = true;LOG_TRACE << reventsToString();if ((revents_ & POLLHUP) && !(revents_ & POLLIN)){if (logHup_){LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLHUP";}if (closeCallback_) closeCallback_();}if (revents_ & POLLNVAL){LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLNVAL";}if (revents_ & (POLLERR | POLLNVAL)){if (errorCallback_) errorCallback_();}if (revents_ & (POLLIN | POLLPRI | POLLRDHUP)){if (readCallback_) readCallback_(receiveTime);}if (revents_ & POLLOUT){if (writeCallback_) writeCallback_();}eventHandling_ = false; }

    EventLoop沒有特別處理定時器任務,原因是定時器任務TimerQueue也被轉換成一個文件描述符添加到Poller中,所以時間一到timerfd變?yōu)榭勺x,poll就會返回,就會調(diào)用回調(diào)函數(shù)。EventLoop只提供了runAt/runAfter/runEveny三個接口用于設置定時任務。這些在 muduo網(wǎng)絡庫學習(三)定時器TimerQueue的設計中有提及

    /* * 定時器功能,由用戶調(diào)用runAt并提供當事件到了執(zhí)行的回調(diào)函數(shù)* 時間在Timestamp設置,絕對時間,單位是微秒*/ TimerId EventLoop::runAt(Timestamp time, TimerCallback cb) {/* std::move,移動語義,避免拷貝 */return timerQueue_->addTimer(std::move(cb), time, 0.0); }/** 如上,單位是微秒,相對時間*/ TimerId EventLoop::runAfter(double delay, TimerCallback cb) {Timestamp time(addTime(Timestamp::now(), delay));return runAt(time, std::move(cb)); }/** 每隔多少微秒調(diào)用一次*/ TimerId EventLoop::runEvery(double interval, TimerCallback cb) {Timestamp time(addTime(Timestamp::now(), interval));return timerQueue_->addTimer(std::move(cb), time, interval); }

    幾個C++方面的知識點

    • std::move,移動語義,避免拷貝
    • RAII,以鎖為例,構造時上鎖,析構時解鎖(函數(shù)返回時局部變量析構)
    • 花括號語句塊
    • std::unique_ptr,智能指針,不允許拷貝和賦值,獨一無二
    • std::shared_ptr,智能指針,可以拷貝賦值,存在引用計數(shù)
    • std::weak_ptr,弱引用,不增加引用計數(shù),必要時可通過lock函數(shù)提升為shared_ptr

    總結

    以上是生活随笔為你收集整理的muduo网络库学习(四)事件驱动循环EventLoop的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 欧美久久一区二区三区 | 这里只有精品66 | 谁有免费黄色网址 | 免费视频www在线观看网站 | 亚洲综合丁香 | 亚洲一线在线观看 | 爱操综合| 成人在线免费视频 | 国产精品99久久久精品无码 | 黄色小视频大全 | 午夜激情在线观看 | 男女羞羞在线观看 | 亚洲国产乱| 美女扒开腿男人爽桶 | 欧美xxxxx牲另类人与 | 欧美黑人一级 | 中国亚洲女人69内射少妇 | 日韩欧美一区二区区 | 久久网伊人 | 欧美成人精品一区二区三区在线看 | 春色网站 | 2019国产在线 | 精品视频区 | 国产黄av | 少妇视频网| 少妇真人直播免费视频 | 日韩精品视频一区二区在线观看 | 亚欧洲乱码视频 | 人人看人人模 | 亚洲一区在线视频 | 日精品 | 国产男女在线 | 国产一区一一区高清不卡 | 精品一区二区国产 | 五月天国产精品 | 天堂av亚洲av国产av电影 | 久久精品2019中文字幕 | 乳孔很大能进去的av番号 | 亚洲精品天堂在线 | 在线观看二区 | 久久国产乱 | 亚洲 欧美 日韩 在线 | 色羞羞| 日本 奴役 捆绑 受虐狂xxxx | 久久官网| av777777| 一本色道久久综合无码人妻 | 最新在线黄色网址 | 三年中国片在线高清观看 | 天天干 夜夜操 | 天天爽夜夜爽夜夜爽精品 | 午夜一本 | 超碰69| 黄色裸体网站 | www国产亚洲精品 | 在线观看xxxx | 国产乱人对白 | 黄色片在线免费 | 国产欧美综合一区二区三区 | 精品人妻一区二区三区免费看 | 欧美色图久久 | 超碰激情| 在线视频一区二区 | 黄色高清免费 | 97人人澡人人爽人人模亚洲 | 一本大道熟女人妻中文字幕在线 | 国产高清视频在线免费观看 | 噜噜噜精品欧美成人 | 黄色特级视频 | 国产馆av | 国产精品系列在线播放 | 日韩av一区在线播放 | 久久精品a亚洲国产v高清不卡 | 欧美视频| 久久人精品 | 日韩成人激情视频 | 亚洲风情av| 华人色 | 日韩欧美中文字幕一区二区 | 狼色网| 日韩欧美成 | 手机看片日韩在线 | 就去干成人网 | 岛国大片在线观看 | 一本加勒比hezyo黑人 | 成人在线视频一区 | 中文字幕亚洲成人 | 狠狠操综合网 | 国产福利在线视频 | av永久免费网站 | 卡一卡二卡三 | 影音先锋成人 | 高清日韩av | 尤物网站在线观看 | 久久久久久久久久久久久久免费看 | 一级免费在线观看 | 日本va在线 | 欧美123| 欧美1区2区 |