许多程序员都熟悉编写顺序(同步)代码,在异步世界中,事件的发生独立于主程序流程。这意味着动作在后台执行,无需等待上一个动作完成。

换句话说,代码行是并发执行的。

想象一下,你有一些独立的任务,每个任务都需要大量的运行时间才能完成。他们的输出不相互依赖。所以,您想一次启动它们。如果这些任务按特定顺序执行,程序将不得不等待每个任务完成后再开始下一个任务。等待时间会阻塞程序。

异步编程范例有助于并发执行这些任务,并确保您可以克服等待时间并更有效地使用资源。

在 Python 中引入异步

Python 中引入了两个主要组件:

  1. async io 这是一个 Python 包,允许 API 运行和管理协程。
  2. async/await 用来定义协程。

例如要进行 HTTP 调用,请考虑使用 aiohttp ,这是一个 Python 包,允许异步进行 HTTP 调用。在异步代码中,常用的 requests 库可能效果不是很好。

同样,如果您正在使用 Mongo 数据库,而不是依赖同步驱动程序(如 mongo-python ),您必须使用异步驱动程序(如 moto 异步访问 MongoDB)。

在异步世界中,一切都在事件循环中运行。这允许您一次运行多个协程。我们将在本教程中了解协程是什么。

里面的一切 async def 都是异步代码,其他一切都是同步的。

编写异步代码不像编写同步代码那么容易。Python 异步模型基于事件、回调、传输、协议和期货等概念。

异步如何工作

asyncio 库提供了两个关键字, asyncawait .

让我们看一下这个 async hello-world 示例:

import asyncio


async def hello():
    print("Hello World!")
    await asyncio.sleep(1)
    print("Hello again!")


asyncio.run(hello())


# Hello World!
# Hello again!

乍一看你可能认为这是一个同步代码,因为第二次打印等待 1 秒打印Hello again!Hello World!之后。但是这段代码实际上是异步的。

协程

任何定义为 async def 的函数都是像上面那样的协程。需要注意的是,调用 hello() 函数需要在 asyncio.run() 中执行,

为了运行协程,asyncio 提供了三种主要机制:

asyncio.run() 函数,它是启动事件循环并运行异步的主要入口点。

使用 await 协程的结果并将控制权传递给事件循环。

import asyncio
import time


async def say_something(delay, words):
    print(f"Before {words}")
    await asyncio.sleep(delay)
    print(f"After {words}")
    
async def main():
    print(f"Started: {time.strftime('%X')}") 
    await say_something(1, "Task 1")
    await say_something(2, "Task 2")
    print(f"Finished: {time.strftime( '%X' )}")


asyncio.run(main())


# Started:15:59:52
# Before Task 1
# After Task 1
# Before Task 2
# After Task 2
# Finished:15:59:55

前面的代码片段仍然等待 say_something() 协程完成,因此它在 1 秒内执行第一个任务,然后在等待 2 秒后执行第二个任务。

要让协程并发运行,我们应该创建任务,这是第三种机制。

asyncio.create_task() 用于安排协程执行的函数。

import asyncio
import time


async def say_something(delay, words):
    print(f"Before {words}")
    await asyncio.sleep(delay)
    print(f"After {words}")
    
async def main():
    print(f"Started: {time.strftime('%X')}")
    task1 = asyncio.create_task(say_something(1,"Task 1")) 
    task2 = asyncio.create_task(say_something(2, "Task 2")) 
    await task1
    await task2
    print(f"Finished: {time.strftime('%X')}")


asyncio.run(main())


# Started:16:07:35
# Before Task 1
# Before Task 2
# After Task 1
# After Task 2
# Finished:16:07:37

上面的代码现在并发运行,say_something() 协程不再等待 say_something() 协程完成。而是同时运行具有不同参数的同一个协程。

发生的情况如下:

say_something() 协程从参数的第一个任务(1 秒和一个字符串Task 1)开始。这个任务叫做 task1

然后它会暂停协程的执行并等待 1 秒让 say_something() 协程在遇到 await 关键字时完成。它将控制权返回给事件循环。

