许多程序员都熟悉编写顺序(同步)代码,在异步世界中,事件的发生独立于主程序流程。这意味着动作在后台执行,无需等待上一个动作完成。
换句话说,代码行是并发执行的。
想象一下,你有一些独立的任务,每个任务都需要大量的运行时间才能完成。他们的输出不相互依赖。所以,您想一次启动它们。如果这些任务按特定顺序执行,程序将不得不等待每个任务完成后再开始下一个任务。等待时间会阻塞程序。
异步编程范例有助于并发执行这些任务,并确保您可以克服等待时间并更有效地使用资源。
在 Python 中引入异步
Python 中引入了两个主要组件:
async io
这是一个 Python 包,允许 API 运行和管理协程。async/await
用来定义协程。
例如要进行 HTTP 调用,请考虑使用 aiohttp
,这是一个 Python 包,允许异步进行 HTTP 调用。在异步代码中,常用的 requests
库可能效果不是很好。
同样,如果您正在使用 Mongo 数据库,而不是依赖同步驱动程序(如 mongo-python
),您必须使用异步驱动程序(如 moto
异步访问 MongoDB)。
在异步世界中,一切都在事件循环中运行。这允许您一次运行多个协程。我们将在本教程中了解协程是什么。
里面的一切 async def
都是异步代码,其他一切都是同步的。
编写异步代码不像编写同步代码那么容易。Python 异步模型基于事件、回调、传输、协议和期货等概念。
异步如何工作
asyncio
库提供了两个关键字, async
和 await
.
让我们看一下这个 async hello-world 示例:
import asyncio
async def hello():
print("Hello World!")
await asyncio.sleep(1)
print("Hello again!")
asyncio.run(hello())
# Hello World!
# Hello again!
乍一看你可能认为这是一个同步代码,因为第二次打印等待 1 秒打印Hello again!
在Hello World!
之后。但是这段代码实际上是异步的。
协程
任何定义为 async def
的函数都是像上面那样的协程。需要注意的是,调用 hello()
函数需要在 asyncio.run()
中执行,
为了运行协程,asyncio
提供了三种主要机制:
asyncio.run()
函数,它是启动事件循环并运行异步的主要入口点。
使用 await
协程的结果并将控制权传递给事件循环。
import asyncio
import time
async def say_something(delay, words):
print(f"Before {words}")
await asyncio.sleep(delay)
print(f"After {words}")
async def main():
print(f"Started: {time.strftime('%X')}")
await say_something(1, "Task 1")
await say_something(2, "Task 2")
print(f"Finished: {time.strftime( '%X' )}")
asyncio.run(main())
# Started:15:59:52
# Before Task 1
# After Task 1
# Before Task 2
# After Task 2
# Finished:15:59:55
前面的代码片段仍然等待 say_something()
协程完成,因此它在 1 秒内执行第一个任务,然后在等待 2 秒后执行第二个任务。
要让协程并发运行,我们应该创建任务,这是第三种机制。
asyncio.create_task()
用于安排协程执行的函数。
import asyncio
import time
async def say_something(delay, words):
print(f"Before {words}")
await asyncio.sleep(delay)
print(f"After {words}")
async def main():
print(f"Started: {time.strftime('%X')}")
task1 = asyncio.create_task(say_something(1,"Task 1"))
task2 = asyncio.create_task(say_something(2, "Task 2"))
await task1
await task2
print(f"Finished: {time.strftime('%X')}")
asyncio.run(main())
# Started:16:07:35
# Before Task 1
# Before Task 2
# After Task 1
# After Task 2
# Finished:16:07:37
上面的代码现在并发运行,say_something()
协程不再等待 say_something()
协程完成。而是同时运行具有不同参数的同一个协程。
发生的情况如下:
say_something()
协程从参数的第一个任务(1 秒和一个字符串Task 1
)开始。这个任务叫做 task1
。
然后它会暂停协程的执行并等待 1 秒让 say_something()
协程在遇到 await
关键字时完成。它将控制权返回给事件循环。
同样对于第二个任务,它会暂停协程的执行并等待 2 秒让 say_something()
协程完成,因为它遇到 await
关键字。
task1
控制返回到事件循环后,事件循环继续执行第二个任务 task2
因为 asyncio.sleep()
还没有完成。
asyncio.create_task()
包装 say_something()
函数并使其作为异步任务同时运行协程。 如您所见,上面的代码片段显示它的运行速度比以前快了 1 秒。
当 asyncio.create_task()
被调用时,协程会自动安排在事件循环中运行。
任务可以帮助您并发运行多个协程,但这并不是实现并发的唯一方法。
使用 asyncio.gather() 运行并发任务
另一种同时运行多个协程的方法是使用 asyncio.gather()
函数。此函数将协程作为参数并并发运行它们。
import asyncio
import time
async def greetings():
print("Welcome")
await asyncio.sleep(1)
print("Goodbye")
async def main():
start = time.time()
await asyncio.gather(greetings(), greetings())
elapsed = time.time() - start
print(f"{__name__} executed in {elapsed:0.2f} seconds.")
asyncio.run(main())
# Welcome
# Welcome
# Goodbye
# Goodbye
# __main__ executed in 1.00 seconds.
在前面的代码中,greetings()
协程被并发执行了两次。
等待对象
如果一个对象可以与 await
关键字一起使用,则该对象称为可等待对象。可等待对象主要有 3 种类型:coroutines
、tasks
和futures
。
coroutines
import asyncio
async def mult(first, second):
print("Calculating multiplication...")
await asyncio.sleep(1)
mul = first * second
print(f"{first} multiplied by {second} is {mul}")
return mul
async def add(first, second):
print("Calculating sum...")
await asyncio.sleep(1)
sum = first + second
print(f"Sum of {first} and {second} is {sum}")
return sum
async def main(first, second):
await mult(first, second)
await add(first, second)
asyncio.run(main(10, 20))
# Calculating multiplication...
# 10 multiplied by 20 is 200
# Calculating sum...
# Sum of 10 and 20 is 30
在前面的示例中, 协同程序 main()
等待 mult()
和 add()
结束。
假设您在 mult
协程之前省略了 await
关键字。然后您将收到以下错误: RuntimeWarning: coroutine 'mult' was never awaited.
。
tasks
要安排协程在事件循环中运行,我们使用 asyncio.create_task()
函数。
import asyncio
async def mult(first, second):
print("Calculating multiplication...")
await asyncio.sleep(1)
mul = first * second
print(f"{first} multiplied by {second} is {mul}")
return mul
async def add(first, second):
print("Calculating sum...")
await asyncio.sleep(1)
sum = first + second
print(f"Sum of {first} and {second} is {sum}")
return sum
async def main(first, second):
mult_task = asyncio.create_task(mult(first, second))
add_task = asyncio.create_task(add(first, second))
await mult_task
await add_task
asyncio.run(main(10, 20))
# Calculating multiplication...
# Calculating sum...
# 10 multiplied by 20 is 200
# Sum of 10 and 20 is 30
futures
Future 是表示异步计算结果的低级可等待对象。它是通过调用 asyncio.Future()
函数创建的。
from asyncio import Future
future = Future()
print(future.done())
print(future.cancelled())
future.cancel()
print(future.done())
print(future.cancelled())
# False
# False
# True
# True
超时
用于 asyncio.wait_for(aw, timeout, *)
设置等待对象完成的超时。请注意, aw
这里是可等待的对象。如果要在可等待对象完成时间过长时引发异常,这将很有用。作为异常 asyncio.TimeoutError
。
import asyncio
async def slow_operation():
await asyncio.sleep(400)
print("Completed.")
async def main():
try:
await asyncio.wait_for(slow_operation(), timeout=1.0)
except asyncio.TimeoutError:
print("Timed out!")
asyncio.run(main())
# Timed out!
尽管协程需要 400 秒才能完成,但 slow_operation()
中的超时 Future 设置为 1 秒。