1、基本概念
除了顺序执行和并行执行的模型以外,还有异步模型,这是事件驱动模型的基础。异步活动的执行模型可以只有一个单一的主控制流,能在单核心系统和多核心系统中运行。
在并发执行的异步模型中,许多任务被穿插在同一时间线上,所有的任务都由一个控制流执行(单一线程)。任务的执行可能被暂停或恢复,中间的这段时间线程将会执行其他任务。大致如下:

如上图所示,任务(不同的颜色表示不同的任务)可能被其他任务插入,但是都处在同一个线程下。这表明当某一个任务执行的时候,其他任务都暂停了。与多线程编程模型很大的一点不同是,多线程的某个任务在时间线上什么时候挂起某个活动或恢复某个活动由系统决定,而在异步中,程序员必须假设线程可能在任何时间被挂起和替换。
程序员可以将任务编写成许多可以间隔执行的小步骤,如果一个任务需要另一个任务的输出,那么被依赖的任务必须接收它的输入。
2、使用Python的concurrent.futures模块
这个模块具有线程池和进程池、管理并行编程任务、处理非确定性的执行流程、进程/线程同步等功能。
此模块由一下部分组成:
- concurrent.futures.Executor:这是一个虚拟基类,提供了异步执行的方法。
- submit(function, argument):调度函数(可调用的对象)的执行,将argument作为参数传入。
- map(function, argument):将argument作为参数执行函数,以异步的方式。
- shutdown(Wait=True):发出让执行者释放所有资源的信号。
- concurrent.futures.Future:其中包括函数的异步执行。Future对象是submit任务(即带有参数的functions)到executor的实例。
Executor是抽象类,可以通过子类访问,即线程或进程的ExecutorPools。因为线程或进程的实例是依赖于资源的任务,所以最好以池的形式将他们组织在一起,作为可以重用的launcher和executor。
线程池和进程池是用于在程序中优化和简化线程/进程的使用。通过池可以提交任务给executor。池由两部分组成,一部分是内部的队列,存放着待执行的任务;另一部分是一系列的进程或线程,用于执行这些任务。池的概念主要目的是为了重用:让线程或进程在生命周期内可以多次使用。他减少了创建线程和进程的开销,提高了程序性能。重用不是必须的规则,但它是程序员在应用中使用池的主要原因。

current.Futures提供了两种Executor的子类,各自独立操作一个线程池和一个进程池。这两个子类分别是:
- concurrent.futures.ThreadPoolExecutor(max_workers)
- concurrent.futures.ProcessPoolExecutor(max_workers)
max_workers参数表示最多有多少个worker并行执行任务
代码测试:
import concurrent.futures import time number_list = [1,2,3,4,5,6,7,8,9,10] def evaluate_item(x): #For time consuming result_item = count(x) return result_item def count(number): for i in range(0, 10000000): i = i + 1 return i * number if __name__ == "__main__": # Sequential execution start_time = time.time() for item in number_list: print(evaluate_item(item)) print("Sequential execution in %s seconds" %(str(time.time() - start_time))) # Thread pool execution start_time_1 = time.time() with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: futures = [executor.submit(evaluate_item, item) for item in number_list] for future in concurrent.futures.as_completed(futures): print(future.result()) print("Thread pool execution in %s seconds" %(str(time.time() - start_time_1))) # Process pool execution start_time_2 = time.time() with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor: futures = [executor.submit(evaluate_item, item) for item in number_list] print("Process pool execution in %s seconds" %(str(time.time() - start_time_2)))
运行结果:
100000002000000030000000400000005000000060000000700000008000000090000000100000000

