为什么线程处于 WAITING 状态

理解线程池的工作原理

一、背景

我们使用线程池提交了四个任务 DirectChannelURLProducer:

1
2
3
4
5
6
7
8
9
10
producerThreadPool = new ThreadPoolExecutor(N_CPU, 
N_CPU,
0L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10),
ThreadUtils.getThreadFactory("direct-channelurl-producer"));

for (int i=0; i<4; i++) {
producerThreadPool.submit(new DirectChannelURLProducer(this.workingQueue, this.redisQueueName));
}

我们可以确定的是 DirectChannelURLProducer 这个任务在运行的时候,会由于 redis 服务器连接不上而导致获取 redis 失败。由于我们并没有捕获这一异常,所以会直接跳出 while 循环,而直接导致这个任务运行结束:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class DirectChannelURLProducer implements Runnable {

@Override
public void run() {
try {
produceDirectChannelURL();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

private void produceDirectChannelURL() throws InterruptedException {
while (!Thread.currentThread().isInterrupted()) {

// 抛出异常
Jedis jedis = MyRedisPool.getUrlRedis();

// do something with jedis

Thread.sleep(5 * 1000);
}
}

}

二、困惑点

按照之前的逻辑,我认为当一个任务结束的时候,整个线程应该也跟着退出才对。但是当我使用 jstack 对当前这个程序做线程 DUMP 的时候,却发现这 4 个线程依然在运行着,而且还处于阻塞 (WAITING) 状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
"direct-channelurl-producer-thread-3" #23 prio=5 os_prio=0 tid=0x00007f5a94a62800 nid=0x261c waiting on condition [0x00007f5a217ed000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000008551bd30> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

"direct-channelurl-producer-thread-2" #22 prio=5 os_prio=0 tid=0x00007f5a94a60800 nid=0x261b waiting on condition [0x00007f5a218ee000]
java.lang.Thread.State: WAITING (parking)
...

"direct-channelurl-producer-thread-1" #21 prio=5 os_prio=0 tid=0x00007f5a94a5e800 nid=0x261a waiting on condition [0x00007f5a219ef000]
java.lang.Thread.State: WAITING (parking)
...

"direct-channelurl-producer-thread-0" #20 prio=5 os_prio=0 tid=0x00007f5a94a5d000 nid=0x2619 waiting on condition [0x00007f5a21af0000]
java.lang.Thread.State: WAITING (parking)
...

为什么任务结束之后,线程却依然还在运行呢?

三、线程池如何运行任务

原来,Java 自身提供的线程池的一个基本想法就是将任务和线程分离开来。任务的结束,并不意味着线程的结束。线程池本身可以维护一定数量的活跃线程,这些线程默认情况下永远不结束,永远在等待新的任务的到来。

其中活跃线程的数量是由线程池类的构造器的第一个参数 corePoolSize 传入的:

threadPoolExecutor_corePoolSize

我们这里传入的是 N_CPU ,即 CPU 的个数 (测试机器为 8)。

线程池中的核心线程是按需创建的,并不是一次就创建好 8 个。因此当我们首次提交了 4 个任务后,线程池中至少有 4 个线程处于活跃状态:

threadpoolexecutor-fixedthreadpool

在线程池内部,一个线程绑定在一个 Worker 上。不同的 Worker 绑定着不同的线程,每个 Worker不停的尝试从运行任务队列中获取未运行的任务,然后运行任务:

threadpoolexecutor_worker

Worker 不停拿取并运行任务的精简代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class ThreadPoolExecutor extends AbstractExecutorService {

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// ...
try {
// 不停地拿取任务
while (task != null || (task = getTask()) != null) {
// ...

task.run();

// ...
}
} finally {
processWorkerExit(w, completedAbruptly);
}
}

private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {

public void run() {
runWorker(this);
}

}

}

四、线程池何时创建线程

关于线程的创建首先应该意识到当调用 addWorker() 方法的时候会创建一个 Worker :

new_worker

Worker 的构造器中会为这个 Worker 创建一个线程:

1
2
3
4
5
6
7
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;

// 创建线程
this.thread = getThreadFactory().newThread(this);
}

我们提到过线程的创建是按需创建的,线程池是否创建新的线程与两个因素有关:

  • (1) 当前已有线程数量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void execute(Runnable command) {

// ...

int c = ctl.get();
// 当前 Worker 数量小于核心线程数量
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}

// ...

}
  • (2) BlockingQueue 是否能添加新的任务

Worker 数量等于核心线程数量之后,那么线程池首先会尝试将任务暂时放到阻塞队列中,如果能够添加成功,那么当任一 Worker 空闲之后便会立即从队列中取出这个任务,进行执行:

