ThreadPoolExecutor
线程池核心实现类
线程池的生命周期
RUNNING: 接受新任务,同时处理工作队列中的任务
SHUTDOWN: 不接受新任务,但是能处理工作队列中的任务
STOP: 不接受新任务,不处理工作队列中的任务,并且强制中断正在运行的工作者线程。
TIDYING: 所有的工作者线程都已经停止,将运行 terminated() 钩子函数。
TERMINATED: terminated() 钩子函数运行完毕
创建实例
/**
* 低 29 位设置为线程池的工作线程数
* 高 3 为设置为线程池的生命周期状态
*/
private final AtomicInteger ctl = new AtomicInteger(ThreadPoolExecutor.ctlOf(ThreadPoolExecutor.RUNNING, 0));
// 线程池的工作线程数在 int 中占用的位数
private static final int COUNT_BITS = Integer.SIZE - 3;
// 工作线程数掩码
private static final int COUNT_MASK = (1 << ThreadPoolExecutor.COUNT_BITS) - 1;
// runState is stored in the high-order bits
// 线程池处于运行状态:接受新任务,同时处理工作队列中的任务
private static final int RUNNING = -1 << ThreadPoolExecutor.COUNT_BITS;
// 线程池正在停止:不接受新任务,但是能处理工作队列中的任务
private static final int SHUTDOWN = 0 << ThreadPoolExecutor.COUNT_BITS;
// 线程池已经停止:不接受新任务,不处理工作队列中的任务,并且强制中断正在运行的工作者线程
private static final int STOP = 1 << ThreadPoolExecutor.COUNT_BITS;
// 线程池正在执行清理:所有的工作者线程都已经停止,将运行 terminated() 钩子函数
private static final int TIDYING = 2 << ThreadPoolExecutor.COUNT_BITS;
// 线程池已经清理完毕:terminated() 钩子函数运行完毕
private static final int TERMINATED = 3 << ThreadPoolExecutor.COUNT_BITS;
/**
* 任务队列,
* 1)如果工作者线程允许过期,则使用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 读取任务
* 2)否则使用 workQueue.take() 读取任务
*/
private final BlockingQueue workQueue;
/**
* 添加工作者线程、关闭线程池、读取统计数据等操作中使用的互斥锁
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* 线程池中的工作者线程集合,只有在持有 mainLock 时才能访问
*/
private final HashSet workers = new HashSet<>();
/**
* 执行 awaitTermination 操作时的条件
*/
private final Condition termination = mainLock.newCondition();
/**
* 跟踪线程池同时存在的最大工作线程数
* Accessed only under mainLock.
*/
private int largestPoolSize;
/**
* 线程池完成的任务数,只在工作者线程退出时更新
* Accessed only under mainLock.
*/
private long completedTaskCount;
/**
* 任务队列,
* 1)如果工作者线程允许过期,则使用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 读取任务
* 2)否则使用 workQueue.take() 读取任务
*/
private final BlockingQueue workQueue;
/**
* 添加工作者线程、关闭线程池、读取统计数据等操作中使用的互斥锁
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* 线程池中的工作者线程集合,只有在持有 mainLock 时才能访问
*/
private final HashSet workers = new HashSet<>();
/**
* 执行 awaitTermination 操作时的条件
*/
private final Condition termination = mainLock.newCondition();
/**
* 跟踪线程池同时存在的最大工作线程数
* Accessed only under mainLock.
*/
private int largestPoolSize;
/**
* 线程池完成的任务数,只在工作者线程退出时更新
* Accessed only under mainLock.
*/
private long completedTaskCount;
/**
* 创建工作者线程的工厂,工作者线程创建失败会导致任务丢失
*/
private volatile ThreadFactory threadFactory;
/**
* 线程池满载或关闭过程中,任务被拒绝时的处理器
*/
private volatile RejectedExecutionHandler handler;
/**
* 空闲工作者线程的超时时间,以纳秒为单位。
* 1)当前工作者线程数 > 核心线程数
* 2)允许核心工作者线程超时 allowCoreThreadTimeOut=true
*/
private volatile long keepAliveTime;
/**
* 默认为 false,即使超时了,核心工作者线程也不会退出
*/
private volatile boolean allowCoreThreadTimeOut;
/**
* 核心工作者线程数
*/
private volatile int corePoolSize;
/**
* 最大工作者线程数
*/
private volatile int maximumPoolSize;
/**
* 默认的拒绝处理器
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), ThreadPoolExecutor.defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, ThreadPoolExecutor.defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
/**
* 使用指定的初始化参数创建一个 ThreadPoolExecutor 实例
*
* @param corePoolSize 核心工作者线程所
* @param maximumPoolSize 最大工作者线程数
* @param keepAliveTime 工作者线程存活时间
* @param unit 时间单位
* @param workQueue 工作队列
* @param threadFactory 创建工作者线程的线程工厂
* @param handler 拒绝处理器
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
/**
* 必须保证
* corePoolSize >=0
* maximumPoolSize > 0
* maximumPoolSize > corePoolSize
* keepAliveTime > 0 表示工作者线程可超时退出
* keepAliveTime = 0 表示不可退出
*/
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0) {
throw new IllegalArgumentException();
}
if (workQueue == null || threadFactory == null || handler == null) {
throw new NullPointerException();
}
acc = System.getSecurityManager() == null
? null
: AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
可提交的任务类型
Runnable 接口无返回值并且不能显示抛出异常。
Callable 接口有返回值,并且能显示抛出异常。
@FunctionalInterface
public interface Runnable {
void run();
}
@FunctionalInterface
public interface Callable {
/**
* 计算并返回的一个结果,如果计算失败,则抛出异常
*/
V call() throws Exception;
}
执行一个 Runnable 任务,无返回值
/**
* 往线程池提交一个 Runnable 任务,
* 如果线程池已满或线程池关闭则,该任务会交给拒绝处理器处理。
*/
@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
// 读取控制变量
int c = ctl.get();
// 1)线程池工作线程数 < 核心线程数
if (ThreadPoolExecutor.workerCountOf(c) < corePoolSize) {
// 尝试创建一个新的工作者线程来处理这个任务
if (addWorker(command, true)) {
// 创建成功则直接返回
return;
}
// 创建失败,则重新读取控制变量
c = ctl.get();
}
/**
* 2)当前工作者线程数 >= 核心工作者线程
* && 线程池处于运行状态
* && 尝试向工作者队列中提交任务
*/
if (ThreadPoolExecutor.isRunning(c) && workQueue.offer(command)) {
// 重新读取控制变量
final int recheck = ctl.get();
// 1)如果线程池已经停止运行,则将目标任务从任务队列中移除,并尝试终止线程池
if (! ThreadPoolExecutor.isRunning(recheck) && remove(command)) {
// 执行拒绝处理器
reject(command);
// 2)如果已经没有可用的工作者线程
} else if (ThreadPoolExecutor.workerCountOf(recheck) == 0) {
// 尝试添加一个新的工作者线程
addWorker(null, false);
}
}
/**
* 3)当前工作者线程数 >= 核心工作者线程
* && 工作队列已满
* && 尝试增加一个新的工作者线程来处理该任务
*/
else if (!addWorker(command, false)) {
// 任务处理失败,则交给拒绝处理器处理
reject(command);
}
}
/**
* 读取线程池的工作线程数
*/
private static int workerCountOf(int c) { return c & ThreadPoolExecutor.COUNT_MASK; }
/**
* 尝试增加一个核心工作者线程来处理这个任务
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
/**
* 1)线程池状态在 STOP 及以上【线程池已经停止】
* 2)线程池正在停止,并且提交任务不为 null || 工作队列为空
* 则创建失败
*/
if (ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.SHUTDOWN)
&& (ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.STOP)
|| firstTask != null
|| workQueue.isEmpty())) {
return false;
}
for (;;) {
/**
* 1)工作者线程数已经 >= 核心线程数【任务队列未满时】
* 2)工作者线程数已经 >= 最大线程数【任务队列已满时】
* 则创建失败
*/
if (ThreadPoolExecutor.workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & ThreadPoolExecutor.COUNT_MASK)) {
return false;
}
// 尝试递增工作者线程数
if (compareAndIncrementWorkerCount(c)) {
// 如果计数值递增成功,则将正式添加工作者线程来处理任务
break retry;
}
// 如果其他线程优先递增了计数值,则重新读取计数值进行重试
c = ctl.get(); // Re-read ctl
// 线程池正在关闭,则重新进入循环后将直接退出
if (ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.SHUTDOWN))
{
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
}
// 工作者线程是否已经启动
boolean workerStarted = false;
// 工作者线程是否已经添加到集合中
boolean workerAdded = false;
Worker w = null;
try {
// 创建工作者线程
w = new Worker(firstTask);
// 读取线程对象
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
/**
* Recheck while holding lock. Back out on ThreadFactory failure or if shut down before lock acquired.
* 读取控制变量再次进行校验
*/
final int c = ctl.get();
/**
* 1)线程池处于运行状态
* 2)线程池处于关闭状态 && 提交任务为 null
*/
if (ThreadPoolExecutor.isRunning(c) ||
ThreadPoolExecutor.runStateLessThan(c, ThreadPoolExecutor.STOP) && firstTask == null) {
// 工作者线程已经启动
if (t.isAlive()) {
throw new IllegalThreadStateException();
}
// 将工作者线程添加到集合中
workers.add(w);
// 如果当前当前工作者线程数 > largestPoolSize,则更新它
final int s = workers.size();
if (s > largestPoolSize) {
largestPoolSize = s;
}
// 工作者线程添加成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果添加成功,则启动工作者线程
if (workerAdded) {
t.start();
// 工作者线程启动成功
workerStarted = true;
}
}
} finally {
// 如果工作者线程启动失败,则进行回退和清理
if (!workerStarted) {
addWorkerFailed(w);
}
}
return workerStarted;
}
// 运行状态 c 小于指定状态 s
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// 运行状态 c 大于等于指定状态 s
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
/**
* 尝试原子的将工作者线程数 +1
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* Rolls back the worker thread creation.
* - removes worker from workers, if present
* - decrements worker count
* - rechecks for termination, in case the existence of this
* worker was holding up termination
*/
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 1)从 workers 集合中移除工作者 w
if (w != null) {
workers.remove(w);
}
// 递减总工作者线程数
decrementWorkerCount();
// 尝试进行线程池终止
tryTerminate();
} finally {
mainLock.unlock();
}
}
/**
* 将工作者线程总数递减 1
*/
private void decrementWorkerCount() {
ctl.addAndGet(-1);
}
final void tryTerminate() {
for (;;) {
final int c = ctl.get();
/**
* 1)线程池在运行状态【RUNNING】
* 2)线程池在执行清理操作【TIDYING】
* 3)线程池正在停止【SHUTDOWN】并且工作队列非空
* 直接返回
*/
if (ThreadPoolExecutor.isRunning(c) ||
ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.TIDYING) ||
ThreadPoolExecutor.runStateLessThan(c, ThreadPoolExecutor.STOP) && ! workQueue.isEmpty