从零开始学多线程之线程池(五)

单线程的缺点&使用多线程的好处 围绕执行任务来管理应用程序时,第一步要指明一个清晰的任务边界(task boundaries).理想情况下,任务是独立的活动:它的工作并不依赖于其他任务的状态、结果或者边界效应.独立有利于并发性,如果能得到相应的处理器资源,独立的任务还可以并行执行. 在正常的负载下,服务器应用程序应该兼具良好的吞吐量和快速的响应性.应用程序提供商希望程序支持尽可能多的用户,所以会努力降低每个用户的开销;而用户希望尽快获得响应.进一步讲,应用程序应该在负荷过载时平缓地劣化,而不应该负载一高就简单地以失败告终.为了达到这些目的,你要选择一个清晰的任务边界,并配合一个明确的任务执行策略. 大多数服务器应用程序都选择了下面这个自然的任务边界:单独的客户请求.将独立的请求作为任务边界,可以让任务兼顾独立性和适当的大小. 单线程的缺点: 对于一个单线程化的服务器,阻塞不仅仅延迟了当前请求的完成,而且还完全阻止了需要被处理的等待请求.如果请求阻塞的时间过长,用户将看不到响应,可能任务服务器已经不可用了.同时,单线程在等待它的I/O操作时,CPU会处于闲置状态,因此导致了资源利用率非常低. 顺序化处理几乎不能为服务器应用程序提供良好的吞吐量或快速的响应性.不过也有特例,比如当任务的数量很少但生命周期很长时,或者当服务器只服务于唯一的用户时,服务器在同一时刻只需同时处理一个请求---但是大多数服务器应用程序都不是以这种方式工作的. 使用多线程编程,可以提高响应性,避免上述的缺点. 复制代码 public static void main(String[] args) { //要执行的任务 Runnable r = new Runnable() { @Override public void run() { //do something } }; //创建一个新的线程 Thread t = new Thread(r); //执行任务 t.start(); } 复制代码 多线程的好处&注意事项: 1. 由新创建的线程去执行任务,主线程可以更迅速的开始下一个任务.从而提高了响应性 2. 并行处理任务,这使得多个任务可以同时得到处理,如果有多个处理器,或者出于I/O未完成、锁请求以及资源可用性等任何因素需要阻塞任务时,程序的吞吐量会得到提高. 3. 任务处理代码必须是线程安全的,因为有多个任务会并发地调用它. 在中等强度的负载水平下,"每任务每线程(thread-per-taks)"方法是对顺序化执行的良好改进.只要请求的到达速度尚未超出 服务器的请求处理能力,那么这种方法可以同时的带来更快的响应性和更大的吞吐量, 无限制创建线程的缺点 1. 线程生命周期的开销.线程的创建与关闭不是"免费的".如果请求是频繁的且轻量的,就像大多数服务器程序一样,那么为每个请求创建一个新线程的做法就会消耗大量的计算资源. 2. 资源消耗量.活动线程会消耗系统资源,尤其是内存.如果可运行的线程数多于可用的处理器数,线程将会空闲.大量空闲线程占用更多内存,给垃圾回收器带来压力,而且大量线程在竞争CPU资源时,还会产生其他的性能开销.如果你已经有了足够多的线程保持所有CPU忙碌,那么再创建更多的线程是有百害而无一利的. 3. 稳定性.应该限制可创建线程的数目,否则可能会出现内存溢出错误. 凡事有度,在一定范围内,增加线程可以提高系统的吞吐量,一旦超出了这个范围,再创建更多的线程只会拖垮你的程序.创建过多的线程,会导致应用程序面临崩溃.为了摆脱这种危险,应该设置一个范围来限制你的应用程序可以创建的线程数,然后彻底地测试你的应用程序,确保即使线程数到达了这个范围的极限,程序也不至于耗尽所有的资源. "每任务每线程(thread-per-task)"方法的问题在于它没有对已创建线程的数量进行任何限制,除非对客户端能够抛出的请求速率进行限制.像其他的并发危险一样,无限制创建线程的行为可能在原型和开发阶段还能表现得运行良好,而当应用程序部署后,并运行于高负载下,它的问题才会暴露出来.所以一个恶意用户或者足够多的用户,都会使你的Web Server的负载超过某个确定的极限值,从而导致服务器的崩溃.对于一个服务器,我们希望它具有高可用性,而且在高负载下可以平缓地劣化,但是上面的问题对我们的目标是个严重的阻碍. 使用Executor框架规避每任务每线程的缺点 单线程和"每任务每线程"的局限性: 顺序执行会产生糟糕的响应性和吞吐量,"每任务每线程"可能会导致程序崩溃. 上一篇博客我们说到了如何用有界队列(BlockingQueue)防止应用程序过载而耗尽内存.线程池(Thread pool)为线程管理提供了同样的好处.在java中我们不是使用Thread来进行多线程的操作的,而是使用Executor. 复制代码 public interface Executor { void execute(Runnable command); } 复制代码 Executor基于生产者-消费者模式.提交任务的执行者是生成者(产生待完成的工作单元),执行任务的线程是消费者(消耗掉这些工作单元).如果要在你的程序中实现一个生产者-消费者的模式,使用Executor通常是最简单的方式. 复制代码 public class ExecutorTest { //可以把这个参数抽取到配置文件,增加灵活性 private static final int NUM = 100; // 创建了一个定长的线程池,100个线程 private static final Executor EXECUTOR = Executors.newFixedThreadPool(NUM); public static void main(String [] args){ for (int i = 0; i < 100; i++) { Runnable r = new Runnable() { @Override public void run() { //do someting. } }; EXECUTOR.execute(r); } } } 复制代码 我们通过使用Executor,将处理请求任务的提交与它的执行体进行了解耦.只要替换一个不同的Executor实现,就可以改变服务器的行为.改变Executor的实现或者配置,所产生的影响远远小于直接改变任务的执行方式. 可以扩展Executor,自定义自己的执行策略 每任务每线程: 复制代码 public class ThreadPerTaskExecutor implements Executor { @Override public void execute(Runnable command) { new Thread(command).start(); } } 复制代码 同步进行: 复制代码 public class WithinThreadExecutor implements Executor { @Override public void execute(Runnable command) { command.run(); } } 复制代码 无论何时挡你看到这种形式的代码: new Thread(runnable).start() 并且你可能最终希望获得一个更加灵活的执行策略时,请认真考虑使用Executor代替Thread 线程池的具体使用 线程池管理一个工作者线程的同构池(homogeneous pool).线程池是与工作队列(work queue)紧密绑定的.所谓工作队列,其作用是持有所有等待执行的任务.工作者线程的生活从此轻松起来:它从工作队列中获取下一个任务,执行它,然后回来继续等待下一个任务. 使用线程池的好处: 1.重用存在的线程,而不是创建新的线程,这可以在处理多请求时抵消县城创建、消亡产生的开销. 2.另外一个额外的好处就是,在请求到达时,工作者线程通常已经存在,用于创建线程的等待时间并不会延迟任务的执行,因此提高了响应性. 3.通过适当地调整线程池的大小,你可以得到足够多的线程以保持处理器忙碌,同时还可以 防止过多的线程相互竞争资源,导致应用程序耗尽内存而失败. 可以使用Executors静态工厂方法来创建一个线程池(已经帮你配置好了): 复制代码 public static void main(String [] args){ /* * 创建一个单线程化的executor,它只创建唯一的工作者线程来执行任务 * 如果这个线程异常结束,会有另一个取代它. * 因为是单线程所以可以保证任务依照队列所规定的的顺序(FIFO,LIFO,优先级)执行 * */ Executor singleThreadExecutor = Executors.newSingleThreadExecutor(); /* * 从构造方法传入参数,创建一个定长的线程池,每当提交一个任务就创建一个线程, * 直到达到池的最大长度,这时线程池保持长度不在变化. * 如果一个线程由于非预期的Exception而结束,线程池会补充一个新的线程 * */ Executor fixedThreadExecutor = Executors.newFixedThreadPool(100); /* * 这个是功能最强的,创建一个可缓存的线程池,如果当前线程池的长度超过了处理的 * 需要时,它可以灵活地回收空闲的线程,当需求增加时,它可以灵活地添加新的线程, * 而不会对池的长度做任何限制 * */ Executor cachedThreadExecutor = Executors.newCachedThreadPool(); /* * 创建一个定长的线程池,而且支持定时的以及周期性的任务执行,类似于Timer(定时任务, * 取代了Timer) * */ Executor scheduledThreadExecutor = Executors.newScheduledThreadPool(100); /* * 上面的工厂方法的内部实现,用的都是这个,只不过参数配置的不同. * 当我们需要配置个性化的线程池的时候,可以自己进行配置(阿里巴巴规约推荐你自己进行配置) * */ Executor executor = new ThreadPoolExecutor(1,1,1L,TimeUnit.SECONDS,new ArrayBlockingQueue(100)); } } 复制代码 从"每线程每任务"策略迁移到基于池的策略,会对应用程序的稳定性产生重大的影响:Web Server 再也不会因过高的负载失败了. 使用ExecutorService关闭线程池 Executor如果没有关闭那么JVM也无法结束:Executor通常只是为执行任务而创建线程.但是JVM会在所有(非后台的,nondaemon)线程全部终止后才退出.因此,如果无法正确关闭Executor,将会阻止JVM的结束. 为了解决这个执行服务的生命周期问题,ExecutorService接口扩展了Executor,并且添加了一些用于生命周期管理的方法(同时还有一些用于任务提交的便利方法). 复制代码 public interface ExecutorService extends Executor { //关闭服务用的方法们... void shutdown(); List shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; // 任务提交的便利方法.. Future submit(Callable task); Future submit(Runnable task, T result); Future submit(Runnable task); List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException; T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; } 复制代码 ExecutorService暗示了生命周期有3种状态:运行(running)、关闭(shutting down)和终止(terminated).ExecutorServie最初创建后的初始状态是运行状态.shutdown方法会启动一个平缓地关闭过程:停止接受新的任务,同时等待已经提交的任务完成----包括尚未开始执行的任务. shutdownNow方法会启动一个强制的关闭过程:尝试取消所有运行中的任务和排在队列中尚未开始的任务. 在关闭后提交到ExecutorService中的任务,会被拒绝执行处理器器(rejected execution handler)处理.拒绝执行处理器(拒绝执行处理器是ExecutorService的一种实现.ThreadPoolExecutor提供的,ExecutorService接口中的方法并不提供拒绝执行处理器.) 可能只是简单地放弃任务,也可能会引起execute抛出一个未检查的RejectedExecutionException.一旦所有的任务全部完成后,ExecutorService会转入终止状态.你可以调用awaitTermination等待ExecutorService到达终止状态.也可以轮询检查isTerminated判断ExecutorService是否已经终止.通常shutdown会紧随awaitTermination之后,这样可以产生同步地关闭ExecutorService的效果. 简单的测试代码: 复制代码 public class ExecutorServiceTest { private final static ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(10); public void start(){ //根据线程池的中断状态执行任务 while (!EXECUTOR_SERVICE.isShutdown()){ try { Runnable r = new Runnable(){ @Override public void run() { // System.out.println("执行着呢"); } }; EXECUTOR_SERVICE.execute(r); } catch (RejectedExecutionException e) { e.printStackTrace(); if (!EXECUTOR_SERVICE.isShutdown()){ System.out.println("捕获了结束异常..."); } } } } /* * 终止现场城池 * */ public void stop(){ EXECUTOR_SERVICE.shutdown(); } public static void main(String [] args) throws InterruptedException { ExecutorServiceTest executorServiceTest = new ExecutorServiceTest(); //创建一个新线程,3秒后停止线程池 new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(3000); executorServiceTest.stop(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); executorServiceTest.start(); } } 复制代码 定时任务 Timer工具管理任务的延迟执行("100ms后执行该任务")以及周期执行("每10ms执行一次任务").但是Timer存在一些缺陷.应该考虑使用ScheduledThreadPoolExecutor替代Timer. Timer的缺点: 1. 时间准确性:Timer只创建唯一的线程来执行所有timer任务.如果一个timer任务很耗时,会导致其他TimerTask的时效准确性出问题.例如一个TimerTask每10ms执行一次,而另一个TimerTask每40ms执行一次,重复出现的任务要么会在耗时的任务完成后快速连续调用4次,要么完全"丢失"4次调用(取决于它是否按照固定的频率或延迟执行调度).调度线程池(Scheduled thread pool)解决了这个缺陷,它让你可以提供多个线程来执行延迟、并具周期性的任务. 2.线程被意外终止不会再启动:Timer的另一个问题在于,如果TimerTask抛出未检查的异常,Timer将会产生无法预料的行为.Timer线程并不捕获异常,所以TimerTask抛出的未检查的异常会终止timer线程.这种情况下Timer也不会在重新恢复线程的执行了:它错误地任务整个Timer都被取消了.此时,已经背安排但尚未执行的TimerTask永远不会再执行了,新的任务也不能被调度了.(这个问题叫线程泄露) 复制代码 public class OutOfTime { static class ThrowTask extends TimerTask { @Override public void run() { System.out.println("执行了任务."); throw new RuntimeException(); } } public static void main(String [] args) throws InterruptedException { //创建一个timer Timer timer = new Timer(); //延迟一秒执行执行任务. timer.schedule(new ThrowTask(),1); Thread.sleep(1000); //延迟三秒执行任务 timer.schedule(new ThrowTask(),3); } } 复制代码 打印输出: 执行了任务. Exception in thread "Timer-0" java.lang.RuntimeException at cn.bj.lbr.test.chap6.OutOfTime$ThrowTask.run(OutOfTime.java:19) at java.util.TimerThread.mainLoop(Timer.java:555) at java.util.TimerThread.run(Timer.java:505) Exception in thread "main" java.lang.IllegalStateException: Timer already cancelled. at java.util.Timer.sched(Timer.java:397) at java.util.Timer.schedule(Timer.java:193) at cn.bj.lbr.test.chap6.OutOfTime.main(OutOfTime.java:31) 可以看到"执行了任务"就输出了一次,后面的任务在抛出异常后没有继续执行. 再次重申: 应该使用ScheduledThreadPoolExecutor来替换Timer. 可携带结果的任务:Callable和Future Callable就是一个可以有返回值的Runnable. 复制代码 @FunctionalInterface public interface Callable { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; } 复制代码 Executors包含了一些工具方法,可以把其他类型的任务封装成一个Callable,比如Runnable和 java.security.PrivilegedAction. 一个Executor执行的任务的生命周期有四个阶段:创建、提交、开始和完成.由于任务的执行可能会话费很长时间,我们也希望可以取消一个任务.在Executor框架中,总可以取消已经提交但尚未开始的任务,但是对于已经开始的任务,只有它们响应中断,才可以取消.取消一个已经完成的任务没有影响. Future描述了任务的生命周期,并提供了相关的方法来获得任务的结果、取消任务以及校验任务是否已经完成还是被取消.Future的归约中暗示了任务的生命周期是单向的,不能后退--就像ExecutorService的生命周期一样,一旦任务完成,他就永远停留在完成状态上了. 任务的状态(尚未开始,运行中,完成)决定了get方法的行为,如果任务已经完成,get会立即返回或者抛出一个Excption,如果任务没有完成,get会阻塞直到它完成.如果任务抛出了异常,get会将该异常封装为ExecutionException,然后重新抛出;如果任务被取消,get会抛出CancellationException.当抛出ExecutionException时,可以用getCause重新获得被封装的原始异常. 有很多种方法可以创建一个描述任务的Future.ExecutorService中的所有submit方法都返回一个Future,因此你可以将一个Runnable或一个Callable提交给executor,然后得到一个Future,用它来重新获得任务执行的结果,或取消任务. 复制代码 public static void main(String [] args) throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); Callable callable = new Callable() { @Override public String call() throws Exception { return "返回callable的结果"; } }; Future future = executorService.submit(callable); future.get(); //取消任务 //future.cancel(true); } 复制代码 你也可以显示地为给定的Runnable或Callable实例化一个FutureTask.(FutureTask实现了Runnable,所以既可以将它提交给Executor来执行,又可以直接调用run方法运行). 复制代码 //创建一个callable Callable callable = new Callable() { @Override public String call() throws Exception { return "返回callable的结果"; } }; //包装进一个FutureTask FutureTask futureTask = new FutureTask(callable); //必须run否则会阻塞.. futureTask.run(); //获得结果.. String s = futureTask.get(); System.out.println("s = " + s); 复制代码 ExecutorService.submit()之所以能返回FutureTask是通过调用内部的newTaskFor方法,把Runnable或Callable包装成一个FutureTask. AbstractExecutorService,ExecutorService的实现类: 复制代码 protected RunnableFuture newTaskFor(Runnable runnable, T value) { return new FutureTask(runnable, value); } protected RunnableFuture newTaskFor(Callable callable) { return new FutureTask(callable); } 复制代码 将Runnable或Callable提交到Executor的行为,可以建立一个安全发布(可以看第二篇博客),以保证Runnable或Callable从提交线程暴露到最终执行任务的线程的过程是线程安全的.类似地,设置Future结果值的行为,也可以简历一个安全发布,以保证这个结果从计算它的线程暴露到通过get重获他的任何线程都是线程安全的.示例: 复制代码 ExecutorService executorService = Executors.newCachedThreadPool(); //将对象声明为final的,保证线程传递对象的线程安全性. //将对象声明为final的,是安全发布的一种形式. final List list = new ArrayList<>(); Callable callable = new Callable() { @Override public List call() throws Exception { List newList = new ArrayList<>(); for (Object o : list) { newList.add(o); } return newList; }
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信