Python AIデータパイプライン実践:ETLからフィーチャーストアまでの5つのプロダクションパターン

AI与大数据

データ品質崩壊、ETLスクリプト制御不能、フィーチャー本番反映に1週間

午前3時、上流のデータフィールドがuser_idからuserIdに変更され、モデル学習タスクがクラッシュ — 夜明けまでデバッグ。特徴エンジニアリングスクリプトは10個のJupyter Notebookに散在し、新規フィーチャーの本番反映にはデータエンジニアのスケジュール待ちで1週間。RAGシステムのベクトルデータは常にソースドキュメントと同期されておらず、検索結果が的外れ。2026年、Python AIデータパイプラインは「Nice to Have」から「Must Have」へ — Airflow 3.0はネイティブasync defサポートを、Pydantic v2は5〜50倍のデータ検証パフォーマンス向上を、Polarsは特徴エンジニアリングでPandasの10倍以上の高速化を実現。

本記事では5つのプロダクションパターンを通じて、Airflow ETLオーケストレーション→Pydantic+GEデータ検証→Pandas/Polars特徴エンジニアリング→RAGベクトルデータパイプライン→Prometheus本番モニタリングの全链路実践を、各ステップで完全に実行可能なPythonコードとともに解説。


主要なポイント

  • Apache Airflow 3.0のDAGオーケストレーションパターンを習得し、ETLタスクの依存管理と失敗リトライを実現
  • Pydantic v2 + Great Expectationsでデータ検証防御線を構築し、ダーティーデータのパイプライン侵入を拒否
  • Pandas + Polarsハイブリッドアプローチで特徴エンジニアリングを実行し、エコシステム互換性と計算パフォーマンスのバランスを確保
  • ドキュメントチャンキングからEmbedding、ベクトルストレージまでの完全なRAGベクトルデータパイプラインを構築
  • Prometheus + Grafanaデータ品質モニタリングを実装し、パイプライン異常を秒単位でアラート
  • 5つの一般的なデータパイプラインの落とし穴とプロダクションレベルのソリューションを理解
  • 増分処理、キャッシング、並列高速化などの高度な最適化手法を習得

目次

  1. Architecture Overview:AIデータパイプライン全体アーキテクチャ
  2. Pattern 1:ETL Pipeline with Apache Airflow + Python
  3. Pattern 2:Data Validation with Pydantic v2 + Great Expectations
  4. Pattern 3:Feature Engineering Pipeline with Pandas + Polars
  5. Pattern 4:Vector Data Pipeline for RAG
  6. Pattern 5:Production Monitoring with Prometheus + Data Quality Alerts
  7. 5 Common Pitfalls and Solutions
  8. 10 Common Error Troubleshooting
  9. Advanced Optimization Techniques
  10. Comparison Analysis
  11. Recommended Online Tools
  12. Summary & Further Reading

Architecture Overview

┌─────────────────────────────────────────────────────────────────┐
│                    AI Data Pipeline Architecture                 │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌──────────┐    ┌──────────────┐    ┌──────────────────────┐  │
│  │  Data     │    │  Airflow DAG │    │  Pydantic + GE       │  │
│  │  Sources  │───▶│  Orchestration│───▶│  Data Validation     │  │
│  │  (API/DB/ │    │  (ETL Sched) │    │  (Schema + Rules)    │  │
│  │   Files)  │    └──────────────┘    └──────────┬───────────┘  │
│  └──────────┘                                    │              │
│                                                   ▼              │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │              Feature Engineering Pipeline                 │  │
│  │  ┌─────────┐   ┌──────────┐   ┌──────────────────────┐ │  │
│  │  │ Pandas  │──▶│  Polars  │──▶│  Feature Store       │ │  │
│  │  │ (Compat)│   │ (Speed)  │   │  (Redis/Feast)       │ │  │
│  │  └─────────┘   └──────────┘   └──────────────────────┘ │  │
│  └──────────────────────────────────────────────────────────┘  │
│                                                   │              │
│                                                   ▼              │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │              Vector Data Pipeline (RAG)                   │  │
│  │  ┌──────────┐  ┌──────────┐  ┌───────────────────────┐ │  │
│  │  │ Chunking │─▶│ Embedding│─▶│  Vector Store         │ │  │
│  │  │ (Split)  │  │ (Model)  │  │  (PGVector/Chroma)    │ │  │
│  │  └──────────┘  └──────────┘  └───────────────────────┘ │  │
│  └──────────────────────────────────────────────────────────┘  │
│                                                   │              │
│                                                   ▼              │
│  ┌──────────────────────────────────────────────────────────┐  │
│  │              Production Monitoring                        │  │
│  │  ┌────────────┐  ┌───────────┐  ┌────────────────────┐ │  │
│  │  │ Prometheus │─▶│  Grafana  │─▶│  AlertManager      │ │  │
│  │  │ (Metrics)  │  │ (Dashboard)│  │  (Notifications)   │ │  │
│  │  └────────────┘  └───────────┘  └────────────────────┘ │  │
│  └──────────────────────────────────────────────────────────┘  │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Pattern 1:ETL Pipeline with Apache Airflow + Python

問題シナリオ

毎日3つのデータベース、2つのAPI、5つのCSVファイルからデータを抽出し、クレンジング後にデータウェアハウスに書き込む必要がある。手動スクリプトでは実行状態を追跡できず、上流の失敗が下流に伝わらず、リトライは完全に手動。

ソリューション

Apache Airflow 3.0を使用してETL DAGをオーケストレーション — タスク依存、自動リトライ、失敗アラートを実現。

# dags/etl_pipeline.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.sensors.python import PythonSensor

