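Python Celery Distributed Task Queues in Practice: 6 Production Patterns from Basic Tasks to Event-Driven Pipelines
Background jobs block the API, Cron doesn't scale, and failed tasks are never retried
A user uploads an avatar that needs thumbnails in 5 sizes, and processing it synchronously inside the API request times out. A data-cleaning Cron job that runs at 2 a.m. can no longer finish once the data volume doubles. Third-party payment callbacks fail intermittently, and with no retry mechanism, orders have to be fixed by hand. In 2026, Celery is still one of the most widely used distributed task queues in Python. Version 5.4 continues to stabilize Canvas workflows, and the same task code runs on a single development machine or a Kubernetes cluster.
This article walks through 6 production patterns covering the full path: basic task definitions → queue isolation with routing → exponential-backoff retries → Canvas workflows → Flower + Prometheus monitoring → production deployment on Docker/K8s. Every step comes with complete, runnable Python code.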
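Celery Core Concepts
| Concept | Description |
|---|---|
| Celery | A Python distributed task queue framework that supports multiple brokers and result backends |
| Broker | Message middleware that receives task messages from producers (clients) and delivers them to consumers (workers). Redis and RabbitMQ are common choices |
| Worker | A consumer process that fetches tasks from the broker and executes them. It supports multiprocess (prefork), green-thread (eventlet/gevent), and thread pools |
| Task | A function managed by Celery and registered with the @app.task decorator. Calling it through .delay() or .apply_async() returns an AsyncResult |
| Canvas | Celery's workflow primitives, supporting composition patterns such as chain, group, and chord |
| Chain | Sequential workflow: each task's output becomes the next task's input |
| Chord | Parallel + aggregation workflow: once every task in the group finishes, a callback task receives all of their results |
| Group | Parallel workflow: a set of independent tasks runs concurrently |
| Flower | Celery's real-time web dashboard, showing task state, worker state, queue depth, and more |
| Result Backend | Storage for task return values and states. Redis and databases are common choices |
| Beat | Celery's periodic task scheduler, similar to Cron. It must run as a single instance unless a locking scheduler such as RedBeat is used |
| Signature | Wraps a task call and its arguments into a serializable object that can be passed around and composed in Canvas |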
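To make the roles in the table concrete, here is a toy, in-process analogy that uses only the Python standard library. A `queue.Queue` stands in for the broker, a thread stands in for a worker, and a dict stands in for the result backend. This is not how Celery is implemented: there is no network, no serialization, and no acknowledgment. The names `task`, `delay`, and `worker_loop` are hypothetical.

```python
import queue
import threading
import uuid

broker = queue.Queue()   # stands in for Redis/RabbitMQ
result_backend = {}      # stands in for the result backend: task_id -> return value
registry = {}            # task name -> function, like @app.task registration


def task(fn):
    """Register fn under its name, loosely mimicking @app.task."""
    registry[fn.__name__] = fn
    return fn


def delay(name, *args):
    """Producer side: enqueue a message and return a task id immediately."""
    task_id = str(uuid.uuid4())
    broker.put({"id": task_id, "task": name, "args": args})
    return task_id


def worker_loop():
    """Consumer side: execute messages until a None sentinel arrives."""
    while True:
        message = broker.get()
        if message is None:
            break
        fn = registry[message["task"]]
        result_backend[message["id"]] = fn(*message["args"])


@task
def add(x, y):
    return x + y


worker = threading.Thread(target=worker_loop)
worker.start()
task_id = delay("add", 4, 6)    # returns at once, like add.delay(4, 6)
broker.put(None)                # ask the worker to stop after draining the queue
worker.join()
print(result_backend[task_id])  # 10
```

The caller gets a task id back before any work happens. That decoupling is the whole point of a task queue; Celery adds durability, retries, routing, and distribution across machines on top of it.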
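Problem Analysis: 5 Challenges of Distributed Task Queues
- Coupled task definition and invocation: business code calls slow operations directly, and the synchronous blocking makes API responses slow. Slow operations should become asynchronous tasks that are decoupled from the caller.
- Task routing and resource isolation: when CPU-bound and IO-bound tasks share one queue, IO tasks get stuck behind CPU tasks. Queues need to be isolated and tasks routed between them.
- Task failures and retry storms: when a third-party service is down, tasks fail and retry over and over, and without backoff this cascades into an outage. The fix is exponential backoff, a maximum retry count, and dead-letter handling.
- Complex workflow orchestration: multi-step jobs need sequential and parallel composition, and hand-written callback chains are hard to maintain. Canvas primitives let you declare the workflow instead.
- Production observability: task backlogs, crashed workers, and blocked queues go unnoticed. You need real-time monitoring, alerting, and autoscaling.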
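Step by Step: 6 Celery Distributed Task Patterns
Pattern 1: Basic Task Definition and Execution
The simplest Celery setup: define asynchronous tasks, start a worker, then call the tasks and fetch their results.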
# celery_config.py
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"
task_serializer = "json"
result_serializer = "json"
accept_content = ["json"]
timezone = "Asia/Taipei"
enable_utc = True
# tasks.py
from celery import Celery

app = Celery("myapp")
app.config_from_object("celery_config")


@app.task
def add(x, y):
    return x + y


@app.task
def send_email(to, subject, body):
    import time
    time.sleep(2)  # simulate SMTP latency
    return {"status": "sent", "to": to, "subject": subject}


@app.task
def generate_thumbnail(image_path, sizes):
    import time
    time.sleep(3)  # simulate image processing
    thumbnails = []
    for size in sizes:
        thumbnails.append(f"{image_path}_{size}x{size}.jpg")
    return {"image": image_path, "thumbnails": thumbnails}


if __name__ == "__main__":
    app.start()
# client.py - call the tasks
from tasks import add, send_email, generate_thumbnail
result = add.delay(4, 6)
print(f"Task ID: {result.id}")
print(f"Ready: {result.ready()}")
print(f"Result: {result.get(timeout=10)}")
email_result = send_email.delay("user@example.com", "Welcome", "Hello!")
print(f"Email: {email_result.get(timeout=10)}")
thumb_result = generate_thumbnail.delay("/uploads/avatar.png", [64, 128, 256])
print(f"Thumbnails: {thumb_result.get(timeout=10)}")
# Start the worker
celery -A tasks worker --loglevel=info --concurrency=4
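Pattern 2: Task Routing and Queue Isolation
Route different kinds of tasks to different queues, so that each group of workers consumes only its own queue and resources stay isolated.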
# celery_config_routing.py
from kombu import Exchange, Queue

broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"
task_serializer = "json"
result_serializer = "json"

# Glob patterns on task names decide which queue each task is sent to
task_routes = {
    "tasks.compute.*": {"queue": "compute"},
    "tasks.io.*": {"queue": "io"},
    "tasks.default.*": {"queue": "default"},
}
task_default_queue = "default"
task_queues = (
    Queue("default", Exchange("default"), routing_key="default"),
    Queue("compute", Exchange("compute"), routing_key="compute"),
    Queue("io", Exchange("io"), routing_key="io"),
)
# tasks_routing.py
import time

from celery import Celery

app = Celery("routed_app")
app.config_from_object("celery_config_routing")


# Explicit task names are matched against the globs in task_routes
@app.task(name="tasks.compute.ml_inference")
def ml_inference(model_id, input_data):
    time.sleep(5)
    return {"model": model_id, "prediction": 0.95, "latency_ms": 5000}


@app.task(name="tasks.compute.data_pipeline")
def data_pipeline(source, target):
    time.sleep(8)
    return {"source": source, "target": target, "rows": 100000}


@app.task(name="tasks.io.send_notification")
def send_notification(user_id, message):
    time.sleep(1)
    return {"user_id": user_id, "status": "delivered"}


@app.task(name="tasks.io.sync_external_api")
def sync_external_api(endpoint, payload):
    time.sleep(2)
    return {"endpoint": endpoint, "status_code": 200}


@app.task(name="tasks.default.cleanup")
def cleanup(days_old):
    return {"deleted_records": days_old * 100}
# Start one worker per queue (few processes for CPU work, many for IO work)
celery -A tasks_routing worker -Q compute --loglevel=info --concurrency=2 -n compute@%h
celery -A tasks_routing worker -Q io --loglevel=info --concurrency=8 -n io@%h
celery -A tasks_routing worker -Q default --loglevel=info --concurrency=4 -n default@%h
# client_routing.py
from tasks_routing import ml_inference, send_notification, cleanup
ml_inference.delay("resnet50", {"image": "base64..."})
send_notification.delay("USR-001", "Your order has shipped")
cleanup.delay(30)
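# Choose the queue explicitly at call time (overrides task_routes)
from tasks_routing import sync_external_api
sync_external_api.apply_async(
    args=["/api/v1/sync", {"batch": 100}],
    queue="io",
    priority=5,  # honored only if the broker is configured for priorities
)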
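Roughly speaking, a glob route in `task_routes` is matched against the task name and the first matching pattern wins. Anything that does not match falls back to `task_default_queue`, and an explicit `queue=` in `apply_async` takes precedence over the route. The standard-library sketch below approximates that lookup with `fnmatch`, so you can check which queue a task name lands in. It is a simplification: Celery also supports regex keys and router functions, and its glob translation differs from `fnmatch` in edge cases. `resolve_queue` is a hypothetical helper.

```python
from fnmatch import fnmatchcase

TASK_ROUTES = {
    "tasks.compute.*": {"queue": "compute"},
    "tasks.io.*": {"queue": "io"},
    "tasks.default.*": {"queue": "default"},
}
DEFAULT_QUEUE = "default"


def resolve_queue(task_name, routes=TASK_ROUTES, default=DEFAULT_QUEUE, explicit_queue=None):
    """Approximate routing order: explicit queue, exact name, first glob match, default."""
    if explicit_queue is not None:
        return explicit_queue
    if task_name in routes:
        return routes[task_name]["queue"]
    for pattern, route in routes.items():
        if fnmatchcase(task_name, pattern):
            return route["queue"]
    return default


for name in ["tasks.compute.ml_inference", "tasks.io.send_notification", "tasks_routing.unnamed"]:
    print(name, "->", resolve_queue(name))
```

A task that is registered without an explicit `name=` gets a module-based name such as `tasks_routing.xxx`, which matches none of the globs and silently lands in `default`.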
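Pattern 3: Exponential Backoff Retry Strategy
Failed tasks retry automatically. Exponential backoff keeps retries from snowballing into an outage, and a maximum retry count acts as the final safeguard.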
# tasks_retry.py
import random

from celery import Celery
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)
app = Celery("retry_app")
app.config_from_object("celery_config")


class ExternalAPIError(Exception):
    pass


class RateLimitError(Exception):
    pass


# Option A: declarative autoretry. Celery catches ExternalAPIError and schedules
# the retry itself, so the body must NOT call self.retry(): a manual retry
# without countdown would use default_retry_delay and bypass retry_backoff.
@app.task(
    bind=True,
    max_retries=5,
    autoretry_for=(ExternalAPIError,),
    retry_backoff=True,     # delay upper bounds of 1s, 2s, 4s, 8s, ...
    retry_backoff_max=600,  # cap each delay at 10 minutes
    retry_jitter=True,      # randomize delays so failing tasks don't retry in lockstep
)
def call_payment_api(self, order_id, amount):
    success = random.random() > 0.7  # simulate a flaky payment API
    if not success:
        logger.warning(f"Payment failed for {order_id}, retry {self.request.retries + 1}/{self.max_retries}")
        raise ExternalAPIError(f"Payment API timeout for order {order_id}")
    return {"order_id": order_id, "amount": amount, "status": "paid"}


# Option B: manual retry with a hand-computed countdown
@app.task(bind=True, max_retries=3)
def call_rate_limited_api(self, endpoint):
    try:
        if random.random() > 0.5:
            raise RateLimitError(f"Rate limited on {endpoint}")
        return {"endpoint": endpoint, "data": "success"}
    except RateLimitError as exc:
        # 2^retries seconds plus up to 1s of jitter
        countdown = 2 ** self.request.retries + random.uniform(0, 1)
        logger.info(f"Rate limited, retrying in {countdown:.1f}s (attempt {self.request.retries + 1})")
        raise self.retry(exc=exc, countdown=countdown)


# Option C: custom retry that falls back to a dead-letter task once retries run out
@app.task(bind=True, max_retries=4)
def process_with_custom_retry(self, data):
    try:
        return _do_risky_operation(data)
    except Exception as exc:
        if self.request.retries >= self.max_retries:
            logger.error(f"Max retries exceeded for {data}, sending to DLQ")
            send_to_dlq.delay(str(data), str(exc), self.request.retries)
            return {"status": "failed", "data": data, "error": str(exc)}
        countdown = min(60, 5 * (2 ** self.request.retries))  # 5s, 10s, 20s, 40s
        logger.warning(f"Processing failed, retry in {countdown}s")
        raise self.retry(exc=exc, countdown=countdown)


def _do_risky_operation(data):
    if random.random() > 0.4:
        raise ValueError(f"Random failure processing {data}")
    return {"processed": data, "result": "ok"}


@app.task
def send_to_dlq(task_data, error, retry_count):
    # Minimal dead-letter sink: log it (persist to a table or queue in production)
    logger.error(f"DLQ: task={task_data}, error={error}, retries={retry_count}")
    return {"dlq": True, "task": task_data, "error": error}
# client_retry.py
from tasks_retry import call_payment_api, call_rate_limited_api, process_with_custom_retry
result = call_payment_api.delay("ORD-001", 99.99)
print(f"Payment task: {result.id}")
result2 = call_rate_limited_api.delay("/api/v2/users")
print(f"Rate limited task: {result2.id}")
result3 = process_with_custom_retry.delay({"key": "value"})
print(f"Custom retry task: {result3.id}")
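Celery's docs describe the `retry_backoff` behavior as follows. The delay before retry *n* is `retry_backoff * 2**n` seconds, where `True` means a factor of 1, and it is capped at `retry_backoff_max`. With `retry_jitter=True`, that value becomes an upper bound and the actual delay is a random number between 0 and it. The standard-library sketch below reproduces the arithmetic so you can preview a schedule before choosing limits. `backoff_delay` is a hypothetical helper written for this article, not a Celery API.

```python
import random


def backoff_delay(retries, factor=1, maximum=600, jitter=False, rng=random):
    """Delay in seconds before retry number `retries` (0-based), following the
    documented retry_backoff / retry_backoff_max / retry_jitter rules."""
    countdown = min(maximum, factor * (2 ** retries))
    if jitter:
        # Treat the computed value as an upper bound and pick uniformly in [0, countdown]
        countdown = rng.randrange(countdown + 1)
    return max(0, countdown)


# call_payment_api: retry_backoff=True (factor 1), retry_backoff_max=600, max_retries=5
schedule = [backoff_delay(n) for n in range(5)]
print(schedule)  # [1, 2, 4, 8, 16]
```

With jitter enabled, the values above are only ceilings. That is what spreads out retries from many tasks that failed at the same moment.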
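Pattern 4: Canvas Workflows (Chain, Chord, Group)
Orchestrate complex workflows declaratively: sequential chains, parallel groups, and parallel execution followed by aggregation.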
# tasks_canvas.py
from celery import Celery, chain, chord, group

app = Celery("canvas_app")
app.config_from_object("celery_config")


@app.task
def download_url(url):
    import time
    time.sleep(1)
    return {"url": url, "content": f"content_of_{url.split('/')[-1]}"}


@app.task
def parse_content(download_result):
    content = download_result["content"]
    return {"parsed": True, "items": len(content), "source": download_result["url"]}


@app.task
def store_results(parse_result):
    return {"stored": True, "items": parse_result["items"], "source": parse_result["source"]}


@app.task
def aggregate_results(results):
    # Receives the list of results from every task in the chord header
    total_items = sum(r["items"] for r in results)
    return {"total_items": total_items, "sources": len(results), "aggregated": True}


@app.task
def send_report(aggregate_result):
    return {
        "report_sent": True,
        "total_items": aggregate_result["total_items"],
        "sources": aggregate_result["sources"],
    }


# Chain: sequential workflow; each result is passed to the next task
def run_chain():
    workflow = chain(
        download_url.s("https://api.example.com/data"),
        parse_content.s(),
        store_results.s(),
    )
    result = workflow.apply_async()
    return result.id


# Group: parallel workflow
def run_group():
    urls = [
        "https://api.example.com/users",
        "https://api.example.com/orders",
        "https://api.example.com/products",
    ]
    workflow = group(download_url.s(url) for url in urls)
    result = workflow.apply_async()
    return result.id


# Chord: parallel execution + aggregation (requires a result backend)
def run_chord():
    urls = [
        "https://api.example.com/users",
        "https://api.example.com/orders",
        "https://api.example.com/products",
    ]
    workflow = chord(
        [chain(download_url.s(url), parse_content.s()) for url in urls],
        aggregate_results.s(),
    )
    result = workflow.apply_async()
    return result.id


# Nested workflow: a group chained into a task is upgraded to a chord automatically
def run_complex_workflow():
    header = group(
        chain(download_url.s(f"https://api.example.com/batch/{i}"), parse_content.s())
        for i in range(1, 4)
    )
    workflow = chain(
        header,
        aggregate_results.s(),
        send_report.s(),
    )
    result = workflow.apply_async()
    return result.id
# client_canvas.py
from celery import chain

from tasks_canvas import run_chain, run_group, run_chord, run_complex_workflow
from tasks_canvas import download_url, parse_content, store_results

# Option 1: use the wrapper functions
chain_id = run_chain()
print(f"Chain task: {chain_id}")
group_id = run_group()
print(f"Group task: {group_id}")
chord_id = run_chord()
print(f"Chord task: {chord_id}")
complex_id = run_complex_workflow()
print(f"Complex workflow: {complex_id}")

# Option 2: use the Canvas primitives directly
result = chain(
    download_url.s("https://api.example.com/report"),
    parse_content.s(),
    store_results.s(),
).apply_async()
print(f"Direct chain: {result.id}")

# Build a signature first and compose it later
sig = download_url.s("https://api.example.com/dynamic")
result = chain(sig, parse_content.s(), store_results.s()).apply_async()
print(f"Signature chain: {result.id}")
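To see how data flows through these primitives without a broker, the sketch below runs the same task bodies as plain functions (minus the `sleep`). A chain feeds each return value into the next call. A group makes independent calls and collects the results in order. A chord passes the list of group results to a single callback. The helpers `run_chain_locally`, `run_group_locally`, `run_chord_locally`, and `download_and_parse` are hypothetical stand-ins. Real Canvas executes each step on workers and stores intermediate results in the result backend.

```python
from functools import reduce


def download_url(url):
    return {"url": url, "content": f"content_of_{url.split('/')[-1]}"}


def parse_content(download_result):
    content = download_result["content"]
    return {"parsed": True, "items": len(content), "source": download_result["url"]}


def aggregate_results(results):
    total_items = sum(r["items"] for r in results)
    return {"total_items": total_items, "sources": len(results), "aggregated": True}


def run_chain_locally(first, *rest):
    """chain(a.s(x), b.s(), ...): each result becomes the next task's first argument."""
    func, args = first
    return reduce(lambda acc, step: step(acc), rest, func(*args))


def run_group_locally(calls):
    """group(...): independent calls, results collected in submission order."""
    return [func(*args) for func, args in calls]


def run_chord_locally(header_calls, callback):
    """chord(header, callback): the callback receives the list of header results."""
    return callback(run_group_locally(header_calls))


def download_and_parse(url):
    return run_chain_locally((download_url, (url,)), parse_content)


urls = [
    "https://api.example.com/users",
    "https://api.example.com/orders",
    "https://api.example.com/products",
]
summary = run_chord_locally([(download_and_parse, (u,)) for u in urls], aggregate_results)
print(summary)  # {'total_items': 52, 'sources': 3, 'aggregated': True}
```

This mirrors `run_chord()`. Each header entry is a two-step chain, and `aggregate_results` sees one list containing three parse results.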
Pattern 5: Flower Monitoring and Prometheus Integration
Monitor task state and worker health in real time, and feed metrics into Prometheus for alerting.
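# celery_config_monitoring.py
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"
task_serializer = "json"
result_serializer = "json"
task_send_sent_event = True     # emit task-sent events, which carry the routing key
task_track_started = True       # report the STARTED state
worker_send_task_events = True  # same effect as starting the worker with -E
task_routes = {
    "monitoring_tasks.*": {"queue": "monitoring"},
}

# flowerconfig.py - Flower options belong in Flower's own config file
# (or FLOWER_* environment variables), not in the Celery config
port = 5555
basic_auth = ["admin:secret123"]
persistent = True
db = "flower_db"
max_tasks = 10000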
# monitoring_tasks.py
import random
import time

from celery import Celery
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)
app = Celery("monitoring_app")
app.config_from_object("celery_config_monitoring")


@app.task(name="monitoring_tasks.process_order", bind=True, track_started=True)
def process_order(self, order_id):
    # Custom states with meta show up in Flower and in AsyncResult.info
    self.update_state(state="PROCESSING", meta={"step": "validating", "order_id": order_id})
    time.sleep(1)
    self.update_state(state="PROCESSING", meta={"step": "charging", "order_id": order_id})
    time.sleep(2)
    self.update_state(state="PROCESSING", meta={"step": "fulfilling", "order_id": order_id})
    time.sleep(1)
    return {"order_id": order_id, "status": "completed", "amount": random.randint(100, 9999)}


@app.task(name="monitoring_tasks.generate_report", bind=True, track_started=True, time_limit=300)
def generate_report(self, report_type, date_range):
    self.update_state(state="GENERATING", meta={"progress": 0, "type": report_type})
    for i in range(1, 6):
        time.sleep(1)
        self.update_state(state="GENERATING", meta={"progress": i * 20, "type": report_type})
    return {"report_type": report_type, "date_range": date_range, "rows": random.randint(1000, 50000)}
# prometheus_exporter.py - custom Prometheus metrics
from prometheus_client import Counter, Gauge, Histogram, start_http_server

TASK_STARTED = Counter(
    "celery_task_started_total",
    "Total number of Celery tasks started",
    ["task_name", "queue"],
)
TASK_COMPLETED = Counter(
    "celery_task_completed_total",
    "Total number of Celery tasks completed",
    ["task_name", "queue", "status"],
)
TASK_DURATION = Histogram(
    "celery_task_duration_seconds",
    "Celery task duration in seconds",
    ["task_name", "queue"],
    buckets=[0.1, 0.5, 1, 2, 5, 10, 30, 60, 120, 300],
)
QUEUE_DEPTH = Gauge(
    "celery_queue_depth",
    "Number of tasks waiting in queue",
    ["queue_name"],
)
ACTIVE_WORKERS = Gauge(
    "celery_active_workers",
    "Number of active Celery workers",
)


def start_metrics_server(port=9090):
    start_http_server(port)
    print(f"Prometheus metrics server started on :{port}")
# celery_events_monitor.py - event listener
from celery import Celery

from prometheus_exporter import TASK_COMPLETED, TASK_DURATION, TASK_STARTED

app = Celery("monitoring_app")
app.config_from_object("celery_config_monitoring")

# task-started/succeeded/failed/retried events carry only the task uuid, not its
# name; the name arrives with task-sent/task-received. events.State merges all
# events per uuid so each handler can look up the name and routing key.
state = app.events.State()


def _labels(event):
    state.event(event)
    task = state.tasks.get(event["uuid"])
    name = getattr(task, "name", None) or "unknown"
    queue = getattr(task, "routing_key", None) or "unknown"
    return name, queue


def on_task_started(event):
    name, queue = _labels(event)
    TASK_STARTED.labels(task_name=name, queue=queue).inc()


def on_task_succeeded(event):
    name, queue = _labels(event)
    TASK_COMPLETED.labels(task_name=name, queue=queue, status="success").inc()
    TASK_DURATION.labels(task_name=name, queue=queue).observe(event.get("runtime", 0))


def on_task_failed(event):
    name, queue = _labels(event)
    TASK_COMPLETED.labels(task_name=name, queue=queue, status="failed").inc()


def on_task_retried(event):
    name, queue = _labels(event)
    TASK_COMPLETED.labels(task_name=name, queue=queue, status="retried").inc()


def monitor_events():
    with app.connection() as connection:
        recv = app.events.Receiver(
            connection,
            handlers={
                "task-started": on_task_started,
                "task-succeeded": on_task_succeeded,
                "task-failed": on_task_failed,
                "task-retried": on_task_retried,
                "*": state.event,  # everything else (task-sent, task-received, worker-heartbeat)
            },
        )
        recv.capture(limit=None, timeout=None, wakeup=True)


if __name__ == "__main__":
    from prometheus_exporter import start_metrics_server
    start_metrics_server(9090)
    monitor_events()
# Start Flower
celery -A monitoring_tasks flower --port=5555 --basic-auth=admin:secret123
# Start the event listener + Prometheus exporter
python celery_events_monitor.py
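`QUEUE_DEPTH` in `prometheus_exporter.py` is declared, but nothing above updates it, and the HPA in Pattern 6 scales on `celery_queue_depth`. With the Redis broker, each queue is a Redis list named after the queue, so `LLEN` on that list returns the number of waiting messages. Tasks that workers have already prefetched are no longer in the list. The sketch below assumes redis-py and the `prometheus_exporter` module above. It also assumes that, when broker priorities are enabled, kombu stores each non-zero priority level in an extra list named `<queue><sep><priority>`, so those lists are summed as well. The default separator value is an assumption about kombu's Redis transport. `queue_depth`, `collect_queue_depths`, and `run_collector` are hypothetical names.

```python
import time

# Assumption: kombu's default separator for per-priority Redis lists;
# priority 0 uses the bare queue name.
KOMBU_REDIS_SEP = "\x06\x16"


def queue_depth(llen, queue, priority_steps=(0,), sep=KOMBU_REDIS_SEP):
    """Waiting messages in one queue: LLEN of the bare list plus any priority lists.
    `llen` is a callable such as redis.Redis(...).llen."""
    keys = [queue] + [f"{queue}{sep}{p}" for p in priority_steps if p]
    return sum(llen(key) for key in keys)


def collect_queue_depths(llen, queues, **kwargs):
    """Map each queue name to its current depth."""
    return {q: queue_depth(llen, q, **kwargs) for q in queues}


def run_collector(queues=("default", "compute", "io"), interval=15):
    """Publish celery_queue_depth forever. The imports are local, so the helpers
    above can be used and tested without redis-py or prometheus_client."""
    import redis
    from prometheus_exporter import QUEUE_DEPTH

    client = redis.Redis(host="localhost", port=6379, db=0)  # same DB as broker_url
    while True:
        for name, depth in collect_queue_depths(client.llen, queues).items():
            QUEUE_DEPTH.labels(queue_name=name).set(depth)
        time.sleep(interval)
```

Run `run_collector()` in its own thread or process next to `monitor_events()`. With the priority settings from the advanced section below, you would pass `priority_steps=range(10), sep=":"`.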
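Pattern 6: Production-Grade Celery Service (Docker + Kubernetes)
Multi-stage Docker builds, Kubernetes Deployments, and HPA autoscaling. The HPA scales on the External metric `celery_queue_depth`. This only works if an external metrics adapter (such as prometheus-adapter) exposes that Prometheus series through the Kubernetes external metrics API.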
# Dockerfile
FROM python:3.12-slim AS builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt
FROM python:3.12-slim
WORKDIR /app
COPY --from=builder /install /usr/local
COPY . .
ENV PYTHONUNBUFFERED=1
ENV CELERY_BROKER_URL=redis://redis:6379/0
ENV CELERY_RESULT_BACKEND=redis://redis:6379/1
CMD ["celery", "-A", "tasks", "worker", "--loglevel=info"]
# docker-compose.yml
version: "3.8"
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    # noeviction: a broker must never drop keys; allkeys-lru would silently delete queued tasks
    command: redis-server --appendonly yes --maxmemory 512mb --maxmemory-policy noeviction
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5
  worker-default:
    build: .
    command: celery -A tasks worker -Q default --loglevel=info --concurrency=4 -n default@%h
    depends_on:
      redis:
        condition: service_healthy
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    deploy:
      replicas: 2
      resources:
        limits:
          memory: 512M
          cpus: "1.0"
        reservations:
          memory: 256M
          cpus: "0.5"
    restart: unless-stopped
  worker-compute:
    build: .
    command: celery -A tasks worker -Q compute --loglevel=info --concurrency=2 -n compute@%h
    depends_on:
      redis:
        condition: service_healthy
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    deploy:
      replicas: 1
      resources:
        limits:
          memory: 2G
          cpus: "2.0"
        reservations:
          memory: 1G
          cpus: "1.0"
    restart: unless-stopped
  worker-io:
    build: .
    command: celery -A tasks worker -Q io --loglevel=info --concurrency=16 -n io@%h
    depends_on:
      redis:
        condition: service_healthy
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    deploy:
      replicas: 3
      resources:
        limits:
          memory: 512M
          cpus: "1.0"
    restart: unless-stopped
  flower:
    build: .
    command: celery -A tasks flower --port=5555 --basic-auth=admin:secret123
    depends_on:
      redis:
        condition: service_healthy
    ports:
      - "5555:5555"
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    restart: unless-stopped
  beat:
    build: .
    command: celery -A tasks beat --loglevel=info
    depends_on:
      redis:
        condition: service_healthy
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    restart: unless-stopped
volumes:
  redis_data:
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-worker-default
  labels:
    app: celery-worker
    queue: default
spec:
  replicas: 3
  selector:
    matchLabels:
      app: celery-worker
      queue: default
  template:
    metadata:
      labels:
        app: celery-worker
        queue: default
    spec:
      containers:
        - name: worker
          image: myregistry/celery-app:latest
          command: ["celery", "-A", "tasks", "worker", "-Q", "default", "--loglevel=info", "--concurrency=4"]
          env:
            - name: CELERY_BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: broker-url
            - name: CELERY_RESULT_BACKEND
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: result-backend
          resources:
            requests:
              memory: "256Mi"
              cpu: "500m"
            limits:
              memory: "512Mi"
              cpu: "1000m"
          # Ping only this pod's worker (default node name celery@<pod hostname>);
          # a bare "inspect ping" succeeds if ANY worker in the cluster replies
          livenessProbe:
            exec:
              command: ["sh", "-c", "celery -A tasks inspect ping -d celery@$HOSTNAME"]
            initialDelaySeconds: 30
            periodSeconds: 60
            timeoutSeconds: 10
          readinessProbe:
            exec:
              command: ["sh", "-c", "celery -A tasks inspect ping -d celery@$HOSTNAME"]
            initialDelaySeconds: 10
            periodSeconds: 30
            timeoutSeconds: 10
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: celery-worker-default-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: celery-worker-default
  minReplicas: 2
  maxReplicas: 20
  metrics:
    - type: External
      external:
        metric:
          name: celery_queue_depth
          selector:
            matchLabels:
              queue_name: default
        target:
          type: AverageValue
          averageValue: "100"
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-beat
  labels:
    app: celery-beat
spec:
  replicas: 1
  # Recreate stops the old pod before starting the new one,
  # so a rollout never runs two Beat schedulers at the same time
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: celery-beat
  template:
    metadata:
      labels:
        app: celery-beat
    spec:
      containers:
        - name: beat
          image: myregistry/celery-app:latest
          command: ["celery", "-A", "tasks", "beat", "--loglevel=info"]
          env:
            - name: CELERY_BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: broker-url
            - name: CELERY_RESULT_BACKEND
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: result-backend
          resources:
            requests:
              memory: "128Mi"
              cpu: "100m"
            limits:
              memory: "256Mi"
              cpu: "250m"
# k8s/flower-service.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-flower
spec:
  replicas: 1
  selector:
    matchLabels:
      app: celery-flower
  template:
    metadata:
      labels:
        app: celery-flower
    spec:
      containers:
        - name: flower
          image: myregistry/celery-app:latest
          command: ["celery", "-A", "tasks", "flower", "--port=5555"]
          ports:
            - containerPort: 5555
          env:
            - name: CELERY_BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: broker-url
            - name: CELERY_RESULT_BACKEND
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: result-backend
---
apiVersion: v1
kind: Service
metadata:
  name: celery-flower
spec:
  selector:
    app: celery-flower
  ports:
    - port: 5555
      targetPort: 5555
  type: ClusterIP
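When an HPA has several metrics, Kubernetes computes a desired replica count for each one, uses the largest, and clamps the result to `minReplicas`/`maxReplicas`. For a `Utilization` target, the rule is `ceil(currentReplicas * currentUtilization / targetUtilization)`. For an External metric with an `AverageValue` target, the metric is divided by the pod count before comparison, which works out to `ceil(metricValue / averageValue)`. The Python sketch below applies those rules to the HPA above so you can sanity-check thresholds. It is simplified: it ignores the default 10% tolerance, stabilization windows, and scaling policies. `desired_replicas` is a hypothetical helper.

```python
import math


def desired_replicas(current_replicas, queue_depth, cpu_utilization,
                     target_depth_per_pod=100, target_cpu=70,
                     min_replicas=2, max_replicas=20):
    """Simplified HPA v2 arithmetic for celery-worker-default-hpa."""
    # External metric, AverageValue target: total value / per-pod target
    by_queue = math.ceil(queue_depth / target_depth_per_pod)
    # Resource metric, Utilization target: scale current replicas by the usage ratio
    by_cpu = math.ceil(current_replicas * cpu_utilization / target_cpu)
    # Several metrics: take the largest proposal, then clamp to [min, max]
    return max(min_replicas, min(max_replicas, max(by_queue, by_cpu)))


print(desired_replicas(3, queue_depth=1500, cpu_utilization=35))  # 15
print(desired_replicas(3, queue_depth=5000, cpu_utilization=35))  # 20 (capped)
print(desired_replicas(3, queue_depth=0, cpu_utilization=10))     # 2 (floor)
```

For IO-heavy Celery workers, CPU often stays low while the backlog grows. The queue-depth metric is what actually drives scale-out in the first two cases.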
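Pitfall Guide: 5 Common Traps
Pitfall 1: Using a global database connection inside tasks
❌ Wrong:
import psycopg2

# Opened at import time in the parent process, BEFORE the prefork pool forks,
# so every child process ends up sharing the same socket
conn = psycopg2.connect("postgresql://localhost/mydb")

@app.task
def save_user(user_data):
    cursor = conn.cursor()
    cursor.execute("INSERT INTO users (data) VALUES (%s)", (user_data,))
    conn.commit()
✅ Correct:
import psycopg2
from celery.signals import worker_process_init

db_conn = None

@worker_process_init.connect
def init_db_connection(**kwargs):
    # Runs in each child process after the fork: one connection per process
    global db_conn
    db_conn = psycopg2.connect("postgresql://localhost/mydb")

@app.task
def save_user(user_data):
    with db_conn:  # commit on success, roll back on exception
        with db_conn.cursor() as cursor:
            cursor.execute("INSERT INTO users (data) VALUES (%s)", (user_data,))

# In a Django project, use django.db.connection instead; Django manages it per process.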
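Pitfall 2: Ignoring timeouts, so a hung task blocks a worker forever
❌ Wrong:
@app.task
def fetch_external_data(url):
    import requests
    response = requests.get(url)  # no timeout: can hang indefinitely
    return response.json()
✅ Correct:
import requests
from celery.exceptions import SoftTimeLimitExceeded

@app.task(
    bind=True,
    max_retries=3,
    time_limit=300,       # hard limit: the worker kills the child process at 300s
    soft_time_limit=270,  # soft limit: SoftTimeLimitExceeded is raised inside the task at 270s
)
def fetch_external_data(self, url):
    try:
        response = requests.get(url, timeout=(5, 60))  # (connect, read) timeouts in seconds
        response.raise_for_status()
        return response.json()
    except requests.Timeout as exc:
        raise self.retry(exc=exc, countdown=30)
    except SoftTimeLimitExceeded:
        # Last chance to clean up before the hard limit kills the process
        return {"url": url, "status": "timed_out"}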
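Pitfall 3: Passing non-serializable objects through Canvas
❌ Wrong:
@app.task
def process_file(file_obj):
    return file_obj.read()

# A file object is not JSON-serializable: sending fails with kombu.exceptions.EncodeError
workflow = chain(process_file.s(open("data.csv")), store_result.s())
✅ Correct:
@app.task
def process_file(file_path):
    with open(file_path, "r") as f:
        return f.read()

# Pass a plain string path; the worker must be able to reach it (e.g. shared storage)
workflow = chain(process_file.s("/data/data.csv"), store_result.s())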
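The config sets `task_serializer = "json"`, so every argument must survive a JSON round trip. One cheap safeguard, sketched below with the standard library, is to check arguments before calling `delay()`/`apply_async()`, for example in unit tests. `assert_json_serializable` is a hypothetical helper. The round-trip comparison also catches values that encode fine but come back changed, such as tuples that turn into lists.

```python
import json


def assert_json_serializable(*args, **kwargs):
    """Raise TypeError if the arguments cannot be JSON-encoded, or ValueError
    if they come back different after a round trip (e.g. tuples become lists)."""
    payload = {"args": list(args), "kwargs": kwargs}
    encoded = json.dumps(payload)  # TypeError for file objects, sets, datetimes, ...
    if json.loads(encoded) != payload:
        raise ValueError("arguments change after a JSON round trip")
    return encoded


assert_json_serializable("/data/data.csv")  # fine: a plain string path
```

In a test suite, you would call it with the same arguments your code passes to `.s()` or `.delay()`, and the bug surfaces before any message reaches the broker.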
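Pitfall 4: Running Beat without a distributed lock, so periodic tasks fire more than once
❌ Wrong:
# Two Beat instances are running (e.g. two replicas), so every entry fires twice
from celery.schedules import crontab

beat_schedule = {
    "cleanup-every-night": {
        "task": "tasks.cleanup",
        "schedule": crontab(hour=2, minute=0),
    },
}
✅ Correct:
# pip install celery-redbeat
from celery.schedules import crontab

# RedBeat stores the schedule in Redis and holds a lock there,
# so only one Beat instance schedules at a time
beat_scheduler = "redbeat.RedBeatScheduler"
redbeat_redis_url = "redis://localhost:6379/2"

# Use the lowercase name: mixing old uppercase keys such as CELERYBEAT_SCHEDULE
# with lowercase settings like broker_url raises ImproperlyConfigured
beat_schedule = {
    "cleanup-every-night": {
        "task": "tasks.cleanup",
        "schedule": crontab(hour=2, minute=0),
        "options": {"queue": "default"},
    },
}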
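Pitfall 5: Ignoring resources inherited across fork in the prefork pool
❌ Wrong:
import redis

# Created in the parent process and inherited by every forked child
redis_client = redis.Redis()

@app.task
def update_cache(key, value):
    redis_client.set(key, value)
✅ Correct:
@app.task
def update_cache(key, value):
    import redis
    redis_client = redis.Redis()  # simple, but creates a new client on every call
    redis_client.set(key, value)

# Or initialize once per worker child process
import redis
from celery.signals import worker_process_init

redis_client = None

@worker_process_init.connect
def init_redis(**kwargs):
    global redis_client
    redis_client = redis.Redis()

@app.task
def update_cache_per_process(key, value):
    redis_client.set(key, value)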
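Another option, which also works outside the worker (in scripts and tests), is lazy initialization keyed on the process ID. The resource is created on first use and created again whenever the current PID differs from the PID that created it, which is exactly what happens in a forked child. Below is a standard-library sketch. `PerProcess` and `make_resource` are hypothetical. In practice, the factory would be something like `redis.Redis`, and the holder is not thread-safe, which is fine for prefork children that run one task at a time.

```python
import os


class PerProcess:
    """Lazily create one resource per OS process; re-create it after a fork."""

    def __init__(self, factory):
        self._factory = factory
        self._pid = None
        self._value = None

    def get(self):
        pid = os.getpid()
        if self._pid != pid:  # first use, or we are now running in a forked child
            self._value = self._factory()
            self._pid = pid
        return self._value


created = []


def make_resource():
    # Stand-in for redis.Redis(); records which process created it
    created.append(os.getpid())
    return object()


holder = PerProcess(make_resource)
first = holder.get()
second = holder.get()  # same process, so the cached resource is reused
```

In a task module, this would read `redis_client = PerProcess(redis.Redis)` at import time and `redis_client.get().set(key, value)` inside the task.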
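Troubleshooting Table
| Symptom | Likely cause | How to check | Fix |
|---|---|---|---|
| Worker starts but prints no logs | Cannot connect to the broker | Check the format and reachability of CELERY_BROKER_URL | Make sure Redis/RabbitMQ is running and the URL is correct |
| Task stays PENDING and never runs | No worker consumes the target queue | `celery -A tasks inspect active_queues` | Start a worker for that queue or fix task_routes |
| kombu.exceptions.EncodeError | Task arguments include non-serializable objects | Check the arguments passed to delay()/apply_async() | Pass only JSON-serializable primitive types |
| WorkerLostError | Worker process killed by the OOM killer | Check the system log with `dmesg \| grep -i oom` | Raise the worker memory limit or lower concurrency |
| TimeLimitExceeded | Task ran longer than time_limit | Look for infinite loops or blocking IO in the task | Set sensible time_limit and soft_time_limit values |
| Fetching the result fails ("No result backend is configured") | result_backend is not configured | Check the CELERY_RESULT_BACKEND / result_backend setting | Configure Redis or a database as the result backend |
| Chord callback never runs | A header task failed, or header tasks ignore their results | Check the header task states in Flower | Keep results enabled for header tasks and attach an error callback with link_error |
| ContentDisallowed | Message serialized in a format the worker does not accept | Compare accept_content with the sender's serializer | Set accept_content = ["json"] on both sides |
| Beat tasks do not fire on time | Misconfigured timezone | Check timezone and enable_utc | Use one consistent timezone setting; enable_utc=True is recommended |
| Worker disconnects and reconnects frequently | Broker connection pool exhausted or an unstable network | Check broker_pool_limit and connection timeouts | Increase broker_pool_limit; set broker_heartbeat (AMQP transports only) |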
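Advanced Optimizations
1. Result Deduplication and Idempotency
In production, the same task may be delivered more than once (network blips, worker restarts), so tasks must be idempotent.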
import json
from contextlib import contextmanager

import redis
from celery import Celery

app = Celery("idempotent_app")
app.config_from_object("celery_config")


@contextmanager
def idempotent_lock(key, redis_client, ttl=3600):
    """SET NX EX lock on a business key; released if the body raises, so a retry can run."""
    lock_key = f"celery:idempotent:{key}"
    acquired = bool(redis_client.set(lock_key, "1", nx=True, ex=ttl))
    try:
        yield acquired
    except Exception:
        if acquired:
            redis_client.delete(lock_key)
        raise


@app.task(bind=True)
def process_order_idempotent(self, order_id, amount):
    r = redis.Redis(host="localhost", port=6379, db=3)
    task_key = f"order:processed:{order_id}"
    existing = r.get(task_key)
    if existing:
        return {"status": "duplicate", "order_id": order_id, "cached_result": json.loads(existing)}
    # Lock on order_id, not self.request.id: a second submission of the same
    # order gets a new task id, so a task-id lock would not stop it
    with idempotent_lock(order_id, r) as acquired:
        if not acquired:
            return {"status": "duplicate", "order_id": order_id}
        result = _do_process_order(order_id, amount)
        r.setex(task_key, 86400, json.dumps(result))  # remember the result for 24h
        return result


def _do_process_order(order_id, amount):
    return {"status": "processed", "order_id": order_id, "amount": amount}
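The example above relies on `order_id` as the business key. When a task has no natural business key, one option is to derive a deterministic key from the task name and its arguments with `hashlib`. Two submissions with identical arguments then map to the same Redis key, regardless of keyword order. This is a sketch, not part of Celery: `idempotency_key` is a hypothetical helper, and it assumes the arguments are JSON-serializable, which the `json` serializer already requires.

```python
import hashlib
import json


def idempotency_key(task_name, args=(), kwargs=None, prefix="celery:idempotent"):
    """Deterministic Redis key for a (task name, args, kwargs) combination."""
    canonical = json.dumps(
        {"task": task_name, "args": list(args), "kwargs": kwargs or {}},
        sort_keys=True,          # keyword order does not change the key
        separators=(",", ":"),   # no whitespace variations
    )
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"{prefix}:{digest}"


k1 = idempotency_key("process_order_idempotent", kwargs={"order_id": "ORD-001", "amount": 99.99})
k2 = idempotency_key("process_order_idempotent", kwargs={"amount": 99.99, "order_id": "ORD-001"})
k3 = idempotency_key("process_order_idempotent", kwargs={"order_id": "ORD-001", "amount": 10.0})
```

Here `k1 == k2`, because only the keyword order differs, while `k3` differs because the amount changed. You would pass such a key to a lock like `idempotent_lock` in place of `order_id`.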
2. Dynamic Task Priority and Rate Limiting
High-priority tasks run first, and rate limits keep downstream services from being overloaded.
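from celery import Celery, current_app

app = Celery("priority_app")
app.config_from_object("celery_config")

# Redis transport emulates priorities with one list per priority level.
# With Redis, 0 is the HIGHEST priority (the reverse of RabbitMQ).
app.conf.broker_transport_options = {
    "priority_steps": list(range(10)),
    "sep": ":",
    "queue_order_strategy": "priority",
}
# Prefetch one message at a time so a worker does not hoard
# low-priority tasks before higher-priority ones arrive
app.conf.worker_prefetch_multiplier = 1


# rate_limit is enforced per worker instance, not globally across the cluster
@app.task(rate_limit="10/s")
def call_external_api(endpoint, payload):
    import time
    time.sleep(0.1)
    return {"endpoint": endpoint, "status": "ok"}


# High-priority task
def submit_high_priority():
    call_external_api.apply_async(
        args=["/api/v1/critical", {"urgent": True}],
        priority=0,
        queue="io",
    )


# Low-priority task
def submit_low_priority():
    call_external_api.apply_async(
        args=["/api/v1/batch", {"batch": True}],
        priority=9,
        queue="io",
    )


# Change a task's rate limit at runtime (broadcast to all workers)
def adjust_rate_limit(task_name, new_rate):
    current_app.control.rate_limit(task_name, new_rate)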
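Celery's per-worker `rate_limit` is based on a token bucket (kombu ships a `TokenBucket` class in `kombu.utils.limits`). The standard-library sketch below illustrates the idea. Tokens refill at `fill_rate` per second up to `capacity`, and a task may start only when a token is available. It is not Celery's code: `TokenBucket` here is written from scratch, and `FakeClock` is a hypothetical test helper. A capacity of 1 is used to show a strict rate with no bursts; think of `fill_rate=10` as `rate_limit="10/s"`.

```python
import time


class TokenBucket:
    """Token bucket: at most `capacity` tokens, refilled at `fill_rate` tokens/second."""

    def __init__(self, fill_rate, capacity, clock=time.monotonic):
        self.fill_rate = float(fill_rate)
        self.capacity = float(capacity)
        self._clock = clock
        self._tokens = float(capacity)
        self._last = clock()

    def _refill(self):
        now = self._clock()
        self._tokens = min(self.capacity, self._tokens + (now - self._last) * self.fill_rate)
        self._last = now

    def can_consume(self, tokens=1):
        """Take `tokens` and return True if enough are available, else return False."""
        self._refill()
        if self._tokens >= tokens:
            self._tokens -= tokens
            return True
        return False


class FakeClock:
    """Manually advanced clock for deterministic demos and tests."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now


clock = FakeClock()
bucket = TokenBucket(fill_rate=10, capacity=1, clock=clock)  # ~10 tasks/s, no bursts
at_start = [bucket.can_consume() for _ in range(3)]
clock.now = 0.1  # 0.1 s later: 0.1 * 10 = 1 token has been refilled
after_100ms = [bucket.can_consume() for _ in range(3)]
print(at_start, after_100ms)  # [True, False, False] [True, False, False]
```

Because each worker keeps its own bucket, N workers with `rate_limit="10/s"` can together reach roughly N × 10 calls per second. A global limit needs shared state, for example in Redis.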
3. Integrating Celery with FastAPI
Embed Celery in a FastAPI application: the API triggers asynchronous tasks and lets clients query their status.
# fastapi_celery.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from tasks import app as celery_app  # the configured Celery app (broker + result backend)

app = FastAPI(title="Celery API Gateway")


class TaskSubmit(BaseModel):
    task_name: str
    args: list = []
    kwargs: dict = {}


class TaskResponse(BaseModel):
    task_id: str
    status: str


@app.post("/tasks/submit", response_model=TaskResponse)
def submit_task(submit: TaskSubmit):
    # send_task dispatches by name without importing the task code;
    # in production, check task_name against an allowlist first
    try:
        result = celery_app.send_task(submit.task_name, args=submit.args, kwargs=submit.kwargs)
        return TaskResponse(task_id=result.id, status="PENDING")
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))


@app.get("/tasks/{task_id}")
def get_task_status(task_id: str):
    # Bind the result to celery_app so it reads from the configured result backend
    result = celery_app.AsyncResult(task_id)
    response = {"task_id": task_id, "status": result.status, "result": None}
    if result.successful():
        response["result"] = result.result
    elif result.failed():
        # On failure, result.result is an exception instance, which is not JSON-serializable
        response["error"] = str(result.result)
    return response


@app.post("/tasks/{task_id}/revoke")
def revoke_task(task_id: str):
    # terminate=True also kills the task if it is already running
    celery_app.control.revoke(task_id, terminate=True)
    return {"task_id": task_id, "status": "REVOKED"}
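On the client side, a caller typically polls `GET /tasks/{task_id}` until the status reaches a ready state: `SUCCESS`, `FAILURE`, or `REVOKED`, the same set as `celery.states.READY_STATES`. The sketch below keeps the HTTP call pluggable. `fetch_status` is a hypothetical callable that returns the endpoint's JSON as a dict. The polling interval doubles up to a cap, so many waiting clients do not hammer the API. `poll_task` is a hypothetical helper, and the demo uses a scripted status sequence and a fake clock instead of real HTTP calls and sleeps.

```python
import time

READY_STATES = {"SUCCESS", "FAILURE", "REVOKED"}  # same values as celery.states.READY_STATES


def poll_task(fetch_status, task_id, interval=0.5, max_interval=5.0, timeout=60.0,
              sleep=time.sleep, clock=time.monotonic):
    """Poll until the task is ready; raise TimeoutError if the next wait would pass the deadline."""
    deadline = clock() + timeout
    delay = interval
    while True:
        status = fetch_status(task_id)
        if status["status"] in READY_STATES:
            return status
        if clock() + delay > deadline:
            raise TimeoutError(f"task {task_id} not ready after {timeout}s")
        sleep(delay)
        delay = min(max_interval, delay * 2)  # exponential backoff between polls


# Demo: scripted responses and a fake clock instead of real HTTP and sleep
responses = iter(["PENDING", "STARTED", "SUCCESS"])
fake_now = [0.0]
waits = []


def fake_sleep(seconds):
    waits.append(seconds)
    fake_now[0] += seconds


final = poll_task(lambda tid: {"task_id": tid, "status": next(responses)}, "abc",
                  sleep=fake_sleep, clock=lambda: fake_now[0])
print(final["status"], waits)  # SUCCESS [0.5, 1.0]
```

Against the real API, `fetch_status` could be something like `lambda tid: requests.get(f"{base_url}/tasks/{tid}").json()`, where `base_url` is your deployment's address.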
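Framework Comparison
| Feature | Celery | Dramatiq | Huey | RQ | Temporal |
|---|---|---|---|---|---|
| Language | Python | Python | Python | Python | Go server, multi-language SDKs |
| Broker | Redis/RabbitMQ/SQS, etc. | Redis/RabbitMQ | Redis/SQLite | Redis | Its own server (Temporal Service) |
| Workflow orchestration | Canvas (chain/chord/group) | Pipelines and groups | Pipelines (`.then()`) | Job dependencies (`depends_on`) | Workflows written as code |
| Retry strategy | Exponential backoff + jitter + custom | Exponential backoff + jitter | Simple retries | Simple retries | Full retry policies |
| Monitoring | Flower | dramatiq-dashboard (separate package) | Minimal | RQ Dashboard | Web UI |
| Periodic tasks | Beat | Not built in (e.g. APScheduler) | Built in | Delayed jobs built in; cron via rq-scheduler | Native Schedules |
| Community ecosystem | Most mature | Medium | Niche | Niche | Growing fast |
| Learning curve | Medium | Low | Low | Low | High |
| Multi-language support | No | No | No | No | Yes |
| Production maturity | High | Medium | Low | Medium | High |
| Best fit | General-purpose distributed tasks | Small-to-medium projects | Lightweight tasks | Simple tasks in a Redis stack | Complex cross-language workflows |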
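Summary
As of version 5.4, Celery is a mature and flexible choice for distributed task queues in Python. Keep three core principles in mind. Idempotency is the baseline: duplicate delivery must be safe. Queue isolation is standard practice: keep CPU-bound, IO-bound, and default queues separate. Monitoring and alerting are not optional: Flower plus Prometheus are your eyes in production. Going from a single `celery worker` process to HPA autoscaling on Kubernetes is mostly a deployment change (more workers, more queues), so the task code itself usually does not need to be rewritten.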