Redis Source Code Reading (11) – replication
slaveof
Configuring slaveof <masterip> <masterport> in redis.conf, or running the slaveof command, calls replicationSetMaster() to set:
server.masterhost = sdsnew(argv[1]);
server.masterport = atoi(argv[2]);
server.repl_state = REPL_STATE_CONNECT;
After that, replicationCron(), invoked once per second from serverCron(), establishes the connection to the master:
/* Check if we should connect to a MASTER */
if (server.repl_state == REPL_STATE_CONNECT) {
    serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
        server.masterhost, server.masterport);
    if (connectWithMaster() == C_OK) {
        serverLog(LL_NOTICE,"MASTER <-> SLAVE sync started");
    }
}
connectWithMaster() establishes the connection with a non-blocking socket, registers syncWithMaster() as the handler for both the readable and writable file events, and updates repl_state = REPL_STATE_CONNECTING.
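For reference, the body of connectWithMaster() is roughly the following (trimmed from replication.c of the 4.0 era; error logging shortened):

fd = anetTcpNonBlockBestEffortBindConnect(NULL,
        server.masterhost, server.masterport, NET_FIRST_BIND_ADDR);
if (fd == -1) return C_ERR;

/* Register syncWithMaster() for both directions: writability tells us the
 * non-blocking connect finished, readability drives the handshake. */
if (aeCreateFileEvent(server.el, fd, AE_READABLE|AE_WRITABLE,
        syncWithMaster, NULL) == AE_ERR)
{
    close(fd);
    return C_ERR;
}

server.repl_transfer_lastio = server.unixtime;
server.repl_transfer_s = fd;
server.repl_state = REPL_STATE_CONNECTING;
return C_OK;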
The default behavior of connect() is to wait for the peer's SYN + ACK before returning; if the connection cannot be established immediately, it blocks until it either fails or succeeds. A non-blocking connect() instead returns immediately with EINPROGRESS when the connection cannot be completed at once.
To find out whether the connection eventually succeeded, use select(), poll(), or epoll() and wait for the socket to become writable. Because a failed connect makes the socket both readable and writable, you must check SO_ERROR once it is writable: getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len). If the connection succeeded, error is 0; on failure, error holds the corresponding errno, and the socket must be closed and recreated before connecting again. Calling connect() again on the same socket returns EADDRINUSE.
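A minimal standalone sketch of this pattern (not Redis code; the function name is mine):

#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

int nonblock_connect(int fd, const struct sockaddr *addr, socklen_t len,
                     int timeout_ms)
{
    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
    if (connect(fd, addr, len) == 0) return 0;      /* connected at once */
    if (errno != EINPROGRESS) return -1;            /* real error */

    /* Wait for writability: that is how the result is reported. */
    struct pollfd pfd = { .fd = fd, .events = POLLOUT };
    if (poll(&pfd, 1, timeout_ms) <= 0) return -1;  /* timeout or error */

    /* Writable does not mean success: a failed connect also wakes us up,
     * so SO_ERROR must be checked. */
    int err = 0; socklen_t elen = sizeof(err);
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &elen) == -1) return -1;
    if (err) { errno = err; return -1; }  /* caller must close & recreate fd */
    return 0;
}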
The same technique is needed when a blocking connect() is interrupted:
/* syncWithMaster() */

/* Check for errors in the socket: after a non blocking connect() we
 * may find that the socket is in error state. */
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
    sockerr = errno;
if (sockerr) {
    serverLog(LL_WARNING,"Error condition on socket for SYNC: %s",
        strerror(sockerr));
    goto error;
}
Once the connection is established, syncWithMaster() removes the writable event, and master and slave then go through the following exchanges:
- slave sends PING
- if auth is configured, slave sends AUTH
- slave sends REPLCONF listening-port <port>
- if slave-announce-ip is set (e.g. when running inside docker), slave sends REPLCONF ip-address <ip>
- slave sends REPLCONF capa eof capa psync2, mainly for compatibility between versions; the master treats the slave differently according to these capabilities
- PSYNC starts
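Put together, the handshake on the wire looks roughly like this (a hand-written illustration assembled from the steps above, not captured traffic; port and address are placeholders):

slave -> master: PING                           <- +PONG
slave -> master: AUTH <password>                <- +OK   (only if auth is configured)
slave -> master: REPLCONF listening-port 6380   <- +OK
slave -> master: REPLCONF ip-address 10.0.0.2   <- +OK   (only if slave-announce-ip is set)
slave -> master: REPLCONF capa eof capa psync2  <- +OK
slave -> master: PSYNC <replid> <offset>        <- +FULLRESYNC ... or +CONTINUE ...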
PSYNC
Redis uses the pair (Replication ID, offset) to uniquely identify a dataset:

- Replication ID: a 40-character random string server.replid, generated by changeReplicationId() when Redis initializes, identifying the Redis instance
- offset: the offset of the commands already replicated
syncWithMaster() calls slaveTryPartialResynchronization(), which sends PSYNC to synchronize with the master:

- if the slave has a server.cached_master, it sends PSYNC server.cached_master->replid server.cached_master->reploff+1
- otherwise, it sends PSYNC ? -1
On receiving it, the master calls masterTryPartialResynchronization() from syncCommand() to decide whether a partial resync is possible:
/* Is the replication ID of this master the same advertised by the wannabe
 * slave via PSYNC? If the replication ID changed this master has a
 * different replication history, and there is no way to continue.
 *
 * Note that there are two potentially valid replication IDs: the ID1
 * and the ID2. The ID2 however is only valid up to a specific offset. */
if (strcasecmp(master_replid, server.replid) &&
    (strcasecmp(master_replid, server.replid2) ||
     psync_offset > server.second_replid_offset))
{
    /* ... */
    goto need_full_resync;
}

/* We still have the data our slave is asking for? */
if (!server.repl_backlog ||
    psync_offset < server.repl_backlog_off ||
    psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
{
    /* ... */
    goto need_full_resync;
}
A partial resync happens only if the master_replid sent by the slave matches the master's replid (or matches replid2 with an offset no greater than second_replid_offset) and the requested offset falls within the valid data range of the master's repl_backlog.
Only at the PSYNC stage is the client flagged as a slave:

Full sync:

c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
if (server.repl_disable_tcp_nodelay)
    anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
c->repldbfd = -1;
c->flags |= CLIENT_SLAVE;
listAddNodeTail(server.slaves,c);

Partial resync:

c->flags |= CLIENT_SLAVE;
c->replstate = SLAVE_STATE_ONLINE;
c->repl_ack_time = server.unixtime;
c->repl_put_online_on_ack = 0;
listAddNodeTail(server.slaves,c);
Full Sync
Once full sync is decided, the master tries to create an RDB. There are several cases:
- If a disk-type RDB save is in progress and some other slave is in the SLAVE_STATE_WAIT_BGSAVE_END state, copy that slave's output buffer and wait for the same RDB to finish, then send +FULLRESYNC server.replid slave->psync_initial_offset. If no slave is in that state, wait for replicationCron() to trigger a new RDB.
- If a socket-type RDB save is in progress, wait for it to finish, then trigger a new RDB in replicationCron().
- If no RDB save is in progress:
  - socket: wait repl-diskless-sync-delay seconds, so that several slaves can be synced in one pass
  - disk: call startBgsaveForReplication() to start the RDB
disk
The master calls startBgsaveForReplication() for both the disk and the socket RDB types. For the disk type it calls replicationSetupSlaveForFullResync(), which:

- sets slave->psync_initial_offset = server.master_repl_offset
- sets slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END
- sends +FULLRESYNC server.replid server.master_repl_offset
After receiving +FULLRESYNC replid offset, the slave:

- sets server.master_replid = replid
- sets server.master_initial_offset = offset
- creates a temp rdb file and registers readSyncBulkPayload() as the readable event handler in place of syncWithMaster()
- updates server.repl_state = REPL_STATE_TRANSFER
When the master's RDB is done, every slave waiting for it is updated to SLAVE_STATE_SEND_BULK and sendBulkToSlave() is registered as the write event handler:
/* serverCron()->backgroundSaveDoneHandler()->backgroundSaveDoneHandlerDisk()
 *            ->updateSlavesWaitingBgsave() */
if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
    redis_fstat(slave->repldbfd,&buf) == -1) {
    freeClient(slave);
    serverLog(LL_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
    continue;
}
slave->repldboff = 0;
slave->repldbsize = buf.st_size;
slave->replstate = SLAVE_STATE_SEND_BULK;
slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
    (unsigned long long) slave->repldbsize);

aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
    freeClient(slave);
    continue;
}
The master's write handler sendBulkToSlave() first sends slave->replpreamble (the size of the RDB file). To avoid blocking, each invocation then sends at most PROTO_IOBUF_LEN bytes. When the whole file has been sent, it calls putSlaveOnline(), which updates slave->replstate = SLAVE_STATE_ONLINE and registers sendReplyToClient() as the write handler.
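Condensed, the per-event body of sendBulkToSlave() looks roughly like this (simplified; the preamble handling is omitted):

char buf[PROTO_IOBUF_LEN];
ssize_t nwritten, buflen;

/* Send at most PROTO_IOBUF_LEN bytes per writable event, so a slow
 * slave cannot block the event loop. */
lseek(slave->repldbfd, slave->repldboff, SEEK_SET);
buflen = read(slave->repldbfd, buf, PROTO_IOBUF_LEN);
if (buflen <= 0) { freeClient(slave); return; }
if ((nwritten = write(fd, buf, buflen)) == -1) {
    if (errno != EAGAIN) freeClient(slave);
    return;
}
slave->repldboff += nwritten;
if (slave->repldboff == slave->repldbsize) {
    close(slave->repldbfd);
    slave->repldbfd = -1;
    aeDeleteFileEvent(server.el, slave->fd, AE_WRITABLE);
    putSlaveOnline(slave); /* SLAVE_STATE_ONLINE + sendReplyToClient() */
}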
The slave's read handler readSyncBulkPayload() handles both the disk and the socket kinds of full sync. Mirroring sendBulkToSlave(), it first reads the replpreamble and stores it in server.repl_transfer_size, then reads at most 4096 bytes at a time, writing them to server.repl_transfer_fd. When everything has been received, it does the following:
- emptyDb(-1...): flush all databases
- delete the read event
- rename(server.repl_transfer_tempfile, server.rdb_filename), then call rdbLoad()
- call replicationCreateMasterClient() to create server.master
- update server.repl_state = REPL_STATE_CONNECTED
- set server.replid to the master's replid
- clear replid2
- create the repl_backlog
At this point, full sync is finished.
socket
The disk type has to create the RDB file first, and every send requires an lseek() and a read() before writing to the slave. When the network is faster than the disk, you can set repl-diskless-sync yes to use the socket-type (diskless) full sync.
A socket-type RDB cannot be reused for other slaves the way a disk RDB can, so in order to serve several slaves at once, the master waits repl-diskless-sync-delay seconds:
/* replicationCron() */
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
    client *slave = ln->value;

    if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
        idle = server.unixtime - slave->lastinteraction;
        if (idle > max_idle) max_idle = idle;
        slaves_waiting++;
        mincapa = (mincapa == -1) ? slave->slave_capa :
                                    (mincapa & slave->slave_capa);
    }
}

if (slaves_waiting &&
    (!server.repl_diskless_sync ||
     max_idle > server.repl_diskless_sync_delay))
{
    /* Start the BGSAVE. The called function may start a
     * BGSAVE with socket target or disk target depending on the
     * configuration and slaves capabilities. */
    startBgsaveForReplication(mincapa);
}
Unlike the normal RDB flow, the rio object is initialized with a set of sockets rather than a file; each rioWrite()->rioFdsetWrite() call writes to every socket, and the write is considered successful as long as at least one socket succeeds.
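A simplified sketch of the rioFdsetWrite() idea (not the real code; the function name and parameters are mine): write the buffer to every slave socket, remember per-socket errors, and treat the call as successful if at least one socket took the data. Failed slaves are dealt with after the RDB child exits.

#include <errno.h>
#include <unistd.h>

static int fdset_write(int *fds, int *state, int numfds,
                       const void *buf, size_t len)
{
    int ok = 0;
    for (int j = 0; j < numfds; j++) {
        if (state[j] != 0) continue;            /* already failed, skip */
        ssize_t n = write(fds[j], buf, len);    /* blocking + SO_SNDTIMEO */
        if (n != (ssize_t)len)
            state[j] = (n == -1) ? errno : EIO; /* mark this slave broken */
        else
            ok++;
    }
    return ok > 0 ? 0 : -1;                     /* one success is enough */
}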
To keep the code simple, each socket is put into blocking mode with a send timeout:
/* Put the socket in blocking mode to simplify RDB transfer.
* We'll restore it when the children returns (since duped socket
* will share the O_NONBLOCK attribute with the parent). */
anetBlock(NULL,slave->fd);
anetSendTimeout(NULL,slave->fd,server.repl_timeout*1000);
I did not know about the SO_SNDTIMEO option before, so it is worth noting: when a send times out, it fails with EWOULDBLOCK:
/* With blocking sockets, which is the sole user of this
 * rio target, EWOULDBLOCK is returned only because of
 * the SO_SNDTIMEO socket option, so we translate the error
 * into one more recognizable by the user. */

/* Set the socket send timeout (SO_SNDTIMEO socket option) to the specified
 * number of milliseconds, or disable it if the 'ms' argument is zero. */
int anetSendTimeout(char *err, int fd, long long ms) {
    struct timeval tv;

    tv.tv_sec = ms/1000;
    tv.tv_usec = (ms%1000)*1000;
    if (setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) == -1) {
        anetSetError(err, "setsockopt SO_SNDTIMEO: %s", strerror(errno));
        return ANET_ERR;
    }
    return ANET_OK;
}
The RDB stream format also changes. Since the total size is not known up front as with a file, a prefix and a suffix are added so the end of the stream can be detected:

prefix: $EOF:<40 bytes unguessable hex string>\r\n
suffix: <40 bytes unguessable hex string>
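The end-of-stream check can be implemented with a rolling 40-byte window, in the spirit of what readSyncBulkPayload() does; a standalone sketch (types and names are mine):

#include <string.h>

#define RDB_EOF_MARK_SIZE 40

typedef struct {
    char mark[RDB_EOF_MARK_SIZE]; /* delimiter parsed from the $EOF: preamble */
    char last[RDB_EOF_MARK_SIZE]; /* rolling window of the last 40 bytes seen */
    size_t seen;                  /* total payload bytes seen so far */
} eof_detector;

/* Feed one byte; returns 1 once the trailing delimiter has arrived. */
static int eof_feed(eof_detector *d, char byte) {
    memmove(d->last, d->last+1, RDB_EOF_MARK_SIZE-1);
    d->last[RDB_EOF_MARK_SIZE-1] = byte;
    d->seen++;
    return d->seen >= RDB_EOF_MARK_SIZE &&
           memcmp(d->last, d->mark, RDB_EOF_MARK_SIZE) == 0;
}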
The slave-side flow is the same as the disk case. The exit of the RDB child process marks the end of a socket-type full sync: backgroundSaveDoneHandlerSocket() learns each slave's result through a pipe, then restores the slave's socket settings or closes the connection, and sets the slave state:
/* Note: we wait for a REPLCONF ACK message from slave in
* order to really put it online (install the write handler
* so that the accumulated data can be transfered). However
* we change the replication state ASAP, since our slave
* is technically online now. */
slave->replstate = SLAVE_STATE_ONLINE;
slave->repl_put_online_on_ack = 1;
slave->repl_ack_time = server.unixtime; /* Timeout otherwise. */
Partial Resync
Partial resync only needs to send the commands the slave is missing. The master does the following:

- sets the slave state:

  c->flags |= CLIENT_SLAVE;
  c->replstate = SLAVE_STATE_ONLINE;
  c->repl_ack_time = server.unixtime;
  c->repl_put_online_on_ack = 0;
  listAddNodeTail(server.slaves,c);

- sends +CONTINUE server.replid
- sends the missing part of the repl_backlog
After receiving +CONTINUE, the slave updates its state and registers events:

- replid2 = cached_master.replid, i.e. the previous master's id
- replid is updated to the current master's id
- server.master is created and readQueryFromClient() is registered as the read event handler, so the incremental write commands the master sends are handled as if from a normal client
Writes during sync
During sync, the master keeps receiving write commands that need to be forwarded to the slave. Full sync and partial resync behave slightly differently here. In both cases the master appends the slave to server.slaves when sync starts, but with different states:

- full sync: SLAVE_STATE_WAIT_BGSAVE_*
- partial resync: SLAVE_STATE_ONLINE
When the master receives a write, it calls propagate()->replicationFeedSlaves() to send the command to every slave. The actual buffering works just like for a normal client: prepareClientToWrite() is called first:
/* addReply()->prepareClientToWrite() */

/* Schedule the client to write the output buffers to the socket only
 * if not already done (there were no pending writes already and the client
 * was yet not flagged), and, for slaves, if the slave can actually
 * receive writes at this stage. */
if (!clientHasPendingReplies(c) &&
    !(c->flags & CLIENT_PENDING_WRITE) &&
    (c->replstate == REPL_STATE_NONE ||
     (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
{
    /* Here instead of installing the write handler, we just flag the
     * client and put it into a list of clients that have something
     * to write to the socket. This way before re-entering the event
     * loop, we can try to directly write to the client sockets avoiding
     * a system call. We'll only really install the write handler if
     * we'll not be able to write the whole reply at once. */
    c->flags |= CLIENT_PENDING_WRITE;
    listAddNodeHead(server.clients_pending_write,c);
}
As the code shows, only a slave in the ONLINE state actually has data sent to it; in the other states the commands are only written into the buffer. So a full sync must actively push the commands accumulated in the buffer to the slave; otherwise, if no further writes arrived, the slave's data would lag behind forever:

- disk: after the RDB has been sent completely, putSlaveOnline() updates slave->replstate = SLAVE_STATE_ONLINE and registers the write event sendReplyToClient(), which flushes the buffered data to the slave
- socket: updates slave->replstate = SLAVE_STATE_ONLINE and sets slave->repl_put_online_on_ack = 1 (it is 0 for disk, and 0 for partial resync as well). The slave later sends REPLCONF ACK reploff from replicationCron(), and only then does the master call putSlaveOnline(). Why the extra step? I have not figured out the reason.
Once sync completes (whether full sync or partial resync), the master behaves like an ordinary client from the slave's point of view: the slave reads write commands from it, keeping the datasets consistent (eventual consistency).
Incremental data
In short, it works much like the AOF mechanism: when the master receives a write, it calls propagate()->replicationFeedSlaves() to append the command to server.repl_backlog and send it to every slave.
server.repl_backlog is a circular buffer whose size is configured with repl-backlog-size (1mb by default):
/* Add data to the replication backlog.
 * This function also increments the global replication offset stored at
 * server.master_repl_offset, because there is no case where we want to feed
 * the backlog without incrementing the offset. */
void feedReplicationBacklog(void *ptr, size_t len) {
    unsigned char *p = ptr;

    server.master_repl_offset += len;

    /* This is a circular buffer, so write as much data we can at every
     * iteration and rewind the "idx" index if we reach the limit. */
    while(len) {
        size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
        if (thislen > len) thislen = len;
        memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
        server.repl_backlog_idx += thislen;
        if (server.repl_backlog_idx == server.repl_backlog_size)
            server.repl_backlog_idx = 0;
        len -= thislen;
        p += thislen;
        server.repl_backlog_histlen += thislen;
    }
    if (server.repl_backlog_histlen > server.repl_backlog_size)
        server.repl_backlog_histlen = server.repl_backlog_size;

    /* Set the offset of the first byte we have in the backlog. */
    server.repl_backlog_off = server.master_repl_offset -
                              server.repl_backlog_histlen + 1;
}
From the code above that writes into server.repl_backlog we can see:

- server.master_repl_offset: total number of bytes ever written into the repl_backlog
- server.repl_backlog_idx: position where the next byte will be written
- server.repl_backlog_histlen: length of the valid data in the buffer
- server.repl_backlog_off: global offset of the first valid byte
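To make the bookkeeping concrete, here is a standalone sketch (types and names are mine) of how a reader such as addReplyReplicationBacklog() can locate a global offset inside the circular buffer using exactly these fields:

#include <stddef.h>
#include <string.h>

typedef struct {
    char *buf;
    size_t size;    /* repl_backlog_size */
    size_t idx;     /* next write position */
    long long off;  /* global offset of the first valid byte */
    size_t histlen; /* valid bytes in the buffer */
} backlog;

/* Copy up to dstlen bytes starting at global offset 'from' into dst.
 * Returns bytes copied, or -1 if 'from' is outside the valid range. */
static long backlog_read(backlog *b, long long from, char *dst, size_t dstlen) {
    if (from < b->off || from > b->off + (long long)b->histlen) return -1;
    size_t skip = (size_t)(from - b->off);
    /* Position of the oldest byte, advanced by 'skip', both modulo size. */
    size_t pos = (b->idx + (b->size - b->histlen) + skip) % b->size;
    size_t len = b->histlen - skip;
    if (len > dstlen) len = dstlen;
    for (size_t copied = 0; copied < len; ) {
        size_t chunk = b->size - pos;            /* bytes before wrap-around */
        if (chunk > len - copied) chunk = len - copied;
        memcpy(dst + copied, b->buf + pos, chunk);
        copied += chunk;
        pos = (pos + chunk) % b->size;
    }
    return (long)len;
}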
When the slave receives commands from the master, readQueryFromClient() checks whether the client is the master, and:

- updates c->reploff
- calls replicationFeedSlavesFromMasterStream(), which:
  - calls feedReplicationBacklog(), updating server.master_repl_offset
  - acts as a relay, forwarding the data received from the master to its own slaves (chained slaves)

replicationCron()
replicationCron() is called once per second from serverCron() to establish the connection and handle some periodic tasks:

- timeouts: connect timeout, RDB transfer timeout, master PING timeout (see the excerpt after this list)
- when server.repl_state == REPL_STATE_CONNECT, establish the connection: a non-blocking connect, registering syncWithMaster() for the readable and writable file events and setting server.repl_state = REPL_STATE_CONNECTING
- the slave sends REPLCONF ACK; the master uses it to update slave->repl_ack_off, which the WAIT command relies on to implement synchronous replication, and slave->repl_ack_time, which is used for timeout detection
- the master sends PING periodically (repl-ping-slave-period), so the slave can detect a master timeout via server.master.lastinteraction
- disconnect timed-out slaves
- free the repl_backlog according to repl-backlog-ttl
- start the RDB save for waiting slaves
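For reference, the slave-side timeout checks at the top of replicationCron() look roughly like this (trimmed; log messages abbreviated):

/* Non blocking connection timeout? */
if (server.masterhost &&
    (server.repl_state == REPL_STATE_CONNECTING ||
     slaveIsInHandshakeState()) &&
    (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
{
    cancelReplicationHandshake();
}

/* Bulk transfer I/O timeout? */
if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER &&
    (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
{
    cancelReplicationHandshake();
}

/* Timed out master when we are an already connected slave? */
if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED &&
    (time(NULL)-server.master->lastinteraction) > server.repl_timeout)
{
    freeClient(server.master);
}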
When a timeout occurs or the socket is closed, freeClient() releases the connection; master and slave act differently:

- master: closes the connection and frees the client; the slave will read 0 (EOF)
- slave: before freeing the master, it calls replicationCacheMaster() to save the master's state into server.cached_master, then calls replicationHandleMasterDisconnection() to set the state back to REPL_STATE_CONNECT; it will try to reconnect later, sending the server.cached_master state to attempt a partial resync
Notice that once the connection has been established, the master never re-examines the slave's reploff to decide whether a full sync is needed. Redis relies on TCP's "reliability": either the connection times out or reports an error, otherwise the data is assumed to reach the slave.
Restart
rdbPopulateSaveInfo() returns non-NULL in the following cases:

- the instance is a master
- the instance is a slave
- there is a cached_master
rdbSaveInfoAuxFields() writes the rsi fields into the RDB so that a partial resync is possible after a restart:
if (rsi) {
    if (rdbSaveAuxFieldStrInt(rdb,"repl-stream-db",rsi->repl_stream_db)
        == -1) return -1;
    if (rdbSaveAuxFieldStrStr(rdb,"repl-id",server.replid)
        == -1) return -1;
    if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",server.master_repl_offset)
        == -1) return -1;
}
At startup, if slaveof is configured, the server state is set:
server.masterhost = sdsnew(argv[1]);
server.masterport = atoi(argv[2]);
server.repl_state = REPL_STATE_CONNECT;
loadDataFromDisk() reads the rsi from the RDB to restore the replication info and creates a cached_master from the current state; the subsequent syncWithMaster() will send the cached_master info to attempt a resync:
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (rdbLoad(server.rdb_filename,&rsi) == C_OK) {
    serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
        (float)(ustime()-start)/1000000);

    /* Restore the replication ID / offset from the RDB file. */
    if (server.masterhost &&
        rsi.repl_id_is_set &&
        rsi.repl_offset != -1 &&
        /* Note that older implementations may save a repl_stream_db
         * of -1 inside the RDB file in a wrong way, see more information
         * in function rdbPopulateSaveInfo. */
        rsi.repl_stream_db != -1)
    {
        memcpy(server.replid,rsi.repl_id,sizeof(server.replid));
        server.master_repl_offset = rsi.repl_offset;
        /* If we are a slave, create a cached master from this
         * information, in order to allow partial resynchronizations
         * with masters. */
        replicationCacheMasterUsingMyself();
        selectDb(server.cached_master,rsi.repl_stream_db);
    }
}
failover
Before Redis 4.0, a master-slave failover required a full resync, which is costly, even though in most cases the datasets are identical and resyncing is unnecessary. Redis 4.0 solved this problem.
Suppose A is the master and B is the slave, and we swap their roles (the concrete commands are shown below):

- Send slaveof no one to B. B calls shiftReplicationId():

  memcpy(server.replid2,server.replid,sizeof(server.replid));
  server.second_replid_offset = server.master_repl_offset+1;
  changeReplicationId();

  So server.replid2 and server.second_replid_offset record the replication progress with the previous master, and server.repl_backlog still holds the data streamed from that master, which enables a later partial resync.

- Send slaveof B.ip B.port to A. A calls replicationSetMaster()->replicationCacheMasterUsingMyself(), creating a cached_master from its own state for the upcoming sync:

  server.cached_master.reploff = server.master_repl_offset;
  server.cached_master.replid = server.master_replid;
Recall the code where the slave sends PSYNC and the master checks whether a partial resync is possible:

slave:

if (server.cached_master) {
    psync_replid = server.cached_master->replid;
    snprintf(psync_offset,sizeof(psync_offset),"%lld",
        server.cached_master->reploff+1);
    serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).",
        psync_replid, psync_offset);
}

master:

if (strcasecmp(master_replid, server.replid) &&
    (strcasecmp(master_replid, server.replid2) ||
     psync_offset > server.second_replid_offset))
{
    /* ... */
    goto need_full_resync;
}

/* We still have the data our slave is asking for? */
if (!server.repl_backlog ||
    psync_offset < server.repl_backlog_off ||
    psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
{
    /* ... */
    goto need_full_resync;
}
So A reconnects to B advertising its own previous state, and B uses its record of A's state to decide whether a full sync is required. Of course, if many writes arrive in between and overwrite B's repl_backlog, a full sync still happens.
Note that with this style of failover, writes that reach the old master between the two slaveof commands may be lost, and may also force a full sync. Cluster mode's manual failover rejects traffic to the master while the switch is in progress.
expire
A slave does not actively expire keys. Before calling activeExpireCycle() it checks whether a master exists, and skips the cycle if so. Likewise, expireIfNeeded() only reports the logical expiration and does not delete the key:
/* If we are running in the context of a slave, return ASAP:
* the slave key expiration is controlled by the master that will
* send us synthesized DEL operations for expired keys.
*
* Still we try to return the right information to the caller,
* that is, 0 if we think the key should be still valid, 1 if
* we think the key is expired at this time. */
if (server.masterhost != NULL) return now > when;
The master calls propagateExpire() to send the corresponding DEL to its slaves to stay consistent. Since the DEL also has to look up the key, the slave treats the master's client specially:
robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
    robj *val;

    if (expireIfNeeded(db,key) == 1) {
        /* Key expired. If we are in the context of a master, expireIfNeeded()
         * returns 0 only when the key does not exist at all, so it's safe
         * to return NULL ASAP. */
        if (server.masterhost == NULL) return NULL;

        /* However if we are in the context of a slave, expireIfNeeded() will
         * not really try to expire the key, it only returns information
         * about the "logical" status of the key: key expiring is up to the
         * master in order to have a consistent view of master's data set.
         *
         * However, if the command caller is not the master, and as additional
         * safety measure, the command invoked is a read-only command, we can
         * safely return NULL here, and provide a more consistent behavior
         * to clients accessign expired values in a read-only fashion, that
         * will say the key as non exisitng.
         *
         * Notably this covers GETs when slaves are used to scale reads. */
        if (server.current_client &&
            server.current_client != server.master &&
            server.current_client->cmd &&
            server.current_client->cmd->flags & CMD_READONLY)
        {
            return NULL;
        }
    }
    val = lookupKey(db,key,flags);
    if (val == NULL)
        server.stat_keyspace_misses++;
    else
        server.stat_keyspace_hits++;
    return val;
}
evict
A slave still performs eviction itself, and the master also propagate()s the keys it evicts to the slave. This carries some risk:

- when the slave and the master have the same maxmemory and the slave is readonly: all writes come from the master, so the slave never evicts on its own and the datasets stay consistent
- when the slave's maxmemory is smaller than the master's, the datasets will diverge
Of course, there is no reason to configure different maxmemory values.
readonly
A slave is readonly by default; separating reads from writes can raise QPS:
/* Don't accept write commands if this is a read only slave. But
 * accept write commands if this is our master. */
if (server.masterhost && server.repl_slave_ro &&
    !(c->flags & CLIENT_MASTER) &&
    c->cmd->flags & CMD_WRITE)
{
    addReply(c, shared.roslaveerr);
    return C_OK;
}
A slave can also be configured writable, for example to run expensive operations on it; the master and slave datasets may then diverge until a restart or reconnection triggers a new sync.
Before 4.0, a writable slave could not expire the keys it wrote itself, because a slave only waits for the master to propagate expirations. This leaked keys: they could no longer be read, yet they still existed. Redis 4.0 keeps the keys written on a slave in dict *slaveKeysWithExpire and expires them separately.