分布式锁之 Apache Curator InterProcessReadWriteLock
在上一篇文章 分布式锁之 Apache Curator InterProcessMutex 中介绍了基于 ZooKeeper 实现的互斥锁。除此之外,还可以实现读写锁。这篇文章就来简要介绍一下 InterProcessReadWriteLock
的实现原理。
老规矩,先看看类的注释:
/**
* <p>
* A re-entrant read/write mutex that works across JVMs. Uses Zookeeper to hold the lock. All processes
* in all JVMs that use the same lock path will achieve an inter-process critical section. Further, this mutex is
* "fair" - each user will get the mutex in the order requested (from ZK's point of view).
* </p>
*
* <p>
* A read write lock maintains a pair of associated locks, one for read-only operations and one
* for writing. The read lock may be held simultaneously by multiple reader processes, so long as
* there are no writers. The write lock is exclusive.
* </p>
*
* <p>
* <b>Reentrancy</b><br>
* This lock allows both readers and writers to reacquire read or write locks in the style of a
* re-entrant lock. Non-re-entrant readers are not allowed until all write locks held by the
* writing thread/process have been released. Additionally, a writer can acquire the read lock, but not
* vice-versa. If a reader tries to acquire the write lock it will never succeed.<br><br>
*
* <b>Lock downgrading</b><br>
* Re-entrancy also allows downgrading from the write lock to a read lock, by acquiring the write
* lock, then the read lock and then releasing the write lock. However, upgrading from a read
* lock to the write lock is not possible.
* </p>
*/
public class InterProcessReadWriteLock
{
这个注释说明了几个重点:
这是一个支持重入的,跨 JVM 的读写锁;
读锁共享,写锁排他;
支持重入;
锁可以降级,从写锁降级为读锁;但是不能升级。
下面,我们来看一下构造函数以及实例变量:
private final InterProcessMutex readMutex;
private final InterProcessMutex writeMutex;
// must be the same length. LockInternals depends on it
private static final String READ_LOCK_NAME = "__READ__";
private static final String WRITE_LOCK_NAME = "__WRIT__";
private static class SortingLockInternalsDriver extends StandardLockInternalsDriver
{
@Override
public final String fixForSorting(String str, String lockName)
{
str = super.fixForSorting(str, READ_LOCK_NAME);
str = super.fixForSorting(str, WRITE_LOCK_NAME);
return str;
}
}
private static class InternalInterProcessMutex extends InterProcessMutex
{
private final String lockName;
private final byte[] lockData;
InternalInterProcessMutex(CuratorFramework client, String path, String lockName, byte[] lockData, int maxLeases, LockInternalsDriver driver)
{
super(client, path, lockName, maxLeases, driver);
this.lockName = lockName;
this.lockData = lockData;
}
@Override
public Collection<String> getParticipantNodes() throws Exception
{
Collection<String> nodes = super.getParticipantNodes();
Iterable<String> filtered = Iterables.filter
(
nodes,
new Predicate<String>()
{
@Override
public boolean apply(String node)
{
return node.contains(lockName);
}
}
);
return ImmutableList.copyOf(filtered);
}
@Override
protected byte[] getLockNodeBytes()
{
return lockData;
}
}
/**
* @param client the client
* @param basePath path to use for locking
*/
public InterProcessReadWriteLock(CuratorFramework client, String basePath)
{
this(client, basePath, null);
}
/**
* @param client the client
* @param basePath path to use for locking
* @param lockData the data to store in the lock nodes
*/
public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData)
{
lockData = (lockData == null) ? null : Arrays.copyOf(lockData, lockData.length);
writeMutex = new InternalInterProcessMutex
(
client,
basePath,
WRITE_LOCK_NAME,
lockData,
1,
new SortingLockInternalsDriver()
{
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
{
return super.getsTheLock(client, children, sequenceNodeName, maxLeases);
}
}
);
readMutex = new InternalInterProcessMutex
(
client,
basePath,
READ_LOCK_NAME,
lockData,
Integer.MAX_VALUE,
new SortingLockInternalsDriver()
{
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
{
return readLockPredicate(children, sequenceNodeName);
}
}
);
}
从这里可以看出:
读写锁还是使用
InterProcessMutex
来实现的,具体实现可以看上一篇文章 分布式锁之 Apache Curator InterProcessMutex。使用名称来区分读写锁:
READ_LOCK_NAME
和WRITE_LOCK_NAME
。通过
SortingLockInternalsDriver
重写StandardLockInternalsDriver
的fixForSorting
方法,来达到将锁的名称前缀去掉,实现锁的排序功能。通过
InternalInterProcessMutex
重写InterProcessMutex
的getParticipantNodes
方法,达到分别获取读锁列表和写锁列表的功能。有一点特别关键:在构造函数中创建
writeMutex
和readMutex
时:写锁
writeMutex
的maxLeases
参数值为1
,表示排他锁,同一时间只有一个线程可以获得写锁;读锁
readMutex
的maxLeases
参数值为Integer.MAX_VALUE
,表示共享锁,所有线程都可以获得读锁。
最后一点也很关键:读锁
readMutex
通过重写SortingLockInternalsDriver
的getsTheLock
方法,来达到可以让所有线程可以获得读锁。它调用了readLockPredicate
方法,我们接下来看一下这个方法:
private PredicateResults readLockPredicate(List<String> children, String sequenceNodeName) throws Exception
{
if ( writeMutex.isOwnedByCurrentThread() )
{
return new PredicateResults(null, true);
}
int index = 0;
int firstWriteIndex = Integer.MAX_VALUE;
int ourIndex = -1;
for ( String node : children )
{
if ( node.contains(WRITE_LOCK_NAME) )
{
firstWriteIndex = Math.min(index, firstWriteIndex);
}
else if ( node.startsWith(sequenceNodeName) )
{
ourIndex = index;
break;
}
++index;
}
StandardLockInternalsDriver.validateOurIndex(sequenceNodeName, ourIndex);
boolean getsTheLock = (ourIndex < firstWriteIndex);
String pathToWatch = getsTheLock ? null : children.get(firstWriteIndex);
return new PredicateResults(pathToWatch, getsTheLock);
}
这个方法的业务很清楚:
如果是一个线程获得了写锁,那么它就自动获得了读锁;
在排序集合中,找到自己的
index
和第一个写锁的index
:如果自身的
index
小于第一个写锁的index
,则读锁在前,可以获得锁;否则,被写锁阻塞,同时监听第一个写锁节点的状态,等待被唤醒。
总结
InterProcessReadWriteLock
是基于 InterProcessMutex
来实现读写锁的。所以,要理解 InterProcessReadWriteLock
需要先搞懂 InterProcessMutex
的原理。
基于 InterProcessMutex
来实现读写锁,还可以避免不必要的重复代码,提高代码的重用性,更有利于后期维护。
不知道能否基于 Redis 来实现读写锁?有时间再研究研究。