Python 异步编程通过单线程事件循环 + 协程协作式调度,让 IO 密集型任务在等待时切换到其他可执行任务。本文聚焦写法、模式、AI/ML 高并发场景的实战要点。
异步本质
| 维度 | 同步 | 多线程 | 异步 (asyncio) |
|---|---|---|---|
| 执行模型 | 串行阻塞 | 多线程并行 | 单线程协作式 |
| 切换成本 | — | 内核线程切换 | 协程切换(极低) |
| 适用 | 简单任务 | 兼容同步 IO 库 | IO 密集 + 大量并发 |
| CPU 密集 | OK | OK(受 GIL 限) | 不适合 |
异步 ≠ 并行。异步在单线程内”交替执行”,本质是高效的并发,不是真正的并行计算。
时间复杂度直观分析
n 个 IO 密集任务,每个 CPU 时间 ,IO 时间 ():
例:n=10,,。同步 11 s,异步约 2 s。
演进简史
| Python 版本 | 关键特性 |
|---|---|
| 3.4 | asyncio 标准库,@asyncio.coroutine + yield from |
| 3.5 | async / await 语法 |
| 3.7 | asyncio.run() 简化事件循环管理 |
| 3.11 | TaskGroup、ExceptionGroup、except*、asyncio.timeout() |
| 3.12 | 异步迭代器/生成器优化、asyncio.run(context=...) |
基础写法
import asyncio
async def fetch_data(url, delay):
await asyncio.sleep(delay)
return f"Data from {url}"
async def main():
result = await fetch_data("https://example.com", 1)
print(result)
asyncio.run(main())要点:
async def调用返回协程对象,不会立即执行,必须await或交给事件循环await后必须是可等待对象(协程、Task、Future)asyncio.run()是 Python 3.7+ 推荐入口,每次创建并关闭事件循环
三种并发调度
create_task + await:单独控制
async def main():
t1 = asyncio.create_task(fetch_data("A", 2))
t2 = asyncio.create_task(fetch_data("B", 1))
print(await t2) # 1s 后先返回
print(await t1)适合需要单独取消、查询状态、按优先级处理的场景。
asyncio.gather:批量并发
results = await asyncio.gather(
fetch_data("A", 2),
fetch_data("B", 1),
fetch_data("C", 3),
) # 顺序与传入一致适合”全部完成后统一处理”的批量场景(如读完所有数据再训练)。
TaskGroup(Python 3.11+,结构化并发)
async def main():
try:
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(fetch_data("A", 2))
t2 = tg.create_task(fetch_data("B", 1))
print(t1.result(), t2.result())
except* ValueError as eg:
print(eg.exceptions) # 多个异常一起捕获任意子任务抛异常会自动取消其他任务。生产环境应优先使用,避免任务泄露与异常吞没。
异常处理
- gather:默认遇异常立即终止其他任务(除非
return_exceptions=True) - TaskGroup:自动取消其他任务,异常聚合为
ExceptionGroup,用except*分类捕获 - 单任务:在
await处用try/except即可
常用进阶模式
asyncio.to_thread:兼容同步 IO 库
import time, asyncio
def sync_call(data):
time.sleep(2) # 阻塞函数
return f"done: {data}"
async def main():
result = await asyncio.to_thread(sync_call, "x")将同步阻塞函数放线程池执行,避免阻塞事件循环。
Semaphore:限流并发
sem = asyncio.Semaphore(5) # 最多 5 个并发
async def fetch(url):
async with sem:
return await aiohttp_get(url)
await asyncio.gather(*(fetch(u) for u in urls))避免对端限流或本地 TCP 连接耗尽。
asyncio.timeout(Python 3.11+)
try:
async with asyncio.timeout(2):
await long_running()
except TimeoutError:
...替代 asyncio.wait_for,语义更清晰。
AI/ML 场景示例:异步读取 OSS
import asyncio, ossfs
fs = ossfs.OSSFileSystem(endpoint="...", key="...", secret="...")
sem = asyncio.Semaphore(5)
async def read_tar(path):
async with sem, fs.open(path, "rb") as f:
data = await f.read(1024)
return f"{path}: {len(data)} bytes"
async def main():
paths = ["oss://bucket/x1.tar", "oss://bucket/x2.tar", ...]
results = await asyncio.gather(*(read_tar(p) for p in paths))类似模式适用于:批量调用模型推理 API、流式拉取大模型输出、并发聚合多个数据源。
适用与不适用
| 场景 | 推荐 |
|---|---|
| 批量调远程 API、读 OSS/S3 | asyncio + 异步库 |
| 大模型流式输出、实时推理服务 | asyncio + FastAPI |
| 模型训练、矩阵运算 | multiprocessing 或 GPU |
| 少量本地同步任务 | 直接同步代码 |
高频坑
| 坑 | 表现 | 解决 |
|---|---|---|
协程内调用 time.sleep / requests | 整个事件循环阻塞 | 改用 asyncio.sleep / aiohttp 等异步库 |
create_task 后忘记 await | 任务被静默取消,结果丢失 | 用 TaskGroup 或显式 await |
| 并发过高 | 上游限流、本地端口耗尽 | Semaphore 限流 |
| 异常被吞 | 调试困难 | 使用 TaskGroup + except* |
| 对应库不支持异步 | 强行 await 导致阻塞 | 用 asyncio.to_thread 或换支持异步的库 |
异步 IO 库速查
| 用途 | 库 |
|---|---|
| HTTP 客户端 | aiohttp、httpx |
| 文件读写 | aiofiles |
| OSS / S3 | ossfs、s3fs |
| PostgreSQL | asyncpg |
| MongoDB | motor |
| Web 框架 | FastAPI、Sanic、Starlette |