Python FastAPIプロダクションデプロイ:DockerからK8sまで7つのキー実践戦略

编程语言

FastAPI開発は楽しい、デプロイは悪夢

ローカルでは高速に動作、本番ではクラッシュ——メモリリーク、リクエストタイムアウト、PodがOOM Kill、ヘルスチェック失敗でローリングアップデートが停止。uvicorn main:appで起動しても、単プロセスでは同時実行に耐えられない。Gunicornを追加しても、ワーカー数の調整が悪いと逆に遅くなる。K8sにデプロイすると、readinessProbeの設定ミスでトラフィックが全損。2026年、FastAPIプロダクションデプロイはPythonバックエンドで最も失敗しやすい領域です。

本記事では7つのキー戦略から出発し、Uvicorn設定→Gunicornチューニング→Docker最適化→K8sデプロイ→ヘルスチェック→ミドルウェア→オブザーバビリティのフルパイプラインを実践します。


FastAPIプロダクションデプロイコア概念

概念 説明
Uvicorn ASGIサーバー、uvloopとhttptoolsベースの高性能非同期サーバー
Gunicorn WSGI/ASGIアプリケーションサーバー、複数ワーカープロセスを管理
Worker Gunicornの作業プロセス、各プロセスがUvicornインスタンスを実行
uvloop libuvベースのイベントループ、asyncioデフォルトを置き換え、2-4倍のパフォーマンス向上
ASGI 非同期サーバーゲートウェイインターフェース、FastAPIのランタイムプロトコル
ヘルスチェック K8sがliveness/readinessプローブでアプリケーション状態を判定
グレースフルシャットダウン SIGTERM受信後、進行中のリクエストを完了してから終了、リクエスト損失を防止
レート制限 単位時間あたりのリクエスト数を制限、サービス過負荷を防止

問題分析:FastAPIプロダクションデプロイの5つの課題

  1. 同時実行モデルの選択:非同期IO vs マルチプロセス、ワーカー数の決定方法
  2. Dockerイメージサイズ:ベースイメージ選択、依存インストール、マルチステージビルド
  3. K8sリソース計画:CPU/メモリリクエストとリミット、HPAオートスケーリング
  4. ヘルスチェック設定:livenessとreadinessプローブの閾値とパス設計
  5. オブザーバビリティ:構造化ログ、分散トレーシング、統一メトリクス収集

ステップバイステップ:7つのキー実践戦略

戦略1:Uvicornプロダクション設定

import uvicorn
from app.main import app

if __name__ == "__main__":
    uvicorn.run(
        "app.main:app",
        host="0.0.0.0",
        port=8000,
        workers=4,
        loop="uvloop",
        http="httptools",
        log_level="info",
        access_log=True,
        use_colors=False,
        proxy_headers=True,
        forwarded_allow_ips="*",
        timeout_keep_alive=5,
        limit_concurrency=1000,
        backlog=2048,
    )
# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
import logging

logger = logging.getLogger("app")

@asynccontextmanager
async def lifespan(app: FastAPI):
    logger.info("Application starting up...")
    yield
    logger.info("Application shutting down...")

