Python asyncio 非同步程式設計實戰:從原理到生產

编程语言

同步 vs 非同步:執行模型對比

理解非同步程式設計的第一步是搞清楚同步與非同步的本質區別。

維度 同步(Sync) 非同步(Async)
執行方式 順序阻塞,一個接一個 並發非阻塞,I/O 等待時切換
執行緒佔用 每個 I/O 佔用整個執行緒 單執行緒處理多個 I/O
適用場景 CPU 密集型 I/O 密集型(網路、磁碟)
資源消耗 執行緒開銷大(~8MB/執行緒) 協程開銷極小(~2KB/協程)
程式設計複雜度 簡單直觀 需要理解事件迴圈
import time
import asyncio

def fetchSync(name, delay):
    print(f"[同步] 開始取得 {name}")
    time.sleep(delay)
    print(f"[同步] 完成 {name}")
    return f"{name}_data"

async def fetchAsync(name, delay):
    print(f"[非同步] 開始取得 {name}")
    await asyncio.sleep(delay)
    print(f"[非同步] 完成 {name}")
    return f"{name}_data"

def runSync():
    start = time.perf_counter()
    fetchSync("API-A", 2)
    fetchSync("API-B", 2)
    fetchSync("API-C", 2)
    print(f"同步總耗時: {time.perf_counter() - start:.2f}s")

async def runAsync():
    start = time.perf_counter()
    await asyncio.gather(
        fetchAsync("API-A", 2),
        fetchAsync("API-B", 2),
        fetchAsync("API-C", 2),
    )
    print(f"非同步總耗時: {time.perf_counter() - start:.2f}s")

runSync()
asyncio.run(runAsync())

同步耗時 ~6s,非同步耗時 ~2s——三個 I/O 並發執行。


事件迴圈(Event Loop)原理

事件迴圈是 asyncio 的心臟,負責排程所有協程的執行。

核心機制

┌─────────────────────────────────┐
│         Event Loop              │
│  ┌───────┐  ┌───────┐          │
│  │ Task1 │  │ Task2 │  ...     │
│  └───┬───┘  └───┬───┘          │
│      │          │               │
│  ┌───▼──────────▼───────────┐  │
│  │   Ready Queue (可執行)    │  │
│  └──────────────────────────┘  │
│  ┌──────────────────────────┐  │
│  │   I/O Poll (等待I/O完成)  │  │
│  └──────────────────────────┘  │
└─────────────────────────────────┘

手動控制事件迴圈

import asyncio

async def task1():
    print("Task1 開始")
    await asyncio.sleep(1)
    print("Task1 結束")
    return "result1"

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    result = loop.run_until_complete(task1())
    print(f"結果: {result}")
finally:
    loop.close()

事件迴圈策略

import asyncio

print(asyncio.get_event_loop_policy())
loop = asyncio.get_running_loop()
print(f"正在執行: {loop.is_running()}")
print(f"已關閉: {loop.is_closed()}")

async/await 語法深入

協程定義與呼叫

async def coroutineFunc():
    """async def 定義協程函式"""
    await asyncio.sleep(0.1)
    return 42

coro = coroutineFunc()
print(type(coro))  # <class 'coroutine'>

result = await coroutineFunc()

await 的本質

await 掛起當前協程,將控制權交還事件迴圈,直到 awaitable 物件完成。

import asyncio

async def slowOperation():
    print("開始慢操作...")
    result = await asyncio.sleep(2, result="完成")
    print(f"慢操作結果: {result}")
    return result

async def main():
    task = asyncio.create_task(slowOperation())
    print(f"Task 狀態: {task.done()}")
    await task
    print(f"Task 狀態: {task.done()}")

asyncio.run(main())

協程生命週期

import asyncio

