使用 Async IO 在 Python 中进行异步编程的介绍

许多程序员都熟悉编写顺序(同步)代码,在异步世界中,事件的发生独立于主程序流程。这意味着动作在后台执行,无需等待上一个动作完成。

换句话说,代码行是并发执行的。

想象一下,你有一些独立的任务,每个任务都需要大量的运行时间才能完成。他们的输出不相互依赖。所以,您想一次启动它们。如果这些任务按特定顺序执行,程序将不得不等待每个任务完成后再开始下一个任务。等待时间会阻塞程序。

异步编程范例有助于并发执行这些任务,并确保您可以克服等待时间并更有效地使用资源。

在 Python 中引入异步
Python 中引入了两个主要组件:

  1. async io 这是一个 Python 包,允许 API 运行和管理协程。
  2. async/await 用来定义协程。
    例如要进行 HTTP 调用,请考虑使用 aiohttp ,这是一个 Python 包,允许异步进行 HTTP 调用。在异步代码中,常用的 requests 库可能效果不是很好。

同样,如果您正在使用 Mongo 数据库,而不是依赖同步驱动程序(如 mongo-python ),您必须使用异步驱动程序(如 moto 异步访问 MongoDB)。

在异步世界中,一切都在事件循环中运行。这允许您一次运行多个协程。我们将在本教程中了解协程是什么。

里面的一切 async def 都是异步代码,其他一切都是同步的。

编写异步代码不像编写同步代码那么容易。Python 异步模型基于事件、回调、传输、协议和期货等概念。

异步如何工作
asyncio 库提供了两个关键字, async 和 await .

让我们看一下这个 async hello-world 示例:

import asyncio

async def hello():
print(“Hello World!”)
await asyncio.sleep(1)
print(“Hello again!”)

asyncio.run(hello())

Hello World!

Hello again!

乍一看你可能认为这是一个同步代码,因为第二次打印等待 1 秒打印Hello again! 在Hello World!之后。但是这段代码实际上是异步的。

协程
任何定义为 async def 的函数都是像上面那样的协程。需要注意的是,调用 hello() 函数需要在 asyncio.run() 中执行,

为了运行协程,asyncio 提供了三种主要机制:

asyncio.run() 函数,它是启动事件循环并运行异步的主要入口点。

使用 await 协程的结果并将控制权传递给事件循环。

import asyncio
import time

async def say_something(delay, words):
print(f”Before {words}”)
await asyncio.sleep(delay)
print(f”After {words}”)

async def main():
print(f”Started: {time.strftime(‘%X’)}”)
await say_something(1, “Task 1”)
await say_something(2, “Task 2”)
print(f”Finished: {time.strftime( ‘%X’ )}”)

asyncio.run(main())

Started:15:59:52

Before Task 1

After Task 1

Before Task 2

After Task 2

Finished:15:59:55

前面的代码片段仍然等待 say_something() 协程完成,因此它在 1 秒内执行第一个任务,然后在等待 2 秒后执行第二个任务。

要让协程并发运行,我们应该创建任务,这是第三种机制。

asyncio.create_task() 用于安排协程执行的函数。

import asyncio
import time

async def say_something(delay, words):
print(f”Before {words}”)
await asyncio.sleep(delay)
print(f”After {words}”)

async def main():
print(f”Started: {time.strftime(‘%X’)}”)
task1 = asyncio.create_task(say_something(1,”Task 1”))
task2 = asyncio.create_task(say_something(2, “Task 2”))
await task1
await task2
print(f”Finished: {time.strftime(‘%X’)}”)

asyncio.run(main())

Started:16:07:35

Before Task 1

Before Task 2

After Task 1

After Task 2

Finished:16:07:37

上面的代码现在并发运行,say_something() 协程不再等待 say_something() 协程完成。而是同时运行具有不同参数的同一个协程。

发生的情况如下:

say_something() 协程从参数的第一个任务(1 秒和一个字符串Task 1)开始。这个任务叫做 task1。

然后它会暂停协程的执行并等待 1 秒让 say_something() 协程在遇到 await 关键字时完成。它将控制权返回给事件循环。

