Curator框架基础使用

时间:2023-03-09 03:35:43
Curator框架基础使用

为了更好的实现java操作zookeeper服务器。后来出现Curator框架,非常强大,目前已经是Apache的*项目,有丰富的操作,,例如:session超时重连,主从选举。分布式计数器,分布式锁,等等适用于各种复杂的zookeeper场景api封装

maven依赖

<dependency>

  <groupld>org.apache.curator</groupld>

  <artifactld>curator-framewprk</artifactld>

  <version>2.4.2</version>

</dependency>

Curator 框架中使用链式变成风格,易读性更强,使用工程方法创建连接对象。

1.使用CuratorFrameworkFactory 的两个静态工厂方法(参数不同) 来实现:

参数1:connectString ,连接串

参数2:retryPolicy 重试连接策略,有四种实现,分别为:ExponentialBackoffRetry, RetryNtimes,RetryOneTimes ,RetryUntilElapsed

参数3:sessionTimeoutMs 会话超时时间,默认为60 000ms

参数4:connectionTimeoutMs 连接超时时间,默认为15 000ms

注意,对于RetryPolicy策略通过一个接口来让用户自定义实现

2.创建节点create方法,可选链式:

creatingParentslfNeeded、withMode、forPath、withACL等

3、删除节点delete方法,可选链式项

deletingClildrenlfNeeded、guaranteed、withVersion 、forPath等

4、读取和修改数据getData、setData方法

5、异步绑定回调方法,比如创建节点时绑定一个回调函数,该回调函数可以输出服务器状态码以及服务器事件类型,还可以加入一个线程池进行优化操作,

6、读取子节点方法getChildren

7、判断节点是否存在方法checkExists

 package bjsxt.curator.base;

 import java.util.List;

 import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.Stat; public class CuratorBase { /** zookeeper地址 */
static final String CONNECT_ADDR = "192.168.2.2:2181";
/** session超时时间 */
static final int SESSION_OUTTIME = 5000;// ms public static void main(String[] args) throws Exception { // 1 重试策略:初试时间为1s 重试10次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
// 2 通过工厂创建连接
CuratorFramework cf = CuratorFrameworkFactory.builder()
.connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_OUTTIME)
.retryPolicy(retryPolicy)
// .namespace("super")
.build();
// 3 开启连接
cf.start(); System.out.println(States.CONNECTED);
System.out.println(cf.getState()); // 新加、删除 // 4 建立节点 指定节点类型(不加withMode默认为持久类型节点)、路径、数据内容
/**
* cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
* .forPath("/super/c1", "c1内容".getBytes());
*/
// 5 删除节点
/**
* cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super")
* ;
*/
// 读取、修改 // 创建节点
/**
* cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
* .forPath("/super/c1", "c1内容".getBytes());
* cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
* .forPath("/super/c2", "c2内容".getBytes());
*/
// 读取节点
/**
* String ret1 = new String(cf.getData().forPath("/super/c2"));
* System.out.println(ret1); // 修改节点 cf.setData().forPath("/super/c2",
* "修改c2内容".getBytes()); String ret2 = new
* String(cf.getData().forPath("/super/c2")); System.out.println(ret2);
*/ // 绑定回调函数 /**
* ExecutorService pool = Executors.newCachedThreadPool();
* cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
* .inBackground(new BackgroundCallback() {
*
* @Override public void processResult(CuratorFramework cf, CuratorEvent
* ce) throws Exception { System.out.println("code:" +
* ce.getResultCode()); System.out.println("type:" +
* ce.getType()); System.out.println("线程为:" +
* Thread.currentThread().getName()); } },
* pool).forPath("/super/c3", "c3内容".getBytes());
* Thread.sleep(Integer.MAX_VALUE);
*/ // 读取子节点getChildren方法 和 判断节点是否存在checkExists方法 List<String> list = cf.getChildren().forPath("/super");
for (String p : list) {
System.out.println(p);
} Stat stat = cf.checkExists().forPath("/super/c3");
System.out.println(stat); Thread.sleep(2000);
cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super"); // cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super"); }
}