Linux网络编程 | 多路复用I/O :select、poll、epoll、水平触发与边缘触发、惊群问题
文章目錄
- 多路復(fù)用IO
- 多路復(fù)用IO的概念
- 多路復(fù)用IO與多線程/多進程的并發(fā)
- 多路復(fù)用IO模型進行服務(wù)器并發(fā)處理
- 多線程/多進程進行服務(wù)器并發(fā)處理
- select
- 工作原理
- 接口
- 優(yōu)缺點
- select的封裝
- select模型實現(xiàn)TCP服務(wù)器
- poll
- 工作原理
- 接口
- 優(yōu)缺點
- poll模型實現(xiàn)TCP服務(wù)器
- epoll
- 工作原理
- 接口
- 優(yōu)缺點
- epoll的封裝
- epoll的工作模式
- LT模式(水平觸發(fā))
- ET模式(邊緣觸發(fā))
- LT水平觸發(fā)與ET邊緣觸發(fā)
- epoll LT模式實現(xiàn)TCP服務(wù)器
- epoll ET模式實現(xiàn)TCP服務(wù)器
- 驚群問題
- 多線程環(huán)境下驚群問題的解決方法
- 多進程環(huán)境下驚群問題的解決方法
多路復(fù)用IO
多路復(fù)用IO的概念
多路復(fù)用IO用于對大量描述符進行IO就緒事件監(jiān)控,能夠讓用戶只針對就緒了指定事件的描述符進行操作。
IO的就緒事件分為可讀、可寫、異常
- 可讀事件:一個描述符對應(yīng)的緩沖區(qū)中有數(shù)據(jù)可讀
- 可寫事件:一個描述符對應(yīng)的緩沖區(qū)中有剩余空間可以寫入數(shù)據(jù)
- 異常事件:一個描述符發(fā)生了特定的異常信息
相比較于其他IO方式,多路復(fù)用IO 避免了對沒有就緒的描述符進行操作而帶來的阻塞,同時只針對已就緒的描述符進行操作,提高了效率
在Linux下,操作系統(tǒng)提供了三種模型:select模型、poll模型、epoll模型。
多路復(fù)用IO與多線程/多進程的并發(fā)
多路復(fù)用IO模型進行服務(wù)器并發(fā)處理
即在單執(zhí)行流中進行輪詢處理就緒的描述符。如果就緒的描述符較多時,很難做到負(fù)載均衡(最后一個描述符要等待很長時間,前邊的描述符處理完了才能處理它)。
解決這一問題的方法就是在用戶態(tài)實現(xiàn)負(fù)載均衡,規(guī)定每個描述符只能讀取指定數(shù)量的數(shù)據(jù),讀取了就進行下一個描述符。
多路復(fù)用IO模型適用于有大量描述符需要監(jiān)控,但是同一時間只有少量活躍的場景
多線程/多進程進行服務(wù)器并發(fā)處理
即操作系統(tǒng)通過輪詢調(diào)度執(zhí)行流實現(xiàn)每個執(zhí)行流中描述符的處理
由于其在內(nèi)核態(tài)實現(xiàn)了負(fù)載均衡,所以不需要用戶態(tài)做過多操作
多路復(fù)用適合于IO密集型服務(wù),多進程或線程適合于CPU密集型服務(wù),它們各有各的優(yōu)勢,并不存在誰取代誰的傾向。基于兩者的特點,通常可以將多路復(fù)用IO和多線程/多進程搭配一起使用。
使用多路復(fù)用IO監(jiān)控大量的描述符,哪個描述符有事件到來,就創(chuàng)建執(zhí)行流去處理。這樣做的好處是防止直接創(chuàng)建執(zhí)行流而描述符還未就緒,浪費資源。
select
工作原理
定義指定監(jiān)控事件的描述符集合(即位圖),初始化集合后,將需要監(jiān)控指定事件的描述符添加到指定事件(可讀、可寫、異常)的描述符集合中
將描述符集合拷貝到內(nèi)核當(dāng)中,對集合中所有描述符進行輪詢判斷,當(dāng)描述符就緒或者等待超時后就調(diào)用返回,返回后的集合中只剩下已就緒的描述符(未就緒會在位圖中置為0)
通過遍歷描述符,判斷哪些描述符還在集合中,就可以知道哪些描述符已經(jīng)就緒了,開始處理對應(yīng)的IO時間。
接口
//清空集合 void FD_ZERO(fd_set *set);//向集合中添加描述符fd void FD_SET(int fd, fd_set *set);//從集合中刪除描述符fd void FD_CLR(int fd, fd_set *set);//判斷描述符是否還在集合中 int FD_ISSET(int fd, fd_set *set);//發(fā)起調(diào)用將集合拷貝到內(nèi)核中并進行監(jiān)控 int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, struct timeval *timeout);/*fd:文件描述符set:描述符位圖nfds:集合中最大描述符數(shù)值+1readfds:可讀事件集合writefds:可寫事件集合exceptfds:異常事件集合timeout:超時等待時間timeval結(jié)構(gòu)體有兩個成員struct timeval {long tv_sec; 毫秒long tv_usec; 微秒}; */優(yōu)缺點
缺點:
優(yōu)點:
select的封裝
為了能讓select使用更加便利,對其進行一層封裝。
#ifndef __SELECT_H_ #define __SELECT_H_#include<iostream> #include<vector> #include<sys/socket.h> #include"TcpSocket.hpp"class Select {public:Select() : _maxfd(-1){//將集合初始化清空FD_ZERO(&_rfds);} //向集合中添加描述符bool Add(const TcpSocket& socket){int fd = socket.GetFd();FD_SET(fd, &_rfds);//如果新增描述符比最大描述符大,則更新if(fd > _maxfd){_maxfd = fd;}return true;}//從集合中刪除描述符bool Del(const TcpSocket& socket) {int fd = socket.GetFd();FD_CLR(fd, &_rfds);//如果被刪除的描述符是最大的,則從后往前再找一個if(fd == _maxfd){for(int i = _maxfd; i >= 0; i--){//如果這個描述符在集合中,則更新最大值if(FD_ISSET(i, &_rfds)){_maxfd = i;break;}}}return true;}//從集合中找到所有就緒的描述符bool Wait(std::vector<TcpSocket>& vec, int outlime = 3) {struct timeval tv;//以毫秒為單位tv.tv_sec = outlime;//計算剩余的微秒tv.tv_usec = 0;//因為select會去掉集合中沒就緒的描述符,所以不能直接操作集合,只能操作集合的拷貝fd_set set = _rfds;int ret = select(_maxfd + 1, &set, NULL, NULL, &tv);if(ret < 0){std::cerr << "select error" << std::endl;return false;}else if(ret == 0){std::cerr << "wait timeout" << std::endl;return true;}for(int i = 0; i < _maxfd + 1; i++){//將就緒描述符放入數(shù)組中if(FD_ISSET(i, &set)){TcpSocket socket;socket.SetFd(i);vec.push_back(socket);}}return true;} private://需要監(jiān)控的描述符,因為select會修改集合,所以每次進行操作的都是它的拷貝fd_set _rfds;//最大的描述符,因為fd_set是位圖,所以保存最大的描述符可以減少遍歷的次數(shù)。int _maxfd; };#endifselect模型實現(xiàn)TCP服務(wù)器
#include<iostream> #include<string> #include<unistd.h> #include<sys/socket.h> #include<arpa/inet.h> #include<netinet/in.h> #include"TcpSocket.hpp" #include"select.hpp"using namespace std;int main(int argc, char* argv[]) {if(argc != 3){ cerr << "正確輸入方式: ./select_srv.cc ip port\n" << endl;return -1; } string srv_ip = argv[1];uint16_t srv_port = stoi(argv[2]);TcpSocket lst_socket;//創(chuàng)建監(jiān)聽套接字CheckSafe(lst_socket.Socket());//綁定地址信息CheckSafe(lst_socket.Bind(srv_ip, srv_port));//開始監(jiān)聽CheckSafe(lst_socket.Listen());Select s;s.Add(lst_socket);while(1){vector<TcpSocket> vec; //去掉未就緒描述符bool ret = s.Wait(vec);if(ret == false){continue;}//取出就緒描述符進行處理for(auto socket : vec){//如果就緒的是監(jiān)聽套接字,則代表有新連接if(socket.GetFd() == lst_socket.GetFd()){TcpSocket new_socket;ret = lst_socket.Accept(&new_socket);if(ret == false){continue;}//新建套接字加入集合中s.Add(new_socket);}//新數(shù)據(jù)到來else{string data;//接收數(shù)據(jù)ret = socket.Recv(data);//斷開連接,移除監(jiān)控if(ret == false){s.Del(socket);socket.Close();continue;}cout << "cli send message: " << data << endl;data.clear();if(ret == false){s.Del(socket);socket.Close();continue;}}}}//關(guān)閉監(jiān)聽套接字lst_socket.Close();return 0; }poll
工作原理
接口
struct pollfd {int fd; //需要監(jiān)控的文件描述符short events; //需要監(jiān)控的事件short revents; //實際就緒的事件 }; /*操作相對簡單,如果某個描述符不需要繼續(xù)監(jiān)控時,直接將對應(yīng)結(jié)構(gòu)體中的fd置為-1即可。 *///發(fā)起監(jiān)控 int poll(struct pollfd *fds, nfds_t nfds, int timeout); /*fds:pollfd數(shù)組nfds:數(shù)組的大小timeout:超時等待時間,單位為毫秒 */優(yōu)缺點
缺點:
優(yōu)點:
poll模型實現(xiàn)TCP服務(wù)器
#include<poll.h> #include<vector> #include <sys/socket.h> #include"TcpSocket.hpp" #define MAX_SIZE 10using namespace std;int main(int argc, char* argv[]) {if(argc != 3){ cerr << "正確輸入方式: ./select_srv.cc ip port\n" << endl;return -1; } string srv_ip = argv[1];uint16_t srv_port = stoi(argv[2]);TcpSocket lst_socket;//創(chuàng)建監(jiān)聽套接字CheckSafe(lst_socket.Socket());//綁定地址信息CheckSafe(lst_socket.Bind(srv_ip, srv_port));//開始監(jiān)聽CheckSafe(lst_socket.Listen());struct pollfd poll_fd[MAX_SIZE];poll_fd[0].fd = lst_socket.GetFd();poll_fd[0].events = POLLIN;int i = 0, maxi = 0;for(i = 1; i < MAX_SIZE; i++){poll_fd[i].fd = -1;}while(1){int ret = poll(poll_fd, maxi + 1, 2000);if(ret < 0){cerr << "not ready" << endl;continue;}else if(ret == 0){cerr << "wait timeout" << endl;continue;}//監(jiān)聽套接字就緒則增加新連接if(poll_fd[0].revents & (POLLIN | POLLERR)){struct sockaddr_in addr;socklen_t len = sizeof(sockaddr_in);//創(chuàng)建一個新的套接字與客戶端建立連接int new_fd = accept(lst_socket.GetFd(), (sockaddr*)&addr, &len);for(i = 1; i < MAX_SIZE; i++){if(poll_fd[i].fd == -1){poll_fd[i].fd = new_fd;poll_fd[i].events = POLLIN;break;}}if(i > maxi){maxi = i;}if(--ret <= 0){continue;}}for(i = 1; i <= maxi; i++){ if(poll_fd[i].fd == -1){continue;}if(poll_fd[i].revents & (POLLIN | POLLERR)){//新數(shù)據(jù)到來char buff[4096] = { 0 };int ret = recv(poll_fd[i].fd, buff, 4096, 0); if(ret == 0){ std::cerr << "connect error" << std::endl;close(poll_fd[i].fd);poll_fd[i].fd = -1;} else if(ret < 0){ std::cerr << "recv error" << std::endl;close(poll_fd[i].fd);poll_fd[i].fd = -1;} else{cout << "cli send message: " << buff << endl;}if(--ret <= 0){break;}}}}lst_socket.Close();return 0; }epoll
工作原理
struct eventpoll{ .... /*紅黑樹的根節(jié)點,這顆樹中存儲著所有添加到epoll中的需要監(jiān)控的事件*/ struct rb_root rbr; /*雙鏈表中則存放著將要通過epoll_wait返回給用戶的滿足條件的事件*/ struct list_head rdlist; .... };接口
//在內(nèi)核中創(chuàng)建eventpoll結(jié)構(gòu)體,返回操作句柄(size為監(jiān)控的最大數(shù)量,但是在linux2.6.8后忽略上限,只需要給一個大于0的數(shù)字即可) int epoll_create(int size);//組織描述符事件結(jié)構(gòu)體 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); /*epfd:eventpoll結(jié)構(gòu)體的操作句柄op:操作的選項,EPOLL_CTL_ADD/EPOLL_CTL_MOD/EPOLL_CTL_DELfd:描述符event:監(jiān)控描述符對應(yīng)的事件信息結(jié)構(gòu)體struct epoll_event {uint32_t events; // 要監(jiān)控的事件,以及調(diào)用返回后實際就緒的事件 epoll_data_t data; // 聯(lián)合體,用來存放各種類型的描述符 };typedef union epoll_data {void *ptr;int fd;uint32_t u32;uint64_t u64;} epoll_data_t; *///開始監(jiān)控,當(dāng)有描述符就緒或者等待超時后調(diào)用返回 int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout); /*maxevents:events數(shù)組的結(jié)點數(shù)量timeout:超時等待時間返回值為就緒的描述符個數(shù) */優(yōu)缺點
epoll是Linux下性能最高的多路復(fù)用IO模型,幾乎具備了一切所需的優(yōu)點
缺點:
優(yōu)點:
4. 底層用的是紅黑樹存儲,監(jiān)控的描述符數(shù)量沒有上限
5. 所有的描述符事件信息只需要向內(nèi)核中拷貝一次
6. 監(jiān)控采用異步阻塞,性能不會隨著描述符增多而下降
7. 直接返回就緒描述符事件信息,可以直接對就緒描述符進行操作,不需要像select和poll一樣遍歷判斷。
epoll的封裝
#ifndef __EPOLL_H_ #define __EPOLL_H_ #include<iostream> #include<vector> #include<sys/epoll.h> #include<unistd.h> #include"TcpSocket.hpp"const int EPOLL_SIZE = 1000;class Epoll {public:Epoll(){//現(xiàn)版本已經(jīng)忽略size,隨便給一個大于0的數(shù)字即可_epfd = epoll_create(1);if(_epfd < 0){std::cerr << "epoll create error" << std::endl;exit(0);}}~Epoll(){close(_epfd);}//增加新的監(jiān)控事件bool Add(const TcpSocket& socket, bool epoll_et = false, uint32_t events = EPOLLIN) const{int fd = socket.GetFd();//組織監(jiān)控事件結(jié)構(gòu)體struct epoll_event ev;ev.data.fd = fd;//設(shè)置需要監(jiān)控的描述符if(epoll_et == true){ev.events = events | EPOLLET;}else{ ev.events = events;}int ret = epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &ev);if(ret < 0){std::cerr << "epoll ctl add error " << std::endl;return false;}return true;}//刪除監(jiān)控事件bool Del(const TcpSocket& socket) const {int fd = socket.GetFd();int ret = epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL);if(ret < 0){std::cerr << "epoll ctl del error" << std::endl;return false;}return true;}//開始監(jiān)控bool Wait(std::vector<TcpSocket>& vec, int timeout = 3000) const {vec.clear();struct epoll_event evs[EPOLL_SIZE];//開始監(jiān)控,返回值為就緒描述符數(shù)量int ret = epoll_wait(_epfd, evs, EPOLL_SIZE, timeout);//當(dāng)前沒有描述符就緒if(ret < 0){std::cerr << "epoll not ready" << std::endl;return false;}//等待超時else if(ret == 0){std::cerr << "epoll wait timeout" << std::endl;return false;}for(int i = 0; i < ret; i++){//將所有就緒描述符放進數(shù)組中TcpSocket new_socket;new_socket.SetFd(evs[i].data.fd);vec.push_back(new_socket);}return true;}private://epoll的操作句柄int _epfd; };#endifepoll的工作模式
epoll有兩種工作模式,LT模式(水平觸發(fā)模式)和ET模式(邊緣觸發(fā)模式)。
LT模式(水平觸發(fā))
LT模式也就是水平觸發(fā)模式,是epoll的默認(rèn)觸發(fā)模式(select和poll只有這種模式)
觸發(fā)條件
可讀事件:接受緩沖區(qū)中的數(shù)據(jù)大小高于低水位標(biāo)記,則會觸發(fā)事件
可寫事件:發(fā)送緩沖區(qū)中的剩余空間大小大于低水位標(biāo)記,則會觸發(fā)事件
低水位標(biāo)記:一個基準(zhǔn)值,默認(rèn)是1
所以簡單點說,水平觸發(fā)模式就是只要緩沖區(qū)中還有數(shù)據(jù),就會一直觸發(fā)事件
- 當(dāng)epoll檢測到socket上事件就緒的時候, 可以不立刻進行處理. 或者只處理一部分.
- 如上面的例子, 由于只讀了1K數(shù)據(jù), 緩沖區(qū)中還剩1K數(shù)據(jù), 在第二次調(diào)用 epoll_wait 時, epoll_wait 仍然會立刻返回并通知socket讀事件就緒.
- 直到緩沖區(qū)上所有的數(shù)據(jù)都被處理完, epoll_wait 才不會立刻返回.
- 支持阻塞讀寫和非阻塞讀寫
ET模式(邊緣觸發(fā))
ET模式也就是邊緣觸發(fā)模式,如果我們在第1步將socket添加到epoll_event描述符的時候使用了EPOLLET標(biāo)志, epoll就會進入ET工作模式
觸發(fā)條件
可讀事件:(不關(guān)心接受緩沖區(qū)是否有數(shù)據(jù))每當(dāng)有新數(shù)據(jù)到來時,才會觸發(fā)事件。
可寫事件:剩余空間從無到有的時候才會觸發(fā)事件(即從不可寫到可寫)
簡單點說,ET模式下只有在新數(shù)據(jù)到來的情況下才會觸發(fā)事件。這也就要求我們在新數(shù)據(jù)到來的時候最好能夠一次性將所有數(shù)據(jù)取出,否則不會觸發(fā)第二次事件,只有等到下次再有新數(shù)據(jù)到來才會觸發(fā)。而我們也不知道具體有多少數(shù)據(jù),所以就需要循環(huán)處理,直到緩沖區(qū)為空,但是recv是一個阻塞讀取,如果沒有數(shù)據(jù)時就會阻塞等待,這時候就需要將描述符的屬性設(shè)置為非阻塞,才能解決這個問題
void SetNoBlock(int fd) {int flag = fcntl(fd, F_GETFL);flag |= O_NONBLOCK;fcntl(fd, F_SETFL, flag); }- 當(dāng)epoll檢測到socket上事件就緒時, 必須立刻處理.
- 如上面的例子, 雖然只讀了1K的數(shù)據(jù), 緩沖區(qū)還剩1K的數(shù)據(jù), 在第二次調(diào)用 epoll_wait 的時候, epoll_wait 不會再返回了.
- 也就是說, ET模式下, 文件描述符上的事件就緒后, 只有一次處理機會.
- ET的性能比LT性能更高( epoll_wait 返回的次數(shù)少了很多). Nginx默認(rèn)采用ET模式使用epoll.
- 只支持非阻塞的讀寫
LT水平觸發(fā)與ET邊緣觸發(fā)
所以簡單點說,LT就是只要緩沖區(qū)中還有數(shù)據(jù),就會一直觸發(fā)事件,而ET模式下只有在新數(shù)據(jù)到來的情況下才會觸發(fā)事件。
LT模式的優(yōu)點主要在于其簡單且穩(wěn)定,不容易出現(xiàn)問題,傳統(tǒng)的select和poll都是使用這個模式。但是他也有缺點,就是因為事件觸發(fā)過多導(dǎo)致效率降低
ET最大的優(yōu)點就是減少了epoll的觸發(fā)次數(shù),但是這也帶來了巨大的代價,就是要求必須一次性將所有的數(shù)據(jù)處理完,雖然效率得到了提高,但是代碼的復(fù)雜程度大大的增加了。Nginx就是默認(rèn)采用ET模式
還有一種場景適合ET模式使用,如果我們需要接受一條數(shù)據(jù),但是這條數(shù)據(jù)因為某種問題導(dǎo)致其發(fā)送不完整,需要分批發(fā)送。所以此時的緩沖區(qū)中數(shù)據(jù)只有部分,如果此時將其取出,則會增加維護數(shù)據(jù)的開銷,正確的做法應(yīng)該是等待后續(xù)數(shù)據(jù)到達(dá)后將其補全,再一次性取出。但是如果此時使用的是LT模式,就會因為緩沖區(qū)不為空而一直觸發(fā)事件,所以這種情況下使用ET會比較好。
epoll LT模式實現(xiàn)TCP服務(wù)器
#include<poll.h> #include<vector> #include <sys/socket.h> #include"TcpSocket.hpp" #include"epoll.hpp"using namespace std;int main(int argc, char* argv[]) {if(argc != 3){ cerr << "正確輸入方式: ./epoll_lt_srv ip port\n" << endl;return -1; } string srv_ip = argv[1];uint16_t srv_port = stoi(argv[2]);TcpSocket lst_socket;//創(chuàng)建監(jiān)聽套接字CheckSafe(lst_socket.Socket());//綁定地址信息CheckSafe(lst_socket.Bind(srv_ip, srv_port));//開始監(jiān)聽CheckSafe(lst_socket.Listen());Epoll epoll;epoll.Add(lst_socket);while(1){vector<TcpSocket> vec;int ret = epoll.Wait(vec);if(ret <= 0){continue;}for(auto& socket : vec){//如果就緒的是監(jiān)聽套接字,則說明有新連接到來if(socket.GetFd() == lst_socket.GetFd()){TcpSocket new_socket;lst_socket.Accept(&new_socket);epoll.Add(new_socket);}//如果不是,則說明已連接的套接字有新數(shù)據(jù)到來else{ string data;//接收數(shù)據(jù)ret = socket.Recv(data);//斷開連接,移除監(jiān)控if(ret == false){ epoll.Del(socket);socket.Close();continue;} cout << "cli send message: " << data << endl;data.clear();if(ret == false){ epoll.Del(socket);socket.Close();continue;} }}}lst_socket.Close();return 0; }epoll ET模式實現(xiàn)TCP服務(wù)器
因為ET模式只支持非阻塞的讀寫,所以需要新增非阻塞讀以及非阻塞寫的接口,同時要對加入epoll的套接字加上EPOLLET的選項
//非阻塞發(fā)送數(shù)據(jù),因為ET模式對于讀寫的響應(yīng)只處理一次,所以需要通過輪詢的將緩沖區(qū)一次性讀取完 bool SendNoBlock(const std::string& data) {ssize_t pos = 0;ssize_t left_size = data.size();while (1){ssize_t ret = send(_socket_fd, data.data() + pos, left_size, 0);if (ret < 0){//嘗試重新寫入if (errno == EAGAIN || errno == EWOULDBLOCK){continue;}return false;}pos += ret;left_size -= ret;//如果數(shù)據(jù)發(fā)送完畢if (left_size <= 0){break;}}return true; }//非阻塞接收數(shù)據(jù) bool RecvNoBlock(std::string& data) {data.clear();char buff[4096] = { 0 };while (1){ssize_t ret = recv(_socket_fd, buff, 4096, 0);//沒有內(nèi)容if (ret < 0){//嘗試重新寫入if (errno == EAGAIN || errno == EWOULDBLOCK){continue;}return false;}//對端關(guān)閉else if (ret == 0){return false;}buff[ret] = '\0';data += buff;//如果當(dāng)前接受數(shù)據(jù)小于緩沖區(qū)長度,則說明數(shù)據(jù)全部接收完畢,反之則說明還需要多次輪詢接收if (ret < 4096){break;}}return true; } #include<poll.h> #include<vector> #include <sys/socket.h> #include"TcpSocket.hpp" #include"epoll.hpp"using namespace std;int main(int argc, char* argv[]) {if(argc != 3){ cerr << "正確輸入方式: ./epoll_et_srv ip port\n" << endl;return -1; } string srv_ip = argv[1];uint16_t srv_port = stoi(argv[2]);TcpSocket lst_socket;//創(chuàng)建監(jiān)聽套接字CheckSafe(lst_socket.Socket());//綁定地址信息CheckSafe(lst_socket.Bind(srv_ip, srv_port));//開始監(jiān)聽CheckSafe(lst_socket.Listen());lst_socket.SetNoBlock();Epoll epoll;epoll.Add(lst_socket);while(1){vector<TcpSocket> vec;int ret = epoll.Wait(vec);if(ret <= 0){continue;}for(auto& socket : vec){//如果就緒的是監(jiān)聽套接字,則說明有新連接到來if(socket.GetFd() == lst_socket.GetFd()){TcpSocket new_socket;lst_socket.Accept(&new_socket);new_socket.SetNoBlock();epoll.Add(new_socket, true);}//如果不是,則說明已連接的套接字有新數(shù)據(jù)到來else{ string data;//接收數(shù)據(jù)bool ret = socket.RecvNoBlock(data);//斷開連接,移除監(jiān)控if(!ret){ epoll.Del(socket);socket.Close();continue;} cout << "cli send message: " << data << endl;data.clear();if(ret == false){ epoll.Del(socket);socket.Close();continue;} }}}lst_socket.Close();return 0; }驚群問題
在一個執(zhí)行流中,如果添加了特別多的描述符進行監(jiān)控,則輪詢處理就會比較慢。
因此就會采取多執(zhí)行流的解決方法,在多個執(zhí)行流中創(chuàng)建epoll,每個epoll監(jiān)控一部分描述符,使壓力分?jǐn)偂5强赡芤驗闊o法確定哪些描述符即將就緒,所以就會讓每個執(zhí)行流都監(jiān)控所有描述符,誰先搶到事件則誰去處理。
所以當(dāng)多個執(zhí)行流同時在等待就緒事件時,如果某個描述符就緒,他就會喚醒全部執(zhí)行流中的epoll進行爭搶,但是此時就只會有一個執(zhí)行流搶到并執(zhí)行,而此時其他的執(zhí)行流都會因為爭搶失敗而報錯,錯誤碼EAGAIN。這就是驚群問題。
驚群問題帶來了什么壞處呢?
多線程環(huán)境下驚群問題的解決方法
這種方法其實也就是本篇博客開頭提到的一種做法。只使用一個線程進行事件的監(jiān)控,每當(dāng)有就緒事件到來時,就將這些事件轉(zhuǎn)交給其他線程去處理,這樣就避免了因為多執(zhí)行流同時使用epoll監(jiān)控而帶來的驚群問題。
多進程環(huán)境下驚群問題的解決方法
這里主要借鑒的是lighttpd和nginx的解決方法。
lighttpd的解決思路很簡單粗暴,就是直接無視這個問題,事件到來后依舊能夠喚醒多個進程來爭搶,并且只有一個能成功,其他進程爭搶失敗后的報錯EAGAIN會被捕獲,捕獲后不會處理這個錯誤,而是直接無視,就當(dāng)做沒有發(fā)生。
nginx的解決思路是其實就是加鎖與負(fù)載均衡。使用一個全局的互斥鎖,每當(dāng)有描述符就緒,就會讓每個進程都去競爭這把鎖(如果某個進程當(dāng)前連接數(shù)達(dá)到了最大連接數(shù)的7/8,也就是其負(fù)載均衡點,此時這個進程就不會再去爭搶所資源,而是將負(fù)載均衡到其他進程上),如果成功競爭到了鎖,則將描述符加入進自己的wait集合中,而對于沒有競爭到鎖的進程,則將其從自己的wait集合中移除,這樣就保證了不會讓多個進程同一時間進行監(jiān)控,而是讓每個進程都通過競爭鎖的方式輪流進行監(jiān)控,這樣保證了同一時間只會有一個進程進行監(jiān)控,所以驚群問題也得到了解決。
參考資料
高并發(fā)網(wǎng)絡(luò)編程之epoll詳解
epoll詳解
Linux驚群效應(yīng)詳解
epoll的驚群效應(yīng)
Apache與Nginx網(wǎng)絡(luò)模型
[框架]高并發(fā)中的驚群效應(yīng)
Nginx如何解決“驚群”現(xiàn)象
總結(jié)
以上是生活随笔為你收集整理的Linux网络编程 | 多路复用I/O :select、poll、epoll、水平触发与边缘触发、惊群问题的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: C++ 类型转换 :C语言的类型转换、C
- 下一篇: Linux下守护进程(daemon)的实