zookeeper动态通知实现

时间:2022-08-30 14:14:21

转载请注明:@ni掌柜

    本文重点围绕ZooKeeper的Watcher,介绍通知的状态类型和事件类型,以及这些事件通知的触发条件。

 

1、浅谈Watcher接口

在ZooKeeper中,接口类Watcher定义了事件通知相关的逻辑,包含了KeeperState和EventType两个枚举类,分别代表通知状态和事件类型。还有一个比较重要的接口方法:

 
 
  1. abstract public void process(WatchedEvent event); 

这个方法用于处理事件通知,每个实现类都应该自己实现合适的处理逻辑。参数WatchedEvent类封装了上面提到的两个枚举类,以及触发事件对应的ZK节点path,当然,这个path不一定每次通知都有,例如会话建立,会话失效或连接断开等通知类型,就不是针对某一个单独path的。

2、如何注册Watcher上面已经提到,Watcher接口已经提供了基本的回调方法用于处理来自服务器的通知。因此,我们只要在合适的地方实现这个接口,并传给服务器即可。下面来看看哪些是合适的地方: A、构造方法
  
  
  1. ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) 
上面这个是ZooKeeper的一个构造方法,与ZK创建连接的时候会用到这个。这里我们重点关注第三个参数:Watcher,很显然在,这个就是一个注册Watcher的地方,传入的参数就是开发者自己Watcher接口实现。需要注意的是,这个地方注册的Watcher实现,会成为当前ZK会话的默认Watcher实现。也就是说,其它地方如果也想注册一个Watcher,那么是可以默认使用这个实现的。具体下面会涉及到。B、API的读写接口中
  
  
  1. public Stat exists(String path, boolean watch)throws KeeperException, InterruptedException 
  2.  
  3. public List<String> getChildren(String path, boolean watch)throws KeeperException,InterruptedException 
  4.  
  5. public byte[] getData(String path,boolean watch,Stat stat)throws KeeperException,InterruptedException 
  6.  
  7. public void register(Watcher watcher) 

 

 

3、通知的状态类型与事件类型

在Watcher接口类中,已经定义了所有的状态类型和事件类型,这里把各个状态和事件类型之间的关系整理一下。

3.1状态:KeeperState.Disconnected(0)

此时客户端处于断开连接状态,和ZK集群都没有建立连接。

3.1.1事件:EventType.None(-1)

触发条件:一般是在与服务器断开连接的时候,客户端会收到这个事件。

 

3.2状态:KeeperState. SyncConnected(3)

3.2.1事件:EventType.None(-1)

触发条件:客户端与服务器成功建立会话之后,会收到这个通知。

3.2.2事件:EventType. NodeCreated (1)

触发条件:所关注的节点被创建。

3.2.3事件:EventType. NodeDeleted (2)

触发条件:所关注的节点被删除。

3.2.4事件:EventType. NodeDataChanged (3)

触发条件:所关注的节点的内容有更新。注意,这个地方说的内容是指数据的版本号dataVersion。因此,即使使用相同的数据内容来更新,还是会收到这个事件通知的。无论如何,调用了更新接口,就一定会更新dataVersion的。

3.2.5事件:EventType. NodeChildrenChanged (4)

触发条件:所关注的节点的子节点有变化。这里说的变化是指子节点的个数和组成,具体到子节点内容的变化是不会通知的。

 

3.3状态 KeeperState. AuthFailed(4)

3.3.1事件:EventType.None(-1)

 

3.4状态 KeeperState. Expired(-112)

3.4.1事件:EventType.None(-1)

 

具体代码如下:

package com.taobao.taokeeper.research.watcher;
import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.PropertyConfigurator;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.Watcher.Event.EventType;import org.apache.zookeeper.Watcher.Event.KeeperState;import org.apache.zookeeper.ZooDefs.Ids;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.data.Stat;import org.slf4j.Logger;import org.slf4j.LoggerFactory;
import common.toolkit.java.util.ObjectUtil;import common.toolkit.java.util.ThreadUtil;
/** * 《ZooKeeper 事件类型详解》 * @author nileader/nileader@gmail.com * */public class AllZooKeeperWatcher implements Watcher{
private static final Logger LOG = LoggerFactory.getLogger( NodeDataChangedEvent.class );AtomicInteger seq = new AtomicInteger();private static final int SESSION_TIMEOUT = 10000;private static final String CONNECTION_STRING = "test.zookeeper.connection_string:2181," + "test.zookeeper.connection_string2:2181," + "test.zookeeper.connection_string3:2181";private static final String ZK_PATH = "/nileader";private static final String CHILDREN_PATH = "/nileader/ch";private static final String LOG_PREFIX_OF_MAIN = "【Main】";
private ZooKeeper zk = null;
private CountDownLatch connectedSemaphore = new CountDownLatch( 1 );
/** * 创建ZK连接 * @param connectString ZK服务器地址列表 * @param sessionTimeout Session超时时间 */public void createConnection( String connectString, int sessionTimeout ) {this.releaseConnection();try {zk = new ZooKeeper( connectString, sessionTimeout,this );LOG.info( LOG_PREFIX_OF_MAIN + "开始连接ZK服务器" );connectedSemaphore.await();} catch ( Exception e ) {}}
/** * 关闭ZK连接 */public void releaseConnection() {if ( !ObjectUtil.isBlank( this.zk ) ) {try {this.zk.close();} catch ( InterruptedException e ) {}}}
/** * 创建节点 * @param path 节点path * @param data 初始数据内容 * @return */public boolean createPath( String path, String data ) {try {this.zk.exists( path, true );LOG.info( LOG_PREFIX_OF_MAIN + "节点创建成功, Path: "+ this.zk.create( path, // data.getBytes(), // Ids.OPEN_ACL_UNSAFE, // CreateMode.PERSISTENT )+ ", content: " + data );} catch ( Exception e ) {}return true;}
/** * 读取指定节点数据内容 * @param path 节点path * @return */public String readData( String path, boolean needWatch ) {try {return new String( this.zk.getData( path, needWatch, null ) );} catch ( Exception e ) {return "";}}
/** * 更新指定节点数据内容 * @param path 节点path * @param data 数据内容 * @return */public boolean writeData( String path, String data ) {try {LOG.info( LOG_PREFIX_OF_MAIN + "更新数据成功,path:" + path + ", stat: " + this.zk.setData( path, data.getBytes(), -1 ) );} catch ( Exception e ) {}return false;}
/** * 删除指定节点 * @param path 节点path */public void deleteNode( String path ) {try {this.zk.delete( path, -1 );LOG.info( LOG_PREFIX_OF_MAIN + "删除节点成功,path:" + path );} catch ( Exception e ) {//TODO}}
/** * 删除指定节点 * @param path 节点path */public Stat exists( String path, boolean needWatch ) {try {return this.zk.exists( path, needWatch );} catch ( Exception e ) {return null;}}
/** * 获取子节点 * @param path 节点path */private List<String> getChildren( String path, boolean needWatch ) {try {return this.zk.getChildren( path, needWatch );} catch ( Exception e ) {return null;}}
public void deleteAllTestPath(){this.deleteNode( CHILDREN_PATH );this.deleteNode( ZK_PATH );}

public static void main( String[] args ) {
PropertyConfigurator.configure("src/main/resources/log4j.properties");
AllZooKeeperWatcher sample = new AllZooKeeperWatcher();sample.createConnection( CONNECTION_STRING, SESSION_TIMEOUT );//清理节点sample.deleteAllTestPath();if ( sample.createPath( ZK_PATH, System.currentTimeMillis()+"" ) ) {ThreadUtil.sleep( 3000 );//读取数据sample.readData( ZK_PATH, true );//读取子节点sample.getChildren( ZK_PATH, true );
//更新数据sample.writeData( ZK_PATH, System.currentTimeMillis()+"" );ThreadUtil.sleep( 3000 );//创建子节点sample.createPath( CHILDREN_PATH, System.currentTimeMillis()+"" );}ThreadUtil.sleep( 3000 );//清理节点sample.deleteAllTestPath();ThreadUtil.sleep( 3000 );sample.releaseConnection();}

/** * 收到来自Server的Watcher通知后的处理。 */@Overridepublic void process( WatchedEvent event ) {
ThreadUtil.sleep( 200 );if ( ObjectUtil.isBlank( event ) ) {return;}// 连接状态KeeperState keeperState = event.getState();// 事件类型EventType eventType = event.getType();// 受影响的pathString path = event.getPath();String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】";
LOG.info( logPrefix + "收到Watcher通知" );LOG.info( logPrefix + "连接状态:\t" + keeperState.toString() );LOG.info( logPrefix + "事件类型:\t" + eventType.toString() );
if ( KeeperState.SyncConnected == keeperState ) {// 成功连接上ZK服务器if ( EventType.None == eventType ) {LOG.info( logPrefix + "成功连接上ZK服务器" );connectedSemaphore.countDown();} else if ( EventType.NodeCreated == eventType ) {LOG.info( logPrefix + "节点创建" );this.exists( path, true );} else if ( EventType.NodeDataChanged == eventType ) {LOG.info( logPrefix + "节点数据更新" );LOG.info( logPrefix + "数据内容: " + this.readData( ZK_PATH, true ) );} else if ( EventType.NodeChildrenChanged == eventType ) {LOG.info( logPrefix + "子节点变更" );LOG.info( logPrefix + "子节点列表:" + this.getChildren( ZK_PATH, true ) );} else if ( EventType.NodeDeleted == eventType ) {LOG.info( logPrefix + "节点 " + path + " 被删除" );}
} else if ( KeeperState.Disconnected == keeperState ) {LOG.info( logPrefix + "与ZK服务器断开连接" );} else if ( KeeperState.AuthFailed == keeperState ) {LOG.info( logPrefix + "权限检查失败" );} else if ( KeeperState.Expired == keeperState ) {LOG.info( logPrefix + "会话失效" );}
LOG.info( "--------------------------------------------" );
}
}



 

4、程序实例

这里有一个可以用来演示“触发事件通知”和“如何处理这些事件通知”的程序AllZooKeeperWatcher.java。

在这里:https://github.com/alibaba/taokeeper/blob/master/taokeeper-research/src/main/java/com/taobao/taokeeper/research/watcher/AllZooKeeperWatcher.java

运行结果如下:

 
 
  1. 2012-08-05 06:35:23,779 - 【Main】开始连接ZK服务器 
  2. 2012-08-05 06:35:24,196 - 【Watcher-1】收到Watcher通知 
  3. 2012-08-05 06:35:24,196 - 【Watcher-1】连接状态:  SyncConnected 
  4. 2012-08-05 06:35:24,196 - 【Watcher-1】事件类型:  None 
  5. 2012-08-05 06:35:24,196 - 【Watcher-1】成功连接上ZK服务器 
  6. 2012-08-05 06:35:24,196 - -------------------------------------------- 
  7. 2012-08-05 06:35:24,354 - 【Main】节点创建成功, Path: /nileader, content: 1353337464279 
  8. 2012-08-05 06:35:24,554 - 【Watcher-2】收到Watcher通知 
  9. 2012-08-05 06:35:24,554 - 【Watcher-2】连接状态:  SyncConnected 
  10. 2012-08-05 06:35:24,554 - 【Watcher-2】事件类型:  NodeCreated 
  11. 2012-08-05 06:35:24,554 - 【Watcher-2】节点创建 
  12. 2012-08-05 06:35:24,582 - -------------------------------------------- 
  13. 2012-08-05 06:35:27,471 - 【Main】更新数据成功,path:/nileader,  
  14.  
  15. 2012-08-05 06:35:27,667 - 【Watcher-3】收到Watcher通知 
  16. 2012-08-05 06:35:27,667 - 【Watcher-3】连接状态:  SyncConnected 
  17. 2012-08-05 06:35:27,667 - 【Watcher-3】事件类型:  NodeDataChanged 
  18. 2012-08-05 06:35:27,667 - 【Watcher-3】节点数据更新 
  19. 2012-08-05 06:35:27,696 - 【Watcher-3】数据内容: 1353337467434 
  20. 2012-08-05 06:35:27,696 - -------------------------------------------- 
  21. 2012-08-05 06:35:30,534 - 【Main】节点创建成功, Path: /nileader/ch, content: 1353337470471 
  22. 2012-08-05 06:35:30,728 - 【Watcher-4】收到Watcher通知 
  23. 2012-08-05 06:35:30,728 - 【Watcher-4】连接状态:  SyncConnected 
  24. 2012-08-05 06:35:30,728 - 【Watcher-4】事件类型:  NodeCreated 
  25. 2012-08-05 06:35:30,728 - 【Watcher-4】节点创建 
  26. 2012-08-05 06:35:30,758 - -------------------------------------------- 
  27. 2012-08-05 06:35:30,958 - 【Watcher-5】收到Watcher通知 
  28. 2012-08-05 06:35:30,958 - 【Watcher-5】连接状态:  SyncConnected 
  29. 2012-08-05 06:35:30,958 - 【Watcher-5】事件类型:  NodeChildrenChanged 
  30. 2012-08-05 06:35:30,958 - 【Watcher-5】子节点变更 
  31. 2012-08-05 06:35:30,993 - 【Watcher-5】子节点列表:[ch] 
  32. 2012-08-05 06:35:30,993 - -------------------------------------------- 
  33. 2012-08-05 06:35:33,618 - 【Main】删除节点成功,path:/nileader/ch 
  34. 2012-08-05 06:35:33,756 - 【Main】删除节点成功,path:/nileader 
  35. 2012-08-05 06:35:33,817 - 【Watcher-6】收到Watcher通知 
  36. 2012-08-05 06:35:33,817 - 【Watcher-6】连接状态:  SyncConnected 
  37. 2012-08-05 06:35:33,817 - 【Watcher-6】事件类型:  NodeDeleted 
  38. 2012-08-05 06:35:33,817 - 【Watcher-6】节点 /nileader/ch 被删除 
  39. 2012-08-05 06:35:33,817 - -------------------------------------------- 
  40. 2012-08-05 06:35:34,017 - 【Watcher-7】收到Watcher通知 
  41. 2012-08-05 06:35:34,017 - 【Watcher-7】连接状态:  SyncConnected 
  42. 2012-08-05 06:35:34,017 - 【Watcher-7】事件类型:  NodeChildrenChanged 
  43. 2012-08-05 06:35:34,017 - 【Watcher-7】子节点变更 
  44. 2012-08-05 06:35:34,109 - 【Watcher-7】子节点列表:null 
  45. 2012-08-05 06:35:34,109 - -------------------------------------------- 
  46. 2012-08-05 06:35:34,309 - 【Watcher-8】收到Watcher通知 
  47. 2012-08-05 06:35:34,309 - 【Watcher-8】连接状态:  SyncConnected 
  48. 2012-08-05 06:35:34,309 - 【Watcher-8】事件类型:  NodeDeleted 
  49. 2012-08-05 06:35:34,309 - 【Watcher-8】节点 /nileader 被删除 
  50. 2012-08-05 06:35:34,309 - --------------------------------------------