Dubbo(七):redis注册中心的应用

时间:2023-03-10 01:18:47
Dubbo(七):redis注册中心的应用

  上篇我们讲了Dubbo中有一个非常本质和重要的功能,那就是服务的自动注册与发现,而这个功能是通过注册中心来实现的。上篇中使用zookeeper实现了注册中心的功能,同时了提了dubbo中有其他许多的注册中心的实现。

  今天我们就来看看另一个注册中心的实现吧: redis 。

1. dubbo在 Redis 中的服务分布

  dubbo在zk中的服务体现是一个个的文件路径形式,如 /dubbo/xxx.xx.XxxService/providers/xxx 。 而在redis中,则体现是一个个的缓存key-value。具体分布如下:

    /dubbo/xxx.xx.XxxService/providers: 以hash类型存放所有提供者列表, 每个hash的字段为 url -> expireTime
    /dubbo/xxx.xx.XxxService/consumers: 以hash类型存放所有消费者列表, 每个hash的字段为 url -> expireTime
    /dubbo/xxx.xx.XxxService/configurators: 存放配置信息
    /dubbo/xxx.xx.XxxService/routers: 存放路由配置信息

  如上,同样,redis也是以service为粒度进行存储划分的。

2. Redis 组件的接入

  你可能需要先引入redis注册依赖包:

        <dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-registry-redis</artifactId>
</dependency>

  在配置dubbo服务时,需要将注册中心换为 redis, 如下选合适的一个即可:

    <dubbo:registry address="redis://127.0.0.1:6379" cluster="failover" />
<dubbo:registry address="redis://10.20.153.10:6379?backup=10.20.153.11:6379,10.20.153.12:6379" cluster="failover" />
<dubbo:registry protocol="redis" address="127.0.0.1:6379" cluster="failover" />
<dubbo:registry protocol="redis" address="10.20.153.10:6379,10.20.153.11:6379,10.20.153.12:6379" cluster="failover" />

  cluster 设置 redis 集群策略,缺省为 failover:(这个配置不会和集群容错配置有误会么,尴尬)

    failover: 失效转移策略。只写入和读取任意一台,失败时重试另一台,需要服务器端自行配置数据同步;

    replicate: 复制模式策略。在客户端同时写入所有服务器,只读取单台,服务器端不需要同步,注册中心集群增大,性能压力也会更大;

  redis作为注册中心与zk作为注册的前置操作都是一样的。都是一是作为服务提供者时会在 ServiceConfig#doExportUrlsFor1Protocol 中,进行远程服务暴露时会拉起。二是在消费者在进行远程调用时会 ReferenceConfig#createProxy 时拉取以便获取提供者列表。

  只是在依赖注入 RegistryFactory 时,根据是 zookeeper/redis, 选择了不一样的 RegistryFactory, 所以创建了不同的注册中心实例。

  redis 中根据SPI的配置创建, RedisRegistryFactory 工厂, 配置文件 META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory 的内容如下:

redis=org.apache.dubbo.registry.redis.RedisRegistryFactory
    /**
* Get an instance of registry based on the address of invoker
*
* @param originInvoker
* @return
*/
protected Registry getRegistry(final Invoker<?> originInvoker) {
URL registryUrl = getRegistryUrl(originInvoker);
// RegistryFactory 又是通过 SPI 机制生成的
// 会根据具体的注册中心的类型创建调用具体实例,如此处为: redis, 所以会调用 RedisRegistryFactory.getRegistry()
return registryFactory.getRegistry(registryUrl);
}
// 所有 RegistryFactory 都会被包装成 RegistryFactoryWrapper, 以便修饰
// org.apache.dubbo.registry.RegistryFactoryWrapper#getRegistry
@Override
public Registry getRegistry(URL url) {
// 对于zk, 会调用 RedisRegistryFactory
return new ListenerRegistryWrapper(registryFactory.getRegistry(url),
Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(RegistryServiceListener.class)
.getActivateExtension(url, "registry.listeners")));
}
// org.apache.dubbo.registry.support.AbstractRegistryFactory#getRegistry(org.apache.dubbo.common.URL)
@Override
public Registry getRegistry(URL url) {
if (destroyed.get()) {
LOGGER.warn("All registry instances have been destroyed, failed to fetch any instance. " +
"Usually, this means no need to try to do unnecessary redundant resource clearance, all registries has been taken care of.");
return DEFAULT_NOP_REGISTRY;
} url = URLBuilder.from(url)
.setPath(RegistryService.class.getName())
.addParameter(INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(EXPORT_KEY, REFER_KEY)
.build();
String key = createRegistryCacheKey(url);
// Lock the registry access process to ensure a single instance of the registry
LOCK.lock();
try {
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
//create registry by spi/ioc
// 调用子类方法创建 registry 实例,此处为 RedisRegistryFactory.createRegistry
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
// org.apache.dubbo.registry.redis.RedisRegistryFactory#createRegistry
@Override
protected Registry createRegistry(URL url) {
// 最终将redis组件接入到应用中了,后续就可以使用redis提供的相应功能了
return new RedisRegistry(url);
}

  至此,redis被接入了。我们先来看下 redis 注册中心构造方法实现:

    // org.apache.dubbo.registry.redis.RedisRegistry#RedisRegistry
public RedisRegistry(URL url) {
// RedisRegistry 与zk一样,同样继承了 FailbackRegistry
// 所以,同样会创建retryTimer, 同样会创建缓存文件
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
// 使用redis连接池处理事务
// 设置各配置项
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setTestOnBorrow(url.getParameter("test.on.borrow", true));
config.setTestOnReturn(url.getParameter("test.on.return", false));
config.setTestWhileIdle(url.getParameter("test.while.idle", false));
if (url.getParameter("max.idle", 0) > 0) {
config.setMaxIdle(url.getParameter("max.idle", 0));
}
if (url.getParameter("min.idle", 0) > 0) {
config.setMinIdle(url.getParameter("min.idle", 0));
}
if (url.getParameter("max.active", 0) > 0) {
config.setMaxTotal(url.getParameter("max.active", 0));
}
if (url.getParameter("max.total", 0) > 0) {
config.setMaxTotal(url.getParameter("max.total", 0));
}
if (url.getParameter("max.wait", url.getParameter("timeout", 0)) > 0) {
config.setMaxWaitMillis(url.getParameter("max.wait", url.getParameter("timeout", 0)));
}
if (url.getParameter("num.tests.per.eviction.run", 0) > 0) {
config.setNumTestsPerEvictionRun(url.getParameter("num.tests.per.eviction.run", 0));
}
if (url.getParameter("time.between.eviction.runs.millis", 0) > 0) {
config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis", 0));
}
if (url.getParameter("min.evictable.idle.time.millis", 0) > 0) {
config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis", 0));
}
// redis 复用了cluster配置项?
String cluster = url.getParameter("cluster", "failover");
if (!"failover".equals(cluster) && !"replicate".equals(cluster)) {
throw new IllegalArgumentException("Unsupported redis cluster: " + cluster + ". The redis cluster only supported failover or replicate.");
}
replicate = "replicate".equals(cluster); List<String> addresses = new ArrayList<>();
addresses.add(url.getAddress());
String[] backups = url.getParameter(RemotingConstants.BACKUP_KEY, new String[0]);
if (ArrayUtils.isNotEmpty(backups)) {
addresses.addAll(Arrays.asList(backups));
}
//获得Redis主节点名称
String masterName = url.getParameter(REDIS_MASTER_NAME_KEY);
if (StringUtils.isEmpty(masterName)) {
//单机版redis
for (String address : addresses) {
int i = address.indexOf(':');
String host;
int port;
if (i > 0) {
host = address.substring(0, i);
port = Integer.parseInt(address.substring(i + 1));
} else {
host = address;
port = DEFAULT_REDIS_PORT;
}
this.jedisPools.put(address, new JedisPool(config, host, port,
url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT), StringUtils.isEmpty(url.getPassword()) ? null : url.getPassword(),
url.getParameter("db.index", 0)));
}
} else {
//哨兵版redis
Set<String> sentinelSet = new HashSet<>(addresses);
int index = url.getParameter("db.index", 0);
int timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
String password = StringUtils.isEmpty(url.getPassword()) ? null : url.getPassword();
JedisSentinelPool pool = new JedisSentinelPool(masterName, sentinelSet, config, timeout, password, index);
this.jedisPools.put(masterName, pool);
} this.reconnectPeriod = url.getParameter(REGISTRY_RECONNECT_PERIOD_KEY, DEFAULT_REGISTRY_RECONNECT_PERIOD);
String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(PATH_SEPARATOR)) {
group = PATH_SEPARATOR + group;
}
if (!group.endsWith(PATH_SEPARATOR)) {
group = group + PATH_SEPARATOR;
}
this.root = group;
// session=60000, 默认1分钟过期
this.expirePeriod = url.getParameter(SESSION_TIMEOUT_KEY, DEFAULT_SESSION_TIMEOUT);
// 使用定时任务刷新存活状态,相当于心跳维护线程,定时任务频率为 session有效其的1/2
this.expireFuture = expireExecutor.scheduleWithFixedDelay(() -> {
try {
deferExpired(); // Extend the expiration time
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected exception occur at defer expire time, cause: " + t.getMessage(), t);
}
}, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS);
}

  RedisRegistry构造方法中,主要完成redis配置信息的转换接入,创建连接池,默认使用0号数据库。另外,每个客户端都是单例的RedisRegistry, 所以也就是说会开启一个过期扫描定时任务(可以称之为心跳任务)。

3. Redis 服务提供者注册

  与ZK过程类似,服务注册主要就分两步:1. 获取registry实例(通过SPI机制); 2. 将服务的信息注册到注册中心。只是zk是路径,redis是kv.

    // org.apache.dubbo.registry.redis.RedisRegistry#doRegister
@Override
public void doRegister(URL url) {
// 与zk一致,按服务组装key前缀
String key = toCategoryPath(url);
// 全服务路径作为value
String value = url.toFullString();
String expire = String.valueOf(System.currentTimeMillis() + expirePeriod);
boolean success = false;
RpcException exception = null;
for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
Pool<Jedis> jedisPool = entry.getValue();
try {
try (Jedis jedis = jedisPool.getResource()) {
// 使用hash存储提供者/消费者 标识,带过期时间(该时间需后续主动判定,redis并不维护该状态)
// 注册好自向标识后,pub一条消息,以便其他客户端可以sub感知到该服务
jedis.hset(key, value, expire);
jedis.publish(key, REGISTER);
success = true;
// 如果不是复制模式的redis 服务(即为failover模式),只需往一个redis写数据即可,
// 剩余redis自行同步实际上这里应该是存在数据一致性问题的
if (!replicate) {
break; //  If the server side has synchronized data, just write a single machine
}
}
} catch (Throwable t) {
exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
}
}
// 只要有一个成功,即算成功
if (exception != null) {
if (success) {
logger.warn(exception.getMessage(), exception);
} else {
throw exception;
}
}
}

  以hash类型存放所有提供者列表, key为服务粒度的前缀信息: /dubbo/xxx.xx.XxxService/providers, hash中每个field->value表示,服务全路径信息->过期时间。

  通过redis的 pub/sub 机制,通知其他客户端变化。注册时发布一条消息到提供者路径, publish <key> register 。