default_args = {
    "owner": "data-team",
    "depends_on_past": False,
    "email_on_failure": True,
    "email": ["data-team@company.com"],
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="etl_daily_pipeline",
    default_args=default_args,
    description="Daily ETL pipeline for AI data",
    schedule="0 2 * * *",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["etl", "production"],
) as dag:

    def extract_from_postgres(**context):
        import psycopg2
        conn = psycopg2.connect(
            host="postgres-host",
            database="source_db",
            user="reader",
            password="***",
        )
        cur = conn.cursor()
        cur.execute("SELECT * FROM user_events WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'")
        rows = cur.fetchall()
        conn.close()
        context["ti"].xcom_push(key="postgres_rows", value=len(rows))
        return rows

    def extract_from_api(**context):
        import requests
        resp = requests.get(
            "https://api.example.com/v2/data",
            headers={"Authorization": "Bearer ***"},
            params={"date": context["ds"]},
        )
        resp.raise_for_status()
        data = resp.json()
        context["ti"].xcom_push(key="api_records", value=len(data))
        return data

    def extract_from_files(**context):
        import glob
        import pandas as pd
        files = glob.glob(f"/data/input/{context['ds']}/*.csv")
        frames = [pd.read_csv(f) for f in files]
        combined = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
        context["ti"].xcom_push(key="file_count", value=len(files))
        return combined.to_dict("records")

    def transform_data(**context):
        import pandas as pd
        ti = context["ti"]
        pg_rows = ti.xcom_pull(task_ids="extract_postgres", key="return_value")
        api_data = ti.xcom_pull(task_ids="extract_api", key="return_value")
        file_data = ti.xcom_pull(task_ids="extract_files", key="return_value")

        df_pg = pd.DataFrame(pg_rows)
        df_api = pd.DataFrame(api_data)
        df_files = pd.DataFrame(file_data)

        df_all = pd.concat([df_pg, df_api, df_files], ignore_index=True)
        df_all = df_all.drop_duplicates(subset=["user_id", "event_id"])
        df_all = df_all.dropna(subset=["user_id", "event_type"])
        df_all["event_date"] = pd.to_datetime(df_all["created_at"]).dt.date

        context["ti"].xcom_push(key="total_records", value=len(df_all))
        return df_all.to_dict("records")

    def load_to_warehouse(**context):
        import psycopg2
        ti = context["ti"]
        records = ti.xcom_pull(task_ids="transform", key="return_value")
        conn = psycopg2.connect(
            host="warehouse-host",
            database="analytics_db",
            user="loader",
            password="***",
        )
        cur = conn.cursor()
        cur.execute("TRUNCATE TABLE staging.user_events_daily")
        insert_sql = """
            INSERT INTO staging.user_events_daily
            (user_id, event_id, event_type, event_date, created_at)
            VALUES (%s, %s, %s, %s, %s)
        """
        batch = [(r["user_id"], r["event_id"], r["event_type"],
                  str(r["event_date"]), r["created_at"]) for r in records[:1000]]
        cur.executemany(insert_sql, batch)
        conn.commit()
        conn.close()
        return len(batch)

    def data_quality_check(**context):
        ti = context["ti"]
        total = ti.xcom_pull(task_ids="transform", key="total_records")
        loaded = ti.xcom_pull(task_ids="load", key="return_value")
        if total and loaded and loaded / total < 0.95:
            raise ValueError(f"Data quality check failed: only {loaded}/{total} records loaded")
        return "PASSED"

    start = EmptyOperator(task_id="start")
    extract_pg = PythonOperator(task_id="extract_postgres", python_callable=extract_from_postgres)
    extract_api = PythonOperator(task_id="extract_api", python_callable=extract_from_api)
    extract_files = PythonOperator(task_id="extract_files", python_callable=extract_from_files)
    transform = PythonOperator(task_id="transform", python_callable=transform_data)
    load = PythonOperator(task_id="load", python_callable=load_to_warehouse)
    quality_check = PythonOperator(task_id="quality_check", python_callable=data_quality_check)
    end = EmptyOperator(task_id="end")

    start >> [extract_pg, extract_api, extract_files] >> transform >> load >> quality_check >> end
# Airflow起動
pip install apache-airflow==3.0.0
airflow db init
airflow dags list
airflow dags trigger etl_daily_pipeline

Pattern 2:Data Validation with Pydantic v2 + Great Expectations

問題シナリオ

上流システムが密かにフィールドタイプを変更 — ageがintからstringに、emailフォーマットの検証がなくなり、ダーティーデータがそのままモデル学習に流入し、精度が暴落してからようやく発覚。

ソリューション

Pydantic v2でスキーマ検証、Great Expectationsで統計ルール検証 — ダーティーデータに対する二重防御線。

# schemas.py
from pydantic import BaseModel, Field, EmailStr, field_validator
from datetime import datetime
from typing import Optional

class UserEventSchema(BaseModel):
    user_id: str = Field(..., min_length=1, max_length=64, description="ユーザー一意識別子")
    event_id: str = Field(..., pattern=r"^evt_[a-zA-Z0-9]{16}$", description="イベントID形式")
    event_type: str = Field(..., description="イベントタイプ")
    event_date: datetime = Field(..., description="イベント日時")
    age: int = Field(..., ge=0, le=150, description="ユーザー年齢")
    email: EmailStr = Field(..., description="ユーザーメール")
    amount: Optional[float] = Field(None, ge=0, description="金額")

    @field_validator("event_type")
    @classmethod
    def validate_event_type(cls, v):
        allowed = {"click", "purchase", "signup", "logout", "view"}
        if v not in allowed:
            raise ValueError(f"event_type must be one of {allowed}, got {v}")
        return v

    model_config = {"extra": "forbid"}

class BatchValidationResult(BaseModel):
    total: int
    valid: int
    invalid: int
    errors: list[dict]
# validation_pipeline.py
from schemas import UserEventSchema, BatchValidationResult
from great_expectations.dataset import PandasDataset
import pandas as pd
import json

