RocksDB Source Code Analysis – Write Batch
Like LevelDB, RocksDB does not support parallel writes, for a few reasons:
- The memtable may not support concurrent writes.
- Simply writing in parallel cannot guarantee that the WAL and the actual operations are applied in the same order; a synchronization mechanism is required.
Writing the WAL record by record and executing the commands one at a time performs poorly, so LevelDB/RocksDB merges multiple write operations and lets a single leader thread execute them; once the leader finishes, it notifies the other threads that their writes have completed.
LevelDB's implementation is simple: writes are appended to a deque, the writer at the head of the queue becomes the leader and merges the other queued writes to execute them together; the deque is protected by a mutex, and notification is done with a cond var.
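A minimal sketch of that deque + mutex + cond var pattern is shown below (illustrative only, not LevelDB's actual code; SimpleWriter and SimpleWriteQueue are made-up names): each writer queues itself, the writer at the front becomes the leader, executes everything queued so far as one group, and then marks the grouped writers as done.

#include <condition_variable>
#include <deque>
#include <mutex>

struct SimpleWriter {
  // The WriteBatch payload is omitted in this sketch; only the completion flag matters.
  bool done = false;
};

class SimpleWriteQueue {
 public:
  void Write(SimpleWriter* w) {
    std::unique_lock<std::mutex> lock(mu_);
    queue_.push_back(w);
    // Wait until this writer reaches the front (becoming leader) or a previous
    // leader has already executed its write for it.
    cv_.wait(lock, [&] { return w->done || queue_.front() == w; });
    if (w->done) return;

    // Leader: snapshot how many writers are queued right now; they stay in the
    // deque so later arrivals keep waiting behind them.
    size_t group_size = queue_.size();
    lock.unlock();

    // ... merge the group's batches, write the WAL once, apply to the memtable ...

    lock.lock();
    for (size_t i = 0; i < group_size; ++i) {
      queue_.front()->done = true;
      queue_.pop_front();
    }
    cv_.notify_all();  // wake the finished followers and the next leader, if any
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<SimpleWriter*> queue_;
};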
JoinBatchGroup
RocksDB's implementation adds a number of optimizations to improve performance. Each Writer is in one of the following states:
- STATE_INIT
- STATE_GROUP_LEADER
- STATE_MEMTABLE_WRITER_LEADER
- STATE_PARALLEL_MEMTABLE_WRITER
- STATE_COMPLETED
- STATE_LOCKED_WAITING
Initially every Writer is in STATE_INIT. Writers add themselves to a queue to compete for leadership:
- Each Writer is a node of a doubly-linked list: Writer* link_older / link_newer.
- WriteThread keeps an atomic head pointer to the most recently added Writer: std::atomic<Writer*> newest_writer_.
- Writers are linked into the list in a latch-free manner:

bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
  assert(newest_writer != nullptr);
  assert(w->state == STATE_INIT);
  Writer* writers = newest_writer->load(std::memory_order_relaxed);
  while (true) {
    ...
    w->link_older = writers;
    if (newest_writer->compare_exchange_weak(writers, w)) {
      return (writers == nullptr);
    }
  }
}
The Writer that links itself first, i.e., the one that observes newest_writer_ == nullptr when linking, becomes the leader and its state is set to STATE_GROUP_LEADER; the remaining Writers wait for the leader to notify them of a state change.
The leader walks the list from the oldest Writer (itself) toward the newest and picks Writers into a WriteGroup. The WriteGroup has a size limit of at most 1MB so that building the group does not take too long (EnterAsBatchGroupLeader()). The leader then merges the WriteBatches and writes them to the WAL, iterates over the WriteGroup to apply the writes to the memtable, and finally notifies the other Writers that their operations are complete (ExitAsBatchGroupLeader()).
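The group-building step can be condensed roughly as follows. This is a simplified sketch of EnterAsBatchGroupLeader(): the writer-compatibility checks (e.g. mismatching sync/disableWAL flags) are omitted, and the smaller cap applied when the leader's own batch is small is taken from older RocksDB sources and may differ between versions:

size_t size = WriteBatchInternal::ByteSize(leader->batch);
size_t max_size = 1 << 20;          // never grow the group beyond 1MB
if (size <= (128 << 10)) {
  // Assumption from older sources: if the leader's own batch is small, cap the
  // group at leader + 128KB so a tiny write does not wait for a huge group.
  max_size = size + (128 << 10);
}

CreateMissingNewerLinks(newest_writer_.load(std::memory_order_acquire));
write_group->leader = leader;
write_group->last_writer = leader;
Writer* w = leader;
while (w->link_newer != nullptr) {
  w = w->link_newer;
  if (w->batch == nullptr) {
    break;                          // e.g. a writer with nothing to batch
  }
  size += WriteBatchInternal::ByteSize(w->batch);
  if (size > max_size) {
    break;                          // the group is large enough, stop here
  }
  write_group->last_writer = w;     // include w in the group
}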
Notification
RocksDB's optimizations focus mainly on this notification step: the developers measured that the latency from FUTEX_WAKE to FUTEX_WAIT returning averages around 10us, which is too high, so RocksDB avoids using a cond var to signal state changes wherever possible:
For reference, on my 4.0 SELinux test server with support for syscall auditing enabled, the minimum latency between FUTEX_WAKE to returning from FUTEX_WAIT is 2.7 usec, and the average is more like 10 usec. That can be a big drag on RocksDB’s single-writer design.
Each Writer has a std::atomic<uint8_t> state field that marks its state, and state changes are detected through it. When the leader finishes, it picks the next leader and sets each follower's state to STATE_COMPLETED:
static WriteThread::AdaptationContext eabgl_ctx("ExitAsBatchGroupLeader");
void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
                                         Status status) {
  Writer* leader = write_group.leader;
  Writer* last_writer = write_group.last_writer;
  ...
  Writer* head = newest_writer_.load(std::memory_order_acquire);
  if (head != last_writer ||
      !newest_writer_.compare_exchange_strong(head, nullptr)) {
    assert(head != last_writer);
    CreateMissingNewerLinks(head);
    assert(last_writer->link_newer->link_older == last_writer);
    last_writer->link_newer->link_older = nullptr;
    SetState(last_writer->link_newer, STATE_GROUP_LEADER);  // the Writer right after the WriteGroup becomes the new leader
  }
  while (last_writer != leader) {
    last_writer->status = status;
    auto next = last_writer->link_older;
    SetState(last_writer, STATE_COMPLETED);  // mark this follower as completed
    last_writer = next;
  }
  ...
}
A non-leader Writer waits for its state change in 3 phases (AwaitState()):
1. Busy loop using “pause” for ~1 microsecond
2. Busy loop using “yield” for up to 100 microseconds (default)
3. Blocking wait
1. Busy loop using “pause”
Spin for roughly 1us, checking the state and issuing pause on each iteration:
for (uint32_t tries = 0; tries < 200; ++tries) {
state = w->state.load(std::memory_order_acquire);
if ((state & goal_mask) != 0) {
return state;
}
port::AsmVolatilePause();
}
asm volatile("pause") is used to improve the performance of the spin-wait loop:
Improves the performance of spin-wait loops. When executing a “spin-wait loop,” a Pentium 4 or Intel Xeon processor suffers a severe performance penalty when exiting the loop because it detects a possible memory order violation. The PAUSE instruction provides a hint to the processor that the code sequence is a spin-wait loop. The processor uses this hint to avoid the memory order violation in most situations, which greatly improves processor performance. For this reason, it is recommended that a PAUSE instruction be placed in all spin-wait loops.
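port::AsmVolatilePause() is essentially a thin wrapper around this hint. The sketch below shows a plausible implementation: the x86 branch matches the instruction discussed above, while the non-x86 branches are assumptions that have varied between RocksDB versions:

static inline void AsmVolatilePause() {
#if defined(__i386__) || defined(__x86_64__)
  asm volatile("pause");
#elif defined(__aarch64__)
  asm volatile("isb");          // assumption: some versions use "wfe" here instead
#elif defined(__powerpc64__)
  asm volatile("or 27,27,27");  // assumption: "low priority" SMT hint on POWER
#endif
  // other platforms simply fall through to a no-op
}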
2. Busy loop using “yield”
If the wait is long, or other threads are competing for the same core (causing involuntary context switches), a spin loop is a poor fit: it burns a lot of CPU for little benefit. So RocksDB adds a more adaptive spin loop that exits at the appropriate time.
RocksDB exposes the following options:
- allow_concurrent_memtable_write: enables the yielding spin loop, which can improve throughput.
- write_thread_max_yield_usec: the maximum duration of the yielding spin loop, 100us by default.
- write_thread_slow_yield_usec: if a single yield call takes longer than this, another thread is assumed to be running on the same CPU; 3us by default, because when no other thread shares the core, yield does not actually context switch and returns within 1us.
The spin loop exits once its total duration exceeds write_thread_max_yield_usec, or once the number of yields slower than write_thread_slow_yield_usec reaches kMaxSlowYieldsWhileSpinning
(fixed at 3; with the default settings, 3 slow yields take about 9us, which is close to the ~10us average cost of cond var notification):
auto spin_begin = std::chrono::steady_clock::now();
size_t slow_yield_count = 0;
auto iter_begin = spin_begin;
while ((iter_begin - spin_begin) <=
std::chrono::microseconds(max_yield_usec_)) {
std::this_thread::yield();
state = w->state.load(std::memory_order_acquire);
if ((state & goal_mask) != 0) {
would_spin_again = true;
break;
}
auto now = std::chrono::steady_clock::now();
if (now == iter_begin ||
now - iter_begin >= std::chrono::microseconds(slow_yield_usec_)) {
++slow_yield_count;
if (slow_yield_count >= kMaxSlowYieldsWhileSpinning) {
update_ctx = true;
break;
}
}
iter_begin = now;
}
Besides the static configuration switch, there is a dynamic gate, yield_credit: the yielding spin loop is only entered when yield_credit >= 0 or when update_ctx is true (a 1/256 chance):
if (max_yield_usec_ > 0) {
update_ctx = Random::GetTLSInstance()->OneIn(sampling_base);
if (update_ctx || yield_credit.load(std::memory_order_relaxed) >= 0) {
...
}
...
}
yield_credit starts at 0 and is adjusted dynamically based on how the spin loop went:
- It is decreased when the loop exits because of slow yields.
- With a 1/256 probability it is increased if the loop saw the state change in time, and decreased if it timed out. The sampling is deliberate: in the ideal case almost every spin succeeds, and increasing the credit on every success would make yield_credit so large that the decay from occasional timeouts or slow yields would be negligible.

if (update_ctx) {
  // Since our update is sample based, it is ok if a thread overwrites the
  // updates by other threads. Thus the update does not have to be atomic.
  auto v = yield_credit.load(std::memory_order_relaxed);
  // fixed point exponential decay with decay constant 1/1024, with +1
  // and -1 scaled to avoid overflow for int32_t
  //
  // On each update the positive credit is decayed by a factor of 1/1024 (i.e.,
  // 0.1%). If the sampled yield was successful, the credit is also increased
  // by X. Setting X=2^17 ensures that the credit never exceeds
  // 2^17*2^10=2^27, which is lower than 2^31 the upperbound of int32_t. Same
  // logic applies to negative credits.
  v = v - (v / 1024) + (would_spin_again ? 1 : -1) * 131072;
  yield_credit.store(v, std::memory_order_relaxed);
}
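To see where the 2^27 bound in the comment comes from: if every sampled spin succeeds, the credit settles at the fixed point of v = v - v/1024 + 2^17, i.e. v/1024 = 2^17 and therefore v = 2^17 * 2^10 = 2^27 ≈ 1.3 * 10^8, comfortably below the int32_t limit of 2^31 - 1; the same bound holds symmetrically for negative credit.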
3. Blocking wait
The final phase falls back to notification via cond var, but RocksDB optimizes this path as well:
// waiting side
uint8_t WriteThread::BlockingAwaitState(Writer* w, uint8_t goal_mask) {
w->CreateMutex();
auto state = w->state.load(std::memory_order_acquire);
assert(state != STATE_LOCKED_WAITING);
if ((state & goal_mask) == 0 &&
w->state.compare_exchange_strong(state, STATE_LOCKED_WAITING)) { // write-release
std::unique_lock<std::mutex> guard(w->StateMutex());
w->StateCV().wait(guard, [w] {
return w->state.load(std::memory_order_relaxed) != STATE_LOCKED_WAITING;
});
state = w->state.load(std::memory_order_relaxed);
}
assert((state & goal_mask) != 0);
return state;
}
// notifying side
void WriteThread::SetState(Writer* w, uint8_t new_state) {
auto state = w->state.load(std::memory_order_acquire); // read-acquire
if (state == STATE_LOCKED_WAITING ||
!w->state.compare_exchange_strong(state, new_state)) { // read-acquire
assert(state == STATE_LOCKED_WAITING);
std::lock_guard<std::mutex> guard(w->StateMutex());
assert(w->state.load(std::memory_order_relaxed) != new_state);
w->state.store(new_state, std::memory_order_relaxed);
w->StateCV().notify_one();
}
}
There are several details worth noting here:
- The mutex and cond var are lazily created: they are only constructed the first time a blocking wait is needed, and aligned_storage is used to avoid a dynamic memory allocation:

struct Writer {
  ...
  std::aligned_storage<sizeof(std::mutex)>::type state_mutex_bytes;
  std::aligned_storage<sizeof(std::condition_variable)>::type state_cv_bytes;
  ...
  void CreateMutex() {
    if (!made_waitable) {
      made_waitable = true;
      new (&state_mutex_bytes) std::mutex;
      new (&state_cv_bytes) std::condition_variable;
    }
  }
}

- Setting the Writer's state to STATE_LOCKED_WAITING guarantees that by the time the leader notifies, the mutex and cond var have been fully constructed; see the acquire/release annotations marked above (a standalone sketch of this handshake follows this list).
- The leader only ever sets a follower's state to a goal state, and the follower only ever sets its own state to STATE_LOCKED_WAITING, so when a compare_exchange_strong fails each side knows exactly what happened and can react accordingly: the waiter returns because its goal state has already been set, and SetState() knows the waiter is (about to be) blocked and must notify under the mutex.
- std::condition_variable::wait(lock, pred) is equivalent to while (!pred()) wait(lock);, which guarantees the waiter will not wait forever even if a notify_one() is missed.
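The sketch below reproduces the same handshake outside RocksDB as a minimal, self-contained program (Waiter, BlockingAwait and Notify are illustrative names, not RocksDB's): the waiter announces via a CAS that it is about to block, and the notifier, seeing that announcement, publishes the new state under the mutex so the wakeup cannot be lost between the waiter's CAS and its wait().

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <mutex>
#include <thread>

constexpr uint8_t kInit = 1, kLockedWaiting = 2, kCompleted = 4;

struct Waiter {
  std::atomic<uint8_t> state{kInit};
  std::mutex mu;                 // eagerly constructed here; RocksDB builds these lazily
  std::condition_variable cv;
};

uint8_t BlockingAwait(Waiter* w, uint8_t goal_mask) {
  uint8_t s = w->state.load(std::memory_order_acquire);
  // Block only if the goal state is not reached yet AND we manage to announce
  // that we are about to block; a failed CAS means the notifier beat us to it
  // and s now holds the state it installed.
  if ((s & goal_mask) == 0 &&
      w->state.compare_exchange_strong(s, kLockedWaiting)) {
    std::unique_lock<std::mutex> lk(w->mu);
    w->cv.wait(lk, [w] { return w->state.load() != kLockedWaiting; });
    s = w->state.load();
  }
  return s;
}

void Notify(Waiter* w, uint8_t new_state) {
  uint8_t s = w->state.load(std::memory_order_acquire);
  // Fast path: the waiter is still spinning (or has not started waiting yet),
  // so a plain CAS is enough and no mutex is touched at all.
  if (s != kLockedWaiting &&
      w->state.compare_exchange_strong(s, new_state)) {
    return;
  }
  // Slow path: the waiter announced it will block. Publish the new state under
  // the mutex so the store and the notify cannot slip in between the waiter's
  // CAS and its wait().
  std::lock_guard<std::mutex> lk(w->mu);
  w->state.store(new_state);
  w->cv.notify_one();
}

int main() {
  Waiter w;
  std::thread t([&] {
    BlockingAwait(&w, kCompleted);
    std::puts("writer completed");
  });
  Notify(&w, kCompleted);
  t.join();
}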