高性能 Python:Asyncio

并发编程是一种处理多个任务同时执行的编程方法。在 Python 中,`asyncio` 是一个实现异步编程的强大工具。`asyncio` 基于协程的概念,可以高效地处理 I/O 密集型任务。本文将介绍 `asyncio` 的基本原理和使用方法。

为什么我们需要 asyncio
我们知道,在处理 I/O 操作时,使用多线程可以比普通的单线程大大提高效率。那么,为什么还需要 `asyncio` 呢?
多线程具有很多优点,被广泛应用,但是也存在一定的局限性:
正是为了解决这些问题,`asyncio` 才出现的。
同步 VS 异步
我们先来区分一下Sync(同步)和Async(异步)的概念。
asyncio 的工作原理
总结一下,asyncio 的工作原理是基于协程和事件循环的机制,通过使用协程进行异步操作,并由事件循环负责协程的调度和执行,asyncio 实现了一种高效的异步编程模型。
协程和异步编程
协程是 `asyncio` 中的一个重要概念,是轻量级的执行单元,可以在任务之间快速切换,而无需线程切换的开销。协程可以用 `async` 关键字定义,`await` 关键字用于暂停协程的执行,并在某个操作完成后恢复。
下面是一个简单的示例代码,演示了如何使用协程进行异步编程:
import asyncio async def hello(): print("Hello") await asyncio.sleep(1) # Simulate a time-consuming operation print("World") # Create an event loop loop = asyncio.get_event_loop() # Add the coroutine to the event loop and execute loop.run_until_complete(hello())
在这个例子中,函数 `hello()` 是一个使用 `async` 关键字定义的协程。在协程内部,我们可以使用 `await` 来暂停其执行。这里,`asyncio.sleep(1)` 用于模拟耗时操作。`run_until_complete()` 方法将协程添加到事件循环并运行它。
异步 I/O 操作
`asyncio` 主要用于处理 I/O 密集型任务,例如网络请求、文件读写等,它提供了一系列异步 I/O 操作的 API,可以与 `await` 关键字结合使用,轻松实现异步编程。
下面是一个简单的示例代码,展示如何使用 `asyncio` 进行异步网络请求:
import asyncio import aiohttp async def fetch(session, url): async with session.get(url) as response: return await response.text() async def main(): async with aiohttp.ClientSession() as session: html = await fetch(session, 'https://www.example.com') print(html) # Create an event loop loop = asyncio.get_event_loop() # Add the coroutine to the event loop and execute loop.run_until_complete(main())
在这个例子中,我们使用 `aiohttp` 库进行网络请求。函数 `fetch()` 是一个协程,它通过 `session.get()` 方法发起异步 GET 请求,并使用 `await` 关键字等待响应返回。函数 `main()` 是另一个协程,它在内部创建一个 `ClientSession` 对象以供重用,然后调用 `fetch()` 方法获取网页内容并打印。
**注意**:这里我们使用 `aiohttp` 库来替代 `requests` 库,是因为 `requests` 库不兼容 `asyncio`,而 `aiohttp` 库兼容。要想用好 `asyncio`,特别是发挥它的强大功能,很多时候都需要配套的 Python 库。
多个任务并发执行
`asyncio` 还提供了一些并发执行多个任务的机制,例如 `asyncio.gather()` 和 `asyncio.wait()`。以下是一段示例代码,展示了如何使用这些机制并发执行多个协程任务:
import asyncio async def task1(): print("Task 1 started") await asyncio.sleep(1) print("Task 1 finished") async def task2(): print("Task 2 started") await asyncio.sleep(2) print("Task 2 finished") async def main(): await asyncio.gather(task1(), task2()) # Create an event loop loop = asyncio.get_event_loop() # Add the coroutine to the event loop and execute loop.run_until_complete(main())
在这个例子中,我们定义了两个协程任务 `task1()` 和 `task2()`,这两个任务都执行一些耗时的操作。协程 `main()` 通过 `asyncio.gather()` 同时启动这两个任务并等待它们完成。并发执行可以提高程序执行效率。
如何选择?
实际项目中到底应该选择多线程还是 asyncio?有位大佬总结的很形象:
if io_bound: if io_slow: print('Use Asyncio') else: print('Use multi-threading') elif cpu_bound: print('Use multi-processing')
实践
输入一个列表。对于列表中的每个元素,我们要计算从 0 到该元素的所有整数的平方和。
同步实现
import time def cpu_bound(number): return sum(i * i for i in range(number)) def calculate_sums(numbers): for number in numbers: cpu_bound(number) def main(): start_time = time.perf_counter() numbers = [10000000 + x for x in range(20)] calculate_sums(numbers) end_time = time.perf_counter() print('Calculation takes {} seconds'.format(end_time - start_time)) if __name__ == '__main__': main()
执行时间为“计算耗时 16.00943413000002 秒”
使用concurrent.futures实现异步
import time from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed def cpu_bound(number): return sum(i * i for i in range(number)) def calculate_sums(numbers): with ProcessPoolExecutor() as executor: results = executor.map(cpu_bound, numbers) results = [result for result in results] print(results) def main(): start_time = time.perf_counter() numbers = [10000000 + x for x in range(20)] calculate_sums(numbers) end_time = time.perf_counter() print('Calculation takes {} seconds'.format(end_time - start_time)) if __name__ == '__main__': main()
执行时间为“计算耗时 7.314132894999999 秒”
在这个改进的代码中,我们使用 `concurrent.futures.ProcessPoolExecutor` 创建进程池,然后使用 `executor.map()` 方法提交任务并获取结果。注意,使用 `executor.map()` 后,如果需要获取结果,可以将结果迭代成列表,或者使用其他方法对结果进行处理。
多处理实现
import time import multiprocessing def cpu_bound(number): return sum(i * i for i in range(number)) def calculate_sums(numbers): with multiprocessing.Pool() as pool: pool.map(cpu_bound, numbers) def main(): start_time = time.perf_counter() numbers = [10000000 + x for x in range(20)] calculate_sums(numbers) end_time = time.perf_counter() print('Calculation takes {} seconds'.format(end_time - start_time)) if __name__ == '__main__': main()
执行时间为“计算耗时 5.024221667 秒”
`concurrent.futures.ProcessPoolExecutor` 和 `multiprocessing` 都是 Python 中实现多进程并发的库,它们之间有区别:
综上所述,`concurrent.futures.ProcessPoolExecutor` 是一个高层接口,封装了底层的多进程功能,适用于简单的多进程任务并行化。`multiprocessing` 是一个更底层的库,提供更多的控制和灵活性,适用于需要对进程进行细粒度控制的场景。需要根据具体需求选择合适的库。如果只是简单的任务并行化,可以使用 `concurrent.futures.ProcessPoolExecutor` 来简化代码;如果需要更多的底层控制和通信,可以使用 `multiprocessing` 库。
概括
与多线程不同,`asyncio`是单线程的,但其内部事件循环的机制允许它同时运行多个不同的任务,并且比多线程具有更大的自主控制能力。
`asyncio` 中的任务在运行过程中不会被打断,因此不会出现 race condition 的情况。
特别是在 I/O 操作繁重的场景下,asyncio 比多线程有更高的运行效率。因为 asyncio 中任务切换的代价比线程切换的代价小得多,并且 asyncio 可以启动的任务数量比多线程中的线程数量要多得多。
不过需要注意的是,很多情况下使用 `asyncio` 需要特定第三方库的支持,比如上例中的 `aiohttp`。而如果 I/O 操作很快,且不繁重,使用多线程也能有效解决问题。
Leapcell:FastAPI、Flask 和其他 Python 应用程序的理想平台
最后我来介绍一下部署Flask/FastAPI的理想平台:Leapcell。
Leapcell 是专为现代分布式应用设计的云计算平台。其按需付费的定价模式确保没有闲置成本,这意味着用户只需为实际使用的资源付费。

在文档中了解更多信息!
Leapcell Twitter:https://x.com/LeapcellHQ