十八、异步编程 asyncio

一. 概念识别

阻塞: A 调用 B, A 会被挂起, 一直等待 B 的结果, 什么都不干.

非阻塞: A 调用 B, A 自己不用被挂起来等待 B 的结果, A 可以去干其他事情.

同步: A 调用 B, 此时只有等 B 有了结果才返回. 同步意味着有序

异步: A 调用 B, B 立即返回, 无需等待, 等 B 处理完之后再告诉 A 结果. 异步意味着无序

并发: 为了让独立的子任务能够尽快完成.

并行: 为了利用多核加速多任务的完成.

事件循环: 并非真正的循环, 而是线程不断从事件列表中取事件的动作.

回调:

  • 同步回调: 一种 阻塞式调用, 需要等待调用事件返回.
  • 回调: 一种 双向调用模式, 被调用方调用时也会调用对方.
  • 异步回调: 一种类似消息或事件的机制, 即收到某种信息时, 会主动通知调用方.

二. 协程

2.1 协程

::: info
协程(coroutine): 又称微线程, 一种用户态的轻量级线程.
:::

::: info
协程(coroutine): 又称微线程, 一种用户态的轻量级线程.
:::

async: 用来定义协程的关键字.

await: 等待到对象的返回结果,才会继续执行后续代码.

1
2
3
4
5
6
7
8
9
10
11
import asyncio


# 打印 hello 等待 1s 后打印 python
async def demo1():
print("hello")
await asyncio.sleep(1)
print("python")


asyncio.run(demo1())

2.2 asyncio 库

asyncio 三种主要机制:

  • asyncio.run(): 函数用来运行最高层级的入口点 “main()” 函数.
  • 等待一个协程.
  • asyncio.create_task(): 函数用来并发运行作为 asyncio 任务 的多个协程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 等待 1 秒后打印 "hello",然后 再次 等待 2 秒后打印 "world"
import asyncio
import time

async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)

async def main():
start_time = time.time()

await say_after(1, 'hello')
await say_after(2, 'world')

end_time = time.time()
print(f"耗时: {end_time - start_time}")

asyncio.run(main())

"""
运行结果:
hello
world
耗时: 3.003185987472534
"""
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio
import time

async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)

async def main():
task1 = asyncio.create_task(say_after(1, "hello"))
task2 = asyncio.create_task(say_after(2, "Python"))
start_time = time.time()
await task1
await task2
end_time = time.time()
print(f"耗时: {end_time - start_time}")

asyncio.run(main())

"""
运行结果:
hello
Python
耗时: 2.001649856567383
"""

2.2.1 运行 asyncio 程序: run()

  • asyncio.run(coro, *, debug=False)
    • 运行传入的协程,负责管理 asyncio 事件循环,终结异步生成器,并关闭线程池。
    • 用作 asyncio 程序的主入口点,理想情况下应当只被调用一次。
1
2
3
4
5
async def main():
await asyncio.sleep(1)
print('hello')

asyncio.run(main())

2.2.2 创建任务: create_task()

  • asyncio.create_task(coro, *, name=None)
    • 将协程添加到asyncio.create_task()中,则该协程将很快的自动计划运行。
    • 将 coro 协程 封装为一个 Task 并调度其执行。返回 Task 对象。

2.2.3 休眠: sleep()

  • asyncio.sleep(delay, result=None, *, loop=None)
    • 阻塞 delay 指定的秒数。
    • 如果指定了 result,则当协程完成时将其返回给调用者。
    • sleep() 总是会挂起当前任务,以允许其他任务运行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 运行 5 秒,每秒显示一次当前日期:
import asyncio
import datetime

async def display_date():
loop = asyncio.get_running_loop()
end_time = loop.time() + 5.0
while True:
print(datetime.datetime.now())
if (loop.time() + 1.0) >= end_time:
break
await asyncio.sleep(1)

asyncio.run(display_date())

2.2.4 并发运行任务: wait()、gather()

::: info
相同:asyncio.wait 和 asyncio.gather 实现的效果是相同的,都是把所有 Task 任务结果收集起来。

不同

  • asyncio.wait 会返回两个值:done 和 pending,
    • done 为已完成的协程 Task,
    • pending 为超时未完成的协程 Task,需通过 future.result 调用 Task 的 result;
  • asyncio.gather 返回的是所有已完成 Task 的 result,不需要再进行调用或其他操作,就可以得到全部结果。
    :::
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import asyncio
import arrow

def current_time():
# 获取当前时间
cur_time = arrow.now().to('Asia/Shanghai').format('YYYY-MM-DD HH:mm:ss')
return cur_time

