写一个 Rust TPC Runtime(一) – Task
task 是 stateful future,需要保存 future、执行状态和 executor 的一些信息。task 的性能至关重要,主要就是要减少内存分配次数和大小。
接口
runtime 一般都会提供 spawn 方法来执行一个并发的 future,且需要返回能够获取返回值的 future。以 async-task
为例,Runnable
是 stateful future,Task<F::Output>
是获取 Future::Output
的 future。
pub fn spawn<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
where
F: Future + Send + 'static,
F::Output: Send + 'static,
S: Fn(Runnable) + Send + Sync + 'static,
{
...
}
tokio
也大同小异,glommio
和 monoio
分别是借鉴的 async-task
和 tokio
,yatp
因为是 Future<Output = ()>
就有一些区别。
内存分配
future 的类型各不相同,而 executor 需要保存统一类型的 task,就不能有范型 ,需要做类型擦除:
- 一种做法是用 trait object,也就是
struct Task { ... future: Box<dyn Future<Output = ()>>, }
但这会导致两次内存分配:一次 future 的 trait object,一次 task,也限制了
Future<Output>
的类型。 - 另一种就是分配包含 future 在内的一整块内存,future 的类型信息通过 vtable 来保存,类似 RawWaker。task 只要保存这块内存的地址即可。
tokio
早期用的方案一,现在常见的 runtime 都用的方案二。task 里大概需要保存这些信息:
- future 和 output:不会同时存在,可以共用一块内存。
- 状态信息:future 的状态转换有一定约束,比如已经完成的 future 就不应该 wake 和 poll 了,task 就需要保存 future 的当前状态,通常引用计数也会保存在这里。
- 获取 output 的 waker:用于 future ready 后唤醒获取 output 的 future。
- vtable:包含所有需要类型信息的函数,类型信息可通过函数的范型附带上。
- scheduler(可选):wake 时调用的方法。有些 runtime 会有多种线程和调度模型,就需要保存这部分信息,如果只有一种的话那写死就行。
状态转换
task 的状态转换和 task 的定位、runtime 的实现有关,比如 async-task
的目标是通用的 task 实现,就需要支持多线程即用 CAS 来执行状态转换,需要考虑各种可能出现 data race 的情况,async-task
的实现见之前的博客。
典型的 task 状态转换大致如下:
- 创建 task,状态为 idle;
- task 扔进 task queue,状态为 scheduled;
- task 在 task queue 中被处理到,状态为 running;
- ready:状态为 completed;
- pending:注册 waker 到 reactor。reactor 调用 wake 再把 task 扔进 task queue,状态为 scheduled。
上面只是最基本的状态转换,task 有些约束需要遵循,比如不应该 poll 已经完成的 task;不应该把 task 同时扔进 task queue 多次,否则就可能 poll 已经完成的 task 或者多个线程同时 poll 单个 task。Rust 的异步抽象无法防止上述情况,全靠 runtime 实现来保证。我这里实现的是 TPC 的 runtime,reactor 也在相同的线程,所以没有 data race,而且 runtime 会有一定约束,比如 task 创建后会立刻 schedule、reactor 不会 wake 多次 task,状态转换就会简单很多。至于引用计数,task、获取结果的 future 和 waker 均算一个,注意好所有权转移即可。
MISC
Waker
是Send + Sync
的,这里的实现不满足要求,可能会出现在其他线程调用了 waker 的情况,比如用flume
扔 task 给 executor 线程时就可能调用 receiver 注册的 waker。解决办法是要实现 foreign wake,通过判断是否在 executor 线程来选择 wake 的实现,比如把 task 扔进一个 thread-safe 的 queue 并唤醒 executor 线程。
seastar
seastar
里分为 future/promise/continuation。seastar
里的 async function 是立即执行的,future/promise 可以看作是 spsc oneshot channel,只用于通知结果不包含需要运行的 task,也就无需动态内存分配,而 rust 的 future 只有 poll 了才生效,通常也不会为立刻 ready 的 future 做优化,所以需要内存分配。
continuation 是 future ready 后运行的 task,这里才需要动态内存分配一次。continuation 只有在能获取上一个 future 结果时才会运行且只会运行一次,所以也就没有各种状态转换需要处理,而 rust 允许 poll 和 wake 多次 future,在有了 async/.await 后 future 会有 sub-future,这种行为就成为了常态,也就很难像 seastar
这样做,各有利弊吧。
留下评论