Raft 笔记(三) – etcd/raft
正如 Raft
的设计目标,Raft
的理论很简单,但是正确又高效的实现没那么容易,需要注意很多 corner case
。
目前最出名的实现当属 etcd/raft,它的实现基本按照Raft-thesis来,
所以在看代码前最好过一遍。
etcd/raft
etcd/raft
最大的优点是把整个 raft
实现为独立的状态机,以消息为输入,然后执行相应的状态转换,再返回需要由用户处理的部分。当使用时,需要用户实现相应的
transportation layer
和 storage layer
。这种实现有很多好处:
- 灵活性:定制化选择
transportation layer
和storage layer
; - 性能:
raft
只实现状态转移,性能由用户决定; - 确定性:方便测试,可以很容易的
mock
出网络和磁盘。
结构
etcd/raft
为纯粹的状态机模型,所有的事件都以消息的形式处理。它主要有两个结构:
raft
:raft
状态机,根据消息触发状态转换;Node
: 代表了raft cluster
中的一个节点,提供了与raft
状态机交互的接口。
raft
结构体维护了当前节点的 raft
状态,如 Term
, Vote
, lead
等,是非线程安全的,提供方法触发状态变化。主要的方法如下:
tick
: 每次调用会增加内部的逻辑时钟,用于触发超时事件如leader election
和heartbeat
;step
: 类型为type stepFunc func(r *raft, m pb.Message) error
,用于处理消息;- 还有其他方法用于处理成员变更、获取状态等。
Node
是与用户交互的入口,是一个接口,具体实现会调用 raft
提供的方法,触发状态变化。它主要提供了如下几种方法:
Tick()
: 用于增加raft
的逻辑时钟;Propose()
: 用于propose
客户端的请求;ProposeConfChange()
: 用于处理成员变更的请求;Ready()
: 返回只读的chan
,用于获取raft
的当前状态,由用户进行相应处理;Advance()
: 通知Node
之前从Ready()
返回的状态已被处理完,可以进行一下轮处理。
目前 etcd/raft
中有两种实现,后面的 node
都指线程安全的:
node.go
: 线程安全的Node
,通过chan
将整个流程串行化,用户可以在任何情况下调用该Node
的方法,也是后面主要介绍和etcd
使用的;rawnode.go
: 非线程安全的Node
,直接调用raft
的方法,用于实现multi-raft
。
node
是由各种 chan
组成:
// node is the canonical implementation of the Node interface
type node struct {
propc chan msgWithResult
recvc chan pb.Message
confc chan pb.ConfChange
confstatec chan pb.ConfState
readyc chan Ready
advancec chan struct{}
tickc chan struct{}
done chan struct{}
stop chan struct{}
status chan chan Status
logger Logger
}
它的所有方法都是向内部 chan
发送一条消息,在创建 node
后,会启动一个 goroutine
,内部为无限循环 select
处理 chan
的消息,所有涉及到 raft
状态机的
操作都在这里执行,使得所有操作都变为串行的,实现了线程安全。拿 tick
举例,用户需要定期调用 node.Tick()
来增加内部逻辑时钟:
// Tick increments the internal logical clock for this Node. Election timeouts
// and heartbeat timeouts are in units of ticks.
func (n *node) Tick() {
select {
case n.tickc <- struct{}{}:
case <-n.done:
default:
n.logger.Warningf("A tick missed to fire. Node blocks too long!")
}
}
在 goroutine
循环中接收到 n.tickc
的消息时,调用 raft.tick()
增加内部逻辑时钟:
for {
select {
// ...
case <-n.tickc:
r.tick()
// ...
}
}
分层
上面提到,需要由用户处理 storage
和 transportation
。etcd/raft
对这两层的解耦方式不太相同,因为 storage
和 raft
联系紧密,所以 etcd/raft
提供了
Storage
接口,由用户实现,然后在启动时设置到 Config
中,Storage
接口只涉及查询操作,何时持久化和如何持久化由用户决定:
// Storage is an interface that may be implemented by the application
// to retrieve log entries from storage.
//
// If any Storage method returns an error, the raft instance will
// become inoperable and refuse to participate in elections; the
// application is responsible for cleanup and recovery in this case.
type Storage interface {
// InitialState returns the saved HardState and ConfState information.
InitialState() (pb.HardState, pb.ConfState, error)
// Entries returns a slice of log entries in the range [lo,hi).
// MaxSize limits the total size of the log entries returned, but
// Entries returns at least one entry if any.
Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
// Term returns the term of entry i, which must be in the range
// [FirstIndex()-1, LastIndex()]. The term of the entry before
// FirstIndex is retained for matching purposes even though the
// rest of that entry may not be available.
Term(i uint64) (uint64, error)
// LastIndex returns the index of the last entry in the log.
LastIndex() (uint64, error)
// FirstIndex returns the index of the first log entry that is
// possibly available via Entries (older entries have been incorporated
// into the latest Snapshot; if storage only contains the dummy entry the
// first log entry is not available).
FirstIndex() (uint64, error)
// Snapshot returns the most recent snapshot.
// If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
// so raft state machine could know that Storage needs some time to prepare
// snapshot and call Snapshot later. 正在做?
Snapshot() (pb.Snapshot, error)
}
而 transportation
并没有在 etcd/raft
中处理,而是全权交给用户处理,需要发送的消息都会存放在 raft.msgs
中。Ready
结构体包含自上次处理后已就绪的需要由用户处理的信息,
包括需要发送的 msg
和需要持久化存储的信息。在 goroutine
每次循环开始会对比之前的状态与当前状态,生成 Ready
,
node.Ready()
方法就是返回对应的 readyc
。由用户对 Ready
处理,从而实现了分层:
- 持久化
Entries
、HardState
、Snapshot
; - 发送
Messages
给其他节点; - 应用
snapshot
、CommittedEntries
到状态机; - 调用
node.Advance()
通知raft
之前的更新已处理完,可以进行清理,同时可以处理下一轮更新。
for {
if advancec != nil {
readyc = nil
} else {
rd = newReady(r, prevSoftSt, prevHardSt)
if rd.containsUpdates() {
readyc = n.readyc
} else {
readyc = nil
}
}
// ...
select {
// ...
case readyc <- rd:
// 保存当前状态,用于之后对比
if rd.SoftState != nil {
prevSoftSt = rd.SoftState
}
if len(rd.Entries) > 0 {
prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
havePrevLastUnstablei = true
}
if !IsEmptyHardState(rd.HardState) {
prevHardSt = rd.HardState
}
if !IsEmptySnap(rd.Snapshot) {
prevSnapi = rd.Snapshot.Metadata.Index
}
r.msgs = nil
r.readStates = nil
advancec = n.advancec
case <-advancec:
if prevHardSt.Commit != 0 {
r.raftLog.appliedTo(prevHardSt.Commit)
}
if havePrevLastUnstablei {
// shrink unstable entries
r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
// delete unstable snapshot
havePrevLastUnstablei = false
}
r.raftLog.stableSnapTo(prevSnapi)
advancec = nil
}
使用
基本使用方法在 README
里有,并且还有一个例子可以参考:raftexample:
- 当收到其他节点的
message
时,调用node.Step()
传给raft
状态机处理; - 当收到客户端的请求时,调用
node.Propose()
; - 最主要的结构就是一个状态机循环:
for { select { case <-s.Ticker: n.Tick() // 增加逻辑时钟 case rd := <-s.Node.Ready(): // 处理 Ready 结构体 saveToStorage(rd.HardState, rd.Entries, rd.Snapshot) // 持久化 send(rd.Messages) // 发送消息给其他节点 if !raft.IsEmptySnap(rd.Snapshot) { processSnapshot(rd.Snapshot) } for _, entry := range rd.CommittedEntries { // 处理 commited entries process(entry) if entry.Type == raftpb.EntryConfChange { var cc raftpb.ConfChange cc.Unmarshal(entry.Data) s.Node.ApplyConfChange(cc) } } s.Node.Advance() // 通知 raft 当前状态已处理完,可以处理下一轮状态 case <-s.done: return } }
测试
测试一直是分布式系统的难点和重点,etcd/raft
的实现为纯粹的状态机,可以轻易的 mock
出网络故障或磁盘故障,从而确保实现的正确性。
留下评论