使用Consul做leader选举的方案
在分布式集群部署模式下,为了维护数据一致性,通常需要选举出一个leader来进行协调,并且在leader挂掉后能从集群中选举出一个新的leader。选举leader的方案有很多种,对Paxos和Raft协议有过了解的同学应该对leader选举有一些认识,一般都是按照少数服从多数的原则来实现,但是因为分布式环境中无法避免的网络不稳定、数据不同步、时间偏差等问题,要想搞好leader选举并不是一件特别容易的事。这篇文章将提供一个使用Consul做leader选举的简单方案。
原理
Consul 的leader选举只有两步:
1、Create Session:参与选举的应用分别去创建Session,Session的存活状态关联到健康检查。
2、Acquire KV:多个应用带着创建好的Session去锁定同一个KV,只能有一个应用锁定住,锁定成功的应用就是leader。
如上图所示,这里假设App2用Session锁定住了KV,其实就是KV的Session属性设置为了Session2。
什么时候会触发重新选举呢?
- Session失效:Session被删除、Session关联的健康检查失败、Session TTL过期等。
- KV被删除:这个没什么需要特别说明的。
那应用们怎么感知这些情况呢?
应用在选举结束后,应该保持一个到KV的阻塞查询,这个查询会在超时或者KV发生变化的时候返回结果,这时候应用可以根据返回结果判断是否发起新的选举。
示例
这里给出一个Java的例子:这是一个控制台程序,程序会创建一个Session,然后尝试使用这个Session锁定key为“program/leader”的Consul KV,同时也会尝试设置KV的值为当前节点Id“007”。不管捕获成功还是失败,程序随后都会启动一个针对“program/leader”的阻塞查询,在阻塞查询返回时会判断KV是否存在或者绑定的Session是否存在,如果有任何一个不存在,则发起选举,否则继续阻塞查询。这个“阻塞查询->选举”的操作是一个无限循环操作。
package cn.bossma; import com.ecwid.consul.v1.ConsulClient; import com.ecwid.consul.v1.QueryParams; import com.ecwid.consul.v1.kv.model.GetValue; import com.ecwid.consul.v1.kv.model.PutParams; import com.ecwid.consul.v1.session.model.NewSession; import com.ecwid.consul.v1.session.model.Session; import org.apache.commons.lang3.StringUtils; /** * consul leader 选举演示程序 * * @author: bossma.cn */public class Main { private static ConsulClient client = new ConsulClient(); private static String sesssionId = ""; private static String nodeId = "007"; private static String electName = "program/leader"; /** * @param args */ public static void main(String[] args) { System.out.println("starting"); watch(); } /** * 监控选举 * * @param: * @return: * @author: bossma.cn */ private static void watch() { System.out.println("start first leader election"); // 上来就先选举一次,看看结果 ElectResponse electResponse = elect(); System.out.printf("elect result: %s, current manager: %s" + System.getProperty("line.separator"), electResponse.getElectResult(), electResponse.getLeaderId()); long waitIndex = electResponse.modifyIndex++; int waitTime = 30; do { try { System.out.println("start leader watch query"); // 阻塞查询 GetValue kv = getKVValue(electName, waitTime, waitIndex); // kv被删除或者kv绑定的session不存在 if (null == kv || StringUtils.isEmpty(kv.getSession())) { System.out.println("leader missing, start election right away"); electResponse = elect(); waitIndex = electResponse.modifyIndex++; System.out.printf("elect result: %s, current manager: %s" + System.getProperty("line.separator"), electResponse.getElectResult(), electResponse.getLeaderId()); } else {