Redis源码阅读(二) – client

initServer()中调用listenToPort()创建了多个监听套接字,用于接受客户端连接。listenToPort()使用getaddrinfo()获取地址,可以实现协议无关的网络编程,同时将 监听套接字设为NonBlock避免accept()阻塞。为监听套接字创建的事件acceptTcpHandler(),做了下面事情:

  1. 接受连接
  2. 创建client: createClient()
  3. 添加事件: readQueryFromClient()

Client

TCP是流式协议,不能保证输入输出的完整性,所以需要维护连接的状态,主要就是输入和输出缓冲区。client结构比较复杂,和单节点相关的大概有下面一些元素:

/* With multiplexing we need to take per-client state.
 * Clients are taken in a linked list. */
typedef struct client {
    int fd;                 /* Client socket. */
    redisDb *db;            /* Pointer to currently SELECTed DB. */
    sds querybuf;           /* Buffer we use to accumulate client queries. */
    size_t querybuf_peak;   /* Recent (100ms or more) peak of querybuf size. */
    int argc;               /* Num of arguments of current command. */
    robj **argv;            /* Arguments of current command. */
    struct redisCommand *cmd, *lastcmd;  /* Last command executed. */
    int reqtype;            /* Request protocol type: PROTO_REQ_* */
    int multibulklen;       /* Number of multi bulk arguments left to read. */
    long bulklen;           /* Length of bulk argument in multi bulk request. */
    list *reply;            /* List of reply objects to send to the client. */
    unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
    size_t sentlen;         /* Amount of bytes already sent in the current
                               buffer or object being sent. */
    time_t ctime;           /* Client creation time. */
    time_t lastinteraction; /* Time of the last interaction, used for timeout */
    /* Response buffer */
    int bufpos;
    char buf[PROTO_REPLY_CHUNK_BYTES];
} client;

client中主要有下面一些元素:

  • 输入缓冲区: querybuf
  • 与解析请求相关的元素
  • 输出缓冲区: buf

RESP

TCP网络编程需要自己设定通信协议,常见的有:

  1. 以分隔符划分完整的元素,如\r\n
  2. header + content结构:header一般保存着content的长度,header可以是固定字节长度或者按照分隔符划分

Reids使用RESP协议通信,详见文档RESP是二进制安全(binary safe)的,因为不对任何字符做特殊解释,C语言依赖'\0'作为字符串的结束, 所以C语言字符串中不能有'\0',就是二进制不安全的。RESP有下面几种类型:

Type Encoding
Simple String +{content}\r\n
Error -{error type} {content}\r\n
Integer :{number}\r\n
Bulk String ${length}\r\n{content}\r\n
Array *{array length}\r\n{各类型元素 * {array length}}

客户端发送命令有2种格式:

  1. A client sends to the Redis server a RESP Array consisting of just Bulk Strings.
  2. inline command: space-separated arguments

inline command主要为了类似telnet之类的连接Redis,客户端需要实现RESP协议。处理命令的流程如下:

  1. readQueryFromClient()读取输入到c->querybuf
  2. proccessInputBuffer(): 根据reqtype执行processInlineBuffer()processMultibulkBuffer()解析request,设置argcargv
  3. processCommand(): 调用命令

使用\r\n作为分隔符,非常容易解析,querybuf中包含\r\n意味着是一个完整的部分,才会进行解析。

解析inline command

主要是由sdssplitargs()这个函数来解析,源码中有下面几个注意点:

  1. double quote "包围的会进行转义字符的转换,也会按照16进制保存\xXX
  2. single quote '包围的只会对\'做转义

解析RESP Array of Bulk Strings

首先是解析*{array length}存入c->multibulklen,然后解析c->multibulklenbulk string,每个bulk string的长度会 存在c->bulklen。最后完整的命令会存入c->argcc->argv中。

缓冲区管理

解析协议很简单,对于C语言来说,比较麻烦的倒是缓冲区的管理,定长的缓冲区会带来缓冲区不够的问题,动态增长又有可能不够高效而且也会造成 空间浪费,Redis使用sds作为输入缓冲区,实现了高效的缓冲区管理。

输入缓冲区

输入缓冲区为sds querybufquerybuf是一个动态增长的单个缓冲区,既需要读入也需要解析,所以需要记录两个位置,一是写入的起点sdslen(c->querybuf),二是解析的起点c->querybufRedis使用sdsrange()确保每次解析的起点都是c->querybuf

输入缓冲区可能会增长过大,导致内存浪费,在serverCron中会调整queyrbuf的大小:

/* The client query buffer is an sds.c string that can end with a lot of
 * free space not used, this function reclaims space if needed.
 *
 * The function always returns 0 as it never terminates the client. */
int clientsCronResizeQueryBuffer(client *c) {
    size_t querybuf_size = sdsAllocSize(c->querybuf);
    time_t idletime = server.unixtime - c->lastinteraction;

    /* There are two conditions to resize the query buffer:
     * 1) Query buffer is > BIG_ARG and too big for latest peak.
     * 2) Client is inactive and the buffer is bigger than 1k. */
    if (((querybuf_size > PROTO_MBULK_BIG_ARG) &&
         (querybuf_size/(c->querybuf_peak+1)) > 2) ||
         (querybuf_size > 1024 && idletime > 2))
    {
        /* Only resize the query buffer if it is actually wasting space. */
        if (sdsavail(c->querybuf) > 1024) {
            c->querybuf = sdsRemoveFreeSpace(c->querybuf);
        }
    }
    /* Reset the peak again to capture the peak memory usage in the next
     * cycle. */
    c->querybuf_peak = 0;
    return 0;
}

输出缓冲区

输出缓冲区分为2个部分:

  1. 静态的数组:char buf[PROTO_REPLY_CHUNK_BYTES]
  2. 动态的链表:list *reply

首先将response写入buf中,由于buf是定长的,当空间不够用时,会添加到reply中,其中的每个node的大小也限制在PROTO_REPLY_CHUNK_BYTES,输出时会首先输出buf, 然后是reply,同时释放写完的listNode

同样的输出缓冲区也需要记录两个位置:

  1. 写入缓冲区的位置:
    • buf: bufpos
    • reply: sdslen()
  2. 输出给客户端的位置: c->sentlen

缓冲区限制

  • 输入: 当querybuf大小超过server.client_max_querybuf_len时,会立即关闭客户端,默认大小为PROTO_MAX_QUERYBUF_LEN 1GB
  • 输出: 输出缓冲区由server.client_obuf_limits[]限制。当大小超过hard_limit_bytes或超过soft_limit_bytes一定时间(soft_limit_seconds)时会被异步关闭freeClientAsync()。 要注意此时并没有禁止继续添加response,并且会在beforeSleep()中写了一部分response才会在serverCron()中释放连接,这会导致客户端接收到不完整的stream

Command

Redisdict存放命令在server.commands。在initServerConfig()中会用硬编码的redisCommandTable[]初始化server.commands。当命令解析完成后, 会调用processCommand()处理命令,流程如下:

  1. lookupCommand()查找命令,并检查参数合法性
  2. 一些状态的检查
  3. 最后才是调用命令: call()

Response

一般的水平触发epoll发送相应流程如下:

  1. 解析完一条完整的命令,将结果保存在缓冲区
  2. 注册EPOLLOUT,等待事件处理
  3. 当响应发送完毕,删除事件

为什么不可以一开始就注册写事件呢?因为使用LT水平触发模式,会一直返回可写条件,造成资源不必要的浪费。但是在上面的处理过程中,除了真正的I/O操作,还包含了 2次系统调用epoll_ctl(),为了减少epoll_ctl()的调用可以使用EPOLLET模式,一开始就注册写事件,但是ET模式代码容易写错。Reids使用LT模式,并做了一些优化, 流程如下:

  • 调用addReply()之类的函数时,会首先调用prepareClientToWrite():
/* Here instead of installing the write handler, we just flag the
    * client and put it into a list of clients that have something
    * to write to the socket. This way before re-entering the event
    * loop, we can try to directly write to the client sockets avoiding
    * a system call. We'll only really install the write handler if
    * we'll not be able to write the whole reply at once. */
c->flags |= CLIENT_PENDING_WRITE;
listAddNodeHead(server.clients_pending_write,c);
  • beforeSleep()中调用handleClientsWithPendingWrites()response:
/* This function is called just before entering the event loop, in the hope
 * we can just write the replies to the client output buffer without any
 * need to use a syscall in order to install the writable event handler,
 * get it called, and so forth. */
int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    int processed = listLength(server.clients_pending_write);

    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        listDelNode(server.clients_pending_write,ln);

        /* Try to write buffers to the client socket. */
        if (writeToClient(c->fd,c,0) == C_ERR) continue;

        /* If there is nothing left, do nothing. Otherwise install
         * the write handler. */
        if (clientHasPendingReplies(c) &&
            aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
                sendReplyToClient, c) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    return processed;
}
  • writeToClient()会首先写c->buf,之后是c->reply。为了防止饥饿,每次写限制在NET_MAX_WRITES_PER_EVENT
  • 一次写不完时,才会注册写事件sendReplyToClient():
/* Write event handler. Just send data to the client. */
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    UNUSED(el);
    UNUSED(mask);
    writeToClient(fd,privdata,1);
}
  • 写完后会删除写事件

Redis使用这种方法减少了epoll_ctl()系统调用,提高了性能。

连接管理

acceptCommonHandler()中调用createClient()会将新连接的客户端添加到server.clientsserver.clients链表长度代表了接入的客户端数量, 当超过maxclients时,会立即关闭,默认为10000

客户端释放有下面几种方式:

  1. 直接调用freeClient()
  2. 当出错时,比如协议错误,会标记为CLIENT_CLOSE_AFTER_REPLY,会在writeToClient()发送完响应后会关闭客户端连接
  3. 当后续还有些客户端操作,但也需要释放客户端时,需要异步释放: freeClientAsync()会将客户端添加到server.clients_to_close并标记CLIENT_CLOSE_ASAP, 之后在serverCron()中调用freeClientsInAsyncFreeQueue()释放
  4. 当空闲时间过长时,会在serverCron()clientsCron()调用clientsCronHandleTimeout()释放

分类:

更新时间:

留下评论