4. redis 消费者服务订阅

  服务注册的目的,主要是让注册中心及其他应用端可以发现自己。而服务订阅则为了让自己可以发现别的系统的变化。如查找所有提供者列表,接收应用上下线通知,开启监听等等。

    // org.apache.dubbo.registry.redis.RedisRegistry#doSubscribe
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
String service = toServicePath(url);
// 基于service开启订阅线程
Notifier notifier = notifiers.get(service);
if (notifier == null) {
// 主动开启一个 notifier 线程,进行subscribe处理
// 如果service很多,那就意味着有很多的此类线程,这并不是件好事
Notifier newNotifier = new Notifier(service);
notifiers.putIfAbsent(service, newNotifier);
notifier = notifiers.get(service);
if (notifier == newNotifier) {
notifier.start();
}
}
boolean success = false;
RpcException exception = null;
for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
Pool<Jedis> jedisPool = entry.getValue();
try {
try (Jedis jedis = jedisPool.getResource()) {
if (service.endsWith(ANY_VALUE)) {
admin = true;
Set<String> keys = jedis.keys(service);
if (CollectionUtils.isNotEmpty(keys)) {
Map<String, Set<String>> serviceKeys = new HashMap<>();
for (String key : keys) {
String serviceKey = toServicePath(key);
Set<String> sk = serviceKeys.computeIfAbsent(serviceKey, k -> new HashSet<>());
sk.add(key);
}
for (Set<String> sk : serviceKeys.values()) {
doNotify(jedis, sk, url, Collections.singletonList(listener));
}
}
} else {
// 首次订阅,使用 keys xx/* 将所有服务信息存储到本地
doNotify(jedis, jedis.keys(service + PATH_SEPARATOR + ANY_VALUE), url, Collections.singletonList(listener));
}
success = true;
break; // Just read one server's data
}
} catch (Throwable t) { // Try the next server
exception = new RpcException("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
}
}
if (exception != null) {
if (success) {
logger.warn(exception.getMessage(), exception);
} else {
throw exception;
}
}
}

  与zk的直接调用zkClient.addChildListener()实现订阅不同,redis中使用了多个独立的订阅线程,使用pub/sub机制进行处理。(因redis的pub/sub是基于channel进行的长连接通信,所以每个service只能使用单独的线程,有点伤!)。 使用 doNotify() 将redis中的数据接入应用中。在做订阅的同时,也拉取了提供者服务列表达到初始化的作用。

