zookeeper开发

时间:2023-03-09 19:53:13
zookeeper开发

环境
  虚拟机:VMware 10
  Linux版本:CentOS-6.5-x86_64
  客户端:Xshell4
  FTP:Xftp4
  jdk8
  zookeeper-3.4.11

ZK客户端操作命令:

#登录ZK客户端
[root@PCS102 bin]# ./zkCli.sh
Connecting to localhost:
Welcome to ZooKeeper!
JLine support is enabled WATCHER:: WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:(CONNECTED) ] #查看客户端有哪些命令:
[zk: localhost:(CONNECTED) ] help
ZooKeeper -server host:port cmd args
stat path [watch]
set path data [version]
ls path [watch]
delquota [-n|-b] path
ls2 path [watch]
setAcl path acl
setquota -n|-b val path
history
redo cmdno
printwatches on|off
delete path [version]
sync path
listquota path
rmr path
get path [watch]
create [-s] [-e] path data acl
addauth scheme auth
quit
getAcl path
close
connect host:port
#创建节点
[zk: localhost:(CONNECTED) ] create /wjy hello
Created /wjy
#列出根目录下所有节点
[zk: localhost:(CONNECTED) ] ls /
[wjy,zookeeper]
#获取/wjy节点的值
[zk: localhost:(CONNECTED) ] get /wjy
hello
cZxid = 0x411320000000d
ctime = Mon Mar :: CST
mZxid = 0x411320000000d
mtime = Mon Mar :: CST
pZxid = 0x411320000000d
cversion =
dataVersion =
aclVersion =
ephemeralOwner = 0x0
dataLength =
numChildren =
#更新/wjy节点的值
[zk: localhost:(CONNECTED) ] set /wjy nihao
cZxid = 0x411320000000d
ctime = Mon Mar :: CST
mZxid = 0x411320000000e
mtime = Mon Mar :: CST
pZxid = 0x411320000000d
cversion =
dataVersion =
aclVersion =
ephemeralOwner = 0x0
dataLength =
numChildren =
[zk: localhost:(CONNECTED) ] 分别代表的时间戳对应为:
cZxid
    对应为该节点的创建时间(Create) mZxid
    对应该节点的最近一次修改的时间(Mofify)
    与其子节点无关 pZxid
    这个节点就和子节点有关啦!
    是与 该节点的子节点(或该节点)的最近一次 创建 / 删除 的时间戳对应
    注:只与 本节点 / 该节点的子节点,有关;与孙子节点无关。

java操作示例:

package testZK;

