dubbo源码解析五 --- 集群容错架构设计与原理分析
                        
                     
                    
                    
                        欢迎来我的 Star Followers 后期后继续更新Dubbo别的文章
Dubbo 源码分析系列之一环境搭建 博客园
Dubbo 入门之二 ——- 项目结构解析 博客园
Dubbo 源码分析系列之三 —— 架构原理 博客园
Dubbo 源码解析四 —— 负载均衡LoadBalance 博客园
下面是个人博客地址,页面比博客园美观一些其他都是一样的
Dubbo 源码分析系列之一环境搭建" Dubbo 源码分析系列之一环境搭建 个人博客地址"
Dubbo 入门之二 ——- 项目结构解析"Dubbo 项目结构解析 个人博客地址"
Dubbo 源码分析系列之三 —— 架构原理" Dubbo 源码分析系列之三---架构原理 个人博客地址"
Dubbo 源码解析四 —— 负载均衡LoadBalance" dubbo源码解析四 --- 负载均衡LoadBalance 个人博客地址"
Dubbo 源码解析五 —— 集群容错" dubbo源码解析五 --- 集群容错架构设计与原理分析 个人博客地址"
目录
面试中集群容错的经常的问题
Dubbo 官方文档关于集群容错的介绍
Dubbo集群容错的架构分析
Dubbo集群容错源码解析
面试中集群容错的经常的问题
什么是集群容错
Dubbo的集群容错知道吗
Dubbo 集群容错是如何配置的
集群容错如何实现的
Dubbo 集群容错介绍下
介绍下 几种集群容错方式,分析下其优缺点
你来设计一个容错算法,你会怎样的设计
Dubbo 官方文档关于集群容错的介绍
在集群调用失败时,Dubbo 提供了多种容错方案,缺省为 failover 重试。
cluster
各节点关系:
这里的 Invoker 是 Provider 的一个可调用 Service 的抽象,Invoker 封装了 Provider 地址及 Service 接口信息
Directory 代表多个 Invoker,可以把它看成 List ,但与 List 不同的是,它的值可能是动态变化的,比如注册中心推送变更
Cluster 将 Directory 中的多个 Invoker 伪装成一个 Invoker,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个
Router 负责从多个 Invoker 中按路由规则选出子集,比如读写分离,应用隔离等
LoadBalance 负责从多个 Invoker 中选出具体的一个用于本次调用,选的过程包含了负载均衡算法,调用失败后,需要重选
集群容错模式
可以自行扩展集群容错策略,参见:集群扩展
Failover Cluster
失败自动切换,当出现失败,重试其它服务器 [1]。通常用于读操作,但重试会带来更长延迟。可通过 retries="2" 来设置重试次数(不含第一次)。
重试次数配置如下:
或
或
    
