Raft 笔记(三) – etcd/raft

正如 Raft 的设计目标,Raft 的理论很简单,但是正确又高效的实现没那么容易,需要注意很多 corner case。 目前最出名的实现当属 etcd/raft,它的实现基本按照Raft-thesis来, 所以在看代码前最好过一遍。

etcd/raft

etcd/raft 最大的优点是把整个 raft 实现为独立的状态机,以消息为输入,然后执行相应的状态转换,再返回需要由用户处理的部分。当使用时,需要用户实现相应的 transportation layerstorage layer。这种实现有很多好处:

  • 灵活性:定制化选择 transportation layerstorage layer;
  • 性能:raft 只实现状态转移,性能由用户决定;
  • 确定性:方便测试,可以很容易的 mock 出网络和磁盘。

结构

etcd/raft 为纯粹的状态机模型,所有的事件都以消息的形式处理。它主要有两个结构:

  • raft: raft 状态机,根据消息触发状态转换;
  • Node: 代表了 raft cluster 中的一个节点,提供了与 raft 状态机交互的接口。

raft 结构体维护了当前节点的 raft 状态,如 Term, Vote, lead 等,是非线程安全的,提供方法触发状态变化。主要的方法如下:

  • tick: 每次调用会增加内部的逻辑时钟,用于触发超时事件如 leader electionheartbeat
  • step: 类型为 type stepFunc func(r *raft, m pb.Message) error,用于处理消息;
  • 还有其他方法用于处理成员变更、获取状态等。

Node 是与用户交互的入口,是一个接口,具体实现会调用 raft 提供的方法,触发状态变化。它主要提供了如下几种方法:

  • Tick(): 用于增加 raft 的逻辑时钟;
  • Propose(): 用于 propose 客户端的请求;
  • ProposeConfChange(): 用于处理成员变更的请求;
  • Ready(): 返回只读的 chan,用于获取 raft 的当前状态,由用户进行相应处理;
  • Advance(): 通知 Node 之前从 Ready() 返回的状态已被处理完,可以进行一下轮处理。

目前 etcd/raft 中有两种实现,后面的 node 都指线程安全的:

  1. node.go: 线程安全的 Node,通过 chan 将整个流程串行化,用户可以在任何情况下调用该 Node 的方法,也是后面主要介绍和 etcd 使用的;
  2. rawnode.go: 非线程安全的 Node,直接调用 raft 的方法,用于实现 multi-raft

node 是由各种 chan 组成:

// node is the canonical implementation of the Node interface
type node struct {
	propc      chan msgWithResult
	recvc      chan pb.Message
	confc      chan pb.ConfChange
	confstatec chan pb.ConfState
	readyc     chan Ready
	advancec   chan struct{}
	tickc      chan struct{}
	done       chan struct{}
	stop       chan struct{}
	status     chan chan Status

	logger Logger
}

它的所有方法都是向内部 chan 发送一条消息,在创建 node 后,会启动一个 goroutine,内部为无限循环 select 处理 chan 的消息,所有涉及到 raft 状态机的 操作都在这里执行,使得所有操作都变为串行的,实现了线程安全。拿 tick 举例,用户需要定期调用 node.Tick() 来增加内部逻辑时钟:

// Tick increments the internal logical clock for this Node. Election timeouts
// and heartbeat timeouts are in units of ticks.
func (n *node) Tick() {
	select {
	case n.tickc <- struct{}{}:
	case <-n.done:
	default:
		n.logger.Warningf("A tick missed to fire. Node blocks too long!")
	}
}

goroutine 循环中接收到 n.tickc 的消息时,调用 raft.tick() 增加内部逻辑时钟:

for {
    select {
        // ...
        case <-n.tickc:
            r.tick()
        // ...
    }
}

分层

上面提到,需要由用户处理 storagetransportationetcd/raft 对这两层的解耦方式不太相同,因为 storageraft 联系紧密,所以 etcd/raft 提供了 Storage 接口,由用户实现,然后在启动时设置到 Config 中,Storage 接口只涉及查询操作,何时持久化和如何持久化由用户决定:

