Python asyncio in Practice: From Principles to Production


Sync vs Async: Execution Model Comparison

The first step to understanding async programming is grasping the fundamental difference between synchronous and asynchronous execution.

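Dimension     | Synchronous (Sync)                              | Asynchronous (Async)
--------------|-------------------------------------------------|--------------------------------------------------------------
Execution     | Sequential and blocking, one call after another | Concurrent and non-blocking, switches tasks during I/O waits
Thread usage  | Each blocking I/O call occupies a whole thread  | A single thread handles many I/O operations
Best for      | CPU-bound tasks                                 | I/O-bound tasks (network, disk)
Resource cost | High per-thread overhead (up to ~8 MB of stack reserved per thread on typical Linux defaults) | Minimal per-coroutine cost (roughly 2 KB/coroutine)
Complexity    | Simple and intuitive                            | Requires understanding the event loop

import time
import asyncio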

def fetchSync(name, delay):
    print(f"[Sync] Start fetching {name}")
    time.sleep(delay)
    print(f"[Sync] Done {name}")
    return f"{name}_data"

async def fetchAsync(name, delay):
    print(f"[Async] Start fetching {name}")
    await asyncio.sleep(delay)
    print(f"[Async] Done {name}")
    return f"{name}_data"

def runSync():
    start = time.perf_counter()
    fetchSync("API-A", 2)
    fetchSync("API-B", 2)
    fetchSync("API-C", 2)
    print(f"Sync total time: {time.perf_counter() - start:.2f}s")

async def runAsync():
    start = time.perf_counter()
    await asyncio.gather(
        fetchAsync("API-A", 2),
        fetchAsync("API-B", 2),
        fetchAsync("API-C", 2),
    )
    print(f"Async total time: {time.perf_counter() - start:.2f}s")

runSync()
asyncio.run(runAsync())

The sync version takes about 6s because the three calls run back to back; the async version takes about 2s because the three I/O waits overlap.


Event Loop Internals

The event loop is the heart of asyncio, responsible for scheduling all coroutines.

Core Mechanism

┌─────────────────────────────────┐
│         Event Loop              │
│  ┌───────┐  ┌───────┐          │
│  │ Task1 │  │ Task2 │  ...     │
│  └───┬───┘  └───┬───┘          │
│      │          │               │
│  ┌───▼──────────▼───────────┐  │
│  │   Ready Queue (runnable) │  │
│  └──────────────────────────┘  │
│  ┌──────────────────────────┐  │
│  │   I/O Poll (waiting)     │  │
│  └──────────────────────────┘  │
└─────────────────────────────────┘
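
To make the diagram concrete, here is a toy scheduler. It is not how asyncio is implemented; it only shows the ready-queue idea: each task is a generator, and the loop resumes one task at a time until its next yield. The names ToyLoop, worker and log are invented for this sketch, and the I/O poll stage is left out entirely.

```python
from collections import deque

class ToyLoop:
    """Toy round-robin scheduler (illustrative only, not asyncio's real code)."""

    def __init__(self):
        self.ready = deque()  # plays the role of the "Ready Queue" in the diagram

    def create_task(self, gen):
        self.ready.append(gen)

    def run(self):
        while self.ready:
            task = self.ready.popleft()
            try:
                next(task)           # run the task until its next yield
            except StopIteration:
                continue             # task finished, drop it
            self.ready.append(task)  # still alive, schedule it again

log = []

def worker(name, steps):
    for i in range(steps):
        log.append(f"{name}:{i}")
        yield  # hand control back to the loop, like an await that suspends

loop = ToyLoop()
loop.create_task(worker("Task1", 2))
loop.create_task(worker("Task2", 2))
loop.run()
print(log)  # ['Task1:0', 'Task2:0', 'Task1:1', 'Task2:1']
```

The two tasks alternate because each one goes to the back of the queue every time it yields. The real event loop adds timers and an I/O selector on top of this basic cycle.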

Manual Event Loop Control

import asyncio

async def task1():
    print("Task1 started")
    await asyncio.sleep(1)
    print("Task1 finished")
    return "result1"

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    result = loop.run_until_complete(task1())
    print(f"Result: {result}")
finally:
    loop.close()

Event Loop Policies

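import asyncio

# Note: the policy API is deprecated as of Python 3.14
print(asyncio.get_event_loop_policy())

async def main():
    # get_running_loop() only works inside a coroutine or callback
    loop = asyncio.get_running_loop()
    print(f"Running: {loop.is_running()}")
    print(f"Closed: {loop.is_closed()}")

asyncio.run(main())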

async/await Syntax Deep Dive

Coroutine Definition and Invocation

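import asyncio

async def coroutineFunc():
    """async def defines a coroutine function"""
    await asyncio.sleep(0.1)
    return 42

async def main():
    coro = coroutineFunc()  # calling it only creates a coroutine object
    print(type(coro))  # <class 'coroutine'>

    result = await coro  # the body runs only when awaited
    print(result)  # 42

asyncio.run(main())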

The Essence of await

await suspends the current coroutine, returning control to the event loop until the awaitable completes.

import asyncio

async def slowOperation():
    print("Starting slow operation...")
    result = await asyncio.sleep(2, result="Done")
    print(f"Slow operation result: {result}")
    return result

async def main():
    task = asyncio.create_task(slowOperation())
    print(f"Task done: {task.done()}")
    await task
    print(f"Task done: {task.done()}")

asyncio.run(main())
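
The hand-off described at the start of this section can be observed directly. The sketch below uses made-up names (worker, interleaveDemo, events) and asyncio.sleep(0) as the smallest possible suspension point: each time a worker awaits, the event loop gets a chance to run the other one.

```python
import asyncio

async def worker(name, events):
    for step in range(2):
        events.append(f"{name} step {step}")
        await asyncio.sleep(0)  # suspend; the loop runs the other ready task

async def interleaveDemo():
    events = []
    await asyncio.gather(worker("A", events), worker("B", events))
    return events

events = asyncio.run(interleaveDemo())
print(events)  # ['A step 0', 'B step 0', 'A step 1', 'B step 1']
```

Without the await inside the loop, worker A would record both of its steps before B ran at all, since a coroutine only gives up control at an await that actually suspends.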

Coroutine Lifecycle

import asyncio

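async def lifecycleDemo():
    # The body starts running only once the coroutine is awaited or scheduled
    print("1. Running: body started")
    await asyncio.sleep(0)  # yield once to the event loop
    print("2. Resumed after yielding with sleep(0)")
    await asyncio.sleep(0.5)  # suspended here; the event loop can schedule others
    print("3. Resumed after the 0.5s suspension")
    await asyncio.sleep(0)
    print("4. Resumed again (running)")
    return "5. Returned result (finished)"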

async def main():
    coro = lifecycleDemo()
    print(f"Type: {type(coro)}")
    result = await coro
    print(f"Final: {result}")

asyncio.run(main())

Task vs Future

Future: Low-Level Result Container

import asyncio

async def futureDemo():
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    async def setValue():
        await asyncio.sleep(1)
        future.set_result("The future is now")

    # Keep a reference so the task cannot be garbage-collected mid-flight
    setter = asyncio.create_task(setValue())
    result = await future
    await setter
    print(f"Future result: {result}")

asyncio.run(futureDemo())

Task: Schedulable Coroutine Wrapper

import asyncio

async def taskVsFuture():
    async def work(name, seconds):
        await asyncio.sleep(seconds)
        return f"{name} done"

    task = asyncio.create_task(work("TaskA", 1))
    print(f"Task type: {type(task)}")
    print(f"Is Future subclass: {issubclass(asyncio.Task, asyncio.Future)}")
    print(f"Task name: {task.get_name()}")
    print(f"Done: {task.done()}")

    result = await task
    print(f"Result: {result}, Done: {task.done()}")

asyncio.run(taskVsFuture())

TaskGroup (Python 3.11+)

import asyncio

async def taskGroupDemo():
    async def fetch(itemId):
        await asyncio.sleep(0.5)
        return f"item_{itemId}"

    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch(i)) for i in range(5)]

    results = [t.result() for t in tasks]
    print(f"All results: {results}")

asyncio.run(taskGroupDemo())

Common Async Concurrency Patterns

asyncio.gather: Concurrent Result Collection

import asyncio

async def gatherDemo():
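    async def fetch(url, delay):
        await asyncio.sleep(delay)
        if url.startswith("err"):
            # Simulated failure so return_exceptions=True has something to capture
            raise ConnectionError(f"Cannot reach {url}")
        return f"Response from {url}"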

    results = await asyncio.gather(
        fetch("api.example.com/users", 1),
        fetch("api.example.com/posts", 2),
        fetch("api.example.com/comments", 1.5),
    )
    print(f"All results: {results}")

    resultsWithErrors = await asyncio.gather(
        fetch("ok.com", 0.5),
        fetch("err.com", 0.5),
        return_exceptions=True,
    )
    print(f"With exceptions: {resultsWithErrors}")

asyncio.run(gatherDemo())

asyncio.wait: More Flexible Waiting

import asyncio

async def waitDemo():
    async def job(name, delay):
        await asyncio.sleep(delay)
        return name

    tasks = {
        asyncio.create_task(job("fast", 0.5)),
        asyncio.create_task(job("medium", 1.0)),
        asyncio.create_task(job("slow", 2.0)),
    }

    done, pending = await asyncio.wait(tasks, timeout=1.5)
    print(f"Completed: {[t.result() for t in done]}")
    print(f"Pending: {len(pending)} tasks")

    for t in pending:
        t.cancel()

asyncio.run(waitDemo())

asyncio.as_completed: Get Results in Completion Order

import asyncio

async def asCompletedDemo():
    async def job(name, delay):
        await asyncio.sleep(delay)
        return f"{name} (took {delay}s)"

    tasks = [
        asyncio.create_task(job("slow", 3)),
        asyncio.create_task(job("fast", 1)),
        asyncio.create_task(job("medium", 2)),
    ]

    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(f"Done: {result}")

asyncio.run(asCompletedDemo())

Async HTTP Requests

aiohttp

import asyncio
import aiohttp

async def aiohttpDemo():
    async with aiohttp.ClientSession() as session:
        async def fetchJson(url):
            async with session.get(url) as response:
                response.raise_for_status()
                return await response.json()

        results = await asyncio.gather(
            fetchJson("https://api.github.com/users/python"),
            fetchJson("https://api.github.com/repos/python/cpython"),
        )
        print(f"User: {results[0]['login']}")
        print(f"Repo: {results[1]['full_name']}")

asyncio.run(aiohttpDemo())

httpx (with HTTP/2 support)

import asyncio
import httpx

async def httpxDemo():
    # http2=True requires the optional extra: pip install "httpx[http2]"
    async with httpx.AsyncClient(http2=True) as client:
        response = await client.get("https://httpbin.org/get")
        print(f"Status: {response.status_code}")
        print(f"Protocol: {response.extensions.get('http_version')}")

        tasks = [
            client.get(f"https://httpbin.org/delay/{i}")
            for i in range(1, 4)
        ]
        responses = await asyncio.gather(*tasks)
        print(f"All {len(responses)} concurrent requests completed")

asyncio.run(httpxDemo())

Async Database Operations

asyncpg (PostgreSQL)

import asyncio
import asyncpg

async def asyncpgDemo():
    conn = await asyncpg.connect(
        "postgresql://user:pass@localhost/mydb"
    )

    await conn.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            name TEXT,
            email TEXT
        )
    """)

    await conn.execute(
        "INSERT INTO users (name, email) VALUES ($1, $2)",
        "Alice", "alice@example.com"
    )

    rows = await conn.fetch("SELECT * FROM users")
    for row in rows:
        print(dict(row))

    await conn.close()

asyncio.run(asyncpgDemo())

aiomysql (MySQL)

import asyncio
import aiomysql

async def aiomysqlDemo():
    conn = await aiomysql.connect(
        host="localhost", port=3306,
        user="root", password="pass", db="mydb"
    )

    async with conn.cursor() as cur:
        await cur.execute("SELECT * FROM products LIMIT 10")
        rows = await cur.fetchall()
        print(f"Found {len(rows)} records")

    conn.close()

asyncio.run(aiomysqlDemo())

motor (MongoDB)

import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

async def motorDemo():
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    db = client.mydb
    collection = db.users

    await collection.insert_one({"name": "Bob", "age": 30})

    cursor = collection.find({"age": {"$gte": 25}})
    async for doc in cursor:
        print(doc)

asyncio.run(motorDemo())

Async File I/O

aiofiles

import asyncio
import aiofiles
import json

async def aiofilesDemo():
    async with aiofiles.open("data.json", mode="w") as f:
        await f.write(json.dumps({"key": "value"}))

    async with aiofiles.open("data.json", mode="r") as f:
        content = await f.read()
        data = json.loads(content)
        print(f"Read: {data}")

    async with aiofiles.open("large.log", mode="r") as f:
        async for line in f:
            if "ERROR" in line:
                print(line.strip())

asyncio.run(aiofilesDemo())

Producer-Consumer Pattern

import asyncio
import random

async def producer(queue, producerId):
    for i in range(10):
        item = f"item-{producerId}-{i}"
        await queue.put(item)
        print(f"Producer{producerId} → {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))
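    # No sentinel here: producerConsumerDemo sends one None per consumer
    # after all producers finish. Extra sentinels would stop consumers early
    # and leave unprocessed items that make queue.join() hang.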

async def consumer(queue, consumerId):
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        print(f"Consumer{consumerId} ← {item}")
        await asyncio.sleep(random.uniform(0.2, 0.8))
        queue.task_done()

async def producerConsumerDemo():
    queue = asyncio.Queue(maxsize=5)

    producers = [asyncio.create_task(producer(queue, i)) for i in range(2)]
    consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]

    await asyncio.gather(*producers)
    for _ in consumers:
        await queue.put(None)
    await asyncio.gather(*consumers)
    await queue.join()

asyncio.run(producerConsumerDemo())

Rate Limiting with Semaphore

import asyncio

async def rateLimitedRequests():
    semaphore = asyncio.Semaphore(5)

    async def fetch(url):
        async with semaphore:
            print(f"Requesting: {url}")
            await asyncio.sleep(1)
            return f"response_{url}"

    tasks = [fetch(f"url_{i}") for i in range(20)]
    results = await asyncio.gather(*tasks)
    print(f"Completed {len(results)} requests (max 5 concurrent)")

asyncio.run(rateLimitedRequests())
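
Note that a semaphore caps how many coroutines are inside the guarded block at once; it does not limit requests per second. One way to check that the cap holds is to count the coroutines currently inside the block. This is a minimal sketch with hypothetical names (measurePeakConcurrency, active, peak) and a short sleep standing in for a real request.

```python
import asyncio

async def measurePeakConcurrency(limit, jobs):
    semaphore = asyncio.Semaphore(limit)
    active = 0  # coroutines currently inside the semaphore
    peak = 0    # highest value of active seen so far

    async def fetch(i):
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stand-in for a real request
            active -= 1
            return i

    results = await asyncio.gather(*(fetch(i) for i in range(jobs)))
    return peak, results

peak, results = asyncio.run(measurePeakConcurrency(limit=5, jobs=20))
print(f"peak concurrency: {peak}, completed: {len(results)}")
```

The plain counters are safe here because there is no await between reading and updating them, so no other task can run in between.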

Error Handling and Cancellation

Exception Handling

import asyncio

async def errorHandling():
    async def riskyOp(name):
        await asyncio.sleep(0.5)
        if name == "bad":
            raise ValueError(f"{name} failed")
        return f"{name}_ok"

    results = await asyncio.gather(
        riskyOp("good1"),
        riskyOp("bad"),
        riskyOp("good2"),
        return_exceptions=True,
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"Error: {r}")
        else:
            print(f"Success: {r}")

asyncio.run(errorHandling())

Task Cancellation

import asyncio

async def cancellationDemo():
    async def longRunning():
        try:
            print("Starting long operation...")
            await asyncio.sleep(10)
            print("This won't execute")
        except asyncio.CancelledError:
            print("Task cancelled, performing cleanup")
            raise

    task = asyncio.create_task(longRunning())
    await asyncio.sleep(1)
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("Confirmed task cancelled")

asyncio.run(cancellationDemo())

Timeout Control

import asyncio

async def timeoutDemo():
    async def slowTask():
        await asyncio.sleep(5)
        return "Done"

    try:
        # wait_for cancels slowTask() when the timeout expires
        result = await asyncio.wait_for(slowTask(), timeout=2)
    except asyncio.TimeoutError:
        print("Task timed out!")

    try:
        # asyncio.timeout() requires Python 3.11+
        async with asyncio.timeout(2):
            await slowTask()
    except TimeoutError:
        print("asyncio.timeout timed out!")

asyncio.run(timeoutDemo())

Debugging Async Code

Debug Mode with asyncio.run

import asyncio

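async def forgottenAwait():
    coro = asyncio.sleep(1)  # bug on purpose: created but never awaited
    print(f"Unawaited coroutine: {coro}")

# asyncio.run() must be called from synchronous code, not from inside a coroutine.
# Debug mode adds the creation traceback to the "never awaited" RuntimeWarning.
asyncio.run(forgottenAwait(), debug=True)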

Logging Configuration

import asyncio
import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("asyncio")

async def loggingDemo():
    async def work():
        logger.info("Starting work")
        await asyncio.sleep(0.5)
        logger.info("Work completed")

    await work()

asyncio.run(loggingDemo(), debug=True)

aiomonitor Real-time Monitoring

pip install aiomonitor

import asyncio
import aiomonitor

async def main():
    async def backgroundTask():
        while True:
            await asyncio.sleep(1)
            print("Background task running...")

    # Attach the monitor to the loop that asyncio.run() actually created
    loop = asyncio.get_running_loop()
    with aiomonitor.start_monitor(loop=loop):
        task = asyncio.create_task(backgroundTask())
        await asyncio.sleep(10)
        task.cancel()

asyncio.run(main())

Mixing Sync and Async Code

run_in_executor

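import asyncio
import concurrent.futures

def syncCpuBound(n):
    """CPU-bound synchronous function"""
    return sum(i * i for i in range(n))

async def runInExecutorDemo():
    loop = asyncio.get_running_loop()

    # None selects the default thread pool: the loop stays responsive, but on
    # standard (GIL) builds CPU-bound code still does not run in parallel
    result = await loop.run_in_executor(None, syncCpuBound, 10**7)
    print(f"Default thread pool result: {result}")

    # A process pool gives real parallelism for CPU-bound work
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, syncCpuBound, 10**8)
        print(f"Process pool result: {result}")

# The guard is required where worker processes are spawned (Windows, macOS)
if __name__ == "__main__":
    asyncio.run(runInExecutorDemo())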

to_thread (Python 3.9+)

import asyncio

def blockingIo():
    import time
    time.sleep(2)
    return "Blocking I/O done"

async def toThreadDemo():
    result = await asyncio.to_thread(blockingIo)
    print(result)

asyncio.run(toThreadDemo())

Performance Comparison: Sync vs Async

Scenario              | Sync Time | Async Time | Speedup
----------------------|-----------|------------|-------------
100 HTTP requests     | ~100s     | ~2s        | 50x
50 DB queries         | ~5s       | ~0.3s      | 16x
10 file reads/writes  | ~1s       | ~0.2s      | 5x
CPU-bound computation | 10s       | 10s+       | 1x (no gain)
Mixed I/O + CPU       | 15s       | 5s         | 3x

import asyncio
import time
import aiohttp

async def benchmark():
    urls = ["https://httpbin.org/delay/0.5"] * 50

    async def fetchOne(session, url):
        async with session.get(url) as r:
            return r.status

    start = time.perf_counter()
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(*[fetchOne(session, u) for u in urls])
    elapsed = time.perf_counter() - start
    print(f"50 requests async time: {elapsed:.2f}s")

asyncio.run(benchmark())

Common Pitfalls

Pitfall 1: Blocking the Event Loop

import asyncio
import time

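async def badBlocking():
    time.sleep(5)  # BAD: blocks the whole event loop for 5 seconds

async def goodNonBlocking():
    await asyncio.sleep(5)  # GOOD: suspends only this coroutine

async def badSyncCall():
    import requests
    requests.get("https://example.com")  # BAD: synchronous HTTP call blocks the loop

async def goodAsyncCall():
    import aiohttp
    async with aiohttp.ClientSession() as session:
        async with session.get("https://example.com") as r:
            return await r.text()  # GOOD: the loop keeps running while waiting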
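
The effect of a blocking call can be measured with a heartbeat task. The sketch below is an illustration under assumptions, not a benchmark: maxHeartbeatGap, blocking and offloaded are made-up names, and the 0.3s sleep stands in for any blocking call. It records the longest pause a 10 ms heartbeat sees while another coroutine runs.

```python
import asyncio
import time

async def maxHeartbeatGap(blocker):
    """Return the longest pause seen by a 10 ms heartbeat while blocker runs."""
    gaps = []

    async def heartbeat():
        last = time.perf_counter()
        while True:
            await asyncio.sleep(0.01)
            now = time.perf_counter()
            gaps.append(now - last)
            last = now

    beat = asyncio.create_task(heartbeat())
    await asyncio.sleep(0.05)  # let the heartbeat tick a few times
    await blocker()
    await asyncio.sleep(0.05)
    beat.cancel()
    await asyncio.gather(beat, return_exceptions=True)  # wait for the cancellation
    return max(gaps)

async def blocking():
    time.sleep(0.3)  # freezes every task on this loop

async def offloaded():
    await asyncio.to_thread(time.sleep, 0.3)  # runs in a worker thread (3.9+)

async def main():
    blockedGap = await maxHeartbeatGap(blocking)
    offloadedGap = await maxHeartbeatGap(offloaded)
    print(f"time.sleep in coroutine: max gap {blockedGap:.2f}s")
    print(f"asyncio.to_thread:       max gap {offloadedGap:.2f}s")
    return blockedGap, offloadedGap

blockedGap, offloadedGap = asyncio.run(main())
```

With the blocking version the heartbeat stalls for at least the full 0.3s; with asyncio.to_thread it should keep ticking at roughly its normal interval, although exact numbers depend on the machine.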

Pitfall 2: Forgotten await

import asyncio

async def forgottenAwaitDemo():
    async def getValue():
        await asyncio.sleep(0.1)
        return 42

    result = getValue()
    print(type(result))  # coroutine, not 42!

    correctResult = await getValue()
    print(correctResult)  # 42

Pitfall 3: Using Sync Locks in Async Functions

import asyncio
import threading

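async def wrongLock():
    lock = threading.Lock()
    with lock:  # BAD: held across an await; a second acquire() would block the whole loop
        await asyncio.sleep(1)

async def rightLock():
    lock = asyncio.Lock()
    async with lock:  # GOOD: waiters are suspended, the loop keeps running
        await asyncio.sleep(1)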
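
An asyncio.Lock matters whenever a read-modify-write sequence spans an await. The following sketch, with hypothetical names runCounter, increment and safeIncrement, runs 100 increments that each yield between reading and writing a shared counter, first without a lock and then with one.

```python
import asyncio

async def runCounter(useLock):
    counter = 0
    lock = asyncio.Lock()

    async def increment():
        nonlocal counter
        current = counter
        await asyncio.sleep(0)  # another task may run here
        counter = current + 1   # writes back a possibly stale value

    async def safeIncrement():
        async with lock:        # only one task at a time in the critical section
            await increment()

    job = safeIncrement if useLock else increment
    await asyncio.gather(*(job() for _ in range(100)))
    return counter

unsafe = asyncio.run(runCounter(useLock=False))
safe = asyncio.run(runCounter(useLock=True))
print(unsafe)  # 1
print(safe)    # 100
```

Without the lock every task reads 0 before any of them writes, so 99 updates are lost. Single-threaded code can still race when shared state is touched on both sides of an await.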

Production Deployment

Running ASGI Apps with uvicorn

from fastapi import FastAPI
import asyncio
import aiohttp

app = FastAPI()

@app.get("/api/data")
async def getData():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.example.com/data") as response:
            data = await response.json()
            return {"result": data}

@app.get("/api/health")
async def healthCheck():
    return {"status": "ok"}

Start the server with four worker processes:

uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4

Production Configuration

uvicorn main:app \
  --host 0.0.0.0 \
  --port 8000 \
  --workers 4 \
  --loop uvloop \
  --http httptools \
  --access-log \
  --log-level info

Docker Deployment

FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

FAQ

Q1: Is asyncio suitable for CPU-bound tasks?

No. asyncio is single-threaded; CPU-bound tasks will block the event loop. Use run_in_executor with a process pool, or multiprocessing.

Q2: asyncio or threading?

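Scenario                     | Recommendation
-----------------------------|---------------------------------------------
High I/O concurrency (>1000) | asyncio
Low I/O concurrency (<100)   | threading
Shared state needed          | asyncio (tasks switch only at await points, which avoids many races)
Existing sync libraries      | threading + run_in_executor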

Q3: Can one event loop run across multiple threads?

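No. An event loop runs in a single thread and is not thread-safe, so each thread that needs a loop should have its own. To submit a coroutine to a loop from another thread, use asyncio.run_coroutine_threadsafe().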
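
A minimal sketch of cross-thread scheduling follows. The helper startBackgroundLoop and the coroutine compute are made up for the example; the asyncio calls themselves are standard library. A loop runs in a background thread, and the main thread submits a coroutine to it and waits for the result.

```python
import asyncio
import threading

async def compute(x):
    await asyncio.sleep(0.1)
    return x * 2

def startBackgroundLoop():
    """Start an event loop in a daemon thread and return (loop, thread)."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread

loop, thread = startBackgroundLoop()

# Called from the main thread: returns a concurrent.futures.Future
future = asyncio.run_coroutine_threadsafe(compute(21), loop)
result = future.result(timeout=5)  # blocks the calling thread, not the loop
print(result)  # 42

loop.call_soon_threadsafe(loop.stop)  # stop the loop from outside its thread
thread.join()
loop.close()
```

The loop is stopped with call_soon_threadsafe rather than loop.stop() directly, because loop methods other than the *_threadsafe ones should only be called from the loop's own thread.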

Q4: asyncio.gather or TaskGroup?

  • gather: when you need return_exceptions=True, or all results from a single call in argument order
  • TaskGroup (3.11+): when you want structured concurrency, where one failure cancels the sibling tasks and errors propagate as an ExceptionGroup; recommended as the default

Q5: How to gracefully shut down an async application?

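import asyncio
import signal

async def handle(reader, writer):
    # Minimal echo handler so the example is self-contained
    data = await reader.read(1024)
    writer.write(data)
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def gracefulShutdown():
    server = await asyncio.start_server(handle, "0.0.0.0", 8000)

    loop = asyncio.get_running_loop()
    stop = asyncio.Event()  # set() is safe to call more than once

    # add_signal_handler is only available on Unix event loops
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, stop.set)

    async with server:
        await stop.wait()

    print("Server shut down gracefully")

asyncio.run(gracefulShutdown())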


asyncio is the cornerstone of Python async programming. Master the event loop, coroutines, concurrency patterns, and production deployment to build high-performance I/O-bound applications.
