dubbo-源码分析Consumer

时间:2022-09-06 11:05:38

counsumer使用服务的时候会在xml中配置<dubbo:reference> dubbo在spring.handles里的NamespaceHandle又有如下配置:

registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true));
registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(MetadataReportConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());

所以我们来看下ReferenceBean这个类:

public p class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean

这个类跟proiver一样都实现了InitializingBean接口,所以创建实例化完的时候都会调用他的afterPropertiesSet方法:

public void afterPropertiesSet() throws Exception {
if (applicationContext != null) {
BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ConfigCenterBean.class, false, false);
}

...

Boolean b = isInit();
if (b == null && getConsumer() != null) {
b = getConsumer().isInit();
}
if (b != null && b) {
//我们主要看下这个方法
getObject();
}
}

我们主要来看下这个方法getObject() 这个方法最后会调用到init方法:

private void init() {
if (initialized) {
return;
}
initialized = true;
//检查本地存根
checkStubAndLocal(interfaceClass);
//检查当前接口Mock机制
checkMock(interfaceClass);
Map<String, String> map = new HashMap<String, String>();

map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
appendRuntimeParameters(map);
if (!isGeneric()) {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}

String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
map.put("methods", Constants.ANY_VALUE);
} else {
map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
map.put(Constants.INTERFACE_KEY, interfaceName);
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, consumer, Constants.DEFAULT_KEY);
appendParameters(map, this);
Map<String, Object> attributes = null;
if (methods != null && !methods.isEmpty()) {
attributes = new HashMap<String, Object>();
for (MethodConfig methodConfig : methods) {
appendParameters(map, methodConfig, methodConfig.getName());
String retryKey = methodConfig.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(methodConfig.getName() + ".retries", "0");
}
}
attributes.put(methodConfig.getName(), convertMethodConfig2AyncInfo(methodConfig));
}
}

String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
if (hostToRegistry == null || hostToRegistry.length() == 0) {
hostToRegistry = NetUtils.getLocalHost();
} else if (isInvalidLocalHost(hostToRegistry)) {
throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
}
map.put(Constants.REGISTER_IP_KEY, hostToRegistry);

//创建接口代理对象 主要就是这里,返回接口的代理类,包括负载均衡算法
ref = createProxy(map);

ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), interfaceClass, ref, interfaceClass.getMethods(), attributes);
ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
}

我们主要看createProxy方法,根据名称我们就可以猜到他是去创建一个代理对象:

private T createProxy(Map<String, String> map) {
URL tmpUrl = new URL("temp", "localhost", 0, map);
final boolean isJvmRefer;
//判断是否是在本地暴露
if (isInjvm() == null) {
//当url不为空时认为不是本地暴露
if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
isJvmRefer = false;
} else {
// by default, reference local service if there is
isJvmRefer = InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl);
}
} else {
isJvmRefer = isInjvm();
}

if (isJvmRefer) {
//只调用当前jvm下服务
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
//当是直接在<dubbo:reference url="xxx"/>配置时直接用配置的url
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (url.getPath() == null || url.getPath().length() == 0) {
url = url.setPath(interfaceName);
}
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
//当上面都不是时去注册中心获取信息
} else { // assemble URL from register center's configuration

checkRegistry();
List<URL> us = loadRegistries(false);
if (us != null && !us.isEmpty()) {
for (URL u : us) {
//获取监控中心地址
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}

if (urls.size() == 1) {
//监听注册中心包括创建invoker 负载均衡都在这里
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
//当有多个url时使用最后一个注册中心地址 registry://
for (URL url : urls) {
//RegistryProtocol.refer
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// use RegistryAwareCluster only when register's cluster is available
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, RegistryAwareCluster.NAME);
// The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // not a registry url, must be direct invoke.
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}

Boolean c = check;
if (c == null && consumer != null) {
c = consumer.isCheck();
}
if (c == null) {
c = true; // default true
}
if (c && !invoker.isAvailable()) {
// make it possible for consumer to retry later if provider is temporarily unavailable
initialized = false;
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
/**
* @since 2.7.0
* ServiceData Store
*/
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
URL consumerURL = new URL(Constants.CONSUMER_PROTOCOL, map.remove(Constants.REGISTER_IP_KEY), 0, map.get(Constants.INTERFACE_KEY), map);
metadataReportService.publishConsumer(consumerURL);
}
// create service proxy
//为invoker创建一个代理对象
return (T) proxyFactory.getProxy(invoker);
}

上面的代码有点多,最主要的逻辑都在refprotocol.refer(interfaceClass, urls.get(0));这个方法里面,我们都知道refprotocol是一个自适应的扩展点,所以他会根据你的协议去调用对应的refer方法,我们的url是一个registry://协议,所以会调用到RegistryProtocol的refer方法,我们来看下:

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
//设置URL的protocol 默认是dubbo并移除registry 参数
//这里的protocol不是指前面的registry://头,是指parameter里面的registry参数值,我这里是zookeeper
//下面setProtocol后就是zookeeper://了
url = url.setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY)).removeParameter(REGISTRY_KEY);
//所以这里获取到的就是zookeeperRegistry
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}

// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
return doRefer(cluster, registry, type, url);
}

这个方法主要是去设置一个协议头,我这里是用到zookeeper所以url设置完之后会是zookeeper://xx这样的格式,所以下面的registry也是ZookeeperRegistry 最后会调用doRefer:

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
//上面传过来的是zookeeperRegistry
directory.setRegistry(registry);
//这里的protocol是一个自适应扩展点
directory.setProtocol(protocol);
// all attributes of REFER_KEY
//获取url里的所以参数
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
//这里的url顺序是 protocol,host,port,path parameters
//所以这个url的protocol是consumer,host是参数移除掉registry.ip后的字符串拼接,port是0,path就是我们代理的接口了
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
//这里的REGISTER_KEY在上面移除掉了,所以这里是空的,取默认值也就是true
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
//getRegisteredConsumerUrl里面会添加check=false和category=consumer参数
//接着会进入zookeeperRegistry的register方法,因为zookeeperRegistry没重写该方法,所以会到父类
//FailbackRegistry的register方法
registry.register(getRegisteredConsumerUrl(subscribeUrl, url));
}
//subscribeUrl=consumer://192.168.1.2/com.lin.service.UserService?application=annotation-consumer
// &default.timeout=3000&dubbo=2.0.2&interface=com.lin.service.UserService
// &methods=add,findUserByName,findUserById&pid=11180&release=2.7.0&side=consumer&timestamp=1553971735120
//构建路由链
directory.buildRouterChain(subscribeUrl);
//订阅
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));

Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}

这个方法比较复杂,最主要有三步,注册zookeeper然后根据url去构建一个调用链,然后去订阅url,我们一个一个来看下,先看下registry的register,因为上面传进来的是zookeeperRegistry 它又没重写register所以会到父类的方法:

public void register(URL url) {
//继续进入父类的方法 主要是加入已注册标识集合中
super.register(url);
//在注册失败集合中移除该url
removeFailedRegistered(url);
//在未注册失败集合中移除该url
removeFailedUnregistered(url);
try {
// Sending a registration request to the server side
//注册服务 是模版模式,所以会在子类实现 我这里是zookeeperRegistry
doRegister(url);
} catch (Exception e) {
Throwable t = e;

// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}

// Record a failed registration request to a failed list, retry regularly
addFailedRegistered(url);
}
}

这个方法比较简单,我们直接看doRegister方法,他是在zookeeperRegistry实现的:

