curator 实现分布式一致性锁

时间:2023-02-18 23:23:50

最近准备在项目中引入分布式锁,故而研究基于zookeeper的curator框架。

网上资料不多,自己研究其源码发现,这个框架已经帮我做了很多现成的实现。

下面介绍下锁的实现:

通过源码中LockingExample例子作为切入(推荐多利用现有资源,最快切入),为减小篇幅,代码仅保留关键部分。

curator已经为我们准备好了锁的实现 ----InterProcessMutex,基于zookeeper跨jvm的公平互斥锁实现.

-----------------------------------------------------------------------------------------------------------------------------

1.看下锁的定义,将线程对象和锁对象(线程、路径、锁的数量)关联

    private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();

    private static class LockData
{
final Thread owningThread;
final String lockPath;
final AtomicInteger lockCount = new AtomicInteger(1); private LockData(Thread owningThread, String lockPath)
{
this.owningThread = owningThread;
this.lockPath = lockPath;
}
}

2.获得锁:

  • 如果线程已经有锁,则增加锁的数量,返回  String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
  • 否则尝试获取锁,获得则加入线程持有锁的MAP,否则返回未获得锁。
    private final LockInternals internals;    

    @Override
public boolean acquire(long time, TimeUnit unit) throws Exception
{
return internalLock(time, unit);
}
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
Thread currentThread = Thread.currentThread(); LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
lockData.lockCount.incrementAndGet();
return true;
} String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}

3.可以看到,逻辑是基于LockInternals,来看一下他是怎么做到的

  • 通过dirver在锁目录下创建EPHEMERAL_SEQUENTIAL节点
  • 循环尝试获取,基于监听,wait,notifyAll
  1. 获取锁目录下子节点的有序集合
  2. 通过dirver尝试得到PredicateResults(含有是否得到锁及需要监视的目录)
  3. 更新监听信息,开始下一轮尝试
    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{while ( !isDone )
{
isDone = true;
try
{
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
}
}
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
boolean haveTheLock = false;
boolean doDelete = false;
try
{
if ( revocable.get() != null )
{
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
} while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
List<String> children = getSortedChildren();
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() )
{
haveTheLock = true;
}
else
{
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
System.out.println("ourpath : " + ourPath + " : " + previousSequencePath); synchronized(this)
{
try
{
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{
doDelete = true; // timed out - delete our node
break;
}
wait(millisToWait);
}
else
{
wait();
}
}
catch ( KeeperException.NoNodeException e )
{
}
}
}
}
}
catch ( Exception e )
{
doDelete = true;
throw e;
}
finally
{
if ( doDelete )
{
deleteOurPath(ourPath);
}
}
return haveTheLock;

4.分析下前文中的driver实现,即StandardLockInternalsDriver 关键代码

  • 创建锁是基于目录下创建的EPHEMERAL_SEQUENTIAL节点,即与客户端生命周期相同,并且名字后自动加创建的序列号
  • 得到PredicateResult,如果当前节点为最小节点,则得到锁,getsTheLock为true,否则得到该序列的前一个节点,设为pathToWatch,监控之
    @Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
{
int ourIndex = children.indexOf(sequenceNodeName);
validateOurIndex(sequenceNodeName, ourIndex);
boolean getsTheLock = ourIndex < maxLeases;
String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
return new PredicateResults(pathToWatch, getsTheLock);
} @Override
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
String ourPath;
if ( lockNodeBytes != null )
{
ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
}
else
{
ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
return ourPath;
}

先发布,后续有心得再续。