Failfast Cluster
快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。
Failsafe Cluster
失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。
Failback Cluster
失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。
Forking Cluster
并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks="2" 来设置最大并行数。
Broadcast Cluster
广播调用所有提供者,逐个调用,任意一台报错则报错 [2]。通常用于通知所有提供者更新缓存或日志等本地资源信息。
集群模式配置
按照以下示例在服务提供方和消费方配置集群模式
或
Dubbo集群容错的架构分析
通过官网上这张图我们能大致的了解到一个请求过来,在集群中的调用过程。那么我们就根据这个调用过程来进行分析吧。
图片
整个在调用的过程中 这三个关键词接下来会贯穿全文,他们就是Directory,Router,LoadBalance
我们只要牢牢的抓住这几个关键字就能贯穿整个调用链
先看下时序图,来看下调用的过程
时序图
最初我们一个方法调用
我们使用的是官方的dubbo-demo的dubbo-demo-consumer
public static void main(String[] args) {
    DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy
    String hello = demoService.sayHello("world"); // call remote method
    System.out.println(hello); // get result
   
}
调用 InvokerInvocationHandler#invoker方法代理类的调用
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();
    Class>[] parameterTypes = method.getParameterTypes();
    if (method.getDeclaringClass() == Object.class) {
        return method.invoke(invoker, args);
    }
    if ("toString".equals(methodName) && parameterTypes.length == 0) {
        return invoker.toString();
    }
    if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
        return invoker.hashCode();
    }
    if ("equals".equals(methodName) && parameterTypes.length == 1) {
        return invoker.equals(args[0]);
    }
    RpcInvocation invocation;
    if (RpcUtils.hasGeneratedFuture(method)) {
        Class> clazz = method.getDeclaringClass();
        String syncMethodName = methodName.substring(0, methodName.length() - Constants.ASYNC_SUFFIX.length());
        Method syncMethod = clazz.getMethod(syncMethodName, method.getParameterTypes());
        invocation = new RpcInvocation(syncMethod, args);
        invocation.setAttachment(Constants.FUTURE_GENERATED_KEY, "true");
        invocation.setAttachment(Constants.ASYNC_KEY, "true");
    } else {
        invocation = new RpcInvocation(method, args);
        if (RpcUtils.hasFutureReturnType(method)) {
            invocation.setAttachment(Constants.FUTURE_RETURNTYPE_KEY, "true");
            invocation.setAttachment(Constants.ASYNC_KEY, "true");
        }
    }
    //这里使用的是动态代理的方式获取到指定的代理类
   // <1>
    return invoker.invoke(invocation).recreate();
}
1 执行invoke就要开始进入MockClusterInvoker#invoker
public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;
    // 获得 “mock” 配置项,有多种配置方式
    String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
    //【第一种】无 mock
    if (value.length() == 0 || value.equalsIgnoreCase("false")) {
        //no mock
        // 调用原 Invoker ,发起 RPC 调用
        // 调用 invoker方法,进入到集群也就是CLuster类中
        //<2>
        result = this.invoker.invoke(invocation);
    //【第二种】强制服务降级
    } else if (value.startsWith("force")) {
        if (logger.isWarnEnabled()) {
            logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
        }
        //force:direct mock
        // 直接调用 Mock Invoker ,执行本地 Mock 逻辑
        result = doMockInvoke(invocation, null);
    } else {
        //fail-mock
        try {
            // 【第三种】失败服务降级
            result = this.invoker.invoke(invocation);
        } catch (RpcException e) {
            // 业务性异常,直接抛出
            if (e.isBiz()) {
                throw e;
            } else {
                if (logger.isWarnEnabled()) {
                    logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
                }
                // 失败后,调用 Mock Invoker ,执行本地 Mock 逻辑
                result = doMockInvoke(invocation, e);
            }
        }
    }
    return result;
}
2 进入到 invoke就要开始进入到集群,也就是Cluster
/**
 * 调用服务提供者
 * @param invocation
 * @return
 * @throws RpcException
 */
@Override
public Result invoke(final Invocation invocation) throws RpcException {
    // 校验是否销毁
    checkWhetherDestroyed();
    //TODO
    // binding attachments into invocation.
    Map contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }
    // 获得所有服务提供者 Invoker 集合
    // <下面的list方法>
    List> invokers = list(invocation);
    // 获得 LoadBalance 对象
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    // 设置调用编号,若是异步调用
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    // 执行调用
    return doInvoke(invocation, invokers, loadbalance);
}
/**
 *  获得所有服务提供者 Invoker 集合
 * @param invocation
 * @return
 * @throws RpcException
 */
protected List> list(Invocation invocation) throws RpcException {
    // 通过directory 进入到 AbstractDirectory 中选择 directory
    //<3 进入3 里面>
    return directory.list(invocation);
}
3 进入到 AbstractDirectory 进行 directory的选择
/**
 * 获得所有服务 Invoker 集合
 * @param invocation
 * @return Invoker 集合
 * @throws RpcException
 */
@Override
public List> list(Invocation invocation) throws RpcException {
    //当销毁时抛出异常
    if (destroyed) {
        throw new RpcException("Directory already destroyed .url: " + getUrl());
    }
    // 获得所有 Invoker 集合
    // <4 RegistryDirectory 选择 invoker>
    List> invokers = doList(invocation);
    //根据路由规则,筛选Invoker集合
    List localRouters = this.routers; // local reference 本地引用,避免并发问题
    if (localRouters != null && !localRouters.isEmpty()) {
        for (Router router : localRouters) {
            try {
                if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                  
                    invokers = router.route(invokers, getConsumerUrl(), invocation);
                }
            } catch (Throwable t) {
                logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
            }
        }
    }
      //< 6 获取 router 即将进入 MockInvokersSelector 类中>
    return invokers;
}
调用list方法会进入到 RegistryDirectory#doList
/**
 *  获得对应的 Invoker 集合。
 * @param invocation
 * @return
 */
