Python Celery Distributed Task Queues in Practice: 6 Production Patterns, from Basic Tasks to Event-Driven Pipelines


Background jobs block the API, cron does not scale, and failed tasks are never retried

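A user uploads an avatar that needs thumbnails in 5 sizes, and the API times out because it does the work synchronously. The 2 a.m. data-cleaning cron job no longer finishes once the data volume doubles. A third-party payment callback fails occasionally, and without a retry mechanism the orders have to be fixed by hand. In 2026, Celery is still one of the most widely used distributed task queues for Python: the 5.x line offers stable Canvas workflows, and the same task code runs from a single development machine up to a K8s cluster. Tasks remain ordinary synchronous functions; native asyncio task support has not landed as of version 5.4.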

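This article walks through 6 production patterns end to end: basic task definition → routing and queue isolation → exponential-backoff retries → Canvas workflows → Flower + Prometheus monitoring → Docker/K8s production deployment. Every step comes with Python code.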


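Celery core concepts

Concept | Description
Celery | Distributed task queue framework for Python; supports multiple brokers and result backends
Broker | Message middleware that receives task messages from producers (clients) and delivers them to consumers (workers); Redis and RabbitMQ are the common choices
Worker | Consumer process that fetches tasks from the broker and executes them; supports process, coroutine (gevent/eventlet), and thread pools
Task | A function managed by Celery, registered with the @app.task decorator; calling it asynchronously returns an AsyncResult
Canvas | Celery's workflow primitives; supports compositions such as chain (sequential), group (parallel), and chord (parallel + callback)
Chain | Sequential workflow; the output of each task becomes the input of the next
Chord | Parallel + callback workflow; once every task in the group has finished, a callback task aggregates the results
Group | Parallel workflow; a set of independent tasks executed at the same time
Flower | Real-time web dashboard for Celery; shows task state, worker state, queue depth, and more
Result Backend | Storage for task return values and states; Redis or a database are the common choices
Beat | Celery's periodic task scheduler, similar to cron; the default scheduler has no distributed lock, so run exactly one instance
Signature | Wraps a task invocation (arguments and options) in a serializable object that can be passed around and composed with Canvas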

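Problem analysis: 5 challenges of distributed task queues

  1. Task definition coupled to the call site: business code calls slow operations directly, and the synchronous blocking makes API responses slow. Slow operations need to become asynchronous tasks, decoupled from the caller.
  2. Task routing and resource isolation: when CPU-bound and I/O-bound tasks share one queue, I/O tasks get stuck behind CPU tasks. This calls for separate queues and a routing strategy.
  3. Task failures and retry storms: when a third-party service is down, tasks fail and retry over and over; without backoff this snowballs. The remedy is exponential backoff, a retry cap, and dead-letter handling.
  4. Complex workflow orchestration: multi-step jobs need sequential and parallel composition, and hand-written callback chains are hard to maintain. Canvas primitives allow declarative orchestration.
  5. Production observability: task backlogs, crashed workers, and blocked queues go unnoticed. This calls for real-time monitoring, alerting, and autoscaling.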

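Step by step: 6 Celery task patterns

Pattern 1: Defining and running a basic task

The simplest Celery setup: define a task function, start a worker, call the task, and fetch the result.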

# celery_config.py
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"
task_serializer = "json"
result_serializer = "json"
accept_content = ["json"]
timezone = "Asia/Shanghai"
enable_utc = True

# tasks.py
from celery import Celery

app = Celery("myapp")
app.config_from_object("celery_config")

@app.task
def add(x, y):
    return x + y

@app.task
def send_email(to, subject, body):
    import time
    time.sleep(2)
    return {"status": "sent", "to": to, "subject": subject}

@app.task
def generate_thumbnail(image_path, sizes):
    import time
    time.sleep(3)
    thumbnails = []
    for size in sizes:
        thumbnails.append(f"{image_path}_{size}x{size}.jpg")
    return {"image": image_path, "thumbnails": thumbnails}

if __name__ == "__main__":
    app.start()

# client.py - calling the tasks
from tasks import add, send_email, generate_thumbnail

result = add.delay(4, 6)
print(f"Task ID: {result.id}")
print(f"Ready: {result.ready()}")
print(f"Result: {result.get(timeout=10)}")

email_result = send_email.delay("user@example.com", "Welcome", "Hello!")
print(f"Email: {email_result.get(timeout=10)}")

thumb_result = generate_thumbnail.delay("/uploads/avatar.png", [64, 128, 256])
print(f"Thumbnails: {thumb_result.get(timeout=10)}")

# Start the worker
celery -A tasks worker --loglevel=info --concurrency=4

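Pattern 2: Task routing and queue isolation

Send different kinds of tasks to different queues, and give each queue its own workers so resources stay isolated.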

# celery_config_routing.py
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"
task_serializer = "json"
result_serializer = "json"

task_routes = {
    "tasks.compute.*": {"queue": "compute"},
    "tasks.io.*": {"queue": "io"},
    "tasks.default.*": {"queue": "default"},
}

task_default_queue = "default"
task_queues = {
    "default": {
        "exchange": "default",
        "routing_key": "default",
    },
    "compute": {
        "exchange": "compute",
        "routing_key": "compute",
    },
    "io": {
        "exchange": "io",
        "routing_key": "io",
    },
}

# tasks_routing.py
from celery import Celery

app = Celery("routed_app")
app.config_from_object("celery_config_routing")

@app.task(name="tasks.compute.ml_inference")
def ml_inference(model_id, input_data):
    import time
    time.sleep(5)
    return {"model": model_id, "prediction": 0.95, "latency_ms": 5000}

@app.task(name="tasks.compute.data_pipeline")
def data_pipeline(source, target):
    import time
    time.sleep(8)
    return {"source": source, "target": target, "rows": 100000}

@app.task(name="tasks.io.send_notification")
def send_notification(user_id, message):
    import time
    time.sleep(1)
    return {"user_id": user_id, "status": "delivered"}

@app.task(name="tasks.io.sync_external_api")
def sync_external_api(endpoint, payload):
    import time
    time.sleep(2)
    return {"endpoint": endpoint, "status_code": 200}

@app.task(name="tasks.default.cleanup")
def cleanup(days_old):
    return {"deleted_records": days_old * 100}

# Start one worker per queue
celery -A tasks_routing worker -Q compute --loglevel=info --concurrency=2 -n compute@%h
celery -A tasks_routing worker -Q io --loglevel=info --concurrency=8 -n io@%h
celery -A tasks_routing worker -Q default --loglevel=info --concurrency=4 -n default@%h

# client_routing.py
from tasks_routing import ml_inference, send_notification, cleanup

ml_inference.delay("resnet50", {"image": "base64..."})
send_notification.delay("USR-001", "Your order has shipped")
cleanup.delay(30)

# Choose the queue at call time
from tasks_routing import sync_external_api
sync_external_api.apply_async(
    args=["/api/v1/sync", {"batch": 100}],
    queue="io",
    priority=5,
)
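
To make the routing rules above concrete, here is a minimal sketch of how a task name maps to a queue. It is a simplified model built on the standard library's fnmatch, not Celery's own router; `resolve_queue` and `explicit_queue` are hypothetical names used only for illustration.

```python
from fnmatch import fnmatchcase

# Same glob patterns as task_routes in celery_config_routing.py
TASK_ROUTES = {
    "tasks.compute.*": {"queue": "compute"},
    "tasks.io.*": {"queue": "io"},
    "tasks.default.*": {"queue": "default"},
}
DEFAULT_QUEUE = "default"  # plays the role of task_default_queue


def resolve_queue(task_name, explicit_queue=None):
    """Return the queue a task name would land on under the routes above."""
    if explicit_queue is not None:
        # A queue passed to apply_async(queue=...) takes precedence over task_routes
        return explicit_queue
    for pattern, route in TASK_ROUTES.items():
        if fnmatchcase(task_name, pattern):
            return route["queue"]
    return DEFAULT_QUEUE


for name in ("tasks.compute.ml_inference", "tasks.io.send_notification", "reports.weekly"):
    print(name, "->", resolve_queue(name))
```

A task whose name matches no pattern falls back to the default queue, which is why the explicit `name="tasks.compute..."` arguments in tasks_routing.py matter: the routes match on the registered task name, not on the Python module path.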

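Pattern 3: Retries with exponential backoff

Retry failed tasks automatically, use exponential backoff to avoid a retry storm, and cap the number of retries as a backstop.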

# tasks_retry.py
from celery import Celery
from celery.utils.log import get_task_logger
import random
import time

logger = get_task_logger(__name__)

app = Celery("retry_app")
app.config_from_object("celery_config")

class ExternalAPIError(Exception):
    pass

class RateLimitError(Exception):
    pass

@app.task(
    bind=True,
    max_retries=5,
    autoretry_for=(ExternalAPIError,),
    retry_backoff=True,      # delays grow as 1s, 2s, 4s, ... between attempts
    retry_backoff_max=600,   # never wait longer than 10 minutes
    retry_jitter=True,       # randomize each delay to spread retries out
)
def call_payment_api(self, order_id, amount):
    # autoretry_for performs the retry. Catching the error here and calling
    # self.retry() would bypass retry_backoff and use the fixed
    # default_retry_delay instead.
    success = random.random() > 0.7
    if not success:
        logger.warning(f"Payment failed for {order_id}, attempt {self.request.retries + 1}/{self.max_retries + 1}")
        raise ExternalAPIError(f"Payment API timeout for order {order_id}")
    return {"order_id": order_id, "amount": amount, "status": "paid"}

@app.task(bind=True, max_retries=3)
def call_rate_limited_api(self, endpoint):
    # Manual retry: the countdown is computed here, so autoretry_for and
    # retry_backoff are not set on this task.
    try:
        if random.random() > 0.5:
            raise RateLimitError(f"Rate limited on {endpoint}")
        return {"endpoint": endpoint, "data": "success"}
    except RateLimitError as exc:
        # Exponential delay capped at 120s, plus up to 1s of jitter
        countdown = min(120, 2 ** self.request.retries) + random.uniform(0, 1)
        logger.info(f"Rate limited, retrying in {countdown:.1f}s (attempt {self.request.retries + 1})")
        raise self.retry(exc=exc, countdown=countdown)

@app.task(bind=True, max_retries=4)
def process_with_custom_retry(self, data):
    try:
        result = _do_risky_operation(data)
        return result
    except Exception as exc:
        if self.request.retries >= self.max_retries:
            logger.error(f"Max retries exceeded for {data}, sending to DLQ")
            send_to_dlq.delay(str(data), str(exc), self.request.retries)
            return {"status": "failed", "data": data, "error": str(exc)}
        countdown = min(60, 5 * (2 ** self.request.retries))
        logger.warning(f"Processing failed, retry in {countdown}s")
        raise self.retry(exc=exc, countdown=countdown)

def _do_risky_operation(data):
    if random.random() > 0.4:
        raise ValueError(f"Random failure processing {data}")
    return {"processed": data, "result": "ok"}

@app.task
def send_to_dlq(task_data, error, retry_count):
    logger.error(f"DLQ: task={task_data}, error={error}, retries={retry_count}")
    return {"dlq": True, "task": task_data, "error": error}

# client_retry.py
from tasks_retry import call_payment_api, call_rate_limited_api, process_with_custom_retry

result = call_payment_api.delay("ORD-001", 99.99)
print(f"Payment task: {result.id}")

result2 = call_rate_limited_api.delay("/api/v2/users")
print(f"Rate limited task: {result2.id}")

result3 = process_with_custom_retry.delay({"key": "value"})
print(f"Custom retry task: {result3.id}")
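
The delays produced by the backoff options can be worked out by hand. The sketch below reflects my understanding of how `retry_backoff` (the factor), `retry_backoff_max` (the cap), and `retry_jitter` combine; `backoff_delay` is a hypothetical helper written for illustration, not Celery's own code.

```python
import random


def backoff_delay(retries, factor=1, maximum=600, jitter=False, rng=random):
    """Seconds to wait before retry number `retries` (0-based)."""
    # Exponential growth, capped at `maximum`
    delay = min(maximum, factor * (2 ** retries))
    if jitter:
        # Full jitter: pick a whole number of seconds between 0 and the delay
        delay = rng.randrange(delay + 1)
    return delay


# retry_backoff=True behaves like a factor of 1
print([backoff_delay(n) for n in range(6)])
# retry_backoff=2 with retry_backoff_max=120
print([backoff_delay(n, factor=2, maximum=120) for n in range(8)])
# With jitter the value is random but never exceeds the capped delay
print(backoff_delay(5, jitter=True))
```

Jitter matters most when many tasks fail at the same moment: without it they would all retry in lockstep and hit the recovering service together.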

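Pattern 4: Canvas workflows (Chain, Chord, Group)

Orchestrate complex workflows declaratively: sequential chains, parallel groups, and parallel work followed by aggregation.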

# tasks_canvas.py
from celery import Celery, chain, group, chord
from celery import signature

app = Celery("canvas_app")
app.config_from_object("celery_config")

@app.task
def download_url(url):
    import time
    time.sleep(1)
    return {"url": url, "content": f"content_of_{url.split('/')[-1]}"}

@app.task
def parse_content(download_result):
    content = download_result["content"]
    return {"parsed": True, "items": len(content), "source": download_result["url"]}

@app.task
def store_results(parse_result):
    return {"stored": True, "items": parse_result["items"], "source": parse_result["source"]}

@app.task
def aggregate_results(results):
    total_items = sum(r["items"] for r in results)
    return {"total_items": total_items, "sources": len(results), "aggregated": True}

@app.task
def send_report(aggregate_result):
    return {
        "report_sent": True,
        "total_items": aggregate_result["total_items"],
        "sources": aggregate_result["sources"],
    }

# Chain: sequential workflow
def run_chain():
    workflow = chain(
        download_url.s("https://api.example.com/data"),
        parse_content.s(),
        store_results.s(),
    )
    result = workflow.apply_async()
    return result.id

# Group: parallel workflow
def run_group():
    urls = [
        "https://api.example.com/users",
        "https://api.example.com/orders",
        "https://api.example.com/products",
    ]
    workflow = group(download_url.s(url) for url in urls)
    result = workflow.apply_async()
    return result.id

# Chord: parallel execution + aggregation
def run_chord():
    urls = [
        "https://api.example.com/users",
        "https://api.example.com/orders",
        "https://api.example.com/products",
    ]
    workflow = chord(
        [chain(download_url.s(url), parse_content.s()) for url in urls],
        aggregate_results.s(),
    )
    result = workflow.apply_async()
    return result.id

# Complex nested workflow (a group followed by a task in a chain is upgraded to a chord)
def run_complex_workflow():
    header = group(
        chain(download_url.s(f"https://api.example.com/batch/{i}"), parse_content.s())
        for i in range(1, 4)
    )
    workflow = chain(
        header,
        aggregate_results.s(),
        send_report.s(),
    )
    result = workflow.apply_async()
    return result.id

# client_canvas.py
from tasks_canvas import run_chain, run_group, run_chord, run_complex_workflow
from tasks_canvas import download_url, parse_content, store_results, aggregate_results
from celery import chain, group, chord

# Option 1: use the wrapper functions
chain_id = run_chain()
print(f"Chain task: {chain_id}")

group_id = run_group()
print(f"Group task: {group_id}")

chord_id = run_chord()
print(f"Chord task: {chord_id}")

# Option 2: use the Canvas primitives directly
from tasks_canvas import app

result = chain(
    download_url.s("https://api.example.com/report"),
    parse_content.s(),
    store_results.s(),
).apply_async()

print(f"Direct chain: {result.id}")

# Build a signature first and compose it later
sig = download_url.s("https://api.example.com/dynamic")
result = chain(sig, parse_content.s(), store_results.s()).apply_async()
print(f"Signature chain: {result.id}")
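
The data flow through chain and chord can be traced without a broker. The following is a plain-Python model of the semantics only (no Celery, no parallelism); `run_chain_locally` and `run_chord_locally` are hypothetical helpers, and the task bodies are copied from tasks_canvas.py with the sleep removed.

```python
from functools import reduce


def download_url(url):
    return {"url": url, "content": f"content_of_{url.split('/')[-1]}"}


def parse_content(download_result):
    content = download_result["content"]
    return {"parsed": True, "items": len(content), "source": download_result["url"]}


def aggregate_results(results):
    total_items = sum(r["items"] for r in results)
    return {"total_items": total_items, "sources": len(results), "aggregated": True}


def run_chain_locally(first_arg, *steps):
    """chain: each step receives the previous step's return value."""
    return reduce(lambda value, step: step(value), steps, first_arg)


def run_chord_locally(header_inputs, header_steps, callback):
    """chord: run the header once per input, then hand the list of results to the callback."""
    header_results = [run_chain_locally(arg, *header_steps) for arg in header_inputs]
    return callback(header_results)


urls = [
    "https://api.example.com/users",
    "https://api.example.com/orders",
    "https://api.example.com/products",
]
summary = run_chord_locally(urls, [download_url, parse_content], aggregate_results)
print(summary)
```

The point to take away is the argument shape: every task in a chain gets a single value from its predecessor, while the chord callback gets a list with one entry per header task, in header order.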

Pattern 5: Flower monitoring and Prometheus integration

Monitor task state and worker health in real time, and integrate Prometheus for alerting.

# celery_config_monitoring.py
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"
task_serializer = "json"
result_serializer = "json"

task_send_sent_event = True
task_track_started = True
worker_send_task_events = True

task_routes = {
    "monitoring_tasks.*": {"queue": "monitoring"},
}

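# Flower does not read its options from the Celery config module. Pass them
# on the command line or put them in a separate flowerconfig.py, for example:
#   port = 5555
#   basic_auth = ["admin:secret123"]
#   persistent = True
#   db = "flower_db"
#   max_tasks = 10000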

# monitoring_tasks.py
from celery import Celery
from celery.utils.log import get_task_logger
import time
import random

logger = get_task_logger(__name__)

app = Celery("monitoring_app")
app.config_from_object("celery_config_monitoring")

@app.task(name="monitoring_tasks.process_order", bind=True, track_started=True)
def process_order(self, order_id):
    self.update_state(state="PROCESSING", meta={"step": "validating", "order_id": order_id})
    time.sleep(1)

    self.update_state(state="PROCESSING", meta={"step": "charging", "order_id": order_id})
    time.sleep(2)

    self.update_state(state="PROCESSING", meta={"step": "fulfilling", "order_id": order_id})
    time.sleep(1)

    return {"order_id": order_id, "status": "completed", "amount": random.randint(100, 9999)}

@app.task(name="monitoring_tasks.generate_report", bind=True, track_started=True, time_limit=300)
def generate_report(self, report_type, date_range):
    self.update_state(state="GENERATING", meta={"progress": 0, "type": report_type})
    for i in range(1, 6):
        time.sleep(1)
        self.update_state(state="GENERATING", meta={"progress": i * 20, "type": report_type})

    return {"report_type": report_type, "date_range": date_range, "rows": random.randint(1000, 50000)}

# prometheus_exporter.py - custom Prometheus metrics
from prometheus_client import Counter, Histogram, Gauge, start_http_server

TASK_STARTED = Counter(
    "celery_task_started_total",
    "Total number of Celery tasks started",
    ["task_name", "queue"],
)

TASK_COMPLETED = Counter(
    "celery_task_completed_total",
    "Total number of Celery tasks completed",
    ["task_name", "queue", "status"],
)

TASK_DURATION = Histogram(
    "celery_task_duration_seconds",
    "Celery task duration in seconds",
    ["task_name", "queue"],
    buckets=[0.1, 0.5, 1, 2, 5, 10, 30, 60, 120, 300],
)

QUEUE_DEPTH = Gauge(
    "celery_queue_depth",
    "Number of tasks waiting in queue",
    ["queue_name"],
)

ACTIVE_WORKERS = Gauge(
    "celery_active_workers",
    "Number of active Celery workers",
)

def start_metrics_server(port=9090):
    start_http_server(port)
    print(f"Prometheus metrics server started on :{port}")

# celery_events_monitor.py - event listener
from celery import Celery
from prometheus_exporter import TASK_STARTED, TASK_COMPLETED, TASK_DURATION

app = Celery("monitoring_app")
app.config_from_object("celery_config_monitoring")

# task-started/succeeded/failed/retried events carry only the task uuid, not
# its name, so keep an in-memory State fed by every event and look the task up.
state = app.events.State()

def task_labels(event):
    state.event(event)
    task = state.tasks.get(event["uuid"])
    name = getattr(task, "name", None) or "unknown"
    # The routing key is used as the queue label here (an approximation)
    queue = getattr(task, "routing_key", None) or "unknown"
    return name, queue

def monitor_events():
    with app.connection() as connection:
        recv = app.events.Receiver(
            connection,
            handlers={
                "task-sent": on_task_sent,
                "task-started": on_task_started,
                "task-succeeded": on_task_succeeded,
                "task-failed": on_task_failed,
                "task-retried": on_task_retried,
                "worker-heartbeat": on_worker_heartbeat,
            },
        )
        recv.capture(limit=None, timeout=None, wakeup=True)

def on_task_sent(event):
    # Records the task name and routing key for later events
    state.event(event)

def on_task_started(event):
    task_name, queue = task_labels(event)
    TASK_STARTED.labels(task_name=task_name, queue=queue).inc()

def on_task_succeeded(event):
    task_name, queue = task_labels(event)
    runtime = event.get("runtime", 0)
    TASK_COMPLETED.labels(task_name=task_name, queue=queue, status="success").inc()
    TASK_DURATION.labels(task_name=task_name, queue=queue).observe(runtime)

def on_task_failed(event):
    task_name, queue = task_labels(event)
    TASK_COMPLETED.labels(task_name=task_name, queue=queue, status="failed").inc()

def on_task_retried(event):
    task_name, queue = task_labels(event)
    TASK_COMPLETED.labels(task_name=task_name, queue=queue, status="retried").inc()

def on_worker_heartbeat(event):
    state.event(event)

if __name__ == "__main__":
    from prometheus_exporter import start_metrics_server
    start_metrics_server(9090)
    monitor_events()

# Start Flower
celery -A monitoring_tasks flower --port=5555 --basic-auth=admin:secret123

# Start the event listener and the Prometheus endpoint
python celery_events_monitor.py
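
The QUEUE_DEPTH gauge defined in prometheus_exporter.py is never updated by the listener, because queue depth is not an event. A minimal sketch of one way to fill it, assuming the Redis broker: pending messages for a queue sit in a Redis list named after the queue, so LLEN gives the backlog. `collect_queue_depths`, `publish_queue_depths`, and `FakeRedis` are hypothetical names; in real use the client would be a `redis.Redis` pointed at the broker database. Messages already prefetched by workers are not counted, and queues that use priorities are spread over additional keys.

```python
def collect_queue_depths(redis_client, queue_names):
    """Return {queue_name: pending_message_count} using LLEN on each queue's list."""
    return {name: int(redis_client.llen(name)) for name in queue_names}


def publish_queue_depths(depths, gauge):
    """Copy the depths into a labelled gauge such as QUEUE_DEPTH."""
    for name, depth in depths.items():
        gauge.labels(queue_name=name).set(depth)


class FakeRedis:
    """Stand-in for redis.Redis so the sketch runs without a server."""

    def __init__(self, lists):
        self._lists = lists

    def llen(self, key):
        # A missing key has length 0, as in Redis
        return len(self._lists.get(key, []))


fake = FakeRedis({"default": ["m1", "m2", "m3"], "io": ["m4"]})
depths = collect_queue_depths(fake, ["default", "compute", "io"])
print(depths)
```

Calling these two functions on a timer (every 10 to 15 seconds, for example) from the same process that runs `start_metrics_server` would expose the backlog alongside the task counters.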

Pattern 6: Production-grade Celery service (Docker + Kubernetes)

Multi-stage Docker build, K8s Deployments, and HPA autoscaling.

# Dockerfile
FROM python:3.12-slim AS builder

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt

FROM python:3.12-slim

WORKDIR /app
COPY --from=builder /install /usr/local
COPY . .

ENV PYTHONUNBUFFERED=1
ENV CELERY_BROKER_URL=redis://redis:6379/0
ENV CELERY_RESULT_BACKEND=redis://redis:6379/1

CMD ["celery", "-A", "tasks", "worker", "--loglevel=info"]

# docker-compose.yml
version: "3.8"

services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    # noeviction: an LRU policy could evict queued task messages under memory pressure
    command: redis-server --appendonly yes --maxmemory 512mb --maxmemory-policy noeviction
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5

  worker-default:
    build: .
    command: celery -A tasks worker -Q default --loglevel=info --concurrency=4 -n default@%h
    depends_on:
      redis:
        condition: service_healthy
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    deploy:
      replicas: 2
      resources:
        limits:
          memory: 512M
          cpus: "1.0"
        reservations:
          memory: 256M
          cpus: "0.5"
    restart: unless-stopped

  worker-compute:
    build: .
    command: celery -A tasks worker -Q compute --loglevel=info --concurrency=2 -n compute@%h
    depends_on:
      redis:
        condition: service_healthy
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    deploy:
      replicas: 1
      resources:
        limits:
          memory: 2G
          cpus: "2.0"
        reservations:
          memory: 1G
          cpus: "1.0"
    restart: unless-stopped

  worker-io:
    build: .
    command: celery -A tasks worker -Q io --loglevel=info --concurrency=16 -n io@%h
    depends_on:
      redis:
        condition: service_healthy
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    deploy:
      replicas: 3
      resources:
        limits:
          memory: 512M
          cpus: "1.0"
    restart: unless-stopped

  flower:
    build: .
    command: celery -A tasks flower --port=5555 --basic-auth=admin:secret123
    depends_on:
      redis:
        condition: service_healthy
    ports:
      - "5555:5555"
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    restart: unless-stopped

  beat:
    build: .
    command: celery -A tasks beat --loglevel=info
    depends_on:
      redis:
        condition: service_healthy
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    restart: unless-stopped

volumes:
  redis_data:

# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-worker-default
  labels:
    app: celery-worker
    queue: default
spec:
  replicas: 3
  selector:
    matchLabels:
      app: celery-worker
      queue: default
  template:
    metadata:
      labels:
        app: celery-worker
        queue: default
    spec:
      containers:
        - name: worker
          image: myregistry/celery-app:latest
          command: ["celery", "-A", "tasks", "worker", "-Q", "default", "--loglevel=info", "--concurrency=4"]
          env:
            - name: CELERY_BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: broker-url
            - name: CELERY_RESULT_BACKEND
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: result-backend
          resources:
            requests:
              memory: "256Mi"
              cpu: "500m"
            limits:
              memory: "512Mi"
              cpu: "1000m"
          livenessProbe:
            exec:
              # Ping only this pod's worker; without -d the command pings every worker
              command: ["sh", "-c", "celery -A tasks inspect ping -d celery@$HOSTNAME"]
            initialDelaySeconds: 30
            periodSeconds: 60
            timeoutSeconds: 10
          readinessProbe:
            exec:
              command: ["sh", "-c", "celery -A tasks inspect ping -d celery@$HOSTNAME"]
            initialDelaySeconds: 10
            periodSeconds: 30
            timeoutSeconds: 10
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: celery-worker-default-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: celery-worker-default
  minReplicas: 2
  maxReplicas: 20
  metrics:
    - type: External
      external:
        metric:
          name: celery_queue_depth
          selector:
            matchLabels:
              queue_name: default
        target:
          type: AverageValue
          averageValue: "100"
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-beat
  labels:
    app: celery-beat
spec:
  replicas: 1
  selector:
    matchLabels:
      app: celery-beat
  template:
    metadata:
      labels:
        app: celery-beat
    spec:
      containers:
        - name: beat
          image: myregistry/celery-app:latest
          command: ["celery", "-A", "tasks", "beat", "--loglevel=info"]
          env:
            - name: CELERY_BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: broker-url
            - name: CELERY_RESULT_BACKEND
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: result-backend
          resources:
            requests:
              memory: "128Mi"
              cpu: "100m"
            limits:
              memory: "256Mi"
              cpu: "250m"

# k8s/flower-service.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: celery-flower
spec:
  replicas: 1
  selector:
    matchLabels:
      app: celery-flower
  template:
    metadata:
      labels:
        app: celery-flower
    spec:
      containers:
        - name: flower
          image: myregistry/celery-app:latest
          command: ["celery", "-A", "tasks", "flower", "--port=5555"]
          ports:
            - containerPort: 5555
          env:
            - name: CELERY_BROKER_URL
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: broker-url
            - name: CELERY_RESULT_BACKEND
              valueFrom:
                secretKeyRef:
                  name: celery-secrets
                  key: result-backend
---
apiVersion: v1
kind: Service
metadata:
  name: celery-flower
spec:
  selector:
    app: celery-flower
  ports:
    - port: 5555
      targetPort: 5555
  type: ClusterIP
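
To get a feel for how the HPA above reacts, the replica count can be estimated by hand. This is a rough sketch of the documented scaling rule (each metric proposes a replica count and the largest wins), ignoring the tolerance band and stabilization windows; `desired_replicas` is a hypothetical helper, with defaults taken from the manifest (target of 100 queued tasks per pod, 70% CPU, 2 to 20 replicas). The external `celery_queue_depth` metric also assumes a metrics adapter is installed in the cluster.

```python
import math


def desired_replicas(queue_depth, current_replicas, cpu_utilization,
                     target_per_pod=100, target_cpu=70,
                     min_replicas=2, max_replicas=20):
    """Estimate the replica count the HPA would settle on."""
    # External metric with an AverageValue target: total backlog / target per pod
    by_queue = math.ceil(queue_depth / target_per_pod)
    # Resource metric: scale the current count by observed / target utilization
    by_cpu = math.ceil(current_replicas * cpu_utilization / target_cpu)
    # With several metrics, the largest proposal wins
    wanted = max(by_queue, by_cpu)
    return max(min_replicas, min(max_replicas, wanted))


print(desired_replicas(queue_depth=950, current_replicas=3, cpu_utilization=35))
print(desired_replicas(queue_depth=50, current_replicas=3, cpu_utilization=35))
print(desired_replicas(queue_depth=5000, current_replicas=10, cpu_utilization=60))
```

Because the queue metric is an absolute backlog, a burst of 5000 tasks asks for far more than `maxReplicas`; the cap is what keeps the worker pool from overwhelming the broker or downstream services.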

Pitfall guide: 5 common traps

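Pitfall 1: Using a global database connection inside tasks

Wrong

import psycopg2

# Created once at import time, then shared by every forked worker process
conn = psycopg2.connect("postgresql://localhost/mydb")

@app.task
def save_user(user_data):
    cursor = conn.cursor()
    cursor.execute("INSERT INTO users (data) VALUES (%s)", (user_data,))
    conn.commit()

Right

import psycopg2

@app.task
def save_user(user_data):
    # Open the connection inside the task so each worker process uses its own
    conn = psycopg2.connect("postgresql://localhost/mydb")
    try:
        with conn, conn.cursor() as cursor:  # "with conn" commits or rolls back
            cursor.execute("INSERT INTO users (data) VALUES (%s)", (user_data,))
    finally:
        conn.close()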

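Pitfall 2: Ignoring timeouts, so a worker blocks forever

Wrong

@app.task
def fetch_external_data(url):
    import requests
    response = requests.get(url)
    return response.json()

Right

@app.task(
    bind=True,
    max_retries=3,
    time_limit=300,       # hard limit: the worker process is killed
    soft_time_limit=270,  # soft limit: SoftTimeLimitExceeded is raised in the task
)
def fetch_external_data(self, url):
    import requests
    try:
        response = requests.get(url, timeout=60)
        return response.json()
    except requests.Timeout as exc:
        # Retry through Celery instead of raising an undefined exception type
        raise self.retry(exc=exc, countdown=30)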

Pitfall 3: Passing non-serializable objects through Canvas

Wrong

@app.task
def process_file(file_obj):
    return file_obj.read()

workflow = chain(process_file.s(open("data.csv")), store_result.s())

Right

@app.task
def process_file(file_path):
    with open(file_path, "r") as f:
        return f.read()

workflow = chain(process_file.s("/data/data.csv"), store_result.s())
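
A quick pre-flight check can catch this before the message ever reaches the broker. The sketch below uses the standard library's json module as an approximation of the "json" task serializer (Celery's own serializer accepts a few extra types such as datetime, so this check is slightly stricter); `non_serializable_args` is a hypothetical helper.

```python
import io
import json


def non_serializable_args(args=(), kwargs=None):
    """Return the positions/names of arguments that json.dumps rejects."""
    bad = []
    for index, value in enumerate(args):
        try:
            json.dumps(value)
        except (TypeError, ValueError):
            bad.append(index)
    for name, value in (kwargs or {}).items():
        try:
            json.dumps(value)
        except (TypeError, ValueError):
            bad.append(name)
    return bad


# A file-like object cannot travel in a task message; a path string can
print(non_serializable_args(args=["/data/data.csv", io.StringIO("a,b\n1,2\n")]))
print(non_serializable_args(args=["/data/data.csv"], kwargs={"sizes": [64, 128]}))
```

Running such a check in unit tests for the code that calls `delay()` or builds signatures is usually enough; it does not need to run on every call in production.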

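Pitfall 4: Running several Beat instances without a distributed lock, so tasks run twice

Wrong

# Several Beat instances run at once, so every scheduled task is sent multiple times
from celery.schedules import crontab

beat_schedule = {
    "cleanup-every-night": {
        "task": "tasks.cleanup",
        "schedule": crontab(hour=2, minute=0),
    },
}

Right

from celery.schedules import crontab

# RedBeat (pip install celery-redbeat) keeps the schedule in Redis and holds a
# lock so that only one Beat instance is active at a time
beat_scheduler = "redbeat.RedBeatScheduler"
redbeat_redis_url = "redis://localhost:6379/2"

beat_schedule = {
    "cleanup-every-night": {
        "task": "tasks.cleanup",
        "schedule": crontab(hour=2, minute=0),
        "options": {"queue": "default"},
    },
}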

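Pitfall 5: Using the fork-based worker pool without handling inherited resources

Wrong

import redis

# Created in the parent process and inherited by every forked child
redis_client = redis.Redis()

@app.task
def update_cache(key, value):
    redis_client.set(key, value)

Right

@app.task
def update_cache(key, value):
    import redis
    redis_client = redis.Redis()
    redis_client.set(key, value)

# Or create one client per worker process after the fork
import redis
from celery.signals import worker_process_init

redis_client = None

@worker_process_init.connect
def init_redis_client(**kwargs):
    global redis_client
    redis_client = redis.Redis()

@app.task
def update_cache(key, value):
    redis_client.set(key, value)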

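Troubleshooting table

Symptom | Likely cause | How to check | Fix
No log output after the worker starts | Broker connection failed | Check the format of CELERY_BROKER_URL and network connectivity | Make sure Redis/RabbitMQ is running and the URL is correct
Task stays PENDING and never runs | No worker consumes the target queue | celery -A tasks inspect active_queues | Start a worker for that queue or check the task_routes setting
kombu.exceptions.EncodeError | Task arguments contain a non-serializable object | Check the arguments passed to delay()/apply_async() | Pass only JSON-serializable basic types
WorkerLostError | Worker process killed by the OOM killer | Check the kernel log: dmesg output filtered with grep -i oom | Raise the worker's memory limit or lower concurrency
TimeLimitExceeded | Task ran longer than time_limit | Check the task for infinite loops or blocking I/O | Set reasonable time_limit and soft_time_limit values
Task result is None | result_backend not configured | Check the CELERY_RESULT_BACKEND setting | Configure Redis or a database as the result backend
Chord callback never runs | A header task failed or never finished, so the chord never completes | Check the state of the header tasks in Flower | Make header tasks raise on failure so the error is visible, and attach an errback to the callback with on_error()
ContentDisallowed | Message arrived in a serialization format the worker does not accept | Compare accept_content on the worker with the sender's serializer | Set accept_content = ["json"] consistently
Beat tasks do not run on time | Wrong time zone configuration | Check the timezone and enable_utc settings | Use one consistent time zone configuration; enable_utc=True is recommended
Worker disconnects and reconnects frequently | Broker connection pool exhausted or network jitter | Check broker_pool_limit and connection timeouts | Increase broker_pool_limit and set broker_heartbeat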

Advanced optimizations

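1. Task deduplication and idempotency

In production a task can be delivered more than once (network jitter, worker restarts), so tasks must be idempotent.

from celery import Celery
from contextlib import contextmanager

app = Celery("idempotent_app")
app.config_from_object("celery_config")

@contextmanager
def idempotent_lock(task_id, redis_client, ttl=3600):
    # SET NX succeeds only for the first caller; a redelivery of the same task id sees the lock
    lock_key = f"celery:idempotent:{task_id}"
    acquired = redis_client.set(lock_key, "1", nx=True, ex=ttl)
    try:
        yield bool(acquired)
    except Exception:
        # Release the lock on failure so a retry of the same task id can run again
        if acquired:
            redis_client.delete(lock_key)
        raise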

@app.task(bind=True)
def process_order_idempotent(self, order_id, amount):
    import redis
    r = redis.Redis(host="localhost", port=6379, db=3)

    task_key = f"order:processed:{order_id}"
    existing = r.get(task_key)
    if existing:
        return {"status": "duplicate", "order_id": order_id, "cached_result": existing.decode()}

    with idempotent_lock(self.request.id, r) as acquired:
        if not acquired:
            return {"status": "duplicate", "order_id": order_id}

        result = _do_process_order(order_id, amount)
        r.setex(task_key, 86400, str(result))
        return result

def _do_process_order(order_id, amount):
    return {"status": "processed", "order_id": order_id, "amount": amount}
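
The lock above is keyed on the task id, which covers redelivery of the same message but not a caller that submits the same order twice (each submission gets a new task id). One possible complement, sketched here under that assumption, is a content-based key derived from the task name and its arguments; `idempotency_key` is a hypothetical helper and the key prefix simply mirrors the one used above.

```python
import hashlib
import json


def idempotency_key(task_name, args=(), kwargs=None):
    """Derive a stable key from the task name and its JSON-serializable arguments."""
    payload = json.dumps(
        {"task": task_name, "args": list(args), "kwargs": kwargs or {}},
        sort_keys=True,             # keyword order must not change the key
        separators=(",", ":"),      # fixed separators keep the encoding canonical
    )
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    return f"celery:idempotent:{digest}"


first = idempotency_key("process_order", kwargs={"order_id": "ORD-001", "amount": 99.99})
second = idempotency_key("process_order", kwargs={"amount": 99.99, "order_id": "ORD-001"})
other = idempotency_key("process_order", kwargs={"order_id": "ORD-002", "amount": 99.99})
print(first == second, first == other)
```

Whether the amount belongs in the key is a business decision: including it treats "same order, different amount" as a new request, while keying on the order id alone rejects it as a duplicate.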

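2. Dynamic task priority and rate limiting

Run high-priority tasks first, and rate-limit tasks to keep downstream services from being overloaded.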

from celery import Celery

app = Celery("priority_app")
app.config_from_object("celery_config")

app.conf.broker_transport_options = {
    "priority_steps": list(range(10)),
    "sep": ":",
    "queue_order_strategy": "priority",
}

@app.task(rate_limit="10/s")
def call_external_api(endpoint, payload):
    import time
    time.sleep(0.1)
    return {"endpoint": endpoint, "status": "ok"}

# High-priority task (with the Redis transport, 0 is the highest priority)
def submit_high_priority():
    call_external_api.apply_async(
        args=["/api/v1/critical", {"urgent": True}],
        priority=0,
        queue="io",
    )

# Low-priority task
def submit_low_priority():
    call_external_api.apply_async(
        args=["/api/v1/batch", {"batch": True}],
        priority=9,
        queue="io",
    )

# Change the rate limit at runtime
from celery import current_app

def adjust_rate_limit(task_name, new_rate):
    current_app.control.rate_limit(task_name, new_rate)
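
A setting such as `rate_limit="10/s"` is enforced with a token bucket, and it applies per worker instance rather than across the whole cluster. The sketch below is a generic token bucket written for illustration, not Celery's implementation; the class, its `capacity` parameter, and the injectable `clock` are assumptions made so the example is deterministic.

```python
import time


class TokenBucket:
    """Allow `rate` operations per second, with bursts of up to `capacity`."""

    def __init__(self, rate, capacity=1, clock=None):
        self.rate = float(rate)
        self.capacity = float(capacity)
        self.tokens = self.capacity          # the bucket starts full
        self.clock = clock or time.monotonic
        self.updated = self.clock()

    def _refill(self):
        now = self.clock()
        # Add tokens for the elapsed time, never exceeding the capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now

    def try_consume(self, tokens=1):
        """Take tokens if available; return False when the caller should wait."""
        self._refill()
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False


# A fake clock makes the behaviour reproducible
now = [0.0]
bucket = TokenBucket(rate=10, capacity=1, clock=lambda: now[0])
print(bucket.try_consume())   # the bucket starts with one token
print(bucket.try_consume())   # same instant, nothing has been refilled
now[0] = 0.25                 # a quarter of a second later
print(bucket.try_consume())
```

Since the limit is per worker, ten workers with `10/s` can together send roughly 100 calls per second downstream; a cluster-wide limit needs either a dedicated queue with a fixed number of workers or a shared limiter.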

3. Integrating Celery with FastAPI

Wire Celery into a FastAPI application: the API submits asynchronous tasks and reports their status.

# fastapi_celery.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from celery.result import AsyncResult

app = FastAPI(title="Celery API Gateway")

class TaskSubmit(BaseModel):
    task_name: str
    args: list = []
    kwargs: dict = {}

class TaskResponse(BaseModel):
    task_id: str
    status: str

@app.post("/tasks/submit", response_model=TaskResponse)
def submit_task(submit: TaskSubmit):
    from tasks import app as celery_app
    try:
        result = celery_app.send_task(
            submit.task_name,
            args=submit.args,
            kwargs=submit.kwargs,
        )
        return TaskResponse(task_id=result.id, status="PENDING")
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

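@app.get("/tasks/{task_id}", response_model=dict)
def get_task_status(task_id: str):
    from tasks import app as celery_app
    # Bind the result to the configured Celery app so its result backend is used
    result = AsyncResult(task_id, app=celery_app)
    response = {
        "task_id": task_id,
        "status": result.status,
        # On failure result.result is an exception object, which is not JSON-serializable
        "result": result.result if result.successful() else None,
    }
    if result.failed():
        response["error"] = str(result.result)
    return response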

@app.post("/tasks/{task_id}/revoke")
def revoke_task(task_id: str):
    from tasks import app as celery_app
    celery_app.control.revoke(task_id, terminate=True)
    return {"task_id": task_id, "status": "REVOKED"}

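Framework comparison

Feature | Celery | Dramatiq | Huey | RQ | Temporal
Language | Python | Python | Python | Python | Go server, multi-language SDKs
Broker | Redis/RabbitMQ/SQS, etc. | Redis/RabbitMQ | Redis/SQLite | Redis | Built in (own server)
Workflow orchestration | Canvas (chain/chord/group) | Pipeline | Simple pipelines | Job dependencies only | Workflows as code
Retry strategy | Exponential backoff + jitter + custom | Exponential backoff + jitter | Simple retries | Simple retries | Full retry policies
Monitoring | Flower | Separate dramatiq_dashboard package | Basic | RQ Dashboard | Web UI
Periodic tasks | Beat | External scheduler (e.g. APScheduler) | Built in | Delayed jobs built in; periodic jobs via rq-scheduler | Native Schedules
Community and ecosystem | Most mature | Medium | Niche | Niche | Growing fast
Learning curve | Moderate | | | |
Multi-language support | | | | |
Production maturity | | | | |
Best suited for | General-purpose distributed tasks | Small to mid-sized projects | Lightweight tasks | Simple tasks in a Redis stack | Complex cross-language workflows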

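Summary

Celery is one of the most mature and flexible options for distributed task queues in Python. Remember three core principles. Task idempotency is the baseline: duplicate delivery must be safe. Queue isolation is standard practice: keep CPU-bound, I/O-bound, and default queues apart. Monitoring and alerting are not optional: Flower plus Prometheus are the eyes of a production deployment. From a single-machine celery worker to HPA autoscaling on K8s, the deployment can grow with the workload without rewriting the task code.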

