Python asyncio 非同期プログラミング実践:原理から本番運用まで

编程语言

同期 vs 非同期:実行モデルの比較

非同期プログラミングを理解する第一歩は、同期と非同期の本質的な違いを把握することです。

次元 同期(Sync) 非同期(Async)
実行方式 順次ブロッキング、一つずつ 並行非ブロッキング、I/O待機中に切り替え
スレッド占有 各I/Oがスレッド全体を占有 単一スレッドで複数I/Oを処理
適用シナリオ CPU集約型 I/O集約型(ネットワーク、ディスク)
リソース消費 スレッドオーバーヘッド大(~8MB/スレッド) コルーチンオーバーヘッド極小(~2KB/コルーチン)
プログラミング複雑さ シンプルで直感的 イベントループの理解が必要
import time
import asyncio

def fetchSync(name, delay):
    print(f"[同期] 取得開始 {name}")
    time.sleep(delay)
    print(f"[同期] 完了 {name}")
    return f"{name}_data"

async def fetchAsync(name, delay):
    print(f"[非同期] 取得開始 {name}")
    await asyncio.sleep(delay)
    print(f"[非同期] 完了 {name}")
    return f"{name}_data"

def runSync():
    start = time.perf_counter()
    fetchSync("API-A", 2)
    fetchSync("API-B", 2)
    fetchSync("API-C", 2)
    print(f"同期合計時間: {time.perf_counter() - start:.2f}s")

async def runAsync():
    start = time.perf_counter()
    await asyncio.gather(
        fetchAsync("API-A", 2),
        fetchAsync("API-B", 2),
        fetchAsync("API-C", 2),
    )
    print(f"非同期合計時間: {time.perf_counter() - start:.2f}s")

runSync()
asyncio.run(runAsync())

同期は6s、非同期は2s — 3つのI/Oが並行実行されます。


イベントループの内部動作

イベントループはasyncioの心臓であり、すべてのコルーチンの実行をスケジュールします。

コアメカニズム

┌─────────────────────────────────┐
│         Event Loop              │
│  ┌───────┐  ┌───────┐          │
│  │ Task1 │  │ Task2 │  ...     │
│  └───┬───┘  └───┬───┘          │
│      │          │               │
│  ┌───▼──────────▼───────────┐  │
│  │   Ready Queue (実行可能)  │  │
│  └──────────────────────────┘  │
│  ┌──────────────────────────┐  │
│  │   I/O Poll (I/O完了待ち) │  │
│  └──────────────────────────┘  │
└─────────────────────────────────┘

手動イベントループ制御

import asyncio

async def task1():
    print("Task1 開始")
    await asyncio.sleep(1)
    print("Task1 終了")
    return "result1"

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    result = loop.run_until_complete(task1())
    print(f"結果: {result}")
finally:
    loop.close()

イベントループポリシー

import asyncio

print(asyncio.get_event_loop_policy())
loop = asyncio.get_running_loop()
print(f"実行中: {loop.is_running()}")
print(f"クローズ済み: {loop.is_closed()}")

async/await構文の深掘り

コルーチンの定義と呼び出し

async def coroutineFunc():
    """async def でコルーチン関数を定義"""
    await asyncio.sleep(0.1)
    return 42

coro = coroutineFunc()
print(type(coro))  # <class 'coroutine'>

result = await coroutineFunc()

awaitの本質

awaitは現在のコルーチンを一時停止し、制御をイベントループに返し、awaitableオブジェクトが完了するまで待ちます。

import asyncio

async def slowOperation():
    print("低速操作を開始...")
    result = await asyncio.sleep(2, result="完了")
    print(f"低速操作の結果: {result}")
    return result

async def main():
    task = asyncio.create_task(slowOperation())
    print(f"Task状態: {task.done()}")
    await task
    print(f"Task状態: {task.done()}")

asyncio.run(main())

コルーチンのライフサイクル

import asyncio

