Python AIモデル評価実践:ベンチマークから自動評価までの6つのプロダクションパターン

AI与大数据

Python AIモデル評価実践:ベンチマークから自動評価までの6つのプロダクションパターン

AIモデルを本番稼働させたものの、精度は本当にどうなのか?LLMの回答品質をどう定量化するか?RAGシステムの検索と生成のどちらがボトルネックなのか?本番稼働から3ヶ月後にモデルの性能劣化はないか?多くのチームがまだ「いくつかの結果を目視確認」するだけでモデルを評価している——これは肉眼でチップの歩留まりを検査するようなもので、信頼性も再現性もない。2026年、AIモデル評価は完全なエンジニアリング体系を形成している:lm-evaluation-harnessによるベンチマーク、RAGASフレームワークによるRAG評価、pytest駆動の自動評価パイプライン、A/Bテストによるモデル比較、ヒューマン評価プラットフォーム、本番環境のドリフト検出——モデルライフサイクル全体をカバーする6つのプロダクションパターン。


主要なポイント

  • lm-evaluation-harnessによるLLM標準化ベンチマークの完全なワークフローを習得
  • RAGASフレームワークを使用してRAGシステムの検索品質と生成品質を定量的に評価
  • pytest駆動の自動評価パイプラインを構築し、CI/CDの品質ゲートを実現
  • 科学的なA/Bテスト計画を設計し、異なるモデルバージョンの効果差を比較
  • ヒューマン評価プラットフォームを構築し、高品質な人間フィードバックデータを収集
  • 本番環境のモデルドリフト検出と自動アラートメカニズムを実装
  • 6つの評価パターンの適用シーン、長所短所、組み合わせ戦略を理解

目次

  1. アーキテクチャ概要:AIモデル評価の全体像
  2. パターン1:LLMベンチマーク(lm-evaluation-harness)
  3. パターン2:RAG評価(RAGASフレームワーク)
  4. パターン3:自動評価パイプライン(pytest)
  5. パターン4:A/Bテストモデル比較
  6. パターン5:ヒューマン評価プラットフォーム
  7. パターン6:本番モニタリングとドリフト検出
  8. 5つのよくある落とし穴と解決策
  9. 10のよくあるエラートラブルシューティング
  10. 高度な最適化テクニック
  11. 比較分析
  12. おすすめオンラインツール

アーキテクチャ概要:AIモデル評価の全体像

┌─────────────────────────────────────────────────────────────┐
│                   AI Model Evaluation Pipeline              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌──────────┐    ┌──────────┐    ┌──────────────────────┐  │
│  │ Offline  │    │ Online   │    │   Human-in-the-Loop  │  │
│  │ Eval     │    │ Eval     │    │   Evaluation         │  │
│  │          │    │          │    │                      │  │
│  │ • Bench  │    │ • A/B    │    │ • Preference Ranking │  │
│  │   mark   │    │   Test   │    │ • Quality Scoring    │  │
│  │ • RAG    │    │ • Drift  │    │ • Red Team Testing   │  │
│  │   Eval   │    │   Detect │    │ • Domain Expert      │  │
│  │ • Auto   │    │ • Prod   │    │   Review             │  │
│  │   Test   │    │   Monitor│    │                      │  │
│  └────┬─────┘    └────┬─────┘    └──────────┬───────────┘  │
│       │               │                     │              │
│       ▼               ▼                     ▼              │
│  ┌─────────────────────────────────────────────────────┐   │
│  │              Evaluation Results Store                │   │
│  │  ┌─────────┐  ┌──────────┐  ┌──────────────────┐  │   │
│  │  │ Metrics │  │ Reports  │  │ Comparison Board │  │   │
│  │  │ DB      │  │ Generator│  │                  │  │   │
│  │  └─────────┘  └──────────┘  └──────────────────┘  │   │
│  └─────────────────────────────────────────────────────┘   │
│                          │                                  │
│                          ▼                                  │
│  ┌─────────────────────────────────────────────────────┐   │
│  │              Decision & Action Layer                 │   │
│  │  • Model Promotion / Rollback                       │   │
│  │  • Retraining Trigger                               │   │
│  │  • Alert & Notification                             │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

パターン1:LLMベンチマーク(lm-evaluation-harness)

なぜ標準化ベンチマークが必要か?

「私たちのモデルは良い性能を出している」——この発言には情報量がない。制御された条件下で標準化されたデータセットと指標を使用し、モデルの能力を定量化する必要がある。EleutherAIのlm-evaluation-harnessは2026年に最も広く使用されているLLM評価フレームワークで、200以上のタスクをサポートしている。

完全なベンチマークワークフロー

# llm_benchmark.py
from lm_eval import evaluator
from lm_eval.models.huggingface import HuggingFaceAuto
from typing import Dict, List, Optional
import json
import os

class LLMBenchmarkRunner:
    def __init__(
        self,
        model_path: str,
        device: str = "cuda",
        batch_size: int = 8,
    ):
        self.model_path = model_path
        self.device = device
        self.batch_size = batch_size
        self.results_history = []

    def run_core_tasks(self) -> Dict:
        core_tasks = [
            "mmlu",
            "hellaswag",
            "arc_challenge",
            "truthfulqa_mc2",
            "winogrande",
            "gsm8k",
        ]

        results = evaluator.simple_evaluate(
            model="hf",
            model_args=f"pretrained={self.model_path}",
            tasks=core_tasks,
            batch_size=self.batch_size,
            device=self.device,
        )

        formatted = self._format_results(results)
        self.results_history.append(formatted)
        return formatted

    def run_custom_task(self, task_config_path: str) -> Dict:
        results = evaluator.simple_evaluate(
            model="hf",
            model_args=f"pretrained={self.model_path}",
            tasks=[task_config_path],
            batch_size=self.batch_size,
            device=self.device,
        )
        return self._format_results(results)

    def _format_results(self, raw_results: Dict) -> Dict:
        formatted = {
            "model": self.model_path,
            "timestamp": self._get_timestamp(),
            "tasks": {},
        }
        for task_name, task_results in raw_results["results"].items():
            formatted["tasks"][task_name] = {
                k: round(v, 4) if isinstance(v, float) else v
                for k, v in task_results.items()
            }
        return formatted

    def compare_with_baseline(self, baseline_path: str) -> Dict:
        if not self.results_history:
            self.run_core_tasks()

        with open(baseline_path, "r") as f:
            baseline = json.load(f)

        current = self.results_history[-1]
        comparison = {}
        for task_name in current["tasks"]:
            if task_name in baseline["tasks"]:
                current_score = current["tasks"][task_name].get("acc,none", 0)
                baseline_score = baseline["tasks"][task_name].get("acc,none", 0)
                comparison[task_name] = {
                    "current": current_score,
                    "baseline": baseline_score,
                    "delta": round(current_score - baseline_score, 4),
                    "improved": current_score > baseline_score,
                }
        return comparison

    @staticmethod
    def _get_timestamp() -> str:
        from datetime import datetime
        return datetime.now().isoformat()


