Python Celery分散タスクキュ実践:基本タスクからイベント駆動パイプラインまで6つのプロダクションパターン

编程语言

バックグラウンドタスクがAPIをブロック、Cronがスケールせず、タスク失敗時にリトライなし

ユーザーがアバターをアップロードして5種類のサムネイルを生成する必要があるが、APIが同期的に処理してタイムアウト。毎日午前2時のデータクリーンアップCronは、データ量が倍増すると完了しない。サードパーティ決済のコールバックが時々失敗し、リトライ機構がないため手動対応しかない。2026年、Python Celeryは依然として分散タスクキュの王者——バージョン5.4はネイティブasyncioサポートとより安定したCanvasワークフローをもたらし、単一マシン開発からK8sクラスタデプロイまで、ひとつのコードベースで対応できる。

本記事では6つのプロダクションパターンから、基本タスク定義→ルーティングキュー分離→指数バックオフリトライ→Canvasワークフロー→Flower+Prometheus監視→Docker/K8sプロダクションデプロイまでのフルスタック実践を解説する。各ステップには完全に実行可能なPythonコードを含む。


Celeryコア概念

概念 説明
Celery Python分散タスクキュフレームワーク、複数のBrokerと結果バックエンドをサポート
Broker メッセージミドルウェア、プロデューサー(クライアント)からタスクメッセージを受信しコンシューマー(Worker)に配信、Redis/RabbitMQが一般的
Worker コンシューマープロセス、Brokerからタスクを取得して実行、マルチプロセス/コルーチン/スレッド並行モードをサポート
Task Celery管理下の非同期関数、@app.taskデコレータで登録、呼び出し時にAsyncResultを返す
Canvas Celeryのワークフロープリミティブ、chain(チェーン)、group(並列)、chord(並列+集約)などの組み合わせパターンをサポート
Chain チェーンワークフロー、前のタスクの出力が次のタスクの入力となる、直列実行
Chord 並列+集約ワークフロー、group内の全タスク完了後にcallbackタスクが結果を集約
Group 並列ワークフロー、タスク群が同時に実行され、互いに依存しない
Flower Celeryのリアルタイム監視Webパネル、タスク状態、Worker状態、キュー深度などの可視化を提供
Result Backend タスク結果保存バックエンド、タスクの戻り値と状態を保存、Redis/データベースが一般的
Beat Celeryの定期タスクスケジューラ、Cronに似ているが動的設定と分散ロックをサポート
Signature タスクシグネチャ、タスク呼び出しパラメータをシリアライズして受け渡し可能なオブジェクトにする、Canvas構成に使用

問題分析:分散タスクキュの5つの課題

  1. タスク定義と呼び出しの結合:ビジネスコードが時間のかかる操作を直接呼び出し、同期待ちでAPI応答が遅くなる。時間のかかる操作を非同期タスクとして抽象化し、呼び出し側を分離する必要がある
  2. タスクルーティングとリソース分離:CPU集約型タスクとIO集約型タスクが同じキューに混在し、IOタスクがCPUタスクにブロックされる。キュー分離とルーティング戦略が必要
  3. タスク失敗とリトライストーム:サードパーティサービスが利用不可の際、タスクが繰り返し失敗・リトライし、バックオフ戦略がないと雪崩現象が発生。指数バックオフ+最大リトライ回数+デッドレター処理が必要
  4. 複雑なワークフローオーケストレーション:マルチステップタスクは直列/並列の組み合わせが必要で、手動コールバックチェーンは保守困難。Canvasプリミティブによる宣言的オーケストレーションが必要
  5. プロダクション環境のオブザーバビリティ:タスク蓄積、Workerクラッシュ、キュー閉塞を即座に発見できない。リアルタイム監視+アラート+オートスケーリングが必要

ステップバイステップ:6つのCelery分散タスクパターン

パターン1:基本タスク定義と実行

最もシンプルなCeleryタスク:非同期関数の定義、Workerの起動、呼び出しと結果取得。

# celery_config.py
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"
task_serializer = "json"
result_serializer = "json"
accept_content = ["json"]
timezone = "Asia/Tokyo"
enable_utc = True
# tasks.py
from celery import Celery

app = Celery("myapp")
app.config_from_object("celery_config")

@app.task
def add(x, y):
    return x + y

