Redis源码阅读(二) – client
在initServer()中调用listenToPort()创建了多个监听套接字,用于接受客户端连接。listenToPort()使用getaddrinfo()获取地址,可以实现协议无关的网络编程,同时将
监听套接字设为NonBlock避免accept()阻塞。为监听套接字创建的事件acceptTcpHandler(),做了下面事情:
- 接受连接
- 创建
client:createClient() - 添加事件:
readQueryFromClient()
Client
TCP是流式协议,不能保证输入输出的完整性,所以需要维护连接的状态,主要就是输入和输出缓冲区。client结构比较复杂,和单节点相关的大概有下面一些元素:
/* With multiplexing we need to take per-client state.
* Clients are taken in a linked list. */
typedef struct client {
int fd; /* Client socket. */
redisDb *db; /* Pointer to currently SELECTed DB. */
sds querybuf; /* Buffer we use to accumulate client queries. */
size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. */
int argc; /* Num of arguments of current command. */
robj **argv; /* Arguments of current command. */
struct redisCommand *cmd, *lastcmd; /* Last command executed. */
int reqtype; /* Request protocol type: PROTO_REQ_* */
int multibulklen; /* Number of multi bulk arguments left to read. */
long bulklen; /* Length of bulk argument in multi bulk request. */
list *reply; /* List of reply objects to send to the client. */
unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
size_t sentlen; /* Amount of bytes already sent in the current
buffer or object being sent. */
time_t ctime; /* Client creation time. */
time_t lastinteraction; /* Time of the last interaction, used for timeout */
/* Response buffer */
int bufpos;
char buf[PROTO_REPLY_CHUNK_BYTES];
} client;
client中主要有下面一些元素:
- 输入缓冲区:
querybuf - 与解析请求相关的元素
- 输出缓冲区:
buf
RESP
TCP网络编程需要自己设定通信协议,常见的有:
- 以分隔符划分完整的元素,如
\r\n header + content结构:header一般保存着content的长度,header可以是固定字节长度或者按照分隔符划分
Reids使用RESP协议通信,详见文档。RESP是二进制安全(binary safe)的,因为不对任何字符做特殊解释,C语言依赖'\0'作为字符串的结束,
所以C语言字符串中不能有'\0',就是二进制不安全的。RESP有下面几种类型:
| Type | Encoding |
|---|---|
| Simple String | +{content}\r\n |
| Error | -{error type} {content}\r\n |
| Integer | :{number}\r\n |
| Bulk String | ${length}\r\n{content}\r\n |
| Array | *{array length}\r\n{各类型元素 * {array length}} |
客户端发送命令有2种格式:
- A client sends to the Redis server a RESP Array consisting of just Bulk Strings.
inline command: space-separated arguments
inline command主要为了类似telnet之类的连接Redis,客户端需要实现RESP协议。处理命令的流程如下:
readQueryFromClient()读取输入到c->querybufproccessInputBuffer(): 根据reqtype执行processInlineBuffer()或processMultibulkBuffer()解析request,设置argc和argvprocessCommand(): 调用命令
使用\r\n作为分隔符,非常容易解析,querybuf中包含\r\n意味着是一个完整的部分,才会进行解析。
解析inline command
主要是由sdssplitargs()这个函数来解析,源码中有下面几个注意点:
- 由
double quote "包围的会进行转义字符的转换,也会按照16进制保存\xXX - 由
single quote '包围的只会对\'做转义
解析RESP Array of Bulk Strings
首先是解析*{array length}存入c->multibulklen,然后解析c->multibulklen个bulk string,每个bulk string的长度会
存在c->bulklen。最后完整的命令会存入c->argc和c->argv中。
缓冲区管理
解析协议很简单,对于C语言来说,比较麻烦的倒是缓冲区的管理,定长的缓冲区会带来缓冲区不够的问题,动态增长又有可能不够高效而且也会造成
空间浪费,Redis使用sds作为输入缓冲区,实现了高效的缓冲区管理。
输入缓冲区
输入缓冲区为sds querybuf。querybuf是一个动态增长的单个缓冲区,既需要读入也需要解析,所以需要记录两个位置,一是写入的起点sdslen(c->querybuf),二是解析的起点c->querybuf。
Redis使用sdsrange()确保每次解析的起点都是c->querybuf。
输入缓冲区可能会增长过大,导致内存浪费,在serverCron中会调整queyrbuf的大小:
/* The client query buffer is an sds.c string that can end with a lot of
* free space not used, this function reclaims space if needed.
*
* The function always returns 0 as it never terminates the client. */
int clientsCronResizeQueryBuffer(client *c) {
size_t querybuf_size = sdsAllocSize(c->querybuf);
time_t idletime = server.unixtime - c->lastinteraction;
/* There are two conditions to resize the query buffer:
* 1) Query buffer is > BIG_ARG and too big for latest peak.
* 2) Client is inactive and the buffer is bigger than 1k. */
if (((querybuf_size > PROTO_MBULK_BIG_ARG) &&
(querybuf_size/(c->querybuf_peak+1)) > 2) ||
(querybuf_size > 1024 && idletime > 2))
{
/* Only resize the query buffer if it is actually wasting space. */
if (sdsavail(c->querybuf) > 1024) {
c->querybuf = sdsRemoveFreeSpace(c->querybuf);
}
}
/* Reset the peak again to capture the peak memory usage in the next
* cycle. */
c->querybuf_peak = 0;
return 0;
}
输出缓冲区
输出缓冲区分为2个部分:
- 静态的数组:
char buf[PROTO_REPLY_CHUNK_BYTES] - 动态的链表:
list *reply
首先将response写入buf中,由于buf是定长的,当空间不够用时,会添加到reply中,其中的每个node的大小也限制在PROTO_REPLY_CHUNK_BYTES,输出时会首先输出buf,
然后是reply,同时释放写完的listNode。
同样的输出缓冲区也需要记录两个位置:
- 写入缓冲区的位置:
buf:bufposreply:sdslen()
- 输出给客户端的位置:
c->sentlen
缓冲区限制
- 输入: 当
querybuf大小超过server.client_max_querybuf_len时,会立即关闭客户端,默认大小为PROTO_MAX_QUERYBUF_LEN 1GB - 输出: 输出缓冲区由
server.client_obuf_limits[]限制。当大小超过hard_limit_bytes或超过soft_limit_bytes一定时间(soft_limit_seconds)时会被异步关闭freeClientAsync()。 要注意此时并没有禁止继续添加response,并且会在beforeSleep()中写了一部分response才会在serverCron()中释放连接,这会导致客户端接收到不完整的stream
Command
Redis以dict存放命令在server.commands。在initServerConfig()中会用硬编码的redisCommandTable[]初始化server.commands。当命令解析完成后,
会调用processCommand()处理命令,流程如下:
lookupCommand()查找命令,并检查参数合法性- 一些状态的检查
- 最后才是调用命令:
call()
Response
一般的水平触发epoll发送相应流程如下:
- 解析完一条完整的命令,将结果保存在缓冲区
- 注册
EPOLLOUT,等待事件处理 - 当响应发送完毕,删除事件
为什么不可以一开始就注册写事件呢?因为使用LT水平触发模式,会一直返回可写条件,造成资源不必要的浪费。但是在上面的处理过程中,除了真正的I/O操作,还包含了
2次系统调用epoll_ctl(),为了减少epoll_ctl()的调用可以使用EPOLLET模式,一开始就注册写事件,但是ET模式代码容易写错。Reids使用LT模式,并做了一些优化,
流程如下:
- 调用
addReply()之类的函数时,会首先调用prepareClientToWrite():
/* Here instead of installing the write handler, we just flag the
* client and put it into a list of clients that have something
* to write to the socket. This way before re-entering the event
* loop, we can try to directly write to the client sockets avoiding
* a system call. We'll only really install the write handler if
* we'll not be able to write the whole reply at once. */
c->flags |= CLIENT_PENDING_WRITE;
listAddNodeHead(server.clients_pending_write,c);
- 在
beforeSleep()中调用handleClientsWithPendingWrites()写response:
/* This function is called just before entering the event loop, in the hope
* we can just write the replies to the client output buffer without any
* need to use a syscall in order to install the writable event handler,
* get it called, and so forth. */
int handleClientsWithPendingWrites(void) {
listIter li;
listNode *ln;
int processed = listLength(server.clients_pending_write);
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
listDelNode(server.clients_pending_write,ln);
/* Try to write buffers to the client socket. */
if (writeToClient(c->fd,c,0) == C_ERR) continue;
/* If there is nothing left, do nothing. Otherwise install
* the write handler. */
if (clientHasPendingReplies(c) &&
aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
sendReplyToClient, c) == AE_ERR)
{
freeClientAsync(c);
}
}
return processed;
}
writeToClient()会首先写c->buf,之后是c->reply。为了防止饥饿,每次写限制在NET_MAX_WRITES_PER_EVENT- 一次写不完时,才会注册写事件
sendReplyToClient():
/* Write event handler. Just send data to the client. */
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
UNUSED(el);
UNUSED(mask);
writeToClient(fd,privdata,1);
}
- 写完后会删除写事件
Redis使用这种方法减少了epoll_ctl()系统调用,提高了性能。
连接管理
在acceptCommonHandler()中调用createClient()会将新连接的客户端添加到server.clients。server.clients链表长度代表了接入的客户端数量,
当超过maxclients时,会立即关闭,默认为10000。
客户端释放有下面几种方式:
- 直接调用
freeClient() - 当出错时,比如协议错误,会标记为
CLIENT_CLOSE_AFTER_REPLY,会在writeToClient()发送完响应后会关闭客户端连接 - 当后续还有些客户端操作,但也需要释放客户端时,需要异步释放:
freeClientAsync()会将客户端添加到server.clients_to_close并标记CLIENT_CLOSE_ASAP, 之后在serverCron()中调用freeClientsInAsyncFreeQueue()释放 - 当空闲时间过长时,会在
serverCron()中clientsCron()调用clientsCronHandleTimeout()释放
留下评论