HikariCP 源码分析 -- ConcurrentBag
以前无意间搜资料了解到 HikariCP,一下子就被它的简洁代码和卓越性能吸引住了。以前也有翻过它的代码,但是不是很系统,最近再次翻阅,正好做些笔记,方便以后学习。
D瓜哥最近在学习 Java 并发知识。那就从 HikariCP 自定义的并发集合 ConcurrentBag
开始学习。
在 HikariCP 的 Wiki 中,有 Down the Rabbit Hole · ConcurrentBag 的章节来专门介绍 ConcurrentBag
:
ConcurrentBag
的灵感借鉴自 C# .NET 的 ConcurrentBag
类。但是实现却是完全不同的。这里的 ConcurrentBag
有如下特性:
A lock-free design
ThreadLocal caching
Queue-stealing
Direct hand-off optimizations
下面,通过代码来对此做个说明。
在 ConcurrentBag
类的定义中,声明了集合元素必须是 IConcurrentBagEntry
的子类。先来看看这个接口的定义:
public interface IConcurrentBagEntry
{
int STATE_NOT_IN_USE = 0;
int STATE_IN_USE = 1;
int STATE_REMOVED = -1;
int STATE_RESERVED = -2;
boolean compareAndSet(int expectState, int newState);
void setState(int newState);
int getState();
}
接下来,看一下成员变量:
// 存放共享元素
private final CopyOnWriteArrayList<T> sharedList;
private final boolean weakThreadLocals;
// 在 ThreadLocal 缓存线程本地元素,避免线程争用
private final ThreadLocal<List<Object>> threadList;
private final IBagStateListener listener;
//
private final AtomicInteger waiters;
private volatile boolean closed;
// 接力队列
private final SynchronousQueue<T> handoffQueue;
在 ConcurrentBag
开头的 JavaDoc 中就做了明确说明:
翻译一下就是:注意,从 ConcurrentBag
中“借用”(borrow
)对象,实际上并未从任何集合中删除(只是将其状态设置为 STATE_IN_USE
),因此即使删除引用也不会进行垃圾收集。因此必须注意归还(requite
)借用的对象(将元素状态设置为 STATE_NOT_IN_USE
),否则将导致内存泄漏。 只有“删除”(remove
)方法才能从袋子中完全删除一个对象。具体看代码:
/**
* The method will borrow a BagEntry from the bag, blocking for the
* specified timeout if none are available.
*
* @param timeout how long to wait before giving up, in units of unit
* @param timeUnit a <code>TimeUnit</code> determining how to interpret the timeout parameter
* @return a borrowed instance from the bag or null if a timeout occurs
* @throws InterruptedException if interrupted while waiting
*/
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
{
// 1. 尝试从 ThreadLocal 中查找目标值
// Try the thread-local list first
final List<Object> list = threadList.get();
for (int i = list.size() - 1; i >= 0; i--) {
final Object entry = list.remove(i);
@SuppressWarnings("unchecked")
final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
}
// Otherwise, scan the shared list ... then poll the handoff queue
final int waiting = waiters.incrementAndGet();
try {
// 2. 如果 ThreadLocal 中没有目标元素:没有元素 或者 修改元素状态失败,则从 `sharedList` 中获取目标元素。
// 这里可以看出,只是将目标元素的状态从 `STATE_NOT_IN_USE` 修改为 `STATE_IN_USE`,并没有删除。
// 换句话说,在 `sharedList` 变量中,保存着集合中所有的元素。
for (T bagEntry : sharedList) {
if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
// If we may have stolen another waiter's connection, request another bag add.
if (waiting > 1) {
listener.addBagItem(waiting - 1);
}
return bagEntry;
}
}
listener.addBagItem(waiting);
// 3. 如果 `sharedList` 也没有目标元素,则在接力队列 handoffQueue 中获取,直到超时
timeout = timeUnit.toNanos(timeout);
do {
final long start = currentTime();
final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
timeout -= elapsedNanos(start);
} while (timeout > 10_000);
return null;
}
finally {
waiters.decrementAndGet();
}
}
/**
* This method will return a borrowed object to the bag. Objects
* that are borrowed from the bag but never "requited" will result
* in a memory leak.
*
* @param bagEntry the value to return to the bag
* @throws NullPointerException if value is null
* @throws IllegalStateException if the bagEntry was not borrowed from the bag
*/
public void requite(final T bagEntry)
{
// 将归还元素的状态设置成 `STATE_NOT_IN_USE`
bagEntry.setState(STATE_NOT_IN_USE);
// 如果等待大于零,则先尝试将元素交给接力队列 handoffQueue,这样更快地交给消费方。
for (int i = 0; waiters.get() > 0; i++) {
if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
return;
}
else if ((i & 0xff) == 0xff) {
parkNanos(MICROSECONDS.toNanos(10));
}
else {
Thread.yield();
}
}
// 如果没有等待,则将元素放入到 ThreadLocal 中,方便方便下次使用。
final List<Object> threadLocalList = threadList.get();
if (threadLocalList.size() < 50) {
threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
}
}
集合元素的添加和删除是通过 add
和 remove
方法来实现的。代码如下:
/**
* Add a new object to the bag for others to borrow.
*
* @param bagEntry an object to add to the bag
*/
public void add(final T bagEntry)
{
if (closed) {
LOGGER.info("ConcurrentBag has been closed, ignoring add()");
throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
}
// 从这里可以看出,添加的元素都会添加到 sharedList 变量中。
sharedList.add(bagEntry);
// spin until a thread takes it or none are waiting
while (waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE && !handoffQueue.offer(bagEntry)) {
Thread.yield();
}
}
/**
* Remove a value from the bag. This method should only be called
* with objects obtained by <code>borrow(long, TimeUnit)</code> or <code>reserve(T)</code>
*
* @param bagEntry the value to remove
* @return true if the entry was removed, false otherwise
* @throws IllegalStateException if an attempt is made to remove an object
* from the bag that was not borrowed or reserved first
*/
public boolean remove(final T bagEntry)
{
// 删除元素之前,需要确保可以将状态设置为 STATE_REMOVED
if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
return false;
}
// 从 sharedList 删除元素
final boolean removed = sharedList.remove(bagEntry);
if (!removed && !closed) {
LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
}
// 从 ThreadLocal 中也要删除。
// 在上面 borrow 方法借用元素时,从 ThreadLocal 中获得的元素要从本地 List 中删除的。
// 这样就不需要但是因为 ThreadLocal 中的元素没有删除导致的内存泄露问题了。
threadList.get().remove(bagEntry);
return removed;
}
D瓜哥这里有一个疑问:只处理了状态是 STATE_IN_USE
和 STATE_RESERVED
的元素。那么,状态是 STATE_NOT_IN_USE
的元素,为什么不能删除?
下一节,我们来分析一下 HikariCP 中另外一个非常重要的数据结构: FastList。