zkClient的使用

时间:2023-03-09 20:23:34
zkClient的使用

ZKClient是由DataMeer的工程师StefanGroschupf和Peter Voss 一起开发的,在源生API接口基础上进行了封装,简化了ZK的复杂性。

1、 创建客户端方法:ZKClient(Arguments)

参数1:zkServers zookeeper服务器的地址,用“,”分割

参数2:session Timeout超时会话,为毫秒,默认为30 000ms

参数3:connection Timeout 连接超时会话

参数4:IZKConnection接口实现类

参数5:zkSerializer自定义序列化实现、

2、创建节点方法:

create、createEphemeral 、createEphemeralSequential、

createPersistent、createPersistentSequential

参数1: path 路径

参数2: data 数据内容 可以为null

参数3:mode 节点类型,为一个枚举类型,4种形式

参数4: acl策略

参数5: callback回调函数

参数6:context 上下文对象

参数7: createParents 是否创建父节点

3、删除节点方法delete deleteRecursive

参数1:path路径

参数2:callback 回调函数

参数3:context 上下文对象

4、读取子节点数据方法:getChildren

参数1:path路径

5、读取节点数据方法:readData

参数1:path路径

参数2:returnNUllLfPathNotExists(避免为空节点抛出异常,直接返回null)

参数3:节点状态

6、更新数据方法writeData

参数1:path 路径

参数2:data数据信息

参数3:version版本号

 package bjsxt.zkclient.base;

 import java.util.List;
import java.util.concurrent.TimeUnit; import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection; public class ZkClientBase { /** zookeeper地址 */
static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181";
/** session超时时间 */
static final int SESSION_OUTTIME = 5000;//ms public static void main(String[] args) throws Exception {
ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), 5000);
//1. create and delete方法
zkc.createEphemeral("/temp");
zkc.createPersistent("/super/c1", true);
Thread.sleep(10000);
zkc.delete("/temp");
zkc.deleteRecursive("/super"); //2. 设置path和data 并且读取子节点和每个节点的内容
// zkc.createPersistent("/super", "1234");
// zkc.createPersistent("/super/c1", "c1内容");
// zkc.createPersistent("/super/c2", "c2内容");
// List<String> list = zkc.getChildren("/super");
// for(String p : list){
// System.out.println(p);
// String rp = "/super/" + p;
// String data = zkc.readData(rp);
// System.out.println("节点为:" + rp + ",内容为: " + data);
// } //3. 更新和判断节点是否存在
// zkc.writeData("/super/c1", "新内容");
// System.out.println(zkc.readData("/super/c1"));
// System.out.println(zkc.exists("/super/c1")); //4.递归删除/super内容
// zkc.deleteRecursive("/super");
}
}

7、检测节点是否存在方法 exists

参数1:path

我们发现。上述ZkClient里面并没有类似的watcher,watch参数,这也就是说我们开发人员无需关心反复注册Watcher的问题,ZkClient给我们提供了一套监听方式,我们可以使用监听节点的方式进行操作,剔除了繁琐的反复wather操作,简化了代码复杂程度、

8、subscribeChildChanges方法

参数1:path路径

参数2:实现了lZkChildListener接口的类(如:实例化lZkChildListener类)

只需要重写其handleChildChanges(String parentPath,List<String> currentChilds)方法,其中参数parentPath 为所监听节点全路径,

currentChilds为最新的子节点列表(相对路径)

lZkChildListener事件说明针对于下面三个事件触发:

新增子节点,减少子节点,删除节点

lZkChildListener有以下特点:

1、客户端可以对一个不存在的节点进行变更的监听

2、一旦客户端对一个节点注册了子节点列表变更监听后,那么当前节点的子节点列表发送变更的时候,服务器端都会通知客户端,并将最新的子节点列表发给客户端

3、该节点本身创建或删除也会通知到客户端

4、这个监听一直存在,不是单次监听,比元素API简单

 package bjsxt.zkclient.watcher;

 import java.util.List;

 import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection; public class ZkClientWatcher1 { /** zookeeper地址 */
static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181";
/** session超时时间 */
static final int SESSION_OUTTIME = 5000;//ms public static void main(String[] args) throws Exception {
ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), 5000); //对父节点添加监听子节点变化。
zkc.subscribeChildChanges("/super", new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println("parentPath: " + parentPath);
System.out.println("currentChilds: " + currentChilds);
}
}); Thread.sleep(3000); zkc.createPersistent("/super");
Thread.sleep(1000); zkc.createPersistent("/super" + "/" + "c1", "c1内容");
Thread.sleep(1000); zkc.createPersistent("/super" + "/" + "c2", "c2内容");
Thread.sleep(1000); zkc.delete("/super/c2");
Thread.sleep(1000); zkc.deleteRecursive("/super");
Thread.sleep(Integer.MAX_VALUE); }
}
package bjsxt.zkclient.watcher;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection; public class ZkClientWatcher2 { /** zookeeper地址 */
static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181";
/** session超时时间 */
static final int SESSION_OUTTIME = 5000;//ms public static void main(String[] args) throws Exception {
ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), 5000); zkc.createPersistent("/super", "1234"); //对父节点添加监听子节点变化。
zkc.subscribeDataChanges("/super", new IZkDataListener() {
@Override
public void handleDataDeleted(String path) throws Exception {
System.out.println("删除的节点为:" + path);
} @Override
public void handleDataChange(String path, Object data) throws Exception {
System.out.println("变更的节点为:" + path + ", 变更内容为:" + data);
}
}); Thread.sleep(3000);
zkc.writeData("/super", "456", -1);
Thread.sleep(1000); zkc.delete("/super");
Thread.sleep(Integer.MAX_VALUE); }
}