18.异步编程 asyncio
大约 6 分钟学习笔记Python基础
一. 概念识别
阻塞: A 调用 B, A 会被挂起, 一直等待 B 的结果, 什么都不干.
非阻塞: A 调用 B, A 自己不用被挂起来等待 B 的结果, A 可以去干其他事情.
同步: A 调用 B, 此时只有等 B 有了结果才返回. 同步意味着有序
异步: A 调用 B, B 立即返回, 无需等待, 等 B 处理完之后再告诉 A 结果. 异步意味着无序
并发: 为了让独立的子任务能够尽快完成.
并行: 为了利用多核加速多任务的完成.
事件循环: 并非真正的循环, 而是线程不断从事件列表中取事件的动作.
回调:
- 同步回调: 一种
阻塞式调用
, 需要等待调用事件返回. - 回调: 一种
双向调用模式
, 被调用方调用时也会调用对方. - 异步回调: 一种类似消息或事件的机制, 即收到某种信息时, 会主动通知调用方.
二. 协程
2.1 协程
相关信息
协程(coroutine): 又称微线程, 一种用户态的轻量级线程.
相关信息
协程(coroutine): 又称微线程, 一种用户态的轻量级线程.
async: 用来定义协程的关键字.
await: 等待到对象的返回结果,才会继续执行后续代码.
import asyncio
# 打印 hello 等待 1s 后打印 python
async def demo1():
print("hello")
await asyncio.sleep(1)
print("python")
asyncio.run(demo1())
2.2 asyncio 库
asyncio 三种主要机制:
asyncio.run()
: 函数用来运行最高层级的入口点 "main()" 函数.- 等待一个协程.
asyncio.create_task()
: 函数用来并发运行作为 asyncio 任务 的多个协程。
# 等待 1 秒后打印 "hello",然后 再次 等待 2 秒后打印 "world"
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
start_time = time.time()
await say_after(1, 'hello')
await say_after(2, 'world')
end_time = time.time()
print(f"耗时: {end_time - start_time}")
asyncio.run(main())
"""
运行结果:
hello
world
耗时: 3.003185987472534
"""
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
task1 = asyncio.create_task(say_after(1, "hello"))
task2 = asyncio.create_task(say_after(2, "Python"))
start_time = time.time()
await task1
await task2
end_time = time.time()
print(f"耗时: {end_time - start_time}")
asyncio.run(main())
"""
运行结果:
hello
Python
耗时: 2.001649856567383
"""
2.2.1 运行 asyncio 程序: run()
asyncio.run(coro, *, debug=False)
- 运行传入的协程,负责管理 asyncio 事件循环,终结异步生成器,并关闭线程池。
- 用作 asyncio 程序的主入口点,理想情况下应当只被调用一次。
async def main():
await asyncio.sleep(1)
print('hello')
asyncio.run(main())
2.2.2 创建任务: create_task()
asyncio.create_task(coro, *, name=None)
- 将协程添加到 asyncio.create_task()中,则该协程将很快的自动计划运行。
- 将 coro 协程 封装为一个 Task 并调度其执行。返回 Task 对象。
2.2.3 休眠: sleep()
asyncio.sleep(delay, result=None, *, loop=None)
- 阻塞 delay 指定的秒数。
- 如果指定了 result,则当协程完成时将其返回给调用者。
- sleep() 总是会挂起当前任务,以允许其他任务运行。
# 运行 5 秒,每秒显示一次当前日期:
import asyncio
import datetime
async def display_date():
loop = asyncio.get_running_loop()
end_time = loop.time() + 5.0
while True:
print(datetime.datetime.now())
if (loop.time() + 1.0) >= end_time:
break
await asyncio.sleep(1)
asyncio.run(display_date())
2.2.4 并发运行任务: wait()、gather()
相关信息
相同:asyncio.wait 和 asyncio.gather 实现的效果是相同的,都是把所有 Task 任务结果收集起来。
不同:
- asyncio.wait 会返回两个值:done 和 pending,
- done 为已完成的协程 Task,
- pending 为超时未完成的协程 Task,需通过 future.result 调用 Task 的 result;
- asyncio.gather 返回的是所有已完成 Task 的 result,不需要再进行调用或其他操作,就可以得到全部结果。
import asyncio
import arrow
def current_time():
# 获取当前时间
cur_time = arrow.now().to('Asia/Shanghai').format('YYYY-MM-DD HH:mm:ss')
return cur_time
async def func(sleep_time):
func_name_suffix = sleep_time # 使用 sleep_time(函数 I/O 等待时长)作为函数名后缀,以区分任务对象
print(f"[{current_time()}] 执行异步函数 {func.__name__}-{func_name_suffix}")
await asyncio.sleep(sleep_time)
print(f"[{current_time()}] 函数 {func.__name__}-{func_name_suffix} 执行完毕")
return f"【[{current_time()}] 得到函数 {func.__name__}-{func_name_suffix} 执行结果】"
async def run():
task_list = []
for i in range(5):
task = asyncio.create_task(func(i))
task_list.append(task)
# asyncio.gather 返回的是所有已完成 Task 的 result
# results = await asyncio.gather(*task_list)
# for result in results:
# print(f"[{current_time()}] 得到执行结果 {result}")
# asyncio.wait 会返回两个值:done 和 pending
done, pending = await asyncio.wait(task_list)
for result in done:
print(f"[{current_time()}] 得到执行结果 {result.result()}")
asyncio.run(run())
"""
运行结果:
[2023-04-27 14:29:00] 执行异步函数 func-0
[2023-04-27 14:29:00] 执行异步函数 func-1
[2023-04-27 14:29:00] 执行异步函数 func-2
[2023-04-27 14:29:00] 执行异步函数 func-3
[2023-04-27 14:29:00] 执行异步函数 func-4
[2023-04-27 14:29:00] 函数 func-0 执行完毕
[2023-04-27 14:29:01] 函数 func-1 执行完毕
[2023-04-27 14:29:02] 函数 func-2 执行完毕
[2023-04-27 14:29:03] 函数 func-3 执行完毕
[2023-04-27 14:29:04] 函数 func-4 执行完毕
[2023-04-27 14:29:04] 得到执行结果 【[2023-04-27 14:29:04] 得到函数 func-4 执行结果】
[2023-04-27 14:29:04] 得到执行结果 【[2023-04-27 14:29:02] 得到函数 func-2 执行结果】
[2023-04-27 14:29:04] 得到执行结果 【[2023-04-27 14:29:03] 得到函数 func-3 执行结果】
[2023-04-27 14:29:04] 得到执行结果 【[2023-04-27 14:29:01] 得到函数 func-1 执行结果】
[2023-04-27 14:29:04] 得到执行结果 【[2023-04-27 14:29:00] 得到函数 func-0 执行结果】
"""
2.2.5 防止取消操作: shield()
asyncio.shield(aw, *, loop=None)
- 保护一个 可等待对象 防止其被 取消
res = await shield(something())
2.2.6 超时: wait_for()
asyncio.wait_for(aw, timeout, *, loop=None)
- 等待 aw 可等待对象 完成,指定 timeout 秒数后超时
- timeout 可以为 None,也可以为 float 或 int 型数值表示的等待秒数
- 任务取消引发
asyncio.TimeoutError
, 要避免任务取消,可以加上shield()
import asyncio
async def eternity():
# Sleep for one hour
await asyncio.sleep(3600)
print('yay!')
async def main():
# 最多等待1秒
try:
await asyncio.wait_for(eternity(), timeout=1.0)
except asyncio.TimeoutError:
print('timeout!')
asyncio.run(main())
"""
timeout!
"""
2.2.7 Task
task
对象作用: 可以将多个任务添加到事件循环当中,达到多任务并发的效果
import asyncio
async def func2():
print(1111)
await asyncio.sleep(2)
print(2222)
return "test"
tasks = [func2(), func2()]
x, y = asyncio.run(asyncio.wait(tasks))
print(x)
print(y)
'''
运行结果:
1111
1111
2222
2222
{<Task finished name='Task-3' coro=<func2() done, defined at /Users/chengqiande/Documents/PythonObject/Demo/进程、线程、协程/多线程/Demo2/demo6.py:7> result='test'>, <Task finished name='Task-2' coro=<func2() done, defined at /Users/chengqiande/Documents/PythonObject/Demo/进程、线程、协程/多线程/Demo2/demo6.py:7> result='test'>}
set()
'''
2.2.8 多线程异步执行: to_thread()
asyncio.to_thread()
: 将同步阻塞放入多线程异步执行
import asyncio
import threading
import time
import atexit
start = time.time()
atexit.register(lambda: print('用时(秒):', time.time()-start))
def hard_work():
print('thread id:', threading.get_ident())
time.sleep(5)
async def do_async_job():
await asyncio.to_thread(hard_work)
await asyncio.sleep(1)
print('job done!')
async def main():
tasks = []
for i in range(3):
tasks.append(asyncio.create_task(do_async_job()))
await asyncio.wait(tasks)
asyncio.run(main())
'''
运行结果:
thread id: 12936740864
thread id: 12953530368
thread id: 12970319872
job done!
job done!
job done!
用时(秒): 6.017727851867676
'''