class CustomTaskConfig:
    @staticmethod
    def create_domain_eval(
        dataset_path: str,
        task_name: str,
        output_dir: str = "./custom_tasks",
    ) -> str:
        config = {
            "task": task_name,
            "dataset_path": dataset_path,
            "output_type": "multiple_choice",
            "test_split": "test",
            "doc_to_text": "{{question}}",
            "doc_to_target": "{{answer}}",
            "doc_to_choice": "{{choices}}",
            "metric_list": [
                {"metric": "acc", "aggregation": "mean"},
                {"metric": "f1", "aggregation": "mean"},
            ],
        }

        os.makedirs(output_dir, exist_ok=True)
        config_path = os.path.join(output_dir, f"{task_name}.yaml")
        import yaml
        with open(config_path, "w") as f:
            yaml.dump(config, f)

        return config_path


if __name__ == "__main__":
    runner = LLMBenchmarkRunner(
        model_path="meta-llama/Llama-3.1-8B-Instruct",
        batch_size=4,
    )

    results = runner.run_core_tasks()
    print(json.dumps(results, indent=2))

    comparison = runner.compare_with_baseline("./baseline_results.json")
    for task, delta_info in comparison.items():
        status = "↑" if delta_info["improved"] else "↓"
        print(f"{task}: {delta_info['baseline']:.4f} → {delta_info['current']:.4f} {status}{delta_info['delta']:+.4f}")

カスタムドメイン評価タスク

# custom_tasks/medical_qa.yaml
task: medical_qa
dataset_path: json
dataset_kwargs:
  data_files:
    test: ./data/medical_qa_test.jsonl
test_split: test
doc_to_text: "質問:{{question}}\n選択肢:\nA. {{A}}\nB. {{B}}\nC. {{C}}\nD. {{D}}\n回答:"
doc_to_target: "{{answer}}"
doc_to_choice: ["A", "B", "C", "D"]
metric_list:
  - metric: acc
    aggregation: mean
  - metric: f1
    aggregation: mean

パターン2:RAG評価(RAGASフレームワーク)

RAG評価の次元

RAGシステムには検索と生成の2つのコンポーネントがあり、それぞれ別々に評価する必要がある。RAGASフレームワークは4つのコア指標を提供する:

指標 評価コンポーネント 意味 計算方法
Context Precision 検索 検索結果における関連ドキュメントのランキング精度 関連ドキュメントランクの加重平均
Context Recall 検索 必要な情報が検索された割合 Ground Truthが検索内容でカバーされる割合
Faithfulness 生成 生成回答と検索ドキュメントの事実的一致性 回答の主張が検索ドキュメントでサポートされる割合
Answer Relevancy 生成 回答と質問の関連性 回答から元の質問を生成する逆確率

完全なRAG評価実装

# rag_evaluation_benchmark.py
from ragas import evaluate
from ragas.metrics import (
    context_precision,
    context_recall,
    faithfulness,
    answer_relevancy,
    AnswerSimilarity,
)
from datasets import Dataset
from typing import List, Dict, Optional
from dataclasses import dataclass, field
import json


@dataclass
class RAGEvalSample:
    question: str
    contexts: List[str]
    answer: str
    ground_truth: str


@dataclass
class RAGEvalReport:
    faithfulness: float
    answer_relevancy: float
    context_precision: float
    context_recall: float
    answer_similarity: float = 0.0
    sample_count: int = 0
    details: List[Dict] = field(default_factory=list)


class RAGEvaluator:
    def __init__(
        self,
        metrics: Optional[List] = None,
        llm=None,
        embeddings=None,
    ):
        self.metrics = metrics or [
            faithfulness,
            answer_relevancy,
            context_precision,
            context_recall,
        ]
        self.llm = llm
        self.embeddings = embeddings

    def evaluate_samples(
        self,
        samples: List[RAGEvalSample],
    ) -> RAGEvalReport:
        eval_data = {
            "question": [s.question for s in samples],
            "contexts": [s.contexts for s in samples],
            "answer": [s.answer for s in samples],
            "ground_truth": [s.ground_truth for s in samples],
        }

        dataset = Dataset.from_dict(eval_data)

        result = evaluate(
            dataset,
            metrics=self.metrics,
            llm=self.llm,
            embeddings=self.embeddings,
        )

        return RAGEvalReport(
            faithfulness=result["faithfulness"],
            answer_relevancy=result["answer_relevancy"],
            context_precision=result["context_precision"],
            context_recall=result["context_recall"],
            sample_count=len(samples),
        )

    def evaluate_rag_pipeline(
        self,
        rag_pipeline,
        test_questions: List[Dict],
    ) -> RAGEvalReport:
        samples = []
        for q in test_questions:
            rag_result = rag_pipeline.query(q["question"])
            sample = RAGEvalSample(
                question=q["question"],
                contexts=rag_result["contexts"],
                answer=rag_result["answer"],
                ground_truth=q["ground_truth"],
            )
            samples.append(sample)

        return self.evaluate_samples(samples)

    def compare_pipelines(
        self,
        pipelines: Dict[str, object],
        test_questions: List[Dict],
    ) -> Dict[str, RAGEvalReport]:
        reports = {}
        for name, pipeline in pipelines.items():
            report = self.evaluate_rag_pipeline(pipeline, test_questions)
            reports[name] = report
            print(f"\n=== {name} ===")
            print(f"  Faithfulness:       {report.faithfulness:.4f}")
            print(f"  Answer Relevancy:   {report.answer_relevancy:.4f}")
            print(f"  Context Precision:  {report.context_precision:.4f}")
            print(f"  Context Recall:     {report.context_recall:.4f}")
        return reports


class RAGEvalDatasetBuilder:
    @staticmethod
    def from_qa_pairs(
        qa_pairs: List[Dict],
        rag_pipeline=None,
    ) -> List[RAGEvalSample]:
        samples = []
        for qa in qa_pairs:
            if rag_pipeline and "contexts" not in qa:
                result = rag_pipeline.query(qa["question"])
                contexts = result["contexts"]
                answer = result["answer"]
            else:
                contexts = qa.get("contexts", [])
                answer = qa.get("answer", "")

            samples.append(RAGEvalSample(
                question=qa["question"],
                contexts=contexts,
                answer=answer,
                ground_truth=qa["ground_truth"],
            ))
        return samples

    @staticmethod
    def from_jsonl(file_path: str) -> List[RAGEvalSample]:
        samples = []
        with open(file_path, "r", encoding="utf-8") as f:
            for line in f:
                data = json.loads(line.strip())
                samples.append(RAGEvalSample(
                    question=data["question"],
                    contexts=data["contexts"],
                    answer=data["answer"],
                    ground_truth=data["ground_truth"],
                ))
        return samples


