Python AI Data Pipeline: 5 Production Patterns from ETL to Feature Store

AI and Big Data

Data Quality Collapse, ETL Scripts Out of Control, Feature Deployment Delayed a Week

At 3 AM, the model training task crashes because an upstream team renamed user_id to userId, and debugging runs until dawn. Feature engineering scripts are scattered across 10 Jupyter Notebooks, so deploying a new feature takes a week of data-engineer scheduling. The RAG system's vector data is perpetually out of sync with its source documents, so retrieval results are irrelevant. In 2026, Python AI data pipelines have moved from "nice to have" to "must have": Airflow 3.0 brings native async def support, Pydantic v2 validates data roughly 5-50x faster than v1, and Polars can outperform Pandas by 10x or more on feature engineering workloads such as large group-by aggregations.

This article covers 5 production patterns, guiding you through Airflow ETL orchestration → Pydantic + GE data validation → Pandas/Polars feature engineering → RAG vector data pipeline → Prometheus production monitoring with complete runnable Python code at every step.


Key Takeaways

  • Master Apache Airflow 3.0 DAG orchestration patterns for ETL dependency management and failure retry
  • Build data validation defenses with Pydantic v2 + Great Expectations to reject dirty data from entering pipelines
  • Use Pandas + Polars hybrid approach for feature engineering, balancing ecosystem compatibility and computation performance
  • Build a RAG vector data pipeline with the complete chain from document chunking to embedding to vector storage
  • Implement Prometheus + Grafana data quality monitoring that alerts on pipeline anomalies within seconds
  • Understand 5 common data pipeline pitfalls and their production-grade solutions
  • Master advanced optimization techniques including incremental processing, caching, and parallel acceleration

Table of Contents

  1. Architecture Overview: AI Data Pipeline Global Architecture
  2. Pattern 1: ETL Pipeline with Apache Airflow + Python
  3. Pattern 2: Data Validation with Pydantic v2 + Great Expectations
  4. Pattern 3: Feature Engineering Pipeline with Pandas + Polars
  5. Pattern 4: Vector Data Pipeline for RAG
  6. Pattern 5: Production Monitoring with Prometheus + Data Quality Alerts
  7. 5 Common Pitfalls and Solutions
  8. 10 Common Error Troubleshooting
  9. Advanced Optimization Techniques
  10. Comparison Analysis
  11. Recommended Online Tools
  12. Summary & Further Reading

Architecture Overview

┌─────────────────────────────────────────────────────────────────┐
│                    AI Data Pipeline Architecture                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌──────────┐    ┌──────────────┐    ┌──────────────────────┐  │
│  │  Data     │    │  Airflow DAG │    │  Pydantic + GE       │  │
│  │  Sources  │───▶│  Orchestration│───▶│  Data Validation     │  │
│  │  (API/DB/ │    │  (ETL Sched) │    │  (Schema + Rules)    │  │
│  │   Files)  │    └──────────────┘    └──────────┬───────────┘  │
│  └──────────┘                                    │              │
│                                                   ▼              │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │              Feature Engineering Pipeline                 │  │
│  │  ┌─────────┐   ┌──────────┐   ┌──────────────────────┐ │  │
│  │  │ Pandas  │──▶│  Polars  │──▶│  Feature Store       │ │  │
│  │  │ (Compat)│   │ (Speed)  │   │  (Redis/Feast)       │ │  │
│  │  └─────────┘   └──────────┘   └──────────────────────┘ │  │
│  └──────────────────────────────────────────────────────────┘  │
│                                                   │              │
│                                                   ▼              │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │              Vector Data Pipeline (RAG)                   │  │
│  │  ┌──────────┐  ┌──────────┐  ┌───────────────────────┐ │  │
│  │  │ Chunking │─▶│ Embedding│─▶│  Vector Store         │ │  │
│  │  │ (Split)  │  │ (Model)  │  │  (PGVector/Chroma)    │ │  │
│  │  └──────────┘  └──────────┘  └───────────────────────┘ │  │
│  └──────────────────────────────────────────────────────────┘  │
│                                                   │              │
│                                                   ▼              │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │              Production Monitoring                        │  │
│  │  ┌────────────┐  ┌───────────┐  ┌────────────────────┐ │  │
│  │  │ Prometheus │─▶│  Grafana  │─▶│  AlertManager      │ │  │
│  │  │ (Metrics)  │  │ (Dashboard)│  │  (Notifications)   │ │  │
│  │  └────────────┘  └───────────┘  └────────────────────┘ │  │
│  └──────────────────────────────────────────────────────────┘  │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Pattern 1: ETL Pipeline with Apache Airflow + Python

Problem Scenario

Every day you need to extract data from 3 databases, 2 APIs, and 5 CSV files, then clean and load into the data warehouse. Manual scripts can't track execution status, upstream failures go unnoticed downstream, and retries are entirely manual.

Solution

Use Apache Airflow 3.0 to orchestrate ETL DAGs with task dependencies, automatic retries, and failure alerting.

# dags/etl_pipeline.py
from datetime import datetime, timedelta
from airflow import DAG
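# In Airflow 3 these operators ship in the standard provider
# (airflow.providers.standard.operators.*); switch to those paths if the
# legacy ones below are unavailable in your installation.
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator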

default_args = {
    "owner": "data-team",
    "depends_on_past": False,
    "email_on_failure": True,
    "email": ["data-team@company.com"],
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="etl_daily_pipeline",
    default_args=default_args,
    description="Daily ETL pipeline for AI data",
    schedule="0 2 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["etl", "production"],
) as dag:

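    def extract_from_postgres(**context):
        import psycopg2
        from psycopg2.extras import RealDictCursor
        conn = psycopg2.connect(
            host="postgres-host",
            database="source_db",
            user="reader",
            password="***",
        )
        try:
            # RealDictCursor returns rows keyed by column name, which transform_data relies on
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                cur.execute("SELECT * FROM user_events WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'")
                rows = [dict(r) for r in cur.fetchall()]
        finally:
            conn.close()
        # ISO strings keep the XCom payload JSON-friendly
        for row in rows:
            row["created_at"] = row["created_at"].isoformat()
        context["ti"].xcom_push(key="postgres_rows", value=len(rows))
        return rows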

    def extract_from_api(**context):
        import requests
        resp = requests.get(
            "https://api.example.com/v2/data",
            headers={"Authorization": "Bearer ***"},
            params={"date": context["ds"]},
        )
        resp.raise_for_status()
        data = resp.json()
        context["ti"].xcom_push(key="api_records", value=len(data))
        return data

    def extract_from_files(**context):
        import glob
        import pandas as pd
        files = glob.glob(f"/data/input/{context['ds']}/*.csv")
        frames = [pd.read_csv(f) for f in files]
        combined = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
        context["ti"].xcom_push(key="file_count", value=len(files))
        return combined.to_dict("records")

    def transform_data(**context):
        import pandas as pd
        ti = context["ti"]
        pg_rows = ti.xcom_pull(task_ids="extract_postgres", key="return_value")
        api_data = ti.xcom_pull(task_ids="extract_api", key="return_value")
        file_data = ti.xcom_pull(task_ids="extract_files", key="return_value")

        df_pg = pd.DataFrame(pg_rows)
        df_api = pd.DataFrame(api_data)
        df_files = pd.DataFrame(file_data)

        df_all = pd.concat([df_pg, df_api, df_files], ignore_index=True)
        df_all = df_all.drop_duplicates(subset=["user_id", "event_id"])
        df_all = df_all.dropna(subset=["user_id", "event_type"])
        df_all["event_date"] = pd.to_datetime(df_all["created_at"]).dt.date

        context["ti"].xcom_push(key="total_records", value=len(df_all))
        return df_all.to_dict("records")

    def load_to_warehouse(**context):
        import psycopg2
        ti = context["ti"]
        records = ti.xcom_pull(task_ids="transform", key="return_value")
        conn = psycopg2.connect(
            host="warehouse-host",
            database="analytics_db",
            user="loader",
            password="***",
        )
        cur = conn.cursor()
        cur.execute("TRUNCATE TABLE staging.user_events_daily")
        insert_sql = """
            INSERT INTO staging.user_events_daily
            (user_id, event_id, event_type, event_date, created_at)
            VALUES (%s, %s, %s, %s, %s)
        """
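        # Insert every record in batches of 1,000 instead of only the first 1,000
        loaded = 0
        for i in range(0, len(records), 1000):
            batch = [(r["user_id"], r["event_id"], r["event_type"],
                      str(r["event_date"]), r["created_at"]) for r in records[i:i + 1000]]
            cur.executemany(insert_sql, batch)
            loaded += len(batch)
        conn.commit()
        conn.close()
        return loaded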

    def data_quality_check(**context):
        ti = context["ti"]
        total = ti.xcom_pull(task_ids="transform", key="total_records")
        loaded = ti.xcom_pull(task_ids="load", key="return_value") or 0
        # Zero loaded rows must fail the check; skip it only when nothing was transformed
        if total and loaded / total < 0.95:
            raise ValueError(f"Data quality check failed: only {loaded}/{total} records loaded")
        return "PASSED"

    start = EmptyOperator(task_id="start")
    extract_pg = PythonOperator(task_id="extract_postgres", python_callable=extract_from_postgres)
    extract_api = PythonOperator(task_id="extract_api", python_callable=extract_from_api)
    extract_files = PythonOperator(task_id="extract_files", python_callable=extract_from_files)
    transform = PythonOperator(task_id="transform", python_callable=transform_data)
    load = PythonOperator(task_id="load", python_callable=load_to_warehouse)
    quality_check = PythonOperator(task_id="quality_check", python_callable=data_quality_check)
    end = EmptyOperator(task_id="end")

    start >> [extract_pg, extract_api, extract_files] >> transform >> load >> quality_check >> end

# Start Airflow
pip install apache-airflow==3.0.0
airflow db migrate  # replaces `airflow db init`, which Airflow 3 no longer provides
airflow dags list
airflow dags trigger etl_daily_pipeline
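
The load and quality-check steps depend on two small pieces of logic: splitting records into fixed-size batches, and comparing loaded rows against transformed rows. The following is a minimal standalone sketch of both, using only the standard library. The helper names `iter_batches` and `load_ratio_ok` are hypothetical and not part of Airflow; treating an empty run as a failure is an assumption you may want to relax.

```python
from itertools import islice
from typing import Iterable, Iterator


def iter_batches(rows: Iterable, batch_size: int = 1000) -> Iterator[list]:
    """Yield consecutive lists holding at most batch_size rows each."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    it = iter(rows)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


def load_ratio_ok(total: int, loaded: int, min_ratio: float = 0.95) -> bool:
    """Return True when at least min_ratio of the transformed rows were loaded."""
    if total <= 0:
        # Assumption: an empty run is reported as a failure so it gets noticed.
        return False
    return loaded / total >= min_ratio


if __name__ == "__main__":
    rows = [(i, f"evt_{i}") for i in range(2500)]
    loaded = 0
    for batch in iter_batches(rows, batch_size=1000):
        # In the DAG, this is where cur.executemany(insert_sql, batch) would run.
        loaded += len(batch)
    print(loaded, load_ratio_ok(total=len(rows), loaded=loaded))
```

Keeping these helpers outside the DAG file makes them easy to unit test without an Airflow installation.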

Pattern 2: Data Validation with Pydantic v2 + Great Expectations

Problem Scenario

Upstream systems silently change field types — age from int to string, email format no longer validated — dirty data flows straight into model training, and accuracy plummets before anyone notices.

Solution

Use Pydantic v2 for schema validation and Great Expectations for statistical rule validation — a double defense line against dirty data.

# schemas.py
# EmailStr needs the optional email-validator package: pip install "pydantic[email]"
from pydantic import BaseModel, Field, EmailStr, field_validator
from datetime import datetime
from typing import Optional

class UserEventSchema(BaseModel):
    user_id: str = Field(..., min_length=1, max_length=64, description="Unique user identifier")
    event_id: str = Field(..., pattern=r"^evt_[a-zA-Z0-9]{16}$", description="Event ID format")
    event_type: str = Field(..., description="Event type")
    event_date: datetime = Field(..., description="Event date")
    age: int = Field(..., ge=0, le=150, description="User age")
    email: EmailStr = Field(..., description="User email")
    amount: Optional[float] = Field(None, ge=0, description="Amount")

    @field_validator("event_type")
    @classmethod
    def validate_event_type(cls, v):
        allowed = {"click", "purchase", "signup", "logout", "view"}
        if v not in allowed:
            raise ValueError(f"event_type must be one of {allowed}, got {v}")
        return v

    model_config = {"extra": "forbid"}

class BatchValidationResult(BaseModel):
    total: int
    valid: int
    invalid: int
    errors: list[dict]
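
# validation_pipeline.py
from schemas import UserEventSchema, BatchValidationResult
from pydantic import ValidationError
# PandasDataset belongs to the legacy Great Expectations Dataset API, which the 1.x
# releases no longer include; pin an older great_expectations version to run this file.
from great_expectations.dataset import PandasDataset
import pandas as pd
import json

def validate_with_pydantic(records: list[dict]) -> BatchValidationResult:
    valid_records = []
    errors = []
    for i, record in enumerate(records):
        try:
            UserEventSchema(**record)
            valid_records.append(record)
        except ValidationError as e:
            # Catch only schema failures so unrelated bugs still surface
            errors.append({"index": i, "record": record, "error": str(e)})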
    return BatchValidationResult(
        total=len(records),
        valid=len(valid_records),
        invalid=len(errors),
        errors=errors,
    )

def validate_with_great_expectations(df: pd.DataFrame) -> dict:
    ge_df = PandasDataset(df)
    results = {}

    results["row_count"] = ge_df.expect_table_row_count_to_be_between(min_value=1, max_value=10_000_000)
    results["user_id_not_null"] = ge_df.expect_column_values_to_not_be_null("user_id")
    results["age_range"] = ge_df.expect_column_values_to_be_between("age", min_value=0, max_value=150)
    results["event_type_set"] = ge_df.expect_column_values_to_be_in_set(
        "event_type", ["click", "purchase", "signup", "logout", "view"]
    )
    results["email_format"] = ge_df.expect_column_values_to_match_regex("email", r"^[\w\.-]+@[\w\.-]+\.\w+$")
    results["amount_non_negative"] = ge_df.expect_column_values_to_be_between(
        "amount", min_value=0, max_value=None
    )

    passed = sum(1 for r in results.values() if r["success"])
    failed = len(results) - passed
    return {"total_expectations": len(results), "passed": passed, "failed": failed, "details": results}

def run_validation_pipeline(records: list[dict]) -> dict:
    pydantic_result = validate_with_pydantic(records)

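    # Build the set of failed indexes once instead of once per record
    invalid_indexes = {e["index"] for e in pydantic_result.errors}
    valid_df = pd.DataFrame([r for i, r in enumerate(records) if i not in invalid_indexes])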
    ge_result = validate_with_great_expectations(valid_df) if not valid_df.empty else {"total_expectations": 0, "passed": 0, "failed": 0}

    return {
        "pydantic_validation": pydantic_result.model_dump(),
        "ge_validation": ge_result,
        "overall_pass": pydantic_result.invalid == 0 and ge_result.get("failed", 0) == 0,
    }

if __name__ == "__main__":
    # event_id needs exactly 16 alphanumeric characters after "evt_" to match the schema pattern
    test_data = [
        {"user_id": "u001", "event_id": "evt_abc123def456gh78", "event_type": "click",
         "event_date": "2026-06-16T10:00:00", "age": 28, "email": "user@test.com", "amount": 99.5},
        {"user_id": "", "event_id": "evt_xyz789abc012ij34", "event_type": "signup",
         "event_date": "2026-06-16T11:00:00", "age": 35, "email": "new@test.com", "amount": 0},
        {"user_id": "u003", "event_id": "evt_bad_format", "event_type": "hack",
         "event_date": "2026-06-16T12:00:00", "age": -5, "email": "not-an-email", "amount": -10},
    ]
    result = run_validation_pipeline(test_data)
    print(json.dumps(result, indent=2, default=str))
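
A renamed upstream field, such as user_id becoming userId, shows up as one missing key and one unexpected key. Before running full validation, a cheap key-level comparison can summarize that drift across a whole batch. The sketch below uses only the standard library; the function name `detect_field_drift` and the split into required and optional field sets are assumptions that mirror the fields of UserEventSchema.

```python
from collections import Counter

# Field names mirror UserEventSchema; "amount" is optional there.
REQUIRED_FIELDS = frozenset(
    {"user_id", "event_id", "event_type", "event_date", "age", "email"}
)
OPTIONAL_FIELDS = frozenset({"amount"})


def detect_field_drift(records: list[dict]) -> dict[str, dict[str, int]]:
    """Count how often required keys are missing and unknown keys appear."""
    missing = Counter()
    unexpected = Counter()
    known = REQUIRED_FIELDS | OPTIONAL_FIELDS
    for record in records:
        keys = set(record)
        missing.update(REQUIRED_FIELDS - keys)
        unexpected.update(keys - known)
    return {"missing": dict(missing), "unexpected": dict(unexpected)}


if __name__ == "__main__":
    batch = [
        {"user_id": "u001", "event_id": "e1", "event_type": "click",
         "event_date": "2026-06-16T10:00:00", "age": 28, "email": "a@test.com"},
        # Upstream renamed user_id to userId in this record.
        {"userId": "u002", "event_id": "e2", "event_type": "view",
         "event_date": "2026-06-16T11:00:00", "age": 31, "email": "b@test.com"},
    ]
    print(detect_field_drift(batch))
```

A report where the same count appears under both "missing" and "unexpected" is a strong hint of a rename rather than random dirty data.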

Pattern 3: Feature Engineering Pipeline with Pandas + Polars

Problem Scenario

Feature engineering scripts are written in Pandas. When data grows from 1M to 50M rows, groupby + agg takes 30 minutes without returning results. Migrating to Polars is desirable but existing code is too extensive for a complete rewrite.

Solution

Pandas handles small data for backward compatibility, Polars handles large data for speed, and the two exchange data through Arrow (pl.from_pandas / to_pandas), which avoids copies where the column types allow it.

# feature_pipeline.py
import pandas as pd
import polars as pl
from datetime import datetime, timedelta
import numpy as np

def generate_sample_data(n: int = 1_000_000) -> pd.DataFrame:
    np.random.seed(42)
    return pd.DataFrame({
        "user_id": np.random.choice([f"u{i:04d}" for i in range(1000)], n),
        "event_type": np.random.choice(["click", "purchase", "view", "signup"], n),
        "amount": np.random.exponential(50, n).round(2),
        "event_time": pd.date_range("2026-01-01", periods=n, freq="30s"),
        "category": np.random.choice(["electronics", "clothing", "food", "books"], n),
    })

def pandas_features(df: pd.DataFrame) -> pd.DataFrame:
    user_agg = df.groupby("user_id").agg(
        total_events=("event_type", "count"),
        total_amount=("amount", "sum"),
        avg_amount=("amount", "mean"),
        unique_categories=("category", "nunique"),
        first_event=("event_time", "min"),
        last_event=("event_time", "max"),
    )
    user_agg["active_days"] = (user_agg["last_event"] - user_agg["first_event"]).dt.days + 1
    user_agg["avg_daily_events"] = user_agg["total_events"] / user_agg["active_days"].clip(lower=1)
    return user_agg.reset_index()

def polars_features(df: pd.DataFrame) -> pd.DataFrame:
    pl_df = pl.from_pandas(df)

    result = pl_df.group_by("user_id").agg([
        pl.col("event_type").count().alias("total_events"),
        pl.col("amount").sum().alias("total_amount"),
        pl.col("amount").mean().alias("avg_amount"),
        pl.col("category").n_unique().alias("unique_categories"),
        pl.col("event_time").min().alias("first_event"),
        pl.col("event_time").max().alias("last_event"),
    ]).with_columns([
        (pl.col("last_event") - pl.col("first_event")).dt.total_days().add(1).alias("active_days"),
    ]).with_columns([
        # Polars names this argument lower_bound (pandas uses lower)
        (pl.col("total_events") / pl.col("active_days").clip(lower_bound=1)).alias("avg_daily_events"),
    ])

    return result.to_pandas()

def time_window_features(df: pd.DataFrame, windows: list[str] = None) -> pd.DataFrame:
    if windows is None:
        windows = ["7d", "30d", "90d"]
    pl_df = pl.from_pandas(df)
    reference_date = pl_df.select(pl.col("event_time").max()).item()

    all_features = []
    for window in windows:
        days = int(window.replace("d", ""))
        cutoff = reference_date - timedelta(days=days)

        window_feat = pl_df.filter(pl.col("event_time") >= cutoff).group_by("user_id").agg([
            pl.col("amount").sum().alias(f"amount_sum_{window}"),
            pl.col("amount").mean().alias(f"amount_mean_{window}"),
            pl.col("event_type").count().alias(f"event_count_{window}"),
            pl.col("category").n_unique().alias(f"category_nunique_{window}"),
        ])
        all_features.append(window_feat)

    from functools import reduce
    # Polars 1.x calls this join "full"; coalesce=True keeps a single user_id column
    merged = reduce(lambda a, b: a.join(b, on="user_id", how="full", coalesce=True), all_features)
    return merged.to_pandas()

def category_encoding_features(df: pd.DataFrame) -> pd.DataFrame:
    pl_df = pl.from_pandas(df)
    pivot = pl_df.group_by(["user_id", "category"]).agg([
        pl.col("amount").sum().alias("category_amount"),
        pl.col("event_type").count().alias("category_count"),
    ]).pivot(
        index="user_id",
        on="category",
        values=["category_amount", "category_count"],
    ).fill_null(0)
    return pivot.to_pandas()

def run_feature_pipeline():
    print("Generating sample data...")
    df = generate_sample_data(1_000_000)
    print(f"Data shape: {df.shape}")

    print("\n--- Pandas Feature Engineering ---")
    start = datetime.now()
    pandas_result = pandas_features(df)
    print(f"Pandas time: {(datetime.now() - start).total_seconds():.2f}s")
    print(f"Pandas result shape: {pandas_result.shape}")

    print("\n--- Polars Feature Engineering ---")
    start = datetime.now()
    polars_result = polars_features(df)
    print(f"Polars time: {(datetime.now() - start).total_seconds():.2f}s")
    print(f"Polars result shape: {polars_result.shape}")

    print("\n--- Time Window Features ---")
    window_result = time_window_features(df)
    print(f"Window features shape: {window_result.shape}")

    print("\n--- Category Encoding Features ---")
    category_result = category_encoding_features(df)
    print(f"Category features shape: {category_result.shape}")

if __name__ == "__main__":
    run_feature_pipeline()
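
When migrating window features to Polars, it helps to have a slow but obvious reference to compare against on a small sample. The following is a minimal pure-Python sketch of the same cutoff logic used by time_window_features (events at or after reference minus window are kept). The names `parse_window` and `window_sums` are hypothetical helpers, and the tuple layout of `events` is an assumption made for the example.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Optional


def parse_window(window: str) -> timedelta:
    """Convert a day-based window label such as '7d' into a timedelta."""
    if not window.endswith("d") or not window[:-1].isdigit():
        raise ValueError(f"unsupported window label: {window!r}")
    return timedelta(days=int(window[:-1]))


def window_sums(
    events: list[tuple[str, datetime, float]],
    window: str,
    reference: Optional[datetime] = None,
) -> dict[str, float]:
    """Sum amount per user over events with event_time >= reference - window.

    events holds (user_id, event_time, amount) tuples. When reference is
    omitted, the latest event_time in the data is used.
    """
    if not events:
        return {}
    if reference is None:
        reference = max(t for _, t, _ in events)
    cutoff = reference - parse_window(window)
    totals = defaultdict(float)
    for user_id, event_time, amount in events:
        if event_time >= cutoff:
            totals[user_id] += amount
    return dict(totals)


if __name__ == "__main__":
    sample = [
        ("u0001", datetime(2026, 1, 1), 10.0),
        ("u0001", datetime(2026, 1, 9), 5.0),
        ("u0002", datetime(2026, 1, 10), 7.0),
    ]
    for label in ("7d", "30d"):
        print(label, window_sums(sample, label))
```

Running both implementations on a few thousand rows and comparing the per-user sums is a cheap regression test for the faster pipeline.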

Pattern 4: Vector Data Pipeline for RAG

Problem Scenario

After the RAG system goes live, documents are updated but vectors aren't synced — retrieval results are always stale. The chunking strategy is too crude — long tables get truncated, code blocks are split in the middle, and embedding quality is abysmal.

Solution

Build a complete vector data pipeline: smart chunking → embedding → vector storage → incremental sync.

# vector_pipeline.py
import hashlib
import json
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class Document:
    doc_id: str
    content: str
    metadata: dict = field(default_factory=dict)
    source: str = ""
    updated_at: str = ""

@dataclass
class Chunk:
    chunk_id: str
    doc_id: str
    content: str
    embedding: Optional[list[float]] = None
    metadata: dict = field(default_factory=dict)

class SmartChunker:
    def __init__(self, chunk_size: int = 512, chunk_overlap: int = 64,
                 separators: Optional[list[str]] = None):
        self.chunk_size = chunk_size
        # NOTE: chunk_overlap is stored but _recursive_split does not apply it yet
        self.chunk_overlap = chunk_overlap
        self.separators = separators or ["\n\n", "\n", ". ", " "]

    def chunk_document(self, doc: Document) -> list[Chunk]:
        chunks_text = self._recursive_split(doc.content, self.separators)
        chunks = []
        for i, chunk_text in enumerate(chunks_text):
            # Skip fragments too short to carry meaning
            if len(chunk_text.strip()) < 20:
                continue
            chunk_id = hashlib.md5(f"{doc.doc_id}:{i}:{chunk_text[:50]}".encode()).hexdigest()[:16]
            chunks.append(Chunk(
                chunk_id=chunk_id,
                doc_id=doc.doc_id,
                content=chunk_text,
                metadata={**doc.metadata, "chunk_index": i, "source": doc.source},
            ))
        return chunks

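    def _recursive_split(self, text: str, separators: list[str]) -> list[str]:
        if len(text) <= self.chunk_size:
            return [text] if text.strip() else []
        if not separators:
            # No separator left: hard-split so that no chunk exceeds chunk_size
            return [text[i:i + self.chunk_size] for i in range(0, len(text), self.chunk_size)]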

        sep = separators[0]
        parts = text.split(sep)

        result = []
        current = ""
        for part in parts:
            candidate = current + sep + part if current else part
            if len(candidate) <= self.chunk_size:
                current = candidate
            else:
                if current:
                    result.append(current)
                if len(part) > self.chunk_size:
                    sub_splits = self._recursive_split(part, separators[1:])
                    result.extend(sub_splits)
                    current = ""
                else:
                    current = part
        if current:
            result.append(current)
        return result

class EmbeddingService:
    def __init__(self, model_name: str = "text-embedding-3-small", api_key: str = ""):
        self.model_name = model_name
        self.api_key = api_key

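    def embed_texts(self, texts: list[str]) -> list[list[float]]:
        try:
            from openai import OpenAI
        except ImportError:
            # Offline demo fallback only: random vectors carry no meaning, never use them in production
            import numpy as np
            return [np.random.randn(1536).tolist() for _ in texts]
        # An empty api_key falls back to the OPENAI_API_KEY environment variable
        client = OpenAI(api_key=self.api_key or None)
        response = client.embeddings.create(input=texts, model=self.model_name)
        return [item.embedding for item in response.data]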

    def embed_chunks(self, chunks: list[Chunk], batch_size: int = 100) -> list[Chunk]:
        for i in range(0, len(chunks), batch_size):
            batch = chunks[i:i + batch_size]
            texts = [c.content for c in batch]
            embeddings = self.embed_texts(texts)
            for j, chunk in enumerate(batch):
                chunk.embedding = embeddings[j]
        return chunks

class VectorStore:
    def __init__(self, connection_string: str = ""):
        self.connection_string = connection_string
        self._store: dict[str, Chunk] = {}

    def upsert_chunks(self, chunks: list[Chunk]):
        for chunk in chunks:
            self._store[chunk.chunk_id] = chunk

    def search(self, query_embedding: list[float], top_k: int = 5) -> list[Chunk]:
        import numpy as np
        query_vec = np.array(query_embedding)
        scores = []
        for chunk in self._store.values():
            if chunk.embedding is None:
                continue
            chunk_vec = np.array(chunk.embedding)
            similarity = np.dot(query_vec, chunk_vec) / (np.linalg.norm(query_vec) * np.linalg.norm(chunk_vec) + 1e-8)
            scores.append((chunk, float(similarity)))
        scores.sort(key=lambda x: x[1], reverse=True)
        return [chunk for chunk, _ in scores[:top_k]]

    def delete_by_doc_id(self, doc_id: str):
        to_delete = [cid for cid, c in self._store.items() if c.doc_id == doc_id]
        for cid in to_delete:
            del self._store[cid]

class VectorPipeline:
    def __init__(self, chunk_size: int = 512, model_name: str = "text-embedding-3-small"):
        self.chunker = SmartChunker(chunk_size=chunk_size)
        self.embedder = EmbeddingService(model_name=model_name)
        self.store = VectorStore()

    def ingest_documents(self, documents: list[Document]) -> dict:
        total_chunks = 0
        for doc in documents:
            self.store.delete_by_doc_id(doc.doc_id)
            chunks = self.chunker.chunk_document(doc)
            chunks = self.embedder.embed_chunks(chunks)
            self.store.upsert_chunks(chunks)
            total_chunks += len(chunks)
        return {"documents": len(documents), "chunks": total_chunks}

    def query(self, question: str, top_k: int = 5) -> list[dict]:
        query_embedding = self.embedder.embed_texts([question])[0]
        results = self.store.search(query_embedding, top_k=top_k)
        return [{"chunk_id": r.chunk_id, "content": r.content[:200],
                 "metadata": r.metadata} for r in results]

if __name__ == "__main__":
    pipeline = VectorPipeline(chunk_size=256)

    docs = [
        Document(doc_id="doc_001", content="Python is a widely used high-level programming language. "
                 "Its design philosophy emphasizes code readability. "
                 "Python supports multiple programming paradigms, including object-oriented, imperative, "
                 "functional, and procedural programming. "
                 "Python has a large standard library covering networking, text processing, database interfaces, and more.",
                 metadata={"source": "wiki", "category": "programming"},
                 source="wikipedia"),
        Document(doc_id="doc_002", content="Apache Airflow is an open-source workflow management platform. "
                 "It allows users to programmatically author, schedule, and monitor workflows. "
                 "Airflow's core concept is the DAG (Directed Acyclic Graph), which defines dependencies between tasks. "
                 "Airflow 3.0, released in 2025, brings native async support.",
                 metadata={"source": "docs", "category": "data-engineering"},
                 source="airflow_docs"),
    ]

    result = pipeline.ingest_documents(docs)
    print(f"Ingestion result: {result}")

    query_result = pipeline.query("What is the Python programming language?", top_k=3)
    print(f"\nQuery results: {json.dumps(query_result, indent=2)}")
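
The pipeline above re-chunks and re-embeds every document it is given. For incremental sync, one common approach is to record a content hash per document at ingest time and compare it on the next run, so only new or changed documents are re-embedded and removed documents are deleted. The sketch below shows that planning step with the standard library only. `content_hash`, `plan_sync`, and the `stored_hashes` mapping are hypothetical; where the hashes are persisted (a table, a key-value store) is left open.

```python
import hashlib


def content_hash(text: str) -> str:
    """Stable fingerprint of a document's content."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def plan_sync(
    current_docs: dict[str, str],
    stored_hashes: dict[str, str],
) -> dict[str, list[str]]:
    """Decide which documents need work.

    current_docs maps doc_id to the current content.
    stored_hashes maps doc_id to the hash recorded at the last ingest.
    """
    to_upsert = sorted(
        doc_id
        for doc_id, content in current_docs.items()
        if stored_hashes.get(doc_id) != content_hash(content)
    )
    to_delete = sorted(set(stored_hashes) - set(current_docs))
    unchanged = sorted(set(current_docs) - set(to_upsert))
    return {"upsert": to_upsert, "delete": to_delete, "unchanged": unchanged}


if __name__ == "__main__":
    stored = {
        "doc_001": content_hash("Python is a programming language."),
        "doc_002": content_hash("Airflow schedules workflows."),
        "doc_003": content_hash("This page was removed."),
    }
    current = {
        "doc_001": "Python is a programming language.",           # unchanged
        "doc_002": "Airflow schedules and monitors workflows.",   # edited
        "doc_004": "A brand new page.",                           # new
    }
    print(plan_sync(current, stored))
```

Documents listed under "upsert" would then go through ingest_documents, and those under "delete" through delete_by_doc_id, which keeps embedding cost proportional to what actually changed.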

Pattern 5: Production Monitoring with Prometheus + Data Quality Alerts

Problem Scenario

Data pipelines fail silently — data lag of 3 hours goes unnoticed. Feature value distribution drifts without anyone knowing, and model inference results become increasingly unreliable. Vector store search latency P99 spikes from 50ms to 5s, discovered only after user complaints.

Solution

Use Prometheus to collect metrics, define custom data quality metrics, visualize with Grafana, and alert with AlertManager.
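
The silent data lag from the problem scenario can be caught with a freshness check: compare the newest event timestamp that reached the warehouse with the current time. Below is a minimal standard-library sketch. The function names and the one-hour default threshold are assumptions for illustration; the resulting lag value could be exported through a Gauge in the same way as the other metrics.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def freshness_lag_seconds(
    latest_event_time: datetime,
    now: Optional[datetime] = None,
) -> float:
    """Seconds between the newest loaded event and now (never negative)."""
    if latest_event_time.tzinfo is None:
        raise ValueError("latest_event_time must be timezone-aware")
    if now is None:
        now = datetime.now(timezone.utc)
    return max((now - latest_event_time).total_seconds(), 0.0)


def is_stale(lag_seconds: float, max_lag: timedelta = timedelta(hours=1)) -> bool:
    """True when the lag exceeds the allowed maximum (assumed default: 1 hour)."""
    return lag_seconds > max_lag.total_seconds()


if __name__ == "__main__":
    now = datetime(2026, 6, 16, 12, 0, tzinfo=timezone.utc)
    latest = now - timedelta(hours=3)
    lag = freshness_lag_seconds(latest, now=now)
    print(lag, is_stale(lag))
```

Requiring timezone-aware timestamps avoids false alarms caused by mixing local time and UTC.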

# monitoring.py
import time
from dataclasses import dataclass
from typing import Callable
from prometheus_client import Counter, Gauge, Histogram, start_http_server

pipeline_runs_total = Counter(
    "pipeline_runs_total",
    "Total pipeline runs",
    ["pipeline_name", "status"],
)

pipeline_duration_seconds = Histogram(
    "pipeline_duration_seconds",
    "Pipeline execution duration",
    ["pipeline_name"],
    buckets=[10, 30, 60, 120, 300, 600, 1800, 3600],
)

records_processed = Counter(
    "records_processed_total",
    "Total records processed",
    ["pipeline_name", "stage"],
)

data_quality_score = Gauge(
    "data_quality_score",
    "Data quality score (0-1)",
    ["pipeline_name", "check_name"],
)

feature_drift_score = Gauge(
    "feature_drift_score",
    "Feature drift score (PSI)",
    ["pipeline_name", "feature_name"],
)

vector_search_latency = Histogram(
    "vector_search_latency_seconds",
    "Vector search latency",
    ["pipeline_name"],
    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0],
)

@dataclass
class QualityCheck:
    name: str
    check_fn: Callable[[dict], float]
    threshold: float = 0.8

@dataclass
class DriftAlert:
    feature_name: str
    psi_score: float
    threshold: float = 0.2
    is_drifted: bool = False

class PipelineMonitor:
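    # Note: 9090 is also the Prometheus server's default port; choose a different
    # metrics_port (and scrape target) when the exporter and Prometheus share a host.
    def __init__(self, pipeline_name: str, metrics_port: int = 9090):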
        self.pipeline_name = pipeline_name
        self.metrics_port = metrics_port
        self._quality_checks: list[QualityCheck] = []
        self._drift_threshold = 0.2
        self._server_started = False

    def start_metrics_server(self):
        if not self._server_started:
            start_http_server(self.metrics_port)
            self._server_started = True
            print(f"Metrics server started on port {self.metrics_port}")

    def add_quality_check(self, name: str, check_fn: Callable, threshold: float = 0.8):
        self._quality_checks.append(QualityCheck(name=name, check_fn=check_fn, threshold=threshold))

    def record_pipeline_run(self, status: str, duration: float):
        pipeline_runs_total.labels(self.pipeline_name, status).inc()
        pipeline_duration_seconds.labels(self.pipeline_name).observe(duration)

    def record_records_processed(self, stage: str, count: int):
        records_processed.labels(self.pipeline_name, stage).inc(count)

    def run_quality_checks(self, data: dict) -> dict:
        results = {}
        for check in self._quality_checks:
            score = check.check_fn(data)
            data_quality_score.labels(self.pipeline_name, check.name).set(score)
            passed = score >= check.threshold
            results[check.name] = {"score": score, "threshold": check.threshold, "passed": passed}
            if not passed:
                print(f"ALERT: Quality check '{check.name}' failed: score={score:.3f} < threshold={check.threshold}")
        return results

    def check_feature_drift(self, reference_stats: dict, current_stats: dict) -> list[DriftAlert]:
        alerts = []
        for feature_name in reference_stats:
            if feature_name not in current_stats:
                continue
            psi = self._calculate_psi(reference_stats[feature_name], current_stats[feature_name])
            feature_drift_score.labels(self.pipeline_name, feature_name).set(psi)
            is_drifted = psi > self._drift_threshold
            alert = DriftAlert(
                feature_name=feature_name,
                psi_score=psi,
                threshold=self._drift_threshold,
                is_drifted=is_drifted,
            )
            alerts.append(alert)
            if is_drifted:
                print(f"ALERT: Feature drift detected for '{feature_name}': PSI={psi:.4f} > threshold={self._drift_threshold}")
        return alerts

    @staticmethod
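    def _calculate_psi(expected: dict, actual: dict, bucket_count: int = 10) -> float:
        """Population Stability Index between two pre-binned count distributions.

        Both inputs hold per-bucket counts (dict values or a plain sequence) in the
        same bucket order. bucket_count is unused because binning happens upstream.
        """
        import numpy as np
        e_vals = np.array(list(expected.values())) if isinstance(expected, dict) else np.array(expected)
        a_vals = np.array(list(actual.values())) if isinstance(actual, dict) else np.array(actual)
        # Convert counts to proportions; fall back to a uniform distribution if all counts are zero
        e_probs = e_vals / e_vals.sum() if e_vals.sum() > 0 else np.ones_like(e_vals) / len(e_vals)
        a_probs = a_vals / a_vals.sum() if a_vals.sum() > 0 else np.ones_like(a_vals) / len(a_vals)
        # Floor the proportions so empty buckets do not produce log(0) or division by zero
        e_probs = np.clip(e_probs, 1e-6, None)
        a_probs = np.clip(a_probs, 1e-6, None)
        # PSI = sum((actual - expected) * ln(actual / expected))
        psi = float(np.sum((a_probs - e_probs) * np.log(a_probs / e_probs)))
        return psi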

    def record_vector_search_latency(self, latency: float):
        vector_search_latency.labels(self.pipeline_name).observe(latency)

if __name__ == "__main__":
    monitor = PipelineMonitor("ai_data_pipeline", metrics_port=9090)
    monitor.start_metrics_server()

    monitor.add_quality_check(
        "null_rate",
        lambda d: 1.0 - d.get("null_count", 0) / max(d.get("total", 1), 1),
        threshold=0.95,
    )
    monitor.add_quality_check(
        "schema_compliance",
        lambda d: d.get("valid_count", 0) / max(d.get("total", 1), 1),
        threshold=0.99,
    )

    start_time = time.time()
    monitor.record_records_processed("extract", 50000)
    monitor.record_records_processed("transform", 48000)
    monitor.record_records_processed("load", 47500)
    duration = time.time() - start_time

    monitor.record_pipeline_run("success", duration)

    quality_data = {"null_count": 50, "total": 50000, "valid_count": 49800}
    quality_results = monitor.run_quality_checks(quality_data)
    print(f"Quality results: {quality_results}")

    ref_stats = {"age": {"bins": [100, 200, 300, 250, 150]}, "amount": {"bins": [50, 150, 300, 350, 150]}}
    cur_stats = {"age": {"bins": [80, 180, 350, 250, 140]}, "amount": {"bins": [50, 150, 300, 350, 150]}}
    drift_alerts = monitor.check_feature_drift(
        {k: v["bins"] for k, v in ref_stats.items()},
        {k: v["bins"] for k, v in cur_stats.items()},
    )
    print(f"Drift alerts: {[(a.feature_name, a.psi_score, a.is_drifted) for a in drift_alerts]}")

    print(f"\nMetrics available at http://localhost:9090/metrics")
    time.sleep(300)

# prometheus.yml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: "data_pipeline"
    static_configs:
      - targets: ["localhost:9090"]

rule_files:
  - "alerts.yml"

# alerts.yml
groups:
  - name: data_pipeline_alerts
    rules:
      - alert: PipelineRunFailed
        expr: increase(pipeline_runs_total{status="failed"}[5m]) > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Pipeline {{ $labels.pipeline_name }} failed"

      - alert: DataQualityDegraded
        expr: data_quality_score < 0.8
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Data quality check {{ $labels.check_name }} below threshold"

      - alert: FeatureDriftDetected
        expr: feature_drift_score > 0.2
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Feature {{ $labels.feature_name }} drift detected (PSI={{ $value }})"

      - alert: VectorSearchLatencyHigh
        expr: histogram_quantile(0.99, rate(vector_search_latency_seconds_bucket[5m])) > 1.0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Vector search P99 latency above 1s"
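
The FeatureDriftDetected rule compares feature_drift_score against 0.2, which is the PSI value exported by check_feature_drift. As a rough illustration of what that number means, the following standalone sketch recomputes PSI with only the standard library for the "age" bins from the demo above. The name psi_from_counts is hypothetical and is not part of PipelineMonitor.

```python
import math

def psi_from_counts(expected_counts, actual_counts, eps=1e-6):
    """PSI for two lists of per-bucket counts that share the same bucket order."""
    if len(expected_counts) != len(actual_counts):
        raise ValueError("both distributions need the same number of buckets")
    e_total = sum(expected_counts)
    a_total = sum(actual_counts)
    if e_total <= 0 or a_total <= 0:
        raise ValueError("both distributions need at least one observation")
    psi = 0.0
    for e, a in zip(expected_counts, actual_counts):
        # Convert counts to proportions and floor them to avoid log(0)
        e_p = max(e / e_total, eps)
        a_p = max(a / a_total, eps)
        psi += (a_p - e_p) * math.log(a_p / e_p)
    return psi

reference_age = [100, 200, 300, 250, 150]
current_age = [80, 180, 350, 250, 140]
age_psi = psi_from_counts(reference_age, current_age)
print(f"age PSI = {age_psi:.4f}")  # about 0.015, well under the 0.2 alert threshold
```

A shift of a few percentage points between neighbouring buckets, as in this example, stays far below the alert threshold; identical distributions give exactly 0.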

5 Common Pitfalls and Solutions

1. Hardcoded Database Connection Strings in ETL Scripts

Problem: Connection strings are hardcoded in DAG code — changing environments requires code changes, and passwords leak to Git.

Solution: Use Airflow Connections and Environment Variables.

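import os
from urllib.parse import quote_plus

from airflow.hooks.base import BaseHook

def get_connection() -> str:
    # Credentials live in the Airflow Connection "postgres_source", not in the DAG file
    conn = BaseHook.get_connection("postgres_source")
    # URL-encode the password so characters such as "@" or "/" do not break the URL
    password = quote_plus(conn.password or "")
    return f"postgresql://{conn.login}:{password}@{conn.host}:{conn.port}/{conn.schema}"

# Or use environment variables
db_url = os.environ["DATABASE_URL"]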

2. Data Validation Only Checks Schema, Not Statistics

Problem: Schema validation passes but data distribution is abnormal (e.g., every age is 99, which is inside the allowed 0-150 range but clearly wrong). Pydantic only validates format, not statistics.

Solution: Pydantic for format validation + Great Expectations for statistical distribution validation — double defense line.

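from pydantic import BaseModel, Field

# Pydantic: Format validation
class UserSchema(BaseModel):
    age: int = Field(..., ge=0, le=150)

# Great Expectations: Statistical validation
# ge_df is assumed to be a Great Expectations dataset wrapping the DataFrame
# (for example from ge.from_pandas(df) in pre-1.0 releases)
ge_df.expect_column_mean_to_be_between("age", min_value=18, max_value=65)
ge_df.expect_column_values_to_be_in_set("country", ["CN", "US", "JP", "UK"])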

3. Feature Engineering Ignores Data Leakage

Problem: Using future data to compute features (e.g., normalizing with full dataset mean) — great training performance, catastrophic in production.

Solution: Strictly use training set statistics for transformations; time-window features must only use historical data.

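# Fit the statistics on the training window only (rows before the cutoff date)
train_stats = df[df["date"] < "2026-01-01"]["amount"].agg(["mean", "std"])
# Apply the training-window mean/std to every row, including later ones
df["amount_scaled"] = (df["amount"] - train_stats["mean"]) / train_stats["std"]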
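
The second half of the rule, time-window features that see only historical data, can be sketched without any libraries. The helper below (hypothetical name prior_mean_feature) walks amounts in time order and gives each row the mean of strictly earlier amounts, so a row never contributes to its own feature.

```python
def prior_mean_feature(amounts_in_time_order, default=0.0):
    """For each position, return the mean of all earlier amounts (never the current one)."""
    features = []
    running_sum = 0.0
    seen = 0
    for amount in amounts_in_time_order:
        # Emit the feature before the current amount is added to the history
        features.append(running_sum / seen if seen else default)
        running_sum += amount
        seen += 1
    return features

amounts = [10.0, 20.0, 60.0]
print(prior_mean_feature(amounts))  # [0.0, 10.0, 15.0]
```

In pandas the same idea is usually written by shifting the column by one row before an expanding or rolling aggregate, for example df["amount"].shift(1).expanding().mean() on data sorted by time.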

4. Vector Data Without Incremental Updates

Problem: The full vector index is rebuilt on every run. With 100K documents that takes 2 hours, so the index is never current.

Solution: Document hash-based incremental updates — only process changed documents.

import hashlib

# `pipeline` is assumed to be the RAG pipeline object (with .chunker and .embedder)
# defined elsewhere in the module
def incremental_update(documents: list[Document], store: VectorStore) -> dict:
    updated = 0
    for doc in documents:
        # Hash the content so unchanged documents can be skipped
        content_hash = hashlib.md5(doc.content.encode()).hexdigest()
        existing = store.get_doc_metadata(doc.doc_id)
        if existing and existing.get("content_hash") == content_hash:
            continue
        # Drop stale chunks before re-chunking and re-embedding the changed document
        store.delete_by_doc_id(doc.doc_id)
        chunks = pipeline.chunker.chunk_document(doc)
        chunks = pipeline.embedder.embed_chunks(chunks)
        for chunk in chunks:
            chunk.metadata["content_hash"] = content_hash
        store.upsert_chunks(chunks)
        updated += 1
    return {"checked": len(documents), "updated": updated}
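
To see the hash check in isolation, here is a minimal sketch that replaces the vector store with a plain dictionary of stored hashes. The names content_hash and plan_updates are hypothetical, and SHA-256 is swapped in for MD5 purely as an illustration; any stable digest works for change detection.

```python
import hashlib

def content_hash(text: str) -> str:
    # The digest is only a change detector here, not a security measure
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

def plan_updates(documents: dict[str, str], stored_hashes: dict[str, str]) -> list[str]:
    """Return the doc_ids whose content is new or changed, and record their new hashes."""
    changed = []
    for doc_id, text in documents.items():
        digest = content_hash(text)
        if stored_hashes.get(doc_id) == digest:
            continue  # unchanged: skip chunking and embedding
        stored_hashes[doc_id] = digest
        changed.append(doc_id)
    return changed

stored = {}
first_run = plan_updates({"a": "hello", "b": "world"}, stored)
second_run = plan_updates({"a": "hello", "b": "world!"}, stored)
print(first_run, second_run)  # ['a', 'b'] ['b']
```

Note that this sketch, like incremental_update, only looks at documents that are still present. Removing vectors for documents deleted at the source would need a separate pass.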

5. Monitoring Only Checks Pipeline Execution, Not Data Quality

Problem: Airflow shows DAG success, but data volume dropped 90% or feature values are all NULL — the production model is already broken.

Solution: Add data quality gates at the end of DAGs — block downstream if quality is below threshold.

def quality_gate_check(**context):
    ti = context["ti"]
    # Row count returned by the "load" task; None if that task pushed nothing
    loaded_count = ti.xcom_pull(task_ids="load", key="return_value")
    expected_min = context["params"]["expected_min_records"]

    if loaded_count is None or loaded_count < expected_min:
        raise ValueError(f"Quality gate failed: {loaded_count} < {expected_min}")

    # run_quality_checks() is a placeholder for your own check returning a score in [0, 1]
    quality_score = run_quality_checks()
    if quality_score < 0.9:
        raise ValueError(f"Quality gate failed: score={quality_score:.2f}")

    return "PASSED"

# Raising an exception fails the task, so tasks downstream of quality_gate do not run
quality_gate = PythonOperator(
    task_id="quality_gate",
    python_callable=quality_gate_check,
    params={"expected_min_records": 10000},
)
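
A fixed expected_min_records works until normal volume changes. One possible variation, sketched below with hypothetical names and an assumed 50% tolerance, compares the loaded row count with the average of recent runs, which is closer to the "volume dropped 90%" failure described above.

```python
from statistics import mean

def volume_gate(loaded_count: int, recent_counts: list[int], max_drop: float = 0.5) -> None:
    """Raise ValueError if loaded_count fell more than max_drop below the recent average."""
    if not recent_counts:
        return  # no history yet, nothing to compare against
    baseline = mean(recent_counts)
    min_allowed = baseline * (1.0 - max_drop)
    if loaded_count < min_allowed:
        raise ValueError(
            f"Quality gate failed: {loaded_count} rows < {min_allowed:.0f} "
            f"(baseline {baseline:.0f}, max drop {max_drop:.0%})"
        )

volume_gate(47500, [50000, 49000, 51000])      # within tolerance, passes silently
try:
    volume_gate(5000, [50000, 49000, 51000])   # a 90% drop trips the gate
except ValueError as exc:
    print(exc)
```

Inside a DAG, a function like this could be called from quality_gate_check, with the recent counts read from wherever run history is kept.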

10 Common Error Troubleshooting

| # | Error Message | Cause | Solution |
|---|---|---|---|
| 1 | Airflow DAG import error | Python path or missing dependencies | Check PYTHONPATH, ensure requirements.txt is fully installed |
| 2 | Pydantic ValidationError: field required | Upstream data field missing or renamed | Add Optional defaults for missing fields; accept renamed fields with Field(validation_alias=AliasChoices("user_id", "userId")). model_config = {"extra": "allow"} only tolerates unexpected extra fields |
| 3 | Great Expectations expectation failed | Data distribution drift or dirty data | Check data source changes, adjust expectation thresholds or add data cleaning steps |
| 4 | Polars SchemaError: dtype mismatch | Type inference errors during Pandas→Polars conversion | Explicitly specify schema: pl.DataFrame(data, schema={"col": pl.Float64}) |
| 5 | OpenAI API RateLimitError | Embedding requests exceed API rate limit | Implement batch+retry: batch calls, add tenacity exponential backoff retry |
| 6 | numpy.linalg.LinAlgError in cosine similarity | Vector dimension mismatch or zero vectors | Check vector norms before normalization: np.linalg.norm(v) > 1e-8 |
| 7 | Prometheus duplicate label error | The same metric name is registered more than once, for example a Counter/Gauge created inside a function that runs repeatedly | Use module-level metric definitions, avoid creating Counter/Gauge inside functions |
| 8 | MemoryError on large dataset groupby | Pandas full aggregation runs out of memory | Switch to Polars or chunk processing: pd.read_csv(path, chunksize=100000) |
| 9 | PSI calculation returns NaN | Some bucket frequencies are zero after binning | Add smoothing: probs = np.clip(probs, 1e-6, None) |
| 10 | Airflow task timeout | Single task execution exceeds time limit | Increase execution_timeout, or split into multiple sub-tasks for parallel execution |
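
Row 5 recommends tenacity for backoff. For readers who want to see the mechanism itself, this is a standard-library sketch of exponential backoff with jitter. It is not tenacity's API: retry_with_backoff and its parameters are hypothetical, and in a real pipeline retry_on would be narrowed to the client's rate-limit exception.

```python
import random
import time

def retry_with_backoff(fn, max_attempts=5, base_delay=1.0, max_delay=30.0,
                       retry_on=(Exception,), sleep=time.sleep):
    """Call fn(); on a retryable error wait base_delay * 2**attempt (capped, jittered) and retry."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the original error
            delay = min(max_delay, base_delay * (2 ** attempt))
            # Jitter spreads out retries coming from concurrent workers
            sleep(delay * random.uniform(0.5, 1.0))

calls = {"n": 0}

def flaky():
    # Fails twice, then succeeds, to imitate a transient rate limit
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("rate limited")
    return "ok"

result = retry_with_backoff(flaky, sleep=lambda s: None)  # skip real sleeping in the demo
print(result, calls["n"])  # ok 3
```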

Advanced Optimization Techniques

1. Incremental ETL: Process Only Changed Data

Full ETL is unsustainable at scale — use CDC (Change Data Capture) or timestamp markers to process only increments.

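import pandas as pd
import redis
from sqlalchemy import create_engine, text

def incremental_extract(last_run_time: str) -> pd.DataFrame:
    # get_connection() is expected to return a SQLAlchemy-style database URL
    engine = create_engine(get_connection())
    # Bind the watermark as a parameter instead of formatting it into the SQL string
    query = text("""
        SELECT * FROM user_events
        WHERE updated_at > :last_run_time
        ORDER BY updated_at
    """)
    return pd.read_sql(query, engine, params={"last_run_time": last_run_time})

def get_last_run_time(pipeline_name: str) -> str:
    # decode_responses=True makes redis return str instead of bytes
    r = redis.Redis(decode_responses=True)
    return r.get(f"pipeline:{pipeline_name}:last_run") or "2026-01-01T00:00:00"

def save_last_run_time(pipeline_name: str, run_time: str):
    r = redis.Redis(decode_responses=True)
    r.set(f"pipeline:{pipeline_name}:last_run", run_time)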
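
The watermark pattern can be tried end to end without Postgres or Redis. The sketch below uses an in-memory SQLite database and a plain dict as the watermark store; both are stand-ins chosen for illustration, and run_increment is a hypothetical name.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user_events (id INTEGER, updated_at TEXT)")
conn.executemany(
    "INSERT INTO user_events VALUES (?, ?)",
    [(1, "2026-01-01T10:00:00"), (2, "2026-01-02T10:00:00"), (3, "2026-01-03T10:00:00")],
)

watermarks = {}  # stand-in for the Redis key pipeline:<name>:last_run

def run_increment(pipeline_name: str) -> list[tuple]:
    last_run = watermarks.get(pipeline_name, "2026-01-01T00:00:00")
    # The watermark is passed as a query parameter, never formatted into the SQL text
    rows = conn.execute(
        "SELECT id, updated_at FROM user_events WHERE updated_at > ? ORDER BY updated_at",
        (last_run,),
    ).fetchall()
    if rows:
        # Advance the watermark to the newest row actually read, after the read succeeded
        watermarks[pipeline_name] = rows[-1][1]
    return rows

first = run_increment("demo")
conn.execute("INSERT INTO user_events VALUES (4, '2026-01-04T10:00:00')")
second = run_increment("demo")
print(len(first), [r[0] for r in second])  # 3 [4]
```

Advancing the watermark to the last row read, instead of to the wall-clock time of the run, avoids skipping rows that were committed while the extract was in progress.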

2. Feature Caching and Version Management

Cache feature computation results in Redis to avoid recomputation; version management ensures training and inference use the same features.

import redis
import json
import hashlib

class FeatureCache:
    def __init__(self, redis_url: str = "redis://localhost:6379/2"):
        self.r = redis.from_url(redis_url)
        self.ttl = 86400

    def _cache_key(self, feature_name: str, entity_id: str, version: str) -> str:
        raw = f"{feature_name}:{entity_id}:{version}"
        return f"feat:{hashlib.md5(raw.encode()).hexdigest()}"

    def get(self, feature_name: str, entity_id: str, version: str = "v1") -> dict | None:
        key = self._cache_key(feature_name, entity_id, version)
        cached = self.r.get(key)
        return json.loads(cached) if cached else None

    def set(self, feature_name: str, entity_id: str, value: dict, version: str = "v1"):
        key = self._cache_key(feature_name, entity_id, version)
        self.r.setex(key, self.ttl, json.dumps(value, default=str))
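
One way to keep the version argument honest is to derive it from the feature definition itself, so that any change to the definition produces a new cache namespace. This is a sketch under that assumption; feature_version and the definition fields are hypothetical and not part of FeatureCache.

```python
import hashlib
import json

def feature_version(definition: dict) -> str:
    """Derive a short, deterministic version tag from a feature definition."""
    # sort_keys makes the serialization independent of dict insertion order
    canonical = json.dumps(definition, sort_keys=True)
    return "v-" + hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:8]

v_a = feature_version({"source": "user_events", "window_days": 7, "agg": "sum"})
v_b = feature_version({"agg": "sum", "window_days": 7, "source": "user_events"})
v_c = feature_version({"source": "user_events", "window_days": 30, "agg": "sum"})
print(v_a == v_b, v_a == v_c)  # True False
```

Training and inference code that pass the same definition to feature_version, and then pass the result as version to FeatureCache.get and FeatureCache.set, read and write the same keys.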

3. Embedding Batch Parallel Acceleration

Single embedding calls are too slow — batch + async parallel dramatically improves throughput.

import asyncio
from openai import AsyncOpenAI

async def async_embed_batch(texts: list[str], batch_size: int = 2048,
                            model: str = "text-embedding-3-small") -> list[list[float]]:
    client = AsyncOpenAI()
    all_embeddings = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        response = await client.embeddings.create(input=batch, model=model)
        all_embeddings.extend([item.embedding for item in response.data])
    return all_embeddings

async def parallel_embed(documents: list[str], concurrency: int = 5) -> list[list[float]]:
    semaphore = asyncio.Semaphore(concurrency)

    async def limited_embed(batch):
        async with semaphore:
            return await async_embed_batch(batch)

    tasks = [limited_embed(documents[i:i+100]) for i in range(0, len(documents), 100)]
    results = await asyncio.gather(*tasks)
    return [emb for batch in results for emb in batch]
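
The concurrency control in parallel_embed can be checked without calling the OpenAI API. In this sketch fake_embed is a hypothetical stand-in that sleeps briefly and returns one-element vectors; it illustrates that the semaphore caps the number of in-flight batches and that asyncio.gather returns results in submission order.

```python
import asyncio

async def fake_embed(batch: list[str]) -> list[list[float]]:
    # Stand-in for a network call: one tiny "vector" per input text
    await asyncio.sleep(0.01)
    return [[float(len(text))] for text in batch]

async def bounded_embed(texts: list[str], batch_size: int = 2, concurrency: int = 2):
    semaphore = asyncio.Semaphore(concurrency)
    in_flight = 0
    peak = 0

    async def limited(batch):
        nonlocal in_flight, peak
        async with semaphore:
            # Track how many batches are running at once
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await fake_embed(batch)
            finally:
                in_flight -= 1

    batches = [texts[i:i + batch_size] for i in range(0, len(texts), batch_size)]
    results = await asyncio.gather(*(limited(b) for b in batches))
    # gather preserves the order of the awaitables, so vectors line up with texts
    return [vec for batch in results for vec in batch], peak

vectors, peak = asyncio.run(bounded_embed(["a", "bb", "ccc", "dddd", "eeeee", "ffffff"]))
print(vectors, peak)
```

Because the output order matches the input order, the vectors can be zipped back onto the original documents without carrying an index through the calls.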

4. Vector Index Optimization: HNSW Parameter Tuning

PGVector's HNSW index parameters directly affect retrieval accuracy and speed.

-- Create HNSW index with tuned parameters
CREATE INDEX CONCURRENTLY idx_documents_embedding_hnsw
ON documents USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 200);

-- Set ef_search at query time
SET hnsw.ef_search = 100;

-- Rebuild index after bulk inserts
REINDEX INDEX CONCURRENTLY idx_documents_embedding_hnsw;

5. Data Quality SLA Automation

Write data quality check results to an SLA table and automatically calculate SLA compliance rates.

import os
from datetime import datetime, timezone

import psycopg2

def record_sla(pipeline_name: str, check_name: str, score: float,
               threshold: float, passed: bool):
    conn = psycopg2.connect(dsn=os.environ["WAREHOUSE_URL"])
    cur = conn.cursor()
    cur.execute("""
        INSERT INTO data_quality_sla
        (pipeline_name, check_name, score, threshold, passed, checked_at)
        VALUES (%s, %s, %s, %s, %s, %s)
    """, (pipeline_name, check_name, score, threshold, passed, datetime.now(timezone.utc)))
    conn.commit()
    conn.close()

def get_sla_rate(pipeline_name: str, days: int = 30) -> float:
    conn = psycopg2.connect(dsn=os.environ["WAREHOUSE_URL"])
    cur = conn.cursor()
    # NULLIF avoids a division-by-zero error when no checks were recorded in the window;
    # the day count is bound as a number instead of being placed inside a quoted literal
    cur.execute("""
        SELECT COUNT(*) FILTER (WHERE passed) * 100.0 / NULLIF(COUNT(*), 0)
        FROM data_quality_sla
        WHERE pipeline_name = %s AND checked_at >= NOW() - %s * INTERVAL '1 day'
    """, (pipeline_name, days))
    rate = cur.fetchone()[0]
    conn.close()
    return float(rate) if rate is not None else 0.0

Comparison Analysis

| Dimension | Apache Airflow | Prefect | Dagster | Luigi | Mage |
|---|---|---|---|---|---|
| Orchestration | DAG definition | Functional+Decorators | Asset definition | Task dependency | Notebook-style |
| Data Validation | Manual integration | Built-in | Native Type system | None | Built-in |
| Feature Management | Manual integration | Manual integration | Native Asset | None | Manual integration |
| Vector Pipeline | Manual integration | Manual integration | Manual integration | None | Manual integration |
| Monitoring | Basic UI | Native Cloud UI | Native Dagit | Basic | Native UI |
| Learning Curve | Medium | Low | Medium-High | Low | Low |
| Community | Most mature | Fast-growing | Growing | Declining | Emerging |
| Python Native | Yes | Yes | Yes | Yes | Yes |
| 2026 Recommendation | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐ |
| Best For | Enterprise ETL | SMB quick start | Data asset intensive | Simple batch | Data analytics teams |

| Data Validation | Pydantic v2 | Great Expectations | Pandera | TFX Data Validation |
|---|---|---|---|---|
| Validation Speed | Very fast (Rust core) | Medium | Fast | Medium |
| Schema Definition | Python class | YAML/Code | Python class | Protobuf |
| Statistical Validation | Not supported | Strong | Medium | Strong |
| Custom Rules | Flexible | Flexible | Medium | Medium |
| Data Profiling | Not supported | Strong | Not supported | Strong |
| Best For | Format validation | Comprehensive data quality | Lightweight schema | TensorFlow ecosystem |


Summary

Python AI data pipelines in 2026 have formed a mature methodology:

  • Airflow orchestration is the backbone: DAGs define task dependencies, and automatic retries and alerting ensure reliability.
  • Data validation is the immune system: Pydantic intercepts format errors, and Great Expectations intercepts statistical anomalies.
  • Feature engineering is the core value: Pandas for backward compatibility, Polars for large-data acceleration.
  • Vector pipelines are AI infrastructure: smart chunking plus incremental updates keep RAG data fresh.
  • Monitoring is the eyes of production: Prometheus collects metrics, and data quality SLAs provide quantifiable guarantees.

Remember: pipelines without data quality guarantees run faster only to fail faster.