@app.task
def send_email(to, subject, body):
    import time
    time.sleep(2)
    return {"status": "sent", "to": to, "subject": subject}

@app.task
def generate_thumbnail(image_path, sizes):
    import time
    time.sleep(3)
    thumbnails = []
    for size in sizes:
        thumbnails.append(f"{image_path}_{size}x{size}.jpg")
    return {"image": image_path, "thumbnails": thumbnails}

if __name__ == "__main__":
    app.start()
# client.py - タスクの呼び出し
from tasks import add, send_email, generate_thumbnail

result = add.delay(4, 6)
print(f"Task ID: {result.id}")
print(f"Ready: {result.ready()}")
print(f"Result: {result.get(timeout=10)}")

email_result = send_email.delay("user@example.com", "Welcome", "Hello!")
print(f"Email: {email_result.get(timeout=10)}")

thumb_result = generate_thumbnail.delay("/uploads/avatar.png", [64, 128, 256])
print(f"Thumbnails: {thumb_result.get(timeout=10)}")
# Workerの起動
celery -A tasks worker --loglevel=info --concurrency=4

パターン2:タスクルーティングとキュー分離

異なるタイプのタスクを異なるキューに振り分け、キューごとにWorkerのリソースを分離。

# celery_config_routing.py
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"
task_serializer = "json"
result_serializer = "json"

task_routes = {
    "tasks.compute.*": {"queue": "compute"},
    "tasks.io.*": {"queue": "io"},
    "tasks.default.*": {"queue": "default"},
}

task_default_queue = "default"
task_queues = {
    "default": {
        "exchange": "default",
        "routing_key": "default",
    },
    "compute": {
        "exchange": "compute",
        "routing_key": "compute",
    },
    "io": {
        "exchange": "io",
        "routing_key": "io",
    },
}
# tasks_routing.py
from celery import Celery

app = Celery("routed_app")
app.config_from_object("celery_config_routing")

@app.task(name="tasks.compute.ml_inference")
def ml_inference(model_id, input_data):
    import time
    time.sleep(5)
    return {"model": model_id, "prediction": 0.95, "latency_ms": 5000}

@app.task(name="tasks.compute.data_pipeline")
def data_pipeline(source, target):
    import time
    time.sleep(8)
    return {"source": source, "target": target, "rows": 100000}

@app.task(name="tasks.io.send_notification")
def send_notification(user_id, message):
    import time
    time.sleep(1)
    return {"user_id": user_id, "status": "delivered"}

@app.task(name="tasks.io.sync_external_api")
def sync_external_api(endpoint, payload):
    import time
    time.sleep(2)
    return {"endpoint": endpoint, "status_code": 200}

@app.task(name="tasks.default.cleanup")
def cleanup(days_old):
    return {"deleted_records": days_old * 100}
# 異なるキューのWorkerを起動
celery -A tasks_routing worker -Q compute --loglevel=info --concurrency=2 -n compute@%h
celery -A tasks_routing worker -Q io --loglevel=info --concurrency=8 -n io@%h
celery -A tasks_routing worker -Q default --loglevel=info --concurrency=4 -n default@%h
# client_routing.py
from tasks_routing import ml_inference, send_notification, cleanup

ml_inference.delay("resnet50", {"image": "base64..."})
send_notification.delay("USR-001", "Your order has shipped")
cleanup.delay(30)

# 動的にキューを指定
from tasks_routing import sync_external_api
sync_external_api.apply_async(
    args=["/api/v1/sync", {"batch": 100}],
    queue="io",
    priority=5,
)

パターン3:指数バックオフリトライ戦略

タスク失敗時に自動リトライ、指数バックオフで雪崩を防止、最大リトライ回数でセーフティネット。

# tasks_retry.py
from celery import Celery
from celery.utils.log import get_task_logger
import random
import time

logger = get_task_logger(__name__)

app = Celery("retry_app")
app.config_from_object("celery_config")

class ExternalAPIError(Exception):
    pass

class RateLimitError(Exception):
    pass

