ZooKeeper如何完成分布式锁?

时间:2023-03-09 06:38:22
ZooKeeper如何完成分布式锁?

* 面试答案为LZ所写,如需转载请注明出处,谢谢。

1.最基本的思路:

  将<local_ip>:<task_id>存在某个路径节点里。

  刚开始并没有这个节点,当有executor执行操作时,都会询问这个节点。

  如果不存在,则创建这个临时节点,并将<local_ip>:<task_id>写进去。

  如果存在,则比对<local_ip>:<task_id>是否与这个临时节点中的数据<local_ip>:<task_id>一致,若一致,则允许运行,若不一致,则阻塞。

总结一下,如何识别唯一的一个executor? 通过<local_ip>:<task_id>来识别。那如何进行每个executor之间的互斥?那就是通过写入临时节点上的数据的方式来允许执行。

也就是说,某个指定临时节点上的数据<local_ip>:<task_id>是允许哪个executor运行的唯一标志。

2.方法

核心的三个方法为connect、lock、unlock方法。

1)connect方法

思路:

STEP1:进行ZK连接,如连接不上则重试

STEP2:检查是否存在该临时节点,如不存在则创建,以<local_ip>:<task_id>为数据内容。

public void connect() {
try {
       // STEP1:进行ZK连接,如连接不上则重试
zk = new ZooKeeper("node1:2181,node3:2181", 3000, new Watcher() {
// 监控zk状态
@Override
public void process(WatchedEvent event) {
// System.out.println("event:" + event.getType());
}
});
// 连接重试
while (zk.getState() != ZooKeeper.States.CONNECTED) {
Thread.sleep(1000);
}
       // STEP2: STEP2:检查是否存在该临时节点,如不存在则创建,以<local_ip>:<task_id>为数据内容。
// <ip>:<taskID> 格式作为识别格式
InetAddress ip = InetAddress.getLocalHost();
lockData = ip.getHostAddress() + ":" + context.getThisTaskId();
// 如果没有节点
if (zk.exists(zk_path, false) == null) {
zk.create(zk_path, lockData.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
} catch (Exception e) {
       this.unlock();
e.printStackTrace();
}
}

2).lock方法

思路:

STEP1:获得节点数据,如果节点不存在,则使用connect()方法重新连接创建节点。

STEP2:比对节点数据与本地的<local_ip>:<task_id>,如果一致,则返回方法,继续执行业务内容。如果不一致,则阻塞,每隔一段时间检查一次。

public void lock() {
// 互斥逻辑
String nodeData;
try {
nodeData = new String(zk.getData(zk_path, false, null));
while (!lockData.equals(nodeData) || nodeData==null) {
// 若节点不存在,重新连接
          if(nodeData == null){
connect();
}else{
            Thread.sleep(10);
}
}
} catch (Exception e) {
}
}

3).unlock方法

思路:

因为这个临时节点是依赖连接存在的,所以unlock方法直接断开连接就可以释放临时节点。

    public void unlock() {
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}