java异步编程降低延迟

 目录

java异步编程降低延迟

在平时开发的过程中,其实有很多可以采用多线程优化的地方,像ExecutorService、CompletionService、CompletableFuture和并行流等类,只是没有去注意,这里总结下日常工作中常用的一些方法。

一、ExecutorService和CompletionService

基本的execute和submit方法

这个其实没有太多好说的,因为这个是最基本的,基本使用线程池的都会使用到这个方法,主要用于异步执行任务,submit和execute的区别就在于,submit有一个方法的回执,可以利用这个Future对这个任务的生命周期进行干预。

invokeAll和invokeAny方法

很多人没有注意到这两个方法,这两个方法其实也是非常有用的,例如你有很多可以并行执行的操作投递到线程池,执行完之后就挨个调用Future的get获取结果最后生成结果,这两个步骤其实就是invokeAll已经封装好的。他的内部实现也很简单和你手动取每个值是一样的,这个方法只会到所有任务执行完毕或者设定的时间超时了才会返回。实现非常简单:

 for (int i = 0, size = futures.size(); i < size; i++) {                 Future<T> f = futures.get(i);                 if (!f.isDone()) {                     try {                         f.get();                     } catch (CancellationException ignore) {                     } catch (ExecutionException ignore) {                     } } 

invokeAny方法稍微比invokeAll复杂些,内部是基于ExecutorCompletionService实现的。如果有一个任务返回了就直接返回结果,如果第一个完成的任务抛出了异常那么这个方法会抛出对应的异常。

CompletionService

这个类名中文翻译就是完成服务,这个类组合了ExecutorService,实现逻辑非常简单,内部存放了一个阻塞队列,当投递的任务完成时会将对应的Future放入这个阻塞队列,这样就可以做到投递的任务在完成的顺序依次放入阻塞队列。这就是上面invokeAny实现利用主要逻辑。利用阻塞队列的poll和take方法,在第一个返回时就取消剩余的任务。
虽然invokeAny已经封装了CompletionService的逻辑但是有些场景这个类还是很有用的。比如现在我想要得到一个最先完成的但是没有抛出异常的,这种情况下我们就需要写一个类似于invokeAny的例子。jdk注释中给出了例子:

void solve(Executor e,             Collection<Callable<Result>> solvers)      throws InterruptedException {      CompletionService<Result> ecs          = new ExecutorCompletionService<Result>(e);      int n = solvers.size();      List<Future<Result>> futures          = new ArrayList<Future<Result>>(n);      Result result = null;      try {          for (Callable<Result> s : solvers)              futures.add(ecs.submit(s));          for (int i = 0; i < n; ++i) {              try {                  Result r = ecs.take().get();                  if (r != null) {                      result = r;                      break;                  }              } catch (ExecutionException ignore) {}          }      }      finally {          for (Future<Result> f : futures)              f.cancel(true);      }       if (result != null)          use(result);  }

这个类也很容易想到一个场景,我有很多任务是可以并发执行了,这时可以使用invokeAll,但是让必须等到所有的任务执行完毕才能返回,这时如果有一个任务被io阻塞了很慢将会导致整个方法阻塞。如果是利用CompletionService的话,因为他是按照任务的完成顺序往队列里放,所以我们可以全部提交后,利用他的poll或者take方法遍历任务,先完成的任务返回就可以直接消费。

Future

讲到这里我觉得有必要提一下Future,因为线程池中投递任务submit方法均为返回Future这个对象。Future你可以把它理解成对这个任务的建模,你得这个对象可以利用这个对象来管理任务的生命周期,例如get方法获取结果,cancel来取消这个任务,以及isDnoe来判断任务是否取消等。api没有什么难理解的地方,主要是取消任务这一块需要结合中断来理解,cancel参数的Boolean值就是说能不能给这个任务发中断,如果可以他内部实际就是通过中断来停止任务,需要用户代码响应中断。FutureTask中的cancel源码如下:

if (mayInterruptIfRunning) {     try {         Thread t = runner;         if (t != null)             t.interrupt();     } finally { // final state         UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);     } }

二、CompletableFuture(重要)

上面简单说了下Future,Future是jdk5.0就已经引进的,但是他的能力非常的弱,主要是缺少了一个回调的机制,很多框架都基于它提供了增强版像guava的ListenableFuture和spring中的ListenableFuture。直到java8出现了CompletableFuture才弥补了jdk的这个特性。
可能很多人没有注意到这个类,因为平时没关注这方面,其实如果好好的学习下这个类就会发现这个类的功能非常强大,和stream类似的设计思想,使用非常简洁。可以基于教程好好研究一下,这里介绍下常用的操作。

在以前我们投递到线程中任务返回的Future中,我们只能实现一些简单的轮询,取消等api。如果现在有这样的一些类似的需求:

执行一个任务,当任务执行完的时候执行一个动作(相当于任务执行完触发回调)

    CompletableFuture.runAsync(() -> System.out.println("hello word")).whenComplete((aVoid, throwable) -> System.out.println("任务完成"));

任务执行完的时候在发起另外一个任务(这里是有顺序性的,第二个依赖于第一个任务)

CompletableFuture.supplyAsync(() -> 12).thenApply(Function.identity()).thenAccept(System.out::println);

同时执行多个任务,当全部完成的时候执行一个动作

Integer join = CompletableFuture.supplyAsync(() -> {             try {                 Thread.sleep(3000);             } 
                        
关键字:
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信