日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

【Redis学习笔记】2018-06-12 复制与传播

發布時間:2025/3/20 数据库 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【Redis学习笔记】2018-06-12 复制与传播 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

順風車運營研發團隊 譚淼
一、復制

Redis的主從同步復制包括兩種方式:一種是完全復制,另一種是部分復制。

完全復制:主Redis生產RDB,傳輸到從服務器進行同步。

部分復制:主Redis從復制積壓緩沖區中獲取數據,發送給從服務器進行同步。

假設兩臺Redis服務器A和B啟動后,對B執行slaveof命令,使得B變為A的從服務器,整體流程如下:

(1)B接收到slaveof命令時候,會執行slaveofCommand()函數,slaveofCommand()函數會調用replicationSetMaster()函數中,將復制狀態為REPL_STATE_CONNECT

/* Set replication to the specified master address and port. */ void replicationSetMaster(char *ip, int port) {......server.repl_state = REPL_STATE_CONNECT;server.repl_down_since = 0; }

(2)周期調度的時間事件中,會定期執行replicationCron()函數

/* This is our timer interrupt, called server.hz times per second.* Here is where we do a number of things that need to be done asynchronously.* For instance:** - Active expired keys collection (it is also performed in a lazy way on* lookup).* - Software watchdog.* - Update some statistic.* - Incremental rehashing of the DBs hash tables.* - Triggering BGSAVE / AOF rewrite, and handling of terminated children.* - Clients timeout of different kinds.* - Replication reconnection.* - Many more...** Everything directly called here will be called server.hz times per second,* so in order to throttle execution of things we want to do less frequently* a macro is used: run_with_period(milliseconds) { .... }*/int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {....../* Replication cron function -- used to reconnect to master,* detect transfer failures, start background RDB transfers and so forth. */run_with_period(1000) replicationCron();...... }

(3)在replicationCron()函數中,如果檢測到REPL_STATE_CONNECT狀態,調用connectWithMaster()。

