ScyllaDB 学习(一) – smf

学习 ScyllaDB 会从 seastar 入手,在这之前先学习下 smf,它是用 seastar 实现的 RPC 框架,号称西半球最快 RPC。

RPC 框架实现

实现最基础的 RPC 框架还是比较简单的,从上到下的话大概分为这几步:

  1. 支持自定义请求、服务:现在的 RPC 框架大都使用代码生成,用户只要根据协议定义好类型和接口即可,RPC 框架会生成好对应的类、客户端和服务端。类型通常使用现成的序列化库,比如 protobuf。客户端和服务端的代码生成要由 RPC 框架自己写,因为需要依赖框架的内部实现,比如框架通常都会有一个 client 基类,功能是发送各种请求给服务端,而生成的客户端就会继承这个基类,并在此之上提供强类型的、用户定义好的接口。客户端是拿来即用的,而服务端一般也是个基类,需要用户实现定义好的接口。
  2. 通信协议:最基本的要求包括两部分,一是类型的序列化,这由序列化库解决了;二是找到要调用的接口,比如每个接口都有一个 UUID,客户端发请求就会带着它,服务端收到后就通过 UUID 找到对应的接口再调用。以 gRPC 为例,它的功能比较多,协议用的是 HTTP/2,接口的 UUID 是 /{service name}/{method name}
  3. 网络框架:负责数据的收发,没啥好说的,主要关注它的性能,也是这次的重点。

smf 实现

自定义请求、服务

smf 序列化用的是 FlatBuffers,之前没听说过(孤陋寡闻了),看了下介绍说最大的亮点是不需要反序列化就能访问数据,那盲猜序列化是直接把内存结构保存下来了,所以就不需要反序列,从 benchmark 结果看,性能确实很好,序列化后的大小确实比 raw struct 还大一些,应该加了 header 记录 offset 之类的。它也支持向前/后兼容,不了解支持到哪种程度,应该是在 header 上做了些文章。

代码生成器就是按照固定的框架生成代码,就不介绍实现了,只看看生成的代码是什么样的。生成的 service 会继承 smf::rpc_service,并且定义好各个接口的默认实现,用户需要继承这个 service 来实现自己的逻辑,再注册到 smf::rpc_server 里使用。smf::rpc_service 定义如下,从定义就能猜出来 smf::rpc_server 是怎么处理请求了:请求里会带着 uint32_t 的 request ID,server 遍历注册的 service 找到对应的 method 来调用。

struct rpc_service {
  virtual const char *service_name() const = 0;
  virtual uint32_t service_id() const = 0;
  virtual rpc_service_method_handle *method_for_request_id(uint32_t idx) = 0;
  virtual std::ostream &print(std::ostream &) const = 0;
  virtual ~rpc_service() {}
  rpc_service() {}
};

看代码也确实是这样,smf::rpc_server 把注册的 service 保存在 vector 里,遍历调用 method_for_request_id() 找到 method 来调用,method_for_request_id() 的实现就是 swtich-case 。request ID 的计算方式如下:

  • request ID = service ID ^ method ID
  • service ID = crc32(service name)
  • method ID = crc32(method name:input type name:output type name),因为 C++ 支持重载,所以参数和返回值的类型也会用来计算。

rpc_service_method_handle 里 method 类型如下,这里就不介绍 seastar::future 了。因为 service 的接口都是具体的类型(rpc_typed_envelope<T>, rpc_recv_typed_context<T>),而这里需要用统一的类型来保存,所以用的是保存序列化之后数据的 rpc_recv_contextrpc_envelope 类型,这里返回的 method 也就是在用户实现的接口上再包了一层类型转换的。

using fn_t = seastar::noncopyable_function<seastar::future<rpc_envelope>(
  rpc_recv_context &&recv)>;

service 介绍完就到 client 了,生成的 client 会继承 smf::rpc_client,它提供了发送请求&接收响应的功能,生成的 client 就是在它之上包了一层来提供用户定义的接口。

通信协议

smf 的协议就是在 FlatBuffers 序列化的 payload 之前增加了固定大小的 header,header 的序列化也是把内存结构直接保存下来了,也没处理大小端之类的。header 格式如下:

/// layout
/// [ 8bits(compression) + 8bits(bitflags) + 16bits(session) + 32bits(size) + 32bits(checksum) + 32bits(meta) ]
/// total = 128bits == 16bytes

客户端会在 meta 里设置上面提到的 request ID,服务端会在 meta 里设置 HTTP status code;size 是 payload 的大小;session 在后面介绍;compression/checksum 看名字也知道作用。

网络框架

