Dubbo2.7源码分析-如何发布服务

Dubbo的服务发布逻辑是比较复杂的,我们还是以Dubbo自带的示例进行讲解,抽丝剥茧,拨云见日。 Provider配置如下: ApplicationContext ClassPathXmlApplicationContext父类AbstractApplicationContext的方法refresh()在实例化bean之后的最后一步finishRefresh()中,此方法作用是发布相应的事件。 protected void finishRefresh() { //省略LifeCycleProcessor刷新代码 // Publish the final event. publishEvent(new ContextRefreshedEvent(this)); // 省略注册到 LiveBeansView MBean代码 } 可以看到发布了一个ContextRefreshedEvent事件。 protected void publishEvent(Object event, ResolvableType eventType) { //省略部分代码 getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType); //省略分部代码 首先获取ApplicationEvent事件广播对象,然后广播事件。 ApplicationEvent事件广播对象默认是SimpleApplicationEventMulticaster,这个对象是在AbstractApplicationContext的方法initApplicationEventMulticaster()初始化的,如果需要自定义,可以实现接口ApplicationEventMulticaster,并将bean的名字命名为applicationEventMulticaster。 接下来看看SimpleApplicationEventMulticaster类的multicastEvent方法。 @Override public void multicastEvent(final ApplicationEvent event, ResolvableType eventType) { //事件类型 ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event)); //applicationListener for (final ApplicationListener listener : getApplicationListeners(event, type)) { //异常执行 Executor executor = getTaskExecutor(); if (executor != null) { executor.execute(new Runnable() { @Override public void run() { invokeListener(listener, event); } }); } else { invokeListener(listener, event); } } } 可以看到此方法会调用applicationListener的方法,对于Dubbo而言,就是ServiceBean. 怎么样获取到ServiceBean的呢? ServiceBean实现了好几个接口,其中有两个接口ApplicationContextAware和ApplicationListener,其中ApplicationContextAware使ServiceBean具有获取ApplicationContext的能力(了解bean的生命周期),而ApplicationListener使ServiceBean具有响应事件响应的能力。dubbo实现ApplicationContextAware的目的是通过反射把自己添加到ApplicationContext的ApplicationListener列表中,即使不实现ApplicationContextAware接口,spring也会将实现了ApplicationListener接口的bean添加到其listener列表中的,dubbo这样做估计是向后兼容。 接着看invokeListener(listener, event);方法 protected void invokeListener(ApplicationListener listener, ApplicationEvent event) { ErrorHandler errorHandler = getErrorHandler(); if (errorHandler != null) { try { doInvokeListener(listener, event); } catch (Throwable err) { errorHandler.handleError(err); } } else { doInvokeListener(listener, event); } } private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) { try { listener.onApplicationEvent(event); } catch (ClassCastException ex) { //省略异常处理 } else { throw ex; } } } invokeListener方法内部调用了doInvokeListener方法,而doInvokeListener方法调用了listener(ServiceBean)的onApplicationEvent方法. ServiceBean public void onApplicationEvent(ContextRefreshedEvent event) { if (isDelay() && !isExported() && !isUnexported()) { if (logger.isInfoEnabled()) { logger.info("The service ready on spring started. service: " + getInterface()); } export(); } } onApplicationEvent方法调用了export方法,export方法首先判断是否已经发布了服务,发布了则直接返回,没有发布则会判断是否需要延迟发布,如果需要延迟,则将发布服务做为一个任务添加到ScheduledThreadPoolExecutor线程池中,如果不延迟,则调用doExport方法立即发布服务。 doExport方法中会获取application/registries/monitor/module/protocols,并做一些检查和属性填充,然后调用doExportUrls();发布服务。doExportUrls()首先调用loadRegistries方法得到要注册的url,然后发布相关Protocol的服务。 简单叙述一下获取url的过程,url通过map组装参数和对应的值,参数有ApplicationConfig和RegistryConfig对象的属性以及path、dubbo、timestamp、pid、protocol、registry。 本示例applicationConfig是: registryURL registryConfig是: 最终map组装结果是:url parameters 最后得到registryURL是: registry://224.5.6.7:1234/org.apache.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&pid=4892&qos.port=22222®istry=multicast×tamp=1536112339884 然后调用doExportUrlsFor1Protocol方法发布服务,此方法开始部分是构造发布的服务URL,然后再发布url。 服务URL URL包括以下几部分:服务端还是客户端标识,Dubbo版本,时间戳,Pid,服务的方法名,token、ApplicationConfig,MoudleConfig,ProviderConfig,ProtocolConfig,*MethodConfig对象的相关属性等。 例如本示例的url: dubbo://192.168.124.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.124.1&bind.port=20880&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=8004&qos.port=22222&side=provider×tamp=1536114090787 我们来着重看一下在构造URL过程中port的获取过程。 //protocolConfig是配置的生成的对象 //name是protocol的name,本示例为"dubbo" //map保存了url的键值对 Integer port = this.findConfigedPorts(protocolConfig, name, map); findConfigedPorts顾名思义是查找配置的port,从哪查呢,先从系统环境变量查,如果没找到,再查找名字为name的protocol协义。 private Integer findConfigedPorts(ProtocolConfig protocolConfig, String name, Map map) { Integer portToBind = null; // 从环境变量从查找绑定的port String port = getValueFromConfig(protocolConfig, Constants.DUBBO_PORT_TO_BIND); portToBind = parsePort(port); // 如果没有从环境变量从查到,则从名称为name的protocol查找 if (portToBind == null) { portToBind = protocolConfig.getPort(); if (provider != null && (portToBind == null || portToBind == 0)) { portToBind = provider.getPort(); } //这一句是关键,示例中name值是"dubbo",所以会实例化DubboProtocol,得到默认的port:20880 final int defaultPort = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort(); if (portToBind == null || portToBind == 0) { portToBind = defaultPort; } if (portToBind == null || portToBind <= 0) { portToBind = getRandomPort(name); if (portToBind == null || portToBind < 0) { portToBind = getAvailablePort(defaultPort); putRandomPort(name, portToBind); } logger.warn("Use random available port(" + portToBind + ") for protocol " + name); } } //保存port到map中,以便后面url使用 map.put(Constants.BIND_PORT_KEY, String.valueOf(portToBind)); // 从环境变量中查找注册的port,如果没有找到,则等于绑定的Port. String portToRegistryStr = getValueFromConfig(protocolConfig, Constants.DUBBO_PORT_TO_REGISTRY); Integer portToRegistry = parsePort(portToRegistryStr); if (portToRegistry == null) { portToRegistry = portToBind; } return portToRegistry; } 有人或许有疑问,ServiceConfig在实例化时,不是已经加载过Protocol了吗?为什么还要使用ExtensionLoader加载一次呢? final int defaultPort =ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort(); 答: ServiceConfig实例化时,加载的Protocol是自适应的Protocol,是动态生成的,类名是Protocol$Adaptive(见Dubbo源码分析-SPI的应用中有分析)。而这里获取Port时加载的也是Protocol类,但指名了具体加载的是哪个Protocol(本示例是名称为dubbo的Protocol,即DubboProtocol,此类默认的端口是20880)。 发布URL 发布本地服务 调用ServiceConfig类的exportLocal(URL url)发布本地服务。 private void exportLocal(URL url) { if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { //本地服务url URL local = URL.valueOf(url.toFullString()) .setProtocol(Constants.LOCAL_PROTOCOL) .setHost(LOCALHOST) .setPort(0); ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref)); Exporter exporter = protocol.export( proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); exporters.add(exporter); logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry"); } } 本示例的本地服务 url是: injvm://127.0.0.1/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.124.1&bind.port=20880&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=3008&qos.port=22222&side=provider×tamp=1536125473655 重点看这一句: Exporter exporter = protocol.export( proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); 其中涉及到ProxyFactory和Protocol,下面分别来看一看。 ProxyFactory proxyFactory也是通过SPI加载的自适应类对象,类名为ProxyFactory$Adaptive,我们来看一下其class文件反编译后的源码。 package org.apache.dubbo.rpc; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.extension.ExtensionLoader; public class ProxyFactory$Adaptive implements ProxyFactory { public ProxyFactory$Adaptive() { } public Invoker getInvoker(Object var1, Class var2, URL var3) throws RpcException { if (var3 == null) { throw new IllegalArgumentException("url == null"); } else { String var5 = var3.getParameter("proxy", "javassist"); if (var5 == null) { throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.ProxyFactory) name from url(" + var3.toString() + ") use keys([proxy])"); } else { ProxyFactory var6 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(var5); return var6.getInvoker(var1, var2, var3); } } } public Object getProxy(Invoker var1, boolean var2) throws RpcException { if (var1 == null) { throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null"); } else if (var1.getUrl() == null) { throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null"); } else { URL var3 = var1.getUrl(); String var4 = var3.getParameter("proxy", "javassist"); if (var4 == null) { throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.ProxyFactory) name from url(" + var3.toString() + ") use keys([proxy])"); } else { ProxyFactory var5 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(var4); return var5.getProxy(var1, var2); } } } public Object getProxy(Invoker var1) throws RpcException { if (var1 == null) { throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null"); } else if (var1.getUrl() == null) { throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null"); } else { URL var2 = var1.getUrl(); String var3 = var2.getParameter("proxy", "javassist"); if (var3 == null) { throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.ProxyFactory) name from url(" + var2.toString() + ") use keys([proxy])"); } else { ProxyFactory var4 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(var3); return var4.getProxy(var1); } } } } 其中有三个方法,两个获取代理,一个获取Invoker。我们来看其中的getInvoker方法,默认获取名称为javassist的ProxyFactory。 由于本地服务URL中没有proxy参数,所以会调用JavassistProxyFactory的getInvoker(T proxy, Class type, URL url)方法,返回AbstractProxyInvoker的匿名类对象,此对象代理了服务对象(本示例中为DemoServiceImpl对象)。 其实(ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension("javassist");获取到的并不是JavassistProxyFactory对象,而是StubProxyFactoryWrapper对象,为什么呢?我们可以看下ExtensionLoader的getExtension(String name)方法 public T getExtension(String name) { //检查name是否合法 if (name == null || name.length() == 0) throw new IllegalArgumentException("Extension name == null"); //如果name等于true,则加载SPI的默认插件 if ("true".equals(name)) { return getDefaultExtension(); } //从当前插件类的缓存实例对象中获取 Holder holder = cachedInstances.get(name); if (holder == null) { cachedInstances.putIfAbsent(name, new Holder()); holder = cachedInstances.get(name); } Object instance = holder.get(); if (instance == null) { synchronized (holder) { instance = holder.get(); if (instance == null) { //创建插件实例 instance = createExtension(name); holder.set(instance); } } } return (T) instance; } private T createExtension(String name) { //从文件目录中加载插件类 Class clazz = getExtensionClasses().get(name); if (clazz == null) { throw findException(name);
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信