/* --------------------------- REPLICATION CRON ---------------------------- *//* Replication cron function, called 1 time per second. */ void replicationCron(void) {....../* Check if we should connect to a MASTER */if (server.repl_state == REPL_STATE_CONNECT) {serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",server.masterhost, server.masterport);if (connectWithMaster() == C_OK) {serverLog(LL_NOTICE,"MASTER <-> SLAVE sync started");}}...... }

(4)在connectWithMaster()中,會設置文件事件,事件處理函數為syncWithMaster()函數

int connectWithMaster(void) {int fd;fd = anetTcpNonBlockBestEffortBindConnect(NULL,server.masterhost,server.masterport,NET_FIRST_BIND_ADDR);if (fd == -1) {serverLog(LL_WARNING,"Unable to connect to MASTER: %s",strerror(errno));return C_ERR;}if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==AE_ERR){close(fd);serverLog(LL_WARNING,"Can't create readable event for SYNC");return C_ERR;}server.repl_transfer_lastio = server.unixtime;server.repl_transfer_s = fd;server.repl_state = REPL_STATE_CONNECTING;return C_OK; }

(5)在syncWithMaster中,從Redis首先會與主Redis進行通信,交換關鍵信息

(6)master根接收到PSYN消息后,根據復制ID如果復制ID與自己的復制ID相同且復制偏移量仍然存在于復制緩存區中(server.repl_backlog),那么執行部分重同步,回復CONTINUE消息,并從復制緩存區中復制相關的數據到slave。否則執行全量重同步,回復FULLRESYNC消息,生成RDB傳輸到slave。

二、命令傳播

當在主Redis寫一條命令時,會調用server.c中的call()函數,call()函數會調用propagate()來向從Redis更新數據

/* Call() is the core of Redis execution of a command.** The following flags can be passed:* CMD_CALL_NONE No flags.* CMD_CALL_SLOWLOG Check command speed and log in the slow log if needed.* CMD_CALL_STATS Populate command stats.* CMD_CALL_PROPAGATE_AOF Append command to AOF if it modified the dataset* or if the client flags are forcing propagation.* CMD_CALL_PROPAGATE_REPL Send command to salves if it modified the dataset* or if the client flags are forcing propagation.* CMD_CALL_PROPAGATE Alias for PROPAGATE_AOF|PROPAGATE_REPL.* CMD_CALL_FULL Alias for SLOWLOG|STATS|PROPAGATE.** The exact propagation behavior depends on the client flags.* Specifically:** 1. If the client flags CLIENT_FORCE_AOF or CLIENT_FORCE_REPL are set* and assuming the corresponding CMD_CALL_PROPAGATE_AOF/REPL is set* in the call flags, then the command is propagated even if the* dataset was not affected by the command.* 2. If the client flags CLIENT_PREVENT_REPL_PROP or CLIENT_PREVENT_AOF_PROP* are set, the propagation into AOF or to slaves is not performed even* if the command modified the dataset.** Note that regardless of the client flags, if CMD_CALL_PROPAGATE_AOF* or CMD_CALL_PROPAGATE_REPL are not set, then respectively AOF or* slaves propagation will never occur.** Client flags are modified by the implementation of a given command* using the following API:** forceCommandPropagation(client *c, int flags);* preventCommandPropagation(client *c);* preventCommandAOF(client *c);* preventCommandReplication(client *c);**/ void call(client *c, int flags) {....../* Propagate the command into the AOF and replication link */if (flags & CMD_CALL_PROPAGATE &&(c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP){......if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);}...... }

propagate()會調用replicationFeedSlaves(),向復制積壓緩沖區中寫入數據

/* Propagate the specified command (in the context of the specified database id)* to AOF and Slaves.** flags are an xor between:* + PROPAGATE_NONE (no propagation of command at all)* + PROPAGATE_AOF (propagate into the AOF file if is enabled)* + PROPAGATE_REPL (propagate into the replication link)** This should not be used inside commands implementation. Use instead* alsoPropagate(), preventCommandPropagation(), forceCommandPropagation().*/ void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,int flags) {if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)feedAppendOnlyFile(cmd,dbid,argv,argc);if (flags & PROPAGATE_REPL)replicationFeedSlaves(server.slaves,dbid,argv,argc); }

replicationFeedSlaves()負責向復制積壓緩沖區中寫入數據

/* Propagate write commands to slaves, and populate the replication backlog* as well. This function is used if the instance is a master: we use* the commands received by our clients in order to create the replication* stream. Instead if the instance is a slave and has sub-slaves attached,* we use replicationFeedSlavesFromMaster() */ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {....../* Write the command to the replication backlog if any. */if (server.repl_backlog) {char aux[LONG_STR_SIZE+3];/* Add the multi bulk reply length. */aux[0] = '*';len = ll2string(aux+1,sizeof(aux)-1,argc);aux[len+1] = '\r';aux[len+2] = '\n';feedReplicationBacklog(aux,len+3);for (j = 0; j < argc; j++) {long objlen = stringObjectLen(argv[j]);/* We need to feed the buffer with the object as a bulk reply* not just as a plain string, so create the $..CRLF payload len* and add the final CRLF */aux[0] = '$';len = ll2string(aux+1,sizeof(aux)-1,objlen);aux[len+1] = '\r';aux[len+2] = '\n';feedReplicationBacklog(aux,len+3);feedReplicationBacklogWithObject(argv[j]);feedReplicationBacklog(aux+len+1,2);}}/* Write the command to every slave. */listRewind(slaves,&li);while((ln = listNext(&li))) {client *slave = ln->value;/* Don't feed slaves that are still waiting for BGSAVE to start */if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;/* Feed slaves that are waiting for the initial SYNC (so these commands* are queued in the output buffer until the initial SYNC completes),* or are already in sync with the master. *//* Add the multi bulk length. */addReplyMultiBulkLen(slave,argc);/* Finally any additional argument that was not stored inside the* static buffer if any (from j to argc). */for (j = 0; j < argc; j++)addReplyBulk(slave,argv[j]);} }

總結

以上是生活随笔為你收集整理的【Redis学习笔记】2018-06-12 复制与传播的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。