Raft Basic Notes

本来打算介绍 Etcd 的 Raft,然后抽离开视线来讲讲 etcd-raft 怎么用,它怎么抽象的。但是读了一下 raftexample 发现不太现实,感觉 etcd-raft 的外部逻辑和实现是严重耦合的,所以或许我们不得不从实现开始,了解一下 etcd-raft 大概提供了什么样的抽象, 然后再介绍一下官方的 raftexample 的内容,便于我们实现简单的 raft 服务器。

这里先复习一下 Basic Raft,然后 Paper 是一回事,怎么响应这个并发的事件这些也没啥问题,然后看看 floyd 这个项目是怎么实现的。

Raft 复习

基本上来面试的大佬都会 Raft,但我其实已经忘光了,这里尝试用最快的方式建起 Raft 的基本概念。以后有更新也尽量直接在这个地方补充。

另外,这里只介绍小论文里面的基础部分,小论文没提到的我都不太想介绍。工程优化太多了,最开始入门的时候还是不要搞太复杂,那么开始吧。

Raft 使用一个 RSM 模型:

170FFACD-191C-474F-833E-06A1A36E1150

额,这图其实做了一些暗示:

  1. Log 和 State Machine 是分开来存储的
  2. 通过某种回调来写 State Machine,写完之后才能通知 client

那么进入算法部分,Raft 除了成员变更,基础算法包括:

  1. Leader Election
  2. Log Replication
  3. Safety

同时,这里有需要持久化的内容表(暂不包括 snapshot):

CABFCF9B-9C07-48C1-9718-380E659F5DD7

其实可以发现这部分非常非常精简。而 Log 包含的内容如下:

  1. Command for state machine (用户定义)
  2. Leader Term (我感觉也相当于确定了 Leader)

Raft 的 Invariant 有:

  1. 对于一个给定的 Term,至多只有一个 Leader
  2. Leader 不会 Truncate 自己的日志,它是 Append Only 的(这里涉及安全性一节)
  3. 对于任何两条日志,如果 indexterm 相同,那么它们之前的内容都相同
  4. 如果某个 termLog 被 Commit, 那么 term 比它大的所有 Leader 都应该包含它
  5. 一条已经 Apply 到状态机的日志,不会被在同一个位置提交(感觉是 trivial 的,推出 Commit 了才能 Apply?)

RPC 包含:

  1. AppendEntries: 维护心跳包、同步 Log
  2. RequestVote: 投票

Leader Election

  1. 启动的时候,集群都是 follower
  2. 没有收到 Leader 的心跳包或者 Candidate 的选举,在 electionTimeout 的时间后,发起选举

若发起选举:

  1. ++term, 状态变成 candidate,广播 requestVoteRPC
  2. 给自己投票
  3. electionTimeout 内收到 1/n 以上的选票,成为 term 的 leader

对于 follower:

  1. 对同一个 term 只能给出一个成功的 RequestVote

(个人好奇,某个 candidate 收到了 term 更高的 candidate 的 RequestVote 信息,也会变为 follower)

Log Replication

Leader 可以为 client 提供服务, 日志复制有几个位点:

  • [snapshot/0, applied, committed, current log]

Commit 的日志 Apply 到状态机是安全的,当:

  1. Leader 将 Log 复制到 1/2 以上的服务器时,就可以 Commit
  2. 在 Leader 将自己 term 的日志复制后,Leader 之前的日志 可以 Commit (这里有个问题,是说 Leader 之前的日志可以 commit,但不代表 Leader 自己的日志可以 Commit,然后这是可以 Commit)
    1. 这里细节上有个 match 操作,会匹配到相同的日志,根据之前的性质,这之前的东西都相同了。然后可以覆盖。这个日志号叫 nextIndex, 不需要持久化
    2. 大部分情况是没有问题的,只有 Leader crash 的时候,可能出现这个问题

Leader 会给每个成员提供 nextIndexmatchIndex. nextIndex 表示下一次尝试对齐/同步的起始位置,matchIndex 表示已经对齐的位置。

上面说的部分是 Leader 做日志同步和提交的部分,日志同步的时候,Leader 的 committed 水位可能比 follower 高,需要推高 followercommitted 水位。Leader Committed 的推高应该是 match 成功才能使用的(就是同步成功)。

Safety

