dubbo源码解析五 --- 集群容错架构设计与原理分析

欢迎来我的 Star Followers 后期后继续更新Dubbo别的文章 Dubbo 源码分析系列之一环境搭建 博客园 Dubbo 入门之二 ——- 项目结构解析 博客园 Dubbo 源码分析系列之三 —— 架构原理 博客园 Dubbo 源码解析四 —— 负载均衡LoadBalance 博客园 下面是个人博客地址,页面比博客园美观一些其他都是一样的 Dubbo 源码分析系列之一环境搭建" Dubbo 源码分析系列之一环境搭建 个人博客地址" Dubbo 入门之二 ——- 项目结构解析"Dubbo 项目结构解析 个人博客地址" Dubbo 源码分析系列之三 —— 架构原理" Dubbo 源码分析系列之三---架构原理 个人博客地址" Dubbo 源码解析四 —— 负载均衡LoadBalance" dubbo源码解析四 --- 负载均衡LoadBalance 个人博客地址" Dubbo 源码解析五 —— 集群容错" dubbo源码解析五 --- 集群容错架构设计与原理分析 个人博客地址" 目录 面试中集群容错的经常的问题 Dubbo 官方文档关于集群容错的介绍 Dubbo集群容错的架构分析 Dubbo集群容错源码解析 面试中集群容错的经常的问题 什么是集群容错 Dubbo的集群容错知道吗 Dubbo 集群容错是如何配置的 集群容错如何实现的 Dubbo 集群容错介绍下 介绍下 几种集群容错方式,分析下其优缺点 你来设计一个容错算法,你会怎样的设计 Dubbo 官方文档关于集群容错的介绍 在集群调用失败时,Dubbo 提供了多种容错方案,缺省为 failover 重试。 cluster 各节点关系: 这里的 Invoker 是 Provider 的一个可调用 Service 的抽象,Invoker 封装了 Provider 地址及 Service 接口信息 Directory 代表多个 Invoker,可以把它看成 List ,但与 List 不同的是,它的值可能是动态变化的,比如注册中心推送变更 Cluster 将 Directory 中的多个 Invoker 伪装成一个 Invoker,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个 Router 负责从多个 Invoker 中按路由规则选出子集,比如读写分离,应用隔离等 LoadBalance 负责从多个 Invoker 中选出具体的一个用于本次调用,选的过程包含了负载均衡算法,调用失败后,需要重选 集群容错模式 可以自行扩展集群容错策略,参见:集群扩展 Failover Cluster 失败自动切换,当出现失败,重试其它服务器 [1]。通常用于读操作,但重试会带来更长延迟。可通过 retries="2" 来设置重试次数(不含第一次)。 重试次数配置如下: Failfast Cluster 快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。 Failsafe Cluster 失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。 Failback Cluster 失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。 Forking Cluster 并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks="2" 来设置最大并行数。 Broadcast Cluster 广播调用所有提供者,逐个调用,任意一台报错则报错 [2]。通常用于通知所有提供者更新缓存或日志等本地资源信息。 集群模式配置 按照以下示例在服务提供方和消费方配置集群模式 Dubbo集群容错的架构分析 通过官网上这张图我们能大致的了解到一个请求过来,在集群中的调用过程。那么我们就根据这个调用过程来进行分析吧。 图片 整个在调用的过程中 这三个关键词接下来会贯穿全文,他们就是Directory,Router,LoadBalance 我们只要牢牢的抓住这几个关键字就能贯穿整个调用链 先看下时序图,来看下调用的过程 时序图 最初我们一个方法调用 我们使用的是官方的dubbo-demo的dubbo-demo-consumer public static void main(String[] args) { DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy String hello = demoService.sayHello("world"); // call remote method System.out.println(hello); // get result } 调用 InvokerInvocationHandler#invoker方法代理类的调用 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class[] parameterTypes = method.getParameterTypes(); if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args[0]); } RpcInvocation invocation; if (RpcUtils.hasGeneratedFuture(method)) { Class clazz = method.getDeclaringClass(); String syncMethodName = methodName.substring(0, methodName.length() - Constants.ASYNC_SUFFIX.length()); Method syncMethod = clazz.getMethod(syncMethodName, method.getParameterTypes()); invocation = new RpcInvocation(syncMethod, args); invocation.setAttachment(Constants.FUTURE_GENERATED_KEY, "true"); invocation.setAttachment(Constants.ASYNC_KEY, "true"); } else { invocation = new RpcInvocation(method, args); if (RpcUtils.hasFutureReturnType(method)) { invocation.setAttachment(Constants.FUTURE_RETURNTYPE_KEY, "true"); invocation.setAttachment(Constants.ASYNC_KEY, "true"); } } //这里使用的是动态代理的方式获取到指定的代理类 // <1> return invoker.invoke(invocation).recreate(); } 1 执行invoke就要开始进入MockClusterInvoker#invoker public Result invoke(Invocation invocation) throws RpcException { Result result = null; // 获得 “mock” 配置项,有多种配置方式 String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); //【第一种】无 mock if (value.length() == 0 || value.equalsIgnoreCase("false")) { //no mock // 调用原 Invoker ,发起 RPC 调用 // 调用 invoker方法,进入到集群也就是CLuster类中 //<2> result = this.invoker.invoke(invocation); //【第二种】强制服务降级 } else if (value.startsWith("force")) { if (logger.isWarnEnabled()) { logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl()); } //force:direct mock // 直接调用 Mock Invoker ,执行本地 Mock 逻辑 result = doMockInvoke(invocation, null); } else { //fail-mock try { // 【第三种】失败服务降级 result = this.invoker.invoke(invocation); } catch (RpcException e) { // 业务性异常,直接抛出 if (e.isBiz()) { throw e; } else { if (logger.isWarnEnabled()) { logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e); } // 失败后,调用 Mock Invoker ,执行本地 Mock 逻辑 result = doMockInvoke(invocation, e); } } } return result; } 2 进入到 invoke就要开始进入到集群,也就是Cluster /** * 调用服务提供者 * @param invocation * @return * @throws RpcException */ @Override public Result invoke(final Invocation invocation) throws RpcException { // 校验是否销毁 checkWhetherDestroyed(); //TODO // binding attachments into invocation. Map contextAttachments = RpcContext.getContext().getAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { ((RpcInvocation) invocation).addAttachments(contextAttachments); } // 获得所有服务提供者 Invoker 集合 // <下面的list方法> List> invokers = list(invocation); // 获得 LoadBalance 对象 LoadBalance loadbalance = initLoadBalance(invokers, invocation); // 设置调用编号,若是异步调用 RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); // 执行调用 return doInvoke(invocation, invokers, loadbalance); } /** * 获得所有服务提供者 Invoker 集合 * @param invocation * @return * @throws RpcException */ protected List> list(Invocation invocation) throws RpcException { // 通过directory 进入到 AbstractDirectory 中选择 directory //<3 进入3 里面> return directory.list(invocation); } 3 进入到 AbstractDirectory 进行 directory的选择 /** * 获得所有服务 Invoker 集合 * @param invocation * @return Invoker 集合 * @throws RpcException */ @Override public List> list(Invocation invocation) throws RpcException { //当销毁时抛出异常 if (destroyed) { throw new RpcException("Directory already destroyed .url: " + getUrl()); } // 获得所有 Invoker 集合 // <4 RegistryDirectory 选择 invoker> List> invokers = doList(invocation); //根据路由规则,筛选Invoker集合 List localRouters = this.routers; // local reference 本地引用,避免并发问题 if (localRouters != null && !localRouters.isEmpty()) { for (Router router : localRouters) { try { if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) { invokers = router.route(invokers, getConsumerUrl(), invocation); } } catch (Throwable t) { logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t); } } } //< 6 获取 router 即将进入 MockInvokersSelector 类中> return invokers; } 调用list方法会进入到 RegistryDirectory#doList /** * 获得对应的 Invoker 集合。 * @param invocation * @return */ @Override public List> doList(Invocation invocation) { if (forbidden) { // 1. No service provider 2. Service providers are disabled throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist)."); } List> invokers = null; //从methodInvokerMap中取出invokers Map>> localMethodInvokerMap = this.methodInvokerMap; // local reference // 获得 Invoker 集合 if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) { // 获得方法名、方法参数 String methodName = RpcUtils.getMethodName(invocation); Object[] args = RpcUtils.getArguments(invocation); // 【第一】可根据第一个参数枚举路由 if (args != null && args.length > 0 && args[0] != null && (args[0] instanceof String || args[0].getClass().isEnum())) { invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter } // 【第二】根据方法名获得 Invoker 集合 if (invokers == null) { invokers = localMethodInvokerMap.get(methodName); } // 【第三】使用全量 Invoker 集合。例如,`#$echo(name)` ,回声方法 if (invokers == null) { invokers = localMethodInvokerMap.get(Constants.ANY_VALUE); } // 【第四】使用 `methodInvokerMap` 第一个 Invoker 集合。防御性编程。 if (invokers == null) { Iterator>> iterator = localMethodInvokerMap.values().iterator(); if (iterator.hasNext()) { invokers = iterator.next(); } } } return invokers == null ? new ArrayList>(0) : invokers; } 6 进入 MockInvokersSelector 类中根据路由规则拿到正常执行的invokers /** * ,根据 "invocation.need.mock" 路由匹配对应类型的 Invoker 集合: * @param invokers Invoker 集合 * @param url refer url * @param invocation * @param * @return * @throws RpcException */ @Override public List> route(final List> invokers, URL url, final Invocation invocation) throws RpcException { // 获得普通 Invoker 集合 if (invocation.getAttachments() == null) { //<7> 拿到能正常执行的invokers,并将其返回.也就是序号7 return getNormalInvokers(invokers); } else { // 获得 "invocation.need.mock" 配置项 String value = invocation.getAttachments().get(Constants.INVOCATION_NEED_MOCK); // 获得普通 Invoker 集合 if (value == null) return getNormalInvokers(invokers); // 获得 MockInvoker 集合 else if (Boolean.TRUE.toString().equalsIgnoreCase(value)) { return getMockedInvokers(invokers); } } // 其它,不匹配,直接返回 `invokers` 集合 return invokers; } <7> 拿到能正常执行的invokers,并将其返回 /** * 获得普通 Invoker 集合 * @param invokers * @param * @return */ private List> getNormalInvokers(final List> invokers) { // 不包含 MockInvoker 的情况下,直接返回 `invokers` 集合 if (!hasMockProviders(invokers)) { return invokers; } else { // 若包含 MockInvoker 的情况下,过滤掉 MockInvoker ,创建普通 Invoker 集合 List> sInvokers = new ArrayList>(invokers.size()); for (Invoker invoker : invokers) { if (!invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) { sInvokers.add(invoker); } } return sInvokers; } } 8 拿到 invoker 返回到 AbstractClusterInvoker这个类 对于上面的这些步骤,主要用于做两件事 在Directory中找出本次集群中的全部invokers 在Router中,将上一步的全部invokers挑选出能正常执行的invokers 在 时序图的序号5和序号7处,做了上诉的处理。 在有多个集群的情况下,而且两个集群都是正常的,那么到底需要执行哪个? AbstractClusterInvoker#invoke /** * 调用服务提供者 * @param invocation * @return * @throws RpcException */ @Override public Result invoke(final Invocation invocation) throws RpcException { // 校验是否销毁 checkWhetherDestroyed(); //TODO // binding attachments into invocation. Map contextAttachments = RpcContext.getContext().getAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { ((RpcInvocation) invocation).addAttachments(contextAttachments); } // 获得所有服务提供者 Invoker 集合 List> invokers = list(invocation); // 获得 LoadBalance 对象 LoadBalance loadbalance = initLoadBalance(invokers, invocation); // 设置调用编号,若是异步调用 RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); // 执行调用 return doInvoke(invocation, invokers, loadbalance); } /** * 实现子 Cluster 的 Invoker 实现类的服务调用的差异逻辑, * @param invocation * @param invokers * @param loadbalance * @return * @throws RpcException */ // 抽象方法,子类自行的实现 因为我们使用的默认配置,所以 我们将会是FailoverClusterInvoker 这个类 //< 8 doInvoker 方法> protected abstract Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException; 9 进入到 相应的集群容错方案 类中 因为我们使用的默认配置,所以 我们将会是FailoverClusterInvoker 这个类 /** * 实际逻辑很简单:循环,查找一个 Invoker 对象,进行调用,直到成功 * @param invocation * @param invokers * @param loadbalance * @return * @throws RpcException */ @Override @SuppressWarnings({"unchecked", "rawtypes"}) public Result doInvoke(Invocation invocation, final List> invokers, LoadBalance loadbalance) throws RpcException { List> copyinvokers = invokers; // 检查copyinvokers即可用Invoker集合是否为空,如果为空,那么抛出异常 checkInvokers(copyinvokers, invocation); String
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信