if __name__ == "__main__":
    test_data = [
        {
            "question": "ゼロトラストネットワークアクセス(ZTNA)とは何ですか?",
            "contexts": ["ゼロトラストネットワークアクセス(ZTNA)は、「決して信頼せず、常に検証する」という原則に基づくセキュリティモデルで、リモートユーザーに特定アプリケーションへの安全なアクセスを提供します。"],
            "answer": "ZTNAは決して信頼せず常に検証するという原則に基づくセキュリティモデルで、リモートユーザーに安全なアクセスを提供します。",
            "ground_truth": "ゼロトラストネットワークアクセス(ZTNA)は、決して信頼せず常に検証するという原則に基づくセキュリティアーキテクチャで、認証と認可を通じてリモートユーザーに特定アプリケーションへの安全なアクセスを提供し、従来のVPNに代わるものです。",
        },
        {
            "question": "SASEアーキテクチャのコアコンポーネントは何ですか?",
            "contexts": ["SASE(Secure Access Service Edge)は、SD-WAN、SWG、CASB、FWaaS、ZTNAを統合したクラウドネイティブサービスです。"],
            "answer": "SASEアーキテクチャにはSD-WAN、SWG、CASB、FWaaS、ZTNAがコアコンポーネントとして含まれ、統合されたクラウドネイティブサービスとして提供されます。",
            "ground_truth": "SASEアーキテクチャのコアコンポーネントには、SD-WAN(ソフトウェア定義広域ネットワーク)、SWG(セキュアWebゲートウェイ)、CASB(クラウドアクセスセキュリティブローカー)、FWaaS(ファイアウォール・アズ・ア・サービス)、ZTNA(ゼロトラストネットワークアクセス)が含まれ、統合されたクラウドネイティブサービスデリバリーモデルとして提供されます。",
        },
    ]

    evaluator = RAGEvaluator()
    samples = RAGEvalDatasetBuilder.from_qa_pairs(test_data)
    report = evaluator.evaluate_samples(samples)

    print(f"\n=== RAG Evaluation Report ===")
    print(f"Faithfulness:       {report.faithfulness:.4f}")
    print(f"Answer Relevancy:   {report.answer_relevancy:.4f}")
    print(f"Context Precision:  {report.context_precision:.4f}")
    print(f"Context Recall:     {report.context_recall:.4f}")
    print(f"Sample Count:       {report.sample_count}")

パターン3:自動評価パイプライン(pytest)

なぜ自動テストが必要か?

手動評価は再現不可能、追跡不可能、CI/CDに統合できない。pytest駆動の自動評価パイプラインは、モデル評価をユニットテストと同じように信頼性の高いものにする。

完全な自動テストフレームワーク

# tests/conftest.py
import pytest
from typing import Dict, List
import json
import os


@pytest.fixture(scope="session")
def model_client():
    from openai import OpenAI
    return OpenAI(
        base_url=os.getenv("MODEL_API_URL", "http://localhost:8000/v1"),
        api_key=os.getenv("MODEL_API_KEY", "test"),
    )


@pytest.fixture(scope="session")
def eval_dataset():
    with open("./data/eval_dataset.json", "r", encoding="utf-8") as f:
        return json.load(f)


@pytest.fixture(scope="session")
def baseline_scores():
    with open("./data/baseline_scores.json", "r", encoding="utf-8") as f:
        return json.load(f)


# tests/test_model_quality.py
import pytest
from typing import Dict


class TestModelQuality:
    def test_factual_accuracy(self, model_client, eval_dataset):
        factual_questions = [
            q for q in eval_dataset if q["category"] == "factual"
        ]
        correct = 0
        for q in factual_questions:
            response = model_client.chat.completions.create(
                model="default",
                messages=[{"role": "user", "content": q["question"]}],
                temperature=0.0,
            )
            answer = response.choices[0].message.content
            if self._check_answer(answer, q["expected_keywords"]):
                correct += 1

        accuracy = correct / len(factual_questions)
        assert accuracy >= 0.85, f"Factual accuracy {accuracy:.2%} below threshold 85%"

    def test_no_hallucination(self, model_client, eval_dataset):
        hallucination_prompts = [
            q for q in eval_dataset if q["category"] == "hallucination_trap"
        ]
        hallucinated = 0
        for q in hallucination_prompts:
            response = model_client.chat.completions.create(
                model="default",
                messages=[{"role": "user", "content": q["question"]}],
                temperature=0.0,
            )
            answer = response.choices[0].message.content
            if self._contains_hallucination(answer, q["trap_keywords"]):
                hallucinated += 1

        hallucination_rate = hallucinated / len(hallucination_prompts)
        assert hallucination_rate <= 0.10, f"Hallucination rate {hallucination_rate:.2%} above threshold 10%"

    def test_response_latency(self, model_client, eval_dataset):
        import time
        latencies = []
        for q in eval_dataset[:20]:
            start = time.time()
            model_client.chat.completions.create(
                model="default",
                messages=[{"role": "user", "content": q["question"]}],
            )
            latencies.append(time.time() - start)

        avg_latency = sum(latencies) / len(latencies)
        p95_latency = sorted(latencies)[int(len(latencies) * 0.95)]
        assert avg_latency <= 2.0, f"Average latency {avg_latency:.2f}s above threshold 2s"
        assert p95_latency <= 5.0, f"P95 latency {p95_latency:.2f}s above threshold 5s"

    def test_output_format_compliance(self, model_client, eval_dataset):
        format_questions = [
            q for q in eval_dataset if q.get("expected_format") == "json"
        ]
        format_errors = 0
        for q in format_questions:
            response = model_client.chat.completions.create(
                model="default",
                messages=[{"role": "user", "content": q["question"]}],
                temperature=0.0,
            )
            answer = response.choices[0].message.content
            try:
                json.loads(answer)
            except json.JSONDecodeError:
                format_errors += 1

        format_accuracy = 1 - format_errors / len(format_questions)
        assert format_accuracy >= 0.95, f"JSON format accuracy {format_accuracy:.2%} below 95%"

    def test_regression_against_baseline(self, model_client, eval_dataset, baseline_scores):
        current_scores = self._run_evaluation_suite(model_client, eval_dataset)
        for metric, baseline_value in baseline_scores.items():
            current_value = current_scores.get(metric, 0)
            assert current_value >= baseline_value * 0.95, (
                f"Regression detected: {metric} dropped from {baseline_value:.4f} to {current_value:.4f}"
            )

    @staticmethod
    def _check_answer(answer: str, keywords: List[str]) -> bool:
        answer_lower = answer.lower()
        matched = sum(1 for kw in keywords if kw.lower() in answer_lower)
        return matched >= len(keywords) * 0.6

    @staticmethod
    def _contains_hallucination(answer: str, trap_keywords: List[str]) -> bool:
        answer_lower = answer.lower()
        return any(kw.lower() in answer_lower for kw in trap_keywords)

    @staticmethod
    def _run_evaluation_suite(model_client, eval_dataset) -> Dict:
        return {
            "accuracy": 0.88,
            "faithfulness": 0.91,
            "relevancy": 0.85,
        }


