之前的两篇文章中,我们介绍了异步编程,也介绍了线程池的基本概念。也说了,线程池的实现天生也实现了异步任务,允许直接向线程池中进行任务的提交与结果获取。
但是,我们始终没有去深入的了解下,异步任务框架对于任务执行的进度是如何监控的,任务执行的结果该如何获取。
那么,本篇文章就来详细地探讨下异步框架中,关于任务执行过程中的一些状态以及执行结果反馈的相关细节。
传统的 Future 模式
我们说过,异步编程的一个好处是:
我只需要定义好任务,向 ExecutorService 中提交即可,而不用关心什么时候,什么线程在执行我们的任务。它会返回一个 Future 对象,我们通过他了解当前任务的执行细节。
Future 接口中定义了以下一些方法:
public interface Future<V> { //取消执行当前任务 boolean cancel(boolean mayInterruptIfRunning); //当前任务是否被取消了 boolean isCancelled(); //当前任务是否已经完成 boolean isDone(); //返回任务执行的返回结果,如果任务未完成 //将阻塞在 Future 内部队列上等待 V get() //新增超时限制 V get(long timeout, TimeUnit unit) }这五个方法,每一个都很重要,为我们监控任务的执行提供有力的支持。而我们的 ThreadPoolExecutor 使用的是 FutureTask 作为 Future 的实现类。
而我们也不妨看看这个 FutureTask 内部都有些哪些成员:

state 和它可取的这些值共同描述了当前任务的执行状态,是刚开始执行,还是正在执行中,还是正常结束,还是异常结束,还是被取消了,都由这个 state 来体现。

callable 代表当前正在执行的工作内容,这里说一下为什么只有 Callable 类型的任务,因为所有的 Runnable 类型任务都会被事先转换成 Callable 类型,我觉得主要是统一和抽象实现吧。
outcome 是任务执行结束的返回值,runner 是正在执行当前任务的线程,waiters 是一个简单的单链表,维护的是所有在任务执行结束之前尝试调用 get 方法获取执行结果的线程集合。当任务执行结束自当唤醒队列中所有的线程。
除此之外,还有一个稍显重要的方法,就是 run 方法,这个方法会在任务开始时由 ExecutorService 调用,这是一个很核心的方法,虽然方法体有点长,但是逻辑简单,我们大体上概括下。

- 如果任务已经开始将退出方法逻辑的执行
- 调度任务执行,调用 call 方法
- 调用成功将保存结果,异常则将保存异常信息
- 处理中断
这里需要额外去说一下,第三步中的 set 方法除了会将任务执行的返回结果设置到 FutureTask 的 outcome 字段上,还会调用 finishCompletion 方法完成任务的调用,尝试唤醒所有在等待任务执行结果的线程。
其他的方法就不去看了,也比较多,还算是简单的,如果有所想法,也欢迎你和我探讨交流。
那么,我们也来看一个最简单的应用示例:

我们向线程池提交了一个任务,这个任务的工作量不大,就是睡觉然后返回执行结果。而我们可以直接调用 get 方法去获取任务执行的结果,不过 get 方法是阻塞式的,一旦任务还未执行结束,当前线程将丢失 CPU 进而被阻塞到 Future 的内部队列上。
所以,推荐大家在 get 返回结果之前,先判断下目标任务是否已经执行结束,进而避免当前线程的阻塞唤醒所带来的代价。
到这里,相信你也一定看出来了,FutureTask 实现的 Future 的弊端在 get 方法,这个方法非异步,如果没有成功获取到任务的执行结果就将直接阻塞当前线程,以等待任务的执行完成。
但是,有一种情境,当我们向线程池中提交了很多任务,但是不清楚各个任务的执行效率,也就是不知道谁先执行结束,如果直接 get 某个未完成的任务,将导致当前线程阻塞等待。
那么我们能不能阻塞,直接获取已经执行结束的任务 Future,而未完成的任务不允许获取它的 Future?
使用 CompletionService
分析 CompletionService 之前,我们搬出之前分析过的一张类图:

左半边的类我们已经在前面的文章中都涉猎了,唯独落下了 CompletionService 这个接口,我们当时说以后会分析它的,现在我们来看看这个接口会给我们带来哪些能力。
首先,从类的继承体系上来看,CompletionService 并不与我们的 Executor 产生任何直接关系,线程池的实现也没有继承该接口。
实际上来说,CompletionService 只是利用了 Executor 乃至线程池为自己提供任务的提交与执行能力,而自己不过额外的维护一个队列,保存着所有已经完成的任务的 Future,以至于我们可以直接在外部调用 take 方法直接获取已完成的任务返回结果,无需阻塞。
废话不多说,我们写个小 demo,或许你会有更直接的体验:
==要求:使用多线程计算 1-10000 之间的总和==
==思路:分段计算,最后总和相加==
实现:


相信你运行后一定和我是同样的答案:50005000
可能很多人会有疑问,这段代码其实也没什么特别的地方啊,我使用基本的线程池不一样也能实现吗?
但是,实际上并没有那么简单,因为你不能确定哪个任务完成了,哪个还没有,所以你至少需要写五个循环自旋等待。
而如果你的运气不好,第一个任务特别慢,即便后续的任务已经结束了,主线程也依然由于第一个任务的结果拿不到而阻塞,耽误了对其他已完成任务的返回结果处理。
乍一看,你可能觉得差别不大,但仔细分析了才会发现,一旦任务量增大、增多,真的是「差之毫厘,谬以千里」。