@app.task(
    bind=True,
    max_retries=5,
    default_retry_delay=30,
    autoretry_for=(ExternalAPIError,),
    retry_backoff=True,
    retry_backoff_max=600,
    retry_jitter=True,
)
def call_payment_api(self, order_id, amount):
    try:
        success = random.random() > 0.7
        if not success:
            raise ExternalAPIError(f"Payment API timeout for order {order_id}")
        return {"order_id": order_id, "amount": amount, "status": "paid"}
    except ExternalAPIError as exc:
        logger.warning(f"Payment failed for {order_id}, retry {self.request.retries + 1}/{self.max_retries}")
        raise self.retry(exc=exc)

@app.task(
    bind=True,
    max_retries=3,
    autoretry_for=(RateLimitError,),
    retry_backoff=2,
    retry_backoff_max=120,
    retry_jitter=True,
)
def call_rate_limited_api(self, endpoint):
    try:
        if random.random() > 0.5:
            raise RateLimitError(f"Rate limited on {endpoint}")
        return {"endpoint": endpoint, "data": "success"}
    except RateLimitError as exc:
        countdown = 2 ** self.request.retries + random.uniform(0, 1)
        logger.info(f"Rate limited, retrying in {countdown:.1f}s (attempt {self.request.retries + 1})")
        raise self.retry(exc=exc, countdown=countdown)

@app.task(bind=True, max_retries=4)
def process_with_custom_retry(self, data):
    try:
        result = _do_risky_operation(data)
        return result
    except Exception as exc:
        if self.request.retries >= self.max_retries:
            logger.error(f"Max retries exceeded for {data}, sending to DLQ")
            send_to_dlq.delay(str(data), str(exc), self.request.retries)
            return {"status": "failed", "data": data, "error": str(exc)}
        countdown = min(60, 5 * (2 ** self.request.retries))
        logger.warning(f"Processing failed, retry in {countdown}s")
        raise self.retry(exc=exc, countdown=countdown)

def _do_risky_operation(data):
    if random.random() > 0.4:
        raise ValueError(f"Random failure processing {data}")
    return {"processed": data, "result": "ok"}

@app.task
def send_to_dlq(task_data, error, retry_count):
    logger.error(f"DLQ: task={task_data}, error={error}, retries={retry_count}")
    return {"dlq": True, "task": task_data, "error": error}
# client_retry.py
from tasks_retry import call_payment_api, call_rate_limited_api, process_with_custom_retry

result = call_payment_api.delay("ORD-001", 99.99)
print(f"Payment task: {result.id}")

result2 = call_rate_limited_api.delay("/api/v2/users")
print(f"Rate limited task: {result2.id}")

result3 = process_with_custom_retry.delay({"key": "value"})
print(f"Custom retry task: {result3.id}")

パターン4:Canvasワークフロー(Chain、Chord、Group)

複雑なワークフローの宣言的オーケストレーション:直列チェーン、並列グループ、並列+集約。

# tasks_canvas.py
from celery import Celery, chain, group, chord
from celery import signature

app = Celery("canvas_app")
app.config_from_object("celery_config")

@app.task
def download_url(url):
    import time
    time.sleep(1)
    return {"url": url, "content": f"content_of_{url.split('/')[-1]}"}

@app.task
def parse_content(download_result):
    content = download_result["content"]
    return {"parsed": True, "items": len(content), "source": download_result["url"]}

@app.task
def store_results(parse_result):
    return {"stored": True, "items": parse_result["items"], "source": parse_result["source"]}

@app.task
def aggregate_results(results):
    total_items = sum(r["items"] for r in results)
    return {"total_items": total_items, "sources": len(results), "aggregated": True}

@app.task
def send_report(aggregate_result):
    return {
        "report_sent": True,
        "total_items": aggregate_result["total_items"],
        "sources": aggregate_result["sources"],
    }

# Chain: 直列ワークフロー
def run_chain():
    workflow = chain(
        download_url.s("https://api.example.com/data"),
        parse_content.s(),
        store_results.s(),
    )
    result = workflow.apply_async()
    return result.id

# Group: 並列ワークフロー
def run_group():
    urls = [
        "https://api.example.com/users",
        "https://api.example.com/orders",
        "https://api.example.com/products",
    ]
    workflow = group(download_url.s(url) for url in urls)
    result = workflow.apply_async()
    return result.id

# Chord: 並列実行+集約
def run_chord():
    urls = [
        "https://api.example.com/users",
        "https://api.example.com/orders",
        "https://api.example.com/products",
    ]
    workflow = chord(
        [chain(download_url.s(url), parse_content.s()) for url in urls],
        aggregate_results.s(),
    )
    result = workflow.apply_async()
    return result.id

# 複雑なネストワークフロー
def run_complex_workflow():
    header = group(
        chain(download_url.s(f"https://api.example.com/batch/{i}"), parse_content.s())
        for i in range(1, 4)
    )
    workflow = chain(
        header,
        aggregate_results.s(),
        send_report.s(),
    )
    result = workflow.apply_async()
    return result.id
# client_canvas.py
from tasks_canvas import run_chain, run_group, run_chord, run_complex_workflow
from tasks_canvas import download_url, parse_content, store_results, aggregate_results
from celery import chain, group, chord

# 方法1:ラッパー関数を使用
chain_id = run_chain()
print(f"Chain task: {chain_id}")

group_id = run_group()
print(f"Group task: {group_id}")

chord_id = run_chord()
print(f"Chord task: {chord_id}")

# 方法2:Canvasプリミティブを直接使用
from tasks_canvas import app

result = chain(
    download_url.s("https://api.example.com/report"),
    parse_content.s(),
    store_results.s(),
).apply_async()

print(f"Direct chain: {result.id}")

# シグネチャを使用した遅延構築
sig = download_url.s("https://api.example.com/dynamic")
result = chain(sig, parse_content.s(), store_results.s()).apply_async()
print(f"Signature chain: {result.id}")

パターン5:Flower監視とPrometheus統合

タスク状態とWorker健全性のリアルタイム監視、Prometheusアラートとの統合。

# celery_config_monitoring.py
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"
task_serializer = "json"
result_serializer = "json"

task_send_sent_event = True
task_track_started = True
worker_send_task_events = True

task_routes = {
    "monitoring_tasks.*": {"queue": "monitoring"},
}

flower_port = 5555
flower_basic_auth = ["admin:secret123"]
flower_persistent = True
flower_db = "flower_db"
flower_max_tasks = 10000
# monitoring_tasks.py
from celery import Celery
from celery.utils.log import get_task_logger
import time
import random

logger = get_task_logger(__name__)

app = Celery("monitoring_app")
app.config_from_object("celery_config_monitoring")

@app.task(name="monitoring_tasks.process_order", bind=True, track_started=True)
def process_order(self, order_id):
    self.update_state(state="PROCESSING", meta={"step": "validating", "order_id": order_id})
    time.sleep(1)

    self.update_state(state="PROCESSING", meta={"step": "charging", "order_id": order_id})
    time.sleep(2)

    self.update_state(state="PROCESSING", meta={"step": "fulfilling", "order_id": order_id})
    time.sleep(1)

    return {"order_id": order_id, "status": "completed", "amount": random.randint(100, 9999)}

@app.task(name="monitoring_tasks.generate_report", bind=True, track_started=True, time_limit=300)
def generate_report(self, report_type, date_range):
    self.update_state(state="GENERATING", meta={"progress": 0, "type": report_type})
    for i in range(1, 6):
        time.sleep(1)
        self.update_state(state="GENERATING", meta={"progress": i * 20, "type": report_type})

    return {"report_type": report_type, "date_range": date_range, "rows": random.randint(1000, 50000)}
# prometheus_exporter.py - カスタムPrometheusメトリクス
from prometheus_client import Counter, Histogram, Gauge, start_http_server

TASK_STARTED = Counter(
    "celery_task_started_total",
    "Total number of Celery tasks started",
    ["task_name", "queue"],
)

TASK_COMPLETED = Counter(
    "celery_task_completed_total",
    "Total number of Celery tasks completed",
    ["task_name", "queue", "status"],
)

TASK_DURATION = Histogram(
    "celery_task_duration_seconds",
    "Celery task duration in seconds",
    ["task_name", "queue"],
    buckets=[0.1, 0.5, 1, 2, 5, 10, 30, 60, 120, 300],
)

QUEUE_DEPTH = Gauge(
    "celery_queue_depth",
    "Number of tasks waiting in queue",
    ["queue_name"],
)

ACTIVE_WORKERS = Gauge(
    "celery_active_workers",
    "Number of active Celery workers",
)

def start_metrics_server(port=9090):
    start_http_server(port)
    print(f"Prometheus metrics server started on :{port}")