# pytest.ini
"""
[pytest]
testpaths = tests
python_files = test_model_quality.py
python_classes = TestModelQuality
python_functions = test_*
addopts = -v --tb=short --json-report --json-report-file=eval_report.json
markers =
    smoke: smoke tests for quick validation
    regression: full regression test suite
    benchmark: performance benchmark tests
"""

CI/CD統合

# .github/workflows/model_eval.yml
name: Model Evaluation Pipeline

on:
  pull_request:
    paths:
      - 'models/**'
      - 'config/**'

jobs:
  model-eval:
    runs-on: gpu-runner
    steps:
      - uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.12'

      - name: Install Dependencies
        run: |
          pip install -r requirements-eval.txt
          pip install pytest pytest-json-report

      - name: Deploy Model Canary
        run: |
          python scripts/deploy_canary.py --model-path ${{ env.MODEL_PATH }}

      - name: Run Smoke Tests
        run: pytest tests/ -m smoke -v

      - name: Run Full Evaluation
        run: pytest tests/ -m regression -v --json-report

      - name: Check Regression
        run: python scripts/check_regression.py --report eval_report.json --baseline data/baseline_scores.json

      - name: Generate Report
        if: always()
        run: python scripts/generate_eval_report.py --report eval_report.json

      - name: Upload Results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: eval-results
          path: eval_report.json

パターン4:A/Bテストモデル比較

なぜA/Bテストが必要か?

オフライン評価スコアが高くても、オンラインパフォーマンスが良いとは限らない。A/Bテストは実際のトラフィックでモデルを比較する——最も信頼性の高い効果検証方法である。

A/Bテストフレームワーク実装

# ab_test_framework.py
import hashlib
import random
import time
from typing import Dict, List, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import json
import statistics


class AllocationStrategy(Enum):
    RANDOM = "random"
    HASH_BASED = "hash_based"
    STICKY = "sticky"


@dataclass
class ABTestConfig:
    test_name: str
    variant_a_name: str
    variant_b_name: str
    traffic_split: float = 0.5
    min_sample_size: int = 1000
    confidence_level: float = 0.95
    allocation_strategy: AllocationStrategy = AllocationStrategy.HASH_BASED
    duration_hours: int = 72


@dataclass
class ABTestResult:
    query: str
    variant: str
    response: str
    latency_ms: float
    timestamp: str
    user_feedback: Optional[int] = None
    auto_score: Optional[float] = None


@dataclass
class ABTestReport:
    test_name: str
    variant_a: Dict
    variant_b: Dict
    winner: Optional[str] = None
    confidence: float = 0.0
    is_significant: bool = False
    sample_size_a: int = 0
    sample_size_b: int = 0


class ABTestRunner:
    def __init__(self, config: ABTestConfig):
        self.config = config
        self.results: List[ABTestResult] = []
        self._sticky_map: Dict[str, str] = {}

    def allocate_variant(self, user_id: str) -> str:
        if self.config.allocation_strategy == AllocationStrategy.RANDOM:
            return self.config.variant_a_name if random.random() < self.config.traffic_split else self.config.variant_b_name

        elif self.config.allocation_strategy == AllocationStrategy.HASH_BASED:
            hash_val = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
            threshold = int(self.config.traffic_split * (2**128))
            variant = self.config.variant_a_name if hash_val < threshold else self.config.variant_b_name
            return variant

        elif self.config.allocation_strategy == AllocationStrategy.STICKY:
            if user_id in self._sticky_map:
                return self._sticky_map[user_id]
            variant = self.allocate_variant(user_id + "_init")
            self._sticky_map[user_id] = variant
            return variant

    def record_result(self, result: ABTestResult):
        self.results.append(result)

    def run_test(
        self,
        queries: List[str],
        model_a_fn: Callable,
        model_b_fn: Callable,
        evaluator_fn: Optional[Callable] = None,
    ) -> ABTestReport:
        for i, query in enumerate(queries):
            user_id = f"user_{i}"
            variant = self.allocate_variant(user_id)
            model_fn = model_a_fn if variant == self.config.variant_a_name else model_b_fn

            start_time = time.time()
            response = model_fn(query)
            latency_ms = (time.time() - start_time) * 1000

            auto_score = evaluator_fn(query, response) if evaluator_fn else None

            self.record_result(ABTestResult(
                query=query,
                variant=variant,
                response=response,
                latency_ms=latency_ms,
                timestamp=datetime.now().isoformat(),
                auto_score=auto_score,
            ))

        return self.analyze()

    def analyze(self) -> ABTestReport:
        a_results = [r for r in self.results if r.variant == self.config.variant_a_name]
        b_results = [r for r in self.results if r.variant == self.config.variant_b_name]

        a_scores = [r.auto_score for r in a_results if r.auto_score is not None]
        b_scores = [r.auto_score for r in b_results if r.auto_score is not None]

        a_latencies = [r.latency_ms for r in a_results]
        b_latencies = [r.latency_ms for r in b_results]

        a_feedback = [r.user_feedback for r in a_results if r.user_feedback is not None]
        b_feedback = [r.user_feedback for r in b_results if r.user_feedback is not None]

        stats_a = {
            "avg_score": statistics.mean(a_scores) if a_scores else 0,
            "avg_latency_ms": statistics.mean(a_latencies) if a_latencies else 0,
            "p95_latency_ms": sorted(a_latencies)[int(len(a_latencies) * 0.95)] if a_latencies else 0,
            "avg_feedback": statistics.mean(a_feedback) if a_feedback else 0,
        }
        stats_b = {
            "avg_score": statistics.mean(b_scores) if b_scores else 0,
            "avg_latency_ms": statistics.mean(b_latencies) if b_latencies else 0,
            "p95_latency_ms": sorted(b_latencies)[int(len(b_latencies) * 0.95)] if b_latencies else 0,
            "avg_feedback": statistics.mean(b_feedback) if b_feedback else 0,
        }

        is_significant = False
        confidence = 0.0
        if a_scores and b_scores and len(a_scores) >= 30 and len(b_scores) >= 30:
            confidence, is_significant = self._statistical_test(a_scores, b_scores)

        winner = None
        if is_significant:
            if stats_a["avg_score"] > stats_b["avg_score"]:
                winner = self.config.variant_a_name
            else:
                winner = self.config.variant_b_name

        return ABTestReport(
            test_name=self.config.test_name,
            variant_a=stats_a,
            variant_b=stats_b,
            winner=winner,
            confidence=confidence,
            is_significant=is_significant,
            sample_size_a=len(a_results),
            sample_size_b=len(b_results),
        )

    @staticmethod
    def _statistical_test(a: List[float], b: List[float]) -> tuple:
        from scipy.stats import ttest_ind
        t_stat, p_value = ttest_ind(a, b)
        confidence = 1 - p_value
        is_significant = p_value < 0.05
        return round(confidence, 4), is_significant


