Python asyncio 异步编程实战:从原理到生产
编程语言
同步 vs 异步:执行模型对比
理解异步编程的第一步是搞清楚同步与异步的本质区别。
| 维度 | 同步(Sync) | 异步(Async) |
|---|---|---|
| 执行方式 | 顺序阻塞,一个接一个 | 并发非阻塞,I/O 等待时切换 |
| 线程占用 | 每个 I/O 占用整个线程 | 单线程处理多个 I/O |
| 适用场景 | CPU 密集型 | I/O 密集型(网络、磁盘) |
| 资源消耗 | 线程开销大(~8MB/线程) | 协程开销极小(~2KB/协程) |
| 编程复杂度 | 简单直观 | 需要理解事件循环 |
import time
import asyncio
def fetchSync(name, delay):
    """Simulate a blocking fetch.

    Sleeps for ``delay`` seconds on the calling thread, printing a marker
    before and after, and returns the string ``"<name>_data"``.
    """
    print(f"[同步] 开始获取 {name}")
    time.sleep(delay)  # blocks the whole thread for the duration
    print(f"[同步] 完成 {name}")
    payload = f"{name}_data"
    return payload
async def fetchAsync(name, delay):
    """Simulate a non-blocking fetch.

    Awaits ``asyncio.sleep(delay)`` so other coroutines can run during the
    wait, printing a marker before and after, and returns ``"<name>_data"``.
    """
    print(f"[异步] 开始获取 {name}")
    await asyncio.sleep(delay)  # suspends only this coroutine
    print(f"[异步] 完成 {name}")
    payload = f"{name}_data"
    return payload
def runSync():
    """Run three two-second blocking fetches back to back and print the total time."""
    began = time.perf_counter()
    for apiName in ("API-A", "API-B", "API-C"):
        fetchSync(apiName, 2)
    print(f"同步总耗时: {time.perf_counter() - began:.2f}s")
async def runAsync():
    """Run three two-second async fetches concurrently and print the total time."""
    began = time.perf_counter()
    fetches = [fetchAsync(apiName, 2) for apiName in ("API-A", "API-B", "API-C")]
    # gather schedules all three at once, so the waits overlap.
    await asyncio.gather(*fetches)
    print(f"异步总耗时: {time.perf_counter() - began:.2f}s")
runSync()
asyncio.run(runAsync())
同步耗时 ~6s,异步耗时 ~2s——三个 I/O 并发执行。
事件循环(Event Loop)原理
事件循环是 asyncio 的心脏,负责调度所有协程的执行。
核心机制
┌─────────────────────────────────┐
│ Event Loop │
│ ┌───────┐ ┌───────┐ │
│ │ Task1 │ │ Task2 │ ... │
│ └───┬───┘ └───┬───┘ │
│ │ │ │
│ ┌───▼──────────▼───────────┐ │
│ │ Ready Queue (可执行) │ │
│ └──────────────────────────┘ │
│ ┌──────────────────────────┐ │
│ │ I/O Poll (等待I/O完成) │ │
│ └──────────────────────────┘ │
└─────────────────────────────────┘
手动控制事件循环
import asyncio
async def task1():
    """Print start/end markers around a one-second pause and return ``"result1"``."""
    print("Task1 开始")
    await asyncio.sleep(1)
    print("Task1 结束")
    return "result1"


# Drive the coroutine on an explicitly created loop instead of asyncio.run().
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    result = loop.run_until_complete(task1())
    print(f"结果: {result}")
finally:
    # Release the loop's resources whether or not task1 raised.
    loop.close()
事件循环策略
import asyncio
async def showLoopState():
    """Print and return the running loop's state as ``(is_running, is_closed)``.

    ``asyncio.get_running_loop()`` raises ``RuntimeError`` when no loop is
    running, so it must be called from inside a coroutine (or a callback),
    never at module level.
    """
    loop = asyncio.get_running_loop()
    running, closed = loop.is_running(), loop.is_closed()
    print(f"正在运行: {running}")
    print(f"已关闭: {closed}")
    return running, closed


# NOTE: the policy API is deprecated since Python 3.14; kept here because the
# section is about loop policies.
print(asyncio.get_event_loop_policy())
asyncio.run(showLoopState())
async/await 语法深入
协程定义与调用
import asyncio


