Nginx之进程间的通信机制-Channel
Nginx主要使用了其中的三種方式:
- 匿名套接字對
- 共享內(nèi)存
- 信號
1. Nginx 頻道
ngx_channel_t 頻道是 Nginx master 進程與 worker 進程之間通信的常用工具,它是使用 匿名套接字對 實現(xiàn)的,即 socketpair 方法,它用于創(chuàng)建父子進程間使用的套接字。
#include <sys/types.h> /* See NOTES */ #include <sys/socket.h>int socketpair(int domain, int type, int protocol, int sv[2]);這個方法可以創(chuàng)建一對關聯(lián)的套接字 sv[2]。
- domain:表示域,在 Linux 下通常取值為 AF_UNIX;
- type:取值為 SOCK_STREAM 或 SOCK_DGRAM,它表示在套接字上使用的是 TCP 還是 UDP;
- protocol:必須傳遞 0;
- sv[2]:是一個含有兩個元素的整型數(shù)組,實際上就是兩個套接字。
- 當 socketpair 返回 0 時,sv[2] 這兩個套接字創(chuàng)建成功,否則 sockpair 返回 -1 表示失敗.
當 socketpair 執(zhí)行成功時,sv[2] 這兩個套接字具備下列關系:
- 向 sv[0] 套接字寫入數(shù)據(jù),將可以從 sv[1] 套接字中讀取到剛寫入的數(shù)據(jù);
- 同樣,向 sv[1] 套接字寫入數(shù)據(jù),也可以從 sv[0] 中讀取到寫入的數(shù)據(jù)。
- 通常,在父、子進程通信前,會先調(diào)用 socketpair 方法創(chuàng)建這樣一組套接字,在調(diào)用 fork 方法創(chuàng)建出子進程后,將會在父進程中關閉 sv[1] 套接字,僅使用 sv[0] 套接字用于向子進程發(fā)送數(shù)據(jù)以及接收子進程發(fā)送來的數(shù)據(jù);
- 而在子進程中則關閉 sv[0] 套接字,僅使用 sv[1] 套接字既可以接收父進程發(fā)送來的數(shù)據(jù),也可以向父進程發(fā)送數(shù)據(jù)。
ngx_channel_t 結構體是 Nginx 定義的 master 父進程與 worker 子進程間的消息格式,如下:
typedef struct {// 傳遞的 TCP 消息中的命令ngx_uint_t command;// 進程 ID,一般是發(fā)送命令方的進程 IDngx_pid_t pid;// 表示發(fā)送命令方在 ngx_processes 進程數(shù)組間的序號ngx_int_t slot;// 通信的套接字句柄ngx_fd_t fd; }ngx_channel_t;Nginx 針對 command 成員定義了如下命令:
// 打開頻道,使用頻道這種方式通信前必須發(fā)送的命令 #define NGX_CMD_OPEN_CHANNEL 1// 關閉已經(jīng)打開的頻道,實際上也就是關閉套接字 #define NGX_CMD_CLOSE_CHANNEL 2// 要求接收方正常地退出進程 #define NGX_CMD_QUIT 3// 要求接收方強制地結束進程 #define NGX_CMD_TERMINATE 4// 要求接收方重新打開進程已經(jīng)打開過的文件 #define NGX_CMD_REOPEN 5問:master 是如何啟動、停止 worker 子進程的?
答:正是通過 socketpair 產(chǎn)生的套接字發(fā)送命令的,即每次要派生一個子進程之前,都會先調(diào)用 socketpair 方法。
在 Nginx 派生子進程的 ngx_spawn_process 方法中,會首先派生基于 TCP 的套接字,如下:
ngx_pid_t ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data, char *name, ngx_int_t respawn) {if (respawn != NGX_PROCESS_DETACHED) {/* Solaris 9 still has no AF_LOCAL */// ngx_processes[s].channel 數(shù)組正是將要用于父、子進程間通信的套接字對if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1){return NGX_INVALID_PID;}// 將 channel 套接字對都設置為非阻塞模式if (ngx_nonblocking(ngx_processes[s].channel[0]) == -1) {ngx_close_channel(ngx_processes[s].channel, cycle->log);return NGX_INVALID_PID;}if (ngx_nonblocking(ngx_processes[s].channel[1]) == -1) {ngx_close_channel(ngx_processes[s].channel, cycle->log);return NGX_INVALID_PID;}... }ngx_processes 數(shù)組定義了 Nginx 服務中所有的進程,包括 master 進程和 worker 進程,如下:
#define NGX_MAX_PROCESSES 1024// 雖然定義了 NGX_MAX_PROCESSES 個成員,但是已經(jīng)使用的元素僅與啟動的進程個數(shù)有關 ngx_processes_t ngx_processes[NGX_MAX_PROCESSES];ngx_processes 數(shù)組的類型是 ngx_processes_t,對于頻道來說,這個結構體只關心它的 channel 成員:
typedef struct {...// socketpair 創(chuàng)建的套接字對ngx_socket_t channel[2]; }ngx_processes_t;1. ngx_write_channel:使用頻道發(fā)送 ngx_channel_t 消息
ngx_int_t ngx_write_channel(ngx_socket_t s, ngx_channel_t *ch, size_t size,ngx_log_t *log) {ssize_t n;ngx_err_t err;struct iovec iov[1];struct msghdr msg;#if (NGX_HAVE_MSGHDR_MSG_CONTROL)union {struct cmsghdr cm;char space[CMSG_SPACE(sizeof(int))];}cmsg;if (ch->fd == -1) {msg.msg_control = NULL;msg.msg_controllen = 0;} else {// 輔助數(shù)據(jù)msg.msg_control = (caddr_t)&cmsg;msg.msg_controllen = sizeof(cmsg);ngx_memzero(&cmsg, sizeof(cmsg));cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));cmsg.cm.cmsg_level = SOL_SOCKET;cmsg.cm.cmsg_type = SCM_RIGHTS;/** We have to use ngx_memcpy() instead of simple* *(int *) CMSG_DATA(&cmsg.cm) = ch->fd;* because some gcc 4.4 with -O2/3/s optimization issues the warning:* dereferencing type-punned pointer will break strict-aliasing rules** Fortunately, gcc with -O1 compiles this ngx_memcpy()* in the same simple assignment as in the code above*/ngx_memcpy(CMSG_DATA(&cmsg.cm), &ch->fd, sizeof(int));}msg.msg_flags = 0;#elseif (ch->fd == -1) {msg.msg_accrights = NULL;msg.msg_accrightslen = 0;} else {msg.msg_accrights = (caddr_t) &ch->fd;msg.msg_accrightslen = sizeof(int);}#endif// 指向要發(fā)送的 ch 起始地址iov[0].iov_base = (char *) ch;iov[0].iov_len = size;// msg_name 和 msg_namelen 僅用于未連接套接字(如UDP)msg.msg_name = NULL;msg.msg_namelen = 0;msg.msg_iov = iov;msg.msg_iovlen = 1;// 將該 ngx_channel_t 消息發(fā)出去n = sendmsg(s, &msg, 0);if (n == -1) {err = ngx_errno;if (err == NGX_EAGAIN) {return NGX_AGAIN;}return NGX_ERROR;}return NGX_OK; }2. ngx_read_channel: 讀取消息
ngx_int_t ngx_read_channel(ngx_socket_t s, ngx_channel_t *ch, size_t size, ngx_log_t *log) {ssize_t n;ngx_err_t err;struct iovec iov[1];struct msghdr msg;#if (NGX_HAVE_MSGHDR_MSG_CONTROL)union {struct cmsghdr cm;char space[CMSG_SPACE(sizeof(int))];} cmsg; #elseint fd; #endifiov[0].iov_base = (char *)ch;iov[0].iov_len = size;msg.msg_name = NULL;msg.msg_namelen = 0;msg.msg_iov = iov;msg.msg_iovlen = 1;#if (NGX_HAVE_MSGHDR_MSG_CONTROL)msg.msg_control = (caddr_t) &cmsg;msg.msg_controllen = sizeof(cmsg); #elsemsg.msg_accrights = (caddr_t) &fd;msg.msg_accrightslen = sizeof(int); #endif// 接收命令n = recvmsg(s, &msg, 0);if (n == -1) {err = ngx_errno;if (err == NGX_EAGAIN) {return NGX_AGAIN;}return NGX_ERROR;}if (n == 0) {return NGX_ERROR;}// 接收的數(shù)據(jù)不足if ((size_t) n < sizeof(ngx_channel_t)) {return NGX_ERROR;}#if (NGX_HAVE_MSGHDR_MSG_CONTROL)// 若接收到的命令為"打開頻道,使用頻道這種方式通信前必須發(fā)送的命令"if (ch->command == NGX_CMD_OPEN_CHANNEL) {if (cmsg.cm.cmsg_len < (socklen_t) CMSG_LEN(sizeof(int))) {return NGX_ERROR;}if (cmsg.cm.cmsg_level != SOL_SOCKET || cmsg.cm.cmsg_type != SCM_RIGHTS){return NGX_ERROR;}/* ch->fd = *(int *) CMSG_DATA(&cmsg.cm); */ngx_memcpy(&ch->fd, CMSG_DATA(&cmsg.cm), sizeof(int));}// 若接收到的消息是被截斷的if (msg.msg_flags & (MSG_TRUNC|MSG_CTRUNC)) {ngx_log_error(NGX_LOG_ALERT, log, 0,"recvmsg() truncated data");}#elseif (ch->command == NGX_CMD_OPEN_CHANNEL) {if (msg.msg_accrightslen != sizeof(int)) {return NGX_ERROR;}ch->fd = fd;} #endifreturn n; }在 Nginx 中,目前僅存在 master 進程向 worker 進程發(fā)送消息的場景,這時對于 socketpair 方法創(chuàng)建的 channel[2] 套接字來說,master 進程會使用 channel[0] 套接字來發(fā)送消息,而 worker 進程會使用 channel[1] 套接字來接收消息。
需要C/C++ Linux服務器架構師學習資料加群973961276獲取(資料包括C/C++,Linux,golang技術,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協(xié)程,DPDK,ffmpeg等),免費分享3. ngx_add_channel_event: 把接收頻道消息的套接字添加到 epoll 中
worker 進程調(diào)度 ngx_read_channel 方法接收頻道消息是通過該 ngx_add_channel_event 函數(shù)將接收頻道消息的套接字(對于 worker 即為channel[1])添加到 epoll 中,當接收到父進程消息時子進程會通過 epoll 的事件回調(diào)相應的 handler 方法來處理這個頻道消息,如下:
ngx_int_t ngx_add_channel_event(ngx_cycle_t *cycle, ngx_fd_t fd, ngx_int_t event, ngx_event_handler_pt handler) {ngx_event_t *ev, *rev, *wev;ngx_connection_t *c;// 獲取一個空閑連接c = ngx_get_connection(fd, cycle->log);if (c == NULL) {return NGX_ERROR;}c->pool = cycle->pool;rev = c->read;wev = c->write;rev->log = cycle->log;wev->log = cycle->log;rev->channel = 1;wev->channel = 1;ev = (event == NGX_READ_EVENT) ? rev : wev;// 初始化監(jiān)聽該 ev 事件時調(diào)用的回調(diào)函數(shù)ev->handler = handler;// 將該接收頻道消息的套接字添加到 epoll 中if (ngx_add_conn && (ngx_event_flags && NGX_USE_EPOLL_EVENT) == 0) {// 這里是同時監(jiān)聽該套接字的讀、寫事件if (ngx_add_conn(c) == NGX_ERROR) {ngx_free_connection(c);return NGX_ERROR;}} else {// 這里是僅監(jiān)聽 ev 事件if (ngx_add_event(ev, event, 0) == NGX_ERROR) {ngx_free_connection(c);return NGX_ERROR;}}return NGX_OK; }4. ngx_close_channel: 關閉這個頻道通信方式
void ngx_close_channel(ngx_fd_t *fd, ngx_log_t *log) {if (close(fd[0]) == -1) {}if (close(fd[1]) == -1) {} }總結
以上是生活随笔為你收集整理的Nginx之进程间的通信机制-Channel的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: nginx的函数调用
- 下一篇: TomcatNginx源码笔记分析