async def lifecycleDemo():
    print("1. 協程建立(created)")
    await asyncio.sleep(0)
    print("2. 首次被排程(running)")
    await asyncio.sleep(0.5)
    print("3. await 掛起(suspended)→ 事件迴圈排程其他協程")
    await asyncio.sleep(0)
    print("4. 恢復執行(running)")
    return "5. 回傳結果(finished)"

async def main():
    coro = lifecycleDemo()
    print(f"型別: {type(coro)}")
    result = await coro
    print(f"最終: {result}")

asyncio.run(main())

Task vs Future

Future:底層結果容器

import asyncio

async def futureDemo():
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    async def setValue():
        await asyncio.sleep(1)
        future.set_result("未來已來")

    asyncio.create_task(setValue())
    result = await future
    print(f"Future 結果: {result}")

asyncio.run(futureDemo())

Task:可排程的協程包裝

import asyncio

async def taskVsFuture():
    async def work(name, seconds):
        await asyncio.sleep(seconds)
        return f"{name} 完成"

    task = asyncio.create_task(work("任務A", 1))
    print(f"Task 型別: {type(task)}")
    print(f"是 Future 子類別: {issubclass(asyncio.Task, asyncio.Future)}")
    print(f"Task 名稱: {task.get_name()}")
    print(f"完成狀態: {task.done()}")

    result = await task
    print(f"結果: {result}, 完成狀態: {task.done()}")

asyncio.run(taskVsFuture())

TaskGroup(Python 3.11+)

import asyncio

async def taskGroupDemo():
    async def fetch(itemId):
        await asyncio.sleep(0.5)
        return f"item_{itemId}"

    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch(i)) for i in range(5)]

    results = [t.result() for t in tasks]
    print(f"所有結果: {results}")

asyncio.run(taskGroupDemo())

常用非同步並發模式

asyncio.gather:並發收集結果

import asyncio

async def gatherDemo():
    async def fetch(url, delay):
        await asyncio.sleep(delay)
        return f"Response from {url}"

    results = await asyncio.gather(
        fetch("api.example.com/users", 1),
        fetch("api.example.com/posts", 2),
        fetch("api.example.com/comments", 1.5),
    )
    print(f"全部結果: {results}")

    resultsWithErrors = await asyncio.gather(
        fetch("ok.com", 0.5),
        fetch("err.com", 0.5),
        return_exceptions=True,
    )
    print(f"含異常: {resultsWithErrors}")

asyncio.run(gatherDemo())

asyncio.wait:更彈性的等待

import asyncio

async def waitDemo():
    async def job(name, delay):
        await asyncio.sleep(delay)
        return name

    tasks = {
        asyncio.create_task(job("fast", 0.5)),
        asyncio.create_task(job("medium", 1.0)),
        asyncio.create_task(job("slow", 2.0)),
    }

    done, pending = await asyncio.wait(tasks, timeout=1.5)
    print(f"已完成: {[t.result() for t in done]}")
    print(f"未完成: {len(pending)} 個")

    for t in pending:
        t.cancel()

asyncio.run(waitDemo())

asyncio.as_completed:按完成順序取得

import asyncio

async def asCompletedDemo():
    async def job(name, delay):
        await asyncio.sleep(delay)
        return f"{name} (耗時{delay}s)"

    tasks = [
        asyncio.create_task(job("慢任務", 3)),
        asyncio.create_task(job("快任務", 1)),
        asyncio.create_task(job("中任務", 2)),
    ]

    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(f"完成: {result}")

asyncio.run(asCompletedDemo())

非同步 HTTP 請求

aiohttp

import asyncio
import aiohttp

async def aiohttpDemo():
    async with aiohttp.ClientSession() as session:
        async def fetchJson(url):
            async with session.get(url) as response:
                response.raise_for_status()
                return await response.json()

        results = await asyncio.gather(
            fetchJson("https://api.github.com/users/python"),
            fetchJson("https://api.github.com/repos/python/cpython"),
        )
        print(f"使用者: {results[0]['login']}")
        print(f"倉庫: {results[1]['full_name']}")