这是最需要理解的一节。我们要保证 Commit 过的日志不会被覆盖。这一节的限制包括了 Leader Election 的限制,和选上之后如何处理之前未 Commit 的条目(其实我瞎想的是选 Commit 最大的,然后把别的都 Truncate 了,不过实际上这可能导致 Leader 最新推进的 Commit 没有提交)

选举限制:Candidate 的 RPC 包含日志信息,这里会给 current log, follower 在投票的时候,会拒绝掉 candidate.current.term < current_log.current.term || (candidate.current.term == current_log.current.term && candidate.current.index < current_log.current.index) 的日志

5.4.3 使用了反证法来证明它的正确性,任何 commit 日志必定复制到了 1/2 以上的机器,投票也要 1/2 以上的机器。新 Leader 拿到投票,必定已经包含了这条日志。

之前 term 提交的限制:此外,Leader 必须靠提交一条自己 term 的日志,来保证之前的日志提交。

Figure8 描述了这种场景,如果 Leader 单独提交之前的日志,那就会挂:
057E9A5F-8EAF-4482-B9C9-47E4BC31BC64

对于 Follower Crash, 这里会发送 AppendEntities RPC, 直到日志对齐。

一个误区

之前看到下图,感觉很 confusing,不知道 1 1 1 3 那个是咋当选的:

committed

后来发现,我原来的想法是有一点误区的。Leader 的 Commit 必须要日志过半,但能够被选为 Leader,它一定包含已 Commit 的最后日志,所以,覆盖别的日志是安全的。然后我的误区是,AppendEntities RPC 必须和 Commit 类似,要一下补齐用户丢失的所有 Log。但 AppendEntities RPC 的 Logs 实际上是可以一条一条发的。

CommitIndex 和 ApplyIndex 的持久化

本来这部分在作者 PHD 论文里面介绍的很细,本质上 commitIndex 和 applyIndex 都是可以恢复出来的。不过工程上,可以根据幂等之类的语义,持久化以优化 applyIndex.

话说我前年在逼乎好像提过这个问题:https://www.zhihu.com/question/382888510

时间和可用性

broadcastTime << electionTimeout << MTBF 以 TiKV 为例,这个 timeout 是 10S: https://asktug.com/t/tikv-raft-election-timeout-10s/999

broadcastTime 也应该比 electionTimeout 小一个数量级。这里有个问题是,这个时间是怎么决定的呢?实际上 broadcastTime 一多,本身心跳也会多,然后 TiKV 本身是 multi-raft 什么的,心跳包数量会非常多,然后这个时间短会把集群打爆,所以时间可以设置长点。论文里推荐的时间是:

Raft’s RPCs typically require the recipient to persist information to stable storage, so the broadcast time may range from 0.5ms to 20ms, depending on storage technology. As a result, the election timeout is likely to be somewhere between 10ms and 500ms.

这几个时间还是很重要的,我感觉很多是在 0-1.5 倍这个时间选出来的?

集群成员变更

Raft Group 可能一般就是 3、5个成员,但是因为机器变更导致的 membership change 会是线上低频但一直有的行为。我们首先想的是,通过一条内部的 Log,即 旧配置 -> 新配置 的 Log,来切换到新配置的状态。但有个问题是,配置从 $C{old}$ 切成 $C{new}$ 的时候,可能有下图所示的状态:

v2-45dd50e08934f1dfc42dbc83251bfe76_r

在这里,$C{old}$ 是 {1, 2, 3},$C{new}$ 是 {1, 2, 3, 4, 5},比方说 Server 3 是 Leader,同步了 membership change 的日志,然后 commit 了,1和2收到了信息,但是没有 Commit,这个时候,1、2 中的选举可以给对方投票,因为满足安全性;3、4、5 同理,结果是这里有两个 Majority 了(2/33/5)。

这个时候,Raft 会切入到一个过渡时期,叫 Joint Consensus,它需要理解两种配置:

  1. $C{new}$ 和 $C{old}$ 任何一个机器都可以成为 Leader
  2. Election 和 Commit 都需要 $C{new}$ 和 $C{old}$ 双方 的 Majority

Joint Consensus 引入了一个 $C_{old, new}$ 中间状态,如下图:

487347AA-3308-4821-B662-2F8EE3720BFB