curator 实现分布式一致性锁的更多相关文章

  1. Curator Zookeeper分布式锁

    Curator Zookeeper分布式锁 pom.xml中添加如下配置 <!-- https://mvnrepository.com/artifact/org.apache.curator/c ...

  2. 《从Paxos到ZooKeeper 分布式一致性原理与实践》读书笔记

    一.分布式架构 1.分布式特点 分布性 对等性.分布式系统中的所有计算机节点都是对等的 并发性.多个节点并发的操作一些共享的资源 缺乏全局时钟.节点之间通过消息传递进行通信和协调,因为缺乏全局时钟,很 ...

  3. 我读《从Paxos到zookeeper分布式一致性原理与实践》

    从年后拿到这本书开始阅读,到准备系统分析师考试之前,终于读完了一遍,对Zookeeper有了一个全面的认识,整本书从理论到应用再到细节的阐述,内容安排从逻辑性和实用性上都是很优秀的,对全面认识Zook ...

  4. 分布式一致性算法--Paxos

    Paxos算法是莱斯利·兰伯特(Leslie Lamport)1990年提出的一种基于消息传递的一致性算法.Paxos算法解决的问题是一个分布式系统如何就某个值(决议)达成一致.在工程实践意义上来说, ...

  5. 《从Paxos到ZooKeeper分布式一致性原理与实践》学习笔记

    第一章 分布式架构 1.1 从集中式到分布式 集中式的特点: 部署结构简单(因为基于底层性能卓越的大型主机,不需考虑对服务多个节点的部署,也就不用考虑多个节点之间分布式协调问题) 分布式系统是一个硬件 ...

  6. 分布式一致性算法2PC和3PC

    为了解决分布式一致性问题,产生了不少经典的分布式一致性算法,本文将介绍其中的2PC和3PC.2PC即Two-Phase Commit,译为二阶段提交协议.3PC即Three-Phase Commit, ...

  7. 分布式一致性算法——paxos

    一.什么是paxos算法 Paxos 算法是分布式一致性算法用来解决一个分布式系统如何就某个值(决议)达成一致的问题. 人们在理解paxos算法是会遇到一些困境,那么接下来,我们带着以下几个问题来学习 ...

  8. 基于Zookeeper实现的分布式互斥锁 - InterProcessMutex

    Curator是ZooKeeper的一个客户端框架,其中封装了分布式互斥锁的实现,最为常用的是InterProcessMutex,本文将对其进行代码剖析 简介 InterProcessMutex基于Z ...

  9. 搞懂分布式技术2:分布式一致性协议与Paxos,Raft算法

    搞懂分布式技术2:分布式一致性协议与Paxos,Raft算法 2PC 由于BASE理论需要在一致性和可用性方面做出权衡,因此涌现了很多关于一致性的算法和协议.其中比较著名的有二阶提交协议(2 Phas ...

随机推荐

  1. c&num; 读取mck码

    private List<string> GetMac() { List<string> macs = new List<string>(); try { var ...

  2. 一例完整的websocket实现群聊demo

    前言 业余我都会花一些时间在tcp.http和websocket等领域的学习,现在觉得有点收获,所以把一个基于websocket的群聊功能的例子提供给大家玩玩.当然这是一个很完整的例子,包括webso ...

  3. 九度OJ 1505 两个链表的第一个公共结点 【数据结构】

    题目地址:http://ac.jobdu.com/problem.php?pid=1505 题目描述: 输入两个链表,找出它们的第一个公共结点. 输入: 输入可能包含多个测试样例. 对于每个测试案例, ...

  4. 利用netstat和tasklist查看PC的端口占用情况

    经常,我们在启动应用的时候发现系统需要的端口被别的程序占用,如何知道谁占有了我们需要的端口? 1.Windows平台在windows命令行窗口下执行: E:\oracle\ora92\bin>n ...

  5. 初码-Azure系列-如何在控制面板中选择中文版操作系统

    之前在文章<初码-Azure系列-记一次从阿里云到Azure的迁移和部署>中说到,默认的Windows Server 2016操作系统是英文版,后来摸索出中文版的方法,如下:

  6. HTTP请求的header头解析

    Request Headers: 下图是我访问一个URL:http://www.hzau.edu.cn的一个header,根据具体实例来分析一下各部分的功能及其作用. Accept 作用: 浏览器端可 ...

  7. Android为TV端助力之解决setOnItemSelectedListener一进来就自动执行一次的问题

    我们经常会遇到listview或者其他view设置setOnItemSelectedListener监听时,一加载界面,setOnItemSelectedListener监听就会自动执行一遍,导致你第 ...

  8. MT【47】求一道分式的最值

    评:技巧性很大,需要敏锐的洞察力通过柯西不等式把分母变成一样.请记住这个变形$$(a+b+ab+1)=(a+1)(b+1)\le\sqrt{(a^2+1)(b^2+1)}$$

  9. Total Difference String

    Total Difference Strings 给一个string列表,判断有多少个不同的string,返回个数相同的定义:字符串长度相等并从左到右,或从右往左是同样的字符 abc 和 cba 为视 ...

  10. 【C&plus;&plus;】STL之队列queue

    1.头文件 # include<queue> 2.成员函数 empty() 当队列为空时,返回true size() 返回队列内元素个数 front() 返回队首元素 back() 返回队 ...