Python Celery Distributed Task Queues in Practice: 6 Production Patterns, from Basic Tasks to Event-Driven Pipelines


Background jobs block the API, cron does not scale, and failed tasks are never retried

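A user uploads an avatar and the API has to generate thumbnails in 5 sizes; handled synchronously, the request times out. The nightly 2 a.m. data-cleaning cron job no longer finishes once the data volume doubles. A third-party payment callback fails intermittently, and with no retry mechanism the orders have to be reconciled by hand. In 2026, Celery is still one of the most widely used distributed task queues for Python: the 5.x line has continued to stabilize Canvas workflows, and the same task code runs on a single development machine or in a K8s cluster. Tasks remain regular synchronous functions; native asyncio task support is not part of 5.4.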

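This article walks through 6 production patterns end to end: basic task definition, routing and queue isolation, exponential-backoff retries, Canvas workflows, Flower plus Prometheus monitoring, and Docker/K8s production deployment. Each step comes with Python code.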


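Celery core concepts

Concept | Description
Celery | Distributed task queue framework for Python; supports multiple brokers and result backends
Broker | Message middleware that receives task messages from producers (clients) and delivers them to consumers (workers); Redis and RabbitMQ are the usual choices
Worker | Consumer process that fetches tasks from the broker and executes them; supports multi-process (prefork), thread, and green-thread (eventlet/gevent) pools
Task | A function managed by Celery, registered with the @app.task decorator; calling it through delay() or apply_async() returns an AsyncResult
Canvas | Celery's workflow primitives; supports composition with chain (sequential), group (parallel), chord (parallel plus aggregation), and others
Chain | Sequential workflow; the output of one task becomes the input of the next
Chord | Parallel plus aggregation; once every task in the group has finished, a callback task is triggered to aggregate the results
Group | Parallel workflow; a set of independent tasks executed at the same time
Flower | Real-time web monitoring UI for Celery, showing task state, worker status, queue depth, and more
Result Backend | Storage for task return values and states; Redis or a database are the usual choices
Beat | Celery's periodic task scheduler, similar to cron. The default scheduler has no distributed lock, so run a single Beat instance unless a lock-aware scheduler is used
Signature | Wraps a task call's arguments and options into a serializable object that can be passed around and composed with Canvas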

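Problem analysis: 5 challenges of distributed task queues

  1. Task definition coupled to the call site: business code calls slow operations directly, and the synchronous blocking makes API responses slow. Slow operations need to become asynchronous tasks that are decoupled from the caller.
  2. Task routing and resource isolation: CPU-bound and IO-bound tasks share one queue, so IO tasks wait behind CPU tasks. This calls for separate queues and a routing policy.
  3. Task failures and retry storms: when a third-party service is down, tasks fail and retry over and over; without backoff this snowballs. This calls for exponential backoff, a maximum retry count, and dead-letter handling.
  4. Complex workflow orchestration: multi-step jobs combine sequential and parallel stages, and hand-written callback chains are hard to maintain. Canvas primitives allow declarative orchestration.
  5. Production observability: task backlogs, crashed workers, and blocked queues go unnoticed. This calls for real-time monitoring, alerting, and autoscaling.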

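Step by step: 6 Celery distributed task patterns

Pattern 1: Defining and running a basic task

The simplest Celery setup: define a task function, start a worker, call the task, and fetch the result.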

# celery_config.py
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"
task_serializer = "json"
result_serializer = "json"
accept_content = ["json"]
timezone = "Asia/Taipei"
enable_utc = True
# tasks.py
from celery import Celery

app = Celery("myapp")
app.config_from_object("celery_config")

@app.task
def add(x, y):
    return x + y

@app.task
def send_email(to, subject, body):
    import time
    time.sleep(2)
    return {"status": "sent", "to": to, "subject": subject}

@app.task
def generate_thumbnail(image_path, sizes):
    import time
    time.sleep(3)
    thumbnails = []
    for size in sizes:
        thumbnails.append(f"{image_path}_{size}x{size}.jpg")
    return {"image": image_path, "thumbnails": thumbnails}

if __name__ == "__main__":
    app.start()
# client.py - calling the tasks
from tasks import add, send_email, generate_thumbnail

result = add.delay(4, 6)
print(f"Task ID: {result.id}")
print(f"Ready: {result.ready()}")
print(f"Result: {result.get(timeout=10)}")

email_result = send_email.delay("user@example.com", "Welcome", "Hello!")
print(f"Email: {email_result.get(timeout=10)}")

thumb_result = generate_thumbnail.delay("/uploads/avatar.png", [64, 128, 256])
print(f"Thumbnails: {thumb_result.get(timeout=10)}")
# Start the worker
celery -A tasks worker --loglevel=info --concurrency=4

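Pattern 2: Task routing and queue isolation

Different kinds of tasks are dispatched to different queues, and each worker consumes only its own queue so that resources stay isolated.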

# celery_config_routing.py
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"
task_serializer = "json"
result_serializer = "json"

task_routes = {
    "tasks.compute.*": {"queue": "compute"},
    "tasks.io.*": {"queue": "io"},
    "tasks.default.*": {"queue": "default"},
}

task_default_queue = "default"
task_queues = {
    "default": {
        "exchange": "default",
        "routing_key": "default",
    },
    "compute": {
        "exchange": "compute",
        "routing_key": "compute",
    },
    "io": {
        "exchange": "io",
        "routing_key": "io",
    },
}
# tasks_routing.py
from celery import Celery

app = Celery("routed_app")
app.config_from_object("celery_config_routing")

@app.task(name="tasks.compute.ml_inference")
def ml_inference(model_id, input_data):
    import time
    time.sleep(5)
    return {"model": model_id, "prediction": 0.95, "latency_ms": 5000}

@app.task(name="tasks.compute.data_pipeline")
def data_pipeline(source, target):
    import time
    time.sleep(8)
    return {"source": source, "target": target, "rows": 100000}

@app.task(name="tasks.io.send_notification")
def send_notification(user_id, message):
    import time
    time.sleep(1)
    return {"user_id": user_id, "status": "delivered"}

@app.task(name="tasks.io.sync_external_api")
def sync_external_api(endpoint, payload):
    import time
    time.sleep(2)
    return {"endpoint": endpoint, "status_code": 200}

@app.task(name="tasks.default.cleanup")
def cleanup(days_old):
    return {"deleted_records": days_old * 100}
# Start one worker per queue
celery -A tasks_routing worker -Q compute --loglevel=info --concurrency=2 -n compute@%h
celery -A tasks_routing worker -Q io --loglevel=info --concurrency=8 -n io@%h
celery -A tasks_routing worker -Q default --loglevel=info --concurrency=4 -n default@%h
# client_routing.py
from tasks_routing import ml_inference, send_notification, cleanup

ml_inference.delay("resnet50", {"image": "base64..."})
send_notification.delay("USR-001", "Your order has shipped")
cleanup.delay(30)

# Choose the queue at call time
from tasks_routing import sync_external_api
sync_external_api.apply_async(
    args=["/api/v1/sync", {"batch": 100}],
    queue="io",
    priority=5,
)
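
To make the glob patterns in task_routes concrete, the sketch below models how a task name could be resolved to a queue. It is a simplified stand-alone model built on the standard library's fnmatch, not Celery's router; the helper name resolve_queue is hypothetical.

```python
from fnmatch import fnmatchcase

# Same patterns as task_routes in celery_config_routing.py
TASK_ROUTES = {
    "tasks.compute.*": {"queue": "compute"},
    "tasks.io.*": {"queue": "io"},
    "tasks.default.*": {"queue": "default"},
}
DEFAULT_QUEUE = "default"  # mirrors task_default_queue


def resolve_queue(task_name, routes=TASK_ROUTES, default=DEFAULT_QUEUE):
    """Return the queue of the first pattern that matches task_name.

    Hypothetical helper for illustration; Celery's own router is more general.
    """
    for pattern, options in routes.items():
        if fnmatchcase(task_name, pattern):
            return options["queue"]
    # Task names that match no pattern fall back to the default queue
    return default


if __name__ == "__main__":
    for name in (
        "tasks.compute.ml_inference",
        "tasks.io.send_notification",
        "reports.monthly",
    ):
        print(name, "->", resolve_queue(name))
```

This is why the tasks above set an explicit name such as "tasks.compute.ml_inference": the routing decision depends only on the registered task name, not on the module the function lives in.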

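Pattern 3: Retries with exponential backoff

Failed tasks are retried automatically; exponential backoff prevents a retry storm, and a maximum retry count acts as the safety net.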

# tasks_retry.py
from celery import Celery
from celery.utils.log import get_task_logger
import random
import time

logger = get_task_logger(__name__)

app = Celery("retry_app")
app.config_from_object("celery_config")

class ExternalAPIError(Exception):
    pass

class RateLimitError(Exception):
    pass

@app.task(
    bind=True,
    max_retries=5,
    default_retry_delay=30,
    autoretry_for=(ExternalAPIError,),
    retry_backoff=True,
    retry_backoff_max=600,
    retry_jitter=True,
)
def call_payment_api(self, order_id, amount):
    # autoretry_for + retry_backoff schedule the retries. Calling self.retry()
    # by hand here would bypass the backoff and use default_retry_delay instead.
    success = random.random() > 0.7
    if not success:
        logger.warning(f"Payment failed for {order_id}, attempt {self.request.retries + 1}/{self.max_retries + 1}")
        raise ExternalAPIError(f"Payment API timeout for order {order_id}")
    return {"order_id": order_id, "amount": amount, "status": "paid"}

@app.task(bind=True, max_retries=3)
def call_rate_limited_api(self, endpoint):
    # Manual variant: the countdown is computed here, so autoretry_for and
    # retry_backoff are left out to avoid two overlapping retry mechanisms.
    try:
        if random.random() > 0.5:
            raise RateLimitError(f"Rate limited on {endpoint}")
        return {"endpoint": endpoint, "data": "success"}
    except RateLimitError as exc:
        # Exponential backoff capped at 120s, plus up to 1s of jitter
        countdown = min(120, 2 ** self.request.retries) + random.uniform(0, 1)
        logger.info(f"Rate limited, retrying in {countdown:.1f}s (attempt {self.request.retries + 1})")
        raise self.retry(exc=exc, countdown=countdown)

@app.task(bind=True, max_retries=4)
def process_with_custom_retry(self, data):
    try:
        result = _do_risky_operation(data)
        return result
    except Exception as exc:
        if self.request.retries >= self.max_retries:
            logger.error(f"Max retries exceeded for {data}, sending to DLQ")
            send_to_dlq.delay(str(data), str(exc), self.request.retries)
            return {"status": "failed", "data": data, "error": str(exc)}
        countdown = min(60, 5 * (2 ** self.request.retries))
        logger.warning(f"Processing failed, retry in {countdown}s")
        raise self.retry(exc=exc, countdown=countdown)

def _do_risky_operation(data):
    if random.random() > 0.4:
        raise ValueError(f"Random failure processing {data}")
    return {"processed": data, "result": "ok"}

@app.task
def send_to_dlq(task_data, error, retry_count):
    logger.error(f"DLQ: task={task_data}, error={error}, retries={retry_count}")
    return {"dlq": True, "task": task_data, "error": error}
# client_retry.py
from tasks_retry import call_payment_api, call_rate_limited_api, process_with_custom_retry

result = call_payment_api.delay("ORD-001", 99.99)
print(f"Payment task: {result.id}")

result2 = call_rate_limited_api.delay("/api/v2/users")
print(f"Rate limited task: {result2.id}")

result3 = process_with_custom_retry.delay({"key": "value"})
print(f"Custom retry task: {result3.id}")
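
The delays behind retry_backoff can be worked out by hand. The sketch below is a stand-alone model, not Celery's code. It assumes the behavior described in the Celery documentation: the delay doubles on each retry starting from the backoff factor, is capped at retry_backoff_max, and with retry_jitter becomes a random value between zero and that delay. The function names are hypothetical.

```python
import random


def backoff_delay(retries, factor=1, maximum=600, jitter=False, rng=random):
    """Seconds to wait before retry number `retries` (0-based).

    Illustrative model only: factor * 2**retries, capped at `maximum`.
    """
    delay = min(maximum, factor * (2 ** retries))
    if jitter:
        # Full jitter: any whole number of seconds from 0 up to the delay
        return rng.randrange(delay + 1)
    return delay


def backoff_schedule(max_retries, factor=1, maximum=600):
    """Delays without jitter for retries 0..max_retries-1."""
    return [backoff_delay(n, factor, maximum) for n in range(max_retries)]


if __name__ == "__main__":
    # retry_backoff=True, max_retries=5, retry_backoff_max=600
    print(backoff_schedule(5))
    # retry_backoff=2, max_retries=3, retry_backoff_max=120
    print(backoff_schedule(3, factor=2, maximum=120))
```

With jitter enabled the printed values become upper bounds rather than exact delays, which spreads retries from many failed tasks over time instead of releasing them all at the same instant.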

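Pattern 4: Canvas workflows (Chain, Chord, Group)

Declarative orchestration of complex workflows: sequential chains, parallel groups, and parallel execution followed by aggregation.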

# tasks_canvas.py
from celery import Celery, chain, group, chord
from celery import signature

app = Celery("canvas_app")
app.config_from_object("celery_config")

@app.task
def download_url(url):
    import time
    time.sleep(1)
    return {"url": url, "content": f"content_of_{url.split('/')[-1]}"}

@app.task
def parse_content(download_result):
    content = download_result["content"]
    return {"parsed": True, "items": len(content), "source": download_result["url"]}

@app.task
def store_results(parse_result):
    return {"stored": True, "items": parse_result["items"], "source": parse_result["source"]}

@app.task
def aggregate_results(results):
    total_items = sum(r["items"] for r in results)
    return {"total_items": total_items, "sources": len(results), "aggregated": True}

@app.task
def send_report(aggregate_result):
    return {
        "report_sent": True,
        "total_items": aggregate_result["total_items"],
        "sources": aggregate_result["sources"],
    }

# Chain: sequential workflow
def run_chain():
    workflow = chain(
        download_url.s("https://api.example.com/data"),
        parse_content.s(),
        store_results.s(),
    )
    result = workflow.apply_async()
    return result.id

# Group: parallel workflow
def run_group():
    urls = [
        "https://api.example.com/users",
        "https://api.example.com/orders",
        "https://api.example.com/products",
    ]
    workflow = group(download_url.s(url) for url in urls)
    result = workflow.apply_async()
    return result.id

# Chord: parallel execution plus aggregation
def run_chord():
    urls = [
        "https://api.example.com/users",
        "https://api.example.com/orders",
        "https://api.example.com/products",
    ]
    workflow = chord(
        [chain(download_url.s(url), parse_content.s()) for url in urls],
        aggregate_results.s(),
    )
    result = workflow.apply_async()
    return result.id

# Nested workflow: a group followed by further tasks in a chain behaves like a chord
def run_complex_workflow():
    header = group(
        chain(download_url.s(f"https://api.example.com/batch/{i}"), parse_content.s())
        for i in range(1, 4)
    )
    workflow = chain(
        header,
        aggregate_results.s(),
        send_report.s(),
    )
    result = workflow.apply_async()
    return result.id
# client_canvas.py
from tasks_canvas import run_chain, run_group, run_chord, run_complex_workflow
from tasks_canvas import download_url, parse_content, store_results, aggregate_results
from celery import chain, group, chord

# Option 1: use the wrapper functions
chain_id = run_chain()
print(f"Chain task: {chain_id}")

group_id = run_group()
print(f"Group task: {group_id}")

chord_id = run_chord()
print(f"Chord task: {chord_id}")

# Option 2: use the Canvas primitives directly
from tasks_canvas import app

result = chain(
    download_url.s("https://api.example.com/report"),
    parse_content.s(),
    store_results.s(),
).apply_async()

print(f"Direct chain: {result.id}")

# Build the signature first, compose it later
sig = download_url.s("https://api.example.com/dynamic")
result = chain(sig, parse_content.s(), store_results.s()).apply_async()
print(f"Signature chain: {result.id}")
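
The data flow through chain, group, and chord can be traced without a broker. The sketch below reuses the bodies of download_url, parse_content, and aggregate_results as plain functions and models the three primitives with ordinary Python. The model_* helpers are hypothetical stand-ins that run sequentially in one process; they only show which value is handed to which step.

```python
from functools import reduce


def download_url(url):
    return {"url": url, "content": f"content_of_{url.split('/')[-1]}"}


def parse_content(download_result):
    content = download_result["content"]
    return {"parsed": True, "items": len(content), "source": download_result["url"]}


def aggregate_results(results):
    total_items = sum(r["items"] for r in results)
    return {"total_items": total_items, "sources": len(results), "aggregated": True}


def model_chain(first_arg, *steps):
    """chain: each step receives the previous step's return value."""
    return reduce(lambda value, step: step(value), steps, first_arg)


def model_group(step, inputs):
    """group: the same step applied to independent inputs."""
    return [step(item) for item in inputs]


def model_chord(header_results, callback):
    """chord: the callback receives the list of all header results."""
    return callback(header_results)


def demo():
    urls = [
        "https://api.example.com/users",
        "https://api.example.com/orders",
        "https://api.example.com/products",
    ]
    # Header: one download -> parse chain per URL, as in run_chord()
    header = model_group(lambda u: model_chain(u, download_url, parse_content), urls)
    return model_chord(header, aggregate_results)


if __name__ == "__main__":
    print(demo())
```

The point to take away is the shape of the arguments: parse_content gets a single dict, while aggregate_results gets a list with one entry per header task.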

Pattern 5: Flower monitoring and Prometheus integration

Monitor task state and worker health in real time, and feed the metrics into Prometheus for alerting.

# celery_config_monitoring.py
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"
task_serializer = "json"
result_serializer = "json"

task_send_sent_event = True
task_track_started = True
worker_send_task_events = True

task_routes = {
    "monitoring_tasks.*": {"queue": "monitoring"},
}

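# Flower does not read its options from the Celery config module. Pass them as
# command-line flags, as FLOWER_* environment variables, or in flowerconfig.py:
#   port = 5555
#   basic_auth = ["admin:secret123"]
#   persistent = True
#   db = "flower_db"
#   max_tasks = 10000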
# monitoring_tasks.py
from celery import Celery
from celery.utils.log import get_task_logger
import time
import random

logger = get_task_logger(__name__)

app = Celery("monitoring_app")
app.config_from_object("celery_config_monitoring")

@app.task(name="monitoring_tasks.process_order", bind=True, track_started=True)
def process_order(self, order_id):
    self.update_state(state="PROCESSING", meta={"step": "validating", "order_id": order_id})
    time.sleep(1)

    self.update_state(state="PROCESSING", meta={"step": "charging", "order_id": order_id})
    time.sleep(2)

    self.update_state(state="PROCESSING", meta={"step": "fulfilling", "order_id": order_id})
    time.sleep(1)

    return {"order_id": order_id, "status": "completed", "amount": random.randint(100, 9999)}

@app.task(name="monitoring_tasks.generate_report", bind=True, track_started=True, time_limit=300)
def generate_report(self, report_type, date_range):
    self.update_state(state="GENERATING", meta={"progress": 0, "type": report_type})
    for i in range(1, 6):
        time.sleep(1)
        self.update_state(state="GENERATING", meta={"progress": i * 20, "type": report_type})

    return {"report_type": report_type, "date_range": date_range, "rows": random.randint(1000, 50000)}
# prometheus_exporter.py - custom Prometheus metrics
from prometheus_client import Counter, Histogram, Gauge, start_http_server

TASK_STARTED = Counter(
    "celery_task_started_total",
    "Total number of Celery tasks started",
    ["task_name", "queue"],
)

TASK_COMPLETED = Counter(
    "celery_task_completed_total",
    "Total number of Celery tasks completed",
    ["task_name", "queue", "status"],
)

TASK_DURATION = Histogram(
    "celery_task_duration_seconds",
    "Celery task duration in seconds",
    ["task_name", "queue"],
    buckets=[0.1, 0.5, 1, 2, 5, 10, 30, 60, 120, 300],
)

QUEUE_DEPTH = Gauge(
    "celery_queue_depth",
    "Number of tasks waiting in queue",
    ["queue_name"],
)

ACTIVE_WORKERS = Gauge(
    "celery_active_workers",
    "Number of active Celery workers",
)

def start_metrics_server(port=9090):
    start_http_server(port)
    print(f"Prometheus metrics server started on :{port}")
# celery_events_monitor.py - event listener
from celery import Celery
from prometheus_exporter import TASK_STARTED, TASK_COMPLETED, TASK_DURATION
import time

app = Celery("monitoring_app")
app.config_from_object("celery_config_monitoring")

# Events after task-sent/task-received carry only the task uuid, so an
# in-memory State is kept to map the uuid back to the task name.
state = app.events.State()

def monitor_events():
    with app.connection() as connection:
        recv = app.events.Receiver(
            connection,
            handlers={
                "task-sent": on_task_sent,
                "task-started": on_task_started,
                "task-succeeded": on_task_succeeded,
                "task-failed": on_task_failed,
                "task-retried": on_task_retried,
                "worker-heartbeat": on_worker_heartbeat,
                "*": state.event,  # everything else, e.g. task-received
            },
        )
        recv.capture(limit=None, timeout=None, wakeup=True)

def _task_labels(event):
    # Update the state first, then look the task up by uuid
    state.event(event)
    task = state.tasks.get(event["uuid"])
    task_name = getattr(task, "name", None) or "unknown"
    # Assumption: the routing key equals the queue name, as with task_routes above
    queue = getattr(task, "routing_key", None) or "unknown"
    return task_name, queue

def on_task_sent(event):
    state.event(event)

def on_task_started(event):
    task_name, queue = _task_labels(event)
    TASK_STARTED.labels(task_name=task_name, queue=queue).inc()

def on_task_succeeded(event):
    task_name, queue = _task_labels(event)
    runtime = event.get("runtime", 0)
    TASK_COMPLETED.labels(task_name=task_name, queue=queue, status="success").inc()
    TASK_DURATION.labels(task_name=task_name, queue=queue).observe(runtime)

def on_task_failed(event):
    task_name, queue = _task_labels(event)
    TASK_COMPLETED.labels(task_name=task_name, queue=queue, status="failed").inc()

def on_task_retried(event):
    task_name, queue = _task_labels(event)
    TASK_COMPLETED.labels(task_name=task_name, queue=queue, status="retried").inc()

def on_worker_heartbeat(event):
    state.event(event)

if __name__ == "__main__":
    from prometheus_exporter import start_metrics_server
    start_metrics_server(9090)
    monitor_events()
# Start Flower
celery -A monitoring_tasks flower --port=5555 --basic-auth=admin:secret123

# Start the event listener and the Prometheus exporter
python celery_events_monitor.py
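
The QUEUE_DEPTH gauge in prometheus_exporter.py is declared but nothing in the listings updates it. The sketch below shows one possible collector with the broker access and the gauge injected as callables, so it runs without Redis or prometheus_client. The function name collect_queue_depths is hypothetical. The wiring shown in the comments is an assumption: with the Redis transport a queue's pending messages sit in a Redis list named after the queue, and queues using priority steps spread messages over additional keys that this sketch does not count.

```python
def collect_queue_depths(llen, queue_names, set_gauge):
    """Read each queue's length and push it to a gauge setter.

    llen:      callable(queue_name) -> number of waiting messages
    set_gauge: callable(queue_name, depth) that records the value
    """
    depths = {}
    for name in queue_names:
        depth = int(llen(name))
        set_gauge(name, depth)
        depths[name] = depth
    return depths


# Possible wiring in celery_events_monitor.py (assumption, not executed here):
#   import redis
#   from prometheus_exporter import QUEUE_DEPTH
#   client = redis.Redis(host="localhost", port=6379, db=0)
#   collect_queue_depths(
#       client.llen,
#       ["default", "compute", "io"],
#       lambda queue, depth: QUEUE_DEPTH.labels(queue_name=queue).set(depth),
#   )


def demo():
    # Fake broker state standing in for Redis
    fake_broker = {"default": 12, "compute": 3, "io": 0}
    recorded = {}
    depths = collect_queue_depths(
        fake_broker.__getitem__, ["default", "compute", "io"], recorded.__setitem__
    )
    return depths, recorded


if __name__ == "__main__":
    print(demo())
```

Calling the collector on a timer, for example every 15 seconds from a background thread, would keep the gauge fresh enough for alerting on backlog.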

Pattern 6: Production-grade Celery service (Docker + Kubernetes)

Docker multi-stage build, K8s Deployments, and HPA autoscaling.

# Dockerfile
FROM python:3.12-slim AS builder

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt

FROM python:3.12-slim

WORKDIR /app
COPY --from=builder /install /usr/local
COPY . .

ENV PYTHONUNBUFFERED=1
ENV CELERY_BROKER_URL=redis://redis:6379/0
ENV CELERY_RESULT_BACKEND=redis://redis:6379/1

CMD ["celery", "-A", "tasks", "worker", "--loglevel=info"]
# docker-compose.yml
version: "3.8"

services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    # noeviction: an LRU eviction policy could silently drop queued task messages
    command: redis-server --appendonly yes --maxmemory 512mb --maxmemory-policy noeviction
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5

  worker-default:
    build: .
    command: celery -A tasks worker -Q default --loglevel=info --concurrency=4 -n default@%h
    depends_on:
      redis:
        condition: service_healthy
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    deploy:
      replicas: 2
      resources:
        limits:
          memory: 512M
          cpus: "1.0"
        reservations:
          memory: 256M
          cpus: "0.5"
    restart: unless-stopped

  worker-compute:
    build: .
    command: celery -A tasks worker -Q compute --loglevel=info --concurrency=2 -n compute@%h
    depends_on:
      redis:
        condition: service_healthy
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    deploy:
      replicas: 1
      resources:
        limits:
          memory: 2G
          cpus: "2.0"
        reservations:
          memory: 1G
          cpus: "1.0"
    restart: unless-stopped

  worker-io:
    build: .
    command: celery -A tasks worker -Q io --loglevel=info --concurrency=16 -n io@%h
    depends_on:
      redis:
        condition: service_healthy
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    deploy:
      replicas: 3
      resources:
        limits:
          memory: 512M
          cpus: "1.0"
    restart: unless-stopped

  flower:
    build: .
    command: celery -A tasks flower --port=5555 --basic-auth=admin:secret123
    depends_on:
      redis:
        condition: service_healthy
    ports:
      - "5555:5555"
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    restart: unless-stopped

  beat:
    build: .
    command: celery -A tasks beat --loglevel=info
    depends_on:
      redis:
        condition: service_healthy
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    restart: unless-stopped

volumes:
  redis_data:
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-worker-default
  labels:
    app: celery-worker
    queue: default
spec:
  replicas: 3
  selector:
    matchLabels:
      app: celery-worker
      queue: default
  template:
    metadata:
      labels:
        app: celery-worker
        queue: default
    spec:
      containers:
        - name: worker
          image: myregistry/celery-app:latest
          command: ["celery", "-A", "tasks", "worker", "-Q", "default", "--loglevel=info", "--concurrency=4"]
          env:
            - name: CELERY_BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: broker-url
            - name: CELERY_RESULT_BACKEND
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: result-backend
          resources:
            requests:
              memory: "256Mi"
              cpu: "500m"
            limits:
              memory: "512Mi"
              cpu: "1000m"
          livenessProbe:
            exec:
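              # Ping only this pod's worker; a bare "inspect ping" succeeds if any worker replies
              command: ["sh", "-c", "celery -A tasks inspect ping -d celery@$HOSTNAME"]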
            initialDelaySeconds: 30
            periodSeconds: 60
            timeoutSeconds: 10
          readinessProbe:
            exec:
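              # Ping only this pod's worker; a bare "inspect ping" succeeds if any worker replies
              command: ["sh", "-c", "celery -A tasks inspect ping -d celery@$HOSTNAME"]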
            initialDelaySeconds: 10
            periodSeconds: 30
            timeoutSeconds: 10
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: celery-worker-default-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: celery-worker-default
  minReplicas: 2
  maxReplicas: 20
  metrics:
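    # External metrics need a metrics adapter (for example Prometheus Adapter or KEDA) that exposes celery_queue_depth
    - type: External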
      external:
        metric:
          name: celery_queue_depth
          selector:
            matchLabels:
              queue_name: default
        target:
          type: AverageValue
          averageValue: "100"
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-beat
  labels:
    app: celery-beat
spec:
  replicas: 1
  selector:
    matchLabels:
      app: celery-beat
  template:
    metadata:
      labels:
        app: celery-beat
    spec:
      containers:
        - name: beat
          image: myregistry/celery-app:latest
          command: ["celery", "-A", "tasks", "beat", "--loglevel=info"]
          env:
            - name: CELERY_BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: broker-url
            - name: CELERY_RESULT_BACKEND
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: result-backend
          resources:
            requests:
              memory: "128Mi"
              cpu: "100m"
            limits:
              memory: "256Mi"
              cpu: "250m"
# k8s/flower-service.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-flower
spec:
  replicas: 1
  selector:
    matchLabels:
      app: celery-flower
  template:
    metadata:
      labels:
        app: celery-flower
    spec:
      containers:
        - name: flower
          image: myregistry/celery-app:latest
          command: ["celery", "-A", "tasks", "flower", "--port=5555"]
          ports:
            - containerPort: 5555
          env:
            - name: CELERY_BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: broker-url
            - name: CELERY_RESULT_BACKEND
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: result-backend
---
apiVersion: v1
kind: Service
metadata:
  name: celery-flower
spec:
  selector:
    app: celery-flower
  ports:
    - port: 5555
      targetPort: 5555
  type: ClusterIP
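
To get a feel for what the HPA above would do, the queue-depth rule can be worked through as arithmetic. The sketch below is a simplified model that assumes the AverageValue target means "about 100 waiting tasks per pod" and that the result is clamped to minReplicas and maxReplicas. It ignores the HPA's tolerance band, stabilization windows, and the CPU metric (when several metrics are configured, the HPA uses the largest proposal). The function name desired_replicas is hypothetical.

```python
import math


def desired_replicas(queue_depth, target_per_pod=100, min_replicas=2, max_replicas=20):
    """Replica count suggested by the queue-depth metric alone.

    Defaults mirror celery-worker-default-hpa: averageValue 100,
    minReplicas 2, maxReplicas 20.
    """
    wanted = math.ceil(queue_depth / target_per_pod)
    # Clamp to the bounds configured on the HPA
    return max(min_replicas, min(max_replicas, wanted))


if __name__ == "__main__":
    for depth in (0, 150, 950, 5000):
        print(depth, "->", desired_replicas(depth))
```

A backlog of 950 tasks maps to 10 workers under this model, while anything above 2000 waiting tasks is pinned at the 20-replica ceiling, so the ceiling should be sized against the worst backlog the downstream services can absorb.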

Pitfalls: 5 common traps

Pitfall 1: Using a global database connection inside a task

Incorrect

import psycopg2

conn = psycopg2.connect("postgresql://localhost/mydb")

@app.task
def save_user(user_data):
    cursor = conn.cursor()
    cursor.execute("INSERT INTO users (%s)", (user_data,))
    conn.commit()

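Correct

import psycopg2

@app.task
def save_user(user_data):
    # Open the connection inside the task so every worker process gets its own
    conn = psycopg2.connect("postgresql://localhost/mydb")
    try:
        with conn, conn.cursor() as cursor:  # "with conn" commits on success
            cursor.execute("INSERT INTO users (data) VALUES (%s)", (user_data,))
    finally:
        conn.close()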

Pitfall 2: Ignoring task timeouts lets a worker block forever

Incorrect

@app.task
def fetch_external_data(url):
    import requests
    response = requests.get(url)
    return response.json()

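Correct

@app.task(
    bind=True,
    max_retries=3,
    time_limit=300,
    soft_time_limit=270,
)
def fetch_external_data(self, url):
    import requests
    try:
        response = requests.get(url, timeout=60)
        return response.json()
    except requests.Timeout as exc:
        # Hand the task back to Celery for a retry instead of raising an undefined exception
        raise self.retry(exc=exc, countdown=30)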

Pitfall 3: Passing non-serializable objects through Canvas

Incorrect

@app.task
def process_file(file_obj):
    return file_obj.read()

workflow = chain(process_file.s(open("data.csv")), store_result.s())

Correct

@app.task
def process_file(file_path):
    with open(file_path, "r") as f:
        return f.read()

workflow = chain(process_file.s("/data/data.csv"), store_result.s())
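
A cheap way to catch this class of bug before a message reaches the broker is to try encoding the arguments locally. The sketch below assumes the JSON serializer configured in celery_config.py. The helper name find_unserializable is hypothetical, and the check only approximates what the serializer accepts.

```python
import io
import json


def find_unserializable(args=(), kwargs=None):
    """Return a description of every argument json.dumps cannot encode."""
    problems = []
    for index, value in enumerate(args):
        try:
            json.dumps(value)
        except (TypeError, ValueError):
            problems.append(f"args[{index}]: {type(value).__name__}")
    for key, value in (kwargs or {}).items():
        try:
            json.dumps(value)
        except (TypeError, ValueError):
            problems.append(f"kwargs[{key!r}]: {type(value).__name__}")
    return problems


if __name__ == "__main__":
    # A path is fine; a file-like object is not
    print(find_unserializable(["/data/data.csv"]))
    print(find_unserializable([io.StringIO("a,b,c")]))
```

Running such a check in unit tests for each task's call sites surfaces the problem at development time rather than as an EncodeError in production.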

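Pitfall 4: Running Beat without a distributed lock causes duplicate executions

Incorrect

# With several Beat instances running at once, every scheduled task fires more than once
beat_schedule = {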
    "cleanup-every-night": {
        "task": "tasks.cleanup",
        "schedule": crontab(hour=2, minute=0),
    },
}

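Correct

from celery.schedules import crontab

# RedBeat (pip install celery-redbeat) keeps the schedule in Redis and holds a
# lock so that only one Beat instance is active at a time.
beat_scheduler = "redbeat.RedBeatScheduler"
redbeat_redis_url = "redis://localhost:6379/2"

beat_schedule = {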
    "cleanup-every-night": {
        "task": "tasks.cleanup",
        "schedule": crontab(hour=2, minute=0),
        "options": {"queue": "default"},
    },
}

Pitfall 5: Forking workers without handling inherited resources

Incorrect

import redis

redis_client = redis.Redis()

@app.task
def update_cache(key, value):
    redis_client.set(key, value)

Correct

@app.task
def update_cache(key, value):
    import redis
    redis_client = redis.Redis()
    redis_client.set(key, value)

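# Or create the client once per worker process, after the fork
import redis
from celery.signals import worker_process_init

redis_client = None

@worker_process_init.connect
def init_redis_client(**kwargs):
    global redis_client
    redis_client = redis.Redis()

@app.task
def update_cache(key, value):
    redis_client.set(key, value)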

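Troubleshooting table

Symptom | Likely cause | How to check | Fix
Worker prints no log output after startup | Broker connection failed | Check the format of CELERY_BROKER_URL and connectivity to the broker | Make sure Redis/RabbitMQ is running and the URL is correct
Task stays PENDING and never runs | No worker consumes the target queue | celery -A tasks inspect active_queues | Start a worker for that queue or check the task_routes setting
kombu.exceptions.EncodeError | Task arguments contain a non-serializable object | Check the arguments passed to delay()/apply_async() | Pass only JSON-serializable basic types
WorkerLostError | Worker process killed by the OOM killer | dmesg \| grep -i oom in the system log | Raise the worker memory limit or lower concurrency
TimeLimitExceeded | Task ran longer than time_limit | Check the task for infinite loops or blocking IO | Set sensible time_limit and soft_time_limit values
Task result comes back as None | result_backend not configured | Check the CELERY_RESULT_BACKEND setting | Configure Redis or a database as the result backend
Chord callback never runs | A header task failed silently | Inspect the header task states in Flower | Make header tasks raise on failure instead of returning None, so the failure is visible
ContentDisallowed | Message serialization format not accepted | accept_content differs from what the sender uses | Set accept_content = ["json"] everywhere
Beat tasks do not run on time | Wrong timezone settings | Check the timezone and enable_utc settings | Use one timezone configuration throughout; enable_utc=True is recommended
Worker reconnects frequently | Broker connection pool exhausted or network jitter | Check broker_pool_limit and connection timeouts | Raise broker_pool_limit; set broker_heartbeat (AMQP transports only)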

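Advanced optimization

1. Result deduplication and idempotency

In production a task can be delivered more than once (network jitter, worker restarts), so tasks must be idempotent.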

import hashlib
from celery import Celery
from contextlib import contextmanager

app = Celery("idempotent_app")
app.config_from_object("celery_config")

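@contextmanager
def idempotent_lock(task_id, redis_client, ttl=3600):
    lock_key = f"celery:idempotent:{task_id}"
    # SET NX returns None when the key already exists
    acquired = redis_client.set(lock_key, "1", nx=True, ex=ttl)
    try:
        yield acquired is not None
    except Exception:
        # Release the lock on failure so a retry of the same task id can run
        if acquired:
            redis_client.delete(lock_key)
        raise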

@app.task(bind=True)
def process_order_idempotent(self, order_id, amount):
    import redis
    r = redis.Redis(host="localhost", port=6379, db=3)

    task_key = f"order:processed:{order_id}"
    existing = r.get(task_key)
    if existing:
        return {"status": "duplicate", "order_id": order_id, "cached_result": existing.decode()}

    with idempotent_lock(self.request.id, r) as acquired:
        if not acquired:
            return {"status": "duplicate", "order_id": order_id}

        result = _do_process_order(order_id, amount)
        r.setex(task_key, 86400, str(result))
        return result

def _do_process_order(order_id, amount):
    return {"status": "processed", "order_id": order_id, "amount": amount}
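
The lock above is keyed on self.request.id, which covers redelivery of the same message. A client that submits the same order twice gets two different task ids, and only the order:processed check catches that case. One option, sketched below as an assumption rather than as part of the original design, is to derive the key from the task name and its arguments. The helper name idempotency_key is hypothetical, and the arguments are assumed to be JSON-serializable.

```python
import hashlib
import json


def idempotency_key(task_name, args=(), kwargs=None):
    """Stable key for a task invocation, independent of the task id."""
    payload = json.dumps(
        {"task": task_name, "args": list(args), "kwargs": kwargs or {}},
        sort_keys=True,  # keyword order must not change the key
        separators=(",", ":"),
    )
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    return f"celery:idempotent:{digest}"


if __name__ == "__main__":
    first = idempotency_key("process_order", kwargs={"order_id": "ORD-001", "amount": 99.99})
    second = idempotency_key("process_order", kwargs={"amount": 99.99, "order_id": "ORD-001"})
    print(first == second)
```

Passing such a key to idempotent_lock in place of the task id would make two submissions of the same order compete for the same lock.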

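2. Dynamic task priority and rate limiting

High-priority tasks run first, and rate limiting keeps downstream services from being overloaded.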

from celery import Celery

app = Celery("priority_app")
app.config_from_object("celery_config")

app.conf.broker_transport_options = {
    "priority_steps": list(range(10)),
    "sep": ":",
    "queue_order_strategy": "priority",
}

@app.task(rate_limit="10/s")
def call_external_api(endpoint, payload):
    import time
    time.sleep(0.1)
    return {"endpoint": endpoint, "status": "ok"}

# High-priority task (with the Redis transport, 0 is the highest priority)
def submit_high_priority():
    call_external_api.apply_async(
        args=["/api/v1/critical", {"urgent": True}],
        priority=0,
        queue="io",
    )

# Low-priority task
def submit_low_priority():
    call_external_api.apply_async(
        args=["/api/v1/batch", {"batch": True}],
        priority=9,
        queue="io",
    )

# Adjust the rate limit at runtime
from celery import current_app

def adjust_rate_limit(task_name, new_rate):
    current_app.control.rate_limit(task_name, new_rate)
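
Celery's rate_limit applies per worker instance, not across the cluster, so the load a downstream service sees depends on how many workers consume the queue. The sketch below works through that arithmetic for strings such as "10/s". It is a simplified stand-alone parser, not Celery's own, and the function names are hypothetical.

```python
_SECONDS_PER_UNIT = {"s": 1, "m": 60, "h": 3600}


def tasks_per_second(rate_limit):
    """Convert "10/s", "120/m", "3600/h" or a plain number to tasks per second."""
    if isinstance(rate_limit, (int, float)):
        return float(rate_limit)  # bare numbers are read as tasks per second
    count, _, unit = rate_limit.partition("/")
    return float(count) / _SECONDS_PER_UNIT[unit or "s"]


def effective_rate(rate_limit, worker_instances):
    """Upper bound on the combined rate when every worker applies the limit."""
    return tasks_per_second(rate_limit) * worker_instances


if __name__ == "__main__":
    # rate_limit="10/s" on call_external_api with 3 io worker replicas
    print(effective_rate("10/s", 3))
```

With the three worker-io replicas from docker-compose.yml, a "10/s" limit allows up to 30 calls per second in total, so the per-worker value should be the downstream budget divided by the number of workers.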

3. Integrating Celery with FastAPI

Integrate Celery into a FastAPI application: the API triggers asynchronous tasks and queries their state.

# fastapi_celery.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from celery.result import AsyncResult

app = FastAPI(title="Celery API Gateway")

class TaskSubmit(BaseModel):
    task_name: str
    args: list = []
    kwargs: dict = {}

class TaskResponse(BaseModel):
    task_id: str
    status: str

@app.post("/tasks/submit", response_model=TaskResponse)
def submit_task(submit: TaskSubmit):
    from tasks import app as celery_app
    try:
        result = celery_app.send_task(
            submit.task_name,
            args=submit.args,
            kwargs=submit.kwargs,
        )
        return TaskResponse(task_id=result.id, status="PENDING")
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

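@app.get("/tasks/{task_id}", response_model=dict)
def get_task_status(task_id: str):
    from tasks import app as celery_app
    # Bind the result to the configured app so its result backend is used
    result = AsyncResult(task_id, app=celery_app)
    response = {
        "task_id": task_id,
        "status": result.status,
        # On failure result.result is an exception object, which is not JSON-serializable
        "result": result.result if result.successful() else None,
    }
    if result.failed():
        response["error"] = str(result.result)
    return response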

@app.post("/tasks/{task_id}/revoke")
def revoke_task(task_id: str):
    from tasks import app as celery_app
    celery_app.control.revoke(task_id, terminate=True)
    return {"task_id": task_id, "status": "REVOKED"}

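Framework comparison

Feature | Celery | Dramatiq | Huey | RQ | Temporal
Language | Python | Python | Python | Python | Go/multi-language
Broker | Redis/RabbitMQ/SQS and others | Redis/RabbitMQ | Redis/SQLite | Redis | Built-in
Workflow orchestration | Canvas (chain/chord/group) | Pipeline | Not supported | Not supported | Native workflows
Retry strategy | Exponential backoff + jitter + custom | Exponential backoff + jitter | Simple retries | Simple retries | Full retry policies
Monitoring | Flower | Separate dashboard package | Basic | RQ Dashboard | Web UI
Periodic tasks | Beat | Via an external scheduler | Built-in | Not supported | Native Schedule
Community and ecosystem | Most mature | Medium | Niche | Niche | Growing fast
Learning curve | Moderate | | | |
Multi-language support | | | | |
Production maturity | | | | |
Best suited for | General-purpose distributed tasks | Small and mid-sized projects | Lightweight tasks | Simple tasks in a Redis stack | Complex cross-language workflows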

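Summary

Celery is one of the most mature and flexible options for distributed task queues in Python. Keep three core principles in mind. Idempotency is the baseline: redelivery must be safe. Queue isolation is standard practice: CPU-bound, IO-bound, and default queues should be kept apart. Monitoring and alerting are not optional: Flower plus Prometheus are your eyes in production. From a single celery worker on one machine to HPA autoscaling on K8s, the architecture scales out with the workload without rewriting task code.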