同样对于第二个任务,它会暂停协程的执行并等待 2 秒让 say_something() 协程完成,因为它遇到 await 关键字。

task1 控制返回到事件循环后,事件循环继续执行第二个任务 task2 因为 asyncio.sleep() 还没有完成。

asyncio.create_task() 包装 say_something() 函数并使其作为异步任务同时运行协程。 如您所见,上面的代码片段显示它的运行速度比以前快了 1 秒。

当 asyncio.create_task() 被调用时,协程会自动安排在事件循环中运行。

任务可以帮助您并发运行多个协程,但这并不是实现并发的唯一方法。

使用 asyncio.gather() 运行并发任务
另一种同时运行多个协程的方法是使用 asyncio.gather() 函数。此函数将协程作为参数并并发运行它们。

import asyncio
import time

async def greetings():
print(“Welcome”)
await asyncio.sleep(1)
print(“Goodbye”)

async def main():
start = time.time()
await asyncio.gather(greetings(), greetings())
elapsed = time.time() - start
print(f”{name} executed in {elapsed:0.2f} seconds.”)

asyncio.run(main())

Welcome

Welcome

Goodbye

Goodbye

main executed in 1.00 seconds.

在前面的代码中,greetings() 协程被并发执行了两次。

等待对象
如果一个对象可以与 await 关键字一起使用,则该对象称为可等待对象。可等待对象主要有 3 种类型:coroutines、tasks和futures。

coroutines
import asyncio

async def mult(first, second):
print(“Calculating multiplication…”)
await asyncio.sleep(1)
mul = first * second
print(f”{first} multiplied by {second} is {mul}”)
return mul

async def add(first, second):
print(“Calculating sum…”)
await asyncio.sleep(1)
sum = first + second
print(f”Sum of {first} and {second} is {sum}”)
return sum

async def main(first, second):
await mult(first, second)
await add(first, second)

asyncio.run(main(10, 20))

Calculating multiplication…

10 multiplied by 20 is 200

Calculating sum…

Sum of 10 and 20 is 30

在前面的示例中, 协同程序 main() 等待 mult() 和 add() 结束。

假设您在 mult 协程之前省略了 await 关键字。然后您将收到以下错误: RuntimeWarning: coroutine ‘mult’ was never awaited.。

tasks
要安排协程在事件循环中运行,我们使用 asyncio.create_task() 函数。

import asyncio

async def mult(first, second):
print(“Calculating multiplication…”)
await asyncio.sleep(1)
mul = first * second
print(f”{first} multiplied by {second} is {mul}”)
return mul

async def add(first, second):
print(“Calculating sum…”)
await asyncio.sleep(1)
sum = first + second
print(f”Sum of {first} and {second} is {sum}”)
return sum

async def main(first, second):
mult_task = asyncio.create_task(mult(first, second))
add_task = asyncio.create_task(add(first, second))
await mult_task
await add_task

asyncio.run(main(10, 20))

Calculating multiplication…

Calculating sum…

10 multiplied by 20 is 200

Sum of 10 and 20 is 30

futures
Future 是表示异步计算结果的低级可等待对象。它是通过调用 asyncio.Future() 函数创建的。

from asyncio import Future

future = Future()
print(future.done())
print(future.cancelled())
future.cancel()
print(future.done())
print(future.cancelled())

False

False

True

True

超时
用于 asyncio.wait_for(aw, timeout, *) 设置等待对象完成的超时。请注意, aw 这里是可等待的对象。如果要在可等待对象完成时间过长时引发异常,这将很有用。作为异常 asyncio.TimeoutError。

import asyncio

async def slow_operation():
await asyncio.sleep(400)
print(“Completed.”)

async def main():
try:
await asyncio.wait_for(slow_operation(), timeout=1.0)
except asyncio.TimeoutError:
print(“Timed out!”)

asyncio.run(main())

Timed out!

尽管协程需要 400 秒才能完成,但 slow_operation() 中的超时 Future 设置为 1 秒。