dubbo源码之服务消费

时间:2023-03-09 22:48:52
dubbo源码之服务消费

消费端启动初始化过程

  消费端的代码解析也是从配置文件解析开始的,服务发布对应的<dubbo:service,解析xml的时候解析了一个ServiceBean,并且调用ServiceConfig进行服务的发布。服务的消费对应的<dubbo:reference,在初始化的过程中也解析了一个 ReferenceBean类去做处理。在bean加载后会调用里面的 afterPropertiesSet()这个方法。也就是把配置文件解析到各个对应bean里面。然后调用了getObject():

public Object getObject() throws Exception {
return get();
}

  然后也有一个 ReferenceConfig这个类来一起执行,进入他的get()方法:

public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("Already destroyed!");
}
if (ref == null) {
init();//初始化
}
return ref;
}

  然后进入初始化操作。init() 方法里的操作跟服务发布的时候很类似,还是检查各种配置,解析各种标签,最后封装到一个 map 里面。然后调用 ref = createProxy(map); 返回一个代理对象,对于该方法体内前面很多代码都是执行初始化。判断虚拟机情况,组装URL什么的。主要关于服务端调用的的部分代码如下:

} else { //从注册中心上获得相应的协议url地址
List<URL> us = loadRegistries(false);
if (us != null && us.size() > 0) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls == null || urls.size() == 0) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
} if (urls.size() == 1) {
// 获得invoker代理对象MockClusterInvoker
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // 用了最后一个registry url }
}
if (registryURL != null) { //有 注册中心协议的URL
//对有注册中心的Cluster 只用 AvailableCluster
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { //不是 注册中心的URL
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
Boolean c = check;
if (c == null && consumer != null) {
c = consumer.isCheck();
}
if (c == null) {
c = true; // default true
}
if (c && !invoker.isAvailable()) {
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
//
return (T) proxyFactory.getProxy(invoker);

  其中 List<URL> us = loadRegistries(false); 是从配置文件解析,最后组装了这么一个地址,基于前面的案例,debug以后发现该地址如下:

registry://192.168.254.135:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-client&dubbo=2.5.4&owner=wuzz&pid=53420&registry=zookeeper&timestamp=1543905535962

  我们可以看到以上代码有一些步骤是关于  Monitor 的操作,暂时不管。由于我们这里注册中心为1个,最后会进入以下操作:

if (urls.size() == 1) {
// 获得invoker代理对象
invoker = refprotocol.refer(interfaceClass, urls.get(0));
}

  可以从代码中发现这个 refprotocol 又是一个自适应适配器 Protocol$Adaptive ,他会调用里面的refer() 方法:

public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1)
throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null)
throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url("
+ url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader
.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}

  又是熟悉的套路,这里代码中的 url.getProtocol() = registry ,所以 extension 就是 RegistryProtocol,调用他的 refer() ,跟发布服务的流程真的很像:

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// url本身是一个 registry://....
// 替换成了<dubbo:registry address="zookeeper://192.168.254.135:2181" />中的zookeeper
// zookeeper://....
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
// 跟服务注册的时候一模一样的代码,最后获得ZookeeperRegistry 获得跟zk的链接
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {// 这里的type是interface com.gupaoedu.dubbo.IGpHello
return proxyFactory.getInvoker((T) registry, type, url);
} // group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0) {
if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
|| "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
// 最后调用这个
return doRefer(cluster, registry, type, url);
}

  doRefer(cluster, registry, type, url):com.alibaba.dubbo.rpc.cluster.Cluster$Adpative,ZookeeperRigistry,interface com.gupaoedu.dubbo.IGpHello,zookeeper://192.168.254.135:2181/...:

  cluster : 这个是个依赖注入的拓展点:

@SPI(FailoverCluster.NAME)
public interface Cluster { /**
* Merge the directory invokers to a virtual invoker.
*
* @param <T>
* @param directory
* @return cluster invoker
* @throws RpcException
*/
@Adaptive
<T> Invoker<T> join(Directory<T> directory) throws RpcException; }

  而且可以从源码中发现这cluster默认是 FailoverCluster ,而且@Adaptive是标注在方法上,所以会生成一个自适应的适配器 Cluster$Adaptive:

public class Cluster$Adaptive implements com.alibaba.dubbo.rpc.cluster.Cluster {
public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("cluster", "failover");
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])");
com.alibaba.dubbo.rpc.cluster.Cluster extension =
(com.alibaba.dubbo.rpc.cluster.Cluster) ExtensionLoader
.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
return extension.join(arg0);
}
}

  很明显,这个时候我们可以知道这个适配器里面会生成一个拓展点,而这个 extension 默认他就是 FailoverCluster 而且在文件 com.alibaba.dubbo.rpc.cluster.Cluster中:

mock=com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterWrapper
failover=com.alibaba.dubbo.rpc.cluster.support.FailoverCluster
failfast=com.alibaba.dubbo.rpc.cluster.support.FailfastCluster
failsafe=com.alibaba.dubbo.rpc.cluster.support.FailsafeCluster
failback=com.alibaba.dubbo.rpc.cluster.support.FailbackCluster
forking=com.alibaba.dubbo.rpc.cluster.support.ForkingCluster
available=com.alibaba.dubbo.rpc.cluster.support.AvailableCluster
mergeable=com.alibaba.dubbo.rpc.cluster.support.MergeableCluster
broadcast=com.alibaba.dubbo.rpc.cluster.support.BroadcastCluster

  我们发现了一个装饰器 Wrapper :mock=com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterWrapper,经过查看其源码,发现内有一个携带了拓展点Cluster的构造方法,所以上面说到的 extension其实是MockClusterWrapper(FailoverCluster)。这里就是之前提到的服务降级里面的 Mock机制,在你 FailoverCluster失败之后调用Mock。

  了解了 Cluster 我们继续进入 doRefer(cluster, registry, type, url)::

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 对多个invoker 进行组装,也就是做到服务的动态上下线的功能
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);//ZookeeperRegistry
directory.setProtocol(protocol);// Protocol$Adaptive
// 组装一个 url=comsumer://192.168.254.135/.......
// 就是ZK上面会注册一个消费的协议地址
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
// 注册一个消费协议地址 最后是zkClient.create()
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
return cluster.join(directory);
}

  最后的两行代码我们从官网上能找到一张图:http://dubbo.apache.org/zh-cn/docs/user/demos/fault-tolerent-strategy.html

dubbo源码之服务消费

  通过 Directory 传入cluster,返回一个 List<invoker>,传入路由,经过路由机制,继续传入负载均衡做一个负载,得到最终调用的 invoker。

  先来看一下 cluster.join(directory):实际会调用到 MockClusterWrapper 的 join方法:

public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new MockClusterInvoker<T>(directory,
this.cluster.join(directory));
}

  到这里new 出来一个 MockClusterInvoker 。到此为止 invoker = refprotocol.refer(interfaceClass, urls.get(0));所得到的对象就是  MockClusterInvoker 。

  接着流程来到了 proxyFactory.getProxy(invoker) :

  proxyFactory 这个东西是否是曾相识呢?? 在服务的发布之前也调用了proxyFactory去获得一个服务的代理对象去发布,而且这个proxyFactory对应的是 :先是生成一个ProxyFactory$Adpative,再调用里面的getProxy,通过默认实现extName=Javassist来获得一个Extension,即JavassistProxyFactory,然后去调用他的getProxy方法,但是在拓展点文件 com.alibaba.dubbo.rpc.ProxyFactory文件中:

stub=com.alibaba.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper
jdk=com.alibaba.dubbo.rpc.proxy.jdk.JdkProxyFactory
javassist=com.alibaba.dubbo.rpc.proxy.javassist.JavassistProxyFactory

  说明有个装饰的Wapper,所以这里需要包装 ,所以proxyFactory其实就是StubProxyFactoryWrapper(JavassistProxyFactory):做了一系列装饰以后生成本地stub存根,还是会进入 JavassistProxyFactory 的getProxy方法:

public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}

  该方法内基于javassist 机制生成一个动态的代理类,跟之前手写RPC时候获取代理类有点像了,跟之前服务端发布服务获取invoker类似,这个里面最后组装了一个 Class<?> pc = ccm.toClass(); debug看到里面有个 mMethods是一个数组:

public java.lang.String sayHello(java.lang.String arg0){
Object[] args = new Object[1];
args[0] = ($w)$1;
Object ret = handler.invoke(this, methods[0], args);
return (java.lang.String)ret;
} public java.lang.Object $echo(java.lang.Object arg0) {
Object[] args = new Object[1];
args[0] = ($w) $1;
Object ret = handler.invoke(this, methods[1], args);
return (java.lang.Object) ret;
}

  最后调用方法的时候通过 InvokerInvocationHandler 的invoke去进行远程调用。最后生成的代理类当中会有一个 InvokerInvocationHandler 属性,并且要通过构造进行赋值,也就是上面的Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));这段代码,然后再通过哪个生成的pc这个代理类的 sayHello去调用方法 。至此,消费端去消费获得引用的流程就结束了。