import java.io.IOException;
import java.util.List; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; public class ZookeeperTest { /**session过期时间*/
private static final int SESSION_TIMEOUT = 30000; public static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperTest.class); /**定义监听类*/
private Watcher watcher = new Watcher() { public void process(WatchedEvent event) {
LOGGER.info("process : " + event.getType());
}
}; private ZooKeeper zooKeeper; /**
* 连接ZK集群并启用监听对象watcher
* @throws IOException
*/
@Before
public void connect() throws IOException {
// indicate : all servers
zooKeeper = new ZooKeeper("192.168.133.19:2181,192.168.133.20:2181,192.168.133.21:2181",
SESSION_TIMEOUT,
watcher);
} /**
* 关闭
*/
@After
public void close() {
try {
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
} /**
* 创建 znode
* 1.CreateMode
* PERSISTENT
* PERSISTENT_SEQUENTIAL 序列化的持久节点
* EPHEMERAL 临时节点 session过期 就会消失
* EPHEMERAL_SEQUENTIAL 序列化的临时节点
* Access Control List: 访问控制列表
* https://baike.baidu.com/item/ACL/362453?fr=aladdin
* OPEN_ACL_UNSAFE: ANYONE CAN VISIT
* <br>------------------------------<br>
*/
@Test
public void testCreate() {
String result = null;
try {
result = zooKeeper.create("/zk002", //路径
"zk002data-e".getBytes(), //赋值 注意放的是字节
Ids.OPEN_ACL_UNSAFE, //访问的安全模式
CreateMode.EPHEMERAL); //节点类型 PERSISTENT 持久化节点 EPHEMERAL 临时的节点 Thread.sleep(30000);
} catch (Exception e) {
LOGGER.error(e.getMessage());
Assert.fail();
}
LOGGER.info("create result : {}", result);
} /**
* 删除
*/
@Test
public void testDelete() {
try
{
zooKeeper.delete("/zk001", -1);
}
catch (Exception e)
{
LOGGER.error(e.getMessage());
Assert.fail();
}
} /**
* 获取值
*/
@Test
public void testGetData() {
String result = null;
try
{
byte[] bytes = zooKeeper.getData("/sxt", null, null);
result = new String(bytes);
}
catch (Exception e)
{
LOGGER.error(e.getMessage());
Assert.fail();
}
LOGGER.info("getdata result--------- : {}", result);
} @Test
public void testGetData01() throws Exception {
String result = null;
try {
byte[] bytes = zooKeeper.getData("/zk001", null, null);
result = new String(bytes);
} catch (Exception e) {
LOGGER.error(e.getMessage());
Assert.fail();
}
LOGGER.info("getdata result--1------ : {}", result); Thread.sleep(30000); byte[] bytes;
try
{
bytes = zooKeeper.getData("/zk001", null, null);
result = new String(bytes);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
LOGGER.info("getdata result--2----- : {}", result); } /**
* 注册监听事件
*/
@Test
public void testGetDataWatch() {
String result = null;
try {
System.out.println("get:");
byte[] bytes = zooKeeper.getData("/zk001", new Watcher() {
public void process(WatchedEvent event) {
LOGGER.info("testGetDataWatch watch : {}", event.getType());
System.out.println("watcher ok");
}
}, null);
result = new String(bytes);
} catch (Exception e) {
LOGGER.error(e.getMessage());
Assert.fail();
}
LOGGER.info("getdata result------------------------------------------ : {}", result); // Watcher会单独起一个线程来处理 仅注册监听1次
try {
System.out.println("set1:");
zooKeeper.setData("/zk001", "testSetDataWAWWW".getBytes(), -1);
System.out.println("set2:");
zooKeeper.setData("/zk001", "testSetDataWAWWW".getBytes(), -1);
} catch (Exception e) {
LOGGER.error(e.getMessage());
Assert.fail();
}
System.out.println("over");
} /**
* 判断是否存在某个节点
*/
@Test
public void testExists() {
Stat stat = null;
try{
stat = zooKeeper.exists("/zk001", false);
} catch (Exception e) {
LOGGER.error(e.getMessage());
Assert.fail();
}
Assert.assertNotNull(stat);
LOGGER.info("exists result : {}", stat.getCzxid());
} /**
* 赋值
*/
@Test
public void testSetData() {
Stat stat = null;
try {
stat = zooKeeper.setData("/zk001", "testSetData".getBytes(), -1);
} catch (Exception e) {
LOGGER.error(e.getMessage());
Assert.fail();
}
Assert.assertNotNull(stat);
LOGGER.info("exists result : {}", stat.getVersion());
} /**
*
*/
@Test
public void testExistsWatch1() {
Stat stat = null;
try {
stat = zooKeeper.exists("/zk001", true);
} catch (Exception e) {
LOGGER.error(e.getMessage());
Assert.fail();
}
Assert.assertNotNull(stat); try {
zooKeeper.delete("/zk001", -1);
} catch (Exception e) {
e.printStackTrace();
}
} /**
* 校验节点是否存在
*/
@Test
public void testExistsWatch2() {
Stat stat = null;
try {
stat = zooKeeper.exists("/zk002", new Watcher() {
@Override
public void process(WatchedEvent event) {
LOGGER.info("testExistsWatch2 watch : {}", event.getType());
}
});
} catch (Exception e) {
LOGGER.error(e.getMessage());
Assert.fail();
}
Assert.assertNotNull(stat); // 赋值
try {
zooKeeper.setData("/zk002", "testExistsWatch2".getBytes(), -1);
} catch (Exception e) {
e.printStackTrace();
} // 删除
try {
zooKeeper.delete("/zk002", -1);
} catch (Exception e) {
e.printStackTrace();
}
} /**
* 获取子节点
*/
@Test
public void testGetChild() {
try {
zooKeeper.create("/zk/001", "001".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zooKeeper.create("/zk/002", "002".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); List<String> list = zooKeeper.getChildren("/zk", true);
for (String node : list) {
LOGGER.info("fffffff {}", node);
}
} catch (Exception e) {
LOGGER.error(e.getMessage());
Assert.fail();
}
}
}

代码连接:

https://download.****.net/download/cac2020/11002142