Python asyncio 非同期プログラミング実践:原理から本番運用まで
编程语言
同期 vs 非同期:実行モデルの比較
非同期プログラミングを理解する第一歩は、同期と非同期の本質的な違いを把握することです。
| 次元 | 同期(Sync) | 非同期(Async) |
|---|---|---|
| 実行方式 | 順次ブロッキング、一つずつ | 並行非ブロッキング、I/O待機中に切り替え |
| スレッド占有 | 各I/Oがスレッド全体を占有 | 単一スレッドで複数I/Oを処理 |
| 適用シナリオ | CPU集約型 | I/O集約型(ネットワーク、ディスク) |
| リソース消費 | スレッドオーバーヘッド大(~8MB/スレッド) | コルーチンオーバーヘッド極小(~2KB/コルーチン) |
| プログラミング複雑さ | シンプルで直感的 | イベントループの理解が必要 |
import time
import asyncio
def fetchSync(name, delay):
print(f"[同期] 取得開始 {name}")
time.sleep(delay)
print(f"[同期] 完了 {name}")
return f"{name}_data"
async def fetchAsync(name, delay):
print(f"[非同期] 取得開始 {name}")
await asyncio.sleep(delay)
print(f"[非同期] 完了 {name}")
return f"{name}_data"
def runSync():
start = time.perf_counter()
fetchSync("API-A", 2)
fetchSync("API-B", 2)
fetchSync("API-C", 2)
print(f"同期合計時間: {time.perf_counter() - start:.2f}s")
async def runAsync():
start = time.perf_counter()
await asyncio.gather(
fetchAsync("API-A", 2),
fetchAsync("API-B", 2),
fetchAsync("API-C", 2),
)
print(f"非同期合計時間: {time.perf_counter() - start:.2f}s")
runSync()
asyncio.run(runAsync())
同期は6s、非同期は2s — 3つのI/Oが並行実行されます。
イベントループの内部動作
イベントループはasyncioの心臓であり、すべてのコルーチンの実行をスケジュールします。
コアメカニズム
┌─────────────────────────────────┐
│ Event Loop │
│ ┌───────┐ ┌───────┐ │
│ │ Task1 │ │ Task2 │ ... │
│ └───┬───┘ └───┬───┘ │
│ │ │ │
│ ┌───▼──────────▼───────────┐ │
│ │ Ready Queue (実行可能) │ │
│ └──────────────────────────┘ │
│ ┌──────────────────────────┐ │
│ │ I/O Poll (I/O完了待ち) │ │
│ └──────────────────────────┘ │
└─────────────────────────────────┘
手動イベントループ制御
import asyncio
async def task1():
print("Task1 開始")
await asyncio.sleep(1)
print("Task1 終了")
return "result1"
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
result = loop.run_until_complete(task1())
print(f"結果: {result}")
finally:
loop.close()
イベントループポリシー
import asyncio
print(asyncio.get_event_loop_policy())
loop = asyncio.get_running_loop()
print(f"実行中: {loop.is_running()}")
print(f"クローズ済み: {loop.is_closed()}")
async/await構文の深掘り
コルーチンの定義と呼び出し
async def coroutineFunc():
"""async def でコルーチン関数を定義"""
await asyncio.sleep(0.1)
return 42
coro = coroutineFunc()
print(type(coro)) # <class 'coroutine'>
result = await coroutineFunc()
awaitの本質
awaitは現在のコルーチンを一時停止し、制御をイベントループに返し、awaitableオブジェクトが完了するまで待ちます。
import asyncio
async def slowOperation():
print("低速操作を開始...")
result = await asyncio.sleep(2, result="完了")
print(f"低速操作の結果: {result}")
return result
async def main():
task = asyncio.create_task(slowOperation())
print(f"Task状態: {task.done()}")
await task
print(f"Task状態: {task.done()}")
asyncio.run(main())
コルーチンのライフサイクル
import asyncio
async def lifecycleDemo():
print("1. コルーチン作成(created)")
await asyncio.sleep(0)
print("2. 初回スケジュール(running)")
await asyncio.sleep(0.5)
print("3. awaitで一時停止(suspended)→ イベントループが他をスケジュール")
await asyncio.sleep(0)
print("4. 実行再開(running)")
return "5. 結果を返却(finished)"
async def main():
coro = lifecycleDemo()
print(f"型: {type(coro)}")
result = await coro
print(f"最終: {result}")
asyncio.run(main())
Task vs Future
Future:低レベルの結果コンテナ
import asyncio
async def futureDemo():
loop = asyncio.get_running_loop()
future = loop.create_future()
async def setValue():
await asyncio.sleep(1)
future.set_result("未来は今")
asyncio.create_task(setValue())
result = await future
print(f"Futureの結果: {result}")
asyncio.run(futureDemo())
Task:スケジュール可能なコルーチンラッパー
import asyncio
async def taskVsFuture():
async def work(name, seconds):
await asyncio.sleep(seconds)
return f"{name} 完了"
task = asyncio.create_task(work("TaskA", 1))
print(f"Task型: {type(task)}")
print(f"Futureのサブクラス: {issubclass(asyncio.Task, asyncio.Future)}")
print(f"Task名: {task.get_name()}")
print(f"完了状態: {task.done()}")
result = await task
print(f"結果: {result}, 完了状態: {task.done()}")
asyncio.run(taskVsFuture())
TaskGroup(Python 3.11+)
import asyncio
async def taskGroupDemo():
async def fetch(itemId):
await asyncio.sleep(0.5)
return f"item_{itemId}"
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch(i)) for i in range(5)]
results = [t.result() for t in tasks]
print(f"全結果: {results}")
asyncio.run(taskGroupDemo())
一般的な非同期並行パターン
asyncio.gather:並行結果収集
import asyncio
async def gatherDemo():
async def fetch(url, delay):
await asyncio.sleep(delay)
return f"Response from {url}"
results = await asyncio.gather(
fetch("api.example.com/users", 1),
fetch("api.example.com/posts", 2),
fetch("api.example.com/comments", 1.5),
)
print(f"全結果: {results}")
resultsWithErrors = await asyncio.gather(
fetch("ok.com", 0.5),
fetch("err.com", 0.5),
return_exceptions=True,
)
print(f"例外含む: {resultsWithErrors}")
asyncio.run(gatherDemo())
asyncio.wait:より柔軟な待機
import asyncio
async def waitDemo():
async def job(name, delay):
await asyncio.sleep(delay)
return name
tasks = {
asyncio.create_task(job("fast", 0.5)),
asyncio.create_task(job("medium", 1.0)),
asyncio.create_task(job("slow", 2.0)),
}
done, pending = await asyncio.wait(tasks, timeout=1.5)
print(f"完了: {[t.result() for t in done]}")
print(f"未完了: {len(pending)}個")
for t in pending:
t.cancel()
asyncio.run(waitDemo())
asyncio.as_completed:完了順で取得
import asyncio
async def asCompletedDemo():
async def job(name, delay):
await asyncio.sleep(delay)
return f"{name} ({delay}s)"
tasks = [
asyncio.create_task(job("slow", 3)),
asyncio.create_task(job("fast", 1)),
asyncio.create_task(job("medium", 2)),
]
for coro in asyncio.as_completed(tasks):
result = await coro
print(f"完了: {result}")
asyncio.run(asCompletedDemo())
非同期HTTPリクエスト
aiohttp
import asyncio
import aiohttp
async def aiohttpDemo():
async with aiohttp.ClientSession() as session:
async def fetchJson(url):
async with session.get(url) as response:
response.raise_for_status()
return await response.json()
results = await asyncio.gather(
fetchJson("https://api.github.com/users/python"),
fetchJson("https://api.github.com/repos/python/cpython"),
)
print(f"ユーザー: {results[0]['login']}")
print(f"リポジトリ: {results[1]['full_name']}")
asyncio.run(aiohttpDemo())
httpx(HTTP/2サポート)
import asyncio
import httpx
async def httpxDemo():
async with httpx.AsyncClient(http2=True) as client:
response = await client.get("https://httpbin.org/get")
print(f"ステータス: {response.status_code}")
print(f"プロトコル: {response.extensions.get('http_version')}")
tasks = [
client.get(f"https://httpbin.org/delay/{i}")
for i in range(1, 4)
]
responses = await asyncio.gather(*tasks)
print(f"並行{len(responses)}リクエスト全完了")
asyncio.run(httpxDemo())
非同期データベース操作
asyncpg(PostgreSQL)
import asyncio
import asyncpg
async def asyncpgDemo():
conn = await asyncpg.connect(
"postgresql://user:pass@localhost/mydb"
)
await conn.execute("""
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
name TEXT,
email TEXT
)
""")
await conn.execute(
"INSERT INTO users (name, email) VALUES ($1, $2)",
"田中", "tanaka@example.com"
)
rows = await conn.fetch("SELECT * FROM users")
for row in rows:
print(dict(row))
await conn.close()
asyncio.run(asyncpgDemo())
aiomysql(MySQL)
import asyncio
import aiomysql
async def aiomysqlDemo():
conn = await aiomysql.connect(
host="localhost", port=3306,
user="root", password="pass", db="mydb"
)
async with conn.cursor() as cur:
await cur.execute("SELECT * FROM products LIMIT 10")
rows = await cur.fetchall()
print(f"{len(rows)}件のレコード")
conn.close()
asyncio.run(aiomysqlDemo())
motor(MongoDB)
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient
async def motorDemo():
client = AsyncIOMotorClient("mongodb://localhost:27017")
db = client.mydb
collection = db.users
await collection.insert_one({"name": "鈴木", "age": 30})
cursor = collection.find({"age": {"$gte": 25}})
async for doc in cursor:
print(doc)
asyncio.run(motorDemo())
非同期ファイルI/O
aiofiles
import asyncio
import aiofiles
import json
async def aiofilesDemo():
async with aiofiles.open("data.json", mode="w") as f:
await f.write(json.dumps({"key": "value"}, ensure_ascii=False))
async with aiofiles.open("data.json", mode="r") as f:
content = await f.read()
data = json.loads(content)
print(f"読込: {data}")
async with aiofiles.open("large.log", mode="r") as f:
async for line in f:
if "ERROR" in line:
print(line.strip())
asyncio.run(aiofilesDemo())
プロデューサー・コンシューマーパターン
import asyncio
import random
async def producer(queue, producerId):
for i in range(10):
item = f"item-{producerId}-{i}"
await queue.put(item)
print(f"Producer{producerId} → {item}")
await asyncio.sleep(random.uniform(0.1, 0.5))
await queue.put(None)
async def consumer(queue, consumerId):
while True:
item = await queue.get()
if item is None:
queue.task_done()
break
print(f"Consumer{consumerId} ← {item}")
await asyncio.sleep(random.uniform(0.2, 0.8))
queue.task_done()
async def producerConsumerDemo():
queue = asyncio.Queue(maxsize=5)
producers = [asyncio.create_task(producer(queue, i)) for i in range(2)]
consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]
await asyncio.gather(*producers)
for _ in consumers:
await queue.put(None)
await asyncio.gather(*consumers)
await queue.join()
asyncio.run(producerConsumerDemo())
Semaphoreによるレート制限
import asyncio
async def rateLimitedRequests():
semaphore = asyncio.Semaphore(5)
async def fetch(url):
async with semaphore:
print(f"リクエスト: {url}")
await asyncio.sleep(1)
return f"response_{url}"
tasks = [fetch(f"url_{i}") for i in range(20)]
results = await asyncio.gather(*tasks)
print(f"{len(results)}リクエスト完了(最大5並行)")
asyncio.run(rateLimitedRequests())
エラー処理とキャンセル
例外処理
import asyncio
async def errorHandling():
async def riskyOp(name):
await asyncio.sleep(0.5)
if name == "bad":
raise ValueError(f"{name} でエラー")
return f"{name}_ok"
results = await asyncio.gather(
riskyOp("good1"),
riskyOp("bad"),
riskyOp("good2"),
return_exceptions=True,
)
for r in results:
if isinstance(r, Exception):
print(f"エラー: {r}")
else:
print(f"成功: {r}")
asyncio.run(errorHandling())
タスクキャンセル
import asyncio
async def cancellationDemo():
async def longRunning():
try:
print("長時間操作を開始...")
await asyncio.sleep(10)
print("これは実行されない")
except asyncio.CancelledError:
print("タスクがキャンセル、クリーンアップ実行")
raise
task = asyncio.create_task(longRunning())
await asyncio.sleep(1)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("タスクキャンセル確認")
asyncio.run(cancellationDemo())
タイムアウト制御
import asyncio
async def timeoutDemo():
async def slowTask():
await asyncio.sleep(5)
return "完了"
try:
result = await asyncio.wait_for(slowTask(), timeout=2)
except asyncio.TimeoutError:
print("タスクがタイムアウト!")
try:
async with asyncio.timeout(2):
await slowTask()
except TimeoutError:
print("asyncio.timeout タイムアウト!")
asyncio.run(timeoutDemo())
非同期コードのデバッグ
asyncio.runのデバッグモード
import asyncio
async def debugDemo():
async def forgottenAwait():
coro = asyncio.sleep(1)
print(f"未awaitのコルーチン: {coro}")
asyncio.run(forgottenAwait(), debug=True)
ロギング設定
import asyncio
import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("asyncio")
async def loggingDemo():
async def work():
logger.info("作業開始")
await asyncio.sleep(0.5)
logger.info("作業完了")
await work()
asyncio.run(loggingDemo(), debug=True)
aiomonitorリアルタイム監視
pip install aiomonitor
import asyncio
import aiomonitor
async def main():
async def backgroundTask():
while True:
await asyncio.sleep(1)
print("バックグラウンドタスク実行中...")
task = asyncio.create_task(backgroundTask())
await asyncio.sleep(10)
task.cancel()
with aiomonitor.start_monitor(loop=asyncio.new_event_loop()):
asyncio.run(main())
同期と非同期の混在
run_in_executor
import asyncio
import time
def syncCpuBound(n):
"""CPU集約型の同期関数"""
return sum(i * i for i in range(n))
async def runInExecutorDemo():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, syncCpuBound, 10**7)
print(f"デフォルトスレッドプール結果: {result}")
import concurrent.futures
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, syncCpuBound, 10**8)
print(f"プロセスプール結果: {result}")
asyncio.run(runInExecutorDemo())
to_thread(Python 3.9+)
import asyncio
def blockingIo():
import time
time.sleep(2)
return "ブロッキングI/O完了"
async def toThreadDemo():
result = await asyncio.to_thread(blockingIo)
print(result)
asyncio.run(toThreadDemo())
パフォーマンス比較:同期 vs 非同期
| シナリオ | 同期時間 | 非同期時間 | 高速化 |
|---|---|---|---|
| 100 HTTPリクエスト | ~100s | ~2s | 50x |
| 50 DBクエリ | ~5s | ~0.3s | 16x |
| 10 ファイル読み書き | ~1s | ~0.2s | 5x |
| CPU集約計算 | 10s | 10s+ | 1x(改善なし) |
| 混合 I/O+CPU | 15s | 5s | 3x |
import asyncio
import time
import aiohttp
async def benchmark():
urls = [f"https://httpbin.org/delay/0.5" for _ in range(50)]
async def fetchOne(session, url):
async with session.get(url) as r:
return r.status
start = time.perf_counter()
async with aiohttp.ClientSession() as session:
await asyncio.gather(*[fetchOne(session, u) for u in urls])
elapsed = time.perf_counter() - start
print(f"50リクエスト非同期時間: {elapsed:.2f}s")
asyncio.run(benchmark())
よくある落とし穴
落とし穴1:イベントループのブロック
import asyncio
import time
async def badBlocking():
time.sleep(5)
async def goodNonBlocking():
await asyncio.sleep(5)
async def badSyncCall():
import requests
requests.get("https://example.com")
async def goodAsyncCall():
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.get("https://example.com") as r:
return await r.text()
落とし穴2:awaitの忘れ
import asyncio
async def forgottenAwaitDemo():
async def getValue():
await asyncio.sleep(0.1)
return 42
result = getValue()
print(type(result)) # coroutine、42ではない!
correctResult = await getValue()
print(correctResult) # 42
落とし穴3:async関数での同期ロック使用
import asyncio
import threading
async def wrongLock():
lock = threading.Lock()
with lock:
await asyncio.sleep(1)
async def rightLock():
lock = asyncio.Lock()
async with lock:
await asyncio.sleep(1)
本番デプロイ
uvicornでASGIアプリを実行
from fastapi import FastAPI
import asyncio
import aiohttp
app = FastAPI()
@app.get("/api/data")
async def getData():
async with aiohttp.ClientSession() as session:
async with session.get("https://api.example.com/data") as response:
data = await response.json()
return {"result": data}
@app.get("/api/health")
async def healthCheck():
return {"status": "ok"}
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
本番設定
uvicorn main:app \
--host 0.0.0.0 \
--port 8000 \
--workers 4 \
--loop uvloop \
--http httptools \
--access-log \
--log-level info
Dockerデプロイ
{
"from": "python:3.12-slim",
"workdir": "/app",
"copy": ["requirements.txt .", ". ."],
"run": "pip install --no-cache-dir -r requirements.txt",
"cmd": "uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4"
}
FAQ
Q1:asyncioはCPU集約型タスクに適していますか?
いいえ。asyncioはシングルスレッドであり、CPU集約型タスクはイベントループをブロックします。run_in_executorとプロセスプール、またはmultiprocessingを使用してください。
Q2:asyncioかthreadingか?
| シナリオ | 推奨 |
|---|---|
| 高I/O並行(>1000) | asyncio |
| 低I/O並行(<100) | threading |
| 共有状態が必要 | asyncio(競合状態を自然に回避) |
| 既存の同期ライブラリ | threading + run_in_executor |
Q3:1つのイベントループを複数スレッドで実行できますか?
各スレッドには独自のイベントループが必要です。スレッド間スケジューリングにはasyncio.run_coroutine_threadsafe()を使用します。
Q4:asyncio.gatherかTaskGroupか?
gather:return_exceptions=Trueやカスタム待機戦略が必要な場合TaskGroup(3.11+):構造化並行性と自動例外伝播が必要な場合 — デフォルトとして推奨
Q5:非同期アプリをグレースフルシャットダウンするには?
import asyncio
import signal
async def gracefulShutdown():
server = await asyncio.start_server(handle, "0.0.0.0", 8000)
loop = asyncio.get_running_loop()
stop = loop.create_future()
loop.add_signal_handler(signal.SIGTERM, stop.set_result, None)
loop.add_signal_handler(signal.SIGINT, stop.set_result, None)
async with server:
await stop
print("サーバーをグレースフルにシャットダウン")
推奨ツール
asyncioプログラミングの実践で、以下のToolsKuツールが役立ちます:
- JSONフォーマッター — 非同期APIレスポンスのJSONデータをフォーマットし、構造問題を迅速に特定
- Base64エンコード — 非同期リクエスト内のBase64画像/ファイルデータを処理
- ハッシュ計算 — 非同期キャッシュのキーを生成し、リクエスト重複排除を実現
asyncioはPython非同期プログラミングの礎石です。イベントループ、コルーチン、並行パターン、本番デプロイをマスターすれば、高性能なI/O集約型アプリケーションを構築できます。
ブラウザローカルツールを無料で試す →
#Python#asyncio#异步编程#协程#教程