import asyncio
async def coroutine_example():
await asyncio.sleep(1)
return 'zhihu ID: Zarten'
coro = coroutine_example()
loop = asyncio.get_event_loop()
task = loop.create_task(coro)
print('运行情况:', task)
try:
print('返回值:', task.result())
except asyncio.InvalidStateError:
print('task状态未完成,捕获了 InvalidStateError 异常')
loop.run_until_complete(task)
print('再看下运行情况:', task)
print('返回值:', task.result())
loop.close()
可以参考一下这个,另外,建议了解一下coroutine
和task
的区别
我之前看b站高天的视频,大概记录了一些
视频链接
asyncio的理解与入门,搞不明白协程?看这个视频就够了。
await机制详解。再来个硬核内容,把并行和依赖背后的原理全给你讲明白
asyncio的理解与入门
coroutine和task
coroutine
async def的函数是coroutine func。
async def的函数单纯被call,会变成coroutine object,但只是单纯被call,会报错。
coroutine的执行方式
coroutine object可以通过以下方式被正常运行
await gather(*coroutine_objs)
- 这里也可以传入单个coroutine object。
coroutine和task的关系
create_task
显式地将coroutine object变成task
gather
隐式地将coroutine object变成task。
而直接await
一个coroutine_object,则不会产生task,类似立即调用了生成器(同步执行),不会将控制权交还给event loop。
整个asyncio的执行方式
event loop
另外asyncio.run(coroutine_func)
其实是先创建了一个event loop
,由event loop
来控制task的执行。
-
asyncio.run(coroutine_func)
,就会产生一个task。
-
event loop调度的时候最小单位是一个task。
-
event loop无法直接执行一个coroutine_obj。
直接await
直接await coroutine_obj
会导致event loop
逐个发现task的时候,直接同步执行这步代码,不会将控制权交还给event loop。
import time
import asyncio
async def nap(delay:int):
await asyncio.sleep(delay)
return f"after {delay:d} second nap finished"
async def main():
start_time = time.time()
ret = await nap(1)
ret2 = await nap(2)
print(ret)
print(ret2)
print(f"time cost: {time.time()-start_time:.1f}s")
asyncio.run(main())
run result
after 1 second nap finished
after 2 second nap finished
time cost: 3.0s
create_task配合await
而通过create_task,再逐个await task,可以异步执行。
import time
import asyncio
async def nap(delay:int):
await asyncio.sleep(delay)
return f"after {delay:d} second nap finished"
async def main():
start_time = time.time()
task = asyncio.create_task(nap(1))
task2 = asyncio.create_task(nap(2))
ret = await task
ret2 = await task2
print(ret)
print(ret2)
print(f"time cost: {time.time()-start_time:.1f}s")
asyncio.run(main())
run result
after 1 second nap finished
after 2 second nap finished
time cost: 2.0s
gather
await gather(coroutine_objs)
,也可以异步执行。
import time
import asyncio
async def nap(delay:int):
await asyncio.sleep(delay)
return f"after {delay:d} second nap finished"
async def main():
start_time = time.time()
rets = await asyncio.gather(
nap(1),
nap(2),
)
ret, ret2 = rets
print(ret)
print(ret2)
print(f"time cost: {time.time()-start_time:.1f}s")
asyncio.run(main())
run result
after 1 second nap finished
after 2 second nap finished
time cost: 2.0s
总结
直接await coutine_obj
,会导致这步代码同步执行。
「create_task配合await」和「gather」,可以异步执行,大致逻辑是。
- 会让
event loop
先收到外层的tasks。
- 在等待时,逐个发现是否还有其他task可执行。有点类似树结构,保证子节点们执行完,才能去执行父节点,实现异步执行。
补充asyncio.wait
相比于gather
, wait
可以
- 返回已完成的任务和待完成的任务,这两个序列。
- 而且传入对象必须为task的序列,不能为生成器,也不能为单个task。
import time
import random
import asyncio
async def foo(num:int):
return num
async def main():
task = asyncio.create_task(foo(random.randint(0,100)))
dones, pendings = await asyncio.wait([task]) # 必须是一个task的序列,不能是单个task。
# 这里说明dones,其实是完成的task
if task in dones:
result = task.result()
print(result)
print('='*30)
tasks = [asyncio.create_task(foo(random.randint(0, 100))) for _ in range(5)]
dones, pendings = await asyncio.wait(tasks)
for task in dones:
print(task.result())
asyncio.run(main())
await机制详解
await后面可以是coroutine object、task、future这三种。
import asyncio
async def main():
await asyncio.sleep(1)
dis.dis(main())
run result
98 0 LOAD_GLOBAL 0 (asyncio)
2 LOAD_METHOD 1 (sleep)
4 LOAD_CONST 1 (1)
6 CALL_METHOD 1
8 GET_AWAITABLE
10 LOAD_CONST 0 (None)
12 YIELD_FROM
14 POP_TOP
16 LOAD_CONST 0 (None)
18 RETURN_VALUE
其中主要部分
8 GET_AWAITABLE
10 LOAD_CONST 0 (None)
12 YIELD_FROM
async里,当return一个值的时候,其机制类似generator。
- return,其实等同
raise StopIteration
,return的值,则设置为StopIteration
的Exception
的value
try:
raise StopIteration('Value here')
except Exception as e:
print(e) # Value here