Python AI Data Pipeline: 5 Production Patterns from ETL to Feature Store
Data Quality Collapse, ETL Scripts Out of Control, Feature Deployment Delayed a Week
At 3 AM, the model training job crashes because upstream renamed user_id to userId, and you spend the rest of the night debugging. Feature engineering scripts are scattered across 10 Jupyter Notebooks, so shipping a new feature waits a week for a data engineer's time. The RAG system's vectors are perpetually out of sync with the source documents, so retrieval returns irrelevant results. In 2026, a well-engineered data pipeline is no longer a nice-to-have for Python AI systems but a must-have, and the tooling has matured: Airflow 3.0 brings native async def support, Pydantic v2 validates data 5-50x faster than v1, and Polars often outperforms Pandas by 10x or more on feature-engineering workloads.
This article covers 5 production patterns, guiding you through Airflow ETL orchestration → Pydantic + GE data validation → Pandas/Polars feature engineering → RAG vector data pipeline → Prometheus production monitoring with complete runnable Python code at every step.
Key Takeaways
- Master Apache Airflow 3.0 DAG orchestration patterns for ETL dependency management and failure retry
- Build data validation defenses with Pydantic v2 + Great Expectations to reject dirty data from entering pipelines
- Use Pandas + Polars hybrid approach for feature engineering, balancing ecosystem compatibility and computation performance
- Build a RAG vector data pipeline with the complete chain from document chunking to embedding to vector storage
- Implement Prometheus + Grafana data quality monitoring that alerts on pipeline anomalies within minutes
- Understand 5 common data pipeline pitfalls and their production-grade solutions
- Master advanced optimization techniques including incremental processing, caching, and parallel acceleration
Table of Contents
- Architecture Overview: AI Data Pipeline Global Architecture
- Pattern 1: ETL Pipeline with Apache Airflow + Python
- Pattern 2: Data Validation with Pydantic v2 + Great Expectations
- Pattern 3: Feature Engineering Pipeline with Pandas + Polars
- Pattern 4: Vector Data Pipeline for RAG
- Pattern 5: Production Monitoring with Prometheus + Data Quality Alerts
- 5 Common Pitfalls and Solutions
- 10 Common Error Troubleshooting
- Advanced Optimization Techniques
- Comparison Analysis
- Recommended Online Tools
- Summary & Further Reading
Architecture Overview
AI Data Pipeline Architecture

Data Sources (API / DB / Files)
    │
    ▼
Airflow DAG Orchestration (ETL scheduling)
    │
    ▼
Pydantic + GE Data Validation (schema + rules)
    │
    ▼
Feature Engineering Pipeline
    Pandas (compat) ──▶ Polars (speed) ──▶ Feature Store (Redis/Feast)
    │
    ▼
Vector Data Pipeline (RAG)
    Chunking (split) ──▶ Embedding (model) ──▶ Vector Store (PGVector/Chroma)
    │
    ▼
Production Monitoring
    Prometheus (metrics) ──▶ Grafana (dashboard) ──▶ AlertManager (notifications)
Pattern 1: ETL Pipeline with Apache Airflow + Python
Problem Scenario
Every day you need to extract data from 3 databases, 2 APIs, and 5 CSV files, then clean it and load it into the data warehouse. Ad hoc scripts can't track execution status, downstream steps never learn that an upstream step failed, and every retry is manual.
Solution
Use Apache Airflow 3.0 to orchestrate ETL DAGs with task dependencies, automatic retries, and failure alerting.
# dags/etl_pipeline.py
from datetime import datetime, timedelta

# Airflow 3.x exposes DAG authoring through airflow.sdk; core operators live in the
# "standard" provider (the old airflow.operators.* paths are deprecated aliases)
from airflow.sdk import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator

default_args = {
    "owner": "data-team",
    "depends_on_past": False,
    "email_on_failure": True,
    "email": ["data-team@company.com"],
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="etl_daily_pipeline",
    default_args=default_args,
    description="Daily ETL pipeline for AI data",
    schedule="0 2 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["etl", "production"],
) as dag:

    def extract_from_postgres(**context):
        import psycopg2

        conn = psycopg2.connect(
            host="postgres-host",
            database="source_db",
            user="reader",
            password="***",  # see Pitfall 1: load credentials from an Airflow Connection instead
        )
        try:
            cur = conn.cursor()
            # Filter on the run's logical date (ds), not CURRENT_DATE, so reruns and
            # backfills extract the same window (idempotent)
            cur.execute(
                "SELECT * FROM user_events "
                "WHERE created_at >= %s::date AND created_at < %s::date + INTERVAL '1 day'",
                (context["ds"], context["ds"]),
            )
            columns = [desc[0] for desc in cur.description]
            # Return dicts, not bare tuples, so the transform step keeps real column names
            rows = [dict(zip(columns, row)) for row in cur.fetchall()]
        finally:
            conn.close()
        context["ti"].xcom_push(key="postgres_rows", value=len(rows))
        return rows  # XCom suits small payloads; stage large extracts in object storage

    def extract_from_api(**context):
        import requests

        resp = requests.get(
            "https://api.example.com/v2/data",
            headers={"Authorization": "Bearer ***"},
            params={"date": context["ds"]},
            timeout=30,  # a hung API must not block the worker slot indefinitely
        )
        resp.raise_for_status()
        data = resp.json()
        context["ti"].xcom_push(key="api_records", value=len(data))
        return data

    def extract_from_files(**context):
        import glob

        import pandas as pd

        files = glob.glob(f"/data/input/{context['ds']}/*.csv")
        frames = [pd.read_csv(f) for f in files]
        combined = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
        context["ti"].xcom_push(key="file_count", value=len(files))
        return combined.to_dict("records")

    def transform_data(**context):
        import pandas as pd

        ti = context["ti"]
        pg_rows = ti.xcom_pull(task_ids="extract_postgres", key="return_value") or []
        api_data = ti.xcom_pull(task_ids="extract_api", key="return_value") or []
        file_data = ti.xcom_pull(task_ids="extract_files", key="return_value") or []
        df_all = pd.concat(
            [pd.DataFrame(pg_rows), pd.DataFrame(api_data), pd.DataFrame(file_data)],
            ignore_index=True,
        )
        df_all = df_all.drop_duplicates(subset=["user_id", "event_id"])
        df_all = df_all.dropna(subset=["user_id", "event_type"])
        created = pd.to_datetime(df_all["created_at"])
        # Keep ISO strings so the records stay serializable when passed through XCom
        df_all["event_date"] = created.dt.strftime("%Y-%m-%d")
        df_all["created_at"] = created.dt.strftime("%Y-%m-%d %H:%M:%S")
        ti.xcom_push(key="total_records", value=len(df_all))
        return df_all.to_dict("records")

    def load_to_warehouse(**context):
        import psycopg2
        from psycopg2.extras import execute_values

        records = context["ti"].xcom_pull(task_ids="transform", key="return_value") or []
        rows = [
            (r["user_id"], r["event_id"], r["event_type"], r["event_date"], r["created_at"])
            for r in records  # load every record; slicing to [:1000] would silently drop data
        ]
        conn = psycopg2.connect(
            host="warehouse-host",
            database="analytics_db",
            user="loader",
            password="***",
        )
        try:
            with conn.cursor() as cur:
                # TRUNCATE and INSERT share one transaction, so readers never see a half-loaded table
                cur.execute("TRUNCATE TABLE staging.user_events_daily")
                execute_values(
                    cur,
                    "INSERT INTO staging.user_events_daily "
                    "(user_id, event_id, event_type, event_date, created_at) VALUES %s",
                    rows,
                    page_size=1000,
                )
            conn.commit()
        finally:
            conn.close()
        return len(rows)

    def data_quality_check(**context):
        ti = context["ti"]
        total = ti.xcom_pull(task_ids="transform", key="total_records") or 0
        loaded = ti.xcom_pull(task_ids="load", key="return_value") or 0
        # Fail if fewer than 95% of transformed records landed (this also catches loaded == 0)
        if total and loaded / total < 0.95:
            raise ValueError(f"Data quality check failed: only {loaded}/{total} records loaded")
        return "PASSED"

    start = EmptyOperator(task_id="start")
    extract_pg = PythonOperator(task_id="extract_postgres", python_callable=extract_from_postgres)
    extract_api = PythonOperator(task_id="extract_api", python_callable=extract_from_api)
    extract_files = PythonOperator(task_id="extract_files", python_callable=extract_from_files)
    transform = PythonOperator(task_id="transform", python_callable=transform_data)
    load = PythonOperator(task_id="load", python_callable=load_to_warehouse)
    quality_check = PythonOperator(task_id="quality_check", python_callable=data_quality_check)
    end = EmptyOperator(task_id="end")

    start >> [extract_pg, extract_api, extract_files] >> transform >> load >> quality_check >> end
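# Start Airflow locally (Airflow 3.x replaced `airflow db init` with `airflow db migrate`)
# The constraints file below is for Python 3.12; match it to your interpreter version
pip install "apache-airflow==3.0.0" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-3.0.0/constraints-3.12.txt"
airflow db migrate
airflow standalone   # scheduler + API server + UI for local testing; run it in its own terminal
airflow dags list
airflow dags trigger etl_daily_pipeline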
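Airflow turns the `>>` chain into a dependency graph and starts a task only after all of its upstream tasks succeed. The stdlib sketch below models just that ordering with `graphlib`, using the task IDs from the DAG above; it is an illustration of the scheduling order, not of how Airflow's scheduler is implemented.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of upstream tasks it waits for (mirrors the >> chain above)
dependencies = {
    "extract_postgres": {"start"},
    "extract_api": {"start"},
    "extract_files": {"start"},
    "transform": {"extract_postgres", "extract_api", "extract_files"},
    "load": {"transform"},
    "quality_check": {"load"},
    "end": {"quality_check"},
}


def execution_waves(deps: dict[str, set[str]]) -> list[list[str]]:
    """Group tasks into waves; all tasks in one wave may run in parallel."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)  # mark the whole wave as succeeded
    return waves


for i, wave in enumerate(execution_waves(dependencies)):
    print(i, wave)
```

The three extract tasks land in the same wave, which is why a slow API call does not delay the Postgres extract, while `transform` cannot start until all three finish.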
Pattern 2: Data Validation with Pydantic v2 + Great Expectations
Problem Scenario
Upstream systems silently change field types (age goes from int to string, email format is no longer validated), dirty data flows straight into model training, and accuracy plummets before anyone notices.
Solution
Use Pydantic v2 for schema validation and Great Expectations for statistical rule validation: two layers of defense against dirty data.
# schemas.py
from datetime import datetime
from typing import Optional

# EmailStr needs the optional email-validator dependency: pip install "pydantic[email]"
from pydantic import BaseModel, ConfigDict, EmailStr, Field, field_validator


class UserEventSchema(BaseModel):
    # Reject unexpected fields, so an upstream rename such as user_id -> userId fails loudly
    model_config = ConfigDict(extra="forbid")

    user_id: str = Field(..., min_length=1, max_length=64, description="Unique user identifier")
    event_id: str = Field(..., pattern=r"^evt_[a-zA-Z0-9]{16}$", description="Event ID format")
    event_type: str = Field(..., description="Event type")
    event_date: datetime = Field(..., description="Event date")
    # strict=True: without it, Pydantic's lax mode would silently coerce "28" (str) to 28
    age: int = Field(..., ge=0, le=150, strict=True, description="User age")
    email: EmailStr = Field(..., description="User email")
    amount: Optional[float] = Field(None, ge=0, description="Amount")

    @field_validator("event_type")
    @classmethod
    def validate_event_type(cls, v: str) -> str:
        allowed = {"click", "purchase", "signup", "logout", "view"}
        if v not in allowed:
            raise ValueError(f"event_type must be one of {sorted(allowed)}, got {v!r}")
        return v


class BatchValidationResult(BaseModel):
    total: int
    valid: int
    invalid: int
    errors: list[dict]
# validation_pipeline.py
# Uses the Great Expectations (GX) 1.x API; the legacy great_expectations.dataset.PandasDataset
# class from older releases is no longer available
import json

import great_expectations as gx
import pandas as pd
from pydantic import ValidationError

from schemas import BatchValidationResult, UserEventSchema

# One in-memory GX context and a reusable batch definition for in-process DataFrames
_context = gx.get_context(mode="ephemeral")
_batch_definition = (
    _context.data_sources.add_pandas("pandas")
    .add_dataframe_asset(name="user_events")
    .add_batch_definition_whole_dataframe("whole_dataframe")
)


def validate_with_pydantic(records: list[dict]) -> BatchValidationResult:
    valid_count = 0
    errors = []
    for i, record in enumerate(records):
        try:
            UserEventSchema(**record)
            valid_count += 1
        except ValidationError as e:
            errors.append({"index": i, "record": record, "error": str(e)})
    return BatchValidationResult(
        total=len(records), valid=valid_count, invalid=len(errors), errors=errors
    )


def validate_with_great_expectations(df: pd.DataFrame) -> dict:
    batch = _batch_definition.get_batch(batch_parameters={"dataframe": df})
    expectations = {
        "row_count": gx.expectations.ExpectTableRowCountToBeBetween(min_value=1, max_value=10_000_000),
        "user_id_not_null": gx.expectations.ExpectColumnValuesToNotBeNull(column="user_id"),
        "age_range": gx.expectations.ExpectColumnValuesToBeBetween(column="age", min_value=0, max_value=150),
        "event_type_set": gx.expectations.ExpectColumnValuesToBeInSet(
            column="event_type", value_set=["click", "purchase", "signup", "logout", "view"]
        ),
        "email_format": gx.expectations.ExpectColumnValuesToMatchRegex(
            column="email", regex=r"^[\w\.-]+@[\w\.-]+\.\w+$"
        ),
        "amount_non_negative": gx.expectations.ExpectColumnValuesToBeBetween(column="amount", min_value=0),
    }
    results = {name: batch.validate(exp) for name, exp in expectations.items()}
    passed = sum(1 for r in results.values() if r.success)
    return {
        "total_expectations": len(results),
        "passed": passed,
        "failed": len(results) - passed,
        "details": {name: r.success for name, r in results.items()},
    }


def run_validation_pipeline(records: list[dict]) -> dict:
    pydantic_result = validate_with_pydantic(records)
    invalid_idx = {e["index"] for e in pydantic_result.errors}  # build the set once, not per row
    valid_df = pd.DataFrame([r for i, r in enumerate(records) if i not in invalid_idx])
    ge_result = (
        validate_with_great_expectations(valid_df)
        if not valid_df.empty
        else {"total_expectations": 0, "passed": 0, "failed": 0}
    )
    return {
        "pydantic_validation": pydantic_result.model_dump(),
        "ge_validation": ge_result,
        "overall_pass": pydantic_result.invalid == 0 and ge_result["failed"] == 0,
    }


if __name__ == "__main__":
    test_data = [
        # Valid: the event_id suffix has exactly 16 alphanumeric characters
        {"user_id": "u001", "event_id": "evt_abc123def456gh78", "event_type": "click",
         "event_date": "2026-06-16T10:00:00", "age": 28, "email": "user@test.com", "amount": 99.5},
        # Invalid: empty user_id
        {"user_id": "", "event_id": "evt_xyz789abc012ij34", "event_type": "signup",
         "event_date": "2026-06-16T11:00:00", "age": 35, "email": "new@test.com", "amount": 0},
        # Invalid: bad event_id, unknown event_type, negative age/amount, malformed email
        {"user_id": "u003", "event_id": "evt_bad_format", "event_type": "hack",
         "event_date": "2026-06-16T12:00:00", "age": -5, "email": "not-an-email", "amount": -10},
    ]
    result = run_validation_pipeline(test_data)
    print(json.dumps(result, indent=2, default=str))
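Rejected records are worth keeping rather than dropping. Routing them to a quarantine ("dead-letter") list together with the failure reason makes an upstream break like the user_id → userId rename easy to diagnose, and a batch-level reject-rate limit decides whether the run should stop at all. A minimal stdlib sketch; `split_batch`, `required_user_id`, and the `max_reject_rate` default are illustrative assumptions, not part of Pydantic or GX:

```python
from typing import Callable, Optional


def split_batch(
    records: list[dict],
    check: Callable[[dict], Optional[str]],
    max_reject_rate: float = 0.01,
) -> tuple[list[dict], list[dict]]:
    """Split records into (accepted, quarantined); raise if too many are rejected.

    `check` returns None for a good record, or a human-readable reason for a bad one.
    """
    accepted, quarantined = [], []
    for index, record in enumerate(records):
        reason = check(record)
        if reason is None:
            accepted.append(record)
        else:
            quarantined.append({"index": index, "reason": reason, "record": record})
    reject_rate = len(quarantined) / len(records) if records else 0.0
    if reject_rate > max_reject_rate:
        raise ValueError(
            f"Reject rate {reject_rate:.1%} exceeds {max_reject_rate:.1%}; halting the batch"
        )
    return accepted, quarantined


def required_user_id(record: dict) -> Optional[str]:
    # Hypothetical rule: flag a missing (possibly renamed) or empty identifier
    if "user_id" not in record:
        return "missing user_id (renamed upstream?)"
    if not record["user_id"]:
        return "empty user_id"
    return None


batch = [{"user_id": "u1"}, {"userId": "u2"}, {"user_id": "u3"}]
accepted, quarantined = split_batch(batch, required_user_id, max_reject_rate=0.5)
print(len(accepted), quarantined[0]["reason"])
```

In a real pipeline, `check` would wrap `UserEventSchema(**record)` and return `str(error)` on `ValidationError`, and the quarantine list would be written somewhere durable for later inspection.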
Pattern 3: Feature Engineering Pipeline with Pandas + Polars
Problem Scenario
Feature engineering scripts are written in Pandas. When data grows from 1M to 50M rows, a groupby + agg still hasn't finished after 30 minutes. Migrating to Polars is desirable, but the existing code base is too large for a complete rewrite.
Solution
Keep Pandas for small data and backward compatibility, and use Polars for large data and speed. The two exchange data through Apache Arrow, which avoids copies for many column types (string and object columns may still need conversion).
# feature_pipeline.py
import time
from datetime import timedelta
from functools import reduce
from typing import Optional

import numpy as np
import pandas as pd
import polars as pl


def generate_sample_data(n: int = 1_000_000) -> pd.DataFrame:
    np.random.seed(42)
    return pd.DataFrame({
        "user_id": np.random.choice([f"u{i:04d}" for i in range(1000)], n),
        "event_type": np.random.choice(["click", "purchase", "view", "signup"], n),
        "amount": np.random.exponential(50, n).round(2),
        "event_time": pd.date_range("2026-01-01", periods=n, freq="30s"),
        "category": np.random.choice(["electronics", "clothing", "food", "books"], n),
    })


def pandas_features(df: pd.DataFrame) -> pd.DataFrame:
    user_agg = df.groupby("user_id").agg(
        total_events=("event_type", "count"),
        total_amount=("amount", "sum"),
        avg_amount=("amount", "mean"),
        unique_categories=("category", "nunique"),
        first_event=("event_time", "min"),
        last_event=("event_time", "max"),
    )
    user_agg["active_days"] = (user_agg["last_event"] - user_agg["first_event"]).dt.days + 1
    user_agg["avg_daily_events"] = user_agg["total_events"] / user_agg["active_days"].clip(lower=1)
    return user_agg.reset_index()


def polars_features(df: pd.DataFrame) -> pd.DataFrame:
    pl_df = pl.from_pandas(df)
    result = (
        pl_df.group_by("user_id")
        .agg(
            pl.col("event_type").count().alias("total_events"),
            pl.col("amount").sum().alias("total_amount"),
            pl.col("amount").mean().alias("avg_amount"),
            pl.col("category").n_unique().alias("unique_categories"),
            pl.col("event_time").min().alias("first_event"),
            pl.col("event_time").max().alias("last_event"),
        )
        .with_columns(
            ((pl.col("last_event") - pl.col("first_event")).dt.total_days() + 1).alias("active_days")
        )
        .with_columns(
            # Polars' clip takes lower_bound=, not the pandas-style lower=
            (pl.col("total_events") / pl.col("active_days").clip(lower_bound=1)).alias("avg_daily_events")
        )
        .sort("user_id")  # group_by order is not guaranteed; sort to match the pandas output
    )
    return result.to_pandas()


def time_window_features(df: pd.DataFrame, windows: Optional[list[str]] = None) -> pd.DataFrame:
    windows = windows or ["7d", "30d", "90d"]
    pl_df = pl.from_pandas(df)
    reference_date = pl_df.select(pl.col("event_time").max()).item()
    all_features = []
    for window in windows:
        days = int(window.removesuffix("d"))
        cutoff = reference_date - timedelta(days=days)
        window_feat = (
            pl_df.filter(pl.col("event_time") >= cutoff)
            .group_by("user_id")
            .agg(
                pl.col("amount").sum().alias(f"amount_sum_{window}"),
                pl.col("amount").mean().alias(f"amount_mean_{window}"),
                pl.col("event_type").count().alias(f"event_count_{window}"),
                pl.col("category").n_unique().alias(f"category_nunique_{window}"),
            )
        )
        all_features.append(window_feat)
    # Polars >= 1.0 spells an outer join how="full"; coalesce=True merges the two user_id
    # key columns instead of keeping user_id and user_id_right
    merged = reduce(lambda a, b: a.join(b, on="user_id", how="full", coalesce=True), all_features)
    return merged.to_pandas()


def category_encoding_features(df: pd.DataFrame) -> pd.DataFrame:
    pl_df = pl.from_pandas(df)
    pivot = (
        pl_df.group_by(["user_id", "category"])
        .agg(
            pl.col("amount").sum().alias("category_amount"),
            pl.col("event_type").count().alias("category_count"),
        )
        .pivot(on="category", index="user_id", values=["category_amount", "category_count"])
        .fill_null(0)
    )
    return pivot.to_pandas()


def run_feature_pipeline():
    print("Generating sample data...")
    df = generate_sample_data(1_000_000)
    print(f"Data shape: {df.shape}")

    print("\n--- Pandas Feature Engineering ---")
    start = time.perf_counter()
    pandas_result = pandas_features(df)
    print(f"Pandas time: {time.perf_counter() - start:.2f}s")
    print(f"Pandas result shape: {pandas_result.shape}")

    print("\n--- Polars Feature Engineering ---")
    start = time.perf_counter()
    polars_result = polars_features(df)  # timing includes the pandas -> Polars conversion
    print(f"Polars time: {time.perf_counter() - start:.2f}s")
    print(f"Polars result shape: {polars_result.shape}")

    print("\n--- Time Window Features ---")
    window_result = time_window_features(df)
    print(f"Window features shape: {window_result.shape}")

    print("\n--- Category Encoding Features ---")
    category_result = category_encoding_features(df)
    print(f"Category features shape: {category_result.shape}")


if __name__ == "__main__":
    run_feature_pipeline()
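The hybrid rule from the Solution (Pandas for small inputs, Polars for large ones) can be made explicit with a tiny dispatcher, so callers never pick an engine by hand and existing Pandas code paths stay untouched. A minimal sketch; `build_features` and the 5,000,000-row cutoff are hypothetical placeholders to be tuned from your own benchmarks:

```python
from collections.abc import Sized
from typing import Any, Callable

# Hypothetical cutoff: time both engines on your own data and hardware before choosing
POLARS_ROW_THRESHOLD = 5_000_000


def build_features(
    data: Sized,
    engines: dict[str, Callable[[Any], Any]],
    threshold: int = POLARS_ROW_THRESHOLD,
) -> tuple[str, Any]:
    """Route small inputs to the Pandas implementation and large ones to Polars."""
    engine = "polars" if len(data) >= threshold else "pandas"
    return engine, engines[engine](data)
```

With the functions above, usage would look like `engine, features = build_features(df, {"pandas": pandas_features, "polars": polars_features})`; because both return a pandas DataFrame, downstream code does not care which engine ran.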
Pattern 4: Vector Data Pipeline for RAG
Problem Scenario
After the RAG system goes live, documents are updated but vectors aren't synced — retrieval results are always stale. The chunking strategy is too crude — long tables get truncated, code blocks are split in the middle, and embedding quality is abysmal.
Solution
Build a complete vector data pipeline: smart chunking → embedding → vector storage → incremental sync.
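Much of the "smart" in smart chunking is respecting document structure. As a minimal sketch of the idea, separate from the SmartChunker below and assuming Markdown-style pipe tables whose rows start with "|", consecutive table rows can be grouped into one atomic block before blocks are packed into chunks, so a table is never cut mid-row. The function names are illustrative:

```python
def split_blocks(text: str) -> list[str]:
    """Split text into blocks: blank-line-separated paragraphs, with each run of
    table rows (lines starting with '|') kept together as a single block."""
    blocks, current, in_table = [], [], False
    for line in text.splitlines():
        is_table = line.lstrip().startswith("|")
        # Close the current block on a blank line, or when entering/leaving a table
        if not line.strip() or (current and is_table != in_table):
            if current:
                blocks.append("\n".join(current))
                current = []
        if line.strip():
            current.append(line)
            in_table = is_table
    if current:
        blocks.append("\n".join(current))
    return blocks


def pack_blocks(blocks: list[str], chunk_size: int) -> list[str]:
    """Greedily pack whole blocks into chunks; an oversized block becomes its own
    chunk instead of being truncated."""
    chunks, current = [], ""
    for block in blocks:
        candidate = f"{current}\n\n{block}" if current else block
        if len(candidate) <= chunk_size:
            current = candidate
        else:
            if current:
                chunks.append(current)
            current = block
    if current:
        chunks.append(current)
    return chunks


sample = "Intro line\n\n| a | b |\n|---|---|\n| 1 | 2 |\nAfter table"
print(pack_blocks(split_blocks(sample), chunk_size=20))
```

The trade-off is that an oversized table produces a chunk larger than `chunk_size`; that is usually preferable to embedding half a table.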
# vector_pipeline.py
import hashlib
import json
import os
from dataclasses import dataclass, field
from typing import Optional

import numpy as np


@dataclass
class Document:
    doc_id: str
    content: str
    metadata: dict = field(default_factory=dict)
    source: str = ""
    updated_at: str = ""


@dataclass
class Chunk:
    chunk_id: str
    doc_id: str
    content: str
    embedding: Optional[list[float]] = None
    metadata: dict = field(default_factory=dict)


class SmartChunker:
    def __init__(self, chunk_size: int = 512, chunk_overlap: int = 64,
                 separators: Optional[list[str]] = None):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        # Try coarse boundaries first (paragraphs), then lines, sentences, words
        self.separators = separators or ["\n\n", "\n", ". ", " "]

    def chunk_document(self, doc: Document) -> list[Chunk]:
        pieces = self._recursive_split(doc.content, self.separators)
        chunks = []
        for i, piece in enumerate(pieces):
            if i > 0 and self.chunk_overlap > 0:
                # Prepend the tail of the previous piece so context spans chunk boundaries
                # (a chunk can therefore reach chunk_size + chunk_overlap characters)
                piece = pieces[i - 1][-self.chunk_overlap:] + piece
            if len(piece.strip()) < 20:
                continue  # skip fragments too short to embed meaningfully
            chunk_id = hashlib.md5(f"{doc.doc_id}:{i}:{piece[:50]}".encode()).hexdigest()[:16]
            chunks.append(Chunk(
                chunk_id=chunk_id,
                doc_id=doc.doc_id,
                content=piece,
                metadata={**doc.metadata, "chunk_index": i, "source": doc.source},
            ))
        return chunks

    def _recursive_split(self, text: str, separators: list[str]) -> list[str]:
        if len(text) <= self.chunk_size:
            return [text] if text.strip() else []
        if not separators:
            # No separator left: fall back to a hard character split
            return [text[i:i + self.chunk_size] for i in range(0, len(text), self.chunk_size)]
        sep, rest = separators[0], separators[1:]
        result = []
        current = ""
        for part in text.split(sep):
            candidate = current + sep + part if current else part
            if len(candidate) <= self.chunk_size:
                current = candidate
                continue
            if current:
                result.append(current)
            if len(part) > self.chunk_size:
                result.extend(self._recursive_split(part, rest))
                current = ""
            else:
                current = part
        if current:
            result.append(current)
        return result


class EmbeddingService:
    def __init__(self, model_name: str = "text-embedding-3-small", api_key: Optional[str] = None):
        self.model_name = model_name
        self.api_key = api_key or os.environ.get("OPENAI_API_KEY")

    def embed_texts(self, texts: list[str]) -> list[list[float]]:
        try:
            from openai import OpenAI
        except ImportError:
            OpenAI = None
        if OpenAI is None or not self.api_key:
            # Demo fallback only: deterministic pseudo-random vectors seeded by the text hash.
            # They carry NO semantic meaning, so search results will be arbitrary.
            return [self._fake_embedding(t) for t in texts]
        client = OpenAI(api_key=self.api_key)
        response = client.embeddings.create(input=texts, model=self.model_name)
        return [item.embedding for item in response.data]

    @staticmethod
    def _fake_embedding(text: str, dim: int = 1536) -> list[float]:
        seed = int(hashlib.md5(text.encode()).hexdigest()[:8], 16)
        return np.random.default_rng(seed).standard_normal(dim).tolist()

    def embed_chunks(self, chunks: list[Chunk], batch_size: int = 100) -> list[Chunk]:
        for i in range(0, len(chunks), batch_size):
            batch = chunks[i:i + batch_size]
            embeddings = self.embed_texts([c.content for c in batch])
            for chunk, embedding in zip(batch, embeddings):
                chunk.embedding = embedding
        return chunks


class VectorStore:
    """In-memory stand-in for PGVector/Chroma, for demonstration only."""

    def __init__(self, connection_string: str = ""):
        self.connection_string = connection_string
        self._store: dict[str, Chunk] = {}

    def upsert_chunks(self, chunks: list[Chunk]):
        for chunk in chunks:
            self._store[chunk.chunk_id] = chunk

    def search(self, query_embedding: list[float], top_k: int = 5) -> list[Chunk]:
        query_vec = np.array(query_embedding)
        scores = []
        for chunk in self._store.values():
            if chunk.embedding is None:
                continue
            chunk_vec = np.array(chunk.embedding)
            # Cosine similarity; the epsilon guards against zero-length vectors
            similarity = np.dot(query_vec, chunk_vec) / (
                np.linalg.norm(query_vec) * np.linalg.norm(chunk_vec) + 1e-8
            )
            scores.append((chunk, float(similarity)))
        scores.sort(key=lambda x: x[1], reverse=True)
        return [chunk for chunk, _ in scores[:top_k]]

    def delete_by_doc_id(self, doc_id: str):
        to_delete = [cid for cid, c in self._store.items() if c.doc_id == doc_id]
        for cid in to_delete:
            del self._store[cid]


class VectorPipeline:
    def __init__(self, chunk_size: int = 512, model_name: str = "text-embedding-3-small"):
        self.chunker = SmartChunker(chunk_size=chunk_size)
        self.embedder = EmbeddingService(model_name=model_name)
        self.store = VectorStore()

    def ingest_documents(self, documents: list[Document]) -> dict:
        total_chunks = 0
        for doc in documents:
            # Drop the document's old chunks first so edited documents leave no stale vectors
            self.store.delete_by_doc_id(doc.doc_id)
            chunks = self.chunker.chunk_document(doc)
            chunks = self.embedder.embed_chunks(chunks)
            self.store.upsert_chunks(chunks)
            total_chunks += len(chunks)
        return {"documents": len(documents), "chunks": total_chunks}

    def query(self, question: str, top_k: int = 5) -> list[dict]:
        query_embedding = self.embedder.embed_texts([question])[0]
        results = self.store.search(query_embedding, top_k=top_k)
        return [{"chunk_id": r.chunk_id, "content": r.content[:200],
                 "metadata": r.metadata} for r in results]


if __name__ == "__main__":
    pipeline = VectorPipeline(chunk_size=256)
    docs = [
        Document(doc_id="doc_001", content="Python is a widely used high-level programming language. "
                 "Its design philosophy emphasizes code readability. "
                 "Python supports multiple programming paradigms, including object-oriented, imperative, "
                 "functional, and procedural programming. "
                 "Python has a large standard library covering networking, text processing, database interfaces, and more.",
                 metadata={"source": "wiki", "category": "programming"},
                 source="wikipedia"),
        Document(doc_id="doc_002", content="Apache Airflow is an open-source workflow management platform. "
                 "It allows users to programmatically author, schedule, and monitor workflows. "
                 "Airflow's core concept is the DAG (Directed Acyclic Graph), which defines dependencies between tasks. "
                 "Airflow 3.0 was released in 2025.",
                 metadata={"source": "docs", "category": "data-engineering"},
                 source="airflow_docs"),
    ]
    result = pipeline.ingest_documents(docs)
    print(f"Ingestion result: {result}")
    query_result = pipeline.query("What is the Python programming language?", top_k=3)
    print(f"\nQuery results: {json.dumps(query_result, indent=2)}")
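VectorStore.search above scores every chunk and then sorts the full list. For an in-memory store, `heapq.nlargest` returns the same top-k without sorting everything. A minimal stdlib sketch for illustration; production stores such as PGVector and Chroma rely on approximate nearest-neighbor indexes (for example HNSW) instead of scanning every vector:

```python
import heapq
import math


def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0  # treat zero-length vectors as dissimilar


def top_k(query: list[float], vectors: dict[str, list[float]], k: int = 5) -> list[tuple[str, float]]:
    """Return the k (chunk_id, score) pairs most similar to query, best first."""
    scored = ((chunk_id, cosine(query, vec)) for chunk_id, vec in vectors.items())
    return heapq.nlargest(k, scored, key=lambda item: item[1])


store = {"a": [1.0, 0.0], "b": [0.7, 0.7], "c": [0.0, 1.0]}
print(top_k([1.0, 0.1], store, k=2))
```

`heapq.nlargest` keeps only k candidates in memory, so the cost is O(n log k) rather than O(n log n) for a full sort.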
Pattern 5: Production Monitoring with Prometheus + Data Quality Alerts
Problem Scenario
Data pipelines fail silently — data lag of 3 hours goes unnoticed. Feature value distribution drifts without anyone knowing, and model inference results become increasingly unreliable. Vector store search latency P99 spikes from 50ms to 5s, discovered only after user complaints.
Solution
Use Prometheus to collect metrics, define custom data quality metrics, visualize with Grafana, and alert with AlertManager.
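Feature drift in the code below is scored with the Population Stability Index: PSI = sum over buckets of (actual_i - expected_i) * ln(actual_i / expected_i), where both terms are bucket proportions. A stdlib version, applied to the age histograms used in the demo further down, shows why that feature stays under the 0.2 drift threshold (the function name and epsilon are illustrative choices mirroring the monitor's clipping):

```python
import math


def psi(expected_counts: list[float], actual_counts: list[float], eps: float = 1e-6) -> float:
    """Population Stability Index between two histograms with the same bucket edges."""
    e_total, a_total = sum(expected_counts), sum(actual_counts)
    score = 0.0
    for e, a in zip(expected_counts, actual_counts):
        # Counts -> proportions; clip at eps so an empty bucket never hits log(0)
        e_p = max(e / e_total, eps)
        a_p = max(a / a_total, eps)
        score += (a_p - e_p) * math.log(a_p / e_p)
    return score


age_ref = [100, 200, 300, 250, 150]
age_cur = [80, 180, 350, 250, 140]
print(f"age PSI = {psi(age_ref, age_cur):.4f}")
```

Every term is non-negative (a bucket that gained share contributes a positive difference times a positive log, and vice versa), so PSI is 0 only when the two distributions match bucket for bucket.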
# monitoring.py
import time
from dataclasses import dataclass
from typing import Callable

import numpy as np
from prometheus_client import Counter, Gauge, Histogram, start_http_server

pipeline_runs_total = Counter(
    "pipeline_runs_total",
    "Total pipeline runs",
    ["pipeline_name", "status"],
)
pipeline_duration_seconds = Histogram(
    "pipeline_duration_seconds",
    "Pipeline execution duration",
    ["pipeline_name"],
    buckets=[10, 30, 60, 120, 300, 600, 1800, 3600],
)
records_processed = Counter(
    "records_processed_total",
    "Total records processed",
    ["pipeline_name", "stage"],
)
data_quality_score = Gauge(
    "data_quality_score",
    "Data quality score (0-1)",
    ["pipeline_name", "check_name"],
)
feature_drift_score = Gauge(
    "feature_drift_score",
    "Feature drift score (PSI)",
    ["pipeline_name", "feature_name"],
)
vector_search_latency = Histogram(
    "vector_search_latency_seconds",
    "Vector search latency",
    ["pipeline_name"],
    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0],
)


@dataclass
class QualityCheck:
    name: str
    check_fn: Callable[[dict], float]
    threshold: float = 0.8


@dataclass
class DriftAlert:
    feature_name: str
    psi_score: float
    threshold: float = 0.2
    is_drifted: bool = False


class PipelineMonitor:
    # Default port 8000, not 9090: 9090 is the Prometheus server's own default port
    def __init__(self, pipeline_name: str, metrics_port: int = 8000):
        self.pipeline_name = pipeline_name
        self.metrics_port = metrics_port
        self._quality_checks: list[QualityCheck] = []
        self._drift_threshold = 0.2
        self._server_started = False

    def start_metrics_server(self):
        if not self._server_started:
            start_http_server(self.metrics_port)  # serves /metrics from a background thread
            self._server_started = True
            print(f"Metrics server started on port {self.metrics_port}")

    def add_quality_check(self, name: str, check_fn: Callable[[dict], float], threshold: float = 0.8):
        self._quality_checks.append(QualityCheck(name=name, check_fn=check_fn, threshold=threshold))

    def record_pipeline_run(self, status: str, duration: float):
        pipeline_runs_total.labels(self.pipeline_name, status).inc()
        pipeline_duration_seconds.labels(self.pipeline_name).observe(duration)

    def record_records_processed(self, stage: str, count: int):
        records_processed.labels(self.pipeline_name, stage).inc(count)

    def run_quality_checks(self, data: dict) -> dict:
        results = {}
        for check in self._quality_checks:
            score = check.check_fn(data)
            data_quality_score.labels(self.pipeline_name, check.name).set(score)
            passed = score >= check.threshold
            results[check.name] = {"score": score, "threshold": check.threshold, "passed": passed}
            if not passed:
                print(f"ALERT: Quality check '{check.name}' failed: score={score:.3f} < threshold={check.threshold}")
        return results

    def check_feature_drift(self, reference_stats: dict, current_stats: dict) -> list[DriftAlert]:
        alerts = []
        for feature_name in reference_stats:
            if feature_name not in current_stats:
                continue
            psi = self._calculate_psi(reference_stats[feature_name], current_stats[feature_name])
            feature_drift_score.labels(self.pipeline_name, feature_name).set(psi)
            is_drifted = psi > self._drift_threshold
            alerts.append(DriftAlert(
                feature_name=feature_name,
                psi_score=psi,
                threshold=self._drift_threshold,
                is_drifted=is_drifted,
            ))
            if is_drifted:
                print(f"ALERT: Feature drift detected for '{feature_name}': PSI={psi:.4f} > threshold={self._drift_threshold}")
        return alerts

    @staticmethod
    def _calculate_psi(expected, actual) -> float:
        """PSI between two histograms (dict or list of bucket counts, same bucket edges)."""
        e_vals = np.asarray(list(expected.values()) if isinstance(expected, dict) else expected, dtype=float)
        a_vals = np.asarray(list(actual.values()) if isinstance(actual, dict) else actual, dtype=float)
        e_probs = e_vals / e_vals.sum() if e_vals.sum() > 0 else np.full_like(e_vals, 1 / len(e_vals))
        a_probs = a_vals / a_vals.sum() if a_vals.sum() > 0 else np.full_like(a_vals, 1 / len(a_vals))
        # Clip empty buckets so log() never sees zero
        e_probs = np.clip(e_probs, 1e-6, None)
        a_probs = np.clip(a_probs, 1e-6, None)
        return float(np.sum((a_probs - e_probs) * np.log(a_probs / e_probs)))

    def record_vector_search_latency(self, latency: float):
        vector_search_latency.labels(self.pipeline_name).observe(latency)


if __name__ == "__main__":
    monitor = PipelineMonitor("ai_data_pipeline", metrics_port=8000)
    monitor.start_metrics_server()
    monitor.add_quality_check(
        "null_rate",
        lambda d: 1.0 - d.get("null_count", 0) / max(d.get("total", 1), 1),
        threshold=0.95,
    )
    monitor.add_quality_check(
        "schema_compliance",
        lambda d: d.get("valid_count", 0) / max(d.get("total", 1), 1),
        threshold=0.99,
    )

    start_time = time.perf_counter()
    monitor.record_records_processed("extract", 50000)
    monitor.record_records_processed("transform", 48000)
    monitor.record_records_processed("load", 47500)
    # Record status="failed" on errors so the PipelineRunFailed alert rule can match it
    monitor.record_pipeline_run("success", time.perf_counter() - start_time)

    quality_data = {"null_count": 50, "total": 50000, "valid_count": 49800}
    quality_results = monitor.run_quality_checks(quality_data)
    print(f"Quality results: {quality_results}")

    ref_stats = {"age": {"bins": [100, 200, 300, 250, 150]}, "amount": {"bins": [50, 150, 300, 350, 150]}}
    cur_stats = {"age": {"bins": [80, 180, 350, 250, 140]}, "amount": {"bins": [50, 150, 300, 350, 150]}}
    drift_alerts = monitor.check_feature_drift(
        {k: v["bins"] for k, v in ref_stats.items()},
        {k: v["bins"] for k, v in cur_stats.items()},
    )
    print(f"Drift alerts: {[(a.feature_name, a.psi_score, a.is_drifted) for a in drift_alerts]}")
    print("\nMetrics available at http://localhost:8000/metrics")
    time.sleep(300)  # keep the process alive so Prometheus can scrape it

# prometheus.yml
global:
  scrape_interval: 15s

rule_files:
  - "alerts.yml"

# Forward firing alerts to Alertmanager (9093 is its default port)
alerting:
  alertmanagers:
    - static_configs:
        - targets: ["localhost:9093"]

scrape_configs:
  - job_name: "data_pipeline"
    static_configs:
      - targets: ["localhost:8000"]  # the exporter started by monitoring.py
# alerts.yml
groups:
  - name: data_pipeline_alerts
    rules:
      - alert: PipelineRunFailed
        expr: increase(pipeline_runs_total{status="failed"}[5m]) > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Pipeline {{ $labels.pipeline_name }} failed"

      - alert: DataQualityDegraded
        expr: data_quality_score < 0.8
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Data quality check {{ $labels.check_name }} below threshold"

      - alert: FeatureDriftDetected
        expr: feature_drift_score > 0.2
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Feature {{ $labels.feature_name }} drift detected (PSI={{ $value }})"

      - alert: VectorSearchLatencyHigh
        # Sum bucket rates by le so the quantile stays correct across multiple instances
        expr: histogram_quantile(0.99, sum by (le, pipeline_name) (rate(vector_search_latency_seconds_bucket[5m]))) > 1.0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Vector search P99 latency above 1s"
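The VectorSearchLatencyHigh rule relies on histogram_quantile, which never sees individual latencies, only cumulative bucket counts; it estimates the quantile by linear interpolation inside the bucket where the target rank falls. A simplified stdlib sketch of that estimate, using the same bucket bounds as vector_search_latency; it skips some edge cases Prometheus handles, and the sample counts are made up for illustration:

```python
import math


def histogram_quantile(q: float, buckets: list[tuple[float, float]]) -> float:
    """Estimate quantile q from cumulative (upper_bound, count) buckets sorted by bound,
    where the last bucket is +Inf, as Prometheus exports them."""
    total = buckets[-1][1]
    if total == 0:
        return math.nan
    rank = q * total
    prev_bound, prev_count = 0.0, 0.0
    for bound, count in buckets:
        if count >= rank:
            if math.isinf(bound):
                return prev_bound  # cannot interpolate into +Inf: report the last finite bound
            span = count - prev_count
            if span == 0:
                return prev_bound
            # Linear interpolation within the (prev_bound, bound] bucket
            return prev_bound + (bound - prev_bound) * (rank - prev_count) / span
        prev_bound, prev_count = bound, count
    return prev_bound


# Hypothetical cumulative counts for 1,000 searches
latency_buckets = [(0.01, 100), (0.05, 600), (0.1, 850), (0.25, 950), (0.5, 980),
                   (1.0, 985), (2.5, 995), (5.0, 1000), (math.inf, 1000)]
print(f"P99 ~ {histogram_quantile(0.99, latency_buckets):.2f}s")
```

Because of the interpolation, the reported P99 is only as precise as the bucket layout: every value between 1.0 and 2.5 seconds is assumed to be spread evenly, which is why bucket bounds should sit close to the alert threshold.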
5 Common Pitfalls and Solutions
1. Hardcoded Database Connection Strings in ETL Scripts
Problem: Connection strings are hardcoded in DAG code — changing environments requires code changes, and passwords leak to Git.
Solution: Use Airflow Connections and Environment Variables.
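import os
from urllib.parse import quote_plus

from airflow.hooks.base import BaseHook


def get_connection_url(conn_id: str = "postgres_source") -> str:
    # Credentials live in the Airflow metadata DB or a secrets backend, not in Git
    conn = BaseHook.get_connection(conn_id)
    # URL-encode the password so characters such as @ or / don't break the URL
    return (
        f"postgresql://{conn.login}:{quote_plus(conn.password or '')}"
        f"@{conn.host}:{conn.port or 5432}/{conn.schema}"
    )


# Or read a URL injected through the environment (e.g. from a Kubernetes Secret)
db_url = os.environ["DATABASE_URL"]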
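Airflow can also read a connection from an environment variable named AIRFLOW_CONN_ followed by the upper-cased connection ID, whose value is a connection URI; special characters in the URI components must be URL-encoded. A small stdlib helper that builds such a pair (the helper name and the sample credentials are illustrative):

```python
from urllib.parse import quote_plus


def airflow_conn_env(conn_id: str, conn_type: str, login: str, password: str,
                     host: str, port: int, schema: str) -> tuple[str, str]:
    """Return (environment variable name, connection URI) for an Airflow connection."""
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    uri = (f"{conn_type}://{quote_plus(login)}:{quote_plus(password)}"
           f"@{host}:{port}/{quote_plus(schema)}")
    return name, uri


name, uri = airflow_conn_env("postgres_source", "postgres", "reader", "p@ss/word",
                             "postgres-host", 5432, "source_db")
print(f"export {name}='{uri}'")
```

Setting the variable in the deployment (rather than in DAG code) means `BaseHook.get_connection("postgres_source")` resolves it at runtime without the password ever touching the repository.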
2. Data Validation Only Checks Schema, Not Statistics
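Problem: Schema validation passes but the data distribution is abnormal (e.g., every age is 0, which a ge=0, le=150 range check accepts). Pydantic validates each record on its own; it cannot see dataset-level statistics such as a column mean.
Solution: Pydantic for per-record format validation plus Great Expectations for statistical distribution validation, giving two layers of defense.
import great_expectations as gx
from pydantic import BaseModel, Field


# Pydantic: per-record format validation
class UserSchema(BaseModel):
    age: int = Field(..., ge=0, le=150)


# Great Expectations (1.x API): dataset-level statistical validation;
# run each expectation against a GX batch with batch.validate(...)
mean_age_check = gx.expectations.ExpectColumnMeanToBeBetween(column="age", min_value=18, max_value=65)
country_check = gx.expectations.ExpectColumnValuesToBeInSet(column="country", value_set=["CN", "US", "JP", "UK"])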
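The statistical layer is easy to reason about without a framework. A minimal stdlib sketch of the same idea, with hypothetical bounds and a hypothetical function name: compute batch statistics and flag values that are each individually valid but collectively implausible, such as every age being 0:

```python
import statistics
from collections import Counter


def check_batch_stats(ages: list[int], mean_bounds: tuple[float, float] = (18, 65),
                      max_single_value_share: float = 0.5) -> list[str]:
    """Return dataset-level problems; an empty list means the batch passes."""
    if not ages:
        return ["empty batch"]
    problems = []
    mean_age = statistics.fmean(ages)
    if not mean_bounds[0] <= mean_age <= mean_bounds[1]:
        problems.append(f"mean age {mean_age:.1f} outside {mean_bounds}")
    # One value dominating a column usually means a default or sentinel was filled in
    top_share = Counter(ages).most_common(1)[0][1] / len(ages)
    if top_share > max_single_value_share:
        problems.append(f"one value covers {top_share:.0%} of rows")
    return problems


print(check_batch_stats([0] * 100))         # every row passes ge=0, le=150, yet the batch is broken
print(check_batch_stats([25, 31, 47, 52]))  # []
```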
3. Feature Engineering Ignores Data Leakage
Problem: Using future data to compute features (e.g., normalizing with full dataset mean) — great training performance, catastrophic in production.
Solution: Strictly use training set statistics for transformations; time-window features must only use historical data.
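from sklearn.preprocessing import StandardScaler

# Assumes df has a datetime "date" column and a numeric "amount" column
train_mask = df["date"] < "2026-01-01"
# Fit on the training period only, then apply that same transform to every row
scaler = StandardScaler().fit(df.loc[train_mask, ["amount"]])
df["amount_scaled"] = scaler.transform(df[["amount"]]).ravel()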
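For time-window features, the same rule means each prediction time gets features computed only from events strictly before it. A minimal stdlib sketch of a leak-free "amount in the last N days" feature, assuming events are (timestamp, amount) pairs sorted by time; the function name is illustrative and `bisect` locates the window boundaries:

```python
from bisect import bisect_left
from datetime import datetime, timedelta


def amount_last_n_days(events: list[tuple[datetime, float]], as_of: datetime, days: int) -> float:
    """Sum amounts in [as_of - days, as_of); events at or after as_of are excluded."""
    times = [t for t, _ in events]
    lo = bisect_left(times, as_of - timedelta(days=days))
    hi = bisect_left(times, as_of)  # strictly before as_of: no future information leaks in
    return sum(amount for _, amount in events[lo:hi])


events = [
    (datetime(2026, 1, 1), 10.0),
    (datetime(2026, 1, 5), 20.0),
    (datetime(2026, 1, 9), 30.0),
    (datetime(2026, 1, 10), 40.0),  # happens at the prediction time, so it must not count
]
print(amount_last_n_days(events, datetime(2026, 1, 10), days=7))
```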
4. Vector Data Without Incremental Updates
Problem: Full vector index rebuild every time — 100K documents takes 2 hours, data is never current.
Solution: Document hash-based incremental updates — only process changed documents.
import hashlib
# pipeline: the RAG pipeline object that provides .chunker and .embedder
def incremental_update(documents: list[Document], store: VectorStore, pipeline) -> dict:
    updated = 0
    for doc in documents:
        content_hash = hashlib.md5(doc.content.encode()).hexdigest()
        existing = store.get_doc_metadata(doc.doc_id)
        if existing and existing.get("content_hash") == content_hash:
            continue  # unchanged: skip chunking and embedding
        # Chunk and embed first, so a failed embedding call doesn't leave the doc missing
        chunks = pipeline.chunker.chunk_document(doc)
        chunks = pipeline.embedder.embed_chunks(chunks)
        for chunk in chunks:
            chunk.metadata["content_hash"] = content_hash
        store.delete_by_doc_id(doc.doc_id)  # drop chunks of the previous version
        store.upsert_chunks(chunks)
        updated += 1
    return {"checked": len(documents), "updated": updated}
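The change-detection step can be isolated into a pure function and tested without a vector store. In this sketch, plan_sync and its return shape are illustrative and not part of any library. stored_hashes maps each indexed doc_id to the content hash saved in chunk metadata. The function also reports ids that have disappeared from the source, which a full sync would delete from the index.

```python
import hashlib


def content_hash(text: str) -> str:
    # MD5 is fine for change detection (not for security).
    return hashlib.md5(text.encode("utf-8")).hexdigest()


def plan_sync(source_docs: dict[str, str], stored_hashes: dict[str, str]) -> dict[str, list[str]]:
    """Classify documents into new / changed / unchanged / removed by content hash."""
    plan: dict[str, list[str]] = {"new": [], "changed": [], "unchanged": [], "removed": []}
    for doc_id, text in source_docs.items():
        old = stored_hashes.get(doc_id)
        if old is None:
            plan["new"].append(doc_id)
        elif old != content_hash(text):
            plan["changed"].append(doc_id)
        else:
            plan["unchanged"].append(doc_id)
    # Indexed documents that no longer exist upstream.
    plan["removed"] = sorted(set(stored_hashes) - set(source_docs))
    return plan


stored = {"a": content_hash("alpha"), "b": content_hash("beta"), "c": content_hash("gamma")}
source = {"a": "alpha", "b": "beta v2", "d": "delta"}
print(plan_sync(source, stored))
# {'new': ['d'], 'changed': ['b'], 'unchanged': ['a'], 'removed': ['c']}
```

Only the ids in "new" and "changed" need to be chunked and embedded again.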
5. Monitoring Only Checks Pipeline Execution, Not Data Quality
Problem: Airflow shows DAG success, but data volume dropped 90% or feature values are all NULL — the production model is already broken.
Solution: Add data quality gates at the end of DAGs — block downstream if quality is below threshold.
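from airflow.providers.standard.operators.python import PythonOperator  # Airflow 3.x path
def quality_gate_check(**context):
    ti = context["ti"]
    loaded_count = ti.xcom_pull(task_ids="load", key="return_value")
    expected_min = context["params"]["expected_min_records"]
    if loaded_count is None or loaded_count < expected_min:
        raise ValueError(f"Quality gate failed: {loaded_count} < {expected_min}")
    quality_score = run_quality_checks()  # your own checks, returning a score in [0, 1]
    if quality_score < 0.9:
        raise ValueError(f"Quality gate failed: score={quality_score:.2f}")
    return "PASSED"
# Inside the with DAG(...) block
quality_gate = PythonOperator(
    task_id="quality_gate",
    python_callable=quality_gate_check,
    params={"expected_min_records": 10000},
)
# A failed gate marks downstream tasks upstream_failed (publish: hypothetical downstream task)
load >> quality_gate >> publish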
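The two failure modes named in the problem, a sudden volume drop and all-NULL feature columns, can be expressed as a small, testable function for the gate task to call. This is only a sketch. The name evaluate_quality_gate, the 50% drop limit, and the 5% null-ratio limit are illustrative assumptions, and you would tune the limits for each pipeline.

```python
def evaluate_quality_gate(
    row_count: int,
    previous_row_count: int,
    null_counts: dict[str, int],
    max_drop_ratio: float = 0.5,
    max_null_ratio: float = 0.05,
) -> list[str]:
    """Return a list of human-readable failures; an empty list means the gate passes."""
    failures: list[str] = []
    if previous_row_count > 0:
        drop = 1 - row_count / previous_row_count
        if drop > max_drop_ratio:
            failures.append(f"row count dropped {drop:.0%} ({previous_row_count} -> {row_count})")
    if row_count == 0:
        failures.append("no rows loaded")
    else:
        for column, nulls in null_counts.items():
            ratio = nulls / row_count
            if ratio > max_null_ratio:
                failures.append(f"{column}: {ratio:.0%} NULL")
    return failures


# 90% volume drop plus an all-NULL feature column: both are reported.
print(evaluate_quality_gate(1_000, 10_000, {"amount_scaled": 1_000, "age": 0}))
# ['row count dropped 90% (10000 -> 1000)', 'amount_scaled: 100% NULL']
```

Inside quality_gate_check, raise ValueError("; ".join(failures)) when the list is non-empty. That fails the gate task, and with Airflow's default all_success trigger rule, downstream tasks do not run.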
Troubleshooting 10 Common Errors
| # | Error Message | Cause | Solution |
|---|---|---|---|
| 1 | Airflow DAG import error | Wrong Python path or missing dependencies | Check PYTHONPATH and make sure requirements.txt is fully installed |
| 2 | Pydantic ValidationError: Field required | Upstream field missing or renamed (e.g., user_id → userId) | Give genuinely optional fields defaults; accept renamed fields with Field(validation_alias=AliasChoices("user_id", "userId")). model_config = {"extra": "allow"} only keeps unknown extra fields and does not fix a missing required one |
| 3 | Great Expectations expectation failed | Data distribution drift or dirty data | Check for upstream source changes, then adjust expectation thresholds or add a cleaning step |
| 4 | Polars SchemaError: dtype mismatch | Wrong type inference during Pandas → Polars conversion | Specify the schema explicitly: pl.DataFrame(data, schema={"col": pl.Float64}) |
| 5 | OpenAI API RateLimitError | Embedding requests exceed the API rate limit | Batch the inputs and wrap each call in a tenacity exponential-backoff retry |
| 6 | NaN or ValueError in cosine similarity | Zero vectors (division by a zero norm) or mismatched vector dimensions | Verify that dimensions match; check norms before normalizing: np.linalg.norm(v) > 1e-8 |
| 7 | prometheus_client ValueError: Duplicated timeseries in CollectorRegistry | The same metric name is registered more than once (e.g., a Counter created inside a function that runs repeatedly) | Define metrics once at module level; never create Counter/Gauge inside functions |
| 8 | MemoryError on large dataset groupby | Full in-memory Pandas aggregation runs out of memory | Switch to Polars or process in chunks: pd.read_csv(path, chunksize=100000) |
| 9 | PSI calculation returns NaN or inf | Some bins have zero frequency, so the log term is undefined | Add smoothing: probs = np.clip(probs, 1e-6, None) |
| 10 | Airflow task timeout | A single task runs longer than its execution_timeout | Increase execution_timeout, or split the work into sub-tasks that run in parallel |
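For the PSI row in the table, the Population Stability Index is PSI = Σ (actual_i − expected_i) · ln(actual_i / expected_i) over the bins. A bin with zero proportion on either side makes the log term infinite or undefined. Below is a stdlib sketch of the clipping fix. The eps default mirrors the 1e-6 in the table, and the inputs are assumed to be per-bin proportions you have already computed.

```python
import math


def psi(expected: list[float], actual: list[float], eps: float = 1e-6) -> float:
    """Population Stability Index between two binned distributions.

    expected and actual are per-bin proportions over the same bin edges.
    Proportions are clipped to eps so empty bins don't produce log(0) or x/0.
    """
    if len(expected) != len(actual):
        raise ValueError("expected and actual must have the same number of bins")
    total = 0.0
    for e, a in zip(expected, actual):
        e, a = max(e, eps), max(a, eps)
        total += (a - e) * math.log(a / e)
    return total


baseline = [0.25, 0.25, 0.25, 0.25]
print(psi(baseline, baseline))                  # 0.0: identical distributions
print(psi(baseline, [0.5, 0.5, 0.0, 0.0]) > 0)  # True, and finite despite the empty bins
```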
Advanced Optimization Techniques
1. Incremental ETL: Process Only Changed Data
Full ETL is unsustainable at scale — use CDC (Change Data Capture) or timestamp markers to process only increments.
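import pandas as pd
import redis
from sqlalchemy import create_engine, text
r = redis.Redis(decode_responses=True)  # return str instead of bytes
def incremental_extract(last_run_time: str) -> pd.DataFrame:
    engine = create_engine(get_connection())
    # Bind the watermark as a parameter instead of formatting it into the SQL (no injection)
    query = text("""
        SELECT * FROM user_events
        WHERE updated_at > :last_run
        ORDER BY updated_at
    """)
    return pd.read_sql(query, engine, params={"last_run": last_run_time})
def get_last_run_time(pipeline_name: str) -> str:
    return r.get(f"pipeline:{pipeline_name}:last_run") or "2026-01-01T00:00:00"
def save_last_run_time(pipeline_name: str, run_time: str) -> None:
    # Save max(updated_at) of the extracted batch, not the wall-clock time,
    # so rows written while the job runs are picked up next time
    r.set(f"pipeline:{pipeline_name}:last_run", run_time)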
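The watermark pattern can be exercised end to end in a runnable sketch. The standard library's sqlite3 stands in for the warehouse, and a plain dict stands in for Redis. Both substitutions are assumptions made only for this example. The cutoff is passed as a bound parameter rather than formatted into the SQL. The new watermark is the largest updated_at actually read, so the next run resumes exactly where this batch ended.

```python
import sqlite3

state: dict[str, str] = {}  # stand-in for the Redis key pipeline:<name>:last_run
DEFAULT_WATERMARK = "2026-01-01T00:00:00"


def extract_increment(conn: sqlite3.Connection, pipeline: str) -> list[tuple]:
    last = state.get(pipeline, DEFAULT_WATERMARK)
    rows = conn.execute(
        "SELECT id, updated_at FROM user_events WHERE updated_at > ? ORDER BY updated_at",
        (last,),  # bound parameter: no string formatting, no SQL injection
    ).fetchall()
    if rows:
        # Advance the watermark to the newest timestamp actually read.
        state[pipeline] = rows[-1][1]
    return rows


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user_events (id INTEGER, updated_at TEXT)")
conn.executemany("INSERT INTO user_events VALUES (?, ?)",
                 [(1, "2026-01-02T10:00:00"), (2, "2026-01-03T09:30:00")])

print(len(extract_increment(conn, "events")))  # 2: first run reads everything after the default
print(len(extract_increment(conn, "events")))  # 0: nothing new since the watermark
conn.execute("INSERT INTO user_events VALUES (3, '2026-01-04T08:00:00')")
print(extract_increment(conn, "events"))       # [(3, '2026-01-04T08:00:00')]
```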
2. Feature Caching and Version Management
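Cache feature computation results in Redis to avoid recomputing them. The version is part of the cache key, so when feature logic changes you bump the version and both training and inference read freshly computed values instead of stale ones.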
import hashlib
import json
import redis
class FeatureCache:
    def __init__(self, redis_url: str = "redis://localhost:6379/2"):
        self.r = redis.from_url(redis_url)
        self.ttl = 86400  # cached features expire after 24 hours
    def _cache_key(self, feature_name: str, entity_id: str, version: str) -> str:
        # The version is part of the key, so bumping it invalidates old entries
        raw = f"{feature_name}:{entity_id}:{version}"
        return f"feat:{hashlib.md5(raw.encode()).hexdigest()}"
    def get(self, feature_name: str, entity_id: str, version: str = "v1") -> dict | None:
        key = self._cache_key(feature_name, entity_id, version)
        cached = self.r.get(key)
        return json.loads(cached) if cached else None  # None on cache miss
    def set(self, feature_name: str, entity_id: str, value: dict, version: str = "v1"):
        key = self._cache_key(feature_name, entity_id, version)
        self.r.setex(key, self.ttl, json.dumps(value, default=str))  # value and TTL in one call
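One way to keep the version argument honest is to derive it from the feature definition itself. This is an assumption; FeatureCache does not do it on its own. Hashing a canonical JSON form of the feature's parameters means any change to the logic produces a new version and therefore new cache keys. feature_version and the config fields below are hypothetical.

```python
import hashlib
import json


def feature_version(definition: dict) -> str:
    """Short, deterministic version string for a feature definition.

    sort_keys makes the hash independent of dict key order.
    """
    canonical = json.dumps(definition, sort_keys=True, separators=(",", ":"))
    return "v_" + hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:12]


v1 = feature_version({"name": "avg_amount", "window_days": 7, "agg": "mean"})
same = feature_version({"agg": "mean", "window_days": 7, "name": "avg_amount"})
v2 = feature_version({"name": "avg_amount", "window_days": 30, "agg": "mean"})

print(v1 == same)  # True: key order does not matter
print(v1 == v2)    # False: changing the window yields a new version
```

If you pass such a value as the version to FeatureCache.get and set, training and serving share cache entries only when they use the same definition.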
3. Embedding Batch Parallel Acceleration
Single embedding calls are too slow — batch + async parallel dramatically improves throughput.
import asyncio
from openai import AsyncOpenAI
async def async_embed_batch(client: AsyncOpenAI, texts: list[str], batch_size: int = 2048,
                            model: str = "text-embedding-3-small") -> list[list[float]]:
    all_embeddings: list[list[float]] = []
    for i in range(0, len(texts), batch_size):  # the API accepts up to 2048 inputs per request
        batch = texts[i:i + batch_size]
        response = await client.embeddings.create(input=batch, model=model)
        # Sort by index so vectors line up with the input texts
        all_embeddings.extend(d.embedding for d in sorted(response.data, key=lambda d: d.index))
    return all_embeddings
async def parallel_embed(documents: list[str], concurrency: int = 5,
                         chunk_size: int = 100) -> list[list[float]]:
    client = AsyncOpenAI()  # one shared client (and connection pool) for all requests
    semaphore = asyncio.Semaphore(concurrency)  # cap on in-flight requests
    async def limited_embed(batch: list[str]) -> list[list[float]]:
        async with semaphore:
            return await async_embed_batch(client, batch)
    tasks = [limited_embed(documents[i:i + chunk_size])
             for i in range(0, len(documents), chunk_size)]
    results = await asyncio.gather(*tasks)  # results come back in task order
    return [emb for batch in results for emb in batch]
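You can check the bounded-concurrency pattern without calling the API. In this sketch, fake_embed is a hypothetical stand-in for client.embeddings.create. It sleeps briefly and records how many calls are in flight. The two properties to check are that no more than concurrency batches run at once and that asyncio.gather returns results in input order.

```python
import asyncio

in_flight = 0
peak_in_flight = 0


async def fake_embed(batch: list[str]) -> list[list[float]]:
    """Stand-in for an embeddings API call: returns one 1-d vector per text."""
    global in_flight, peak_in_flight
    in_flight += 1
    peak_in_flight = max(peak_in_flight, in_flight)
    await asyncio.sleep(0.01)  # simulate network latency
    in_flight -= 1
    return [[float(len(text))] for text in batch]


async def parallel_embed_demo(texts: list[str], concurrency: int = 3,
                              chunk_size: int = 2) -> list[list[float]]:
    semaphore = asyncio.Semaphore(concurrency)

    async def limited(batch: list[str]) -> list[list[float]]:
        async with semaphore:  # at most `concurrency` batches inside this block
            return await fake_embed(batch)

    chunks = [texts[i:i + chunk_size] for i in range(0, len(texts), chunk_size)]
    results = await asyncio.gather(*(limited(c) for c in chunks))  # preserves order
    return [vec for batch in results for vec in batch]


texts = ["a", "bb", "ccc", "dddd", "eeeee", "ffffff", "ggggggg", "hhhhhhhh"]
vectors = asyncio.run(parallel_embed_demo(texts))
print(vectors[:3])     # [[1.0], [2.0], [3.0]]
print(peak_in_flight)  # 3
```

In the real pipeline, the limited wrapper is also a natural place for the tenacity backoff retry suggested for RateLimitError.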
4. Vector Index Optimization: HNSW Parameter Tuning
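pgvector's HNSW parameters trade build time, memory, and query latency against recall. m is the maximum number of connections per graph node, ef_construction is the candidate list size used while building the index, and hnsw.ef_search is the candidate list size at query time.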
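-- Create an HNSW index for cosine distance (pgvector defaults: m = 16, ef_construction = 64)
CREATE INDEX CONCURRENTLY idx_documents_embedding_hnsw
ON documents USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 200);
-- Query-time candidate list size (default 40): higher means better recall, slower queries
SET hnsw.ef_search = 100;
-- For large initial loads, building the index after loading the data is faster.
-- After heavy updates or deletes, reindex before VACUUM to speed up vacuuming
REINDEX INDEX CONCURRENTLY idx_documents_embedding_hnsw;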
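Tuning ef_search is easier when you have a number to watch. Run a sample of queries through both the HNSW index and exact brute-force search, then measure how many of the exact top-k neighbors the index returned. The sketch below computes the exact side and recall@k in plain Python. It leaves out fetching the approximate ids from PostgreSQL, and exact_top_k and recall_at_k are illustrative names. Raise ef_search until recall meets your target, as long as latency stays acceptable.

```python
import math


def cosine_distance(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norm  # pgvector's <=> operator also returns cosine distance


def exact_top_k(query: list[float], corpus: dict[int, list[float]], k: int) -> list[int]:
    """Brute-force nearest neighbors by cosine distance (ground truth)."""
    return sorted(corpus, key=lambda doc_id: cosine_distance(query, corpus[doc_id]))[:k]


def recall_at_k(exact_ids: list[int], approx_ids: list[int]) -> float:
    """Fraction of the true top-k neighbors that the ANN index also returned."""
    if not exact_ids:
        return 1.0
    return len(set(exact_ids) & set(approx_ids)) / len(exact_ids)


corpus = {1: [1.0, 0.0], 2: [0.9, 0.1], 3: [0.0, 1.0], 4: [0.7, 0.7]}
truth = exact_top_k([1.0, 0.0], corpus, k=2)
print(truth)                       # [1, 2]
print(recall_at_k(truth, [1, 4]))  # 0.5: the index missed doc 2
```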
5. Data Quality SLA Automation
Write data quality check results to an SLA table and automatically calculate SLA compliance rates.
import os
from contextlib import closing
from datetime import datetime, timezone
import psycopg2
def record_sla(pipeline_name: str, check_name: str, score: float,
               threshold: float, passed: bool) -> None:
    with closing(psycopg2.connect(dsn=os.environ["WAREHOUSE_URL"])) as conn:
        with conn, conn.cursor() as cur:  # commit on success, roll back on error
            cur.execute("""
                INSERT INTO data_quality_sla
                    (pipeline_name, check_name, score, threshold, passed, checked_at)
                VALUES (%s, %s, %s, %s, %s, %s)
            """, (pipeline_name, check_name, score, threshold, passed,
                  datetime.now(timezone.utc)))
def get_sla_rate(pipeline_name: str, days: int = 30) -> float:
    with closing(psycopg2.connect(dsn=os.environ["WAREHOUSE_URL"])) as conn:
        with conn.cursor() as cur:
            # NULLIF avoids division by zero when the window has no checks;
            # make_interval binds days as a number instead of splicing it into a string
            cur.execute("""
                SELECT COUNT(*) FILTER (WHERE passed) * 100.0 / NULLIF(COUNT(*), 0)
                FROM data_quality_sla
                WHERE pipeline_name = %s
                  AND checked_at >= NOW() - make_interval(days => %s)
            """, (pipeline_name, days))
            rate = cur.fetchone()[0]
    return float(rate) if rate is not None else 0.0
Comparison Analysis
| Dimension | Apache Airflow | Prefect | Dagster | Luigi | Mage |
|---|---|---|---|---|---|
| Orchestration | DAG definition | Functional+Decorators | Asset definition | Task dependency | Notebook-style |
| Data Validation | Manual integration | Built-in | Native Type system | None | Built-in |
| Feature Management | Manual integration | Manual integration | Native Asset | None | Manual integration |
| Vector Pipeline | Manual integration | Manual integration | Manual integration | None | Manual integration |
| Monitoring | Basic UI | Native Cloud UI | Native Dagster UI (formerly Dagit) | Basic | Native UI |
| Learning Curve | Medium | Low | Medium-High | Low | Low |
| Community | Most mature | Fast-growing | Growing | Declining | Emerging |
| Python Native | Yes | Yes | Yes | Yes | Yes |
| 2026 Recommendation | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐ |
| Best For | Enterprise ETL | SMB quick start | Data asset intensive | Simple batch | Data analytics teams |

Data Validation Library Comparison
| Dimension | Pydantic v2 | Great Expectations | Pandera | TFX Data Validation |
|---|---|---|---|---|
| Validation Speed | Very fast (Rust core) | Medium | Fast | Medium |
| Schema Definition | Python class | YAML/Code | Python class | Protobuf |
| Statistical Validation | Not supported | Strong | Medium | Strong |
| Custom Rules | Flexible | Flexible | Medium | Medium |
| Data Profiling | Not supported | Strong | Not supported | Strong |
| Best For | Format validation | Comprehensive data quality | Lightweight schema | TensorFlow ecosystem |
Summary
Python AI data pipelines in 2026 have settled into a mature methodology:
- Airflow orchestration is the backbone: DAGs define task dependencies, and automatic retries and alerting keep runs reliable.
- Data validation is the immune system: Pydantic intercepts format errors, and Great Expectations intercepts statistical anomalies.
- Feature engineering is the core value: Pandas for backward compatibility, Polars for large-data acceleration.
- Vector pipelines are AI infrastructure: smart chunking plus incremental updates keep RAG data fresh.
- Monitoring is the eyes of production: Prometheus collects metrics, and data quality SLAs provide quantifiable guarantees.
Remember: pipelines without data quality guarantees run faster only to fail faster.