Python AI Data Pipelines in Practice: 5 Production Patterns from ETL to Feature Store

Data quality collapses, ETL scripts spiral out of control, and features take a week to ship

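At 3 a.m., a model training job crashes because an upstream column was renamed from user_id to userId, and debugging runs until dawn. Feature engineering scripts are scattered across 10 Jupyter Notebooks, and shipping one new feature means waiting a week for a slot in a data engineer's schedule. The RAG system's vector data is perpetually out of sync with its source documents, so retrieval returns answers that have nothing to do with the question. In 2026, a Python AI data pipeline has gone from "nice to have" to "must have": Airflow 3.0 brings native async def support, Pydantic v2 advertises a 5-50x validation speedup over v1, and Polars can run feature engineering workloads 10x or more faster than Pandas, depending on the workload.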

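Starting from 5 production patterns, this article walks through the full chain: Airflow ETL orchestration → Pydantic + GE data validation → Pandas/Polars feature engineering → RAG vector data pipeline → Prometheus production monitoring. Each step comes with complete Python code.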


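Key Takeaways

  • Use Apache Airflow 3.0 DAG orchestration to manage ETL task dependencies and failure retries
  • Build a data validation line of defense with Pydantic v2 + Great Expectations that keeps dirty data out of the pipeline
  • Combine Pandas and Polars for feature engineering, balancing ecosystem compatibility with compute performance
  • Build a RAG vector data pipeline covering the full chain from document chunking to embedding to vector storage
  • Monitor data quality with Prometheus + Grafana, with near-real-time alerts on pipeline anomalies
  • Understand 5 common data pipeline pitfalls and their production-grade solutions
  • Apply advanced optimizations such as incremental processing, caching, and parallelism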

Table of Contents

  1. Architecture Overview:AI資料流水線全域架構
  2. Pattern 1:ETL Pipeline with Apache Airflow + Python
  3. Pattern 2:Data Validation with Pydantic v2 + Great Expectations
  4. Pattern 3:Feature Engineering Pipeline with Pandas + Polars
  5. Pattern 4:Vector Data Pipeline for RAG
  6. Pattern 5:Production Monitoring with Prometheus + Data Quality Alerts
  7. 5 Common Pitfalls and Solutions
  8. 10 Common Error Troubleshooting
  9. Advanced Optimization Techniques
  10. Comparison Analysis
  11. Recommended Online Tools
  12. Summary & Further Reading

Architecture Overview

┌─────────────────────────────────────────────────────────────────┐
│                    AI Data Pipeline Architecture                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌──────────┐    ┌──────────────┐    ┌──────────────────────┐  │
│  │  Data     │    │  Airflow DAG │    │  Pydantic + GE       │  │
│  │  Sources  │───▶│  Orchestration│───▶│  Data Validation     │  │
│  │  (API/DB/ │    │  (ETL Sched) │    │  (Schema + Rules)    │  │
│  │   Files)  │    └──────────────┘    └──────────┬───────────┘  │
│  └──────────┘                                    │              │
│                                                   ▼              │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │              Feature Engineering Pipeline                 │  │
│  │  ┌─────────┐   ┌──────────┐   ┌──────────────────────┐ │  │
│  │  │ Pandas  │──▶│  Polars  │──▶│  Feature Store       │ │  │
│  │  │ (Compat)│   │ (Speed)  │   │  (Redis/Feast)       │ │  │
│  │  └─────────┘   └──────────┘   └──────────────────────┘ │  │
│  └──────────────────────────────────────────────────────────┘  │
│                                                   │              │
│                                                   ▼              │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │              Vector Data Pipeline (RAG)                   │  │
│  │  ┌──────────┐  ┌──────────┐  ┌───────────────────────┐ │  │
│  │  │ Chunking │─▶│ Embedding│─▶│  Vector Store         │ │  │
│  │  │ (Split)  │  │ (Model)  │  │  (PGVector/Chroma)    │ │  │
│  │  └──────────┘  └──────────┘  └───────────────────────┘ │  │
│  └──────────────────────────────────────────────────────────┘  │
│                                                   │              │
│                                                   ▼              │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │              Production Monitoring                        │  │
│  │  ┌────────────┐  ┌───────────┐  ┌────────────────────┐ │  │
│  │  │ Prometheus │─▶│  Grafana  │─▶│  AlertManager      │ │  │
│  │  │ (Metrics)  │  │ (Dashboard)│  │  (Notifications)   │ │  │
│  │  └────────────┘  └───────────┘  └────────────────────┘ │  │
│  └──────────────────────────────────────────────────────────┘  │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Pattern 1:ETL Pipeline with Apache Airflow + Python

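Problem

Every day, data must be extracted from 3 databases, 2 APIs, and 5 CSV files, cleaned, and written to the data warehouse. Hand-written scripts cannot track run status, downstream jobs never learn that an upstream job failed, and every retry is done by hand.

Solution

Use Apache Airflow 3.0 to orchestrate the ETL DAG with task dependencies, automatic retries, and failure alerts.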

# dags/etl_pipeline.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator

default_args = {
    "owner": "data-team",
    "depends_on_past": False,
    "email_on_failure": True,
    "email": ["data-team@company.com"],
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="etl_daily_pipeline",
    default_args=default_args,
    description="Daily ETL pipeline for AI data",
    schedule="0 2 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["etl", "production"],
) as dag:

    def extract_from_postgres(**context):
        import psycopg2
        conn = psycopg2.connect(
            host="postgres-host",
            database="source_db",
            user="reader",
            password="***",
        )
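        cur = conn.cursor()
        cur.execute("SELECT * FROM user_events WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'")
        # Return dicts keyed by column name so transform_data can align columns on concat
        columns = [desc[0] for desc in cur.description]
        rows = [dict(zip(columns, row)) for row in cur.fetchall()]
        conn.close()
        context["ti"].xcom_push(key="postgres_rows", value=len(rows))
        return rows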

    def extract_from_api(**context):
        import requests
        resp = requests.get(
            "https://api.example.com/v2/data",
            headers={"Authorization": "Bearer ***"},
            params={"date": context["ds"]},
        )
        resp.raise_for_status()
        data = resp.json()
        context["ti"].xcom_push(key="api_records", value=len(data))
        return data

    def extract_from_files(**context):
        import glob
        import pandas as pd
        files = glob.glob(f"/data/input/{context['ds']}/*.csv")
        frames = [pd.read_csv(f) for f in files]
        combined = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
        context["ti"].xcom_push(key="file_count", value=len(files))
        return combined.to_dict("records")

    def transform_data(**context):
        import pandas as pd
        ti = context["ti"]
        pg_rows = ti.xcom_pull(task_ids="extract_postgres", key="return_value")
        api_data = ti.xcom_pull(task_ids="extract_api", key="return_value")
        file_data = ti.xcom_pull(task_ids="extract_files", key="return_value")

        df_pg = pd.DataFrame(pg_rows)
        df_api = pd.DataFrame(api_data)
        df_files = pd.DataFrame(file_data)

        df_all = pd.concat([df_pg, df_api, df_files], ignore_index=True)
        df_all = df_all.drop_duplicates(subset=["user_id", "event_id"])
        df_all = df_all.dropna(subset=["user_id", "event_type"])
        df_all["event_date"] = pd.to_datetime(df_all["created_at"]).dt.date

        context["ti"].xcom_push(key="total_records", value=len(df_all))
        return df_all.to_dict("records")

    def load_to_warehouse(**context):
        import psycopg2
        ti = context["ti"]
        records = ti.xcom_pull(task_ids="transform", key="return_value")
        conn = psycopg2.connect(
            host="warehouse-host",
            database="analytics_db",
            user="loader",
            password="***",
        )
        cur = conn.cursor()
        cur.execute("TRUNCATE TABLE staging.user_events_daily")
        insert_sql = """
            INSERT INTO staging.user_events_daily
            (user_id, event_id, event_type, event_date, created_at)
            VALUES (%s, %s, %s, %s, %s)
        """
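        rows = [(r["user_id"], r["event_id"], r["event_type"],
                 str(r["event_date"]), r["created_at"]) for r in records]
        # Insert every record in batches of 1,000 rather than only the first 1,000
        for i in range(0, len(rows), 1000):
            cur.executemany(insert_sql, rows[i:i + 1000])
        conn.commit()
        conn.close()
        return len(rows)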

    def data_quality_check(**context):
        ti = context["ti"]
        total = ti.xcom_pull(task_ids="transform", key="total_records")
        loaded = ti.xcom_pull(task_ids="load", key="return_value") or 0
        # Also fail when nothing was loaded, not only when the ratio is low
        if total and loaded / total < 0.95:
            raise ValueError(f"Data quality check failed: only {loaded}/{total} records loaded")
        return "PASSED"

    start = EmptyOperator(task_id="start")
    extract_pg = PythonOperator(task_id="extract_postgres", python_callable=extract_from_postgres)
    extract_api = PythonOperator(task_id="extract_api", python_callable=extract_from_api)
    extract_files = PythonOperator(task_id="extract_files", python_callable=extract_from_files)
    transform = PythonOperator(task_id="transform", python_callable=transform_data)
    load = PythonOperator(task_id="load", python_callable=load_to_warehouse)
    quality_check = PythonOperator(task_id="quality_check", python_callable=data_quality_check)
    end = EmptyOperator(task_id="end")

    start >> [extract_pg, extract_api, extract_files] >> transform >> load >> quality_check >> end

# Start Airflow (Airflow 3.0 replaced `airflow db init` with `airflow db migrate`)
pip install apache-airflow==3.0.0
airflow db migrate
airflow dags list
airflow dags trigger etl_daily_pipeline
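
The quality_check task compares the loaded row count against the transformed row count. A minimal sketch of the same rule as a pure function follows, so it can be unit tested without an Airflow context. The name check_load_ratio and the min_ratio parameter are illustrative and not part of the DAG above.

```python
def check_load_ratio(total: int, loaded: int, min_ratio: float = 0.95) -> float:
    """Return loaded/total, raising ValueError when it falls below min_ratio."""
    if total <= 0:
        # Nothing was transformed, so there is no ratio to judge
        raise ValueError("no transformed records to compare against")
    ratio = loaded / total
    if ratio < min_ratio:
        raise ValueError(f"only {loaded}/{total} records loaded ({ratio:.1%})")
    return ratio


# A healthy run passes; a run that loaded nothing is rejected
healthy_ratio = check_load_ratio(total=1000, loaded=980)
```

Inside the DAG, the task callable would pull the two counts from XCom and pass them to this function, keeping the Airflow-specific code to a few lines.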

Pattern 2:Data Validation with Pydantic v2 + Great Expectations

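Problem

An upstream system quietly changed a field type: age went from int to string, and the email format is no longer checked. Dirty data flowed all the way into model training, and the accuracy drop was only noticed after training finished.

Solution

Use Pydantic v2 for schema validation and Great Expectations for statistical rule validation, a double line of defense against dirty data.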

# schemas.py
from pydantic import BaseModel, Field, EmailStr, field_validator
from datetime import datetime
from typing import Optional

class UserEventSchema(BaseModel):
    user_id: str = Field(..., min_length=1, max_length=64, description="Unique user identifier")
    event_id: str = Field(..., pattern=r"^evt_[a-zA-Z0-9]{16}$", description="Event ID format")
    event_type: str = Field(..., description="Event type")
    event_date: datetime = Field(..., description="Event date")
    age: int = Field(..., ge=0, le=150, description="User age")
    email: EmailStr = Field(..., description="User email")
    amount: Optional[float] = Field(None, ge=0, description="金額")

    @field_validator("event_type")
    @classmethod
    def validate_event_type(cls, v):
        allowed = {"click", "purchase", "signup", "logout", "view"}
        if v not in allowed:
            raise ValueError(f"event_type must be one of {allowed}, got {v}")
        return v

    model_config = {"extra": "forbid"}

class BatchValidationResult(BaseModel):
    total: int
    valid: int
    invalid: int
    errors: list[dict]
# validation_pipeline.py
from schemas import UserEventSchema, BatchValidationResult
# Legacy Dataset API: requires an older great_expectations release that still ships this module
from great_expectations.dataset import PandasDataset
import pandas as pd
import json

def validate_with_pydantic(records: list[dict]) -> BatchValidationResult:
    valid_records = []
    errors = []
    for i, record in enumerate(records):
        try:
            UserEventSchema(**record)
            valid_records.append(record)
        except Exception as e:
            errors.append({"index": i, "record": record, "error": str(e)})
    return BatchValidationResult(
        total=len(records),
        valid=len(valid_records),
        invalid=len(errors),
        errors=errors,
    )

def validate_with_great_expectations(df: pd.DataFrame) -> dict:
    ge_df = PandasDataset(df)
    results = {}

    results["row_count"] = ge_df.expect_table_row_count_to_be_between(min_value=1, max_value=10_000_000)
    results["user_id_not_null"] = ge_df.expect_column_values_to_not_be_null("user_id")
    results["age_range"] = ge_df.expect_column_values_to_be_between("age", min_value=0, max_value=150)
    results["event_type_set"] = ge_df.expect_column_values_to_be_in_set(
        "event_type", ["click", "purchase", "signup", "logout", "view"]
    )
    results["email_format"] = ge_df.expect_column_values_to_match_regex("email", r"^[\w\.-]+@[\w\.-]+\.\w+$")
    results["amount_non_negative"] = ge_df.expect_column_values_to_be_between(
        "amount", min_value=0, max_value=None
    )

    passed = sum(1 for r in results.values() if r["success"])
    failed = len(results) - passed
    return {"total_expectations": len(results), "passed": passed, "failed": failed, "details": results}

def run_validation_pipeline(records: list[dict]) -> dict:
    pydantic_result = validate_with_pydantic(records)

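    # Build the set of rejected indexes once instead of once per record
    invalid_indexes = {e["index"] for e in pydantic_result.errors}
    valid_df = pd.DataFrame([r for i, r in enumerate(records) if i not in invalid_indexes])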
    ge_result = validate_with_great_expectations(valid_df) if not valid_df.empty else {"total_expectations": 0, "passed": 0, "failed": 0}

    return {
        "pydantic_validation": pydantic_result.model_dump(),
        "ge_validation": ge_result,
        "overall_pass": pydantic_result.invalid == 0 and ge_result.get("failed", 0) == 0,
    }

if __name__ == "__main__":
    test_data = [
        # Valid record: event_id has exactly 16 alphanumeric characters after "evt_"
        {"user_id": "u001", "event_id": "evt_abc123def456gh78", "event_type": "click",
         "event_date": "2026-06-16T10:00:00", "age": 28, "email": "user@test.com", "amount": 99.5},
        # Invalid only because user_id is empty
        {"user_id": "", "event_id": "evt_xyz789abc012ij34", "event_type": "signup",
         "event_date": "2026-06-16T11:00:00", "age": 35, "email": "new@test.com", "amount": 0},
        {"user_id": "u003", "event_id": "evt_bad_format", "event_type": "hack",
         "event_date": "2026-06-16T12:00:00", "age": -5, "email": "not-an-email", "amount": -10},
    ]
    result = run_validation_pipeline(test_data)
    print(json.dumps(result, indent=2, default=str))
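
Rejected records are usually worth keeping for later inspection rather than dropping. The sketch below routes each record to a valid list or a rejected list based on whether a validator raises. The names split_records and require_user_id are hypothetical; require_user_id stands in for UserEventSchema(**record) so the example runs without Pydantic installed.

```python
from typing import Callable, Iterable


def split_records(records: Iterable[dict], validate: Callable[[dict], None]):
    """Route each record to `valid` or `rejected` depending on whether validate() raises."""
    valid, rejected = [], []
    for index, record in enumerate(records):
        try:
            validate(record)
        except ValueError as exc:  # pydantic's ValidationError is a ValueError subclass
            rejected.append({"index": index, "record": record, "error": str(exc)})
        else:
            valid.append(record)
    return valid, rejected


def require_user_id(record: dict) -> None:
    # Hypothetical stand-in for UserEventSchema(**record)
    if not record.get("user_id"):
        raise ValueError("user_id must be non-empty")


valid, rejected = split_records(
    [{"user_id": "u001"}, {"user_id": ""}],
    require_user_id,
)
```

The rejected list can then be written to a quarantine table or file, while only the valid list continues down the pipeline.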

Pattern 3:Feature Engineering Pipeline with Pandas + Polars

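Problem

The feature engineering scripts were written in Pandas. After the data grew from 1 million to 50 million rows, a groupby + agg runs for 30 minutes without finishing. Migrating to Polars is tempting, but there is too much existing code to rewrite all at once.

Solution

Keep Pandas for small data and compatibility with existing code, use Polars to speed up computation on large data, and exchange data between the two through the Arrow format, which avoids copies where the column types allow it.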

# feature_pipeline.py
import pandas as pd
import polars as pl
from datetime import datetime, timedelta
import numpy as np

def generate_sample_data(n: int = 1_000_000) -> pd.DataFrame:
    np.random.seed(42)
    return pd.DataFrame({
        "user_id": np.random.choice([f"u{i:04d}" for i in range(1000)], n),
        "event_type": np.random.choice(["click", "purchase", "view", "signup"], n),
        "amount": np.random.exponential(50, n).round(2),
        "event_time": pd.date_range("2026-01-01", periods=n, freq="30s"),
        "category": np.random.choice(["electronics", "clothing", "food", "books"], n),
    })

def pandas_features(df: pd.DataFrame) -> pd.DataFrame:
    user_agg = df.groupby("user_id").agg(
        total_events=("event_type", "count"),
        total_amount=("amount", "sum"),
        avg_amount=("amount", "mean"),
        unique_categories=("category", "nunique"),
        first_event=("event_time", "min"),
        last_event=("event_time", "max"),
    )
    user_agg["active_days"] = (user_agg["last_event"] - user_agg["first_event"]).dt.days + 1
    user_agg["avg_daily_events"] = user_agg["total_events"] / user_agg["active_days"].clip(lower=1)
    return user_agg.reset_index()

def polars_features(df: pd.DataFrame) -> pd.DataFrame:
    pl_df = pl.from_pandas(df)

    result = pl_df.group_by("user_id").agg([
        pl.col("event_type").count().alias("total_events"),
        pl.col("amount").sum().alias("total_amount"),
        pl.col("amount").mean().alias("avg_amount"),
        pl.col("category").n_unique().alias("unique_categories"),
        pl.col("event_time").min().alias("first_event"),
        pl.col("event_time").max().alias("last_event"),
    ]).with_columns([
        (pl.col("last_event") - pl.col("first_event")).dt.total_days().add(1).alias("active_days"),
    ]).with_columns([
        (pl.col("total_events") / pl.col("active_days").clip(lower_bound=1)).alias("avg_daily_events"),
    ])

    return result.to_pandas()

def time_window_features(df: pd.DataFrame, windows: list[str] = None) -> pd.DataFrame:
    if windows is None:
        windows = ["7d", "30d", "90d"]
    pl_df = pl.from_pandas(df)
    reference_date = pl_df.select(pl.col("event_time").max()).item()

    all_features = []
    for window in windows:
        days = int(window.replace("d", ""))
        cutoff = reference_date - timedelta(days=days)

        window_feat = pl_df.filter(pl.col("event_time") >= cutoff).group_by("user_id").agg([
            pl.col("amount").sum().alias(f"amount_sum_{window}"),
            pl.col("amount").mean().alias(f"amount_mean_{window}"),
            pl.col("event_type").count().alias(f"event_count_{window}"),
            pl.col("category").n_unique().alias(f"category_nunique_{window}"),
        ])
        all_features.append(window_feat)

    from functools import reduce
    # Polars 1.x names this join "full"; coalesce=True keeps a single user_id column
    merged = reduce(lambda a, b: a.join(b, on="user_id", how="full", coalesce=True), all_features)
    return merged.to_pandas()

def category_encoding_features(df: pd.DataFrame) -> pd.DataFrame:
    pl_df = pl.from_pandas(df)
    pivot = pl_df.group_by(["user_id", "category"]).agg([
        pl.col("amount").sum().alias("category_amount"),
        pl.col("event_type").count().alias("category_count"),
    ]).pivot(
        index="user_id",
        on="category",
        values=["category_amount", "category_count"],
    ).fill_null(0)
    return pivot.to_pandas()

def run_feature_pipeline():
    print("Generating sample data...")
    df = generate_sample_data(1_000_000)
    print(f"Data shape: {df.shape}")

    print("\n--- Pandas Feature Engineering ---")
    start = datetime.now()
    pandas_result = pandas_features(df)
    print(f"Pandas time: {(datetime.now() - start).total_seconds():.2f}s")
    print(f"Pandas result shape: {pandas_result.shape}")

    print("\n--- Polars Feature Engineering ---")
    start = datetime.now()
    polars_result = polars_features(df)
    print(f"Polars time: {(datetime.now() - start).total_seconds():.2f}s")
    print(f"Polars result shape: {polars_result.shape}")

    print("\n--- Time Window Features ---")
    window_result = time_window_features(df)
    print(f"Window features shape: {window_result.shape}")

    print("\n--- Category Encoding Features ---")
    category_result = category_encoding_features(df)
    print(f"Category features shape: {category_result.shape}")

if __name__ == "__main__":
    run_feature_pipeline()
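
When two engines compute the same features, a small reference implementation helps confirm they agree on the semantics. The sketch below recomputes a few of the per-user features with plain loops on a tiny sample. The function name reference_user_features and the sample rows are illustrative; the feature definitions mirror pandas_features above.

```python
from collections import defaultdict
from datetime import datetime


def reference_user_features(events: list[dict]) -> dict[str, dict]:
    """Per-user features computed with plain loops, for cross-checking on small samples."""
    by_user = defaultdict(list)
    for event in events:
        by_user[event["user_id"]].append(event)

    features = {}
    for user_id, rows in by_user.items():
        times = [r["event_time"] for r in rows]
        # Same definition as above: whole days between first and last event, plus one
        active_days = (max(times) - min(times)).days + 1
        total_amount = sum(r["amount"] for r in rows)
        features[user_id] = {
            "total_events": len(rows),
            "total_amount": total_amount,
            "avg_amount": total_amount / len(rows),
            "active_days": active_days,
            "avg_daily_events": len(rows) / active_days,
        }
    return features


sample = [
    {"user_id": "u0001", "amount": 10.0, "event_time": datetime(2026, 1, 1, 9)},
    {"user_id": "u0001", "amount": 30.0, "event_time": datetime(2026, 1, 3, 18)},
    {"user_id": "u0002", "amount": 5.0, "event_time": datetime(2026, 1, 2, 12)},
]
ref = reference_user_features(sample)
```

Running the Pandas and Polars functions on the same handful of rows and comparing against this output is a cheap regression test before migrating a feature to the faster engine.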

Pattern 4:Vector Data Pipeline for RAG

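Problem

After the RAG system went live, documents were updated but the vectors were not re-synced, so retrieval keeps returning stale content. The chunking strategy is too crude: long tables get truncated, code blocks are cut in the middle, and embedding quality suffers badly.

Solution

Build a complete vector data pipeline: smart chunking → embedding → vector storage → incremental sync.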

# vector_pipeline.py
import hashlib
import json
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class Document:
    doc_id: str
    content: str
    metadata: dict = field(default_factory=dict)
    source: str = ""
    updated_at: str = ""

@dataclass
class Chunk:
    chunk_id: str
    doc_id: str
    content: str
    embedding: Optional[list[float]] = None
    metadata: dict = field(default_factory=dict)

class SmartChunker:
    def __init__(self, chunk_size: int = 512, chunk_overlap: int = 64,
                 separators: list[str] = None):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.separators = separators or ["\n\n", "\n", "。", ".", " "]

    def chunk_document(self, doc: Document) -> list[Chunk]:
        text = doc.content
        chunks_text = self._recursive_split(text, self.separators)
        chunks = []
        for i, text in enumerate(chunks_text):
            if len(text.strip()) < 20:
                continue
            chunk_id = hashlib.md5(f"{doc.doc_id}:{i}:{text[:50]}".encode()).hexdigest()[:16]
            chunks.append(Chunk(
                chunk_id=chunk_id,
                doc_id=doc.doc_id,
                content=text,
                metadata={**doc.metadata, "chunk_index": i, "source": doc.source},
            ))
        return chunks

    def _recursive_split(self, text: str, separators: list[str]) -> list[str]:
        if not separators or len(text) <= self.chunk_size:
            return [text] if text.strip() else []

        sep = separators[0]
        parts = text.split(sep)

        result = []
        current = ""
        for part in parts:
            candidate = current + sep + part if current else part
            if len(candidate) <= self.chunk_size:
                current = candidate
            else:
                if current:
                    result.append(current)
                if len(part) > self.chunk_size:
                    sub_splits = self._recursive_split(part, separators[1:])
                    result.extend(sub_splits)
                    current = ""
                else:
                    current = part
        if current:
            result.append(current)
        return result

class EmbeddingService:
    def __init__(self, model_name: str = "text-embedding-3-small", api_key: str = ""):
        self.model_name = model_name
        self.api_key = api_key

    def embed_texts(self, texts: list[str]) -> list[list[float]]:
        try:
            from openai import OpenAI
        except ImportError:
            # Demo-only fallback: random vectors carry no semantic meaning
            import numpy as np
            return [np.random.randn(1536).tolist() for _ in texts]
        client = OpenAI(api_key=self.api_key)
        response = client.embeddings.create(input=texts, model=self.model_name)
        return [item.embedding for item in response.data]

    def embed_chunks(self, chunks: list[Chunk], batch_size: int = 100) -> list[Chunk]:
        for i in range(0, len(chunks), batch_size):
            batch = chunks[i:i + batch_size]
            texts = [c.content for c in batch]
            embeddings = self.embed_texts(texts)
            for j, chunk in enumerate(batch):
                chunk.embedding = embeddings[j]
        return chunks

class VectorStore:
    def __init__(self, connection_string: str = ""):
        self.connection_string = connection_string
        self._store: dict[str, Chunk] = {}

    def upsert_chunks(self, chunks: list[Chunk]):
        for chunk in chunks:
            self._store[chunk.chunk_id] = chunk

    def search(self, query_embedding: list[float], top_k: int = 5) -> list[Chunk]:
        import numpy as np
        query_vec = np.array(query_embedding)
        scores = []
        for chunk in self._store.values():
            if chunk.embedding is None:
                continue
            chunk_vec = np.array(chunk.embedding)
            similarity = np.dot(query_vec, chunk_vec) / (np.linalg.norm(query_vec) * np.linalg.norm(chunk_vec) + 1e-8)
            scores.append((chunk, float(similarity)))
        scores.sort(key=lambda x: x[1], reverse=True)
        return [chunk for chunk, _ in scores[:top_k]]

    def delete_by_doc_id(self, doc_id: str):
        to_delete = [cid for cid, c in self._store.items() if c.doc_id == doc_id]
        for cid in to_delete:
            del self._store[cid]

class VectorPipeline:
    def __init__(self, chunk_size: int = 512, model_name: str = "text-embedding-3-small"):
        self.chunker = SmartChunker(chunk_size=chunk_size)
        self.embedder = EmbeddingService(model_name=model_name)
        self.store = VectorStore()

    def ingest_documents(self, documents: list[Document]) -> dict:
        total_chunks = 0
        for doc in documents:
            self.store.delete_by_doc_id(doc.doc_id)
            chunks = self.chunker.chunk_document(doc)
            chunks = self.embedder.embed_chunks(chunks)
            self.store.upsert_chunks(chunks)
            total_chunks += len(chunks)
        return {"documents": len(documents), "chunks": total_chunks}

    def query(self, question: str, top_k: int = 5) -> list[dict]:
        query_embedding = self.embedder.embed_texts([question])[0]
        results = self.store.search(query_embedding, top_k=top_k)
        return [{"chunk_id": r.chunk_id, "content": r.content[:200],
                 "metadata": r.metadata} for r in results]

if __name__ == "__main__":
    pipeline = VectorPipeline(chunk_size=256)

    docs = [
        Document(doc_id="doc_001", content="Python是一種廣泛使用的高階程式語言。它的設計哲學強調程式碼的可讀性。"
                 "Python支援多種程式設計範式,包括物件導向、命令式、函數式和程序式程式設計。"
                 "Python擁有一個龐大的標準函式庫,涵蓋了網路通訊、文字處理、資料庫介面等各個方面。",
                 metadata={"source": "wiki", "category": "programming"},
                 source="wikipedia"),
        Document(doc_id="doc_002", content="Apache Airflow是一個開源的工作流程管理平台。"
                 "它允許使用者以程式設計方式編寫、排程和監控工作流程。"
                 "Airflow的核心概念是DAG(有向無環圖),用於定義任務之間的依賴關係。"
                 "2025年發布的Airflow 3.0帶來了原生async支援。",
                 metadata={"source": "docs", "category": "data-engineering"},
                 source="airflow_docs"),
    ]

    result = pipeline.ingest_documents(docs)
    print(f"Ingestion result: {result}")

    query_result = pipeline.query("什麼是Python程式語言?", top_k=3)
    print(f"\nQuery results: {json.dumps(query_result, indent=2, ensure_ascii=False)}")
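
SmartChunker stores chunk_overlap, but _recursive_split never applies it, so adjacent chunks share no context. One possible way to add overlap as a post-processing step is sketched below. The function add_overlap is hypothetical and character-based; note that each chunk grows by up to `overlap` characters, so chunk_size would need to leave room for it.

```python
def add_overlap(chunks: list[str], overlap: int) -> list[str]:
    """Prefix every chunk except the first with the tail of its predecessor."""
    if overlap <= 0:
        return list(chunks)
    result = []
    for i, chunk in enumerate(chunks):
        # Take the last `overlap` characters of the previous original chunk
        prefix = chunks[i - 1][-overlap:] if i > 0 else ""
        result.append(prefix + chunk)
    return result


overlapped = add_overlap(["alpha beta", "gamma delta", "epsilon"], overlap=4)
# overlapped == ["alpha beta", "betagamma delta", "eltaepsilon"]
```

In the pipeline this would run on the output of _recursive_split, before chunk IDs are computed, so that the IDs reflect the final chunk text.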

Pattern 5:Production Monitoring with Prometheus + Data Quality Alerts

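Problem

The data pipeline fails silently, and a 3-hour data delay goes unnoticed. Feature distributions drift without anyone knowing while model predictions get steadily worse. The vector store's P99 retrieval latency jumps from 50 ms to 5 s, and the team only finds out from user complaints.

Solution

Collect metrics with Prometheus, define custom data quality metrics, visualize them in Grafana, and alert through AlertManager.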

# monitoring.py
import time
import threading
from dataclasses import dataclass, field
from typing import Callable, Optional
from prometheus_client import Counter, Gauge, Histogram, start_http_server

pipeline_runs_total = Counter(
    "pipeline_runs_total",
    "Total pipeline runs",
    ["pipeline_name", "status"],
)

pipeline_duration_seconds = Histogram(
    "pipeline_duration_seconds",
    "Pipeline execution duration",
    ["pipeline_name"],
    buckets=[10, 30, 60, 120, 300, 600, 1800, 3600],
)

records_processed = Counter(
    "records_processed_total",
    "Total records processed",
    ["pipeline_name", "stage"],
)

data_quality_score = Gauge(
    "data_quality_score",
    "Data quality score (0-1)",
    ["pipeline_name", "check_name"],
)

feature_drift_score = Gauge(
    "feature_drift_score",
    "Feature drift score (PSI)",
    ["pipeline_name", "feature_name"],
)

vector_search_latency = Histogram(
    "vector_search_latency_seconds",
    "Vector search latency",
    ["pipeline_name"],
    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0],
)

@dataclass
class QualityCheck:
    name: str
    check_fn: Callable[[dict], float]
    threshold: float = 0.8

@dataclass
class DriftAlert:
    feature_name: str
    psi_score: float
    threshold: float = 0.2
    is_drifted: bool = False

class PipelineMonitor:
    def __init__(self, pipeline_name: str, metrics_port: int = 9090):
        self.pipeline_name = pipeline_name
        self.metrics_port = metrics_port
        self._quality_checks: list[QualityCheck] = []
        self._drift_threshold = 0.2
        self._server_started = False

    def start_metrics_server(self):
        if not self._server_started:
            start_http_server(self.metrics_port)
            self._server_started = True
            print(f"Metrics server started on port {self.metrics_port}")

    def add_quality_check(self, name: str, check_fn: Callable, threshold: float = 0.8):
        self._quality_checks.append(QualityCheck(name=name, check_fn=check_fn, threshold=threshold))

    def record_pipeline_run(self, status: str, duration: float):
        pipeline_runs_total.labels(self.pipeline_name, status).inc()
        pipeline_duration_seconds.labels(self.pipeline_name).observe(duration)

    def record_records_processed(self, stage: str, count: int):
        records_processed.labels(self.pipeline_name, stage).inc(count)

    def run_quality_checks(self, data: dict) -> dict:
        results = {}
        for check in self._quality_checks:
            score = check.check_fn(data)
            data_quality_score.labels(self.pipeline_name, check.name).set(score)
            passed = score >= check.threshold
            results[check.name] = {"score": score, "threshold": check.threshold, "passed": passed}
            if not passed:
                print(f"ALERT: Quality check '{check.name}' failed: score={score:.3f} < threshold={check.threshold}")
        return results

    def check_feature_drift(self, reference_stats: dict, current_stats: dict) -> list[DriftAlert]:
        alerts = []
        for feature_name in reference_stats:
            if feature_name not in current_stats:
                continue
            psi = self._calculate_psi(reference_stats[feature_name], current_stats[feature_name])
            feature_drift_score.labels(self.pipeline_name, feature_name).set(psi)
            is_drifted = psi > self._drift_threshold
            alert = DriftAlert(
                feature_name=feature_name,
                psi_score=psi,
                threshold=self._drift_threshold,
                is_drifted=is_drifted,
            )
            alerts.append(alert)
            if is_drifted:
                print(f"ALERT: Feature drift detected for '{feature_name}': PSI={psi:.4f} > threshold={self._drift_threshold}")
        return alerts

    @staticmethod
    def _calculate_psi(expected: dict, actual: dict, bucket_count: int = 10) -> float:
        import numpy as np
        e_vals = np.array(list(expected.values())) if isinstance(expected, dict) else np.array(expected)
        a_vals = np.array(list(actual.values())) if isinstance(actual, dict) else np.array(actual)
        e_probs = e_vals / e_vals.sum() if e_vals.sum() > 0 else np.ones_like(e_vals) / len(e_vals)
        a_probs = a_vals / a_vals.sum() if a_vals.sum() > 0 else np.ones_like(a_vals) / len(a_vals)
        e_probs = np.clip(e_probs, 1e-6, None)
        a_probs = np.clip(a_probs, 1e-6, None)
        psi = float(np.sum((a_probs - e_probs) * np.log(a_probs / e_probs)))
        return psi

    def record_vector_search_latency(self, latency: float):
        vector_search_latency.labels(self.pipeline_name).observe(latency)

if __name__ == "__main__":
    monitor = PipelineMonitor("ai_data_pipeline", metrics_port=9090)
    monitor.start_metrics_server()

    monitor.add_quality_check(
        "null_rate",
        lambda d: 1.0 - d.get("null_count", 0) / max(d.get("total", 1), 1),
        threshold=0.95,
    )
    monitor.add_quality_check(
        "schema_compliance",
        lambda d: d.get("valid_count", 0) / max(d.get("total", 1), 1),
        threshold=0.99,
    )

    start_time = time.time()
    monitor.record_records_processed("extract", 50000)
    monitor.record_records_processed("transform", 48000)
    monitor.record_records_processed("load", 47500)
    duration = time.time() - start_time

    monitor.record_pipeline_run("success", duration)

    quality_data = {"null_count": 50, "total": 50000, "valid_count": 49800}
    quality_results = monitor.run_quality_checks(quality_data)
    print(f"Quality results: {quality_results}")

    ref_stats = {"age": {"bins": [100, 200, 300, 250, 150]}, "amount": {"bins": [50, 150, 300, 350, 150]}}
    cur_stats = {"age": {"bins": [80, 180, 350, 250, 140]}, "amount": {"bins": [50, 150, 300, 350, 150]}}
    drift_alerts = monitor.check_feature_drift(
        {k: v["bins"] for k, v in ref_stats.items()},
        {k: v["bins"] for k, v in cur_stats.items()},
    )
    print(f"Drift alerts: {[(a.feature_name, a.psi_score, a.is_drifted) for a in drift_alerts]}")

    print(f"\nMetrics available at http://localhost:9090/metrics")
    time.sleep(300)
# prometheus.yml
global:
  scrape_interval: 15s

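scrape_configs:
  - job_name: "data_pipeline"
    static_configs:
      # Port of the exporter started by monitoring.py. Prometheus itself also
      # listens on 9090 by default, so change one of them if both share a host.
      - targets: ["localhost:9090"]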

rule_files:
  - "alerts.yml"
# alerts.yml
groups:
  - name: data_pipeline_alerts
    rules:
      - alert: PipelineRunFailed
        expr: increase(pipeline_runs_total{status="failed"}[5m]) > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Pipeline {{ $labels.pipeline_name }} failed"

      - alert: DataQualityDegraded
        expr: data_quality_score < 0.8
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Data quality check {{ $labels.check_name }} below threshold"

      - alert: FeatureDriftDetected
        expr: feature_drift_score > 0.2
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Feature {{ $labels.feature_name }} drift detected (PSI={{ $value }})"

      - alert: VectorSearchLatencyHigh
        expr: histogram_quantile(0.99, rate(vector_search_latency_seconds_bucket[5m])) > 1.0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Vector search P99 latency above 1s"
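The FeatureDriftDetected rule above fires when the PSI gauge exceeds 0.2. As a rough illustration of what that number measures, here is a minimal sketch of the standard Population Stability Index over two pre-binned histograms. The function name `psi` and the `eps` floor are assumptions for this example and may differ from how `check_feature_drift` is actually implemented.

```python
import math

def psi(reference_bins, current_bins, eps=1e-6):
    """Population Stability Index between two histograms that share bin edges."""
    if len(reference_bins) != len(current_bins):
        raise ValueError("histograms must have the same number of bins")
    ref_total = sum(reference_bins)
    cur_total = sum(current_bins)
    if ref_total == 0 or cur_total == 0:
        raise ValueError("histograms must not be empty")
    score = 0.0
    for ref_count, cur_count in zip(reference_bins, current_bins):
        # Floor each proportion so an empty bin cannot produce log(0) or divide by zero.
        ref_p = max(ref_count / ref_total, eps)
        cur_p = max(cur_count / cur_total, eps)
        score += (cur_p - ref_p) * math.log(cur_p / ref_p)
    return score

# Same bin counts as the demo data above.
age_psi = psi([100, 200, 300, 250, 150], [80, 180, 350, 250, 140])
amount_psi = psi([50, 150, 300, 350, 150], [50, 150, 300, 350, 150])
print(f"age PSI={age_psi:.4f}, drifted={age_psi > 0.2}")
print(f"amount PSI={amount_psi:.4f}, drifted={amount_psi > 0.2}")
```

With these sample histograms the shift in `age` stays well under the 0.2 threshold, so the alert rule would not fire for it.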

5 Common Pitfalls and Solutions

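1. Hardcoded database connection details in ETL scripts

Problem: Connection strings are written directly into DAG code, so every environment change requires a code change and passwords leak into Git.

Solution: Use Airflow Connections and environment variables.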

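import os
from urllib.parse import quote_plus

from airflow.hooks.base import BaseHook

def get_connection():
    # Credentials live in the Airflow Connection "postgres_source", not in the DAG file
    conn = BaseHook.get_connection("postgres_source")
    # URL-encode the password so characters such as "@" or "/" do not break the URI
    password = quote_plus(conn.password or "")
    return f"postgresql://{conn.login}:{password}@{conn.host}:{conn.port}/{conn.schema}"

# Alternatively, read the URL from an environment variable
db_url = os.environ["DATABASE_URL"]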

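2. Validating only the schema, not the statistics

Problem: Schema validation passes while the data distribution is abnormal (for example, every age set to the same placeholder value that still falls inside the allowed range). Pydantic checks format, not statistics.

Solution: Use Pydantic for format validation and Great Expectations for distribution checks, as two lines of defense.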

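from pydantic import BaseModel, Field

# Pydantic: per-record format validation
class UserSchema(BaseModel):
    age: int = Field(..., ge=0, le=150)

# Great Expectations: statistical validation over the whole batch
# (ge_df is assumed to be a Great Expectations dataset wrapping the DataFrame)
ge_df.expect_column_mean_to_be_between("age", min_value=18, max_value=65)
ge_df.expect_column_values_to_be_in_set("country", ["CN", "US", "JP", "UK"])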
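To make the difference between the two layers concrete, the following dependency-free sketch mimics them with plain Python: a per-row range check standing in for the Pydantic schema, and a batch-level mean check standing in for the Great Expectations rule. The helper names `check_row` and `check_batch` are hypothetical and exist only for this illustration.

```python
from statistics import mean

def check_row(row):
    """Row-level (schema-style) check: age must be an int in [0, 150]."""
    age = row.get("age")
    return isinstance(age, int) and not isinstance(age, bool) and 0 <= age <= 150

def check_batch(rows, mean_range=(18, 65)):
    """Batch-level (statistical) check: the mean age must fall in a plausible range."""
    avg = mean(r["age"] for r in rows)
    return mean_range[0] <= avg <= mean_range[1], avg

# Every row is individually valid, yet the batch is clearly broken:
# an upstream default of 150 was written for every user.
batch = [{"age": 150} for _ in range(1000)]
rows_ok = all(check_row(r) for r in batch)
batch_ok, avg_age = check_batch(batch)
print(rows_ok, batch_ok, avg_age)
```

The row-level check accepts every record while the batch-level check rejects the batch, which is the gap the second layer is meant to close.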

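3. Feature engineering that ignores data leakage

Problem: Features are computed with future data (for example, normalizing with the mean of the full dataset). The model looks good in training and falls apart in production.

Solution: Fit every transformation on training-set statistics only, and build time-window features from historical data only.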

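# Compute the statistics on the training window only (rows before the cutoff date)
train_stats = df[df["date"] < "2026-01-01"]["amount"].agg(["mean", "std"])
# Apply the same training statistics to every row, including later validation and serving data
df["amount_scaled"] = (df["amount"] - train_stats["mean"]) / train_stats["std"]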
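The second half of the advice, time-window features built from history only, can be sketched without any library. The function below is a hypothetical example (the name `prior_mean_amount` and the tuple layout are assumptions): for each event it emits the mean amount of the same user's earlier events and updates the running totals only afterwards, so an event never contributes to its own feature.

```python
from collections import defaultdict

def prior_mean_amount(events):
    """Mean amount of each user's EARLIER events, returned in timestamp order.

    `events` is a list of (user_id, timestamp, amount) tuples. The first
    event of a user has no history and gets None.
    """
    totals = defaultdict(float)
    counts = defaultdict(int)
    features = []
    for user_id, _ts, amount in sorted(events, key=lambda e: e[1]):
        features.append(totals[user_id] / counts[user_id] if counts[user_id] else None)
        # Update the running statistics only AFTER emitting the feature.
        totals[user_id] += amount
        counts[user_id] += 1
    return features

events = [
    ("u1", "2026-01-01", 10.0),
    ("u1", "2026-01-02", 30.0),
    ("u2", "2026-01-02", 5.0),
    ("u1", "2026-01-03", 50.0),
]
print(prior_mean_amount(events))
```

The ordering of "emit, then update" is the whole point: swapping the two lines would silently include the current row in its own feature.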

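4. No incremental updates for vector data

Problem: The vector index is rebuilt from scratch on every run. With 100,000 documents that takes 2 hours, so the data is never up to date.

Solution: Use incremental updates keyed on a document content hash and process only what changed.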

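import hashlib

# `pipeline` is assumed to be the RAG pipeline object (chunker + embedder) already in scope
def incremental_update(documents: list[Document], store: VectorStore) -> dict:
    updated = 0
    for doc in documents:
        # MD5 is used here only for change detection, not for security
        content_hash = hashlib.md5(doc.content.encode()).hexdigest()
        existing = store.get_doc_metadata(doc.doc_id)
        if existing and existing.get("content_hash") == content_hash:
            continue  # unchanged document: skip re-chunking and re-embedding
        # Changed or new document: drop stale chunks, then re-chunk and re-embed
        store.delete_by_doc_id(doc.doc_id)
        chunks = pipeline.chunker.chunk_document(doc)
        chunks = pipeline.embedder.embed_chunks(chunks)
        for chunk in chunks:
            chunk.metadata["content_hash"] = content_hash
        store.upsert_chunks(chunks)
        updated += 1
    return {"checked": len(documents), "updated": updated}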
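The behavior of hash-based updates is easy to check with an in-memory stand-in. The sketch below is self-contained and deliberately simplified: `InMemoryStore`, `sync`, and `fake_embed` are hypothetical names, not part of any vector store API, and SHA-256 is used in place of MD5 purely as a choice for this example.

```python
import hashlib

class InMemoryStore:
    """Hypothetical stand-in for a vector store: doc_id -> (content_hash, chunks)."""
    def __init__(self):
        self.docs = {}

    def content_hash(self, doc_id):
        entry = self.docs.get(doc_id)
        return entry[0] if entry else None

    def replace(self, doc_id, content_hash, chunks):
        self.docs[doc_id] = (content_hash, chunks)

def sync(documents, store, embed):
    """Re-embed only documents whose content hash changed; return the updated ids."""
    updated = []
    for doc_id, content in documents.items():
        digest = hashlib.sha256(content.encode("utf-8")).hexdigest()
        if store.content_hash(doc_id) == digest:
            continue  # unchanged: skip the expensive embedding call
        store.replace(doc_id, digest, embed(content))
        updated.append(doc_id)
    return updated

calls = []
def fake_embed(text):
    calls.append(text)
    return [float(len(text))]  # placeholder "embedding"

store = InMemoryStore()
first = sync({"a": "hello", "b": "world"}, store, fake_embed)
second = sync({"a": "hello", "b": "world!"}, store, fake_embed)
print(first, second, len(calls))
```

One gap worth noting: neither this sketch nor the function above removes vectors for documents that were deleted at the source, so a real pipeline would likely also need to diff the set of known document ids.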

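5. Monitoring whether the pipeline ran, not whether the data is good

Problem: Airflow shows the DAG as successful, but the row count has dropped by 90% or the feature values are all NULL, and the production model is already useless.

Solution: Add a data quality gate at the end of the DAG and block downstream tasks when quality falls short.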

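def quality_gate_check(**context):
    ti = context["ti"]
    # Row count returned by the upstream "load" task
    loaded_count = ti.xcom_pull(task_ids="load", key="return_value")
    expected_min = context["params"]["expected_min_records"]

    # A missing XCom means the load task returned nothing; treat it as a failure
    if loaded_count is None or loaded_count < expected_min:
        raise ValueError(f"Quality gate failed: {loaded_count} < {expected_min}")

    # run_quality_checks() is assumed to return an overall score in [0, 1]
    quality_score = run_quality_checks()
    if quality_score < 0.9:
        raise ValueError(f"Quality gate failed: score={quality_score:.2f}")

    return "PASSED"

# Raising inside the task marks it as failed, so downstream tasks do not run
quality_gate = PythonOperator(
    task_id="quality_gate",
    python_callable=quality_gate_check,
    params={"expected_min_records": 10000},
)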
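The gate logic itself can be kept independent of Airflow, which makes it easy to unit test. The following is a minimal sketch under assumed thresholds: the function name `evaluate_quality_gate`, the 50% drop limit, and the 5% null-rate limit are illustrative choices, not values from the pipeline above.

```python
def evaluate_quality_gate(row_count, null_count, baseline_count,
                          max_drop_ratio=0.5, max_null_rate=0.05):
    """Return a list of human-readable failures; an empty list means the gate passes."""
    failures = []
    if row_count == 0:
        return ["no rows loaded"]
    if baseline_count > 0:
        # Relative drop against a baseline such as yesterday's row count.
        drop = 1 - row_count / baseline_count
        if drop > max_drop_ratio:
            failures.append(f"row count dropped {drop:.0%} vs baseline")
    null_rate = null_count / row_count
    if null_rate > max_null_rate:
        failures.append(f"null rate {null_rate:.1%} exceeds {max_null_rate:.1%}")
    return failures

bad_run = evaluate_quality_gate(row_count=5_000, null_count=10, baseline_count=50_000)
good_run = evaluate_quality_gate(row_count=49_000, null_count=50, baseline_count=50_000)
print(bad_run)
print(good_run)
```

Inside a task callable, a non-empty result would typically be turned into a raised exception so the scheduler blocks everything downstream.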

10 Common Error Troubleshooting

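# | Error message | Cause | Solution
1 | Airflow DAG import error | Python path misconfigured or a dependency is missing | Check PYTHONPATH and make sure everything in requirements.txt is installed
2 | Pydantic ValidationError: field required | An upstream field is missing or was renamed | Give optional fields a default (e.g. Optional[...] = None) and accept renamed fields through a validation alias; model_config = {"extra": "allow"} only tolerates unexpected extra fields
3 | Great Expectations expectation failed | Distribution shift or dirty data in the batch | Check for changes in the data source, then adjust the expectation thresholds or add a cleaning step
4 | Polars SchemaError: dtype mismatch | Wrong type inference during Pandas→Polars conversion | Specify the schema explicitly: pl.DataFrame(data, schema={"col": pl.Float64})
5 | OpenAI API RateLimitError | Embedding requests exceed the API rate limit | Batch and retry: call in batches and add exponential-backoff retries with tenacity
6 | numpy.linalg.LinAlgError in cosine similarity | Vector dimension mismatch or a zero vector | Check the vector norm before normalizing: np.linalg.norm(v) > 1e-8
7 | Prometheus duplicate label error | A metric with the same name and labels is registered more than once | Define metrics at module level; do not create Counter/Gauge objects inside functions
8 | MemoryError on large dataset groupby | Pandas runs out of memory on a full aggregation | Switch to Polars or process in chunks: pd.read_csv(path, chunksize=100000)
9 | PSI calculation returns NaN | Some bins have a count of 0 after bucketing | Add smoothing: probs = np.clip(probs, 1e-6, None)
10 | Airflow task timeout | A single task runs longer than its limit | Increase execution_timeout, or split the work into several tasks that run in parallel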
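Row 5 recommends batching plus exponential-backoff retries. The table suggests tenacity; the sketch below swaps in a small hand-written helper using only the standard library so the mechanism is visible. The name `retry_with_backoff` and its parameters are assumptions for this example, and the injectable `sleep` argument exists only so the demo runs instantly.

```python
import random
import time

def retry_with_backoff(func, max_attempts=5, base_delay=1.0, max_delay=30.0,
                       retry_on=(Exception,), sleep=time.sleep):
    """Call func(); on a retryable error wait base_delay * 2**attempt (capped, jittered) and retry."""
    for attempt in range(max_attempts):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * 2 ** attempt)
            # Jitter spreads out retries from workers that failed at the same time.
            sleep(delay * random.uniform(0.5, 1.0))

attempts = {"n": 0}
def flaky():
    """Simulated API call that fails twice before succeeding."""
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise TimeoutError("simulated rate limit")
    return "ok"

waits = []
result = retry_with_backoff(flaky, sleep=waits.append)
print(result, attempts["n"], len(waits))
```

In production code `retry_on` would normally be narrowed to the client's rate-limit exception so that genuine bugs are not retried.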

Advanced Optimization Techniques

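1. Incremental ETL: process only changed data

Full ETL is not sustainable at large data volumes. Use CDC (Change Data Capture) or timestamps to process only the increment.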

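import pandas as pd
import redis
from sqlalchemy import create_engine, text

def incremental_extract(last_run_time: str) -> pd.DataFrame:
    engine = create_engine(get_connection())
    # Bind the timestamp as a parameter instead of interpolating it into the SQL string
    query = text("""
        SELECT * FROM user_events
        WHERE updated_at > :last_run_time
        ORDER BY updated_at
    """)
    return pd.read_sql(query, engine, params={"last_run_time": last_run_time})

def get_last_run_time(pipeline_name: str) -> str:
    # decode_responses=True makes Redis return str instead of bytes
    r = redis.Redis(decode_responses=True)
    return r.get(f"pipeline:{pipeline_name}:last_run") or "2026-01-01T00:00:00"

def save_last_run_time(pipeline_name: str, run_time: str):
    r = redis.Redis()
    r.set(f"pipeline:{pipeline_name}:last_run", run_time)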
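The watermark pattern can be tried end to end with an in-memory SQLite database. This is only a sketch: SQLite stands in for the real source database, `extract_since` is a hypothetical helper, and the table has been reduced to two columns.

```python
import sqlite3

def extract_since(conn, watermark):
    """Return rows updated after `watermark` plus the new watermark to persist."""
    rows = conn.execute(
        "SELECT id, updated_at FROM user_events WHERE updated_at > ? ORDER BY updated_at",
        (watermark,),  # bound parameter: the driver handles quoting
    ).fetchall()
    # Advance the watermark to the newest row actually seen, not to "now",
    # so rows committed while the query ran are picked up by the next run.
    new_watermark = rows[-1][1] if rows else watermark
    return rows, new_watermark

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user_events (id INTEGER, updated_at TEXT)")
conn.executemany(
    "INSERT INTO user_events VALUES (?, ?)",
    [(1, "2026-01-01T00:00:00"), (2, "2026-01-02T00:00:00"), (3, "2026-01-03T00:00:00")],
)

batch1, wm = extract_since(conn, "2026-01-01T00:00:00")
batch2, wm2 = extract_since(conn, wm)
print(batch1, wm, batch2)
```

Persisting the watermark only after the load step succeeds keeps a failed run from skipping data on the next attempt.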

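2. Feature caching and versioning

Cache computed features in Redis to avoid recomputation. Versioning ensures that training and inference use the same features.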

import redis
import json
import hashlib

class FeatureCache:
    def __init__(self, redis_url: str = "redis://localhost:6379/2"):
        self.r = redis.from_url(redis_url)
        self.ttl = 86400

    def _cache_key(self, feature_name: str, entity_id: str, version: str) -> str:
        raw = f"{feature_name}:{entity_id}:{version}"
        return f"feat:{hashlib.md5(raw.encode()).hexdigest()}"

    def get(self, feature_name: str, entity_id: str, version: str = "v1") -> dict | None:
        key = self._cache_key(feature_name, entity_id, version)
        cached = self.r.get(key)
        return json.loads(cached) if cached else None

    def set(self, feature_name: str, entity_id: str, value: dict, version: str = "v1"):
        key = self._cache_key(feature_name, entity_id, version)
        self.r.setex(key, self.ttl, json.dumps(value, default=str))

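3. Batched, parallel embedding

Calling the embedding API one text at a time is too slow. Batching plus asynchronous parallelism greatly increases throughput.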

import asyncio
from openai import AsyncOpenAI

async def async_embed_batch(texts: list[str], batch_size: int = 2048,
                            model: str = "text-embedding-3-small") -> list[list[float]]:
    client = AsyncOpenAI()
    all_embeddings = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        response = await client.embeddings.create(input=batch, model=model)
        all_embeddings.extend([item.embedding for item in response.data])
    return all_embeddings

async def parallel_embed(documents: list[str], concurrency: int = 5) -> list[list[float]]:
    semaphore = asyncio.Semaphore(concurrency)

    async def limited_embed(batch):
        async with semaphore:
            return await async_embed_batch(batch)

    tasks = [limited_embed(documents[i:i+100]) for i in range(0, len(documents), 100)]
    results = await asyncio.gather(*tasks)
    return [emb for batch in results for emb in batch]
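The concurrency control can be verified without calling a real API. The sketch below reproduces the semaphore pattern with a fake embedding coroutine and records the peak number of in-flight requests. `embed_all`, `fake_embed_batch`, and the `stats` dictionary are hypothetical names used only for this demonstration.

```python
import asyncio

async def embed_all(texts, embed_batch, batch_size=100, concurrency=5):
    """Embed texts in batches with at most `concurrency` requests in flight."""
    semaphore = asyncio.Semaphore(concurrency)

    async def run(batch):
        async with semaphore:
            return await embed_batch(batch)

    batches = [texts[i:i + batch_size] for i in range(0, len(texts), batch_size)]
    # gather() returns results in the order the awaitables were passed in,
    # regardless of which request finishes first, so output order matches input.
    results = await asyncio.gather(*(run(b) for b in batches))
    return [vector for batch in results for vector in batch]

stats = {"in_flight": 0, "peak": 0}

async def fake_embed_batch(batch):
    """Stand-in for an embedding API call; tracks concurrent invocations."""
    stats["in_flight"] += 1
    stats["peak"] = max(stats["peak"], stats["in_flight"])
    await asyncio.sleep(0.01)
    stats["in_flight"] -= 1
    return [[float(len(t))] for t in batch]

texts = ["x" * n for n in range(1, 26)]
vectors = asyncio.run(embed_all(texts, fake_embed_batch, batch_size=5, concurrency=2))
print(len(vectors), stats["peak"])
```

Passing the embedding function in as an argument also makes it straightforward to wrap the real client call with retries before handing it to the batching helper.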

4. Vector index optimization: HNSW parameter tuning

PGVector's HNSW index parameters directly affect retrieval accuracy and speed.

-- Create the HNSW index with tuned parameters
CREATE INDEX CONCURRENTLY idx_documents_embedding_hnsw
ON documents USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 200);

-- Set ef_search at query time
SET hnsw.ef_search = 100;

-- Rebuild the index after large batches of incremental inserts
REINDEX INDEX CONCURRENTLY idx_documents_embedding_hnsw;
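Tuning `ef_search` is a trade-off between recall and latency, so it helps to measure recall against an exact search on a small sample. The following sketch shows one way to compute recall@k in plain Python. The helper names and the toy vectors are assumptions, and `approx` is a placeholder for whatever ids the HNSW query returned at a given `ef_search` value.

```python
import math

def cosine(a, b):
    """Cosine similarity; returns 0.0 if either vector has zero norm."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def exact_top_k(query, vectors, k):
    """Brute-force ground truth: ids of the k most similar vectors."""
    ranked = sorted(vectors, key=lambda doc_id: cosine(query, vectors[doc_id]), reverse=True)
    return ranked[:k]

def recall_at_k(approx_ids, exact_ids):
    """Fraction of the true top-k that the approximate index returned."""
    return len(set(approx_ids) & set(exact_ids)) / len(exact_ids)

vectors = {"a": [1.0, 0.0], "b": [0.9, 0.1], "c": [0.0, 1.0], "d": [-1.0, 0.0]}
truth = exact_top_k([1.0, 0.0], vectors, k=2)
# Placeholder for ids returned by the approximate index at some ef_search setting.
approx = ["a", "c"]
print(truth, recall_at_k(approx, truth))
```

Averaging this value over a few hundred sample queries at several `ef_search` settings gives a simple recall-versus-latency curve to choose from.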

5. Automating data quality SLAs

Write data quality check results to an SLA table and compute the SLA attainment rate automatically.

import os
from datetime import datetime

import psycopg2

def record_sla(pipeline_name: str, check_name: str, score: float,
               threshold: float, passed: bool):
    conn = psycopg2.connect(dsn=os.environ["WAREHOUSE_URL"])
    cur = conn.cursor()
    cur.execute("""
        INSERT INTO data_quality_sla
        (pipeline_name, check_name, score, threshold, passed, checked_at)
        VALUES (%s, %s, %s, %s, %s, %s)
    """, (pipeline_name, check_name, score, threshold, passed, datetime.utcnow()))
    conn.commit()
    conn.close()

def get_sla_rate(pipeline_name: str, days: int = 30) -> float:
    conn = psycopg2.connect(dsn=os.environ["WAREHOUSE_URL"])
    cur = conn.cursor()
    # NULLIF avoids a division-by-zero error when the window contains no checks;
    # the interval is built by multiplication so the parameter is not inside a quoted literal
    cur.execute("""
        SELECT COUNT(*) FILTER (WHERE passed) * 100.0 / NULLIF(COUNT(*), 0)
        FROM data_quality_sla
        WHERE pipeline_name = %s AND checked_at >= NOW() - %s * INTERVAL '1 day'
    """, (pipeline_name, days))
    rate = cur.fetchone()[0]
    conn.close()
    return float(rate) if rate is not None else 0.0
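The attainment-rate query can be exercised locally with SQLite before pointing it at the warehouse. This is a simplified sketch: the table is reduced to three columns, `passed` is stored as 0/1, and `AVG(passed)` is used in place of the `FILTER` expression. In PostgreSQL a boolean column would need a cast such as `passed::int` for the same trick to work.

```python
import sqlite3

def sla_rate(conn, pipeline_name):
    """Percentage of passed checks for a pipeline, or None when no checks were recorded."""
    row = conn.execute(
        "SELECT AVG(passed) * 100.0 FROM data_quality_sla WHERE pipeline_name = ?",
        (pipeline_name,),
    ).fetchone()
    # AVG over zero rows is NULL, so an empty window yields None rather than an error.
    return row[0]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE data_quality_sla (pipeline_name TEXT, check_name TEXT, passed INTEGER)")
conn.executemany(
    "INSERT INTO data_quality_sla VALUES (?, ?, ?)",
    [("etl", "null_rate", 1), ("etl", "row_count", 1), ("etl", "freshness", 0), ("etl", "schema", 1)],
)
etl_rate = sla_rate(conn, "etl")
missing_rate = sla_rate(conn, "unknown")
print(etl_rate, missing_rate)
```

Returning None for an empty window, instead of 0.0, lets a dashboard distinguish "no checks ran" from "every check failed".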

Comparison Analysis

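Dimension | Apache Airflow | Prefect | Dagster | Luigi | Mage
Orchestration model | DAG definitions | Functional + decorators | Asset definitions | Task dependencies | Notebook style
Data validation | Manual integration required | Built-in validation | Native type system | Built-in
Feature management | Manual integration required | Manual integration required | Native assets | Manual integration required
Vector pipelines | Manual integration required | Manual integration required | Manual integration required | Manual integration required
Monitoring | Basic UI | Native Cloud UI | Native Dagit | Basic | Native UI
Learning curve | Medium | Medium-high
Community | Most mature | Fast-growing | Growing | Declining | Emerging
Python-native
2026 recommendation | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐
Best for | Enterprise ETL | Quick start for small and mid-size projects | Data-asset-heavy workloads | Simple batch processing | Data analytics teams

Validation tool | Pydantic v2 | Great Expectations | Pandera | TFX Data Validation
Validation speed | Very fast (Rust core) | Medium | Medium
Schema definition | Python classes | YAML/Code | Python classes | Protobuf
Statistical validation | Not supported | Medium
Custom rules | Flexible | Flexible | Medium | Medium
Data profiling | Not supported | Not supported
Best for | Format validation | Comprehensive data quality | Lightweight schemas | TensorFlow ecosystem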

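Recommended Online Tools

  • JSON formatter: format JSON while debugging Airflow DAG configuration and pipeline data
  • Base64 encoder: handle encoding for vector data transfer and API authentication
  • cURL to code: convert cURL commands from API debugging into Python code for ETL extraction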

Summary

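By 2026, Python AI data pipelines have settled into a mature methodology. Airflow orchestration is the skeleton: DAGs define task dependencies, while automatic retries and alerting keep runs reliable. Data validation is the immune system: Pydantic catches format errors and Great Expectations catches statistical anomalies. Feature engineering is the core value: Pandas stays compatible with existing code and Polars accelerates large-scale computation. The vector pipeline is the AI infrastructure: smart chunking plus incremental updates keep RAG data fresh. Monitoring is the eyes of production: Prometheus collects metrics and a data quality SLA quantifies the guarantees. Remember: without data quality guarantees, the faster a pipeline runs, the further it strays.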


