zookeeper集群之开源客户端Curator的使用(五)

时间:2021-11-03 21:13:24

package com.ilike.testCurator;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;

/**
* zookeeper的开源客户端curator的使用
* 创建,删除,修改,读取节点, 获取子节点,判断节点是否存在,监听节点变化,监听子节点变化
*
* @author 桑伟东
*
*/
public class TestCurator {
// curator 的操作连接对象
final CuratorFramework client2;
// 创建线程池
ExecutorService es = Executors.newFixedThreadPool(5);

{
    /**
     * 重试策略1(使用Curator自带的重试策略)
     * 第一个参数为基本重试时间,指的是第一次重试的间隔时间为1秒,之后会越来越久,但是最大重试次数为三次 第二个参数为最大重试次数
     */
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    /**
     * 重试策略2(使用Curator自带的重试策略) 第一个参数为共重试的次数 第二个参数为两次重试之间的时间间隔
     */
    RetryPolicy retryPolicy2 = new RetryNTimes(3, 1000);
    /**
     * 重试策略3(使用Curator自带的重试策略) 第一个参数为从开始重试到结束重试不能超过这么长时间 第二个参数为两次重试之间的时间间隔
     */
    RetryPolicy retryPolicy3 = new RetryUntilElapsed(5000, 1000);
    // 第一种连接方式 最后一个参数为连接失败后的重试策略
    CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.111.129:2181", 5000, 5000, retryPolicy);
    // 第二种连接方式
    client2 = CuratorFrameworkFactory.builder().connectString("192.168.111.129:2181").sessionTimeoutMs(5000)
            .connectionTimeoutMs(5000).retryPolicy(retryPolicy3).build();
    client2.start();
}

/**
 * 创建节点
 * 
 * @throws Exception
 */
@Test
public void testCreateNode() throws Exception {
    client2.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/node_20/node_20_1",
            "7899".getBytes());

    Thread.sleep(Integer.MAX_VALUE);
}

/**
 * 删除节点
 * 
 * @throws Exception
 */
@Test
public void testDelNode() throws Exception {
    // 删除单节点
    // client2.delete().withVersion(-1).forPath("/node_20/node_20_1");
    // 删除带有子节点的节点
    client2.delete().guaranteed().deletingChildrenIfNeeded().withVersion(-1).forPath("/node_20");
    Thread.sleep(Integer.MAX_VALUE);

}

/**
 * 获取子节点
 * 
 * @throws Exception
 */
@Test
public void testGetChild() throws Exception {
    // 获取子节点
    List<String> childs = client2.getChildren().forPath("/");
    System.out.println(childs);

}

/**
 * 获取节点数据
 * 
 * @throws Exception
 */
@Test
public void testGetData() throws Exception {
    // 获取节点的数据内容
    byte[] bs = client2.getData().forPath("/node_4");
    System.out.println(new String(bs));
    // 获取节点的数据及其状态信息
    Stat stat = new Stat();
    byte[] bs2 = client2.getData().storingStatIn(stat).forPath("/node_4");
    System.out.println(new String(bs2));
    System.out.println(stat);

}

/**
 * 判断节点是否存在
 * 
 * @throws Exception
 */
@Test
public void testNodeExists() throws Exception {
    // 判断节点是否存在(同步调用),返回zookeeper的stat对象,如果节点不存在,返回null
    // Stat stat=client2.checkExists().forPath("/node_40");
    // System.out.println(stat);

    // 判断节点是否存在(异步调用)
    client2.checkExists().inBackground(new BackgroundCallback() {
        /**
         * 异步调用会增加系统开销,如果在系统中存在大量的异步调用,就会非常耗费系统资源,因此需要创建线程池
         */
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
            // TODO Auto-generated method stub
            // 获取事件的类型
            CuratorEventType type = event.getType();
            // 获取操作的返回码
            int code = event.getResultCode();// 执行成功,得到0,否则就是非0
            // 获取异步调用的上下文
            Object context = event.getContext();
            // 获取触发该事件的节点路径
            String path = event.getPath();
            // 获取子节点的列表
            List<String> childs = event.getChildren();
            // 获取节点的数据
            byte[] bs = event.getData();
            String data = new String(bs);
            StringBuilder sb = new StringBuilder();
            sb.append("type:" + type).append("\n");
            sb.append("code:" + code).append("\n");
            sb.append("context:" + context).append("\n");
            sb.append("path:" + path).append("\n");
            sb.append("childs:" + childs).append("\n");
            sb.append("data:" + data).append("\n");
            System.out.println(sb.toString());
        }
    }, "异步调用判断节点是否存在", es).forPath("/node_4");
    Thread.sleep(Integer.MAX_VALUE);
}

/**
 * 修改节点数据
 * 
 * @throws Exception
 */
@Test
public void testUpdateData() throws Exception {
    Stat stat = new Stat();
    client2.getData().storingStatIn(stat).forPath("/node_4");
    client2.setData().withVersion(stat.getVersion()).forPath("/node_4", "sda89ss".getBytes());
}

/**
 * 监听节点数据的变化
 * 
 * @throws Exception
 */
@Test
public void testNodeListener() throws Exception {
    // 删除单节点
    // client2.delete().withVersion(-1).forPath("/node_20/node_20_1");
    // 删除带有子节点的节点
    client2.delete().guaranteed().deletingChildrenIfNeeded().withVersion(-1).forPath("/node_20");
    Thread.sleep(Integer.MAX_VALUE);
    // 修改节点数据
    Stat stat = new Stat();
    client2.getData().storingStatIn(stat).forPath("/node_4");
    client2.setData().withVersion(stat.getVersion()).forPath("/node_4", "sda89ss".getBytes());
}

/**
 * 监听子节点的变化
 * 
 * @throws Exception
 */
@Test
public void testChildListener() throws Exception {
    // 监听子节点,第三个参数为监听到节点变化时,是否获取子节点内容
    final PathChildrenCache cache = new PathChildrenCache(client2, "/node_4", true);
    // 开启监听
    cache.start();
    // 注册监听器
    cache.getListenable().addListener(new PathChildrenCacheListener() {

        public void childEvent(CuratorFramework clent, PathChildrenCacheEvent event) throws Exception {
            // TODO Auto-generated method stub
            switch (event.getType()) {
            case CHILD_ADDED:
                System.out.println("增加了子节点:" + event.getData());
                break;
            case CHILD_UPDATED:
                System.out.println("子节点的数据内容发生改变:" + event.getData());
                break;
            case CHILD_REMOVED:
                System.out.println("子节点被删除:" + event.getData());
                break;
            default:
                break;
            }

        }
    });

    Thread.sleep(Integer.MAX_VALUE);
}

}