一. 概念识别 阻塞: A 调用 B, A 会被挂起, 一直等待 B 的结果, 什么都不干.
非阻塞: A 调用 B, A 自己不用被挂起来等待 B 的结果, A 可以去干其他事情.
同步: A 调用 B, 此时只有等 B 有了结果才返回. 同步意味着有序
异步: A 调用 B, B 立即返回, 无需等待, 等 B 处理完之后再告诉 A 结果. 异步意味着无序
并发: 为了让独立的子任务能够尽快完成.
并行: 为了利用多核加速多任务的完成.
事件循环: 并非真正的循环, 而是线程不断从事件列表中取事件的动作.
回调:
同步回调: 一种 阻塞式调用
, 需要等待调用事件返回.回调: 一种 双向调用模式
, 被调用方调用时也会调用对方.异步回调: 一种类似消息或事件的机制, 即收到某种信息时, 会主动通知调用方.二. 协程 2.1 协程 协程(coroutine): 又称微线程, 一种用户态的轻量级线程.
协程(coroutine): 又称微线程, 一种用户态的轻量级线程.
async : 用来定义协程的关键字.
await : 等待到对象的返回结果,才会继续执行后续代码.
1 2 3 4 5 6 7 8 9 10 11 import asyncioasync def demo1 (): print ("hello" ) await asyncio.sleep(1 ) print ("python" ) asyncio.run(demo1())
2.2 asyncio 库 asyncio 三种主要机制 :
asyncio.run()
: 函数用来运行最高层级的入口点 “main()” 函数.等待一个协程. asyncio.create_task()
: 函数用来并发运行作为 asyncio 任务 的多个协程。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import asyncioimport timeasync def say_after (delay, what ): await asyncio.sleep(delay) print (what) async def main (): start_time = time.time() await say_after(1 , 'hello' ) await say_after(2 , 'world' ) end_time = time.time() print (f"耗时: {end_time - start_time} " ) asyncio.run(main()) """ 运行结果: hello world 耗时: 3.003185987472534 """
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import asyncioimport timeasync def say_after (delay, what ): await asyncio.sleep(delay) print (what) async def main (): task1 = asyncio.create_task(say_after(1 , "hello" )) task2 = asyncio.create_task(say_after(2 , "Python" )) start_time = time.time() await task1 await task2 end_time = time.time() print (f"耗时: {end_time - start_time} " ) asyncio.run(main()) """ 运行结果: hello Python 耗时: 2.001649856567383 """
2.2.1 运行 asyncio 程序: run() asyncio.run(coro, *, debug=False)
运行传入的协程,负责管理 asyncio 事件循环,终结异步生成器,并关闭线程池。 用作 asyncio 程序的主入口点,理想情况下应当只被调用一次。 1 2 3 4 5 async def main (): await asyncio.sleep(1 ) print ('hello' ) asyncio.run(main())
2.2.2 创建任务: create_task() asyncio.create_task(coro, *, name=None)
将协程添加到asyncio.create_task()中,则该协程将很快的自动计划运行。 将 coro 协程 封装为一个 Task 并调度其执行。返回 Task 对象。 2.2.3 休眠: sleep() asyncio.sleep(delay, result=None, *, loop=None)
阻塞 delay 指定的秒数。 如果指定了 result,则当协程完成时将其返回给调用者。 sleep() 总是会挂起当前任务,以允许其他任务运行。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 import asyncioimport datetimeasync def display_date (): loop = asyncio.get_running_loop() end_time = loop.time() + 5.0 while True : print (datetime.datetime.now()) if (loop.time() + 1.0 ) >= end_time: break await asyncio.sleep(1 ) asyncio.run(display_date())
2.2.4 并发运行任务: wait()、gather() 相同 :asyncio.wait 和 asyncio.gather 实现的效果是相同的,都是把所有 Task 任务结果收集起来。
不同 :
asyncio.wait 会返回两个值:done 和 pending,done 为已完成的协程 Task, pending 为超时未完成的协程 Task,需通过 future.result 调用 Task 的 result; asyncio.gather 返回的是所有已完成 Task 的 result,不需要再进行调用或其他操作,就可以得到全部结果。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 import asyncioimport arrowdef current_time (): cur_time = arrow.now().to('Asia/Shanghai' ).format ('YYYY-MM-DD HH:mm:ss' ) return cur_time async def func (sleep_time ): func_name_suffix = sleep_time print (f"[{current_time()} ] 执行异步函数 {func.__name__} -{func_name_suffix} " ) await asyncio.sleep(sleep_time) print (f"[{current_time()} ] 函数 {func.__name__} -{func_name_suffix} 执行完毕" ) return f"【[{current_time()} ] 得到函数 {func.__name__} -{func_name_suffix} 执行结果】" async def run (): task_list = [] for i in range (5 ): task = asyncio.create_task(func(i)) task_list.append(task) done, pending = await asyncio.wait(task_list) for result in done: print (f"[{current_time()} ] 得到执行结果 {result.result()} " ) asyncio.run(run()) """ 运行结果: [2023-04-27 14:29:00] 执行异步函数 func-0 [2023-04-27 14:29:00] 执行异步函数 func-1 [2023-04-27 14:29:00] 执行异步函数 func-2 [2023-04-27 14:29:00] 执行异步函数 func-3 [2023-04-27 14:29:00] 执行异步函数 func-4 [2023-04-27 14:29:00] 函数 func-0 执行完毕 [2023-04-27 14:29:01] 函数 func-1 执行完毕 [2023-04-27 14:29:02] 函数 func-2 执行完毕 [2023-04-27 14:29:03] 函数 func-3 执行完毕 [2023-04-27 14:29:04] 函数 func-4 执行完毕 [2023-04-27 14:29:04] 得到执行结果 【[2023-04-27 14:29:04] 得到函数 func-4 执行结果】 [2023-04-27 14:29:04] 得到执行结果 【[2023-04-27 14:29:02] 得到函数 func-2 执行结果】 [2023-04-27 14:29:04] 得到执行结果 【[2023-04-27 14:29:03] 得到函数 func-3 执行结果】 [2023-04-27 14:29:04] 得到执行结果 【[2023-04-27 14:29:01] 得到函数 func-1 执行结果】 [2023-04-27 14:29:04] 得到执行结果 【[2023-04-27 14:29:00] 得到函数 func-0 执行结果】 """
2.2.5 防止取消操作: shield() asyncio.shield(aw, *, loop=None)
1 res = await shield(something())
2.2.6 超时: wait_for() asyncio.wait_for(aw, timeout, *, loop=None)
等待 aw 可等待对象 完成,指定 timeout 秒数后超时 timeout 可以为 None,也可以为 float 或 int 型数值表示的等待秒数 任务取消引发 asyncio.TimeoutError
, 要避免任务取消,可以加上 shield()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import asyncioasync def eternity (): await asyncio.sleep(3600 ) print ('yay!' ) async def main (): try : await asyncio.wait_for(eternity(), timeout=1.0 ) except asyncio.TimeoutError: print ('timeout!' ) asyncio.run(main()) """ timeout! """
2.2.7 Task task
对象作用: 可以将多个任务添加到事件循环当中,达到多任务并发的效果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import asyncioasync def func2 (): print (1111 ) await asyncio.sleep(2 ) print (2222 ) return "test" tasks = [func2(), func2()] x, y = asyncio.run(asyncio.wait(tasks)) print (x)print (y)''' 运行结果: 1111 1111 2222 2222 {<Task finished name='Task-3' coro=<func2() done, defined at /Users/chengqiande/Documents/PythonObject/Demo/进程、线程、协程/多线程/Demo2/demo6.py:7> result='test'>, <Task finished name='Task-2' coro=<func2() done, defined at /Users/chengqiande/Documents/PythonObject/Demo/进程、线程、协程/多线程/Demo2/demo6.py:7> result='test'>} set() '''
2.2.8 多线程异步执行: to_thread() asyncio.to_thread()
: 将同步阻塞放入多线程异步执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 import asyncioimport threadingimport timeimport atexitstart = time.time() atexit.register(lambda : print ('用时(秒):' , time.time()-start)) def hard_work (): print ('thread id:' , threading.get_ident()) time.sleep(5 ) async def do_async_job (): await asyncio.to_thread(hard_work) await asyncio.sleep(1 ) print ('job done!' ) async def main (): tasks = [] for i in range (3 ): tasks.append(asyncio.create_task(do_async_job())) await asyncio.wait(tasks) asyncio.run(main()) ''' 运行结果: thread id: 12936740864 thread id: 12953530368 thread id: 12970319872 job done! job done! job done! 用时(秒): 6.017727851867676 '''