TCP服务器epoll的多种实现
TCP服務器epoll的多種實現
對于網絡IO會涉及到兩個系統對象
比如發生read操作時就會經歷兩個階段
由于各個階段多有不同的情況,一組合么就產生了多種網絡 IO 模型
阻塞IO
在Linux中默認所有socket都是blocking的,一個典型的讀流程
當應用進程調用read這個系統調用,如果數據沒有到達,或者收到的數據包還不完整就會阻塞read調用,等待足夠的數據到達
Kernel準備好數據,他就會將數據從Kernel中拷貝到用戶內存,Kernel返回結果,解除block狀態,重新運行起來
于是就有了下面這種服務結構
代碼實現一個簡單的反射服務器:
#include <iostream> #include <sys/types.h> #include <sys/socket.h> #include <arpa/inet.h> #include <unistd.h> #include <cstring> #pragma clang diagnostic push #pragma ide diagnostic ignored "EndlessLoop" using std::cout; using std::endl; int main(int argc,char * argv[]) {//1.create socketint listenfd = socket(AF_INET,SOCK_STREAM,0);if(listenfd == -1){cout<<"create listenfd failed"<<endl;return -1;}//2.Initialize server addressstruct sockaddr_in bindaddr{};bindaddr.sin_family =AF_INET;bindaddr.sin_addr.s_addr = htonl(INADDR_ANY);bindaddr.sin_port= htons(3000);if (bind(listenfd,(struct sockaddr*) &bindaddr, sizeof(bindaddr)) == -1){cout<<"bind listen socket failed!"<<endl;return -1;}//3.Start listeningif(listen(listenfd,SOMAXCONN) == -1){cout<<"listen error"<<endl;return -1;}while (true){sockaddr_in clientaddr{};socklen_t clientaddrlen = sizeof(clientaddr);//4.accept client connectint clientfd = accept(listenfd,(struct sockaddr*)&clientaddr,&clientaddrlen);if (clientfd != -1){//5.Receive data from the clientchar recvBuf[32]={0};int ret = recv(clientfd,recvBuf,32,0);if (ret > 0){cout<<"Receive data from the client:"<<recvBuf<<endl;ret = send(clientfd,recvBuf, strlen(recvBuf),0);if(ret != strlen(recvBuf))cout<<"send failed"<<endl;elsecout<<"send successfully"<<endl;}else{cout<<"Receive data error"<<endl;}close(clientfd);}}//7.close listenclose(listenfd);return 0; } #pragma clang diagnostic pop但這樣的架構有巨大的缺陷:
- 因為所有IO都是阻塞的,這就造成send 過程中線程將被阻塞,會浪費大量的CPU時間,效率極低
非阻塞IO
在Linux下,我們可以主動將socket設置為非阻塞,這時流程就會編程下面這樣
| 大于0 | 接收到的字節數 |
| 等于0 | 連接正常斷開 |
| 等于-1,error等于EAGAIN | 表示recv操作還沒有完成 |
| 等于-1,error不等于EAGAIN | 表示recv操作遇到系統錯誤 |
使用如下函數將socket設置為非阻塞狀態
fcntl( fd, F_SETFL, O_NONBLOCK );于是我們可以實現如下模型
可以看到服務器線程可以通過循環調用 recv()接口,可以在單個線程內實現對所有連接的數據接收工作。但是上述模型絕不被推薦。因為,循環調用 recv()將大幅度推高 CPU 占用率;此外,在這個方案中 recv()更多的是起到檢測“操作是否完成”的作用,實際操作系統提供了更為高效的檢測“操作是否完成“作用的接口,例如 select()多路復用模式, 可以一次檢測多個連接是否活躍
多路復用IO (IO multiplexing)
采用Linux中的select或者poll
下面我們以select舉例
select函數用于檢測一組socket中是否有事件就緒.這里的事件為以下三類:
- 在socket內核中,接收緩沖區中的字節數大于或者等于低水位標記SO_RCVLOWAT,此時調用rec或read函數可以無阻塞的讀取該文件描述符,并且返回值大于零
- TCP連接的對端關閉連接,此時本端調用rrecv或read函數對socket進行讀操作,recv或read函數返回0
- 在監聽的socket上有新的連接請求
- 在socket尚有未處理的錯誤
- 在socket內核中,發送緩沖區中的可用字節數大于等于低水位標記時,可以無阻塞的寫,并且返回值大于0
- socket的寫操作被關閉時,對一個寫操作被關閉的socket進行寫操作,會觸發SIGPIPE信號
- socket使用非阻塞connect連接成功或失敗時
select()如下:
#include <sys/select.h> int select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset,struct timeval *timeout);參數說明
| readfds: | 需要監聽可讀事件的fd集合 |
| writefds: | 需要監聽可寫事件fd的集合 |
| exceptfds: | 需要監聽異常事件的fd集合 |
| timeout: | 超時時間,即在這個參數設定的時間內檢測這些fd的事件,超過這個時間后,select函數立即返回,這是一個timeval結構體 |
其定義如下:
struct timeval{ long tv_sec; /*秒 */long tv_usec; /*微秒 */ }參數readfds,writefds,exceptfds的類型都是fd_set,這是一個結構體信息
定義如下
//#define __FD_SETSIZE 1024 #define __NFDBITS (8 * (int) sizeof (__fd_mask)) #define __FD_ELT(d) ((d) / __NFDBITS) #define __FD_MASK(d) ((__fd_mask) (1UL << ((d) % __NFDBITS)))/* fd_set for select and pselect. */ typedef struct{/* XPG4.2 requires this member name. Otherwise avoid the namefrom the global namespace. */ #ifdef __USE_XOPEN//typedef long int __fd_mask;__fd_mask fds_bits[__FD_SETSIZE / __NFDBITS]; # define __FDS_BITS(set) ((set)->fds_bits) #endif} fd_set;/* 最大數量`fd_set'. */ #define FD_SETSIZE __FD_SETSIZE假設未定義__USE_XOPEN整理一年
typedef struct{ //typedef long int __fd_mask;long int fds_bits[__FD_SETSIZE / __NFDBITS];} fd_set;將一個fd添加到fd_set這個集合中時需要使用FD_SET宏,其定義如下:
void FD_SET(fd, fdsetp)實現如下:
#define FD_SET(fd, fdsetp) __FD_SET (fd, fdsetp)__FD_SET (fd, fdsetp)實現如下:
/* We don't use `memset' because this would require a prototype and the array isn't too big. */# define __FD_ZERO(set) \ do { \ unsigned int __i; \ fd_set *__arr = (set); \ for (__i = 0; __i < sizeof (fd_set) / sizeof (__fd_mask); ++__i) \ __FDS_BITS (__arr)[__i] = 0; \ } while (0)#endif /* GNU CC */#define __FD_SET(d, set) \ ((void) (__FDS_BITS (set)[__FD_ELT (d)] |= __FD_MASK (d)))舉個例子,假設現在fd的值為43,那么在數組下表為0的元素中第43個bit被置為1
再Linux上,向fd_set集合中添加新的fd時,采用位圖法確定位置;在windows中添加fd至fd_set的實現規則依次從數組第0個位置開始向后遞增
也就是說,FD_SET宏本質上是在一個有1024個連續bit的數組的第fd位置置1.
同理,FD_CLR刪除一個fd的原理,也就是將數組的第fd位置置為0
實例;
#include <sys/types.h>#include <sys/socket.h>#include <arpa/inet.h>#include <unistd.h>#include <iostream>#include <cstring>#include <sys/time.h>#include <vector>#include <cerrno>//Customize the value representing invalid fd#pragma clang diagnostic push#pragma ide diagnostic ignored "EndlessLoop"#define INVALID_FD -1int main(int argc,char * argv[]){ //create a listen socket int listenfd = socket(AF_INET,SOCK_STREAM,0); if(listenfd == INVALID_FD) { printf("創建監聽socket失敗"); return -1; } //init server addr sockaddr_in bindaddr{}; bindaddr.sin_family = AF_INET; bindaddr.sin_addr.s_addr = htonl(INADDR_ANY); bindaddr.sin_port= htons(3000); if(bind(listenfd,(struct sockaddr*) &bindaddr, sizeof(bindaddr)) == -1) { printf("綁定socket失敗"); close(listenfd); return -1; } //start listen if(listen(listenfd,SOMAXCONN) == -1) { printf("監聽失敗!"); close(listenfd); return -1; } //Store the client's socket data std::vector<int> clientfds; int maxfd; while(true) { fd_set readset; FD_ZERO(&readset); FD_SET(listenfd,&readset); maxfd = listenfd; unsigned long clientfdslength = clientfds.size(); for (int i = 0; i < clientfdslength; ++i) { if(clientfds[i] != INVALID_FD) { FD_SET(clientfds[i],&readset); if(maxfd<clientfds[i]) maxfd = clientfds[i]; } } timeval tm{}; tm.tv_sec = 1; tm.tv_usec =0; int ret = select(maxfd+1,&readset, nullptr, nullptr,&tm); if(ret == -1) { if (errno != EINTR) break; } //time out else if (ret ==0 ) { continue; } else { //event detected on a socket if (FD_ISSET(listenfd,&readset)) { sockaddr_in clientaddr{}; socklen_t clientaddrlen = sizeof(clientaddr); //accept client connection int clientfd = accept(listenfd,(struct sockaddr *)&clientaddr,&clientaddrlen); if (clientfd == INVALID_FD) { break; } std::cout<<"接受到客戶端連接,fd:"<<clientfd<<std::endl; clientfds.push_back(clientfd); } else { //Assume that the data length sent by the client is not greater than 63 char recvbuf[64]; unsigned long clientfdslength = clientfds.size(); for (int i = 0; i < clientfdslength; ++i) { if(clientfds[i] != INVALID_FD && FD_ISSET(clientfds[i],&readset)) { memset(recvbuf,0, sizeof(recvbuf)); //accept data int length = recv(clientfds[i],recvbuf,64,0); //recv的返回值等于0,表示客戶端關閉了連接 if (length <=0 ) { //error std::cout<<"error"<<clientfds[i]<<std::endl; close(clientfds[i]); clientfds[i] == INVALID_FD; continue; } std::cout<<"clientfd: "<<clientfds[i]<<", recv data:"<<recvbuf<<std::endl; } } } } } //close all client socket int clientfdslength = clientfds.size(); for (int i = 0; i < clientfdslength; ++i) { if(clientfds[i] != INVALID_FD) { close(clientfds[i]); } } //close socket close(listenfd); return 0;}#pragma clang diagnostic pop使用nc -v 127.0.0.1 3000來模擬客戶端,打開三個終端
關于以上代碼,需要注意以下幾點:
select函數在調用前后可能會修改readfds,writefds,exceptfds所以想在下次調用select函數時服用這些fd_set變量需要重新清零,添加內容
for (int i = 0; i < clientfdslength; ++i) { if(clientfds[i] != INVALID_FD) { FD_SET(clientfds[i],&readset); if(maxfd<clientfds[i]) maxfd = clientfds[i]; } }select函數也會修改timeval結構體的值,如果想復用這些變量,需要重新設置
timeval tm{}; tm.tv_sec = 1; tm.tv_usec =0;如果將select的timeval參數設置為NULL,則select函數會一直阻塞下去
總結
以上是生活随笔為你收集整理的TCP服务器epoll的多种实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: matlab画半球面,Matlab 绘制
- 下一篇: 咸鱼笔记:《实用软件工程》第一、二章课后