# celery_events_monitor.py - イベントリスナー
from celery import Celery
from prometheus_exporter import TASK_STARTED, TASK_COMPLETED, TASK_DURATION
import time

app = Celery("monitoring_app")
app.config_from_object("celery_config_monitoring")

def monitor_events():
    with app.connection() as connection:
        recv = app.events.Receiver(
            connection,
            handlers={
                "task-sent": on_task_sent,
                "task-started": on_task_started,
                "task-succeeded": on_task_succeeded,
                "task-failed": on_task_failed,
                "task-retried": on_task_retried,
                "worker-heartbeat": on_worker_heartbeat,
            },
        )
        recv.capture(limit=None, timeout=None, wakeup=True)

def on_task_sent(event):
    pass

def on_task_started(event):
    task_name = event.get("name", "unknown")
    queue = event.get("queue", "unknown")
    TASK_STARTED.labels(task_name=task_name, queue=queue).inc()

def on_task_succeeded(event):
    task_name = event.get("name", "unknown")
    queue = event.get("queue", "unknown")
    runtime = event.get("runtime", 0)
    TASK_COMPLETED.labels(task_name=task_name, queue=queue, status="success").inc()
    TASK_DURATION.labels(task_name=task_name, queue=queue).observe(runtime)

def on_task_failed(event):
    task_name = event.get("name", "unknown")
    queue = event.get("queue", "unknown")
    TASK_COMPLETED.labels(task_name=task_name, queue=queue, status="failed").inc()

def on_task_retried(event):
    task_name = event.get("name", "unknown")
    TASK_COMPLETED.labels(task_name=task_name, queue="unknown", status="retried").inc()

def on_worker_heartbeat(event):
    pass

if __name__ == "__main__":
    from prometheus_exporter import start_metrics_server
    start_metrics_server(9090)
    monitor_events()
# Flower監視の起動
celery -A monitoring_tasks flower --port=5555 --basic-auth=admin:secret123

# イベントリスナー+Prometheusの起動
python celery_events_monitor.py

パターン6:プロダクションCeleryサービス(Docker + Kubernetes)

Dockerマルチステージビルド、K8s Deployment、HPAオートスケーリング。

# Dockerfile
FROM python:3.12-slim AS builder

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt

FROM python:3.12-slim

WORKDIR /app
COPY --from=builder /install /usr/local
COPY . .

ENV PYTHONUNBUFFERED=1
ENV CELERY_BROKER_URL=redis://redis:6379/0
ENV CELERY_RESULT_BACKEND=redis://redis:6379/1

CMD ["celery", "-A", "tasks", "worker", "--loglevel=info"]
# docker-compose.yml
version: "3.8"

services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes --maxmemory 512mb --maxmemory-policy allkeys-lru
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5

  worker-default:
    build: .
    command: celery -A tasks worker -Q default --loglevel=info --concurrency=4 -n default@%h
    depends_on:
      redis:
        condition: service_healthy
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    deploy:
      replicas: 2
      resources:
        limits:
          memory: 512M
          cpus: "1.0"
        reservations:
          memory: 256M
          cpus: "0.5"
    restart: unless-stopped

  worker-compute:
    build: .
    command: celery -A tasks worker -Q compute --loglevel=info --concurrency=2 -n compute@%h
    depends_on:
      redis:
        condition: service_healthy
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    deploy:
      replicas: 1
      resources:
        limits:
          memory: 2G
          cpus: "2.0"
        reservations:
          memory: 1G
          cpus: "1.0"
    restart: unless-stopped

  worker-io:
    build: .
    command: celery -A tasks worker -Q io --loglevel=info --concurrency=16 -n io@%h
    depends_on:
      redis:
        condition: service_healthy
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    deploy:
      replicas: 3
      resources:
        limits:
          memory: 512M
          cpus: "1.0"
    restart: unless-stopped

  flower:
    build: .
    command: celery -A tasks flower --port=5555 --basic-auth=admin:secret123
    depends_on:
      redis:
        condition: service_healthy
    ports:
      - "5555:5555"
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    restart: unless-stopped

  beat:
    build: .
    command: celery -A tasks beat --loglevel=info
    depends_on:
      redis:
        condition: service_healthy
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    restart: unless-stopped