同样对于第二个任务,它会暂停协程的执行并等待 2 秒让 say_something() 协程完成,因为它遇到 await 关键字。

task1 控制返回到事件循环后,事件循环继续执行第二个任务 task2 因为 asyncio.sleep() 还没有完成。

asyncio.create_task() 包装 say_something() 函数并使其作为异步任务同时运行协程。 如您所见,上面的代码片段显示它的运行速度比以前快了 1 秒。

asyncio.create_task() 被调用时,协程会自动安排在事件循环中运行。

任务可以帮助您并发运行多个协程,但这并不是实现并发的唯一方法。

使用 asyncio.gather() 运行并发任务

另一种同时运行多个协程的方法是使用 asyncio.gather() 函数。此函数将协程作为参数并并发运行它们。

import asyncio
import time


async def greetings():
    print("Welcome")
    await asyncio.sleep(1)
    print("Goodbye")


async def main():
    start = time.time()
    await asyncio.gather(greetings(), greetings())
    elapsed = time.time() - start
    print(f"{__name__} executed in {elapsed:0.2f} seconds.")


asyncio.run(main())


# Welcome
# Welcome
# Goodbye
# Goodbye
# __main__ executed in 1.00 seconds.

在前面的代码中,greetings() 协程被并发执行了两次。

等待对象

如果一个对象可以与 await 关键字一起使用,则该对象称为可等待对象。可等待对象主要有 3 种类型:coroutinestasksfutures

coroutines

import asyncio


async def mult(first, second):
    print("Calculating multiplication...")
    await asyncio.sleep(1)
    mul = first * second
    print(f"{first} multiplied by {second} is {mul}")
    return mul


async def add(first, second):
    print("Calculating sum...")
    await asyncio.sleep(1)
    sum = first + second
    print(f"Sum of {first} and {second} is {sum}")
    return sum


async def main(first, second):
    await mult(first, second)
    await add(first, second)


asyncio.run(main(10, 20))


# Calculating multiplication...
# 10 multiplied by 20 is 200
# Calculating sum...
# Sum of 10 and 20 is 30

在前面的示例中, 协同程序 main() 等待 mult()add() 结束。

假设您在 mult 协程之前省略了 await 关键字。然后您将收到以下错误: RuntimeWarning: coroutine 'mult' was never awaited.

tasks

要安排协程在事件循环中运行,我们使用 asyncio.create_task() 函数。

import asyncio


async def mult(first, second):
    print("Calculating multiplication...")
    await asyncio.sleep(1)
    mul = first * second
    print(f"{first} multiplied by {second} is {mul}")
    return mul


async def add(first, second):
    print("Calculating sum...")
    await asyncio.sleep(1)
    sum = first + second
    print(f"Sum of {first} and {second} is {sum}")
    return sum


async def main(first, second):
    mult_task = asyncio.create_task(mult(first, second))
    add_task = asyncio.create_task(add(first, second))
    await mult_task
    await add_task


asyncio.run(main(10, 20))


# Calculating multiplication...
# Calculating sum...
# 10 multiplied by 20 is 200
# Sum of 10 and 20 is 30

futures

Future 是表示异步计算结果的低级可等待对象。它是通过调用 asyncio.Future() 函数创建的。

from asyncio import Future


future = Future()
print(future.done())
print(future.cancelled())
future.cancel()
print(future.done())
print(future.cancelled())


# False
# False
# True
# True

超时

用于 asyncio.wait_for(aw, timeout, *) 设置等待对象完成的超时。请注意, aw 这里是可等待的对象。如果要在可等待对象完成时间过长时引发异常,这将很有用。作为异常 asyncio.TimeoutError

import asyncio


async def slow_operation():
    await asyncio.sleep(400)
    print("Completed.")


async def main():
    try:
        await asyncio.wait_for(slow_operation(), timeout=1.0)
    except asyncio.TimeoutError:
        print("Timed out!")


asyncio.run(main())


# Timed out!

尽管协程需要 400 秒才能完成,但 slow_operation() 中的超时 Future 设置为 1 秒。

最后修改:2024 年 03 月 19 日
本站福利|微信扫描二维码,永久享受 96 折充话费电费