Python Celery Distributed Task Queue: 6 Production Patterns from Basic Tasks to Event-Driven Pipelines


Background Tasks Blocking APIs, Cron Not Scaling, Task Failures Without Retry

Users upload avatars and need 5 thumbnail sizes generated, and the synchronous API call times out. The 2 AM daily data cleanup cron job can no longer finish once the data volume doubles. Third-party payment callbacks fail occasionally, and with no retry mechanism each failure needs manual intervention. In 2026, Celery remains one of the most widely used distributed task queues for Python. Celery 5.x tasks are still ordinary synchronous functions (asyncio code has to be bridged manually, for example by calling asyncio.run inside a task), but its Canvas workflow primitives and pluggable brokers let one codebase run from single-machine development to a K8s cluster deployment.

This article covers 6 production patterns, guiding you through basic task definition → routing queue isolation → exponential backoff retry → Canvas workflows → Flower+Prometheus monitoring → Docker/K8s production deployment with complete runnable Python code.


Celery Core Concepts

| Concept | Description |
|---|---|
| Celery | Python distributed task queue framework supporting multiple brokers and result backends |
| Broker | Message middleware that receives task messages from producers (clients) and delivers them to consumers (workers), commonly Redis or RabbitMQ |
| Worker | Consumer process that fetches tasks from the Broker and executes them; supports prefork (multi-process), thread, and gevent/eventlet pools |
| Task | Function managed by Celery and registered via the @app.task decorator; calling .delay() or .apply_async() returns an AsyncResult |
| Canvas | Celery's workflow primitives: chain (sequential), group (parallel), chord (parallel + aggregate) |
| Chain | Sequential workflow where the output of one task becomes the input of the next |
| Chord | Parallel + aggregate workflow where a callback task aggregates results after all group tasks complete |
| Group | Parallel workflow where a set of independent tasks execute simultaneously |
| Flower | Real-time monitoring web panel for Celery showing task status, worker status, and queue depth |
| Result Backend | Storage for task return values and states, commonly Redis or a database |
| Beat | Celery's periodic task scheduler, similar to cron but configured in Python; run a single instance, or use a lock-aware scheduler such as RedBeat (see Pitfall 4) |
| Signature | Serializable object that wraps a task call and its arguments so it can be passed around and composed in Canvas |

Problem Analysis: 5 Major Distributed Task Queue Challenges

  1. Task definition and invocation coupling: Business code calls time-consuming operations directly, and the synchronous blocking makes API responses slow. These operations need to be abstracted as async tasks so callers are decoupled from execution.
  2. Task routing and resource isolation: When CPU-intensive and IO-intensive tasks share one queue, IO tasks wait behind CPU tasks. This calls for queue isolation and routing strategies.
  3. Task failure and retry storms: When a third-party service is unavailable, tasks fail and retry repeatedly, and without a backoff strategy the retries can cause an avalanche. This calls for exponential backoff, a maximum retry count, and dead letter handling.
  4. Complex workflow orchestration: Multi-step tasks require sequential and parallel composition, and hand-written callback chains are hard to maintain. Canvas primitives allow declarative orchestration instead.
  5. Production observability: Task backlogs, worker crashes, and blocked queues go undetected unless there is real-time monitoring, alerting, and auto-scaling.

Step-by-Step: 6 Celery Distributed Task Patterns

Pattern 1: Basic Task Definition and Execution

The simplest Celery setup: define a task function, start a worker, then call the task and fetch its result.

# celery_config.py
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"
task_serializer = "json"
result_serializer = "json"
accept_content = ["json"]
timezone = "UTC"
enable_utc = True
# tasks.py
from celery import Celery

app = Celery("myapp")
app.config_from_object("celery_config")

@app.task
def add(x, y):
    return x + y

@app.task
def send_email(to, subject, body):
    import time
    time.sleep(2)
    return {"status": "sent", "to": to, "subject": subject}

@app.task
def generate_thumbnail(image_path, sizes):
    import time
    time.sleep(3)
    thumbnails = []
    for size in sizes:
        thumbnails.append(f"{image_path}_{size}x{size}.jpg")
    return {"image": image_path, "thumbnails": thumbnails}

if __name__ == "__main__":
    app.start()
# client.py - Invoke tasks
from tasks import add, send_email, generate_thumbnail

result = add.delay(4, 6)
print(f"Task ID: {result.id}")
print(f"Ready: {result.ready()}")
print(f"Result: {result.get(timeout=10)}")

email_result = send_email.delay("user@example.com", "Welcome", "Hello!")
print(f"Email: {email_result.get(timeout=10)}")

thumb_result = generate_thumbnail.delay("/uploads/avatar.png", [64, 128, 256])
print(f"Thumbnails: {thumb_result.get(timeout=10)}")
# Start Worker
celery -A tasks worker --loglevel=info --concurrency=4

Pattern 2: Task Routing and Queue Isolation

Route different task types to different queues, isolate resources by queue.

# celery_config_routing.py
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"
task_serializer = "json"
result_serializer = "json"

task_routes = {
    "tasks.compute.*": {"queue": "compute"},
    "tasks.io.*": {"queue": "io"},
    "tasks.default.*": {"queue": "default"},
}

task_default_queue = "default"
task_queues = {
    "default": {
        "exchange": "default",
        "routing_key": "default",
    },
    "compute": {
        "exchange": "compute",
        "routing_key": "compute",
    },
    "io": {
        "exchange": "io",
        "routing_key": "io",
    },
}
# tasks_routing.py
from celery import Celery

app = Celery("routed_app")
app.config_from_object("celery_config_routing")

@app.task(name="tasks.compute.ml_inference")
def ml_inference(model_id, input_data):
    import time
    time.sleep(5)
    return {"model": model_id, "prediction": 0.95, "latency_ms": 5000}

@app.task(name="tasks.compute.data_pipeline")
def data_pipeline(source, target):
    import time
    time.sleep(8)
    return {"source": source, "target": target, "rows": 100000}

@app.task(name="tasks.io.send_notification")
def send_notification(user_id, message):
    import time
    time.sleep(1)
    return {"user_id": user_id, "status": "delivered"}

@app.task(name="tasks.io.sync_external_api")
def sync_external_api(endpoint, payload):
    import time
    time.sleep(2)
    return {"endpoint": endpoint, "status_code": 200}

@app.task(name="tasks.default.cleanup")
def cleanup(days_old):
    return {"deleted_records": days_old * 100}
# Start workers for different queues
celery -A tasks_routing worker -Q compute --loglevel=info --concurrency=2 -n compute@%h
celery -A tasks_routing worker -Q io --loglevel=info --concurrency=8 -n io@%h
celery -A tasks_routing worker -Q default --loglevel=info --concurrency=4 -n default@%h
# client_routing.py
from tasks_routing import ml_inference, send_notification, cleanup

ml_inference.delay("resnet50", {"image": "base64..."})
send_notification.delay("USR-001", "Your order has shipped")
cleanup.delay(30)

# Dynamically specify queue
from tasks_routing import sync_external_api
sync_external_api.apply_async(
    args=["/api/v1/sync", {"batch": 100}],
    queue="io",
    priority=5,
)
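
To make the routing rules above concrete, here is a minimal stdlib-only sketch of how a glob-style task_routes mapping resolves a task name to a queue. It approximates the matching with fnmatch, which is an assumption: Celery's own matcher may differ in edge cases, and resolve_queue is a hypothetical helper, not a Celery API.

```python
from fnmatch import fnmatchcase

# Mirrors the task_routes mapping from celery_config_routing.py
TASK_ROUTES = {
    "tasks.compute.*": "compute",
    "tasks.io.*": "io",
    "tasks.default.*": "default",
}


def resolve_queue(task_name, routes=TASK_ROUTES, default_queue="default"):
    """Return the queue for a task name; the first matching pattern wins."""
    for pattern, queue in routes.items():
        if fnmatchcase(task_name, pattern):
            return queue
    # No pattern matched: fall back to task_default_queue
    return default_queue


if __name__ == "__main__":
    for name in ("tasks.compute.ml_inference", "tasks.io.send_notification", "reports.weekly"):
        print(name, "->", resolve_queue(name))
```

A task whose name matches no pattern lands on the default queue, which is why the explicit name= on each task in tasks_routing.py matters.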

Pattern 3: Retry Strategies with Exponential Backoff

Automatic retry on task failure, exponential backoff prevents avalanches, max retries as safety net.

# tasks_retry.py
from celery import Celery
from celery.utils.log import get_task_logger
import random
import time

logger = get_task_logger(__name__)

app = Celery("retry_app")
app.config_from_object("celery_config")

class ExternalAPIError(Exception):
    pass

class RateLimitError(Exception):
    pass

@app.task(
    bind=True,
    max_retries=5,
    autoretry_for=(ExternalAPIError,),
    retry_backoff=True,      # base delays of 1s, 2s, 4s, ... between attempts
    retry_backoff_max=600,   # cap each delay at 10 minutes
    retry_jitter=True,       # randomize delays so retries do not arrive in lockstep
)
def call_payment_api(self, order_id, amount):
    # No try/except here: catching the error and calling self.retry() manually
    # would bypass the backoff settings above. autoretry_for triggers the retry.
    success = random.random() > 0.7
    if not success:
        logger.warning(f"Payment failed for {order_id} (retries so far: {self.request.retries}/{self.max_retries})")
        raise ExternalAPIError(f"Payment API timeout for order {order_id}")
    return {"order_id": order_id, "amount": amount, "status": "paid"}

@app.task(bind=True, max_retries=3)
def call_rate_limited_api(self, endpoint):
    # Manual variant: compute the countdown explicitly instead of using autoretry_for
    try:
        if random.random() > 0.5:
            raise RateLimitError(f"Rate limited on {endpoint}")
        return {"endpoint": endpoint, "data": "success"}
    except RateLimitError as exc:
        # Exponential backoff with jitter, capped at 120 seconds
        countdown = min(120, 2 ** self.request.retries + random.uniform(0, 1))
        logger.info(f"Rate limited, retrying in {countdown:.1f}s (attempt {self.request.retries + 1})")
        raise self.retry(exc=exc, countdown=countdown)

@app.task(bind=True, max_retries=4)
def process_with_custom_retry(self, data):
    try:
        result = _do_risky_operation(data)
        return result
    except Exception as exc:
        if self.request.retries >= self.max_retries:
            logger.error(f"Max retries exceeded for {data}, sending to DLQ")
            send_to_dlq.delay(str(data), str(exc), self.request.retries)
            return {"status": "failed", "data": data, "error": str(exc)}
        countdown = min(60, 5 * (2 ** self.request.retries))
        logger.warning(f"Processing failed, retry in {countdown}s")
        raise self.retry(exc=exc, countdown=countdown)

def _do_risky_operation(data):
    if random.random() > 0.4:
        raise ValueError(f"Random failure processing {data}")
    return {"processed": data, "result": "ok"}

@app.task
def send_to_dlq(task_data, error, retry_count):
    logger.error(f"DLQ: task={task_data}, error={error}, retries={retry_count}")
    return {"dlq": True, "task": task_data, "error": error}
# client_retry.py
from tasks_retry import call_payment_api, call_rate_limited_api, process_with_custom_retry

result = call_payment_api.delay("ORD-001", 99.99)
print(f"Payment task: {result.id}")

result2 = call_rate_limited_api.delay("/api/v2/users")
print(f"Rate limited task: {result2.id}")

result3 = process_with_custom_retry.delay({"key": "value"})
print(f"Custom retry task: {result3.id}")
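
The backoff options used in this pattern can be easier to reason about as a formula. The following stdlib-only sketch models the schedule as documented for retry_backoff, retry_backoff_max, and retry_jitter (delay doubles per retry, is capped, and with jitter is drawn at random up to the capped value). backoff_delay is a hypothetical helper written for illustration, not Celery's implementation.

```python
import random


def backoff_delay(retries, factor=1, maximum=600, jitter=False, rng=random):
    """Delay in seconds before the next attempt, given the retries so far (0-based)."""
    delay = min(maximum, factor * (2 ** retries))
    if jitter:
        # Full jitter: pick an integer uniformly in [0, delay] to spread retries out
        delay = rng.randrange(delay + 1)
    return max(0, delay)


if __name__ == "__main__":
    # retry_backoff=True, retry_backoff_max=600
    print([backoff_delay(n) for n in range(6)])
    # retry_backoff=2, retry_backoff_max=120
    print([backoff_delay(n, factor=2, maximum=120) for n in range(8)])
```

Without jitter the schedule is deterministic, which is exactly what lets many failed tasks retry at the same instant; the jitter branch trades predictability for a smoother load on the failing dependency.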

Pattern 4: Canvas Workflow (Chain, Chord, Group)

Declarative orchestration of complex workflows: sequential chains, parallel groups, parallel + aggregate.

# tasks_canvas.py
from celery import Celery, chain, group, chord
from celery import signature

app = Celery("canvas_app")
app.config_from_object("celery_config")

@app.task
def download_url(url):
    import time
    time.sleep(1)
    return {"url": url, "content": f"content_of_{url.split('/')[-1]}"}

@app.task
def parse_content(download_result):
    content = download_result["content"]
    return {"parsed": True, "items": len(content), "source": download_result["url"]}

@app.task
def store_results(parse_result):
    return {"stored": True, "items": parse_result["items"], "source": parse_result["source"]}

@app.task
def aggregate_results(results):
    total_items = sum(r["items"] for r in results)
    return {"total_items": total_items, "sources": len(results), "aggregated": True}

@app.task
def send_report(aggregate_result):
    return {
        "report_sent": True,
        "total_items": aggregate_result["total_items"],
        "sources": aggregate_result["sources"],
    }

# Chain: Sequential workflow
def run_chain():
    workflow = chain(
        download_url.s("https://api.example.com/data"),
        parse_content.s(),
        store_results.s(),
    )
    result = workflow.apply_async()
    return result.id

# Group: Parallel workflow
def run_group():
    urls = [
        "https://api.example.com/users",
        "https://api.example.com/orders",
        "https://api.example.com/products",
    ]
    workflow = group(download_url.s(url) for url in urls)
    result = workflow.apply_async()
    return result.id

# Chord: Parallel execution + aggregation
def run_chord():
    urls = [
        "https://api.example.com/users",
        "https://api.example.com/orders",
        "https://api.example.com/products",
    ]
    workflow = chord(
        [chain(download_url.s(url), parse_content.s()) for url in urls],
        aggregate_results.s(),
    )
    result = workflow.apply_async()
    return result.id

# Complex nested workflow
def run_complex_workflow():
    header = group(
        chain(download_url.s(f"https://api.example.com/batch/{i}"), parse_content.s())
        for i in range(1, 4)
    )
    workflow = chain(
        header,
        aggregate_results.s(),
        send_report.s(),
    )
    result = workflow.apply_async()
    return result.id
# client_canvas.py
from tasks_canvas import run_chain, run_group, run_chord, run_complex_workflow
from tasks_canvas import download_url, parse_content, store_results, aggregate_results
from celery import chain, group, chord

# Method 1: Using wrapper functions
chain_id = run_chain()
print(f"Chain task: {chain_id}")

group_id = run_group()
print(f"Group task: {group_id}")

chord_id = run_chord()
print(f"Chord task: {chord_id}")

# Method 2: Using Canvas primitives directly
from tasks_canvas import app

result = chain(
    download_url.s("https://api.example.com/report"),
    parse_content.s(),
    store_results.s(),
).apply_async()

print(f"Direct chain: {result.id}")

# Using signatures for deferred construction
sig = download_url.s("https://api.example.com/dynamic")
result = chain(sig, parse_content.s(), store_results.s()).apply_async()
print(f"Signature chain: {result.id}")
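
As a mental model for how data flows through chain, group, and chord, the sketch below runs the same shapes locally with plain functions and no broker. The run_*_locally helpers are hypothetical names invented for this illustration; real Canvas primitives execute each step on workers and pass results through the result backend.

```python
from functools import reduce


def download_url(url):
    return {"url": url, "content": f"content_of_{url.split('/')[-1]}"}


def parse_content(download_result):
    return {"items": len(download_result["content"]), "source": download_result["url"]}


def aggregate_results(results):
    return {"total_items": sum(r["items"] for r in results), "sources": len(results)}


def run_chain_locally(first_arg, *steps):
    """chain: each step receives the previous step's return value."""
    return reduce(lambda value, step: step(value), steps, first_arg)


def run_group_locally(step, args):
    """group: one step applied to independent inputs; results keep input order."""
    return [step(arg) for arg in args]


def run_chord_locally(header, args, callback):
    """chord: a group (the header) whose list of results feeds one callback."""
    return callback(run_group_locally(header, args))


def fetch_and_parse(url):
    return run_chain_locally(url, download_url, parse_content)


if __name__ == "__main__":
    urls = [
        "https://api.example.com/users",
        "https://api.example.com/orders",
        "https://api.example.com/products",
    ]
    print(run_chord_locally(fetch_and_parse, urls, aggregate_results))
```

This mirrors run_chord() above: each header entry is itself a small chain, and the callback only ever sees the list of header results.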

Pattern 5: Monitoring with Flower and Prometheus

Real-time monitoring of task status, worker health, integrated with Prometheus alerting.

# celery_config_monitoring.py
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"
task_serializer = "json"
result_serializer = "json"

task_send_sent_event = True
task_track_started = True
worker_send_task_events = True

task_routes = {
    "monitoring_tasks.*": {"queue": "monitoring"},
}

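# flowerconfig.py - Flower reads its options from its own config file
# (or from CLI flags / FLOWER_* environment variables), not from the Celery config
port = 5555
basic_auth = ["admin:secret123"]
persistent = True
db = "flower_db"
max_tasks = 10000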
# monitoring_tasks.py
from celery import Celery
from celery.utils.log import get_task_logger
import time
import random

logger = get_task_logger(__name__)

app = Celery("monitoring_app")
app.config_from_object("celery_config_monitoring")

@app.task(name="monitoring_tasks.process_order", bind=True, track_started=True)
def process_order(self, order_id):
    self.update_state(state="PROCESSING", meta={"step": "validating", "order_id": order_id})
    time.sleep(1)

    self.update_state(state="PROCESSING", meta={"step": "charging", "order_id": order_id})
    time.sleep(2)

    self.update_state(state="PROCESSING", meta={"step": "fulfilling", "order_id": order_id})
    time.sleep(1)

    return {"order_id": order_id, "status": "completed", "amount": random.randint(100, 9999)}

@app.task(name="monitoring_tasks.generate_report", bind=True, track_started=True, time_limit=300)
def generate_report(self, report_type, date_range):
    self.update_state(state="GENERATING", meta={"progress": 0, "type": report_type})
    for i in range(1, 6):
        time.sleep(1)
        self.update_state(state="GENERATING", meta={"progress": i * 20, "type": report_type})

    return {"report_type": report_type, "date_range": date_range, "rows": random.randint(1000, 50000)}
# prometheus_exporter.py - Custom Prometheus metrics
from prometheus_client import Counter, Histogram, Gauge, start_http_server

TASK_STARTED = Counter(
    "celery_task_started_total",
    "Total number of Celery tasks started",
    ["task_name", "queue"],
)

TASK_COMPLETED = Counter(
    "celery_task_completed_total",
    "Total number of Celery tasks completed",
    ["task_name", "queue", "status"],
)

TASK_DURATION = Histogram(
    "celery_task_duration_seconds",
    "Celery task duration in seconds",
    ["task_name", "queue"],
    buckets=[0.1, 0.5, 1, 2, 5, 10, 30, 60, 120, 300],
)

QUEUE_DEPTH = Gauge(
    "celery_queue_depth",
    "Number of tasks waiting in queue",
    ["queue_name"],
)

ACTIVE_WORKERS = Gauge(
    "celery_active_workers",
    "Number of active Celery workers",
)

def start_metrics_server(port=9090):
    start_http_server(port)
    print(f"Prometheus metrics server started on :{port}")
# celery_events_monitor.py - Event listener
from celery import Celery
from prometheus_exporter import TASK_STARTED, TASK_COMPLETED, TASK_DURATION, ACTIVE_WORKERS

app = Celery("monitoring_app")
app.config_from_object("celery_config_monitoring")

# Only task-sent/task-received events carry the task name, so keep an
# in-memory State and look each task up by UUID in the later events.
state = app.events.State()

def _labels(event):
    state.event(event)
    task = state.tasks.get(event["uuid"])
    task_name = (task.name if task else None) or "unknown"
    # Approximation: with the default direct exchanges the routing key equals the queue name
    queue = (getattr(task, "routing_key", None) if task else None) or "unknown"
    return task_name, queue

def on_task_started(event):
    task_name, queue = _labels(event)
    TASK_STARTED.labels(task_name=task_name, queue=queue).inc()

def on_task_succeeded(event):
    task_name, queue = _labels(event)
    TASK_COMPLETED.labels(task_name=task_name, queue=queue, status="success").inc()
    TASK_DURATION.labels(task_name=task_name, queue=queue).observe(event.get("runtime", 0))

def on_task_failed(event):
    task_name, queue = _labels(event)
    TASK_COMPLETED.labels(task_name=task_name, queue=queue, status="failed").inc()

def on_task_retried(event):
    task_name, queue = _labels(event)
    TASK_COMPLETED.labels(task_name=task_name, queue=queue, status="retried").inc()

def on_worker_heartbeat(event):
    state.event(event)
    ACTIVE_WORKERS.set(sum(1 for worker in state.workers.values() if worker.alive))

def monitor_events():
    with app.connection() as connection:
        recv = app.events.Receiver(
            connection,
            handlers={
                "task-started": on_task_started,
                "task-succeeded": on_task_succeeded,
                "task-failed": on_task_failed,
                "task-retried": on_task_retried,
                "worker-heartbeat": on_worker_heartbeat,
                "*": state.event,  # feeds task-sent/task-received into the State
            },
        )
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == "__main__":
    from prometheus_exporter import start_metrics_server
    start_metrics_server(9090)
    monitor_events()
# Start Flower monitoring
celery -A monitoring_tasks flower --port=5555 --basic-auth=admin:secret123

# Start event listener + Prometheus
python celery_events_monitor.py
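
The celery_queue_depth gauge defined in prometheus_exporter.py still needs a value from somewhere. One possible source, sketched below, is the length of the Redis lists that back each queue. This rests on two assumptions that should be checked against your kombu version: that the Redis transport stores a queue as a list keyed by the queue name, and that non-zero priority steps use keys of the form queue + separator + priority. FakeRedis is a stand-in so the sketch runs without a server; in practice a redis.Redis client would be passed instead.

```python
def priority_keys(queue, priority_steps=(), sep="\x06\x16"):
    """Redis list keys that may hold messages for `queue`.

    Assumption: priority 0 uses the bare queue name and every other
    priority step uses "<queue><sep><priority>".
    """
    return [queue] + [f"{queue}{sep}{step}" for step in priority_steps if step]


def queue_depths(client, queues, priority_steps=(), sep="\x06\x16"):
    """Return {queue: pending message count} by calling LLEN on each key."""
    return {
        queue: sum(client.llen(key) for key in priority_keys(queue, priority_steps, sep))
        for queue in queues
    }


class FakeRedis:
    """Illustration-only stand-in exposing the single method used above."""

    def __init__(self, lists):
        self._lists = lists

    def llen(self, key):
        return len(self._lists.get(key, []))


if __name__ == "__main__":
    client = FakeRedis({"default": ["m1", "m2"], "io": ["m3"], "io:3": ["m4", "m5"]})
    print(queue_depths(client, ["default", "io", "compute"], priority_steps=(0, 3, 6, 9), sep=":"))
```

A periodic loop could then copy each value into the gauge with QUEUE_DEPTH.labels(queue_name=name).set(depth). Note that this counts only messages still waiting in Redis, not tasks already prefetched by workers.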

Pattern 6: Production Celery Service with Docker and Kubernetes

Docker multi-stage builds, K8s Deployment, HPA auto-scaling.

# Dockerfile
FROM python:3.12-slim AS builder

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt

FROM python:3.12-slim

WORKDIR /app
COPY --from=builder /install /usr/local
COPY . .

ENV PYTHONUNBUFFERED=1
ENV CELERY_BROKER_URL=redis://redis:6379/0
ENV CELERY_RESULT_BACKEND=redis://redis:6379/1

CMD ["celery", "-A", "tasks", "worker", "--loglevel=info"]
# docker-compose.yml
version: "3.8"

services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes --maxmemory 512mb --maxmemory-policy allkeys-lru
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5

  worker-default:
    build: .
    command: celery -A tasks worker -Q default --loglevel=info --concurrency=4 -n default@%h
    depends_on:
      redis:
        condition: service_healthy
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    deploy:
      replicas: 2
      resources:
        limits:
          memory: 512M
          cpus: "1.0"
        reservations:
          memory: 256M
          cpus: "0.5"
    restart: unless-stopped

  worker-compute:
    build: .
    command: celery -A tasks worker -Q compute --loglevel=info --concurrency=2 -n compute@%h
    depends_on:
      redis:
        condition: service_healthy
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    deploy:
      replicas: 1
      resources:
        limits:
          memory: 2G
          cpus: "2.0"
        reservations:
          memory: 1G
          cpus: "1.0"
    restart: unless-stopped

  worker-io:
    build: .
    command: celery -A tasks worker -Q io --loglevel=info --concurrency=16 -n io@%h
    depends_on:
      redis:
        condition: service_healthy
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    deploy:
      replicas: 3
      resources:
        limits:
          memory: 512M
          cpus: "1.0"
    restart: unless-stopped

  flower:
    build: .
    command: celery -A tasks flower --port=5555 --basic-auth=admin:secret123
    depends_on:
      redis:
        condition: service_healthy
    ports:
      - "5555:5555"
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    restart: unless-stopped

  beat:
    build: .
    command: celery -A tasks beat --loglevel=info
    depends_on:
      redis:
        condition: service_healthy
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    restart: unless-stopped

volumes:
  redis_data:
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-worker-default
  labels:
    app: celery-worker
    queue: default
spec:
  replicas: 3
  selector:
    matchLabels:
      app: celery-worker
      queue: default
  template:
    metadata:
      labels:
        app: celery-worker
        queue: default
    spec:
      containers:
        - name: worker
          image: myregistry/celery-app:latest
          command: ["celery", "-A", "tasks", "worker", "-Q", "default", "--loglevel=info", "--concurrency=4"]
          env:
            - name: CELERY_BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: broker-url
            - name: CELERY_RESULT_BACKEND
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: result-backend
          resources:
            requests:
              memory: "256Mi"
              cpu: "500m"
            limits:
              memory: "512Mi"
              cpu: "1000m"
          livenessProbe:
            exec:
              # Ping only this pod's worker; a bare "inspect ping" succeeds if any worker replies
              command: ["sh", "-c", "celery -A tasks inspect ping -d celery@$HOSTNAME"]
            initialDelaySeconds: 30
            periodSeconds: 60
            timeoutSeconds: 10
          readinessProbe:
            exec:
              command: ["sh", "-c", "celery -A tasks inspect ping -d celery@$HOSTNAME"]
            initialDelaySeconds: 10
            periodSeconds: 30
            timeoutSeconds: 10
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: celery-worker-default-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: celery-worker-default
  minReplicas: 2
  maxReplicas: 20
  metrics:
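    # External metrics require a metrics adapter in the cluster (for example
    # prometheus-adapter or KEDA) that exposes celery_queue_depth to the HPA
    - type: External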
      external:
        metric:
          name: celery_queue_depth
          selector:
            matchLabels:
              queue_name: default
        target:
          type: AverageValue
          averageValue: "100"
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-beat
  labels:
    app: celery-beat
spec:
  replicas: 1
  selector:
    matchLabels:
      app: celery-beat
  template:
    metadata:
      labels:
        app: celery-beat
    spec:
      containers:
        - name: beat
          image: myregistry/celery-app:latest
          command: ["celery", "-A", "tasks", "beat", "--loglevel=info"]
          env:
            - name: CELERY_BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: broker-url
            - name: CELERY_RESULT_BACKEND
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: result-backend
          resources:
            requests:
              memory: "128Mi"
              cpu: "100m"
            limits:
              memory: "256Mi"
              cpu: "250m"
# k8s/flower-service.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-flower
spec:
  replicas: 1
  selector:
    matchLabels:
      app: celery-flower
  template:
    metadata:
      labels:
        app: celery-flower
    spec:
      containers:
        - name: flower
          image: myregistry/celery-app:latest
          command: ["celery", "-A", "tasks", "flower", "--port=5555"]
          ports:
            - containerPort: 5555
          env:
            - name: CELERY_BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: broker-url
            - name: CELERY_RESULT_BACKEND
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: result-backend
---
apiVersion: v1
kind: Service
metadata:
  name: celery-flower
spec:
  selector:
    app: celery-flower
  ports:
    - port: 5555
      targetPort: 5555
  type: ClusterIP
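
To sanity-check the HPA settings above, it helps to compute what replica count a given backlog would request. The sketch below applies the standard HPA ratio for an AverageValue target (total metric divided by the per-pod target, rounded up) and clamps it to minReplicas and maxReplicas. It is a simplification: it ignores the HPA tolerance band, stabilization windows, and the CPU metric, and desired_replicas is a hypothetical helper.

```python
import math


def desired_replicas(queue_depth, target_per_pod=100, min_replicas=2, max_replicas=20):
    """Replica count requested by the queue-depth metric alone."""
    wanted = math.ceil(queue_depth / target_per_pod)
    # Clamp to the bounds declared in the HorizontalPodAutoscaler spec
    return max(min_replicas, min(max_replicas, wanted))


if __name__ == "__main__":
    for depth in (0, 50, 950, 5000):
        print(depth, "->", desired_replicas(depth))
```

When several metrics are configured, as in the manifest above, the HPA takes the largest replica count proposed by any single metric.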

Pitfall Guide: 5 Common Traps

Pitfall 1: Using Global Database Connections in Tasks

Wrong:

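import psycopg2

# Created at import time in the parent process and inherited by every forked worker
conn = psycopg2.connect("postgresql://localhost/mydb")

@app.task
def save_user(user_data):
    cursor = conn.cursor()
    cursor.execute("INSERT INTO users (data) VALUES (%s)", (user_data,))
    conn.commit()

Correct:

@app.task
def save_user(user_data):
    import psycopg2
    # Open the connection inside the task so each worker process owns its own socket
    conn = psycopg2.connect("postgresql://localhost/mydb")
    try:
        # "with conn" commits on success and rolls back on error
        with conn, conn.cursor() as cursor:
            cursor.execute("INSERT INTO users (data) VALUES (%s)", (user_data,))
    finally:
        conn.close()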

Pitfall 2: Ignoring Task Timeouts Causing Worker to Block Forever

Wrong:

@app.task
def fetch_external_data(url):
    import requests
    response = requests.get(url)
    return response.json()

Correct:

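@app.task(
    bind=True,
    max_retries=3,
    time_limit=300,       # hard limit: the worker process running the task is killed
    soft_time_limit=270,  # soft limit: SoftTimeLimitExceeded is raised inside the task
)
def fetch_external_data(self, url):
    import requests
    try:
        response = requests.get(url, timeout=60)
        response.raise_for_status()
        return response.json()
    except requests.Timeout as exc:
        # Re-queue the task instead of raising an undefined exception type
        raise self.retry(exc=exc, countdown=30)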

Pitfall 3: Passing Non-Serializable Objects in Canvas

Wrong:

@app.task
def process_file(file_obj):
    return file_obj.read()

workflow = chain(process_file.s(open("data.csv")), store_result.s())

Correct:

@app.task
def process_file(file_path):
    with open(file_path, "r") as f:
        return f.read()

workflow = chain(process_file.s("/data/data.csv"), store_result.s())
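
One way to catch this class of mistake before a message ever reaches the broker is to check the arguments against the JSON serializer on the caller side. The helper below is a hypothetical guard written for illustration, assuming task_serializer is "json" as in the configs in this article.

```python
import io
import json


def assert_json_serializable(*args, **kwargs):
    """Raise TypeError early if the task arguments cannot be encoded as JSON."""
    try:
        json.dumps({"args": args, "kwargs": kwargs})
    except TypeError as exc:
        raise TypeError(f"Task arguments are not JSON-serializable: {exc}") from exc


if __name__ == "__main__":
    assert_json_serializable("/data/data.csv", sizes=[64, 128])  # passes silently
    try:
        assert_json_serializable(io.StringIO("a,b"))  # a file-like object
    except TypeError as exc:
        print(exc)
```

Calling the guard right before .delay() or .s() in tests gives a clear error at the call site rather than an encode error deeper in the stack.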

Pitfall 4: Beat Periodic Tasks Without Distributed Locks Causing Duplicate Execution

Wrong:

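# Multiple Beat instances running, tasks execute repeatedly
from celery.schedules import crontab

beat_schedule = {
    "cleanup-every-night": {
        "task": "tasks.cleanup",
        "schedule": crontab(hour=2, minute=0),
    },
}

Correct:

# pip install celery-redbeat (the importable module is named redbeat)
from celery.schedules import crontab

# RedBeat keeps the schedule in Redis and holds a lock so that only one
# Beat instance is active at a time
beat_scheduler = "redbeat.RedBeatScheduler"
redbeat_redis_url = "redis://localhost:6379/2"

# Use the lowercase setting name; mixing old uppercase and new lowercase
# setting names in one config is rejected by Celery
beat_schedule = {
    "cleanup-every-night": {
        "task": "tasks.cleanup",
        "schedule": crontab(hour=2, minute=0),
        "options": {"queue": "default"},
    },
}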

Pitfall 5: Worker Fork Mode Without Handling Resource Inheritance

Wrong:

import redis

redis_client = redis.Redis()

@app.task
def update_cache(key, value):
    redis_client.set(key, value)

Correct:

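@app.task
def update_cache(key, value):
    import redis
    redis_client = redis.Redis()
    redis_client.set(key, value)

# Or create the client once per worker process, after the fork
from celery.signals import worker_process_init
import redis

redis_client = None

@worker_process_init.connect
def init_redis_client(**kwargs):
    global redis_client
    redis_client = redis.Redis()

@app.task
def update_cache(key, value):
    redis_client.set(key, value)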
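
Another common way to handle fork inheritance is a lazy accessor that remembers which process created the client and rebuilds it when the process ID changes. The sketch below shows the idea with the stdlib only; get_client and its factory argument are hypothetical names, and in a real worker the factory would be something like redis.Redis.

```python
import os

_client = None
_client_pid = None


def get_client(factory):
    """Return a client owned by the current process, creating it on first use."""
    global _client, _client_pid
    pid = os.getpid()
    if _client is None or _client_pid != pid:
        # First use, or we are a forked child holding the parent's client
        _client = factory()
        _client_pid = pid
    return _client


if __name__ == "__main__":
    first = get_client(object)
    second = get_client(object)
    print(first is second)  # same process, so the client is reused
```

Compared with creating a client inside every task call, this keeps one client per process while still never sharing a socket across a fork.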

Error Troubleshooting Table

| Error Symptom | Possible Cause | Diagnosis | Solution |
|---|---|---|---|
| Worker starts with no log output | Broker connection failure | Check CELERY_BROKER_URL format and connectivity | Confirm Redis/RabbitMQ is running and the URL format is correct |
| Task stuck in PENDING | No worker consuming the queue | celery -A tasks inspect active_queues | Start a worker for the queue or check the task_routes config |
| kombu.exceptions.EncodeError | Task args contain non-serializable objects | Check delay()/apply_async() parameters | Only pass JSON-serializable basic types |
| WorkerLostError | Worker process killed by the OOM Killer | dmesg \| grep -i oom to check system logs | Increase the worker memory limit or reduce concurrency |
| TimeLimitExceeded | Task exceeds time_limit | Check task logic for infinite loops or blocking IO | Set reasonable time_limit and soft_time_limit values |
| Task result returns None | No result_backend configured | Check CELERY_RESULT_BACKEND configuration | Configure Redis or a database as the result backend |
| Chord callback never executes | A header task fails silently | Check header task status in Flower | Ensure header tasks raise exceptions properly instead of returning None |
| ContentDisallowed | Unaccepted message serialization format | accept_content does not match the sender | Unify on accept_content = ["json"] |
| Beat tasks not executing on schedule | Timezone misconfiguration | Check timezone and enable_utc settings | Unify the timezone config; enable_utc=True is recommended |
| Worker frequently disconnects/reconnects | Broker connection pool exhausted or network jitter | Check broker_pool_limit and connection timeout | Increase broker_pool_limit and set broker_heartbeat |

Advanced Optimization

1. Task Result Deduplication and Idempotency

In production, tasks may be redelivered (network jitter, worker restart), requiring idempotency guarantees.

import json
from contextlib import contextmanager

import redis
from celery import Celery

app = Celery("idempotent_app")
app.config_from_object("celery_config")

@contextmanager
def idempotent_lock(key, redis_client, ttl=3600):
    # SET NX EX is atomic: only the first caller for this key acquires the lock
    lock_key = f"celery:idempotent:{key}"
    acquired = bool(redis_client.set(lock_key, "1", nx=True, ex=ttl))
    try:
        yield acquired
    except Exception:
        # Release on failure so a retry or redelivery can try again
        if acquired:
            redis_client.delete(lock_key)
        raise

@app.task(bind=True)
def process_order_idempotent(self, order_id, amount):
    r = redis.Redis(host="localhost", port=6379, db=3)

    task_key = f"order:processed:{order_id}"
    existing = r.get(task_key)
    if existing:
        return {"status": "duplicate", "order_id": order_id, "cached_result": json.loads(existing)}

    # Lock on the business key (order_id) rather than the task ID, so two
    # different task messages for the same order cannot both proceed
    with idempotent_lock(order_id, r) as acquired:
        if not acquired:
            return {"status": "duplicate", "order_id": order_id}

        result = _do_process_order(order_id, amount)
        r.setex(task_key, 86400, json.dumps(result))
        return result

def _do_process_order(order_id, amount):
    return {"status": "processed", "order_id": order_id, "amount": amount}
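
The effect of the lock is easiest to see by simulating a redelivery. The sketch below replaces Redis with a tiny in-memory store that mimics only the SET NX behaviour, then delivers the same order twice. InMemoryStore and handle_delivery are hypothetical names for this illustration; the in-memory store has no TTL and is not safe across processes.

```python
class InMemoryStore:
    """Illustration-only stand-in for the Redis SET NX call."""

    def __init__(self):
        self._data = {}

    def set(self, key, value, nx=False):
        if nx and key in self._data:
            return None  # mimic SET NX: no write when the key already exists
        self._data[key] = value
        return True


def handle_delivery(store, order_id, amount, charges):
    """Process an order at most once, however many times it is delivered."""
    if not store.set(f"lock:{order_id}", "1", nx=True):
        return {"status": "duplicate", "order_id": order_id}
    charges.append((order_id, amount))  # the side effect that must happen once
    return {"status": "processed", "order_id": order_id, "amount": amount}


if __name__ == "__main__":
    store, charges = InMemoryStore(), []
    print(handle_delivery(store, "ORD-001", 99.99, charges))
    print(handle_delivery(store, "ORD-001", 99.99, charges))  # simulated redelivery
    print(len(charges))
```

The second delivery returns early without touching the side effect, which is the property a redelivered Celery message needs.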

2. Dynamic Task Priority and Rate Limiting

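High-priority tasks execute first, and rate limiting protects downstream services from overload. With the Redis transport, lower numbers mean higher priority (0 is served first), and rate_limit applies per worker instance rather than across the whole cluster.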

from celery import Celery

app = Celery("priority_app")
app.config_from_object("celery_config")

app.conf.broker_transport_options = {
    "priority_steps": list(range(10)),
    "sep": ":",
    "queue_order_strategy": "priority",
}

@app.task(rate_limit="10/s")
def call_external_api(endpoint, payload):
    import time
    time.sleep(0.1)
    return {"endpoint": endpoint, "status": "ok"}

# High priority task
def submit_high_priority():
    call_external_api.apply_async(
        args=["/api/v1/critical", {"urgent": True}],
        priority=0,
        queue="io",
    )

# Low priority task
def submit_low_priority():
    call_external_api.apply_async(
        args=["/api/v1/batch", {"batch": True}],
        priority=9,
        queue="io",
    )

# Dynamic rate limiting
from celery import current_app

def adjust_rate_limit(task_name, new_rate):
    current_app.control.rate_limit(task_name, new_rate)
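
A setting such as rate_limit="10/s" is usually explained in terms of a token bucket: tokens refill at a fixed rate, and a task may start only when a token is available. The sketch below is a generic token bucket with an injectable clock, offered as a mental model under that assumption; it is not Celery's or kombu's implementation.

```python
import time


class TokenBucket:
    """Generic token bucket: `rate` tokens per second, at most `capacity` stored."""

    def __init__(self, rate, capacity=1, clock=time.monotonic):
        self.rate = float(rate)
        self.capacity = float(capacity)
        self._clock = clock
        self._tokens = float(capacity)
        self._last = clock()

    def _refill(self):
        now = self._clock()
        # Add tokens for the elapsed time, never exceeding the capacity
        self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.rate)
        self._last = now

    def try_consume(self, tokens=1):
        """Return True and take the tokens if available, else return False."""
        self._refill()
        if self._tokens >= tokens:
            self._tokens -= tokens
            return True
        return False


if __name__ == "__main__":
    bucket = TokenBucket(rate=10, capacity=1)
    print(bucket.try_consume())  # first call finds a full bucket
    print(bucket.try_consume())  # an immediate second call is throttled
```

Because each worker keeps its own bucket, the effective cluster-wide rate grows with the number of workers consuming that task.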

3. Celery Integration with FastAPI

Integrate Celery into a FastAPI application — API triggers async tasks and queries status.

# fastapi_celery.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from celery.result import AsyncResult

app = FastAPI(title="Celery API Gateway")

class TaskSubmit(BaseModel):
    task_name: str
    args: list = []
    kwargs: dict = {}

class TaskResponse(BaseModel):
    task_id: str
    status: str

@app.post("/tasks/submit", response_model=TaskResponse)
def submit_task(submit: TaskSubmit):
    from tasks import app as celery_app
    try:
        result = celery_app.send_task(
            submit.task_name,
            args=submit.args,
            kwargs=submit.kwargs,
        )
        return TaskResponse(task_id=result.id, status="PENDING")
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

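@app.get("/tasks/{task_id}", response_model=dict)
def get_task_status(task_id: str):
    from tasks import app as celery_app
    # Bind the result to the Celery app so its configured result backend is used
    result = AsyncResult(task_id, app=celery_app)
    response = {
        "task_id": task_id,
        "status": result.status,
        # On failure result.result is an exception object, which is not JSON-serializable
        "result": result.result if result.successful() else None,
    }
    if result.failed():
        response["error"] = str(result.result)
    return response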

@app.post("/tasks/{task_id}/revoke")
def revoke_task(task_id: str):
    from tasks import app as celery_app
    celery_app.control.revoke(task_id, terminate=True)
    return {"task_id": task_id, "status": "REVOKED"}

Framework Comparison

| Feature | Celery | Dramatiq | Huey | RQ | Temporal |
|---|---|---|---|---|---|
| Language | Python | Python | Python | Python | Go/Multi-language |
| Broker | Redis/RabbitMQ/SQS etc. | Redis/RabbitMQ | Redis/SQLite | Redis | Temporal server (no separate broker) |
| Workflow Orchestration | Canvas (chain/chord/group) | Pipeline/group | Basic task pipelines | Job dependencies only | Durable workflows as code |
| Retry Strategy | Exponential backoff+Jitter+Custom | Exponential backoff+Jitter | Simple retry | Simple retry | Complete retry strategy |
| Monitoring | Flower | Prometheus middleware; dashboard via add-on package | Basic | RQ Dashboard | Web UI |
| Periodic Tasks | Beat | Via external scheduler (e.g. APScheduler) | Built-in | Via rq-scheduler add-on | Native Schedule |
| Community Ecosystem | Most mature | Medium | Niche | Niche | Fast-growing |
| Learning Curve | Medium | Low | Low | Low | High |
| Multi-language Support | No | No | No | No | Yes |
| Production Maturity | High | Medium | Low | Medium | High |
| Use Case | General distributed tasks | Small-medium projects | Lightweight tasks | Redis ecosystem simple tasks | Cross-language complex workflows |

Summary

Celery 5.x remains one of the most mature and flexible options among Python distributed task queues. Remember three core principles. Task idempotency is the baseline: duplicate delivery must be safe. Queue isolation is standard: CPU-bound, IO-bound, and default work belong on separate queues. Monitoring and alerting are not optional: Flower and Prometheus are the eyes of production. From a single celery worker to K8s HPA auto-scaling, the same task code can stay unchanged while worker capacity is scaled out, provided the broker and downstream services keep up.