加入 $C{old, new}$ 这样的配置变更日志之后,**即使这条变更日志没有 Commit, 集群也会采用它指定的 Conf,这表示,变更日志会在 $C{old, new}$ 达成 Majority 后提交**. 一旦 $C{old, new}$ 提交了,那么,只有采用这个配置的集群可以成为 Leader。这个时候,Leader 可以再创建一条 $C{new}$ 的日志,提交完成之后,$C{new} - C{old}$ 集合中的服务器可以关掉了。

正确性: $C{old}$ 和 $C{new}$ 如 Figure 11,会在不同的时间内生效,由 $C_{old, new}$ 提交为 Barrier。

问题:

  1. 新加入的机器,没有任何日志,可能要 InstallSnapshot 或经过长期同步?
    1. 这里引入了 Learner,不需要投票、只接收 Leader 日志的阶段。
  2. 在 Propose $C{new}$ 的时候,Leader 可能并不在 $C{new}$ 中。这个协议还是可以运行的,但是 Majority 并不包括 Leader。在 $C_{new}$ 提交之后,才能发生 Leader 转移。
  3. 采用新配置时,旧配置没有收到 Heartbeat,可能 ++term 然后变 candidate,随之会影响一些没收到 $C_{new}$ 日志的机器,把它们 term 也搞起来。这里的方案是,Leader 存在的时候,不会切 Leader 重新选举;然后 Server 在一次选举之间,如果收到了更高 term 的请求,也要等待一定时间,防止心跳把自己打炸

那么,为什么 Conf 需要直接用 new 的呢?走 Committed 在进入下一阶段的目的是在第一阶段达成两方的 Majority,第二阶段只需要新的 Majority,它们不相交。而直接用的目的是不用追踪大部分机器的 CommitIndex:

If servers adopted $C{new}$ only when they learned that $C{new}$ was committed, Raft leaders would have a difficult time knowing when a majority of the old cluster had adopted it. They would need to track which servers know of the entry’s commitment, and the servers would need to persist their commit index to disk; neither of these mechanisms is required in Raft. Instead, each server adopts $C{new}$ as soon as that entry exists in its log, and the leader knows it’s safe to allow further configuration changes as soon as the $C{new}$ entry has been committed. Unfortunately, this decision does imply that a log entry for a configuration change can be removed (if leadership changes); in this case, a server must be prepared to fall back to the previous configuration in its log.

有的地方实现会采用单步变更,但是实际上单步变更会导致一定的集群不可用问题,详见:作者的 https://github.com/ongardie/dissertation#chapter-4-cluster-membership-changes 和二手文章 https://zhuanlan.zhihu.com/p/359206808

Snapshot 和 Log Compaction

Snapshot 的职责归功于状态机,而非 Raft。它应用于已经 Applied 的数据,然后由各个机器 分别 进行,通过 InstallSnapshot 传输。(InstallSnapshot 还可以分多次发,神必)

作者认为,Log Compaction 是非 Leader 主导的,但是毕竟是已经 Applied 的数据了,所以也没那么 Care。

在收到 Snapshot 之后,可以 Truncate 后续的 Log,等 AppendEntities 把自己整活了。这个时候估摸着也可以是个 Learner 啥的,不给系统瞎 vote 添乱。

Client 交互

一般的读写操作

Raft 是一个 Leader-Based 的一致性协议,下面的形式是为了保证 Linearizable 而要求的

一般的写消息要么会让 client 走到 Leader 上,要么从节点负责转发给 Leader,而 Apply 之后能够收到消息。如果 Leader Crash 了,这些操作可能就 Timeout。而读操作可以依靠在读发出后,写一条 no-op 或者等待写操作完成,来做相关的保证(总之,收到读请求到读状态机之间,一定要有一条日志,来保证同步完成)。读也可以使用大论文描述的 Lease。

当然,Zookeeper 允许本地读,这种方式只能保证 Serializable (Zookeeper 论文提的是 A-Linearizable)。

写重试

对于写,可以给每个请求一个 Seq,重试的时候,论文提到,可以维护一个 seq,发现 FSM 里面有这个 seq,可以直接返回成功。当然这肯定没问题,但万一是没 commit 或者 commit 了的没同步,怎么办呢?这里可以:

  1. Apply 的阶段,做 Seq 的去重
  2. 维护一个集合,来区分 {Commit 但没有 Apply}{没有 Commit} 的 Seq 集合。

Some Quiz

Quiz 可以见:https://ongardie.net/static/raft/userstudy/quizzes.html

