ScyllaDB 学习(一) – smf
学习 ScyllaDB
会从 seastar
入手,在这之前先学习下 smf
,它是用 seastar
实现的 RPC 框架,号称西半球最快 RPC。
RPC 框架实现
实现最基础的 RPC 框架还是比较简单的,从上到下的话大概分为这几步:
- 支持自定义请求、服务:现在的 RPC 框架大都使用代码生成,用户只要根据协议定义好类型和接口即可,RPC 框架会生成好对应的类、客户端和服务端。类型通常使用现成的序列化库,比如
protobuf
。客户端和服务端的代码生成要由 RPC 框架自己写,因为需要依赖框架的内部实现,比如框架通常都会有一个 client 基类,功能是发送各种请求给服务端,而生成的客户端就会继承这个基类,并在此之上提供强类型的、用户定义好的接口。客户端是拿来即用的,而服务端一般也是个基类,需要用户实现定义好的接口。 - 通信协议:最基本的要求包括两部分,一是类型的序列化,这由序列化库解决了;二是找到要调用的接口,比如每个接口都有一个 UUID,客户端发请求就会带着它,服务端收到后就通过 UUID 找到对应的接口再调用。以
gRPC
为例,它的功能比较多,协议用的是HTTP/2
,接口的 UUID 是/{service name}/{method name}
。 - 网络框架:负责数据的收发,没啥好说的,主要关注它的性能,也是这次的重点。
smf 实现
自定义请求、服务
smf
序列化用的是 FlatBuffers
,之前没听说过(孤陋寡闻了),看了下介绍说最大的亮点是不需要反序列化就能访问数据,那盲猜序列化是直接把内存结构保存下来了,所以就不需要反序列,从 benchmark 结果看,性能确实很好,序列化后的大小确实比 raw struct 还大一些,应该加了 header 记录 offset 之类的。它也支持向前/后兼容,不了解支持到哪种程度,应该是在 header 上做了些文章。
代码生成器就是按照固定的框架生成代码,就不介绍实现了,只看看生成的代码是什么样的。生成的 service 会继承 smf::rpc_service
,并且定义好各个接口的默认实现,用户需要继承这个 service 来实现自己的逻辑,再注册到 smf::rpc_server
里使用。smf::rpc_service
定义如下,从定义就能猜出来 smf::rpc_server
是怎么处理请求了:请求里会带着 uint32_t
的 request ID,server 遍历注册的 service 找到对应的 method 来调用。
struct rpc_service {
virtual const char *service_name() const = 0;
virtual uint32_t service_id() const = 0;
virtual rpc_service_method_handle *method_for_request_id(uint32_t idx) = 0;
virtual std::ostream &print(std::ostream &) const = 0;
virtual ~rpc_service() {}
rpc_service() {}
};
看代码也确实是这样,smf::rpc_server
把注册的 service 保存在 vector 里,遍历调用 method_for_request_id()
找到 method 来调用,method_for_request_id()
的实现就是 swtich-case 。request ID 的计算方式如下:
- request ID = service ID ^ method ID
- service ID = crc32(service name)
- method ID = crc32(method name:input type name:output type name),因为 C++ 支持重载,所以参数和返回值的类型也会用来计算。
rpc_service_method_handle
里 method 类型如下,这里就不介绍 seastar::future
了。因为 service 的接口都是具体的类型(rpc_typed_envelope<T>
, rpc_recv_typed_context<T>
),而这里需要用统一的类型来保存,所以用的是保存序列化之后数据的 rpc_recv_context
和 rpc_envelope
类型,这里返回的 method 也就是在用户实现的接口上再包了一层类型转换的。
using fn_t = seastar::noncopyable_function<seastar::future<rpc_envelope>(
rpc_recv_context &&recv)>;
service 介绍完就到 client 了,生成的 client 会继承 smf::rpc_client
,它提供了发送请求&接收响应的功能,生成的 client 就是在它之上包了一层来提供用户定义的接口。
通信协议
smf
的协议就是在 FlatBuffers
序列化的 payload 之前增加了固定大小的 header,header 的序列化也是把内存结构直接保存下来了,也没处理大小端之类的。header 格式如下:
/// layout
/// [ 8bits(compression) + 8bits(bitflags) + 16bits(session) + 32bits(size) + 32bits(checksum) + 32bits(meta) ]
/// total = 128bits == 16bytes
客户端会在 meta
里设置上面提到的 request ID,服务端会在 meta
里设置 HTTP status code;size
是 payload 的大小;session
在后面介绍;compression
/checksum
看名字也知道作用。
网络框架
到了这次的重点,结果发现没什么好写的,全是 seastar
的功劳🤣。
数据收发
先来看数据的收发,在上面也提到了类型转换,单向的类型转换如下:
rpc_typed_envelope<T>
->rpc_envelope
->rpc_recv_context
->rpc_recv_typed_context<T>
typed 类型是包含具体类型的,序列化后就是 non-typed 的类型,发数据的实现在 rpc_envelope
,收数据在 rpc_recv_context
,实现就是调 seastar
的方法先发/收 header 再发/收 payload,看代码就够了,不介绍了。
seastar::future<>
rpc_envelope::send(seastar::output_stream<char> *out, rpc_envelope e) {
...
return out->write(std::move(header_buf))
.then([out, e = std::move(e)]() mutable {
return out->write(std::move(e.letter.body));
})
.then([out] { return out->flush(); });
}
// rpc_recv_context::parse_payload() 类似
seastar::future<std::optional<rpc::header>>
rpc_recv_context::parse_header(rpc_connection *conn) {
...
return conn->istream.read_exactly(kRPCHeaderSize)
.then([conn](seastar::temporary_buffer<char> header) {
auto hdr = rpc::header();
std::memcpy(&hdr, header.get(), kRPCHeaderSize);
return seastar::make_ready_future<ret_type>(std::move(hdr));
})
...
}
rpc_server
现在看下 rpc_server
是如何处理连接的,rpc_server::start()
会 spawn 一个一直接收新连接的 future,handle_client_connection()
也是 spawn 出去的 future 所以是异步处理每个连接的。
void
rpc_server::start() {
...
(void)seastar::keep_doing([this] {
return listener_->accept().then([this, stats = stats_, limits = limits_](
seastar::accept_result result) mutable {
...
// DO NOT return the future. Need to execute in parallel
(void)handle_client_connection(conn);
});
})
...
}
每个连接的处理流程简化下来就是 rpc_recv_context::parse_header()
-> rpc_recv_context::parse_payload()
-> method()
-> rpc_envelope::send()
的 continuation,后两步也是在 spawn 出去的 future 执行的,所以即使单个连接也是支持并发请求的,但每个连接只有一个 socket,肯定不能并发发消息,所以这里用了 seastar::semaphore serialize_writes{1}
来保证串行发送:
return seastar::with_semaphore(
conn->serialize_writes, 1, [conn, ee = std::move(e)]() mutable {
return smf::rpc_envelope::send(&conn->conn.ostream,
std::move(ee));
});
rpc_client
rpc_client
同理,不过是反过来的流程,它同样支持并发发请求,也是用 seastar::semaphore
来串行化的。因为支持并发请求也意味着不能保证响应到达的顺序,所以 rpc_client
会给每个请求在 header 里设置 session ID,service method 也会给响应设置 session ID,这样就能保证请求和响应匹配了。rpc_client
发请求时用 unordered_map
保存了每个请求的 work_item
,发请求的 future 就会阻塞在等待 promise 完成:work->pr.get_future()
。
struct work_item {
using promise_t = seastar::promise<std::optional<rpc_recv_context>>;
...
promise_t pr;
uint16_t session{0};
};
rpc_client
在连接到 server 后会 spawn 专门读响应的 future,每收到完整的响应后就会从 map 里根据 session ID 找到对应的 work_item
并设置 seastar::promise
的值,从而唤醒对应发请求的 future。
uint16_t sess = opt->session();
auto it = rpc_slots_.find(sess);
...
it->second->pr.set_value(std::move(opt));
rpc_slots_.erase(it);
使用
smf
其实就是基于 seastar
实现了支持 RPC 的 future,所以使用方式还是遵循 seastar
的。server 的启动方式大致如下,意思是在每个 core 上都创建 smf::rpc_server
,注册 service 再启动。
seastar::distributed<smf::rpc_server> rpc;
seastar::app_template app;
return app.run_deprecated(args, argv, [&] {
return seastar::async([&] {
rpc.start(args).get();
rpc.invoke_on_all(&smf::rpc_server::register_service<storage_service>)
.get();
rpc.invoke_on_all(&smf::rpc_server::start).get();
});
});
总结
smf
就是最简单的 RPC 实现,高性能全依赖 seastar
,从 smf
使用 seastar
的方式看,seastar
非常像还没 async/.await 时期的 Rust future,我要是没 Rust 经验的话看起来还吃力点,现在看还蛮亲切的。之后会开始学习 seastar
。
留下评论