Java操作zookeeper

时间:2022-09-07 18:46:30

Java操作zookeeper总共有三种方式:

1.原生的Java API

2.zkclient

3.curator

第一种实现代码:

pom.xml

<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.8</version>
</dependency>

示例的java代码如下:

package zook;

import java.io.IOException;
import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs; public class App { public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
String connStr = "192.168.126.128:2181";
CountDownLatch countDown = new CountDownLatch(1); Watcher watcher=new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected) {
System.err.println("eventType:"+event.getType());
if(event.getType()==Event.EventType.None){
countDown.countDown();
}else if(event.getType()==Event.EventType.NodeCreated){
System.out.println("listen:节点创建");
}else if(event.getType()==Event.EventType.NodeChildrenChanged){
System.out.println("listen:子节点修改");
}
}
}
}; ZooKeeper zookeeper = new ZooKeeper(connStr, 5000,watcher );
countDown.await(); //注册监听,每次都要重新注册,否则监听不到
zookeeper.exists("/top/jinyong", watcher); // 创建节点
String result = zookeeper.create("/top/jinyong", "一生一世".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(result); Thread.sleep(10); // 获取节点
byte[] bs = zookeeper.getData("/top/jinyong", true, null);
result = new String(bs);
System.out.println("创建节点后的数据是:" + result); // 修改节点
zookeeper.setData("/top/jinyong", "I love you".getBytes(), -1); Thread.sleep(10); bs = zookeeper.getData("/top/jinyong", true, null);
result = new String(bs);
System.out.println("修改节点后的数据是:" + result); // 删除节点
zookeeper.delete("/top/jinyong", -1);
System.out.println("节点删除成功");
} }

说明:

1.会话连接是异步的,需要自己去处理。此处用的CountDownLatch

2.Watch需要重复注册,不然就不能生效,比如开始的zookeeper.exists("/top/jinyong", watcher);就是为了注册监听

3.开发的复杂性还是比较高的

4.不支持多节点删除和创建。需要自己去递归。后面有一个关于递归的示例。

第二种实现:

pom.xml

<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>

示例的Java代码如下:

package zook;

import java.util.List;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher.Event.KeeperState; public class Client { public static void main(String[] args) throws InterruptedException {
String connStr = "192.168.126.128:2181";
ZkClient zk = new ZkClient(connStr); // 注册【数据】事件
zk.subscribeDataChanges("/top/zhuzhu", new IZkDataListener() { @Override
public void handleDataDeleted(String arg0) throws Exception {
System.err.println("数据删除:" + arg0); } @Override
public void handleDataChange(String arg0, Object arg1) throws Exception {
System.err.println("数据修改:" + arg0 + "------" + arg1); }
}); zk.subscribeChildChanges("/top", new IZkChildListener() { @Override
public void handleChildChange(String arg0, List<String> arg1) throws Exception {
System.err.println("子节点发生变化:" + arg0);
arg1.forEach(f -> {
System.out.println("content:" + f);
});
}
}); List<String> list = zk.getChildren("/");
list.forEach(e -> {
System.out.println(e);
}); String res = zk.create("/top/zhuzhu", "I love you", CreateMode.PERSISTENT);
System.out.println("创建节点/top/zhuzhu成功:" + res); zk.writeData("/top/zhuzhu", "forerver");
System.out.println("修改节点/top/zhuzhu数据成功"); res = zk.readData("/top/zhuzhu");
System.out.println("节点数据:" + res); Thread.sleep(1000); zk.delete("/top/zhuzhu");
System.out.println("删除节点/top/zhuzhu成功"); Thread.sleep(1000); System.out.println("------------------------------------------------"); for (int i = 0; i < 10; i++) {
zk.create("/top/zhuzhu", "I love you", CreateMode.PERSISTENT);
Thread.sleep(1000);
zk.delete("/top/zhuzhu");
Thread.sleep(1000);
} } }

说明:

1.subscribe开头的为注册监听的一些方法

2.addAuthInfo和setAcl为权限相关控制

3.普通使用这种方式还是值得推荐的

第三种实现:

pom.xml

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.11.0</version>
</dependency>

示例的Java代码如下:

package zook;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat; public class Curator { public static void main(String[] args) throws Exception {
String connStr = "192.168.23.24:2181";
CuratorFramework cur=CuratorFrameworkFactory.builder()
.connectString(connStr)
.connectionTimeoutMs(5000)
.retryPolicy(new ExponentialBackoffRetry(1000,3))
.build();
cur.start();//连接 //创建监听
PathChildrenCache cache=new PathChildrenCache(cur,"/top",true);
cache.start();
cache.rebuild();
cache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework framwork, PathChildrenCacheEvent event) throws Exception {
System.err.println("节点发生变化:"+event.getType());
}
}); Stat stat=cur.checkExists().forPath("/top/zhuzhu");
if(stat!=null){
System.out.println("【/top/zhuzhu】节点存在,直接删除");
cur.delete().forPath("/top/zhuzhu");
}
cur.delete().forPath("/top/zhuzhu"); System.in.read(); System.out.println("准备创建【/top/zhuzhu】");
cur.create().withMode(CreateMode.PERSISTENT)
.forPath("/top/zhuzhu", "love forever".getBytes());
System.out.println("节点【/top/zhuzhu】创建成功"); Thread.sleep(1000); byte[] bs=cur.getData().forPath("/top/zhuzhu");
System.out.println("数据:"+new String(bs)); Thread.sleep(1000); cur.delete().forPath("/top/zhuzhu"); Thread.sleep(1000); } /**
* 三种watcher来做节点的监听
* pathcache 监视一个路径下子节点的创建、删除、节点数据更新
* NodeCache 监视一个节点的创建、更新、删除
* TreeCache pathcaceh+nodecache 的合体(监视路径下的创建、更新、删除事件),
* 缓存路径下的所有子节点的数据
*/ public static void main1(String[] args) throws Exception {
String connStr = "192.168.23.24:2181";
CuratorFramework curatorFramework=CuratorFrameworkFactory.builder()
.connectString(connStr)
.connectionTimeoutMs(5000)
.retryPolicy(new ExponentialBackoffRetry(1000,3))
.build();
curatorFramework.start(); /**
* 节点变化NodeCache
*/
/* NodeCache cache=new NodeCache(curatorFramework,"/curator",false);
cache.start(true); cache.getListenable().addListener(()-> System.out.println("节点数据发生变化,变化后的结果" +
":"+new String(cache.getCurrentData().getData()))); curatorFramework.setData().forPath("/curator","菲菲".getBytes());*/ /**
* PatchChildrenCache
*/ PathChildrenCache cache=new PathChildrenCache(curatorFramework,"/event",true);
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
cache.rebuild();
// Normal / BUILD_INITIAL_CACHE /POST_INITIALIZED_EVENT cache.getListenable().addListener((curatorFramework1,pathChildrenCacheEvent)->{
switch (pathChildrenCacheEvent.getType()){
case CHILD_ADDED:
System.out.println("增加子节点");
break;
case CHILD_REMOVED:
System.out.println("删除子节点");
break;
case CHILD_UPDATED:
System.out.println("更新子节点");
break;
default:break;
}
}); // curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/event","event".getBytes());
// TimeUnit.SECONDS.sleep(1);
// System.out.println("1");
// curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/event/event1","1".getBytes());
// TimeUnit.SECONDS.sleep(1);
// System.out.println("2");
//
// curatorFramework.setData().forPath("/event/event1","222".getBytes());
// TimeUnit.SECONDS.sleep(1);
// System.out.println("3"); curatorFramework.delete().forPath("/event/event1");
System.out.println("4"); System.in.read(); } }

说明:

1.支持事务

2.支持Flush写法

3.开始测试多次程序启动就执行删除节点,而监听的结果确实新增,后来加了cache.rebuild();代码就没问题了。跟源码,在cache.start()里面有一个构造函数也是调用了rebuild方法的。

4.功能还是比较强大的。高级功能都会用到这种方式

最后贴一个原生API的递归操作方式:

package zook;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs; public class ZookManager {
ZooKeeper zookeeper = null; public ZookManager(String connStr) throws IOException, InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
zookeeper = new ZooKeeper(connStr, 5000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == EventType.None) {
if (event.getState() == KeeperState.SyncConnected) {
latch.countDown();
} else {
System.out.println("连接失败.");
latch.countDown();
}
}
} });
latch.await();
} /** 创建节点,不存在父节点将新增,如果节点已经存在将抛出异常 **/
public String create(String path, String val) throws KeeperException, InterruptedException {
if (!checkPath(path)) {
return "";
} String p = getParentPath(path);
cycleCreate(p); String url = zookeeper.create(path, val.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
return url;
} /** 设置节点的数据,如果节点不存在将新增该节点 **/
public Stat setData(String path, String val) throws KeeperException, InterruptedException {
if (!checkPath(path)) {
return null;
} cycleCreate(path);
return zookeeper.setData(path, val.getBytes(), -1);
} /** 删除节点,如果存在子节点将递归删除
* @throws InterruptedException
* @throws KeeperException **/
public void delete(String path) throws KeeperException, InterruptedException {
if (!checkPath(path)) {
return;
} List<String> chidren = zookeeper.getChildren(path, false);
for (String p : chidren) {
delete(path + "/" + p);
}
zookeeper.delete(path, -1);
} private void cycleCreate(String path) throws KeeperException, InterruptedException {
Stat stat = zookeeper.exists(path, null);
if (stat == null) {
String p = getParentPath(path);
cycleCreate(p);// 递归
// 创建
zookeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} /**
* 检查目录是否正确
* @param path
* @return
*/
private boolean checkPath(String path) {
if (!path.startsWith("/")) {
System.err.println("路径必须以/开头:" + path);
return false;
}
if (path.endsWith("/")) {
System.err.println("路径不能以/结尾:" + path);
return false;
}
if (path.contains("//")) {
System.err.println("路径格式不对,存在连续的/:" + path);
return false;
}
if (path.equals("/")) {
System.err.println("路径格式不对,只有一个/:" + path);
return false;
}
return true;
} /**
* 获得父级目录
* @param path /root/abc
* @return
*/
private String getParentPath(String path) {
int index = path.lastIndexOf("/");
return path.substring(0, index);
} public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
ZookManager zoo = new ZookManager("192.168.23.24:2181");
zoo.setData("/top/enjoy/abc", "abc");
zoo.setData("/top/enjoy/bbb", "bbb");
zoo.setData("/top/enjoy/ccc", "ccc");
System.out.println("成功新增");
zoo.delete("/top/enjoy");
System.out.println("成功删除");
}
}