def validate_with_pydantic(records: list[dict]) -> BatchValidationResult:
    valid_records = []
    errors = []
    for i, record in enumerate(records):
        try:
            UserEventSchema(**record)
            valid_records.append(record)
        except Exception as e:
            errors.append({"index": i, "record": record, "error": str(e)})
    return BatchValidationResult(
        total=len(records),
        valid=len(valid_records),
        invalid=len(errors),
        errors=errors,
    )

def validate_with_great_expectations(df: pd.DataFrame) -> dict:
    ge_df = PandasDataset(df)
    results = {}

    results["row_count"] = ge_df.expect_table_row_count_to_be_between(min_value=1, max_value=10_000_000)
    results["user_id_not_null"] = ge_df.expect_column_values_to_not_be_null("user_id")
    results["age_range"] = ge_df.expect_column_values_to_be_between("age", min_value=0, max_value=150)
    results["event_type_set"] = ge_df.expect_column_values_to_be_in_set(
        "event_type", ["click", "purchase", "signup", "logout", "view"]
    )
    results["email_format"] = ge_df.expect_column_values_to_match_regex("email", r"^[\w\.-]+@[\w\.-]+\.\w+$")
    results["amount_non_negative"] = ge_df.expect_column_values_to_be_between(
        "amount", min_value=0, max_value=None
    )

    passed = sum(1 for r in results.values() if r["success"])
    failed = len(results) - passed
    return {"total_expectations": len(results), "passed": passed, "failed": failed, "details": results}

def run_validation_pipeline(records: list[dict]) -> dict:
    pydantic_result = validate_with_pydantic(records)

    valid_df = pd.DataFrame([r for i, r in enumerate(records) if i not in {e["index"] for e in pydantic_result.errors}])
    ge_result = validate_with_great_expectations(valid_df) if not valid_df.empty else {"total_expectations": 0, "passed": 0, "failed": 0}

    return {
        "pydantic_validation": pydantic_result.model_dump(),
        "ge_validation": ge_result,
        "overall_pass": pydantic_result.invalid == 0 and ge_result.get("failed", 0) == 0,
    }

if __name__ == "__main__":
    test_data = [
        {"user_id": "u001", "event_id": "evt_abc123def456gh7", "event_type": "click",
         "event_date": "2026-06-16T10:00:00", "age": 28, "email": "user@test.com", "amount": 99.5},
        {"user_id": "", "event_id": "evt_xyz789abc012ij3", "event_type": "signup",
         "event_date": "2026-06-16T11:00:00", "age": 35, "email": "new@test.com", "amount": 0},
        {"user_id": "u003", "event_id": "evt_bad_format", "event_type": "hack",
         "event_date": "2026-06-16T12:00:00", "age": -5, "email": "not-an-email", "amount": -10},
    ]
    result = run_validation_pipeline(test_data)
    print(json.dumps(result, indent=2, default=str))

Pattern 3:Feature Engineering Pipeline with Pandas + Polars

問題シナリオ

特徴エンジニアリングスクリプトはPandasで書かれている。データ量が100万から5000万に増加すると、groupby+aggが30分経っても結果を返さない。Polarsへの移行は望ましいが、既存コードが多すぎて完全な書き直しは非現実的。

ソリューション

Pandasは小規模データの後方互換性を処理し、Polarsは大規模データの高速化を担当 — Arrow形式によるゼロコピー相互運用。

# feature_pipeline.py
import pandas as pd
import polars as pl
from datetime import datetime, timedelta
import numpy as np

def generate_sample_data(n: int = 1_000_000) -> pd.DataFrame:
    np.random.seed(42)
    return pd.DataFrame({
        "user_id": np.random.choice([f"u{i:04d}" for i in range(1000)], n),
        "event_type": np.random.choice(["click", "purchase", "view", "signup"], n),
        "amount": np.random.exponential(50, n).round(2),
        "event_time": pd.date_range("2026-01-01", periods=n, freq="30s"),
        "category": np.random.choice(["electronics", "clothing", "food", "books"], n),
    })

def pandas_features(df: pd.DataFrame) -> pd.DataFrame:
    user_agg = df.groupby("user_id").agg(
        total_events=("event_type", "count"),
        total_amount=("amount", "sum"),
        avg_amount=("amount", "mean"),
        unique_categories=("category", "nunique"),
        first_event=("event_time", "min"),
        last_event=("event_time", "max"),
    )
    user_agg["active_days"] = (user_agg["last_event"] - user_agg["first_event"]).dt.days + 1
    user_agg["avg_daily_events"] = user_agg["total_events"] / user_agg["active_days"].clip(lower=1)
    return user_agg.reset_index()

def polars_features(df: pd.DataFrame) -> pd.DataFrame:
    pl_df = pl.from_pandas(df)

    result = pl_df.group_by("user_id").agg([
        pl.col("event_type").count().alias("total_events"),
        pl.col("amount").sum().alias("total_amount"),
        pl.col("amount").mean().alias("avg_amount"),
        pl.col("category").n_unique().alias("unique_categories"),
        pl.col("event_time").min().alias("first_event"),
        pl.col("event_time").max().alias("last_event"),
    ]).with_columns([
        (pl.col("last_event") - pl.col("first_event")).dt.total_days().add(1).alias("active_days"),
    ]).with_columns([
        (pl.col("total_events") / pl.col("active_days").clip(lower=1)).alias("avg_daily_events"),
    ])

    return result.to_pandas()

