Python Celery分散タスクキュ実践:基本タスクからイベント駆動パイプラインまで6つのプロダクションパターン
バックグラウンドタスクがAPIをブロック、Cronがスケールせず、タスク失敗時にリトライなし
ユーザーがアバターをアップロードして5種類のサムネイルを生成する必要があるが、APIが同期的に処理してタイムアウト。毎日午前2時のデータクリーンアップCronは、データ量が倍増すると完了しない。サードパーティ決済のコールバックが時々失敗し、リトライ機構がないため手動対応しかない。2026年、Python Celeryは依然として分散タスクキュの王者——バージョン5.4はネイティブasyncioサポートとより安定したCanvasワークフローをもたらし、単一マシン開発からK8sクラスタデプロイまで、ひとつのコードベースで対応できる。
本記事では6つのプロダクションパターンから、基本タスク定義→ルーティングキュー分離→指数バックオフリトライ→Canvasワークフロー→Flower+Prometheus監視→Docker/K8sプロダクションデプロイまでのフルスタック実践を解説する。各ステップには完全に実行可能なPythonコードを含む。
Celeryコア概念
| 概念 | 説明 |
|---|---|
| Celery | Python分散タスクキュフレームワーク、複数のBrokerと結果バックエンドをサポート |
| Broker | メッセージミドルウェア、プロデューサー(クライアント)からタスクメッセージを受信しコンシューマー(Worker)に配信、Redis/RabbitMQが一般的 |
| Worker | コンシューマープロセス、Brokerからタスクを取得して実行、マルチプロセス/コルーチン/スレッド並行モードをサポート |
| Task | Celery管理下の非同期関数、@app.taskデコレータで登録、呼び出し時にAsyncResultを返す |
| Canvas | Celeryのワークフロープリミティブ、chain(チェーン)、group(並列)、chord(並列+集約)などの組み合わせパターンをサポート |
| Chain | チェーンワークフロー、前のタスクの出力が次のタスクの入力となる、直列実行 |
| Chord | 並列+集約ワークフロー、group内の全タスク完了後にcallbackタスクが結果を集約 |
| Group | 並列ワークフロー、タスク群が同時に実行され、互いに依存しない |
| Flower | Celeryのリアルタイム監視Webパネル、タスク状態、Worker状態、キュー深度などの可視化を提供 |
| Result Backend | タスク結果保存バックエンド、タスクの戻り値と状態を保存、Redis/データベースが一般的 |
| Beat | Celeryの定期タスクスケジューラ、Cronに似ているが動的設定と分散ロックをサポート |
| Signature | タスクシグネチャ、タスク呼び出しパラメータをシリアライズして受け渡し可能なオブジェクトにする、Canvas構成に使用 |
問題分析:分散タスクキュの5つの課題
- タスク定義と呼び出しの結合:ビジネスコードが時間のかかる操作を直接呼び出し、同期待ちでAPI応答が遅くなる。時間のかかる操作を非同期タスクとして抽象化し、呼び出し側を分離する必要がある
- タスクルーティングとリソース分離:CPU集約型タスクとIO集約型タスクが同じキューに混在し、IOタスクがCPUタスクにブロックされる。キュー分離とルーティング戦略が必要
- タスク失敗とリトライストーム:サードパーティサービスが利用不可の際、タスクが繰り返し失敗・リトライし、バックオフ戦略がないと雪崩現象が発生。指数バックオフ+最大リトライ回数+デッドレター処理が必要
- 複雑なワークフローオーケストレーション:マルチステップタスクは直列/並列の組み合わせが必要で、手動コールバックチェーンは保守困難。Canvasプリミティブによる宣言的オーケストレーションが必要
- プロダクション環境のオブザーバビリティ:タスク蓄積、Workerクラッシュ、キュー閉塞を即座に発見できない。リアルタイム監視+アラート+オートスケーリングが必要
ステップバイステップ:6つのCelery分散タスクパターン
パターン1:基本タスク定義と実行
最もシンプルなCeleryタスク:非同期関数の定義、Workerの起動、呼び出しと結果取得。
# celery_config.py
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"
task_serializer = "json"
result_serializer = "json"
accept_content = ["json"]
timezone = "Asia/Tokyo"
enable_utc = True
# tasks.py
from celery import Celery
app = Celery("myapp")
app.config_from_object("celery_config")
@app.task
def add(x, y):
return x + y
@app.task
def send_email(to, subject, body):
import time
time.sleep(2)
return {"status": "sent", "to": to, "subject": subject}
@app.task
def generate_thumbnail(image_path, sizes):
import time
time.sleep(3)
thumbnails = []
for size in sizes:
thumbnails.append(f"{image_path}_{size}x{size}.jpg")
return {"image": image_path, "thumbnails": thumbnails}
if __name__ == "__main__":
app.start()
# client.py - タスクの呼び出し
from tasks import add, send_email, generate_thumbnail
result = add.delay(4, 6)
print(f"Task ID: {result.id}")
print(f"Ready: {result.ready()}")
print(f"Result: {result.get(timeout=10)}")
email_result = send_email.delay("user@example.com", "Welcome", "Hello!")
print(f"Email: {email_result.get(timeout=10)}")
thumb_result = generate_thumbnail.delay("/uploads/avatar.png", [64, 128, 256])
print(f"Thumbnails: {thumb_result.get(timeout=10)}")
# Workerの起動
celery -A tasks worker --loglevel=info --concurrency=4
パターン2:タスクルーティングとキュー分離
異なるタイプのタスクを異なるキューに振り分け、キューごとにWorkerのリソースを分離。
# celery_config_routing.py
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"
task_serializer = "json"
result_serializer = "json"
task_routes = {
"tasks.compute.*": {"queue": "compute"},
"tasks.io.*": {"queue": "io"},
"tasks.default.*": {"queue": "default"},
}
task_default_queue = "default"
task_queues = {
"default": {
"exchange": "default",
"routing_key": "default",
},
"compute": {
"exchange": "compute",
"routing_key": "compute",
},
"io": {
"exchange": "io",
"routing_key": "io",
},
}
# tasks_routing.py
from celery import Celery
app = Celery("routed_app")
app.config_from_object("celery_config_routing")
@app.task(name="tasks.compute.ml_inference")
def ml_inference(model_id, input_data):
import time
time.sleep(5)
return {"model": model_id, "prediction": 0.95, "latency_ms": 5000}
@app.task(name="tasks.compute.data_pipeline")
def data_pipeline(source, target):
import time
time.sleep(8)
return {"source": source, "target": target, "rows": 100000}
@app.task(name="tasks.io.send_notification")
def send_notification(user_id, message):
import time
time.sleep(1)
return {"user_id": user_id, "status": "delivered"}
@app.task(name="tasks.io.sync_external_api")
def sync_external_api(endpoint, payload):
import time
time.sleep(2)
return {"endpoint": endpoint, "status_code": 200}
@app.task(name="tasks.default.cleanup")
def cleanup(days_old):
return {"deleted_records": days_old * 100}
# 異なるキューのWorkerを起動
celery -A tasks_routing worker -Q compute --loglevel=info --concurrency=2 -n compute@%h
celery -A tasks_routing worker -Q io --loglevel=info --concurrency=8 -n io@%h
celery -A tasks_routing worker -Q default --loglevel=info --concurrency=4 -n default@%h
# client_routing.py
from tasks_routing import ml_inference, send_notification, cleanup
ml_inference.delay("resnet50", {"image": "base64..."})
send_notification.delay("USR-001", "Your order has shipped")
cleanup.delay(30)
# 動的にキューを指定
from tasks_routing import sync_external_api
sync_external_api.apply_async(
args=["/api/v1/sync", {"batch": 100}],
queue="io",
priority=5,
)
パターン3:指数バックオフリトライ戦略
タスク失敗時に自動リトライ、指数バックオフで雪崩を防止、最大リトライ回数でセーフティネット。
# tasks_retry.py
from celery import Celery
from celery.utils.log import get_task_logger
import random
import time
logger = get_task_logger(__name__)
app = Celery("retry_app")
app.config_from_object("celery_config")
class ExternalAPIError(Exception):
pass
class RateLimitError(Exception):
pass
@app.task(
bind=True,
max_retries=5,
default_retry_delay=30,
autoretry_for=(ExternalAPIError,),
retry_backoff=True,
retry_backoff_max=600,
retry_jitter=True,
)
def call_payment_api(self, order_id, amount):
try:
success = random.random() > 0.7
if not success:
raise ExternalAPIError(f"Payment API timeout for order {order_id}")
return {"order_id": order_id, "amount": amount, "status": "paid"}
except ExternalAPIError as exc:
logger.warning(f"Payment failed for {order_id}, retry {self.request.retries + 1}/{self.max_retries}")
raise self.retry(exc=exc)
@app.task(
bind=True,
max_retries=3,
autoretry_for=(RateLimitError,),
retry_backoff=2,
retry_backoff_max=120,
retry_jitter=True,
)
def call_rate_limited_api(self, endpoint):
try:
if random.random() > 0.5:
raise RateLimitError(f"Rate limited on {endpoint}")
return {"endpoint": endpoint, "data": "success"}
except RateLimitError as exc:
countdown = 2 ** self.request.retries + random.uniform(0, 1)
logger.info(f"Rate limited, retrying in {countdown:.1f}s (attempt {self.request.retries + 1})")
raise self.retry(exc=exc, countdown=countdown)
@app.task(bind=True, max_retries=4)
def process_with_custom_retry(self, data):
try:
result = _do_risky_operation(data)
return result
except Exception as exc:
if self.request.retries >= self.max_retries:
logger.error(f"Max retries exceeded for {data}, sending to DLQ")
send_to_dlq.delay(str(data), str(exc), self.request.retries)
return {"status": "failed", "data": data, "error": str(exc)}
countdown = min(60, 5 * (2 ** self.request.retries))
logger.warning(f"Processing failed, retry in {countdown}s")
raise self.retry(exc=exc, countdown=countdown)
def _do_risky_operation(data):
if random.random() > 0.4:
raise ValueError(f"Random failure processing {data}")
return {"processed": data, "result": "ok"}
@app.task
def send_to_dlq(task_data, error, retry_count):
logger.error(f"DLQ: task={task_data}, error={error}, retries={retry_count}")
return {"dlq": True, "task": task_data, "error": error}
# client_retry.py
from tasks_retry import call_payment_api, call_rate_limited_api, process_with_custom_retry
result = call_payment_api.delay("ORD-001", 99.99)
print(f"Payment task: {result.id}")
result2 = call_rate_limited_api.delay("/api/v2/users")
print(f"Rate limited task: {result2.id}")
result3 = process_with_custom_retry.delay({"key": "value"})
print(f"Custom retry task: {result3.id}")
パターン4:Canvasワークフロー(Chain、Chord、Group)
複雑なワークフローの宣言的オーケストレーション:直列チェーン、並列グループ、並列+集約。
# tasks_canvas.py
from celery import Celery, chain, group, chord
from celery import signature
app = Celery("canvas_app")
app.config_from_object("celery_config")
@app.task
def download_url(url):
import time
time.sleep(1)
return {"url": url, "content": f"content_of_{url.split('/')[-1]}"}
@app.task
def parse_content(download_result):
content = download_result["content"]
return {"parsed": True, "items": len(content), "source": download_result["url"]}
@app.task
def store_results(parse_result):
return {"stored": True, "items": parse_result["items"], "source": parse_result["source"]}
@app.task
def aggregate_results(results):
total_items = sum(r["items"] for r in results)
return {"total_items": total_items, "sources": len(results), "aggregated": True}
@app.task
def send_report(aggregate_result):
return {
"report_sent": True,
"total_items": aggregate_result["total_items"],
"sources": aggregate_result["sources"],
}
# Chain: 直列ワークフロー
def run_chain():
workflow = chain(
download_url.s("https://api.example.com/data"),
parse_content.s(),
store_results.s(),
)
result = workflow.apply_async()
return result.id
# Group: 並列ワークフロー
def run_group():
urls = [
"https://api.example.com/users",
"https://api.example.com/orders",
"https://api.example.com/products",
]
workflow = group(download_url.s(url) for url in urls)
result = workflow.apply_async()
return result.id
# Chord: 並列実行+集約
def run_chord():
urls = [
"https://api.example.com/users",
"https://api.example.com/orders",
"https://api.example.com/products",
]
workflow = chord(
[chain(download_url.s(url), parse_content.s()) for url in urls],
aggregate_results.s(),
)
result = workflow.apply_async()
return result.id
# 複雑なネストワークフロー
def run_complex_workflow():
header = group(
chain(download_url.s(f"https://api.example.com/batch/{i}"), parse_content.s())
for i in range(1, 4)
)
workflow = chain(
header,
aggregate_results.s(),
send_report.s(),
)
result = workflow.apply_async()
return result.id
# client_canvas.py
from tasks_canvas import run_chain, run_group, run_chord, run_complex_workflow
from tasks_canvas import download_url, parse_content, store_results, aggregate_results
from celery import chain, group, chord
# 方法1:ラッパー関数を使用
chain_id = run_chain()
print(f"Chain task: {chain_id}")
group_id = run_group()
print(f"Group task: {group_id}")
chord_id = run_chord()
print(f"Chord task: {chord_id}")
# 方法2:Canvasプリミティブを直接使用
from tasks_canvas import app
result = chain(
download_url.s("https://api.example.com/report"),
parse_content.s(),
store_results.s(),
).apply_async()
print(f"Direct chain: {result.id}")
# シグネチャを使用した遅延構築
sig = download_url.s("https://api.example.com/dynamic")
result = chain(sig, parse_content.s(), store_results.s()).apply_async()
print(f"Signature chain: {result.id}")
パターン5:Flower監視とPrometheus統合
タスク状態とWorker健全性のリアルタイム監視、Prometheusアラートとの統合。
# celery_config_monitoring.py
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"
task_serializer = "json"
result_serializer = "json"
task_send_sent_event = True
task_track_started = True
worker_send_task_events = True
task_routes = {
"monitoring_tasks.*": {"queue": "monitoring"},
}
flower_port = 5555
flower_basic_auth = ["admin:secret123"]
flower_persistent = True
flower_db = "flower_db"
flower_max_tasks = 10000
# monitoring_tasks.py
from celery import Celery
from celery.utils.log import get_task_logger
import time
import random
logger = get_task_logger(__name__)
app = Celery("monitoring_app")
app.config_from_object("celery_config_monitoring")
@app.task(name="monitoring_tasks.process_order", bind=True, track_started=True)
def process_order(self, order_id):
self.update_state(state="PROCESSING", meta={"step": "validating", "order_id": order_id})
time.sleep(1)
self.update_state(state="PROCESSING", meta={"step": "charging", "order_id": order_id})
time.sleep(2)
self.update_state(state="PROCESSING", meta={"step": "fulfilling", "order_id": order_id})
time.sleep(1)
return {"order_id": order_id, "status": "completed", "amount": random.randint(100, 9999)}
@app.task(name="monitoring_tasks.generate_report", bind=True, track_started=True, time_limit=300)
def generate_report(self, report_type, date_range):
self.update_state(state="GENERATING", meta={"progress": 0, "type": report_type})
for i in range(1, 6):
time.sleep(1)
self.update_state(state="GENERATING", meta={"progress": i * 20, "type": report_type})
return {"report_type": report_type, "date_range": date_range, "rows": random.randint(1000, 50000)}
# prometheus_exporter.py - カスタムPrometheusメトリクス
from prometheus_client import Counter, Histogram, Gauge, start_http_server
TASK_STARTED = Counter(
"celery_task_started_total",
"Total number of Celery tasks started",
["task_name", "queue"],
)
TASK_COMPLETED = Counter(
"celery_task_completed_total",
"Total number of Celery tasks completed",
["task_name", "queue", "status"],
)
TASK_DURATION = Histogram(
"celery_task_duration_seconds",
"Celery task duration in seconds",
["task_name", "queue"],
buckets=[0.1, 0.5, 1, 2, 5, 10, 30, 60, 120, 300],
)
QUEUE_DEPTH = Gauge(
"celery_queue_depth",
"Number of tasks waiting in queue",
["queue_name"],
)
ACTIVE_WORKERS = Gauge(
"celery_active_workers",
"Number of active Celery workers",
)
def start_metrics_server(port=9090):
start_http_server(port)
print(f"Prometheus metrics server started on :{port}")
# celery_events_monitor.py - イベントリスナー
from celery import Celery
from prometheus_exporter import TASK_STARTED, TASK_COMPLETED, TASK_DURATION
import time
app = Celery("monitoring_app")
app.config_from_object("celery_config_monitoring")
def monitor_events():
with app.connection() as connection:
recv = app.events.Receiver(
connection,
handlers={
"task-sent": on_task_sent,
"task-started": on_task_started,
"task-succeeded": on_task_succeeded,
"task-failed": on_task_failed,
"task-retried": on_task_retried,
"worker-heartbeat": on_worker_heartbeat,
},
)
recv.capture(limit=None, timeout=None, wakeup=True)
def on_task_sent(event):
pass
def on_task_started(event):
task_name = event.get("name", "unknown")
queue = event.get("queue", "unknown")
TASK_STARTED.labels(task_name=task_name, queue=queue).inc()
def on_task_succeeded(event):
task_name = event.get("name", "unknown")
queue = event.get("queue", "unknown")
runtime = event.get("runtime", 0)
TASK_COMPLETED.labels(task_name=task_name, queue=queue, status="success").inc()
TASK_DURATION.labels(task_name=task_name, queue=queue).observe(runtime)
def on_task_failed(event):
task_name = event.get("name", "unknown")
queue = event.get("queue", "unknown")
TASK_COMPLETED.labels(task_name=task_name, queue=queue, status="failed").inc()
def on_task_retried(event):
task_name = event.get("name", "unknown")
TASK_COMPLETED.labels(task_name=task_name, queue="unknown", status="retried").inc()
def on_worker_heartbeat(event):
pass
if __name__ == "__main__":
from prometheus_exporter import start_metrics_server
start_metrics_server(9090)
monitor_events()
# Flower監視の起動
celery -A monitoring_tasks flower --port=5555 --basic-auth=admin:secret123
# イベントリスナー+Prometheusの起動
python celery_events_monitor.py
パターン6:プロダクションCeleryサービス(Docker + Kubernetes)
Dockerマルチステージビルド、K8s Deployment、HPAオートスケーリング。
# Dockerfile
FROM python:3.12-slim AS builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt
FROM python:3.12-slim
WORKDIR /app
COPY --from=builder /install /usr/local
COPY . .
ENV PYTHONUNBUFFERED=1
ENV CELERY_BROKER_URL=redis://redis:6379/0
ENV CELERY_RESULT_BACKEND=redis://redis:6379/1
CMD ["celery", "-A", "tasks", "worker", "--loglevel=info"]
# docker-compose.yml
version: "3.8"
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
command: redis-server --appendonly yes --maxmemory 512mb --maxmemory-policy allkeys-lru
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 5s
retries: 5
worker-default:
build: .
command: celery -A tasks worker -Q default --loglevel=info --concurrency=4 -n default@%h
depends_on:
redis:
condition: service_healthy
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/1
deploy:
replicas: 2
resources:
limits:
memory: 512M
cpus: "1.0"
reservations:
memory: 256M
cpus: "0.5"
restart: unless-stopped
worker-compute:
build: .
command: celery -A tasks worker -Q compute --loglevel=info --concurrency=2 -n compute@%h
depends_on:
redis:
condition: service_healthy
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/1
deploy:
replicas: 1
resources:
limits:
memory: 2G
cpus: "2.0"
reservations:
memory: 1G
cpus: "1.0"
restart: unless-stopped
worker-io:
build: .
command: celery -A tasks worker -Q io --loglevel=info --concurrency=16 -n io@%h
depends_on:
redis:
condition: service_healthy
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/1
deploy:
replicas: 3
resources:
limits:
memory: 512M
cpus: "1.0"
restart: unless-stopped
flower:
build: .
command: celery -A tasks flower --port=5555 --basic-auth=admin:secret123
depends_on:
redis:
condition: service_healthy
ports:
- "5555:5555"
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/1
restart: unless-stopped
beat:
build: .
command: celery -A tasks beat --loglevel=info
depends_on:
redis:
condition: service_healthy
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/1
restart: unless-stopped
volumes:
redis_data:
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: celery-worker-default
labels:
app: celery-worker
queue: default
spec:
replicas: 3
selector:
matchLabels:
app: celery-worker
queue: default
template:
metadata:
labels:
app: celery-worker
queue: default
spec:
containers:
- name: worker
image: myregistry/celery-app:latest
command: ["celery", "-A", "tasks", "worker", "-Q", "default", "--loglevel=info", "--concurrency=4"]
env:
- name: CELERY_BROKER_URL
valueFrom:
secretKeyRef:
name: celery-secrets
key: broker-url
- name: CELERY_RESULT_BACKEND
valueFrom:
secretKeyRef:
name: celery-secrets
key: result-backend
resources:
requests:
memory: "256Mi"
cpu: "500m"
limits:
memory: "512Mi"
cpu: "1000m"
livenessProbe:
exec:
command: ["celery", "-A", "tasks", "inspect", "ping"]
initialDelaySeconds: 30
periodSeconds: 60
timeoutSeconds: 10
readinessProbe:
exec:
command: ["celery", "-A", "tasks", "inspect", "ping"]
initialDelaySeconds: 10
periodSeconds: 30
timeoutSeconds: 10
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: celery-worker-default-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: celery-worker-default
minReplicas: 2
maxReplicas: 20
metrics:
- type: External
external:
metric:
name: celery_queue_depth
selector:
matchLabels:
queue_name: default
target:
type: AverageValue
averageValue: "100"
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: celery-beat
labels:
app: celery-beat
spec:
replicas: 1
selector:
matchLabels:
app: celery-beat
template:
metadata:
labels:
app: celery-beat
spec:
containers:
- name: beat
image: myregistry/celery-app:latest
command: ["celery", "-A", "tasks", "beat", "--loglevel=info"]
env:
- name: CELERY_BROKER_URL
valueFrom:
secretKeyRef:
name: celery-secrets
key: broker-url
- name: CELERY_RESULT_BACKEND
valueFrom:
secretKeyRef:
name: celery-secrets
key: result-backend
resources:
requests:
memory: "128Mi"
cpu: "100m"
limits:
memory: "256Mi"
cpu: "250m"
# k8s/flower-service.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: celery-flower
spec:
replicas: 1
selector:
matchLabels:
app: celery-flower
template:
metadata:
labels:
app: celery-flower
spec:
containers:
- name: flower
image: myregistry/celery-app:latest
command: ["celery", "-A", "tasks", "flower", "--port=5555"]
ports:
- containerPort: 5555
env:
- name: CELERY_BROKER_URL
valueFrom:
secretKeyRef:
name: celery-secrets
key: broker-url
- name: CELERY_RESULT_BACKEND
valueFrom:
secretKeyRef:
name: celery-secrets
key: result-backend
---
apiVersion: v1
kind: Service
metadata:
name: celery-flower
spec:
selector:
app: celery-flower
ports:
- port: 5555
targetPort: 5555
type: ClusterIP
よくある落とし穴:5つの罠
罠1:タスク内でのグローバルデータベース接続の使用
❌ 間違った書き方:
import psycopg2
conn = psycopg2.connect("postgresql://localhost/mydb")
@app.task
def save_user(user_data):
cursor = conn.cursor()
cursor.execute("INSERT INTO users (%s)", (user_data,))
conn.commit()
✅ 正しい書き方:
@app.task
def save_user(user_data):
from django.db import connection
with connection.cursor() as cursor:
cursor.execute("INSERT INTO users (data) VALUES (%s)", (user_data,))
罠2:タスクタイムアウトを無視してWorkerが永久にブロック
❌ 間違った書き方:
@app.task
def fetch_external_data(url):
import requests
response = requests.get(url)
return response.json()
✅ 正しい書き方:
@app.task(
time_limit=300,
soft_time_limit=270,
)
def fetch_external_data(url):
import requests
try:
response = requests.get(url, timeout=60)
return response.json()
except requests.Timeout:
raise RetryException("Request timeout")
罠3:Canvasでシリアライズ不可能なオブジェクトを渡す
❌ 間違った書き方:
@app.task
def process_file(file_obj):
return file_obj.read()
workflow = chain(process_file.s(open("data.csv")), store_result.s())
✅ 正しい書き方:
@app.task
def process_file(file_path):
with open(file_path, "r") as f:
return f.read()
workflow = chain(process_file.s("/data/data.csv"), store_result.s())
罠4:Beat定期タスクに分散ロックがなく重複実行される
❌ 間違った書き方:
# 複数のBeatインスタンスが同時に実行され、タスクが重複実行される
CELERYBEAT_SCHEDULE = {
"cleanup-every-night": {
"task": "tasks.cleanup",
"schedule": crontab(hour=2, minute=0),
},
}
✅ 正しい書き方:
from celery.schedules import crontab
from celery_redbeat import RedBeatScheduler
beat_scheduler = RedBeatScheduler
redbeat_redis_url = "redis://localhost:6379/2"
CELERYBEAT_SCHEDULE = {
"cleanup-every-night": {
"task": "tasks.cleanup",
"schedule": crontab(hour=2, minute=0),
"options": {"queue": "default"},
},
}
罠5:Workerのforkモードでリソース継承を処理していない
❌ 間違った書き方:
import redis
redis_client = redis.Redis()
@app.task
def update_cache(key, value):
redis_client.set(key, value)
✅ 正しい書き方:
@app.task
def update_cache(key, value):
import redis
redis_client = redis.Redis()
redis_client.set(key, value)
# またはWorkerプロセス初期化を使用
@app.task(bind=True)
def update_cache(self, key, value):
redis_client = self.app.pool.redis
redis_client.set(key, value)
エラートラブルシューティング表
| エラー現象 | 可能な原因 | 診断方法 | 解決策 |
|---|---|---|---|
| Worker起動後にログ出力なし | Broker接続失敗 | CELERY_BROKER_URLの形式と接続性を確認 |
Redis/RabbitMQの稼働とURL形式を確認 |
| タスクがPENDINGのまま実行されない | 対応キューを消費するWorkerがない | celery -A tasks inspect active_queues |
対応キューのWorkerを起動するかtask_routes設定を確認 |
kombu.exceptions.EncodeError |
タスク引数にシリアライズ不可能なオブジェクトが含まれる | delay()/apply_async()の引数を確認 |
JSONシリアライズ可能な基本型のみ渡す |
WorkerLostError |
WorkerプロセスがOOM Killerに強制終了された | dmesg | grep -i oomでシステムログを確認 |
Workerのメモリ制限を増やすかconcurrencyを減らす |
TimeLimitExceeded |
タスクがtime_limitを超過 | タスクロジックに無限ループやブロッキングIOがないか確認 | 適切なtime_limitとsoft_time_limitを設定 |
タスク結果がNoneを返す |
result_backendが未設定 | CELERY_RESULT_BACKEND設定を確認 |
Redis/データベースを結果バックエンドとして設定 |
| Chordのcallbackが実行されない | header内のタスクがサイレントに失敗 | Flowerでheaderタスクの状態を確認 | headerタスクがNoneを返すのではなく例外をraiseすること |
ContentDisallowed |
受け入れ不可のメッセージシリアライズ形式 | accept_contentが送信側と一致しない | accept_content = ["json"]を統一設定 |
| Beatタスクがスケジュール通りに実行されない | タイムゾーン設定エラー | timezoneとenable_utc設定を確認 |
タイムゾーン設定を統一、enable_utc=Trueを推奨 |
| Workerが頻繁に切断・再接続 | Broker接続プール枯渇またはネットワークジッター | broker_pool_limitと接続タイムアウトを確認 |
broker_pool_limitを増やし、broker_heartbeatを設定 |
高度な最適化
1. タスク結果の重複排除と冪等性
プロダクション環境ではタスクが重複配信される可能性があり(ネットワークジッター、Worker再起動)、冪等性の保証が必要。
import hashlib
from celery import Celery
from contextlib import contextmanager
app = Celery("idempotent_app")
app.config_from_object("celery_config")
@contextmanager
def idempotent_lock(task_id, redis_client, ttl=3600):
lock_key = f"celery:idempotent:{task_id}"
acquired = redis_client.set(lock_key, "1", nx=True, ex=ttl)
try:
yield acquired is not None
finally:
pass
@app.task(bind=True)
def process_order_idempotent(self, order_id, amount):
import redis
r = redis.Redis(host="localhost", port=6379, db=3)
task_key = f"order:processed:{order_id}"
existing = r.get(task_key)
if existing:
return {"status": "duplicate", "order_id": order_id, "cached_result": existing.decode()}
with idempotent_lock(self.request.id, r) as acquired:
if not acquired:
return {"status": "duplicate", "order_id": order_id}
result = _do_process_order(order_id, amount)
r.setex(task_key, 86400, str(result))
return result
def _do_process_order(order_id, amount):
return {"status": "processed", "order_id": order_id, "amount": amount}
2. 動的タスク優先度とレート制限
高優先度タスクを優先実行し、レート制限で下流サービスの過負荷を防止。
from celery import Celery
app = Celery("priority_app")
app.config_from_object("celery_config")
app.conf.broker_transport_options = {
"priority_steps": list(range(10)),
"sep": ":",
"queue_order_strategy": "priority",
}
@app.task(rate_limit="10/s")
def call_external_api(endpoint, payload):
import time
time.sleep(0.1)
return {"endpoint": endpoint, "status": "ok"}
# 高優先度タスク
def submit_high_priority():
call_external_api.apply_async(
args=["/api/v1/critical", {"urgent": True}],
priority=0,
queue="io",
)
# 低優先度タスク
def submit_low_priority():
call_external_api.apply_async(
args=["/api/v1/batch", {"batch": True}],
priority=9,
queue="io",
)
# 動的レート制限
from celery import current_app
def adjust_rate_limit(task_name, new_rate):
current_app.control.rate_limit(task_name, new_rate)
3. CeleryとFastAPIの統合
CeleryをFastAPIアプリケーションに統合し、APIから非同期タスクをトリガーして状態を照会。
# fastapi_celery.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from celery.result import AsyncResult
app = FastAPI(title="Celery API Gateway")
class TaskSubmit(BaseModel):
task_name: str
args: list = []
kwargs: dict = {}
class TaskResponse(BaseModel):
task_id: str
status: str
@app.post("/tasks/submit", response_model=TaskResponse)
def submit_task(submit: TaskSubmit):
from tasks import app as celery_app
try:
result = celery_app.send_task(
submit.task_name,
args=submit.args,
kwargs=submit.kwargs,
)
return TaskResponse(task_id=result.id, status="PENDING")
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/tasks/{task_id}", response_model=dict)
def get_task_status(task_id: str):
result = AsyncResult(task_id)
response = {
"task_id": task_id,
"status": result.status,
"result": result.result if result.ready() else None,
}
if result.failed():
response["error"] = str(result.result)
return response
@app.post("/tasks/{task_id}/revoke")
def revoke_task(task_id: str):
from tasks import app as celery_app
celery_app.control.revoke(task_id, terminate=True)
return {"task_id": task_id, "status": "REVOKED"}
フレームワーク比較
| 特徴 | Celery | Dramatiq | Huey | RQ | Temporal |
|---|---|---|---|---|---|
| 言語 | Python | Python | Python | Python | Go/マルチ言語 |
| Broker | Redis/RabbitMQ/SQS等 | Redis/RabbitMQ | Redis/SQLite | Redis | ネイティブ |
| ワークフローオーケストレーション | Canvas(chain/chord/group) | Pipeline | 非サポート | 非サポート | ネイティブDAG |
| リトライ戦略 | 指数バックオフ+Jitter+カスタム | 指数バックオフ+Jitter | シンプルリトライ | シンプルリトライ | 完全なリトライ戦略 |
| 監視 | Flower | 内蔵Dashboard | シンプル | RQ Dashboard | Web UI |
| 定期タスク | Beat | 内蔵 | 内蔵 | 非サポート | ネイティブSchedule |
| コミュニティエコシステム | 最も成熟 | 中程度 | ニッチ | ニッチ | 急成長中 |
| 学習曲線 | 中程度 | 低い | 低い | 低い | 高い |
| マルチ言語サポート | なし | なし | なし | なし | あり |
| プロダクション成熟度 | 高 | 中 | 低 | 中 | 高 |
| ユースケース | 汎用分散タスク | 中小規模プロジェクト | 軽量タスク | Redisエコシステムのシンプルタスク | クロス言語複雑ワークフロー |
まとめ
Python Celeryはバージョン5.4以降、分散タスクキュ領域で最も成熟し柔軟なソリューションとなっている。3つのコア原則を忘れずに:タスクの冪等性はベースライン——重複配信でも安全でなければならない;キュー分離は標準——CPU/IO/デフォルトキューは分離必須;監視とアラートはオプションではない——Flower+Prometheusはプロダクション環境の目。単一の
celery workerからK8s HPAオートスケーリングまで、Celeryのアーキテクチャはビジネスと共に線形にスケールし、タスクコードの書き直しは不要だ。
おすすめツール
- JSONフォーマッター — Celeryタスクのパラメータと結果をデバッグ時にJSONをフォーマット
- Base64エンコード — タスク内のバイナリデータのエンコード転送を処理
- ハッシュ計算 — タスク重複排除の冪等性キーを生成
ブラウザローカルツールを無料で試す →