Python并行编程(十四):异步编程

 1、基本概念

  除了顺序执行和并行执行的模型以外,还有异步模型,这是事件驱动模型的基础。异步活动的执行模型可以只有一个单一的主控制流,能在单核心系统和多核心系统中运行。

  在并发执行的异步模型中,许多任务被穿插在同一时间线上,所有的任务都由一个控制流执行(单一线程)。任务的执行可能被暂停或恢复,中间的这段时间线程将会执行其他任务。大致如下:

  

  如上图所示,任务(不同的颜色表示不同的任务)可能被其他任务插入,但是都处在同一个线程下。这表明当某一个任务执行的时候,其他任务都暂停了。与多线程编程模型很大的一点不同是,多线程的某个任务在时间线上什么时候挂起某个活动或恢复某个活动由系统决定,而在异步中,程序员必须假设线程可能在任何时间被挂起和替换。

  程序员可以将任务编写成许多可以间隔执行的小步骤,如果一个任务需要另一个任务的输出,那么被依赖的任务必须接收它的输入。

2、使用Python的concurrent.futures模块

  这个模块具有线程池和进程池、管理并行编程任务、处理非确定性的执行流程、进程/线程同步等功能。

  此模块由一下部分组成:

  - concurrent.futures.Executor:这是一个虚拟基类,提供了异步执行的方法。

  - submit(function, argument):调度函数(可调用的对象)的执行,将argument作为参数传入。

  - map(function, argument):将argument作为参数执行函数,以异步的方式。

  - shutdown(Wait=True):发出让执行者释放所有资源的信号。

  - concurrent.futures.Future:其中包括函数的异步执行。Future对象是submit任务(即带有参数的functions)到executor的实例。

  Executor是抽象类,可以通过子类访问,即线程或进程的ExecutorPools。因为线程或进程的实例是依赖于资源的任务,所以最好以池的形式将他们组织在一起,作为可以重用的launcher和executor。

  线程池和进程池是用于在程序中优化和简化线程/进程的使用。通过池可以提交任务给executor。池由两部分组成,一部分是内部的队列,存放着待执行的任务;另一部分是一系列的进程或线程,用于执行这些任务。池的概念主要目的是为了重用:让线程或进程在生命周期内可以多次使用。他减少了创建线程和进程的开销,提高了程序性能。重用不是必须的规则,但它是程序员在应用中使用池的主要原因。

  

  current.Futures提供了两种Executor的子类,各自独立操作一个线程池和一个进程池。这两个子类分别是:

  - concurrent.futures.ThreadPoolExecutor(max_workers)

  - concurrent.futures.ProcessPoolExecutor(max_workers)

  max_workers参数表示最多有多少个worker并行执行任务

  代码测试:

复制代码
import concurrent.futures import time  number_list = [1,2,3,4,5,6,7,8,9,10]  def evaluate_item(x):     #For time consuming    result_item = count(x)     return result_item  def count(number):     for i in range(0, 10000000):         i = i + 1    return i * number  if __name__ == "__main__":     # Sequential execution    start_time = time.time()     for item in number_list:         print(evaluate_item(item))     print("Sequential execution in %s seconds" %(str(time.time() - start_time)))     # Thread pool execution    start_time_1 = time.time()     with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:         futures = [executor.submit(evaluate_item, item) for item in number_list]         for future in concurrent.futures.as_completed(futures):             print(future.result())     print("Thread pool execution in %s seconds"  %(str(time.time() - start_time_1)))      # Process pool execution    start_time_2 = time.time()     with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:         futures = [executor.submit(evaluate_item, item) for item in number_list]     print("Process pool execution in %s seconds" %(str(time.time() - start_time_2)))
复制代码

  运行结果:

复制代码
100000002000000030000000400000005000000060000000700000008000000090000000100000000
                        
关键字:
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信