Python 异步编程通过单线程事件循环 + 协程协作式调度,让 IO 密集型任务在等待时切换到其他可执行任务。本文聚焦写法、模式、AI/ML 高并发场景的实战要点。

异步本质

维度同步多线程异步 (asyncio)
执行模型串行阻塞多线程并行单线程协作式
切换成本内核线程切换协程切换(极低)
适用简单任务兼容同步 IO 库IO 密集 + 大量并发
CPU 密集OKOK(受 GIL 限)不适合

异步 ≠ 并行。异步在单线程内”交替执行”,本质是高效的并发,不是真正的并行计算。

时间复杂度直观分析

n 个 IO 密集任务,每个 CPU 时间 ,IO 时间 ):

例:n=10,。同步 11 s,异步约 2 s。

演进简史

Python 版本关键特性
3.4asyncio 标准库,@asyncio.coroutine + yield from
3.5async / await 语法
3.7asyncio.run() 简化事件循环管理
3.11TaskGroupExceptionGroupexcept*asyncio.timeout()
3.12异步迭代器/生成器优化、asyncio.run(context=...)

基础写法

import asyncio
 
async def fetch_data(url, delay):
    await asyncio.sleep(delay)
    return f"Data from {url}"
 
async def main():
    result = await fetch_data("https://example.com", 1)
    print(result)
 
asyncio.run(main())

要点:

  • async def 调用返回协程对象,不会立即执行,必须 await 或交给事件循环
  • await 后必须是可等待对象(协程、Task、Future)
  • asyncio.run() 是 Python 3.7+ 推荐入口,每次创建并关闭事件循环

三种并发调度

create_task + await:单独控制

async def main():
    t1 = asyncio.create_task(fetch_data("A", 2))
    t2 = asyncio.create_task(fetch_data("B", 1))
    print(await t2)   # 1s 后先返回
    print(await t1)

适合需要单独取消、查询状态、按优先级处理的场景。

asyncio.gather:批量并发

results = await asyncio.gather(
    fetch_data("A", 2),
    fetch_data("B", 1),
    fetch_data("C", 3),
)  # 顺序与传入一致

适合”全部完成后统一处理”的批量场景(如读完所有数据再训练)。

TaskGroup(Python 3.11+,结构化并发)

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            t1 = tg.create_task(fetch_data("A", 2))
            t2 = tg.create_task(fetch_data("B", 1))
        print(t1.result(), t2.result())
    except* ValueError as eg:
        print(eg.exceptions)  # 多个异常一起捕获

任意子任务抛异常会自动取消其他任务。生产环境应优先使用,避免任务泄露与异常吞没。

异常处理

  • gather:默认遇异常立即终止其他任务(除非 return_exceptions=True
  • TaskGroup:自动取消其他任务,异常聚合为 ExceptionGroup,用 except* 分类捕获
  • 单任务:在 await 处用 try/except 即可

常用进阶模式

asyncio.to_thread:兼容同步 IO 库

import time, asyncio
 
def sync_call(data):
    time.sleep(2)  # 阻塞函数
    return f"done: {data}"
 
async def main():
    result = await asyncio.to_thread(sync_call, "x")

将同步阻塞函数放线程池执行,避免阻塞事件循环。

Semaphore:限流并发

sem = asyncio.Semaphore(5)  # 最多 5 个并发
 
async def fetch(url):
    async with sem:
        return await aiohttp_get(url)
 
await asyncio.gather(*(fetch(u) for u in urls))

避免对端限流或本地 TCP 连接耗尽。

asyncio.timeout(Python 3.11+)

try:
    async with asyncio.timeout(2):
        await long_running()
except TimeoutError:
    ...

替代 asyncio.wait_for,语义更清晰。

AI/ML 场景示例:异步读取 OSS

import asyncio, ossfs
 
fs = ossfs.OSSFileSystem(endpoint="...", key="...", secret="...")
sem = asyncio.Semaphore(5)
 
async def read_tar(path):
    async with sem, fs.open(path, "rb") as f:
        data = await f.read(1024)
    return f"{path}: {len(data)} bytes"
 
async def main():
    paths = ["oss://bucket/x1.tar", "oss://bucket/x2.tar", ...]
    results = await asyncio.gather(*(read_tar(p) for p in paths))

类似模式适用于:批量调用模型推理 API、流式拉取大模型输出、并发聚合多个数据源。

适用与不适用

场景推荐
批量调远程 API、读 OSS/S3asyncio + 异步库
大模型流式输出、实时推理服务asyncio + FastAPI
模型训练、矩阵运算multiprocessing 或 GPU
少量本地同步任务直接同步代码

高频坑

表现解决
协程内调用 time.sleep / requests整个事件循环阻塞改用 asyncio.sleep / aiohttp 等异步库
create_task 后忘记 await任务被静默取消,结果丢失TaskGroup 或显式 await
并发过高上游限流、本地端口耗尽Semaphore 限流
异常被吞调试困难使用 TaskGroup + except*
对应库不支持异步强行 await 导致阻塞asyncio.to_thread 或换支持异步的库

异步 IO 库速查

用途
HTTP 客户端aiohttp、httpx
文件读写aiofiles
OSS / S3ossfs、s3fs
PostgreSQLasyncpg
MongoDBmotor
Web 框架FastAPI、Sanic、Starlette