基于ZooKeeper实现队列源码

时间:2022-10-06 11:49:49

实现原理

先进先出队列是最常用的队列,使用Zookeeper实现先进先出队列就是在特定的目录下创建PERSISTENT_EQUENTIAL节点,创建成功时Watcher通知等待的队列,队列删除序列号最小的节点用以消费。此场景下Zookeeper的znode用于消息存储,znode存储的数据就是消息队列中的消息内容,SEQUENTIAL序列号就是消息的编号,按序取出即可。由于创建的节点是持久化的,所以不必担心队列消息的丢失问题。

队列(Queue)

分布式队列是通用的数据结构,为了在 Zookeeper 中实现分布式队列,首先需要指定一个 Znode 节点作为队列节点(queue node), 各个分布式客户端通过调用 create() 函数向队列中放入数据,调用create()时节点路径名带"qn-"结尾,并设置顺序(sequence)节点标志。 由于设置了节点的顺序标志,新的路径名具有以下字符串模式:"_path-to-queue-node_/qn-X",X 是唯一自增号。需要从队列中获取数据/移除数据的客户端首先调用 getChildren() 函数,有数据则获取(获取数据后可以删除也可以不删),没有则在队列节点(queue node)上将 watch 设置为 true,等待触发并处理最小序号的节点(即从序号最小的节点中取数据)。

应用场景

Zookeeper队列不太适合要求高性能的场合,但可以在数据量不大的情况下考虑使用。比如已在项目中使用Zookeeper又需要小规模的队列应用,这时可以使用Zookeeper实现的队列;毕竟引进一个消息中间件会增加系统的复杂性和运维的压力。

详细代码

ZookeeperClient工具类

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package org.massive.common;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
 * Created by Massive on 2016/12/18.
 */
public class ZooKeeperClient {
 private static String connectionString = "localhost:2181";
 private static int sessionTimeout = 10000;
 public static ZooKeeper getInstance() throws IOException, InterruptedException {
 //--------------------------------------------------------------
 // 为避免连接还未完成就执行zookeeper的get/create/exists操作引起的(KeeperErrorCode = ConnectionLoss)
 // 这里等Zookeeper的连接完成才返回实例
 //--------------------------------------------------------------
 final CountDownLatch connectedSignal = new CountDownLatch(1);
 ZooKeeper zk = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
  @Override
  public void process(WatchedEvent event) {
   if (event.getState() == Event.KeeperState.SyncConnected) {
   connectedSignal.countDown();
   } else if (event.getState() == Event.KeeperState.Expired) {
   }
  }
  });
 connectedSignal.await(sessionTimeout, TimeUnit.MILLISECONDS);
 return zk;
 }
 public static int getSessionTimeout() {
 return sessionTimeout;
 }
 public static void setSessionTimeout(int sessionTimeout) {
 ZooKeeperClient.sessionTimeout = sessionTimeout;
 }
}

ZooKeeperQueue

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package org.massive.queue;
import org.apache.commons.lang3.RandomUtils;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.massive.common.ZooKeeperClient;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
/**
 * Created by Allen on 2016/12/22.
 */
