Raft 笔记(三) – etcd/raft
正如 Raft 的设计目标,Raft 的理论很简单,但是正确又高效的实现没那么容易,需要注意很多 corner case。
目前最出名的实现当属 etcd/raft,它的实现基本按照Raft-thesis来,
所以在看代码前最好过一遍。
etcd/raft
etcd/raft 最大的优点是把整个 raft 实现为独立的状态机,以消息为输入,然后执行相应的状态转换,再返回需要由用户处理的部分。当使用时,需要用户实现相应的
transportation layer 和 storage layer。这种实现有很多好处:
- 灵活性:定制化选择
transportation layer和storage layer; - 性能:
raft只实现状态转移,性能由用户决定; - 确定性:方便测试,可以很容易的
mock出网络和磁盘。
结构
etcd/raft 为纯粹的状态机模型,所有的事件都以消息的形式处理。它主要有两个结构:
raft:raft状态机,根据消息触发状态转换;Node: 代表了raft cluster中的一个节点,提供了与raft状态机交互的接口。
raft 结构体维护了当前节点的 raft 状态,如 Term, Vote, lead 等,是非线程安全的,提供方法触发状态变化。主要的方法如下:
tick: 每次调用会增加内部的逻辑时钟,用于触发超时事件如leader election和heartbeat;step: 类型为type stepFunc func(r *raft, m pb.Message) error,用于处理消息;- 还有其他方法用于处理成员变更、获取状态等。
Node 是与用户交互的入口,是一个接口,具体实现会调用 raft 提供的方法,触发状态变化。它主要提供了如下几种方法:
Tick(): 用于增加raft的逻辑时钟;Propose(): 用于propose客户端的请求;ProposeConfChange(): 用于处理成员变更的请求;Ready(): 返回只读的chan,用于获取raft的当前状态,由用户进行相应处理;Advance(): 通知Node之前从Ready()返回的状态已被处理完,可以进行一下轮处理。
目前 etcd/raft 中有两种实现,后面的 node 都指线程安全的:
node.go: 线程安全的Node,通过chan将整个流程串行化,用户可以在任何情况下调用该Node的方法,也是后面主要介绍和etcd使用的;rawnode.go: 非线程安全的Node,直接调用raft的方法,用于实现multi-raft。
node 是由各种 chan 组成:
// node is the canonical implementation of the Node interface
type node struct {
propc chan msgWithResult
recvc chan pb.Message
confc chan pb.ConfChange
confstatec chan pb.ConfState
readyc chan Ready
advancec chan struct{}
tickc chan struct{}
done chan struct{}
stop chan struct{}
status chan chan Status
logger Logger
}
它的所有方法都是向内部 chan 发送一条消息,在创建 node 后,会启动一个 goroutine,内部为无限循环 select 处理 chan 的消息,所有涉及到 raft 状态机的
操作都在这里执行,使得所有操作都变为串行的,实现了线程安全。拿 tick 举例,用户需要定期调用 node.Tick() 来增加内部逻辑时钟:
// Tick increments the internal logical clock for this Node. Election timeouts
// and heartbeat timeouts are in units of ticks.
func (n *node) Tick() {
select {
case n.tickc <- struct{}{}:
case <-n.done:
default:
n.logger.Warningf("A tick missed to fire. Node blocks too long!")
}
}
在 goroutine 循环中接收到 n.tickc 的消息时,调用 raft.tick() 增加内部逻辑时钟:
for {
select {
// ...
case <-n.tickc:
r.tick()
// ...
}
}
分层
上面提到,需要由用户处理 storage 和 transportation。etcd/raft 对这两层的解耦方式不太相同,因为 storage 和 raft 联系紧密,所以 etcd/raft 提供了
Storage 接口,由用户实现,然后在启动时设置到 Config 中,Storage 接口只涉及查询操作,何时持久化和如何持久化由用户决定:
// Storage is an interface that may be implemented by the application
// to retrieve log entries from storage.
//
// If any Storage method returns an error, the raft instance will
// become inoperable and refuse to participate in elections; the
// application is responsible for cleanup and recovery in this case.
type Storage interface {
// InitialState returns the saved HardState and ConfState information.
InitialState() (pb.HardState, pb.ConfState, error)
// Entries returns a slice of log entries in the range [lo,hi).
// MaxSize limits the total size of the log entries returned, but
// Entries returns at least one entry if any.
Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
// Term returns the term of entry i, which must be in the range
// [FirstIndex()-1, LastIndex()]. The term of the entry before
// FirstIndex is retained for matching purposes even though the
// rest of that entry may not be available.
Term(i uint64) (uint64, error)
// LastIndex returns the index of the last entry in the log.
LastIndex() (uint64, error)
// FirstIndex returns the index of the first log entry that is
// possibly available via Entries (older entries have been incorporated
// into the latest Snapshot; if storage only contains the dummy entry the
// first log entry is not available).
FirstIndex() (uint64, error)
// Snapshot returns the most recent snapshot.
// If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
// so raft state machine could know that Storage needs some time to prepare
// snapshot and call Snapshot later. 正在做?
Snapshot() (pb.Snapshot, error)
}
而 transportation 并没有在 etcd/raft 中处理,而是全权交给用户处理,需要发送的消息都会存放在 raft.msgs 中。Ready 结构体包含自上次处理后已就绪的需要由用户处理的信息,
包括需要发送的 msg 和需要持久化存储的信息。在 goroutine 每次循环开始会对比之前的状态与当前状态,生成 Ready,
node.Ready() 方法就是返回对应的 readyc。由用户对 Ready 处理,从而实现了分层:
- 持久化
Entries、HardState、Snapshot; - 发送
Messages给其他节点; - 应用
snapshot、CommittedEntries到状态机; - 调用
node.Advance()通知raft之前的更新已处理完,可以进行清理,同时可以处理下一轮更新。
for {
if advancec != nil {
readyc = nil
} else {
rd = newReady(r, prevSoftSt, prevHardSt)
if rd.containsUpdates() {
readyc = n.readyc
} else {
readyc = nil
}
}
// ...
select {
// ...
case readyc <- rd:
// 保存当前状态,用于之后对比
if rd.SoftState != nil {
prevSoftSt = rd.SoftState
}
if len(rd.Entries) > 0 {
prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
havePrevLastUnstablei = true
}
if !IsEmptyHardState(rd.HardState) {
prevHardSt = rd.HardState
}
if !IsEmptySnap(rd.Snapshot) {
prevSnapi = rd.Snapshot.Metadata.Index
}
r.msgs = nil
r.readStates = nil
advancec = n.advancec
case <-advancec:
if prevHardSt.Commit != 0 {
r.raftLog.appliedTo(prevHardSt.Commit)
}
if havePrevLastUnstablei {
// shrink unstable entries
r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
// delete unstable snapshot
havePrevLastUnstablei = false
}
r.raftLog.stableSnapTo(prevSnapi)
advancec = nil
}
使用
基本使用方法在 README 里有,并且还有一个例子可以参考:raftexample:
- 当收到其他节点的
message时,调用node.Step()传给raft状态机处理; - 当收到客户端的请求时,调用
node.Propose(); - 最主要的结构就是一个状态机循环:
for { select { case <-s.Ticker: n.Tick() // 增加逻辑时钟 case rd := <-s.Node.Ready(): // 处理 Ready 结构体 saveToStorage(rd.HardState, rd.Entries, rd.Snapshot) // 持久化 send(rd.Messages) // 发送消息给其他节点 if !raft.IsEmptySnap(rd.Snapshot) { processSnapshot(rd.Snapshot) } for _, entry := range rd.CommittedEntries { // 处理 commited entries process(entry) if entry.Type == raftpb.EntryConfChange { var cc raftpb.ConfChange cc.Unmarshal(entry.Data) s.Node.ApplyConfChange(cc) } } s.Node.Advance() // 通知 raft 当前状态已处理完,可以处理下一轮状态 case <-s.done: return } }
测试
测试一直是分布式系统的难点和重点,etcd/raft 的实现为纯粹的状态机,可以轻易的 mock 出网络故障或磁盘故障,从而确保实现的正确性。
留下评论