Python AIデータパイプライン実践:ETLからフィーチャーストアまでの5つのプロダクションパターン
データ品質崩壊、ETLスクリプト制御不能、フィーチャー本番反映に1週間
午前3時、上流のデータフィールドがuser_idからuserIdに変更され、モデル学習タスクがクラッシュ — 夜明けまでデバッグ。特徴エンジニアリングスクリプトは10個のJupyter Notebookに散在し、新規フィーチャーの本番反映にはデータエンジニアのスケジュール待ちで1週間。RAGシステムのベクトルデータは常にソースドキュメントと同期されておらず、検索結果が的外れ。2026年、Python AIデータパイプラインは「Nice to Have」から「Must Have」へ — Airflow 3.0はネイティブasync defサポートを、Pydantic v2は5〜50倍のデータ検証パフォーマンス向上を、Polarsは特徴エンジニアリングでPandasの10倍以上の高速化を実現。
本記事では5つのプロダクションパターンを通じて、Airflow ETLオーケストレーション→Pydantic+GEデータ検証→Pandas/Polars特徴エンジニアリング→RAGベクトルデータパイプライン→Prometheus本番モニタリングの全链路実践を、各ステップで完全に実行可能なPythonコードとともに解説。
主要なポイント
- Apache Airflow 3.0のDAGオーケストレーションパターンを習得し、ETLタスクの依存管理と失敗リトライを実現
- Pydantic v2 + Great Expectationsでデータ検証防御線を構築し、ダーティーデータのパイプライン侵入を拒否
- Pandas + Polarsハイブリッドアプローチで特徴エンジニアリングを実行し、エコシステム互換性と計算パフォーマンスのバランスを確保
- ドキュメントチャンキングからEmbedding、ベクトルストレージまでの完全なRAGベクトルデータパイプラインを構築
- Prometheus + Grafanaデータ品質モニタリングを実装し、パイプライン異常を秒単位でアラート
- 5つの一般的なデータパイプラインの落とし穴とプロダクションレベルのソリューションを理解
- 増分処理、キャッシング、並列高速化などの高度な最適化手法を習得
目次
- Architecture Overview:AIデータパイプライン全体アーキテクチャ
- Pattern 1:ETL Pipeline with Apache Airflow + Python
- Pattern 2:Data Validation with Pydantic v2 + Great Expectations
- Pattern 3:Feature Engineering Pipeline with Pandas + Polars
- Pattern 4:Vector Data Pipeline for RAG
- Pattern 5:Production Monitoring with Prometheus + Data Quality Alerts
- 5 Common Pitfalls and Solutions
- 10 Common Error Troubleshooting
- Advanced Optimization Techniques
- Comparison Analysis
- Recommended Online Tools
- Summary & Further Reading
Architecture Overview
┌─────────────────────────────────────────────────────────────────┐
│ AI Data Pipeline Architecture │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────────┐ ┌──────────────────────┐ │
│ │ Data │ │ Airflow DAG │ │ Pydantic + GE │ │
│ │ Sources │───▶│ Orchestration│───▶│ Data Validation │ │
│ │ (API/DB/ │ │ (ETL Sched) │ │ (Schema + Rules) │ │
│ │ Files) │ └──────────────┘ └──────────┬───────────┘ │
│ └──────────┘ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Feature Engineering Pipeline │ │
│ │ ┌─────────┐ ┌──────────┐ ┌──────────────────────┐ │ │
│ │ │ Pandas │──▶│ Polars │──▶│ Feature Store │ │ │
│ │ │ (Compat)│ │ (Speed) │ │ (Redis/Feast) │ │ │
│ │ └─────────┘ └──────────┘ └──────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Vector Data Pipeline (RAG) │ │
│ │ ┌──────────┐ ┌──────────┐ ┌───────────────────────┐ │ │
│ │ │ Chunking │─▶│ Embedding│─▶│ Vector Store │ │ │
│ │ │ (Split) │ │ (Model) │ │ (PGVector/Chroma) │ │ │
│ │ └──────────┘ └──────────┘ └───────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Production Monitoring │ │
│ │ ┌────────────┐ ┌───────────┐ ┌────────────────────┐ │ │
│ │ │ Prometheus │─▶│ Grafana │─▶│ AlertManager │ │ │
│ │ │ (Metrics) │ │ (Dashboard)│ │ (Notifications) │ │ │
│ │ └────────────┘ └───────────┘ └────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Pattern 1:ETL Pipeline with Apache Airflow + Python
問題シナリオ
毎日3つのデータベース、2つのAPI、5つのCSVファイルからデータを抽出し、クレンジング後にデータウェアハウスに書き込む必要がある。手動スクリプトでは実行状態を追跡できず、上流の失敗が下流に伝わらず、リトライは完全に手動。
ソリューション
Apache Airflow 3.0を使用してETL DAGをオーケストレーション — タスク依存、自動リトライ、失敗アラートを実現。
# dags/etl_pipeline.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.sensors.python import PythonSensor
default_args = {
"owner": "data-team",
"depends_on_past": False,
"email_on_failure": True,
"email": ["data-team@company.com"],
"retries": 3,
"retry_delay": timedelta(minutes=5),
}
with DAG(
dag_id="etl_daily_pipeline",
default_args=default_args,
description="Daily ETL pipeline for AI data",
schedule="0 2 * * *",
start_date=datetime(2026, 1, 1),
catchup=False,
tags=["etl", "production"],
) as dag:
def extract_from_postgres(**context):
import psycopg2
conn = psycopg2.connect(
host="postgres-host",
database="source_db",
user="reader",
password="***",
)
cur = conn.cursor()
cur.execute("SELECT * FROM user_events WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'")
rows = cur.fetchall()
conn.close()
context["ti"].xcom_push(key="postgres_rows", value=len(rows))
return rows
def extract_from_api(**context):
import requests
resp = requests.get(
"https://api.example.com/v2/data",
headers={"Authorization": "Bearer ***"},
params={"date": context["ds"]},
)
resp.raise_for_status()
data = resp.json()
context["ti"].xcom_push(key="api_records", value=len(data))
return data
def extract_from_files(**context):
import glob
import pandas as pd
files = glob.glob(f"/data/input/{context['ds']}/*.csv")
frames = [pd.read_csv(f) for f in files]
combined = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
context["ti"].xcom_push(key="file_count", value=len(files))
return combined.to_dict("records")
def transform_data(**context):
import pandas as pd
ti = context["ti"]
pg_rows = ti.xcom_pull(task_ids="extract_postgres", key="return_value")
api_data = ti.xcom_pull(task_ids="extract_api", key="return_value")
file_data = ti.xcom_pull(task_ids="extract_files", key="return_value")
df_pg = pd.DataFrame(pg_rows)
df_api = pd.DataFrame(api_data)
df_files = pd.DataFrame(file_data)
df_all = pd.concat([df_pg, df_api, df_files], ignore_index=True)
df_all = df_all.drop_duplicates(subset=["user_id", "event_id"])
df_all = df_all.dropna(subset=["user_id", "event_type"])
df_all["event_date"] = pd.to_datetime(df_all["created_at"]).dt.date
context["ti"].xcom_push(key="total_records", value=len(df_all))
return df_all.to_dict("records")
def load_to_warehouse(**context):
import psycopg2
ti = context["ti"]
records = ti.xcom_pull(task_ids="transform", key="return_value")
conn = psycopg2.connect(
host="warehouse-host",
database="analytics_db",
user="loader",
password="***",
)
cur = conn.cursor()
cur.execute("TRUNCATE TABLE staging.user_events_daily")
insert_sql = """
INSERT INTO staging.user_events_daily
(user_id, event_id, event_type, event_date, created_at)
VALUES (%s, %s, %s, %s, %s)
"""
batch = [(r["user_id"], r["event_id"], r["event_type"],
str(r["event_date"]), r["created_at"]) for r in records[:1000]]
cur.executemany(insert_sql, batch)
conn.commit()
conn.close()
return len(batch)
def data_quality_check(**context):
ti = context["ti"]
total = ti.xcom_pull(task_ids="transform", key="total_records")
loaded = ti.xcom_pull(task_ids="load", key="return_value")
if total and loaded and loaded / total < 0.95:
raise ValueError(f"Data quality check failed: only {loaded}/{total} records loaded")
return "PASSED"
start = EmptyOperator(task_id="start")
extract_pg = PythonOperator(task_id="extract_postgres", python_callable=extract_from_postgres)
extract_api = PythonOperator(task_id="extract_api", python_callable=extract_from_api)
extract_files = PythonOperator(task_id="extract_files", python_callable=extract_from_files)
transform = PythonOperator(task_id="transform", python_callable=transform_data)
load = PythonOperator(task_id="load", python_callable=load_to_warehouse)
quality_check = PythonOperator(task_id="quality_check", python_callable=data_quality_check)
end = EmptyOperator(task_id="end")
start >> [extract_pg, extract_api, extract_files] >> transform >> load >> quality_check >> end
# Airflow起動
pip install apache-airflow==3.0.0
airflow db init
airflow dags list
airflow dags trigger etl_daily_pipeline
Pattern 2:Data Validation with Pydantic v2 + Great Expectations
問題シナリオ
上流システムが密かにフィールドタイプを変更 — ageがintからstringに、emailフォーマットの検証がなくなり、ダーティーデータがそのままモデル学習に流入し、精度が暴落してからようやく発覚。
ソリューション
Pydantic v2でスキーマ検証、Great Expectationsで統計ルール検証 — ダーティーデータに対する二重防御線。
# schemas.py
from pydantic import BaseModel, Field, EmailStr, field_validator
from datetime import datetime
from typing import Optional
class UserEventSchema(BaseModel):
user_id: str = Field(..., min_length=1, max_length=64, description="ユーザー一意識別子")
event_id: str = Field(..., pattern=r"^evt_[a-zA-Z0-9]{16}$", description="イベントID形式")
event_type: str = Field(..., description="イベントタイプ")
event_date: datetime = Field(..., description="イベント日時")
age: int = Field(..., ge=0, le=150, description="ユーザー年齢")
email: EmailStr = Field(..., description="ユーザーメール")
amount: Optional[float] = Field(None, ge=0, description="金額")
@field_validator("event_type")
@classmethod
def validate_event_type(cls, v):
allowed = {"click", "purchase", "signup", "logout", "view"}
if v not in allowed:
raise ValueError(f"event_type must be one of {allowed}, got {v}")
return v
model_config = {"extra": "forbid"}
class BatchValidationResult(BaseModel):
total: int
valid: int
invalid: int
errors: list[dict]
# validation_pipeline.py
from schemas import UserEventSchema, BatchValidationResult
from great_expectations.dataset import PandasDataset
import pandas as pd
import json
def validate_with_pydantic(records: list[dict]) -> BatchValidationResult:
valid_records = []
errors = []
for i, record in enumerate(records):
try:
UserEventSchema(**record)
valid_records.append(record)
except Exception as e:
errors.append({"index": i, "record": record, "error": str(e)})
return BatchValidationResult(
total=len(records),
valid=len(valid_records),
invalid=len(errors),
errors=errors,
)
def validate_with_great_expectations(df: pd.DataFrame) -> dict:
ge_df = PandasDataset(df)
results = {}
results["row_count"] = ge_df.expect_table_row_count_to_be_between(min_value=1, max_value=10_000_000)
results["user_id_not_null"] = ge_df.expect_column_values_to_not_be_null("user_id")
results["age_range"] = ge_df.expect_column_values_to_be_between("age", min_value=0, max_value=150)
results["event_type_set"] = ge_df.expect_column_values_to_be_in_set(
"event_type", ["click", "purchase", "signup", "logout", "view"]
)
results["email_format"] = ge_df.expect_column_values_to_match_regex("email", r"^[\w\.-]+@[\w\.-]+\.\w+$")
results["amount_non_negative"] = ge_df.expect_column_values_to_be_between(
"amount", min_value=0, max_value=None
)
passed = sum(1 for r in results.values() if r["success"])
failed = len(results) - passed
return {"total_expectations": len(results), "passed": passed, "failed": failed, "details": results}
def run_validation_pipeline(records: list[dict]) -> dict:
pydantic_result = validate_with_pydantic(records)
valid_df = pd.DataFrame([r for i, r in enumerate(records) if i not in {e["index"] for e in pydantic_result.errors}])
ge_result = validate_with_great_expectations(valid_df) if not valid_df.empty else {"total_expectations": 0, "passed": 0, "failed": 0}
return {
"pydantic_validation": pydantic_result.model_dump(),
"ge_validation": ge_result,
"overall_pass": pydantic_result.invalid == 0 and ge_result.get("failed", 0) == 0,
}
if __name__ == "__main__":
test_data = [
{"user_id": "u001", "event_id": "evt_abc123def456gh7", "event_type": "click",
"event_date": "2026-06-16T10:00:00", "age": 28, "email": "user@test.com", "amount": 99.5},
{"user_id": "", "event_id": "evt_xyz789abc012ij3", "event_type": "signup",
"event_date": "2026-06-16T11:00:00", "age": 35, "email": "new@test.com", "amount": 0},
{"user_id": "u003", "event_id": "evt_bad_format", "event_type": "hack",
"event_date": "2026-06-16T12:00:00", "age": -5, "email": "not-an-email", "amount": -10},
]
result = run_validation_pipeline(test_data)
print(json.dumps(result, indent=2, default=str))
Pattern 3:Feature Engineering Pipeline with Pandas + Polars
問題シナリオ
特徴エンジニアリングスクリプトはPandasで書かれている。データ量が100万から5000万に増加すると、groupby+aggが30分経っても結果を返さない。Polarsへの移行は望ましいが、既存コードが多すぎて完全な書き直しは非現実的。
ソリューション
Pandasは小規模データの後方互換性を処理し、Polarsは大規模データの高速化を担当 — Arrow形式によるゼロコピー相互運用。
# feature_pipeline.py
import pandas as pd
import polars as pl
from datetime import datetime, timedelta
import numpy as np
def generate_sample_data(n: int = 1_000_000) -> pd.DataFrame:
np.random.seed(42)
return pd.DataFrame({
"user_id": np.random.choice([f"u{i:04d}" for i in range(1000)], n),
"event_type": np.random.choice(["click", "purchase", "view", "signup"], n),
"amount": np.random.exponential(50, n).round(2),
"event_time": pd.date_range("2026-01-01", periods=n, freq="30s"),
"category": np.random.choice(["electronics", "clothing", "food", "books"], n),
})
def pandas_features(df: pd.DataFrame) -> pd.DataFrame:
user_agg = df.groupby("user_id").agg(
total_events=("event_type", "count"),
total_amount=("amount", "sum"),
avg_amount=("amount", "mean"),
unique_categories=("category", "nunique"),
first_event=("event_time", "min"),
last_event=("event_time", "max"),
)
user_agg["active_days"] = (user_agg["last_event"] - user_agg["first_event"]).dt.days + 1
user_agg["avg_daily_events"] = user_agg["total_events"] / user_agg["active_days"].clip(lower=1)
return user_agg.reset_index()
def polars_features(df: pd.DataFrame) -> pd.DataFrame:
pl_df = pl.from_pandas(df)
result = pl_df.group_by("user_id").agg([
pl.col("event_type").count().alias("total_events"),
pl.col("amount").sum().alias("total_amount"),
pl.col("amount").mean().alias("avg_amount"),
pl.col("category").n_unique().alias("unique_categories"),
pl.col("event_time").min().alias("first_event"),
pl.col("event_time").max().alias("last_event"),
]).with_columns([
(pl.col("last_event") - pl.col("first_event")).dt.total_days().add(1).alias("active_days"),
]).with_columns([
(pl.col("total_events") / pl.col("active_days").clip(lower=1)).alias("avg_daily_events"),
])
return result.to_pandas()
def time_window_features(df: pd.DataFrame, windows: list[str] = None) -> pd.DataFrame:
if windows is None:
windows = ["7d", "30d", "90d"]
pl_df = pl.from_pandas(df)
reference_date = pl_df.select(pl.col("event_time").max()).item()
all_features = []
for window in windows:
days = int(window.replace("d", ""))
cutoff = reference_date - timedelta(days=days)
window_feat = pl_df.filter(pl.col("event_time") >= cutoff).group_by("user_id").agg([
pl.col("amount").sum().alias(f"amount_sum_{window}"),
pl.col("amount").mean().alias(f"amount_mean_{window}"),
pl.col("event_type").count().alias(f"event_count_{window}"),
pl.col("category").n_unique().alias(f"category_nunique_{window}"),
])
all_features.append(window_feat)
from functools import reduce
merged = reduce(lambda a, b: a.join(b, on="user_id", how="outer"), all_features)
return merged.to_pandas()
def category_encoding_features(df: pd.DataFrame) -> pd.DataFrame:
pl_df = pl.from_pandas(df)
pivot = pl_df.group_by(["user_id", "category"]).agg([
pl.col("amount").sum().alias("category_amount"),
pl.col("event_type").count().alias("category_count"),
]).pivot(
index="user_id",
on="category",
values=["category_amount", "category_count"],
).fill_null(0)
return pivot.to_pandas()
def run_feature_pipeline():
print("Generating sample data...")
df = generate_sample_data(1_000_000)
print(f"Data shape: {df.shape}")
print("\n--- Pandas Feature Engineering ---")
start = datetime.now()
pandas_result = pandas_features(df)
print(f"Pandas time: {(datetime.now() - start).total_seconds():.2f}s")
print(f"Pandas result shape: {pandas_result.shape}")
print("\n--- Polars Feature Engineering ---")
start = datetime.now()
polars_result = polars_features(df)
print(f"Polars time: {(datetime.now() - start).total_seconds():.2f}s")
print(f"Polars result shape: {polars_result.shape}")
print("\n--- Time Window Features ---")
window_result = time_window_features(df)
print(f"Window features shape: {window_result.shape}")
print("\n--- Category Encoding Features ---")
category_result = category_encoding_features(df)
print(f"Category features shape: {category_result.shape}")
if __name__ == "__main__":
run_feature_pipeline()
Pattern 4:Vector Data Pipeline for RAG
問題シナリオ
RAGシステムの本番稼働後、ドキュメントは更新されたがベクトルが同期されておらず、検索結果が常に古い。チャンキング戦略が粗すぎ — 長いテーブルが切り詰められ、コードブロックが途中で分割され、Embedding品質が惨憺たるもの。
ソリューション
完全なベクトルデータパイプラインを構築:スマートチャンキング→Embedding→ベクトルストレージ→増分同期。
# vector_pipeline.py
import hashlib
import json
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class Document:
doc_id: str
content: str
metadata: dict = field(default_factory=dict)
source: str = ""
updated_at: str = ""
@dataclass
class Chunk:
chunk_id: str
doc_id: str
content: str
embedding: Optional[list[float]] = None
metadata: dict = field(default_factory=dict)
class SmartChunker:
def __init__(self, chunk_size: int = 512, chunk_overlap: int = 64,
separators: list[str] = None):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.separators = separators or ["\n\n", "\n", "。", ".", " "]
def chunk_document(self, doc: Document) -> list[Chunk]:
text = doc.content
chunks_text = self._recursive_split(text, self.separators)
chunks = []
for i, text in enumerate(chunks_text):
if len(text.strip()) < 20:
continue
chunk_id = hashlib.md5(f"{doc.doc_id}:{i}:{text[:50]}".encode()).hexdigest()[:16]
chunks.append(Chunk(
chunk_id=chunk_id,
doc_id=doc.doc_id,
content=text,
metadata={**doc.metadata, "chunk_index": i, "source": doc.source},
))
return chunks
def _recursive_split(self, text: str, separators: list[str]) -> list[str]:
if not separators or len(text) <= self.chunk_size:
return [text] if text.strip() else []
sep = separators[0]
parts = text.split(sep)
result = []
current = ""
for part in parts:
candidate = current + sep + part if current else part
if len(candidate) <= self.chunk_size:
current = candidate
else:
if current:
result.append(current)
if len(part) > self.chunk_size:
sub_splits = self._recursive_split(part, separators[1:])
result.extend(sub_splits)
current = ""
else:
current = part
if current:
result.append(current)
return result
class EmbeddingService:
def __init__(self, model_name: str = "text-embedding-3-small", api_key: str = ""):
self.model_name = model_name
self.api_key = api_key
def embed_texts(self, texts: list[str]) -> list[list[float]]:
try:
from openai import OpenAI
client = OpenAI(api_key=self.api_key)
response = client.embeddings.create(input=texts, model=self.model_name)
return [item.embedding for item in response.data]
except ImportError:
import numpy as np
return [np.random.randn(1536).tolist() for _ in texts]
def embed_chunks(self, chunks: list[Chunk], batch_size: int = 100) -> list[Chunk]:
for i in range(0, len(chunks), batch_size):
batch = chunks[i:i + batch_size]
texts = [c.content for c in batch]
embeddings = self.embed_texts(texts)
for j, chunk in enumerate(batch):
chunk.embedding = embeddings[j]
return chunks
class VectorStore:
def __init__(self, connection_string: str = ""):
self.connection_string = connection_string
self._store: dict[str, Chunk] = {}
def upsert_chunks(self, chunks: list[Chunk]):
for chunk in chunks:
self._store[chunk.chunk_id] = chunk
def search(self, query_embedding: list[float], top_k: int = 5) -> list[Chunk]:
import numpy as np
query_vec = np.array(query_embedding)
scores = []
for chunk in self._store.values():
if chunk.embedding is None:
continue
chunk_vec = np.array(chunk.embedding)
similarity = np.dot(query_vec, chunk_vec) / (np.linalg.norm(query_vec) * np.linalg.norm(chunk_vec) + 1e-8)
scores.append((chunk, float(similarity)))
scores.sort(key=lambda x: x[1], reverse=True)
return [chunk for chunk, _ in scores[:top_k]]
def delete_by_doc_id(self, doc_id: str):
to_delete = [cid for cid, c in self._store.items() if c.doc_id == doc_id]
for cid in to_delete:
del self._store[cid]
class VectorPipeline:
def __init__(self, chunk_size: int = 512, model_name: str = "text-embedding-3-small"):
self.chunker = SmartChunker(chunk_size=chunk_size)
self.embedder = EmbeddingService(model_name=model_name)
self.store = VectorStore()
def ingest_documents(self, documents: list[Document]) -> dict:
total_chunks = 0
for doc in documents:
self.store.delete_by_doc_id(doc.doc_id)
chunks = self.chunker.chunk_document(doc)
chunks = self.embedder.embed_chunks(chunks)
self.store.upsert_chunks(chunks)
total_chunks += len(chunks)
return {"documents": len(documents), "chunks": total_chunks}
def query(self, question: str, top_k: int = 5) -> list[dict]:
query_embedding = self.embedder.embed_texts([question])[0]
results = self.store.search(query_embedding, top_k=top_k)
return [{"chunk_id": r.chunk_id, "content": r.content[:200],
"metadata": r.metadata} for r in results]
if __name__ == "__main__":
pipeline = VectorPipeline(chunk_size=256)
docs = [
Document(doc_id="doc_001", content="Pythonは広く使用されている高水準プログラミング言語です。"
"その設計哲学はコードの可読性を強調しています。"
"Pythonはオブジェクト指向、命令型、関数型、手続き型など、複数のプログラミングパラダイムをサポートしています。"
"Pythonはネットワーク通信、テキスト処理、データベースインターフェースなど、大規模な標準ライブラリを備えています。",
metadata={"source": "wiki", "category": "programming"},
source="wikipedia"),
Document(doc_id="doc_002", content="Apache Airflowはオープンソースのワークフロー管理プラットフォームです。"
"ユーザーはプログラムでワークフローを作成、スケジュール、監視できます。"
"Airflowの中核概念はDAG(有向非巡回グラフ)で、タスク間の依存関係を定義します。"
"2026年にリリースされたAirflow 3.0はネイティブasyncサポートをもたらしました。",
metadata={"source": "docs", "category": "data-engineering"},
source="airflow_docs"),
]
result = pipeline.ingest_documents(docs)
print(f"Ingestion result: {result}")
query_result = pipeline.query("Pythonプログラミング言語とは?", top_k=3)
print(f"\nQuery results: {json.dumps(query_result, indent=2, ensure_ascii=False)}")
Pattern 5:Production Monitoring with Prometheus + Data Quality Alerts
問題シナリオ
データパイプラインがサイレントに失敗 — データ遅延3時間に気づかず。特徴値の分布ドリフトに誰も気づかず、モデル推論結果がますます不正確に。ベクトルストアの検索レイテンシP99が50msから5sに急増し、ユーザーからの苦情でようやく発覚。
ソリューション
Prometheusでメトリクスを収集、カスタムデータ品質メトリクスを定義、Grafanaで可視化、AlertManagerでアラート。
# monitoring.py
import time
import threading
from dataclasses import dataclass, field
from typing import Callable, Optional
from prometheus_client import Counter, Gauge, Histogram, start_http_server
pipeline_runs_total = Counter(
"pipeline_runs_total",
"Total pipeline runs",
["pipeline_name", "status"],
)
pipeline_duration_seconds = Histogram(
"pipeline_duration_seconds",
"Pipeline execution duration",
["pipeline_name"],
buckets=[10, 30, 60, 120, 300, 600, 1800, 3600],
)
records_processed = Counter(
"records_processed_total",
"Total records processed",
["pipeline_name", "stage"],
)
data_quality_score = Gauge(
"data_quality_score",
"Data quality score (0-1)",
["pipeline_name", "check_name"],
)
feature_drift_score = Gauge(
"feature_drift_score",
"Feature drift score (PSI)",
["pipeline_name", "feature_name"],
)
vector_search_latency = Histogram(
"vector_search_latency_seconds",
"Vector search latency",
["pipeline_name"],
buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0],
)
@dataclass
class QualityCheck:
name: str
check_fn: Callable[[dict], float]
threshold: float = 0.8
@dataclass
class DriftAlert:
feature_name: str
psi_score: float
threshold: float = 0.2
is_drifted: bool = False
class PipelineMonitor:
def __init__(self, pipeline_name: str, metrics_port: int = 9090):
self.pipeline_name = pipeline_name
self.metrics_port = metrics_port
self._quality_checks: list[QualityCheck] = []
self._drift_threshold = 0.2
self._server_started = False
def start_metrics_server(self):
if not self._server_started:
start_http_server(self.metrics_port)
self._server_started = True
print(f"Metrics server started on port {self.metrics_port}")
def add_quality_check(self, name: str, check_fn: Callable, threshold: float = 0.8):
self._quality_checks.append(QualityCheck(name=name, check_fn=check_fn, threshold=threshold))
def record_pipeline_run(self, status: str, duration: float):
pipeline_runs_total.labels(self.pipeline_name, status).inc()
pipeline_duration_seconds.labels(self.pipeline_name).observe(duration)
def record_records_processed(self, stage: str, count: int):
records_processed.labels(self.pipeline_name, stage).inc(count)
def run_quality_checks(self, data: dict) -> dict:
results = {}
for check in self._quality_checks:
score = check.check_fn(data)
data_quality_score.labels(self.pipeline_name, check.name).set(score)
passed = score >= check.threshold
results[check.name] = {"score": score, "threshold": check.threshold, "passed": passed}
if not passed:
print(f"ALERT: Quality check '{check.name}' failed: score={score:.3f} < threshold={check.threshold}")
return results
def check_feature_drift(self, reference_stats: dict, current_stats: dict) -> list[DriftAlert]:
alerts = []
for feature_name in reference_stats:
if feature_name not in current_stats:
continue
psi = self._calculate_psi(reference_stats[feature_name], current_stats[feature_name])
feature_drift_score.labels(self.pipeline_name, feature_name).set(psi)
is_drifted = psi > self._drift_threshold
alert = DriftAlert(
feature_name=feature_name,
psi_score=psi,
threshold=self._drift_threshold,
is_drifted=is_drifted,
)
alerts.append(alert)
if is_drifted:
print(f"ALERT: Feature drift detected for '{feature_name}': PSI={psi:.4f} > threshold={self._drift_threshold}")
return alerts
@staticmethod
def _calculate_psi(expected: dict, actual: dict, bucket_count: int = 10) -> float:
import numpy as np
e_vals = np.array(list(expected.values())) if isinstance(expected, dict) else np.array(expected)
a_vals = np.array(list(actual.values())) if isinstance(actual, dict) else np.array(actual)
e_probs = e_vals / e_vals.sum() if e_vals.sum() > 0 else np.ones_like(e_vals) / len(e_vals)
a_probs = a_vals / a_vals.sum() if a_vals.sum() > 0 else np.ones_like(a_vals) / len(a_vals)
e_probs = np.clip(e_probs, 1e-6, None)
a_probs = np.clip(a_probs, 1e-6, None)
psi = float(np.sum((a_probs - e_probs) * np.log(a_probs / e_probs)))
return psi
def record_vector_search_latency(self, latency: float):
vector_search_latency.labels(self.pipeline_name).observe(latency)
if __name__ == "__main__":
monitor = PipelineMonitor("ai_data_pipeline", metrics_port=9090)
monitor.start_metrics_server()
monitor.add_quality_check(
"null_rate",
lambda d: 1.0 - d.get("null_count", 0) / max(d.get("total", 1), 1),
threshold=0.95,
)
monitor.add_quality_check(
"schema_compliance",
lambda d: d.get("valid_count", 0) / max(d.get("total", 1), 1),
threshold=0.99,
)
start_time = time.time()
monitor.record_records_processed("extract", 50000)
monitor.record_records_processed("transform", 48000)
monitor.record_records_processed("load", 47500)
duration = time.time() - start_time
monitor.record_pipeline_run("success", duration)
quality_data = {"null_count": 50, "total": 50000, "valid_count": 49800}
quality_results = monitor.run_quality_checks(quality_data)
print(f"Quality results: {quality_results}")
ref_stats = {"age": {"bins": [100, 200, 300, 250, 150]}, "amount": {"bins": [50, 150, 300, 350, 150]}}
cur_stats = {"age": {"bins": [80, 180, 350, 250, 140]}, "amount": {"bins": [50, 150, 300, 350, 150]}}
drift_alerts = monitor.check_feature_drift(
{k: v["bins"] for k, v in ref_stats.items()},
{k: v["bins"] for k, v in cur_stats.items()},
)
print(f"Drift alerts: {[(a.feature_name, a.psi_score, a.is_drifted) for a in drift_alerts]}")
print(f"\nMetrics available at http://localhost:9090/metrics")
time.sleep(300)
# prometheus.yml
global:
scrape_interval: 15s
scrape_configs:
- job_name: "data_pipeline"
static_configs:
- targets: ["localhost:9090"]
rule_files:
- "alerts.yml"
# alerts.yml
groups:
- name: data_pipeline_alerts
rules:
- alert: PipelineRunFailed
expr: increase(pipeline_runs_total{status="failed"}[5m]) > 0
for: 1m
labels:
severity: critical
annotations:
summary: "Pipeline {{ $labels.pipeline_name }} failed"
- alert: DataQualityDegraded
expr: data_quality_score < 0.8
for: 5m
labels:
severity: warning
annotations:
summary: "Data quality check {{ $labels.check_name }} below threshold"
- alert: FeatureDriftDetected
expr: feature_drift_score > 0.2
for: 10m
labels:
severity: warning
annotations:
summary: "Feature {{ $labels.feature_name }} drift detected (PSI={{ $value }})"
- alert: VectorSearchLatencyHigh
expr: histogram_quantile(0.99, rate(vector_search_latency_seconds_bucket[5m])) > 1.0
for: 5m
labels:
severity: warning
annotations:
summary: "Vector search P99 latency above 1s"
5 Common Pitfalls and Solutions
1. ETLスクリプトにデータベース接続情報をハードコード
問題:接続文字列がDAGコードに直接書かれており、環境変更のたびにコード修正が必要、パスワードがGitに漏洩。
ソリューション:Airflow ConnectionとEnvironment Variableを使用。
from airflow.hooks.base import BaseHook
def get_connection():
conn = BaseHook.get_connection("postgres_source")
return f"postgresql://{conn.login}:{conn.password}@{conn.host}:{conn.port}/{conn.schema}"
# または環境変数を使用
import os
db_url = os.environ["DATABASE_URL"]
2. データ検証がスキーマチェックのみで統計チェックなし
問題:スキーマ検証は通過するがデータ分布が異常(例:ageが全て999)。Pydanticは形式のみ検証し統計は検査しない。
ソリューション:Pydanticで形式検証 + Great Expectationsで統計分布検証 — 二重防御線。
# Pydantic: 形式検証
class UserSchema(BaseModel):
age: int = Field(..., ge=0, le=150)
# Great Expectations: 統計検証
ge_df.expect_column_mean_to_be_between("age", min_value=18, max_value=65)
ge_df.expect_column_values_to_be_in_set("country", ["CN", "US", "JP", "UK"])
3. 特徴エンジニアリングでデータリークを考慮しない
問題:未来のデータで特徴を計算(例:全データの平均で正規化)、学習時は良好だが本番で崩壊。
ソリューション:訓練セットの統計量のみで変換、時窓特徴は履歴データのみ使用。
from sklearn.preprocessing import StandardScaler
train_stats = df[df["date"] < "2026-01-01"]["amount"].agg(["mean", "std"])
df["amount_scaled"] = (df["amount"] - train_stats["mean"]) / train_stats["std"]
4. ベクトルデータの増分更新なし
問題:毎回全量ベクトルインデックスを再構築 — 10万ドキュメントで2時間かかり、データが常に古い。
ソリューション:ドキュメントハッシュベースの増分更新 — 変更部分のみ処理。
def incremental_update(documents: list[Document], store: VectorStore) -> dict:
updated = 0
for doc in documents:
content_hash = hashlib.md5(doc.content.encode()).hexdigest()
existing = store.get_doc_metadata(doc.doc_id)
if existing and existing.get("content_hash") == content_hash:
continue
store.delete_by_doc_id(doc.doc_id)
chunks = pipeline.chunker.chunk_document(doc)
chunks = pipeline.embedder.embed_chunks(chunks)
for chunk in chunks:
chunk.metadata["content_hash"] = content_hash
store.upsert_chunks(chunks)
updated += 1
return {"checked": len(documents), "updated": updated}
5. モニタリングがパイプライン実行のみでデータ品質を見ない
問題:AirflowはDAG成功を表示するが、データ量が90%減少または特徴値が全てNULL — 本番モデルは既に壊れている。
ソリューション:DAGの末尾にデータ品質ゲートを追加 — 品質が閾値未満なら下流をブロック。
def quality_gate_check(**context):
ti = context["ti"]
loaded_count = ti.xcom_pull(task_ids="load", key="return_value")
expected_min = context["params"]["expected_min_records"]
if loaded_count < expected_min:
raise ValueError(f"Quality gate failed: {loaded_count} < {expected_min}")
quality_score = run_quality_checks()
if quality_score < 0.9:
raise ValueError(f"Quality gate failed: score={quality_score:.2f}")
return "PASSED"
quality_gate = PythonOperator(
task_id="quality_gate",
python_callable=quality_gate_check,
params={"expected_min_records": 10000},
)
10 Common Error Troubleshooting
| # | エラーメッセージ | 原因 | ソリューション |
|---|---|---|---|
| 1 | Airflow DAG import error |
Pythonパスまたは依存関係の欠落 | PYTHONPATHを確認、requirements.txtが完全にインストールされているか確認 |
| 2 | Pydantic ValidationError: field required |
上流データフィールドの欠落または名称変更 | Optionalデフォルト値を追加、model_config = {"extra": "allow"}で互換性を設定 |
| 3 | Great Expectations expectation failed |
データ分布ドリフトまたはダーティーデータ混入 | データソース変更を確認、expectation閾値を調整またはデータクレンジングステップを追加 |
| 4 | Polars SchemaError: dtype mismatch |
Pandas→Polars変換時の型推論エラー | スキーマを明示的に指定: pl.DataFrame(data, schema={"col": pl.Float64}) |
| 5 | OpenAI API RateLimitError |
EmbeddingリクエストがAPIレート制限を超過 | バッチ+リトライを実装: バッチ呼び出し、tenacity指数バックオフリトライを追加 |
| 6 | numpy.linalg.LinAlgError in cosine similarity |
ベクトル次元の不一致またはゼロベクトル | 正規化前にベクトルノルムをチェック: np.linalg.norm(v) > 1e-8 |
| 7 | Prometheus duplicate label error |
同じラベル組み合わせでメトリクスが重複登録 | モジュールレベルのメトリクス定義を使用、関数内でのCounter/Gauge重複作成を回避 |
| 8 | MemoryError on large dataset groupby |
Pandas全量集計でメモリ不足 | Polarsに切り替えまたはチャンク処理: pd.read_csv(chunksize=100000) |
| 9 | PSI calculation returns NaN |
ビニング後の一部バケット頻度が0 | スムージングを追加: probs = np.clip(probs, 1e-6, None) |
| 10 | Airflow task timeout |
単一タスクの実行時間が制限を超過 | execution_timeoutを増やす、または複数サブタスクに分割して並列実行 |
Advanced Optimization Techniques
1. 増分ETL:変更データのみ処理
大規模データでの全量ETLは持続不可能 — CDC(Change Data Capture)またはタイムスタンプマーカーで増分のみ処理。
def incremental_extract(last_run_time: str) -> pd.DataFrame:
conn = get_connection()
query = f"""
SELECT * FROM user_events
WHERE updated_at > '{last_run_time}'
ORDER BY updated_at
"""
return pd.read_sql(query, conn)
def get_last_run_time(pipeline_name: str) -> str:
import redis
r = redis.Redis()
return r.get(f"pipeline:{pipeline_name}:last_run") or "2026-01-01T00:00:00"
def save_last_run_time(pipeline_name: str, run_time: str):
import redis
r = redis.Redis()
r.set(f"pipeline:{pipeline_name}:last_run", run_time)
2. フィーチャーキャッシングとバージョン管理
特徴計算結果をRedisにキャッシュして再計算を回避、バージョン管理で学習と推論の同一特徴使用を確保。
import redis
import json
import hashlib
class FeatureCache:
def __init__(self, redis_url: str = "redis://localhost:6379/2"):
self.r = redis.from_url(redis_url)
self.ttl = 86400
def _cache_key(self, feature_name: str, entity_id: str, version: str) -> str:
raw = f"{feature_name}:{entity_id}:{version}"
return f"feat:{hashlib.md5(raw.encode()).hexdigest()}"
def get(self, feature_name: str, entity_id: str, version: str = "v1") -> dict | None:
key = self._cache_key(feature_name, entity_id, version)
cached = self.r.get(key)
return json.loads(cached) if cached else None
def set(self, feature_name: str, entity_id: str, value: dict, version: str = "v1"):
key = self._cache_key(feature_name, entity_id, version)
self.r.setex(key, self.ttl, json.dumps(value, default=str))
3. Embeddingバッチ並列高速化
単一Embedding呼び出しは遅すぎる — バッチ+非同期並列でスループットを劇的に改善。
import asyncio
from openai import AsyncOpenAI
async def async_embed_batch(texts: list[str], batch_size: int = 2048,
model: str = "text-embedding-3-small") -> list[list[float]]:
client = AsyncOpenAI()
all_embeddings = []
for i in range(0, len(texts), batch_size):
batch = texts[i:i + batch_size]
response = await client.embeddings.create(input=batch, model=model)
all_embeddings.extend([item.embedding for item in response.data])
return all_embeddings
async def parallel_embed(documents: list[str], concurrency: int = 5) -> list[list[float]]:
semaphore = asyncio.Semaphore(concurrency)
async def limited_embed(batch):
async with semaphore:
return await async_embed_batch(batch)
tasks = [limited_embed(documents[i:i+100]) for i in range(0, len(documents), 100)]
results = await asyncio.gather(*tasks)
return [emb for batch in results for emb in batch]
4. ベクトルインデックス最適化:HNSWパラメータチューニング
PGVectorのHNSWインデックスパラメータは検索精度と速度に直接影響。
-- HNSWインデックスを作成、パラメータチューニング
CREATE INDEX CONCURRENTLY idx_documents_embedding_hnsw
ON documents USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 200);
-- クエリ時にef_searchを設定
SET hnsw.ef_search = 100;
-- 大量挿入後にインデックスを再構築
REINDEX INDEX CONCURRENTLY idx_documents_embedding_hnsw;
5. データ品質SLA自動化
データ品質チェック結果をSLAテーブルに書き込み、SLA達成率を自動計算。
from datetime import datetime
import psycopg2
def record_sla(pipeline_name: str, check_name: str, score: float,
threshold: float, passed: bool):
conn = psycopg2.connect(dsn=os.environ["WAREHOUSE_URL"])
cur = conn.cursor()
cur.execute("""
INSERT INTO data_quality_sla
(pipeline_name, check_name, score, threshold, passed, checked_at)
VALUES (%s, %s, %s, %s, %s, %s)
""", (pipeline_name, check_name, score, threshold, passed, datetime.utcnow()))
conn.commit()
conn.close()
def get_sla_rate(pipeline_name: str, days: int = 30) -> float:
conn = psycopg2.connect(dsn=os.environ["WAREHOUSE_URL"])
cur = conn.cursor()
cur.execute("""
SELECT COUNT(*) FILTER (WHERE passed) * 100.0 / COUNT(*)
FROM data_quality_sla
WHERE pipeline_name = %s AND checked_at >= NOW() - INTERVAL '%s days'
""", (pipeline_name, days))
rate = cur.fetchone()[0]
conn.close()
return float(rate) if rate else 0.0
Comparison Analysis
| 次元 | Apache Airflow | Prefect | Dagster | Luigi | Mage |
|---|---|---|---|---|---|
| オーケストレーション | DAG定義 | 関数型+デコレータ | アセット定義 | タスク依存 | Notebook型 |
| データ検証 | 手動統合 | 内蔵 | ネイティブType | なし | 内蔵 |
| フィーチャー管理 | 手動統合 | 手動統合 | ネイティブAsset | なし | 手動統合 |
| ベクトルパイプライン | 手動統合 | 手動統合 | 手動統合 | なし | 手動統合 |
| モニタリング | 基本UI | ネイティブCloud UI | ネイティブDagit | 基本 | ネイティブUI |
| 学習曲線 | 中 | 低 | 中高 | 低 | 低 |
| コミュニティ | 最も成熟 | 急成長 | 成長中 | 衰退 | 新興 |
| Pythonネイティブ | はい | はい | はい | はい | はい |
| 2026推奨度 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐ |
| 適用シナリオ | エンタープライズETL | 中小プロジェクト迅速開始 | データアセット集約型 | シンプルバッチ処理 | データ分析チーム |
| データ検証 | Pydantic v2 | Great Expectations | Pandera | TFX Data Validation |
|---|---|---|---|---|
| 検証速度 | 非常に高速(Rustコア) | 中程度 | 高速 | 中程度 |
| スキーマ定義 | Pythonクラス | YAML/Code | Pythonクラス | Protobuf |
| 統計検証 | 非対応 | 強力 | 中程度 | 強力 |
| カスタムルール | 柔軟 | 柔軟 | 中程度 | 中程度 |
| データプロファイリング | 非対応 | 強力 | 非対応 | 強力 |
| 適用シナリオ | フォーマット検証 | 包括的データ品質 | 軽量スキーマ | TensorFlowエコシステム |
Recommended Online Tools
- JSONフォーマッター — Airflow DAG設定やパイプラインデータのデバッグ時にJSONをフォーマット
- Base64エンコード — ベクトルデータ転送やAPI認証のエンコーディング処理
- cURL to Code — APIデバッグのcURLコマンドをETL抽出用のPythonコードに変換
Summary
2026年のPython AIデータパイプラインは成熟した方法論を形成しています:Airflowオーケストレーションは骨格 — DAGがタスク依存を定義し、自動リトライとアラートで信頼性を確保;データ検証は免疫システム — Pydanticがフォーマットエラーを、Great Expectationsが統計異常をインターセプト;特徴エンジニアリングはコアバリュー — Pandasで後方互換性、Polarsで大規模データ高速化;ベクトルパイプラインはAIインフラ — スマートチャンキング+増分更新でRAGデータの鮮度を確保;モニタリングは本番環境の目 — Prometheusがメトリクスを収集し、データ品質SLAが定量的な保証を提供。覚えておいてください:データ品質保証のないパイプラインは、速く走れば走るほど遠くへ間違えます。
Further Reading
ブラウザローカルツールを無料で試す →