ThreadPoolExecutor源码解析

ThreadPoolExecutor源码解析

简介

线程池作用就是限制系统中执行线程的数量。

根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果;如果线程少了会浪费系统资源,多了又会造成系统拥挤效率不高。用线程池控制线程数量,使得其他线程排队等候。一个任务执行完毕,再从队列的中取最前面的任务开始执行。若队列中没有任务等待进程,则线程池中的线程处于等待。

主要属性

  • RUNNING:线程池状态,运行中。此时线程池可以接受新的任务和处理等待队列中的任务
  • SHUTDOWN:线程池状态,关闭。此时线程池不接受新的任务,但是会处理等待任务中的任务
  • STOP:线程池状态,停止。此时,线程池不会接受新的任务也不会处理等待队列中的任务
  • TIDYING:线程池状态。所有的任务都销毁。线程池状态转为此状态时,会调用terminated()方法
  • TERMINATED:线程池状态,已终止。terminated方法执行完之后,线程池状态就变为这个。
  • workQueue:任务队列。通常为ArrayBlockingQueue和LinkedBlockingQueue
  • threadFactory:线程工厂。用于生成线程
  • handler:线程池满了之后的拒绝策略
  • keepAliveTime:空闲线程的保活时间。当线程总数超过了核心线程数之后,超过的那部分线程在空闲了指定的时间之后,就会被关闭。
  • corePoolSize:核心线程数。如果没有设置**allowCoreThreadTimeOut(true)**,核心线程超过keepAliveTime也不会被回收
  • maximumPoolSize:线程池的最大线程数
  • defaultHandler:默认的拒绝策略。AbortPolicy

线程池的状态转换

线程池可以有以下状态的转换:

  • RUNNING->SHUTDOWN:当调用了shutdown方法后
  • RUNNING->STOP:当调用了shutDownNow方法后
  • SHUTDOWN->STOP:当调用了shutDownNow方法后
  • SHUTDOWN->TIDYING:当任务队列和线程池都清空后
  • STOP->TIDYING:当任务队列清空后
  • TIDYING->TERMINATED:当调用了terminated方法后

线程池状态转换

主要函数

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

初始化各个参数

execute

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
// ctl 保存着当前线程池的workerCount和runState
int c = ctl.get();
// 如果当前线程数小于核心线程数
if (workerCountOf(c) < corePoolSize) {
// 添加当前任务到worker中,true表示以coresize上界,否则以maximumPoolSize为上界
if (addWorker(command, true))
// 添加成功,直接返回
return;
// 重新获取workerCount和runState
c = ctl.get();
}
// 如果线程池是RUNNING状态,并且添加任务到队列成功了(表示当前worker数目大于等于corePOL,可以将任务放入队列
if (isRunning(c) && workQueue.offer(command)) {
// 再一次获取workerCount和runState
int recheck = ctl.get();
// 如果线程池没有在运行,并且从任务队列中移除成功了
if (! isRunning(recheck) && remove(command))
// 拒绝任务
reject(command);

else if (workerCountOf(recheck) == 0)
// 如果当前worker数目为0,启动一个新的任务去队列里面拿任务执行
addWorker(null, false);
}
// 当前线程数大于等于corePoolSize,并且阻塞队列已满,添加任务,按照maximumPoolSize为上界,启动线程执行任务,否则拒绝任务
else if (!addWorker(command, false))
reject(command);
}
  1. 如果Command为null,返回空指针异常
  2. 如果当前执行的Worker数目小于corePoolSize。直接创建一个新的线程执行任务,返回
  3. 如果当前执行的worker数目大于等于corePoolSize,并且任务队列没有满的话,将任务放到队列中。否则执行第6步。
  4. 任务当如队列中之后,还要做一次recheck,以防另外一个线程在此时关闭了线程池。如果这个时候线程池不是RUNNING状态,将任务从队列中移除,调用拒绝策略
  5. 否则,查看当前的worker数目,如果为0的话,启动一个线程,去任务队列获取任务执行
  6. 新启动一个线程去任务队列执行任务,这次的线程数目上界就以maximumPoolSize为准,如果addWorker失败,就拒绝任务

addWorker

接下来我们看下addWorker方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 获取线程池的运行状态
int rs = runStateOf(c);
// 添加任务的2种失败情况
// 1.如果线程池的状态是SHUTDOWN、TIDYING或者是TERMINATED
// 2. 如果!(运行状态为SHUTDOWN并且任务为null并且队列不为空)
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
// 获取当前线程数
int wc = workerCountOf(c);
// 判断线程数是否超过capacity,或者
// 如果core为true,根据核心线程数判断当前线程数是否超标
// 否则,根据最大线程数判断当前线程数是否超标
// 超过直接返回
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 线程数加1,如果成功,跳出
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
// 当前运行状态被其他线程改变,重试
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 新建一个worker
w = new Worker(firstTask);
final Thread t = w.thread;
// 判断线程工厂创建出的线程是否为null
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 加锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 重新获取运行状态
int rs = runStateOf(ctl.get());
// 如果当前线程池状态是RUNNING
// 或者线程池状态为SHUTDOWN并且firstTask为null,说明只需要新建线程,不需要执行任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果线程状态为alive,就抛出异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 加入workers的HashSet
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
// 更新最大线程池线程数目
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动线程
t.start();
// 修改线程启动标记值
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 添加线程失败
addWorkerFailed(w);
}
return workerStarted;
}

Work是封装的一个内部类,它继承了AQS类,实现了Runnable接口。

Worker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;

/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;

/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// 将状态值设置为-1,禁止任何线程获得锁
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker */
public void run() {
// 调用外部的runWorker方法
runWorker(this);
}

// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.

protected boolean isHeldExclusively() {
return getState() != 0;
}

protected boolean tryAcquire(int unused) {
// 初始化的时候,state为-1,所以,这边的expect 0 条件永不会满足,除非先调用过runWorker方法。在runWorker方法中会调用unlock方法
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

runWorker

接下来看一下runWorker方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
final void runWorker(Worker w) {
// 获取当前线程
Thread wt = Thread.currentThread();
// 获取当前线程的任务
Runnable task = w.firstTask;
// 置为空
w.firstTask = null;
// 解锁,使得worker可以被中断
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 如果worker的任务不为空,或者从任务队列获取到了任务
while (task != null || (task = getTask()) != null) {
// 加锁
w.lock();

// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 如果当前线程池状态是STOP或者TERMINATED 条件1
// 如果当前线程已经被中断并且线程池状态是STOP或者TERMINATED 条件2
// 条件1 或 条件3 并且当前线程没有被中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
// 中断当前线程
wt.interrupt();
try {
// 空方法,可用于做一些线程执行前的预操作
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 线程执行后的扫尾操作,当前实现也为空
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
// 解锁当前worker
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 如果task为null或者任务执行完,销毁线程
processWorkerExit(w, completedAbruptly);
}
}

getTask

接下来看下getTask方法,它的作用是从阻塞队列中获取一个任务来执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
// 如果当前线程池状态为SHUTDOWN、TIDYING、STOP或者是TERMINATED 条件1
// 如果当前线程状态为STOP或者是TERMINATED,或者队列为空 条件2
// 条件1和条件2取和值
// worker数目减一,返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// 获取当前的worker数目
int wc = workerCountOf(c);

// Are workers subject to culling?
// 核心线程数默认是不回收的,所以,如果当前线程数目大于核心线程数,worker就是需要回收的
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果当前的线程数大于最大线程数
// 或者需要worker设置了超时回收并且超时了都没有从队列中拿到数据
// 或者(当前线程数大于1或者任务队列为空)
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 减少worker的数目,返回null
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

processWorkerExit

接着再看看processWorkerExit方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
// 如果当前worker不是正常结束,worker个数减一,否则,减一操作已经在getTask中做过了
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 记录完成的总任务数
completedTaskCount += w.completedTasks;
// 从线程池的workers的集合中删除需要回收的worker
workers.remove(w);
} finally {
mainLock.unlock();
}

// 尝试结束线程池
tryTerminate();

int c = ctl.get();
// 如果当前线程状态处于RUNNING或者SHUTDOWN状态
if (runStateLessThan(c, STOP)) {
// 如果当前worker是正常结束的
if (!completedAbruptly) {
// 如果核心线程不需要回收,那么min就是核心线程数
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果核心线程允许回收,并且任务队列不为空
if (min == 0 && ! workQueue.isEmpty())
// 至少需要一个线程来执行任务
min = 1;
// 如果当前worker的数目大于min,直接返回
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 新赠一个Worker代替原先的Worker
// 新开一个Worker需要满足以下3个条件中的任意一个:
// 1. 用户执行的任务发生了异常
// 2. Worker数量比线程池基本大小要小
// 3. 阻塞队列不空但是没有任何Worker在工作
addWorker(null, false);
}
}

tryTerminate

接着再看看tryTerminate方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 如果
// 当前线程池状态为正在运行
// 当前线程池状态为TIDYING或者TERMINATED,表示正在关闭,不可冲突
// 当前队列不为空,还有任务未处理完
// 直接返回,等待三个条件全部不满足
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 如果当前的worker数目不为0
if (workerCountOf(c) != 0) { // Eligible to terminate
// 中断一个空闲线程
// 中断一个空闲线程之后,这个空闲线程会被回收,回收的时候,会再一次调用tryTerminate方法,然后再次中断另一个空闲线程
interruptIdleWorkers(ONLY_ONE);
return;
}

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 设置线程池状态为TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
调用terminated方法
terminated();
} finally {
// 设置线程池状态为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}

以上的线程池执行过程,可以用下面这个图来概括:

线程池execute方法流程

shutdown

接着看下线程池是如何关闭的

shutdown方法将线程池状态改成SHUTDOWN,线程池还能继续处理阻塞队列里的任务,并且会回收一些闲置的Worker。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 确认当前线程是否有权限执行关闭线程池操作
checkShutdownAccess();
// 将线程池状态改为SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断线程池的空闲线程/空闲worker
interruptIdleWorkers();
// 关闭时的回调方法,当前实现为空
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}

