分布式锁之 Apache Curator InterProcessReadWriteLock

分布式锁之 Apache Curator InterProcessReadWriteLock

在上一篇文章 分布式锁之 Apache Curator InterProcessMutex 中介绍了基于 ZooKeeper 实现的互斥锁。除此之外,还可以实现读写锁。这篇文章就来简要介绍一下 InterProcessReadWriteLock 的实现原理。

老规矩,先看看类的注释:

/**
 * <p>
 *    A re-entrant read/write mutex that works across JVMs. Uses Zookeeper to hold the lock. All processes
 *    in all JVMs that use the same lock path will achieve an inter-process critical section. Further, this mutex is
 *    "fair" - each user will get the mutex in the order requested (from ZK's point of view).
 * </p>
 *
 * <p>
 *    A read write lock maintains a pair of associated locks, one for read-only operations and one
 *    for writing. The read lock may be held simultaneously by multiple reader processes, so long as
 *    there are no writers. The write lock is exclusive.
 * </p>
 *
 * <p>
 *    <b>Reentrancy</b><br>
 *    This lock allows both readers and writers to reacquire read or write locks in the style of a
 *    re-entrant lock. Non-re-entrant readers are not allowed until all write locks held by the
 *    writing thread/process have been released. Additionally, a writer can acquire the read lock, but not
 *    vice-versa. If a reader tries to acquire the write lock it will never succeed.<br><br>
 *
 *    <b>Lock downgrading</b><br>
 *    Re-entrancy also allows downgrading from the write lock to a read lock, by acquiring the write
 *    lock, then the read lock and then releasing the write lock. However, upgrading from a read
 *    lock to the write lock is not possible.
 * </p>
 */
public class InterProcessReadWriteLock
{

这个注释说明了几个重点:

  1. 这是一个支持重入的,跨 JVM 的读写锁;

  2. 读锁共享,写锁排他;

  3. 支持重入;

  4. 锁可以降级,从写锁降级为读锁;但是不能升级。

下面,我们来看一下构造函数以及实例变量:

  private final InterProcessMutex readMutex;
  private final InterProcessMutex writeMutex;

  // must be the same length. LockInternals depends on it
  private static final String READ_LOCK_NAME  = "READ";
  private static final String WRITE_LOCK_NAME = "WRIT";

  private static class SortingLockInternalsDriver extends StandardLockInternalsDriver
  {
      @Override
      public final String fixForSorting(String str, String lockName)
      {
          str = super.fixForSorting(str, READ_LOCK_NAME);
          str = super.fixForSorting(str, WRITE_LOCK_NAME);
          return str;
      }
  }

  private static class InternalInterProcessMutex extends InterProcessMutex
  {
      private final String lockName;
      private final byte[] lockData;

      InternalInterProcessMutex(CuratorFramework client, String path, String lockName, byte[] lockData, int maxLeases, LockInternalsDriver driver)
      {
          super(client, path, lockName, maxLeases, driver);
          this.lockName = lockName;
          this.lockData = lockData;
      }

      @Override
      public Collection<String> getParticipantNodes() throws Exception
      {
          Collection<String>  nodes = super.getParticipantNodes();
          Iterable<String>    filtered = Iterables.filter
          (
              nodes,
              new Predicate<String>()
              {
                  @Override
                  public boolean apply(String node)
                  {
                      return node.contains(lockName);
                  }
              }
          );
          return ImmutableList.copyOf(filtered);
      }

      @Override
      protected byte[] getLockNodeBytes()
      {
          return lockData;
      }
  }

/
  * @param client the client
  * @param basePath path to use for locking
  */
  public InterProcessReadWriteLock(CuratorFramework client, String basePath)
  {
      this(client, basePath, null);
  }

/
  * @param client the client
  * @param basePath path to use for locking
  * @param lockData the data to store in the lock nodes
  */
  public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData)
  {
      lockData = (lockData == null) ? null : Arrays.copyOf(lockData, lockData.length);

      writeMutex = new InternalInterProcessMutex
      (
          client,
          basePath,
          WRITE_LOCK_NAME,
          lockData,
          1,
          new SortingLockInternalsDriver()
          {
              @Override
              public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
              {
                  return super.getsTheLock(client, children, sequenceNodeName, maxLeases);
              }
          }
      );

      readMutex = new InternalInterProcessMutex
      (
          client,
          basePath,
          READ_LOCK_NAME,
          lockData,
          Integer.MAX_VALUE,
          new SortingLockInternalsDriver()
          {
              @Override
              public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
              {
                  return readLockPredicate(children, sequenceNodeName);
              }
          }
      );
  }

从这里可以看出:

  1. 读写锁还是使用 InterProcessMutex 来实现的,具体实现可以看上一篇文章 分布式锁之 Apache Curator InterProcessMutex

  2. 使用名称来区分读写锁: READ_LOCK_NAMEWRITE_LOCK_NAME

  3. 通过 SortingLockInternalsDriver 重写 StandardLockInternalsDriverfixForSorting 方法,来达到将锁的名称前缀去掉,实现锁的排序功能。

  4. 通过 InternalInterProcessMutex 重写 InterProcessMutexgetParticipantNodes 方法,达到分别获取读锁列表和写锁列表的功能。

  5. 有一点特别关键:在构造函数中创建 writeMutexreadMutex 时:

    1. 写锁 writeMutexmaxLeases 参数值为 1,表示排他锁,同一时间只有一个线程可以获得写锁;

    2. 读锁 readMutexmaxLeases 参数值为 Integer.MAX_VALUE,表示共享锁,所有线程都可以获得读锁。

  6. 最后一点也很关键:读锁 readMutex 通过重写 SortingLockInternalsDrivergetsTheLock 方法,来达到可以让所有线程可以获得读锁。它调用了 readLockPredicate 方法,我们接下来看一下这个方法:

private PredicateResults readLockPredicate(List<String> children, String sequenceNodeName) throws Exception
{
    if ( writeMutex.isOwnedByCurrentThread() )
    {
        return new PredicateResults(null, true);
    }

    int         index = 0;
    int         firstWriteIndex = Integer.MAX_VALUE;
    int         ourIndex = -1;
    for ( String node : children )
    {
        if ( node.contains(WRITE_LOCK_NAME) )
        {
            firstWriteIndex = Math.min(index, firstWriteIndex);
        }
        else if ( node.startsWith(sequenceNodeName) )
        {
            ourIndex = index;
            break;
        }

        ++index;
    }

    StandardLockInternalsDriver.validateOurIndex(sequenceNodeName, ourIndex);

    boolean     getsTheLock = (ourIndex < firstWriteIndex);
    String      pathToWatch = getsTheLock ? null : children.get(firstWriteIndex);
    return new PredicateResults(pathToWatch, getsTheLock);
}

这个方法的业务很清楚:

  1. 如果是一个线程获得了写锁,那么它就自动获得了读锁;

  2. 在排序集合中,找到自己的 index 和第一个写锁的 index:

  3. 如果自身的 index 小于第一个写锁的 index,则读锁在前,可以获得锁;否则,被写锁阻塞,同时监听第一个写锁节点的状态,等待被唤醒。

总结

InterProcessReadWriteLock 是基于 InterProcessMutex 来实现读写锁的。所以,要理解 InterProcessReadWriteLock 需要先搞懂 InterProcessMutex 的原理。

基于 InterProcessMutex 来实现读写锁,还可以避免不必要的重复代码,提高代码的重用性,更有利于后期维护。

不知道能否基于 Redis 来实现读写锁?有时间再研究研究。