Redis运行流程源码解析
原文作者:@凡趣科技 pesiwang
原文地址:http://blog.nosqlfan.com/html/4007.html
本文分析源碼基于 Redis 2.4.7 stable 版本。
?
概述
Redis通過定義一個 struct redisServer 類型的全局變量server 來保存服務器的相關信息(比如:配置信息,統計信息,服務器狀態等等)。啟動時通過讀取配置文件里邊的信息對server進行初始化(如果沒有指定配置文件,將使用默認值對sever進行初始化),初始化的內容有:起監聽端口,綁定有新連接時的回調函數,綁定服務器的定時函數,虛擬內存初始化,log初始化等等。
啟動
初始化服務器配置
先來看看redis 的main函數的入口
Redis.c:1694
int main(int argc, char **argv) {time_t start;initServerConfig();if (argc == 2) {if (strcmp(argv[1], "-v") == 0 ||strcmp(argv[1], "--version") == 0) version();if (strcmp(argv[1], "--help") == 0) usage();resetServerSaveParams();loadServerConfig(argv[1]);} else if ((argc > 2)) {usage();} else {...}if (server.daemonize) daemonize();initServer();...
- initServerConfig初始化全局變量 server 的屬性為默認值。
- 如果命令行指定了配置文件, resetServerSaveParams重置對落地備份的配置(即重置為默認值)并讀取配置文件的內容對全局變量 server 再進行初始化 ,沒有在配置文件中配置的將使用默認值。
- 如果服務器配置成后臺執行,則對服務器進行 daemonize。
- initServer初始化服務器,主要是設置信號處理函數,初始化事件輪詢,起監聽端口,綁定有新連接時的回調函數,綁定服務器的定時函數,初始化虛擬內存和log等等。
- 創建服務器監聽端口。
Redis.c:923
if (server.port != 0) {server.ipfd= anetTcpServer(server.neterr,server.port,server.bindaddr);if (server.ipfd == ANET_ERR) {redisLog(REDIS_WARNING, "Opening port %d: %s",server.port, server.neterr);exit(1);}}
- anetTcpServer創建一個socket并進行監聽,然后把返回的socket fd賦值給server.ipfd。
事件輪詢結構體定義
先看看事件輪詢的結構體定義
Ae.h:88
/* State of an event based program */
typedef struct aeEventLoop {int maxfd;long long timeEventNextId;aeFileEvent events[AE_SETSIZE]; /* Registered events */aeFiredEvent fired[AE_SETSIZE]; /* Fired events */aeTimeEvent *timeEventHead;int stop;void *apidata; /* This is used for polling API specific data */aeBeforeSleepProc *beforesleep;
} aeEventLoop;
- maxfd是最大的文件描述符,主要用來判斷是否有文件事件需要處理(ae.c:293)和當使用select 來處理網絡IO時作為select的參數(ae_select.c:50)。
- timeEventNextId 是下一個定時事件的ID。
- events[AE_SETSIZE]用于保存通過aeCreateFileEvent函數創建的文件事件,在sendReplyToClient函數和freeClient函數中通過調用aeDeleteFileEvent函數刪除已經處理完的事件。
- fired[AE_SETSIZE]用于保存已經觸發的文件事件,在對應的網絡I/O函數中進行賦值(epoll,select,kqueue),不會對fired進行刪除操作,只會一直覆蓋原來的值。然后在aeProcessEvents函數中對已經觸發的事件進行處理。
- timeEventHead 是定時事件鏈表的頭,定時事件的存儲用鏈表實現。
- Stop 用于停止事件輪詢處理。
- apidata 用于保存輪詢api需要的數據,即aeApiState結構體,對于epoll來說,aeApiState結構體的定義如下:
typedef struct aeApiState {int epfd;struct epoll_event events[AE_SETSIZE];
} aeApiState;
- beforesleep 是每次進入處理事件時執行的函數。
創建事件輪詢
Redis.c:920
server.el = aeCreateEventLoop();
Ae.c:55
aeEventLoop *aeCreateEventLoop(void) {aeEventLoop *eventLoop;int i;eventLoop = zmalloc(sizeof(*eventLoop));if (!eventLoop) return NULL;eventLoop->timeEventHead = NULL;eventLoop->timeEventNextId = 0;eventLoop->stop = 0;eventLoop->maxfd = -1;eventLoop->beforesleep = NULL;if (aeApiCreate(eventLoop) == -1) {zfree(eventLoop);return NULL;}
/* Events with mask == AE_NONE are not set. So let's initialize* the vector with it. */for (i = 0; i < AE_SETSIZE; i++)eventLoop->events[i].mask = AE_NONE;return eventLoop;
}
綁定定時函數和有新連接時的回調函數
redis.c:973
aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
if (server.ipfd > 0 &&aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR) oom("creating file event");
- aeCreateTimeEvent創建定時事件并綁定回調函數serverCron,這個定時事件第一次是超過1毫秒就有權限執行,如果其他事件的處理時間比較長,可能會出現超過一定時間都沒執行情況。這里的1毫秒只是超過后有可執行的權限,并不是一定會執行。第一次執行后,如果還要執行,是由定時函數的返回值確定的,在processTimeEvents(ae.c:219)中,當調用定時回調函數后,獲取定時回調函數的返回值,如果返回值不等于-1,則設置定時回調函數的下一次觸發時間為當前時間加上定時回調函數的返回值,即調用間隔時間。serverCron的返回值是100ms,表明從二次開始,每超過100ms就有權限執行。(定時回調函數serverCron用于更新lru時鐘,更新服務器的狀態,打印一些服務器信息,符合條件的情況下對hash表進行重哈希,啟動后端寫AOF或者檢查后端寫AOF或者備份是否完成,檢查過期的KEY等等)
- aeCreateFileEvent創建監聽端口的socket fd的文件讀事件(即注冊網絡io事件)并綁定回調函數acceptTcpHandler。
進入事件輪詢
初始化后將進入事件輪詢
Redis.c:1733
aeSetBeforeSleepProc(server.el,beforeSleep);aeMain(server.el);aeDeleteEventLoop(server.el);
- 設置每次進入事件處理前會執行的函數beforeSleep。
- 進入事件輪詢aeMain。
- 退出事件輪詢后刪除事件輪詢,釋放事件輪詢占用內存aeDeleteEventLoop(不過沒在代碼中發現有執行到這一步的可能,服務器接到shutdown命令時通過一些處理后直接就通過exit退出了,可能是我看錯了,待驗證)。
事件輪詢函數aeMain
看看aeMain的內容
Ae.c:382
void aeMain(aeEventLoop *eventLoop) {eventLoop->stop = 0;while (!eventLoop->stop) {if (eventLoop->beforesleep != NULL)eventLoop->beforesleep(eventLoop);aeProcessEvents(eventLoop, AE_ALL_EVENTS);}
}
- 每次進入事件處理前,都會調用設置的beforesleep,beforeSleep函數主要是處理被阻塞的命令和根據配置寫AOF。
- aeProcessEvents處理定時事件和網絡io事件。
啟動完畢,等待客戶端請求
到進入事件輪詢函數后,redis的啟動工作就做完了,接下來就是等待客戶端的請求了。
接收請求
新連接到來時的回調函數
在綁定定時函數和有新連接時的回調函數中說到了綁定有新連接來時的回調函數acceptTcpHandler,現在來看看這個函數的具體內容
Networking.c:427
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {int cport, cfd;char cip[128];REDIS_NOTUSED(el);REDIS_NOTUSED(mask);REDIS_NOTUSED(privdata);cfd = anetTcpAccept(server.neterr, fd, cip, &cport);if (cfd == AE_ERR) {redisLog(REDIS_WARNING,"Accepting client connection: %s", server.neterr);return;}redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);acceptCommonHandler(cfd);
}
- anetTcpAccept 函數 accept新連接,返回的cfd是新連接的socket fd。
- acceptCommonHandler 函數是對新建立的連接進行處理,這個函數在使用 unix socket 時也會被用到。
接收客戶端的新連接
接下來看看anetTcpAccept函數的具體內容
Anet.c:330
int anetTcpAccept(char *err, int s, char *ip, int *port) {int fd;struct sockaddr_in sa;socklen_t salen = sizeof(sa);if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == ANET_ERR)return ANET_ERR;if (ip) strcpy(ip,inet_ntoa(sa.sin_addr));if (port) *port = ntohs(sa.sin_port);return fd;
}
再進去anetGenericAccept 看看
Anet.c:313
static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {int fd;while(1) {fd = accept(s,sa,len);if (fd == -1) {if (errno == EINTR)continue;else {anetSetError(err, "accept: %s", strerror(errno));return ANET_ERR;}}break;}return fd;
}
- anetTcpAccept 函數中調用anetGenericAccept 函數進行接收新連接,anetGenericAccept函數在 unix socket 的新連接處理中也會用到。
- anetTcpAccept 函數接收新連接后,獲取客戶端得ip,port 并返回。
創建redisClient進行接收處理
anetTcpAccept 運行完后,返回新連接的socket fd, 然后返回到調用函數acceptTcpHandler中,繼續執行acceptCommonHandler 函數
Networking.c:403
static void acceptCommonHandler(int fd) {redisClient *c;if ((c = createClient(fd)) == NULL) {redisLog(REDIS_WARNING,"Error allocating resoures for the client");close(fd); /* May be already closed, just ingore errors */return;}/* If maxclient directive is set and this is one client more... close the* connection. Note that we create the client instead to check before* for this condition, since now the socket is already set in nonblocking* mode and we can send an error for free using the Kernel I/O */if (server.maxclients && listLength(server.clients) > server.maxclients) {char *err = "-ERR max number of clients reached\r\n";/* That's a best effort error message, don't check write errors */if (write(c->fd,err,strlen(err)) == -1) {/* Nothing to do, Just to avoid the warning... */}freeClient(c);return;}server.stat_numconnections++;
}
- 創建一個 redisClient 來處理新連接,每個連接都會創建一個 redisClient 來處理。
- 如果配置了最大并發客戶端,則對現有的連接數進行檢查和處理。
- 最后統計連接數。
綁定有數據可讀時的回調函數
Networking.c:15
redisClient *createClient(int fd) {redisClient *c = zmalloc(sizeof(redisClient));c->bufpos = 0;anetNonBlock(NULL,fd);anetTcpNoDelay(NULL,fd);if (aeCreateFileEvent(server.el,fd,AE_READABLE,readQueryFromClient, c) == AE_ERR){close(fd);zfree(c);return NULL;}selectDb(c,0);c->fd = fd;c->querybuf = sdsempty();
c->reqtype = 0;
...
}
- 創建新連接的socket fd對應的文件讀事件,綁定回調函數readQueryFromClient。
- 如果創建成功,則對 redisClient 進行一系列的初始化,因為 redisClient 是通用的,即不管是什么命令的請求,都是通過創建一個 redisClient 來處理的,所以會有比較多的字段需要初始化。
createClient 函數執行完后返回到調用處acceptCommonHandler函數,然后從acceptCommonHandler函數再返回到acceptTcpHandler函數。
接收請求完畢,準備接收客戶端得數據
到此為止,新連接到來時的回調函數acceptTcpHandler執行完畢,在這個回調函數中創建了一個redisClient來處理這個客戶端接下來的請求,并綁定了接收的新連接的讀文件事件。當有數據可讀時,網絡i/o輪詢(比如epoll)會有事件觸發,此時綁定的回調函數readQueryFromClient將會調用來處理客戶端發送過來的數據。
讀取客戶端請求的數據
在綁定有數據可讀時的回調函數中的createClient函數中綁定了一個有數據可讀時的回調函數readQueryFromClient函數,現在看看這個函數的具體內容
Networking.c:874
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {redisClient *c = (redisClient*) privdata;char buf[REDIS_IOBUF_LEN];int nread;REDIS_NOTUSED(el);REDIS_NOTUSED(mask);server.current_client = c;nread = read(fd, buf, REDIS_IOBUF_LEN);if (nread == -1) {if (errno == EAGAIN) {nread = 0;} else {redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));freeClient(c);return;}} else if (nread == 0) {redisLog(REDIS_VERBOSE, "Client closed connection");freeClient(c);return;}if (nread) {c->querybuf = sdscatlen(c->querybuf,buf,nread);c->lastinteraction = time(NULL);} else {server.current_client = NULL;return;}if (sdslen(c->querybuf) > server.client_max_querybuf_len) {sds ci = getClientInfoString(c), bytes = sdsempty();bytes = sdscatrepr(bytes,c->querybuf,64);redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);sdsfree(ci);sdsfree(bytes);freeClient(c);return;}processInputBuffer(c);server.current_client = NULL;
}
- 調用系統函數read來讀取客戶端傳送過來的數據,調用read后對讀取過程中被系統中斷的情況(nread == -1 && errno == EAGAIN),客戶端關閉的情況(nread == 0)進行了判斷處理。
- 如果讀取的數據超過限制(1GB)則報錯。
- 讀取完后進入processInputBuffer進行協議解析。
請求協議
從readQueryFromClient函數讀取客戶端傳過來的數據,進入processInputBuffer函數進行協議解析,可以把processInputBuffer函數看作是輸入數據的協議解析器
Networking.c:835
void processInputBuffer(redisClient *c) {/* Keep processing while there is something in the input buffer */while(sdslen(c->querybuf)) {/* Immediately abort if the client is in the middle of something. */if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;/* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is* written to the client. Make sure to not let the reply grow after* this flag has been set (i.e. don't process more commands). */if (c->flags & REDIS_CLOSE_AFTER_REPLY) return;/* Determine request type when unknown. */if (!c->reqtype) {if (c->querybuf[0] == '*') {c->reqtype = REDIS_REQ_MULTIBULK;} else {c->reqtype = REDIS_REQ_INLINE;}}if (c->reqtype == REDIS_REQ_INLINE) {if (processInlineBuffer(c) != REDIS_OK) break;} else if (c->reqtype == REDIS_REQ_MULTIBULK) {if (processMultibulkBuffer(c) != REDIS_OK) break;} else {redisPanic("Unknown request type");}/* Multibulk processing could see a <= 0 length. */if (c->argc == 0) {resetClient(c);} else {/* Only reset the client when the command was executed. */if (processCommand(c) == REDIS_OK)resetClient(c);}}
}
- Redis支持兩種協議,一種是inline,一種是multibulk。inline協議是老協議,現在一般只在命令行下的redis客戶端使用,其他情況一般是使用multibulk協議。
- 如果客戶端傳送的數據的第一個字符時‘*’,那么傳送數據將被當做multibulk協議處理,否則將被當做inline協議處理。Inline協議的具體解析函數是processInlineBuffer,multibulk協議的具體解析函數是processMultibulkBuffer。
- 當協議解析完畢,即客戶端傳送的數據已經解析出命令字段和參數字段,接下來進行命令處理,命令處理函數是processCommand。
Inline請求協議
Networking.c:679
int processInlineBuffer(redisClient *c) {...
}
- 根據空格分割客戶端傳送過來的數據,把傳送過來的命令和參數保存在argv數組中,把參數個數保存在argc中,argc的值包括了命令參數本身。即set key value命令,argc的值為3。詳細解析見協議詳解
Multibulk請求協議
Multibulk協議比inline協議復雜,它是二進制安全的,即傳送數據可以包含不安全字符。Inline協議不是二進制安全的,比如,如果set key value命令中的key或value包含空白字符,那么inline協議解析時將會失敗,因為解析出來的參數個數與命令需要的的參數個數會不一致。
協議格式
*<number of arguments> CR LF $<number of bytes of argument 1> CR LF <argument data> CR LF ... $<number of bytes of argument N> CR LF <argument data> CR LF
協議舉例
*3 $3 SET $5 mykey $7 myvalue
具體解析代碼位于
Networking.c:731
int processMultibulkBuffer(redisClient *c) {
...
}
詳細解析見協議詳解
處理命令
當協議解析完畢,則表示客戶端的命令輸入已經全部讀取并已經解析成功,接下來就是執行客戶端命令前的準備和執行客戶端傳送過來的命令
Redis.c:1062
/* If this function gets called we already read a whole* command, argments are in the client argv/argc fields.* processCommand() execute the command or prepare the* server for a bulk read from the client.** If 1 is returned the client is still alive and valid and* and other operations can be performed by the caller. Otherwise* if 0 is returned the client was destroied (i.e. after QUIT). */
int processCommand(redisClient *c) {
.../* Now lookup the command and check ASAP about trivial error conditions* such as wrong arity, bad command name and so forth. */
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
...
call(c);
...
}
- lookupCommand先根據客戶端傳送過來的數據查找該命令并找到命令的對應處理函數。
- Call函數調用該命令函數來處理命令,命令與對應處理函數的綁定位于。
Redi.c:72
struct redisCommand *commandTable;
struct redisCommand readonlyCommandTable[] = {
{"get",getCommand,2,0,NULL,1,1,1},
...
}
回復請求
回復請求位于對應的命令中,以get命令為例
T_string.c:67
void getCommand(redisClient *c) {getGenericCommand(c);
}
T_string.c:52
int getGenericCommand(redisClient *c) {robj *o;if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL)return REDIS_OK;if (o->type != REDIS_STRING) {addReply(c,shared.wrongtypeerr);return REDIS_ERR;} else {addReplyBulk(c,o);return REDIS_OK;}
}
- getGenericCommand在getset 命令中也會用到。
- lookupKeyReadOrReply是以讀數據為目的查詢key函數,并且如果該key不存在,則在該函數中做不存在的回包處理。
- 如果該key存在,則返回該key對應的數據,addReply函數以及以addReply函數開頭的都是回包函數。
綁定寫數據的回調函數
接下來看看addReply函數里的內容
Networking.c:190
void addReply(redisClient *c, robj *obj) {if (_installWriteEvent(c) != REDIS_OK) return;...
}
Networking.c:64
int _installWriteEvent(redisClient *c) {if (c->fd <= 0) return REDIS_ERR;if (c->bufpos == 0 && listLength(c->reply) == 0 &&(c->replstate == REDIS_REPL_NONE ||c->replstate == REDIS_REPL_ONLINE) &&aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,sendReplyToClient, c) == AE_ERR) return REDIS_ERR;return REDIS_OK;
}
- addReply函數一進來就先調用綁定寫數據的回調函數installWriteEvent。
- installWriteEvent函數中創建了一個文件寫事件和綁定寫事件的回調函數為sendReplyToClient。
準備寫的數據內容
addReply函數一進來后就綁定寫數據的回調函數,接下來就是準備寫的數據內容
Networking.c:190
void addReply(redisClient *c, robj *obj) {if (_installWriteEvent(c) != REDIS_OK) return;redisAssert(!server.vm_enabled || obj->storage == REDIS_VM_MEMORY);/* This is an important place where we can avoid copy-on-write* when there is a saving child running, avoiding touching the* refcount field of the object if it's not needed.** If the encoding is RAW and there is room in the static buffer* we'll be able to send the object to the client without* messing with its page. */if (obj->encoding == REDIS_ENCODING_RAW) {if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)_addReplyObjectToList(c,obj);} else {/* FIXME: convert the long into string and use _addReplyToBuffer()* instead of calling getDecodedObject. As this place in the* code is too performance critical. */obj = getDecodedObject(obj);if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)_addReplyObjectToList(c,obj);decrRefCount(obj);}
}
- 先嘗試把要返回的內容添加到發送數據緩沖區中(redisClient->buf),如果該緩沖區的大小已經放不下這次想放進去的數據,或者已經有數據在排隊(redisClient->reply 鏈表不為空),則把數據添加到發送鏈表的尾部。
給客戶端答復數據
在綁定寫數據的回調函數中看到綁定了回調函數sendReplyToClient,現在來看看這個函數的主要內容
Networking.c:566
void sendReplyToClient(aeEventLoop *el, int fd, ...) {...
while(c->bufpos > 0 || listLength(c->reply)) {...if(c->bufpos > 0){...nwritten=write(fd,...,c->bufpos-c->sentlen);...} else {o = listNodeValue(listFirst(c->reply));...nwritten=write(fd,...,objlen-c->sentlen);...}}
}
- 通過調用系統函數write給客戶端發送數據,如果緩沖區有數據就把緩沖區的數據發送給客戶端,緩沖區的數據發送完了,如果有排隊數據,則繼續發送。
退出
Redis 服務器的退出是通過shutdown命令來退出的,退出前會做一系列的清理工作
Db.c:347
void shutdownCommand(redisClient *c) {if (prepareForShutdown() == REDIS_OK)exit(0);addReplyError(c,"Errors trying to SHUTDOWN. Check logs.");
}
總結
框架從啟動,接收請求,讀取客戶端數據,請求協議解析,處理命令,回復請求,退出對redis運行的整個流程做了一個梳理。對整個redis的運作和框架有了一個初步的了解。
總結
以上是生活随笔為你收集整理的Redis运行流程源码解析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: redis源码分析--zslRandom
- 下一篇: redis常用命令参考