高性能 Python:Asyncio

Image description

并发编程是一种处理多个任务同时执行的编程方法。在 Python 中,`asyncio` 是一个实现异步编程的强大工具。`asyncio` 基于协程的概念,可以高效地处理 I/O 密集型任务。本文将介绍 `asyncio` 的基本原理和使用方法。

Image description

为什么我们需要 asyncio

我们知道,在处理 I/O 操作时,使用多线程可以比普通的单线程大大提高效率。那么,为什么还需要 `asyncio` 呢?

多线程具有很多优点,被广泛应用,但是也存在一定的局限性:

  • 例如多线程的运行过程很容易被打断,从而可能出现race condition的情况。
  • 而且线程切换本身也是有一定代价的,线程数量不可能无限增加,所以如果你的I/O操作非常繁重的话,多线程很可能达不到高效高质量的要求。
  • 正是为了解决这些问题,`asyncio` 才出现的。

    同步 VS 异步

    我们先来区分一下Sync(同步)和Async(异步)的概念。

  • 同步是指操作是依次执行的,只有前一个操作完成后,才能执行下一个操作。
  • Async 是指不同的操作可以交替执行,如果其中一个操作被阻塞了,程序不会等待而是会去寻找可执行的操作继续执行。
  • asyncio 的工作原理

  • 协程:asyncio 使用协程来实现异步操作。协程是使用 async 关键字定义的特殊函数。在协程中,可以使用 await 关键字暂停当前协程的执行并等待异步操作完成。
  • 事件循环:事件循环是 asyncio 的核心机制之一,负责协程的调度和执行,以及处理协程之间的切换。事件循环会不断轮询可执行的任务,一旦某个任务就绪(比如 I/O 操作完成或者定时器到期),事件循环就会将其放入执行队列,继续执行下一个任务。
  • 异步任务:在 asyncio 中,我们通过创建异步任务来执行协程。异步任务由 asyncio.create_task() 函数创建,该函数将协程封装成可等待对象并提交给事件循环进行处理。
  • 异步 I/O 操作:asyncio 提供了一组异步 I/O 操作(如网络请求、文件读写等),通过 await 关键字可以和协程、事件循环无缝结合。使用异步 I/O 操作可以避免等待 I/O 完成时的阻塞,提高程序性能和并发性。
  • 回调:asyncio 还支持使用回调函数来处理异步操作的结果,可以使用 asyncio.ensure_future() 函数将回调函数封装成 awaitable 对象,并提交给事件循环进行处理。
  • 并发执行:asyncio 可以并发执行多个协程任务,事件循环会根据任务的就绪情况,自动调度协程的执行,从而实现高效的并发编程。
  • 总结一下,asyncio 的工作原理是基于协程和事件循环的机制,通过使用协程进行异步操作,并由事件循环负责协程的调度和执行,asyncio 实现了一种高效的异步编程模型。

    协程和异步编程

    协程是 `asyncio` 中的一个重要概念,是轻量级的执行单元,可以在任务之间快速切换,而无需线程切换的开销。协程可以用 `async` 关键字定义,`await` 关键字用于暂停协程的执行,并在某个操作完成后恢复。

    下面是一个简单的示例代码,演示了如何使用协程进行异步编程:

    import asyncio
    
    async def hello():
        print("Hello")
        await asyncio.sleep(1)  # Simulate a time-consuming operation
        print("World")
    
    # Create an event loop
    loop = asyncio.get_event_loop()
    
    # Add the coroutine to the event loop and execute
    loop.run_until_complete(hello())

    在这个例子中,函数 `hello()` 是一个使用 `async` 关键字定义的协程。在协程内部,我们可以使用 `await` 来暂停其执行。这里,`asyncio.sleep(1)` 用于模拟耗时操作。`run_until_complete()` 方法将协程添加到事件循环并运行它。

    异步 I/O 操作

    `asyncio` 主要用于处理 I/O 密集型任务,例如网络请求、文件读写等,它提供了一系列异步 I/O 操作的 API,可以与 `await` 关键字结合使用,轻松实现异步编程。

    下面是一个简单的示例代码,展示如何使用 `asyncio` 进行异步网络请求:

    import asyncio
    import aiohttp
    
    async def fetch(session, url):
        async with session.get(url) as response:
            return await response.text()
    
    async def main():
        async with aiohttp.ClientSession() as session:
            html = await fetch(session, 'https://www.example.com')
            print(html)
    
    # Create an event loop
    loop = asyncio.get_event_loop()
    
    # Add the coroutine to the event loop and execute
    loop.run_until_complete(main())

    在这个例子中,我们使用 `aiohttp` 库进行网络请求。函数 `fetch()` 是一个协程,它通过 `session.get()` 方法发起异步 GET 请求,并使用 `await` 关键字等待响应返回。函数 `main()` 是另一个协程,它在内部创建一个 `ClientSession` 对象以供重用,然后调用 `fetch()` 方法获取网页内容并打印。

    **注意**:这里我们使用 `aiohttp` 库来替代 `requests` 库,是因为 `requests` 库不兼容 `asyncio`,而 `aiohttp` 库兼容。要想用好 `asyncio`,特别是发挥它的强大功能,很多时候都需要配套的 Python 库。

    多个任务并发执行

    `asyncio` 还提供了一些并发执行多个任务的机制,例如 `asyncio.gather()` 和 `asyncio.wait()`。以下是一段示例代码,展示了如何使用这些机制并发执行多个协程任务:

    import asyncio
    
    async def task1():
        print("Task 1 started")
        await asyncio.sleep(1)
        print("Task 1 finished")
    
    async def task2():
        print("Task 2 started")
        await asyncio.sleep(2)
        print("Task 2 finished")
    
    async def main():
        await asyncio.gather(task1(), task2())
    
    # Create an event loop
    loop = asyncio.get_event_loop()
    
    # Add the coroutine to the event loop and execute
    loop.run_until_complete(main())

    在这个例子中,我们定义了两个协程任务 `task1()` 和 `task2()`,这两个任务都执行一些耗时的操作。协程 `main()` 通过 `asyncio.gather()` 同时启动这两个任务并等待它们完成。并发执行可以提高程序执行效率。

    如何选择?

    实际项目中到底应该选择多线程还是 asyncio?有位大佬总结的很形象:

    if io_bound:
        if io_slow:
            print('Use Asyncio')
        else:
            print('Use multi-threading')
    elif cpu_bound:
        print('Use multi-processing')
  • 如果是I/O受限,且I/O操作较慢,需要许多任务/线程的配合,那么使用asyncio更为合适。
  • 如果是 I/O 密集型的,但是 I/O 操作很快并且只需要有限数量的任务/线程,那么多线程就可以了。
  • 如果是CPU密集型的,则需要多处理来提高程序运行效率。
  • 实践

    输入一个列表。对于列表中的每个元素,我们要计算从 0 到该元素的所有整数的平方和。

    同步实现

    import time
    
    def cpu_bound(number):
        return sum(i * i for i in range(number))
    
    def calculate_sums(numbers):
        for number in numbers:
            cpu_bound(number)
    
    def main():
        start_time = time.perf_counter()
        numbers = [10000000 + x for x in range(20)]
        calculate_sums(numbers)
        end_time = time.perf_counter()
        print('Calculation takes {} seconds'.format(end_time - start_time))
    
    if __name__ == '__main__':
        main()

    执行时间为“计算耗时 16.00943413000002 秒”

    使用concurrent.futures实现异步

    import time
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
    
    def cpu_bound(number):
        return sum(i * i for i in range(number))
    
    def calculate_sums(numbers):
        with ProcessPoolExecutor() as executor:
            results = executor.map(cpu_bound, numbers)
            results = [result for result in results]
        print(results)
    
    def main():
        start_time = time.perf_counter()
        numbers = [10000000 + x for x in range(20)]
        calculate_sums(numbers)
        end_time = time.perf_counter()
        print('Calculation takes {} seconds'.format(end_time - start_time))
    
    if __name__ == '__main__':
        main()

    执行时间为“计算耗时 7.314132894999999 秒”

    在这个改进的代码中,我们使用 `concurrent.futures.ProcessPoolExecutor` 创建进程池,然后使用 `executor.map()` 方法提交任务并获取结果。注意,使用 `executor.map()` 后,如果需要获取结果,可以将结果迭代成列表,或者使用其他方法对结果进行处理。

    多处理实现

    import time
    import multiprocessing
    
    def cpu_bound(number):
        return sum(i * i for i in range(number))
    
    def calculate_sums(numbers):
        with multiprocessing.Pool() as pool:
            pool.map(cpu_bound, numbers)
    
    def main():
        start_time = time.perf_counter()
        numbers = [10000000 + x for x in range(20)]
        calculate_sums(numbers)
        end_time = time.perf_counter()
        print('Calculation takes {} seconds'.format(end_time - start_time))
    
    if __name__ == '__main__':
        main()

    执行时间为“计算耗时 5.024221667 秒”

    `concurrent.futures.ProcessPoolExecutor` 和 `multiprocessing` 都是 Python 中实现多进程并发的库,它们之间有区别:

  • 基于接口的封装:concurrent.futures.ProcessPoolExecutor 是concurrent.futures模块提供的高层接口,封装了底层的多进程函数,可以方便编写多进程代码;而multiprocessing是Python的标准库之一,提供了完整的多进程支持,可以直接对进程进行操作。
  • API 使用:concurrent.futures.ProcessPoolExecutor 的使用方式类似线程池,将可调用对象(如函数)提交给进程池执行,并返回 Future 对象,通过 Future 对象可以获取执行结果。multiprocessing 提供了更多底层的进程管理和通信接口,可以显式地创建、启动和控制进程,多个进程之间可以使用队列或管道进行通信。
  • 可扩展性和灵活性:由于 multiprocessing 提供了更多的底层接口,因此相比于 concurrent.futures.ProcessPoolExecutor 更加灵活。通过直接操作进程,可以对每个进程实现更细粒度的控制,比如设置进程优先级、进程间共享数据等。concurrent.futures.ProcessPoolExecutor 更适合简单的任务并行化,隐藏了很多底层细节,使得多进程代码的编写更加简单。
  • 跨平台支持:concurrent.futures.ProcessPoolExecutor 和 multiprocessing 都提供跨平台的多进程支持,可以在各种操作系统上使用。
  • 综上所述,`concurrent.futures.ProcessPoolExecutor` 是一个高层接口,封装了底层的多进程功能,适用于简单的多进程任务并行化。`multiprocessing` 是一个更底层的库,提供更多的控制和灵活性,适用于需要对进程进行细粒度控制的场景。需要根据具体需求选择合适的库。如果只是简单的任务并行化,可以使用 `concurrent.futures.ProcessPoolExecutor` 来简化代码;如果需要更多的底层控制和通信,可以使用 `multiprocessing` 库。

    概括

    与多线程不同,`asyncio`是单线程的,但其内部事件循环的机制允许它同时运行多个不同的任务,并且比多线程具有更大的自主控制能力。

    `asyncio` 中的任务在运行过程中不会被打断,因此不会出现 race condition 的情况。

    特别是在 I/O 操作繁重的场景下,asyncio 比多线程有更高的运行效率。因为 asyncio 中任务切换的代价比线程切换的代价小得多,并且 asyncio 可以启动的任务数量比多线程中的线程数量要多得多。

    不过需要注意的是,很多情况下使用 `asyncio` 需要特定第三方库的支持,比如上例中的 `aiohttp`。而如果 I/O 操作很快,且不繁重,使用多线程也能有效解决问题。

  • asyncio 是一个用于实现异步编程的 Python 库。
  • 协程是asyncio的核心概念,通过async和await关键字实现异步操作。
  • asyncio 为异步 I/O 操作提供了强大的 API,可以轻松处理 I/O 密集型任务。
  • 通过asyncio.gather()等机制,可以并发执行多个协程任务。
  • Leapcell:FastAPI、Flask 和其他 Python 应用程序的理想平台

    最后我来介绍一下部署Flask/FastAPI的理想平台:Leapcell。

    Leapcell 是专为现代分布式应用设计的云计算平台。其按需付费的定价模式确保没有闲置成本,这意味着用户只需为实际使用的资源付费。

    Image description
  • 多语言支持支持使用 JavaScript、Python、Go 或 Rust 进行开发。
  • 免费部署无限项目仅根据使用量收费。没有请求时不收费。
  • 无与伦比的成本效益 按需付费,无闲置费用。例如,25 美元可以支持 694 万个请求,平均响应时间为 60 毫秒。
  • 简化的开发人员体验直观的用户界面,易于设置。全自动 CI/CD 管道和 GitOps 集成。实时指标和日志,提供可操作的见解。
  • 轻松的可扩展性和高性能自动扩展以轻松处理高并发性。零操作开销,让开发人员专注于开发。
  • 在文档中了解更多信息!

    Leapcell Twitter:https://x.com/LeapcellHQ