async def lifecycleDemo():
    print("1. コルーチン作成(created)")
    await asyncio.sleep(0)
    print("2. 初回スケジュール(running)")
    await asyncio.sleep(0.5)
    print("3. awaitで一時停止(suspended)→ イベントループが他をスケジュール")
    await asyncio.sleep(0)
    print("4. 実行再開(running)")
    return "5. 結果を返却(finished)"

async def main():
    coro = lifecycleDemo()
    print(f"型: {type(coro)}")
    result = await coro
    print(f"最終: {result}")

asyncio.run(main())

Task vs Future

Future:低レベルの結果コンテナ

import asyncio

async def futureDemo():
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    async def setValue():
        await asyncio.sleep(1)
        future.set_result("未来は今")

    asyncio.create_task(setValue())
    result = await future
    print(f"Futureの結果: {result}")

asyncio.run(futureDemo())

Task:スケジュール可能なコルーチンラッパー

import asyncio

async def taskVsFuture():
    async def work(name, seconds):
        await asyncio.sleep(seconds)
        return f"{name} 完了"

    task = asyncio.create_task(work("TaskA", 1))
    print(f"Task型: {type(task)}")
    print(f"Futureのサブクラス: {issubclass(asyncio.Task, asyncio.Future)}")
    print(f"Task名: {task.get_name()}")
    print(f"完了状態: {task.done()}")

    result = await task
    print(f"結果: {result}, 完了状態: {task.done()}")

asyncio.run(taskVsFuture())

TaskGroup(Python 3.11+)

import asyncio

async def taskGroupDemo():
    async def fetch(itemId):
        await asyncio.sleep(0.5)
        return f"item_{itemId}"

    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch(i)) for i in range(5)]

    results = [t.result() for t in tasks]
    print(f"全結果: {results}")

asyncio.run(taskGroupDemo())

一般的な非同期並行パターン

asyncio.gather:並行結果収集

import asyncio

async def gatherDemo():
    async def fetch(url, delay):
        await asyncio.sleep(delay)
        return f"Response from {url}"

    results = await asyncio.gather(
        fetch("api.example.com/users", 1),
        fetch("api.example.com/posts", 2),
        fetch("api.example.com/comments", 1.5),
    )
    print(f"全結果: {results}")

    resultsWithErrors = await asyncio.gather(
        fetch("ok.com", 0.5),
        fetch("err.com", 0.5),
        return_exceptions=True,
    )
    print(f"例外含む: {resultsWithErrors}")

asyncio.run(gatherDemo())

asyncio.wait:より柔軟な待機

import asyncio

async def waitDemo():
    async def job(name, delay):
        await asyncio.sleep(delay)
        return name

    tasks = {
        asyncio.create_task(job("fast", 0.5)),
        asyncio.create_task(job("medium", 1.0)),
        asyncio.create_task(job("slow", 2.0)),
    }

    done, pending = await asyncio.wait(tasks, timeout=1.5)
    print(f"完了: {[t.result() for t in done]}")
    print(f"未完了: {len(pending)}個")

    for t in pending:
        t.cancel()

asyncio.run(waitDemo())

asyncio.as_completed:完了順で取得

import asyncio

async def asCompletedDemo():
    async def job(name, delay):
        await asyncio.sleep(delay)
        return f"{name} ({delay}s)"

    tasks = [
        asyncio.create_task(job("slow", 3)),
        asyncio.create_task(job("fast", 1)),
        asyncio.create_task(job("medium", 2)),
    ]

    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(f"完了: {result}")

asyncio.run(asCompletedDemo())

非同期HTTPリクエスト

aiohttp

import asyncio
import aiohttp

async def aiohttpDemo():
    async with aiohttp.ClientSession() as session:
        async def fetchJson(url):
            async with session.get(url) as response:
                response.raise_for_status()
                return await response.json()

        results = await asyncio.gather(
            fetchJson("https://api.github.com/users/python"),
            fetchJson("https://api.github.com/repos/python/cpython"),
        )
        print(f"ユーザー: {results[0]['login']}")
        print(f"リポジトリ: {results[1]['full_name']}")