public class ZooKeeperQueue {
 private ZooKeeper zk;
 private int sessionTimeout;
 private static byte[] ROOT_QUEUE_DATA = {0x12,0x34};
 private static String QUEUE_ROOT = "/QUEUE";
 private String queueName;
 private String queuePath;
 private Object mutex = new Object();
 public ZooKeeperQueue(String queueName) throws IOException, KeeperException, InterruptedException {
 this.queueName = queueName;
 this.queuePath = QUEUE_ROOT + "/" + queueName;
 this.zk = ZooKeeperClient.getInstance();
 this.sessionTimeout = zk.getSessionTimeout();
 //----------------------------------------------------
 // 确保队列根目录/QUEUE和当前队列的目录的存在
 //----------------------------------------------------
 ensureExists(QUEUE_ROOT);
 ensureExists(queuePath);
 }
 public byte[] consume() throws InterruptedException, KeeperException, UnsupportedEncodingException {
 List<String> nodes = null;
 byte[] returnVal = null;
 Stat stat = null;
 do {
  synchronized (mutex) {
  nodes = zk.getChildren(queuePath, new ProduceWatcher());
  //----------------------------------------------------
  // 如果没有消息节点,等待生产者的通知
  //----------------------------------------------------
  if (nodes == null || nodes.size() == 0) {
   mutex.wait();
  } else {
   SortedSet<String> sortedNode = new TreeSet<String>();
   for (String node : nodes) {
   sortedNode.add(queuePath + "/" + node);
   }
   //----------------------------------------------------
   // 消费队列里序列号最小的消息
   //----------------------------------------------------
   String first = sortedNode.first();
   returnVal = zk.getData(first, false, stat);
   zk.delete(first, -1);
   System.out.print(Thread.currentThread().getName() + " ");
   System.out.print("consume a message from queue:" + first);
   System.out.println(", message data is: " + new String(returnVal,"UTF-8"));
   return returnVal;
  }
  }
 } while (true);
 }
 class ProduceWatcher implements Watcher {
 @Override
 public void process(WatchedEvent event) {
  //----------------------------------------------------
  // 生产一条消息成功后通知一个等待线程
  //----------------------------------------------------
  synchronized (mutex) {
  mutex.notify();
  }
 }
 }
 public void produce(byte[] data) throws KeeperException, InterruptedException, UnsupportedEncodingException {
 //----------------------------------------------------
 // 确保当前队列目录存在
 // example: /QUEUE/queueName
 //----------------------------------------------------
 ensureExists(queuePath);
 String node = zk.create(queuePath + "/", data,
  ZooDefs.Ids.OPEN_ACL_UNSAFE,
  CreateMode.PERSISTENT_SEQUENTIAL);
 System.out.print(Thread.currentThread().getName() + " ");
 System.out.print("produce a message to queue:" + node);
 System.out.println(" , message data is: " + new String(data,"UTF-8"));
 }
 public void ensureExists(String path) {
 try {
  Stat stat = zk.exists(path, false);
  if (stat == null) {
  zk.create(path, ROOT_QUEUE_DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  }
 } catch (KeeperException e) {
  e.printStackTrace();
 } catch (InterruptedException e) {
  e.printStackTrace();
 }
 }
 public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
 String queueName = "test";
 final ZooKeeperQueue queue = new ZooKeeperQueue(queueName);
 for (int i = 0; i < 10; i++) {
  new Thread(new Runnable() {
  @Override
  public void run() {
   try {
   queue.consume();
   System.out.println("--------------------------------------------------------");
   System.out.println();
   } catch (InterruptedException e) {
   e.printStackTrace();
   } catch (KeeperException e) {
   e.printStackTrace();
   } catch (UnsupportedEncodingException e) {
   e.printStackTrace();
   }
  }
  }).start();
 }
 new Thread(new Runnable() {
  @Override
  public void run() {
  for (int i = 0; i < 10; i++) {
   try {
   Thread.sleep(RandomUtils.nextInt(100 * i, 200 * i));
   queue.produce(("massive" + i).getBytes());
   } catch (InterruptedException e) {
   e.printStackTrace();
   } catch (KeeperException e) {
   e.printStackTrace();
   } catch (UnsupportedEncodingException e) {
   e.printStackTrace();
   }
  }
  }
 },"Produce-thread").start();
 }
}

测试

运行main方法,本机器的某次输出结果

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
Produce-thread produce a message to queue:/QUEUE/test/0000000000 , message data is: massive0
Thread-8 consume a message from queue:/QUEUE/test/0000000000, message data is: massive0
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000001 , message data is: massive1
Thread-6 consume a message from queue:/QUEUE/test/0000000001, message data is: massive1
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000002 , message data is: massive2
Thread-3 consume a message from queue:/QUEUE/test/0000000002, message data is: massive2
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000003 , message data is: massive3
Thread-0 consume a message from queue:/QUEUE/test/0000000003, message data is: massive3
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000004 , message data is: massive4
Thread-5 consume a message from queue:/QUEUE/test/0000000004, message data is: massive4
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000005 , message data is: massive5
Thread-2 consume a message from queue:/QUEUE/test/0000000005, message data is: massive5
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000006 , message data is: massive6
Thread-4 consume a message from queue:/QUEUE/test/0000000006, message data is: massive6
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000007 , message data is: massive7
Thread-9 consume a message from queue:/QUEUE/test/0000000007, message data is: massive7
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000008 , message data is: massive8
Thread-7 consume a message from queue:/QUEUE/test/0000000008, message data is: massive8
--------------------------------------------------------
Produce-thread produce a message to queue:/QUEUE/test/0000000009 , message data is: massive9
Thread-1 consume a message from queue:/QUEUE/test/0000000009, message data is: massive9

总结

以上就是本文有关于队列和基于ZooKeeper实现队列源码介绍的全部内容,希望对大家有所帮助。

感谢朋友们对本站的支持!

原文链接:https://www.2cto.com/kf/201612/582914.html