hiredis 极简分析
高性能的服务端要搭配高性能的客户端才能发挥作用。为了方便用户使用,客户端一般提供同步语义,也就是调用返回的时候要得到结果。
我一直比较好奇如何实现高效的同步语义,是简单的用阻塞 I/O
搭配超时,还是用 I/O
多路复用模拟同步?在公司里,Java
客户端使用第一种方式,C++
客户端使用
第二种方式,性能差不多,可能和实现方式有关。
hiredis
在 Redis
内部也有使用,只提供了最基础的功能,实现也很简单。
同步
hiredis
的同步语义是通过阻塞 I/O
搭配超时实现的,每次命令调用会阻塞直到出错或者成功解析响应。命令会缓冲在输出 buffer
中,在获取响应前才会发送,这样可以实现 pipeline
,
hiredis
也提供了相应的接口将命令添加在输出 buffer
中,在获取响应时一次性发送。
Marshal
Redis
使用 RESP
协议来通信,格式很简单,分为几类,具体可以看 RESP。客户端发送的命令格式是固定的 Bulk String
的 Array
,
hiredis
提供了类 printf
的接口,以空格分隔的各个 argv
会 marshal
成 Bulk String
,并在头部添加 Array
的长度构成完整的命令。
Unmarshal
比较麻烦的是解析响应,因为 Redis
会返回多种类型的响应,hiredis
返回的响应是如下结构,通过 type
区分,包含了所有类型:
typedef struct redisReply {
int type; /* REDIS_REPLY_* */
long long integer; /* The integer when type is REDIS_REPLY_INTEGER */
size_t len; /* Length of string */
char *str; /* Used for both REDIS_REPLY_ERROR and REDIS_REPLY_STRING */
size_t elements; /* number of elements, for REDIS_REPLY_ARRAY */
struct redisReply **element; /* elements vector for REDIS_REPLY_ARRAY */
} redisReply;
hiredis
将解析响应的模块 redisReader
单独拆了出来。除了 Array
类型,其余类型的响应都很简单,去掉头部和 \r\n
即可,而 Array
可以包含任意类型的元素,包括嵌套 Array
,
且不要求每个元素类型相同。Array
的解析使用栈结构,只有 Array
的所有元素解析完成,该 Array
才解析完成,hiredis
也是这样实现的:
typedef struct redisReadTask {
int type;
int elements; /* number of elements in multibulk container */
int idx; /* index in parent (array) object */
void *obj; /* holds user-generated value for a read task */
struct redisReadTask *parent; /* parent task */
void *privdata; /* user-settable arbitrary field */
} redisReadTask;
typedef struct redisReader {
int err; /* Error flags, 0 when there is no error */
char errstr[128]; /* String representation of error when applicable */
char *buf; /* Read buffer */
size_t pos; /* Buffer cursor */
size_t len; /* Buffer length */
size_t maxbuf; /* Max length of unused buffer */
redisReadTask rstack[9];
int ridx; /* Index of current read task */
void *reply; /* Temporary reply pointer */
redisReplyObjectFunctions *fn;
void *privdata;
} redisReader;
redisReader
的 buf
保存从 Redis
接收到的响应,rstack[]
是解析任务的栈,redisReadTask
是一个解析任务,解析单个 RESP
结构。每个解析任务开始会 push
到 rstack[]
,解析完成就 pop
,当
rstack[]
为空时,解析完成。hiredis
限制了 rstack[]
的深度,所以嵌套的 Array
最多为 8
层,不过 Redis
目前不会返回嵌套如此深的响应。
异步
hiredis
提供了异步接口,但没有提供完整独立的功能,而是嵌入式的,依托于用户使用的 eventloop
,需要用户提供注册事件的接口,hiredis
在 adapters
目录下提供了一些常见网络库的接口,如 libev
、libevent
,
还包括 Redis
内部的 ae
。当使用异步接口时,会把命令添加到输出 buffer
中,并调用用户提供的注册写事件的接口,当写就绪时就会发送请求,并注册读事件处理响应。用户需要提供 callback
:
typedef void (redisCallbackFn)(struct redisAsyncContext*, void*, void*);
其中第一个参数是 redisReply
,第二个参数是 privdata
,当成功接收到响应时就会调用 callback
,callback
以单向链表保存,按照请求顺序依次调用。
异步接口没有超时时间,当连接出错时,hiredis
会传入 NULL reply
调用所有未完成请求的 callback
,callback
通过 reply
来判断请求成功与否。
hiredis
还提供了另外两种 callback
:
/* Connection callback prototypes */
typedef void (redisDisconnectCallback)(const struct redisAsyncContext*, int status);
typedef void (redisConnectCallback)(const struct redisAsyncContext*, int status);
hiredis
异步接口使用 nonblocking connect
,在第一次读写就绪时会调用的 onConnect callback
,并传入连接建立的状态;当连接断开时,会调用 onDisconnect callback
,传入的 status
标记是因为出错而断连还是主动要求断连,可以很方便的进行
重连或者清理工作。
因为 hiredis
是 minimalistic client
,采用嵌入式的异步接口实现,对应用带来的影响很小,使用也很方便。
Pub/Sub
hiredis
用 dict
保存 channel
到 callback
的映射,会根据 Redis
传来的 channel
找到对应的 callback
并调用。与同步相比,异步接口的单个连接就可以既发送普通命令,又处理 Pub/Sub
响应。
I/O 多路复用模拟同步
就我所知,一般实现如下:
- 单独的线程跑
eventloop
:负责接收其他线程的请求、发送请求、接收响应。 - 一般使用
pipe
或者lock-free
队列在请求线程和eventloop
线程间传递消息。 eventloop
线程接收完整响应后,调用callback
将响应传递给请求线程。一般实现为请求线程使用pthread_cond_wait
等待请求完成,eventloop
调用的callback
中会进行赋值和pthread_cond_signal
。
留下评论