async def coroutineFunc():
    """Coroutine function declared with ``async def``; resolves to 42."""
    await asyncio.sleep(0.1)
    return 42


async def main():
    """Show that calling a coroutine function only creates a coroutine object.

    ``await`` is only legal inside ``async def``, so the demonstration lives
    in a coroutine that is started with ``asyncio.run``. Returns the awaited
    value.
    """
    coro = coroutineFunc()  # nothing has run yet
    print(type(coro))  # <class 'coroutine'>
    # Await the same object so it does not trigger "was never awaited".
    result = await coro
    return result


asyncio.run(main())
await 的本质
await 挂起当前协程,将控制权交还事件循环,直到 awaitable 对象完成。
import asyncio
async def slowOperation():
    """Pause for two seconds, print the value the sleep produced, and return it."""
    print("开始慢操作...")
    # asyncio.sleep returns its ``result`` argument once the delay has elapsed.
    outcome = await asyncio.sleep(2, result="完成")
    print(f"慢操作结果: {outcome}")
    return outcome


async def main():
    """Wrap slowOperation in a Task and print ``done()`` before and after awaiting it."""
    pendingTask = asyncio.create_task(slowOperation())
    print(f"Task 状态: {pendingTask.done()}")
    await pendingTask
    print(f"Task 状态: {pendingTask.done()}")
asyncio.run(main())
协程生命周期
import asyncio
async def lifecycleDemo():
    """Print numbered lifecycle markers separated by sleeps; return the final marker."""
    # (message, seconds to sleep after printing it)
    stages = (
        ("1. 协程创建(created)", 0),
        ("2. 首次被调度(running)", 0.5),
        ("3. await 挂起(suspended)→ 事件循环调度其他协程", 0),
    )
    for message, pause in stages:
        print(message)
        await asyncio.sleep(pause)
    print("4. 恢复执行(running)")
    return "5. 返回结果(finished)"


async def main():
    """Create the coroutine object, print its type, then await it and print the result."""
    pendingCoro = lifecycleDemo()
    print(f"类型: {type(pendingCoro)}")
    finalMarker = await pendingCoro
    print(f"最终: {finalMarker}")
asyncio.run(main())
Task vs Future
Future:底层结果容器
import asyncio
async def futureDemo(delay=1):
    """Resolve a bare Future from a helper task and return its value.

    Args:
        delay: seconds the helper waits before setting the Future's result.

    Returns:
        The value stored in the Future (``"未来已来"``).
    """
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    async def setValue():
        await asyncio.sleep(delay)
        future.set_result("未来已来")

    # Keep a reference: the event loop only holds weak references to tasks,
    # so an unreferenced task may be garbage-collected before it finishes.
    setter = asyncio.create_task(setValue())
    result = await future
    await setter  # surface any exception raised inside the helper
    print(f"Future 结果: {result}")
    return result
asyncio.run(futureDemo())
Task:可调度的协程包装
import asyncio
async def taskVsFuture():
    """Print a Task's type, name and completion state before and after awaiting it."""

    async def work(name, seconds):
        await asyncio.sleep(seconds)
        return f"{name} 完成"

    job = asyncio.create_task(work("任务A", 1))
    taskIsFuture = issubclass(asyncio.Task, asyncio.Future)
    print(f"Task 类型: {type(job)}")
    print(f"是 Future 子类: {taskIsFuture}")
    print(f"Task 名字: {job.get_name()}")
    print(f"完成状态: {job.done()}")
    outcome = await job
    print(f"结果: {outcome}, 完成状态: {job.done()}")