@Override
public void doRegister(URL url) {
try {
//创建一个临时节点
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
private String toUrlPath(URL url) {
//toCtegoryPath是获取节点的路径也就是/dubbo/xxx.xxx.xxx/consumers
//全路径是:/dubbo/com.lin.service.UserService/consumers/consumer%3A%2F%2F192.168.1.2%2Fcom.lin.service.UserService%3Fapplication%3Dannotation-consumer%26category%3Dconsumers%26check%3Dfalse%26default.timeout%3D3000%26dubbo%3D2.0.2%26interface%3Dcom.lin.service.UserService%26methods%3Dadd%2CfindUserByName%2CfindUserById%26pid%3D11180%26release%3D2.7.0%26side%3Dconsumer%26timestamp%3D1553971735120
return toCategoryPath(url) + Constants.PATH_SEPARATOR + URL.encode(url.toFullString());
}

看到这里我们知道这里主要是去zookeeper创建一个consumer的临时结点,这样我们的register就先看到这里,我们继续看他下一步的构建调用链是怎样的也就是buildRouterChain方法,他里面会调用:

public static <T> RouterChain<T> buildChain(URL url) {
return new RouterChain<>(url);
}
private RouterChain(URL url) {
//根据url去获取RouterFactory的自适应扩展点列表
//这里会获得配置的
//MockRouterFactory,TagRouterFactory,AppRouterFactory,ServiceRouterFactory
//配置文件有7个类,但只有类上标识或者方法上标识了Activate注解的才是自适应类,所以只有这四个符合
List<RouterFactory> extensionFactories = ExtensionLoader.getExtensionLoader(RouterFactory.class)
.getActivateExtension(url, (String[]) null);

//MockInvokersSelector、TagRouter、AppRouter、ServiceRouter
List<Router> routers = extensionFactories.stream()
.map(factory -> factory.getRouter(url))
.collect(Collectors.toList());

//配置routers
initWithRouters(routers);
}

根据上面可以知道我们的调用链其实就是MockInvokersSelector、TagRouter、AppRouter、ServiceRouter这四个,最后会封装为一个RouterChain,目前我们先知道这个调用链会有这四个就行了,再来看订阅方法这里subscribe:

public void subscribe(URL url) {
setConsumerUrl(url);
//添加到添加消费监听列表中
consumerConfigurationListener.addNotifyListener(this);
//添加到服务监听列表
serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
registry.subscribe(url, this);
}
public void subscribe(URL url, NotifyListener listener) {
//把listener添加到该url的监听列表中
super.subscribe(url, listener);
//移除失败的订阅
removeFailedSubscribed(url, listener);
try {
// Sending a subscription request to the server side
//真正订阅的地方
doSubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;

List<URL> urls = getCacheUrls(url);
if (urls != null && !urls.isEmpty()) {
notify(url, listener, urls);
logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
} else {
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true);
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}

// Record a failed registration request to a failed list, retry regularly
addFailedSubscribed(url, listener);
}
}

再看下真正干活的方法doSubscribe 这个方法同样也是在zookeeperRegistry里实现的:

doSubscribe(final URL url, final NotifyListener listener) {
try {
//当url的服务接口是*号时
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
for (String child : currentChilds) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
}
});
zkListener = listeners.get(listener);
}
zkClient.create(root, false);
List<String> services = zkClient.addChildListener(root, zkListener);
if (services != null && !services.isEmpty()) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {
List<URL> urls = new ArrayList<URL>();
for (String path : toCategoriesPath(url)) {
//path=/dubbo/xxx.xxx.xx/providers
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
//如果listener不存在listeners中的时候添加到集合中
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
//因为zkClient是由zookeeperTransporter创建的,zookeeperTransporter又是这里zkClient会是CuratorZookeeperTransporter
//的实例,所以zkClient是CuratorZookeeperClient
//这里会去创建/dubbo/com.lin.service.UserService/providers路径,如果已经存在就返回
zkClient.create(path, false);
//这里添加对该路径的监听器
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
//通知
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}

这里会先去创建一个ChildListener去添加到listeners中,然后zkClient会把这个listeners添加到自己的集合中,

首先zkClient这个类是由zookeeperTransporter去创建的,zookeeperTransporter这个类又是一个自适应扩展类,所以他会根据环境去创建一个CuratorZookeeperClient实例,所以最后会进入CuratorZookeeperClient的addChildListener这个方法:

@Override
public List<String> addChildListener(String path, final ChildListener listener) {
ConcurrentMap<ChildListener, TargetChildListener> listeners = childListeners.get(path);
if (listeners == null) {
childListeners.putIfAbsent(path, new ConcurrentHashMap<ChildListener, TargetChildListener>());
listeners = childListeners.get(path);
}
TargetChildListener targetListener = listeners.get(listener);
if (targetListener == null) {
listeners.putIfAbsent(listener, createTargetChildListener(path, listener));
targetListener = listeners.get(listener);
}
//添加一个watch
return addTargetChildListener(path, targetListener);
}
public List<String> addTargetChildListener(String path, CuratorWatcher listener) {
try {
return client.getChildren().usingWatcher(listener).forPath(path);
} catch (NoNodeException e) {
return null;
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}

所以可以看到这里会去zookeeper注册一个watcher,这个path如果有变动的话都会通知到lisener,这个path我们要记得这个path其实是我们的/dubbo/com.lin.service.UserService/providers这个路径,所以当有服务提供者添加或者下线的时候我们都会收到通知就可以去更新我们的service列表了,我们现在也知道服务者的更新是怎样的了,我们继续看下上面原来的订阅后我们要继续处理的东西 也就是:

Invoker invoker = cluster.join(directory);

这个cluster是我们上面传进来的,这个cluster也是一个自适应的扩展点,他会根据我们directory的url里面的参数cluster去调用对应的方法,因为我们的url里面没有配置cluster所以他会去调用默认的FailbackCluster,又因为我们的dubbo spi 中关于Cluster的配置中又有一个包装类mock:

mock=org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterWrapper

所以他会去用MockClusterWrapper去包装一个FailbackCluster:

public class MockClusterWrapper implements Cluster {

private Cluster cluster;

public MockClusterWrapper(Cluster cluster) {
this.cluster = cluster;
}

@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
//cluster是FailbackCluster
return new MockClusterInvoker<T>(directory,
this.cluster.join(directory));
}

}
public class FailoverCluster implements Cluster {

public final static String NAME = "failover";

@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<T>(directory);
}

}