volumes:
  redis_data:
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-worker-default
  labels:
    app: celery-worker
    queue: default
spec:
  replicas: 3
  selector:
    matchLabels:
      app: celery-worker
      queue: default
  template:
    metadata:
      labels:
        app: celery-worker
        queue: default
    spec:
      containers:
        - name: worker
          image: myregistry/celery-app:latest
          command: ["celery", "-A", "tasks", "worker", "-Q", "default", "--loglevel=info", "--concurrency=4"]
          env:
            - name: CELERY_BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: broker-url
            - name: CELERY_RESULT_BACKEND
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: result-backend
          resources:
            requests:
              memory: "256Mi"
              cpu: "500m"
            limits:
              memory: "512Mi"
              cpu: "1000m"
          livenessProbe:
            exec:
              command: ["celery", "-A", "tasks", "inspect", "ping"]
            initialDelaySeconds: 30
            periodSeconds: 60
            timeoutSeconds: 10
          readinessProbe:
            exec:
              command: ["celery", "-A", "tasks", "inspect", "ping"]
            initialDelaySeconds: 10
            periodSeconds: 30
            timeoutSeconds: 10
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: celery-worker-default-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: celery-worker-default
  minReplicas: 2
  maxReplicas: 20
  metrics:
    - type: External
      external:
        metric:
          name: celery_queue_depth
          selector:
            matchLabels:
              queue_name: default
        target:
          type: AverageValue
          averageValue: "100"
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-beat
  labels:
    app: celery-beat
spec:
  replicas: 1
  selector:
    matchLabels:
      app: celery-beat
  template:
    metadata:
      labels:
        app: celery-beat
    spec:
      containers:
        - name: beat
          image: myregistry/celery-app:latest
          command: ["celery", "-A", "tasks", "beat", "--loglevel=info"]
          env:
            - name: CELERY_BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: broker-url
            - name: CELERY_RESULT_BACKEND
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: result-backend
          resources:
            requests:
              memory: "128Mi"
              cpu: "100m"
            limits:
              memory: "256Mi"
              cpu: "250m"
# k8s/flower-service.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-flower
spec:
  replicas: 1
  selector:
    matchLabels:
      app: celery-flower
  template:
    metadata:
      labels:
        app: celery-flower
    spec:
      containers:
        - name: flower
          image: myregistry/celery-app:latest
          command: ["celery", "-A", "tasks", "flower", "--port=5555"]
          ports:
            - containerPort: 5555
          env:
            - name: CELERY_BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: broker-url
            - name: CELERY_RESULT_BACKEND
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: result-backend
---
apiVersion: v1
kind: Service
metadata:
  name: celery-flower
spec:
  selector:
    app: celery-flower
  ports:
    - port: 5555
      targetPort: 5555
  type: ClusterIP

よくある落とし穴:5つの罠

罠1:タスク内でのグローバルデータベース接続の使用

間違った書き方

import psycopg2

conn = psycopg2.connect("postgresql://localhost/mydb")

@app.task
def save_user(user_data):
    cursor = conn.cursor()
    cursor.execute("INSERT INTO users (%s)", (user_data,))
    conn.commit()

正しい書き方

@app.task
def save_user(user_data):
    from django.db import connection
    with connection.cursor() as cursor:
        cursor.execute("INSERT INTO users (data) VALUES (%s)", (user_data,))

罠2:タスクタイムアウトを無視してWorkerが永久にブロック

間違った書き方

@app.task
def fetch_external_data(url):
    import requests
    response = requests.get(url)
    return response.json()

正しい書き方

@app.task(
    time_limit=300,
    soft_time_limit=270,
)
def fetch_external_data(url):
    import requests
    try:
        response = requests.get(url, timeout=60)
        return response.json()
    except requests.Timeout:
        raise RetryException("Request timeout")

罠3:Canvasでシリアライズ不可能なオブジェクトを渡す

間違った書き方

@app.task
def process_file(file_obj):
    return file_obj.read()

workflow = chain(process_file.s(open("data.csv")), store_result.s())

正しい書き方

@app.task
def process_file(file_path):
    with open(file_path, "r") as f:
        return f.read()

workflow = chain(process_file.s("/data/data.csv"), store_result.s())

罠4:Beat定期タスクに分散ロックがなく重複実行される

間違った書き方