什么时候建立和服务端的连接:

  消费端的初始化过程,但是似乎没有看到客户端和服务端建立NIO连接。实际上,建立连接的过程在消费端初始化的时候就建立好的,只是前面我们没有分析,代码在RegistryProtocol.doRefer方法内directory.subscribe方法中:做订阅

directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));

  调用RegistryDirectory<T>里面的subscribe(URL url):

public void subscribe(URL url) {
// url=consumer://192.168.254.1/com.gupaoedu.dubbo.IGpHello?....
setConsumerUrl(url);//设置消费端URL
registry.subscribe(url, this);
}

  这里的registry就是之前所设置的ZookeeperRegistry ,url 就是comsumer://....,this就是 RegistryDirectory,由于ZookeeperRegistry没有实现 subscribe方法,所以去他的父类中去找,父类是 FailbackRegistry:

public void subscribe(URL url, NotifyListener listener) {
if (destroyed.get()){
return;
}
super.subscribe(url, listener);
//移除失败的订阅
removeFailedSubscribed(url, listener);
try {
// 向服务器端发送订阅请求
doSubscribe(url, listener);
} catch (Exception e) {
Throwable t = e; List<URL> urls = getCacheUrls(url);
if (urls != null && urls.size() > 0) {
notify(url, listener, urls);
logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
} else {
// 如果开启了启动检测,则直接抛出异常
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true);
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
} // 将失败的订阅请求记录到失败列表,定时重试
addFailedSubscribed(url, listener);
}
}

  doSubscribe(url, listener);是个模板方法,会在子类中实现,即 ZookeeperRegistry:

protected void doSubscribe(final URL url, final NotifyListener listener) {
try {//这里的Interface是com.gupaoedu.dubbo.IGpHello,明显不相等,走else
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
public void childChanged(String parentPath, List<String> currentChilds) {
for (String child : currentChilds) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
}
});
zkListener = listeners.get(listener);
}
zkClient.create(root, false);
List<String> services = zkClient.addChildListener(root, zkListener);
if (services != null && services.size() > 0) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {// 进入这里
List<URL> urls = new ArrayList<URL>();
//toCategoriesPath(url) 会得到一个数组:[/dubbo/com.gupaoedu.dubbo.IGpHello/providers,
// /dubbo/com.gupaoedu.dubbo.IGpHello/configurators, /dubbo/com.gupaoedu.dubbo.IGpHello/routers]
for (String path : toCategoriesPath(url)) {
//第一次进来是空的,存放监听器
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
// 将url做key 新建一个map做value,放入监听器
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
// listeners ={}
listeners = zkListeners.get(url);
}
// null
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
// 这里新建了一个监听器该监听器为子节点变化监听并且变化后调用notify做事件处理
listeners.putIfAbsent(listener, new ChildListener() {
public void childChanged(String parentPath, List<String> currentChilds) {
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
// 得到该监听器
zkListener = listeners.get(listener);
}
// 创建节点,并且将上面创建的监听器放入
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}

  notify(url, listener, urls):将刚刚的消费者的地址,监听器,以及自己组装的URLS传入:

protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
try {
doNotify(url, listener, urls);
} catch (Exception t) {
//
Map<NotifyListener, List<URL>> listeners = failedNotified.get(url);
if (listeners == null) {
failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>());
listeners = failedNotified.get(url);
}
listeners.put(listener, urls);
logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}

  doNotify(url, listener, urls);这个方法最终通过调用链进入 AbstactRegistry :

protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((urls == null || urls.size() == 0)
&& !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
}
// 通过判断 新建一个map
Map<String, List<URL>> result = new HashMap<String, List<URL>>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.get(category);
if (categoryList == null) {
categoryList = new ArrayList<URL>();
result.put(category, categoryList);
}
categoryList.add(u);
}
}// 判断
if (result.size() == 0) {
return;
}//对 notified做个赋值 以comsumer的url为key value为{}
Map<String, List<URL>> categoryNotified = notified.get(url);
if (categoryNotified == null) {
notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
categoryNotified = notified.get(url);
}//循环
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();//获得key
List<URL> categoryList = entry.getValue();//获得value
categoryNotified.put(category, categoryList);//存值
saveProperties(url);//保存到本地配置
listener.notify(categoryList);
}
}

  上述代码中:

for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.get(category);
if (categoryList == null) {
categoryList = new ArrayList<URL>();
result.put(category, categoryList);
}
categoryList.add(u);
}
}

  这段代码片段循环过后会将map做个初始化,结果为:其实就是我们在ZK上面看到的节点

