在并行任务启动后,强制性地从并行任务得到反馈。

假想有一个程序,可以发送批邮件,还使用了多线程机制。你想知道有多少邮件成功发送吗?你想知道在实际发送过程期间,这个批处理工作的实时进展吗?

要实现多线程的这种反馈,我们可以使用Callable接口。此接口的工作方式基本上与Runnable相同,但是执行方法(call())会返回一个值,该值反映了执行计算的结果。
 

 

package com.ricardozuasti;  
  1.   
  2. import java.util.concurrent.Callable;  
  3.   
  4. public class FictionalEmailSender implements Callable<Boolean>{  
  5.     private String to;  
  6.     private String subject;  
  7.     private String body;  
  8.     public FictionalEmailSender(String to, String subject, String body){  
  9.         this.to = to;  
  10.         this.subject = subject;  
  11.         this.body = body;  
  12.     }  
  13.   
  14.     @Override  
  15.     public Boolean call() throws InterruptedException {  
  16.         // 在0~0.5秒间模拟发送邮件   
  17.         Thread.sleep(Math.round(Math.random()*0.5*1000));  
  18.         // 假设我们有80%的几率成功发送邮件   
  19.         if(Math.random()>0.2){  
  20.             return true;  
  21.         }else{  
  22.             return false;  
  23.         }  
  24.     }  
  25.       
  26. }  


注意:Callable接口可用于返回任意数据类型,因此我们的任务可以返回我们需要的任何信息。
 

 

现在,我们使用一个线程池ExecutorService来发送邮件,由于我们的任务是以Callable接口实现的,我们提交执行的每个新任务,都会得到一个Future引用。注意我们要使用直接的构造器创建ExecutorService,而不是使用来自Executors的工具方法创建。这是因为使用指定类ThreadPoolExecutor提供了一些方法可以派上用场。
 

 

package com.ricardozuasti;  
  1.   
  2. import java.util.concurrent.Future;  
  3. import java.util.concurrent.LinkedBlockingQueue;  
  4. import java.util.concurrent.ThreadPoolExecutor;  
  5. import java.util.concurrent.TimeUnit;  
  6. import java.util.ArrayList;  
  7. import java.util.List;  
  8.   
  9. public class Concurrency2 {  
  10.     public static void main(String[] args){  
  11.         try{  
  12.             ThreadPoolExecutor executor = new ThreadPoolExecutor(30301,  
  13.      TimeUnit.SECONDS, new LinkedBlockingQueue());  
  14.             List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(9000);  
  15.             // 发送垃圾邮件, 用户名假设为4位数字   
  16.             for(int i=1000; i<10000; i++){  
  17.                 futures.add(executor.submit(new FictionalEmailSender(i+"@sina.com",  
  18.                         "Knock, knock, Neo""The Matrix has you...")));  
  19.             }  
  20.             // 提交所有的任务后,关闭executor   
  21.             System.out.println("Starting shutdown...");  
  22.             executor.shutdown();  
  23.               
  24.             // 每秒钟打印执行进度   
  25.             while(!executor.isTerminated()){  
  26.                 executor.awaitTermination(1, TimeUnit.SECONDS);  
  27.                 int progress = Math.round((executor.getCompletedTaskCount()  
  28. *100)/executor.getTaskCount());  
  29.                 System.out.println(progress + "% done (" +   
  30. executor.getCompletedTaskCount() + " emails have been sent).");  
  31.             }  
  32.             // 现在所有邮件已发送完, 检查futures, 看成功发送的邮件有多少   
  33.             int errorCount = 0;  
  34.             int successCount = 0;  
  35.             for(Future<Boolean> future : futures){  
  36.                 if(future.get()){  
  37.                     successCount++;  
  38.                 }else{  
  39.                     errorCount++;  
  40.                 }  
  41.             }  
  42.             System.out.println(successCount + " emails were successfully sent, but " +  
  43.                     errorCount + " failed.");  
  44.         }catch(Exception ex){  
  45.             ex.printStackTrace();  
  46.         }  
  47.     }  
  48. }