在分布式集群部署模式下,为了维护数据一致性,通常需要选举出一个leader来进行协调,并且在leader挂掉后能从集群中选举出一个新的leader。选举leader的方案有很多种,对Paxos和Raft协议有过了解的同学应该对leader选举有一些认识,一般都是按照少数服从多数的原则来实现,但是因为分布式环境中无法避免的网络不稳定、数据不同步、时间偏差等问题,要想搞好leader选举并不是一件特别容易的事。这篇文章将提供一个使用Consul做leader选举的简单方案。

原理

Consul 的leader选举只有两步:

1、Create Session:参与选举的应用分别去创建Session,Session的存活状态关联到健康检查。

2、Acquire KV:多个应用带着创建好的Session去锁定同一个KV,只能有一个应用锁定住,锁定成功的应用就是leader。

如上图所示,这里假设App2用Session锁定住了KV,其实就是KV的Session属性设置为了Session2。

什么时候会触发重新选举呢?

  • Session失效:Session被删除、Session关联的健康检查失败、Session TTL过期等。
  • KV被删除:这个没什么需要特别说明的。

那应用们怎么感知这些情况呢?

应用在选举结束后,应该保持一个到KV的阻塞查询,这个查询会在超时或者KV发生变化的时候返回结果,这时候应用可以根据返回结果判断是否发起新的选举。

示例

这里给出一个Java的例子:这是一个控制台程序,程序会创建一个Session,然后尝试使用这个Session锁定key为“program/leader”的Consul KV,同时也会尝试设置KV的值为当前节点Id“007”。不管捕获成功还是失败,程序随后都会启动一个针对“program/leader”的阻塞查询,在阻塞查询返回时会判断KV是否存在或者绑定的Session是否存在,如果有任何一个不存在,则发起选举,否则继续阻塞查询。这个“阻塞查询->选举”的操作是一个无限循环操作。

复制代码
package cn.bossma;  import com.ecwid.consul.v1.ConsulClient; import com.ecwid.consul.v1.QueryParams; import com.ecwid.consul.v1.kv.model.GetValue; import com.ecwid.consul.v1.kv.model.PutParams; import com.ecwid.consul.v1.session.model.NewSession; import com.ecwid.consul.v1.session.model.Session; import org.apache.commons.lang3.StringUtils;  /**  * consul leader 选举演示程序  *  * @author: bossma.cn  */public class Main {      private static ConsulClient client = new ConsulClient();     private static String sesssionId = "";     private static String nodeId = "007";     private static String electName = "program/leader";      /**      * @param args      */    public static void main(String[] args) {         System.out.println("starting");         watch();     }      /**      * 监控选举      *      * @param:      * @return:      * @author: bossma.cn      */    private static void watch() {          System.out.println("start first leader election");          // 上来就先选举一次,看看结果        ElectResponse electResponse = elect();         System.out.printf("elect result: %s, current manager: %s" + System.getProperty("line.separator"), electResponse.getElectResult(), electResponse.getLeaderId());          long waitIndex = electResponse.modifyIndex++;         int waitTime = 30;          do {             try {                 System.out.println("start leader watch query");                  // 阻塞查询                GetValue kv = getKVValue(electName, waitTime, waitIndex);                  // kv被删除或者kv绑定的session不存在                if (null == kv || StringUtils.isEmpty(kv.getSession())) {                     System.out.println("leader missing, start election right away");                     electResponse = elect();                     waitIndex = electResponse.modifyIndex++;                     System.out.printf("elect result: %s, current manager: %s" + System.getProperty("line.separator"), electResponse.getElectResult(), electResponse.getLeaderId());                 } else {