Java并发之线程池ThreadPoolExecutor源码分析学习

线程池学习 以下所有内容以及源码分析都是基于JDK1.8的,请知悉。 我写博客就真的比较没有顺序了,这可能跟我的学习方式有关,我自己也觉得这样挺不好的,但是没办法说服自己去改变,所以也只能这样想到什么学什么了。 ​ 池化技术真的是一门在我看来非常牛逼的技术,因为它做到了在有限资源内实现了资源利用的最大化,这让我想到了一门课程,那就是运筹学,当时在上运筹学的时候就经常做这种类似的问题。 ​ 言归正传吧,我接下来会进行一次线程池方面知识点的学习,也会记录下来分享给大家。 线程池的内容当中有涉及到AQS同步器的知识点,如果对AQS同步器知识点感觉有点薄弱,可以去看我的上一篇文章。 线程池的优势 ​ 既然说到线程池了,而且大多数的大牛也都会建议我们使用池化技术来管理一些资源,那线程池肯定也是有它的好处的,要不然怎么会那么出名并且让大家使用呢? ​ 我们就来看看它究竟有什么优势? 资源可控性:使用线程池可以避免创建大量线程而导致内存的消耗 提高响应速度:线程池地创建实际上是很消耗时间和性能的,由线程池创建好有任务就运行,提升响应速度。 便于管理:池化技术最突出的一个特点就是可以帮助我们对池子里的资源进行管理。由线程池统一分配和管理。 线程池的创建 ​ 我们要用线程池来统一分配和管理我们的线程,那首先我们要创建一个线程池出来,还是有很多大牛已经帮我们写好了很多方面的代码的,Executors的工厂方法就给我们提供了创建多种不同线程池的方法。因为这个类只是一个创建对象的工厂,并没有涉及到很多的具体实现,所以我不会过于详细地去说明。 ​ 老规矩,还是直接上代码吧。 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); } 这里也就举出一个方法的例子来进行之后的讲解吧,我们可以看出,Executors只是个工厂而已,方法也只是来实例化不同的对象,实际上实例化出来的关键类就是ThreadPoolExecutor。现在我们就先来简单地对ThreadPoolExecutor构造函数内的每个参数进行解释一下吧。 corePoolSize(核心线程池大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,当任务数大于核心线程数的时候就不会再创建。在这里要注意一点,线程池刚创建的时候,其中并没有创建任何线程,而是等任务来才去创建线程,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法 ,这样才会预先创建好corePoolSize个线程或者一个线程。 maximumPoolSize(线程池最大线程数):线程池允许创建的最大线程数,如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如果使用了无界队列,此参数就没有意义了。 keepAliveTime(线程活动保持时间):此参数默认在线程数大于corePoolSize的情况下才会起作用, 当线程的空闲时间达到keepAliveTime的时候就会终止,直至线程数目小于corePoolSize。不过如果调用了allowCoreThreadTimeOut方法,则当线程数目小于corePoolSize的时候也会起作用. unit(keelAliveTime的时间单位):keelAliveTime的时间单位,一共有7种,在这里就不列举了。 workQueue(阻塞队列):阻塞队列,用来存储等待执行的任务,这个参数也是非常重要的,在这里简单介绍一下几个阻塞队列。 ArrayBlockingQueue:这是一个基于数组结构的有界阻塞队列,此队列按照FIFO的原则对元素进行排序。 LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按照FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()就是使用了这个队列。 SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool()就使用了这个队列。 PriorityBlockingQueue:一个具有优先级的无阻塞队列。 handler(饱和策略);当线程池和队列都满了,说明线程池已经处于饱和状态了,那么必须采取一种策略来处理还在提交过来的新任务。这个饱和策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。共有四种饱和策略提供,当然我们也可以选择自己实现饱和策略。 AbortPolicy:直接丢弃并且抛出RejectedExecutionException异常 CallerRunsPolicy:只用调用者所在线程来运行任务。 DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。 DiscardPolicy:丢弃任务并且不抛出异常。 ​ 线程池的执行流程就用参考资料里的图介绍一下了,具体我们还是通过代码去讲解。 线程池流程.jpg 在上面我们简单的讲解了一下Executors这个工厂类里的工厂方法,并且讲述了一下创建线程池的一些参数以及它们的作用,当然上面的讲解并不是很深入,因为想要弄懂的话是需要持续地花时间去看去理解的,而博主自己也还是没有完全弄懂,不过博主的学习方法是先学了个大概,再回头来看看之前的知识点,可能会更加好理解,所以我们接着往下面讲吧。 ThreadPoolExecutor源码分析 ​ 在上面我们就发现了,Executors的工厂方法主要就返回了ThreadPoolExecutor对象,至于另一个在这里暂时不讲,也就是说,要学习线程池,其实关键的还是得学会分析ThreadPoolExecutor这个对象里面的源码,我们接下来就会对ThreadPoolExecutor里的关键代码进行分析。 AtomicInteger ctl ​ ctl是主要的控制状态,是一个复合类型的变量,其中包括了两个概念。 workerCount:表示有效的线程数目 runState:线程池里线程的运行状态 我们来分析一下跟ctl有关的一些源代码吧,直接上代码 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //用来表示线程池数量的位数,很明显是29,Integer.SIZE=32 private static final int COUNT_BITS = Integer.SIZE - 3; //线程池最大数量,2^29 - 1 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits //我们可以看出有5种runState状态,证明至少需要3位来表示runState状态 //所以高三位就是表示runState了 private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; } //用于存放线程任务的阻塞队列 private final BlockingQueue workQueue; //重入锁 private final ReentrantLock mainLock = new ReentrantLock(); //线程池当中的线程集合,只有当拥有mainLock锁的时候,才可以进行访问 private final HashSet workers = new HashSet(); //等待条件支持终止 private final Condition termination = mainLock.newCondition(); //创建新线程的线程工厂 private volatile ThreadFactory threadFactory; //饱和策略 private volatile RejectedExecutionHandler handler; CAPACITY 在这里我们讲一下这个线程池最大数量的计算吧,因为这里涉及到源码以及位移之类的操作,我感觉大多数人都还是不太会这个,因为我一开始看的时候也是不太会的。 private static final int CAPACITY = (1 << COUNT_BITS) - 1; 从代码我们可以看出,是需要1往左移29位,然后再减去1,那个1往左移29位是怎么计算的呢? 1 << COUNT_BITS ​ 1的32位2进制是 00000000 00000000 00000000 00000001 ​ 左移29位的话就是 00100000 00000000 00000000 00000000 ​ 再进行减一的操作 000 11111 11111111 11111111 11111111 ​ 也就是说线程池最大数目就是 000 11111 11111111 11111111 11111111 2.runState 正数的原码、反码、补码都是一样的 在计算机底层,是用补码来表示的 private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; RUNNING 可以接受新任务并且处理已经在阻塞队列的任务 高3位全部是1的话,就是RUNNING状态 -1 << COUNT_BITS 这里是-1往左移29位,稍微有点不一样,-1的话需要我们自己算出补码来 ​ -1的原码 10000000 00000000 00000000 00000001 ​ -1的反码,负数的反码是将原码除符号位以外全部取反 11111111 11111111 11111111 11111110 ​ -1的补码,负数的补码就是将反码+1 11111111 11111111 11111111 11111111 ​ 关键了,往左移29位,所以高3位全是1就是RUNNING状态 111 00000 00000000 00000000 00000000 SHUTDOWN 不接受新任务,但是处理已经在阻塞队列的任务 高3位全是0,就是SHUTDOWN状态 0 << COUNT_BITS ​ 0的表示 00000000 00000000 00000000 00000000 ​ 往左移29位 00000000 00000000 00000000 00000000 STOP 不接受新任务,也不处理阻塞队列里的任务,并且会中断正在处理的任务 所以高3位是001,就是STOP状态 1 << COUNT_BITS ​ 1的表示 00000000 00000000 00000000 00000001 ​ 往左移29位 00100000 00000000 00000000 00000000 TIDYING 所有任务都被中止,workerCount是0,线程状态转化为TIDYING并且调用terminated()钩子方法 所以高3位是010,就是TIDYING状态 2 << COUNT_BITS ​ 2的32位2进制 00000000 00000000 00000000 00000010 ​ 往左移29位 01000000 00000000 00000000 00000000 TERMINATED terminated()钩子方法已经完成 所以高3位是110,就是TERMINATED状态 3 << COUNT_BITS ​ 3的32位2进制 00000000 00000000 00000000 00000011 ​ 往左移29位 11000000 00000000 00000000 00000000 3.部分方法介绍 runStateOf(int c) 实时获取runState的方法 private static int runStateOf(int c) { return c & ~CAPACITY; } ~CAPACITY ~是按位取反的意思 &是按位与的意思 ​ 而CAPACITY是,高位3个0,低29位都是1,所以是 000 11111 11111111 11111111 11111111 ​ 取反的话就是 111 00000 00000000 00000000 00000000 ​ 传进来的c参数与取反的CAPACITY进行按位与操作 1、低位29个0进行按位与,还是29个0 2、高位3个1,既保持c参数的高3位 既高位保持原样,低29位都是0,这也就获得了线程池的运行状态runState workerCountOf(int c) 获取线程池的当前有效线程数目 private static int workerCountOf(int c) { return c & CAPACITY; } CAPACITY的32位2进制是 000 11111 11111111 11111111 11111111 ​ 用入参c跟CAPACITY进行按位与操作 1、低29位都是1,所以保留c的低29位,也就是有效线程数 2、高3位都是0,所以c的高3位也是0 ​ 这样获取出来的便是workerCount的值 ctlOf(int rs, int wc) 原子整型变量ctl的初始化方法 //结合这几句代码来看 private static final int RUNNING = -1 << COUNT_BITS; private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); ​ private static int ctlOf(int rs, int wc) { return rs | wc; } RUNNING是 111 00000 00000000 00000000 00000000 ​ ctlOf是将rs和wc进行按位或的操作 ​ 初始化的时候是将RUNNING和0进行按位或 0的32位2进制是 00000000 00000000 00000000 00000000 ​ 所以初始化的ctl是 111 00000 00000000 00000000 00000000 核心方法源码分析 execute(Runnable command)方法 public void execute(Runnable command) { //需要执行的任务command为空,抛出空指针异常 if (command == null) // 1 throw new NullPointerException(); /* *执行的流程实际上分为三步 *1、如果运行的线程小于corePoolSize,以用户给定的Runable对象新开一个线程去执行 * 并且执行addWorker方法会以原子性操作去检查runState和workerCount,以防止当返回false的 * 时候添加了不应该添加的线程 *2、 如果任务能够成功添加到队列当中,我们仍需要对添加的线程进行双重检查,有可能添加的线程在前 * 一次检查时已经死亡,又或者在进入该方法的时候线程池关闭了。所以我们需要复查状态,并有有必 * 要的话需要在停止时回滚入列操作,或者在没有线程的时候新开一个线程 *3、如果任务无法入列,那我们需要尝试新增一个线程,如果新建线程失败了,我们就知道线程可能关闭了 * 或者饱和了,就需要拒绝这个任务 * */ //获取线程池的控制状态 int c = ctl.get(); // 2 //通过workCountOf方法算workerCount值,小于corePoolSize if (workerCountOf(c) < corePoolSize) { //添加任务到worker集合当中 if (addWorker(command, true)) return; //成功返回 //失败的话再次获取线程池的控制状态 c = ctl.get(); } /* *判断线程池是否正处于RUNNING状态 *是的话添加Runnable对象到workQueue队列当中 */ if (isRunning(c) && workQueue.offer(command)) { // 3 //再次获取线程池的状态 int recheck = ctl.get(); //再次检查状态 //线程池不处于RUNNING状态,将任务从workQueue队列中移除 if (! isRunning(recheck) && remove(command)) //拒绝任务 reject(command); //workerCount等于0 else if (workerCountOf(recheck) == 0) // 4 //添加worker addWorker(null, false); } //加入阻塞队列失败,则尝试以线程池最大线程数新开线程去执行该任务 else if (!addWorker(command, false)) // 5 //执行失败则拒绝任务 reject(command); } 我们来说一下上面这个代码的流程: 1、首先判断任务是否为空,空则抛出空指针异常 2、不为空则获取线程池控制状态,判断小于corePoolSize,添加到worker集合当中执行, 如成功,则返回 失败的话再接着获取线程池控制状态,因为只有状态变了才会失败,所以重新获取 3、判断线程池是否处于运行状态,是的话则添加command到阻塞队列,加入时也会再次获取状态并且检测 ​ 状态是否不处于运行状态,不处于的话则将command从阻塞队列移除,并且拒绝任务 4、如果线程池里没有了线程,则创建新的线程去执行获取阻塞队列的任务执行 5、如果以上都没执行成功,则需要开启最大线程池里的线程来执行任务,失败的话就丢弃 有时候再多的文字也不如一个流程图来的明白,所以还是画了个execute的流程图给大家方便理解。 execute执行流程.jpg 2.addWorker(Runnable firstTask, boolean core) private boolean addWorker(Runnable firstTask, boolean core) { //外部循环标记 retry: //外层死循环 for (;;) { //获取线程池控制状态 int c = ctl.get(); //获取runState int rs = runStateOf(c); ​ // Check if queue empty only if necessary. /** *1.如果线程池runState至少已经是SHUTDOWN *2\. 有一个是false则addWorker失败,看false的情况 * - runState==SHUTDOWN,即状态已经大于SHUTDOWN了 * - firstTask为null,即传进来的任务为空,结合上面就是runState是SHUTDOWN,但是 * firstTask不为空,代表线程池已经关闭了还在传任务进来 * - 队列为空,既然任务已经为空,队列为空,就不需要往线程池添加任务了 */ if (rs >= SHUTDOWN && //runState大于等于SHUTDOWN,初始位RUNNING ! (rs == SHUTDOWN && //runState等于SHUTDOWN firstTask == null && //firstTask为null ! workQueue.isEmpty())) //workQueue队列不为空 return false; ​ //内层死循环 for (;;) { //获取线程池的workerCount数量 int wc = workerCountOf(c); //如果workerCount超出最大值或者大于corePoolSize/maximumPoolSize //返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //通过CAS操作,使workerCount数量+1,成功则跳出循环,回到retry标记 if (compareAndIncrementWorkerCount(c)) break retry; //CAS操作失败,再次获取线程池的控制状态 c = ctl.get(); // Re-read ctl //如果当前runState不等于刚开始获取的runState,则跳出内层循环,继续外层循环 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop //CAS由于更改workerCount而失败,继续内层循环 } } ​ //通过以上循环,能执行到这是workerCount成功+1了 //worker开始标记 boolean workerStarted = false; //worker添加标记 boolean workerAdded = false; //初始化worker为null Worker w = null; try { //初始化一个当前Runnable对象的worker对象 w = new Worker(firstTask); //获取该worker对应的线程 final Thread t = w.thread; //如果线程不为null if (t != null) { //初始线程池的锁 final ReentrantLock mainLock = this.mainLock; //获取锁 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. //获取锁后再次检查,获取线程池runState int rs = runStateOf(ctl.get()); ​ //当runState小于SHUTDOWN或者runState等于SHUTDOWN并且firstTask为null if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { //线程已存活 if (t.isAlive()) // precheck that t is startable //线程未启动就存活,抛出IllegalThreadStateException异常 throw new IllegalThreadStateException(); //将worker对象添加到workers集合当中 workers.add(w); //获取workers集合的大小 int s = workers.size(); //如果大小超过largestPoolSize if (s > largestPoolSize) //重新设置largestPoolSize largestPoolSize = s; //标记worker已经被添加 workerAdded = true; } } finally {
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信