Redis源码剖析(二)io多路复用函数及事件驱动流程
作為服務(wù)器監(jiān)聽客戶端請求的方法,io多路復(fù)用起到了不可忽略的作用,利用io復(fù)用監(jiān)聽的方法叫Reactor模式,在前一篇也提到過,使用io復(fù)用是現(xiàn)在常用的提高并發(fā)性的方法,而且效果顯著。
通常io多路復(fù)用連同事件回調(diào)是一起出現(xiàn)的,在將文件描述符(套接字)注冊到io多路復(fù)用函數(shù)中時,同時也需要保存當(dāng)這個文件描述符被激活時調(diào)用的函數(shù)(稱作回調(diào)函數(shù)),這樣,使用者無需考慮何時事件被激活又何時調(diào)用相應(yīng)處理函數(shù),只管注冊即可,執(zhí)行回調(diào)函數(shù)的任務(wù)由Reactor接管,極大提高了并發(fā)性
在C語言中,回調(diào)函數(shù)通常是以函數(shù)指針的形式出現(xiàn)的(參考libevent)
在C++語言中,回調(diào)函數(shù)可以是函數(shù)指針,但是通常會是通過std::bind綁定的std::function對象,當(dāng)然隨著C++11的出現(xiàn),也可以以lambda代替std::bind
既然Redis是C語言實現(xiàn)的,就老老實實使用函數(shù)指針好了,不過在此之前,先簡單復(fù)習(xí)一下io多路復(fù)用函數(shù)
io多路復(fù)用函數(shù)
Linux平臺三種io多路復(fù)用函數(shù)的區(qū)別
在不同的平臺(linux,window),存在著不同的io復(fù)用函數(shù),以Linux平臺為例,就有select,poll,epoll三種,這三種的區(qū)別主要在于監(jiān)聽事件的底層方法不同,從而導(dǎo)致效率的差異
- select是早期Linux引入的io復(fù)用函數(shù),底層采用輪詢的方法判斷每個文件描述符是否被激活。所謂輪詢就是一遍遍的遍歷,依次判斷每一個文件描述符的狀態(tài),效率可想而知,慢
- poll是在select之后引入的,使用方法上稍微簡單于select,但是仍然沒有擺脫輪詢帶來的問題
- epoll作為輪詢的終結(jié)者,底層沒有采用輪詢的方法,而是基于事件回調(diào)的。簡單的說,就是在內(nèi)核中當(dāng)文件描述符被激活時都會調(diào)用一個回調(diào)函數(shù),epoll根據(jù)回調(diào)函數(shù)直接定位文件描述符,極大提高了效率,同時也減輕了CPU的負擔(dān),不用一遍遍輪詢
當(dāng)然,除了效率問題,三者在使用上也是存在諸多差異
select接口
/* * maxfds : 最大的文件描述符 + 1* readfs : 可讀事件集* writefds : 可寫事件集* exceptfds : 其它(錯誤)事件集* tvptr : 超時時間*/ int select(int maxfds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, struct timeval* tvptr);其中fd_set結(jié)構(gòu)保存的是需要監(jiān)聽的文件描述符,select將可讀,可寫,其它(錯誤)事件分開監(jiān)聽,返回被激活描述符的個數(shù)。但是仍需要一個一個遍歷使用FD_ISSET判斷是否被激活
poll接口
/* * fdarray[] : 監(jiān)聽事件集* nfds : 監(jiān)聽事件個數(shù)* timeout : 超時時間*/ int poll(struct pollfd fdarray[], nfds_t nfds, int timeout);在pollfd結(jié)構(gòu)中保存需要監(jiān)聽的文件描述符,需要監(jiān)聽的事件,激活原因。使用起來比select簡便的多
epoll接口
/* * epollfd : epoll文件描述符,用于監(jiān)聽所有的注冊事件* events : 保存所有激活事件* maxevents : events最大可容納的激活事件個數(shù)* timeout : 超時時間*/ int epoll_wait(int epollfd, struct epoll_event* events, int maxevents, int timeout);epoll_event結(jié)構(gòu)保存了監(jiān)聽的文件描述符,監(jiān)聽的事件以及激活原因,與select和poll不同的是,epoll_wait直接將所有激活的事件保存在events中,這樣就不需要一個個遍歷判斷哪個激活了
Redis對io多路復(fù)用的封裝
接下來以epoll為例,了解Redis內(nèi)部是如何封裝io多路復(fù)用的
為了將所有io復(fù)用統(tǒng)一,Redis為所有io復(fù)用統(tǒng)一了類型名aeApiState,對于epoll而言,類型成員就是調(diào)用epoll_wait所需要的參數(shù)
//ae_epoll.c typedef struct aeApiState {int epfd; //epollfd,文件描述符struct epoll_event *events; //保存激活的事件(epoll_event) } aeApiState;為什么保存兩個就夠了呢,epoll_wait明明需要4個參數(shù)。原因是在Redis初始化時,已經(jīng)將保存激活事件的數(shù)組(events)的容量調(diào)至最大,所以maxevents只需要設(shè)置成最大即可,無需保存。對于超時時間,Redis的策略是在時間事件中找到最早超時的那個,計算還有多久到達超時時間,將這個時間差(相對時間)作為io復(fù)用的超時時間
這么設(shè)計的原因是如果Redis中沒有時間事件,那么io復(fù)用函數(shù)可以一直阻塞在那里直到有事件被激活,如果有時間事件,為了不影響超時事件的回調(diào),需要在事件超時時從io復(fù)用中返回,那么設(shè)置成超時時間是最合適的(這一點和libevent的策略相同)
接下來就是一些對epoll接口的封裝了,包括創(chuàng)建epoll(epoll_create),注冊事件(epoll_ctl),刪除事件(epoll_ctl),阻塞監(jiān)聽(epoll_wait)等
創(chuàng)建epoll就是簡單的為aeApiState申請內(nèi)存空間,然后將返回的指針保存在事件驅(qū)動循環(huán)中
//ae_epoll.c /* 創(chuàng)建epollfd,即調(diào)用::epoll_create */ static int aeApiCreate(aeEventLoop *eventLoop) {/* 申請內(nèi)存 */aeApiState *state = zmalloc(sizeof(aeApiState));if (!state) return -1;/* events用于保存激活的事件,需要足夠大的空間(不小于epoll_create時傳入的參數(shù)) *//* eventLoop->setsize是初始化時設(shè)置的最大文件描述符個數(shù) */state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);if (!state->events) {zfree(state);return -1;}/* 創(chuàng)建epoll文件描述符 */state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */if (state->epfd == -1) {zfree(state->events);zfree(state);return -1;}/* 保存io復(fù)用數(shù)據(jù)成員到事件驅(qū)動中 */eventLoop->apidata = state;return 0; }注冊事件和刪除事件就是對epoll_ctl的封裝,根據(jù)操作不同選擇不同的參數(shù),以注冊事件為例
//ae_epoll.c /* * 將文件描述符和對應(yīng)事件注冊到io多路復(fù)用中* 即調(diào)用::epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event)*/ static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {/* 從事件驅(qū)動中獲取io復(fù)用 */aeApiState *state = eventLoop->apidata;/* 用于傳給epoll_ctl的參數(shù) */struct epoll_event ee = {0}; /* avoid valgrind warning *//* If the fd was already monitored for some event, we need a MOD* operation. Otherwise we need an ADD operation. *//* 判斷是否是第一次注冊,如果是則添加否則是修改 */int op = eventLoop->events[fd].mask == AE_NONE ?EPOLL_CTL_ADD : EPOLL_CTL_MOD;ee.events = 0;/* 合并以前的監(jiān)聽事件,因為不一定是首次添加 */mask |= eventLoop->events[fd].mask; /* Merge old events *//* 根據(jù)監(jiān)聽事件的不同設(shè)置struct epoll_event中的events字段 */if (mask & AE_READABLE) ee.events |= EPOLLIN;if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;/* 保存監(jiān)聽的文件描述符 */ee.data.fd = fd;/* 調(diào)用接口 */if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;return 0; }阻塞監(jiān)聽是對epoll_wait的封裝,在返回后將激活的事件保存在事件驅(qū)動中
//ae_epoll.c /* 阻塞監(jiān)聽,即調(diào)用::epoll_wait(epollfd, struct epoll_event*, int, struct timeval*); */ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {aeApiState *state = eventLoop->apidata;int retval, numevents = 0;/* 時間單位是毫秒 */retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);/* 有事件被激活 */if (retval > 0) {int j;numevents = retval;/* 保存所有激活的事件,將其文件描述符和激活原因保存在fired數(shù)組中 */for (j = 0; j < numevents; j++) {int mask = 0;struct epoll_event *e = state->events+j;if (e->events & EPOLLIN) mask |= AE_READABLE;if (e->events & EPOLLOUT) mask |= AE_WRITABLE;if (e->events & EPOLLERR) mask |= AE_WRITABLE;if (e->events & EPOLLHUP) mask |= AE_WRITABLE;/* fired數(shù)組中只保存文件描述符和激活原因* 當(dāng)需要獲取激活事件時,根據(jù)文件描述符從eventLoop->events數(shù)組中查找 */eventLoop->fired[j].fd = e->data.fd;eventLoop->fired[j].mask = mask;}}/* 返回激活事件的個數(shù) */return numevents; }事件驅(qū)動循環(huán)流程
io復(fù)用的封裝實現(xiàn)完成,那么Redis是何時調(diào)用io復(fù)用函數(shù)的呢,這就需要從server.c/main函數(shù)入手,可以猜測到當(dāng)main函數(shù)初始化工作完成后,就需要進行事件驅(qū)動循環(huán),而在循環(huán)中,會調(diào)用io復(fù)用函數(shù)進行監(jiān)聽
在初始化完成后,main函數(shù)調(diào)用了aeMain函數(shù),傳入的參數(shù)就是服務(wù)器的事件驅(qū)動
//server.c int main(int argc, char **argv) {/* 一系列的初始化工作 */...aeMain(server.el);... }在ae_epoll.c中可以找到aeMain函數(shù),這個函數(shù)便是一直在循環(huán),每次循環(huán)會調(diào)用aeProcessEvents函數(shù)
//ae_epoll.c void aeMain(aeEventLoop *eventLoop) {eventLoop->stop = 0;/* 一直循環(huán)監(jiān)聽 */while (!eventLoop->stop) {if (eventLoop->beforesleep != NULL)eventLoop->beforesleep(eventLoop);aeProcessEvents(eventLoop, AE_ALL_EVENTS);} }可以猜測,aeProcessEvents函數(shù)中一定調(diào)用io復(fù)用函數(shù)進行監(jiān)聽,當(dāng)io復(fù)用返回后,執(zhí)行每個激活事件的回調(diào)函數(shù),這個函數(shù)比較長,但是還是蠻好理解的
/* 每次事件循環(huán)都會調(diào)用一次該函數(shù) */ int aeProcessEvents(aeEventLoop *eventLoop, int flags) {int processed = 0, numevents;if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;if (eventLoop->maxfd != -1 ||((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {int j;aeTimeEvent *shortest = NULL;struct timeval tv, *tvp;/* 為io復(fù)用函數(shù)尋找超時時間(通常是最先超時的時間事件的時間(相對時間)) *//* redis中有時間事件 */if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))shortest = aeSearchNearestTimer(eventLoop);/* 根據(jù)最早超時的那個時間事件獲取超時的相對時間 */if (shortest) {long now_sec, now_ms;/* 獲取當(dāng)前時間 */aeGetTime(&now_sec, &now_ms);tvp = &tv;/* 計算時間差(相對時間) */long long ms =(shortest->when_sec - now_sec)*1000 +shortest->when_ms - now_ms;if (ms > 0) {tvp->tv_sec = ms/1000;tvp->tv_usec = (ms % 1000)*1000;} else {tvp->tv_sec = 0;tvp->tv_usec = 0;}} else {/* 如果沒有時間事件,那么io復(fù)用要么一直等,要么不等,取決于flags的設(shè)置 *//* 傳入的struct timeval*是NULL表示一直等直到有事件被激活* 傳入的timeval->tv_src = timeval->tv_usec = 0表示不等,直接返回 */if (flags & AE_DONT_WAIT) {tv.tv_sec = tv.tv_usec = 0;tvp = &tv;} else {/* Otherwise we can block */tvp = NULL; /* wait forever */}}/* 調(diào)用io復(fù)用函數(shù),返回被激活事件的個數(shù),所有被激活的事件保存在epollLoop->fired數(shù)組中 */numevents = aeApiPoll(eventLoop, tvp);for (j = 0; j < numevents; j++) {/* fired只保存的文件描述符和激活原因,實際的文件事件仍需要從events數(shù)組中取出 */aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];int mask = eventLoop->fired[j].mask;int fd = eventLoop->fired[j].fd;int rfired = 0;/* 根據(jù)激活原因調(diào)用回調(diào)函數(shù)(先執(zhí)行可讀,再執(zhí)行可寫) */if (fe->mask & mask & AE_READABLE) {rfired = 1;fe->rfileProc(eventLoop,fd,fe->clientData,mask);}if (fe->mask & mask & AE_WRITABLE) {if (!rfired || fe->wfileProc != fe->rfileProc)fe->wfileProc(eventLoop,fd,fe->clientData,mask);}processed++;}}/* 處理可能超時的時間事件 */if (flags & AE_TIME_EVENTS)processed += processTimeEvents(eventLoop);return processed; /* return the number of processed file/time events */ }至此一次事件驅(qū)動循環(huán)就執(zhí)行完畢,里面的細節(jié)比較多,比如如何為io復(fù)用函數(shù)尋找超時時間,如果從激活事件調(diào)用回調(diào)函數(shù),如果處理已超時事件等
Redis對于時間事件是采用鏈表的形式記錄的,這導(dǎo)致每次尋找最早超時的那個事件都需要遍歷整個鏈表,容易造成性能瓶頸。而libevent是采用最小堆記錄時間事件,尋找最早超時事件只需要O(1)的復(fù)雜度
如何選擇合適的io多路復(fù)用函數(shù)
到目前位置還有一個問題沒有解決,既然有那么多io復(fù)用函數(shù),Redis怎么知道應(yīng)該選擇哪個呢,Redis的策略是選擇當(dāng)前平臺存在的,效率最高的io復(fù)用函數(shù)
#ifdef HAVE_EVPORT #include "ae_evport.c" #else#ifdef HAVE_EPOLL#include "ae_epoll.c"#else#ifdef HAVE_KQUEUE#include "ae_kqueue.c"#else#include "ae_select.c"#endif#endif #endif小結(jié)
其實任何一個基于網(wǎng)絡(luò)請求的程序在這部分的內(nèi)容都是相似的,無非就是Reactor模式的實現(xiàn),不過畢竟Redis主要內(nèi)容在數(shù)據(jù)庫方面,網(wǎng)絡(luò)這一塊不會太過苛刻,如果只是想要學(xué)習(xí)服務(wù)器設(shè)計,可以參考libevent(C語言),muduo(C++語言)
總結(jié)
以上是生活随笔為你收集整理的Redis源码剖析(二)io多路复用函数及事件驱动流程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 每天一道LeetCode-----将二叉
- 下一篇: Redis源码剖析(三)字典结构的设计与