@Override
public List> doList(Invocation invocation) {
    if (forbidden) {
        // 1. No service provider 2. Service providers are disabled
        throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
                "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " + NetUtils.getLocalHost()
                        + " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist).");
    }
    List> invokers = null;
    //从methodInvokerMap中取出invokers
    Map>> localMethodInvokerMap = this.methodInvokerMap; // local reference
    // 获得 Invoker 集合
    if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
        // 获得方法名、方法参数
        String methodName = RpcUtils.getMethodName(invocation);
        Object[] args = RpcUtils.getArguments(invocation);
        // 【第一】可根据第一个参数枚举路由
        if (args != null && args.length > 0 && args[0] != null
                && (args[0] instanceof String || args[0].getClass().isEnum())) {
            invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
        }
        // 【第二】根据方法名获得 Invoker 集合
        if (invokers == null) {
            invokers = localMethodInvokerMap.get(methodName);
        }
        // 【第三】使用全量 Invoker 集合。例如,`#$echo(name)` ,回声方法
        if (invokers == null) {
            invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
        }
        // 【第四】使用 `methodInvokerMap` 第一个 Invoker 集合。防御性编程。
        if (invokers == null) {
            Iterator>> iterator = localMethodInvokerMap.values().iterator();
            if (iterator.hasNext()) {
                invokers = iterator.next();
            }
        }
    }
    return invokers == null ? new ArrayList>(0) : invokers;
}
6 进入 MockInvokersSelector 类中根据路由规则拿到正常执行的invokers
/**
 * ,根据 "invocation.need.mock" 路由匹配对应类型的 Invoker 集合:
 * @param invokers Invoker 集合
 * @param url        refer url
 * @param invocation
 * @param 
 * @return
 * @throws RpcException
 */
@Override
public  List> route(final List> invokers,
                                  URL url, final Invocation invocation) throws RpcException {
    // 获得普通 Invoker 集合
    if (invocation.getAttachments() == null) {
        //<7> 拿到能正常执行的invokers,并将其返回.也就是序号7
        return getNormalInvokers(invokers);
    } else {
        // 获得 "invocation.need.mock" 配置项
        String value = invocation.getAttachments().get(Constants.INVOCATION_NEED_MOCK);
        // 获得普通 Invoker 集合
        if (value == null)
            return getNormalInvokers(invokers);
        // 获得 MockInvoker 集合
        else if (Boolean.TRUE.toString().equalsIgnoreCase(value)) {
            return getMockedInvokers(invokers);
        }
    }
    // 其它,不匹配,直接返回 `invokers` 集合
    return invokers;
}
<7> 拿到能正常执行的invokers,并将其返回
/**
 * 获得普通 Invoker 集合
 * @param invokers
 * @param 
 * @return
 */
private  List> getNormalInvokers(final List> invokers) {
    // 不包含 MockInvoker 的情况下,直接返回 `invokers` 集合
    if (!hasMockProviders(invokers)) {
        return invokers;
    } else {
        // 若包含 MockInvoker 的情况下,过滤掉 MockInvoker ,创建普通 Invoker 集合
        List> sInvokers = new ArrayList>(invokers.size());
        for (Invoker invoker : invokers) {
            if (!invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) {
                sInvokers.add(invoker);
            }
        }
        return sInvokers;
    }
}
8 拿到 invoker 返回到 AbstractClusterInvoker这个类
对于上面的这些步骤,主要用于做两件事
在Directory中找出本次集群中的全部invokers
在Router中,将上一步的全部invokers挑选出能正常执行的invokers
在 时序图的序号5和序号7处,做了上诉的处理。
在有多个集群的情况下,而且两个集群都是正常的,那么到底需要执行哪个?
AbstractClusterInvoker#invoke
/**
 * 调用服务提供者
 * @param invocation
 * @return
 * @throws RpcException
 */
@Override
public Result invoke(final Invocation invocation) throws RpcException {
    // 校验是否销毁
    checkWhetherDestroyed();
    //TODO
    // binding attachments into invocation.
    Map contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }
    // 获得所有服务提供者 Invoker 集合
    List> invokers = list(invocation);
    // 获得 LoadBalance 对象
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    // 设置调用编号,若是异步调用
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    // 执行调用
    return doInvoke(invocation, invokers, loadbalance);
}
/**
 *  实现子 Cluster 的 Invoker 实现类的服务调用的差异逻辑,
 * @param invocation
 * @param invokers
 * @param loadbalance
 * @return
 * @throws RpcException
 */
 // 抽象方法,子类自行的实现  因为我们使用的默认配置,所以 我们将会是FailoverClusterInvoker 这个类
 //< 8 doInvoker 方法>
protected abstract Result doInvoke(Invocation invocation, List> invokers,
                                   LoadBalance loadbalance) throws RpcException;
9 进入到 相应的集群容错方案 类中 因为我们使用的默认配置,所以 我们将会是FailoverClusterInvoker 这个类
/**
 * 实际逻辑很简单:循环,查找一个 Invoker 对象,进行调用,直到成功
 * @param invocation
 * @param invokers
 * @param loadbalance
 * @return
 * @throws RpcException
 */
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List> invokers, LoadBalance loadbalance) throws RpcException {
    List> copyinvokers = invokers;
    // 检查copyinvokers即可用Invoker集合是否为空,如果为空,那么抛出异常
    checkInvokers(copyinvokers, invocation);
    String