老实说我 2 错了,因为实在不知道那个为 1 1 1 3 的是怎么当选的…详细原因可以看我 Safety 那节下面的勘误。

Qihoo360/floyd

Qihoo360/floyd 是一个简单的 Raft 实现,它提供了 KV、分布式锁的功能。相对 Etcd 提供一个轻量的 Raft 模块,floyd 耦合的比较重,甚至把存储数据、Log、Raft 的 Index 等信息都丢到了一个 CF,很多 C++ 代码感觉也写的比较乱。但它差不多只有数千行,而且展示了如何实现一个 Basic Raft。这些还是比较重要的,因为本身实现一个并发的状态机就要有各种坑,你可能还要处理 RPC 的各种 corner case。所以有个简单的项目借鉴还是求之不得。

它对 Raft 论文支持如下:

功能 是否支持
领导选举/日志同步/安全性
Client
Log Compaction
Membership Change 有支持单步变更的接口,但没实现
Learner

floyd/include/floyd.h 表示了它对外的接口,简单表示如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class Floyd  {
public:
static Status Open(const Options& options, Floyd** floyd);

Floyd() { }
virtual ~Floyd();

virtual Status Write(const std::string& key, const std::string& value) = 0;
virtual Status Delete(const std::string& key) = 0;
virtual Status Read(const std::string& key, std::string* value) = 0;
virtual Status DirtyRead(const std::string& key, std::string* value) = 0;
// ttl is millisecond
virtual Status TryLock(const std::string& name, const std::string& holder, uint64_t ttl) = 0;
virtual Status UnLock(const std::string& name, const std::string& holder) = 0;

// return true if leader has been elected
virtual bool GetLeader(std::string* ip_port) = 0;
virtual bool GetLeader(std::string* ip, int* port) = 0;
virtual bool HasLeader() = 0;
virtual bool IsLeader() = 0;
virtual Status GetAllServers(std::set<std::string>* nodes) = 0;

// used for debug
virtual bool GetServerStatus(std::string* msg) = 0;

// log level can be modified
virtual void set_log_level(const int log_level) = 0;

private:
// No coping allowed
Floyd(const Floyd&);
void operator=(const Floyd&);
};

example/simple/t.cc 描述了一个简单的使用 Case,非常简单易懂:https://github.com/mapleFU/floyd-notes/blob/mwish-notes/floyd/example/simple/t.cc#L18

它内部实现如下(我图画的有点乱,以自己看得懂为准,有修改意见可以随便喷):

DDAC89FE-87C2-4E59-9657-CEC2EB3721B8

  • 部分消息的结构体定义在 proto/floyd.proto,包括 Raft 日志的结构和状态机的日志结构
  • 持久状态存储在 RocksDB 中,有下列两类
    • RaftLog: 存储论文中的 Log[]
    • RaftMeta: 存储 Term, VoteFor, commitIndex, applyIndex,后两者在 Raft Paper 中不要求持久化, 不过工程上持久化一下也没啥毛病。任何一个 Get/Set 接口, RaftMeta 本身都不会走缓存(可能有 RocksDB 的 Cache), 直接进 RocksDB.
  • 共享的状态被封装到 FloydContext 中,它持有 RaftLogRaftMeta 的状态,在各个对象间共享,Commit 推高 Commit 后,会提高 FloydContext 中的内存状态,而触发 Apply,同时,Apply 也会更新这个状态。它负责逻辑比较混杂:
    • [Persistent] 负责读取 RaftMeta,维护 commitIndexapplyIndexvoteForterm
    • [Volatile] 负责集群的 Role,一些内存 BecomeLeader BecomeFollower 的逻辑会走它。
  • FloydApply 将 Committed 的用户日志 Apply 到数据存储中
  • Peers会存放一些配置的 IP,以及仅对 Leader 有用的 nextIndexmatchIndex

在这里,系统中也有不少线程,最主要的是:

  1. floyd_primary_thread:只有一个的线程,负责定时发起 HeartBeat(AppendEntities RPC)、检查 Leader (查看是否 electionTimeout)、处理用户的 RPC
  2. floyd_peer_thread:理论上,Raft Group 每个成员,这里都会多一个 floyd_peer_thread。它负责给每个成员发送 RPC,并通过 FloydContext 来变更状态.

Leader Election

Sender

一个 FloydImpl 启动后,floyd_primary_thread 会检查自己的状态,如果 timeout 了,可能要变更为 Candidate. 注意变成 Candidate 之后,递增了 Term,然后给了自己一票。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//! 如果没有 Leader, 把自身变成 Candidate, 然后通知 Peer 线程.
//! 再触发一个 CheckLeader, 相当于 ElectionTimeout.
void FloydPrimary::LaunchCheckLeader() {
slash::MutexLock l(&context_->global_mu);
if (context_->role == Role::kFollower || context_->role == Role::kCandidate) {
if (options_.single_mode) {
// 忽略 SingleMode
...
} else if (context_->last_op_time + options_.check_leader_us < slash::NowMicros()) {
context_->BecomeCandidate();

// 吐槽: 实现上下面进行了 3次 RocksDB Batch 写, 因为每次 Set 都会写, 这是真的牛批...
raft_meta_->SetCurrentTerm(context_->current_term);
raft_meta_->SetVotedForIp(context_->voted_for_ip);
raft_meta_->SetVotedForPort(context_->voted_for_port);
NoticePeerTask(kHeartBeat);
}
}
// 在 ElectionTimeout 之后, 尝试再次调度 checkLeader
AddTask(kCheckLeader);
}

// when adding task to peer thread, we can consider that this job have been in the network
// even it is still in the peer thread's queue
void FloydPrimary::NoticePeerTask(TaskType type) {
for (auto& peer : (*peers_)) {
switch (type) {
case kHeartBeat:
peer.second->AddRequestVoteTask();
break;
case kNewCommand:
peer.second->AddAppendEntriesTask();
break;
default:
}
}
}

这里会用 RequestVote 来做 kHeartbeat(你可能会很奇怪,Heartbeat 不是 AppendEntry 吗?不过它 AppendEntry 又会走到 kNewCommand,我觉得有点乱),然后触发所有 Peer 线程的 RequestVote,触发如下内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
void Peer::RequestVoteRPC() {
uint64_t last_log_term;
uint64_t last_log_index;
CmdRequest req;
// 初始化 RequestVote RPC
{
slash::MutexLock l(&context_->global_mu);
raft_log_->GetLastLogTermAndIndex(&last_log_term, &last_log_index);

req.set_type(Type::kRequestVote);
CmdRequest_RequestVote* request_vote = req.mutable_request_vote();
request_vote->set_ip(options_.local_ip);
request_vote->set_port(options_.local_port);
request_vote->set_term(context_->current_term);
request_vote->set_last_log_term(last_log_term);
request_vote->set_last_log_index(last_log_index);
}

// 同步发送请求.
CmdResponse res;
Status result = pool_->SendAndRecv(peer_addr_, req, &res);

if (!result.ok()) {
return;
}

{
slash::MutexLock l(&context_->global_mu);
// RequestVote 发生 RPC 错误
if (!result.ok()) {
return;
}
/***
* 下列为正常的 Raft 处理逻辑
*/

// 对方的 Term 比自身高, 设置状态为 Follower, 更新 Term.
if (res.request_vote_res().term() > context_->current_term) {
// RequestVote fail, maybe opposite has larger term, or opposite has
// longer log. if opposite has larger term, this node will become follower
// otherwise we will do nothing
context_->BecomeFollower(res.request_vote_res().term());
raft_meta_->SetCurrentTerm(context_->current_term);
// 实际上相当于清空 ip/port.
raft_meta_->SetVotedForIp(context_->voted_for_ip);
raft_meta_->SetVotedForPort(context_->voted_for_port);
return;
}

// 如果自身状态还是 Follower, 且对方的 Term 不比自己高, 那么要么投给自己, 要么没投.

if (context_->role == Role::kCandidate) {
// kOk means RequestVote success, opposite vote for me
if (res.request_vote_res().vote_granted() == true) { // granted
// However, we need check whether this vote is vote for old term
// we need ignore these type of vote.
//
// 可以成为 Leader 啦!
if (CheckAndVote(res.request_vote_res().term())) {
context_->BecomeLeader();
UpdatePeerInfo();
primary_->AddTask(kHeartBeat, false);
}
} else {
// Note(mwish): 对方没有投票给自己, 怎么就变成 Follower 了.
// 这个地方没有 Bug(因为 context_ 变更不会导致错误), 但是会很容易引起 bug
// 我觉得可以全部 comment 掉.
context_->BecomeFollower(res.request_vote_res().term());
raft_meta_->SetCurrentTerm(context_->current_term);
raft_meta_->SetVotedForIp(context_->voted_for_ip);
raft_meta_->SetVotedForPort(context_->voted_for_port);
}
}
}
return;
}

这里在状态变更的时候,持有了 global_mu_,然后有个「半数选票」的逻辑:

1
2
3
4
5
6
7
8
9
//! 如果非本 term, 则无效, 否则 increment vote_quorum, 然后返回本 term 是否达到 1/2.
bool Peer::CheckAndVote(uint64_t vote_term) {
// 检测 ABA 问题.
if (context_->current_term != vote_term) {
return false;
}
// 半数检查
return (++context_->vote_quorum) > (options_.members.size() / 2);
}

Receiver

此外,对应处理 RequestVote RPC 的逻辑涉及选举和安全性, 内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
//! 响应投票 RPC.
int FloydImpl::ReplyRequestVote(const CmdRequest& request, CmdResponse* response) {
slash::MutexLock l(&context_->global_mu);
bool granted = false;
CmdRequest_RequestVote request_vote = request.request_vote();

/*
* If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower (5.1)
*
* 变成 follower, 然后设置 VoteFor.
*/
if (request_vote.term() > context_->current_term) {
context_->BecomeFollower(request_vote.term()); // 推高自己的 term.
raft_meta_->SetCurrentTerm(context_->current_term);
}
// if caller's term smaller than my term, then I will notice him
if (request_vote.term() < context_->current_term) {
LOGV(INFO_LEVEL, info_log_, "FloydImpl::ReplyRequestVote: Leader %s:%d term %lu is smaller than my %s:%d current term %lu",
request_vote.ip().c_str(), request_vote.port(), request_vote.term(), options_.local_ip.c_str(), options_.local_port,
context_->current_term);
BuildRequestVoteResponse(context_->current_term, granted, response);
return -1;
}

// 论文: 安全性, 不能给 Log 比自己旧的投票.

uint64_t my_last_log_term = 0;
uint64_t my_last_log_index = 0;
raft_log_->GetLastLogTermAndIndex(&my_last_log_term, &my_last_log_index);
// if votedfor is null or candidateId, and candidated's log is at least as up-to-date
// as receiver's log, grant vote
if ((request_vote.last_log_term() < my_last_log_term) ||
((request_vote.last_log_term() == my_last_log_term) && (request_vote.last_log_index() < my_last_log_index))) {
BuildRequestVoteResponse(context_->current_term, granted, response);
return -1;
}

// vote_for 不能给这个 term 已经投过票的做操作.
if (vote_for_.find(request_vote.term()) != vote_for_.end()
&& vote_for_[request_vote.term()] != std::make_pair(request_vote.ip(), request_vote.port())) {
return -1;
}
// 给予选票.
vote_for_[request_vote.term()] = std::make_pair(request_vote.ip(), request_vote.port());
context_->BecomeFollower(request_vote.term());
// Note(mwish): -> 难道这个地方不应该做一个 Batch Op 的吗, orz...
raft_meta_->SetCurrentTerm(context_->current_term);
raft_meta_->SetVotedForIp(context_->voted_for_ip);
raft_meta_->SetVotedForPort(context_->voted_for_port);
// Got my vote
GrantVote(request_vote.term(), request_vote.ip(), request_vote.port());
granted = true;

context_->last_op_time = slash::NowMicros();
BuildRequestVoteResponse(context_->current_term, granted, response);
return 0;
}

竞选失败

如果没有 Leader, floyd_primary_thread 会不停调用 CheckLeader,这里的条件是:

1
context_->last_op_time + options_.check_leader_us < slash::NowMicros()

这里会根据 last_op_time 来 Leader 相关的操作时间。接收成功的 RequestVote 和任何 AppendEntities 都会更新这个时间(题外话,这里不是有效 Leader 的 AppendEntities 都会更新,可能是 Leader 心跳包是 RequestVote,我觉得 tmd 好诡异)

Log Replication

外部的请求会走到 AppendEntries

1
2
3
4
5
6
7
8
9
10
11
12
13
void FloydPrimary::NoticePeerTask(TaskType type) {
for (auto& peer : (*peers_)) {
switch (type) {
case kHeartBeat:
...
break;
case kNewCommand:
peer.second->AddAppendEntriesTask();
break;
default:
}
}
}

我们回忆一下,选上 Leader 的时候会做一些初始化:

1
2
3
4
5
6
7
8
9
10
11
//! 成为了 Leader, 可以更新 Peer 的信息了.
//! Q: 这个地方不会有并发吗...
//! A: 你本机理论上只有 Leader 线程会改,相对论保证你没有并发.
//!
//! 初始化 nextIndex 为 Leader 的 index, matchIndex 为 0.
void Peer::UpdatePeerInfo() {
for (auto& pt : (*peers_)) {
pt.second->set_next_index(raft_log_->GetLastLogIndex() + 1);
pt.second->set_match_index(0);
}
}

这个函数比较长,我们会分几部分讲,并顺便介绍 Peer 。第一部分是构造和发送请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
void Peer::AppendEntriesRPC() {
uint64_t prev_log_index = 0;
uint64_t num_entries = 0;
uint64_t prev_log_term = 0;
uint64_t last_log_index = 0;
uint64_t current_term = 0;
CmdRequest req;
CmdRequest_AppendEntries* append_entries = req.mutable_append_entries();
{
slash::MutexLock l(&context_->global_mu);
prev_log_index = next_index_ - 1;
last_log_index = raft_log_->GetLastLogIndex();
/*
* LOGV(INFO_LEVEL, info_log_, "Peer::AppendEntriesRPC: next_index_ %d last_log_index %d peer_last_op_time %lu nowmicros %lu",
* next_index_.load(), last_log_index, peer_last_op_time, slash::NowMicros());
*/
if (next_index_ > last_log_index && peer_last_op_time + options_.heartbeat_us > slash::NowMicros()) {
return;
}
peer_last_op_time = slash::NowMicros();

if (prev_log_index != 0) {
Entry entry;
if (raft_log_->GetEntry(prev_log_index, &entry) != 0) {
} else {
prev_log_term = entry.term();
}
}
current_term = context_->current_term;

// 下面这些和论文一样.

req.set_type(Type::kAppendEntries);
append_entries->set_ip(options_.local_ip);
append_entries->set_port(options_.local_port);
append_entries->set_term(current_term);
append_entries->set_prev_log_index(prev_log_index);
append_entries->set_prev_log_term(prev_log_term);
append_entries->set_leader_commit(context_->commit_index);
}

// 获取 [next_index_, last_log_index] 之间的 LogEntry.
// 发送受到 `Options` 的限制, 最多发送 append_entries_count_once 个
// 或者 `append_entries_size_once` bytes.
Entry *tmp_entry = new Entry();
for (uint64_t index = next_index_; index <= last_log_index; index++) {
if (raft_log_->GetEntry(index, tmp_entry) == 0) {
// TODO(ba0tiao) how to avoid memory copy here
Entry *entry = append_entries->add_entries();
*entry = *tmp_entry;
} else {
break;
}

num_entries++;
if (num_entries >= options_.append_entries_count_once
|| (uint64_t)append_entries->ByteSize() >= options_.append_entries_size_once) {
break;
}
}
delete tmp_entry;

CmdResponse res;
Status result = pool_->SendAndRecv(peer_addr_, req, &res);

// ...
}

上面的主要内容是 Raft 怎么构造一个 AppendEntry 请求的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
  {
slash::MutexLock l(&context_->global_mu);
if (!result.ok()) {
return;
}

// here we may get a larger term, and transfer to follower
// so we need to judge the role here
if (context_->role == Role::kLeader) {
/*
* receiver has higer term than myself, so turn from candidate to follower
*/
if (res.append_entries_res().term() > context_->current_term) {
context_->BecomeFollower(res.append_entries_res().term());
raft_meta_->SetCurrentTerm(context_->current_term);
raft_meta_->SetVotedForIp(context_->voted_for_ip);
raft_meta_->SetVotedForPort(context_->voted_for_port);
} else if (res.append_entries_res().success() == true) {
if (num_entries > 0) {
match_index_ = prev_log_index + num_entries;
// only log entries from the leader's current term are committed
// by counting replicas
if (append_entries->entries(num_entries - 1).term() == context_->current_term) {
AdvanceLeaderCommitIndex();
apply_->ScheduleApply();
}
next_index_ = prev_log_index + num_entries + 1;
}
} else {
uint64_t adjust_index = std::min(res.append_entries_res().last_log_index() + 1,
next_index_ - 1);
if (adjust_index > 0) {
// Prev log don't match, so we retry with more prev one according to
// response
next_index_ = adjust_index;
AddAppendEntriesTask();
}
}
}
}
return;
}