# 複数のBeatインスタンスが同時に実行され、タスクが重複実行される
CELERYBEAT_SCHEDULE = {
    "cleanup-every-night": {
        "task": "tasks.cleanup",
        "schedule": crontab(hour=2, minute=0),
    },
}

正しい書き方

from celery.schedules import crontab
from celery_redbeat import RedBeatScheduler

beat_scheduler = RedBeatScheduler
redbeat_redis_url = "redis://localhost:6379/2"

CELERYBEAT_SCHEDULE = {
    "cleanup-every-night": {
        "task": "tasks.cleanup",
        "schedule": crontab(hour=2, minute=0),
        "options": {"queue": "default"},
    },
}

罠5:Workerのforkモードでリソース継承を処理していない

間違った書き方

import redis

redis_client = redis.Redis()

@app.task
def update_cache(key, value):
    redis_client.set(key, value)

正しい書き方

@app.task
def update_cache(key, value):
    import redis
    redis_client = redis.Redis()
    redis_client.set(key, value)

# またはWorkerプロセス初期化を使用
@app.task(bind=True)
def update_cache(self, key, value):
    redis_client = self.app.pool.redis
    redis_client.set(key, value)

エラートラブルシューティング表

エラー現象 可能な原因 診断方法 解決策
Worker起動後にログ出力なし Broker接続失敗 CELERY_BROKER_URLの形式と接続性を確認 Redis/RabbitMQの稼働とURL形式を確認
タスクがPENDINGのまま実行されない 対応キューを消費するWorkerがない celery -A tasks inspect active_queues 対応キューのWorkerを起動するかtask_routes設定を確認
kombu.exceptions.EncodeError タスク引数にシリアライズ不可能なオブジェクトが含まれる delay()/apply_async()の引数を確認 JSONシリアライズ可能な基本型のみ渡す
WorkerLostError WorkerプロセスがOOM Killerに強制終了された dmesg | grep -i oomでシステムログを確認 Workerのメモリ制限を増やすかconcurrencyを減らす
TimeLimitExceeded タスクがtime_limitを超過 タスクロジックに無限ループやブロッキングIOがないか確認 適切なtime_limitとsoft_time_limitを設定
タスク結果がNoneを返す result_backendが未設定 CELERY_RESULT_BACKEND設定を確認 Redis/データベースを結果バックエンドとして設定
Chordのcallbackが実行されない header内のタスクがサイレントに失敗 Flowerでheaderタスクの状態を確認 headerタスクがNoneを返すのではなく例外をraiseすること
ContentDisallowed 受け入れ不可のメッセージシリアライズ形式 accept_contentが送信側と一致しない accept_content = ["json"]を統一設定
Beatタスクがスケジュール通りに実行されない タイムゾーン設定エラー timezoneenable_utc設定を確認 タイムゾーン設定を統一、enable_utc=Trueを推奨
Workerが頻繁に切断・再接続 Broker接続プール枯渇またはネットワークジッター broker_pool_limitと接続タイムアウトを確認 broker_pool_limitを増やし、broker_heartbeatを設定

高度な最適化

1. タスク結果の重複排除と冪等性

プロダクション環境ではタスクが重複配信される可能性があり(ネットワークジッター、Worker再起動)、冪等性の保証が必要。

import hashlib
from celery import Celery
from contextlib import contextmanager

app = Celery("idempotent_app")
app.config_from_object("celery_config")

@contextmanager
def idempotent_lock(task_id, redis_client, ttl=3600):
    lock_key = f"celery:idempotent:{task_id}"
    acquired = redis_client.set(lock_key, "1", nx=True, ex=ttl)
    try:
        yield acquired is not None
    finally:
        pass

@app.task(bind=True)
def process_order_idempotent(self, order_id, amount):
    import redis
    r = redis.Redis(host="localhost", port=6379, db=3)

    task_key = f"order:processed:{order_id}"
    existing = r.get(task_key)
    if existing:
        return {"status": "duplicate", "order_id": order_id, "cached_result": existing.decode()}

    with idempotent_lock(self.request.id, r) as acquired:
        if not acquired:
            return {"status": "duplicate", "order_id": order_id}

        result = _do_process_order(order_id, amount)
        r.setex(task_key, 86400, str(result))
        return result