asyncio.run(taskVsFuture())
TaskGroup(Python 3.11+)
import asyncio
async def taskGroupDemo():
async def fetch(itemId):
await asyncio.sleep(0.5)
return f"item_{itemId}"
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch(i)) for i in range(5)]
results = [t.result() for t in tasks]
print(f"所有结果: {results}")
asyncio.run(taskGroupDemo())
常用异步并发模式
asyncio.gather:并发收集结果
import asyncio
async def gatherDemo(timeScale=1):
    """Demonstrate ``asyncio.gather`` with and without ``return_exceptions``.

    Args:
        timeScale: multiplier applied to every simulated delay (1 keeps the
            original timings; 0 makes the demo instantaneous).

    Returns:
        The list produced by the ``return_exceptions=True`` call: a normal
        response followed by the exception object raised for ``err.com``.
    """

    async def fetch(url, delay):
        await asyncio.sleep(delay * timeScale)
        # Simulate a failing host so the second gather has an error to capture.
        if url.startswith("err."):
            raise ConnectionError(f"无法连接 {url}")
        return f"Response from {url}"

    results = await asyncio.gather(
        fetch("api.example.com/users", 1),
        fetch("api.example.com/posts", 2),
        fetch("api.example.com/comments", 1.5),
    )
    print(f"全部结果: {results}")

    # With return_exceptions=True the exception is returned in place, not raised.
    resultsWithErrors = await asyncio.gather(
        fetch("ok.com", 0.5),
        fetch("err.com", 0.5),
        return_exceptions=True,
    )
    print(f"含异常: {resultsWithErrors}")
    return resultsWithErrors
asyncio.run(gatherDemo())
asyncio.wait:更灵活的等待
import asyncio
async def waitDemo():
    """Wait up to 1.5 s for three jobs, report finished/unfinished, cancel the rest."""

    async def job(name, delay):
        await asyncio.sleep(delay)
        return name

    specs = (("fast", 0.5), ("medium", 1.0), ("slow", 2.0))
    tasks = {asyncio.create_task(job(label, secs)) for label, secs in specs}
    finished, unfinished = await asyncio.wait(tasks, timeout=1.5)
    print(f"已完成: {[t.result() for t in finished]}")
    print(f"未完成: {len(unfinished)} 个")
    # asyncio.wait does not cancel on timeout; do it explicitly.
    for leftover in unfinished:
        leftover.cancel()
asyncio.run(waitDemo())
asyncio.as_completed:按完成顺序获取
import asyncio
async def asCompletedDemo():
    """Start three jobs of different lengths and print each result as it completes."""

    async def job(name, delay):
        await asyncio.sleep(delay)
        return f"{name} (耗时{delay}s)"

    specs = (("慢任务", 3), ("快任务", 1), ("中任务", 2))
    tasks = [asyncio.create_task(job(label, secs)) for label, secs in specs]
    # as_completed yields awaitables in completion order, not submission order.
    for nextDone in asyncio.as_completed(tasks):
        finished = await nextDone
        print(f"完成: {finished}")
asyncio.run(asCompletedDemo())
异步 HTTP 请求
aiohttp
import asyncio
import aiohttp
async def aiohttpDemo():
    """Fetch two GitHub API documents concurrently over one aiohttp session and print a field of each."""
    endpoints = (
        "https://api.github.com/users/python",
        "https://api.github.com/repos/python/cpython",
    )
    async with aiohttp.ClientSession() as session:

        async def fetchJson(url):
            # Non-2xx responses raise instead of being parsed as JSON.
            async with session.get(url) as response:
                response.raise_for_status()
                return await response.json()

        user, repo = await asyncio.gather(*(fetchJson(url) for url in endpoints))
        print(f"用户: {user['login']}")
        print(f"仓库: {repo['full_name']}")
asyncio.run(aiohttpDemo())
httpx(支持 HTTP/2)
import asyncio
import httpx
async def httpxDemo():
    """Issue one request, print its status and protocol, then run three delayed requests concurrently."""
    async with httpx.AsyncClient(http2=True) as client:
        first = await client.get("https://httpbin.org/get")
        print(f"状态码: {first.status_code}")
        print(f"协议: {first.extensions.get('http_version')}")

        # client.get(...) returns a coroutine; gather runs the three together.
        delayed = [
            client.get(f"https://httpbin.org/delay/{seconds}")
            for seconds in (1, 2, 3)
        ]
        responses = await asyncio.gather(*delayed)
        print(f"并发 {len(responses)} 个请求全部完成")
