Python AI Data Pipelines in Practice: 5 Production Patterns from ETL to Feature Store


Collapsing data quality, runaway ETL scripts, and features that take a week to ship

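It is 3 a.m. and a model training job has crashed because an upstream field was renamed from user_id to userId; debugging runs until dawn. Feature engineering scripts are scattered across 10 Jupyter Notebooks, and shipping one new feature means waiting a week for a slot on the data engineers' schedule. The RAG system's vector data is never in sync with the source documents, so retrieval returns irrelevant results. By 2026, a Python AI data pipeline has gone from "nice to have" to "must have", and the tooling has matured: Airflow 3.0 is reported to add native async def support, Pydantic v2 validates data roughly 5-50x faster than v1 by the Pydantic team's own figures, and Polars can be 10x or more faster than Pandas on feature engineering workloads, depending on data size and query shape.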

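Starting from 5 production patterns, this article walks through the full chain: Airflow ETL orchestration → Pydantic + GE data validation → Pandas/Polars feature engineering → RAG vector data pipeline → Prometheus production monitoring, with complete Python code at every step.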


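Key Takeaways

  • Learn the DAG orchestration pattern in Apache Airflow 3.0 to manage ETL task dependencies and retries on failure
  • Build a data validation barrier with Pydantic v2 + Great Expectations that keeps dirty data out of the pipeline
  • Do feature engineering with a hybrid Pandas + Polars approach that balances ecosystem compatibility and compute performance
  • Build a RAG vector data pipeline covering the full path from document chunking to embedding to vector storage
  • Implement data quality monitoring with Prometheus + Grafana so that pipeline anomalies raise alerts promptly
  • Understand 5 common data pipeline pitfalls and their production-grade solutions
  • Learn advanced optimization techniques such as incremental processing, caching, and parallel execution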

Contents

  1. Architecture Overview: End-to-End Architecture of the AI Data Pipeline
  2. Pattern 1: ETL Pipeline with Apache Airflow + Python
  3. Pattern 2: Data Validation with Pydantic v2 + Great Expectations
  4. Pattern 3: Feature Engineering Pipeline with Pandas + Polars
  5. Pattern 4: Vector Data Pipeline for RAG
  6. Pattern 5: Production Monitoring with Prometheus + Data Quality Alerts
  7. 5 Common Pitfalls and Solutions
  8. 10 Common Error Troubleshooting
  9. Advanced Optimization Techniques
  10. Comparison Analysis
  11. Recommended Online Tools
  12. Summary & Further Reading

Architecture Overview

┌─────────────────────────────────────────────────────────────────┐
│                    AI Data Pipeline Architecture                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌──────────┐    ┌──────────────┐    ┌──────────────────────┐  │
│  │  Data     │    │  Airflow DAG │    │  Pydantic + GE       │  │
│  │  Sources  │───▶│  Orchestration│───▶│  Data Validation     │  │
│  │  (API/DB/ │    │  (ETL Sched) │    │  (Schema + Rules)    │  │
│  │   Files)  │    └──────────────┘    └──────────┬───────────┘  │
│  └──────────┘                                    │              │
│                                                   ▼              │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │              Feature Engineering Pipeline                 │  │
│  │  ┌─────────┐   ┌──────────┐   ┌──────────────────────┐ │  │
│  │  │ Pandas  │──▶│  Polars  │──▶│  Feature Store       │ │  │
│  │  │ (Compat)│   │ (Speed)  │   │  (Redis/Feast)       │ │  │
│  │  └─────────┘   └──────────┘   └──────────────────────┘ │  │
│  └──────────────────────────────────────────────────────────┘  │
│                                                   │              │
│                                                   ▼              │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │              Vector Data Pipeline (RAG)                   │  │
│  │  ┌──────────┐  ┌──────────┐  ┌───────────────────────┐ │  │
│  │  │ Chunking │─▶│ Embedding│─▶│  Vector Store         │ │  │
│  │  │ (Split)  │  │ (Model)  │  │  (PGVector/Chroma)    │ │  │
│  │  └──────────┘  └──────────┘  └───────────────────────┘ │  │
│  └──────────────────────────────────────────────────────────┘  │
│                                                   │              │
│                                                   ▼              │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │              Production Monitoring                        │  │
│  │  ┌────────────┐  ┌───────────┐  ┌────────────────────┐ │  │
│  │  │ Prometheus │─▶│  Grafana  │─▶│  AlertManager      │ │  │
│  │  │ (Metrics)  │  │ (Dashboard)│  │  (Notifications)   │ │  │
│  │  └────────────┘  └───────────┘  └────────────────────┘ │  │
│  └──────────────────────────────────────────────────────────┘  │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

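Pattern 1: ETL Pipeline with Apache Airflow + Python

Problem Scenario

Every day, data must be extracted from 3 databases, 2 APIs, and 5 CSV files, cleaned, and written to the data warehouse. Hand-written scripts cannot track execution status, downstream jobs never learn that an upstream job failed, and every retry is manual.

Solution

Orchestrate the ETL DAG with Apache Airflow 3.0 to get task dependencies, automatic retries, and failure alerts.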

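# dags/etl_pipeline.py
from datetime import datetime, timedelta
from airflow import DAG
# Airflow 3.x ships the core operators in the "standard" provider package;
# the older airflow.operators.* import paths are deprecated.
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.standard.operators.empty import EmptyOperator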

default_args = {
    "owner": "data-team",
    "depends_on_past": False,
    "email_on_failure": True,
    "email": ["data-team@company.com"],
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="etl_daily_pipeline",
    default_args=default_args,
    description="Daily ETL pipeline for AI data",
    schedule="0 2 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["etl", "production"],
) as dag:

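    def extract_from_postgres(**context):
        import psycopg2
        conn = psycopg2.connect(
            host="postgres-host",
            database="source_db",
            user="reader",
            password="***",
        )
        cur = conn.cursor()
        cur.execute("SELECT * FROM user_events WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'")
        # Return dicts keyed by column name so transform_data can align these rows
        # with the API and CSV records (fetchall() alone yields unnamed tuples).
        columns = [desc[0] for desc in cur.description]
        rows = [dict(zip(columns, row)) for row in cur.fetchall()]
        conn.close()
        context["ti"].xcom_push(key="postgres_rows", value=len(rows))
        return rows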

    def extract_from_api(**context):
        import requests
        resp = requests.get(
            "https://api.example.com/v2/data",
            headers={"Authorization": "Bearer ***"},
            params={"date": context["ds"]},
            timeout=30,  # fail fast instead of hanging the task on a stalled connection
        )
        resp.raise_for_status()
        data = resp.json()
        context["ti"].xcom_push(key="api_records", value=len(data))
        return data

    def extract_from_files(**context):
        import glob
        import pandas as pd
        files = glob.glob(f"/data/input/{context['ds']}/*.csv")
        frames = [pd.read_csv(f) for f in files]
        combined = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
        context["ti"].xcom_push(key="file_count", value=len(files))
        return combined.to_dict("records")

    def transform_data(**context):
        import pandas as pd
        ti = context["ti"]
        pg_rows = ti.xcom_pull(task_ids="extract_postgres", key="return_value")
        api_data = ti.xcom_pull(task_ids="extract_api", key="return_value")
        file_data = ti.xcom_pull(task_ids="extract_files", key="return_value")

        df_pg = pd.DataFrame(pg_rows)
        df_api = pd.DataFrame(api_data)
        df_files = pd.DataFrame(file_data)

        df_all = pd.concat([df_pg, df_api, df_files], ignore_index=True)
        df_all = df_all.drop_duplicates(subset=["user_id", "event_id"])
        df_all = df_all.dropna(subset=["user_id", "event_type"])
        df_all["event_date"] = pd.to_datetime(df_all["created_at"]).dt.date

        context["ti"].xcom_push(key="total_records", value=len(df_all))
        return df_all.to_dict("records")

    def load_to_warehouse(**context):
        import psycopg2
        ti = context["ti"]
        records = ti.xcom_pull(task_ids="transform", key="return_value")
        conn = psycopg2.connect(
            host="warehouse-host",
            database="analytics_db",
            user="loader",
            password="***",
        )
        cur = conn.cursor()
        cur.execute("TRUNCATE TABLE staging.user_events_daily")
        insert_sql = """
            INSERT INTO staging.user_events_daily
            (user_id, event_id, event_type, event_date, created_at)
            VALUES (%s, %s, %s, %s, %s)
        """
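        rows = [(r["user_id"], r["event_id"], r["event_type"],
                 str(r["event_date"]), r["created_at"]) for r in records]
        # Insert every record in batches of 1000. Slicing to records[:1000] would
        # silently drop the rest and trip the 95% threshold in data_quality_check.
        for i in range(0, len(rows), 1000):
            cur.executemany(insert_sql, rows[i:i + 1000])
        conn.commit()
        conn.close()
        return len(rows)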

    def data_quality_check(**context):
        ti = context["ti"]
        total = ti.xcom_pull(task_ids="transform", key="total_records")
        loaded = ti.xcom_pull(task_ids="load", key="return_value")
        # Fail when nothing was transformed or loaded, not only when the ratio is low
        if not total or (loaded or 0) / total < 0.95:
            raise ValueError(f"Data quality check failed: only {loaded}/{total} records loaded")
        return "PASSED"

    start = EmptyOperator(task_id="start")
    extract_pg = PythonOperator(task_id="extract_postgres", python_callable=extract_from_postgres)
    extract_api = PythonOperator(task_id="extract_api", python_callable=extract_from_api)
    extract_files = PythonOperator(task_id="extract_files", python_callable=extract_from_files)
    transform = PythonOperator(task_id="transform", python_callable=transform_data)
    load = PythonOperator(task_id="load", python_callable=load_to_warehouse)
    quality_check = PythonOperator(task_id="quality_check", python_callable=data_quality_check)
    end = EmptyOperator(task_id="end")

    start >> [extract_pg, extract_api, extract_files] >> transform >> load >> quality_check >> end

# Start Airflow (shell)
pip install apache-airflow==3.0.0
airflow db migrate  # "airflow db init" was removed in Airflow 3.0
airflow dags list
airflow dags trigger etl_daily_pipeline
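
The DAG above returns full record lists from every task, so each row travels through XCom and the Airflow metadata database. A common alternative is to stage the data in a file and pass only its path between tasks. The sketch below illustrates the idea with the standard library; `stage_records`, `load_staged`, and the staging directory layout are hypothetical names rather than Airflow APIs, and a real pipeline would more likely write Parquet to shared or object storage.

```python
import json
import tempfile
from pathlib import Path


def stage_records(records: list[dict], staging_dir: str, name: str) -> str:
    """Write records as JSON Lines and return the file path (small enough for XCom)."""
    path = Path(staging_dir) / f"{name}.jsonl"
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as fh:
        for record in records:
            # default=str keeps dates and timestamps serializable
            fh.write(json.dumps(record, default=str) + "\n")
    return str(path)


def load_staged(path: str) -> list[dict]:
    """Read back the records written by stage_records."""
    with open(path, encoding="utf-8") as fh:
        return [json.loads(line) for line in fh if line.strip()]


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        staged = stage_records([{"user_id": "u001", "event_id": "e1"}], tmp, "extract_postgres")
        print(load_staged(staged))
```

With this shape, an extract task would return `staged` and the transform task would call `load_staged` on the pulled path, which keeps XCom payloads at a few bytes regardless of data volume.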

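Pattern 2: Data Validation with Pydantic v2 + Great Expectations

Problem Scenario

An upstream system quietly changed a field type: age went from int to string, and the email format is no longer checked. Dirty data flows all the way into model training, and the accuracy drop is only discovered after training finishes.

Solution

Use Pydantic v2 for schema validation and Great Expectations for statistical rule validation: two lines of defense that intercept dirty data.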

# schemas.py
# EmailStr needs the optional dependency: pip install "pydantic[email]"
from pydantic import BaseModel, Field, EmailStr, field_validator
from datetime import datetime
from typing import Optional

class UserEventSchema(BaseModel):
    user_id: str = Field(..., min_length=1, max_length=64, description="Unique user identifier")
    event_id: str = Field(..., pattern=r"^evt_[a-zA-Z0-9]{16}$", description="Event ID format")
    event_type: str = Field(..., description="Event type")
    event_date: datetime = Field(..., description="Event date")
    age: int = Field(..., ge=0, le=150, description="User age")
    email: EmailStr = Field(..., description="User email")
    amount: Optional[float] = Field(None, ge=0, description="Amount")

    @field_validator("event_type")
    @classmethod
    def validate_event_type(cls, v):
        allowed = {"click", "purchase", "signup", "logout", "view"}
        if v not in allowed:
            raise ValueError(f"event_type must be one of {allowed}, got {v}")
        return v

    model_config = {"extra": "forbid"}

class BatchValidationResult(BaseModel):
    total: int
    valid: int
    invalid: int
    errors: list[dict]
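
# validation_pipeline.py
# NOTE: PandasDataset belongs to the legacy Great Expectations API, which was dropped
# in later releases (0.18+); run this file with an older version,
# e.g. pip install "great_expectations<0.18".
from pydantic import ValidationError
from schemas import UserEventSchema, BatchValidationResult
from great_expectations.dataset import PandasDataset
import pandas as pd
import json

def validate_with_pydantic(records: list[dict]) -> BatchValidationResult:
    valid_records = []
    errors = []
    for i, record in enumerate(records):
        try:
            UserEventSchema(**record)
            valid_records.append(record)
        except ValidationError as e:
            # Catch only validation failures so unrelated bugs still surface
            errors.append({"index": i, "record": record, "error": str(e)})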
    return BatchValidationResult(
        total=len(records),
        valid=len(valid_records),
        invalid=len(errors),
        errors=errors,
    )

def validate_with_great_expectations(df: pd.DataFrame) -> dict:
    ge_df = PandasDataset(df)
    results = {}

    results["row_count"] = ge_df.expect_table_row_count_to_be_between(min_value=1, max_value=10_000_000)
    results["user_id_not_null"] = ge_df.expect_column_values_to_not_be_null("user_id")
    results["age_range"] = ge_df.expect_column_values_to_be_between("age", min_value=0, max_value=150)
    results["event_type_set"] = ge_df.expect_column_values_to_be_in_set(
        "event_type", ["click", "purchase", "signup", "logout", "view"]
    )
    results["email_format"] = ge_df.expect_column_values_to_match_regex("email", r"^[\w\.-]+@[\w\.-]+\.\w+$")
    results["amount_non_negative"] = ge_df.expect_column_values_to_be_between(
        "amount", min_value=0, max_value=None
    )

    passed = sum(1 for r in results.values() if r["success"])
    failed = len(results) - passed
    return {"total_expectations": len(results), "passed": passed, "failed": failed, "details": results}

def run_validation_pipeline(records: list[dict]) -> dict:
    pydantic_result = validate_with_pydantic(records)

    # Keep only the records that passed Pydantic validation
    invalid_idx = {e["index"] for e in pydantic_result.errors}
    valid_df = pd.DataFrame([r for i, r in enumerate(records) if i not in invalid_idx])
    ge_result = (validate_with_great_expectations(valid_df) if not valid_df.empty
                 else {"total_expectations": 0, "passed": 0, "failed": 0, "details": {}})

    return {
        "pydantic_validation": pydantic_result.model_dump(),
        "ge_validation": ge_result,
        "overall_pass": pydantic_result.invalid == 0 and ge_result.get("failed", 0) == 0,
    }

if __name__ == "__main__":
    test_data = [
        # Valid: the event_id suffix is exactly 16 alphanumeric characters, as the schema requires
        {"user_id": "u001", "event_id": "evt_abc123def456gh78", "event_type": "click",
         "event_date": "2026-06-16T10:00:00", "age": 28, "email": "user@test.com", "amount": 99.5},
        # Invalid: empty user_id
        {"user_id": "", "event_id": "evt_xyz789abc012ij34", "event_type": "signup",
         "event_date": "2026-06-16T11:00:00", "age": 35, "email": "new@test.com", "amount": 0},
        # Invalid: malformed event_id and email, unknown event_type, negative age and amount
        {"user_id": "u003", "event_id": "evt_bad_format", "event_type": "hack",
         "event_date": "2026-06-16T12:00:00", "age": -5, "email": "not-an-email", "amount": -10},
    ]
    result = run_validation_pipeline(test_data)
    print(json.dumps(result, indent=2, default=str))
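
The scenario that opens this pattern is schema drift: a field silently renamed or retyped upstream. Before running full validation it can help to report the drift itself, so the alert says "user_id is missing, userId is new" instead of listing thousands of per-record errors. The following is a minimal standard-library sketch; `detect_schema_drift` and the `expected` mapping are illustrative names and are not part of Pydantic or Great Expectations.

```python
def detect_schema_drift(expected: dict[str, type], record: dict) -> dict[str, list[str]]:
    """Compare one record against the expected field names and Python types."""
    missing = sorted(set(expected) - set(record))
    unexpected = sorted(set(record) - set(expected))
    wrong_type = sorted(
        name for name, typ in expected.items()
        if name in record and record[name] is not None and not isinstance(record[name], typ)
    )
    return {"missing": missing, "unexpected": unexpected, "wrong_type": wrong_type}


if __name__ == "__main__":
    expected = {"user_id": str, "age": int}
    # Upstream renamed user_id to userId and started sending age as a string
    print(detect_schema_drift(expected, {"userId": "u001", "age": "28"}))
    # {'missing': ['user_id'], 'unexpected': ['userId'], 'wrong_type': ['age']}
```

Running this on a small sample of each incoming batch is cheap, and a non-empty result is a reasonable signal to stop the pipeline before the per-record checks run.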

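Pattern 3: Feature Engineering Pipeline with Pandas + Polars

Problem Scenario

The feature engineering scripts were written in Pandas. After the data grew from 1 million to 50 million rows, a groupby + agg runs for 30 minutes without finishing. Migrating to Polars is tempting, but there is too much existing code to rewrite all of it.

Solution

Keep Pandas for small data and compatibility with existing code, use Polars to accelerate computation on large data, and exchange data between the two through the Arrow format, which can avoid copies for many column types.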

# feature_pipeline.py
import pandas as pd
import polars as pl
from datetime import datetime, timedelta
import numpy as np

def generate_sample_data(n: int = 1_000_000) -> pd.DataFrame:
    np.random.seed(42)
    return pd.DataFrame({
        "user_id": np.random.choice([f"u{i:04d}" for i in range(1000)], n),
        "event_type": np.random.choice(["click", "purchase", "view", "signup"], n),
        "amount": np.random.exponential(50, n).round(2),
        "event_time": pd.date_range("2026-01-01", periods=n, freq="30s"),
        "category": np.random.choice(["electronics", "clothing", "food", "books"], n),
    })

def pandas_features(df: pd.DataFrame) -> pd.DataFrame:
    user_agg = df.groupby("user_id").agg(
        total_events=("event_type", "count"),
        total_amount=("amount", "sum"),
        avg_amount=("amount", "mean"),
        unique_categories=("category", "nunique"),
        first_event=("event_time", "min"),
        last_event=("event_time", "max"),
    )
    user_agg["active_days"] = (user_agg["last_event"] - user_agg["first_event"]).dt.days + 1
    user_agg["avg_daily_events"] = user_agg["total_events"] / user_agg["active_days"].clip(lower=1)
    return user_agg.reset_index()

def polars_features(df: pd.DataFrame) -> pd.DataFrame:
    pl_df = pl.from_pandas(df)

    result = pl_df.group_by("user_id").agg([
        pl.col("event_type").count().alias("total_events"),
        pl.col("amount").sum().alias("total_amount"),
        pl.col("amount").mean().alias("avg_amount"),
        pl.col("category").n_unique().alias("unique_categories"),
        pl.col("event_time").min().alias("first_event"),
        pl.col("event_time").max().alias("last_event"),
    ]).with_columns([
        (pl.col("last_event") - pl.col("first_event")).dt.total_days().add(1).alias("active_days"),
    ]).with_columns([
        # Polars names this keyword lower_bound (pandas uses lower)
        (pl.col("total_events") / pl.col("active_days").clip(lower_bound=1)).alias("avg_daily_events"),
    ])

    return result.to_pandas()

def time_window_features(df: pd.DataFrame, windows: list[str] = None) -> pd.DataFrame:
    if windows is None:
        windows = ["7d", "30d", "90d"]
    pl_df = pl.from_pandas(df)
    reference_date = pl_df.select(pl.col("event_time").max()).item()

    all_features = []
    for window in windows:
        days = int(window.replace("d", ""))
        cutoff = reference_date - timedelta(days=days)

        window_feat = pl_df.filter(pl.col("event_time") >= cutoff).group_by("user_id").agg([
            pl.col("amount").sum().alias(f"amount_sum_{window}"),
            pl.col("amount").mean().alias(f"amount_mean_{window}"),
            pl.col("event_type").count().alias(f"event_count_{window}"),
            pl.col("category").n_unique().alias(f"category_nunique_{window}"),
        ])
        all_features.append(window_feat)

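    from functools import reduce
    # Full join on user_id (Polars 1.x renamed "outer" to "full"); coalesce=True merges
    # the key columns instead of adding a user_id_right column for every window.
    merged = reduce(lambda a, b: a.join(b, on="user_id", how="full", coalesce=True), all_features)
    return merged.to_pandas()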

def category_encoding_features(df: pd.DataFrame) -> pd.DataFrame:
    pl_df = pl.from_pandas(df)
    pivot = pl_df.group_by(["user_id", "category"]).agg([
        pl.col("amount").sum().alias("category_amount"),
        pl.col("event_type").count().alias("category_count"),
    ]).pivot(
        index="user_id",
        on="category",
        values=["category_amount", "category_count"],
    ).fill_null(0)
    return pivot.to_pandas()

def run_feature_pipeline():
    print("Generating sample data...")
    df = generate_sample_data(1_000_000)
    print(f"Data shape: {df.shape}")

    print("\n--- Pandas Feature Engineering ---")
    start = datetime.now()
    pandas_result = pandas_features(df)
    print(f"Pandas time: {(datetime.now() - start).total_seconds():.2f}s")
    print(f"Pandas result shape: {pandas_result.shape}")

    print("\n--- Polars Feature Engineering ---")
    start = datetime.now()
    polars_result = polars_features(df)
    print(f"Polars time: {(datetime.now() - start).total_seconds():.2f}s")
    print(f"Polars result shape: {polars_result.shape}")

    print("\n--- Time Window Features ---")
    window_result = time_window_features(df)
    print(f"Window features shape: {window_result.shape}")

    print("\n--- Category Encoding Features ---")
    category_result = category_encoding_features(df)
    print(f"Category features shape: {category_result.shape}")

if __name__ == "__main__":
    run_feature_pipeline()

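Pattern 4: Vector Data Pipeline for RAG

Problem Scenario

After the RAG system went live, documents were updated but their vectors were not, so retrieval keeps returning stale content. The chunking strategy is too crude: long tables get truncated, code blocks are cut in the middle, and embedding quality suffers badly.

Solution

Build a complete vector data pipeline: smart chunking → embedding → vector storage → incremental sync.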

# vector_pipeline.py
import hashlib
import json
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class Document:
    doc_id: str
    content: str
    metadata: dict = field(default_factory=dict)
    source: str = ""
    updated_at: str = ""

@dataclass
class Chunk:
    chunk_id: str
    doc_id: str
    content: str
    embedding: Optional[list[float]] = None
    metadata: dict = field(default_factory=dict)

class SmartChunker:
    def __init__(self, chunk_size: int = 512, chunk_overlap: int = 64,
                 separators: list[str] = None):
        self.chunk_size = chunk_size
        # NOTE: chunk_overlap is stored but _recursive_split does not apply it
        self.chunk_overlap = chunk_overlap
        self.separators = separators or ["\n\n", "\n", "。", ".", " "]

    def chunk_document(self, doc: Document) -> list[Chunk]:
        text = doc.content
        chunks_text = self._recursive_split(text, self.separators)
        chunks = []
        for i, text in enumerate(chunks_text):
            if len(text.strip()) < 20:
                continue
            chunk_id = hashlib.md5(f"{doc.doc_id}:{i}:{text[:50]}".encode()).hexdigest()[:16]
            chunks.append(Chunk(
                chunk_id=chunk_id,
                doc_id=doc.doc_id,
                content=text,
                metadata={**doc.metadata, "chunk_index": i, "source": doc.source},
            ))
        return chunks

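    def _recursive_split(self, text: str, separators: list[str]) -> list[str]:
        if len(text) <= self.chunk_size:
            return [text] if text.strip() else []
        if not separators:
            # No separator left: hard-split so that no chunk exceeds chunk_size
            return [text[i:i + self.chunk_size] for i in range(0, len(text), self.chunk_size)]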

        sep = separators[0]
        parts = text.split(sep)

        result = []
        current = ""
        for part in parts:
            candidate = current + sep + part if current else part
            if len(candidate) <= self.chunk_size:
                current = candidate
            else:
                if current:
                    result.append(current)
                if len(part) > self.chunk_size:
                    sub_splits = self._recursive_split(part, separators[1:])
                    result.extend(sub_splits)
                    current = ""
                else:
                    current = part
        if current:
            result.append(current)
        return result

class EmbeddingService:
    def __init__(self, model_name: str = "text-embedding-3-small", api_key: str = ""):
        self.model_name = model_name
        self.api_key = api_key

    def embed_texts(self, texts: list[str]) -> list[list[float]]:
        try:
            from openai import OpenAI
        except ImportError:
            # Demo fallback only: random vectors carry no meaning, so search results are arbitrary
            import numpy as np
            return [np.random.randn(1536).tolist() for _ in texts]
        # An empty api_key falls back to the OPENAI_API_KEY environment variable
        client = OpenAI(api_key=self.api_key or None)
        response = client.embeddings.create(input=texts, model=self.model_name)
        return [item.embedding for item in response.data]

    def embed_chunks(self, chunks: list[Chunk], batch_size: int = 100) -> list[Chunk]:
        for i in range(0, len(chunks), batch_size):
            batch = chunks[i:i + batch_size]
            texts = [c.content for c in batch]
            embeddings = self.embed_texts(texts)
            for j, chunk in enumerate(batch):
                chunk.embedding = embeddings[j]
        return chunks

class VectorStore:
    def __init__(self, connection_string: str = ""):
        self.connection_string = connection_string
        self._store: dict[str, Chunk] = {}

    def upsert_chunks(self, chunks: list[Chunk]):
        for chunk in chunks:
            self._store[chunk.chunk_id] = chunk

    def search(self, query_embedding: list[float], top_k: int = 5) -> list[Chunk]:
        import numpy as np
        query_vec = np.array(query_embedding)
        scores = []
        for chunk in self._store.values():
            if chunk.embedding is None:
                continue
            chunk_vec = np.array(chunk.embedding)
            similarity = np.dot(query_vec, chunk_vec) / (np.linalg.norm(query_vec) * np.linalg.norm(chunk_vec) + 1e-8)
            scores.append((chunk, float(similarity)))
        scores.sort(key=lambda x: x[1], reverse=True)
        return [chunk for chunk, _ in scores[:top_k]]

    def delete_by_doc_id(self, doc_id: str):
        to_delete = [cid for cid, c in self._store.items() if c.doc_id == doc_id]
        for cid in to_delete:
            del self._store[cid]

class VectorPipeline:
    def __init__(self, chunk_size: int = 512, model_name: str = "text-embedding-3-small"):
        self.chunker = SmartChunker(chunk_size=chunk_size)
        self.embedder = EmbeddingService(model_name=model_name)
        self.store = VectorStore()

    def ingest_documents(self, documents: list[Document]) -> dict:
        total_chunks = 0
        for doc in documents:
            self.store.delete_by_doc_id(doc.doc_id)
            chunks = self.chunker.chunk_document(doc)
            chunks = self.embedder.embed_chunks(chunks)
            self.store.upsert_chunks(chunks)
            total_chunks += len(chunks)
        return {"documents": len(documents), "chunks": total_chunks}

    def query(self, question: str, top_k: int = 5) -> list[dict]:
        query_embedding = self.embedder.embed_texts([question])[0]
        results = self.store.search(query_embedding, top_k=top_k)
        return [{"chunk_id": r.chunk_id, "content": r.content[:200],
                 "metadata": r.metadata} for r in results]

if __name__ == "__main__":
    pipeline = VectorPipeline(chunk_size=256)

    docs = [
        Document(doc_id="doc_001", content="Python是一种广泛使用的高级编程语言。它的设计哲学强调代码的可读性。"
                 "Python支持多种编程范式,包括面向对象、命令式、函数式和过程式编程。"
                 "Python拥有一个庞大的标准库,涵盖了网络通信、文本处理、数据库接口等各个方面。",
                 metadata={"source": "wiki", "category": "programming"},
                 source="wikipedia"),
        Document(doc_id="doc_002", content="Apache Airflow是一个开源的工作流管理平台。"
                 "它允许用户以编程方式编写、调度和监控工作流。"
                 "Airflow的核心概念是DAG(有向无环图),用于定义任务之间的依赖关系。"
                 "2026年发布的Airflow 3.0带来了原生async支持。",
                 metadata={"source": "docs", "category": "data-engineering"},
                 source="airflow_docs"),
    ]

    result = pipeline.ingest_documents(docs)
    print(f"Ingestion result: {result}")

    query_result = pipeline.query("什么是Python编程语言?", top_k=3)
    print(f"\nQuery results: {json.dumps(query_result, indent=2, ensure_ascii=False)}")
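
SmartChunker accepts a chunk_overlap parameter, but the splitting logic above never uses it, so adjacent chunks share no context. One simple way to add overlap is a post-processing pass over the split result. The sketch below assumes character-based overlap; `add_overlap` is a hypothetical helper, not part of the pipeline above, and it lets chunks grow to at most chunk_size + overlap characters.

```python
def add_overlap(chunks: list[str], overlap: int) -> list[str]:
    """Prefix each chunk (except the first) with the tail of the previous chunk."""
    if overlap <= 0:
        return list(chunks)
    result = []
    for i, chunk in enumerate(chunks):
        # Take up to `overlap` trailing characters from the original previous chunk
        prefix = chunks[i - 1][-overlap:] if i > 0 else ""
        result.append(prefix + chunk)
    return result


if __name__ == "__main__":
    print(add_overlap(["abcdef", "ghijkl", "mnop"], 2))
    # ['abcdef', 'efghijkl', 'klmnop']
```

In chunk_document this could be applied to the output of `_recursive_split` before chunk IDs are computed, so that a sentence cut at a boundary still appears whole in at least one chunk.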

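Pattern 5: Production Monitoring with Prometheus + Data Quality Alerts

Problem Scenario

The data pipeline fails silently, and a 3-hour data delay goes unnoticed until it is too late; feature distributions drift without anyone knowing while model predictions get steadily worse; the vector store's P99 retrieval latency jumps from 50ms to 5s, and the team only finds out from user complaints.

Solution

Collect metrics with Prometheus, define custom data quality metrics, visualize them in Grafana, and alert through AlertManager.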

# monitoring.py
import time
from dataclasses import dataclass
from typing import Callable
from prometheus_client import Counter, Gauge, Histogram, start_http_server

pipeline_runs_total = Counter(
    "pipeline_runs_total",
    "Total pipeline runs",
    ["pipeline_name", "status"],
)

pipeline_duration_seconds = Histogram(
    "pipeline_duration_seconds",
    "Pipeline execution duration",
    ["pipeline_name"],
    buckets=[10, 30, 60, 120, 300, 600, 1800, 3600],
)

records_processed = Counter(
    "records_processed_total",
    "Total records processed",
    ["pipeline_name", "stage"],
)

data_quality_score = Gauge(
    "data_quality_score",
    "Data quality score (0-1)",
    ["pipeline_name", "check_name"],
)

feature_drift_score = Gauge(
    "feature_drift_score",
    "Feature drift score (PSI)",
    ["pipeline_name", "feature_name"],
)

vector_search_latency = Histogram(
    "vector_search_latency_seconds",
    "Vector search latency",
    ["pipeline_name"],
    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0],
)

@dataclass
class QualityCheck:
    name: str
    check_fn: Callable[[dict], float]
    threshold: float = 0.8

@dataclass
class DriftAlert:
    feature_name: str
    psi_score: float
    threshold: float = 0.2
    is_drifted: bool = False

class PipelineMonitor:
    def __init__(self, pipeline_name: str, metrics_port: int = 9090):
        self.pipeline_name = pipeline_name
        self.metrics_port = metrics_port
        self._quality_checks: list[QualityCheck] = []
        self._drift_threshold = 0.2
        self._server_started = False

    def start_metrics_server(self):
        if not self._server_started:
            start_http_server(self.metrics_port)
            self._server_started = True
            print(f"Metrics server started on port {self.metrics_port}")

    def add_quality_check(self, name: str, check_fn: Callable, threshold: float = 0.8):
        self._quality_checks.append(QualityCheck(name=name, check_fn=check_fn, threshold=threshold))

    def record_pipeline_run(self, status: str, duration: float):
        pipeline_runs_total.labels(self.pipeline_name, status).inc()
        pipeline_duration_seconds.labels(self.pipeline_name).observe(duration)

    def record_records_processed(self, stage: str, count: int):
        records_processed.labels(self.pipeline_name, stage).inc(count)

    def run_quality_checks(self, data: dict) -> dict:
        results = {}
        for check in self._quality_checks:
            score = check.check_fn(data)
            data_quality_score.labels(self.pipeline_name, check.name).set(score)
            passed = score >= check.threshold
            results[check.name] = {"score": score, "threshold": check.threshold, "passed": passed}
            if not passed:
                print(f"ALERT: Quality check '{check.name}' failed: score={score:.3f} < threshold={check.threshold}")
        return results

    def check_feature_drift(self, reference_stats: dict, current_stats: dict) -> list[DriftAlert]:
        alerts = []
        for feature_name in reference_stats:
            if feature_name not in current_stats:
                continue
            psi = self._calculate_psi(reference_stats[feature_name], current_stats[feature_name])
            feature_drift_score.labels(self.pipeline_name, feature_name).set(psi)
            is_drifted = psi > self._drift_threshold
            alert = DriftAlert(
                feature_name=feature_name,
                psi_score=psi,
                threshold=self._drift_threshold,
                is_drifted=is_drifted,
            )
            alerts.append(alert)
            if is_drifted:
                print(f"ALERT: Feature drift detected for '{feature_name}': PSI={psi:.4f} > threshold={self._drift_threshold}")
        return alerts

    @staticmethod
    def _calculate_psi(expected, actual) -> float:
        """PSI = sum((a - e) * ln(a / e)) over histogram bins shared by both inputs."""
        import numpy as np
        e_vals = np.array(list(expected.values()) if isinstance(expected, dict) else expected, dtype=float)
        a_vals = np.array(list(actual.values()) if isinstance(actual, dict) else actual, dtype=float)
        if e_vals.shape != a_vals.shape:
            raise ValueError("expected and actual must have the same number of bins")
        # Convert counts to proportions; fall back to a uniform distribution for empty histograms
        e_probs = e_vals / e_vals.sum() if e_vals.sum() > 0 else np.ones_like(e_vals) / len(e_vals)
        a_probs = a_vals / a_vals.sum() if a_vals.sum() > 0 else np.ones_like(a_vals) / len(a_vals)
        # Clip to avoid log(0) and division by zero for empty bins
        e_probs = np.clip(e_probs, 1e-6, None)
        a_probs = np.clip(a_probs, 1e-6, None)
        psi = float(np.sum((a_probs - e_probs) * np.log(a_probs / e_probs)))
        return psi

    def record_vector_search_latency(self, latency: float):
        vector_search_latency.labels(self.pipeline_name).observe(latency)

if __name__ == "__main__":
    monitor = PipelineMonitor("ai_data_pipeline", metrics_port=9090)
    monitor.start_metrics_server()

    monitor.add_quality_check(
        "null_rate",
        lambda d: 1.0 - d.get("null_count", 0) / max(d.get("total", 1), 1),
        threshold=0.95,
    )
    monitor.add_quality_check(
        "schema_compliance",
        lambda d: d.get("valid_count", 0) / max(d.get("total", 1), 1),
        threshold=0.99,
    )

    start_time = time.time()
    monitor.record_records_processed("extract", 50000)
    monitor.record_records_processed("transform", 48000)
    monitor.record_records_processed("load", 47500)
    duration = time.time() - start_time

    monitor.record_pipeline_run("success", duration)

    quality_data = {"null_count": 50, "total": 50000, "valid_count": 49800}
    quality_results = monitor.run_quality_checks(quality_data)
    print(f"Quality results: {quality_results}")

    ref_stats = {"age": {"bins": [100, 200, 300, 250, 150]}, "amount": {"bins": [50, 150, 300, 350, 150]}}
    cur_stats = {"age": {"bins": [80, 180, 350, 250, 140]}, "amount": {"bins": [50, 150, 300, 350, 150]}}
    drift_alerts = monitor.check_feature_drift(
        {k: v["bins"] for k, v in ref_stats.items()},
        {k: v["bins"] for k, v in cur_stats.items()},
    )
    print(f"Drift alerts: {[(a.feature_name, a.psi_score, a.is_drifted) for a in drift_alerts]}")

    print(f"\nMetrics available at http://localhost:9090/metrics")
    time.sleep(300)
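
# prometheus.yml
# NOTE: the Prometheus server itself listens on 9090 by default, so this target only
# reaches the pipeline exporter if the two do not share a host and port.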
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: "data_pipeline"
    static_configs:
      - targets: ["localhost:9090"]

rule_files:
  - "alerts.yml"

# alerts.yml
groups:
  - name: data_pipeline_alerts
    rules:
      - alert: PipelineRunFailed
        expr: increase(pipeline_runs_total{status="failed"}[5m]) > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Pipeline {{ $labels.pipeline_name }} failed"

      - alert: DataQualityDegraded
        expr: data_quality_score < 0.8
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Data quality check {{ $labels.check_name }} below threshold"

      - alert: FeatureDriftDetected
        expr: feature_drift_score > 0.2
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Feature {{ $labels.feature_name }} drift detected (PSI={{ $value }})"

      - alert: VectorSearchLatencyHigh
        expr: histogram_quantile(0.99, rate(vector_search_latency_seconds_bucket[5m])) > 1.0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Vector search P99 latency above 1s"
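
The `FeatureDriftDetected` rule fires when `feature_drift_score` exceeds 0.2, and the demo above passes pre-binned counts to `check_feature_drift`. As a rough sketch of how a PSI score can be derived from two histograms, the following uses only the standard library. The function name `psi_from_bins` and the `eps` smoothing constant are illustrative assumptions, not the monitor's actual internals.

```python
import math

def psi_from_bins(ref_counts, cur_counts, eps=1e-6):
    """Population Stability Index between two histograms with identical bin edges."""
    if len(ref_counts) != len(cur_counts):
        raise ValueError("histograms must have the same number of bins")
    ref_total, cur_total = sum(ref_counts), sum(cur_counts)
    if ref_total == 0 or cur_total == 0:
        raise ValueError("histograms must not be empty")
    psi = 0.0
    for ref, cur in zip(ref_counts, cur_counts):
        # Clip proportions away from zero so empty bins do not produce NaN or inf
        p = max(ref / ref_total, eps)
        q = max(cur / cur_total, eps)
        psi += (q - p) * math.log(q / p)
    return psi

# Same bin counts as the demo run above
age_psi = psi_from_bins([100, 200, 300, 250, 150], [80, 180, 350, 250, 140])
amount_psi = psi_from_bins([50, 150, 300, 350, 150], [50, 150, 300, 350, 150])
print(f"age PSI={age_psi:.4f}, amount PSI={amount_psi:.4f}")
```

With the demo bins, `age` shifts only slightly and `amount` is identical, so both scores stay well below the 0.2 alert threshold.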

5 Common Pitfalls and Solutions

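1. Hardcoding database connection details in ETL scripts

Problem: The connection string is written directly into the DAG code, so every environment change requires a code change and the password leaks into Git.

Solution: Use Airflow Connections and environment variables.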

from airflow.hooks.base import BaseHook

def get_connection():
    conn = BaseHook.get_connection("postgres_source")
    return f"postgresql://{conn.login}:{conn.password}@{conn.host}:{conn.port}/{conn.schema}"

# Alternatively, read the URL from an environment variable
import os
db_url = os.environ["DATABASE_URL"]
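
When the URL is assembled from separate variables rather than a single `DATABASE_URL`, two details are easy to miss: failing fast when a variable is absent, and percent-encoding credentials. A minimal sketch, assuming hypothetical variable names `DB_USER`, `DB_PASSWORD`, `DB_HOST`, `DB_PORT`, and `DB_NAME` (none of them come from the article's code):

```python
import os
from urllib.parse import quote_plus

def build_db_url(env=os.environ):
    """Assemble a PostgreSQL URL from DB_* variables, failing fast if one is missing."""
    required = ["DB_USER", "DB_PASSWORD", "DB_HOST", "DB_NAME"]
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    # Percent-encode credentials so characters like '@' or '/' cannot break the URL
    user = quote_plus(env["DB_USER"])
    password = quote_plus(env["DB_PASSWORD"])
    port = env.get("DB_PORT", "5432")
    return f"postgresql://{user}:{password}@{env['DB_HOST']}:{port}/{env['DB_NAME']}"

# Demo with an explicit mapping instead of the real process environment
demo_env = {"DB_USER": "etl", "DB_PASSWORD": "p@ss/word",
            "DB_HOST": "db.internal", "DB_NAME": "events"}
print(build_db_url(demo_env))
```

Passing the mapping as an argument keeps the function testable without touching the real environment.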

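2. Validating the schema but not the statistics

Problem: The data passes schema validation while its distribution is clearly abnormal (for example, every age is 999 in a field declared only as an integer). Pydantic validates each record in isolation and knows nothing about the distribution across records.

Solution: Use Pydantic for format validation and Great Expectations for distribution checks, as two lines of defense.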

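from pydantic import BaseModel, Field

# Pydantic: per-record format and range validation
class UserSchema(BaseModel):
    age: int = Field(..., ge=0, le=150)

# Great Expectations: statistical validation across the whole batch
# (ge_df is assumed to be a dataset created with the legacy API, e.g. ge.from_pandas(df))
ge_df.expect_column_mean_to_be_between("age", min_value=18, max_value=65)
ge_df.expect_column_values_to_be_in_set("country", ["CN", "US", "JP", "UK"])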
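
To make the difference between the two layers concrete, here is a dependency-free sketch in which every row passes the per-record check yet the batch fails the statistical one. The helpers `validate_row` and `validate_batch` are hypothetical stand-ins for the roles Pydantic and Great Expectations play in the pipeline, not their APIs.

```python
from statistics import mean

def validate_row(row):
    """Row-level check: the role Pydantic plays in the pipeline."""
    age = row.get("age")
    return isinstance(age, int) and 0 <= age <= 150

def validate_batch(rows, min_mean=18, max_mean=65):
    """Batch-level check: the role Great Expectations plays in the pipeline."""
    valid = [r for r in rows if validate_row(r)]
    if not valid:
        return {"row_failures": len(rows), "mean_age": None, "passed": False}
    mean_age = mean(r["age"] for r in valid)
    return {
        "row_failures": len(rows) - len(valid),
        "mean_age": mean_age,
        "passed": len(valid) == len(rows) and min_mean <= mean_age <= max_mean,
    }

# Every row is individually plausible, yet the batch as a whole is clearly wrong
suspicious = [{"age": 120} for _ in range(1000)]
report = validate_batch(suspicious)
print(report)
```

The report shows zero row failures but `passed` is false, which is exactly the case a schema-only check lets through.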

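3. Ignoring data leakage in feature engineering

Problem: Features are computed with future data (for example, normalizing with the mean of the full dataset), so the model looks good in training and falls apart in production.

Solution: Apply transformations using training-set statistics only, and build time-window features from historical data only.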

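# Compute the scaling statistics on the training window only (rows before the cutoff date)
train_stats = df[df["date"] < "2026-01-01"]["amount"].agg(["mean", "std"])
# Apply the training-set statistics to every row, including data after the cutoff
df["amount_scaled"] = (df["amount"] - train_stats["mean"]) / train_stats["std"]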
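
The second half of the rule, time-window features built from history only, comes down to emitting the feature before the current value enters the window. A small sketch with the standard library, assuming the input is already sorted by time (`trailing_mean_feature` is an illustrative name):

```python
from collections import deque

def trailing_mean_feature(amounts, window=3):
    """For each position, average the previous `window` values, excluding the current one."""
    history = deque(maxlen=window)
    features = []
    for amount in amounts:
        # Emit the feature before the current value enters the window
        features.append(sum(history) / len(history) if history else None)
        history.append(amount)
    return features

daily_amounts = [10.0, 20.0, 30.0, 40.0, 50.0]
print(trailing_mean_feature(daily_amounts))
# → [None, 10.0, 15.0, 20.0, 30.0]
```

The first row has no history, so its feature is `None` rather than a value borrowed from the future.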

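4. Not updating vector data incrementally

Problem: The vector index is rebuilt from scratch on every run; 100,000 documents take 2 hours, so the data is never current.

Solution: Update incrementally based on a hash of each document, processing only what changed.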

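import hashlib

# Document, VectorStore, and the module-level `pipeline` object are assumed
# to come from the vector pipeline code earlier in this article.
def incremental_update(documents: list[Document], store: VectorStore) -> dict:
    updated = 0
    for doc in documents:
        # Fingerprint the content; MD5 serves only as a change detector here, not for security
        content_hash = hashlib.md5(doc.content.encode()).hexdigest()
        existing = store.get_doc_metadata(doc.doc_id)
        if existing and existing.get("content_hash") == content_hash:
            continue  # unchanged document: skip re-embedding
        # New or changed document: drop stale chunks, then re-chunk and re-embed
        store.delete_by_doc_id(doc.doc_id)
        chunks = pipeline.chunker.chunk_document(doc)
        chunks = pipeline.embedder.embed_chunks(chunks)
        for chunk in chunks:
            chunk.metadata["content_hash"] = content_hash
        store.upsert_chunks(chunks)
        updated += 1
    return {"checked": len(documents), "updated": updated}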
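
The change-detection step can be tried in isolation with plain dictionaries. The sketch below is an assumption-laden stand-in for the store (a `doc_id → hash` mapping) and also classifies documents that disappeared from the source, a case the function above does not cover. `plan_incremental_update` is a hypothetical helper name.

```python
import hashlib

def content_hash(text: str) -> str:
    # SHA-256 is used here; any stable digest works for change detection
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

def plan_incremental_update(documents: dict[str, str], stored_hashes: dict[str, str]) -> dict:
    """Compare current documents against stored hashes and classify each doc_id."""
    plan = {"new": [], "changed": [], "unchanged": [], "deleted": []}
    for doc_id, text in documents.items():
        digest = content_hash(text)
        if doc_id not in stored_hashes:
            plan["new"].append(doc_id)
        elif stored_hashes[doc_id] != digest:
            plan["changed"].append(doc_id)
        else:
            plan["unchanged"].append(doc_id)
    # Documents that vanished from the source should also be removed from the index
    plan["deleted"] = sorted(set(stored_hashes) - set(documents))
    return plan

stored = {"a": content_hash("alpha"), "b": content_hash("beta"), "c": content_hash("gamma")}
current = {"a": "alpha", "b": "beta v2", "d": "delta"}
print(plan_incremental_update(current, stored))
```

Only the `new` and `changed` entries need chunking and embedding; `deleted` entries only need a delete call against the store.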

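5. Monitoring whether the pipeline ran, but not the data quality

Problem: Airflow reports the DAG as successful, but the data volume has dropped 90% or the feature values are all NULL, and the production model is already useless.

Solution: Add a data quality gate at the end of the DAG that blocks downstream tasks when quality falls short.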

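def quality_gate_check(**context):
    ti = context["ti"]
    # Record count returned by the upstream "load" task via XCom
    loaded_count = ti.xcom_pull(task_ids="load", key="return_value")
    expected_min = context["params"]["expected_min_records"]

    # A missing XCom value means the load task returned nothing: treat it as a failure
    if loaded_count is None or loaded_count < expected_min:
        raise ValueError(
            f"Quality gate failed: loaded {loaded_count} records, expected at least {expected_min}"
        )

    # run_quality_checks() is assumed to return a score between 0 and 1
    quality_score = run_quality_checks()
    if quality_score < 0.9:
        raise ValueError(f"Quality gate failed: score={quality_score:.2f}")

    return "PASSED"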

quality_gate = PythonOperator(
    task_id="quality_gate",
    python_callable=quality_gate_check,
    params={"expected_min_records": 10000},
)
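
A fixed minimum record count will not catch a 90% drop on a table that normally holds millions of rows. One possible refinement, sketched here without Airflow, compares the batch against recent run sizes and also bounds the null ratio. `check_batch`, `QualityGateError`, and the default thresholds are illustrative assumptions.

```python
from statistics import mean

class QualityGateError(Exception):
    """Raised when a batch should not be passed downstream."""

def check_batch(row_count, null_count, recent_counts, max_drop=0.5, max_null_ratio=0.05):
    """Fail if volume collapses relative to recent runs or if too many values are null."""
    if row_count == 0:
        raise QualityGateError("batch is empty")
    if recent_counts:
        baseline = mean(recent_counts)
        if row_count < baseline * (1 - max_drop):
            raise QualityGateError(
                f"row count {row_count} is more than {max_drop:.0%} below baseline {baseline:.0f}"
            )
    null_ratio = null_count / row_count
    if null_ratio > max_null_ratio:
        raise QualityGateError(f"null ratio {null_ratio:.1%} exceeds {max_null_ratio:.0%}")
    return "PASSED"

print(check_batch(row_count=49_000, null_count=50, recent_counts=[50_000, 51_000, 49_500]))
```

Inside a DAG, the same function could be called from the gate task so that any raised exception fails the task and blocks downstream work.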

10 Common Error Troubleshooting

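# | Error message | Cause | Fix
1 | Airflow DAG import error | Wrong Python path or missing dependency | Check PYTHONPATH and make sure everything in requirements.txt is installed
2 | Pydantic ValidationError: field required | An upstream field is missing or was renamed | Give optional fields a default (Optional[...] = None) and map renamed fields with an alias such as Field(validation_alias=AliasChoices("user_id", "userId")); model_config = {"extra": "allow"} additionally tolerates unexpected extra fields
3 | Great Expectations expectation failed | Distribution shift or dirty data in the batch | Check for changes in the data source, then adjust the expectation thresholds or add a cleaning step
4 | Polars SchemaError: dtype mismatch | Wrong type inference when converting Pandas to Polars | Specify the schema explicitly: pl.DataFrame(data, schema={"col": pl.Float64})
5 | OpenAI API RateLimitError | Embedding requests exceed the API rate limit | Batch and retry: call in batches and add exponential-backoff retries with tenacity
6 | numpy.linalg.LinAlgError in cosine similarity | Mismatched vector dimensions or a zero vector | Check the vector norm before normalizing: np.linalg.norm(v) > 1e-8
7 | Prometheus duplicate metric error (ValueError: Duplicated timeseries in CollectorRegistry) | The same metric is registered more than once | Define metrics at module level; do not create Counter/Gauge objects inside functions that run repeatedly
8 | MemoryError on large dataset groupby | Pandas runs out of memory aggregating the full dataset | Switch to Polars or process in chunks: pd.read_csv(path, chunksize=100_000)
9 | PSI calculation returns NaN | Some bins have a count of 0 after binning | Add smoothing: probs = np.clip(probs, 1e-6, None)
10 | Airflow task timeout | A single task runs longer than its limit | Increase execution_timeout, or split the work into several subtasks that run in parallel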
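
Row 5 recommends batching plus exponential-backoff retries. The article suggests tenacity for this; the sketch below shows the same idea with the standard library only, so the mechanics are visible. `retry_with_backoff` and the simulated `flaky_embedding_call` are hypothetical, and `ConnectionError` stands in for the real rate-limit exception.

```python
import random
import time

def retry_with_backoff(func, max_attempts=5, base_delay=1.0, max_delay=30.0,
                       retry_on=(Exception,), sleep=time.sleep):
    """Call func(); on a retryable error wait base_delay * 2**attempt (plus jitter) and retry."""
    for attempt in range(max_attempts):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * 2 ** attempt)
            # Up to 10% random jitter keeps parallel workers from retrying in lockstep
            sleep(delay + random.uniform(0, delay * 0.1))

# Demo with a fake call that fails twice before succeeding
calls = {"count": 0}

def flaky_embedding_call():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated rate limit")
    return [0.1, 0.2, 0.3]

waits = []  # record the delays instead of actually sleeping
result = retry_with_backoff(flaky_embedding_call, retry_on=(ConnectionError,), sleep=waits.append)
print(result, len(waits))
```

Injecting `sleep` as a parameter lets the demo record the delays (roughly 1 s, then 2 s) without waiting for them.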

Advanced Optimization Techniques

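1. Incremental ETL: process only changed data

Full ETL does not scale to large data volumes. Use CDC (Change Data Capture) or a timestamp watermark to process only the increment.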

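import pandas as pd
import redis
from sqlalchemy import create_engine, text

def incremental_extract(last_run_time: str) -> pd.DataFrame:
    # get_connection() returns a database URL (see pitfall 1)
    engine = create_engine(get_connection())
    # Bind the watermark as a parameter instead of formatting it into the SQL string
    query = text("""
        SELECT * FROM user_events
        WHERE updated_at > :last_run
        ORDER BY updated_at
    """)
    with engine.connect() as conn:
        return pd.read_sql(query, conn, params={"last_run": last_run_time})

def get_last_run_time(pipeline_name: str) -> str:
    # decode_responses=True makes Redis return str instead of bytes
    r = redis.Redis(decode_responses=True)
    return r.get(f"pipeline:{pipeline_name}:last_run") or "2026-01-01T00:00:00"

def save_last_run_time(pipeline_name: str, run_time: str):
    r = redis.Redis(decode_responses=True)
    r.set(f"pipeline:{pipeline_name}:last_run", run_time)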
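
The watermark pattern can be exercised end to end with an in-memory SQLite database. This is a sketch under stated assumptions: the `watermarks` table, the pipeline name, and the sample rows are invented for the demo, and SQLite replaces both PostgreSQL and Redis.

```python
import sqlite3

def setup_demo_db():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE user_events (id INTEGER, updated_at TEXT)")
    conn.execute("CREATE TABLE watermarks (pipeline TEXT PRIMARY KEY, last_run TEXT)")
    conn.executemany(
        "INSERT INTO user_events VALUES (?, ?)",
        [(1, "2026-01-01T08:00:00"), (2, "2026-01-02T09:30:00"), (3, "2026-01-03T10:15:00")],
    )
    return conn

def run_incremental(conn, pipeline, default_start="1970-01-01T00:00:00"):
    row = conn.execute("SELECT last_run FROM watermarks WHERE pipeline = ?", (pipeline,)).fetchone()
    watermark = row[0] if row else default_start
    # The watermark is passed as a bound parameter, never formatted into the SQL text
    rows = conn.execute(
        "SELECT id, updated_at FROM user_events WHERE updated_at > ? ORDER BY updated_at",
        (watermark,),
    ).fetchall()
    if rows:
        # Advance to the newest row actually processed rather than to the wall-clock time,
        # so rows committed while the extract was running are not skipped
        conn.execute("INSERT OR REPLACE INTO watermarks VALUES (?, ?)", (pipeline, rows[-1][1]))
        conn.commit()
    return rows

conn = setup_demo_db()
first = run_incremental(conn, "user_events_etl")
conn.execute("INSERT INTO user_events VALUES (4, '2026-01-04T07:00:00')")
second = run_incremental(conn, "user_events_etl")
print(len(first), second)
```

The first run picks up all three rows; the second run returns only the row added afterwards.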

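2. Feature caching and versioning

Cache computed features in Redis to avoid recomputation; versioning ensures that training and inference use the same features.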

import redis
import json
import hashlib

class FeatureCache:
    def __init__(self, redis_url: str = "redis://localhost:6379/2"):
        self.r = redis.from_url(redis_url)
        self.ttl = 86400

    def _cache_key(self, feature_name: str, entity_id: str, version: str) -> str:
        raw = f"{feature_name}:{entity_id}:{version}"
        return f"feat:{hashlib.md5(raw.encode()).hexdigest()}"

    def get(self, feature_name: str, entity_id: str, version: str = "v1") -> dict | None:
        key = self._cache_key(feature_name, entity_id, version)
        cached = self.r.get(key)
        return json.loads(cached) if cached else None

    def set(self, feature_name: str, entity_id: str, value: dict, version: str = "v1"):
        key = self._cache_key(feature_name, entity_id, version)
        self.r.setex(key, self.ttl, json.dumps(value, default=str))

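3. Batched, parallel embedding

Calling the embedding API one text at a time is too slow; batching plus async parallelism raises throughput substantially.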

import asyncio
from openai import AsyncOpenAI

async def async_embed_batch(texts: list[str], batch_size: int = 2048,
                            model: str = "text-embedding-3-small") -> list[list[float]]:
    client = AsyncOpenAI()
    all_embeddings = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        response = await client.embeddings.create(input=batch, model=model)
        all_embeddings.extend([item.embedding for item in response.data])
    return all_embeddings

async def parallel_embed(documents: list[str], concurrency: int = 5) -> list[list[float]]:
    semaphore = asyncio.Semaphore(concurrency)

    async def limited_embed(batch):
        async with semaphore:
            return await async_embed_batch(batch)

    tasks = [limited_embed(documents[i:i+100]) for i in range(0, len(documents), 100)]
    results = await asyncio.gather(*tasks)
    return [emb for batch in results for emb in batch]
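
Two properties of the pattern above are worth checking without spending API quota: the semaphore really caps the number of concurrent requests, and `asyncio.gather` keeps results in input order. The sketch below replaces the API call with a hypothetical `fake_embed` coroutine and tracks peak concurrency.

```python
import asyncio

async def fake_embed(batch):
    """Stand-in for a remote embedding call: one 2-dimensional vector per text."""
    await asyncio.sleep(0.01)
    return [[float(len(text)), 0.0] for text in batch]

async def embed_all(texts, batch_size=2, concurrency=2):
    semaphore = asyncio.Semaphore(concurrency)
    in_flight = 0
    peak = 0

    async def embed_one(batch):
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await fake_embed(batch)
            finally:
                in_flight -= 1

    batches = [texts[i:i + batch_size] for i in range(0, len(texts), batch_size)]
    # gather() returns results in the order the awaitables were passed in,
    # so the flattened vectors line up with the input texts
    results = await asyncio.gather(*(embed_one(b) for b in batches))
    return [vec for batch in results for vec in batch], peak

texts = ["a", "bb", "ccc", "dddd", "eeeee", "ffffff", "ggggggg"]
vectors, peak = asyncio.run(embed_all(texts))
print(len(vectors), peak)
```

Seven texts split into four batches never exceed two requests in flight, and the first component of each vector matches the length of the corresponding input text.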

4. Vector index optimization: tuning HNSW parameters

The parameters of the pgvector HNSW index directly affect retrieval accuracy and speed.

-- Create the HNSW index with tuned parameters
CREATE INDEX CONCURRENTLY idx_documents_embedding_hnsw
ON documents USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 200);

-- Set ef_search at query time
SET hnsw.ef_search = 100;

-- Rebuild the index after large batches of incremental inserts
REINDEX INDEX CONCURRENTLY idx_documents_embedding_hnsw;
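
Because HNSW is an approximate index, a reasonable way to choose `ef_search` is to measure recall against exact search on a sample of queries and raise the setting until recall is acceptable. The sketch below computes recall@k in pure Python; the toy vectors and the `approx` result list are made up for illustration and do not come from a real index.

```python
import math

def cosine_similarity(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm > 1e-12 else 0.0  # guard against zero vectors

def exact_top_k(query, vectors, k):
    """Brute-force nearest neighbours by cosine similarity: the ground truth."""
    ranked = sorted(vectors, key=lambda doc_id: cosine_similarity(query, vectors[doc_id]),
                    reverse=True)
    return ranked[:k]

def recall_at_k(approx_ids, exact_ids):
    """Fraction of the true top-k that the approximate index also returned."""
    return len(set(approx_ids) & set(exact_ids)) / len(exact_ids)

vectors = {"d1": [1.0, 0.0], "d2": [0.9, 0.1], "d3": [0.0, 1.0], "d4": [0.7, 0.7]}
truth = exact_top_k([1.0, 0.0], vectors, k=2)
# Hypothetical result from the HNSW index at some ef_search setting
approx = ["d1", "d4"]
print(truth, recall_at_k(approx, truth))
```

Averaging this value over a few hundred sampled queries for each candidate `ef_search` gives a recall-versus-latency curve to pick from.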

5. Automating data quality SLAs

Write data quality check results to an SLA table and compute the SLA compliance rate automatically.

import os
from datetime import datetime, timezone

import psycopg2

def record_sla(pipeline_name: str, check_name: str, score: float,
               threshold: float, passed: bool):
    conn = psycopg2.connect(dsn=os.environ["WAREHOUSE_URL"])
    try:
        with conn.cursor() as cur:
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
            cur.execute("""
                INSERT INTO data_quality_sla
                (pipeline_name, check_name, score, threshold, passed, checked_at)
                VALUES (%s, %s, %s, %s, %s, %s)
            """, (pipeline_name, check_name, score, threshold, passed,
                  datetime.now(timezone.utc)))
        conn.commit()
    finally:
        conn.close()  # release the connection even if the insert fails

def get_sla_rate(pipeline_name: str, days: int = 30) -> float:
    conn = psycopg2.connect(dsn=os.environ["WAREHOUSE_URL"])
    try:
        with conn.cursor() as cur:
            # NULLIF avoids division by zero when no checks were recorded;
            # the interval is built by multiplication so `days` stays a bound parameter
            cur.execute("""
                SELECT COUNT(*) FILTER (WHERE passed) * 100.0 / NULLIF(COUNT(*), 0)
                FROM data_quality_sla
                WHERE pipeline_name = %s AND checked_at >= NOW() - %s * INTERVAL '1 day'
            """, (pipeline_name, days))
            rate = cur.fetchone()[0]
    finally:
        conn.close()
    return float(rate) if rate is not None else 0.0

Comparison Analysis

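Dimension | Apache Airflow | Prefect | Dagster | Luigi | Mage
Orchestration model | DAG definitions | Functions + decorators | Asset definitions | Task dependencies | Notebook-style
Data validation | Integrate yourself | Built-in validation | Native type system | Built-in
Feature management | Integrate yourself | Integrate yourself | Native assets | Integrate yourself
Vector pipelines | Integrate yourself | Integrate yourself | Integrate yourself | Integrate yourself
Monitoring | Basic UI | Native Cloud UI | Native Dagit | Basic | Native UI
Learning curve | Medium | Medium-high
Community ecosystem | Most mature | Growing fast | Growing | Declining | Emerging
Python-native
2026 recommendation | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐
Best suited for | Enterprise ETL | Quick start for small and mid-size projects | Asset-heavy data platforms | Simple batch jobs | Data analytics teams

Validation tool | Pydantic v2 | Great Expectations | Pandera | TFX Data Validation
Validation speed | Very fast (Rust core) | Medium | Medium
Schema definition | Python classes | YAML/code | Python classes | Protobuf
Statistical validation | Not supported | Medium
Custom rules | Flexible | Flexible | Medium | Medium
Data profiling | Not supported | Not supported
Best suited for | Format validation | Comprehensive data quality | Lightweight schemas | TensorFlow ecosystem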

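Recommended Online Tools

  • JSON formatter: format JSON while debugging Airflow DAG configuration and pipeline data
  • Base64 encoder: handle encoding for vector data transfer and API authentication
  • cURL-to-code converter: turn cURL commands from API debugging into Python code for ETL extraction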

Summary

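By 2026, Python AI data pipelines have settled into a mature methodology. Airflow orchestration is the skeleton: DAGs define task dependencies, and automatic retries and alerts keep runs reliable. Data validation is the immune system: Pydantic blocks format errors and Great Expectations blocks statistical anomalies. Feature engineering is where the value is created: Pandas keeps existing code working while Polars speeds up large computations. The vector pipeline is the AI infrastructure: smart chunking plus incremental updates keep RAG data fresh. Monitoring is the eyes of production: Prometheus collects the metrics, and data quality SLAs quantify the guarantees. Remember: a pipeline without data quality guarantees only gets further off course the faster it runs.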