def _do_process_order(order_id, amount):
    return {"status": "processed", "order_id": order_id, "amount": amount}

2. 動的タスク優先度とレート制限

高優先度タスクを優先実行し、レート制限で下流サービスの過負荷を防止。

from celery import Celery

app = Celery("priority_app")
app.config_from_object("celery_config")

app.conf.broker_transport_options = {
    "priority_steps": list(range(10)),
    "sep": ":",
    "queue_order_strategy": "priority",
}

@app.task(rate_limit="10/s")
def call_external_api(endpoint, payload):
    import time
    time.sleep(0.1)
    return {"endpoint": endpoint, "status": "ok"}

# 高優先度タスク
def submit_high_priority():
    call_external_api.apply_async(
        args=["/api/v1/critical", {"urgent": True}],
        priority=0,
        queue="io",
    )

# 低優先度タスク
def submit_low_priority():
    call_external_api.apply_async(
        args=["/api/v1/batch", {"batch": True}],
        priority=9,
        queue="io",
    )

# 動的レート制限
from celery import current_app

def adjust_rate_limit(task_name, new_rate):
    current_app.control.rate_limit(task_name, new_rate)

3. CeleryとFastAPIの統合

CeleryをFastAPIアプリケーションに統合し、APIから非同期タスクをトリガーして状態を照会。

# fastapi_celery.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from celery.result import AsyncResult

app = FastAPI(title="Celery API Gateway")

class TaskSubmit(BaseModel):
    task_name: str
    args: list = []
    kwargs: dict = {}

class TaskResponse(BaseModel):
    task_id: str
    status: str

@app.post("/tasks/submit", response_model=TaskResponse)
def submit_task(submit: TaskSubmit):
    from tasks import app as celery_app
    try:
        result = celery_app.send_task(
            submit.task_name,
            args=submit.args,
            kwargs=submit.kwargs,
        )
        return TaskResponse(task_id=result.id, status="PENDING")
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.get("/tasks/{task_id}", response_model=dict)
def get_task_status(task_id: str):
    result = AsyncResult(task_id)
    response = {
        "task_id": task_id,
        "status": result.status,
        "result": result.result if result.ready() else None,
    }
    if result.failed():
        response["error"] = str(result.result)
    return response

@app.post("/tasks/{task_id}/revoke")
def revoke_task(task_id: str):
    from tasks import app as celery_app
    celery_app.control.revoke(task_id, terminate=True)
    return {"task_id": task_id, "status": "REVOKED"}

フレームワーク比較

特徴 Celery Dramatiq Huey RQ Temporal
言語 Python Python Python Python Go/マルチ言語
Broker Redis/RabbitMQ/SQS等 Redis/RabbitMQ Redis/SQLite Redis ネイティブ
ワークフローオーケストレーション Canvas(chain/chord/group) Pipeline 非サポート 非サポート ネイティブDAG
リトライ戦略 指数バックオフ+Jitter+カスタム 指数バックオフ+Jitter シンプルリトライ シンプルリトライ 完全なリトライ戦略
監視 Flower 内蔵Dashboard シンプル RQ Dashboard Web UI
定期タスク Beat 内蔵 内蔵 非サポート ネイティブSchedule
コミュニティエコシステム 最も成熟 中程度 ニッチ ニッチ 急成長中
学習曲線 中程度 低い 低い 低い 高い
マルチ言語サポート なし なし なし なし あり
プロダクション成熟度
ユースケース 汎用分散タスク 中小規模プロジェクト 軽量タスク Redisエコシステムのシンプルタスク クロス言語複雑ワークフロー

まとめ

Python Celeryはバージョン5.4以降、分散タスクキュ領域で最も成熟し柔軟なソリューションとなっている。3つのコア原則を忘れずに:タスクの冪等性はベースライン——重複配信でも安全でなければならない;キュー分離は標準——CPU/IO/デフォルトキューは分離必須;監視とアラートはオプションではない——Flower+Prometheusはプロダクション環境の目。単一のcelery workerからK8s HPAオートスケーリングまで、Celeryのアーキテクチャはビジネスと共に線形にスケールし、タスクコードの書き直しは不要だ。


おすすめツール

ブラウザローカルツールを無料で試す →

#Python#Celery#分布式任务#Redis#RabbitMQ#2026#异步任务