到了这次的重点,结果发现没什么好写的,全是 seastar 的功劳🤣。

数据收发

先来看数据的收发,在上面也提到了类型转换,单向的类型转换如下:

  • rpc_typed_envelope<T> -> rpc_envelope -> rpc_recv_context -> rpc_recv_typed_context<T>

typed 类型是包含具体类型的,序列化后就是 non-typed 的类型,发数据的实现在 rpc_envelope,收数据在 rpc_recv_context,实现就是调 seastar 的方法先发/收 header 再发/收 payload,看代码就够了,不介绍了。

seastar::future<>
rpc_envelope::send(seastar::output_stream<char> *out, rpc_envelope e) {
  ...
  return out->write(std::move(header_buf))
    .then([out, e = std::move(e)]() mutable {
      return out->write(std::move(e.letter.body));
    })
    .then([out] { return out->flush(); });
}

// rpc_recv_context::parse_payload() 类似
seastar::future<std::optional<rpc::header>>
rpc_recv_context::parse_header(rpc_connection *conn) {
  ...
  return conn->istream.read_exactly(kRPCHeaderSize)
    .then([conn](seastar::temporary_buffer<char> header) {
      auto hdr = rpc::header();
      std::memcpy(&hdr, header.get(), kRPCHeaderSize);
      return seastar::make_ready_future<ret_type>(std::move(hdr));
    })
  ...
}

rpc_server

现在看下 rpc_server 是如何处理连接的,rpc_server::start() 会 spawn 一个一直接收新连接的 future,handle_client_connection() 也是 spawn 出去的 future 所以是异步处理每个连接的。

void
rpc_server::start() {
  ...
  (void)seastar::keep_doing([this] {
    return listener_->accept().then([this, stats = stats_, limits = limits_](
                                      seastar::accept_result result) mutable {
      ...
      // DO NOT return the future. Need to execute in parallel
      (void)handle_client_connection(conn);
    });
  })
  ...
}

每个连接的处理流程简化下来就是 rpc_recv_context::parse_header() -> rpc_recv_context::parse_payload() -> method() -> rpc_envelope::send() 的 continuation,后两步也是在 spawn 出去的 future 执行的,所以即使单个连接也是支持并发请求的,但每个连接只有一个 socket,肯定不能并发发消息,所以这里用了 seastar::semaphore serialize_writes{1} 来保证串行发送:

          return seastar::with_semaphore(
            conn->serialize_writes, 1, [conn, ee = std::move(e)]() mutable {
              return smf::rpc_envelope::send(&conn->conn.ostream,
                                             std::move(ee));
            });

rpc_client

rpc_client 同理,不过是反过来的流程,它同样支持并发发请求,也是用 seastar::semaphore 来串行化的。因为支持并发请求也意味着不能保证响应到达的顺序,所以 rpc_client 会给每个请求在 header 里设置 session ID,service method 也会给响应设置 session ID,这样就能保证请求和响应匹配了。rpc_client 发请求时用 unordered_map 保存了每个请求的 work_item,发请求的 future 就会阻塞在等待 promise 完成:work->pr.get_future()

  struct work_item {
    using promise_t = seastar::promise<std::optional<rpc_recv_context>>;
    ...
    promise_t pr;
    uint16_t session{0};
  };

rpc_client 在连接到 server 后会 spawn 专门读响应的 future,每收到完整的响应后就会从 map 里根据 session ID 找到对应的 work_item 并设置 seastar::promise 的值,从而唤醒对应发请求的 future。

          uint16_t sess = opt->session();
          auto it = rpc_slots_.find(sess);
          ...
          it->second->pr.set_value(std::move(opt));
          rpc_slots_.erase(it);

使用

smf 其实就是基于 seastar 实现了支持 RPC 的 future,所以使用方式还是遵循 seastar 的。server 的启动方式大致如下,意思是在每个 core 上都创建 smf::rpc_server,注册 service 再启动。

  seastar::distributed<smf::rpc_server> rpc;
  seastar::app_template app;
  return app.run_deprecated(args, argv, [&] {
    return seastar::async([&] {
      rpc.start(args).get();
      rpc.invoke_on_all(&smf::rpc_server::register_service<storage_service>)
        .get();
      rpc.invoke_on_all(&smf::rpc_server::start).get();
    });
  });

总结

smf 就是最简单的 RPC 实现,高性能全依赖 seastar,从 smf 使用 seastar 的方式看,seastar 非常像还没 async/.await 时期的 Rust future,我要是没 Rust 经验的话看起来还吃力点,现在看还蛮亲切的。之后会开始学习 seastar

分类:

更新时间:

留下评论