ThreadPoolExecutor 源码分析

TOP 带着问题看源码

  1. ThreadPoolExecutor 线程池是如何实现的

1. 基本介绍

前面文章的 Thread 我们也分析了,因为 Java 中的Thread 和 内核线程是 1 : 1 的,所以线程是一个重量级的对象,应该避免频繁创建和销毁,我们可以使用线程池来避免。

ThreadPoolExecutor 是 Java 实现的线程池,它并没有采取常见的池化资源的设计方法,而是采用的 生产者-消费者 模式。

img

上图的左边是线程池的核心体系,右边是 JDK 提供创建线程池的工具类。

Executor 接口

提供最基础的执行方法 execute(Runnable command)

ExecutorService 接口

基于 Executor 接口,新增了线程池的一些操作能力

AbstractExecutorService 抽象类

使用模板模式,丰富了一部分操作的细节流程

ForkJoinPool 实现类

jdk1.7 中新增的线程池类,适用于分治的场景

2. 成员变量 & 核心类分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 控制变量 前 3 位标示运行状态,后 29 位标识工作线程的数量
// 初始化为 RUNNING 状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 移位的段
private static final int COUNT_BITS = Integer.SIZE - 3;
// 后29位,标识容量
// ‭0001 1111 1111 1111 1111 1111 1111 1111‬
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 下面是线程池状态
// 表示可以接受新任务,且可执行队列的任务
// 111 0 0000 ... 0000
private static final int RUNNING = -1 << COUNT_BITS;
// 不接收新任务,但可以执行队列的任务
// 000 0 0000 ... 0000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 中断正在执行的,不再接收和执行队列的任务
// 001 0 0000 ... 0000
private static final int STOP = 1 << COUNT_BITS;
// 半中止状态,所有任务都已中止且无工作线程,修改为这个状态,然后执行 terminated() 方法
// 010 0 0000 ... 0000
private static final int TIDYING = 2 << COUNT_BITS;
// 中止状态,已经执行过 terminated() 方法
// 011 0 0000 ... 0000
private static final int TERMINATED = 3 << COUNT_BITS;
// 获取状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取工作线程的数量
private static int workerCountOf(int c) { return c & CAPACITY; }
// ctl 的值
private static int ctlOf(int rs, int wc) { return rs | wc; }

3. 核心方法分析

3.1 普通任务提交

3.1.1 execute(Runnable command)

主要过程就是:

  1. 如果当前工作线程没有达到核心线程数量阈值,就直接添加一个核心工作线程
  2. 如果达到了核心线程数量阈值,就入任务队列,如果状态不正常,执行拒绝策略
  3. 如果队列满了,就创建非核心线程
  4. 如果创建非核心线程失败(达到了最大数量阈值、线程池状态不正常),执行拒绝策略
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void execute(Runnable command) {
// 校验是否为空
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果工作线程数小于核心数
if (workerCountOf(c) < corePoolSize) {
// 添加一个核心工作线程
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果线程池状态正常,并且达到了核心数量,就入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次检查状态,如果不是运行状态就移除任务并执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 再次检查,如果工作线程数量是0,就创建一个
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果入队列失败,就尝试创建非核心工作线程
else if (!addWorker(command, false))
// 创建非核心线程失败,执行拒绝策略
reject(command);
}

3.1.2 addWorker(Runnable firstTask, boolean core)

addWorker 方法主要作用就是创建一个工作线程,并加入到工作线程的集合中,然后启动。在此期间会进行状态和数量的校验。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// 校验状态
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
// 校验工作线程数量
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 数量+1 跳出循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建工作线程,把 firstTask 封装到 Worker 对象,然后把 Worker 对象传给 thread
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// 再次检查状态
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 加入到工作线程集合
workers.add(w);
// 目前集合的数量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 标记添加成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 添加成功就启动线程
// 通过上面 new Worker 的分析,我们知道这里会调用 Worker对象的 run方法
// run 方法里接着调用 runWorker(this)
t.start();
workerStarted = true;
}
}
} finally {
// 没有启动成功,执行降级方法(从集合中清除掉、数量减少、)
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

3.1.3 runWorker(Worker w)

如果有第一个任务就先执行,之后从任务队列取任务执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
final void runWorker(Worker w) {
// 获取当前工作线程
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// ???
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// task如果为空就取任务,如果任务也取不到就结束循环
// getTask() 方法主要就是从任务队列中取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// 检查状态
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 任务执行前扩展方法,例如:可以在执行前等待实现暂停的效果
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 任务执行后扩展方法,例如:可以在执行后释放一些资源
afterExecute(task, thrown);
}
} finally {
// 置 null,重新从队列取
task = null;
// 增加完成数
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

3.2 异步任务提交

3.2.1 submit(Callable task)

submit 方法定义在模板类 AbstractExecutorService 中,然后把 task 封装为 FutureTask , 最后调用 execute 方法来提交任务

AbstractExecutorService#submit

1
2
3
4
5
6
7
8
9
10
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}

我们上面分析 execute 方法知道其最终执行的地方还是调用的 task 的 run 方法,所以我们来分析 FutureTask 的 run 方法。

3.2.2 run()

主要是多了一个执行结果的记录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public void run() {
// 线程状态不为 NEW 或者 修改当前线程来运行当前任务失败,直接返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
// 再次校验线程状态
if (c != null && state == NEW) {
// 注意盯着这个运行结果变量
V result;
boolean ran;
try {
// 任务执行
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 执行异常就修改线程状态为 EXCEPTIONAL
setException(ex);
}
if (ran)
// 执行正常就修改线程的状态为 NORMAL
set(result);
}
} finally {
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

3.2.3 get()

主要思路就是自旋等待线程执行完

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 如果线程状态没完成,就进入等待队列
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}

private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
// 自旋
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}

int s = state;
// 已完成就返回
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// 快完成(异常),就等一会
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 第一次进来一般会走到这里,把当前线程构建一个等待节点
else if (q == null)
q = new WaitNode();
// 第二次循环尝试把节点入队
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 如果有超时时间
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
// 如果发现入队失败(已经入队过了),就挂起当前线程
else
LockSupport.park(this);
}
}

4. 总结

可以看到,线程池实际上是一个生产-消费模型的实现,其支持普通任务提交和异步任务提交(ps.. 其实叫异步并不是很合适,对于用户来说线程池本来就是异步的)。

知道了核心数量以及等待队列还有最大数量这些功能的实现,相信对如何更好的使用线程池会更有帮助。