HikariCP Performance Analysis

wait() or yield()?

The following code is HikariCP's implementation of adding a BagEntry:

public void add(final T bagEntry) {
    sharedList.add(bagEntry);
    // spin until a thread takes it or none are waiting
    while (waiters.get() > 0 && !handoffQueue.offer(bagEntry)) {
        yield();
    }
}

I did not understand why yield is used here, so I searched online and found answers from three people:

yield() is a method of Thread; wait() is originally a method of Object, which Thread inherits like every other class.


wait is for waiting on a condition. This might not jump out at you when looking at the method, as it is entirely up to you to define what kind of condition it is. But the API tries to force you to use it correctly by requiring that you own the monitor of the object on which you are waiting, which is necessary for a correct condition check in a multi-threaded environment.

So a correct use of wait looks like:

synchronized(object) {
    while( ! /* your defined condition */)
        object.wait();
    /* execute other critical actions if needed */
}

And it must be paired with another thread executing code like:

synchronized(object) {
    /* make your defined condition true */
    object.notify();
}
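The two fragments above can be put together into a runnable sketch. The class name WaitNotifyDemo, the ready flag, and the helper methods are illustrative assumptions, not part of the quoted answer or of HikariCP.

```java
// Minimal sketch of the wait/notify pairing; all names here are hypothetical.
class WaitNotifyDemo {
    private final Object lock = new Object();
    private boolean ready = false; // the "defined condition", guarded by lock

    // Blocks the calling thread until signalReady() has been called.
    void awaitReady() throws InterruptedException {
        synchronized (lock) {
            while (!ready) {   // re-check in a loop: wakeups may be spurious
                lock.wait();   // releases the monitor while blocked
            }
        }
    }

    // Makes the condition true and wakes up the waiting threads.
    void signalReady() {
        synchronized (lock) {
            ready = true;
            lock.notifyAll();
        }
    }

    // Runs one waiter and one notifier; returns true if the waiter was released.
    static boolean runDemo() {
        WaitNotifyDemo demo = new WaitNotifyDemo();
        Thread waiter = new Thread(() -> {
            try {
                demo.awaitReady();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        demo.signalReady();
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("waiter released: " + runDemo());
    }
}
```

Because the flag is checked while holding the monitor, the sketch behaves the same whether signalReady() runs before or after the waiter enters the synchronized block.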

In contrast, Thread.yield() is just a hint that your thread might release the CPU at this point in time. It’s not specified whether it actually does anything and, regardless of whether the CPU has been released or not, it has no impact on the semantics with respect to the memory model. In other words, it does not create any relationship to other threads, which would be required for accessing shared variables correctly.

For example the following loop accessing sharedVariable (which is not declared volatile) might run forever without ever noticing updates made by other threads:

while(sharedVariable != expectedValue) Thread.yield();

While Thread.yield might help other threads to run (they will run anyway on most systems), it does not enforce re-reading the value of sharedVariable from shared memory. Thus, without other constructs enforcing memory visibility, e.g. declaring sharedVariable as volatile, this loop is broken.
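As a hedged illustration of the fix the answer mentions, the sketch below declares the field volatile so that the spinning thread is guaranteed to observe the write. The class name VolatileSpinDemo and the value 42 are assumptions made for the example.

```java
// Sketch: a yield-based spin loop that is correct only because the field is volatile.
class VolatileSpinDemo {
    // volatile forces every loop iteration to read the current value
    private static volatile int sharedVariable = 0;

    // Spins until sharedVariable equals expectedValue; returns the number of spins.
    static int spinUntil(int expectedValue) {
        int spins = 0;
        while (sharedVariable != expectedValue) {
            Thread.yield(); // scheduling hint only; visibility comes from volatile
            spins++;
        }
        return spins;
    }

    // Starts a writer thread and waits for its update to become visible.
    static boolean runDemo() {
        sharedVariable = 0;
        Thread writer = new Thread(() -> sharedVariable = 42);
        writer.start();
        spinUntil(42);
        return sharedVariable == 42;
    }

    public static void main(String[] args) {
        System.out.println("update observed: " + runDemo());
    }
}
```

Removing the volatile modifier would bring back exactly the problem described above: the JIT compiler would be free to hoist the read out of the loop.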


Not even close, because yield() does not wait for anything.

Every thread can be in one of a number of different states: Running means that the thread is actually running on a CPU; Runnable means that nothing is preventing the thread from running except, maybe, the availability of a CPU for it to run on. All of the other states can be lumped into a category called blocked. A blocked thread is a thread that is waiting for something to happen before it can become runnable.

The operating system preempts running threads on a regular basis: every so often (between 10 and 100 times per second on most operating systems) the OS tags each running thread and says, “your turn is up, go to the back of the run queue” (i.e., change state from running to runnable). Then it lets whatever thread is at the head of the run queue use that CPU (i.e., become running again).

When your program calls Thread.yield(), it’s saying to the operating system, “I still have work to do, but it might not be as important as the work that some other thread is doing. Please send me to the back of the run queue right now.” If there is an available CPU for the thread to run on though, then it effectively will just keep running (i.e., the yield() call will immediately return).

When your program calls foobar.wait(), on the other hand, it’s saying to the operating system, “Block me until some other thread calls foobar.notify().”

Yielding was first implemented on non-preemptive operating systems and in non-preemptive threading libraries. On a computer with only one CPU, the only way that more than one thread ever got to run was when the threads explicitly yielded to one another.

Yielding also was useful for busy waiting. That’s where a thread waits for something to happen by sitting in a tight loop, testing the same condition over and over again. If the condition depended on some other thread to do some work, the waiting thread would yield() each time around the loop in order to let the other thread do its work.

Now that we have preemption, multiprocessor systems, and libraries that provide us with higher-level synchronization objects, there is basically no reason why an application program would need to call yield() anymore.
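To make the last point concrete, here is a small sketch of a higher-level synchronization object replacing a yield loop. CountDownLatch is chosen only as one example; the class name LatchInsteadOfYield is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch: waiting for another thread's work with a latch instead of busy waiting.
class LatchInsteadOfYield {
    // Returns true if the worker signalled completion within the timeout.
    static boolean runDemo() {
        CountDownLatch done = new CountDownLatch(1);
        Thread worker = new Thread(done::countDown); // the "work" is just the signal
        worker.start();
        try {
            // The caller is parked until countDown() runs; no CPU is burned spinning.
            return done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("worker finished: " + runDemo());
    }
}
```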

Why SynchronousQueue is used

The following code is taken from HikariCP. It declares the key synchronous queue that ConcurrentBag relies on for both borrowing and returning entries:

private final SynchronousQueue<T> handoffQueue;

public ConcurrentBag(final IBagStateListener listener) {
    this.handoffQueue = new SynchronousQueue<>(true);
}

This handoffQueue is then used in the following situations:

  • Borrow:
    final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
  • Return:
    if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
        return;
    }
  • Add:
    while (waiters.get() > 0 && !handoffQueue.offer(bagEntry)) {
        yield();
    }

The following is excerpted from 《Java 并发编程的艺术》:

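SynchronousQueue is a blocking queue that stores no elements. Every put operation must wait for a take operation; otherwise no further element can be added. It supports fair access to the queue. By default, threads access the queue under a non-fair policy. The constructor below creates a SynchronousQueue with fair access: if the argument is true, waiting threads access the queue in first-in, first-out order.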

public SynchronousQueue() {
    this(false);
}

public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

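SynchronousQueue can be thought of as a passer: it hands the data processed by a producer thread directly to a consumer thread. The queue itself stores no elements, which makes it a very good fit for handoff scenarios. The throughput of SynchronousQueue is higher than that of LinkedBlockingQueue and ArrayBlockingQueue.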
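The handoff behaviour can be seen in a short sketch. It assumes nothing about HikariCP beyond the calls quoted earlier; the class name HandoffDemo, the string payload, and the 5-second timeout are illustrative choices.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Sketch: offer() on a SynchronousQueue succeeds only while a consumer is waiting.
class HandoffDemo {
    // Hands "entry" to a consumer thread and returns what the consumer received.
    static String runDemo() {
        SynchronousQueue<String> handoffQueue = new SynchronousQueue<>(true); // fair mode

        // Nobody is polling yet, so the untimed offer() fails immediately.
        if (handoffQueue.offer("entry")) {
            throw new IllegalStateException("offer succeeded without a consumer");
        }

        AtomicReference<String> received = new AtomicReference<>();
        Thread consumer = new Thread(() -> {
            try {
                received.set(handoffQueue.poll(5, TimeUnit.SECONDS));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Spin until the consumer is parked in poll() and takes the element,
        // the same shape as the add() loop shown earlier.
        while (consumer.isAlive() && !handoffQueue.offer("entry")) {
            Thread.yield();
        }
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println("consumer received: " + runDemo());
    }
}
```

This also suggests why add() spins with yield(): a failed offer() only means that no consumer was waiting at that instant, so the producer retries for as long as waiters are registered.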


《实战 Java 高并发程序设计》 also covers it. A partial excerpt follows:

SynchronousQueue abstracts put() and take(), two operations with completely different purposes, into one common method, Transferer.transfer(). As the name suggests, it transfers data:

abstract E transfer(E e, boolean timed, long nanos);

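When the parameter e is non-null, the current operation hands an element to a consumer; when it is null, the current operation requests an element. A non-null return value means the data was received or provided successfully; a null return value means failure (timeout or interruption).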


API overview and implementation:

  • offer() (the timed overload is shown; the single-argument offer(e) that ConcurrentBag calls returns false immediately when no consumer is waiting):
    /**
     * Inserts the specified element into this queue, waiting if necessary
     * up to the specified wait time for another thread to receive it.
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
            return true;
        if (!Thread.interrupted())
            return false;
        throw new InterruptedException();
    }
  • poll():
    /**
     * Retrieves and removes the head of this queue, waiting
     * if necessary up to the specified wait time, for another thread
     * to insert it.
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E e = transferer.transfer(null, true, unit.toNanos(timeout));
        if (e != null || !Thread.interrupted())
            return e;
        throw new InterruptedException();
    }
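The failure path of both methods, where transfer() returns null on timeout, can be checked with a short sketch. The class name TimedHandoffDemo and the 50 ms timeout are assumptions chosen for the example.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

// Sketch: timed offer() and poll() both give up when no partner thread shows up.
class TimedHandoffDemo {
    // True if offer() timed out because no consumer took the element.
    static boolean offerTimesOut() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        try {
            return !queue.offer("entry", 50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // True if poll() timed out because no producer supplied an element.
    static boolean pollTimesOut() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        try {
            return queue.poll(50, TimeUnit.MILLISECONDS) == null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("offer timed out: " + offerTimesOut());
        System.out.println("poll timed out: " + pollTimesOut());
    }
}
```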

CompletableFuture

CompletableFuture is a very large utility class added in Java 8. It implements the Future interface and, more importantly, the CompletionStage interface, which has as many as 40 methods!

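I strongly suspect it is misused here: the method returns void and the future created by completedFuture is discarded, so the last statement has no effect.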

@Override
public void addBagItem(final int waiting)
{
    final boolean shouldAdd = waiting - addConnectionQueue.size() >= 0; // Yes, >= is intentional.
    if (shouldAdd) {
        addConnectionExecutor.submit(POOL_ENTRY_CREATOR);
    }

    CompletableFuture.completedFuture(Boolean.TRUE);
}
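For comparison, the sketch below shows what completedFuture does when its result is actually used. The method addItem is a hypothetical variant written for this example, not HikariCP code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Sketch: completedFuture() only matters if someone keeps the returned future.
class CompletedFutureDemo {
    // Hypothetical variant that hands the already-completed future to the caller.
    static Future<Boolean> addItem() {
        return CompletableFuture.completedFuture(Boolean.TRUE);
    }

    // The future is complete as soon as it is created, so join() never blocks here.
    static boolean runDemo() {
        CompletableFuture<Boolean> future = CompletableFuture.completedFuture(Boolean.TRUE);
        return future.isDone() && future.join();
    }

    public static void main(String[] args) {
        System.out.println("already done: " + addItem().isDone());
        System.out.println("value: " + runDemo());
    }
}
```

In a void method with no caller to receive it, the same call merely allocates an object that is immediately eligible for garbage collection.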

Lock-free state updates

Instead of this approach:

synchronized (this) {
    state = newState;
}

HikariCP uses a lock-free approach:

final class PoolEntry implements IConcurrentBagEntry {
    private static final AtomicIntegerFieldUpdater<PoolEntry> stateUpdater;
    private volatile int state = 0;

    static {
        stateUpdater = AtomicIntegerFieldUpdater
            .newUpdater(PoolEntry.class, "state");
    }

    @Override
    public int getState() {
        return stateUpdater.get(this);
    }

    @Override
    public boolean compareAndSet(int expect, int update) {
        return stateUpdater.compareAndSet(this, expect, update);
    }

    @Override
    public void setState(int update) {
        stateUpdater.set(this, update);
    }
}

  • The updater obtains the field through reflection:
    return tclass.getDeclaredField(fieldName);
  • To guarantee that the field is read correctly, it must be declared volatile.

AtomicIntegerFieldUpdater can update an int field atomically.
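The practical payoff of compareAndSet is that exactly one thread can win a state transition without any lock. The sketch below is a simplified stand-in for PoolEntry: the class names CasStateDemo and Entry, the tryAcquire method, and the numeric state values are assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Sketch: several threads race to claim one entry; CAS lets exactly one succeed.
class CasStateDemo {
    static final int STATE_NOT_IN_USE = 0; // assumed value for the example
    static final int STATE_IN_USE = 1;     // assumed value for the example

    static final class Entry {
        private static final AtomicIntegerFieldUpdater<Entry> STATE =
            AtomicIntegerFieldUpdater.newUpdater(Entry.class, "state");

        private volatile int state = STATE_NOT_IN_USE; // must be a volatile int

        // Atomically moves NOT_IN_USE -> IN_USE; false if another thread got there first.
        boolean tryAcquire() {
            return STATE.compareAndSet(this, STATE_NOT_IN_USE, STATE_IN_USE);
        }
    }

    // Starts the given number of threads and counts how many acquired the entry.
    static int countWinners(int threads) {
        Entry entry = new Entry();
        AtomicInteger winners = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                if (entry.tryAcquire()) {
                    winners.incrementAndGet();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println("winners: " + countWinners(8));
    }
}
```

Compared with holding an AtomicInteger per entry, a static field updater avoids one extra object per instance, which is a plausible reason for a pool that tracks many entries to prefer it.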