Java操作zookeeper的更多相关文章

  1. java 操作zookeeper

    java 操作zookeeper(一) 首先要使用java操作zookeeper,zookeeper的javaclient 使我们更轻松的去对zookeeper进行各种操作,我们引入zookeeper ...

  2. zookeeper(三):java操作zookeeper

    引入jar包 首先要使用java操作zookeeper,zookeeper的javaclient 使我们更轻松的去对zookeeper进行各种操作,我们引入zookeeper-3.4.5.jar 和 ...

  3. Java 使用ZkClient操作Zookeeper

    目录 ZkClient介绍 导入jar包依赖 简单使用样例 ZkClient介绍 因为Zookeeper API比较复杂,使用并不方便,所以出现了ZkClient,ZkClient对Zookeeper ...

  4. Java代码操作zookeeper

    .personSunflowerP { background: rgba(51, 153, 0, 0.66); border-bottom: 1px solid rgba(0, 102, 0, 1); ...

  5. Zookeeper--java操作zookeeper

    如果是使用java操作zookeeper,zookeeper的javaclient 使我们更轻松的去对zookeeper进行各种操作,我们引入zookeeper-3.4.5.jar 和 zkclien ...

  6. Zookeeper入门&lpar;七&rpar;之Java连接Zookeeper

    Java操作Zookeeper很简单,但是前提要把包导对. 关于Zookeeper的Linux环境搭建可以参考我的这篇博客:Linux环境下Zookeeper安装 下面进入正题: 一.导入依赖 &lt ...

  7. Java curator操作zookeeper获取kafka

    Java curator操作zookeeper获取kafka Curator是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端相比,Curator的抽象层次更 ...

  8. Hbase深入学习&lpar;六&rpar; Java操作HBase

    Hbase深入学习(六) ―― Java操作HBase 本文讲述如何用hbase shell命令和hbase java api对hbase服务器进行操作. 先看以下读取一行记录hbase是如何进行工作 ...

  9. Java操作hbase总结

    用过以后,总得写个总结,不然,就忘喽. 一.寻找操作的jar包. java操作hbase,首先要考虑到使用hbase的jar包. 因为咱装的是CDH5,比较方便,使用SecureCRT工具,远程连接到 ...

随机推荐

  1. Java 集合 - HashSet

    一.源码解析 public class HashSet<E> extends AbstractSet<E> implements Set<E>, Cloneable ...

  2. 关于window&period;onload

    window.onload是当文档加载完成后执行. <script>之间的代码会在代码加载到此处执行.function内的代码是调用时才执行. 但window.onload有个坏处,它非要 ...

  3. 布局 position

    position : 设置定位方式 跟『定位』相关的有一些属性,最重要的一个是『position』,它主要是设置『定位方式』. 而定位方式最重要的是设置『参照物』. 配合 position 使用的有这 ...

  4. CSS中样式覆盖优先顺序

    原文地址:http://www.3lian.com/edu/2014/09-25/168393.html 层叠优先级是: 浏览器缺省 < 外部样式表 < 内部样式表 < 内联样式 其 ...

  5. android AppWidgwtProvider学习

    实现AppWidgwtProvider: onUpdate() //在达到制定的更新时间之后或者当用户向桌面添加   App Widget时会调用该方法. onDeleted() //当App Wid ...

  6. Android Intent实现页面跳转

      Intent可以来协助完成Android各个组件之间的通信   1:startActivity(intent);     //直接启动                /*              ...

  7. scala map

    map 返回元组 下面是如果不存在key 报错 a.(2) 下面两种方式 不同的写法. 如果存在 Some(2) 不存在 None a get 2 a.get(2) getOrElse 如果不存在 回 ...

  8. AsyncTask onPreExecute方法用于在执行后台任务前做一些UI操作

    1.实例化 TableListsTask task = new TableListsTask(ServerIP,"ALL", MenuActivity.this);   //第三参 ...

  9. let和const

    ES6新增了let取代var,let主要有以下特点. 1 只在代码块内有效,代码块外不能使用let声明的变量.let很适合声明循环体的变量. 它可以解决一些闭包的问题存在的问题比如: var a = ...

  10. react16实战总结

    实战总结 react实战基础 遇到的一些坑 li里要带key值否则会警告,这个问题在vue中也存在, 考虑到虚拟DOM中diff,所以不要用index作为key值,而要用item. 2.immutab ...