asyncio.run(aiohttpDemo())

httpx(HTTP/2サポート)

import asyncio
import httpx

async def httpxDemo():
    async with httpx.AsyncClient(http2=True) as client:
        response = await client.get("https://httpbin.org/get")
        print(f"ステータス: {response.status_code}")
        print(f"プロトコル: {response.extensions.get('http_version')}")

        tasks = [
            client.get(f"https://httpbin.org/delay/{i}")
            for i in range(1, 4)
        ]
        responses = await asyncio.gather(*tasks)
        print(f"並行{len(responses)}リクエスト全完了")

asyncio.run(httpxDemo())

非同期データベース操作

asyncpg(PostgreSQL)

import asyncio
import asyncpg

async def asyncpgDemo():
    conn = await asyncpg.connect(
        "postgresql://user:pass@localhost/mydb"
    )

    await conn.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            name TEXT,
            email TEXT
        )
    """)

    await conn.execute(
        "INSERT INTO users (name, email) VALUES ($1, $2)",
        "田中", "tanaka@example.com"
    )

    rows = await conn.fetch("SELECT * FROM users")
    for row in rows:
        print(dict(row))

    await conn.close()

asyncio.run(asyncpgDemo())

aiomysql(MySQL)

import asyncio
import aiomysql

async def aiomysqlDemo():
    conn = await aiomysql.connect(
        host="localhost", port=3306,
        user="root", password="pass", db="mydb"
    )

    async with conn.cursor() as cur:
        await cur.execute("SELECT * FROM products LIMIT 10")
        rows = await cur.fetchall()
        print(f"{len(rows)}件のレコード")

    conn.close()

asyncio.run(aiomysqlDemo())

motor(MongoDB)

import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

async def motorDemo():
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    db = client.mydb
    collection = db.users

    await collection.insert_one({"name": "鈴木", "age": 30})

    cursor = collection.find({"age": {"$gte": 25}})
    async for doc in cursor:
        print(doc)

asyncio.run(motorDemo())

非同期ファイルI/O

aiofiles

import asyncio
import aiofiles
import json

async def aiofilesDemo():
    async with aiofiles.open("data.json", mode="w") as f:
        await f.write(json.dumps({"key": "value"}, ensure_ascii=False))

    async with aiofiles.open("data.json", mode="r") as f:
        content = await f.read()
        data = json.loads(content)
        print(f"読込: {data}")

    async with aiofiles.open("large.log", mode="r") as f:
        async for line in f:
            if "ERROR" in line:
                print(line.strip())

asyncio.run(aiofilesDemo())

プロデューサー・コンシューマーパターン

import asyncio
import random

async def producer(queue, producerId):
    for i in range(10):
        item = f"item-{producerId}-{i}"
        await queue.put(item)
        print(f"Producer{producerId} → {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))
    await queue.put(None)

async def consumer(queue, consumerId):
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        print(f"Consumer{consumerId} ← {item}")
        await asyncio.sleep(random.uniform(0.2, 0.8))
        queue.task_done()

async def producerConsumerDemo():
    queue = asyncio.Queue(maxsize=5)

    producers = [asyncio.create_task(producer(queue, i)) for i in range(2)]
    consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]

    await asyncio.gather(*producers)
    for _ in consumers:
        await queue.put(None)
    await asyncio.gather(*consumers)
    await queue.join()

asyncio.run(producerConsumerDemo())

Semaphoreによるレート制限

import asyncio

async def rateLimitedRequests():
    semaphore = asyncio.Semaphore(5)

    async def fetch(url):
        async with semaphore:
            print(f"リクエスト: {url}")
            await asyncio.sleep(1)
            return f"response_{url}"

    tasks = [fetch(f"url_{i}") for i in range(20)]
    results = await asyncio.gather(*tasks)
    print(f"{len(results)}リクエスト完了(最大5並行)")

asyncio.run(rateLimitedRequests())

エラー処理とキャンセル

例外処理

import asyncio

async def errorHandling():
    async def riskyOp(name):
        await asyncio.sleep(0.5)
        if name == "bad":
            raise ValueError(f"{name} でエラー")
        return f"{name}_ok"

    results = await asyncio.gather(
        riskyOp("good1"),
        riskyOp("bad"),
        riskyOp("good2"),
        return_exceptions=True,
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"エラー: {r}")
        else:
            print(f"成功: {r}")

asyncio.run(errorHandling())

タスクキャンセル

import asyncio

async def cancellationDemo():
    async def longRunning():
        try:
            print("長時間操作を開始...")
            await asyncio.sleep(10)
            print("これは実行されない")
        except asyncio.CancelledError:
            print("タスクがキャンセル、クリーンアップ実行")
            raise

    task = asyncio.create_task(longRunning())
    await asyncio.sleep(1)
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("タスクキャンセル確認")

asyncio.run(cancellationDemo())

タイムアウト制御

import asyncio

async def timeoutDemo():
    async def slowTask():
        await asyncio.sleep(5)
        return "完了"

    try:
        result = await asyncio.wait_for(slowTask(), timeout=2)
    except asyncio.TimeoutError:
        print("タスクがタイムアウト!")

    try:
        async with asyncio.timeout(2):
            await slowTask()
    except TimeoutError:
        print("asyncio.timeout タイムアウト!")

asyncio.run(timeoutDemo())

非同期コードのデバッグ

asyncio.runのデバッグモード

import asyncio

async def debugDemo():
    async def forgottenAwait():
        coro = asyncio.sleep(1)
        print(f"未awaitのコルーチン: {coro}")

    asyncio.run(forgottenAwait(), debug=True)

ロギング設定

import asyncio
import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("asyncio")

async def loggingDemo():
    async def work():
        logger.info("作業開始")
        await asyncio.sleep(0.5)
        logger.info("作業完了")

    await work()

asyncio.run(loggingDemo(), debug=True)

aiomonitorリアルタイム監視

pip install aiomonitor
import asyncio
import aiomonitor

async def main():
    async def backgroundTask():
        while True:
            await asyncio.sleep(1)
            print("バックグラウンドタスク実行中...")

    task = asyncio.create_task(backgroundTask())
    await asyncio.sleep(10)
    task.cancel()

with aiomonitor.start_monitor(loop=asyncio.new_event_loop()):
    asyncio.run(main())

同期と非同期の混在

run_in_executor

import asyncio
import time

def syncCpuBound(n):
    """CPU集約型の同期関数"""
    return sum(i * i for i in range(n))

async def runInExecutorDemo():
    loop = asyncio.get_running_loop()

    result = await loop.run_in_executor(None, syncCpuBound, 10**7)
    print(f"デフォルトスレッドプール結果: {result}")

    import concurrent.futures
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, syncCpuBound, 10**8)
        print(f"プロセスプール結果: {result}")

asyncio.run(runInExecutorDemo())

to_thread(Python 3.9+)

import asyncio

def blockingIo():
    import time
    time.sleep(2)
    return "ブロッキングI/O完了"

async def toThreadDemo():
    result = await asyncio.to_thread(blockingIo)
    print(result)

asyncio.run(toThreadDemo())

パフォーマンス比較:同期 vs 非同期

シナリオ 同期時間 非同期時間 高速化
100 HTTPリクエスト ~100s ~2s 50x
50 DBクエリ ~5s ~0.3s 16x
10 ファイル読み書き ~1s ~0.2s 5x
CPU集約計算 10s 10s+ 1x(改善なし)
混合 I/O+CPU 15s 5s 3x
import asyncio
import time
import aiohttp

async def benchmark():
    urls = [f"https://httpbin.org/delay/0.5" for _ in range(50)]

    async def fetchOne(session, url):
        async with session.get(url) as r:
            return r.status

    start = time.perf_counter()
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(*[fetchOne(session, u) for u in urls])
    elapsed = time.perf_counter() - start
    print(f"50リクエスト非同期時間: {elapsed:.2f}s")

asyncio.run(benchmark())

よくある落とし穴

落とし穴1:イベントループのブロック

import asyncio
import time

async def badBlocking():
    time.sleep(5)

async def goodNonBlocking():
    await asyncio.sleep(5)

async def badSyncCall():
    import requests
    requests.get("https://example.com")

async def goodAsyncCall():
    import aiohttp
    async with aiohttp.ClientSession() as session:
        async with session.get("https://example.com") as r:
            return await r.text()

落とし穴2:awaitの忘れ

import asyncio

async def forgottenAwaitDemo():
    async def getValue():
        await asyncio.sleep(0.1)
        return 42

    result = getValue()
    print(type(result))  # coroutine、42ではない!

    correctResult = await getValue()
    print(correctResult)  # 42

落とし穴3:async関数での同期ロック使用

import asyncio
import threading

async def wrongLock():
    lock = threading.Lock()
    with lock:
        await asyncio.sleep(1)

async def rightLock():
    lock = asyncio.Lock()
    async with lock:
        await asyncio.sleep(1)

本番デプロイ

uvicornでASGIアプリを実行

from fastapi import FastAPI
import asyncio
import aiohttp

app = FastAPI()

@app.get("/api/data")
async def getData():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.example.com/data") as response:
            data = await response.json()
            return {"result": data}

@app.get("/api/health")
async def healthCheck():
    return {"status": "ok"}
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4

本番設定

uvicorn main:app \
  --host 0.0.0.0 \
  --port 8000 \
  --workers 4 \
  --loop uvloop \
  --http httptools \
  --access-log \
  --log-level info

Dockerデプロイ

{
  "from": "python:3.12-slim",
  "workdir": "/app",
  "copy": ["requirements.txt .", ". ."],
  "run": "pip install --no-cache-dir -r requirements.txt",
  "cmd": "uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4"
}

FAQ

Q1:asyncioはCPU集約型タスクに適していますか?

いいえ。asyncioはシングルスレッドであり、CPU集約型タスクはイベントループをブロックします。run_in_executorとプロセスプール、またはmultiprocessingを使用してください。

Q2:asyncioかthreadingか?

シナリオ 推奨
高I/O並行(>1000) asyncio
低I/O並行(<100) threading
共有状態が必要 asyncio(競合状態を自然に回避)
既存の同期ライブラリ threading + run_in_executor

Q3:1つのイベントループを複数スレッドで実行できますか?

各スレッドには独自のイベントループが必要です。スレッド間スケジューリングにはasyncio.run_coroutine_threadsafe()を使用します。

Q4:asyncio.gatherかTaskGroupか?

  • gatherreturn_exceptions=Trueやカスタム待機戦略が必要な場合
  • TaskGroup(3.11+):構造化並行性と自動例外伝播が必要な場合 — デフォルトとして推奨

Q5:非同期アプリをグレースフルシャットダウンするには?

import asyncio
import signal

async def gracefulShutdown():
    server = await asyncio.start_server(handle, "0.0.0.0", 8000)

    loop = asyncio.get_running_loop()
    stop = loop.create_future()

    loop.add_signal_handler(signal.SIGTERM, stop.set_result, None)
    loop.add_signal_handler(signal.SIGINT, stop.set_result, None)

    async with server:
        await stop

    print("サーバーをグレースフルにシャットダウン")

推奨ツール

asyncioプログラミングの実践で、以下のToolsKuツールが役立ちます:

  • JSONフォーマッター — 非同期APIレスポンスのJSONデータをフォーマットし、構造問題を迅速に特定
  • Base64エンコード — 非同期リクエスト内のBase64画像/ファイルデータを処理
  • ハッシュ計算 — 非同期キャッシュのキーを生成し、リクエスト重複排除を実現

asyncioはPython非同期プログラミングの礎石です。イベントループ、コルーチン、並行パターン、本番デプロイをマスターすれば、高性能なI/O集約型アプリケーションを構築できます。

ブラウザローカルツールを無料で試す →

#Python#asyncio#异步编程#协程#教程