{
configurators=[empty://192.168.254.1/com.gupaoedu.dubbo.IGpHello?application=dubbo-client&category=configurators&dubbo=2.5.4&interface=com.gupaoedu.dubbo.IGpHello&methods=sayHello
&mock=com.gupaoedu.dubbo.TestMock&owner=wuzz&pid=&protocol=dubbo&side=consumer&timeout=&timestamp=],
routers=[empty://192.168.254.1/com.gupaoedu.dubbo.IGpHello?application=dubbo-client&category=routers&dubbo=2.5.4&interface=com.gupaoedu.dubbo.IGpHello&methods=sayHello
&mock=com.gupaoedu.dubbo.TestMock&owner=wuzz&pid=&protocol=dubbo&side=consumer&timeout=&timestamp=],
providers=[dubbo://192.168.254.1:20880/com.gupaoedu.dubbo.IGpHello?anyhost=true&application=dubbo-server&dubbo=2.5.4&generic=false&interface=com.gupaoedu.dubbo.IGpHello&methods=sayHello
&owner=wuzz&pid=&side=provider&timestamp=]}

  在最后循环里面的  listener.notify(categoryList); 进行一个ZK目录节点下的变化做一个更新:其实就是做到服务地址上下线的通知更新(缓存和变更缓存):该方法内前半部分主要是新建了3个关于providers,routers,configurations 3个目录下的地址集合,并进行赋值,然后判断处理,到最后调用refreshInvoker :

    public synchronized void notify(List<URL> urls) {
.......
// providers
refreshInvoker(invokerUrls);
}

  其实RegistryDirectory<T>这个类的作用就是整合 多个invoker ,以及对于zk的指定的服务节点的变化进行监听并进行刷新。refreshInvoker(invokerUrls);这个方法主要的作用就是将传入的 invokerUrls转化成invoker列表,如果已经转换则不再重新引用,直接从缓存中获取,如果传入的invokerUrls不为空,则表示最新的invokerUrls,如果为空,则表示只是下发更新地址活路由router规则:

private void refreshInvoker(List<URL> invokerUrls) {
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // 禁止访问
this.methodInvokerMap = null; // 置空列表
destroyAllInvokers(); // 关闭所有的nvoker
} else {
this.forbidden = false; // 允许访问
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<URL>();
this.cachedInvokerUrls.addAll(invokerUrls);//缓存invokerUrl列表
}
if (invokerUrls.size() == 0) {
return;
}
// 将URL列表转换成invoker列表
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 将方法名映射到invoker列表
// state change
//计算失误不处理
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
return;
}
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap);
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}

  最核心的方法则是  Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);这个方法,就是转化的方法:比如一个地址是dubbo://192.168.254.135...这样的地址 转化成一个DubboInvoker,下面来看一下里面的部分代码:

private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
...... if (invoker == null) { //
try {
boolean enabled = true;
if (url.hasParameter(Constants.DISABLED_KEY)) {
enabled = !url.getParameter(Constants.DISABLED_KEY, false);
} else {
enabled = url.getParameter(Constants.ENABLED_KEY, true);
}
if (enabled) {
invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
if (invoker != null) { //
newUrlInvokerMap.put(key, invoker);
}
} else {
newUrlInvokerMap.put(key, invoker);
}
.......
}

   这个里面有调了一个拓展点 invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl); 这个protocol是 Protocol$Adaptive ,而Url里面拿到的protocol是dubbo,最后进入了 DubboProtocol的refer:

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}

  这里新建了一个 DubboInvoker,里面getClient(url) 方法就是与服务端产生连接的代码,顺着这条主线下去,我们会在initClient(url) 这个方法里面找到如下代码:

client = Exchangers.connect(url, requestHandler);

  期间判断我们的连接状态,接着进入getExchanger(url).connect(url, handler);这里跟服务端的原理是一样的,可以翻过去看看,然后也是通过NettyTransporter去创建一个连接。

  然后我们回到RegistryDirectory<T>里面的toInvokers(List<URL>) 方法里:

// 缓存key为没有合并消费端参数的URL,不管消费端如何合并,如果服务端URL发生变法,则重新refer
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == ) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
return;
}//把 invoker 放入成员变量中,后续需要去获取
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); //
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}

  这里最后获得的 invoker是:

dubbo源码之服务消费

  而且这里是经过包装的 DubboInvoker .到这里,消费端的建立会话跟获取invoker的过程都结束了,下面应该进入数据交互,之前服务端发布的时候获取到的 invoker对象是 JavassistProxyFactory.在代理调用方法的时候会调用 代理的sayHello方法,在动态生成的代理类字节码上我们可以看到他调用的是 handler.invoke,也就是 InvocationHandler 里面的invoke 方法:

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}

  最后调用  invoker.invoke(new RpcInvocation(method, args)).recreate(); 这里需要知道 invoke是什么,其实就是之前在ReferenceConfig类里面的createProxy方法内:

invoker = refprotocol.refer(interfaceClass, urls.get(0));

  其实就是这个 InvokerInvocationHandler 对象,接着我们进入 MockClusterInvoker的invoke方法:

    public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
//*******关键,这个地方会从url里面回去 mock的key,也就是之前配置文件配置的moke
String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
if (value.length() == 0 || value.equalsIgnoreCase("false")) {
//no mock 没有
result = this.invoker.invoke(invocation);
// force 表示强制调用,不仅仅是请求失败的情况采取调用
} else if (value.startsWith("force")) {
if (logger.isWarnEnabled()) {
logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
}
//force:direct mock
result = doMockInvoke(invocation, null);
} else {//所以配置了mock一般会进来这里
//fail-mock
try {
result = this.invoker.invoke(invocation);
} catch (RpcException e) {
if (e.isBiz()) {
throw e;
} else {
if (logger.isWarnEnabled()) {
logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
}//原来mock机制在这里
result = doMockInvoke(invocation, e);
}
}
}
return result;
}

  由于我是配置了mock的,所以debug可以看到:这里拿到的value就是我们对应的类名

dubbo源码之服务消费

  所以到了 result = this.invoker.invoke(invocation);这个result 就是我们需要的结果,而这个invoker就是他的默认的服务降级的配置了:是他 FailoverClusterInvoker

dubbo源码之服务消费

  为什么是 Failover呢,因为 Cluster的默认拓展点就是这个类,而MockCluster仅仅是对其做了一个封装:进入他的invoke方法会发现 FailoverClusterInvoker对该方法并没有做实现,所以还是跟往常一样,进入父类的方法 AbstractClusterInvoker<T> 这个类的invoke:我们会发现里面有一个 LoadBalance的局部变量,而这个就是光网所提供的图里面的关于负载那块相关的,通过负载便会返回一个具体的invoker去调用

    public Result invoke(final Invocation invocation) throws RpcException {

        checkWhetherDestroyed();
// 做负载
LoadBalance loadbalance; List<Invoker<T>> invokers = list(invocation);
if (invokers != null && invokers.size() > 0) {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
} else {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}

  通过 RegistryDiectory 去获取一个invoker的list。在 RegistryDiectory 里面的doList 就是通过之前的成员变量 methodInvokerMap 去拿的

   ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension,又发现拓展点,进 LoadBalance.class 去看可以看到默认实现是 @SPI(RandomLoadBalance.NAME) 随机负载算法。nice

  最后调用  doInvoke(invocation, invokers, loadbalance); 这里是继续进入 FailoverClusterInvoker 的doInvoke方法:该方法拿到了一个委托的Invoker(DelegateInvoker):

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
checkInvokers(copyinvokers, invocation);
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//
//
if (i > 0) {
checkWhetherDestroyed();
copyinvokers = list(invocation);
//
checkInvokers(copyinvokers, invocation);
}//做负载 获得一个具体的 invoker
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {//调用该方法获取结果
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + invocation.getMethodName()
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyinvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
+ invocation.getMethodName() + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyinvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
}

  因为之前在toInvokers方法中进行转换的时候 是一个DubboInvoker 进行了层层包装,我们又发现 里面 并没有实现这个方法,所以继续找父类 AbstractInvoker 这个里面可以看到invoke方法,该方法内最终是调用了一个 doInvoke 方法,而这个方法是具体调到 Dubboinvoker里面的方法:

protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {// 判断当前的通信方式,是单向啊异步啊什么的
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}

  最后调用 currentClient.request(inv, timeout).get(),获取结果,这个currentClient 就是创建invoker里面的NettyClient,

    public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.组装请求,跟手写RPC框架一样的
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {//通过这个去发送请求
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}

  channel.send(req);这个就是最后的发送方法了,通过 HeaderExchangeChannel 类的 send方法 最后进入 NettyChannel,利用netty做一个发送。这就是整个通信过程

  需要异步调用的话在 reference里面配置 <dubbo:method name="goodbye" async="true"/>

上一张官网的时序图:

dubbo源码之服务消费

消费端调用过程流程图

dubbo源码之服务消费