所以cluster.join返回的是一个MockClusterWrapper包装类,FailbackCluster.join又回返回一个FailoverClusterInvoker,所以我们Invoker的结构是这样的

new MockClusterInvoker(directory,new FailoverClusterInvoker(directory));

最后会根据这个invoker去创建一个代理类:

// create service proxy
return (T) proxyFactory.getProxy(invoker);

那我们的服务负载均衡到底是在哪里初始化的呢,我们继续来看下,因为我们是根据invoker去为我们的服务接口做代理的,那我们使用接口调用方法的时候其实都会调用到我们invoker的invoker方法,那这样我们就先来看下

MockClusterInvoker的invoker方法:

public Result invoke(Invocation invocation) throws RpcException {
Result result = null;

String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
if (value.length() == 0 || value.equalsIgnoreCase("false")) {
//no mock
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {
if (logger.isWarnEnabled()) {
logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
}
//force:direct mock
result = doMockInvoke(invocation, null);
} else {
//fail-mock
try {
result = this.invoker.invoke(invocation);
} catch (RpcException e) {
if (e.isBiz()) {
throw e;
} if (logger.isWarnEnabled()) {
logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
}
result = doMockInvoke(invocation, e);
}
}
return result;
}

所以我们后面还会调用到FailoverClusterInvoker的invoker方法,因为FailoverClusterInvoker没有重写invoke(Invocation invocation)方法,所以我们到他的父类来继续看:

public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();

// binding attachments into invocation.
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}
//获取调用链
List<Invoker<T>> invokers = list(invocation);
//初始化负载均衡器
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}

所以这里初始化了我们的负载均衡器:

protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
if (CollectionUtils.isNotEmpty(invokers)) {
//当我们的方法没有loadbalance配置的时候就使用默认的random
return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
} else {
return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
}
}

所以这里我们可以得到我们使用的负载均衡器了。