// Storage is an interface that may be implemented by the application
// to retrieve log entries from storage.
//
// If any Storage method returns an error, the raft instance will
// become inoperable and refuse to participate in elections; the
// application is responsible for cleanup and recovery in this case.
type Storage interface {
	// InitialState returns the saved HardState and ConfState information.
	InitialState() (pb.HardState, pb.ConfState, error)
	// Entries returns a slice of log entries in the range [lo,hi).
	// MaxSize limits the total size of the log entries returned, but
	// Entries returns at least one entry if any.
	Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
	// Term returns the term of entry i, which must be in the range
	// [FirstIndex()-1, LastIndex()]. The term of the entry before
	// FirstIndex is retained for matching purposes even though the
	// rest of that entry may not be available.
	Term(i uint64) (uint64, error)
	// LastIndex returns the index of the last entry in the log.
	LastIndex() (uint64, error)
	// FirstIndex returns the index of the first log entry that is
	// possibly available via Entries (older entries have been incorporated
	// into the latest Snapshot; if storage only contains the dummy entry the
	// first log entry is not available).
	FirstIndex() (uint64, error)
	// Snapshot returns the most recent snapshot.
	// If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
	// so raft state machine could know that Storage needs some time to prepare
	// snapshot and call Snapshot later. 正在做?
	Snapshot() (pb.Snapshot, error)
}

transportation 并没有在 etcd/raft 中处理,而是全权交给用户处理,需要发送的消息都会存放在 raft.msgs 中。Ready 结构体包含自上次处理后已就绪的需要由用户处理的信息, 包括需要发送的 msg 和需要持久化存储的信息。在 goroutine 每次循环开始会对比之前的状态与当前状态,生成 Readynode.Ready() 方法就是返回对应的 readyc。由用户对 Ready 处理,从而实现了分层:

  • 持久化 EntriesHardStateSnapshot
  • 发送 Messages 给其他节点;
  • 应用 snapshotCommittedEntries 到状态机;
  • 调用 node.Advance() 通知 raft 之前的更新已处理完,可以进行清理,同时可以处理下一轮更新。
for {
    if advancec != nil {
        readyc = nil
    } else {
        rd = newReady(r, prevSoftSt, prevHardSt)
        if rd.containsUpdates() {
            readyc = n.readyc
        } else {
            readyc = nil
        }
    }
    // ...
    select {
    // ...
    case readyc <- rd:
        // 保存当前状态,用于之后对比
        if rd.SoftState != nil {
            prevSoftSt = rd.SoftState
        }
        if len(rd.Entries) > 0 {
            prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
            prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
            havePrevLastUnstablei = true
        }
        if !IsEmptyHardState(rd.HardState) {
            prevHardSt = rd.HardState
        }
        if !IsEmptySnap(rd.Snapshot) {
            prevSnapi = rd.Snapshot.Metadata.Index
        }

        r.msgs = nil
        r.readStates = nil
        advancec = n.advancec

    case <-advancec:
        if prevHardSt.Commit != 0 {
            r.raftLog.appliedTo(prevHardSt.Commit)
        }
        if havePrevLastUnstablei {
            // shrink unstable entries
            r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
            // delete unstable snapshot
            havePrevLastUnstablei = false
        }
        r.raftLog.stableSnapTo(prevSnapi)
        advancec = nil
}

使用

基本使用方法在 README 里有,并且还有一个例子可以参考:raftexample:

  • 当收到其他节点的 message 时,调用 node.Step() 传给 raft 状态机处理;
  • 当收到客户端的请求时,调用 node.Propose()
  • 最主要的结构就是一个状态机循环:
    for {
      select {
      case <-s.Ticker:
        n.Tick() // 增加逻辑时钟
      case rd := <-s.Node.Ready(): // 处理 Ready 结构体
        saveToStorage(rd.HardState, rd.Entries, rd.Snapshot) // 持久化
        send(rd.Messages) // 发送消息给其他节点
        if !raft.IsEmptySnap(rd.Snapshot) {
          processSnapshot(rd.Snapshot)
        }
        for _, entry := range rd.CommittedEntries { // 处理 commited entries
          process(entry)
          if entry.Type == raftpb.EntryConfChange {
            var cc raftpb.ConfChange
            cc.Unmarshal(entry.Data)
            s.Node.ApplyConfChange(cc)
          }
        }
        s.Node.Advance() // 通知 raft 当前状态已处理完,可以处理下一轮状态
      case <-s.done:
        return
      }
    }
    

测试

测试一直是分布式系统的难点和重点,etcd/raft 的实现为纯粹的状态机,可以轻易的 mock 出网络故障或磁盘故障,从而确保实现的正确性。

分类:

更新时间: