ThreadPoolExecutor 源码分析

ThreadPoolExecutor 线程池核心实现类 线程池的生命周期 RUNNING: 接受新任务,同时处理工作队列中的任务 SHUTDOWN: 不接受新任务,但是能处理工作队列中的任务 STOP: 不接受新任务,不处理工作队列中的任务,并且强制中断正在运行的工作者线程。 TIDYING: 所有的工作者线程都已经停止,将运行 terminated() 钩子函数。 TERMINATED: terminated() 钩子函数运行完毕 创建实例 /** * 低 29 位设置为线程池的工作线程数 * 高 3 为设置为线程池的生命周期状态 */ private final AtomicInteger ctl = new AtomicInteger(ThreadPoolExecutor.ctlOf(ThreadPoolExecutor.RUNNING, 0)); // 线程池的工作线程数在 int 中占用的位数 private static final int COUNT_BITS = Integer.SIZE - 3; // 工作线程数掩码 private static final int COUNT_MASK = (1 << ThreadPoolExecutor.COUNT_BITS) - 1; // runState is stored in the high-order bits // 线程池处于运行状态:接受新任务,同时处理工作队列中的任务 private static final int RUNNING = -1 << ThreadPoolExecutor.COUNT_BITS; // 线程池正在停止:不接受新任务,但是能处理工作队列中的任务 private static final int SHUTDOWN = 0 << ThreadPoolExecutor.COUNT_BITS; // 线程池已经停止:不接受新任务,不处理工作队列中的任务,并且强制中断正在运行的工作者线程 private static final int STOP = 1 << ThreadPoolExecutor.COUNT_BITS; // 线程池正在执行清理:所有的工作者线程都已经停止,将运行 terminated() 钩子函数 private static final int TIDYING = 2 << ThreadPoolExecutor.COUNT_BITS; // 线程池已经清理完毕:terminated() 钩子函数运行完毕 private static final int TERMINATED = 3 << ThreadPoolExecutor.COUNT_BITS; /** * 任务队列, * 1)如果工作者线程允许过期,则使用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 读取任务 * 2)否则使用 workQueue.take() 读取任务 */ private final BlockingQueue workQueue; /** * 添加工作者线程、关闭线程池、读取统计数据等操作中使用的互斥锁 */ private final ReentrantLock mainLock = new ReentrantLock(); /** * 线程池中的工作者线程集合,只有在持有 mainLock 时才能访问 */ private final HashSet workers = new HashSet<>(); /** * 执行 awaitTermination 操作时的条件 */ private final Condition termination = mainLock.newCondition(); /** * 跟踪线程池同时存在的最大工作线程数 * Accessed only under mainLock. */ private int largestPoolSize; /** * 线程池完成的任务数,只在工作者线程退出时更新 * Accessed only under mainLock. */ private long completedTaskCount; /** * 任务队列, * 1)如果工作者线程允许过期,则使用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 读取任务 * 2)否则使用 workQueue.take() 读取任务 */ private final BlockingQueue workQueue; /** * 添加工作者线程、关闭线程池、读取统计数据等操作中使用的互斥锁 */ private final ReentrantLock mainLock = new ReentrantLock(); /** * 线程池中的工作者线程集合,只有在持有 mainLock 时才能访问 */ private final HashSet workers = new HashSet<>(); /** * 执行 awaitTermination 操作时的条件 */ private final Condition termination = mainLock.newCondition(); /** * 跟踪线程池同时存在的最大工作线程数 * Accessed only under mainLock. */ private int largestPoolSize; /** * 线程池完成的任务数,只在工作者线程退出时更新 * Accessed only under mainLock. */ private long completedTaskCount; /** * 创建工作者线程的工厂,工作者线程创建失败会导致任务丢失 */ private volatile ThreadFactory threadFactory; /** * 线程池满载或关闭过程中,任务被拒绝时的处理器 */ private volatile RejectedExecutionHandler handler; /** * 空闲工作者线程的超时时间,以纳秒为单位。 * 1)当前工作者线程数 > 核心线程数 * 2)允许核心工作者线程超时 allowCoreThreadTimeOut=true */ private volatile long keepAliveTime; /** * 默认为 false,即使超时了,核心工作者线程也不会退出 */ private volatile boolean allowCoreThreadTimeOut; /** * 核心工作者线程数 */ private volatile int corePoolSize; /** * 最大工作者线程数 */ private volatile int maximumPoolSize; /** * 默认的拒绝处理器 */ private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), ThreadPoolExecutor.defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, ThreadPoolExecutor.defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } /** * 使用指定的初始化参数创建一个 ThreadPoolExecutor 实例 * * @param corePoolSize 核心工作者线程所 * @param maximumPoolSize 最大工作者线程数 * @param keepAliveTime 工作者线程存活时间 * @param unit 时间单位 * @param workQueue 工作队列 * @param threadFactory 创建工作者线程的线程工厂 * @param handler 拒绝处理器 */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { /** * 必须保证 * corePoolSize >=0 * maximumPoolSize > 0 * maximumPoolSize > corePoolSize * keepAliveTime > 0 表示工作者线程可超时退出 * keepAliveTime = 0 表示不可退出 */ if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) { throw new IllegalArgumentException(); } if (workQueue == null || threadFactory == null || handler == null) { throw new NullPointerException(); } acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } 可提交的任务类型 Runnable 接口无返回值并且不能显示抛出异常。 Callable 接口有返回值,并且能显示抛出异常。 @FunctionalInterface public interface Runnable { void run(); } @FunctionalInterface public interface Callable { /** * 计算并返回的一个结果,如果计算失败,则抛出异常 */ V call() throws Exception; } 执行一个 Runnable 任务,无返回值 /** * 往线程池提交一个 Runnable 任务, * 如果线程池已满或线程池关闭则,该任务会交给拒绝处理器处理。 */ @Override public void execute(Runnable command) { if (command == null) { throw new NullPointerException(); } // 读取控制变量 int c = ctl.get(); // 1)线程池工作线程数 < 核心线程数 if (ThreadPoolExecutor.workerCountOf(c) < corePoolSize) { // 尝试创建一个新的工作者线程来处理这个任务 if (addWorker(command, true)) { // 创建成功则直接返回 return; } // 创建失败,则重新读取控制变量 c = ctl.get(); } /** * 2)当前工作者线程数 >= 核心工作者线程 * && 线程池处于运行状态 * && 尝试向工作者队列中提交任务 */ if (ThreadPoolExecutor.isRunning(c) && workQueue.offer(command)) { // 重新读取控制变量 final int recheck = ctl.get(); // 1)如果线程池已经停止运行,则将目标任务从任务队列中移除,并尝试终止线程池 if (! ThreadPoolExecutor.isRunning(recheck) && remove(command)) { // 执行拒绝处理器 reject(command); // 2)如果已经没有可用的工作者线程 } else if (ThreadPoolExecutor.workerCountOf(recheck) == 0) { // 尝试添加一个新的工作者线程 addWorker(null, false); } } /** * 3)当前工作者线程数 >= 核心工作者线程 * && 工作队列已满 * && 尝试增加一个新的工作者线程来处理该任务 */ else if (!addWorker(command, false)) { // 任务处理失败,则交给拒绝处理器处理 reject(command); } } /** * 读取线程池的工作线程数 */ private static int workerCountOf(int c) { return c & ThreadPoolExecutor.COUNT_MASK; } /** * 尝试增加一个核心工作者线程来处理这个任务 */ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (int c = ctl.get();;) { /** * 1)线程池状态在 STOP 及以上【线程池已经停止】 * 2)线程池正在停止,并且提交任务不为 null || 工作队列为空 * 则创建失败 */ if (ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.SHUTDOWN) && (ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.STOP) || firstTask != null || workQueue.isEmpty())) { return false; } for (;;) { /** * 1)工作者线程数已经 >= 核心线程数【任务队列未满时】 * 2)工作者线程数已经 >= 最大线程数【任务队列已满时】 * 则创建失败 */ if (ThreadPoolExecutor.workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & ThreadPoolExecutor.COUNT_MASK)) { return false; } // 尝试递增工作者线程数 if (compareAndIncrementWorkerCount(c)) { // 如果计数值递增成功,则将正式添加工作者线程来处理任务 break retry; } // 如果其他线程优先递增了计数值,则重新读取计数值进行重试 c = ctl.get(); // Re-read ctl // 线程池正在关闭,则重新进入循环后将直接退出 if (ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.SHUTDOWN)) { continue retry; // else CAS failed due to workerCount change; retry inner loop } } } // 工作者线程是否已经启动 boolean workerStarted = false; // 工作者线程是否已经添加到集合中 boolean workerAdded = false; Worker w = null; try { // 创建工作者线程 w = new Worker(firstTask); // 读取线程对象 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { /** * Recheck while holding lock. Back out on ThreadFactory failure or if shut down before lock acquired. * 读取控制变量再次进行校验 */ final int c = ctl.get(); /** * 1)线程池处于运行状态 * 2)线程池处于关闭状态 && 提交任务为 null */ if (ThreadPoolExecutor.isRunning(c) || ThreadPoolExecutor.runStateLessThan(c, ThreadPoolExecutor.STOP) && firstTask == null) { // 工作者线程已经启动 if (t.isAlive()) { throw new IllegalThreadStateException(); } // 将工作者线程添加到集合中 workers.add(w); // 如果当前当前工作者线程数 > largestPoolSize,则更新它 final int s = workers.size(); if (s > largestPoolSize) { largestPoolSize = s; } // 工作者线程添加成功 workerAdded = true; } } finally { mainLock.unlock(); } // 如果添加成功,则启动工作者线程 if (workerAdded) { t.start(); // 工作者线程启动成功 workerStarted = true; } } } finally { // 如果工作者线程启动失败,则进行回退和清理 if (!workerStarted) { addWorkerFailed(w); } } return workerStarted; } // 运行状态 c 小于指定状态 s private static boolean runStateLessThan(int c, int s) { return c < s; } // 运行状态 c 大于等于指定状态 s private static boolean runStateAtLeast(int c, int s) { return c >= s; } /** * 尝试原子的将工作者线程数 +1 */ private boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1); } /** * Rolls back the worker thread creation. * - removes worker from workers, if present * - decrements worker count * - rechecks for termination, in case the existence of this * worker was holding up termination */ private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 1)从 workers 集合中移除工作者 w if (w != null) { workers.remove(w); } // 递减总工作者线程数 decrementWorkerCount(); // 尝试进行线程池终止 tryTerminate(); } finally { mainLock.unlock(); } } /** * 将工作者线程总数递减 1 */ private void decrementWorkerCount() { ctl.addAndGet(-1); } final void tryTerminate() { for (;;) { final int c = ctl.get(); /** * 1)线程池在运行状态【RUNNING】 * 2)线程池在执行清理操作【TIDYING】 * 3)线程池正在停止【SHUTDOWN】并且工作队列非空 * 直接返回 */ if (ThreadPoolExecutor.isRunning(c) || ThreadPoolExecutor.runStateAtLeast(c, ThreadPoolExecutor.TIDYING) || ThreadPoolExecutor.runStateLessThan(c, ThreadPoolExecutor.STOP) && ! workQueue.isEmpty
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信