hiredis 极简分析

高性能的服务端要搭配高性能的客户端才能发挥作用。为了方便用户使用,客户端一般提供同步语义,也就是调用返回的时候要得到结果。 我一直比较好奇如何实现高效的同步语义,是简单的用阻塞 I/O 搭配超时,还是用 I/O 多路复用模拟同步?在公司里,Java 客户端使用第一种方式,C++ 客户端使用 第二种方式,性能差不多,可能和实现方式有关。

hiredisRedis 内部也有使用,只提供了最基础的功能,实现也很简单。

同步

hiredis 的同步语义是通过阻塞 I/O 搭配超时实现的,每次命令调用会阻塞直到出错或者成功解析响应。命令会缓冲在输出 buffer 中,在获取响应前才会发送,这样可以实现 pipelinehiredis 也提供了相应的接口将命令添加在输出 buffer 中,在获取响应时一次性发送。

Marshal

Redis 使用 RESP 协议来通信,格式很简单,分为几类,具体可以看 RESP。客户端发送的命令格式是固定的 Bulk StringArrayhiredis 提供了类 printf 的接口,以空格分隔的各个 argvmarshalBulk String,并在头部添加 Array 的长度构成完整的命令。

Unmarshal

比较麻烦的是解析响应,因为 Redis 会返回多种类型的响应,hiredis 返回的响应是如下结构,通过 type 区分,包含了所有类型:

typedef struct redisReply {
    int type; /* REDIS_REPLY_* */
    long long integer; /* The integer when type is REDIS_REPLY_INTEGER */
    size_t len; /* Length of string */
    char *str; /* Used for both REDIS_REPLY_ERROR and REDIS_REPLY_STRING */
    size_t elements; /* number of elements, for REDIS_REPLY_ARRAY */
    struct redisReply **element; /* elements vector for REDIS_REPLY_ARRAY */
} redisReply;

hiredis 将解析响应的模块 redisReader 单独拆了出来。除了 Array 类型,其余类型的响应都很简单,去掉头部和 \r\n 即可,而 Array 可以包含任意类型的元素,包括嵌套 Array, 且不要求每个元素类型相同。Array 的解析使用栈结构,只有 Array 的所有元素解析完成,该 Array 才解析完成,hiredis 也是这样实现的:

typedef struct redisReadTask {
    int type;
    int elements; /* number of elements in multibulk container */
    int idx; /* index in parent (array) object */
    void *obj; /* holds user-generated value for a read task */
    struct redisReadTask *parent; /* parent task */
    void *privdata; /* user-settable arbitrary field */
} redisReadTask;

typedef struct redisReader {
    int err; /* Error flags, 0 when there is no error */
    char errstr[128]; /* String representation of error when applicable */

    char *buf; /* Read buffer */
    size_t pos; /* Buffer cursor */
    size_t len; /* Buffer length */
    size_t maxbuf; /* Max length of unused buffer */

    redisReadTask rstack[9];
    int ridx; /* Index of current read task */
    void *reply; /* Temporary reply pointer */

    redisReplyObjectFunctions *fn;
    void *privdata;
} redisReader;

redisReaderbuf 保存从 Redis 接收到的响应,rstack[] 是解析任务的栈,redisReadTask 是一个解析任务,解析单个 RESP 结构。每个解析任务开始会 pushrstack[],解析完成就 pop,当 rstack[] 为空时,解析完成。hiredis 限制了 rstack[] 的深度,所以嵌套的 Array 最多为 8 层,不过 Redis 目前不会返回嵌套如此深的响应。

异步

hiredis 提供了异步接口,但没有提供完整独立的功能,而是嵌入式的,依托于用户使用的 eventloop,需要用户提供注册事件的接口,hiredisadapters 目录下提供了一些常见网络库的接口,如 libevlibevent, 还包括 Redis 内部的 ae。当使用异步接口时,会把命令添加到输出 buffer 中,并调用用户提供的注册写事件的接口,当写就绪时就会发送请求,并注册读事件处理响应。用户需要提供 callback:

typedef void (redisCallbackFn)(struct redisAsyncContext*, void*, void*);

其中第一个参数是 redisReply,第二个参数是 privdata,当成功接收到响应时就会调用 callbackcallback 以单向链表保存,按照请求顺序依次调用。 异步接口没有超时时间,当连接出错时,hiredis 会传入 NULL reply 调用所有未完成请求的 callbackcallback 通过 reply 来判断请求成功与否。

hiredis 还提供了另外两种 callback:

/* Connection callback prototypes */
typedef void (redisDisconnectCallback)(const struct redisAsyncContext*, int status);
typedef void (redisConnectCallback)(const struct redisAsyncContext*, int status);

hiredis 异步接口使用 nonblocking connect,在第一次读写就绪时会调用的 onConnect callback,并传入连接建立的状态;当连接断开时,会调用 onDisconnect callback,传入的 status 标记是因为出错而断连还是主动要求断连,可以很方便的进行 重连或者清理工作。

因为 hiredisminimalistic client,采用嵌入式的异步接口实现,对应用带来的影响很小,使用也很方便。

Pub/Sub

hiredisdict 保存 channelcallback 的映射,会根据 Redis 传来的 channel 找到对应的 callback 并调用。与同步相比,异步接口的单个连接就可以既发送普通命令,又处理 Pub/Sub 响应。

I/O 多路复用模拟同步

就我所知,一般实现如下:

  • 单独的线程跑 eventloop:负责接收其他线程的请求、发送请求、接收响应。
  • 一般使用 pipe 或者 lock-free 队列在请求线程和 eventloop 线程间传递消息。
  • eventloop 线程接收完整响应后,调用 callback 将响应传递给请求线程。一般实现为请求线程使用 pthread_cond_wait 等待请求完成,eventloop 调用的 callback 中会进行赋值和 pthread_cond_signal

分类:

更新时间: