最近私人的事情比较多,没有抽出时间来整理博客。书接上文,上一篇里总结了Redis故障迁移的几个关键点,以及Redis中故障检测的实现。本篇主要介绍集群检测到某主节点下线后,是如何选举新的主节点的。注意到Redis集群是无中心的,那么使用分布式一致性的算法来使集群中各节点能对在新主节点的选举上达成共识就是一个比较可行的方案。
在工程上,Raft一致性算法是比较易于实现和理解的分布式一致性算法;Redis也是使用了Raft来做主节点选举的。所以这里先简单介绍下Raft的原理:
一、Raft算法
Raft为解决一致性问题, 引入了Leader中心。简单来说就是通过选举出一个单Leader,让Leader来接收来自客户端的所有数据,Leader会将数据分发给所有节点,当Leader判断出数据已经分发到系统中大半节点以后,即认为该数据已经可以在系统中持久化存储,并通知客户端该数据提交成功,后续系统中数据的一致性可以让分布式系统内部通过数据同步的方式实现。(注意到这里和Redis去中心化的思想不符,所以Redis只是利用了其中选举的部分)
算法主要部分如下:
1. Leader选举
2. 客户数据一致化处理(即论文中的log repliecation部分)
Leader选举
Raft给系统中的节点分配了三种状态,分别是Follower, Candidates, Leader;选举过程即是在分布式式系统中选出Leader状态节点的过程。三种状态的转换关系如下所示:
当系统中没有Leader时,所有节点初始状态均为Follewer,每个节点在经历time_out时间后,会自动转变为Candidate,并尝试发起投票以便成为新的Leader;为了保证系统中只存在一个Leader,当选新Leader的条件是Candidate收到了超过半数节点以上的投票(每个节点在每轮投票中只能投给唯一的节点,通常是投个第一个发来邀票请求的节点),达到该条件后,Candidate即变为Leader。注意到投票是有轮次的,只有收到当前轮次的投票才是有效票。在状态机中,用term来表示投票的轮次。
根据上面介绍的流程,容易注意到为实现Leader的选举,有几个前提:1. 每个节点都需要知道系统中到底有哪些节点存在(为了能向每个节点邀票,同时要知道节点总数才能判断最终是否收到了多数的投票); 2. 所有节点应该有统一的初始化term,且后续需要不断的同步term
第一点前提,只需要初始化阶段给所有节点置统一的term即可;而第二点前提则要求各节点主动拥抱新term,无情抛弃老term;具体来说就是每个节点收到旧term的消息,可以不处理消息请求,并把自身的较高的term返回;而每个节点收到新的term之后,则要积极响应,并在响应结束之后更新自己的term。
有一种可能,就是在选举轮次中所有的节点都没有收到多数节点的投票认可,那么就需要发起新一轮的投票;为了减少新一轮投票依旧无法选出Leader的概率,每个Candidate节点的time_out时间是随机的,这样通常会有一个最先发出请求的节点占得先机,能收获大多数的选票。
客户数据一致化处理
由于Redis中并没有用到Raft算法的一致化数据处理,这里不过多描述(该部分内容比较复杂,需要考虑的异常场景较多);详细的介绍可以看这篇博文,https://www.cnblogs.com/mindwind/p/5231986.html;个人觉得这篇博文很直观的介绍了分布式系统下在各种异常情况下,数据的一致性是如何保证的。
二、Redis选举新主节点的实现
Redis选举新的主节点,有一个很重要的概念需要先理解下,就是epoch——纪元,类似于Raft中term的意义。Redis的数据结构中记录了两个epoch,分别是currentepoch和configepoch;这两个纪元的作用是不同的。
currentepoch: 这个变量是记录整个集群投票轮次的,初始时,集群中所有节点的currentepoch都是0;后面虽然没个发起投票的节点都会自增该值,但也会同时将该值作为投票轮次信息发给其他节点,整个集群最终还是会具有相同的currentepoch值;
configepoch: 这个变量是记录节点自身的特征值,主要用于判断槽的归属节点是否需要更新;考虑如下的场景,A节点负责槽1,2,3且configepoch=n,A节点掉线后从属节点B接替了A的工作成为新的主节点,那么B此时就负责1,2,3槽,B的configepoch=n1(n1一定大于n,因为每当有从节点通过故障迁移接替主节点工作时,该从节点的configepoch就会变更为整个集群中最大的configepoch);当A节点恢复工作后,还不知道自己已经被替代了,还向其他节点宣称自己是1,2,3槽的负责节点。其他节点已将1,2,3槽的负责节点改为B了,当其他节点收到A恢复之后的心跳包,会比较1,2,3槽所属节点B的configepoch(=n1)与A的configepoch(=n)哪个更大,发现n1更大就不会变更自己的记录,反过来还要通知A它所负责的槽已经被接管了。
介绍下Redis选举主节点的流程:
1. 从节点检测到自己从属的主节点下线,开始发起一次选举;发起方式是向所有主节点广播发送一条投票请求,希望其他主节点同意自己为新的主节点;(带上自己记录的currentepoch,即Raft算法中的term),源码中如下:
复制代码
1 /*
2 从节点的故障转移,是在函数clusterHandleSlaveFailover中处理的,该函数在集群定时器函数clusterCron中调用。本函数
3 用于处理从节点进行故障转移的整个流程,包括:判断是否可以发起选举;判断选举是否超时;判断自己是否拉
4 到了足够的选票;使自己升级为新的主节点这些所有流程。
5 */
6 //slave调用
7 void clusterHandleSlaveFailover(void) { //clusterBeforeSleep对CLUSTER_TODO_HANDLE_FAILOVER状态的处理,或者clusterCron中实时处理
8 //也就是当前从节点与主节点已经断链了多长时间,从通过ping pong超时,检测到本slave的master掉线了,从这时候开始算
9 mstime_t data_age;
10 //该变量表示距离发起故障转移流程,已经过去了多少时间;
11 mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
12 //该变量表示当前从节点必须至少获得多少选票,才能成为新的主节点
13 int needed_quorum = (server.cluster->size / 2) + 1;
14 //表示是否是管理员手动触发的故障转移流程;
15 int manual_failover = server.cluster->mf_end != 0 &&
16 server.cluster->mf_can_start; //说明向从发送了cluster failover force要求该从进行强制故障转移
17 int j;
18 //该变量表示故障转移流程(发起投票,等待回应)的超时时间,超过该时间后还没有获得足够的选票,则表示本次故障转移失败;
19 mstime_t auth_timeout,
20 //该变量表示判断是否可以开始下一次故障转移流程的时间,只有距离上一次发起故障转移时,已经超过auth_retry_time之后,
21 //才表示可以开始下一次故障转移了(auth_age > auth_retry_time);
22 auth_retry_time;
23
24 server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;
25
26 /* Compute the failover timeout (the max time we have to send votes
27 * and wait for replies), and the failover retry time (the time to wait
28 * before waiting again.
29 *
30 * Timeout is MIN(NODE_TIMEOUT*2,2000) milliseconds.
31 * Retry is two times the Timeout.
32 */
33 auth_timeout = server.cluster_node_timeout*2;
34 if (auth_timeout < 2000) auth_timeout = 2000;
35 auth_retry_time = auth_timeout*2;
36
37 /* Pre conditions to run the function, that must be met both in case
38 * of an automatic or manual failover:
39 * 1) We are a slave.
40 * 2) Our master is flagged as FAIL, or this is a manual failover.
41 * 3) It is serving slots. */
42 /*
43 当前节点是主节点;当前节点是从节点但是没有主节点;当前节点的主节点不处于下线状态并且不是手动强制进行故障转移;
44 当前节点的主节点没有负责的槽位。满足以上任一条件,则不能进行故障转移,直接返回即可;
45 */
46 if (nodeIsMaster(myself) ||
47 myself->slaveof == NULL ||
48 (!nodeFailed(myself->slaveof) && !manual_failover) ||
49 myself->slaveof->numslots == 0) {
50 //真正把slaveof置为NULL在后面真正备选举为主的时候设置,见后面的replicationUnsetMaster
51 /* There are no reasons to failover, so we set the reason why we
52 * are returning without failing over to NONE. */
53 server.cluster->cant_failover_reason = REDIS_CLUSTER_CANT_FAILOVER_NONE;
54 return;
55 };
56
57 //slave从节点进行后续处理,并且和主服务器断开了连接
58
59 /* Set data_age to the number of seconds we are disconnected from
60 * the master. */
61 //将data_age设置为从节点与主节点的断开秒数
62 if (server.repl_state == REDIS_REPL_CONNECTED) { //如果主从之间是因为网络不通引起的,read判断不出epoll err事件,则状态为这个
63 data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)
64 * 1000; //也就是当前从节点与主节点最后一次通信过了多久了
65 } else {
66 //这里一般都是直接kill主master进程,从epoll err感知到了,会在replicationHandleMasterDisconnection把状态置为REDIS_REPL_CONNECT
67 //本从节点和主节点断开了多久,
68 data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
69 }
70
71 /* Remove the node timeout from the data age as it is fine that we are
72 * disconnected from our master at least for the time it was down to be
73 * flagged as FAIL, that's the baseline. */
74 // node timeout 的时间不计入断线时间之内 如果data_age大于server.cluster_node_timeout,则从data_age中
75 //减去server.cluster_node_timeout,因为经过server.cluster_node_timeout时间没有收到主节点的PING回复,才会将其标记为PFAIL
76 if (data_age > server.cluster_node_timeout)
77 data_age -= server.cluster_node_timeout; //从通过ping pong超时,检测到本slave的master掉线了,从这时候开始算
78
79 /* Check if our data is recent enough. For now we just use a fixed
80 * constant of ten times the node timeout since the cluster should
81 * react much faster to a master down.
82 *
83 * Check bypassed for manual failovers. */
84 // 检查这个从节点的数据是否较新:
85 // 目前的检测办法是断线时间不能超过 node timeout 的十倍
86 /* data_age主要用于判断当前从节点的数据新鲜度;如果data_age超过了一定时间,表示当前从节点的数据已经太老了,
87 不能替换掉下线主节点,因此在不是手动强制故障转移的情况下,直接返回;*/
88 if (data_age >
89 ((mstime_t)server.repl_ping_slave_period * 1000) +
90 (server.cluster_node_timeout * REDIS_CLUSTER_SLAVE_VALIDITY_MULT))
91 {
92 if (!manual_failover) {
93 clusterLogCantFailover(REDIS_CLUSTER_CANT_FAILOVER_DATA_AGE);
94 return;
95 }
96 }
97
98 /* If the previous failover attempt timedout and the retry time has
99 * elapsed, we can setup a new one. */
100
101 /*
102 例如集群有7个master,其中redis1下面有2个slave,突然redis1掉了,则slave1和slave2竞争要求其他6个master进行投票,如果这6个
103 master投票给slave1和slave2的票数都是3,也就是3个master投给了slave1,另外3个master投给了slave2,那么两个slave都得不到超过一半
104 的票数,则只有靠这里的超时来进行重新投票了。不过一半这种情况很少发生,因为发起投票的时间是随机的,因此一半一个slave的投票报文auth req会比
105 另一个slave的投票报文先发出。越先发出越容易得到投票
106 */
107 /*
108 如果auth_age大于auth_retry_time,表示可以开始进行下一次故障转移了。如果之前没有进行过故障转移,则auth_age等
109 于mstime,肯定大于auth_retry_time;如果之前进行过故障转移,则只有距离上一次发起故障转移时,已经超过
110 auth_retry_time之后,才表示可以开始下一次故障转移。
111 */
112 if (auth_age > auth_retry_time) {
113 //每次超时从新发送auth req要求其他主master投票,都会先走这个if,然后下次调用该函数才会走if后面的流程
114 server.cluster->failover_auth_time = mstime() +
115 500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
116 random() % 500; /* Random delay between 0 and 500 milliseconds. */ //等到这个时间到才进行故障转移
117 server.cluster->failover_auth_count = 0;
118 server.cluster->failover_auth_sent = 0;
119 server.cluster->failover_auth_rank = clusterGetSlaveRank();//本节点按照在master中的repl_offset来获取排名
120 /* We add another delay that is proportional to the slave rank.
121 * Specifically 1 second * rank. This way slaves that have a probably
122 * less updated replication offset, are penalized. */
123 server.cluster->failover_auth_time +=
124 server.cluster->failover_auth_rank * 1000;
125
126 /* However if this is a manual failover, no delay is needed. */
127
128 /*
129 注意如果是管理员发起的手动强制执行故障转移,则设置server.cluster->failover_auth_time为当前时间,表示会
130 立即开始故障转移流程;最后,调用clusterBroadcastPong,向该下线主节点的所有从节点发送PONG包,包头部分带
131 有当前从节点的复制数据量,因此其他从节点收到之后,可以更新自己的排名;最后直接返回;
132 */
133 if (server.cluster->mf_end) {
134 server.cluster->failover_auth_time = mstime();
135 server.cluster->failover_auth_rank = 0;
136 }
137 redisLog(REDIS_WARNING,
138 "Start of election delayed for %lld milliseconds "
139 "(rank #%d, offset %lld).",
140 server.cluster->failover_auth_time - mstime(),
141 server.cluster->failover_auth_rank,
142 replicationGetSlaveOffset());
143 /* Now that we have a scheduled election, broadcast our offset
144 * to all the other slaves so that they'll updated their offsets
145 * if our offset is better. */
146 /*
147 调用clusterBroadcastPong,向该下线主节点的所有从节点发送PONG包,包头部分带
148 有当前从节点的复制数据量,因此其他从节点收到之后,可以更新自己的排名;最后直接返回;
149 */
150 clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
151 return;
152 }
153
154 /* 进行故障转移 */
155
156 /* It is possible that we received more updated offsets from other
157 * slaves for the same master since we computed our election delay.
158 * Update the delay if our rank changed.
159 *
160 * Not performed if this is a manual failover. */
161 /*
162 如果还没有开始故障转移,则调用clusterGetSlaveRank,取得当前从节点的最新排名。因为在开始故障转移之前,
163 可能会收到其他从节点发来的心跳包,因而可以根据心跳包中的复制偏移量更新本节点的排名,获得新排名newrank,
164 如果newrank比之前的排名靠后,则需要增加故障转移开始时间的延迟,然后将newrank记录到server.cluster->failover_auth_rank中;
165 */
166 if (server.cluster->failover_auth_sent == 0 &&
167 server.cluster->mf_end == 0) //还没有进行过故障庄毅
168 {
169 int newrank = clusterGetSlaveRank();
170 if (newrank > server.cluster->failover_auth_rank) {
171 long long added_delay =
172 (newrank - server.cluster->failover_auth_rank) * 1000;
173 server.cluster->failover_auth_time += added_delay;
174 server.cluster->failover_auth_rank = newrank;
175 redisLog(REDIS_WARNING,
176 "Slave rank updated to #%d, added %lld milliseconds of delay.",
177 newrank, added_delay);
178 }
179 }
180
181 /* Return ASAP if we can't still start the election. */
182 // 如果执行故障转移的时间未到,先返回
183 if (mstime() < server.cluster->failover_auth_time) {
184 clusterLogCantFailover(REDIS_CLUSTER_CANT_FAILOVER_WAITING_DELAY);
185 return;
186 }
187
188 /* Return ASAP if the election is too old to be valid. */
189 // 如果距离应该执行故障转移的时间已经过了很久
190 // 那么不应该再执行故障转移了(因为可能已经没有需要了)
191 // 直接返回
192 if (auth_age > auth_timeout) {// 如果auth_age大于auth_timeout,说明之前的故障转移超时了,因此直接返回;
193 clusterLogCantFailover(REDIS_CLUSTER_CANT_FAILOVER_EXPIRED);
194 return;
195 }
196
197
198 /* Ask for votes if needed. */
199 // 向其他节点发送故障转移请求
200 if (server.cluster->failover_auth_sent == 0) {
201
202 // 增加配置纪元
203 server.cluster->currentEpoch++;
204
205 // 记录发起故障转移的配置纪元
206 server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
207
208 redisLog(REDIS_WARNING,"Starting a failover election for epoch %llu.",
209 (unsigned long long) server.cluster->currentEpoch);
210
211 //向其他所有节点发送信息,看它们是否支持由本节点来对下线主节点进行故障转移
212 clusterRequestFailoverAuth();
213
214 // 打开标识,表示已发送信息
215 server.cluster->failover_auth_sent = 1;
216
217 // TODO:
218 // 在进入下个事件循环之前,执行:
219 // 1)保存配置文件
220 // 2)更新节点状态
221 // 3)同步配置
222 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
223 CLUSTER_TODO_UPDATE_STATE|
224 CLUSTER_TODO_FSYNC_CONFIG);
225 return; /* Wait for replies. */
226 }
227
228 /* Check if we reached the quorum. */
229 // 如果当前节点获得了足够多的投票,那么对下线主节点进行故障转移
230 if (server.cluster->failover_auth_count >= needed_quorum) {
231 // 旧主节点
232 clusterNode *oldmaster = myself->slaveof; //在后面clusterSetNodeAsMaster中把slaveof置为NULL
233
234 redisLog(REDIS_WARNING,
235 "Failover election won: I'm the new master.");
236 redisLog(REDIS_WARNING,
237 "configEpoch set to %llu after successful failover",
238 (unsigned long long) myself->configEpoch);
239
240 /* We have the quorum, perform all the steps to correctly promote
241 * this slave to a master.
242 *
243 * 1) Turn this node into a master.
244 * 将当前节点的身份由从节点改为主节点
245 */
246 clusterSetNodeAsMaster(myself);
247 // 让从节点取消复制,成为新的主节点
248 replicationUnsetMaster();
249
250 /* 2) Claim all the slots assigned to our mas