def time_window_features(df: pd.DataFrame, windows: list[str] = None) -> pd.DataFrame:
    if windows is None:
        windows = ["7d", "30d", "90d"]
    pl_df = pl.from_pandas(df)
    reference_date = pl_df.select(pl.col("event_time").max()).item()

    all_features = []
    for window in windows:
        days = int(window.replace("d", ""))
        cutoff = reference_date - timedelta(days=days)

        window_feat = pl_df.filter(pl.col("event_time") >= cutoff).group_by("user_id").agg([
            pl.col("amount").sum().alias(f"amount_sum_{window}"),
            pl.col("amount").mean().alias(f"amount_mean_{window}"),
            pl.col("event_type").count().alias(f"event_count_{window}"),
            pl.col("category").n_unique().alias(f"category_nunique_{window}"),
        ])
        all_features.append(window_feat)

    from functools import reduce
    merged = reduce(lambda a, b: a.join(b, on="user_id", how="outer"), all_features)
    return merged.to_pandas()

def category_encoding_features(df: pd.DataFrame) -> pd.DataFrame:
    pl_df = pl.from_pandas(df)
    pivot = pl_df.group_by(["user_id", "category"]).agg([
        pl.col("amount").sum().alias("category_amount"),
        pl.col("event_type").count().alias("category_count"),
    ]).pivot(
        index="user_id",
        on="category",
        values=["category_amount", "category_count"],
    ).fill_null(0)
    return pivot.to_pandas()

def run_feature_pipeline():
    print("Generating sample data...")
    df = generate_sample_data(1_000_000)
    print(f"Data shape: {df.shape}")

    print("\n--- Pandas Feature Engineering ---")
    start = datetime.now()
    pandas_result = pandas_features(df)
    print(f"Pandas time: {(datetime.now() - start).total_seconds():.2f}s")
    print(f"Pandas result shape: {pandas_result.shape}")

    print("\n--- Polars Feature Engineering ---")
    start = datetime.now()
    polars_result = polars_features(df)
    print(f"Polars time: {(datetime.now() - start).total_seconds():.2f}s")
    print(f"Polars result shape: {polars_result.shape}")

    print("\n--- Time Window Features ---")
    window_result = time_window_features(df)
    print(f"Window features shape: {window_result.shape}")

    print("\n--- Category Encoding Features ---")
    category_result = category_encoding_features(df)
    print(f"Category features shape: {category_result.shape}")

if __name__ == "__main__":
    run_feature_pipeline()

Pattern 4:Vector Data Pipeline for RAG

問題シナリオ

RAGシステムの本番稼働後、ドキュメントは更新されたがベクトルが同期されておらず、検索結果が常に古い。チャンキング戦略が粗すぎ — 長いテーブルが切り詰められ、コードブロックが途中で分割され、Embedding品質が惨憺たるもの。

ソリューション

完全なベクトルデータパイプラインを構築:スマートチャンキング→Embedding→ベクトルストレージ→増分同期。

# vector_pipeline.py
import hashlib
import json
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class Document:
    doc_id: str
    content: str
    metadata: dict = field(default_factory=dict)
    source: str = ""
    updated_at: str = ""

@dataclass
class Chunk:
    chunk_id: str
    doc_id: str
    content: str
    embedding: Optional[list[float]] = None
    metadata: dict = field(default_factory=dict)

class SmartChunker:
    def __init__(self, chunk_size: int = 512, chunk_overlap: int = 64,
                 separators: list[str] = None):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.separators = separators or ["\n\n", "\n", "。", ".", " "]

    def chunk_document(self, doc: Document) -> list[Chunk]:
        text = doc.content
        chunks_text = self._recursive_split(text, self.separators)
        chunks = []
        for i, text in enumerate(chunks_text):
            if len(text.strip()) < 20:
                continue
            chunk_id = hashlib.md5(f"{doc.doc_id}:{i}:{text[:50]}".encode()).hexdigest()[:16]
            chunks.append(Chunk(
                chunk_id=chunk_id,
                doc_id=doc.doc_id,
                content=text,
                metadata={**doc.metadata, "chunk_index": i, "source": doc.source},
            ))
        return chunks

    def _recursive_split(self, text: str, separators: list[str]) -> list[str]:
        if not separators or len(text) <= self.chunk_size:
            return [text] if text.strip() else []

        sep = separators[0]
        parts = text.split(sep)

        result = []
        current = ""
        for part in parts:
            candidate = current + sep + part if current else part
            if len(candidate) <= self.chunk_size:
                current = candidate
            else:
                if current:
                    result.append(current)
                if len(part) > self.chunk_size:
                    sub_splits = self._recursive_split(part, separators[1:])
                    result.extend(sub_splits)
                    current = ""
                else:
                    current = part
        if current:
            result.append(current)
        return result

class EmbeddingService:
    def __init__(self, model_name: str = "text-embedding-3-small", api_key: str = ""):
        self.model_name = model_name
        self.api_key = api_key

    def embed_texts(self, texts: list[str]) -> list[list[float]]:
        try:
            from openai import OpenAI
            client = OpenAI(api_key=self.api_key)
            response = client.embeddings.create(input=texts, model=self.model_name)
            return [item.embedding for item in response.data]
        except ImportError:
            import numpy as np
            return [np.random.randn(1536).tolist() for _ in texts]

    def embed_chunks(self, chunks: list[Chunk], batch_size: int = 100) -> list[Chunk]:
        for i in range(0, len(chunks), batch_size):
            batch = chunks[i:i + batch_size]
            texts = [c.content for c in batch]
            embeddings = self.embed_texts(texts)
            for j, chunk in enumerate(batch):
                chunk.embedding = embeddings[j]
        return chunks

class VectorStore:
    def __init__(self, connection_string: str = ""):
        self.connection_string = connection_string
        self._store: dict[str, Chunk] = {}

    def upsert_chunks(self, chunks: list[Chunk]):
        for chunk in chunks:
            self._store[chunk.chunk_id] = chunk

    def search(self, query_embedding: list[float], top_k: int = 5) -> list[Chunk]:
        import numpy as np
        query_vec = np.array(query_embedding)
        scores = []
        for chunk in self._store.values():
            if chunk.embedding is None:
                continue
            chunk_vec = np.array(chunk.embedding)
            similarity = np.dot(query_vec, chunk_vec) / (np.linalg.norm(query_vec) * np.linalg.norm(chunk_vec) + 1e-8)
            scores.append((chunk, float(similarity)))
        scores.sort(key=lambda x: x[1], reverse=True)
        return [chunk for chunk, _ in scores[:top_k]]

    def delete_by_doc_id(self, doc_id: str):
        to_delete = [cid for cid, c in self._store.items() if c.doc_id == doc_id]
        for cid in to_delete:
            del self._store[cid]

