Python Celery Distributed Task Queue: 6 Production Patterns from Basic Tasks to Event-Driven Pipelines
Background Tasks Blocking APIs, Cron Not Scaling, Task Failures Without Retry
Users upload avatars that need 5 thumbnail sizes, and generating them synchronously makes the API time out. The 2 AM daily data-cleanup cron can't finish once data volume doubles. Third-party payment callbacks fail occasionally, and with no retry mechanism each failure needs manual intervention. In 2026, Python Celery remains one of the most widely used distributed task queues: the same task code runs on a single development machine and on a Kubernetes cluster.
This article covers 6 production patterns: basic task definition → routing and queue isolation → exponential backoff retry → Canvas workflows → Flower + Prometheus monitoring → Docker/K8s production deployment, each with complete Python code.
Celery Core Concepts
| Concept | Description |
|---|---|
| Celery | Python distributed task queue framework supporting multiple brokers and result backends |
| Broker | Message middleware that receives task messages from producers (clients) and delivers them to consumers (workers), commonly Redis/RabbitMQ |
| Worker | Consumer process that fetches tasks from the Broker and executes them; concurrency via processes (prefork), green threads (eventlet/gevent), or threads |
| Task | Function registered with the @app.task decorator; calling it directly runs it in-process, while .delay() / .apply_async() send it to a worker and return an AsyncResult |
| Canvas | Celery's workflow primitives supporting chain (sequential), group (parallel), chord (parallel + aggregate) composition |
| Chain | Sequential workflow where the output of one task becomes the input of the next |
| Chord | Parallel + aggregate workflow where a callback task aggregates results after all group tasks complete |
| Group | Parallel workflow where a set of tasks execute simultaneously without dependencies |
| Flower | Celery's real-time monitoring web panel providing task status, worker status, queue depth visualization |
| Result Backend | Task result storage backend for saving task return values and states, commonly Redis/database |
| Beat | Celery's periodic task scheduler, similar to Cron but configured in Python; run exactly one Beat instance, or use a lock-based scheduler such as celery-redbeat to prevent duplicate runs |
| Signature | Task signature that serializes task call parameters into a passable object for Canvas composition |
Problem Analysis: 5 Major Distributed Task Queue Challenges
- Task definition and invocation coupling: business code calls time-consuming operations directly and blocks, so API responses are slow; these operations need to become async tasks decoupled from their callers
- Task routing and resource isolation: when CPU-intensive and IO-intensive tasks share one queue, IO tasks wait behind long CPU tasks; queue isolation and routing rules are needed
- Task failures and retry storms: when a third-party service is down, tasks that retry immediately and repeatedly add load to it and can trigger cascading failures; exponential backoff, a retry cap, and dead-letter handling are needed
- Complex workflow orchestration: multi-step jobs need sequential and parallel composition, and hand-written callback chains are hard to maintain; Canvas primitives make orchestration declarative
- Production observability: task backlogs, worker crashes, and blocked queues go unnoticed without real-time monitoring, alerting, and auto-scaling
Step-by-Step: 6 Celery Distributed Task Patterns
Pattern 1: Basic Task Definition and Execution
The simplest Celery task: define a task function, start a worker, then invoke the task and fetch its result.
```python
# celery_config.py
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"
task_serializer = "json"
result_serializer = "json"
accept_content = ["json"]
timezone = "UTC"
enable_utc = True
```

```python
# tasks.py
import time

from celery import Celery

app = Celery("myapp")
app.config_from_object("celery_config")


@app.task
def add(x, y):
    return x + y


@app.task
def send_email(to, subject, body):
    time.sleep(2)  # simulate SMTP latency
    return {"status": "sent", "to": to, "subject": subject}


@app.task
def generate_thumbnail(image_path, sizes):
    time.sleep(3)  # simulate image processing
    thumbnails = [f"{image_path}_{size}x{size}.jpg" for size in sizes]
    return {"image": image_path, "thumbnails": thumbnails}


if __name__ == "__main__":
    app.start()
```

```python
# client.py - invoke tasks
from tasks import add, send_email, generate_thumbnail

result = add.delay(4, 6)  # returns an AsyncResult immediately
print(f"Task ID: {result.id}")
print(f"Ready: {result.ready()}")
print(f"Result: {result.get(timeout=10)}")  # blocks until the worker finishes

email_result = send_email.delay("user@example.com", "Welcome", "Hello!")
print(f"Email: {email_result.get(timeout=10)}")

thumb_result = generate_thumbnail.delay("/uploads/avatar.png", [64, 128, 256])
print(f"Thumbnails: {thumb_result.get(timeout=10)}")
```

```bash
# Start a worker
celery -A tasks worker --loglevel=info --concurrency=4
```
Pattern 2: Task Routing and Queue Isolation
Route each task type to its own queue so that each queue is served by its own workers and resources.
```python
# celery_config_routing.py
from kombu import Exchange, Queue

broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"
task_serializer = "json"
result_serializer = "json"
accept_content = ["json"]

# Glob patterns match the task *name* (set explicitly in tasks_routing.py)
task_routes = {
    "tasks.compute.*": {"queue": "compute"},
    "tasks.io.*": {"queue": "io"},
    "tasks.default.*": {"queue": "default"},
}
task_default_queue = "default"

# Declare each queue with its own exchange and routing key
task_queues = (
    Queue("default", Exchange("default"), routing_key="default"),
    Queue("compute", Exchange("compute"), routing_key="compute"),
    Queue("io", Exchange("io"), routing_key="io"),
)
```

```python
# tasks_routing.py
import time

from celery import Celery

app = Celery("routed_app")
app.config_from_object("celery_config_routing")


@app.task(name="tasks.compute.ml_inference")
def ml_inference(model_id, input_data):
    time.sleep(5)  # simulate CPU-bound inference
    return {"model": model_id, "prediction": 0.95, "latency_ms": 5000}


@app.task(name="tasks.compute.data_pipeline")
def data_pipeline(source, target):
    time.sleep(8)
    return {"source": source, "target": target, "rows": 100000}


@app.task(name="tasks.io.send_notification")
def send_notification(user_id, message):
    time.sleep(1)  # simulate a network call
    return {"user_id": user_id, "status": "delivered"}


@app.task(name="tasks.io.sync_external_api")
def sync_external_api(endpoint, payload):
    time.sleep(2)
    return {"endpoint": endpoint, "status_code": 200}


@app.task(name="tasks.default.cleanup")
def cleanup(days_old):
    return {"deleted_records": days_old * 100}
```

```bash
# Start one worker per queue; -n gives each a unique node name
celery -A tasks_routing worker -Q compute --loglevel=info --concurrency=2 -n compute@%h
celery -A tasks_routing worker -Q io --loglevel=info --concurrency=8 -n io@%h
celery -A tasks_routing worker -Q default --loglevel=info --concurrency=4 -n default@%h
```

```python
# client_routing.py
from tasks_routing import cleanup, ml_inference, send_notification, sync_external_api

ml_inference.delay("resnet50", {"image": "base64..."})  # -> compute queue
send_notification.delay("USR-001", "Your order has shipped")  # -> io queue
cleanup.delay(30)  # -> default queue

# Specify the queue per call; explicit options take precedence over task_routes
sync_external_api.apply_async(
    args=["/api/v1/sync", {"batch": 100}],
    queue="io",
    priority=5,
)
```
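A route table like this is easy to get subtly wrong, so it can help to resolve task names offline. The sketch below is a hypothetical helper (`resolve_queue` is not a Celery API) that approximates how a dict-style `task_routes` is applied: an exact task-name key wins, otherwise glob patterns are tried in declaration order, and unmatched names fall back to `task_default_queue`. Python's `fnmatch` stands in for Celery's own glob handling, so treat it as a close approximation rather than the real router.

```python
from fnmatch import fnmatchcase

# Same route table as celery_config_routing.py
TASK_ROUTES = {
    "tasks.compute.*": {"queue": "compute"},
    "tasks.io.*": {"queue": "io"},
    "tasks.default.*": {"queue": "default"},
}


def resolve_queue(task_name, routes, default_queue="default"):
    """Approximate map-based routing for a single task name (hypothetical helper)."""
    # 1. An exact task-name key takes precedence over patterns
    if task_name in routes:
        return routes[task_name]["queue"]
    # 2. Glob patterns, checked in the order they were declared
    for pattern, options in routes.items():
        if fnmatchcase(task_name, pattern):
            return options["queue"]
    # 3. Nothing matched: fall back to task_default_queue
    return default_queue


if __name__ == "__main__":
    for name in ("tasks.compute.ml_inference", "tasks.io.send_notification", "reports.weekly"):
        print(f"{name} -> {resolve_queue(name, TASK_ROUTES)}")
```

A `queue=` passed to `apply_async()`, as in the client above, overrides whatever the table returns, so a check like this only covers the default routing.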
Pattern 3: Retry Strategies with Exponential Backoff
Failed tasks are retried automatically; exponential backoff keeps retries from snowballing into an avalanche, and a retry cap acts as the safety net.
```python
# tasks_retry.py
import random

from celery import Celery
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)
app = Celery("retry_app")
app.config_from_object("celery_config")


class ExternalAPIError(Exception):
    pass


class RateLimitError(Exception):
    pass


# Declarative retries: autoretry_for catches ExternalAPIError and schedules the
# retry with backoff + jitter. Don't also catch it and call self.retry() yourself:
# a manual retry bypasses retry_backoff/retry_jitter.
@app.task(
    bind=True,
    autoretry_for=(ExternalAPIError,),
    max_retries=5,
    retry_backoff=True,     # 1s, 2s, 4s, 8s, 16s
    retry_backoff_max=600,  # cap any single delay at 10 minutes
    retry_jitter=True,      # randomize each delay within [0, backoff]
)
def call_payment_api(self, order_id, amount):
    if random.random() <= 0.7:  # simulate a flaky payment provider
        logger.warning(
            "Payment failed for %s (attempt %d of %d)",
            order_id, self.request.retries + 1, self.max_retries + 1,
        )
        raise ExternalAPIError(f"Payment API timeout for order {order_id}")
    return {"order_id": order_id, "amount": amount, "status": "paid"}


# Manual retries: compute the countdown yourself (no autoretry_for here)
@app.task(bind=True, max_retries=3)
def call_rate_limited_api(self, endpoint):
    try:
        if random.random() > 0.5:  # simulate HTTP 429 responses
            raise RateLimitError(f"Rate limited on {endpoint}")
        return {"endpoint": endpoint, "data": "success"}
    except RateLimitError as exc:
        # 1s, 2s, 4s ... plus up to 1s of jitter, capped at 120s
        countdown = min(120, 2 ** self.request.retries + random.uniform(0, 1))
        logger.info("Rate limited, retrying in %.1fs (attempt %d)", countdown, self.request.retries + 1)
        # Once max_retries is used up, self.retry() re-raises exc and the task fails
        raise self.retry(exc=exc, countdown=countdown)


# Custom retries with a dead-letter fallback instead of a final failure
@app.task(bind=True, max_retries=4)
def process_with_custom_retry(self, data):
    try:
        return _do_risky_operation(data)
    except Exception as exc:
        if self.request.retries >= self.max_retries:
            logger.error("Max retries exceeded for %s, sending to DLQ", data)
            send_to_dlq.delay(str(data), str(exc), self.request.retries)
            return {"status": "failed", "data": data, "error": str(exc)}
        countdown = min(60, 5 * (2 ** self.request.retries))  # 5s, 10s, 20s, 40s
        logger.warning("Processing failed, retry in %ds", countdown)
        raise self.retry(exc=exc, countdown=countdown)


def _do_risky_operation(data):
    if random.random() > 0.4:
        raise ValueError(f"Random failure processing {data}")
    return {"processed": data, "result": "ok"}


@app.task
def send_to_dlq(task_data, error, retry_count):
    # This "DLQ" only logs; in production, persist the payload (a table or a
    # dedicated queue) so it can be inspected and replayed.
    logger.error("DLQ: task=%s, error=%s, retries=%s", task_data, error, retry_count)
    return {"dlq": True, "task": task_data, "error": error}
```

```python
# client_retry.py
from tasks_retry import call_payment_api, call_rate_limited_api, process_with_custom_retry

result = call_payment_api.delay("ORD-001", 99.99)
print(f"Payment task: {result.id}")
result2 = call_rate_limited_api.delay("/api/v2/users")
print(f"Rate limited task: {result2.id}")
result3 = process_with_custom_retry.delay({"key": "value"})
print(f"Custom retry task: {result3.id}")
```
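The delays produced by `retry_backoff` on `call_payment_api` can be reproduced in a few lines. The sketch below is a standalone re-implementation (the names `backoff_countdown` and `schedule` are hypothetical) of the schedule the Celery docs describe and that Celery computes internally in `get_exponential_backoff_interval`: `factor * 2**retries` seconds, capped at `retry_backoff_max`, and with `retry_jitter` replaced by a random integer between 0 and that value ("full jitter").

```python
import random


def backoff_countdown(retries, factor=1, maximum=600, jitter=False, rng=random):
    """Delay in seconds before retry number `retries + 1`.

    factor  -> retry_backoff (True behaves like 1)
    maximum -> retry_backoff_max
    jitter  -> retry_jitter: uniform integer in [0, countdown]
    """
    countdown = min(maximum, factor * (2 ** retries))
    if jitter:
        countdown = rng.randrange(countdown + 1)
    return max(0, countdown)


def schedule(max_retries, **kwargs):
    """Countdowns for every retry a task with `max_retries` may perform."""
    return [backoff_countdown(r, **kwargs) for r in range(max_retries)]


if __name__ == "__main__":
    # call_payment_api: retry_backoff=True, retry_backoff_max=600, max_retries=5
    print(schedule(5))
    # With jitter, each delay is drawn from [0, countdown]
    print(schedule(5, jitter=True, rng=random.Random(42)))
```

Full jitter spreads the retries of many workers across the whole window instead of having them all hit a recovering dependency at the same instant, which is what actually defuses a retry storm.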
Pattern 4: Canvas Workflow (Chain, Chord, Group)
Declarative orchestration of complex workflows: sequential chains, parallel groups, parallel + aggregate.
```python
# tasks_canvas.py
import time

from celery import Celery, chain, chord, group

app = Celery("canvas_app")
app.config_from_object("celery_config")  # chords require a result backend


@app.task
def download_url(url):
    time.sleep(1)  # simulate network IO
    return {"url": url, "content": f"content_of_{url.split('/')[-1]}"}


@app.task
def parse_content(download_result):
    content = download_result["content"]
    return {"parsed": True, "items": len(content), "source": download_result["url"]}


@app.task
def store_results(parse_result):
    return {"stored": True, "items": parse_result["items"], "source": parse_result["source"]}


@app.task
def aggregate_results(results):
    # Receives a list with one entry per header task, in header order
    total_items = sum(r["items"] for r in results)
    return {"total_items": total_items, "sources": len(results), "aggregated": True}


@app.task
def send_report(aggregate_result):
    return {
        "report_sent": True,
        "total_items": aggregate_result["total_items"],
        "sources": aggregate_result["sources"],
    }


URLS = [
    "https://api.example.com/users",
    "https://api.example.com/orders",
    "https://api.example.com/products",
]


# Chain: sequential; each task's return value is prepended to the next task's args
def run_chain():
    workflow = chain(
        download_url.s("https://api.example.com/data"),
        parse_content.s(),
        store_results.s(),
    )
    return workflow.apply_async().id


# Group: independent tasks executed in parallel
def run_group():
    workflow = group(download_url.s(url) for url in URLS)
    return workflow.apply_async().id


# Chord: parallel header, then one callback that receives the list of header results
def run_chord():
    workflow = chord(
        [chain(download_url.s(url), parse_content.s()) for url in URLS],
        aggregate_results.s(),
    )
    return workflow.apply_async().id


# Nested: a group chained with further tasks is upgraded to a chord automatically
def run_complex_workflow():
    header = group(
        chain(download_url.s(f"https://api.example.com/batch/{i}"), parse_content.s())
        for i in range(1, 4)
    )
    workflow = chain(header, aggregate_results.s(), send_report.s())
    return workflow.apply_async().id
```

```python
# client_canvas.py
from celery import chain

from tasks_canvas import (
    download_url, parse_content, store_results,
    run_chain, run_chord, run_complex_workflow, run_group,
)

# Method 1: wrapper functions
print(f"Chain task: {run_chain()}")
print(f"Group task: {run_group()}")
print(f"Chord task: {run_chord()}")
print(f"Complex workflow: {run_complex_workflow()}")

# Method 2: Canvas primitives directly
result = chain(
    download_url.s("https://api.example.com/report"),
    parse_content.s(),
    store_results.s(),
).apply_async()
print(f"Direct chain: {result.id}")

# Signatures can be built now and combined or sent later
sig = download_url.s("https://api.example.com/dynamic")
result = chain(sig, parse_content.s(), store_results.s()).apply_async()
print(f"Signature chain: {result.id}")
```
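How data moves through these primitives can be modeled without a broker. The sketch below is a plain-Python simulation (the helpers `sim_chain`, `sim_group`, and `sim_chord` are hypothetical), not Celery's implementation; it only mirrors the documented semantics: a chain passes each result as the first argument of the next signature, a group returns results in the order its tasks were declared, and a chord hands that ordered list to its callback. `functools.partial` plays the role of `.s()`.

```python
from functools import partial


def sim_chain(*steps):
    """Run steps sequentially; each result becomes the next step's first argument."""
    first, *rest = steps
    result = first()
    for step in rest:
        result = step(result)
    return result


def sim_group(*tasks):
    """Run independent tasks; results keep declaration order, not completion order."""
    return [task() for task in tasks]


def sim_chord(header, callback):
    """Run the header group, then pass the ordered list of results to the callback."""
    return callback(sim_group(*header))


# Plain functions with the same bodies as the tasks in tasks_canvas.py
def download_url(url):
    return {"url": url, "content": f"content_of_{url.split('/')[-1]}"}


def parse_content(download_result):
    content = download_result["content"]
    return {"parsed": True, "items": len(content), "source": download_result["url"]}


def aggregate_results(results):
    total_items = sum(r["items"] for r in results)
    return {"total_items": total_items, "sources": len(results), "aggregated": True}


urls = [f"https://api.example.com/{name}" for name in ("users", "orders", "products")]
# Equivalent of chord([chain(download_url.s(url), parse_content.s()) ...], aggregate_results.s())
header = [partial(sim_chain, partial(download_url, url), parse_content) for url in urls]
print(sim_chord(header, aggregate_results))
```

In real Celery the header tasks run concurrently on different workers, but the callback still sees results in header order, which is why `aggregate_results` can rely on positions if it needs to.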
Pattern 5: Monitoring with Flower and Prometheus
Real-time monitoring of task status, worker health, integrated with Prometheus alerting.
```python
# celery_config_monitoring.py
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"
task_serializer = "json"
result_serializer = "json"
accept_content = ["json"]
task_send_sent_event = True     # emit task-sent events (carry routing information)
task_track_started = True       # report the STARTED state
worker_send_task_events = True  # same as starting workers with -E
task_routes = {
    "monitoring_tasks.*": {"queue": "monitoring"},
}
```

```python
# flowerconfig.py - Flower loads this file from the working directory by default
port = 5555
basic_auth = ["admin:secret123"]  # demo credentials; load from a secret in production
persistent = True
db = "flower_db"
max_tasks = 10000
```

```python
# monitoring_tasks.py
import random
import time

from celery import Celery

app = Celery("monitoring_app")
app.config_from_object("celery_config_monitoring")


@app.task(name="monitoring_tasks.process_order", bind=True, track_started=True)
def process_order(self, order_id):
    # Custom states and meta are visible in Flower and via AsyncResult.state / .info
    for step, seconds in (("validating", 1), ("charging", 2), ("fulfilling", 1)):
        self.update_state(state="PROCESSING", meta={"step": step, "order_id": order_id})
        time.sleep(seconds)
    return {"order_id": order_id, "status": "completed", "amount": random.randint(100, 9999)}


@app.task(name="monitoring_tasks.generate_report", bind=True, track_started=True, time_limit=300)
def generate_report(self, report_type, date_range):
    self.update_state(state="GENERATING", meta={"progress": 0, "type": report_type})
    for i in range(1, 6):
        time.sleep(1)
        self.update_state(state="GENERATING", meta={"progress": i * 20, "type": report_type})
    return {"report_type": report_type, "date_range": date_range, "rows": random.randint(1000, 50000)}
```

```python
# prometheus_exporter.py - custom Prometheus metrics
from prometheus_client import Counter, Gauge, Histogram, start_http_server

TASK_STARTED = Counter(
    "celery_task_started_total",
    "Total number of Celery tasks started",
    ["task_name", "queue"],
)
TASK_COMPLETED = Counter(
    "celery_task_completed_total",
    "Total number of Celery tasks completed",
    ["task_name", "queue", "status"],
)
TASK_DURATION = Histogram(
    "celery_task_duration_seconds",
    "Celery task duration in seconds",
    ["task_name", "queue"],
    buckets=[0.1, 0.5, 1, 2, 5, 10, 30, 60, 120, 300],
)
QUEUE_DEPTH = Gauge(
    "celery_queue_depth",
    "Number of tasks waiting in queue",
    ["queue_name"],
)
ACTIVE_WORKERS = Gauge(
    "celery_active_workers",
    "Number of active Celery workers",
)


def start_metrics_server(port=9808):
    # Any free port works; 9090 is avoided because it is Prometheus's own default
    start_http_server(port)
    print(f"Prometheus metrics server started on :{port}")
```

```python
# celery_events_monitor.py - event listener feeding Prometheus
from celery import Celery

from prometheus_exporter import (
    ACTIVE_WORKERS, TASK_COMPLETED, TASK_DURATION, TASK_STARTED, start_metrics_server,
)

app = Celery("monitoring_app")
app.config_from_object("celery_config_monitoring")

# task-started/succeeded/failed events carry the task uuid but not its name or queue;
# State() correlates them with earlier task-sent/task-received events.
state = app.events.State()


def _labels(event):
    state.event(event)
    task = state.tasks.get(event["uuid"])
    name = (task.name if task else None) or "unknown"
    queue = (task.routing_key if task else None) or "unknown"
    return name, queue


def on_task_started(event):
    name, queue = _labels(event)
    TASK_STARTED.labels(task_name=name, queue=queue).inc()


def on_task_succeeded(event):
    name, queue = _labels(event)
    TASK_COMPLETED.labels(task_name=name, queue=queue, status="success").inc()
    TASK_DURATION.labels(task_name=name, queue=queue).observe(event.get("runtime", 0))


def on_task_failed(event):
    name, queue = _labels(event)
    TASK_COMPLETED.labels(task_name=name, queue=queue, status="failed").inc()


def on_task_retried(event):
    name, queue = _labels(event)
    TASK_COMPLETED.labels(task_name=name, queue=queue, status="retried").inc()


def on_worker_heartbeat(event):
    state.event(event)
    ACTIVE_WORKERS.set(sum(1 for _ in state.alive_workers()))


def monitor_events():
    with app.connection() as connection:
        recv = app.events.Receiver(
            connection,
            handlers={
                "*": state.event,  # all other events (task-sent, task-received, ...)
                "task-started": on_task_started,
                "task-succeeded": on_task_succeeded,
                "task-failed": on_task_failed,
                "task-retried": on_task_retried,
                "worker-heartbeat": on_worker_heartbeat,
            },
        )
        recv.capture(limit=None, timeout=None, wakeup=True)


if __name__ == "__main__":
    start_metrics_server()
    monitor_events()
```

```bash
# Worker for the monitoring queue (task events enabled via worker_send_task_events)
celery -A monitoring_tasks worker -Q monitoring --loglevel=info
# Start Flower monitoring
celery -A monitoring_tasks flower --port=5555 --basic-auth=admin:secret123
# Start event listener + Prometheus exporter
python celery_events_monitor.py
```
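The `celery_queue_depth` gauge declared in prometheus_exporter.py is not updated by the event listener, yet it is the natural signal for autoscaling. One way to fill it on a Redis broker is to sum `LLEN` over the list keys that back a queue. This sketch assumes kombu's Redis transport layout (check it against your kombu version): priority 0 lives in a list named after the queue, every other priority step in `<queue><sep><step>`, with defaults `sep = "\x06\x16"` and steps `(0, 3, 6, 9)`. If you override `priority_steps` or `sep` in `broker_transport_options`, pass the same values. `priority_keys`, `queue_depth`, and `update_queue_gauge` are hypothetical helpers; `client` is anything with an `llen(key)` method, such as `redis.Redis`.

```python
DEFAULT_PRIORITY_STEPS = (0, 3, 6, 9)
DEFAULT_SEP = "\x06\x16"


def priority_keys(queue, steps=DEFAULT_PRIORITY_STEPS, sep=DEFAULT_SEP):
    """Redis list keys assumed to hold messages for `queue`, one per priority step."""
    return [queue if step == 0 else f"{queue}{sep}{step}" for step in steps]


def queue_depth(client, queue, steps=DEFAULT_PRIORITY_STEPS, sep=DEFAULT_SEP):
    """Messages still waiting in `queue` (excludes tasks already prefetched by workers)."""
    return sum(client.llen(key) for key in priority_keys(queue, steps, sep))


def update_queue_gauge(gauge, client, queues):
    """Set a labelled Gauge (e.g. QUEUE_DEPTH) for each queue name."""
    for name in queues:
        gauge.labels(queue_name=name).set(queue_depth(client, name))
```

A loop in celery_events_monitor.py could call `update_queue_gauge(QUEUE_DEPTH, redis.Redis.from_url(app.conf.broker_url), ["default", "compute", "io"])` every 15 seconds or so; since `capture()` blocks, that loop would need its own thread.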
Pattern 6: Production Celery Service with Docker and Kubernetes
Docker multi-stage builds, K8s Deployment, HPA auto-scaling.
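```dockerfile
# Dockerfile
FROM python:3.12-slim AS builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt

FROM python:3.12-slim
WORKDIR /app
COPY --from=builder /install /usr/local
COPY . .
# Run as an unprivileged user; Celery warns when a worker runs as root
RUN useradd --create-home appuser
USER appuser
ENV PYTHONUNBUFFERED=1
# Celery reads these variables and they take precedence over broker_url/result_backend in config files
ENV CELERY_BROKER_URL=redis://redis:6379/0
ENV CELERY_RESULT_BACKEND=redis://redis:6379/1
CMD ["celery", "-A", "tasks", "worker", "--loglevel=info"]
```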
```yaml
# docker-compose.yml
# "tasks" must be the module that defines the Celery app, including the
# compute/io/default routing shown in Pattern 2.
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    # noeviction: a broker must never evict keys, or queued task messages are silently lost
    command: redis-server --appendonly yes --maxmemory 512mb --maxmemory-policy noeviction
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5

  worker-default:
    build: .
    command: celery -A tasks worker -Q default --loglevel=info --concurrency=4 -n default@%h
    depends_on:
      redis:
        condition: service_healthy
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    deploy:
      replicas: 2
      resources:
        limits:
          memory: 512M
          cpus: "1.0"
        reservations:
          memory: 256M
          cpus: "0.5"
    restart: unless-stopped

  worker-compute:
    build: .
    command: celery -A tasks worker -Q compute --loglevel=info --concurrency=2 -n compute@%h
    depends_on:
      redis:
        condition: service_healthy
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    deploy:
      replicas: 1
      resources:
        limits:
          memory: 2G
          cpus: "2.0"
        reservations:
          memory: 1G
          cpus: "1.0"
    restart: unless-stopped

  worker-io:
    build: .
    command: celery -A tasks worker -Q io --loglevel=info --concurrency=16 -n io@%h
    depends_on:
      redis:
        condition: service_healthy
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    deploy:
      replicas: 3
      resources:
        limits:
          memory: 512M
          cpus: "1.0"
    restart: unless-stopped

  flower:
    build: .
    command: celery -A tasks flower --port=5555 --basic-auth=admin:secret123
    depends_on:
      redis:
        condition: service_healthy
    ports:
      - "5555:5555"
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    restart: unless-stopped

  beat:
    build: .
    # Exactly one beat container: never scale this service
    command: celery -A tasks beat --loglevel=info
    depends_on:
      redis:
        condition: service_healthy
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    restart: unless-stopped

volumes:
  redis_data:
```
```yaml
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-worker-default
  labels:
    app: celery-worker
    queue: default
spec:
  replicas: 3
  selector:
    matchLabels:
      app: celery-worker
      queue: default
  template:
    metadata:
      labels:
        app: celery-worker
        queue: default
    spec:
      containers:
        - name: worker
          image: myregistry/celery-app:latest
          command: ["celery", "-A", "tasks", "worker", "-Q", "default", "--loglevel=info", "--concurrency=4", "-n", "default@%h"]
          env:
            - name: CELERY_BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: broker-url
            - name: CELERY_RESULT_BACKEND
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: result-backend
          resources:
            requests:
              memory: "256Mi"
              cpu: "500m"
            limits:
              memory: "512Mi"
              cpu: "1000m"
          # Ping only this pod's worker (-d): a bare "inspect ping" passes if ANY worker replies.
          # exec probes don't expand $HOSTNAME, hence the sh -c wrapper.
          livenessProbe:
            exec:
              command: ["sh", "-c", "celery -A tasks inspect ping -d default@$HOSTNAME --timeout 5"]
            initialDelaySeconds: 30
            periodSeconds: 60
            timeoutSeconds: 10
          readinessProbe:
            exec:
              command: ["sh", "-c", "celery -A tasks inspect ping -d default@$HOSTNAME --timeout 5"]
            initialDelaySeconds: 10
            periodSeconds: 30
            timeoutSeconds: 10
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: celery-worker-default-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: celery-worker-default
  minReplicas: 2
  maxReplicas: 20
  metrics:
    # External metrics require a metrics adapter (e.g. Prometheus Adapter) that serves
    # celery_queue_depth from the Pattern 5 exporter through the Kubernetes API
    - type: External
      external:
        metric:
          name: celery_queue_depth
          selector:
            matchLabels:
              queue_name: default
        target:
          type: AverageValue
          averageValue: "100"
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-beat
  labels:
    app: celery-beat
spec:
  replicas: 1  # Beat must run as a single instance
  # Recreate stops the old pod before starting the new one, so a rollout never runs two Beats
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: celery-beat
  template:
    metadata:
      labels:
        app: celery-beat
    spec:
      containers:
        - name: beat
          image: myregistry/celery-app:latest
          command: ["celery", "-A", "tasks", "beat", "--loglevel=info"]
          env:
            - name: CELERY_BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: broker-url
            - name: CELERY_RESULT_BACKEND
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: result-backend
          resources:
            requests:
              memory: "128Mi"
              cpu: "100m"
            limits:
              memory: "256Mi"
              cpu: "250m"
```

```yaml
# k8s/flower-service.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-flower
spec:
  replicas: 1
  selector:
    matchLabels:
      app: celery-flower
  template:
    metadata:
      labels:
        app: celery-flower
    spec:
      containers:
        - name: flower
          image: myregistry/celery-app:latest
          command: ["celery", "-A", "tasks", "flower", "--port=5555"]
          ports:
            - containerPort: 5555
          env:
            - name: CELERY_BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: broker-url
            - name: CELERY_RESULT_BACKEND
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: result-backend
            # Flower reads FLOWER_* variables; "flower-basic-auth" is an assumed key holding user:password
            - name: FLOWER_BASIC_AUTH
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: flower-basic-auth
---
apiVersion: v1
kind: Service
metadata:
  name: celery-flower
spec:
  selector:
    app: celery-flower
  ports:
    - port: 5555
      targetPort: 5555
  type: ClusterIP  # cluster-internal only; expose through an authenticated Ingress if needed
```
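To sanity-check the HPA numbers, the scaling rule can be worked through in Python. The sketch follows the formula in the Kubernetes HPA documentation, `desiredReplicas = ceil(currentReplicas * currentMetricValue / desiredMetricValue)`, evaluated per metric with the largest proposal winning and the result clamped to `minReplicas`/`maxReplicas`; the 10% tolerance is the controller's default. For an `AverageValue` external metric, the current value is the metric total divided by the current replica count. `desired_replicas` and `replicas_for_metric` are hypothetical names, and scale-down stabilization windows and pod readiness handling are deliberately not modeled.

```python
import math


def replicas_for_metric(current_replicas, current_value, target_value, tolerance=0.1):
    """Replica count proposed by a single metric."""
    ratio = current_value / target_value
    if abs(ratio - 1.0) <= tolerance:
        return current_replicas  # within tolerance of the target: no change
    return math.ceil(current_replicas * ratio)


def desired_replicas(current_replicas, queue_depth, cpu_utilization,
                     depth_target=100, cpu_target=70, min_replicas=2, max_replicas=20):
    # External metric, AverageValue: per-pod share of the total queue depth
    per_pod_depth = queue_depth / current_replicas
    by_depth = replicas_for_metric(current_replicas, per_pod_depth, depth_target)
    # Resource metric, Utilization: average CPU % of requests across pods
    by_cpu = replicas_for_metric(current_replicas, cpu_utilization, cpu_target)
    # Largest proposal wins, then clamp to the configured bounds
    return max(min_replicas, min(max_replicas, max(by_depth, by_cpu)))


if __name__ == "__main__":
    # 3 pods, 1,200 queued tasks, CPU at 50% of requests
    print(desired_replicas(3, queue_depth=1200, cpu_utilization=50))
```

With 1,200 queued tasks the queue metric alone asks for 12 pods even though CPU is below target, which is the point of scaling on backlog: IO-bound workers can be idle on CPU while the queue grows.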
Pitfall Guide: 5 Common Traps
Pitfall 1: Using Global Database Connections in Tasks
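❌ Wrong:
```python
import psycopg2

# Opened at import time, before the prefork pool forks: all child processes share one socket
conn = psycopg2.connect("postgresql://localhost/mydb")


@app.task
def save_user(user_data):
    cursor = conn.cursor()
    cursor.execute("INSERT INTO users (data) VALUES (%s)", (user_data,))
    conn.commit()
```
✅ Correct:
```python
import psycopg2
from celery.signals import worker_process_init
from psycopg2.extras import Json

conn = None


@worker_process_init.connect
def init_db_connection(**kwargs):
    global conn  # runs in each pool child after the fork: one connection per process
    conn = psycopg2.connect("postgresql://localhost/mydb")


@app.task
def save_user(user_data):
    with conn.cursor() as cursor:  # Json() adapts the dict for a json/jsonb column
        cursor.execute("INSERT INTO users (data) VALUES (%s)", (Json(user_data),))
    conn.commit()
```
In a Django project, use the ORM or `django.db.connection` inside the task instead; Celery's Django integration closes inherited connections in each child process.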
Pitfall 2: Ignoring Task Timeouts Causing Worker to Block Forever
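❌ Wrong:
```python
@app.task
def fetch_external_data(url):
    import requests
    response = requests.get(url)  # no timeout: can block the worker forever
    return response.json()
```
✅ Correct:
```python
import requests


@app.task(
    bind=True,
    max_retries=3,
    time_limit=300,       # hard limit: the pool process running the task is killed
    soft_time_limit=270,  # soft limit: SoftTimeLimitExceeded is raised inside the task first
)
def fetch_external_data(self, url):
    try:
        # (connect, read) timeouts so an unresponsive server can't hang the worker
        response = requests.get(url, timeout=(5, 60))
        response.raise_for_status()
        return response.json()
    except requests.Timeout as exc:
        raise self.retry(exc=exc, countdown=30)
```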
Pitfall 3: Passing Non-Serializable Objects in Canvas
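❌ Wrong:
```python
@app.task
def process_file(file_obj):
    return file_obj.read()


# Fails when the workflow is sent: an open file handle is not JSON-serializable
workflow = chain(process_file.s(open("data.csv")), store_result.s())
```
✅ Correct:
```python
from celery import chain


@app.task
def process_file(file_path):
    # Pass a path that the worker can read and open the file inside the task
    with open(file_path, "r") as f:
        return f.read()


@app.task
def store_result(content):
    # Hypothetical downstream task
    return {"stored_chars": len(content)}


workflow = chain(process_file.s("/data/data.csv"), store_result.s())
```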
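Because the examples use `task_serializer = "json"`, a cheap way to catch this class of bug in unit tests is to run the same JSON round-trip on task arguments before calling `delay()`. `assert_json_safe` is a hypothetical helper, not part of Celery. It is slightly stricter than the real serializer: kombu's JSON encoder also accepts a few extra types such as `datetime` and `UUID`, and Celery would silently turn tuples into lists, which this check flags on purpose.

```python
import json


def assert_json_safe(*args, **kwargs):
    """Raise TypeError if args/kwargs would not survive a plain JSON round-trip."""
    payload = {"args": list(args), "kwargs": kwargs}
    try:
        decoded = json.loads(json.dumps(payload))
    except (TypeError, ValueError) as exc:
        raise TypeError(f"Task arguments are not JSON-serializable: {exc}") from exc
    # Tuples come back as lists and int dict keys as strings: flag silent changes too
    if decoded != payload:
        raise TypeError("Task arguments change shape after a JSON round-trip")
    return True


if __name__ == "__main__":
    assert_json_safe("/data/data.csv")  # fine: a plain path string
    try:
        assert_json_safe(object())  # stands in for an open file handle
    except TypeError as exc:
        print(exc)
```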
Pitfall 4: Beat Periodic Tasks Without Distributed Locks Causing Duplicate Execution
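❌ Wrong:
```python
from celery.schedules import crontab

# With two Beat processes (e.g. two replicas), each one sends this task at 02:00
beat_schedule = {
    "cleanup-every-night": {
        "task": "tasks.cleanup",
        "schedule": crontab(hour=2, minute=0),
    },
}
```
✅ Correct:
```python
# pip install celery-redbeat  (the import name is "redbeat")
from celery.schedules import crontab

# RedBeat keeps the schedule in Redis and holds a lock, so only one Beat instance
# schedules at a time; a second instance waits until it can take the lock.
beat_scheduler = "redbeat.RedBeatScheduler"
redbeat_redis_url = "redis://localhost:6379/2"

beat_schedule = {
    "cleanup-every-night": {
        "task": "tasks.cleanup",
        "schedule": crontab(hour=2, minute=0),
        "options": {"queue": "default"},
    },
}
```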
Pitfall 5: Worker Fork Mode Without Handling Resource Inheritance
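❌ Wrong:
```python
import redis

# Created at import time in the parent, then inherited by every forked child
redis_client = redis.Redis()


@app.task
def update_cache(key, value):
    redis_client.set(key, value)
```
✅ Correct:
```python
import redis
from celery.signals import worker_process_init

redis_client = None


@worker_process_init.connect
def init_redis_client(**kwargs):
    global redis_client  # runs in each pool child after the fork: one client per process
    redis_client = redis.Redis()


@app.task
def update_cache(key, value):
    redis_client.set(key, value)
```
redis-py's connection pool happens to detect a PID change and reconnect after a fork, but many clients (database drivers, gRPC channels, raw sockets) do not, so per-process initialization is the safer general pattern. Creating a client inside each task call also works, at the cost of a new connection pool per call.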
Error Troubleshooting Table
| Error Symptom | Possible Cause | Diagnosis | Solution |
|---|---|---|---|
| Worker starts with no log output | Broker connection failure | Check CELERY_BROKER_URL format and connectivity | Confirm Redis/RabbitMQ is running and the URL format is correct |
| Task stuck in PENDING | No worker consuming the queue | `celery -A tasks inspect active_queues` | Start a worker for the queue or check the task_routes config |
| kombu.exceptions.EncodeError | Task args contain non-serializable objects | Check delay()/apply_async() parameters | Only pass JSON-serializable basic types |
| WorkerLostError | Worker process killed by the OOM killer | `dmesg \| grep -i oom` in the system logs | Raise the memory limit, reduce concurrency, or set worker_max_memory_per_child |
| TimeLimitExceeded | Task exceeded time_limit | Check task logic for infinite loops or blocking IO | Set reasonable time_limit and soft_time_limit values |
| Result is None, or .get() raises "No result backend is configured" | No result_backend configured, or the task has ignore_result=True | Check result_backend / CELERY_RESULT_BACKEND and task options | Configure Redis or a database as the result backend |
| Chord callback never executes | A header task failed, or header results are not stored | Check header task states in Flower | Fix the failing task, keep results enabled for header tasks, and attach an error callback with link_error |
| ContentDisallowed | Message serialization format not accepted | accept_content does not match the sender's serializer | Use the same accept_content = ["json"] everywhere |
| Beat tasks not running on schedule | Timezone misconfiguration | Check the timezone and enable_utc settings | Use one timezone config everywhere; enable_utc=True is recommended |
| Worker frequently disconnects/reconnects | Broker connection pool exhausted or network jitter | Check broker_pool_limit and connection timeouts | Increase broker_pool_limit; on RabbitMQ, set broker_heartbeat |
Advanced Optimization
1. Task Result Deduplication and Idempotency
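In production, a task can be delivered more than once (network jitter, a worker restarting mid-task), so processing must be idempotent: running it twice must have the same effect as running it once.
```python
import json
from contextlib import contextmanager

import redis
from celery import Celery

app = Celery("idempotent_app")
app.config_from_object("celery_config")


@contextmanager
def order_lock(client, order_id, owner, ttl=3600):
    """SET NX lock on the business ID; yields True if this caller acquired it."""
    lock_key = f"order:lock:{order_id}"
    acquired = bool(client.set(lock_key, owner, nx=True, ex=ttl))
    try:
        yield acquired
    finally:
        if acquired:
            client.delete(lock_key)  # release even if processing raised, so a retry can run


@app.task(bind=True)
def process_order_idempotent(self, order_id, amount):
    r = redis.Redis(host="localhost", port=6379, db=3)
    # Key on order_id rather than self.request.id: re-submitting the same order
    # creates a new task id, but the order must still be processed only once.
    done_key = f"order:processed:{order_id}"
    cached = r.get(done_key)
    if cached:
        return {"status": "duplicate", "order_id": order_id, "cached_result": json.loads(cached)}
    with order_lock(r, order_id, self.request.id) as acquired:
        if not acquired:
            return {"status": "in_progress", "order_id": order_id}
        # Re-check after locking: another worker may have finished in the meantime
        cached = r.get(done_key)
        if cached:
            return {"status": "duplicate", "order_id": order_id, "cached_result": json.loads(cached)}
        result = _do_process_order(order_id, amount)
        r.setex(done_key, 86400, json.dumps(result))
        return result


def _do_process_order(order_id, amount):
    return {"status": "processed", "order_id": order_id, "amount": amount}
```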
2. Dynamic Task Priority and Rate Limiting
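High-priority tasks run first, and rate limiting keeps downstream services from being overloaded.
```python
import time

from celery import Celery

app = Celery("priority_app")
app.config_from_object("celery_config")

# Redis emulates priorities with one list per step; on Redis, 0 is the HIGHEST
# priority (RabbitMQ uses the opposite order and needs x-max-priority on the queue).
app.conf.broker_transport_options = {
    "priority_steps": list(range(10)),
    "sep": ":",
    "queue_order_strategy": "priority",
}
# Prefetch one message at a time so a worker can't hoard low-priority tasks
app.conf.worker_prefetch_multiplier = 1


@app.task(rate_limit="10/s")  # enforced per worker instance, not cluster-wide
def call_external_api(endpoint, payload):
    time.sleep(0.1)
    return {"endpoint": endpoint, "status": "ok"}


# High priority task (0 = highest on Redis)
def submit_high_priority():
    call_external_api.apply_async(
        args=["/api/v1/critical", {"urgent": True}],
        priority=0,
        queue="io",
    )


# Low priority task (9 = lowest on Redis)
def submit_low_priority():
    call_external_api.apply_async(
        args=["/api/v1/batch", {"batch": True}],
        priority=9,
        queue="io",
    )


# Dynamic rate limiting, broadcast to all running workers,
# e.g. adjust_rate_limit(call_external_api.name, "5/s")
def adjust_rate_limit(task_name, new_rate):
    app.control.rate_limit(task_name, new_rate)
```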
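The Celery docs note that `rate_limit` is a per-worker-instance limit, not a global one, and the worker enforces it with a token bucket. The sketch below models the token-bucket idea with an injectable clock so it can be tested deterministically; `TokenBucket` here is a hypothetical standalone class, not kombu's implementation, and the capacity of 1 is an assumption chosen to show "10/s" as evenly spaced tasks.

```python
import time


class TokenBucket:
    """Allow `rate` events per second on average, with bursts up to `capacity`."""

    def __init__(self, rate, capacity=1, clock=time.monotonic):
        self.rate = float(rate)
        self.capacity = float(capacity)
        self.clock = clock
        self.tokens = float(capacity)
        self.last = clock()

    def _refill(self):
        # Add tokens for the time elapsed since the last check, up to capacity
        now = self.clock()
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now

    def try_consume(self, n=1):
        """Take n tokens if available; return False (caller should wait) otherwise."""
        self._refill()
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False

    def expected_wait(self, n=1):
        """Seconds until n tokens will be available."""
        self._refill()
        return max(0.0, (n - self.tokens) / self.rate)
```

Because every worker has its own bucket, four workers consuming the `io` queue allow roughly 40 calls per second in total. To cap a downstream API globally, route those tasks to a dedicated queue served by a single worker, or use a shared limiter.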
3. Celery Integration with FastAPI
Integrate Celery into a FastAPI application: the API triggers async tasks and reports their status.
```python
# fastapi_celery.py
from typing import Any

from celery.result import AsyncResult
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field

from tasks import app as celery_app

app = FastAPI(title="Celery API Gateway")

# send_task() accepts any name without checking it exists, so only allow known tasks
ALLOWED_TASKS = {"tasks.add", "tasks.send_email", "tasks.generate_thumbnail"}


class TaskSubmit(BaseModel):
    task_name: str
    args: list[Any] = Field(default_factory=list)
    kwargs: dict[str, Any] = Field(default_factory=dict)


class TaskResponse(BaseModel):
    task_id: str
    status: str


@app.post("/tasks/submit", response_model=TaskResponse)
def submit_task(submit: TaskSubmit):
    if submit.task_name not in ALLOWED_TASKS:
        raise HTTPException(status_code=400, detail=f"Unknown task: {submit.task_name}")
    result = celery_app.send_task(submit.task_name, args=submit.args, kwargs=submit.kwargs)
    return TaskResponse(task_id=result.id, status="PENDING")


@app.get("/tasks/{task_id}")
def get_task_status(task_id: str):
    # Bind the result to celery_app so it uses the configured result backend
    result = AsyncResult(task_id, app=celery_app)
    response = {"task_id": task_id, "status": result.status, "result": None}
    if result.successful():
        response["result"] = result.result
    elif result.failed():
        # On failure result.result is the exception object, which is not JSON-serializable
        response["error"] = str(result.result)
    return response


@app.post("/tasks/{task_id}/revoke")
def revoke_task(task_id: str):
    # terminate=True also kills the task if it is already running
    celery_app.control.revoke(task_id, terminate=True)
    return {"task_id": task_id, "status": "REVOKED"}
```
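On the client side, the status endpoint is typically polled until the task reaches a ready state; Celery treats `SUCCESS`, `FAILURE`, and `REVOKED` as ready (`celery.states.READY_STATES`). The sketch keeps the HTTP call abstract: `fetch_status` is a hypothetical callable you would implement with your HTTP client to call `GET /tasks/{task_id}` and return the JSON body, and `wait_for_task` is likewise an illustrative name. Polling backs off from 0.5 s up to 5 s, an arbitrary choice.

```python
import time

READY_STATES = {"SUCCESS", "FAILURE", "REVOKED"}


def wait_for_task(fetch_status, task_id, timeout=30.0, interval=0.5, max_interval=5.0,
                  sleep=time.sleep, clock=time.monotonic):
    """Poll fetch_status(task_id) until a ready state, or raise TimeoutError."""
    deadline = clock() + timeout
    delay = interval
    while True:
        body = fetch_status(task_id)
        if body["status"] in READY_STATES:
            return body
        if clock() + delay > deadline:
            raise TimeoutError(f"task {task_id} still {body['status']} after {timeout}s")
        sleep(delay)
        delay = min(delay * 2, max_interval)  # back off between polls
```

Intermediate states such as `STARTED` or the custom `PROCESSING` from Pattern 5 simply keep the loop going, so a UI can display them while waiting.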
Framework Comparison
| Feature | Celery | Dramatiq | Huey | RQ | Temporal |
|---|---|---|---|---|---|
| Language | Python | Python | Python | Python | Go/Multi-language |
| Broker | Redis/RabbitMQ/SQS etc. | Redis/RabbitMQ | Redis/SQLite | Redis | Temporal Server (own persistence) |
| Workflow Orchestration | Canvas (chain/chord/group) | Pipelines and groups | Pipelines | Job dependencies (depends_on) | Workflows as code |
| Retry Strategy | Exponential backoff+Jitter+Custom | Exponential backoff+Jitter | Simple retry | Simple retry | Configurable retry policies |
| Monitoring | Flower | dramatiq_dashboard (separate package) | Basic | RQ Dashboard | Web UI |
| Periodic Tasks | Beat | Not built-in (e.g. APScheduler) | Built-in | Not built-in (rq-scheduler) | Native Schedules |
| Community Ecosystem | Most mature | Medium | Niche | Niche | Fast-growing |
| Learning Curve | Medium | Low | Low | Low | High |
| Multi-language Support | No | No | No | No | Yes |
| Production Maturity | High | Medium | Low | Medium | High |
| Use Case | General distributed tasks | Small-medium projects | Lightweight tasks | Redis ecosystem simple tasks | Cross-language complex workflows |
Summary
Python Celery (5.x) remains one of the most mature and flexible distributed task queue options for Python. Keep three core principles in mind: task idempotency is the baseline, because duplicate delivery must be safe; queue isolation is standard, with CPU, IO, and default queues kept separate; and monitoring and alerting are not optional, with Flower and Prometheus serving as production's eyes. From a single `celery worker` to K8s HPA auto-scaling, Celery scales out by adding workers, without rewriting task code.