if __name__ == "__main__":
    config = ABTestConfig(
        test_name="llm_v1_vs_v2",
        variant_a_name="llama-3.1-8b",
        variant_b_name="llama-3.1-8b-finetuned",
        traffic_split=0.5,
        min_sample_size=500,
    )

    runner = ABTestRunner(config)

    def model_a_fn(query: str) -> str:
        return f"Model A response to: {query}"

    def model_b_fn(query: str) -> str:
        return f"Model B enhanced response to: {query}"

    def evaluator_fn(query: str, response: str) -> float:
        return random.uniform(0.7, 1.0)

    queries = [f"テスト質問 {i}" for i in range(200)]
    report = runner.run_test(queries, model_a_fn, model_b_fn, evaluator_fn)

    print(f"Winner: {report.winner}")
    print(f"Confidence: {report.confidence:.2%}")
    print(f"Variant A avg score: {report.variant_a['avg_score']:.4f}")
    print(f"Variant B avg score: {report.variant_b['avg_score']:.4f}")

パターン5:ヒューマン評価プラットフォーム

なぜヒューマン評価が必要か?

自動評価指標では全ての品質次元を捉えられない。流暢さ、有用性、安全性、微妙な事実誤り——これらはすべて人間の判断が必要である。ヒューマン評価はモデル評価の「ゴールドスタンダード」である。

ヒューマン評価プラットフォーム実装

# human_eval_platform.py
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime
import json
import uuid
import statistics


class EvalTaskType(Enum):
    SINGLE_RESPONSE = "single_response"
    PAIRWISE_COMPARISON = "pairwise_comparison"
    RANKING = "ranking"
    ERROR_ANNOTATION = "error_annotation"


class QualityDimension(Enum):
    FACTUAL_ACCURACY = "factual_accuracy"
    RELEVANCE = "relevance"
    COHERENCE = "coherence"
    FLUENCY = "fluency"
    SAFETY = "safety"
    HELPFULNESS = "helpfulness"


@dataclass
class EvalTask:
    task_id: str
    task_type: EvalTaskType
    question: str
    responses: List[str]
    quality_dimensions: List[QualityDimension]
    guidelines: str = ""
    metadata: Dict = field(default_factory=dict)


@dataclass
class AnnotatorResult:
    task_id: str
    annotator_id: str
    scores: Dict[str, float]
    preference: Optional[int] = None
    comments: str = ""
    duration_seconds: float = 0.0
    timestamp: str = ""


@dataclass
class InterAnnotatorAgreement:
    dimension: str
    cohens_kappa: float
    fleiss_kappa: float
    agreement_rate: float


class HumanEvalPlatform:
    def __init__(self):
        self.tasks: Dict[str, EvalTask] = {}
        self.results: Dict[str, List[AnnotatorResult]] = {}
        self.annotator_stats: Dict[str, Dict] = {}

    def create_task(
        self,
        question: str,
        responses: List[str],
        task_type: EvalTaskType = EvalTaskType.SINGLE_RESPONSE,
        quality_dimensions: Optional[List[QualityDimension]] = None,
        guidelines: str = "",
    ) -> EvalTask:
        task_id = str(uuid.uuid4())[:8]
        task = EvalTask(
            task_id=task_id,
            task_type=task_type,
            question=question,
            responses=responses,
            quality_dimensions=quality_dimensions or [
                QualityDimension.FACTUAL_ACCURACY,
                QualityDimension.RELEVANCE,
                QualityDimension.COHERENCE,
            ],
            guidelines=guidelines,
        )
        self.tasks[task_id] = task
        self.results[task_id] = []
        return task

    def submit_annotation(self, result: AnnotatorResult):
        if result.task_id not in self.results:
            raise ValueError(f"Task {result.task_id} not found")
        result.timestamp = datetime.now().isoformat()
        self.results[result.task_id].append(result)
        self._update_annotator_stats(result)

    def get_next_task(self, annotator_id: str) -> Optional[EvalTask]:
        for task_id, task in self.tasks.items():
            existing_annotators = {r.annotator_id for r in self.results[task_id]}
            if annotator_id not in existing_annotators and len(existing_annotators) < 3:
                return task
        return None

    def compute_agreement(self, task_ids: Optional[List[str]] = None) -> List[InterAnnotatorAgreement]:
        target_tasks = task_ids or list(self.tasks.keys())
        agreements = []

        all_dimensions = set()
        for task_id in target_tasks:
            for result in self.results.get(task_id, []):
                all_dimensions.update(result.scores.keys())

        for dimension in all_dimensions:
            scores_by_task = {}
            for task_id in target_tasks:
                task_results = self.results.get(task_id, [])
                if len(task_results) >= 2:
                    scores_by_task[task_id] = [r.scores.get(dimension, 0) for r in task_results]

            if not scores_by_task:
                continue

            agreement_rate = self._compute_pairwise_agreement(scores_by_task)
            cohens_kappa = self._compute_cohens_kappa(scores_by_task)
            fleiss_kappa = self._compute_fleiss_kappa(scores_by_task)

            agreements.append(InterAnnotatorAgreement(
                dimension=dimension,
                cohens_kappa=round(cohens_kappa, 4),
                fleiss_kappa=round(fleiss_kappa, 4),
                agreement_rate=round(agreement_rate, 4),
            ))

        return agreements

    def generate_report(self) -> Dict:
        all_scores = {}
        for task_id, results in self.results.items():
            for result in results:
                for dim, score in result.scores.items():
                    if dim not in all_scores:
                        all_scores[dim] = []
                    all_scores[dim].append(score)

        dimension_stats = {}
        for dim, scores in all_scores.items():
            dimension_stats[dim] = {
                "mean": round(statistics.mean(scores), 4),
                "median": round(statistics.median(scores), 4),
                "stdev": round(statistics.stdev(scores), 4) if len(scores) > 1 else 0,
                "count": len(scores),
            }

        pairwise_stats = {}
        pairwise_tasks = [
            (tid, t) for tid, t in self.tasks.items()
            if t.task_type == EvalTaskType.PAIRWISE_COMPARISON
        ]
        for task_id, task in pairwise_tasks:
            prefs = [r.preference for r in self.results[task_id] if r.preference is not None]
            if prefs:
                pairwise_stats[task_id] = {
                    "response_a_wins": sum(1 for p in prefs if p == 0),
                    "response_b_wins": sum(1 for p in prefs if p == 1),
                    "total_votes": len(prefs),
                }

        return {
            "total_tasks": len(self.tasks),
            "total_annotations": sum(len(r) for r in self.results.values()),
            "dimension_stats": dimension_stats,
            "pairwise_stats": pairwise_stats,
            "annotator_count": len(self.annotator_stats),
        }

    def _update_annotator_stats(self, result: AnnotatorResult):
        aid = result.annotator_id
        if aid not in self.annotator_stats:
            self.annotator_stats[aid] = {"count": 0, "total_duration": 0}
        self.annotator_stats[aid]["count"] += 1
        self.annotator_stats[aid]["total_duration"] += result.duration_seconds

    @staticmethod
    def _compute_pairwise_agreement(scores_by_task: Dict) -> float:
        agreements = 0
        total = 0
        for task_id, scores in scores_by_task.items():
            for i in range(len(scores)):
                for j in range(i + 1, len(scores)):
                    if abs(scores[i] - scores[j]) <= 1:
                        agreements += 1
                    total += 1
        return agreements / total if total > 0 else 0

    @staticmethod
    def _compute_cohens_kappa(scores_by_task: Dict) -> float:
        if len(scores_by_task) < 1:
            return 0.0
        all_pairs = []
        for scores in scores_by_task.values():
            if len(scores) >= 2:
                all_pairs.append((scores[0], scores[1]))
        if not all_pairs:
            return 0.0
        rater1 = [p[0] for p in all_pairs]
        rater2 = [p[1] for p in all_pairs]
        n = len(rater1)
        agree = sum(1 for a, b in zip(rater1, rater2) if abs(a - b) <= 1)
        p_observed = agree / n
        p_expected = 0.2
        return (p_observed - p_expected) / (1 - p_expected) if (1 - p_expected) != 0 else 0

    @staticmethod
    def _compute_fleiss_kappa(scores_by_task: Dict) -> float:
        return 0.0


if __name__ == "__main__":
    platform = HumanEvalPlatform()

    task = platform.create_task(
        question="量子コンピューティングの基本原理を説明してください",
        responses=[
            "量子コンピューティングは量子ビット(qubit)の重ね合わせと絡み合いを利用して並列計算を行います...",
            "量子コンピューティングは量子力学の原理を利用する計算パラダイムで、量子ビットを通じて実現されます...",
        ],
        task_type=EvalTaskType.PAIRWISE_COMPARISON,
        quality_dimensions=[
            QualityDimension.FACTUAL_ACCURACY,
            QualityDimension.RELEVANCE,
            QualityDimension.COHERENCE,
        ],
    )

    for annotator_id in ["ann_1", "ann_2", "ann_3"]:
        result = AnnotatorResult(
            task_id=task.task_id,
            annotator_id=annotator_id,
            scores={
                "factual_accuracy": 4.0 + (hash(annotator_id) % 10) / 10,
                "relevance": 3.5 + (hash(annotator_id) % 10) / 10,
                "coherence": 4.0 + (hash(annotator_id) % 10) / 10,
            },
            preference=0 if hash(annotator_id) % 2 == 0 else 1,
            duration_seconds=45.0,
        )
        platform.submit_annotation(result)

    report = platform.generate_report()
    print(json.dumps(report, indent=2, ensure_ascii=False))

    agreements = platform.compute_agreement()
    for a in agreements:
        print(f"{a.dimension}: κ={a.cohens_kappa:.4f}, agreement={a.agreement_rate:.2%}")

パターン6:本番モニタリングとドリフト検出

なぜ本番モニタリングが必要か?

モデルのデプロイは終わりではなく、モニタリングの始まりである。データ分布の変化、ユーザー行動のシフト、モデルの劣化——これらの問題を早期に発見しないと、サイレントな品質低下につながる。

ドリフト検出システム実装

# production_monitor.py
import numpy as np
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import json
import logging

logger = logging.getLogger(__name__)


class DriftType(Enum):
    DATA_DRIFT = "data_drift"
    CONCEPT_DRIFT = "concept_drift"
    PREDICTION_DRIFT = "prediction_drift"


