Redis源码阅读(二) – client
在initServer()
中调用listenToPort()
创建了多个监听套接字,用于接受客户端连接。listenToPort()
使用getaddrinfo()
获取地址,可以实现协议无关的网络编程,同时将
监听套接字设为NonBlock
避免accept()
阻塞。为监听套接字创建的事件acceptTcpHandler()
,做了下面事情:
- 接受连接
- 创建
client
:createClient()
- 添加事件:
readQueryFromClient()
Client
TCP
是流式协议,不能保证输入输出的完整性,所以需要维护连接的状态,主要就是输入和输出缓冲区。client
结构比较复杂,和单节点相关的大概有下面一些元素:
/* With multiplexing we need to take per-client state.
* Clients are taken in a linked list. */
typedef struct client {
int fd; /* Client socket. */
redisDb *db; /* Pointer to currently SELECTed DB. */
sds querybuf; /* Buffer we use to accumulate client queries. */
size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. */
int argc; /* Num of arguments of current command. */
robj **argv; /* Arguments of current command. */
struct redisCommand *cmd, *lastcmd; /* Last command executed. */
int reqtype; /* Request protocol type: PROTO_REQ_* */
int multibulklen; /* Number of multi bulk arguments left to read. */
long bulklen; /* Length of bulk argument in multi bulk request. */
list *reply; /* List of reply objects to send to the client. */
unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
size_t sentlen; /* Amount of bytes already sent in the current
buffer or object being sent. */
time_t ctime; /* Client creation time. */
time_t lastinteraction; /* Time of the last interaction, used for timeout */
/* Response buffer */
int bufpos;
char buf[PROTO_REPLY_CHUNK_BYTES];
} client;
client
中主要有下面一些元素:
- 输入缓冲区:
querybuf
- 与解析请求相关的元素
- 输出缓冲区:
buf
RESP
TCP
网络编程需要自己设定通信协议,常见的有:
- 以分隔符划分完整的元素,如
\r\n
header + content
结构:header
一般保存着content
的长度,header
可以是固定字节长度或者按照分隔符划分
Reids
使用RESP
协议通信,详见文档。RESP
是二进制安全(binary safe
)的,因为不对任何字符做特殊解释,C
语言依赖'\0'
作为字符串的结束,
所以C
语言字符串中不能有'\0'
,就是二进制不安全的。RESP
有下面几种类型:
Type | Encoding |
---|---|
Simple String | +{content}\r\n |
Error | -{error type} {content}\r\n |
Integer | :{number}\r\n |
Bulk String | ${length}\r\n{content}\r\n |
Array | *{array length}\r\n{各类型元素 * {array length}} |
客户端发送命令有2种格式:
- A client sends to the Redis server a RESP Array consisting of just Bulk Strings.
inline command
: space-separated arguments
inline command
主要为了类似telnet之类的连接Redis
,客户端需要实现RESP
协议。处理命令的流程如下:
readQueryFromClient()
读取输入到c->querybuf
proccessInputBuffer()
: 根据reqtype
执行processInlineBuffer()
或processMultibulkBuffer()
解析request
,设置argc
和argv
processCommand()
: 调用命令
使用\r\n
作为分隔符,非常容易解析,querybuf
中包含\r\n
意味着是一个完整的部分,才会进行解析。
解析inline command
主要是由sdssplitargs()
这个函数来解析,源码中有下面几个注意点:
- 由
double quote "
包围的会进行转义字符的转换,也会按照16进制保存\xXX
- 由
single quote '
包围的只会对\'
做转义
解析RESP Array of Bulk Strings
首先是解析*{array length}
存入c->multibulklen
,然后解析c->multibulklen
个bulk string
,每个bulk string
的长度会
存在c->bulklen
。最后完整的命令会存入c->argc
和c->argv
中。
缓冲区管理
解析协议很简单,对于C
语言来说,比较麻烦的倒是缓冲区的管理,定长的缓冲区会带来缓冲区不够的问题,动态增长又有可能不够高效而且也会造成
空间浪费,Redis
使用sds
作为输入缓冲区,实现了高效的缓冲区管理。
输入缓冲区
输入缓冲区为sds querybuf
。querybuf
是一个动态增长的单个缓冲区,既需要读入也需要解析,所以需要记录两个位置,一是写入的起点sdslen(c->querybuf)
,二是解析的起点c->querybuf
。
Redis
使用sdsrange()
确保每次解析的起点都是c->querybuf
。
输入缓冲区可能会增长过大,导致内存浪费,在serverCron
中会调整queyrbuf
的大小:
/* The client query buffer is an sds.c string that can end with a lot of
* free space not used, this function reclaims space if needed.
*
* The function always returns 0 as it never terminates the client. */
int clientsCronResizeQueryBuffer(client *c) {
size_t querybuf_size = sdsAllocSize(c->querybuf);
time_t idletime = server.unixtime - c->lastinteraction;
/* There are two conditions to resize the query buffer:
* 1) Query buffer is > BIG_ARG and too big for latest peak.
* 2) Client is inactive and the buffer is bigger than 1k. */
if (((querybuf_size > PROTO_MBULK_BIG_ARG) &&
(querybuf_size/(c->querybuf_peak+1)) > 2) ||
(querybuf_size > 1024 && idletime > 2))
{
/* Only resize the query buffer if it is actually wasting space. */
if (sdsavail(c->querybuf) > 1024) {
c->querybuf = sdsRemoveFreeSpace(c->querybuf);
}
}
/* Reset the peak again to capture the peak memory usage in the next
* cycle. */
c->querybuf_peak = 0;
return 0;
}
输出缓冲区
输出缓冲区分为2个部分:
- 静态的数组:
char buf[PROTO_REPLY_CHUNK_BYTES]
- 动态的链表:
list *reply
首先将response
写入buf
中,由于buf
是定长的,当空间不够用时,会添加到reply
中,其中的每个node
的大小也限制在PROTO_REPLY_CHUNK_BYTES
,输出时会首先输出buf
,
然后是reply
,同时释放写完的listNode
。
同样的输出缓冲区也需要记录两个位置:
- 写入缓冲区的位置:
buf
:bufpos
reply
:sdslen()
- 输出给客户端的位置:
c->sentlen
缓冲区限制
- 输入: 当
querybuf
大小超过server.client_max_querybuf_len
时,会立即关闭客户端,默认大小为PROTO_MAX_QUERYBUF_LEN 1GB
- 输出: 输出缓冲区由
server.client_obuf_limits[]
限制。当大小超过hard_limit_bytes
或超过soft_limit_bytes
一定时间(soft_limit_seconds
)时会被异步关闭freeClientAsync()
。 要注意此时并没有禁止继续添加response
,并且会在beforeSleep()
中写了一部分response
才会在serverCron()
中释放连接,这会导致客户端接收到不完整的stream
Command
Redis
以dict
存放命令在server.commands
。在initServerConfig()
中会用硬编码的redisCommandTable[]
初始化server.commands
。当命令解析完成后,
会调用processCommand()
处理命令,流程如下:
lookupCommand()
查找命令,并检查参数合法性- 一些状态的检查
- 最后才是调用命令:
call()
Response
一般的水平触发epoll
发送相应流程如下:
- 解析完一条完整的命令,将结果保存在缓冲区
- 注册
EPOLLOUT
,等待事件处理 - 当响应发送完毕,删除事件
为什么不可以一开始就注册写事件呢?因为使用LT
水平触发模式,会一直返回可写条件,造成资源不必要的浪费。但是在上面的处理过程中,除了真正的I/O
操作,还包含了
2次系统调用epoll_ctl()
,为了减少epoll_ctl()
的调用可以使用EPOLLET
模式,一开始就注册写事件,但是ET
模式代码容易写错。Reids
使用LT
模式,并做了一些优化,
流程如下:
- 调用
addReply()
之类的函数时,会首先调用prepareClientToWrite()
:
/* Here instead of installing the write handler, we just flag the
* client and put it into a list of clients that have something
* to write to the socket. This way before re-entering the event
* loop, we can try to directly write to the client sockets avoiding
* a system call. We'll only really install the write handler if
* we'll not be able to write the whole reply at once. */
c->flags |= CLIENT_PENDING_WRITE;
listAddNodeHead(server.clients_pending_write,c);
- 在
beforeSleep()
中调用handleClientsWithPendingWrites()
写response
:
/* This function is called just before entering the event loop, in the hope
* we can just write the replies to the client output buffer without any
* need to use a syscall in order to install the writable event handler,
* get it called, and so forth. */
int handleClientsWithPendingWrites(void) {
listIter li;
listNode *ln;
int processed = listLength(server.clients_pending_write);
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
listDelNode(server.clients_pending_write,ln);
/* Try to write buffers to the client socket. */
if (writeToClient(c->fd,c,0) == C_ERR) continue;
/* If there is nothing left, do nothing. Otherwise install
* the write handler. */
if (clientHasPendingReplies(c) &&
aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
sendReplyToClient, c) == AE_ERR)
{
freeClientAsync(c);
}
}
return processed;
}
writeToClient()
会首先写c->buf
,之后是c->reply
。为了防止饥饿,每次写限制在NET_MAX_WRITES_PER_EVENT
- 一次写不完时,才会注册写事件
sendReplyToClient()
:
/* Write event handler. Just send data to the client. */
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
UNUSED(el);
UNUSED(mask);
writeToClient(fd,privdata,1);
}
- 写完后会删除写事件
Redis
使用这种方法减少了epoll_ctl()
系统调用,提高了性能。
连接管理
在acceptCommonHandler()
中调用createClient()
会将新连接的客户端添加到server.clients
。server.clients
链表长度代表了接入的客户端数量,
当超过maxclients
时,会立即关闭,默认为10000。
客户端释放有下面几种方式:
- 直接调用
freeClient()
- 当出错时,比如协议错误,会标记为
CLIENT_CLOSE_AFTER_REPLY
,会在writeToClient()
发送完响应后会关闭客户端连接 - 当后续还有些客户端操作,但也需要释放客户端时,需要异步释放:
freeClientAsync()
会将客户端添加到server.clients_to_close
并标记CLIENT_CLOSE_ASAP
, 之后在serverCron()
中调用freeClientsInAsyncFreeQueue()
释放 - 当空闲时间过长时,会在
serverCron()
中clientsCron()
调用clientsCronHandleTimeout()
释放
留下评论