5. Redis 服务下线处理

  当应用要关闭,或者注册失败时,需要进行服务下线。当然,如果应用没有及时做下线处理,zk会通过其自身的临时节点过期机制,也会将该服务做下线处理。从而避免消费者或管理台看到无效的服务存在。

  应用服务的主动下线操作是由 ShutdownHookCallbacks 和在判断服务不可用时进行的 invoker.destroy() 来实现优雅下线。

    // org.apache.dubbo.registry.integration.RegistryDirectory#destroy
@Override
public void destroy() {
if (isDestroyed()) {
return;
} // unregister.
try {
if (getRegisteredConsumerUrl() != null && registry != null && registry.isAvailable()) {
registry.unregister(getRegisteredConsumerUrl());
}
} catch (Throwable t) {
logger.warn("unexpected error when unregister service " + serviceKey + "from registry" + registry.getUrl(), t);
}
// unsubscribe.
try {
if (getConsumerUrl() != null && registry != null && registry.isAvailable()) {
registry.unsubscribe(getConsumerUrl(), this);
}
ExtensionLoader.getExtensionLoader(GovernanceRuleRepository.class).getDefaultExtension()
.removeListener(ApplicationModel.getApplication(), CONSUMER_CONFIGURATION_LISTENER);
} catch (Throwable t) {
logger.warn("unexpected error when unsubscribe service " + serviceKey + "from registry" + registry.getUrl(), t);
}
super.destroy(); // must be executed after unsubscribing
try {
destroyAllInvokers();
} catch (Throwable t) {
logger.warn("Failed to destroy service " + serviceKey, t);
}
}
// org.apache.dubbo.registry.support.FailbackRegistry#unregister
@Override
public void unregister(URL url) {
super.unregister(url);
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
// Sending a cancellation request to the server side
doUnregister(url);
} catch (Exception e) {
Throwable t = e; // If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to unregister " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to unregister " + url + ", waiting for retry, cause: " + t.getMessage(), t);
} // Record a failed registration request to a failed list, retry regularly
addFailedUnregistered(url);
}
}
// org.apache.dubbo.registry.redis.RedisRegistry#doUnregister
@Override
public void doUnregister(URL url) {
String key = toCategoryPath(url);
String value = url.toFullString();
RpcException exception = null;
boolean success = false;
for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
Pool<Jedis> jedisPool = entry.getValue();
try {
try (Jedis jedis = jedisPool.getResource()) {
// 直接删除当前服务对应的 key-field 信息
// 然后发布一条 UNREGISTER 消息,通知其他客户端
jedis.hdel(key, value);
jedis.publish(key, UNREGISTER);
success = true;
// 如果redis 是复制模型,需要在每个redis上都做一次删除
// 此时各应用端将会重复收到消息,重复处理,看起来并不是件好事
if (!replicate) {
break; //  If the server side has synchronized data, just write a single machine
}
}
} catch (Throwable t) {
exception = new RpcException("Failed to unregister service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
}
}
if (exception != null) {
if (success) {
logger.warn(exception.getMessage(), exception);
} else {
throw exception;
}
}
}

  总结: 下线处理两步骤: 1. 删除对应的hash key-field; 2. publish 一个下线消息通知其他应用; 3. 针对redis的集群配置决定是删除1次或n次,且反复通知操作;

6. redis 服务解除事件订阅

  事实上,redis的 doUnsubscribe, 已不再处理任何事件。

    @Override
public void doUnsubscribe(URL url, NotifyListener listener) {
}

  那么,前面注册的多个 Notifier 监听线程就不管了吗?那肯定是不行的,它会在 destroy() 被调用时进行收尾处理。实际上,它是 unregister() 的后续工作。

    // org.apache.dubbo.registry.support.AbstractRegistryFactory#destroyAll
