Redis Source Code Reading (11) – replication
slaveof
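Configuring slaveof <masterip> <masterport> in redis.conf, or issuing the SLAVEOF command (which goes through replicationSetMaster()), sets the master address and moves the replication state to REPL_STATE_CONNECT: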
server.masterhost = sdsnew(argv[1]);
server.masterport = atoi(argv[2]);
server.repl_state = REPL_STATE_CONNECT;
From then on, replicationCron(), which serverCron() calls once per second, connects to the master:
/* Check if we should connect to a MASTER */
if (server.repl_state == REPL_STATE_CONNECT) {
serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
server.masterhost, server.masterport);
if (connectWithMaster() == C_OK) {
serverLog(LL_NOTICE,"MASTER <-> SLAVE sync started");
}
}
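connectWithMaster() connects over a non-blocking socket, registers syncWithMaster() as a file event for both the readable and writable conditions, and sets repl_state = REPL_STATE_CONNECTING.
By default, connect() returns only after the peer's SYN + ACK arrives. If the connection cannot be established immediately, it blocks until it either succeeds or fails. A non-blocking connect() instead returns immediately with EINPROGRESS when the connection cannot be completed right away.
To find out whether the connection was established, monitor the socket for writability with select(), poll() or epoll(). However, a connection error makes the socket both readable and writable. So when the socket becomes writable, you must check SO_ERROR with getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len). The error is 0 on success, or the corresponding error code on failure. After a failure, close the socket and create a new one before connecting again. Calling connect() again on the same socket is not a reliable retry (it may fail, e.g. with EADDRINUSE).
The same check is needed when a blocking connect() is interrupted by a signal. syncWithMaster() performs it like this: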
/* syncWithMaster() */
/* Check for errors in the socket: after a non blocking connect() we
* may find that the socket is in error state. */
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
sockerr = errno;
if (sockerr) {
serverLog(LL_WARNING,"Error condition on socket for SYNC: %s",
strerror(sockerr));
goto error;
}
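The non-blocking connect pattern described above can be sketched outside Redis as follows. This assumes POSIX sockets and IPv4. connect_nonblock() and wait_connected() are hypothetical helpers, not Redis's anet functions. Redis waits on its own event loop rather than calling poll().

```c
#define _POSIX_C_SOURCE 200809L
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <poll.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/* Start a non-blocking TCP connect to ip:port (hypothetical helper).
 * Returns the fd (connected or still in progress), or -1 on immediate failure. */
int connect_nonblock(const char *ip, int port) {
    struct sockaddr_in sa;
    memset(&sa, 0, sizeof(sa));
    sa.sin_family = AF_INET;
    sa.sin_port = htons((unsigned short)port);
    if (inet_pton(AF_INET, ip, &sa.sin_addr) != 1) return -1;

    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd == -1) return -1;
    int flags = fcntl(fd, F_GETFL);
    if (flags == -1 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
        close(fd);
        return -1;
    }
    if (connect(fd, (struct sockaddr *)&sa, sizeof(sa)) == -1 &&
        errno != EINPROGRESS) {
        close(fd);              /* failed right away, e.g. ECONNREFUSED */
        return -1;
    }
    return fd;                  /* connected, or EINPROGRESS */
}

/* Wait for writability, then read SO_ERROR (hypothetical helper).
 * Returns 0 if connected, otherwise an errno value (ETIMEDOUT on timeout).
 * On a non-zero result the caller should close fd and use a new socket. */
int wait_connected(int fd, int timeout_ms) {
    struct pollfd pfd = { .fd = fd, .events = POLLOUT };
    int n;
    do {
        n = poll(&pfd, 1, timeout_ms);   /* retry on signals (timeout restarts) */
    } while (n == -1 && errno == EINTR);
    if (n == -1) return errno;
    if (n == 0) return ETIMEDOUT;

    int sockerr = 0;
    socklen_t len = sizeof(sockerr);
    /* Writable alone proves nothing: a failed connect is writable too. */
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &len) == -1)
        sockerr = errno;
    return sockerr;
}
```

The connect is started once, and completion is decided only by SO_ERROR. This mirrors what syncWithMaster() checks when the event loop reports the socket as writable.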
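Once the connection is established, the writable event for syncWithMaster() is removed, and the master and slave go through the following exchange:
- The slave sends PING.
- If masterauth is configured, the slave sends AUTH.
- The slave sends REPLCONF listening-port <port>.
- If slave-announce-ip is set (for example when running under Docker), the slave sends REPLCONF ip-address <ip>.
- The slave sends REPLCONF capa eof capa psync2. This is mainly for compatibility across versions: the master treats the slave differently depending on the capabilities it announces.
- PSYNC starts.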
PSYNC
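Redis identifies a dataset uniquely by the pair (Replication ID, offset):
- Replication ID: server.replid, a random 40-character string generated by changeReplicationId() when Redis initializes. It identifies the instance's replication history.
- offset: how far the replicated command stream has been applied.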
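A 40-character random ID like server.replid can be produced roughly as follows. This is a minimal sketch: random_hex_id() is a hypothetical stand-in for the helper Redis actually uses. It assumes /dev/urandom is available and falls back to rand(), which is not cryptographically strong.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#define REPLID_LEN 40   /* length of server.replid, without the '\0' */

/* Fill buf with len random lowercase hex characters plus a trailing '\0'.
 * buf must have room for len + 1 bytes; len must not exceed REPLID_LEN. */
void random_hex_id(char *buf, size_t len) {
    static const char hex[] = "0123456789abcdef";
    unsigned char raw[REPLID_LEN];
    assert(len <= sizeof(raw));

    FILE *f = fopen("/dev/urandom", "rb");
    size_t got = f ? fread(raw, 1, len, f) : 0;
    if (f) fclose(f);
    if (got < len) {                      /* weak fallback, sketch only */
        srand((unsigned)time(NULL));
        for (size_t i = got; i < len; i++) raw[i] = (unsigned char)rand();
    }
    for (size_t i = 0; i < len; i++)
        buf[i] = hex[raw[i] & 0x0f];      /* one hex digit per random byte */
    buf[len] = '\0';
}
```

A new ID is drawn whenever a new replication history begins, so two unrelated instances are very unlikely to share one.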
syncWithMaster() calls slaveTryPartialResynchronization(), which sends PSYNC to synchronize with the master:
- If the slave has server.cached_master, it sends PSYNC <server.cached_master->replid> <server.cached_master->reploff+1>.
- Otherwise, it sends PSYNC ? -1.
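The two request shapes above can be written down as a small formatter. This sketch only builds the command text for illustration. Redis itself sends PSYNC as a regular protocol command. struct cached_master and format_psync() are hypothetical names.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical model of the cached master state a slave keeps. */
struct cached_master {
    char replid[41];      /* 40-char replication ID + '\0' */
    long long reploff;    /* last offset already processed */
};

/* Build the PSYNC command the slave would send.
 * cm == NULL means there is no server.cached_master. */
int format_psync(char *buf, size_t n, const struct cached_master *cm) {
    if (cm == NULL)
        return snprintf(buf, n, "PSYNC ? -1");   /* unknown history: full sync */
    /* Ask for the first byte not seen yet, hence reploff + 1. */
    return snprintf(buf, n, "PSYNC %s %lld", cm->replid, cm->reploff + 1);
}
```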
On receipt, the master's syncCommand() calls masterTryPartialResynchronization() to decide whether a partial resync is possible:
/* Is the replication ID of this master the same advertised by the wannabe
* slave via PSYNC? If the replication ID changed this master has a
* different replication history, and there is no way to continue.
*
* Note that there are two potentially valid replication IDs: the ID1
* and the ID2. The ID2 however is only valid up to a specific offset. */
if (strcasecmp(master_replid, server.replid) &&
(strcasecmp(master_replid, server.replid2) ||
psync_offset > server.second_replid_offset))
{
/* ... */
goto need_full_resync;
}
/* We still have the data our slave is asking for? */
if (!server.repl_backlog ||
psync_offset < server.repl_backlog_off ||
psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
{
/* ... */
goto need_full_resync;
}
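A partial resync happens only if two conditions hold. First, the master_replid sent by the slave must equal the master's replid, or equal replid2 with an offset no greater than second_replid_offset. Second, the requested offset must fall within the valid data in the master's repl_backlog.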
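The two checks quoted above can be restated as one predicate. This is a sketch. struct repl_state and can_partial_resync() are hypothetical names whose fields mirror the server.* variables in the snippet. The boolean logic is the negation of the two goto need_full_resync conditions.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <string.h>
#include <strings.h>

/* Hypothetical subset of the master's replication state. */
struct repl_state {
    const char *replid;              /* server.replid */
    const char *replid2;             /* server.replid2 */
    long long second_replid_offset;  /* replid2 valid up to this offset */
    int has_backlog;                 /* server.repl_backlog != NULL */
    long long backlog_off;           /* server.repl_backlog_off */
    long long backlog_histlen;       /* server.repl_backlog_histlen */
};

/* 1 if "PSYNC <req_replid> <req_offset>" can be served as a partial resync. */
int can_partial_resync(const struct repl_state *s,
                       const char *req_replid, long long req_offset) {
    int same_history =
        strcasecmp(req_replid, s->replid) == 0 ||
        (strcasecmp(req_replid, s->replid2) == 0 &&
         req_offset <= s->second_replid_offset);
    if (!same_history) return 0;

    /* The requested bytes must still be inside the backlog window. */
    if (!s->has_backlog ||
        req_offset < s->backlog_off ||
        req_offset > s->backlog_off + s->backlog_histlen)
        return 0;
    return 1;
}
```

Note the upper bound is inclusive: a slave asking for backlog_off + backlog_histlen is fully caught up and simply receives nothing yet.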
A client is marked as a slave only once it reaches the PSYNC stage:
Full sync:
c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
if (server.repl_disable_tcp_nodelay)
    anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
c->repldbfd = -1;
c->flags |= CLIENT_SLAVE;
listAddNodeTail(server.slaves,c);
Partial resync:
c->flags |= CLIENT_SLAVE;
c->replstate = SLAVE_STATE_ONLINE;
c->repl_ack_time = server.unixtime;
c->repl_put_online_on_ack = 0;
listAddNodeTail(server.slaves,c);
Full Sync
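Once a full sync is decided, the master tries to produce an RDB. There are several cases:
- A disk-target BGSAVE is already in progress. If another slave is in SLAVE_STATE_WAIT_BGSAVE_END, the master copies that slave's output buffer, waits for the same RDB to finish, and sends +FULLRESYNC <server.replid> <slave->psync_initial_offset>. If no slave is in that state, the new slave waits for replicationCron() to trigger a new BGSAVE.
- A socket-target BGSAVE is already in progress. The master waits for it to finish, and replicationCron() then triggers a new BGSAVE.
- No BGSAVE is in progress:
  - socket target: wait repl-diskless-sync-delay seconds so that several slaves can be synced in one go;
  - disk target: call startBgsaveForReplication() to start the RDB.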
disk
The master's startBgsaveForReplication() handles both disk and socket targets. For the disk target, it calls replicationSetupSlaveForResync(), which:
- sets slave->psync_initial_offset = server.master_repl_offset;
- sets slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
- sends +FULLRESYNC <server.replid> <server.master_repl_offset>.
When the slave receives +FULLRESYNC <replid> <offset>, it:
- sets server.master_replid = replid;
- sets server.master_initial_offset = offset;
- creates a temporary RDB file and registers the readable event readSyncBulkPayload() in place of syncWithMaster();
- sets server.repl_state = REPL_STATE_TRANSFER.
When the master's RDB is done, every slave waiting for it is moved to SLAVE_STATE_SEND_BULK and gets the write handler sendBulkToSlave():
/* serverCron()->backgroundSaveDoneHandler()->backgroundSaveDoneHandlerDisk()->updateSlavesWaitingBgsave() */
if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
redis_fstat(slave->repldbfd,&buf) == -1) {
freeClient(slave);
serverLog(LL_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
continue;
}
slave->repldboff = 0;
slave->repldbsize = buf.st_size;
slave->replstate = SLAVE_STATE_SEND_BULK;
slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
(unsigned long long) slave->repldbsize);
aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
freeClient(slave);
continue;
}
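The master's write handler sendBulkToSlave() first sends slave->replpreamble, which carries the RDB file size. To avoid blocking, it then sends at most PROTO_IOBUF_LEN bytes per call. When everything has been sent, it calls putSlaveOnline(). That function sets slave->replstate = SLAVE_STATE_ONLINE and registers the write handler sendReplyToClient().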
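The chunked sending above can be modeled as a step function invoked on each writable event. This is a minimal sketch, not sendBulkToSlave() itself. struct bulk_transfer, bulk_init(), bulk_send_step() and the tiny CHUNK value are hypothetical. CHUNK stands in for PROTO_IOBUF_LEN, which is much larger in Redis.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

#define CHUNK 16  /* stands in for PROTO_IOBUF_LEN; deliberately tiny */

/* Hypothetical per-slave transfer state, named after slave->repl* fields. */
struct bulk_transfer {
    char preamble[32];     /* "$<size>\r\n", like slave->replpreamble */
    size_t preamble_sent;  /* bytes of the preamble already written */
    int dbfd;              /* like slave->repldbfd */
    off_t off;             /* like slave->repldboff */
    off_t size;            /* like slave->repldbsize */
};

void bulk_init(struct bulk_transfer *t, int dbfd, off_t size) {
    snprintf(t->preamble, sizeof(t->preamble), "$%lld\r\n", (long long)size);
    t->preamble_sent = 0;
    t->dbfd = dbfd;
    t->off = 0;
    t->size = size;
}

/* One writable-event callback. Returns 1 when the payload is fully sent,
 * 0 if more calls are needed, -1 on error. */
int bulk_send_step(struct bulk_transfer *t, int outfd) {
    size_t plen = strlen(t->preamble);
    if (t->preamble_sent < plen) {
        ssize_t w = write(outfd, t->preamble + t->preamble_sent,
                          plen - t->preamble_sent);
        if (w == -1) return errno == EAGAIN ? 0 : -1;
        t->preamble_sent += (size_t)w;
        if (t->preamble_sent < plen) return 0;  /* finish the preamble first */
    }
    if (t->off == t->size) return 1;            /* empty payload */

    char buf[CHUNK];
    if (lseek(t->dbfd, t->off, SEEK_SET) == (off_t)-1) return -1;
    ssize_t r = read(t->dbfd, buf, sizeof(buf));
    if (r <= 0) return -1;                      /* read error or file shrank */
    ssize_t w = write(outfd, buf, (size_t)r);
    if (w == -1) return errno == EAGAIN ? 0 : -1;
    t->off += w;                                /* short write: resume here */
    return t->off == t->size;
}
```

The offset lives in the struct rather than in the file position, so a short write is resumed correctly on the next call. This is why the lseek() before each read() is needed.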
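On the slave, the read handler readSyncBulkPayload() handles both disk and socket full syncs. It mirrors sendBulkToSlave(): it first reads the preamble and stores the size in server.repl_transfer_size. It then reads at most 4096 bytes per call and writes them to server.repl_transfer_fd. Once everything has been received, it:
- calls emptyDb(-1...) to flush all databases;
- deletes the read event;
- calls rename(server.repl_transfer_tmpfile, server.rdb_filename), then rdbLoad();
- calls replicationCreateMasterClient() to create server.master;
- sets server.repl_state = REPL_STATE_CONNECTED;
- sets server.replid to the master's replid;
- clears replid2;
- creates repl_backlog.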
At this point, the full sync is complete.
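The slave side of a disk transfer needs two small pieces: parsing the "$<size>" preamble, and capping each read so it never reads past the RDB payload into the command stream that follows it. This is a sketch. parse_bulk_size() and next_read_len() are hypothetical names, and READ_CHUNK is the 4096-byte limit mentioned above.

```c
#include <assert.h>
#include <errno.h>
#include <stdlib.h>

#define READ_CHUNK 4096  /* per-call read limit */

/* Parse a disk-transfer preamble such as "$12345" (trailing "\r\n" already
 * stripped). Returns the payload size, or -1 if the line is malformed. */
long long parse_bulk_size(const char *line) {
    if (line[0] != '$') return -1;   /* e.g. an error reply "-ERR ..." */
    char *end;
    errno = 0;
    long long v = strtoll(line + 1, &end, 10);
    if (errno != 0 || end == line + 1 || *end != '\0' || v < 0) return -1;
    return v;
}

/* Bytes to request on the next read: at most READ_CHUNK, and never more
 * than what is left of the payload. */
size_t next_read_len(long long total, long long transferred) {
    long long left = total - transferred;
    if (left <= 0) return 0;
    return left < READ_CHUNK ? (size_t)left : READ_CHUNK;
}
```

For example, with a 10000-byte payload the reads would request 4096, 4096 and finally 1808 bytes.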
socket
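With the disk target, the RDB file must be created first. Every send then needs lseek() and read() before the data goes out to the slave. When the network is faster than the disk, setting repl-diskless-sync yes switches to a socket-target full sync.
A socket-target RDB cannot be reused the way a disk RDB can. To serve several slaves at once, the master therefore waits repl-diskless-sync-delay seconds: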
/* replicationCron() */
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
idle = server.unixtime - slave->lastinteraction;
if (idle > max_idle) max_idle = idle;
slaves_waiting++;
mincapa = (mincapa == -1) ? slave->slave_capa :
(mincapa & slave->slave_capa);
}
}
if (slaves_waiting &&
(!server.repl_diskless_sync ||
max_idle > server.repl_diskless_sync_delay))
{
/* Start the BGSAVE. The called function may start a
* BGSAVE with socket target or disk target depending on the
* configuration and slaves capabilities. */
startBgsaveForReplication(mincapa);
}
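Unlike a normal RDB save, the rio is initialized with a set of sockets instead of a file. Each rioWrite() -> rioFdsetWrite() call sends the data to every socket. The write counts as a success as long as at least one socket succeeds.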
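The "write to every target, succeed if any target survives" rule can be sketched like this. fdset_write() is a hypothetical helper loosely modeled on the behavior described for rioFdsetWrite(), not its actual code. It assumes blocking descriptors and records the first error per target so that failed targets are skipped afterwards.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>

/* Write buf to every fd that has not failed yet. errs[i] keeps the first
 * errno seen for fds[i] (0 = still healthy). Returns 1 if at least one
 * target received the whole buffer, 0 if every target has failed. */
int fdset_write(const int *fds, int *errs, int n, const void *buf, size_t len) {
    int ok = 0;
    for (int i = 0; i < n; i++) {
        if (errs[i] != 0) continue;          /* skip targets that already failed */
        const char *p = buf;
        size_t left = len;
        while (left > 0) {
            ssize_t w = write(fds[i], p, left);
            if (w == -1) {
                if (errno == EINTR) continue;
                errs[i] = errno;             /* drop this slave from now on */
                break;
            }
            p += w;
            left -= (size_t)w;
        }
        if (left == 0) ok = 1;
    }
    return ok;
}
```

One slow or broken slave therefore does not abort the transfer for the others. Only when all of them have failed does the save report an error.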
To keep the code simple, each socket is switched to blocking mode and given a send timeout:
/* Put the socket in blocking mode to simplify RDB transfer.
* We'll restore it when the children returns (since duped socket
* will share the O_NONBLOCK attribute with the parent). */
anetBlock(NULL,slave->fd);
anetSendTimeout(NULL,slave->fd,server.repl_timeout*1000);
I had not come across the SO_SNDTIMEO option before, so it is worth noting here. When a send times out, the call fails with EWOULDBLOCK:
/* With blocking sockets, which is the sole user of this
* rio target, EWOULDBLOCK is returned only because of
* the SO_SNDTIMEO socket option, so we translate the error
* into one more recognizable by the user. */
/* Set the socket send timeout (SO_SNDTIMEO socket option) to the specified
* number of milliseconds, or disable it if the 'ms' argument is zero. */
int anetSendTimeout(char *err, int fd, long long ms) {
struct timeval tv;
tv.tv_sec = ms/1000;
tv.tv_usec = (ms%1000)*1000;
if (setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) == -1) {
anetSetError(err, "setsockopt SO_SNDTIMEO: %s", strerror(errno));
return ANET_ERR;
}
return ANET_OK;
}
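The effect of SO_SNDTIMEO on a blocking socket can be observed with a socketpair whose peer never reads. This is a sketch assuming a POSIX system that honors SO_SNDTIMEO on AF_UNIX stream sockets, as Linux does. set_send_timeout() follows the same arithmetic as anetSendTimeout() above. write_until_error() is a hypothetical helper.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>

/* Set SO_SNDTIMEO in milliseconds, like anetSendTimeout(). */
int set_send_timeout(int fd, long long ms) {
    struct timeval tv;
    tv.tv_sec = ms / 1000;
    tv.tv_usec = (ms % 1000) * 1000;
    return setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
}

/* Keep writing on a blocking socket until a write fails; return its errno.
 * Once the send buffer is full, a write that cannot make progress within
 * the timeout returns -1 with EAGAIN/EWOULDBLOCK instead of blocking forever. */
int write_until_error(int fd) {
    char chunk[4096];
    memset(chunk, 'x', sizeof(chunk));
    for (;;) {
        ssize_t w = write(fd, chunk, sizeof(chunk));
        if (w == -1) return errno;
        /* partial writes are possible before the timeout; just continue */
    }
}
```

This explains the error translation in the comment above. On a blocking socket with SO_SNDTIMEO set, EWOULDBLOCK can only mean the timeout expired.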
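The payload format changes too. Unlike a file, the total size is not known in advance, so a prefix and a suffix are added to detect the end: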
- prefix: $EOF:<40 bytes unguessable hex string>\r\n
- suffix: <40 bytes unguessable hex string>
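Since the payload length is unknown, the slave has to notice when the stream ends with the 40-byte mark. One way is to keep a rolling window of the last 40 bytes received and compare it with the mark after every read. This is a sketch. struct eof_tracker and its functions are hypothetical names, not Redis code.

```c
#include <assert.h>
#include <string.h>

#define EOF_MARK_LEN 40

/* Rolling window over the last EOF_MARK_LEN bytes received. */
struct eof_tracker {
    char mark[EOF_MARK_LEN];   /* taken from the "$EOF:<mark>\r\n" prefix */
    char last[EOF_MARK_LEN];   /* last bytes seen in the stream */
    size_t seen;               /* total bytes fed so far */
};

void eof_tracker_init(struct eof_tracker *t, const char *mark) {
    memcpy(t->mark, mark, EOF_MARK_LEN);
    memset(t->last, 0, EOF_MARK_LEN);
    t->seen = 0;
}

/* Feed one chunk read from the socket. Returns 1 once the stream ends with
 * the mark; the RDB payload is then everything except those final bytes. */
int eof_tracker_feed(struct eof_tracker *t, const char *buf, size_t n) {
    if (n >= EOF_MARK_LEN) {
        memcpy(t->last, buf + n - EOF_MARK_LEN, EOF_MARK_LEN);
    } else {
        /* shift the window left by n bytes and append the new bytes */
        memmove(t->last, t->last + n, EOF_MARK_LEN - n);
        memcpy(t->last + EOF_MARK_LEN - n, buf, n);
    }
    t->seen += n;
    return t->seen >= EOF_MARK_LEN &&
           memcmp(t->last, t->mark, EOF_MARK_LEN) == 0;
}
```

The window has to survive across reads because the mark can be split over two socket reads, as in the usage below.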
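On the slave the flow is the same as for disk. On the master, the exit of the RDB child marks the end of a socket-target full sync. backgroundSaveDoneHandlerSocket() reads each slave's result from a pipe, then either restores the slave's socket settings or closes the connection, and sets the slave state: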
/* Note: we wait for a REPLCONF ACK message from slave in
* order to really put it online (install the write handler
* so that the accumulated data can be transfered). However
* we change the replication state ASAP, since our slave
* is technically online now. */
slave->replstate = SLAVE_STATE_ONLINE;
slave->repl_put_online_on_ack = 1;
slave->repl_ack_time = server.unixtime; /* Timeout otherwise. */
Partial Resync
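A partial resync only has to send the commands the slave is missing. The master:
- sets the slave state: c->flags |= CLIENT_SLAVE; c->replstate = SLAVE_STATE_ONLINE; c->repl_ack_time = server.unixtime; c->repl_put_online_on_ack = 0; listAddNodeTail(server.slaves,c);
- sends +CONTINUE <server.replid>;
- sends the missing part of repl_backlog.
On receiving +CONTINUE, the slave updates its state and registers events:
- replid2 is set to cached_master's replid, i.e. the previous master's ID;
- replid is updated to the current master's ID;
- server.master is created with the read handler readQueryFromClient(), so incremental write commands from the master are handled like those of a normal client.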
Writes during sync
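During a sync the master keeps receiving write commands, and these must also reach the slave. Full sync and partial resync behave somewhat differently here. In both cases the master appends the slave to server.slaves when the sync starts, but with different states:
- full sync: SLAVE_STATE_WAIT_BGSAVE_*
- partial resync: SLAVE_STATE_ONLINE
For each write, the master calls propagate() -> replicationFeedSlaves() to send the command to every slave. The data is buffered the same way as for a normal client, starting with prepareClientToWrite():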
/* addReply()->prepareClientToWrite() */
/* Schedule the client to write the output buffers to the socket only
* if not already done (there were no pending writes already and the client
* was yet not flagged), and, for slaves, if the slave can actually
* receive writes at this stage. */
if (!clientHasPendingReplies(c) &&
!(c->flags & CLIENT_PENDING_WRITE) &&
(c->replstate == REPL_STATE_NONE ||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
{
/* Here instead of installing the write handler, we just flag the
* client and put it into a list of clients that have something
* to write to the socket. This way before re-entering the event
* loop, we can try to directly write to the client sockets avoiding
* a system call. We'll only really install the write handler if
* we'll not be able to write the whole reply at once. */
c->flags |= CLIENT_PENDING_WRITE;
listAddNodeHead(server.clients_pending_write,c);
}
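So only slaves in the ONLINE state (and not waiting for an ACK) are actually written to. In every other state the data only accumulates in the output buffer. A full sync therefore has to flush the accumulated commands to the slave explicitly. Otherwise, if no new write arrives, the slave would stay behind indefinitely:
- disk: once everything has been sent, putSlaveOnline() sets slave->replstate = SLAVE_STATE_ONLINE and registers the write handler sendReplyToClient(), which sends the buffered data to the slave.
- socket: slave->replstate is set to SLAVE_STATE_ONLINE and slave->repl_put_online_on_ack = 1 (it is 0 for disk and for partial resync). The slave sends REPLCONF ACK <reploff> to the master from replicationCron(), and only then does the master call putSlaveOnline(). Why the extra step? I could not see the reason.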
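The state rules in that list can be modeled in a few lines. This is a sketch. The enum and struct are hypothetical mirrors of REPL_STATE_NONE, the SLAVE_STATE_* values and repl_put_online_on_ack. put_slave_online() and on_replconf_ack() are simplified stand-ins for putSlaveOnline() and the ACK handling, not their real code.

```c
#include <assert.h>

/* Hypothetical mirror of the client/slave states discussed above. */
enum repl_state {
    STATE_NONE,               /* ordinary client, not a slave */
    STATE_WAIT_BGSAVE_START,
    STATE_WAIT_BGSAVE_END,
    STATE_SEND_BULK,
    STATE_ONLINE
};

struct slave {
    enum repl_state state;
    int put_online_on_ack;    /* like c->repl_put_online_on_ack */
    int write_handler;        /* 1 once output is being flushed */
};

/* The replstate part of the prepareClientToWrite() condition. */
int can_schedule_write(const struct slave *s) {
    return s->state == STATE_NONE ||
           (s->state == STATE_ONLINE && !s->put_online_on_ack);
}

/* Simplified putSlaveOnline(): go online and start flushing the buffer. */
void put_slave_online(struct slave *s) {
    s->state = STATE_ONLINE;
    s->put_online_on_ack = 0;
    s->write_handler = 1;
}

/* Diskless case: the first REPLCONF ACK completes the transition. */
void on_replconf_ack(struct slave *s) {
    if (s->put_online_on_ack && s->state == STATE_ONLINE)
        put_slave_online(s);
}
```

In this model, a diskless slave is ONLINE but still excluded from writes until its first ACK arrives. That is exactly the extra step questioned above.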
Once the sync is done (full or partial), the master looks like a normal client to the slave. The slave reads its write commands and keeps the dataset consistent (eventual consistency).
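Incremental data
In short, it works much like AOF. For each write the master calls propagate() -> replicationFeedSlaves(), which appends the command to server.repl_backlog and sends it to every slave.
server.repl_backlog is a circular buffer whose size is set by repl-backlog-size (1mb by default):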
/* Add data to the replication backlog.
* This function also increments the global replication offset stored at
* server.master_repl_offset, because there is no case where we want to feed
* the backlog without incrementing the offset. */
void feedReplicationBacklog(void *ptr, size_t len) {
unsigned char *p = ptr;
server.master_repl_offset += len;
/* This is a circular buffer, so write as much data we can at every
* iteration and rewind the "idx" index if we reach the limit. */
while(len) {
size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
if (thislen > len) thislen = len;
memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
server.repl_backlog_idx += thislen;
if (server.repl_backlog_idx == server.repl_backlog_size)
server.repl_backlog_idx = 0;
len -= thislen;
p += thislen;
server.repl_backlog_histlen += thislen;
}
if (server.repl_backlog_histlen > server.repl_backlog_size)
server.repl_backlog_histlen = server.repl_backlog_size;
/* Set the offset of the first byte we have in the backlog. */
server.repl_backlog_off = server.master_repl_offset -
server.repl_backlog_histlen + 1;
}
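The code above that writes server.repl_backlog shows what each field means:
- server.master_repl_offset: total number of bytes ever written to repl_backlog, i.e. the replication offset of the latest byte;
- server.repl_backlog_idx: position in the buffer where the next byte will be written;
- server.repl_backlog_histlen: length of the valid data in the buffer;
- server.repl_backlog_off: replication offset of the first valid byte.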
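To see how these fields interact, here is a standalone copy of the feed loop above with a tiny buffer, plus a reader that returns everything from a PSYNC offset onward. This is a sketch. struct backlog, backlog_init() and backlog_read_from() are hypothetical. The index arithmetic in the reader (start at the oldest valid byte, then skip) is one straightforward way to walk the ring, not necessarily Redis's exact code.

```c
#include <assert.h>
#include <string.h>

#define BACKLOG_SIZE 8  /* tiny on purpose; repl-backlog-size defaults to 1mb */

/* Hypothetical standalone copy of the server.repl_backlog* fields. */
struct backlog {
    char buf[BACKLOG_SIZE];
    long long master_repl_offset; /* offset of the last byte ever fed */
    long long idx;                /* where the next byte is written */
    long long histlen;            /* valid bytes in buf */
    long long off;                /* offset of the first valid byte */
};

void backlog_init(struct backlog *b) {
    memset(b, 0, sizeof(*b));
    b->off = b->master_repl_offset + 1;   /* the next byte fed gets this offset */
}

/* Same steps as feedReplicationBacklog() quoted above. */
void backlog_feed(struct backlog *b, const char *p, size_t len) {
    b->master_repl_offset += (long long)len;
    while (len) {
        size_t thislen = (size_t)(BACKLOG_SIZE - b->idx);
        if (thislen > len) thislen = len;
        memcpy(b->buf + b->idx, p, thislen);
        b->idx += (long long)thislen;
        if (b->idx == BACKLOG_SIZE) b->idx = 0;   /* wrap around */
        len -= thislen;
        p += thislen;
        b->histlen += (long long)thislen;
    }
    if (b->histlen > BACKLOG_SIZE) b->histlen = BACKLOG_SIZE;
    b->off = b->master_repl_offset - b->histlen + 1;
}

/* Copy everything from replication offset `from` (the PSYNC offset) to the
 * end into out. Returns the byte count, or -1 if `from` is outside the
 * window, in which case a full resync would be needed. */
long long backlog_read_from(const struct backlog *b, long long from, char *out) {
    if (from < b->off || from > b->off + b->histlen) return -1;
    long long skip = from - b->off;
    long long n = b->histlen - skip;
    /* Index of the oldest valid byte, then advance by `skip`. */
    long long j = (b->idx + BACKLOG_SIZE - b->histlen) % BACKLOG_SIZE;
    j = (j + skip) % BACKLOG_SIZE;
    for (long long i = 0; i < n; i++) {
        out[i] = b->buf[j];
        j = (j + 1) % BACKLOG_SIZE;
    }
    return n;
}
```

In the usage below, feeding 10 bytes into an 8-byte ring overwrites offsets 1 and 2. repl_backlog_off therefore becomes 3, and a PSYNC for offset 2 would fall back to a full sync.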
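When the slave receives commands from the master, readQueryFromClient() detects that the client is the master and:
- updates c->reploff;
- calls replicationFeedSlavesFromMasterStream(), which:
  - calls feedReplicationBacklog(), updating server.master_repl_offset;
  - acts as a proxy, forwarding the data received from the master to its own slaves (chained replication).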
replicationCron()
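serverCron() calls replicationCron() once per second to handle connection setup and several periodic tasks:
- Timeouts: connection timeout, RDB transfer timeout and master PING timeout.
- When server.repl_state == REPL_STATE_CONNECT, connect to the master. This uses a non-blocking connect(), registers syncWithMaster() for readable and writable events, and sets server.repl_state = REPL_STATE_CONNECTING.
- The slave sends REPLCONF ACK. The master uses it to update slave->repl_ack_off, which the WAIT command relies on for synchronous replication. It also updates slave->repl_ack_time, which is used for timeouts.
- The master sends PING periodically (every repl-ping-slave-period seconds), so the slave can detect a master timeout through server.master->lastinteraction.
- Handle timed-out slaves.
- Free repl_backlog according to repl-backlog-ttl.
- Start the RDB for slaves waiting for a full sync.
When a timeout occurs or the socket is closed, freeClient() releases the connection. The master and slave behave differently:
- master: closes the connection and frees the client; the slave's read then returns 0.
- slave: before freeing the master client, it calls replicationCacheMaster() to save the master's state in server.cached_master. It then calls replicationHandleMasterDisconnection(), which sets the state back to REPL_STATE_CONNECT. Later it reconnects and sends the server.cached_master state to attempt a partial resync.
Note that once the connection is established, the master never checks the slave's reploff again to decide whether a full sync is needed. Redis relies on TCP's "reliability": either a timeout or an error occurs, or the data is sure to reach the slave.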
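Restart
rdbPopulateSaveInfo() returns non-NULL in the following cases:
- the instance is a master;
- the instance is a slave;
- the instance has a cached_master.
rdbSaveInfoAuxFields() writes the rsi information into the RDB so that a partial resync is possible after a restart: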
if (rsi) {
if (rdbSaveAuxFieldStrInt(rdb,"repl-stream-db",rsi->repl_stream_db)
== -1) return -1;
if (rdbSaveAuxFieldStrStr(rdb,"repl-id",server.replid)
== -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",server.master_repl_offset)
== -1) return -1;
}
At startup, if slaveof is configured, the server state is set as follows:
server.masterhost = sdsnew(argv[1]);
server.masterport = atoi(argv[2]);
server.repl_state = REPL_STATE_CONNECT;
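loadDataFromDisk() restores the replication info from the rsi stored in the RDB and creates cached_master from the current state. The subsequent syncWithMaster() then sends the cached_master info to attempt a partial resync: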
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (rdbLoad(server.rdb_filename,&rsi) == C_OK) {
serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
(float)(ustime()-start)/1000000);
/* Restore the replication ID / offset from the RDB file. */
if (server.masterhost &&
rsi.repl_id_is_set &&
rsi.repl_offset != -1 &&
/* Note that older implementations may save a repl_stream_db
* of -1 inside the RDB file in a wrong way, see more information
* in function rdbPopulateSaveInfo. */
rsi.repl_stream_db != -1)
{
memcpy(server.replid,rsi.repl_id,sizeof(server.replid));
server.master_repl_offset = rsi.repl_offset;
/* If we are a slave, create a cached master from this
* information, in order to allow partial resynchronizations
* with masters. */
replicationCacheMasterUsingMyself();
selectDb(server.cached_master,rsi.repl_stream_db);
}
}
failover
Before Redis 4.0, a master/slave failover required a full sync. This is expensive and usually pointless, since in most cases the datasets are already identical. Redis 4.0 solved this.
Suppose A is the master and B the slave, and we swap their roles:
- Send SLAVEOF NO ONE to B. B calls shiftReplicationId():
  memcpy(server.replid2,server.replid,sizeof(server.replid));
  server.second_replid_offset = server.master_repl_offset+1;
  changeReplicationId();
  So server.replid2 and server.second_replid_offset record how far B had synced with the previous master. server.repl_backlog also still holds the data replicated from that master, ready for a later partial resync.
- Send SLAVEOF B.ip B.port to A. A calls replicationSetMaster() -> replicationCacheMasterUsingMyself(), which builds cached_master from A's own state for the later sync:
  server.cached_master->reploff = server.master_repl_offset;
  memcpy(server.cached_master->replid, server.replid, sizeof(server.replid));
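The failover steps above can be replayed on a toy model to see why A usually gets a partial resync. This is a sketch. struct node, shift_replication_id() and same_history() are hypothetical. Short readable strings stand in for the real 40-character IDs, and the backlog-range check is left out.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical per-instance replication state. */
struct node {
    char replid[41];
    char replid2[41];
    long long master_repl_offset;
    long long second_replid_offset;
};

/* Mirrors shiftReplicationId(): keep the old history as ID2, start a new
 * one. changeReplicationId() would pick a random ID; the caller passes it
 * in here so the result is predictable. */
void shift_replication_id(struct node *n, const char *new_id) {
    memcpy(n->replid2, n->replid, sizeof(n->replid));
    n->second_replid_offset = n->master_repl_offset + 1;
    snprintf(n->replid, sizeof(n->replid), "%s", new_id);
}

/* Replication-ID half of the master-side check (backlog check omitted). */
int same_history(const struct node *m, const char *replid, long long off) {
    return strcmp(replid, m->replid) == 0 ||
           (strcmp(replid, m->replid2) == 0 && off <= m->second_replid_offset);
}
```

If A accepted writes after B was promoted, A's offset exceeds second_replid_offset. The check then fails, and those writes are discarded by the full sync. This is the data-loss window discussed below.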
Recall the code where the slave sends PSYNC and the master decides on a partial resync:
slave:
if (server.cached_master) {
    psync_replid = server.cached_master->replid;
    snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
    serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
}
master:
if (strcasecmp(master_replid, server.replid) &&
    (strcasecmp(master_replid, server.replid2) ||
     psync_offset > server.second_replid_offset))
{
    /* ... */
    goto need_full_resync;
}
/* We still have the data our slave is asking for? */
if (!server.repl_backlog ||
    psync_offset < server.repl_backlog_off ||
    psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
{
    /* ... */
    goto need_full_resync;
}
So A sends its own state to B when it reconnects. B checks that state against what it recorded about A to decide whether a full sync is needed. Of course, if B received many writes in the meantime and they overwrote the relevant part of its repl_backlog, a full sync still happens.
Note that with this kind of failover, writes that reach the old master between the two SLAVEOF commands may be lost, and a full sync is needed in that case. In cluster mode, a manual failover rejects traffic to the master during the process.
expire
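A slave never expires keys on its own. Before calling activeExpireCycle(), Redis checks whether there is a master and skips the cycle if so. Likewise, on a slave expireIfNeeded() only reports whether the key is logically expired and does not delete it: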
/* If we are running in the context of a slave, return ASAP:
* the slave key expiration is controlled by the master that will
* send us synthesized DEL operations for expired keys.
*
* Still we try to return the right information to the caller,
* that is, 0 if we think the key should be still valid, 1 if
* we think the key is expired at this time. */
if (server.masterhost != NULL) return now > when;
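The master calls propagateExpire() to send the corresponding DEL to its slaves and keep them consistent. Since that DEL also has to look up the key, the slave treats the master's client specially: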
robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
robj *val;
if (expireIfNeeded(db,key) == 1) {
/* Key expired. If we are in the context of a master, expireIfNeeded()
* returns 0 only when the key does not exist at all, so it's safe
* to return NULL ASAP. */
if (server.masterhost == NULL) return NULL;
/* However if we are in the context of a slave, expireIfNeeded() will
* not really try to expire the key, it only returns information
* about the "logical" status of the key: key expiring is up to the
* master in order to have a consistent view of master's data set.
*
* However, if the command caller is not the master, and as additional
* safety measure, the command invoked is a read-only command, we can
* safely return NULL here, and provide a more consistent behavior
* to clients accessign expired values in a read-only fashion, that
* will say the key as non exisitng.
*
* Notably this covers GETs when slaves are used to scale reads. */
if (server.current_client &&
server.current_client != server.master &&
server.current_client->cmd &&
server.current_client->cmd->flags & CMD_READONLY)
{
return NULL;
}
}
val = lookupKey(db,key,flags);
if (val == NULL)
server.stat_keyspace_misses++;
else
server.stat_keyspace_hits++;
return val;
}
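The decision logic in expireIfNeeded() and lookupKeyReadWithFlags() can be condensed into two pure functions. This is a sketch. struct lookup_ctx, expire_if_needed() and lookup_hides_key() are hypothetical, and when < 0 stands for "no TTL". Checks such as a NULL current_client and the loading state are omitted.

```c
#include <assert.h>

/* Hypothetical inputs for a single key lookup. */
struct lookup_ctx {
    int is_slave;          /* server.masterhost != NULL */
    int caller_is_master;  /* server.current_client == server.master */
    int cmd_readonly;      /* command flagged CMD_READONLY */
};

/* Models expireIfNeeded(): returns 1 if the key is logically expired.
 * Only a master deletes the key (and would propagate a DEL). */
int expire_if_needed(int is_slave, long long now, long long when, int *deleted) {
    *deleted = 0;
    if (when < 0 || now <= when) return 0;  /* no TTL, or not expired yet */
    if (is_slave) return 1;                 /* logical expiry; wait for DEL */
    *deleted = 1;
    return 1;
}

/* Models the expiry branch of lookupKeyReadWithFlags():
 * returns 1 if the caller should see "no such key". */
int lookup_hides_key(const struct lookup_ctx *c, long long now, long long when) {
    int deleted;
    if (!expire_if_needed(c->is_slave, now, when, &deleted)) return 0;
    if (!c->is_slave) return 1;                   /* really gone on master */
    return !c->caller_is_master && c->cmd_readonly;
}
```

A read-only GET from an ordinary client on a slave sees the key as missing. The master's own commands and write commands still find it until the propagated DEL arrives.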
evict
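Slaves still perform eviction, and the master also propagate()s the keys it evicts to its slaves. This carries a risk:
- When slave and master have the same maxmemory and the slave is read-only, every write comes from the master. The slave therefore does not evict on its own, and the datasets stay consistent.
- When the slave's maxmemory is smaller than the master's, the datasets can become inconsistent.
Of course, there is no reason to configure different maxmemory values.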
readonly
Slaves are read-only by default, and splitting reads from writes can increase QPS:
/* Don't accept write commands if this is a read only slave. But
* accept write commands if this is our master. */
if (server.masterhost && server.repl_slave_ro &&
!(c->flags & CLIENT_MASTER) &&
c->cmd->flags & CMD_WRITE)
{
addReply(c, shared.roslaveerr);
return C_OK;
}
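A slave can also be configured as writable, for example to run time-consuming operations. The master and slave datasets may then diverge until a restart or reconnect resyncs them.
Before 4.0, keys given an expire on a writable slave never expired, because the slave only waits for expirations propagated from the master. This caused a key leak: such keys can no longer be read but still exist. Redis 4.0 records keys with an expire set on the slave in dict *slaveKeysWithExpire and expires them separately.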