dubbo-源码分析Consumer的更多相关文章

  1. dubbo源码分析2-reference bean发起服务方法调用

    dubbo源码分析1-reference bean创建 dubbo源码分析2-reference bean发起服务方法调用 dubbo源码分析3-service bean的创建与发布 dubbo源码分 ...

  2. dubbo源码分析一:整体分析

    本文作为dubbo源码分析的第一章,先从总体上来分析一下dubbo的代码架构.功能及优缺点,注意,本文只分析说明开源版本提供的代码及功能. 1.dubbo的代码架构:  spring适配层:常规的sp ...

  3. Dubbo 源码分析 - 服务调用过程

    注: 本系列文章已捐赠给 Dubbo 社区,你也可以在 Dubbo 官方文档中阅读本系列文章. 1. 简介 在前面的文章中,我们分析了 Dubbo SPI.服务导出与引入.以及集群容错方面的代码.经过 ...

  4. Dubbo 源码分析 - 集群容错之 Router

    1. 简介 上一篇文章分析了集群容错的第一部分 -- 服务目录 Directory.服务目录在刷新 Invoker 列表的过程中,会通过 Router 进行服务路由.上一篇文章关于服务路由相关逻辑没有 ...

  5. Dubbo源码分析

    Dubbo源码分析1 Dubbo源码分析2 dubbo源码阅读:rpc请求处理流程(1) 架构设计:系统间通信(17)——服务治理与Dubbo 中篇(分析) 13. Dubbo原理解析-注册中心之Zo ...

  6. dubbo源码分析6-telnet方式的管理实现

    dubbo源码分析1-reference bean创建 dubbo源码分析2-reference bean发起服务方法调用 dubbo源码分析3-service bean的创建与发布 dubbo源码分 ...

  7. dubbo源码分析1-reference bean创建

    dubbo源码分析1-reference bean创建 dubbo源码分析2-reference bean发起服务方法调用 dubbo源码分析3-service bean的创建与发布 dubbo源码分 ...

  8. dubbo源码分析3-service bean的创建与发布

    dubbo源码分析1-reference bean创建 dubbo源码分析2-reference bean发起服务方法调用 dubbo源码分析3-service bean的创建与发布 dubbo源码分 ...

  9. dubbo源码分析4-基于netty的dubbo协议的server

    dubbo源码分析1-reference bean创建 dubbo源码分析2-reference bean发起服务方法调用 dubbo源码分析3-service bean的创建与发布 dubbo源码分 ...

  10. dubbo源码分析5-dubbo的扩展点机制

    dubbo源码分析1-reference bean创建 dubbo源码分析2-reference bean发起服务方法调用 dubbo源码分析3-service bean的创建与发布 dubbo源码分 ...

随机推荐

  1. JS preventDefault ,stopPropagation ,return false

    所谓的事件有两种:监听事件和浏览器对特殊标签元素的默认行为事件.监听事件:在节点上被监听的事件操作,如 select节点的change事件,a节点的click事件.浏览器的默认事件:特定页面元素上带的 ...

  2. BZOJ 1047&colon; &lbrack;HAOI2007&rsqb;理想的正方形

    题目 单调队列是个很神奇的东西,我以前在博客写过(吧) 我很佩服rank里那些排前几的大神,700ms做了时限10s的题,简直不能忍.(但是我还是不会写 我大概一年半没写单调队列,也有可能根本没有写过 ...

  3. MFC学习笔记(一)向模态对话框传递数据

    声明构造函数为2个参数,具有默认参数的参数须放在后面. CDialogDimmer::CDialogDimmer(CString name,CWnd* pParent /*=NULL*/) : CDi ...

  4. 基于DCMTK的DICOM相关程序编写攻略

    2008年09月10日 星期三 15:35 基于DCMTK的DICOM相关程序编写攻略 前言: 由于现在的医学影像设备的图像存储和传输正在逐渐向DICOM标准靠拢,在我们进行医学图像处理的过程中,经常 ...

  5. Python的map、filter、reduce函数 &lbrack;转&rsqb;

    1. map函数func作用于给定序列的每个元素,并用一个列表来提供返回值. map函数python实现代码: def map(func,seq): mapped_seq = []        fo ...

  6. UVA 12230 - Crossing Rivers&lpar;概率&rpar;

    UVA 12230 - Crossing Rivers 题目链接 题意:给定几条河,每条河上有来回开的船,某一天出门,船位置随机,如今要求从A到B,所须要的期望时间 思路:每条河的期望,最坏就是船刚开 ...

  7. Codeforces 487C&period; Prefix Product Sequence 逆&plus;结构体

    意甲冠军: 对于数字n, 他询问是否有1~n置换 这种布置能够在产品上模每个前缀n 有可能0~n-1 解析: 通过观察1肯定要在首位,n一定要在最后 除4意外的合数都没有解 其它质数构造 a[i]=i ...

  8. html标签大全(1)

     http标签详解及讲解        1.基础标签 <!DOCTYPE html> <!--表示文本类型--> <html> <!--<html&gt ...

  9. CentOS7配置php7&period;0支持redis

    配置之前应该是环境已经搭好了,phpinfo的页面可以加载出来. 使用git clone下载git上的phpredis扩展包 [root@VM_103_117_centos ]#git clone   ...

  10. bzoj 2437&colon; &lbrack;Noi2011&rsqb;兔兔与蛋蛋

    Description Solution 考虑犯错误的条件:之前是处于必胜状态,该操作之后就变成了必败状态. 我们可以把这个过程看成两人对网格图进行黑白染色,变成了一个二分图模型,即当前位置向相邻不同 ...