libev实现分析
libev是一個事件驅(qū)動庫,底層是基于select、epoll、kqueue等I/O復(fù)用接口。所謂事件驅(qū)動庫,就是用戶定義一個事件以及改事件發(fā)生時調(diào)用的函數(shù),該庫會監(jiān)聽該事件,并在事件發(fā)生時調(diào)用相應(yīng)的函數(shù)。
libev提供了很多事件監(jiān)聽器(watcher),最主要的有IO、時間以及信號監(jiān)聽器。當(dāng)某一個文件的讀事件或者寫事件發(fā)生時,周期時間到了時,進(jìn)程接收到某個信號時,就會調(diào)用用戶定義的回調(diào)函數(shù)。
下面以IO事件為例,講述libev的工作原理:
1、實(shí)例
1 #include<stdio.h> 2 #include <ev.h> 3 // every watcher type has its own typedef'd struct 4 // with the name ev_TYPE 5 ev_io stdin_watcher; 6 ev_timer timeout_watcher; 7 8 // all watcher callbacks have a similar signature 9 // this callback is called when data is readable on stdin 10 static void 11 stdin_cb (EV_P_ ev_io *w, int revents) 12 { 13 puts ("stdin ready OK!"); 14 // for one-shot events, one must manually stop the watcher 15 // with its corresponding stop function. 16 ev_io_stop (EV_A_ w); 17 18 // this causes all nested ev_run's to stop iterating 19 ev_break (EV_A_ EVBREAK_ALL); 20 } 21 22 // another callback, this time for a time-out 23 static void 24 timeout_cb (EV_P_ ev_timer *w, int revents) 25 { 26 puts ("timeout"); 27 // this causes the innermost ev_run to stop iterating 28 ev_break (EV_A_ EVBREAK_ONE); 29 } 30 31 int 32 main (void) 33 { 34 // use the default event loop unless you have special needs 35 struct ev_loop *loop = EV_DEFAULT; 36 // initialise an io watcher, then start it 37 // this one will watch for stdin to become readable 38 ev_io_init (&stdin_watcher, stdin_cb, /*STDIN_FILENO*/ 0, EV_READ); 39 ev_io_start (loop, &stdin_watcher); 40 41 // initialise a timer watcher, then start it 42 // simple non-repeating 5.5 second timeout 43 ev_timer_init (&timeout_watcher, timeout_cb, 5.5, 0.); 44 ev_timer_start (loop, &timeout_watcher); 45 46 // now wait for events to arrive 47 ev_run (loop, 0); 48 // 49 // break was called, so exit 50 return 0; 51 }可以看出,libev庫的使用簡單、方便。我們只要定義個事件的監(jiān)聽器對象,初始化,開始,最后調(diào)用ev_run。不同事件監(jiān)聽器初始化的內(nèi)容也不一樣,比如,IO事件監(jiān)聽器需要初始化監(jiān)聽的文件描述符,事件以及回調(diào)函數(shù)。
2、事件監(jiān)聽器
typedef ev_watcher *W; typedef ev_watcher_list *WL; typedef ev_watcher_time *WT;typedef struct ev_watcher {int active; int pending;int priority;void *data;void (*cb)(EV_P_ struct type *w, int revents); } ev_watcher;typedef struct ev_watcher_list {int active; int pending;int priority;void *data;void (*cb)(EV_P_ struct ev_watcher_list *w, int revents);struct ev_watcher_list *next; } ev_watcher_list;typedef struct ev_io {int active; int pending;int priority;void *data;void (*cb)(EV_P_ struct ev_io *w, int revents);struct ev_watcher_list *next;int fd; /* ro */int events; /* ro */ } ev_io; ev_watcher是一個基礎(chǔ)監(jiān)聽器,包括回調(diào)函數(shù)cd;監(jiān)聽器列表(ev_watcher_list)是在監(jiān)聽器的基礎(chǔ)上添加指向下一個監(jiān)聽器的指針next;IO監(jiān)聽器是在監(jiān)聽器列表的基礎(chǔ)上加上了其特有的文件描述符和事件類型。 ev_io_init (&stdin_watcher, stdin_cb, /*STDIN_FILENO*/ 0, EV_READ);該函數(shù)就是初始化stdin_watcher這個監(jiān)聽器的回調(diào)函數(shù),文件描述符,以及事假類型。 3、struct ev_loop struct ev_loop; # define EV_P struct ev_loop *loop # define EV_P_ struct ev_loop *loop, # define EV_A loop # define EV_A_ loop, //這4個宏用于形參struct ev_loop{ //部分參數(shù)ANFD *anfdsint anfdmaxint *fdchangesint fdchangemaxint fdchangecntANPENDING *pendings [NUMPRI]int pendingmax [NUMPRI]int pendingcnt [NUMPRI]int backendint backend_fdvoid (*backend_modify)(EV_P_ int fd, int oev, int nev)void (*backend_poll)(EV_P_ ev_tstamp timeout) }typedef struct {WL head; //ev_watcher_list *head;unsigned char events; /* the events watched for */unsigned char reify; /* flag set when this ANFD needs reification (EV_ANFD_REIFY, EV__IOFDSET) */unsigned char emask; /* the epoll backend stores the actual kernel mask in here */unsigned char unused;unsigned int egen; /* generation counter to counter epoll bugs */SOCKET handle;OVERLAPPED or, ow; } ANFD;typedef struct {W w; //ev_watcher *w;int events; /* the pending event set for the given watcher */ } ANPENDING; ev_run函數(shù)主要是一個while循環(huán),在這個while循環(huán)中不斷檢測各個事件是否發(fā)生,如果發(fā)生就調(diào)用其回調(diào)函數(shù)。而這個過程中,主要用到的對象就是struct ev_loop結(jié)構(gòu)體對象,檢測哪些事件,回調(diào)哪個函數(shù)都存放在該對象中。 struct ev_loop結(jié)構(gòu)體中的字段很多,以IO事件為例介紹幾個主要的: anfds是一個數(shù)組,數(shù)組元素是結(jié)構(gòu)體ANFD,ANFD有一個成員是監(jiān)聽器列表。數(shù)組下標(biāo)是文件描述符,而列表成員是監(jiān)聽該文件的事件監(jiān)聽器。所以,anfds有點(diǎn)類似散列表,以文件描述符作為鍵,以監(jiān)聽器作為值,采用開鏈法解決散列沖突。 該字段的初始化在ev_io_start函數(shù)中,主要目的是用戶定義的監(jiān)聽器告訴ev_loop。 fdchanges是一個int數(shù)組,也是在ev_io_start中初始化。存放的是監(jiān)聽了的文件描述符。這樣ev_run每次循環(huán)的時候,要先從fdchanges中取出已經(jīng)監(jiān)聽的文件描述符,再以該描述符為下標(biāo),從anfds中取出監(jiān)聽器對象。這樣就得到文件描述符以及監(jiān)聽 的事件。pendings是一個二維數(shù)組,第一維是優(yōu)先級,第二維是監(jiān)聽器。這個數(shù)組是用于執(zhí)行相應(yīng)的回調(diào)函數(shù),根據(jù)優(yōu)先級,遍歷所有監(jiān)聽器,調(diào)用監(jiān)聽器的回調(diào)函數(shù)。3、ev_io_start ev_io_start (EV_P_ ev_io *w) EV_THROW {ev_start (EV_A_ (W)w, 1); //w->active = active;array_needsize (ANFD, anfds, anfdmax, fd + 1, array_init_zero); //when fd + 1 > anfdmax : 重新分配數(shù)組大小//anfds = (type *)array_realloc(sizeof (ANFD), (anfds), &(anfdmax), (fd + 1));wlist_add (&anfds[fd].head, (WL)w); // w->next = *head;*head = w; fd_change (EV_A_ fd, w->events & EV__IOFDSET | EV_ANFD_REIFY); //nfds [fd].reify |= flags;//++fdchangecnt;//array_needsize (int, fdchanges, fdchangemax, fdchangecnt, EMPTY2);//fdchanges [fdchangecnt - 1] = fd; }array_realloc (int elem, void *base, int *cur, int cnt) {*cur = array_nextsize (elem, *cur, cnt);return ev_realloc (base, elem * *cur); }//分配當(dāng)前數(shù)組大小的兩倍內(nèi)存,如果大于4096,則為4096的倍數(shù) int array_nextsize (int elem, int cur, int cnt) {int ncur = cur + 1;doncur <<= 1;while (cnt > ncur);/* if size is large, round to MALLOC_ROUND - 4 * longs to accommodate malloc overhead */if (elem * ncur > MALLOC_ROUND - sizeof (void *) * 4) //#define MALLOC_ROUND 4096 {ncur *= elem;ncur = (ncur + elem + (MALLOC_ROUND - 1) + sizeof (void *) * 4) & ~(MALLOC_ROUND - 1);ncur = ncur - sizeof (void *) * 4;ncur /= elem;}return ncur; }ev_io_start函數(shù)主要就是對ev_loop的anfds和fdchanges字段操作,上面已介紹。array_needsize函數(shù)實(shí)現(xiàn)當(dāng)數(shù)組大小不夠時,要重新分配內(nèi)存,分配方式與stl::vector有些類似,都是新分配的內(nèi)存為當(dāng)前內(nèi)存的2倍,然后移動原先數(shù)據(jù)到新內(nèi)存,釋放舊內(nèi)存。
4、ev_run ev_run (EV_P_ int flags) {...do{fd_reify (EV_A);backend_poll (EV_A_ waittime);if (expect_false (checkcnt))queue_events (EV_A_ (W *)checks, checkcnt, EV_CHECK);EV_INVOKE_PENDING;}while(...)... } ev_run函數(shù)代碼比較多,以上是以IO事件為例,進(jìn)行的精簡。下面是以epoll作為IO多路復(fù)用的機(jī)制進(jìn)行ev_run說明 1、ev_loop對象的初始化: ev_loop對象初始化: 1、# define EV_DEFAULT ev_default_loop (0)2、 static struct ev_loop default_loop_struct; EV_API_DECL struct ev_loop *ev_default_loop_ptr = 0;ev_default_loop (unsigned int flags) EV_THROW {if (!ev_default_loop_ptr){EV_P = ev_default_loop_ptr = &default_loop_struct;loop_init (EV_A_ flags);}return ev_default_loop_ptr; }3、 static void noinline ecb_cold loop_init (EV_P_ unsigned int flags) EV_THROW {flags = atoi (getenv ("LIBEV_FLAGS")); #if EV_USE_IOCPif (!backend && (flags & EVBACKEND_IOCP )) backend = iocp_init (EV_A_ flags); #endif #if EV_USE_PORTif (!backend && (flags & EVBACKEND_PORT )) backend = port_init (EV_A_ flags); #endif #if EV_USE_KQUEUEif (!backend && (flags & EVBACKEND_KQUEUE)) backend = kqueue_init (EV_A_ flags); #endif #if EV_USE_EPOLLif (!backend && (flags & EVBACKEND_EPOLL )) backend = epoll_init (EV_A_ flags); #endif #if EV_USE_POLLif (!backend && (flags & EVBACKEND_POLL )) backend = poll_init (EV_A_ flags); #endif #if EV_USE_SELECTif (!backend && (flags & EVBACKEND_SELECT)) backend = select_init (EV_A_ flags); } EV_USE_EPOLL 、EV_USE_SELECT等宏是在調(diào)用./configure時,搜索sys/epoll.h sys/select.h等文件,如果文件存在,就將宏設(shè)置為1. __cplusplus宏是g++編譯器定義的4、 int inline_size epoll_init (EV_P_ int flags) {backend_fd = epoll_create (256);if (backend_fd < 0)return 0;fcntl (backend_fd, F_SETFD, FD_CLOEXEC);backend_mintime = 1e-3; /* epoll does sometimes return early, this is just to avoid the worst */backend_modify = epoll_modify;backend_poll = epoll_poll;epoll_eventmax = 64; /* initial number of events receivable per poll */epoll_events = (struct epoll_event *)ev_malloc (sizeof (struct epoll_event) * epoll_eventmax);return EVBACKEND_EPOLL;//即 EVBACKEND_EPOLL = 0x00000004U } epoll_init返回值為非0,所以不會調(diào)用后面的poll_init、select_init。所以,在初始化時,調(diào)用了epoll_create初始化backend_fd。
2、fd_reify
fd_reify (EV_P) {for (i = 0; i < fdchangecnt; ++i){int fd = fdchanges [i];ANFD *anfd = anfds + fd;if (o_reify & EV__IOFDSET)backend_modify (EV_A_ fd, o_events, anfd->events); //即poll_modify }fdchangecnt = 0; } epoll_modify (EV_P_ int fd, int oev, int nev) {struct epoll_event ev;ev.data.u64 = (uint64_t)(uint32_t)fd| ((uint64_t)(uint32_t)++anfds [fd].egen << 32);ev.events = (nev & EV_READ ? EPOLLIN : 0)| (nev & EV_WRITE ? EPOLLOUT : 0);if (expect_true (!epoll_ctl (backend_fd, oev && oldmask != nev ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev)))return; }在fd_reify中將andfs中監(jiān)聽的事件添加到backend_fd中。
3、backend_poll
backend_poll (EV_A_ waittime); // 即epoll_poll epoll_poll (EV_P_ ev_tstamp timeout) {eventcnt = epoll_wait (backend_fd, epoll_events, epoll_eventmax, timeout * 1e3);for (i = 0; i < eventcnt; ++i){struct epoll_event *ev = epoll_events + i;int fd = (uint32_t)ev->data.u64; /* mask out the lower 32 bits */int want = anfds [fd].events;int got = (ev->events & (EPOLLOUT | EPOLLERR | EPOLLHUP) ? EV_WRITE : 0)| (ev->events & (EPOLLIN | EPOLLERR | EPOLLHUP) ? EV_READ : 0);fd_event (EV_A_ fd, got); //將watcher設(shè)置到loop的pending數(shù)組中 } } fd_event (EV_P_ int fd, int revents) {ANFD *anfd = anfds + fd;if (expect_true (!anfd->reify))fd_event_nocheck (EV_A_ fd, revents); } fd_event_nocheck (EV_P_ int fd, int revents) {ANFD *anfd = anfds + fd;ev_io *w;for (w = (ev_io *)anfd->head; w; w = (ev_io *)((WL)w)->next){int ev = w->events & revents;if (ev)ev_feed_event (EV_A_ (W)w, ev);} } ev_feed_event (EV_P_ void *w, int revents) EV_THROW {W w_ = (W)w;int pri = ABSPRI (w_);if (expect_false (w_->pending))pendings [pri][w_->pending - 1].events |= revents;else{w_->pending = ++pendingcnt [pri];array_needsize (ANPENDING, pendings [pri], pendingmax [pri], w_->pending, EMPTY2);pendings [pri][w_->pending - 1].w = w_;pendings [pri][w_->pending - 1].events = revents;} }在backend_poll中會調(diào)用epoll_wait,通過fd_event函數(shù),將就緒的文件描述符對應(yīng)的監(jiān)聽器添加到ev_loop對象的pendings字段中
4、EV_INVOKE_PENDING
void noinline ev_invoke_pending (EV_P) {pendingpri = NUMPRI;while (pendingpri) /* pendingpri possibly gets modified in the inner loop */{--pendingpri;while (pendingcnt [pendingpri]){ANPENDING *p = pendings [pendingpri] + --pendingcnt [pendingpri];p->w->pending = 0;EV_CB_INVOKE (p->w, p->events);}} } # define EV_CB_INVOKE(watcher,revents) (watcher)->cb (EV_A_ (watcher), (revents))EV_INVOKE_PENDING會一次調(diào)用pendings中的監(jiān)聽器的回調(diào)函數(shù)。
至此,ev_run大體介紹完畢。
小結(jié):
總的來說,對于IO事件驅(qū)動,libev是先將監(jiān)聽器存放在一個數(shù)組,每次遍歷都將監(jiān)聽器監(jiān)聽的文件描述符添加到epoll_wait進(jìn)行監(jiān)聽,然后將eopll_wait返回的就緒描述符對應(yīng)的監(jiān)聽器添加到pendings,最后調(diào)用pendings中監(jiān)聽器的回調(diào)函數(shù)。
總結(jié)
- 上一篇: 使用异步 I/O 大大提高应用程序的性能
- 下一篇: libuv 中文编程指南