HikariCP 源码分析 -- ConcurrentBag

以前无意间搜资料了解到 HikariCP,一下子就被它的简洁代码和卓越性能吸引住了。以前也有翻过它的代码,但是不是很系统,最近再次翻阅,正好做些笔记,方便以后学习。

D瓜哥最近在学习 Java 并发知识。那就从 HikariCP 自定义的并发集合 ConcurrentBag 开始学习。

在 HikariCP 的 Wiki 中,有 Down the Rabbit Hole · ConcurrentBag 的章节来专门介绍 ConcurrentBag

ConcurrentBag 的灵感借鉴自 C# .NET 的 ConcurrentBag 类。但是实现却是完全不同的。这里的 ConcurrentBag 有如下特性:

  • A lock-free design

  • ThreadLocal caching

  • Queue-stealing

  • Direct hand-off optimizations

下面,通过代码来对此做个说明。

ConcurrentBag 类的定义中,声明了集合元素必须是 IConcurrentBagEntry 的子类。先来看看这个接口的定义:

public interface IConcurrentBagEntry
{
    int STATE_NOT_IN_USE = 0;
    int STATE_IN_USE = 1;
    int STATE_REMOVED = -1;
    int STATE_RESERVED = -2;

    boolean compareAndSet(int expectState, int newState);
    void setState(int newState);
    int getState();
}

接下来,看一下成员变量:

// 存放共享元素
private final CopyOnWriteArrayList<T> sharedList;
private final boolean weakThreadLocals;

// 在 ThreadLocal 缓存线程本地元素,避免线程争用
private final ThreadLocal<List<Object>> threadList;
private final IBagStateListener listener;
//
private final AtomicInteger waiters;
private volatile boolean closed;

// 接力队列
private final SynchronousQueue<T> handoffQueue;

ConcurrentBag 开头的 JavaDoc 中就做了明确说明:

Note that items that are "borrowed" from the bag are not actually removed from any collection, so garbage collection will not occur even if the reference is abandoned. Thus care must be taken to "requite" borrowed objects otherwise a memory leak will result. Only the "remove" method can completely remove an object from the bag.

翻译一下就是:注意,从 ConcurrentBag 中“借用”(borrow)对象,实际上并未从任何集合中删除(只是将其状态设置为 STATE_IN_USE),因此即使删除引用也不会进行垃圾收集。因此必须注意归还(requite)借用的对象(将元素状态设置为 STATE_NOT_IN_USE),否则将导致内存泄漏。 只有“删除”(remove)方法才能从袋子中完全删除一个对象。具体看代码:

/**
* The method will borrow a BagEntry from the bag, blocking for the
* specified timeout if none are available.
*
* @param timeout how long to wait before giving up, in units of unit
* @param timeUnit a <code>TimeUnit</code> determining how to interpret the timeout parameter
* @return a borrowed instance from the bag or null if a timeout occurs
* @throws InterruptedException if interrupted while waiting
*/
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
{
    // 1. 尝试从 ThreadLocal 中查找目标值
    // Try the thread-local list first
    final List<Object> list = threadList.get();
    for (int i = list.size() - 1; i >= 0; i--) {
        final Object entry = list.remove(i);
        @SuppressWarnings("unchecked")
        final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
        if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            return bagEntry;
        }
    }

    // Otherwise, scan the shared list ... then poll the handoff queue
    final int waiting = waiters.incrementAndGet();
    try {
        // 2. 如果 ThreadLocal 中没有目标元素:没有元素 或者 修改元素状态失败,则从 `sharedList` 中获取目标元素。
        //    这里可以看出,只是将目标元素的状态从 `STATE_NOT_IN_USE` 修改为 `STATE_IN_USE`,并没有删除。
        //    换句话说,在 `sharedList` 变量中,保存着集合中所有的元素。
        for (T bagEntry : sharedList) {
        if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            // If we may have stolen another waiter's connection, request another bag add.
            if (waiting > 1) {
                listener.addBagItem(waiting - 1);
            }
            return bagEntry;
        }
        }

        listener.addBagItem(waiting);

        // 3. 如果 `sharedList` 也没有目标元素,则在接力队列 handoffQueue 中获取,直到超时
        timeout = timeUnit.toNanos(timeout);
        do {
        final long start = currentTime();
        final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
        if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
            return bagEntry;
        }

        timeout -= elapsedNanos(start);
        } while (timeout > 10_000);

        return null;
    }
    finally {
        waiters.decrementAndGet();
    }
}

/**
* This method will return a borrowed object to the bag.  Objects
* that are borrowed from the bag but never "requited" will result
* in a memory leak.
*
* @param bagEntry the value to return to the bag
* @throws NullPointerException if value is null
* @throws IllegalStateException if the bagEntry was not borrowed from the bag
*/
public void requite(final T bagEntry)
{
    // 将归还元素的状态设置成 `STATE_NOT_IN_USE`
    bagEntry.setState(STATE_NOT_IN_USE);

    // 如果等待大于零,则先尝试将元素交给接力队列 handoffQueue,这样更快地交给消费方。
    for (int i = 0; waiters.get() > 0; i++) {
        if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
            return;
        }
        else if ((i & 0xff) == 0xff) {
            parkNanos(MICROSECONDS.toNanos(10));
        }
        else {
            Thread.yield();
        }
    }

    // 如果没有等待,则将元素放入到 ThreadLocal 中,方便方便下次使用。
    final List<Object> threadLocalList = threadList.get();
    if (threadLocalList.size() < 50) {
        threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
    }
}

集合元素的添加和删除是通过 addremove 方法来实现的。代码如下:

/**
* Add a new object to the bag for others to borrow.
*
* @param bagEntry an object to add to the bag
*/
public void add(final T bagEntry)
{
    if (closed) {
        LOGGER.info("ConcurrentBag has been closed, ignoring add()");
        throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
    }

    // 从这里可以看出,添加的元素都会添加到 sharedList 变量中。
    sharedList.add(bagEntry);

    // spin until a thread takes it or none are waiting
    while (waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE && !handoffQueue.offer(bagEntry)) {
        Thread.yield();
    }
}

/**
* Remove a value from the bag.  This method should only be called
* with objects obtained by <code>borrow(long, TimeUnit)</code> or <code>reserve(T)</code>
*
* @param bagEntry the value to remove
* @return true if the entry was removed, false otherwise
* @throws IllegalStateException if an attempt is made to remove an object
*         from the bag that was not borrowed or reserved first
*/
public boolean remove(final T bagEntry)
{
    // 删除元素之前,需要确保可以将状态设置为 STATE_REMOVED
    if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
        LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
        return false;
    }

    // 从 sharedList 删除元素
    final boolean removed = sharedList.remove(bagEntry);
    if (!removed && !closed) {
        LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
    }

    // 从 ThreadLocal 中也要删除。
    // 在上面 borrow 方法借用元素时,从 ThreadLocal 中获得的元素要从本地 List 中删除的。
    // 这样就不需要但是因为 ThreadLocal 中的元素没有删除导致的内存泄露问题了。
    threadList.get().remove(bagEntry);

    return removed;
}

D瓜哥这里有一个疑问:只处理了状态是 STATE_IN_USESTATE_RESERVED 的元素。那么,状态是 STATE_NOT_IN_USE 的元素,为什么不能删除?