app = FastAPI(
    title="My API",
    version="1.0.0",
    lifespan=lifespan,
    docs_url="/docs",
    redoc_url="/redoc",
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://example.com"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.get("/health")
async def health_check():
    return {"status": "healthy"}

@app.get("/ready")
async def readiness_check():
    return {"status": "ready"}

戦略2:Gunicorn + Uvicorn Worker

# gunicorn.conf.py
import multiprocessing
import os

bind = "0.0.0.0:8000"
workers = int(os.getenv("GUNICORN_WORKERS", multiprocessing.cpu_count() * 2 + 1))
worker_class = "uvicorn.workers.UvicornWorker"
keepalive = 5
timeout = 120
graceful_timeout = 30
max_requests = 5000
max_requests_jitter = 500
preload_app = True
accesslog = "-"
access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" %(D)s'
errorlog = "-"
loglevel = "info"
# 起動コマンド
gunicorn app.main:app -c gunicorn.conf.py

戦略3:Dockerマルチステージビルド最適化

# Dockerfile
FROM python:3.12-slim AS builder

WORKDIR /app

RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt

FROM python:3.12-slim AS runtime

WORKDIR /app

RUN groupadd -r appuser && useradd -r -g appuser appuser

COPY --from=builder /install /usr/local
COPY . .

RUN chown -R appuser:appuser /app

USER appuser

EXPOSE 8000

HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')" || exit 1

CMD ["gunicorn", "app.main:app", "-c", "gunicorn.conf.py"]

戦略4:K8s Deployment完全設定

# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: fastapi-app
  labels:
    app: fastapi-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: fastapi-app
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
  template:
    metadata:
      labels:
        app: fastapi-app
    spec:
      terminationGracePeriodSeconds: 60
      containers:
        - name: fastapi-app
          image: myregistry.com/fastapi-app:latest
          ports:
            - containerPort: 8000
              protocol: TCP
          env:
            - name: GUNICORN_WORKERS
              value: "4"
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: app-secrets
                  key: database-url
          resources:
            requests:
              cpu: "250m"
              memory: "256Mi"
            limits:
              cpu: "1000m"
              memory: "512Mi"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 15
            periodSeconds: 20
            timeoutSeconds: 5
            failureThreshold: 3
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 10
            timeoutSeconds: 3
            failureThreshold: 3
          lifecycle:
            preStop:
              exec:
                command: ["/bin/sh", "-c", "sleep 5"]
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: fastapi-app-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: fastapi-app
  minReplicas: 3
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 30
    scaleDown:
      stabilizationWindowSeconds: 300

戦略5:ヘルスチェックとグレースフルシャットダウン

# app/health.py
import asyncio
from fastapi import APIRouter, Response
from app.database import check_db_connection
from app.cache import check_redis_connection

router = APIRouter()

is_shutting_down = False

@router.get("/health")
async def liveness(response: Response):
    if is_shutting_down:
        response.status_code = 503
        return {"status": "shutting_down"}
    return {"status": "healthy"}

@router.get("/ready")
async def readiness(response: Response):
    if is_shutting_down:
        response.status_code = 503
        return {"status": "shutting_down"}

    checks = {
        "database": await check_db_connection(),
        "redis": await check_redis_connection(),
    }

    all_healthy = all(checks.values())
    if not all_healthy:
        response.status_code = 503
        return {"status": "not_ready", "checks": checks}

    return {"status": "ready", "checks": checks}

import signal

def setup_graceful_shutdown():
    def shutdown_handler(signum, frame):
        global is_shutting_down
        is_shutting_down = True

    signal.signal(signal.SIGTERM, shutdown_handler)
    signal.signal(signal.SIGINT, shutdown_handler)

戦略6:レート制限とミドルウェア

# app/middleware/rate_limit.py
import time
from fastapi import Request, Response, HTTPException
from starlette.middleware.base import BaseHTTPMiddleware
from typing import Dict, Tuple

class RateLimitMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, requests_per_minute: int = 60, burst: int = 10):
        super().__init__(app)
        self.requests_per_minute = requests_per_minute
        self.burst = burst
        self._clients: Dict[str, Tuple[int, float]] = {}

    async def dispatch(self, request: Request, call_next):
        client_ip = request.client.host if request.client else "unknown"

        if client_ip not in self._clients:
            self._clients[client_ip] = (1, time.time())
        else:
            count, window_start = self._clients[client_ip]
            elapsed = time.time() - window_start

            if elapsed > 60:
                self._clients[client_ip] = (1, time.time())
            else:
                if count >= self.requests_per_minute:
                    raise HTTPException(
                        status_code=429,
                        detail="Too many requests",
                        headers={"Retry-After": str(int(60 - elapsed))},
                    )
                self._clients[client_ip] = (count + 1, window_start)

        response = await call_next(request)
        return response
# app/middleware/logging.py
import time
import json
import logging
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware

logger = logging.getLogger("app.access")

class LoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        start_time = time.time()

        response = await call_next(request)

        duration_ms = (time.time() - start_time) * 1000

        log_entry = {
            "method": request.method,
            "path": request.url.path,
            "status_code": response.status_code,
            "duration_ms": round(duration_ms, 2),
            "client_ip": request.client.host if request.client else None,
        }

        if response.status_code >= 500:
            logger.error(json.dumps(log_entry))
        elif response.status_code >= 400:
            logger.warning(json.dumps(log_entry))
        else:
            logger.info(json.dumps(log_entry))

        response.headers["X-Process-Time"] = f"{duration_ms:.2f}ms"
        return response

戦略7:オブザーバビリティ(OpenTelemetry)

# app/telemetry.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.sdk.resources import Resource

def setup_telemetry(app, service_name: str = "fastapi-app", otlp_endpoint: str = "http://otel-collector:4317"):
    resource = Resource.create({"service.name": service_name})

    provider = TracerProvider(resource=resource)
    provider.add_span_processor(
        BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint, insecure=True))
    )
    trace.set_tracer_provider(provider)

    FastAPIInstrumentor.instrument_app(app)
    RedisInstrumentor().instrument()
    SQLAlchemyInstrumentor().instrument()

落とし穴ガイド

落とし穴1:単ワーカーでプロダクション運用

# ❌ 誤り:単プロセスではマルチコアを活用できない
uvicorn app.main:app --host 0.0.0.0 --port 8000

# ✅ 正しい:Gunicorn + 複数Uvicorn Worker
gunicorn app.main:app -c gunicorn.conf.py
# workers = cpu_count * 2 + 1

落とし穴2:Dockerイメージでpython:latestを使用

# ❌ 誤り:latestタグは予測不可、イメージサイズ1GB+
FROM python:latest

# ✅ 正しい:slimバージョン固定 + マルチステージビルド
FROM python:3.12-slim AS builder
# ... ビルドステージ
FROM python:3.12-slim AS runtime
# ... ランタイムステージ、約150MB

落とし穴3:K8sリソースリミットが小さすぎる

# ❌ 誤り:メモリリミットが低すぎ、OOM Kill頻発
resources:
  limits:
    memory: "128Mi"

# ✅ 正しい:適切なrequestsとlimits
resources:
  requests:
    cpu: "250m"
    memory: "256Mi"
  limits:
    cpu: "1000m"
    memory: "512Mi"

落とし穴4:readinessProbeとlivenessProbeが同じパス

# ❌ 誤り:DB接続不可でlivenessProbeも失敗、Podが再起動
livenessProbe:
  httpGet:
    path: /ready
readinessProbe:
  httpGet:
    path: /ready

# ✅ 正しい:livenessはプロセス生存確認、readinessは依存確認
livenessProbe:
  httpGet:
    path: /health
readinessProbe:
  httpGet:
    path: /ready

落とし穴5:SIGTERMシグナルの無視

# ❌ 誤り:SIGTERM受信で即座に終了、進行中のリクエストが損失
import sys
signal.signal(signal.SIGTERM, lambda s, f: sys.exit(0))

# ✅ 正しい:GunicornはデフォルトでSIGTERMをグレースフルに処理
# K8sのterminationGracePeriodSecondsとpreStopフックと組み合わせ
# gunicorn.conf.pyでgraceful_timeout = 30を設定

エラートラブルシューティング

# エラーメッセージ 原因 解決方法
1 Worker failed to boot アプリのインポートエラーまたは依存不足 import文を確認、すべての依存がインストール済みか確認
2 OOMKilled メモリ制限超過 メモリリミットを増加、メモリリークを確認
3 CrashLoopBackOff コンテナ起動後すぐにクラッシュ Podログを確認:kubectl logs --previous
4 Readiness probe failed 依存サービスが利用不可 DB/Redis接続を確認、initialDelaySecondsを増加
5 Liveness probe failed イベントループがブロック 同期ブロッキング呼び出しを確認、run_in_executorを使用
6 429 Too Many Requests レート制限がトリガー レート制限閾値を調整、異常トラフィックを確認
7 Connection pool exhausted DB接続プール枯渇 pool_sizeを増加、接続リークを確認
8 Timeout waiting for response リクエスト処理タイムアウト Gunicorn timeoutを増加、スロークエリを最適化
9 ImagePullBackOff イメージプル失敗 イメージ名とレジストリアクセス権を確認
10 Permission denied コンテナ内のファイル権限エラー USERディレクティブが正しいことを確認、ボリュームマウント権限を確認

高度な最適化

1. 非同期データベース接続プール

# app/database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@db:5432/mydb",
    pool_size=20,
    max_overflow=10,
    pool_timeout=30,
    pool_recycle=3600,
    pool_pre_ping=True,
)

AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)

async def get_db():
    async with AsyncSessionLocal() as session:
        try:
            yield session
        finally:
            await session.close()

async def check_db_connection() -> bool:
    try:
        async with engine.connect() as conn:
            await conn.execute(text("SELECT 1"))
        return True
    except Exception:
        return False

2. Redisキャッシュレイヤー

# app/cache.py
import redis.asyncio as redis
import json
from typing import Optional, Any

class RedisCache:
    def __init__(self, url: str = "redis://redis:6379/0"):
        self.url = url
        self._client: Optional[redis.Redis] = None

    async def connect(self):
        self._client = redis.from_url(
            self.url,
            max_connections=50,
            decode_responses=True,
            socket_timeout=5,
            retry_on_timeout=True,
        )

    async def disconnect(self):
        if self._client:
            await self._client.close()

    async def get(self, key: str) -> Optional[Any]:
        value = await self._client.get(key)
        if value:
            return json.loads(value)
        return None

    async def set(self, key: str, value: Any, ttl: int = 300):
        await self._client.set(key, json.dumps(value), ex=ttl)

cache = RedisCache()

async def check_redis_connection() -> bool:
    try:
        if cache._client:
            return await cache._client.ping()
        return False
    except Exception:
        return False

3. Prometheusメトリクス収集

# app/metrics.py
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from fastapi import APIRouter, Response

REQUEST_COUNT = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status_code"],
)

REQUEST_DURATION = Histogram(
    "http_request_duration_seconds",
    "HTTP request duration in seconds",
    ["method", "endpoint"],
    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0],
)

router = APIRouter()

@router.get("/metrics")
async def metrics():
    return Response(
        content=generate_latest(),
        media_type="text/plain",
    )

比較分析

次元 Uvicorn単一 Gunicorn+Uvicorn Daphne Hypercorn Uvicorn+K8s
マルチコア ❌単コア ✅マルチワーカー ✅マルチプロセス ✅マルチワーカー ✅マルチPod
非同期サポート ✅ネイティブ ✅ネイティブ ✅ネイティブ ✅HTTP/2 ✅ネイティブ
プロダクション対応 ⚠️組み合わせ必要
グレースフルシャットダウン ⚠️基本 ✅完全 ✅+preStop
オートスケーリング ✅HPA
ローリングアップデート
リソース分離 ⚠️プロセスレベル ⚠️プロセスレベル ⚠️プロセスレベル ✅コンテナレベル
障害復旧 ⚠️supervisor必要 ⚠️ ⚠️ ✅K8s自己修復

まとめ:FastAPIプロダクションデプロイは「1つのコマンド」ではなく「1つの体系」のエンジニアリングです。UvicornからK8sまで、コア原則は3つだけ:マルチプロセスでマルチコアを活用、ヘルスチェックで生存と準備完了を分離、グレースフルシャットダウンでリクエスト損失を防止。Dockerマルチステージビルドでイメージを1GBから150MBに圧縮。Gunicornのgraceful_timeoutとK8sのterminationGracePeriodSecondsの組み合わせでゼロダウンタイムアップデートを実現。覚えておいてください:開発環境ではuvicorn --reload、プロダクションではgunicorn -c gunicorn.conf.py、開発設定をプロダクションに持ち込まないこと。


オンラインツール推奨

ブラウザローカルツールを無料で試す →

#Python#FastAPI#生产部署#Docker#Kubernetes#2026#性能优化