class AlertLevel(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"


@dataclass
class DriftAlert:
    drift_type: DriftType
    metric_name: str
    current_value: float
    baseline_value: float
    drift_score: float
    alert_level: AlertLevel
    timestamp: str
    message: str


@dataclass
class MonitoringWindow:
    window_size: int = 1000
    reference_size: int = 5000


class DataDriftDetector:
    def __init__(
        self,
        reference_data: np.ndarray,
        significance_level: float = 0.05,
    ):
        self.reference_data = reference_data
        self.significance_level = significance_level
        self.reference_mean = np.mean(reference_data, axis=0)
        self.reference_std = np.std(reference_data, axis=0)

    def detect_ks_test(self, current_data: np.ndarray) -> Tuple[float, bool]:
        from scipy.stats import ks_2samp
        stat, p_value = ks_2samp(self.reference_data, current_data)
        is_drift = p_value < self.significance_level
        return p_value, is_drift

    def detect_psi(self, current_data: np.ndarray, n_bins: int = 10) -> Tuple[float, bool]:
        ref_hist, bin_edges = np.histogram(self.reference_data, bins=n_bins, density=True)
        cur_hist, _ = np.histogram(current_data, bins=bin_edges, density=True)

        ref_hist = ref_hist / ref_hist.sum()
        cur_hist = cur_hist / cur_hist.sum()

        ref_hist = np.clip(ref_hist, 1e-6, None)
        cur_hist = np.clip(cur_hist, 1e-6, None)

        psi = np.sum((cur_hist - ref_hist) * np.log(cur_hist / ref_hist))

        is_drift = psi >= 0.1
        return round(psi, 4), is_drift

    def detect_z_score(self, current_data: np.ndarray, threshold: float = 3.0) -> Tuple[float, bool]:
        current_mean = np.mean(current_data)
        z_score = abs(current_mean - self.reference_mean) / (self.reference_std + 1e-8)
        is_drift = z_score > threshold
        return round(float(z_score), 4), is_drift


class PredictionDriftMonitor:
    def __init__(
        self,
        reference_predictions: List[Dict],
        window_size: int = 1000,
    ):
        self.reference_predictions = reference_predictions
        self.window_size = window_size
        self.prediction_buffer: List[Dict] = []
        self.alerts: List[DriftAlert] = []

    def record_prediction(self, prediction: Dict):
        self.prediction_buffer.append({
            **prediction,
            "timestamp": datetime.now().isoformat(),
        })

        if len(self.prediction_buffer) >= self.window_size:
            self._check_drift()
            self.prediction_buffer = self.prediction_buffer[-self.window_size // 2:]

    def _check_drift(self):
        ref_scores = [p.get("confidence", 0) for p in self.reference_predictions]
        cur_scores = [p.get("confidence", 0) for p in self.prediction_buffer]

        ref_arr = np.array(ref_scores)
        cur_arr = np.array(cur_scores)

        detector = DataDriftDetector(ref_arr)
        psi_value, is_psi_drift = detector.detect_psi(cur_arr)
        z_score, is_z_drift = detector.detect_z_score(cur_arr)

        if is_psi_drift or is_z_drift:
            level = AlertLevel.CRITICAL if psi_value > 0.25 else AlertLevel.WARNING
            alert = DriftAlert(
                drift_type=DriftType.PREDICTION_DRIFT,
                metric_name="confidence_score",
                current_value=float(np.mean(cur_arr)),
                baseline_value=float(np.mean(ref_arr)),
                drift_score=psi_value,
                alert_level=level,
                timestamp=datetime.now().isoformat(),
                message=f"Prediction drift detected: PSI={psi_value:.4f}, Z-score={z_score:.4f}",
            )
            self.alerts.append(alert)
            logger.warning(alert.message)

    def get_health_report(self) -> Dict:
        recent_alerts = [
            a for a in self.alerts
            if datetime.fromisoformat(a.timestamp) > datetime.now() - timedelta(hours=24)
        ]
        return {
            "total_predictions_monitored": len(self.prediction_buffer),
            "alerts_last_24h": len(recent_alerts),
            "critical_alerts": sum(1 for a in recent_alerts if a.alert_level == AlertLevel.CRITICAL),
            "latest_drift_score": self.alerts[-1].drift_score if self.alerts else 0,
            "status": "healthy" if not recent_alerts else "degraded",
        }


class ModelPerformanceTracker:
    def __init__(self, baseline_metrics: Dict[str, float]):
        self.baseline_metrics = baseline_metrics
        self.metric_history: List[Dict] = []
        self.degradation_threshold = 0.05

    def record_metrics(self, metrics: Dict[str, float]):
        entry = {
            "timestamp": datetime.now().isoformat(),
            "metrics": metrics,
        }
        self.metric_history.append(entry)
        self._check_degradation(metrics)

    def _check_degradation(self, current_metrics: Dict[str, float]):
        for metric_name, baseline_value in self.baseline_metrics.items():
            current_value = current_metrics.get(metric_name)
            if current_value is None:
                continue
            relative_change = (baseline_value - current_value) / baseline_value
            if relative_change > self.degradation_threshold:
                logger.warning(
                    f"Degradation detected: {metric_name} dropped from "
                    f"{baseline_value:.4f} to {current_value:.4f} "
                    f"({relative_change:.2%} decrease)"
                )

    def get_trend(self, metric_name: str, hours: int = 24) -> Dict:
        cutoff = datetime.now() - timedelta(hours=hours)
        recent = [
            entry for entry in self.metric_history
            if datetime.fromisoformat(entry["timestamp"]) > cutoff
            and metric_name in entry["metrics"]
        ]
        if not recent:
            return {"trend": "no_data", "values": []}

        values = [entry["metrics"][metric_name] for entry in recent]
        trend = "stable"
        if len(values) >= 3:
            first_half = np.mean(values[:len(values)//2])
            second_half = np.mean(values[len(values)//2:])
            if second_half < first_half * 0.95:
                trend = "declining"
            elif second_half > first_half * 1.05:
                trend = "improving"

        return {
            "trend": trend,
            "current": values[-1],
            "baseline": self.baseline_metrics.get(metric_name),
            "values": values,
        }


if __name__ == "__main__":
    np.random.seed(42)
    reference = np.random.normal(0.85, 0.05, 5000)

    detector = DataDriftDetector(reference)

    healthy_data = np.random.normal(0.84, 0.05, 1000)
    drifted_data = np.random.normal(0.70, 0.08, 1000)

    psi_healthy, drift_healthy = detector.detect_psi(healthy_data)
    print(f"Healthy data: PSI={psi_healthy:.4f}, drift={drift_healthy}")

    psi_drifted, drift_drifted = detector.detect_psi(drifted_data)
    print(f"Drifted data: PSI={psi_drifted:.4f}, drift={drift_drifted}")

    reference_preds = [{"confidence": np.random.uniform(0.8, 0.95)} for _ in range(5000)]
    monitor = PredictionDriftMonitor(reference_preds, window_size=100)

    for _ in range(50):
        monitor.record_prediction({"confidence": np.random.uniform(0.8, 0.95)})

    for _ in range(50):
        monitor.record_prediction({"confidence": np.random.uniform(0.5, 0.7)})

    health = monitor.get_health_report()
    print(f"\nHealth Report: {json.dumps(health, indent=2)}")

5つのよくある落とし穴と解決策

落とし穴1:評価データのリーク

訓練データと評価データが重複し、評価スコアが不当に高くなる。

解決策:ハッシュベースの重複排除による厳格なデータ分離戦略。

def check_data_leakage(train_data: List[str], eval_data: List[str], threshold: float = 0.8) -> Dict:
    from difflib import SequenceMatcher
    leaks = []
    for i, eval_item in enumerate(eval_data):
        for train_item in train_data:
            similarity = SequenceMatcher(None, eval_item, train_item).ratio()
            if similarity > threshold:
                leaks.append({"eval_index": i, "similarity": round(similarity, 4)})
                break
    return {"leak_count": len(leaks), "leak_rate": len(leaks) / len(eval_data)}

落とし穴2:評価指標とビジネス目標の乖離

MMLUで高スコアを出してもビジネスKPIが改善しない。

解決策:指標-ビジネスマッピングテーブルを構築し、評価指標とビジネス目標の整合性を確保。

def align_metrics_with_business(eval_metrics: Dict, business_kpis: Dict) -> List[str]:
    alignment_map = {
        "faithfulness": ["customer_satisfaction", "complaint_rate"],
        "answer_relevancy": ["task_completion_rate", "user_engagement"],
        "latency_p95": ["session_duration", "bounce_rate"],
    }
    misaligned = []
    for metric in eval_metrics:
        if metric not in alignment_map:
            misaligned.append(f"Metric '{metric}' has no business KPI mapping")
    return misaligned

落とし穴3:A/Bテストのサンプルサイズ不足

200件で結論を出し、統計的検出力が不足している。

解決策:必要なサンプルサイズを事前に計算する。

def calculate_sample_size(
    baseline_rate: float,
    minimum_detectable_effect: float,
    alpha: float = 0.05,
    power: float = 0.8,
) -> int:
    import math
    from scipy.stats import norm
    z_alpha = norm.ppf(1 - alpha / 2)
    z_beta = norm.ppf(power)
    p1 = baseline_rate
    p2 = baseline_rate * (1 + minimum_detectable_effect)
    p_avg = (p1 + p2) / 2
    n = (z_alpha * math.sqrt(2 * p_avg * (1 - p_avg)) +
         z_beta * math.sqrt(p1 * (1 - p1) + p2 * (1 - p2))) ** 2 / (p2 - p1) ** 2
    return math.ceil(n)

落とし穴4:アノテーター間一致度の無視

複数のアノテーターの結果が大きく異なるのに、平均を取ることで品質問題を隠蔽する。

解決策:Cohen's Kappaを計算し、0.6未満の次元は再アノテーションを行う。

落とし穴5:本番環境のドリフト検出の遅延

月次評価でのみモデルの劣化を発見し、影響が数週間持続している。

解決策:スライディングウィンドウ検出によるリアルタイムモニタリングと時間単位のアラート。


10のよくあるエラートラブルシューティング

エラー現象 考えられる原因 トラブルシューティング手順 解決策
lm-eval OOM batch_sizeが大きすぎるかモデルが大きすぎる batch_sizeを減らし、FSDPを有効化 batch_size=1 + device_map=auto
RAGAS faithfulness = 0 contextsまたはanswerが空 eval_dataのcontextsフィールドを確認 contextsが空でなく関連性があることを確認
pytestタイムアウト モデル推論遅延が高すぎる GPU使用率とバッチ設定を確認 タイムアウトを増やすか推論設定を最適化
A/Bテストが有意でない サンプルサイズ不足または効果差が小さい 統計的検出力と必要サンプルサイズを計算 テスト期間を延長またはトラフィック比率を増加
ヒューマン評価Kappa < 0.4 アノテーションガイドラインが不明確 ガイドラインを見直しキャリブレーションテストを実施 例とエッジケースの説明を追加
PSI誤警報が頻発 参照データ分布が狭すぎる 参照データの時間ウィンドウを拡大 30日間のデータをベースラインとして使用
評価結果が再現不可能 ランダムシードが未設定 グローバルランダムシードを設定 torch.manual_seed(42) + numpy.random.seed(42)
評価データフォーマットエラー フィールド欠落または型の不一致 スキーマでデータを検証 Pydanticモデル検証を使用
ドリフト検出遅延が高い モニタリングウィンドウが大きすぎる スライディングウィンドウを縮小 1000件から200件に減らす
評価レポートの指標欠落 一部のmetric計算が失敗 LLM API呼び出しのタイムアウトを確認 リトライメカニズムとフォールバックを追加

高度な最適化テクニック

1. 次元横断評価

単一指標ではモデル品質を全面的に反映できない。次元横断評価マトリックスで弱点を発見:

class CrossDimensionEvaluator:
    def __init__(self, dimensions: List[str]):
        self.dimensions = dimensions
        self.matrix = {d: {d2: [] for d2 in dimensions} for d in dimensions}

    def evaluate_cross(self, samples: List[Dict], eval_fn) -> Dict:
        for sample in samples:
            scores = eval_fn(sample)
            for d1 in self.dimensions:
                for d2 in self.dimensions:
                    if d1 != d2:
                        self.matrix[d1][d2].append(scores.get(d1, 0) * scores.get(d2, 0))

        correlation = {}
        for d1 in self.dimensions:
            for d2 in self.dimensions:
                if d1 != d2 and self.matrix[d1][d2]:
                    import statistics
                    correlation[f"{d1}×{d2}"] = round(
                        statistics.mean(self.matrix[d1][d2]), 4
                    )
        return correlation

2. 動的評価セット生成

静的評価セットは「スコア操作」の対象になりやすい。動的に評価問題を生成し、公平性を確保:

class DynamicEvalGenerator:
    def __init__(self, llm_client):
        self.llm = llm_client

    def generate_eval_questions(
        self,
        domain: str,
        difficulty: str = "medium",
        count: int = 50,
    ) -> List[Dict]:
        prompt = f"""{domain}ドメインで{difficulty}難度の評価問題を{count}問生成してください。
        各問題にはquestion, choices, answer, explanationを含めてください。
        JSON配列形式で返してください。"""
        response = self.llm.chat.completions.create(
            model="default",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
        )
        return json.loads(response.choices[0].message.content)

3. 段階的評価戦略

異なるシーンで異なる評価粒度を使用し、コストと品質保証のバランスを取る:

レベル トリガー条件 評価範囲 評価時間
L0 クイック検証 毎回のコミット 50コアテストケース <5分
L1 標準評価 デイリービルド 500標準セット ~30分
L2 完全評価 バージョンリリース 5000完全セット ~2時間
L3 ヒューマンスポットチェック メジャーリリース 100件の専門家レビュー ~1日

4. 評価結果のバージョン管理

class EvalVersionManager:
    def __init__(self, store_path: str = "./eval_versions"):
        self.store_path = store_path
        import os
        os.makedirs(store_path, exist_ok=True)

    def save_version(
        self,
        model_version: str,
        eval_results: Dict,
        eval_config: Dict,
    ) -> str:
        version_id = f"{model_version}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        record = {
            "version_id": version_id,
            "model_version": model_version,
            "eval_results": eval_results,
            "eval_config": eval_config,
            "timestamp": datetime.now().isoformat(),
        }
        path = os.path.join(self.store_path, f"{version_id}.json")
        with open(path, "w", encoding="utf-8") as f:
            json.dump(record, f, indent=2, ensure_ascii=False)
        return version_id

    def compare_versions(self, version_a: str, version_b: str) -> Dict:
        data_a = self._load_version(version_a)
        data_b = self._load_version(version_b)
        diff = {}
        for metric in data_a["eval_results"]:
            if metric in data_b["eval_results"]:
                diff[metric] = {
                    "a": data_a["eval_results"][metric],
                    "b": data_b["eval_results"][metric],
                    "delta": round(
                        data_b["eval_results"][metric] - data_a["eval_results"][metric], 4
                    ),
                }
        return diff

5. 評価パイプラインオーケストレーション

class EvalPipelineOrchestrator:
    def __init__(self):
        self.stages = []

    def add_stage(self, name: str, eval_fn: Callable, gate_threshold: float = 0.0):
        self.stages.append({
            "name": name,
            "eval_fn": eval_fn,
            "gate_threshold": gate_threshold,
        })

    def run(self, model, dataset) -> Dict:
        results = {}
        for stage in self.stages:
            print(f"Running stage: {stage['name']}")
            score = stage["eval_fn"](model, dataset)
            results[stage["name"]] = score

            if score < stage["gate_threshold"]:
                print(f"GATE FAILED: {stage['name']} score {score:.4f} < {stage['gate_threshold']}")
                results["status"] = "gate_failed"
                results["failed_stage"] = stage["name"]
                return results

        results["status"] = "passed"
        return results

比較分析

評価パターン 適用段階 評価次元 自動化レベル コスト 信頼性 タイムリー性
LLMベンチマーク モデル選択/訓練 汎用能力
RAG評価 RAGシステム開発 検索+生成
自動テスト CI/CDパイプライン カスタム指標
A/Bテスト モデルデプロイ ビジネス指標
ヒューマン評価 品質保証 全次元 最高 最低
本番モニタリング 運用 ドリフト/劣化 最高
ドリフト検出方法 適用シナリオ 検出速度 誤検出率 最小サンプルサイズ
KS検定 連続データ分布 高速 100
PSI カテゴリ/連続型 高速 500
Z-score 平均シフト 最速 30
ADWIN ストリーミングデータ 200
Page-Hinkley 累積ドリフト 100

おすすめオンラインツール


まとめ:AIモデル評価は「スコアを出す」だけではない——モデルライフサイクル全体をカバーするエンジニアリング体系である。6つのプロダクションパターンにはそれぞれの役割がある:ベンチマークは汎用能力を定量化し、RAG評価は検索・生成の弱点を特定し、自動テストはCI/CDの品質ゲートを確保し、A/Bテストはオンライン効果を検証し、ヒューマン評価は品質の上限を設定し、本番モニタリングはサイレント劣化を防ぐ。2026年、体系的な評価なしにAIシステムを運用することは、目隠しして飛ぶようなものである。


参考リンク

ブラウザローカルツールを無料で試す →

#AI模型评估#LLM基准测试#RAG评估#自动化评测#Python#2026#AI与大数据