asyncio.run(httpxDemo())
异步数据库操作
asyncpg(PostgreSQL)
import asyncio
import asyncpg
async def asyncpgDemo():
    """Create a table, insert one row and print every row using asyncpg.

    The connection is closed in a ``finally`` block so it is released even
    when one of the statements raises.
    """
    conn = await asyncpg.connect(
        "postgresql://user:pass@localhost/mydb"
    )
    try:
        await conn.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id SERIAL PRIMARY KEY,
                name TEXT,
                email TEXT
            )
        """)
        # $1/$2 placeholders keep the values out of the SQL text.
        await conn.execute(
            "INSERT INTO users (name, email) VALUES ($1, $2)",
            "张三", "zhang@example.com"
        )
        rows = await conn.fetch("SELECT * FROM users")
        for row in rows:
            print(dict(row))
    finally:
        await conn.close()
asyncio.run(asyncpgDemo())
aiomysql(MySQL)
import asyncio
import aiomysql
async def aiomysqlDemo():
    """Query up to ten rows from ``products`` with aiomysql and print the row count.

    The connection is closed in a ``finally`` block so it is not leaked when
    the query fails.
    """
    conn = await aiomysql.connect(
        host="localhost", port=3306,
        user="root", password="pass", db="mydb"
    )
    try:
        async with conn.cursor() as cur:
            await cur.execute("SELECT * FROM products LIMIT 10")
            rows = await cur.fetchall()
            print(f"查询到 {len(rows)} 条记录")
    finally:
        conn.close()
asyncio.run(aiomysqlDemo())
motor(MongoDB)
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient
async def motorDemo():
    """Insert one document and print every user aged 25 or more using Motor.

    The client owns a connection pool, so it is closed in a ``finally``
    block once the demo is done.
    """
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    try:
        db = client.mydb
        collection = db.users
        await collection.insert_one({"name": "李四", "age": 30})
        # find() itself does no I/O; documents are fetched while iterating.
        cursor = collection.find({"age": {"$gte": 25}})
        async for doc in cursor:
            print(doc)
    finally:
        client.close()
asyncio.run(motorDemo())
异步文件 I/O
aiofiles
import asyncio
import aiofiles
import json
async def aiofilesDemo():
    """Write and re-read a JSON file, then print the ERROR lines of ``large.log``.

    Files are opened with an explicit UTF-8 encoding: the JSON is dumped with
    ``ensure_ascii=False``, so non-ASCII text would otherwise depend on the
    platform's default encoding.
    """
    async with aiofiles.open("data.json", mode="w", encoding="utf-8") as f:
        await f.write(json.dumps({"key": "value"}, ensure_ascii=False))

    async with aiofiles.open("data.json", mode="r", encoding="utf-8") as f:
        content = await f.read()
        data = json.loads(content)
        print(f"读取: {data}")

    # Iterate line by line so the whole log is never held in memory.
    async with aiofiles.open("large.log", mode="r", encoding="utf-8") as f:
        async for line in f:
            if "ERROR" in line:
                print(line.strip())
asyncio.run(aiofilesDemo())
生产者-消费者模式
import asyncio
import random
async def producer(queue, producerId, itemCount=10, delayRange=(0.1, 0.5)):
    """Put ``itemCount`` items on ``queue``, pausing a random time after each.

    The producer does NOT enqueue a shutdown sentinel: with several producers
    and consumers, per-producer sentinels stop consumers while other
    producers are still working. The coordinator sends the sentinels.

    Args:
        queue: the ``asyncio.Queue`` shared with the consumers.
        producerId: label used in item names and log output.
        itemCount: number of items to produce.
        delayRange: ``(low, high)`` bounds in seconds for the random pause.
    """
    for i in range(itemCount):
        item = f"item-{producerId}-{i}"
        await queue.put(item)
        print(f"生产者{producerId} → {item}")
        await asyncio.sleep(random.uniform(*delayRange))


async def consumer(queue, consumerId, delayRange=(0.2, 0.8)):
    """Process items from ``queue`` until a ``None`` sentinel arrives.

    Args:
        queue: the ``asyncio.Queue`` shared with the producers.
        consumerId: label used in log output.
        delayRange: ``(low, high)`` bounds in seconds for simulated work.
    """
    while True:
        item = await queue.get()
        try:
            if item is None:
                break
            print(f"消费者{consumerId} ← {item}")
            await asyncio.sleep(random.uniform(*delayRange))
        finally:
            # Every get() is matched by exactly one task_done(), sentinel included.
            queue.task_done()


async def producerConsumerDemo():
    """Run two producers and three consumers over a bounded queue, then shut down cleanly."""
    queue = asyncio.Queue(maxsize=5)
    producers = [asyncio.create_task(producer(queue, i)) for i in range(2)]
    consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]

    await asyncio.gather(*producers)
    # Wait until every produced item has been processed...
    await queue.join()
    # ...then send exactly one sentinel per consumer so none is left in the queue.
    for _ in consumers:
        await queue.put(None)
    await asyncio.gather(*consumers)
asyncio.run(producerConsumerDemo())
Semaphore 限流
import asyncio
async def rateLimitedRequests():
    """Run 20 simulated requests with at most five in flight at any moment."""
    semaphore = asyncio.Semaphore(5)

    async def fetch(url):
        # Only five coroutines can be inside this block simultaneously.
        async with semaphore:
            print(f"请求: {url}")
            await asyncio.sleep(1)
            return f"response_{url}"

    results = await asyncio.gather(*(fetch(f"url_{index}") for index in range(20)))
    print(f"完成 {len(results)} 个请求(最多5个并发)")
asyncio.run(rateLimitedRequests())
错误处理与取消
异常处理
import asyncio
async def errorHandling():
    """Run three operations concurrently, one of which fails, and print each outcome."""

    async def riskyOp(name):
        await asyncio.sleep(0.5)
        if name != "bad":
            return f"{name}_ok"
        raise ValueError(f"{name} 出错了")

    # return_exceptions=True turns a raised exception into a list element.
    outcomes = await asyncio.gather(
        *map(riskyOp, ("good1", "bad", "good2")),
        return_exceptions=True,
    )
    for outcome in outcomes:
        label = "异常" if isinstance(outcome, Exception) else "成功"
        print(f"{label}: {outcome}")
asyncio.run(errorHandling())
任务取消
import asyncio
async def cancellationDemo():
    """Start a ten-second task, cancel it after one second, and confirm the cancellation."""

    async def longRunning():
        print("开始长时间运行...")
        try:
            await asyncio.sleep(10)
        except asyncio.CancelledError:
            print("任务被取消,执行清理")
            # Re-raise so the task is actually marked as cancelled.
            raise
        print("这不会执行")

    victim = asyncio.create_task(longRunning())
    await asyncio.sleep(1)
    victim.cancel()
    try:
        await victim
    except asyncio.CancelledError:
        print("确认任务已取消")
asyncio.run(cancellationDemo())
超时控制
import asyncio
async def timeoutDemo():
    """Time out a five-second task after two seconds, first with wait_for, then with asyncio.timeout."""

    async def slowTask():
        await asyncio.sleep(5)
        return "完成"

    # Variant 1: wait_for wraps a single awaitable.
    try:
        await asyncio.wait_for(slowTask(), timeout=2)
    except asyncio.TimeoutError:
        print("任务超时!")

    # Variant 2: asyncio.timeout() (Python 3.11+) guards a whole block.
    try:
        async with asyncio.timeout(2):
            await slowTask()
    except TimeoutError:
        print("asyncio.timeout 超时!")
asyncio.run(timeoutDemo())
调试异步代码
asyncio.run 的调试模式
import asyncio
async def debugDemo():
    """Create a coroutine that is deliberately never awaited.

    Run under ``asyncio.run(..., debug=True)`` to see asyncio's debug-mode
    diagnostics for the forgotten ``await``.
    """

    async def forgottenAwait():
        coro = asyncio.sleep(1)  # intentionally NOT awaited
        print(f"未 await 的协程: {coro}")

    await forgottenAwait()


# asyncio.run must be called from synchronous code, outside any coroutine.
asyncio.run(debugDemo(), debug=True)
日志配置
import asyncio
import logging
# DEBUG level on the root logger also surfaces asyncio's own debug messages.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("asyncio")


async def loggingDemo():
    """Log a message before and after a half-second pause."""

    async def work():
        logger.info("开始工作")
        await asyncio.sleep(0.5)
        logger.info("工作完成")

    await work()
asyncio.run(loggingDemo(), debug=True)
aiomonitor 实时监控
pip install aiomonitor
import asyncio
import aiomonitor
async def main():
    """Run a background task for ten seconds with aiomonitor attached.

    The monitor must be attached to the loop that is actually running the
    tasks, so it is started from inside the coroutine with
    ``asyncio.get_running_loop()`` rather than with a separate, unused loop.
    """

    async def backgroundTask():
        while True:
            await asyncio.sleep(1)
            print("后台任务运行中...")

    loop = asyncio.get_running_loop()
    with aiomonitor.start_monitor(loop):
        task = asyncio.create_task(backgroundTask())
        await asyncio.sleep(10)
        task.cancel()


asyncio.run(main())
混合同步与异步代码
run_in_executor
import asyncio
import time
def syncCpuBound(n):
    """CPU-bound synchronous function: return the sum of ``i * i`` for ``i`` in ``range(n)``."""
    total = 0
    for i in range(n):
        total += i * i
    return total
async def runInExecutorDemo():
    """Offload ``syncCpuBound`` first to the default executor, then to a process pool."""
    import concurrent.futures

    loop = asyncio.get_running_loop()

    # ``None`` selects the loop's default executor. The event loop stays
    # responsive while the call runs there.
    result = await loop.run_in_executor(None, syncCpuBound, 10**7)
    print(f"默认线程池结果: {result}")

    # A process pool runs the function in a separate interpreter process.
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, syncCpuBound, 10**8)
        print(f"进程池结果: {result}")


# Required with ProcessPoolExecutor: under the spawn/forkserver start methods
# worker processes re-import this module, and an unguarded asyncio.run would
# execute again inside every worker.
if __name__ == "__main__":
    asyncio.run(runInExecutorDemo())
to_thread(Python 3.9+)
import asyncio
def blockingIo():
    """Block the calling thread for two seconds and return a completion message."""
    import time

    time.sleep(2)
    message = "阻塞I/O完成"
    return message


async def toThreadDemo():
    """Run ``blockingIo`` via ``asyncio.to_thread`` and print what it returns."""
    # to_thread runs the blocking call in a separate thread and awaits its result.
    print(await asyncio.to_thread(blockingIo))
asyncio.run(toThreadDemo())
性能对比:同步 vs 异步
| 场景 | 同步耗时 | 异步耗时 | 提升倍数 |
|---|---|---|---|
| 100 个 HTTP 请求 | ~100s | ~2s | 50x |
| 50 次数据库查询 | ~5s | ~0.3s | 16x |
| 10 个文件读写 | ~1s | ~0.2s | 5x |
| CPU 密集计算 | 10s | 10s+ | 1x(无提升) |
| 混合 I/O+CPU | 15s | 5s | 3x |
import asyncio
import time
import aiohttp
async def benchmark():
    """Time 50 concurrent GET requests against a 0.5 s delay endpoint and print the elapsed time."""
    urls = ["https://httpbin.org/delay/0.5"] * 50

    async def fetchOne(session, url):
        async with session.get(url) as reply:
            return reply.status

    began = time.perf_counter()
    async with aiohttp.ClientSession() as session:
        # One shared session; all 50 requests are in flight together.
        await asyncio.gather(*(fetchOne(session, url) for url in urls))
    elapsed = time.perf_counter() - began
    print(f"50个请求异步耗时: {elapsed:.2f}s")
asyncio.run(benchmark())
常见陷阱
陷阱一:阻塞事件循环
import asyncio
import time
async def badBlocking():
    """Anti-pattern: ``time.sleep`` blocks the thread, so the event loop cannot run anything else."""
    time.sleep(5)


async def goodNonBlocking():
    """Correct form: ``asyncio.sleep`` suspends only this coroutine."""
    await asyncio.sleep(5)


async def badSyncCall():
    """Anti-pattern: a synchronous HTTP call made inside a coroutine."""
    import requests

    requests.get("https://example.com")


async def goodAsyncCall():
    """Correct form: fetch the page with aiohttp and return its text."""
    import aiohttp

    async with aiohttp.ClientSession() as session:
        async with session.get("https://example.com") as reply:
            body = await reply.text()
            return body
陷阱二:忘记 await
import asyncio
async def forgottenAwaitDemo():
    """Contrast calling a coroutine function with awaiting it."""

    async def getValue():
        await asyncio.sleep(0.1)
        return 42

    # Without ``await`` the call only builds a coroutine object.
    notAwaited = getValue()
    print(type(notAwaited))  # coroutine, not 42!

    awaited = await getValue()
    print(awaited)  # 42
陷阱三:在 async 函数中使用同步锁
import asyncio
import threading
async def wrongLock():
    """Anti-pattern: a ``threading.Lock`` is held across an ``await``."""
    blockingLock = threading.Lock()
    with blockingLock:
        await asyncio.sleep(1)


async def rightLock():
    """Correct form: ``asyncio.Lock`` is acquired with ``async with``."""
    cooperativeLock = asyncio.Lock()
    async with cooperativeLock:
        await asyncio.sleep(1)
生产部署
uvicorn 运行 ASGI 应用
from contextlib import asynccontextmanager

from fastapi import FastAPI
import asyncio
import aiohttp


@asynccontextmanager
async def lifespan(app):
    """Open one shared aiohttp session at startup and close it at shutdown.

    Creating a ClientSession per request throws away its connection pool;
    a single application-wide session lets connections be reused.
    """
    app.state.httpSession = aiohttp.ClientSession()
    try:
        yield
    finally:
        await app.state.httpSession.close()


app = FastAPI(lifespan=lifespan)


@app.get("/api/data")
async def getData():
    """Proxy the upstream JSON document and return it under the ``result`` key."""
    async with app.state.httpSession.get("https://api.example.com/data") as response:
        data = await response.json()
        return {"result": data}


@app.get("/api/health")
async def healthCheck():
    """Liveness probe: always reports ``{"status": "ok"}``."""
    return {"status": "ok"}
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
生产配置
uvicorn main:app \
--host 0.0.0.0 \
--port 8000 \
--workers 4 \
--loop uvloop \
--http httptools \
--access-log \
--log-level info
Docker 部署
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
常见问题 FAQ
Q1:asyncio 适合 CPU 密集型任务吗?
不适合。asyncio 是单线程的,CPU 密集型任务会阻塞事件循环。应使用 run_in_executor 配合进程池,或使用 multiprocessing。
Q2:asyncio 和 threading 怎么选?
| 场景 | 推荐 |
|---|---|
| 大量 I/O 并发(>1000) | asyncio |
| 少量 I/O 并发(<100) | threading |
| 需要共享状态 | asyncio(单线程协作式调度,两次 await 之间不会被抢占;跨 await 的临界区仍需 asyncio.Lock 保护) |
| 已有同步库无法替换 | threading + run_in_executor |
Q3:一个事件循环可以跑多个线程吗?
不可以。事件循环不是线程安全的,同一时刻只能在一个线程中运行,因此每个线程应有自己的事件循环。需要从其他线程向某个循环提交协程时,使用 asyncio.run_coroutine_threadsafe();提交普通回调时,使用 loop.call_soon_threadsafe()。
Q4:asyncio.gather 和 TaskGroup 怎么选?
- gather:需要 return_exceptions=True 或自定义等待策略时
- TaskGroup(3.11+):需要结构化并发、自动异常传播时,推荐优先使用
Q5:如何优雅关闭异步应用?
import asyncio
import signal
async def handle(reader, writer):
    """Placeholder connection handler: close the connection immediately.

    Replace with the real protocol logic; it is defined here only so the
    example is runnable.
    """
    writer.close()
    await writer.wait_closed()


async def gracefulShutdown():
    """Serve on port 8000 until SIGTERM or SIGINT arrives, then close the server.

    An ``asyncio.Event`` is used as the stop flag because ``Event.set`` may be
    called any number of times; calling ``Future.set_result`` a second time
    (e.g. SIGINT followed by SIGTERM) raises ``InvalidStateError``.

    Note: ``loop.add_signal_handler`` is only available on Unix.
    """
    server = await asyncio.start_server(handle, "0.0.0.0", 8000)
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, stop.set)
    # Leaving the ``async with`` block closes the server.
    async with server:
        await stop.wait()
    print("服务优雅关闭")
工具推荐
在 asyncio 异步编程实践中,以下工具可以帮到你:
- JSON 格式化 — 格式化异步 API 返回的 JSON 数据,快速排查结构问题
- Base64 编码 — 处理异步请求中的 Base64 图片/文件数据
- Hash 计算 — 为异步缓存生成键名,实现请求去重
asyncio 是 Python 异步编程的基石。掌握事件循环、协程、并发模式和生产部署,你就能构建高性能的 I/O 密集型应用。
本站提供浏览器本地工具,免注册即可试用 →
#Python#asyncio#异步编程#协程#教程