/**
* Close all created registries
*/
public static void destroyAll() {
if (!destroyed.compareAndSet(false, true)) {
return;
} if (LOGGER.isInfoEnabled()) {
LOGGER.info("Close all registries " + getRegistries());
}
// Lock up the registry shutdown process
LOCK.lock();
try {
for (Registry registry : getRegistries()) {
try {
registry.destroy();
} catch (Throwable e) {
LOGGER.error(e.getMessage(), e);
}
}
REGISTRIES.clear();
} finally {
// Release the lock
LOCK.unlock();
}
}
// org.apache.dubbo.registry.redis.RedisRegistry#destroy
@Override
public void destroy() {
// 该方法甚至可以去调用 unregister(), unsubscribe() 方法
super.destroy();
try {
expireFuture.cancel(true);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
// 遍历所有 notifiers, 依次调用 shutdown, 即停止订阅工作
for (Notifier notifier : notifiers.values()) {
notifier.shutdown();
}
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
Pool<Jedis> jedisPool = entry.getValue();
try {
jedisPool.destroy();
} catch (Throwable t) {
logger.warn("Failed to destroy the redis registry client. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
}
}
// 最后优雅关闭过期扫描定时任务线程池,即 shutdown()..awaitTermination()的应用。
ExecutorUtil.gracefulShutdown(expireExecutor, expirePeriod);
}
// 停止notifier
// org.apache.dubbo.registry.redis.RedisRegistry.Notifier#shutdown
public void shutdown() {
try {
// step1. 设置停止标识
// step2. 断开redis连接,这不只是一断开的操作,它会停止psubscribe的调用,从而间接中止订阅线程工作
running = false;
jedis.disconnect();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
// 如下方法,即是其父类的 destroy(), 里面涵盖了未关闭的 地址信息,则会触发 unregister, unsubscribe
// org.apache.dubbo.registry.support.AbstractRegistry#destroy
@Override
public void destroy() {
if (logger.isInfoEnabled()) {
logger.info("Destroy registry:" + getUrl());
}
Set<URL> destroyRegistered = new HashSet<>(getRegistered());
// step1. unregister 未下线的服务
if (!destroyRegistered.isEmpty()) {
for (URL url : new HashSet<>(getRegistered())) {
if (url.getParameter(DYNAMIC_KEY, true)) {
try {
unregister(url);
if (logger.isInfoEnabled()) {
logger.info("Destroy unregister url " + url);
}
} catch (Throwable t) {
logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
}
}
}
}
// step2. unsubscribe 未取消订阅的服务
Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<>(getSubscribed());
if (!destroySubscribed.isEmpty()) {
for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
try {
unsubscribe(url, listener);
if (logger.isInfoEnabled()) {
logger.info("Destroy unsubscribe url " + url);
}
} catch (Throwable t) {
logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
}
}
}
}
// step3. 从已注册列表中删除当前实例
AbstractRegistryFactory.removeDestroyedRegistry(this);
}
// org.apache.dubbo.registry.support.AbstractRegistryFactory#removeDestroyedRegistry
public static void removeDestroyedRegistry(Registry toRm) {
LOCK.lock();
try {
REGISTRIES.entrySet().removeIf(entry -> entry.getValue().equals(toRm));
} finally {
LOCK.unlock();
}
}

  总结:此处讲了更多unregister,unsubscribe的前置操作。而 notifier.shutdown(); 才是关闭redis订阅相关工作的关键。它是通过设置停止循环标识,以及关闭redis连接实现的。事实上,这各取消订阅方式并没有很优雅。

7. 服务心跳的维护处理

  redis本身只是一个缓存存储系统,心跳逻辑需要自行实现。实际上,我们也可以依赖于redis的自动过期机制,进行心跳续期。那么,redis注册中心是否也是这样实现的呢?好像并不是!

    // 在 RedisRegistry 的构造方法中,初始化了一个定时任务的调度
this.expireFuture = expireExecutor.scheduleWithFixedDelay(() -> {
try {
deferExpired(); // Extend the expiration time
} catch (Throwable t) { // Defensive fault tolerance
logger.error("Unexpected exception occur at defer expire time, cause: " + t.getMessage(), t);
}
}, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS);
// org.apache.dubbo.registry.redis.RedisRegistry#deferExpired
private void deferExpired() {
for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
Pool<Jedis> jedisPool = entry.getValue();
try {
try (Jedis jedis = jedisPool.getResource()) {
// 取出所有注册了的服务,进行心跳更新
for (URL url : new HashSet<>(getRegistered())) {
if (url.getParameter(DYNAMIC_KEY, true)) {
String key = toCategoryPath(url);
// 增加过期时间+expirePeriod, url -> expireAt
if (jedis.hset(key, url.toFullString(), String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {
// 如果是第一次新增该值,或者重新新增该值(可能由于原来的地址过期被删除),则触发一次regiter的消息发布,自会有相应订阅者处理该变更
jedis.publish(key, REGISTER);
}
}
}
// 如果是管理类配置,interface=*, 则会开启清理服务功能,注意此类操作会很重,将会消耗很大
// 该值会在subscribe()的时候置为 true
// 按文档说明该操作会在 监控中心执行,而非存在于应用端
if (admin) {
clean(jedis);
}
if (!replicate) {
break;//  If the server side has synchronized data, just write a single machine
}
}
} catch (Throwable t) {
logger.warn("Failed to write provider heartbeat to redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
}
}
}
// The monitoring center is responsible for deleting outdated dirty data
private void clean(Jedis jedis) {
// redis: keys * , 列举所有相关的key, 根据服务数量来定该值多少
Set<String> keys = jedis.keys(root + ANY_VALUE);
if (CollectionUtils.isNotEmpty(keys)) {
for (String key : keys) {
// redis: hgetall <key>
Map<String, String> values = jedis.hgetAll(key);
if (CollectionUtils.isNotEmptyMap(values)) {
boolean delete = false;
long now = System.currentTimeMillis();
for (Map.Entry<String, String> entry : values.entrySet()) {
URL url = URL.valueOf(entry.getKey());
// 根据hash中value 指定的时间,判定是否过期,如果过期则做删除操作
// redis: hdel <key> <field>
if (url.getParameter(DYNAMIC_KEY, true)) {
long expire = Long.parseLong(entry.getValue());
if (expire < now) {
jedis.hdel(key, entry.getKey());
delete = true;
if (logger.isWarnEnabled()) {
logger.warn("Delete expired key: " + key + " -> value: " + entry.getKey() + ", expire: " + new Date(expire) + ", now: " + new Date(now));
}
}
}
}
// 只要有一个服务被判定为过期,则订阅了该服务的客户端都应该被通知到
// 多个服务下线只会被通知一次
if (delete) {
jedis.publish(key, UNREGISTER);
}
}
}
}
}

  deferExpired() 的作用,就是维护本实例的所有服务的有效性,做续期作用。两个重量级操作: 1. 依次延期某service下的所有url的过期时间;2. 做全量清理过期服务url;keys xx* 的操作,也对redis提出了一些要求,因为有些redis出于安全限制可能会禁用keys命令。

8. 服务信息变更通知处理notify

  redis注册中心其实不会主动发现服务变更,只有应用自己发布regiter或unregister消息后,其他应用才能感知到变化。前面在 doRegister() 时,我看到,应用是通过hash添加字段注册自己,并同时发布 REGISTER 消息通知所有订阅者。在 doSubscribe() 时开启另一个服务线程处理subscribe();

    // org.apache.dubbo.registry.redis.RedisRegistry#doSubscribe
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
String service = toServicePath(url);
// 订阅是基于服务处理的,每个服务一个订阅处理线程
Notifier notifier = notifiers.get(service);
if (notifier == null) {
Notifier newNotifier = new Notifier(service);
notifiers.putIfAbsent(service, newNotifier);
notifier = notifiers.get(service);
// 此处应为防止并发所做的努力
if (notifier == newNotifier) {
notifier.start();
}
}
boolean success = false;
RpcException exception = null;
for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
Pool<Jedis> jedisPool = entry.getValue();
try {
try (Jedis jedis = jedisPool.getResource()) {
// 使用 /dubbo/* 代表是管理服务,其需要做清理过期key的作用
if (service.endsWith(ANY_VALUE)) {
admin = true;
...
} else {
// 使用 keys xxx/* 命令,列举出该服务下所有缓存key, 实际上就是 providers, consumers, configurators, routers
doNotify(jedis, jedis.keys(service + PATH_SEPARATOR + ANY_VALUE), url, Collections.singletonList(listener));
}
success = true;
break; // Just read one server's data
}
} catch (Throwable t) { // Try the next server
exception = new RpcException("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
}
}
if (exception != null) {
if (success) {
logger.warn(exception.getMessage(), exception);
} else {
throw exception;
}
}
}
// 根据列如上得到redis-key信息,做服务信息变更
private void doNotify(Jedis jedis, Collection<String> keys, URL url, Collection<NotifyListener> listeners) {
if (keys == null || keys.isEmpty()
|| listeners == null || listeners.isEmpty()) {
return;
}
long now = System.currentTimeMillis();
List<URL> result = new ArrayList<>();
List<String> categories = Arrays.asList(url.getParameter(CATEGORY_KEY, new String[0]));
String consumerService = url.getServiceInterface();
for (String key : keys) {
if (!ANY_VALUE.equals(consumerService)) {
// 截取出 service
String providerService = toServiceName(key);
if (!providerService.equals(consumerService)) {
continue;
}
}
String category = toCategoryName(key);
// consumers应用只会处理, providers,routers,configurators 的服务, 从而忽略 consumers 下的数据
if (!categories.contains(ANY_VALUE) && !categories.contains(category)) {
continue;
}
List<URL> urls = new ArrayList<>();
// 获取所有hash值
Map<String, String> values = jedis.hgetAll(key);
if (CollectionUtils.isNotEmptyMap(values)) {
for (Map.Entry<String, String> entry : values.entrySet()) {
URL u = URL.valueOf(entry.getKey());
// 判断服务是否过期,过期且存在的服务将不会被利用,但不会做更多处理
if (!u.getParameter(DYNAMIC_KEY, true)
|| Long.parseLong(entry.getValue()) >= now) {
if (UrlUtils.isMatch(url, u)) {
urls.add(u);
}
}
}
}
// 如果没有找到合适的可用服务,则添加一个 empty:// 的地址
if (urls.isEmpty()) {
urls.add(URLBuilder.from(url)
.setProtocol(EMPTY_PROTOCOL)
.setAddress(ANYHOST_VALUE)
.setPath(toServiceName(key))
.addParameter(CATEGORY_KEY, category)
.build());
}
result.addAll(urls);
if (logger.isInfoEnabled()) {
logger.info("redis notify: " + key + " = " + urls);
}
}
if (CollectionUtils.isEmpty(result)) {
return;
}
// 调用父类 FailbackRegistry.notify 方法,与zk调用一致了
// 刷新提供者列表,路由,配置等本地缓存信息
for (NotifyListener listener : listeners) {
notify(url, listener, result);
}
}
private String toServiceName(String categoryPath) {
// 截取root+interfaceName
// 截取 interfaceName
String servicePath = toServicePath(categoryPath);
return servicePath.startsWith(root) ? servicePath.substring(root.length()) : servicePath;
}
private String toServicePath(String categoryPath) {
int i;
// 排除root路径,找到第一个'/', 取出servicePath
if (categoryPath.startsWith(root)) {
i = categoryPath.indexOf(PATH_SEPARATOR, root.length());
} else {
i = categoryPath.indexOf(PATH_SEPARATOR);
}
return i > 0 ? categoryPath.substring(0, i) : categoryPath;
}
// 另外,对于某个服务发生变更时,需要遍历所有consumer, 确认是否需要刷新
// 额,意义嘛,暂是没太明白
private void doNotify(Jedis jedis, String key) {
for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<>(getSubscribed()).entrySet()) {
doNotify(jedis, Collections.singletonList(key), entry.getKey(), new HashSet<>(entry.getValue()));
}
}

  总结:

    1. redis 做初次subscribe时,notify会通过redis-keys 命令获取所有需要的key, 然后依次将其提供者、路由、配置等信息都缓存起来。
    2. 针对每个服务,都会开启相关的订阅线程Notifier处理订阅工作。
    3. 最终的listener处理默认会由 RegistryDirectory 处理。

  接下来,我们来看 Notifier 是如何处理订阅的?

        // org.apache.dubbo.registry.redis.RedisRegistry.Notifier#run
@Override
public void run() {
// 每个订阅线程,死循环处理只是为了避免网络等其他异常情况出现,以便重新尝试连接redis 订阅channel
while (running) {
try {
// 额,这是个优化,我不懂的
if (!isSkip()) {
try {
for (Map.Entry<String, Pool<Jedis>> entry : jedisPools.entrySet()) {
Pool<Jedis> jedisPool = entry.getValue();
try {
if (jedisPool.isClosed()) {
continue;
}
jedis = jedisPool.getResource();
if (!jedis.isConnected()) {
continue;
}
try {
if (service.endsWith(ANY_VALUE)) {
if (first) {
first = false;
Set<String> keys = jedis.keys(service);
if (CollectionUtils.isNotEmpty(keys)) {
for (String s : keys) {
doNotify(jedis, s);
}
}
resetSkip();
}
jedis.psubscribe(new NotifySub(jedisPool), service); // blocking
} else {
if (first) {
// 首次处理,通知RegistryDirectory 按service刷新缓存
first = false;
doNotify(jedis, service);
resetSkip();
}
// 使用 psubscribe channel 命令,阻塞监听channel信息
// 当消息返回时,使用 NotifySub 进行业务处理,实际就是调用 doNotify() 的过程
// 订阅的channel 为: /dubbo/xxx.xx.XxxService/*
jedis.psubscribe(new NotifySub(jedisPool), service + PATH_SEPARATOR + ANY_VALUE); // blocking
}
break;
} finally {
jedis.close();
}
} catch (Throwable t) { // Retry another server
logger.warn("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
// If you only have a single redis, you need to take a rest to avoid overtaking a lot of CPU resources
sleep(reconnectPeriod);
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
// 异常发生后,sleep片刻再重试
sleep(reconnectPeriod);
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
}
}
// org.apache.dubbo.registry.redis.RedisRegistry.NotifySub#onMessage
@Override
public void onMessage(String key, String msg) {
if (logger.isInfoEnabled()) {
logger.info("redis event: " + key + " = " + msg);
}
// 只关注 REGISTER / UNREGISTER, 两个消息
if (msg.equals(REGISTER)
|| msg.equals(UNREGISTER)) {
try {
Jedis jedis = jedisPool.getResource();
try {
// 复用 doNotify
doNotify(jedis, key);
} finally {
jedis.close();
}
} catch (Throwable t) { // TODO Notification failure does not restore mechanism guarantee
logger.error(t.getMessage(), t);
}
}
}
// 最后还是来看下 isSkip() 的小优化吧
// 虽然不懂为什么,但是感觉很厉害的样子
// org.apache.dubbo.registry.redis.RedisRegistry.Notifier#isSkip
private boolean isSkip() {
// connectSkip: 已经跳过连接的总次数, connectSkipped: 当前周期内已跳过连接的次数
// step1. 在connectSkip < 10 情况下,直接用 connectSkipped 与其比较,connectSkipped<connectSkip, 则继续跳过本次,否则不跳过,进入连接逻辑connectSkipped, connectSkip次数增加
// step2. connectSkip >= 10, 不可再用其作为判定跳过次数, 使用一个10-20间的随机值,作为跳过连接次数判定
// step3. 如果本次判定为不跳过,则重置 connectSkipped已连接次数自增
int skip = connectSkip.get(); // Growth of skipping times
if (skip >= 10) { // If the number of skipping times increases by more than 10, take the random number
if (connectRandom == 0) {
connectRandom = ThreadLocalRandom.current().nextInt(10);
}
skip = 10 + connectRandom;
}
if (connectSkipped.getAndIncrement() < skip) { // Check the number of skipping times
return true;
}
connectSkip.incrementAndGet();
connectSkipped.set(0);
connectRandom = 0;
return false;
}

  监听服务就做好一件事就行,调用 psubscribe命令订阅channel, 发生变化时调用 doNotify() 回调listener处理刷新。为避免异常情况下订阅功能仍然成立,使用外部的while循环包裹订阅逻辑重试。

  注意其订阅的redis channel 为 /dubbo/xxx.xx.XxxService/*, 所以相当于其自身的变更也被包含在内了。而是否要处理该事件,则依赖于url中的categorys配置,如消费为:category=providers,configurators,routers, 即它会处理这三种类型的key变更。

9. 一点感想

  dubbo用redis做注册中心,可以看作是一个简单的扩展实现。其核心是基于redis的 pub/sub 能力。

  但和zk比起来,redis功能实现会相对困难些,甚至看起来有些蹩脚(如其redis集群策略需要自行从外部保证同步,这恐怕不会是件容易的事,现有的主从,集群方案都完全无法cover其场景。既要保证任意写,又要保证全同步(数据一致性),呵呵)。它需要单独去维护一些心跳、过期类的事务。过多的服务会导致这类工作更加繁重。

  但这也许不能成为大家拒绝应用的理由,毕竟,按官方说明阿里内部是基于数据库实现的注册中心,自然有其道理。

(事实上,redis版本的注册中心,并非是完全优化的,你完全可以顺手美化下再使用)