asyncio.run(aiohttpDemo())

httpx(支援 HTTP/2)

import asyncio
import httpx

async def httpxDemo():
    async with httpx.AsyncClient(http2=True) as client:
        response = await client.get("https://httpbin.org/get")
        print(f"狀態碼: {response.status_code}")
        print(f"協定: {response.extensions.get('http_version')}")

        tasks = [
            client.get(f"https://httpbin.org/delay/{i}")
            for i in range(1, 4)
        ]
        responses = await asyncio.gather(*tasks)
        print(f"並發 {len(responses)} 個請求全部完成")

asyncio.run(httpxDemo())

非同步資料庫操作

asyncpg(PostgreSQL)

import asyncio
import asyncpg

async def asyncpgDemo():
    conn = await asyncpg.connect(
        "postgresql://user:pass@localhost/mydb"
    )

    await conn.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            name TEXT,
            email TEXT
        )
    """)

    await conn.execute(
        "INSERT INTO users (name, email) VALUES ($1, $2)",
        "張三", "zhang@example.com"
    )

    rows = await conn.fetch("SELECT * FROM users")
    for row in rows:
        print(dict(row))

    await conn.close()

asyncio.run(asyncpgDemo())

aiomysql(MySQL)

import asyncio
import aiomysql

async def aiomysqlDemo():
    conn = await aiomysql.connect(
        host="localhost", port=3306,
        user="root", password="pass", db="mydb"
    )

    async with conn.cursor() as cur:
        await cur.execute("SELECT * FROM products LIMIT 10")
        rows = await cur.fetchall()
        print(f"查詢到 {len(rows)} 筆記錄")

    conn.close()

asyncio.run(aiomysqlDemo())

motor(MongoDB)

import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

async def motorDemo():
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    db = client.mydb
    collection = db.users

    await collection.insert_one({"name": "李四", "age": 30})

    cursor = collection.find({"age": {"$gte": 25}})
    async for doc in cursor:
        print(doc)

asyncio.run(motorDemo())

非同步檔案 I/O

aiofiles

import asyncio
import aiofiles
import json

async def aiofilesDemo():
    async with aiofiles.open("data.json", mode="w") as f:
        await f.write(json.dumps({"key": "value"}, ensure_ascii=False))

    async with aiofiles.open("data.json", mode="r") as f:
        content = await f.read()
        data = json.loads(content)
        print(f"讀取: {data}")

    async with aiofiles.open("large.log", mode="r") as f:
        async for line in f:
            if "ERROR" in line:
                print(line.strip())

asyncio.run(aiofilesDemo())

生產者-消費者模式

import asyncio
import random

async def producer(queue, producerId):
    for i in range(10):
        item = f"item-{producerId}-{i}"
        await queue.put(item)
        print(f"生產者{producerId} → {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))
    await queue.put(None)

async def consumer(queue, consumerId):
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        print(f"消費者{consumerId} ← {item}")
        await asyncio.sleep(random.uniform(0.2, 0.8))
        queue.task_done()

async def producerConsumerDemo():
    queue = asyncio.Queue(maxsize=5)

    producers = [asyncio.create_task(producer(queue, i)) for i in range(2)]
    consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]

    await asyncio.gather(*producers)
    for _ in consumers:
        await queue.put(None)
    await asyncio.gather(*consumers)
    await queue.join()

asyncio.run(producerConsumerDemo())

Semaphore 限流

import asyncio

async def rateLimitedRequests():
    semaphore = asyncio.Semaphore(5)

    async def fetch(url):
        async with semaphore:
            print(f"請求: {url}")
            await asyncio.sleep(1)
            return f"response_{url}"

    tasks = [fetch(f"url_{i}") for i in range(20)]
    results = await asyncio.gather(*tasks)
    print(f"完成 {len(results)} 個請求(最多5個並發)")

asyncio.run(rateLimitedRequests())

錯誤處理與取消

異常處理

import asyncio

async def errorHandling():
    async def riskyOp(name):
        await asyncio.sleep(0.5)
        if name == "bad":
            raise ValueError(f"{name} 出錯了")
        return f"{name}_ok"

    results = await asyncio.gather(
        riskyOp("good1"),
        riskyOp("bad"),
        riskyOp("good2"),
        return_exceptions=True,
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"異常: {r}")
        else:
            print(f"成功: {r}")

asyncio.run(errorHandling())

任務取消

import asyncio

async def cancellationDemo():
    async def longRunning():
        try:
            print("開始長時間執行...")
            await asyncio.sleep(10)
            print("這不會執行")
        except asyncio.CancelledError:
            print("任務被取消,執行清理")
            raise

    task = asyncio.create_task(longRunning())
    await asyncio.sleep(1)
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("確認任務已取消")

asyncio.run(cancellationDemo())

超時控制

import asyncio

async def timeoutDemo():
    async def slowTask():
        await asyncio.sleep(5)
        return "完成"

    try:
        result = await asyncio.wait_for(slowTask(), timeout=2)
    except asyncio.TimeoutError:
        print("任務超時!")

    try:
        async with asyncio.timeout(2):
            await slowTask()
    except TimeoutError:
        print("asyncio.timeout 超時!")

asyncio.run(timeoutDemo())

除錯非同步程式碼

asyncio.run 的除錯模式

import asyncio

async def debugDemo():
    async def forgottenAwait():
        coro = asyncio.sleep(1)
        print(f"未 await 的協程: {coro}")

    asyncio.run(forgottenAwait(), debug=True)

日誌設定

import asyncio
import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("asyncio")

async def loggingDemo():
    async def work():
        logger.info("開始工作")
        await asyncio.sleep(0.5)
        logger.info("工作完成")

    await work()

asyncio.run(loggingDemo(), debug=True)

aiomonitor 即時監控

pip install aiomonitor
import asyncio
import aiomonitor

async def main():
    async def backgroundTask():
        while True:
            await asyncio.sleep(1)
            print("背景任務執行中...")

    task = asyncio.create_task(backgroundTask())
    await asyncio.sleep(10)
    task.cancel()

with aiomonitor.start_monitor(loop=asyncio.new_event_loop()):
    asyncio.run(main())

混合同步與非同步程式碼

run_in_executor

import asyncio
import time

def syncCpuBound(n):
    """CPU 密集型同步函式"""
    return sum(i * i for i in range(n))

async def runInExecutorDemo():
    loop = asyncio.get_running_loop()

    result = await loop.run_in_executor(None, syncCpuBound, 10**7)
    print(f"預設執行緒集區結果: {result}")

    import concurrent.futures
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, syncCpuBound, 10**8)
        print(f"行程集區結果: {result}")

asyncio.run(runInExecutorDemo())

to_thread(Python 3.9+)

import asyncio

def blockingIo():
    import time
    time.sleep(2)
    return "阻塞I/O完成"

async def toThreadDemo():
    result = await asyncio.to_thread(blockingIo)
    print(result)

asyncio.run(toThreadDemo())

效能對比:同步 vs 非同步

場景 同步耗時 非同步耗時 提升倍數
100 個 HTTP 請求 ~100s ~2s 50x
50 次資料庫查詢 ~5s ~0.3s 16x
10 個檔案讀寫 ~1s ~0.2s 5x
CPU 密集計算 10s 10s+ 1x(無提升)
混合 I/O+CPU 15s 5s 3x
import asyncio
import time
import aiohttp

async def benchmark():
    urls = [f"https://httpbin.org/delay/0.5" for _ in range(50)]

    async def fetchOne(session, url):
        async with session.get(url) as r:
            return r.status

    start = time.perf_counter()
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(*[fetchOne(session, u) for u in urls])
    elapsed = time.perf_counter() - start
    print(f"50個請求非同步耗時: {elapsed:.2f}s")

asyncio.run(benchmark())

常見陷阱

陷阱一:阻塞事件迴圈

import asyncio
import time

async def badBlocking():
    time.sleep(5)

async def goodNonBlocking():
    await asyncio.sleep(5)

async def badSyncCall():
    import requests
    requests.get("https://example.com")

async def goodAsyncCall():
    import aiohttp
    async with aiohttp.ClientSession() as session:
        async with session.get("https://example.com") as r:
            return await r.text()

陷阱二:忘記 await

import asyncio

async def forgottenAwaitDemo():
    async def getValue():
        await asyncio.sleep(0.1)
        return 42

    result = getValue()
    print(type(result))  # coroutine,不是 42!

    correctResult = await getValue()
    print(correctResult)  # 42

陷阱三:在 async 函式中使用同步鎖

import asyncio
import threading

async def wrongLock():
    lock = threading.Lock()
    with lock:
        await asyncio.sleep(1)

async def rightLock():
    lock = asyncio.Lock()
    async with lock:
        await asyncio.sleep(1)

生產部署

uvicorn 執行 ASGI 應用

from fastapi import FastAPI
import asyncio
import aiohttp

app = FastAPI()

@app.get("/api/data")
async def getData():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.example.com/data") as response:
            data = await response.json()
            return {"result": data}

@app.get("/api/health")
async def healthCheck():
    return {"status": "ok"}
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4

生產設定

uvicorn main:app \
  --host 0.0.0.0 \
  --port 8000 \
  --workers 4 \
  --loop uvloop \
  --http httptools \
  --access-log \
  --log-level info

Docker 部署

{
  "from": "python:3.12-slim",
  "workdir": "/app",
  "copy": ["requirements.txt .", ". ."],
  "run": "pip install --no-cache-dir -r requirements.txt",
  "cmd": "uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4"
}

常見問題 FAQ

Q1:asyncio 適合 CPU 密集型任務嗎?

不適合。asyncio 是單執行緒的,CPU 密集型任務會阻塞事件迴圈。應使用 run_in_executor 配合行程集區,或使用 multiprocessing

Q2:asyncio 和 threading 怎麼選?

場景 推薦
大量 I/O 並發(>1000) asyncio
少量 I/O 並發(<100) threading
需要共享狀態 asyncio(天然避免競態)
已有同步函式庫無法替換 threading + run_in_executor

Q3:一個事件迴圈可以跑多個執行緒嗎?

每個執行緒應有自己的事件迴圈。跨執行緒排程使用 asyncio.run_coroutine_threadsafe()

Q4:asyncio.gather 和 TaskGroup 怎麼選?

  • gather:需要 return_exceptions=True 或自訂等待策略時
  • TaskGroup(3.11+):需要結構化並發、自動異常傳播時,推薦優先使用

Q5:如何優雅關閉非同步應用?

import asyncio
import signal

async def gracefulShutdown():
    server = await asyncio.start_server(handle, "0.0.0.0", 8000)

    loop = asyncio.get_running_loop()
    stop = loop.create_future()

    loop.add_signal_handler(signal.SIGTERM, stop.set_result, None)
    loop.add_signal_handler(signal.SIGINT, stop.set_result, None)

    async with server:
        await stop

    print("服務優雅關閉")

工具推薦

在 asyncio 非同步程式設計實踐中,以下 工具庫 工具可以幫到你:

  • JSON 格式化 — 格式化非同步 API 回傳的 JSON 資料,快速排查結構問題
  • Base64 編碼 — 處理非同步請求中的 Base64 圖片/檔案資料
  • Hash 計算 — 為非同步快取產生鍵名,實現請求去重

asyncio 是 Python 非同步程式設計的基石。掌握事件迴圈、協程、並發模式和生產部署,你就能建構高效能的 I/O 密集型應用。

本站提供瀏覽器本地工具,免註冊即可試用 →

#Python#asyncio#异步编程#协程#教程