最近准备在项目中引入分布式锁,故而研究基于zookeeper的curator框架。
网上资料不多,自己研究其源码发现,这个框架已经帮我做了很多现成的实现。
下面介绍下锁的实现:
通过源码中LockingExample例子作为切入(推荐多利用现有资源,最快切入),为减小篇幅,代码仅保留关键部分。
curator已经为我们准备好了锁的实现 ----InterProcessMutex,基于zookeeper跨jvm的公平互斥锁实现.
-----------------------------------------------------------------------------------------------------------------------------
1.看下锁的定义,将线程对象和锁对象(线程、路径、锁的数量)关联
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap(); private static class LockData
{
final Thread owningThread;
final String lockPath;
final AtomicInteger lockCount = new AtomicInteger(1); private LockData(Thread owningThread, String lockPath)
{
this.owningThread = owningThread;
this.lockPath = lockPath;
}
}
2.获得锁:
- 如果线程已经有锁,则增加锁的数量,返回 String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
- 否则尝试获取锁,获得则加入线程持有锁的MAP,否则返回未获得锁。
private final LockInternals internals; @Override
public boolean acquire(long time, TimeUnit unit) throws Exception
{
return internalLock(time, unit);
}
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
Thread currentThread = Thread.currentThread(); LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
lockData.lockCount.incrementAndGet();
return true;
} String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
3.可以看到,逻辑是基于LockInternals,来看一下他是怎么做到的
- 通过dirver在锁目录下创建EPHEMERAL_SEQUENTIAL节点
- 循环尝试获取,基于监听,wait,notifyAll
- 获取锁目录下子节点的有序集合
- 通过dirver尝试得到PredicateResults(含有是否得到锁及需要监视的目录)
- 更新监听信息,开始下一轮尝试
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{while ( !isDone )
{
isDone = true;
try
{
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
}
}
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
boolean haveTheLock = false;
boolean doDelete = false;
try
{
if ( revocable.get() != null )
{
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
} while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
List<String> children = getSortedChildren();
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() )
{
haveTheLock = true;
}
else
{
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
System.out.println("ourpath : " + ourPath + " : " + previousSequencePath); synchronized(this)
{
try
{
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{
doDelete = true; // timed out - delete our node
break;
}
wait(millisToWait);
}
else
{
wait();
}
}
catch ( KeeperException.NoNodeException e )
{
}
}
}
}
}
catch ( Exception e )
{
doDelete = true;
throw e;
}
finally
{
if ( doDelete )
{
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
4.分析下前文中的driver实现,即StandardLockInternalsDriver 关键代码
- 创建锁是基于目录下创建的EPHEMERAL_SEQUENTIAL节点,即与客户端生命周期相同,并且名字后自动加创建的序列号
- 得到PredicateResult,如果当前节点为最小节点,则得到锁,getsTheLock为true,否则得到该序列的前一个节点,设为pathToWatch,监控之
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
{
int ourIndex = children.indexOf(sequenceNodeName);
validateOurIndex(sequenceNodeName, ourIndex);
boolean getsTheLock = ourIndex < maxLeases;
String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
return new PredicateResults(pathToWatch, getsTheLock);
} @Override
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
String ourPath;
if ( lockNodeBytes != null )
{
ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
}
else
{
ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
return ourPath;
}
先发布,后续有心得再续。