zookeeper(四)核心watch和watcher

时间:2022-08-30 14:23:05

zookeeper有watch事件,是一次性触发的,当watch监视的数据发生变化时,通知设置了该watch的client,即watcher。

同样,其watcher是监听数据发送了某些变化,那就一定会有对应的事件类型,和状态类型。

事件类型:(znode节点相关的)

EventType.NodeCreated

EventType.NodeDataChanged

EventType.NodeChildrenChanged

EventType.NodeDeleted

状态类型:(是跟客户端实例相关的)

KeeperState.Disconnected

KeeperState.SyncConnected

KeeperState.AuthFailed

KeeperState.Expired

 

wather的特性:一次性,客户端串行执行,轻量。

一次性:对于ZK的watcher,你只需要记住一点:zookeeper有watch事件,是一次性触发的,当watch监视的数据发生变化时,通知设置了该watch的client,即watcher,由于zookeeper的监控都是一次性的,所以每次都必须监控。

客户端串行执行:客户端watcher回调的过程是一个串行同步的过程,这为我们保证了顺序,同时需要开发人员注意一点,千万不用因为一个watcher的处理逻辑影响了整个客户端的watcher回调。

轻量:WatcherEvent是Zookeeper整个Watcher通知机制的最小通知单元。整个单元结构只包含三部分:通知状态,事件类型和节点路径。也就是说Watcher通知非常的简单,只会告诉客户端发生了事件而不会告知其具体内容,需要客户自己去进行获取,而不会直接提供具体的数据内容。

 

我们通过一个示例,详细学习下Watcher的概念和其目的。Watcher示例:

【ZookeeperWatcher】

 

  1 package bhz.zookeeper.watcher;