1
2
3
4
5
6
7
8
9
10
11
public void execute(Runnable command) {

// ...

// 尝试将待运行任务放到阻塞队列 workQueue 中
if (isRunning(c) && workQueue.offer(command)) {
// ...
}

// ...
}

一般而言,只要阻塞队列还有多余位置,那么任务都将会被放到队列中。如果添加不成功,比如使用了 SynchronousQueue 队列或者容量较小的 ArrayBlockingQueue 等,那么便会尝试新建线程:

1
2
3
4
5
6
7
8
9
10
11
public void execute(Runnable command) {
// ...

if (isRunning(c) && workQueue.offer(command)) {
// ...
}

// 新建线程
else if (!addWorker(command, false))
reject(command);
}

addWorker 的内部实现中,在尝试创建新的线程之前,其会判断当前已有线程数量是否大于最大允许的线程池数量 (maximumPoolSize):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {

// ...

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||

// core 值传入的是 false
// 当前线程数量是否大于最大线程数量
wc >= (core ? corePoolSize : maximumPoolSize))

// 不允许创建新的线程
return false;

// ...
}
}

// 新建 Worker
w = new Worker(firstTask);

// ...
}

如果用户提交的任务无法放到阻塞队列中,又不允许创建新的线程的话,那么将会调用线程池的拒绝任务策略。

五、线程池何时结束线程

通过直接阅读线程池类的 DOC 文档,我们也能知道:

当当前线程池的线程数量超过了 corePoolSize 后,如果超出部分的线程一直迟迟没有任务 (即处于空闲状态) 的话,那么这些线程最多等待 keepAliveTime 时间,便会结束。默认情况下,线程池不会结束所有线程,使用 allowCoreThreadTimeout 方法可以修改这一默认行为。

对于上述的描述,代码对应的便是 getTask() 方法中的如下片段:

gettask_return_null

我们可以看到有两个条件可以导致 timed 变量为 true:

  • wc > corePoolSize: 当前 Worker 数量超过核心线程数量
  • allowCoreThreadTimeout: 超过一段时间没有任务后,允许核心线程结束

当从阻塞队列中取不出来任务的时候,便会将 timeOut 变量设置为 true:

getTask_set_timeOut_to_true

在这个地方我们可以看到 timed 变量的真正含义: 如果没有 timed 那么便会一直阻塞在阻塞队列的 take() 方法上,否则就调用带有超时功能的 poll() 方法来尝试获取任务。

getTask() 方法是整个被包裹在一个无限循环里面的:

1
2
3
4
5
private Runnable getTask() {
for (;;) {
// ...
}
}

因此将 timeOut 变量设置为 true 之后,在下次循环将会看到这个最新的值,然后这个方法便会返回 null

我们之前提到过 Worker 是一直不停的从运行队列中拿取并运行任务的。想要这个 Worker 一直运行的前提就是,拿取的任务不能为空:

worker_gettask_cannot_be_null

所以一旦返回的任务为空的话,那么直接就会运行 processWorkerExit 这个方法。至此这个 Workerrun 方法运行结束,与这个 Worker 绑定的线程运行也会结束。

六、线程池的常见实现

Executors 类提供了线程池的各种常见的实现版本,我们现在讨论常见的两种实现:

  • (1) newCachedThreadPool()
1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

从代码来看,我们知道 CachedThreadPool 有如下行为:

  1. 空闲线程最多存活 60 秒
  2. 最多创建 Integer.MAX_VALUE 个线程
  • (2) newFixedThreadPool(int nThreads)
1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
  1. 最多创建 nThreads 个线程
  2. 多余任务会缓存到 LinkedBlockingQueue
  3. 不会有超过 nThreads 数量的线程被创建,因此 keepAliveTime 被设置为 0

七、解决困惑

经过上述分析,我们大致对与线程池如何工作有了一个大致了解。我们为线程池配置的阻塞队列是:

1
new ArrayBlockingQueue<Runnable>(10)

因此当首次提交的那 4 个任务出现异常的时候,那么线程池中的对应的 4 个 Worker 便一直会阻塞在 getTask() 这个方法上,具体而言是,是阻塞在 take() 方法上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private Runnable getTask() {
for (;;) {
int wc = workerCountOf(c);

// wc 永远都不会大于 corePoolSize
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// 阻塞在这个地方
workQueue.take();
if (r != null)
return r;


}
}

因此我们对线程进行 DUMP 会看到线程处于 WAITING 状态。

推荐文章