Python asyncio in Practice: From Principles to Production
Sync vs Async: Execution Model Comparison
The first step to understanding async programming is grasping the fundamental difference between synchronous and asynchronous execution.
| Dimension | Synchronous (Sync) | Asynchronous (Async) |
|---|---|---|
| Execution | Sequential blocking, one after another | Concurrent non-blocking, switches during I/O wait |
| Thread usage | Each I/O occupies an entire thread | Single thread handles multiple I/O |
| Best for | Simple scripts and CPU-bound work | I/O-bound tasks (network, disk) |
| Resource cost | Higher: each thread reserves its own stack (often up to ~8 MB of virtual memory on Linux) | Lower: a coroutine is a small heap object (typically a few KB) |
| Complexity | Simple and intuitive | Requires understanding the event loop |
import time
import asyncio

def fetchSync(name, delay):
    print(f"[Sync] Start fetching {name}")
    time.sleep(delay)  # blocks the whole thread
    print(f"[Sync] Done {name}")
    return f"{name}_data"

async def fetchAsync(name, delay):
    print(f"[Async] Start fetching {name}")
    await asyncio.sleep(delay)  # yields to the event loop while waiting
    print(f"[Async] Done {name}")
    return f"{name}_data"

def runSync():
    start = time.perf_counter()
    fetchSync("API-A", 2)
    fetchSync("API-B", 2)
    fetchSync("API-C", 2)
    print(f"Sync total time: {time.perf_counter() - start:.2f}s")

async def runAsync():
    start = time.perf_counter()
    await asyncio.gather(
        fetchAsync("API-A", 2),
        fetchAsync("API-B", 2),
        fetchAsync("API-C", 2),
    )
    print(f"Async total time: {time.perf_counter() - start:.2f}s")

runSync()
asyncio.run(runAsync())
Sync takes ~6s while async takes ~2s, because the three I/O waits overlap instead of running back to back.
Event Loop Internals
The event loop is the heart of asyncio, responsible for scheduling all coroutines.
Core Mechanism
┌─────────────────────────────────┐
│ Event Loop │
│ ┌───────┐ ┌───────┐ │
│ │ Task1 │ │ Task2 │ ... │
│ └───┬───┘ └───┬───┘ │
│ │ │ │
│ ┌───▼──────────▼───────────┐ │
│ │ Ready Queue (runnable) │ │
│ └──────────────────────────┘ │
│ ┌──────────────────────────┐ │
│ │ I/O Poll (waiting) │ │
│ └──────────────────────────┘ │
└─────────────────────────────────┘
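The ready queue in the diagram can be observed directly. The sketch below is illustrative only (the names `demo_ready_queue` and `worker` are made up for this example): it records the order in which the loop runs a plain callback and the steps of two tasks. It assumes the default, non-eager task factory.

```python
import asyncio

def demo_ready_queue() -> list[str]:
    """Return the order in which the loop runs a callback and two tasks."""
    order: list[str] = []

    async def worker(name: str) -> None:
        for step in range(2):
            order.append(f"{name}:{step}")
            # sleep(0) yields once: the task goes to the back of the ready queue
            await asyncio.sleep(0)

    async def main() -> None:
        loop = asyncio.get_running_loop()
        # Scheduled first, so it runs before either task gets its first step
        loop.call_soon(order.append, "callback")
        await asyncio.gather(worker("A"), worker("B"))

    asyncio.run(main())
    return order

print(demo_ready_queue())
```

The two workers alternate because each `await asyncio.sleep(0)` hands control back to the loop, which then picks the next runnable entry in FIFO order.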
Manual Event Loop Control
import asyncio

async def task1():
    print("Task1 started")
    await asyncio.sleep(1)
    print("Task1 finished")
    return "result1"

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    result = loop.run_until_complete(task1())
    print(f"Result: {result}")
finally:
    loop.close()
Event Loop Policies
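import asyncio

async def main():
    # get_running_loop() raises RuntimeError outside a running loop
    loop = asyncio.get_running_loop()
    print(f"Running: {loop.is_running()}")
    print(f"Closed: {loop.is_closed()}")

print(asyncio.get_event_loop_policy())  # policy API is deprecated since Python 3.14
asyncio.run(main())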
async/await Syntax Deep Dive
Coroutine Definition and Invocation
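import asyncio

async def coroutineFunc():
    """async def defines a coroutine function"""
    await asyncio.sleep(0.1)
    return 42

async def main():
    coro = coroutineFunc()  # calling it only creates a coroutine object
    print(type(coro))  # <class 'coroutine'>
    result = await coro  # awaiting runs it to completion
    print(result)  # 42

asyncio.run(main())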
The Essence of await
await suspends the current coroutine, returning control to the event loop until the awaitable completes.
import asyncio

async def slowOperation():
    print("Starting slow operation...")
    result = await asyncio.sleep(2, result="Done")
    print(f"Slow operation result: {result}")
    return result

async def main():
    task = asyncio.create_task(slowOperation())
    print(f"Task done: {task.done()}")
    await task
    print(f"Task done: {task.done()}")

asyncio.run(main())
Coroutine Lifecycle
import asyncio

async def lifecycleDemo():
    print("1. Coroutine created")
    await asyncio.sleep(0)
    print("2. First scheduled (running)")
    await asyncio.sleep(0.5)
    print("3. Suspended by await → event loop schedules others")
    await asyncio.sleep(0)
    print("4. Resumed (running)")
    return "5. Returned result (finished)"

async def main():
    coro = lifecycleDemo()
    print(f"Type: {type(coro)}")
    result = await coro
    print(f"Final: {result}")

asyncio.run(main())
Task vs Future
Future: Low-Level Result Container
import asyncio

async def futureDemo():
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    async def setValue():
        await asyncio.sleep(1)
        future.set_result("The future is now")

    # Keep a reference so the task cannot be garbage-collected mid-flight
    setter = asyncio.create_task(setValue())
    result = await future
    print(f"Future result: {result}")
    await setter

asyncio.run(futureDemo())
Task: Schedulable Coroutine Wrapper
import asyncio

async def taskVsFuture():
    async def work(name, seconds):
        await asyncio.sleep(seconds)
        return f"{name} done"

    task = asyncio.create_task(work("TaskA", 1))
    print(f"Task type: {type(task)}")
    print(f"Is Future subclass: {issubclass(asyncio.Task, asyncio.Future)}")
    print(f"Task name: {task.get_name()}")
    print(f"Done: {task.done()}")
    result = await task
    print(f"Result: {result}, Done: {task.done()}")

asyncio.run(taskVsFuture())
TaskGroup (Python 3.11+)
import asyncio

async def taskGroupDemo():
    async def fetch(itemId):
        await asyncio.sleep(0.5)
        return f"item_{itemId}"

    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch(i)) for i in range(5)]
    # Every task has finished once the async with block exits
    results = [t.result() for t in tasks]
    print(f"All results: {results}")

asyncio.run(taskGroupDemo())
Common Async Concurrency Patterns
asyncio.gather: Concurrent Result Collection
import asyncio

async def gatherDemo():
    async def fetch(url, delay):
        await asyncio.sleep(delay)
        if url.startswith("err"):
            # Simulated failure so return_exceptions has something to capture
            raise ConnectionError(f"Cannot reach {url}")
        return f"Response from {url}"

    results = await asyncio.gather(
        fetch("api.example.com/users", 1),
        fetch("api.example.com/posts", 2),
        fetch("api.example.com/comments", 1.5),
    )
    print(f"All results: {results}")

    # Exceptions are returned in the result list instead of being raised
    resultsWithErrors = await asyncio.gather(
        fetch("ok.com", 0.5),
        fetch("err.com", 0.5),
        return_exceptions=True,
    )
    print(f"With exceptions: {resultsWithErrors}")

asyncio.run(gatherDemo())
asyncio.wait: More Flexible Waiting
import asyncio

async def waitDemo():
    async def job(name, delay):
        await asyncio.sleep(delay)
        return name

    tasks = {
        asyncio.create_task(job("fast", 0.5)),
        asyncio.create_task(job("medium", 1.0)),
        asyncio.create_task(job("slow", 2.0)),
    }
    done, pending = await asyncio.wait(tasks, timeout=1.5)
    print(f"Completed: {[t.result() for t in done]}")
    print(f"Pending: {len(pending)} tasks")
    # asyncio.wait does not cancel tasks on timeout, so do it explicitly
    for t in pending:
        t.cancel()

asyncio.run(waitDemo())
asyncio.as_completed: Get Results in Completion Order
import asyncio

async def asCompletedDemo():
    async def job(name, delay):
        await asyncio.sleep(delay)
        return f"{name} (took {delay}s)"

    tasks = [
        asyncio.create_task(job("slow", 3)),
        asyncio.create_task(job("fast", 1)),
        asyncio.create_task(job("medium", 2)),
    ]
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(f"Done: {result}")

asyncio.run(asCompletedDemo())
Async HTTP Requests
aiohttp
import asyncio
import aiohttp

async def aiohttpDemo():
    async with aiohttp.ClientSession() as session:
        async def fetchJson(url):
            async with session.get(url) as response:
                response.raise_for_status()
                return await response.json()

        results = await asyncio.gather(
            fetchJson("https://api.github.com/users/python"),
            fetchJson("https://api.github.com/repos/python/cpython"),
        )
        print(f"User: {results[0]['login']}")
        print(f"Repo: {results[1]['full_name']}")

asyncio.run(aiohttpDemo())
httpx (with HTTP/2 support)
import asyncio
import httpx

async def httpxDemo():
    # http2=True needs the optional dependency: pip install "httpx[http2]"
    async with httpx.AsyncClient(http2=True) as client:
        response = await client.get("https://httpbin.org/get")
        print(f"Status: {response.status_code}")
        print(f"Protocol: {response.extensions.get('http_version')}")

        tasks = [
            client.get(f"https://httpbin.org/delay/{i}")
            for i in range(1, 4)
        ]
        responses = await asyncio.gather(*tasks)
        print(f"All {len(responses)} concurrent requests completed")

asyncio.run(httpxDemo())
Async Database Operations
asyncpg (PostgreSQL)
import asyncio
import asyncpg

async def asyncpgDemo():
    conn = await asyncpg.connect(
        "postgresql://user:pass@localhost/mydb"
    )
    try:
        await conn.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id SERIAL PRIMARY KEY,
                name TEXT,
                email TEXT
            )
        """)
        await conn.execute(
            "INSERT INTO users (name, email) VALUES ($1, $2)",
            "Alice", "alice@example.com"
        )
        rows = await conn.fetch("SELECT * FROM users")
        for row in rows:
            print(dict(row))
    finally:
        # Close the connection even if a query fails
        await conn.close()

asyncio.run(asyncpgDemo())
aiomysql (MySQL)
import asyncio
import aiomysql

async def aiomysqlDemo():
    conn = await aiomysql.connect(
        host="localhost", port=3306,
        user="root", password="pass", db="mydb"
    )
    try:
        async with conn.cursor() as cur:
            await cur.execute("SELECT * FROM products LIMIT 10")
            rows = await cur.fetchall()
            print(f"Found {len(rows)} records")
    finally:
        # close() is synchronous in aiomysql
        conn.close()

asyncio.run(aiomysqlDemo())
motor (MongoDB)
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

async def motorDemo():
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    try:
        db = client.mydb
        collection = db.users
        await collection.insert_one({"name": "Bob", "age": 30})
        cursor = collection.find({"age": {"$gte": 25}})
        async for doc in cursor:
            print(doc)
    finally:
        client.close()  # release the connection pool

asyncio.run(motorDemo())
Async File I/O
aiofiles
import asyncio
import aiofiles
import json

async def aiofilesDemo():
    async with aiofiles.open("data.json", mode="w") as f:
        await f.write(json.dumps({"key": "value"}))

    async with aiofiles.open("data.json", mode="r") as f:
        content = await f.read()
        data = json.loads(content)
        print(f"Read: {data}")

    # Stream line by line; assumes large.log already exists
    async with aiofiles.open("large.log", mode="r") as f:
        async for line in f:
            if "ERROR" in line:
                print(line.strip())

asyncio.run(aiofilesDemo())
Producer-Consumer Pattern
import asyncio
import random

async def producer(queue, producerId):
    for i in range(10):
        item = f"item-{producerId}-{i}"
        await queue.put(item)
        print(f"Producer{producerId} → {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))
    # No sentinel here: the coordinator sends exactly one per consumer,
    # otherwise leftover sentinels would make queue.join() hang

async def consumer(queue, consumerId):
    while True:
        item = await queue.get()
        if item is None:  # sentinel: stop this consumer
            queue.task_done()
            break
        print(f"Consumer{consumerId} ← {item}")
        await asyncio.sleep(random.uniform(0.2, 0.8))
        queue.task_done()

async def producerConsumerDemo():
    queue = asyncio.Queue(maxsize=5)
    producers = [asyncio.create_task(producer(queue, i)) for i in range(2)]
    consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]
    await asyncio.gather(*producers)
    await queue.join()  # wait until every produced item has been processed
    for _ in consumers:
        await queue.put(None)
    await asyncio.gather(*consumers)

asyncio.run(producerConsumerDemo())
Rate Limiting with Semaphore
import asyncio

async def rateLimitedRequests():
    semaphore = asyncio.Semaphore(5)

    async def fetch(url):
        # At most 5 coroutines can be inside this block at once
        async with semaphore:
            print(f"Requesting: {url}")
            await asyncio.sleep(1)
            return f"response_{url}"

    tasks = [fetch(f"url_{i}") for i in range(20)]
    results = await asyncio.gather(*tasks)
    print(f"Completed {len(results)} requests (max 5 concurrent)")

asyncio.run(rateLimitedRequests())
Error Handling and Cancellation
Exception Handling
import asyncio

async def errorHandling():
    async def riskyOp(name):
        await asyncio.sleep(0.5)
        if name == "bad":
            raise ValueError(f"{name} failed")
        return f"{name}_ok"

    results = await asyncio.gather(
        riskyOp("good1"),
        riskyOp("bad"),
        riskyOp("good2"),
        return_exceptions=True,
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"Error: {r}")
        else:
            print(f"Success: {r}")

asyncio.run(errorHandling())
Task Cancellation
import asyncio

async def cancellationDemo():
    async def longRunning():
        try:
            print("Starting long operation...")
            await asyncio.sleep(10)
            print("This won't execute")
        except asyncio.CancelledError:
            print("Task cancelled, performing cleanup")
            raise  # re-raise so the task is actually marked as cancelled

    task = asyncio.create_task(longRunning())
    await asyncio.sleep(1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Confirmed task cancelled")

asyncio.run(cancellationDemo())
Timeout Control
import asyncio

async def timeoutDemo():
    async def slowTask():
        await asyncio.sleep(5)
        return "Done"

    try:
        result = await asyncio.wait_for(slowTask(), timeout=2)
    except asyncio.TimeoutError:
        print("Task timed out!")

    # asyncio.timeout() requires Python 3.11+
    try:
        async with asyncio.timeout(2):
            await slowTask()
    except TimeoutError:
        print("asyncio.timeout timed out!")

asyncio.run(timeoutDemo())
Debugging Async Code
Debug Mode with asyncio.run
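import asyncio

async def forgottenAwait():
    coro = asyncio.sleep(1)  # bug: coroutine created but never awaited
    print(f"Unawaited coroutine: {coro}")

# Python reports "coroutine 'sleep' was never awaited" as a RuntimeWarning;
# debug=True adds the traceback of where the coroutine was created
asyncio.run(forgottenAwait(), debug=True)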
Logging Configuration
import asyncio
import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("asyncio")

async def loggingDemo():
    async def work():
        logger.info("Starting work")
        await asyncio.sleep(0.5)
        logger.info("Work completed")

    await work()

asyncio.run(loggingDemo(), debug=True)
aiomonitor Real-time Monitoring
pip install aiomonitor
import asyncio
import aiomonitor

async def backgroundTask():
    while True:
        await asyncio.sleep(1)
        print("Background task running...")

async def main():
    # Attach the monitor to the loop that is actually running this coroutine;
    # a loop from new_event_loop() would never be the one asyncio.run() uses
    loop = asyncio.get_running_loop()
    with aiomonitor.start_monitor(loop):
        task = asyncio.create_task(backgroundTask())
        await asyncio.sleep(10)
        task.cancel()

asyncio.run(main())
Mixing Sync and Async Code
run_in_executor
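import asyncio
import concurrent.futures

def syncCpuBound(n):
    """CPU-bound synchronous function"""
    return sum(i * i for i in range(n))

async def runInExecutorDemo():
    loop = asyncio.get_running_loop()
    # None selects the default thread pool: the loop stays responsive,
    # but on standard CPython the GIL prevents a real speedup for CPU-bound work
    result = await loop.run_in_executor(None, syncCpuBound, 10**7)
    print(f"Default thread pool result: {result}")

    # A process pool sidesteps the GIL for CPU-bound work
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, syncCpuBound, 10**8)
        print(f"Process pool result: {result}")

if __name__ == "__main__":  # needed for process pools on spawn-based platforms
    asyncio.run(runInExecutorDemo())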
to_thread (Python 3.9+)
import asyncio
import time

def blockingIo():
    time.sleep(2)
    return "Blocking I/O done"

async def toThreadDemo():
    # Runs blockingIo in a worker thread so the event loop stays free
    result = await asyncio.to_thread(blockingIo)
    print(result)

asyncio.run(toThreadDemo())
Performance Comparison: Sync vs Async
| Scenario | Sync Time | Async Time | Speedup |
|---|---|---|---|
| 100 HTTP requests | ~100s | ~2s | 50x |
| 50 DB queries | ~5s | ~0.3s | 16x |
| 10 file reads/writes | ~1s | ~0.2s | 5x |
| CPU-bound computation | 10s | 10s+ | 1x (no gain) |
| Mixed I/O + CPU | 15s | 5s | 3x |
import asyncio
import time
import aiohttp

async def benchmark():
    urls = ["https://httpbin.org/delay/0.5" for _ in range(50)]

    async def fetchOne(session, url):
        async with session.get(url) as r:
            return r.status

    start = time.perf_counter()
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(*[fetchOne(session, u) for u in urls])
    elapsed = time.perf_counter() - start
    print(f"50 requests async time: {elapsed:.2f}s")

asyncio.run(benchmark())
Common Pitfalls
Pitfall 1: Blocking the Event Loop
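import asyncio
import time

# Wrong: time.sleep() freezes the whole event loop for 5 seconds
async def badBlocking():
    time.sleep(5)

# Right: asyncio.sleep() suspends only this coroutine
async def goodNonBlocking():
    await asyncio.sleep(5)

# Wrong: requests is synchronous and blocks the loop during the request
async def badSyncCall():
    import requests
    requests.get("https://example.com")

# Right: use an async HTTP client
async def goodAsyncCall():
    import aiohttp
    async with aiohttp.ClientSession() as session:
        async with session.get("https://example.com") as r:
            return await r.text()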
Pitfall 2: Forgotten await
import asyncio

async def forgottenAwaitDemo():
    async def getValue():
        await asyncio.sleep(0.1)
        return 42

    result = getValue()
    print(type(result))  # coroutine, not 42!
    result.close()  # discard it to avoid a "never awaited" warning

    correctResult = await getValue()
    print(correctResult)  # 42

asyncio.run(forgottenAwaitDemo())
Pitfall 3: Using Sync Locks in Async Functions
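import asyncio
import threading

# Wrong: a threading.Lock held across an await blocks the whole loop
# if another coroutine on the same thread tries to acquire it
async def wrongLock():
    lock = threading.Lock()
    with lock:
        await asyncio.sleep(1)

# Right: asyncio.Lock suspends the waiting coroutine instead of the thread
async def rightLock():
    lock = asyncio.Lock()
    async with lock:
        await asyncio.sleep(1)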
Production Deployment
Running ASGI Apps with uvicorn
from fastapi import FastAPI
import aiohttp

app = FastAPI()

@app.get("/api/data")
async def getData():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.example.com/data") as response:
            data = await response.json()
            return {"result": data}

@app.get("/api/health")
async def healthCheck():
    return {"status": "ok"}
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
Production Configuration
uvicorn main:app \
    --host 0.0.0.0 \
    --port 8000 \
    --workers 4 \
    --loop uvloop \
    --http httptools \
    --access-log \
    --log-level info
Docker Deployment
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
FAQ
Q1: Is asyncio suitable for CPU-bound tasks?
No. asyncio is single-threaded; CPU-bound tasks will block the event loop. Use run_in_executor with a process pool, or multiprocessing.
Q2: asyncio or threading?
| Scenario | Recommendation |
|---|---|
| High I/O concurrency (>1000) | asyncio |
| Low I/O concurrency (<100) | threading |
| Shared state needed | asyncio (no preemption between await points, though state can still change across an await) |
| Existing sync libraries | threading + run_in_executor |
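On the shared-state row: a coroutine is never interrupted between two await points, but a read-modify-write that spans an await can still lose updates. The following is a minimal sketch with hypothetical helper names (`unsafe_increment`, `safe_increment`, `compare_counters`) showing the problem and the `asyncio.Lock` fix.

```python
import asyncio

async def unsafe_increment(counter: dict) -> None:
    current = counter["value"]
    await asyncio.sleep(0)  # other tasks run here and read the same stale value
    counter["value"] = current + 1

async def safe_increment(counter: dict, lock: asyncio.Lock) -> None:
    async with lock:  # the read-modify-write is now atomic with respect to other tasks
        current = counter["value"]
        await asyncio.sleep(0)
        counter["value"] = current + 1

async def compare_counters(n: int = 50) -> tuple[int, int]:
    unsafe = {"value": 0}
    await asyncio.gather(*(unsafe_increment(unsafe) for _ in range(n)))

    safe = {"value": 0}
    lock = asyncio.Lock()
    await asyncio.gather(*(safe_increment(safe, lock) for _ in range(n)))
    return unsafe["value"], safe["value"]

unsafe_total, safe_total = asyncio.run(compare_counters())
print(f"Without lock: {unsafe_total}, with lock: {safe_total}")
```

The unlocked counter ends up below `n` because every task reads the value before any of them writes it back.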
Q3: Can one event loop run across multiple threads?
Each thread should have its own event loop. For cross-thread scheduling, use asyncio.run_coroutine_threadsafe().
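As a rough illustration of cross-thread scheduling, the sketch below runs an event loop in a background thread and submits a coroutine to it from the main (synchronous) thread. The helper names `start_background_loop`, `call_from_sync_thread`, and `add` are made up for this example.

```python
import asyncio
import threading

async def add(a: int, b: int) -> int:
    await asyncio.sleep(0.01)
    return a + b

def start_background_loop():
    """Create a loop and run it forever in a daemon thread."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread

def call_from_sync_thread() -> int:
    loop, thread = start_background_loop()
    try:
        # Returns a concurrent.futures.Future that is safe to wait on from this thread
        future = asyncio.run_coroutine_threadsafe(add(2, 3), loop)
        return future.result(timeout=5)
    finally:
        # The loop must be stopped from its own thread, hence call_soon_threadsafe
        loop.call_soon_threadsafe(loop.stop)
        thread.join(timeout=5)
        loop.close()

print(call_from_sync_thread())
```

Calling `loop.stop()` or `loop.create_task()` directly from another thread is not thread-safe; the `*_threadsafe` functions exist for exactly this case.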
Q4: asyncio.gather or TaskGroup?
- gather: when you need return_exceptions=True or custom wait strategies
- TaskGroup (3.11+): when you want structured concurrency with automatic exception propagation; recommended as the default
Q5: How to gracefully shut down an async application?
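import asyncio
import signal

async def handle(reader, writer):
    # Placeholder connection handler: close the connection immediately
    writer.close()
    await writer.wait_closed()

async def gracefulShutdown():
    server = await asyncio.start_server(handle, "0.0.0.0", 8000)
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()  # set() is safe to call more than once
    # add_signal_handler is only available on Unix event loops
    loop.add_signal_handler(signal.SIGTERM, stop.set)
    loop.add_signal_handler(signal.SIGINT, stop.set)
    async with server:
        await stop.wait()
    print("Server shut down gracefully")

asyncio.run(gracefulShutdown())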
asyncio is the cornerstone of Python async programming. Master the event loop, coroutines, concurrency patterns, and production deployment to build high-performance I/O-bound applications.