2
3 import java.util.List;
4 import java.util.concurrent.CountDownLatch;
5 import java.util.concurrent.atomic.AtomicInteger;
6
7 import org.apache.zookeeper.CreateMode;
8 import org.apache.zookeeper.WatchedEvent;
9 import org.apache.zookeeper.Watcher;
10 import org.apache.zookeeper.Watcher.Event.EventType;
11 import org.apache.zookeeper.Watcher.Event.KeeperState;
12 import org.apache.zookeeper.ZooDefs.Ids;
13 import org.apache.zookeeper.ZooKeeper;
14 import org.apache.zookeeper.data.Stat;
15
16 /**
17 * Zookeeper Wathcher
18 * 本类就是一个Watcher类(实现了org.apache.zookeeper.Watcher类)
19 * @author(alienware)
20 * @since 2015-6-14
21 */
22 public class ZooKeeperWatcherYuCong implements Watcher {
23
24 /** 定义原子变量 */
25 AtomicInteger seq = new AtomicInteger();
26 /** 定义session失效时间 */
27 private static final int SESSION_TIMEOUT = 10000;
28 /** zookeeper服务器地址 */
29 private static final String CONNECTION_ADDR = "127.0.0.1:2181";
30 /** zk父路径设置 */
31 private static final String PARENT_PATH = "/p";
32 /** zk子路径设置 */
33 private static final String CHILDREN_PATH = "/p/c";
34 /** 进入标识 */
35 private static final String LOG_PREFIX_OF_MAIN = "【Main】";
36 /** zk变量 */
37 private ZooKeeper zk = null;
38 /**用于等待zookeeper连接建立之后 通知阻塞程序继续向下执行 */
39 private CountDownLatch connectedSemaphore = new CountDownLatch(1);
40
41 /**
42 * 创建ZK连接
43 * @param connectAddr ZK服务器地址列表
44 * @param sessionTimeout Session超时时间
45 */
46 public void createConnection(String connectAddr, int sessionTimeout) {
47 this.releaseConnection();
48 try {
49 //this表示把当前对象进行传递到其中去(也就是在主函数里实例化的new ZooKeeperWatcher()实例对象)
50 zk = new ZooKeeper(connectAddr, sessionTimeout, this);
51 System.out.println(LOG_PREFIX_OF_MAIN + "开始连接ZK服务器");
52 connectedSemaphore.await();
53 } catch (Exception e) {
54 e.printStackTrace();
55 }
56 }
57
58 /**
59 * 关闭ZK连接
60 */
61 public void releaseConnection() {
62 if (this.zk != null) {
63 try {
64 this.zk.close();
65 } catch (InterruptedException e) {
66 e.printStackTrace();
67 }
68 }
69 }
70
71 /**
72 * 创建节点
73 * @param path 节点路径
74 * @param data 数据内容
75 * @return
76 */
77 public boolean createPath(String path, String data, boolean needWatch) {
78 try {
79 //设置监控(由于zookeeper的监控都是一次性的所以 每次必须设置监控)
80 this.zk.exists(path, needWatch);
81 System.out.println(LOG_PREFIX_OF_MAIN + "节点创建成功, Path: " +
82 this.zk.create( /**路径*/
83 path,
84 /**数据*/
85 data.getBytes(),
86 /**所有可见*/
87 Ids.OPEN_ACL_UNSAFE,
88 /**永久存储*/
89 CreateMode.PERSISTENT ) +
90 ", content: " + data);
91 } catch (Exception e) {
92 e.printStackTrace();
93 return false;
94 }
95 return true;
96 }
97
98 /**
99 * 读取指定节点数据内容
100 * @param path 节点路径
101 * @return
102 */
103 public String readData(String path, boolean needWatch) {
104 try {
105 System.out.println("读取数据操作...");
106 return new String(this.zk.getData(path, needWatch, null));
107 } catch (Exception e) {
108 e.printStackTrace();
109 return "";
110 }
111 }
112
113 /**
114 * 更新指定节点数据内容
115 * @param path 节点路径
116 * @param data 数据内容
117 * @return
118 */
119 public boolean writeData(String path, String data) {
120 try {
121 System.out.println(LOG_PREFIX_OF_MAIN + "更新数据成功,path:" + path + ", stat: " +
122 this.zk.setData(path, data.getBytes(), -1));
123 } catch (Exception e) {
124 e.printStackTrace();
125 return false;
126 }
127 return true;
128 }
129
130 /**
131 * 删除指定节点
132 *
133 * @param path
134 * 节点path
135 */
136 public void deleteNode(String path) {
137 try {
138 this.zk.delete(path, -1);
139 System.out.println(LOG_PREFIX_OF_MAIN + "删除节点成功,path:" + path);
140 } catch (Exception e) {
141 e.printStackTrace();
142 }
143 }
144
145 /**
146 * 判断指定节点是否存在
147 * @param path 节点路径
148 */
149 public Stat exists(String path, boolean needWatch) {
150 try {
151 return this.zk.exists(path, needWatch);
152 } catch (Exception e) {
153 e.printStackTrace();
154 return null;
155 }
156 }
157
158 /**
159 * 获取子节点
160 * @param path 节点路径
161 */
162 private List<String> getChildren(String path, boolean needWatch) {
163 try {
164 System.out.println("读取子节点操作...");
165 return this.zk.getChildren(path, needWatch);
166 } catch (Exception e) {
167 e.printStackTrace();
168 return null;
169 }
170 }
171
172 /**
173 * 删除所有节点
174 */
175 public void deleteAllTestPath(boolean needWatch) {
176 if(this.exists(CHILDREN_PATH, needWatch) != null){
177 this.deleteNode(CHILDREN_PATH);
178 }
179 if(this.exists(PARENT_PATH, needWatch) != null){
180 this.deleteNode(PARENT_PATH);
181 }
182 }
183
184 /**
185 * 收到来自Server的Watcher通知后的处理。
186 */
187 @Override
188 public void process(WatchedEvent event) {
189
190 System.out.println("进入 process 。。。。。event = " + event);
191
192 try {
193 Thread.sleep(200);
194 } catch (InterruptedException e) {
195 e.printStackTrace();
196 }
197
198 if (event == null) {
199 return;
200 }
201
202 // 连接状态
203 KeeperState keeperState = event.getState();
204 // 事件类型
205 EventType eventType = event.getType();
206 // 受影响的path
207 String path = event.getPath();
208 //原子对象seq 记录进入process的次数
209 String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】";
210
211 System.out.println(logPrefix + "收到Watcher通知");
212 System.out.println(logPrefix + "连接状态:\t" + keeperState.toString());
213 System.out.println(logPrefix + "事件类型:\t" + eventType.toString());
214
215 if (KeeperState.SyncConnected == keeperState) {
216 // 成功连接上ZK服务器
217 if (EventType.None == eventType) {
218 System.out.println(logPrefix + "成功连接上ZK服务器");
219 connectedSemaphore.countDown();
220 }
221 //创建节点
222 else if (EventType.NodeCreated == eventType) {
223 System.out.println(logPrefix + "节点创建");
224 try {
225 Thread.sleep(100);
226 } catch (InterruptedException e) {
227 e.printStackTrace();
228 }
229 }
230 //更新节点
231 else if (EventType.NodeDataChanged == eventType) {
232 System.out.println(logPrefix + "节点数据更新");
233 try {
234 Thread.sleep(100);
235 } catch (InterruptedException e) {
236 e.printStackTrace();
237 }
238 }
239 //更新子节点
240 else if (EventType.NodeChildrenChanged == eventType) {
241 System.out.println(logPrefix + "子节点变更");
242 try {
243 Thread.sleep(3000);
244 } catch (InterruptedException e) {
245 e.printStackTrace();
246 }
247 }
248 //删除节点
249 else if (EventType.NodeDeleted == eventType) {
250 System.out.println(logPrefix + "节点 " + path + " 被删除");
251 }
252 else {
253 System.out.println(logPrefix + "其他事件:" + eventType);
254 };
255 }
256 else if (KeeperState.Disconnected == keeperState) {
257 System.out.println(logPrefix + "与ZK服务器断开连接");
258 }
259 else if (KeeperState.AuthFailed == keeperState) {
260 System.out.println(logPrefix + "权限检查失败");
261 }
262 else if (KeeperState.Expired == keeperState) {
263 System.out.println(logPrefix + "会话失效");
264 }
265 else {
266 System.out.println(logPrefix + "其他状态:" + keeperState);
267 };
268
269 System.out.println("--------------------------------------------");
270
271 }
272
273 /**
274 * <B>方法名称:</B>测试zookeeper监控<BR>
275 * <B>概要说明:</B>主要测试watch功能<BR>
276 * @param args
277 * @throws Exception
278 */
279 public static void main(String[] args) throws Exception {
280
281 //建立watcher //当前客户端可以称为一个watcher 观察者角色
282 ZooKeeperWatcherYuCong zkWatch = new ZooKeeperWatcherYuCong();
283 //创建连接
284 zkWatch.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT);
285 System.out.println(zkWatch.zk.toString());
286
287 Thread.sleep(1000);
288
289 // 清理节点
290 zkWatch.deleteAllTestPath(false);
291
292 //-----------------第一步: 创建父节点 /p ------------------------//
293 if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "", false)) {
294
295 Thread.sleep(1000);
296
297 //-----------------第二步: 读取节点 /p 和 读取/p节点下的子节点(getChildren)的区别 --------------//
298 // 读取数据
299 zkWatch.readData(PARENT_PATH, false);
300
301 // 读取子节点(监控childNodeChange事件)
302 zkWatch.getChildren(PARENT_PATH, false);
303
304 // 更新数据
305 zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + "");
306 Thread.sleep(1000);
307
308 // 创建子节点
309 zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + "", false);
310
311
312 //-----------------第三步: 建立子节点的触发 --------------//
313 // zkWatch.createPath(CHILDREN_PATH + "/c1", System.currentTimeMillis() + "", true);
314 // zkWatch.createPath(CHILDREN_PATH + "/c1/c2", System.currentTimeMillis() + "", true);
315
316 //-----------------第四步: 更新子节点数据的触发 --------------//
317 //在进行修改之前,我们需要watch一下这个节点:
318 Thread.sleep(1000);
319 zkWatch.readData(CHILDREN_PATH, true);
320 zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + "");
321
322 }
323
324 Thread.sleep(10000);
325 // 清理节点
326 zkWatch.deleteAllTestPath(false);
327
328
329 Thread.sleep(10000);
330 zkWatch.releaseConnection();
331
332 }
333
334 }