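Python AI Data Pipelines in Practice: 5 Production Patterns from ETL to Feature Store
Collapsing data quality, runaway ETL scripts, and a one-week delay to ship a feature
It is 3 a.m. and a model training job has crashed because an upstream column was renamed from user_id to userId; the debugging lasts until dawn. Feature engineering scripts are scattered across 10 Jupyter notebooks, so shipping one new feature means waiting a week for a slot in the data engineers' schedule. The RAG system's vectors are never in sync with the source documents, and retrieval returns irrelevant results. By 2026, Python AI data pipelines have gone from "nice to have" to "must have": Airflow 3.0 brings native async def support, Pydantic v2 validates roughly 5-50x faster than v1, and Polars can be 10x or more faster than Pandas on feature-engineering workloads.
Starting from 5 production patterns, this article walks the full chain: Airflow ETL orchestration → Pydantic + GE data validation → Pandas/Polars feature engineering → RAG vector data pipeline → Prometheus production monitoring, with Python code at every step.
Key takeaways
- Learn the DAG orchestration model of Apache Airflow 3.0 and use it for ETL task dependencies and retry on failure
- Build a data validation line of defense with Pydantic v2 + Great Expectations that keeps dirty data out of the pipeline
- Do feature engineering with a mixed Pandas + Polars setup that balances ecosystem compatibility and compute performance
- Build a RAG vector data pipeline covering the whole path from document chunking to embedding to vector storage
- Monitor data quality with Prometheus + Grafana and get alerted quickly when the pipeline misbehaves
- Understand 5 common data pipeline pitfalls and their production-grade fixes
- Apply advanced optimizations: incremental processing, caching, and parallelism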
Table of Contents
- Architecture Overview:End-to-end architecture of the AI data pipeline
- Pattern 1:ETL Pipeline with Apache Airflow + Python
- Pattern 2:Data Validation with Pydantic v2 + Great Expectations
- Pattern 3:Feature Engineering Pipeline with Pandas + Polars
- Pattern 4:Vector Data Pipeline for RAG
- Pattern 5:Production Monitoring with Prometheus + Data Quality Alerts
- 5 Common Pitfalls and Solutions
- 10 Common Error Troubleshooting
- Advanced Optimization Techniques
- Comparison Analysis
- Recommended Online Tools
- Summary & Further Reading
Architecture Overview
AI Data Pipeline Architecture

Data Sources (API / DB / Files)
  ──▶ Airflow DAG Orchestration (ETL scheduling)
  ──▶ Pydantic + GE Data Validation (schema + rules)
        │
        ▼
Feature Engineering Pipeline
  Pandas (compat) ──▶ Polars (speed) ──▶ Feature Store (Redis/Feast)
        │
        ▼
Vector Data Pipeline (RAG)
  Chunking (split) ──▶ Embedding (model) ──▶ Vector Store (PGVector/Chroma)
        │
        ▼
Production Monitoring
  Prometheus (metrics) ──▶ Grafana (dashboard) ──▶ AlertManager (notifications)
Pattern 1:ETL Pipeline with Apache Airflow + Python
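Problem
Every day, data has to be extracted from 3 databases, 2 APIs, and 5 CSV files, cleaned, and written to the data warehouse. Hand-run scripts give no view of execution state, downstream steps never learn that an upstream step failed, and every retry is manual.
Solution
Orchestrate the ETL as a DAG in Apache Airflow 3.0 to get task dependencies, automatic retries, and failure alerts.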
# dags/etl_pipeline.py
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator

default_args = {
    "owner": "data-team",
    "depends_on_past": False,
    "email_on_failure": True,
    "email": ["data-team@company.com"],
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="etl_daily_pipeline",
    default_args=default_args,
    description="Daily ETL pipeline for AI data",
    schedule="0 2 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["etl", "production"],
) as dag:

    # NOTE: the tasks below pass whole datasets through XCom to keep the example short.
    # XCom is meant for small values; for real volumes, write to staging storage and pass a path.

    def extract_from_postgres(**context):
        import psycopg2

        conn = psycopg2.connect(
            host="postgres-host",
            database="source_db",
            user="reader",
            password="***",
        )
        cur = conn.cursor()
        cur.execute("SELECT * FROM user_events WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'")
        # Return dicts keyed by column name so the transform step can address columns by name.
        columns = [desc[0] for desc in cur.description]
        rows = [dict(zip(columns, row)) for row in cur.fetchall()]
        conn.close()
        context["ti"].xcom_push(key="postgres_rows", value=len(rows))
        return rows

    def extract_from_api(**context):
        import requests

        resp = requests.get(
            "https://api.example.com/v2/data",
            headers={"Authorization": "Bearer ***"},
            params={"date": context["ds"]},
        )
        resp.raise_for_status()
        data = resp.json()
        context["ti"].xcom_push(key="api_records", value=len(data))
        return data

    def extract_from_files(**context):
        import glob

        import pandas as pd

        files = glob.glob(f"/data/input/{context['ds']}/*.csv")
        frames = [pd.read_csv(f) for f in files]
        combined = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
        context["ti"].xcom_push(key="file_count", value=len(files))
        return combined.to_dict("records")

    def transform_data(**context):
        import pandas as pd

        ti = context["ti"]
        pg_rows = ti.xcom_pull(task_ids="extract_postgres", key="return_value")
        api_data = ti.xcom_pull(task_ids="extract_api", key="return_value")
        file_data = ti.xcom_pull(task_ids="extract_files", key="return_value")
        df_pg = pd.DataFrame(pg_rows)
        df_api = pd.DataFrame(api_data)
        df_files = pd.DataFrame(file_data)
        df_all = pd.concat([df_pg, df_api, df_files], ignore_index=True)
        df_all = df_all.drop_duplicates(subset=["user_id", "event_id"])
        df_all = df_all.dropna(subset=["user_id", "event_type"])
        df_all["event_date"] = pd.to_datetime(df_all["created_at"]).dt.date
        context["ti"].xcom_push(key="total_records", value=len(df_all))
        return df_all.to_dict("records")

    def load_to_warehouse(**context):
        import psycopg2

        ti = context["ti"]
        records = ti.xcom_pull(task_ids="transform", key="return_value")
        conn = psycopg2.connect(
            host="warehouse-host",
            database="analytics_db",
            user="loader",
            password="***",
        )
        cur = conn.cursor()
        cur.execute("TRUNCATE TABLE staging.user_events_daily")
        insert_sql = """
            INSERT INTO staging.user_events_daily
            (user_id, event_id, event_type, event_date, created_at)
            VALUES (%s, %s, %s, %s, %s)
        """
        # Insert every record in batches of 1000. The original sliced records[:1000],
        # which dropped the rest and made the quality check below fail on larger days.
        batch_size = 1000
        loaded = 0
        for i in range(0, len(records), batch_size):
            batch = [(r["user_id"], r["event_id"], r["event_type"],
                      str(r["event_date"]), r["created_at"]) for r in records[i:i + batch_size]]
            cur.executemany(insert_sql, batch)
            loaded += len(batch)
        conn.commit()
        conn.close()
        return loaded

    def data_quality_check(**context):
        ti = context["ti"]
        total = ti.xcom_pull(task_ids="transform", key="total_records")
        loaded = ti.xcom_pull(task_ids="load", key="return_value")
        # Treat a missing or zero load count as 0 so an empty load cannot pass silently.
        if total and (loaded or 0) / total < 0.95:
            raise ValueError(f"Data quality check failed: only {loaded}/{total} records loaded")
        return "PASSED"

    start = EmptyOperator(task_id="start")
    extract_pg = PythonOperator(task_id="extract_postgres", python_callable=extract_from_postgres)
    extract_api = PythonOperator(task_id="extract_api", python_callable=extract_from_api)
    extract_files = PythonOperator(task_id="extract_files", python_callable=extract_from_files)
    transform = PythonOperator(task_id="transform", python_callable=transform_data)
    load = PythonOperator(task_id="load", python_callable=load_to_warehouse)
    quality_check = PythonOperator(task_id="quality_check", python_callable=data_quality_check)
    end = EmptyOperator(task_id="end")

    start >> [extract_pg, extract_api, extract_files] >> transform >> load >> quality_check >> end
# Start Airflow ("airflow db init" was removed in Airflow 3; "airflow db migrate" replaces it)
pip install apache-airflow==3.0.0
airflow db migrate
airflow dags list
airflow dags trigger etl_daily_pipeline
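To make the two guarantees the DAG relies on concrete (upstream-before-downstream ordering and automatic retry), here is a minimal standard-library sketch. It is not how Airflow is implemented; `run_with_retries`, `run_dag`, and the stand-in task callables are hypothetical names used only for illustration.

```python
import time
from graphlib import TopologicalSorter


def run_with_retries(task, retries=3, retry_delay=0.0, sleep=time.sleep):
    """Call task(); on an exception, retry up to `retries` more times."""
    attempt = 0
    while True:
        try:
            return task()
        except Exception:
            if attempt >= retries:
                raise  # retries exhausted: surface the failure
            attempt += 1
            sleep(retry_delay)


def run_dag(tasks, upstream, retries=3):
    """Run callables in dependency order.

    tasks:    {task_id: zero-argument callable}
    upstream: {task_id: set of task_ids that must finish first}
    """
    results = {}
    for task_id in TopologicalSorter(upstream).static_order():
        results[task_id] = run_with_retries(tasks[task_id], retries=retries)
    return results


# Hypothetical stand-ins for the extract/transform/load callables.
calls = {"extract_api": 0}


def flaky_extract_api():
    calls["extract_api"] += 1
    if calls["extract_api"] < 3:  # fail twice, then succeed
        raise ConnectionError("transient API error")
    return ["api_row"]


tasks = {
    "extract_postgres": lambda: ["pg_row"],
    "extract_api": flaky_extract_api,
    "transform": lambda: "transformed",
    "load": lambda: "loaded",
}
upstream = {
    "transform": {"extract_postgres", "extract_api"},
    "load": {"transform"},
}
order = list(TopologicalSorter(upstream).static_order())
results = run_dag(tasks, upstream)
```

The flaky extract succeeds on its third attempt, and `transform` only runs once both extracts have finished, which is the behavior `retries=3` and the `>>` chain ask Airflow for.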
Pattern 2:Data Validation with Pydantic v2 + Great Expectations
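Problem
An upstream system quietly changes a field type: age goes from int to string, the email format is no longer checked, and dirty data flows straight into model training. The accuracy drop is only noticed after training finishes.
Solution
Use Pydantic v2 for schema validation and Great Expectations for statistical rules, so dirty data has to get past two lines of defense.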
# schemas.py
# EmailStr needs the optional email-validator dependency: pip install "pydantic[email]"
from datetime import datetime
from typing import Optional

from pydantic import BaseModel, EmailStr, Field, field_validator


class UserEventSchema(BaseModel):
    model_config = {"extra": "forbid"}  # reject unknown keys instead of silently dropping them

    user_id: str = Field(..., min_length=1, max_length=64, description="Unique user identifier")
    event_id: str = Field(..., pattern=r"^evt_[a-zA-Z0-9]{16}$", description="Event ID format")
    event_type: str = Field(..., description="Event type")
    event_date: datetime = Field(..., description="Event date")
    age: int = Field(..., ge=0, le=150, description="User age")
    email: EmailStr = Field(..., description="User email")
    amount: Optional[float] = Field(None, ge=0, description="Amount")

    @field_validator("event_type")
    @classmethod
    def validate_event_type(cls, v):
        allowed = {"click", "purchase", "signup", "logout", "view"}
        if v not in allowed:
            raise ValueError(f"event_type must be one of {allowed}, got {v}")
        return v


class BatchValidationResult(BaseModel):
    total: int
    valid: int
    invalid: int
    errors: list[dict]
# validation_pipeline.py
import json

import pandas as pd
from pydantic import ValidationError

# Legacy API: PandasDataset was removed in Great Expectations 0.18+.
# Pin great_expectations<0.18 to run this as written, or port the checks to the 1.x API.
from great_expectations.dataset import PandasDataset

from schemas import BatchValidationResult, UserEventSchema


def validate_with_pydantic(records: list[dict]) -> BatchValidationResult:
    valid_records = []
    errors = []
    for i, record in enumerate(records):
        try:
            UserEventSchema(**record)
            valid_records.append(record)
        except ValidationError as e:
            errors.append({"index": i, "record": record, "error": str(e)})
    return BatchValidationResult(
        total=len(records),
        valid=len(valid_records),
        invalid=len(errors),
        errors=errors,
    )


def validate_with_great_expectations(df: pd.DataFrame) -> dict:
    ge_df = PandasDataset(df)
    results = {}
    results["row_count"] = ge_df.expect_table_row_count_to_be_between(min_value=1, max_value=10_000_000)
    results["user_id_not_null"] = ge_df.expect_column_values_to_not_be_null("user_id")
    results["age_range"] = ge_df.expect_column_values_to_be_between("age", min_value=0, max_value=150)
    results["event_type_set"] = ge_df.expect_column_values_to_be_in_set(
        "event_type", ["click", "purchase", "signup", "logout", "view"]
    )
    results["email_format"] = ge_df.expect_column_values_to_match_regex("email", r"^[\w\.-]+@[\w\.-]+\.\w+$")
    results["amount_non_negative"] = ge_df.expect_column_values_to_be_between(
        "amount", min_value=0, max_value=None
    )
    passed = sum(1 for r in results.values() if r["success"])
    failed = len(results) - passed
    return {"total_expectations": len(results), "passed": passed, "failed": failed, "details": results}


def run_validation_pipeline(records: list[dict]) -> dict:
    pydantic_result = validate_with_pydantic(records)
    # Build the set of rejected indexes once instead of once per record.
    invalid_indexes = {e["index"] for e in pydantic_result.errors}
    valid_df = pd.DataFrame([r for i, r in enumerate(records) if i not in invalid_indexes])
    if valid_df.empty:
        ge_result = {"total_expectations": 0, "passed": 0, "failed": 0}
    else:
        ge_result = validate_with_great_expectations(valid_df)
    return {
        "pydantic_validation": pydantic_result.model_dump(),
        "ge_validation": ge_result,
        "overall_pass": pydantic_result.invalid == 0 and ge_result.get("failed", 0) == 0,
    }


if __name__ == "__main__":
    test_data = [
        # Valid record: the event_id suffix is exactly 16 alphanumeric characters.
        {"user_id": "u001", "event_id": "evt_abc123def456gh78", "event_type": "click",
         "event_date": "2026-06-16T10:00:00", "age": 28, "email": "user@test.com", "amount": 99.5},
        # Invalid: empty user_id.
        {"user_id": "", "event_id": "evt_xyz789abc012ij34", "event_type": "signup",
         "event_date": "2026-06-16T11:00:00", "age": 35, "email": "new@test.com", "amount": 0},
        # Invalid: bad event_id, unknown event_type, negative age and amount, malformed email.
        {"user_id": "u003", "event_id": "evt_bad_format", "event_type": "hack",
         "event_date": "2026-06-16T12:00:00", "age": -5, "email": "not-an-email", "amount": -10},
    ]
    result = run_validation_pipeline(test_data)
    print(json.dumps(result, indent=2, default=str))
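The user_id to userId rename from the opening scenario is worth catching before per-record validation, because it makes every record fail at once. The following is a minimal standard-library sketch, assuming records arrive as plain dicts; `EXPECTED_COLUMNS`, `detect_schema_drift`, and the normalization rule (drop underscores, lowercase) are illustrative assumptions, not part of Pydantic or Great Expectations.

```python
EXPECTED_COLUMNS = {"user_id", "event_id", "event_type", "event_date", "age", "email", "amount"}


def _normalize(name: str) -> str:
    """Map snake_case and camelCase spellings of a column to the same key."""
    return name.replace("_", "").lower()


def detect_schema_drift(record: dict, expected: set[str] = EXPECTED_COLUMNS) -> dict:
    """Report missing and unexpected keys, and guess which ones are renames."""
    actual = set(record)
    missing = expected - actual
    unexpected = actual - expected
    by_norm = {_normalize(name): name for name in unexpected}
    likely_renamed = {
        m: by_norm[_normalize(m)] for m in sorted(missing) if _normalize(m) in by_norm
    }
    return {
        "missing": sorted(missing),
        "unexpected": sorted(unexpected),
        "likely_renamed": likely_renamed,
    }


drifted = {"userId": "u001", "event_id": "evt_abc123def456gh78", "event_type": "click",
           "event_date": "2026-06-16T10:00:00", "age": 28, "email": "user@test.com",
           "amount": 99.5}
report = detect_schema_drift(drifted)
```

Running this check on the first record of a batch turns thousands of identical validation errors into one line that names the rename.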
Pattern 3:Feature Engineering Pipeline with Pandas + Polars
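Problem
The feature engineering scripts were written in Pandas. After the data grew from 1 million to 50 million rows, a groupby + agg runs for 30 minutes without finishing. Migrating to Polars is tempting, but there is too much existing code to rewrite it all.
Solution
Keep Pandas for small data and existing code, use Polars for the heavy computation, and exchange data between them through the Arrow format, which avoids copies for many column types.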
# feature_pipeline.py
# Assumes Polars >= 1.0 (pivot(on=...), join(how="full", coalesce=True)).
from datetime import datetime, timedelta
from functools import reduce

import numpy as np
import pandas as pd
import polars as pl


def generate_sample_data(n: int = 1_000_000) -> pd.DataFrame:
    np.random.seed(42)
    return pd.DataFrame({
        "user_id": np.random.choice([f"u{i:04d}" for i in range(1000)], n),
        "event_type": np.random.choice(["click", "purchase", "view", "signup"], n),
        "amount": np.random.exponential(50, n).round(2),
        "event_time": pd.date_range("2026-01-01", periods=n, freq="30s"),
        "category": np.random.choice(["electronics", "clothing", "food", "books"], n),
    })


def pandas_features(df: pd.DataFrame) -> pd.DataFrame:
    user_agg = df.groupby("user_id").agg(
        total_events=("event_type", "count"),
        total_amount=("amount", "sum"),
        avg_amount=("amount", "mean"),
        unique_categories=("category", "nunique"),
        first_event=("event_time", "min"),
        last_event=("event_time", "max"),
    )
    user_agg["active_days"] = (user_agg["last_event"] - user_agg["first_event"]).dt.days + 1
    user_agg["avg_daily_events"] = user_agg["total_events"] / user_agg["active_days"].clip(lower=1)
    return user_agg.reset_index()


def polars_features(df: pd.DataFrame) -> pd.DataFrame:
    pl_df = pl.from_pandas(df)
    result = pl_df.group_by("user_id").agg([
        pl.col("event_type").count().alias("total_events"),
        pl.col("amount").sum().alias("total_amount"),
        pl.col("amount").mean().alias("avg_amount"),
        pl.col("category").n_unique().alias("unique_categories"),
        pl.col("event_time").min().alias("first_event"),
        pl.col("event_time").max().alias("last_event"),
    ]).with_columns([
        (pl.col("last_event") - pl.col("first_event")).dt.total_days().add(1).alias("active_days"),
    ]).with_columns([
        # Polars names the argument lower_bound (Pandas uses lower).
        (pl.col("total_events") / pl.col("active_days").clip(lower_bound=1)).alias("avg_daily_events"),
    ])
    return result.to_pandas()


def time_window_features(df: pd.DataFrame, windows: list[str] | None = None) -> pd.DataFrame:
    if windows is None:
        windows = ["7d", "30d", "90d"]
    pl_df = pl.from_pandas(df)
    reference_date = pl_df.select(pl.col("event_time").max()).item()
    all_features = []
    for window in windows:
        days = int(window.replace("d", ""))
        cutoff = reference_date - timedelta(days=days)
        window_feat = pl_df.filter(pl.col("event_time") >= cutoff).group_by("user_id").agg([
            pl.col("amount").sum().alias(f"amount_sum_{window}"),
            pl.col("amount").mean().alias(f"amount_mean_{window}"),
            pl.col("event_type").count().alias(f"event_count_{window}"),
            pl.col("category").n_unique().alias(f"category_nunique_{window}"),
        ])
        all_features.append(window_feat)
    # A full join with coalesce=True keeps a single user_id column; without it each
    # join adds a user_id_right column and the second join fails on the duplicate name.
    merged = reduce(lambda a, b: a.join(b, on="user_id", how="full", coalesce=True), all_features)
    return merged.to_pandas()


def category_encoding_features(df: pd.DataFrame) -> pd.DataFrame:
    pl_df = pl.from_pandas(df)
    pivot = pl_df.group_by(["user_id", "category"]).agg([
        pl.col("amount").sum().alias("category_amount"),
        pl.col("event_type").count().alias("category_count"),
    ]).pivot(
        index="user_id",
        on="category",
        values=["category_amount", "category_count"],
    ).fill_null(0)
    return pivot.to_pandas()


def run_feature_pipeline():
    print("Generating sample data...")
    df = generate_sample_data(1_000_000)
    print(f"Data shape: {df.shape}")

    print("\n--- Pandas Feature Engineering ---")
    start = datetime.now()
    pandas_result = pandas_features(df)
    print(f"Pandas time: {(datetime.now() - start).total_seconds():.2f}s")
    print(f"Pandas result shape: {pandas_result.shape}")

    print("\n--- Polars Feature Engineering ---")
    start = datetime.now()
    polars_result = polars_features(df)
    print(f"Polars time: {(datetime.now() - start).total_seconds():.2f}s")
    print(f"Polars result shape: {polars_result.shape}")

    print("\n--- Time Window Features ---")
    window_result = time_window_features(df)
    print(f"Window features shape: {window_result.shape}")

    print("\n--- Category Encoding Features ---")
    category_result = category_encoding_features(df)
    print(f"Category features shape: {category_result.shape}")


if __name__ == "__main__":
    run_feature_pipeline()
Pattern 4:Vector Data Pipeline for RAG
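Problem
After the RAG system goes live, documents are updated but the vectors are not, so retrieval keeps returning stale content. The chunking strategy is too crude: long tables are truncated, code blocks are cut in the middle, and embedding quality suffers badly.
Solution
Build a complete vector data pipeline: smart chunking → embedding → vector storage → incremental sync.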
# vector_pipeline.py
import hashlib
import json
import os
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Document:
    doc_id: str
    content: str
    metadata: dict = field(default_factory=dict)
    source: str = ""
    updated_at: str = ""


@dataclass
class Chunk:
    chunk_id: str
    doc_id: str
    content: str
    embedding: Optional[list[float]] = None
    metadata: dict = field(default_factory=dict)


class SmartChunker:
    def __init__(self, chunk_size: int = 512, chunk_overlap: int = 64,
                 separators: Optional[list[str]] = None):
        self.chunk_size = chunk_size
        # NOTE: chunk_overlap is stored but not applied by _recursive_split below.
        self.chunk_overlap = chunk_overlap
        self.separators = separators or ["\n\n", "\n", "。", ".", " "]

    def chunk_document(self, doc: Document) -> list[Chunk]:
        chunks_text = self._recursive_split(doc.content, self.separators)
        chunks = []
        for i, text in enumerate(chunks_text):
            if len(text.strip()) < 20:  # drop fragments too short to be useful
                continue
            chunk_id = hashlib.md5(f"{doc.doc_id}:{i}:{text[:50]}".encode()).hexdigest()[:16]
            chunks.append(Chunk(
                chunk_id=chunk_id,
                doc_id=doc.doc_id,
                content=text,
                metadata={**doc.metadata, "chunk_index": i, "source": doc.source},
            ))
        return chunks

    def _recursive_split(self, text: str, separators: list[str]) -> list[str]:
        # Split on the coarsest separator first; recurse with finer ones for oversized parts.
        if not separators or len(text) <= self.chunk_size:
            return [text] if text.strip() else []
        sep = separators[0]
        parts = text.split(sep)
        result = []
        current = ""
        for part in parts:
            candidate = current + sep + part if current else part
            if len(candidate) <= self.chunk_size:
                current = candidate
            else:
                if current:
                    result.append(current)
                if len(part) > self.chunk_size:
                    sub_splits = self._recursive_split(part, separators[1:])
                    result.extend(sub_splits)
                    current = ""
                else:
                    current = part
        if current:
            result.append(current)
        return result


class EmbeddingService:
    def __init__(self, model_name: str = "text-embedding-3-small", api_key: str = ""):
        self.model_name = model_name
        self.api_key = api_key or os.environ.get("OPENAI_API_KEY", "")

    def embed_texts(self, texts: list[str]) -> list[list[float]]:
        # Call the OpenAI API only when a key is configured and the package is installed.
        if self.api_key:
            try:
                from openai import OpenAI
            except ImportError:
                pass
            else:
                client = OpenAI(api_key=self.api_key)
                response = client.embeddings.create(input=texts, model=self.model_name)
                return [item.embedding for item in response.data]
        # Offline fallback: random vectors keep the demo runnable, but search results are meaningless.
        import numpy as np
        return [np.random.randn(1536).tolist() for _ in texts]

    def embed_chunks(self, chunks: list[Chunk], batch_size: int = 100) -> list[Chunk]:
        for i in range(0, len(chunks), batch_size):
            batch = chunks[i:i + batch_size]
            texts = [c.content for c in batch]
            embeddings = self.embed_texts(texts)
            for j, chunk in enumerate(batch):
                chunk.embedding = embeddings[j]
        return chunks


class VectorStore:
    """In-memory stand-in for a real vector database such as PGVector or Chroma."""

    def __init__(self, connection_string: str = ""):
        self.connection_string = connection_string
        self._store: dict[str, Chunk] = {}

    def upsert_chunks(self, chunks: list[Chunk]):
        for chunk in chunks:
            self._store[chunk.chunk_id] = chunk

    def search(self, query_embedding: list[float], top_k: int = 5) -> list[Chunk]:
        import numpy as np
        query_vec = np.array(query_embedding)
        scores = []
        for chunk in self._store.values():
            if chunk.embedding is None:
                continue
            chunk_vec = np.array(chunk.embedding)
            # Cosine similarity; the small epsilon guards against zero-norm vectors.
            similarity = np.dot(query_vec, chunk_vec) / (np.linalg.norm(query_vec) * np.linalg.norm(chunk_vec) + 1e-8)
            scores.append((chunk, float(similarity)))
        scores.sort(key=lambda x: x[1], reverse=True)
        return [chunk for chunk, _ in scores[:top_k]]

    def delete_by_doc_id(self, doc_id: str):
        to_delete = [cid for cid, c in self._store.items() if c.doc_id == doc_id]
        for cid in to_delete:
            del self._store[cid]


class VectorPipeline:
    def __init__(self, chunk_size: int = 512, model_name: str = "text-embedding-3-small"):
        self.chunker = SmartChunker(chunk_size=chunk_size)
        self.embedder = EmbeddingService(model_name=model_name)
        self.store = VectorStore()

    def ingest_documents(self, documents: list[Document]) -> dict:
        total_chunks = 0
        for doc in documents:
            # Delete first so chunks from an older version of the document never linger.
            self.store.delete_by_doc_id(doc.doc_id)
            chunks = self.chunker.chunk_document(doc)
            chunks = self.embedder.embed_chunks(chunks)
            self.store.upsert_chunks(chunks)
            total_chunks += len(chunks)
        return {"documents": len(documents), "chunks": total_chunks}

    def query(self, question: str, top_k: int = 5) -> list[dict]:
        query_embedding = self.embedder.embed_texts([question])[0]
        results = self.store.search(query_embedding, top_k=top_k)
        return [{"chunk_id": r.chunk_id, "content": r.content[:200],
                 "metadata": r.metadata} for r in results]


if __name__ == "__main__":
    pipeline = VectorPipeline(chunk_size=256)
    # Sample documents are kept in Chinese; they exercise the "。" separator.
    docs = [
        Document(doc_id="doc_001", content="Python是一種廣泛使用的高階程式語言。它的設計哲學強調程式碼的可讀性。"
                 "Python支援多種程式設計範式,包括物件導向、命令式、函數式和程序式程式設計。"
                 "Python擁有一個龐大的標準函式庫,涵蓋了網路通訊、文字處理、資料庫介面等各個方面。",
                 metadata={"source": "wiki", "category": "programming"},
                 source="wikipedia"),
        Document(doc_id="doc_002", content="Apache Airflow是一個開源的工作流程管理平台。"
                 "它允許使用者以程式設計方式編寫、排程和監控工作流程。"
                 "Airflow的核心概念是DAG(有向無環圖),用於定義任務之間的依賴關係。"
                 "Airflow 3.0於2025年發布。",
                 metadata={"source": "docs", "category": "data-engineering"},
                 source="airflow_docs"),
    ]
    result = pipeline.ingest_documents(docs)
    print(f"Ingestion result: {result}")
    query_result = pipeline.query("什麼是Python程式語言?", top_k=3)
    print(f"\nQuery results: {json.dumps(query_result, indent=2, ensure_ascii=False)}")
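SmartChunker accepts a chunk_overlap argument, but the splitter never applies it. One simple way to add overlap is a post-processing pass over the finished chunks. The sketch below is an assumption about what overlap could mean here (character-level overlap taken from the previous chunk's tail); `add_overlap` is a hypothetical helper, not part of the pipeline above.

```python
def add_overlap(chunks: list[str], overlap: int) -> list[str]:
    """Prefix every chunk after the first with the tail of its predecessor.

    Each returned chunk can be up to `overlap` characters longer than the
    original, so choose chunk_size with that headroom in mind.
    """
    if overlap <= 0:
        return list(chunks)
    result = []
    for i, chunk in enumerate(chunks):
        if i == 0:
            result.append(chunk)
        else:
            # Take the tail from the original predecessor, not the already-extended one.
            result.append(chunks[i - 1][-overlap:] + chunk)
    return result


overlapped = add_overlap(["abcdef", "ghijkl", "mnop"], overlap=2)
```

Overlap trades some duplicated storage and embedding cost for a better chance that a sentence cut at a chunk boundary is still retrievable as a whole.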
Pattern 5:Production Monitoring with Prometheus + Data Quality Alerts
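Problem
The pipeline fails silently and a 3-hour data delay goes unnoticed. Feature distributions drift without anyone knowing, and model predictions get steadily worse. The vector store's P99 retrieval latency climbs from 50 ms to 5 s, and the team only finds out from user complaints.
Solution
Collect metrics with Prometheus, define custom data quality metrics, visualize them in Grafana, and alert through AlertManager.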
# monitoring.py
import time
from dataclasses import dataclass
from typing import Callable

from prometheus_client import Counter, Gauge, Histogram, start_http_server

# Metrics are defined once at module level; re-creating them inside functions
# raises a duplicate-registration error.
pipeline_runs_total = Counter(
    "pipeline_runs_total",
    "Total pipeline runs",
    ["pipeline_name", "status"],
)
pipeline_duration_seconds = Histogram(
    "pipeline_duration_seconds",
    "Pipeline execution duration",
    ["pipeline_name"],
    buckets=[10, 30, 60, 120, 300, 600, 1800, 3600],
)
records_processed = Counter(
    "records_processed_total",
    "Total records processed",
    ["pipeline_name", "stage"],
)
data_quality_score = Gauge(
    "data_quality_score",
    "Data quality score (0-1)",
    ["pipeline_name", "check_name"],
)
feature_drift_score = Gauge(
    "feature_drift_score",
    "Feature drift score (PSI)",
    ["pipeline_name", "feature_name"],
)
vector_search_latency = Histogram(
    "vector_search_latency_seconds",
    "Vector search latency",
    ["pipeline_name"],
    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0],
)


@dataclass
class QualityCheck:
    name: str
    check_fn: Callable[[dict], float]
    threshold: float = 0.8


@dataclass
class DriftAlert:
    feature_name: str
    psi_score: float
    threshold: float = 0.2
    is_drifted: bool = False


class PipelineMonitor:
    # NOTE: 9090 is also the default port of the Prometheus server itself; pick a
    # different metrics_port (for example 8000) if both run on the same host.
    def __init__(self, pipeline_name: str, metrics_port: int = 9090):
        self.pipeline_name = pipeline_name
        self.metrics_port = metrics_port
        self._quality_checks: list[QualityCheck] = []
        self._drift_threshold = 0.2
        self._server_started = False

    def start_metrics_server(self):
        if not self._server_started:
            start_http_server(self.metrics_port)
            self._server_started = True
            print(f"Metrics server started on port {self.metrics_port}")

    def add_quality_check(self, name: str, check_fn: Callable, threshold: float = 0.8):
        self._quality_checks.append(QualityCheck(name=name, check_fn=check_fn, threshold=threshold))

    def record_pipeline_run(self, status: str, duration: float):
        pipeline_runs_total.labels(self.pipeline_name, status).inc()
        pipeline_duration_seconds.labels(self.pipeline_name).observe(duration)

    def record_records_processed(self, stage: str, count: int):
        records_processed.labels(self.pipeline_name, stage).inc(count)

    def run_quality_checks(self, data: dict) -> dict:
        results = {}
        for check in self._quality_checks:
            score = check.check_fn(data)
            data_quality_score.labels(self.pipeline_name, check.name).set(score)
            passed = score >= check.threshold
            results[check.name] = {"score": score, "threshold": check.threshold, "passed": passed}
            if not passed:
                print(f"ALERT: Quality check '{check.name}' failed: score={score:.3f} < threshold={check.threshold}")
        return results

    def check_feature_drift(self, reference_stats: dict, current_stats: dict) -> list[DriftAlert]:
        alerts = []
        for feature_name in reference_stats:
            if feature_name not in current_stats:
                continue
            psi = self._calculate_psi(reference_stats[feature_name], current_stats[feature_name])
            feature_drift_score.labels(self.pipeline_name, feature_name).set(psi)
            is_drifted = psi > self._drift_threshold
            alert = DriftAlert(
                feature_name=feature_name,
                psi_score=psi,
                threshold=self._drift_threshold,
                is_drifted=is_drifted,
            )
            alerts.append(alert)
            if is_drifted:
                print(f"ALERT: Feature drift detected for '{feature_name}': PSI={psi:.4f} > threshold={self._drift_threshold}")
        return alerts

    @staticmethod
    def _calculate_psi(expected: dict, actual: dict, bucket_count: int = 10) -> float:
        # Inputs are per-bucket counts (dict values or a plain sequence); bucket_count is unused.
        import numpy as np
        e_vals = np.array(list(expected.values())) if isinstance(expected, dict) else np.array(expected)
        a_vals = np.array(list(actual.values())) if isinstance(actual, dict) else np.array(actual)
        e_probs = e_vals / e_vals.sum() if e_vals.sum() > 0 else np.ones_like(e_vals) / len(e_vals)
        a_probs = a_vals / a_vals.sum() if a_vals.sum() > 0 else np.ones_like(a_vals) / len(a_vals)
        # Clip to avoid log(0) and division by zero for empty buckets.
        e_probs = np.clip(e_probs, 1e-6, None)
        a_probs = np.clip(a_probs, 1e-6, None)
        psi = float(np.sum((a_probs - e_probs) * np.log(a_probs / e_probs)))
        return psi

    def record_vector_search_latency(self, latency: float):
        vector_search_latency.labels(self.pipeline_name).observe(latency)


if __name__ == "__main__":
    monitor = PipelineMonitor("ai_data_pipeline", metrics_port=9090)
    monitor.start_metrics_server()
    monitor.add_quality_check(
        "null_rate",
        lambda d: 1.0 - d.get("null_count", 0) / max(d.get("total", 1), 1),
        threshold=0.95,
    )
    monitor.add_quality_check(
        "schema_compliance",
        lambda d: d.get("valid_count", 0) / max(d.get("total", 1), 1),
        threshold=0.99,
    )
    start_time = time.time()
    monitor.record_records_processed("extract", 50000)
    monitor.record_records_processed("transform", 48000)
    monitor.record_records_processed("load", 47500)
    duration = time.time() - start_time
    monitor.record_pipeline_run("success", duration)
    quality_data = {"null_count": 50, "total": 50000, "valid_count": 49800}
    quality_results = monitor.run_quality_checks(quality_data)
    print(f"Quality results: {quality_results}")
    ref_stats = {"age": {"bins": [100, 200, 300, 250, 150]}, "amount": {"bins": [50, 150, 300, 350, 150]}}
    cur_stats = {"age": {"bins": [80, 180, 350, 250, 140]}, "amount": {"bins": [50, 150, 300, 350, 150]}}
    drift_alerts = monitor.check_feature_drift(
        {k: v["bins"] for k, v in ref_stats.items()},
        {k: v["bins"] for k, v in cur_stats.items()},
    )
    print(f"Drift alerts: {[(a.feature_name, a.psi_score, a.is_drifted) for a in drift_alerts]}")
    print("\nMetrics available at http://localhost:9090/metrics")
    time.sleep(300)  # keep the process alive so Prometheus can scrape it
# prometheus.yml
global:
  scrape_interval: 15s
scrape_configs:
  - job_name: "data_pipeline"
    static_configs:
      # Port of the metrics exporter started by monitoring.py. Prometheus itself also
      # defaults to 9090, so move one of the two if they share a host.
      - targets: ["localhost:9090"]
rule_files:
  - "alerts.yml"
# alerts.yml
groups:
  - name: data_pipeline_alerts
    rules:
      - alert: PipelineRunFailed
        expr: increase(pipeline_runs_total{status="failed"}[5m]) > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Pipeline {{ $labels.pipeline_name }} failed"
      # This rule uses one global threshold (0.8); the per-check thresholds set in
      # monitoring.py (0.95 and 0.99) are stricter and are not visible to Prometheus.
      - alert: DataQualityDegraded
        expr: data_quality_score < 0.8
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Data quality check {{ $labels.check_name }} below threshold"
      - alert: FeatureDriftDetected
        expr: feature_drift_score > 0.2
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Feature {{ $labels.feature_name }} drift detected (PSI={{ $value }})"
      - alert: VectorSearchLatencyHigh
        expr: histogram_quantile(0.99, rate(vector_search_latency_seconds_bucket[5m])) > 1.0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Vector search P99 latency above 1s"
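The feature_drift_score gauge and the FeatureDriftDetected rule both rest on the Population Stability Index, PSI = Σ (a_i − e_i) · ln(a_i / e_i), where e_i and a_i are the expected and actual share of records in bucket i. As a worked check, this standard-library sketch recomputes PSI for the bucket counts used in the monitoring demo; `psi_from_counts` is an illustrative name and mirrors, rather than replaces, `_calculate_psi`.

```python
import math


def psi_from_counts(expected_counts, actual_counts, eps=1e-6):
    """Population Stability Index from per-bucket counts."""
    e_total = sum(expected_counts)
    a_total = sum(actual_counts)
    psi = 0.0
    for e, a in zip(expected_counts, actual_counts):
        # Floor the shares at eps so empty buckets do not produce log(0).
        e_p = max(e / e_total, eps)
        a_p = max(a / a_total, eps)
        psi += (a_p - e_p) * math.log(a_p / e_p)
    return psi


age_psi = psi_from_counts([100, 200, 300, 250, 150], [80, 180, 350, 250, 140])
amount_psi = psi_from_counts([50, 150, 300, 350, 150], [50, 150, 300, 350, 150])
print(f"age PSI={age_psi:.4f}, amount PSI={amount_psi:.4f}")
```

For these counts the age feature comes out at about 0.015 and the unchanged amount feature at 0, both well under the 0.2 threshold, so neither would trigger the drift alert.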
5 Common Pitfalls and Solutions
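1. Hard-coded database credentials in ETL scripts
Problem: connection strings are written directly into the DAG code, so every environment change requires a code change and passwords leak into Git.
Solution: use an Airflow Connection or environment variables.
from airflow.hooks.base import BaseHook


def get_connection():
    # "postgres_source" is a Connection ID configured in Airflow (UI, CLI, or secrets backend).
    conn = BaseHook.get_connection("postgres_source")
    return f"postgresql://{conn.login}:{conn.password}@{conn.host}:{conn.port}/{conn.schema}"


# Or read the URL from an environment variable
import os

db_url = os.environ["DATABASE_URL"]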
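2. Validating the schema but not the statistics
Problem: the schema check passes while the distribution is clearly wrong (for example, every age is 999 in a field that only checks the type). Pydantic validates individual values, not the statistics of a batch.
Solution: validate format with Pydantic and distributions with Great Expectations, as two separate lines of defense.
# Pydantic: per-record format validation
class UserSchema(BaseModel):
    age: int = Field(..., ge=0, le=150)


# Great Expectations: batch-level statistical validation (ge_df as in Pattern 2)
ge_df.expect_column_mean_to_be_between("age", min_value=18, max_value=65)
ge_df.expect_column_values_to_be_in_set("country", ["CN", "US", "JP", "UK"])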
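3. Feature engineering that ignores data leakage
Problem: features are computed with future data (for example, normalizing with the mean of the full dataset). The model looks good in training and falls apart in production.
Solution: fit every transform on training-set statistics only, and build time-window features from historical data only.
# df is assumed to be a pandas DataFrame with "date" and "amount" columns.
# Fit the scaling statistics on rows before the cutoff, then apply them to all rows.
train_stats = df[df["date"] < "2026-01-01"]["amount"].agg(["mean", "std"])
df["amount_scaled"] = (df["amount"] - train_stats["mean"]) / train_stats["std"]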
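The same fit-on-train, apply-everywhere idea can be checked on a few rows with only the standard library. This is a small sketch with made-up data; `fit_scaler` and `apply_scaler` are hypothetical helper names, and `stdev` is the sample standard deviation, matching what pandas' `std` computes by default.

```python
from statistics import mean, stdev


def fit_scaler(train_values):
    """Compute scaling statistics from training rows only."""
    return {"mean": mean(train_values), "std": stdev(train_values)}


def apply_scaler(values, stats):
    """Scale any rows (train or later) with the frozen training statistics."""
    std = stats["std"] or 1.0  # avoid dividing by zero for a constant column
    return [(v - stats["mean"]) / std for v in values]


# (date, amount) rows; the last row is a future outlier that must not affect the fit.
rows = [("2025-12-30", 10.0), ("2025-12-31", 20.0),
        ("2026-01-01", 30.0), ("2026-01-02", 1000.0)]
cutoff = "2026-01-01"
train = [amount for date, amount in rows if date < cutoff]
stats = fit_scaler(train)
scaled = apply_scaler([amount for _, amount in rows], stats)
```

The outlier on 2026-01-02 gets a very large scaled value, which is the point: it is measured against the training distribution instead of quietly shifting the mean that the model was trained with.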
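4. No incremental updates for vector data
Problem: the vector index is rebuilt from scratch every time. With 100,000 documents a rebuild takes 2 hours, so the data is never current.
Solution: update incrementally based on a hash of each document's content and only reprocess what changed.
# Uses Document, VectorPipeline, and hashlib from vector_pipeline.py (Pattern 4).
def incremental_update(documents: list[Document], pipeline: VectorPipeline) -> dict:
    store = pipeline.store
    updated = 0
    for doc in documents:
        content_hash = hashlib.md5(doc.content.encode()).hexdigest()
        # The in-memory VectorStore has no metadata lookup, so read the stored hash
        # from any existing chunk of this document.
        existing = next((c.metadata for c in store._store.values() if c.doc_id == doc.doc_id), None)
        if existing and existing.get("content_hash") == content_hash:
            continue  # unchanged: skip re-chunking and re-embedding
        store.delete_by_doc_id(doc.doc_id)
        chunks = pipeline.chunker.chunk_document(doc)
        chunks = pipeline.embedder.embed_chunks(chunks)
        for chunk in chunks:
            chunk.metadata["content_hash"] = content_hash
        store.upsert_chunks(chunks)
        updated += 1
    return {"checked": len(documents), "updated": updated}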
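The hash comparison at the core of incremental updates can be separated from chunking and embedding, which makes it easy to test. The sketch below is one possible shape, assuming documents and stored hashes are plain dicts keyed by doc_id; `content_hash` and `plan_sync` are hypothetical names. It also reports documents that disappeared from the source, a case the function above does not handle.

```python
import hashlib


def content_hash(text: str) -> str:
    """Stable fingerprint of a document's content."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def plan_sync(documents: dict[str, str], stored_hashes: dict[str, str]) -> dict:
    """Decide which doc_ids need re-embedding and which should be removed.

    documents:     {doc_id: current content} from the source system
    stored_hashes: {doc_id: content hash recorded at the last ingestion}
    """
    to_upsert = [
        doc_id for doc_id, text in documents.items()
        if stored_hashes.get(doc_id) != content_hash(text)
    ]
    to_delete = [doc_id for doc_id in stored_hashes if doc_id not in documents]
    return {"upsert": sorted(to_upsert), "delete": sorted(to_delete)}


stored = {"doc_001": content_hash("old text"), "doc_002": content_hash("same text"),
          "doc_003": content_hash("removed from source")}
current = {"doc_001": "new text", "doc_002": "same text", "doc_004": "brand new"}
plan = plan_sync(current, stored)
```

Only doc_001 (changed) and doc_004 (new) would be re-embedded, doc_003 would be deleted from the store, and doc_002 is left alone.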
5. Monitoring whether the pipeline ran, not whether the data is good
Problem: Airflow shows the DAG as successful, but the row count has dropped 90% or every feature value is NULL, and the production model is already useless.
Solution: add a data quality gate at the end of the DAG that blocks downstream tasks when quality is below target.
def quality_gate_check(**context):
    ti = context["ti"]
    loaded_count = ti.xcom_pull(task_ids="load", key="return_value")
    expected_min = context["params"]["expected_min_records"]
    if loaded_count < expected_min:
        raise ValueError(f"Quality gate failed: {loaded_count} < {expected_min}")
    # run_quality_checks() is a placeholder for your own scoring function returning a 0-1 score.
    quality_score = run_quality_checks()
    if quality_score < 0.9:
        raise ValueError(f"Quality gate failed: score={quality_score:.2f}")
    return "PASSED"


quality_gate = PythonOperator(
    task_id="quality_gate",
    python_callable=quality_gate_check,
    params={"expected_min_records": 10000},
)
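A fixed expected_min_records works until normal volume changes. One alternative, sketched here as an assumption and not something the article prescribes, is to compare today's count with the median of recent runs; `volume_gate` and the 50% default are illustrative choices.

```python
from statistics import median


def volume_gate(today_count: int, history: list[int], max_drop: float = 0.5) -> bool:
    """Return True when today's row count is within max_drop of the recent median.

    history is the row count of previous successful runs; with no history
    there is nothing to compare against, so the gate passes.
    """
    if not history:
        return True
    baseline = median(history)
    return today_count >= baseline * (1 - max_drop)


recent_counts = [10000, 11000, 9500]
ok_normal = volume_gate(9000, recent_counts)    # small dip, within tolerance
ok_collapse = volume_gate(1000, recent_counts)  # the 90% drop from the scenario above
```

Inside an Airflow task, a False result would be turned into a raised exception so that downstream tasks are blocked, the same way quality_gate_check does.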
10 Common Error Troubleshooting
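| # | Error | Cause | Fix |
|---|---|---|---|
| 1 | Airflow DAG import error | Wrong Python path or missing dependency | Check PYTHONPATH and make sure everything in requirements.txt is installed |
| 2 | Pydantic ValidationError: field required | Upstream field is missing or was renamed | Give truly optional fields a default (Optional[...] = None); accept renamed keys with a validation alias, e.g. Field(validation_alias=AliasChoices("user_id", "userId")). Setting extra to "allow" only affects unknown extra keys and does not fix a missing field |
| 3 | Great Expectations expectation failed | Distribution shift or dirty data | Check for changes at the data source, then adjust the expectation threshold or add a cleaning step |
| 4 | Polars SchemaError: dtype mismatch | Wrong type inference when converting Pandas → Polars | Specify the schema explicitly: pl.DataFrame(data, schema={"col": pl.Float64}) |
| 5 | OpenAI API RateLimitError | Embedding requests exceed the API rate limit | Batch and retry: call in batches and add exponential-backoff retries (for example with tenacity) |
| 6 | ValueError (shape mismatch) or NaN scores in cosine similarity | Vectors of different dimensions, or a zero vector | Check dimensions match and check the norm before normalizing: np.linalg.norm(v) > 1e-8 |
| 7 | Prometheus "Duplicated timeseries in CollectorRegistry" | The same metric name is registered more than once | Define metrics at module level; do not create Counter/Gauge objects inside functions |
| 8 | MemoryError on large dataset groupby | Full in-memory Pandas aggregation runs out of memory | Switch to Polars or process in chunks: pd.read_csv(chunksize=100000) |
| 9 | PSI calculation returns NaN | Some buckets have a count of 0 after binning | Add smoothing: probs = np.clip(probs, 1e-6, None) |
| 10 | Airflow task timeout | A single task runs longer than its limit | Increase execution_timeout, or split the work into several tasks that run in parallel |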
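Row 5 recommends exponential backoff for rate-limit errors. The idea fits in a few lines of standard-library Python; the sketch below is a simplified stand-in for what a library like tenacity provides, and `retry_with_backoff`, `RateLimited`, and the 10% jitter are assumptions made for illustration.

```python
import random
import time


class RateLimited(Exception):
    """Hypothetical stand-in for an API client's rate-limit error."""


def retry_with_backoff(fn, max_attempts=5, base_delay=1.0, max_delay=30.0,
                       retry_on=(RateLimited,), sleep=time.sleep):
    """Call fn(); on a retryable error wait base_delay * 2**(attempt-1) and try again."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: let the caller see the error
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            # Up to 10% jitter spreads out retries from concurrent workers.
            sleep(delay + random.uniform(0, delay * 0.1))


# Demo: fail twice with a rate-limit error, then succeed. Sleeps are recorded, not executed.
state = {"calls": 0}
delays = []


def flaky_embed():
    state["calls"] += 1
    if state["calls"] <= 2:
        raise RateLimited("429 Too Many Requests")
    return "ok"


outcome = retry_with_backoff(flaky_embed, sleep=delays.append)
```

Passing `sleep` as a parameter keeps the function testable without real waiting; in production the default `time.sleep` is used.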
Advanced Optimization Techniques
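1. Incremental ETL: process only changed data
Full ETL does not scale with data volume. Use CDC (Change Data Capture) or a timestamp watermark to process only the increment.
import pandas as pd
import redis
from sqlalchemy import create_engine, text


def incremental_extract(last_run_time: str) -> pd.DataFrame:
    # get_connection() returns a database URL (see pitfall 1).
    engine = create_engine(get_connection())
    # Bind the watermark as a parameter instead of formatting it into the SQL string.
    query = text("""
        SELECT * FROM user_events
        WHERE updated_at > :last_run_time
        ORDER BY updated_at
    """)
    return pd.read_sql(query, engine, params={"last_run_time": last_run_time})


def get_last_run_time(pipeline_name: str) -> str:
    # decode_responses=True makes Redis return str instead of bytes.
    r = redis.Redis(decode_responses=True)
    return r.get(f"pipeline:{pipeline_name}:last_run") or "2026-01-01T00:00:00"


def save_last_run_time(pipeline_name: str, run_time: str):
    r = redis.Redis(decode_responses=True)
    r.set(f"pipeline:{pipeline_name}:last_run", run_time)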
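The watermark pattern can be exercised end to end with an in-memory SQLite database. This sketch assumes ISO-8601 timestamp strings (which sort correctly as text) and a simplified two-column user_events table; it advances the watermark to the newest row actually read, rather than to the wall-clock time of the run, so rows committed during extraction are not skipped.

```python
import sqlite3


def incremental_extract(conn, last_run_time):
    """Return rows newer than the watermark, plus the next watermark."""
    cur = conn.execute(
        "SELECT event_id, updated_at FROM user_events "
        "WHERE updated_at > ? ORDER BY updated_at",
        (last_run_time,),  # bound parameter, never string formatting
    )
    rows = cur.fetchall()
    # Advance to the newest row seen; keep the old watermark when nothing is new.
    new_watermark = rows[-1][1] if rows else last_run_time
    return rows, new_watermark


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user_events (event_id TEXT, updated_at TEXT)")
conn.executemany(
    "INSERT INTO user_events VALUES (?, ?)",
    [("e1", "2026-01-01T00:00:00"), ("e2", "2026-01-02T00:00:00"),
     ("e3", "2026-01-03T00:00:00")],
)

first_rows, watermark = incremental_extract(conn, "2026-01-01T00:00:00")
second_rows, watermark_after = incremental_extract(conn, watermark)
```

The first call returns e2 and e3 and moves the watermark to 2026-01-03; the second call finds nothing new and leaves the watermark where it was.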
2. Feature caching and versioning
Cache computed features in Redis to avoid recomputation, and version them so training and inference use the same feature definitions.
import hashlib
import json

import redis


class FeatureCache:
    def __init__(self, redis_url: str = "redis://localhost:6379/2"):
        self.r = redis.from_url(redis_url)
        self.ttl = 86400  # cache entries expire after one day

    def _cache_key(self, feature_name: str, entity_id: str, version: str) -> str:
        # The version is part of the key, so bumping it never returns stale values.
        raw = f"{feature_name}:{entity_id}:{version}"
        return f"feat:{hashlib.md5(raw.encode()).hexdigest()}"

    def get(self, feature_name: str, entity_id: str, version: str = "v1") -> dict | None:
        key = self._cache_key(feature_name, entity_id, version)
        cached = self.r.get(key)
        return json.loads(cached) if cached else None

    def set(self, feature_name: str, entity_id: str, value: dict, version: str = "v1"):
        key = self._cache_key(feature_name, entity_id, version)
        self.r.setex(key, self.ttl, json.dumps(value, default=str))
3. Batched, parallel embedding
Embedding one text per call is too slow. Batching plus async concurrency raises throughput substantially.
import asyncio

from openai import AsyncOpenAI


async def async_embed_batch(texts: list[str], batch_size: int = 2048,
                            model: str = "text-embedding-3-small") -> list[list[float]]:
    client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment
    all_embeddings = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        response = await client.embeddings.create(input=batch, model=model)
        all_embeddings.extend([item.embedding for item in response.data])
    return all_embeddings


async def parallel_embed(documents: list[str], concurrency: int = 5) -> list[list[float]]:
    # The semaphore caps the number of in-flight requests at `concurrency`.
    semaphore = asyncio.Semaphore(concurrency)

    async def limited_embed(batch):
        async with semaphore:
            return await async_embed_batch(batch)

    tasks = [limited_embed(documents[i:i + 100]) for i in range(0, len(documents), 100)]
    results = await asyncio.gather(*tasks)  # gather preserves task order
    return [emb for batch in results for emb in batch]
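The concurrency pattern above can be tried without an API key by swapping in a fake embedding coroutine. In this sketch `fake_embed` and `embed_all` are hypothetical names; the fake returns each text's length as a one-dimensional vector so that the output order is easy to verify.

```python
import asyncio


async def fake_embed(batch):
    """Stand-in for an embeddings API call; yields control like real I/O would."""
    await asyncio.sleep(0)
    return [[float(len(text))] for text in batch]


async def embed_all(texts, batch_size=2, concurrency=2, embed=fake_embed):
    """Embed texts in batches with at most `concurrency` calls in flight."""
    semaphore = asyncio.Semaphore(concurrency)

    async def limited(batch):
        async with semaphore:
            return await embed(batch)

    batches = [texts[i:i + batch_size] for i in range(0, len(texts), batch_size)]
    # gather returns results in the order the tasks were passed in,
    # so embeddings line up with the input texts.
    results = await asyncio.gather(*(limited(b) for b in batches))
    return [vector for batch in results for vector in batch]


vectors = asyncio.run(embed_all(["a", "bb", "ccc", "dddd", "eeeee"]))
```

Because `embed` is a parameter, the same function can take a real client call in production and the fake in tests.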
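4. Vector index optimization: tuning HNSW parameters
The parameters of PGVector's HNSW index directly affect retrieval accuracy and speed.
-- Create the HNSW index with tuned build parameters
CREATE INDEX CONCURRENTLY idx_documents_embedding_hnsw
ON documents USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 200);
-- Set ef_search at query time (higher means better recall, slower queries)
SET hnsw.ef_search = 100;
-- Optionally rebuild the index after very large batch updates
REINDEX INDEX CONCURRENTLY idx_documents_embedding_hnsw;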
5. Automating the data quality SLA
Write each data quality check result to an SLA table and compute the SLA pass rate from it.
import os
from datetime import datetime, timezone

import psycopg2


def record_sla(pipeline_name: str, check_name: str, score: float,
               threshold: float, passed: bool):
    conn = psycopg2.connect(dsn=os.environ["WAREHOUSE_URL"])
    cur = conn.cursor()
    cur.execute("""
        INSERT INTO data_quality_sla
        (pipeline_name, check_name, score, threshold, passed, checked_at)
        VALUES (%s, %s, %s, %s, %s, %s)
    """, (pipeline_name, check_name, score, threshold, passed, datetime.now(timezone.utc)))
    conn.commit()
    conn.close()


def get_sla_rate(pipeline_name: str, days: int = 30) -> float:
    conn = psycopg2.connect(dsn=os.environ["WAREHOUSE_URL"])
    cur = conn.cursor()
    # Multiply a bound integer by a one-day interval instead of placing the parameter
    # inside a quoted literal; NULLIF avoids division by zero when no checks exist.
    cur.execute("""
        SELECT COUNT(*) FILTER (WHERE passed) * 100.0 / NULLIF(COUNT(*), 0)
        FROM data_quality_sla
        WHERE pipeline_name = %s AND checked_at >= NOW() - %s * INTERVAL '1 day'
    """, (pipeline_name, days))
    rate = cur.fetchone()[0]
    conn.close()
    return float(rate) if rate is not None else 0.0
Comparison Analysis
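| Dimension | Apache Airflow | Prefect | Dagster | Luigi | Mage |
|---|---|---|---|---|---|
| Orchestration model | DAG definitions | Functions + decorators | Asset definitions | Task dependencies | Notebook style |
| Data validation | Integrate yourself | Built in | Native type system | None | Built in |
| Feature management | Integrate yourself | Integrate yourself | Native assets | None | Integrate yourself |
| Vector pipelines | Integrate yourself | Integrate yourself | Integrate yourself | None | Integrate yourself |
| Monitoring | Basic UI | Native Cloud UI | Native UI (formerly Dagit) | Basic | Native UI |
| Learning curve | Medium | Low | Medium-high | Low | Low |
| Community | Most mature | Growing fast | Growing | Declining | Emerging |
| Python native | Yes | Yes | Yes | Yes | Yes |
| 2026 recommendation (author's rating) | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐ |
| Best for | Enterprise ETL | Quick start for small and mid-size projects | Data-asset-heavy work | Simple batch jobs | Data analysis teams |

| Data validation tool | Pydantic v2 | Great Expectations | Pandera | TFX Data Validation |
|---|---|---|---|---|
| Validation speed | Very fast (Rust core) | Medium | Fast | Medium |
| Schema definition | Python classes | YAML/Code | Python classes | Protobuf |
| Statistical checks | Not supported | Strong | Medium | Strong |
| Custom rules | Flexible | Flexible | Medium | Medium |
| Data profiling | Not supported | Strong | Not supported | Strong |
| Best for | Format validation | Broad data quality | Lightweight schemas | TensorFlow ecosystem |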
Recommended Online Tools
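- JSON formatter: format JSON while debugging Airflow DAG configuration and pipeline data
- Base64 encoder: handle encoding for vector data transfer and API authentication
- cURL to code: turn cURL commands from API debugging into Python code for ETL extraction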
Summary
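By 2026, Python AI data pipelines have settled into a mature set of practices. Airflow orchestration is the skeleton: DAGs define task dependencies, and automatic retries and alerts provide reliability. Data validation is the immune system: Pydantic catches format errors and Great Expectations catches statistical anomalies. Feature engineering is where the value is created: Pandas keeps existing code working and Polars speeds up large computations. The vector pipeline is AI infrastructure: smart chunking plus incremental updates keep RAG data fresh. Monitoring is the eyes of production: Prometheus collects metrics and a data quality SLA puts a number on reliability. Remember: a pipeline without data quality guarantees only gets to the wrong answer faster.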