* 面试答案为LZ所写,如需转载请注明出处,谢谢。
1.最基本的思路:
将<local_ip>:<task_id>存在某个路径节点里。
刚开始并没有这个节点,当有executor执行操作时,都会询问这个节点。
如果不存在,则创建这个临时节点,并将<local_ip>:<task_id>写进去。
如果存在,则比对<local_ip>:<task_id>是否与这个临时节点中的数据<local_ip>:<task_id>一致,若一致,则允许运行,若不一致,则阻塞。
总结一下,如何识别唯一的一个executor? 通过<local_ip>:<task_id>来识别。那如何进行每个executor之间的互斥?那就是通过写入临时节点上的数据的方式来允许执行。
也就是说,某个指定临时节点上的数据<local_ip>:<task_id>是允许哪个executor运行的唯一标志。
2.方法
核心的三个方法为connect、lock、unlock方法。
1)connect方法
思路:
STEP1:进行ZK连接,如连接不上则重试
STEP2:检查是否存在该临时节点,如不存在则创建,以<local_ip>:<task_id>为数据内容。
public void connect() {
try {
// STEP1:进行ZK连接,如连接不上则重试
zk = new ZooKeeper("node1:2181,node3:2181", 3000, new Watcher() {
// 监控zk状态
@Override
public void process(WatchedEvent event) {
// System.out.println("event:" + event.getType());
}
});
// 连接重试
while (zk.getState() != ZooKeeper.States.CONNECTED) {
Thread.sleep(1000);
}
// STEP2: STEP2:检查是否存在该临时节点,如不存在则创建,以<local_ip>:<task_id>为数据内容。
// <ip>:<taskID> 格式作为识别格式
InetAddress ip = InetAddress.getLocalHost();
lockData = ip.getHostAddress() + ":" + context.getThisTaskId();
// 如果没有节点
if (zk.exists(zk_path, false) == null) {
zk.create(zk_path, lockData.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
} catch (Exception e) {
this.unlock();
e.printStackTrace();
}
}
2).lock方法
思路:
STEP1:获得节点数据,如果节点不存在,则使用connect()方法重新连接创建节点。
STEP2:比对节点数据与本地的<local_ip>:<task_id>,如果一致,则返回方法,继续执行业务内容。如果不一致,则阻塞,每隔一段时间检查一次。
public void lock() {
// 互斥逻辑
String nodeData;
try {
nodeData = new String(zk.getData(zk_path, false, null));
while (!lockData.equals(nodeData) || nodeData==null) {
// 若节点不存在,重新连接
if(nodeData == null){
connect();
}else{
Thread.sleep(10);
}
}
} catch (Exception e) {
}
}
3).unlock方法
思路:
因为这个临时节点是依赖连接存在的,所以unlock方法直接断开连接就可以释放临时节点。
public void unlock() {
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}