class VectorPipeline:
    def __init__(self, chunk_size: int = 512, model_name: str = "text-embedding-3-small"):
        self.chunker = SmartChunker(chunk_size=chunk_size)
        self.embedder = EmbeddingService(model_name=model_name)
        self.store = VectorStore()

    def ingest_documents(self, documents: list[Document]) -> dict:
        total_chunks = 0
        for doc in documents:
            self.store.delete_by_doc_id(doc.doc_id)
            chunks = self.chunker.chunk_document(doc)
            chunks = self.embedder.embed_chunks(chunks)
            self.store.upsert_chunks(chunks)
            total_chunks += len(chunks)
        return {"documents": len(documents), "chunks": total_chunks}

    def query(self, question: str, top_k: int = 5) -> list[dict]:
        query_embedding = self.embedder.embed_texts([question])[0]
        results = self.store.search(query_embedding, top_k=top_k)
        return [{"chunk_id": r.chunk_id, "content": r.content[:200],
                 "metadata": r.metadata} for r in results]

if __name__ == "__main__":
    pipeline = VectorPipeline(chunk_size=256)

    docs = [
        Document(doc_id="doc_001", content="Pythonは広く使用されている高水準プログラミング言語です。"
                 "その設計哲学はコードの可読性を強調しています。"
                 "Pythonはオブジェクト指向、命令型、関数型、手続き型など、複数のプログラミングパラダイムをサポートしています。"
                 "Pythonはネットワーク通信、テキスト処理、データベースインターフェースなど、大規模な標準ライブラリを備えています。",
                 metadata={"source": "wiki", "category": "programming"},
                 source="wikipedia"),
        Document(doc_id="doc_002", content="Apache Airflowはオープンソースのワークフロー管理プラットフォームです。"
                 "ユーザーはプログラムでワークフローを作成、スケジュール、監視できます。"
                 "Airflowの中核概念はDAG(有向非巡回グラフ)で、タスク間の依存関係を定義します。"
                 "2026年にリリースされたAirflow 3.0はネイティブasyncサポートをもたらしました。",
                 metadata={"source": "docs", "category": "data-engineering"},
                 source="airflow_docs"),
    ]

    result = pipeline.ingest_documents(docs)
    print(f"Ingestion result: {result}")

    query_result = pipeline.query("Pythonプログラミング言語とは?", top_k=3)
    print(f"\nQuery results: {json.dumps(query_result, indent=2, ensure_ascii=False)}")

Pattern 5:Production Monitoring with Prometheus + Data Quality Alerts

問題シナリオ

データパイプラインがサイレントに失敗 — データ遅延3時間に気づかず。特徴値の分布ドリフトに誰も気づかず、モデル推論結果がますます不正確に。ベクトルストアの検索レイテンシP99が50msから5sに急増し、ユーザーからの苦情でようやく発覚。

ソリューション

Prometheusでメトリクスを収集、カスタムデータ品質メトリクスを定義、Grafanaで可視化、AlertManagerでアラート。

# monitoring.py
import time
import threading
from dataclasses import dataclass, field
from typing import Callable, Optional
from prometheus_client import Counter, Gauge, Histogram, start_http_server

pipeline_runs_total = Counter(
    "pipeline_runs_total",
    "Total pipeline runs",
    ["pipeline_name", "status"],
)

pipeline_duration_seconds = Histogram(
    "pipeline_duration_seconds",
    "Pipeline execution duration",
    ["pipeline_name"],
    buckets=[10, 30, 60, 120, 300, 600, 1800, 3600],
)

records_processed = Counter(
    "records_processed_total",
    "Total records processed",
    ["pipeline_name", "stage"],
)

data_quality_score = Gauge(
    "data_quality_score",
    "Data quality score (0-1)",
    ["pipeline_name", "check_name"],
)

feature_drift_score = Gauge(
    "feature_drift_score",
    "Feature drift score (PSI)",
    ["pipeline_name", "feature_name"],
)

vector_search_latency = Histogram(
    "vector_search_latency_seconds",
    "Vector search latency",
    ["pipeline_name"],
    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0],
)

@dataclass
class QualityCheck:
    name: str
    check_fn: Callable[[dict], float]
    threshold: float = 0.8

@dataclass
class DriftAlert:
    feature_name: str
    psi_score: float
    threshold: float = 0.2
    is_drifted: bool = False

class PipelineMonitor:
    def __init__(self, pipeline_name: str, metrics_port: int = 9090):
        self.pipeline_name = pipeline_name
        self.metrics_port = metrics_port
        self._quality_checks: list[QualityCheck] = []
        self._drift_threshold = 0.2
        self._server_started = False

    def start_metrics_server(self):
        if not self._server_started:
            start_http_server(self.metrics_port)
            self._server_started = True
            print(f"Metrics server started on port {self.metrics_port}")

    def add_quality_check(self, name: str, check_fn: Callable, threshold: float = 0.8):
        self._quality_checks.append(QualityCheck(name=name, check_fn=check_fn, threshold=threshold))

    def record_pipeline_run(self, status: str, duration: float):
        pipeline_runs_total.labels(self.pipeline_name, status).inc()
        pipeline_duration_seconds.labels(self.pipeline_name).observe(duration)

    def record_records_processed(self, stage: str, count: int):
        records_processed.labels(self.pipeline_name, stage).inc(count)

    def run_quality_checks(self, data: dict) -> dict:
        results = {}
        for check in self._quality_checks:
            score = check.check_fn(data)
            data_quality_score.labels(self.pipeline_name, check.name).set(score)
            passed = score >= check.threshold
            results[check.name] = {"score": score, "threshold": check.threshold, "passed": passed}
            if not passed:
                print(f"ALERT: Quality check '{check.name}' failed: score={score:.3f} < threshold={check.threshold}")
        return results

    def check_feature_drift(self, reference_stats: dict, current_stats: dict) -> list[DriftAlert]:
        alerts = []
        for feature_name in reference_stats:
            if feature_name not in current_stats:
                continue
            psi = self._calculate_psi(reference_stats[feature_name], current_stats[feature_name])
            feature_drift_score.labels(self.pipeline_name, feature_name).set(psi)
            is_drifted = psi > self._drift_threshold
            alert = DriftAlert(
                feature_name=feature_name,
                psi_score=psi,
                threshold=self._drift_threshold,
                is_drifted=is_drifted,
            )
            alerts.append(alert)
            if is_drifted:
                print(f"ALERT: Feature drift detected for '{feature_name}': PSI={psi:.4f} > threshold={self._drift_threshold}")
        return alerts

    @staticmethod
    def _calculate_psi(expected: dict, actual: dict, bucket_count: int = 10) -> float:
        import numpy as np
        e_vals = np.array(list(expected.values())) if isinstance(expected, dict) else np.array(expected)
        a_vals = np.array(list(actual.values())) if isinstance(actual, dict) else np.array(actual)
        e_probs = e_vals / e_vals.sum() if e_vals.sum() > 0 else np.ones_like(e_vals) / len(e_vals)
        a_probs = a_vals / a_vals.sum() if a_vals.sum() > 0 else np.ones_like(a_vals) / len(a_vals)
        e_probs = np.clip(e_probs, 1e-6, None)
        a_probs = np.clip(a_probs, 1e-6, None)
        psi = float(np.sum((a_probs - e_probs) * np.log(a_probs / e_probs)))
        return psi

    def record_vector_search_latency(self, latency: float):
        vector_search_latency.labels(self.pipeline_name).observe(latency)

if __name__ == "__main__":
    monitor = PipelineMonitor("ai_data_pipeline", metrics_port=9090)
    monitor.start_metrics_server()

    monitor.add_quality_check(
        "null_rate",
        lambda d: 1.0 - d.get("null_count", 0) / max(d.get("total", 1), 1),
        threshold=0.95,
    )
    monitor.add_quality_check(
        "schema_compliance",
        lambda d: d.get("valid_count", 0) / max(d.get("total", 1), 1),
        threshold=0.99,
    )

    start_time = time.time()
    monitor.record_records_processed("extract", 50000)
    monitor.record_records_processed("transform", 48000)
    monitor.record_records_processed("load", 47500)
    duration = time.time() - start_time

    monitor.record_pipeline_run("success", duration)

    quality_data = {"null_count": 50, "total": 50000, "valid_count": 49800}
    quality_results = monitor.run_quality_checks(quality_data)
    print(f"Quality results: {quality_results}")

    ref_stats = {"age": {"bins": [100, 200, 300, 250, 150]}, "amount": {"bins": [50, 150, 300, 350, 150]}}
    cur_stats = {"age": {"bins": [80, 180, 350, 250, 140]}, "amount": {"bins": [50, 150, 300, 350, 150]}}
    drift_alerts = monitor.check_feature_drift(
        {k: v["bins"] for k, v in ref_stats.items()},
        {k: v["bins"] for k, v in cur_stats.items()},
    )
    print(f"Drift alerts: {[(a.feature_name, a.psi_score, a.is_drifted) for a in drift_alerts]}")

    print(f"\nMetrics available at http://localhost:9090/metrics")
    time.sleep(300)
# prometheus.yml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: "data_pipeline"
    static_configs:
      - targets: ["localhost:9090"]

rule_files:
  - "alerts.yml"
# alerts.yml
groups:
  - name: data_pipeline_alerts
    rules:
      - alert: PipelineRunFailed
        expr: increase(pipeline_runs_total{status="failed"}[5m]) > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Pipeline {{ $labels.pipeline_name }} failed"

      - alert: DataQualityDegraded
        expr: data_quality_score < 0.8
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Data quality check {{ $labels.check_name }} below threshold"

      - alert: FeatureDriftDetected
        expr: feature_drift_score > 0.2
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Feature {{ $labels.feature_name }} drift detected (PSI={{ $value }})"

      - alert: VectorSearchLatencyHigh
        expr: histogram_quantile(0.99, rate(vector_search_latency_seconds_bucket[5m])) > 1.0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Vector search P99 latency above 1s"

5 Common Pitfalls and Solutions

1. ETLスクリプトにデータベース接続情報をハードコード

問題:接続文字列がDAGコードに直接書かれており、環境変更のたびにコード修正が必要、パスワードがGitに漏洩。

ソリューション:Airflow ConnectionとEnvironment Variableを使用。

from airflow.hooks.base import BaseHook

def get_connection():
    conn = BaseHook.get_connection("postgres_source")
    return f"postgresql://{conn.login}:{conn.password}@{conn.host}:{conn.port}/{conn.schema}"

# または環境変数を使用
import os
db_url = os.environ["DATABASE_URL"]

2. データ検証がスキーマチェックのみで統計チェックなし

問題:スキーマ検証は通過するがデータ分布が異常(例:ageが全て999)。Pydanticは形式のみ検証し統計は検査しない。

ソリューション:Pydanticで形式検証 + Great Expectationsで統計分布検証 — 二重防御線。

# Pydantic: 形式検証
class UserSchema(BaseModel):
    age: int = Field(..., ge=0, le=150)

# Great Expectations: 統計検証
ge_df.expect_column_mean_to_be_between("age", min_value=18, max_value=65)
ge_df.expect_column_values_to_be_in_set("country", ["CN", "US", "JP", "UK"])

3. 特徴エンジニアリングでデータリークを考慮しない

問題:未来のデータで特徴を計算(例:全データの平均で正規化)、学習時は良好だが本番で崩壊。

ソリューション:訓練セットの統計量のみで変換、時窓特徴は履歴データのみ使用。

from sklearn.preprocessing import StandardScaler

train_stats = df[df["date"] < "2026-01-01"]["amount"].agg(["mean", "std"])
df["amount_scaled"] = (df["amount"] - train_stats["mean"]) / train_stats["std"]

4. ベクトルデータの増分更新なし

問題:毎回全量ベクトルインデックスを再構築 — 10万ドキュメントで2時間かかり、データが常に古い。

ソリューション:ドキュメントハッシュベースの増分更新 — 変更部分のみ処理。

def incremental_update(documents: list[Document], store: VectorStore) -> dict:
    updated = 0
    for doc in documents:
        content_hash = hashlib.md5(doc.content.encode()).hexdigest()
        existing = store.get_doc_metadata(doc.doc_id)
        if existing and existing.get("content_hash") == content_hash:
            continue
        store.delete_by_doc_id(doc.doc_id)
        chunks = pipeline.chunker.chunk_document(doc)
        chunks = pipeline.embedder.embed_chunks(chunks)
        for chunk in chunks:
            chunk.metadata["content_hash"] = content_hash
        store.upsert_chunks(chunks)
        updated += 1
    return {"checked": len(documents), "updated": updated}

5. モニタリングがパイプライン実行のみでデータ品質を見ない

問題:AirflowはDAG成功を表示するが、データ量が90%減少または特徴値が全てNULL — 本番モデルは既に壊れている。

ソリューション:DAGの末尾にデータ品質ゲートを追加 — 品質が閾値未満なら下流をブロック。

def quality_gate_check(**context):
    ti = context["ti"]
    loaded_count = ti.xcom_pull(task_ids="load", key="return_value")
    expected_min = context["params"]["expected_min_records"]

    if loaded_count < expected_min:
        raise ValueError(f"Quality gate failed: {loaded_count} < {expected_min}")

    quality_score = run_quality_checks()
    if quality_score < 0.9:
        raise ValueError(f"Quality gate failed: score={quality_score:.2f}")

    return "PASSED"

quality_gate = PythonOperator(
    task_id="quality_gate",
    python_callable=quality_gate_check,
    params={"expected_min_records": 10000},
)

10 Common Error Troubleshooting

# エラーメッセージ 原因 ソリューション
1 Airflow DAG import error Pythonパスまたは依存関係の欠落 PYTHONPATHを確認、requirements.txtが完全にインストールされているか確認
2 Pydantic ValidationError: field required 上流データフィールドの欠落または名称変更 Optionalデフォルト値を追加、model_config = {"extra": "allow"}で互換性を設定
3 Great Expectations expectation failed データ分布ドリフトまたはダーティーデータ混入 データソース変更を確認、expectation閾値を調整またはデータクレンジングステップを追加
4 Polars SchemaError: dtype mismatch Pandas→Polars変換時の型推論エラー スキーマを明示的に指定: pl.DataFrame(data, schema={"col": pl.Float64})
5 OpenAI API RateLimitError EmbeddingリクエストがAPIレート制限を超過 バッチ+リトライを実装: バッチ呼び出し、tenacity指数バックオフリトライを追加
6 numpy.linalg.LinAlgError in cosine similarity ベクトル次元の不一致またはゼロベクトル 正規化前にベクトルノルムをチェック: np.linalg.norm(v) > 1e-8
7 Prometheus duplicate label error 同じラベル組み合わせでメトリクスが重複登録 モジュールレベルのメトリクス定義を使用、関数内でのCounter/Gauge重複作成を回避
8 MemoryError on large dataset groupby Pandas全量集計でメモリ不足 Polarsに切り替えまたはチャンク処理: pd.read_csv(chunksize=100000)
9 PSI calculation returns NaN ビニング後の一部バケット頻度が0 スムージングを追加: probs = np.clip(probs, 1e-6, None)
10 Airflow task timeout 単一タスクの実行時間が制限を超過 execution_timeoutを増やす、または複数サブタスクに分割して並列実行

Advanced Optimization Techniques

1. 増分ETL:変更データのみ処理

大規模データでの全量ETLは持続不可能 — CDC(Change Data Capture)またはタイムスタンプマーカーで増分のみ処理。

def incremental_extract(last_run_time: str) -> pd.DataFrame:
    conn = get_connection()
    query = f"""
        SELECT * FROM user_events
        WHERE updated_at > '{last_run_time}'
        ORDER BY updated_at
    """
    return pd.read_sql(query, conn)

def get_last_run_time(pipeline_name: str) -> str:
    import redis
    r = redis.Redis()
    return r.get(f"pipeline:{pipeline_name}:last_run") or "2026-01-01T00:00:00"

def save_last_run_time(pipeline_name: str, run_time: str):
    import redis
    r = redis.Redis()
    r.set(f"pipeline:{pipeline_name}:last_run", run_time)

2. フィーチャーキャッシングとバージョン管理

特徴計算結果をRedisにキャッシュして再計算を回避、バージョン管理で学習と推論の同一特徴使用を確保。

import redis
import json
import hashlib

class FeatureCache:
    def __init__(self, redis_url: str = "redis://localhost:6379/2"):
        self.r = redis.from_url(redis_url)
        self.ttl = 86400

    def _cache_key(self, feature_name: str, entity_id: str, version: str) -> str:
        raw = f"{feature_name}:{entity_id}:{version}"
        return f"feat:{hashlib.md5(raw.encode()).hexdigest()}"

    def get(self, feature_name: str, entity_id: str, version: str = "v1") -> dict | None:
        key = self._cache_key(feature_name, entity_id, version)
        cached = self.r.get(key)
        return json.loads(cached) if cached else None

    def set(self, feature_name: str, entity_id: str, value: dict, version: str = "v1"):
        key = self._cache_key(feature_name, entity_id, version)
        self.r.setex(key, self.ttl, json.dumps(value, default=str))

3. Embeddingバッチ並列高速化

単一Embedding呼び出しは遅すぎる — バッチ+非同期並列でスループットを劇的に改善。

import asyncio
from openai import AsyncOpenAI

async def async_embed_batch(texts: list[str], batch_size: int = 2048,
                            model: str = "text-embedding-3-small") -> list[list[float]]:
    client = AsyncOpenAI()
    all_embeddings = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        response = await client.embeddings.create(input=batch, model=model)
        all_embeddings.extend([item.embedding for item in response.data])
    return all_embeddings

async def parallel_embed(documents: list[str], concurrency: int = 5) -> list[list[float]]:
    semaphore = asyncio.Semaphore(concurrency)

    async def limited_embed(batch):
        async with semaphore:
            return await async_embed_batch(batch)

    tasks = [limited_embed(documents[i:i+100]) for i in range(0, len(documents), 100)]
    results = await asyncio.gather(*tasks)
    return [emb for batch in results for emb in batch]

4. ベクトルインデックス最適化:HNSWパラメータチューニング

PGVectorのHNSWインデックスパラメータは検索精度と速度に直接影響。

-- HNSWインデックスを作成、パラメータチューニング
CREATE INDEX CONCURRENTLY idx_documents_embedding_hnsw
ON documents USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 200);

-- クエリ時にef_searchを設定
SET hnsw.ef_search = 100;

-- 大量挿入後にインデックスを再構築
REINDEX INDEX CONCURRENTLY idx_documents_embedding_hnsw;

5. データ品質SLA自動化

データ品質チェック結果をSLAテーブルに書き込み、SLA達成率を自動計算。

from datetime import datetime
import psycopg2

def record_sla(pipeline_name: str, check_name: str, score: float,
               threshold: float, passed: bool):
    conn = psycopg2.connect(dsn=os.environ["WAREHOUSE_URL"])
    cur = conn.cursor()
    cur.execute("""
        INSERT INTO data_quality_sla
        (pipeline_name, check_name, score, threshold, passed, checked_at)
        VALUES (%s, %s, %s, %s, %s, %s)
    """, (pipeline_name, check_name, score, threshold, passed, datetime.utcnow()))
    conn.commit()
    conn.close()

def get_sla_rate(pipeline_name: str, days: int = 30) -> float:
    conn = psycopg2.connect(dsn=os.environ["WAREHOUSE_URL"])
    cur = conn.cursor()
    cur.execute("""
        SELECT COUNT(*) FILTER (WHERE passed) * 100.0 / COUNT(*)
        FROM data_quality_sla
        WHERE pipeline_name = %s AND checked_at >= NOW() - INTERVAL '%s days'
    """, (pipeline_name, days))
    rate = cur.fetchone()[0]
    conn.close()
    return float(rate) if rate else 0.0

Comparison Analysis

次元 Apache Airflow Prefect Dagster Luigi Mage
オーケストレーション DAG定義 関数型+デコレータ アセット定義 タスク依存 Notebook型
データ検証 手動統合 内蔵 ネイティブType なし 内蔵
フィーチャー管理 手動統合 手動統合 ネイティブAsset なし 手動統合
ベクトルパイプライン 手動統合 手動統合 手動統合 なし 手動統合
モニタリング 基本UI ネイティブCloud UI ネイティブDagit 基本 ネイティブUI
学習曲線 中高
コミュニティ 最も成熟 急成長 成長中 衰退 新興
Pythonネイティブ はい はい はい はい はい
2026推奨度 ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐ ⭐⭐⭐⭐ ⭐⭐ ⭐⭐⭐
適用シナリオ エンタープライズETL 中小プロジェクト迅速開始 データアセット集約型 シンプルバッチ処理 データ分析チーム
データ検証 Pydantic v2 Great Expectations Pandera TFX Data Validation
検証速度 非常に高速(Rustコア) 中程度 高速 中程度
スキーマ定義 Pythonクラス YAML/Code Pythonクラス Protobuf
統計検証 非対応 強力 中程度 強力
カスタムルール 柔軟 柔軟 中程度 中程度
データプロファイリング 非対応 強力 非対応 強力
適用シナリオ フォーマット検証 包括的データ品質 軽量スキーマ TensorFlowエコシステム

  • JSONフォーマッター — Airflow DAG設定やパイプラインデータのデバッグ時にJSONをフォーマット
  • Base64エンコード — ベクトルデータ転送やAPI認証のエンコーディング処理
  • cURL to Code — APIデバッグのcURLコマンドをETL抽出用のPythonコードに変換

Summary

2026年のPython AIデータパイプラインは成熟した方法論を形成しています:Airflowオーケストレーションは骨格 — DAGがタスク依存を定義し、自動リトライとアラートで信頼性を確保;データ検証は免疫システム — Pydanticがフォーマットエラーを、Great Expectationsが統計異常をインターセプト;特徴エンジニアリングはコアバリュー — Pandasで後方互換性、Polarsで大規模データ高速化;ベクトルパイプラインはAIインフラ — スマートチャンキング+増分更新でRAGデータの鮮度を確保;モニタリングは本番環境の目 — Prometheusがメトリクスを収集し、データ品質SLAが定量的な保証を提供。覚えておいてください:データ品質保証のないパイプラインは、速く走れば走るほど遠くへ間違えます。


Further Reading

ブラウザローカルツールを無料で試す →

#AI数据流水线#数据预处理#特征工程#Apache Airflow#Python#2026#AI与大数据