asyncio异步IO--协程(Coroutine)与任务(Task)详解

摘要:本文翻译自Coroutines and Tasks,主要介绍asyncio中用于处理协程和任务的方法和接口。在翻译过程中,译者在官方文档的基础上增加了部分样例代码和示意图表,以帮助读者对文档的理解。本文内容主要针对python3.7,在低版本的python中可能不适用,敬请留意。原创内容,如需转载请注明出处。 译者:马鸣谦(邮箱:1612557569@qq.com) 协程 协程(coroutines)是通过async/await定义函数或方法,是使用asyncio进行异步编程的首选途径。如下,是一个协程的例子: import asyncio async def main(): print("hello") await asyncio.sleep(1) print("world") 上例中的 main 方法就是我们定义的协程 。 我们在交互环境(Python3.7)下执行以上代码,看看效果: >>> import asyncio >>> async def main(): ... print("hello") ... await asyncio.sleep(1) ... print("world") >>> asyncio.run(main()) hello world 需要注意的是:如果像执行普通代码一样直接调用main(),只会返回一个coroutine对象,main()方法内的代码不会执行: >>> main() #直接执行`main()`返回的是一个`coroutine对象`。 实际上,asyncio提供了三种执行协程的机制: 使用asyncio.run()执行协程。一般用于执行最顶层的入口函数,如main()。 await一个协程。一般用于在一个协程中调用另一协程。 如下是一个示例: >>> import time >>> async def say_after(delay,what): await asyncio.sleep(delay) print(what) >>> async def main(): print(f"started at {time.strftime('%X')}") await say_after(1,"hello") await say_after(2,"world") print(f"finished at {time.strftime('%X')}") >>> asyncio.run(main()) started at 16:47:10 hello world finished at 16:47:13 执行耗时 3秒 用asyncio.create_task()方法将Coroutine(协程)封装为Task(任务)。一般用于实现异步并发操作。 需要注意的是,只有在当前线程存在事件循环的时候才能创建任务(Task)。 我们修改以上的例程,并发执行 两个say_after协程。 async def main(): task1 = asyncio.create_task(say_after(1,"hello")) task2 = asyncio.create_task(say_after(2,"world")) print(f"started at {time.strftime('%X')}") await task1 await task2 print(f"finished at {time.strftime('%X')}") 执行asyncio.run(main()),结果如下: started at 17:01:34 hello world finished at 17:01:36 耗时2秒 “可等待”对象(Awaitables) 如果一个对象能够被用在await表达式中,那么我们称这个对象是可等待对象(awaitable object)。很多asyncio API都被设计成了可等待的。 主要有三类可等待对象: 协程coroutine 任务Task 未来对象Future。 Coroutine(协程) Python的协程是可等待的(awaitable),因此能够被其他协程用在await表达式中。 import asyncio async def nested(): print("something") async def main(): # 如果直接调用 "nested()",什么都不会发生. # 直接调用的时候只是创建了一个 协程对象 ,但这个对象没有被 await, # 所以它并不会执行. nested() # 那么我们 await 这个协程,看看会是什么结果: await nested() # 将会打印 "something". asyncio.run(main()) 重要:在这篇文章中,术语coroutine或协程指代两个关系紧密的概念: 协程函数(coroutine function):由async def定义的函数; 协程对象(coroutine object):调用 协程函数返回的对象。 asyncio也支持传统的基于生成器的协程。 Task(任务) Task用来 并发的 调度协程。 当一个协程通过类似 asyncio.create_task() 的函数被封装进一个 Task时,这个协程 会很快被自动调度执行: import asyncio async def nested(): return 42 async def main(): # Schedule nested() to run soon concurrently # with "main()". task = asyncio.create_task(nested()) # "task" can now be used to cancel "nested()", or # can simply be awaited to wait until it is complete: await task asyncio.run(main()) Future(未来对象) Future 是一种特殊的 底层 可等待对象,代表一个异步操作的最终结果。 当一个Future对象被await的时候,表示当前的协程会持续等待,直到 Future对象所指向的异步操作执行完毕。 在asyncio中,Future对象能使基于回调的代码被用于asyn/await表达式中。 一般情况下,在应用层编程中,没有必要 创建Future对象。 有时候,有些Future对象会被一些库和asyncio API暴露出来,我们可以await它们: async def main(): await function_that_returns_a_future_object() # this is also valid: await asyncio.gather( function_that_returns_a_future_object(), some_python_coroutine() ) 底层函数返回Future对象的一个例子是:loop.run_in_executor 执行asyncio程序 asyncio.run(coro, * , debug=False) 这个函数运行coro参数指定的 协程,负责 管理asyncio事件循环 , 终止异步生成器。 在同一个线程中,当已经有asyncio事件循环在执行时,不能调用此函数。 如果debug=True,事件循环将运行在 调试模式。 此函数总是创建一个新的事件循环,并在最后关闭它。建议将它用作asyncio程序的主入口,并且只调用一次。 Python3.7新增 重要:这个函数是在Python3.7被临时添加到asyncio中的。 创建Task asyncio.create_task(coro) 将coro参数指定的协程(coroutine)封装到一个Task中,并调度执行。返回值是一个Task对象。 任务在由get_running_loop()返回的事件循环(loop)中执行。如果当前线程中没有正在运行的事件循环,将会引发RuntimeError异常: import asyncio async def coro_1(): print("do somthing") task = asyncio.create_task(coro_1()) 因为当前线程中没有正运行的事件循环,所以引发异常: Traceback (most recent call last): File "C:\Program Files\Python37\lib\site-packages\IPython\core\interactiveshell.py", line 3265, in run_code exec(code_obj, self.user_global_ns, self.user_ns) File "", line 1, in task = asyncio.create_task(coro_1()) File "C:\Program Files\Python37\lib\asyncio\tasks.py", line 324, in create_task loop = events.get_running_loop() RuntimeError: no running event loop 对以上代码稍作修改,创建main()方法,在其中创建Task对象,然后在主程序中利用asyncio.run()创建事件循环: import asyncio async def coro(): print("something is running") async def main(): task = asyncio.create_task(coro()) print(asyncio.get_running_loop()) asyncio.run(main()) 执行结果如下: <_WindowsSelectorEventLoop running=True closed=False debug=False> something is running 此函数已经被引入到Python3.7。在Python早期版本中,可以使用底层函数asyncio.ensure_future()代替。 async def coro(): ... # In Python 3.7+ task = asyncio.create_task(coro()) ... # This works in all Python versions but is less readable task = asyncio.ensure_future(coro()) ... Python3.7新增 Sleeping coroutine asyncio.sleep(delay,result=None,* ,loop=None) 阻塞delay秒,例如delay=3,则阻塞3秒。 如果指定了result参数的值,则在协程结束时,将该值返回给调用者。 sleep()通常只暂停当前task,并不影响其他task的执行。 不建议使用loop参数,因为Python计划在3.10版本中移除它。 以下是一个协程的例子,功能是在5秒钟内,每秒显示一次当前的日期: import asyncio import datetime async def display_date(): loop = asyncio.get_running_loop() end_time = loop.time() + 5.0 while True: print(datetime.datetime.now()) if (loop.time() + 1.0) >= end_time: break await asyncio.sleep(1) asyncio.run(display_date()) 执行结果大致如下: 2018-11-20 11:27:15.961830 2018-11-20 11:27:16.961887 2018-11-20 11:27:17.961944 2018-11-20 11:27:18.962001 2018-11-20 11:27:19.962059 2018-11-20 11:27:20.962116 并发执行Tasks awaitable asyncio.gather(* aws, loop=None, return_exceptions=False) 并发执行aws参数指定的 可等待(awaitable)对象序列。 如果 aws 序列中的某个 awaitable 对象 是一个 协程,则自动将这个 协程 封装为 Task对象进行处理。例如: import asyncio async def factorial(name, number): f = 1 for i in range(2, number + 1): print(f"Task {name}: Compute factorial({i})...") await asyncio.sleep(1) f *= i print(f"Task {name}: factorial({number}) = {f}") async def main(): # Schedule three calls *concurrently*: await asyncio.gather( factorial("A", 2), factorial("B", 3), factorial("C", 4), ) asyncio.run(main()) # Expected output: # # Task A: Compute factorial(2)... # Task B: Compute factorial(2)... # Task C: Compute factorial(2)... # Task A: factorial(2) = 2 # Task B: Compute factorial(3)... # Task C: Compute factorial(3)... # Task B: factorial(3) = 6 # Task C: Compute factorial(4)... # Task C: factorial(4) = 24 如果所有的awaitable对象都执行完毕,则返回 awaitable对象执行结果的聚合列表。返回值的顺序于aws参数的顺序一致。 简单修改以上代码: import asyncio async def factorial(name, number): f = 1 for i in range(2, number + 1): #print(f"Task {name}: Compute factorial({i})...") await asyncio.sleep(1) f *= i #print(f"Task {name}: factorial({number}) = {f}") return number async def main(): # Schedule three calls *concurrently*: print(await asyncio.gather( factorial("A", 2), factorial("B", 3), factorial("C", 4), )) asyncio.run(main()) # Expected output: # #[2, 3, 4]#await asyncio.gather()的返回值是一个列表, #分别对应factorial("A", 2),factorial("B", 3),factorial("C", 4)的执行结果。 如果return_execptions参数为False(默认值即为False),引发的第一个异常会立即传播给等待gather()的任务,即调用await asyncio.gather()对象。序列中其他awaitable对象的执行不会受影响。例如: import asyncio async def division(divisor, dividend): if divisor == 0: raise ZeroDivisionError else: print(f"{dividend}/{divisor}={dividend/divisor}") return dividend/divisor async def main(): # Schedule three calls *concurrently*: print(await asyncio.gather( division(0, 2), division(1, 2), division(2, 2), )) asyncio.run(main()) 执行结果: 2/1=2.0 2/2=1.0 Traceback (most recent call last): File "test.py", line 19, in asyncio.run(main()) File "c:\Program Files\Python37\lib\asyncio\runners.py", line 43, in run return loop.run_until_complete(main) File "c:\Program Files\Python37\lib\asyncio\base_events.py", line 573, in run_until_complete return future.result() File "test.py", line 16, in main division(2, 2), File "test.py", line 6, in division raise ZeroDivisionError ZeroDivisionError 如果return_exceptions参数为True,异常会和正常结果一样,被聚合到结果列表中返回。 对以上代码稍作修改,将return_exceptions设为True: import asyncio async def division(divisor, dividend): if divisor == 0: raise ZeroDivisionError else: print(f"{dividend}/{divisor}={dividend/divisor}") return dividend/divisor async def main(): # Schedule three calls *concurrently*: print(await asyncio.gather( division(0, 2), division(1, 2), division(2, 2), return_exceptions=True )) asyncio.run(main()) 执行结果如下: 2/1=2.0 2/2=1.0 [ZeroDivisionError(), 2.0, 1.0]#错误不会向上传播,而是作为结果返回 如果gather()被取消,则提交的所有awaitable对象(尚未执行完成的)都会被取消。例如: import asyncio async def division(divisor, dividend): if divisor == 0: raise ZeroDivisionError else: await asyncio.sleep(divisor) print(f"{dividend}/{divisor}={dividend/divisor}") return dividend/divisor async def main(): # Schedule three calls *concurrently*: t = asyncio.gather( division(0, 2), division(1, 5), division(3, 6), return_exceptions=True ) await asyncio.sleep(2) t.cancel() await t asyncio.run(main()) 执行结果: 5/1=5.0 #除已执行的之外,其他的任务全部被取消 Traceback (most recent call last): File "test.py", line 23, in asyncio.run(main()) File "c:\Program Files\Python37\lib\asyncio\runners.py", line 43, in run return loop.run_until_complete(main) File "c:\Program Files\Python37\lib\asyncio\base_events.py", line 573, in run_until_complete return future.result() concurrent.futures._base.CancelledError #在return_exceptions=True的情况下,异常依然向上传播。 如果aws中某些Task或Future被取消,gather()调用不会被取消,被取消的Task或Future会以引发CancelledError的方式被处理。这样可以避免个别awaitable对象的取消操作影响其他awaitable对象的执行。 例如: import asyncio async def division(divisor, dividend): if divisor == 0: raise ZeroDivisionError else: await asyncio.sleep(divisor) print(f"{dividend}/{divisor}={dividend/divisor}") return dividend/divisor async def main(): # Schedule three calls *concurrently*: task1 = asyncio.create_task(division(0, 2)) task2 = asyncio.create_task(division(1, 5)) task3 = asyncio.create_task(division(3, 6)) t = asyncio.gather( task1, task2, task3, return_exceptions=True ) task1.cancel() print(await t) asyncio.run(main()) 预期执行结果如下: 5/1=5.0 6/3=2.0 [CancelledError(), 5.0, 2.0] # 仅task1被取消,其他任务不受影响。 避免取消 awaitable asyncio.shield(aw, * , loop=None) 防止awaitable对象被取消(cancelled)执行。 如果aw参数是一个协程(coroutines),该对象会被自动封装为Task对象进行处理。 通常,代码: #code 1 res = await shield(something()) 同代码: #code 2 res = await something() 是等价的。 特殊情况是,如果包含以上代码的协程被 取消,code 1与code 2的执行效果就完全不同了: code 1中,运行于something()中的任务 不会被取消。 code 2中,运行于something()中的任务 会被取消。 在code 1中,从something()的视角看,取消操作并没有发生。然而,事实上它的调用者确实被取消了,所以await shield(something())仍然会引发一个CancelledError异常。 import asyncio import time async def division(divisor, dividend): if divisor == 0: raise ZeroDivisionError else: await asyncio.sleep(divisor) print(f"{time.strftime('%X')}:{dividend}/{divisor}={dividend/divisor}") return dividend/divisor async def main(): # Schedule three calls *concurrently*: print(f"Start time:{time.strftime('%X')}") task1 = asyncio.shield(division(1, 2)) task2 = asyncio.create_task(division(1, 5)) task3 = asyncio.create_task(division(3, 6)) res = asyncio.gather(task1, task2, task3, return_exceptions=True) task1.cancel() task2.cancel() print(await res) asyncio.run(main()) 执行结果: Start time:10:38:48 10:38:49:2/1=2.0 10:38:51:6/3=2.0 [CancelledError(), CancelledError(), 2.0] #task1虽然被取消,但是division(1,2)依然正常执行了。 #task2被取消后,division(1,5)没有执行 #虽然task1内的协程被执行,但返回值依然为CancelledError 如果something()以其他的方式被取消,比如从自身内部取消,那么shield()也会被取消。 如果希望完全忽略取消操作(不推荐这么做),则可以将shield()与try/except结合起来使用: try: res = await shield(something()) except CancelledError: res = None 超时(Timeouts) coroutine asyncio.wait_for(aw,timeout,*,loop=None) 在timeout时间之内,等待aw参数指定的awaitable对象执行完毕。 如果aw是一个协程,则会被自动作为Task处理。 timeout可以是None也可以是一个float或int类型的数字,表示需要等待的秒数。如果timeout是None,则永不超时,一直阻塞到aw执行完毕。 如果达到timeout时间,将会取消待执行的任务,引发asyncio.TimeoutError. 如果想避免任务被取消,可以将其封装在shield()中。 程序会等待到任务确实被取消掉,所以等待的总时间会比timeout略大。 如果await_for()被取消,aw也会被取消。 loop参数将在Python3.10中删除,所以不推荐使用。 示例: async def eternity(): # Sleep for one hour await asyncio.sleep(3600) print('yay!') async def main(): # Wait for at most 1 second try: await asyncio.wait_for(eternity(), timeout=1.0) except asyncio.TimeoutError: print('timeout!') asyncio.run(main()) # Expected output: # # timeout! Python3.7新特性:当aw因为超时被取消,wait_for()等到aw确实被取消之后返回异常。在以前的版本中,wait_for会立即返回异常。 等待原语(Waiting Primitives) wait() coroutine asyncio.wait(aws,*,loop=None,timeout=None,return_when=ALL_COMPLETED) 并发执行aws中的awaitable对象,一直阻塞到return_when指定的情况出现。 如果aws中的某些对象是协程(coroutine),则自动转换为Task对象进行处理。直接将coroutine对象传递给wait()会导致令人迷惑的执行结果,所以不建议这么做。 返回值是两个Task/Future集合:(done,pending)。 用法示例: done,pending = await asyncio.wait(aws) loop参数将在Python3.10中删除,所以不建议使用。 timeout参数可以是一个int或float类型的值,可以控制最大等待时间。 需要注意的是,wa
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信