Python FastAPIプロダクションデプロイ:DockerからK8sまで7つのキー実践戦略
编程语言
FastAPI開発は楽しい、デプロイは悪夢
ローカルでは高速に動作、本番ではクラッシュ——メモリリーク、リクエストタイムアウト、PodがOOM Kill、ヘルスチェック失敗でローリングアップデートが停止。uvicorn main:appで起動しても、単プロセスでは同時実行に耐えられない。Gunicornを追加しても、ワーカー数の調整が悪いと逆に遅くなる。K8sにデプロイすると、readinessProbeの設定ミスでトラフィックが全損。2026年、FastAPIプロダクションデプロイはPythonバックエンドで最も失敗しやすい領域です。
本記事では7つのキー戦略から出発し、Uvicorn設定→Gunicornチューニング→Docker最適化→K8sデプロイ→ヘルスチェック→ミドルウェア→オブザーバビリティのフルパイプラインを実践します。
FastAPIプロダクションデプロイコア概念
| 概念 | 説明 |
|---|---|
| Uvicorn | ASGIサーバー、uvloopとhttptoolsベースの高性能非同期サーバー |
| Gunicorn | WSGI/ASGIアプリケーションサーバー、複数ワーカープロセスを管理 |
| Worker | Gunicornの作業プロセス、各プロセスがUvicornインスタンスを実行 |
| uvloop | libuvベースのイベントループ、asyncioデフォルトを置き換え、2-4倍のパフォーマンス向上 |
| ASGI | 非同期サーバーゲートウェイインターフェース、FastAPIのランタイムプロトコル |
| ヘルスチェック | K8sがliveness/readinessプローブでアプリケーション状態を判定 |
| グレースフルシャットダウン | SIGTERM受信後、進行中のリクエストを完了してから終了、リクエスト損失を防止 |
| レート制限 | 単位時間あたりのリクエスト数を制限、サービス過負荷を防止 |
問題分析:FastAPIプロダクションデプロイの5つの課題
- 同時実行モデルの選択:非同期IO vs マルチプロセス、ワーカー数の決定方法
- Dockerイメージサイズ:ベースイメージ選択、依存インストール、マルチステージビルド
- K8sリソース計画:CPU/メモリリクエストとリミット、HPAオートスケーリング
- ヘルスチェック設定:livenessとreadinessプローブの閾値とパス設計
- オブザーバビリティ:構造化ログ、分散トレーシング、統一メトリクス収集
ステップバイステップ:7つのキー実践戦略
戦略1:Uvicornプロダクション設定
import uvicorn
from app.main import app
if __name__ == "__main__":
uvicorn.run(
"app.main:app",
host="0.0.0.0",
port=8000,
workers=4,
loop="uvloop",
http="httptools",
log_level="info",
access_log=True,
use_colors=False,
proxy_headers=True,
forwarded_allow_ips="*",
timeout_keep_alive=5,
limit_concurrency=1000,
backlog=2048,
)
# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
import logging
logger = logging.getLogger("app")
@asynccontextmanager
async def lifespan(app: FastAPI):
logger.info("Application starting up...")
yield
logger.info("Application shutting down...")
app = FastAPI(
title="My API",
version="1.0.0",
lifespan=lifespan,
docs_url="/docs",
redoc_url="/redoc",
)
app.add_middleware(
CORSMiddleware,
allow_origins=["https://example.com"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/health")
async def health_check():
return {"status": "healthy"}
@app.get("/ready")
async def readiness_check():
return {"status": "ready"}
戦略2:Gunicorn + Uvicorn Worker
# gunicorn.conf.py
import multiprocessing
import os
bind = "0.0.0.0:8000"
workers = int(os.getenv("GUNICORN_WORKERS", multiprocessing.cpu_count() * 2 + 1))
worker_class = "uvicorn.workers.UvicornWorker"
keepalive = 5
timeout = 120
graceful_timeout = 30
max_requests = 5000
max_requests_jitter = 500
preload_app = True
accesslog = "-"
access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" %(D)s'
errorlog = "-"
loglevel = "info"
# 起動コマンド
gunicorn app.main:app -c gunicorn.conf.py
戦略3:Dockerマルチステージビルド最適化
# Dockerfile
FROM python:3.12-slim AS builder
WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt
FROM python:3.12-slim AS runtime
WORKDIR /app
RUN groupadd -r appuser && useradd -r -g appuser appuser
COPY --from=builder /install /usr/local
COPY . .
RUN chown -R appuser:appuser /app
USER appuser
EXPOSE 8000
HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')" || exit 1
CMD ["gunicorn", "app.main:app", "-c", "gunicorn.conf.py"]
戦略4:K8s Deployment完全設定
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: fastapi-app
labels:
app: fastapi-app
spec:
replicas: 3
selector:
matchLabels:
app: fastapi-app
strategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 1
maxUnavailable: 0
template:
metadata:
labels:
app: fastapi-app
spec:
terminationGracePeriodSeconds: 60
containers:
- name: fastapi-app
image: myregistry.com/fastapi-app:latest
ports:
- containerPort: 8000
protocol: TCP
env:
- name: GUNICORN_WORKERS
value: "4"
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: app-secrets
key: database-url
resources:
requests:
cpu: "250m"
memory: "256Mi"
limits:
cpu: "1000m"
memory: "512Mi"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 15
periodSeconds: 20
timeoutSeconds: 5
failureThreshold: 3
readinessProbe:
httpGet:
path: /ready
port: 8000
initialDelaySeconds: 5
periodSeconds: 10
timeoutSeconds: 3
failureThreshold: 3
lifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 5"]
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: fastapi-app-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: fastapi-app
minReplicas: 3
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
behavior:
scaleUp:
stabilizationWindowSeconds: 30
scaleDown:
stabilizationWindowSeconds: 300
戦略5:ヘルスチェックとグレースフルシャットダウン
# app/health.py
import asyncio
from fastapi import APIRouter, Response
from app.database import check_db_connection
from app.cache import check_redis_connection
router = APIRouter()
is_shutting_down = False
@router.get("/health")
async def liveness(response: Response):
if is_shutting_down:
response.status_code = 503
return {"status": "shutting_down"}
return {"status": "healthy"}
@router.get("/ready")
async def readiness(response: Response):
if is_shutting_down:
response.status_code = 503
return {"status": "shutting_down"}
checks = {
"database": await check_db_connection(),
"redis": await check_redis_connection(),
}
all_healthy = all(checks.values())
if not all_healthy:
response.status_code = 503
return {"status": "not_ready", "checks": checks}
return {"status": "ready", "checks": checks}
import signal
def setup_graceful_shutdown():
def shutdown_handler(signum, frame):
global is_shutting_down
is_shutting_down = True
signal.signal(signal.SIGTERM, shutdown_handler)
signal.signal(signal.SIGINT, shutdown_handler)
戦略6:レート制限とミドルウェア
# app/middleware/rate_limit.py
import time
from fastapi import Request, Response, HTTPException
from starlette.middleware.base import BaseHTTPMiddleware
from typing import Dict, Tuple
class RateLimitMiddleware(BaseHTTPMiddleware):
def __init__(self, app, requests_per_minute: int = 60, burst: int = 10):
super().__init__(app)
self.requests_per_minute = requests_per_minute
self.burst = burst
self._clients: Dict[str, Tuple[int, float]] = {}
async def dispatch(self, request: Request, call_next):
client_ip = request.client.host if request.client else "unknown"
if client_ip not in self._clients:
self._clients[client_ip] = (1, time.time())
else:
count, window_start = self._clients[client_ip]
elapsed = time.time() - window_start
if elapsed > 60:
self._clients[client_ip] = (1, time.time())
else:
if count >= self.requests_per_minute:
raise HTTPException(
status_code=429,
detail="Too many requests",
headers={"Retry-After": str(int(60 - elapsed))},
)
self._clients[client_ip] = (count + 1, window_start)
response = await call_next(request)
return response
# app/middleware/logging.py
import time
import json
import logging
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
logger = logging.getLogger("app.access")
class LoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
start_time = time.time()
response = await call_next(request)
duration_ms = (time.time() - start_time) * 1000
log_entry = {
"method": request.method,
"path": request.url.path,
"status_code": response.status_code,
"duration_ms": round(duration_ms, 2),
"client_ip": request.client.host if request.client else None,
}
if response.status_code >= 500:
logger.error(json.dumps(log_entry))
elif response.status_code >= 400:
logger.warning(json.dumps(log_entry))
else:
logger.info(json.dumps(log_entry))
response.headers["X-Process-Time"] = f"{duration_ms:.2f}ms"
return response
戦略7:オブザーバビリティ(OpenTelemetry)
# app/telemetry.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.sdk.resources import Resource
def setup_telemetry(app, service_name: str = "fastapi-app", otlp_endpoint: str = "http://otel-collector:4317"):
resource = Resource.create({"service.name": service_name})
provider = TracerProvider(resource=resource)
provider.add_span_processor(
BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint, insecure=True))
)
trace.set_tracer_provider(provider)
FastAPIInstrumentor.instrument_app(app)
RedisInstrumentor().instrument()
SQLAlchemyInstrumentor().instrument()
落とし穴ガイド
落とし穴1:単ワーカーでプロダクション運用
# ❌ 誤り:単プロセスではマルチコアを活用できない
uvicorn app.main:app --host 0.0.0.0 --port 8000
# ✅ 正しい:Gunicorn + 複数Uvicorn Worker
gunicorn app.main:app -c gunicorn.conf.py
# workers = cpu_count * 2 + 1
落とし穴2:Dockerイメージでpython:latestを使用
# ❌ 誤り:latestタグは予測不可、イメージサイズ1GB+
FROM python:latest
# ✅ 正しい:slimバージョン固定 + マルチステージビルド
FROM python:3.12-slim AS builder
# ... ビルドステージ
FROM python:3.12-slim AS runtime
# ... ランタイムステージ、約150MB
落とし穴3:K8sリソースリミットが小さすぎる
# ❌ 誤り:メモリリミットが低すぎ、OOM Kill頻発
resources:
limits:
memory: "128Mi"
# ✅ 正しい:適切なrequestsとlimits
resources:
requests:
cpu: "250m"
memory: "256Mi"
limits:
cpu: "1000m"
memory: "512Mi"
落とし穴4:readinessProbeとlivenessProbeが同じパス
# ❌ 誤り:DB接続不可でlivenessProbeも失敗、Podが再起動
livenessProbe:
httpGet:
path: /ready
readinessProbe:
httpGet:
path: /ready
# ✅ 正しい:livenessはプロセス生存確認、readinessは依存確認
livenessProbe:
httpGet:
path: /health
readinessProbe:
httpGet:
path: /ready
落とし穴5:SIGTERMシグナルの無視
# ❌ 誤り:SIGTERM受信で即座に終了、進行中のリクエストが損失
import sys
signal.signal(signal.SIGTERM, lambda s, f: sys.exit(0))
# ✅ 正しい:GunicornはデフォルトでSIGTERMをグレースフルに処理
# K8sのterminationGracePeriodSecondsとpreStopフックと組み合わせ
# gunicorn.conf.pyでgraceful_timeout = 30を設定
エラートラブルシューティング
| # | エラーメッセージ | 原因 | 解決方法 |
|---|---|---|---|
| 1 | Worker failed to boot |
アプリのインポートエラーまたは依存不足 | import文を確認、すべての依存がインストール済みか確認 |
| 2 | OOMKilled |
メモリ制限超過 | メモリリミットを増加、メモリリークを確認 |
| 3 | CrashLoopBackOff |
コンテナ起動後すぐにクラッシュ | Podログを確認:kubectl logs --previous |
| 4 | Readiness probe failed |
依存サービスが利用不可 | DB/Redis接続を確認、initialDelaySecondsを増加 |
| 5 | Liveness probe failed |
イベントループがブロック | 同期ブロッキング呼び出しを確認、run_in_executorを使用 |
| 6 | 429 Too Many Requests |
レート制限がトリガー | レート制限閾値を調整、異常トラフィックを確認 |
| 7 | Connection pool exhausted |
DB接続プール枯渇 | pool_sizeを増加、接続リークを確認 |
| 8 | Timeout waiting for response |
リクエスト処理タイムアウト | Gunicorn timeoutを増加、スロークエリを最適化 |
| 9 | ImagePullBackOff |
イメージプル失敗 | イメージ名とレジストリアクセス権を確認 |
| 10 | Permission denied |
コンテナ内のファイル権限エラー | USERディレクティブが正しいことを確認、ボリュームマウント権限を確認 |
高度な最適化
1. 非同期データベース接続プール
# app/database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
engine = create_async_engine(
"postgresql+asyncpg://user:pass@db:5432/mydb",
pool_size=20,
max_overflow=10,
pool_timeout=30,
pool_recycle=3600,
pool_pre_ping=True,
)
AsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)
async def get_db():
async with AsyncSessionLocal() as session:
try:
yield session
finally:
await session.close()
async def check_db_connection() -> bool:
try:
async with engine.connect() as conn:
await conn.execute(text("SELECT 1"))
return True
except Exception:
return False
2. Redisキャッシュレイヤー
# app/cache.py
import redis.asyncio as redis
import json
from typing import Optional, Any
class RedisCache:
def __init__(self, url: str = "redis://redis:6379/0"):
self.url = url
self._client: Optional[redis.Redis] = None
async def connect(self):
self._client = redis.from_url(
self.url,
max_connections=50,
decode_responses=True,
socket_timeout=5,
retry_on_timeout=True,
)
async def disconnect(self):
if self._client:
await self._client.close()
async def get(self, key: str) -> Optional[Any]:
value = await self._client.get(key)
if value:
return json.loads(value)
return None
async def set(self, key: str, value: Any, ttl: int = 300):
await self._client.set(key, json.dumps(value), ex=ttl)
cache = RedisCache()
async def check_redis_connection() -> bool:
try:
if cache._client:
return await cache._client.ping()
return False
except Exception:
return False
3. Prometheusメトリクス収集
# app/metrics.py
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from fastapi import APIRouter, Response
REQUEST_COUNT = Counter(
"http_requests_total",
"Total HTTP requests",
["method", "endpoint", "status_code"],
)
REQUEST_DURATION = Histogram(
"http_request_duration_seconds",
"HTTP request duration in seconds",
["method", "endpoint"],
buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0],
)
router = APIRouter()
@router.get("/metrics")
async def metrics():
return Response(
content=generate_latest(),
media_type="text/plain",
)
比較分析
| 次元 | Uvicorn単一 | Gunicorn+Uvicorn | Daphne | Hypercorn | Uvicorn+K8s |
|---|---|---|---|---|---|
| マルチコア | ❌単コア | ✅マルチワーカー | ✅マルチプロセス | ✅マルチワーカー | ✅マルチPod |
| 非同期サポート | ✅ネイティブ | ✅ネイティブ | ✅ネイティブ | ✅HTTP/2 | ✅ネイティブ |
| プロダクション対応 | ⚠️組み合わせ必要 | ✅ | ✅ | ✅ | ✅ |
| グレースフルシャットダウン | ⚠️基本 | ✅完全 | ✅ | ✅ | ✅+preStop |
| オートスケーリング | ❌ | ❌ | ❌ | ❌ | ✅HPA |
| ローリングアップデート | ❌ | ❌ | ❌ | ❌ | ✅ |
| リソース分離 | ❌ | ⚠️プロセスレベル | ⚠️プロセスレベル | ⚠️プロセスレベル | ✅コンテナレベル |
| 障害復旧 | ❌ | ⚠️supervisor必要 | ⚠️ | ⚠️ | ✅K8s自己修復 |
まとめ:FastAPIプロダクションデプロイは「1つのコマンド」ではなく「1つの体系」のエンジニアリングです。UvicornからK8sまで、コア原則は3つだけ:マルチプロセスでマルチコアを活用、ヘルスチェックで生存と準備完了を分離、グレースフルシャットダウンでリクエスト損失を防止。Dockerマルチステージビルドでイメージを1GBから150MBに圧縮。Gunicornのgraceful_timeoutとK8sのterminationGracePeriodSecondsの組み合わせでゼロダウンタイムアップデートを実現。覚えておいてください:開発環境では
uvicorn --reload、プロダクションではgunicorn -c gunicorn.conf.py、開発設定をプロダクションに持ち込まないこと。
オンラインツール推奨
- JSONフォーマッター:/ja/json/format
- Base64エンコード/デコード:/ja/encode/base64
- Hash計算:/ja/encode/hash
- JWTデコード:/ja/encode/jwt-decode
ブラウザローカルツールを無料で試す →
#Python#FastAPI#生产部署#Docker#Kubernetes#2026#性能优化