Receiver

Receiver 的逻辑特别清晰:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
int FloydImpl::ReplyAppendEntries(const CmdRequest& request, CmdResponse* response) {
bool success = false;
CmdRequest_AppendEntries append_entries = request.append_entries();
slash::MutexLock l(&context_->global_mu);
// update last_op_time to avoid another leader election
context_->last_op_time = slash::NowMicros();
// Ignore stale term
// if the append entries leader's term is smaller than my current term, then the caller must an older leader
if (append_entries.term() < context_->current_term) {

BuildAppendEntriesResponse(success, context_->current_term, raft_log_->GetLastLogIndex(), response);
return -1;
} else if ((append_entries.term() > context_->current_term)
|| (append_entries.term() == context_->current_term &&
(context_->role == kCandidate || (context_->role == kFollower && context_->leader_ip == "")))) {

context_->BecomeFollower(append_entries.term(),
append_entries.ip(), append_entries.port());
raft_meta_->SetCurrentTerm(context_->current_term);
raft_meta_->SetVotedForIp(context_->voted_for_ip);
raft_meta_->SetVotedForPort(context_->voted_for_port);
}
...
}

先过滤掉参数,再来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
{
if (append_entries.prev_log_index() > raft_log_->GetLastLogIndex()) {
BuildAppendEntriesResponse(success, context_->current_term, raft_log_->GetLastLogIndex(), response);
return -1;
}

// Append entry
if (append_entries.prev_log_index() < raft_log_->GetLastLogIndex()) {
raft_log_->TruncateSuffix(append_entries.prev_log_index() + 1);
}

// we compare peer's prev index and term with my last log index and term
uint64_t my_last_log_term = 0;
Entry entry;
if (append_entries.prev_log_index() == 0) {
my_last_log_term = 0;
} else if (raft_log_->GetEntry(append_entries.prev_log_index(), &entry) == 0) {
my_last_log_term = entry.term();
} else {
return -1;
}

if (append_entries.prev_log_term() != my_last_log_term) {
BuildAppendEntriesResponse(success, context_->current_term, raft_log_->GetLastLogIndex(), response);
return -1;
}

std::vector<const Entry*> entries;
for (int i = 0; i < append_entries.entries().size(); i++) {
entries.push_back(&append_entries.entries(i));
}
if (append_entries.entries().size() > 0) {
if (raft_log_->Append(entries) <= 0) {
BuildAppendEntriesResponse(success, context_->current_term, raft_log_->GetLastLogIndex(), response);
return -1;
}
}
if (append_entries.leader_commit() != context_->commit_index) {
AdvanceFollowerCommitIndex(append_entries.leader_commit());
apply_->ScheduleApply();
}
success = true;
// only when follower successfully do appendentries, we will update commit index
BuildAppendEntriesResponse(success, context_->current_term, raft_log_->GetLastLogIndex(), response);
return 0;
}

其实就很简单。

Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Status FloydImpl::ExecuteCommand(const CmdRequest& request,
CmdResponse *response) {
// Append entry local
std::vector<const Entry*> entries;
Entry entry;
BuildLogEntry(request, context_->current_term, &entry);
entries.push_back(&entry);
response->set_type(request.type());
response->set_code(StatusCode::kError);

uint64_t last_log_index = raft_log_->Append(entries);
if (last_log_index <= 0) {
return Status::IOError("Append Entry failed");
}

// Notify primary then wait for apply
primary_->AddTask(kNewCommand);


{
slash::MutexLock l(&context_->apply_mu);
while (context_->last_applied < last_log_index) {
if (!context_->apply_cond.TimedWait(1000)) {
return Status::Timeout("FloydImpl::ExecuteCommand Timeout");
}
}
}

在这里,不管是 Get 还是 Set,都会起一个 Command, 然后走状态机。这里不会根据 Seq 来重试,没了就是没了,没那么多花头。

Recall: 线程模型

这里需要注意到,floyd_primary_thread线程会单线程的维护 kHeartbeatkNewCommand,这些水位可能会由 Master 和 Peer 变更。Peer 的任务是单线程的,这是说,对单个 Peer,不会有两个并发的写操作。