Dubbo的服务发布逻辑是比较复杂的,我们还是以Dubbo自带的示例进行讲解,抽丝剥茧,拨云见日。
Provider配置如下:
ApplicationContext
ClassPathXmlApplicationContext父类AbstractApplicationContext的方法refresh()在实例化bean之后的最后一步finishRefresh()中,此方法作用是发布相应的事件。
protected void finishRefresh() {
//省略LifeCycleProcessor刷新代码
// Publish the final event.
publishEvent(new ContextRefreshedEvent(this));
// 省略注册到 LiveBeansView MBean代码
}
可以看到发布了一个ContextRefreshedEvent事件。
protected void publishEvent(Object event, ResolvableType eventType) {
//省略部分代码
getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
//省略分部代码
首先获取ApplicationEvent事件广播对象,然后广播事件。
ApplicationEvent事件广播对象默认是SimpleApplicationEventMulticaster,这个对象是在AbstractApplicationContext的方法initApplicationEventMulticaster()初始化的,如果需要自定义,可以实现接口ApplicationEventMulticaster,并将bean的名字命名为applicationEventMulticaster。
接下来看看SimpleApplicationEventMulticaster类的multicastEvent方法。
@Override
public void multicastEvent(final ApplicationEvent event, ResolvableType eventType) {
//事件类型
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
//applicationListener
for (final ApplicationListener> listener : getApplicationListeners(event, type)) {
//异常执行
Executor executor = getTaskExecutor();
if (executor != null) {
executor.execute(new Runnable() {
@Override
public void run() {
invokeListener(listener, event);
}
});
}
else {
invokeListener(listener, event);
}
}
}
可以看到此方法会调用applicationListener的方法,对于Dubbo而言,就是ServiceBean.
怎么样获取到ServiceBean的呢?
ServiceBean实现了好几个接口,其中有两个接口ApplicationContextAware和ApplicationListener,其中ApplicationContextAware使ServiceBean具有获取ApplicationContext的能力(了解bean的生命周期),而ApplicationListener使ServiceBean具有响应事件响应的能力。dubbo实现ApplicationContextAware的目的是通过反射把自己添加到ApplicationContext的ApplicationListener列表中,即使不实现ApplicationContextAware接口,spring也会将实现了ApplicationListener接口的bean添加到其listener列表中的,dubbo这样做估计是向后兼容。
接着看invokeListener(listener, event);方法
protected void invokeListener(ApplicationListener> listener, ApplicationEvent event) {
ErrorHandler errorHandler = getErrorHandler();
if (errorHandler != null) {
try {
doInvokeListener(listener, event);
}
catch (Throwable err) {
errorHandler.handleError(err);
}
}
else {
doInvokeListener(listener, event);
}
}
private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
try {
listener.onApplicationEvent(event);
}
catch (ClassCastException ex) {
//省略异常处理
}
else {
throw ex;
}
}
}
invokeListener方法内部调用了doInvokeListener方法,而doInvokeListener方法调用了listener(ServiceBean)的onApplicationEvent方法.
ServiceBean
public void onApplicationEvent(ContextRefreshedEvent event) {
if (isDelay() && !isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();
}
}
onApplicationEvent方法调用了export方法,export方法首先判断是否已经发布了服务,发布了则直接返回,没有发布则会判断是否需要延迟发布,如果需要延迟,则将发布服务做为一个任务添加到ScheduledThreadPoolExecutor线程池中,如果不延迟,则调用doExport方法立即发布服务。
doExport方法中会获取application/registries/monitor/module/protocols,并做一些检查和属性填充,然后调用doExportUrls();发布服务。doExportUrls()首先调用loadRegistries方法得到要注册的url,然后发布相关Protocol的服务。
简单叙述一下获取url的过程,url通过map组装参数和对应的值,参数有ApplicationConfig和RegistryConfig对象的属性以及path、dubbo、timestamp、pid、protocol、registry。
本示例applicationConfig是:
registryURL
registryConfig是:
最终map组装结果是:url parameters
最后得到registryURL是:
registry://224.5.6.7:1234/org.apache.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&pid=4892&qos.port=22222®istry=multicast×tamp=1536112339884
然后调用doExportUrlsFor1Protocol方法发布服务,此方法开始部分是构造发布的服务URL,然后再发布url。
服务URL
URL包括以下几部分:服务端还是客户端标识,Dubbo版本,时间戳,Pid,服务的方法名,token、ApplicationConfig,MoudleConfig,ProviderConfig,ProtocolConfig,*MethodConfig对象的相关属性等。
例如本示例的url:
dubbo://192.168.124.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.124.1&bind.port=20880&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=8004&qos.port=22222&side=provider×tamp=1536114090787
我们来着重看一下在构造URL过程中port的获取过程。
//protocolConfig是配置的生成的对象
//name是protocol的name,本示例为"dubbo"
//map保存了url的键值对
Integer port = this.findConfigedPorts(protocolConfig, name, map);
findConfigedPorts顾名思义是查找配置的port,从哪查呢,先从系统环境变量查,如果没找到,再查找名字为name的protocol协义。
private Integer findConfigedPorts(ProtocolConfig protocolConfig, String name, Map map) {
Integer portToBind = null;
// 从环境变量从查找绑定的port
String port = getValueFromConfig(protocolConfig, Constants.DUBBO_PORT_TO_BIND);
portToBind = parsePort(port);
// 如果没有从环境变量从查到,则从名称为name的protocol查找
if (portToBind == null) {
portToBind = protocolConfig.getPort();
if (provider != null && (portToBind == null || portToBind == 0)) {
portToBind = provider.getPort();
}
//这一句是关键,示例中name值是"dubbo",所以会实例化DubboProtocol,得到默认的port:20880
final int defaultPort = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort();
if (portToBind == null || portToBind == 0) {
portToBind = defaultPort;
}
if (portToBind == null || portToBind <= 0) {
portToBind = getRandomPort(name);
if (portToBind == null || portToBind < 0) {
portToBind = getAvailablePort(defaultPort);
putRandomPort(name, portToBind);
}
logger.warn("Use random available port(" + portToBind + ") for protocol " + name);
}
}
//保存port到map中,以便后面url使用
map.put(Constants.BIND_PORT_KEY, String.valueOf(portToBind));
// 从环境变量中查找注册的port,如果没有找到,则等于绑定的Port.
String portToRegistryStr = getValueFromConfig(protocolConfig, Constants.DUBBO_PORT_TO_REGISTRY);
Integer portToRegistry = parsePort(portToRegistryStr);
if (portToRegistry == null) {
portToRegistry = portToBind;
}
return portToRegistry;
}
有人或许有疑问,ServiceConfig在实例化时,不是已经加载过Protocol了吗?为什么还要使用ExtensionLoader加载一次呢?
final int defaultPort =ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort();
答: ServiceConfig实例化时,加载的Protocol是自适应的Protocol,是动态生成的,类名是Protocol$Adaptive(见Dubbo源码分析-SPI的应用中有分析)。而这里获取Port时加载的也是Protocol类,但指名了具体加载的是哪个Protocol(本示例是名称为dubbo的Protocol,即DubboProtocol,此类默认的端口是20880)。
发布URL
发布本地服务
调用ServiceConfig类的exportLocal(URL url)发布本地服务。
private void exportLocal(URL url) {
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
//本地服务url
URL local = URL.valueOf(url.toFullString())
.setProtocol(Constants.LOCAL_PROTOCOL)
.setHost(LOCALHOST)
.setPort(0);
ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));
Exporter> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
}
}
本示例的本地服务 url是:
injvm://127.0.0.1/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.124.1&bind.port=20880&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=3008&qos.port=22222&side=provider×tamp=1536125473655
重点看这一句:
Exporter> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
其中涉及到ProxyFactory和Protocol,下面分别来看一看。
ProxyFactory
proxyFactory也是通过SPI加载的自适应类对象,类名为ProxyFactory$Adaptive,我们来看一下其class文件反编译后的源码。
package org.apache.dubbo.rpc;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
public class ProxyFactory$Adaptive implements ProxyFactory {
public ProxyFactory$Adaptive() {
}
public Invoker getInvoker(Object var1, Class var2, URL var3) throws RpcException {
if (var3 == null) {
throw new IllegalArgumentException("url == null");
} else {
String var5 = var3.getParameter("proxy", "javassist");
if (var5 == null) {
throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.ProxyFactory) name from url(" + var3.toString() + ") use keys([proxy])");
} else {
ProxyFactory var6 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(var5);
return var6.getInvoker(var1, var2, var3);
}
}
}
public Object getProxy(Invoker var1, boolean var2) throws RpcException {
if (var1 == null) {
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
} else if (var1.getUrl() == null) {
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
} else {
URL var3 = var1.getUrl();
String var4 = var3.getParameter("proxy", "javassist");
if (var4 == null) {
throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.ProxyFactory) name from url(" + var3.toString() + ") use keys([proxy])");
} else {
ProxyFactory var5 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(var4);
return var5.getProxy(var1, var2);
}
}
}
public Object getProxy(Invoker var1) throws RpcException {
if (var1 == null) {
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
} else if (var1.getUrl() == null) {
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
} else {
URL var2 = var1.getUrl();
String var3 = var2.getParameter("proxy", "javassist");
if (var3 == null) {
throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.ProxyFactory) name from url(" + var2.toString() + ") use keys([proxy])");
} else {
ProxyFactory var4 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(var3);
return var4.getProxy(var1);
}
}
}
}
其中有三个方法,两个获取代理,一个获取Invoker。我们来看其中的getInvoker方法,默认获取名称为javassist的ProxyFactory。
由于本地服务URL中没有proxy参数,所以会调用JavassistProxyFactory的getInvoker(T proxy, Class type, URL url)方法,返回AbstractProxyInvoker的匿名类对象,此对象代理了服务对象(本示例中为DemoServiceImpl对象)。
其实(ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension("javassist");获取到的并不是JavassistProxyFactory对象,而是StubProxyFactoryWrapper对象,为什么呢?我们可以看下ExtensionLoader的getExtension(String name)方法
public T getExtension(String name) {
//检查name是否合法
if (name == null || name.length() == 0)
throw new IllegalArgumentException("Extension name == null");
//如果name等于true,则加载SPI的默认插件
if ("true".equals(name)) {
return getDefaultExtension();
}
//从当前插件类的缓存实例对象中获取
Holder