空闲线程

worker运行的时候会去任务队列拿数据(getTask方法),如果没有设置超时时间,就会一直阻塞等待,此时的worker就被称为空闲worker。由于worker也是一个AQS,在runWorker方法里会有一对lock和unlock操作,这对lock操作是为了确保Worker不是一个空闲Worker。

所以Worker被设计成一个AQS是为了根据Worker的锁来判断是否是空闲线程,是否可以被强制中断。

checkShutdownAccess

如果有SecurityManager的话,校验下当前线程是否有关闭的权限

接着再校验每个worker是否允许被中断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* If there is a security manager, makes sure caller has
* permission to shut down threads in general (see shutdownPerm).
* If this passes, additionally makes sure the caller is allowed
* to interrupt each worker thread. This might not be true even if
* first check passed, if the SecurityManager treats some threads
* specially.
*/
private void checkShutdownAccess() {
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
security.checkAccess(w.thread);
} finally {
mainLock.unlock();
}
}
}

interruptIdleWorkers

调用了重载方法interruptIdleWorkers(boolean onlyOne)

1
2
3
4
5
6
7
/**
* Common form of interruptIdleWorkers, to avoid having to
* remember what the boolean argument means.
*/
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}

interruptIdleWorkers(boolean )

参数传入false表示中断所有的空闲线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 如果线程没有被中断,并且可以获得线程的锁
if (!t.isInterrupted() && w.tryLock()) {
try {
// 中断线程
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
// 只中断一个
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

shutdownNow

shutdownNow把线程池状态改成STOP状态,这样不会处理阻塞队列里的任务,也不会处理新的任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 校验当前线程是否具有权限关闭线程池
checkShutdownAccess();
// 将线程池的状态改为STOP
advanceRunState(STOP);
// 中断所有线程,无论当前状态是否存活
interruptWorkers();
// 将阻塞队列中的所有任务移除,并赋值给tasks集合
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}

interruptWorkers

1
2
3
4
5
6
7
8
9
10
11
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
// 中断当前线程,无论是否持有锁,或者是什么状态
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}

interruptIfStarted

只要当前线程没有中断,就中断当前线程

1
2
3
4
5
6
7
8
9
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}

drainQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
// 将任务列表中的任务全部移入ArrayList中
q.drainTo(taskList);
// 如果移除失败,挨个删除并添加到ArrayList中
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}

参考资料

Java线程池ThreadPoolExecutor源码分析

面试官:你分析过线程池源码吗?

0%