async def func(sleep_time):
func_name_suffix = sleep_time # 使用 sleep_time(函数 I/O 等待时长)作为函数名后缀,以区分任务对象
print(f"[{current_time()}] 执行异步函数 {func.__name__}-{func_name_suffix}")
await asyncio.sleep(sleep_time)
print(f"[{current_time()}] 函数 {func.__name__}-{func_name_suffix} 执行完毕")
return f"【[{current_time()}] 得到函数 {func.__name__}-{func_name_suffix} 执行结果】"

async def run():
task_list = []
for i in range(5):
task = asyncio.create_task(func(i))
task_list.append(task)

# asyncio.gather 返回的是所有已完成 Task 的 result
# results = await asyncio.gather(*task_list)
# for result in results:
# print(f"[{current_time()}] 得到执行结果 {result}")

# asyncio.wait 会返回两个值:done 和 pending
done, pending = await asyncio.wait(task_list)
for result in done:
print(f"[{current_time()}] 得到执行结果 {result.result()}")

asyncio.run(run())

"""
运行结果:
[2023-04-27 14:29:00] 执行异步函数 func-0
[2023-04-27 14:29:00] 执行异步函数 func-1
[2023-04-27 14:29:00] 执行异步函数 func-2
[2023-04-27 14:29:00] 执行异步函数 func-3
[2023-04-27 14:29:00] 执行异步函数 func-4
[2023-04-27 14:29:00] 函数 func-0 执行完毕
[2023-04-27 14:29:01] 函数 func-1 执行完毕
[2023-04-27 14:29:02] 函数 func-2 执行完毕
[2023-04-27 14:29:03] 函数 func-3 执行完毕
[2023-04-27 14:29:04] 函数 func-4 执行完毕
[2023-04-27 14:29:04] 得到执行结果 【[2023-04-27 14:29:04] 得到函数 func-4 执行结果】
[2023-04-27 14:29:04] 得到执行结果 【[2023-04-27 14:29:02] 得到函数 func-2 执行结果】
[2023-04-27 14:29:04] 得到执行结果 【[2023-04-27 14:29:03] 得到函数 func-3 执行结果】
[2023-04-27 14:29:04] 得到执行结果 【[2023-04-27 14:29:01] 得到函数 func-1 执行结果】
[2023-04-27 14:29:04] 得到执行结果 【[2023-04-27 14:29:00] 得到函数 func-0 执行结果】
"""

2.2.5 防止取消操作: shield()

  • asyncio.shield(aw, *, loop=None)
    • 保护一个 可等待对象 防止其被 取消
1
res = await shield(something())

2.2.6 超时: wait_for()

  • asyncio.wait_for(aw, timeout, *, loop=None)
    • 等待 aw 可等待对象 完成,指定 timeout 秒数后超时
    • timeout 可以为 None,也可以为 float 或 int 型数值表示的等待秒数
    • 任务取消引发 asyncio.TimeoutError, 要避免任务取消,可以加上 shield()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio

async def eternity():
# Sleep for one hour
await asyncio.sleep(3600)
print('yay!')

async def main():
# 最多等待1秒
try:
await asyncio.wait_for(eternity(), timeout=1.0)
except asyncio.TimeoutError:
print('timeout!')

asyncio.run(main())

"""
timeout!
"""

2.2.7 Task

task对象作用: 可以将多个任务添加到事件循环当中,达到多任务并发的效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio

async def func2():
print(1111)
await asyncio.sleep(2)
print(2222)
return "test"
tasks = [func2(), func2()]

x, y = asyncio.run(asyncio.wait(tasks))
print(x)
print(y)

'''
运行结果:
1111
1111
2222
2222
{<Task finished name='Task-3' coro=<func2() done, defined at /Users/chengqiande/Documents/PythonObject/Demo/进程、线程、协程/多线程/Demo2/demo6.py:7> result='test'>, <Task finished name='Task-2' coro=<func2() done, defined at /Users/chengqiande/Documents/PythonObject/Demo/进程、线程、协程/多线程/Demo2/demo6.py:7> result='test'>}
set()
'''

2.2.8 多线程异步执行: to_thread()

asyncio.to_thread(): 将同步阻塞放入多线程异步执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import asyncio
import threading
import time
import atexit

start = time.time()
atexit.register(lambda: print('用时(秒):', time.time()-start))

def hard_work():
print('thread id:', threading.get_ident())
time.sleep(5)

async def do_async_job():
await asyncio.to_thread(hard_work)
await asyncio.sleep(1)
print('job done!')

async def main():
tasks = []
for i in range(3):
tasks.append(asyncio.create_task(do_async_job()))

await asyncio.wait(tasks)

asyncio.run(main())

'''
运行结果:
thread id: 12936740864
thread id: 12953530368
thread id: 12970319872
job done!
job done!
job done!
用时(秒): 6.017727851867676
'''