线程池是众多池化思想的一种。在Java中,与多线程相关的是java.util.concurrent包,而其中最核心的便是线程池类java.util.concurrent.ThreadPoolExecutor。该类包含了线程池的维护与任务执行的过程。本文主要围绕ThreadPoolExecutor,既是源码分析,也是经验总结。如有理解不当,望不吝赐教。
整体结构
继承关系
在J.U.C包下,线程池主要包括四个类(或接口)。从上到下依次是Executor,ExecutorService,AbstractExecutorService和ThreadPoolExecutor

分工
- 
Executor作为顶层父接口,将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器 (Executor) 中,由 Executor 框架完成线程的调配和任务的执行部分。 
- 
ExecutorService接口增加了一些能力: - 扩充执行任务的能力,比如可以为一个或一批异步任务生成Future的方法(invokeAll);
- 提供了管控线程池的方法,比如停止线程池运行的方法(shutdown);
- 拓展了有返回值的提交任务方法(submit)。
 
- 
AbstractExecutorService是抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。 
- 
ThreadPoolExecutor实现最复杂的运行部分,一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。 
构造方法
ThreadPoolExecutor有四个构造方法,除全参构造外,其它三个都是全参构造的子集。
全参构造
共7个构造参数,赋值给6个成员变量。
| 1 | public ThreadPoolExecutor(int corePoolSize, | 
构造参数
含义分别如下:
| 参数 | 含义 | 
|---|---|
| corePoolSize | 核心线程数。核心线程即使空闲也不会被清除 | 
| maximumPoolSize | 最大线程数。超出核心线程数的线程是非核心线程,空闲一定时间会被清除 | 
| keepAliveTime | 空闲线程的最大存活时间 | 
| unit | 存活时间单位 | 
| workQueue | 任务队列。当核心线程没有空闲时,会将任务丢到等待队列 | 
| threadFactory | 线程工厂。创建线程的工厂 | 
| handler | 任务拒绝策略。当核心线程、任务队列、非核心线程都满了,会执行拒绝策略 | 
线程池运行流程
- 线程池创建时,并未创建线程,而是在第一次提交任务时创建;
- 在线程数未大于核心线程数corePoolSize时,每提交一个任务会创建一个线程(通过threadFactory);
- 在线程数达到核心线程数时,会将任务丢到工作队列workQueue里;
- 在工作队列满了时,会继续创建非核心线程;
- 当线程数达到最大线程数maximumPoolSize时,会执行拒绝策略handler;
- 当非核心线程空闲达到存活时间(keepAliveTimeunit)时,会被自动移除。
参数设置建议
- 如果是计算密集型,由于CPU几乎没有空闲时间,因此线程数不应大于CPU核数,对于核数为N的CPU,建议线程数=N-1(留一个核给主线程)
- 如果是IO密集型,可以根据单个线程CPU使用率来确定。对于使用率为k(0<k<1)的任务,建议线程数=N/k
- 队列长度应与核心线程数相当。队列太长,不能及时创建新线程,导致处理任务效率低;队列太短,可能会频繁创建非核心线程
成员变量
除构造参数中指定的6个参数外,还有以下成员变量
ctl
ctl是原子int类型,共32位。其中高3位表示线程池状态,低29位表示线程数。
| 1 | // 原子类型,线程池初始状态是RUNNING,初始线程数是0 | 
ctl初始值为ctlOf(RUNNING, 0),其中RUNNING值为-1<<29,即1110 00…00(29个0)。ctlOf方法中,与0做|运算后,结果不变。
1110 00…00表示线程池状态为111,线程数为0。
-1的二进制表示
源码:1000 00…01
反码:1111 11…10(符号位不变,其它取反)
补码:1111 11…11(反码加1)
largestPoolSize
曾经出现的最大线程数
| 1 | private int largestPoolSize; | 
completedTaskCount
已完成的任务数(所有线程)
| 1 | private long completedTaskCount; | 
allowCoreThreadTimeOut
是否允许核心线程超时,默认false
| 1 | private volatile boolean allowCoreThreadTimeOut; | 
常量
COUNT_BITS
COUNT_BITS 表示线程数占ctl的位数,即32-3=29。
| 1 | private static final int COUNT_BITS = Integer.SIZE - 3; | 
CAPACITY
CAPACITY用作拆分ctl。值为1 << 29 - 1,二进制表示为00011111 11111111 11111111 11111111,即3个0和29个1。
同时,CAPACITY也表示线程池中允许的最大线程数,这样线程数就被限制在ctl的低29位。
| 1 | private static final int CAPACITY = (1 << COUNT_BITS) - 1; | 
线程池的五种状态
状态分别用-1~3的左移29位表示,结果按从小到大排序。
| 1 | private static final int RUNNING = -1 << COUNT_BITS; | 
下表列出了线程池的五种状态的含义和触发条件。
| 状态 | 名称 | 二进制表示 | 说明 | 触发条件 | 
|---|---|---|---|---|
| RUNNING | 运行 | 1110 00…00 | 允许提交任务 | 初始化时 | 
| SHUTDOWN | 关闭 | 0000 00…00 | 不允许提交任务,允许执行已提交的任务 | 调用shutdown()后 | 
| STOP | 停止 | 0010 00…00 | 不允许提交任务,丢弃已提交的任务,中断正在执行的任务 | 调用shutdownNow()后 | 
| TIDYING | 整理 | 0100 00…00 | 线程数清零,任务清零 | 调用tryTerminate()后 | 
| TERMINATED | 终止 | 0110 00…00 | 线程池终止 | 调用terminated()后 | 
二进制下,各状态都只有高3位有效,分别为111,000,001,010,011。这样线程池状态就被限制在ctl的高3位。
内部类
Worker
Worker是ThreadPoolExecutor的内部类,继承了AbstractQueuedSynchronizer和Runnable接口。其中,实现Runnable接口表明Worker就是一个线程,承担着处理任务的工人角色。继承AQS使Worker具备了有锁/无锁两个状态,分别用来表示线程的空闲/忙碌。方便后续销毁。
| 1 | private final class Worker extends AbstractQueuedSynchronizer implements Runnable {...} | 
成员变量
Worker有三个成员变量,分别是:
- Thread thread实际的工作线程
- Runnable firstTask待处理的初始任务
- long completedTasks任务计数器,统计单个worker执行的任务数
构造方法
Worker只有一个构造方法。在调用构造方法时,会通过线程工厂创建线程,当前worker对象作为Runnable实例传入newThread方法中。这样,当调用thread.start()时,实际调用的是worker.run方法。
| 1 | // 构造方法 | 
这里有点绕,thread是worker的成员变量,但worker又是thread的target。也就Doug Lea敢这么写啊。
成员方法
run方法
run内部调用了runWorker方法,传入的是当前worker对象。
| 1 | public void run() { | 
interruptIfStarted方法
中断所有运行中的线程。此方法只在shutdownNow中有调用。
| 1 | void interruptIfStarted() { | 
拒绝策略
j.u.c.RejectedExecutionHandler接口提供了一个方法rejectedExecution,当线程池和队列都满的时候,会调用rejectedExecution方法,执行拒绝策略。
| 1 | // RejectedExecutionHandler只有一个方法 | 
ThreadPoolExecutor内置了4个RejectedExecutionHandler实现类,分别表示4种拒绝策略。
CallerRunsPolicy
任务交由主线程运行。由于主线程被占用,此策略会暂时阻止任务继续提交。
| 1 | public static class CallerRunsPolicy implements RejectedExecutionHandler { | 
AbortPolicy
抛出异常
| 1 | public static class AbortPolicy implements RejectedExecutionHandler { | 
DiscardPolicy
丢弃且静默处理
| 1 | public static class DiscardPolicy implements RejectedExecutionHandler { | 
DiscardOldestPolicy
丢弃最老的任务,提交当前任务
| 1 | public static class DiscardOldestPolicy implements RejectedExecutionHandler { | 
对比
分别对四种拒绝策略构建的线程池(queueSize=5,corePoolSize=1,maxPoolSize=5),for循环提交100个任务,执行如下:
| 拒绝策略 | 描述 | 作用 | 任务处理结果 | 
|---|---|---|---|
| CallerRunsPolicy | 交给主线程 | 队列和最大线程都满了,则会把新任务交给调用线程处理。 | 处理1-100 | 
| AbortPolicy | 拒绝 | 队列和最大线程都满了,则抛出异常 RejectedExecutionException | 处理1-10 | 
| DiscardPolicy | 丢弃 | 队列和最大线程都满了,直接丢弃 | 处理1-10 | 
| DiscardOldestPolicy | 丢弃最早 | 队列和最大线程都满了,丢弃旧任务,处理新任务 | 处理1,7-10,96-100 | 
分析如下
- 
CallerRunsPolicy:当线程和队列都满了,则由主线程处理第11号任务。等主线程处理完,才会重新提交12-21号任务,然后主线程继续处理22号任务。最终处理1-100。 注意:由于主线程的阻塞会影响任务提交,所以队列不应该设置的太小,否则有可能造成主线程繁忙、其它线程空闲的情况。建议 queueSize=maxPoolSize。
- 
AbortPolicy:当线程和队列都满了,直接抛出异常,程序中断,最终处理1-10。业务中可以捕获这个异常做后续处理。 
- 
DiscardPolicy:当线程和队列都满了,没有任何提示,程序中断,最终处理1-10。此策略应少用。 
- 
DiscardOldestPolicy:当线程和队列都满了的时候,此时线程持有1、7、8、9、10,队列持有2、3、4、5、6。此策略会丢弃队列头的任务,将新任务放入队列尾。因此就有:  丢弃2,放入11  丢弃3,放入12  …  丢弃94,放入99  丢弃95,放入100 等任务全部提交完毕,队列中还剩下96-100号任务。等线程空闲,就会来执行这5个,所以最后的执行结果是1、7、8、9、10、96、97、98、99、100。 当新任务对旧任务有替代作用时,可以使用此策略。比如业务需求是定时获取节目信息,那么新任务肯定比旧任务信息更准确,新任务到来时如果旧任务还没返回,可以丢弃旧任务。 注意,此策略下队列不应设置太小,否则可能导致任务被大量丢弃,如示例中所示。 
成员方法
ThreadPoolExecutor功能主要包含线程池维护和任务执行两部分,其中任务执行最重要的就是execute方法
execute方法
execute方法作用是将任务提交给线程池。具体可以分四种情况:
- 核心线程未满,创建核心线程,并将任务交给核心线程;
- 核心线程已满,队列未满,则将任务添加到队列;
- 队列已满,则创建非核心线程,将任务交给新线程;
- 最大线程已满,或线程池关闭,则执行拒绝策略。
源码如下:
| 1 | public void execute(Runnable command) { | 
isRunning方法
c < SHUTDOWN,则只有RUNNING一种情况。
| 1 | private static boolean isRunning(int c) { | 
addWorker方法
顾名思义,addWorker方法主要作用是创建工人对象。方法有两个参数
- Runnable firstTask,表示该线程需要处理的第一个任务
- boolean core,表示该线程是否为核心线程
在execute方法中,有三个地方会调用addWorker方法。分别是:
- 核心线程未满,创建核心线程;
- 核心线程数为0,且任务已加到队列里,创建非核心线程;
- 核心线程已满且队列已满,创建非核心线程
源码如下:
| 1 | // 返回true,表示线程添加成功 | 
workerCountOf方法
从ctl中获取线程数
| 1 | private static int workerCountOf(int c) { | 
CAPACITY = (1 << 29) - 1, 1<<29是 0010 00...11,再减1是 0001 11...11,即3个1和29个0。c & CAPACITY取的就是c的低29位。
runStateOf方法
从ctl中获取线程池状态
| 1 | private static int runStateOf(int c) { | 
CAPACITY 是 0001 11...11,取反就是1110 00...00,c & ~CAPACITY取的就是c的高3位。
compareAndIncrementWorkerCount方法
CAS操作,使ctl的值加1。跟线程数加1等效。
| 1 | private boolean compareAndIncrementWorkerCount(int expect) { | 
addWorkerFailed方法
线程启动失败,会调用addWorkerFailed方法,有三个作用:
- 从集合中移除当前Worker对象
- ctl值减1,因为在addWorker中先加了1
- 尝试终止线程池
此方法只在addWorker中调用。
源码如下:
| 1 | private void addWorkerFailed(Worker w) { | 
decrementWorkerCount方法
循环调用compareAndDecrementWorkerCount使ctl减1,直到成功
| 1 | private void decrementWorkerCount() { | 
compareAndDecrementWorkerCount方法
CAS操作,使ctl的值减1。跟线程数减1等效。
| 1 | private boolean compareAndDecrementWorkerCount(int expect) { | 
runWorker方法
由上面分析可知:在addWorker方法中,调用 t.start() 启动线程,t = worker.thread,而 thread 持有的是worker对象,因此会调用到worker的run方法,最终会调用到runWorker。
runWorker方法主要作用是:持续从队列中获取任务并消费,直到获取到的任务为null,则会结束循环并移除当前线程。
runWorker方法在run方法中唯一调用。
| 1 | final void runWorker(Worker w) { | 
注意一下Thread的三个方法:
| 方法 | 性质 | 返回值类型 | 作用 | 
|---|---|---|---|
| interrupt | 成员方法 | void | 设置线程的中断状态,作用于调用线程 | 
| isInterrupted | 成员方法 | boolean | 返回线程的中断状态,作用于调用线程 | 
| interrupted | 静态方法 | boolean | 返回并清除线程的中断状态,只能作用于当前线程 | 
getTask方法
在runWorker方法中,如果 task==null,则会调用getTask方法从队列获取任务。
具体逻辑是:
- 队列不为空,从队列尾取出一个任务,并返回;
- 队列为空
- 如果线程数未超出核心线程,则workQueue.take()会一直阻塞,直到有新的任务入队列;
- 如果线程数超出了核心线程,或者允许核心线程超时(核心线程默认不超时,可通过allowCoreThreadTimeOut()方法设置),则在从队列获取任务时设置一个超时时间(即keepAliveTime),超时则返回null,跳出循环,进入后续清理逻辑。
 
- 如果线程数未超出核心线程,则
这样就实现了设置线程存活时间的逻辑。非常巧妙。
getTask方法在runWorker方法中唯一调用。
源码如下:
| 1 | private Runnable getTask() { | 
processWorkerExit方法
在runWorker方法中,如果getTask方法有结果,则while循环得以持续;
如果结果为null,循环正常结束,执行processWorkerExit(w, false),将当前线程从集合中移除;
如果循环异常结束,则执行processWorkerExit(w, true),将当前线程移除并补充一个线程。
| 1 | private void processWorkerExit(Worker w, boolean completedAbruptly) { | 
shutdown方法
shutdown方法会使线程池进入SHUTDOWN状态,不再接收新任务,但可以处理已经提交的任务。
此方法主要分三步进行:
- 校验执行线程是否有关闭线程池的权限;
- 修改线程池状态为SHUTDOWN;
- 中断空闲的线程;
- 最后尝试关闭线程池。
| 1 | public void shutdown() { | 
怎么实现“不再接收新任务”?
在execute方法中,共有三处调用addWorker方法。

在执行advanceRunState(SHUTDOWN)后,线程池状态改为SHUTDOWN,因此只会进入1或3两个addWorker中。而这两处调用,都有command!=null。因此addWorker方法必然返回false(满足rs >= SHUTDOWN && firstTask != null)。最后会进入reject。

checkShutdownAccess方法
权限校验方法
| 1 | private void checkShutdownAccess() { | 
advanceRunState方法
advanceRunState方法会将ctl的线程状态设为targetState,循环直到成功为止
| 1 | private void advanceRunState(int targetState) { | 
interruptIdleWorkers方法
内部调用重载方法,中断所有空闲的线程
| 1 | private void interruptIdleWorkers() { | 
interruptIdleWorkers(boolean onlyOne)方法
中断空闲的线程。如果onlyOne=true,则最多只中断一个,否则中断所有。
| 1 | private void interruptIdleWorkers(boolean onlyOne) { | 
怎么实现“中断空闲的线程,但不影响已提交的任务”?
在执行t.interrupt前,首先判断if (!t.isInterrupted() && w.tryLock()),要求t线程未中断且worker处于空闲状态(未上锁)。
另一方面,在runWorker方法中,一旦task != null || (task = getTask()) != null成立,就会执行w.lock()使worker上锁。
因此只有阻塞在getTask()的t线程才会进入interruptIdleWorkers方法的if条件。
又由于t线程只会阻塞在BlockingQueue.poll或BlockingQueue.take方法处,而这两个方法都会抛出InterruptedException异常,因此t.interrupt()会使t线程中断运行。
tryTerminate方法
尝试终止线程池。只有在线程池状态为STOP或SHUTDOWN(且队列为空),且没有工作线程时,线程池才会终止,最后线程池进入TERMINATED状态。如果有线程在运行,则尝试中断一个,方法结束。
| 1 | final void tryTerminate() { | 
runStateAtLeast方法
判断状态。五种状态由小到大排序,比较大小即可。
| 1 | private static boolean runStateAtLeast(int c, int s) { | 
shutdownNow方法
与shutdown方法不同的是,shutdownNow方法调用advanceRunState方法使线程池进入STOP状态,不再接收新任务,通过interruptWorkers方法中断正在执行的任务,通过drainQueue方法丢弃并返回队列中的任务。
| 1 | public List<Runnable> shutdownNow() { | 
interruptWorkers方法
interruptWorkers方法通过调用w.interruptIfStarted()中断所有运行中的线程,来中断正在执行的任务。
但是注意:interruptWorkers并不一定真的会中断正在执行中的任务。因为interrupt方法实际上只是设置了一个中断标记,只有当线程执行到sleep, wait, join方法,抛出InterruptedException异常时,任务才会中断。否则当前任务会继续执行完毕。
| 1 | private void interruptWorkers() { | 
drainQueue方法
drainQueue方法将阻塞队列中的元素转移到集合中,并清空阻塞队列。
| 1 | private List<Runnable> drainQueue() { | 


