Python AIモデル評価実践:ベンチマークから自動評価までの6つのプロダクションパターン
Python AIモデル評価実践:ベンチマークから自動評価までの6つのプロダクションパターン
AIモデルを本番稼働させたものの、精度は本当にどうなのか?LLMの回答品質をどう定量化するか?RAGシステムの検索と生成のどちらがボトルネックなのか?本番稼働から3ヶ月後にモデルの性能劣化はないか?多くのチームがまだ「いくつかの結果を目視確認」するだけでモデルを評価している——これは肉眼でチップの歩留まりを検査するようなもので、信頼性も再現性もない。2026年、AIモデル評価は完全なエンジニアリング体系を形成している:lm-evaluation-harnessによるベンチマーク、RAGASフレームワークによるRAG評価、pytest駆動の自動評価パイプライン、A/Bテストによるモデル比較、ヒューマン評価プラットフォーム、本番環境のドリフト検出——モデルライフサイクル全体をカバーする6つのプロダクションパターン。
主要なポイント
- lm-evaluation-harnessによるLLM標準化ベンチマークの完全なワークフローを習得
- RAGASフレームワークを使用してRAGシステムの検索品質と生成品質を定量的に評価
- pytest駆動の自動評価パイプラインを構築し、CI/CDの品質ゲートを実現
- 科学的なA/Bテスト計画を設計し、異なるモデルバージョンの効果差を比較
- ヒューマン評価プラットフォームを構築し、高品質な人間フィードバックデータを収集
- 本番環境のモデルドリフト検出と自動アラートメカニズムを実装
- 6つの評価パターンの適用シーン、長所短所、組み合わせ戦略を理解
目次
- アーキテクチャ概要:AIモデル評価の全体像
- パターン1:LLMベンチマーク(lm-evaluation-harness)
- パターン2:RAG評価(RAGASフレームワーク)
- パターン3:自動評価パイプライン(pytest)
- パターン4:A/Bテストモデル比較
- パターン5:ヒューマン評価プラットフォーム
- パターン6:本番モニタリングとドリフト検出
- 5つのよくある落とし穴と解決策
- 10のよくあるエラートラブルシューティング
- 高度な最適化テクニック
- 比較分析
- おすすめオンラインツール
アーキテクチャ概要:AIモデル評価の全体像
┌─────────────────────────────────────────────────────────────┐
│ AI Model Evaluation Pipeline │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────────────────┐ │
│ │ Offline │ │ Online │ │ Human-in-the-Loop │ │
│ │ Eval │ │ Eval │ │ Evaluation │ │
│ │ │ │ │ │ │ │
│ │ • Bench │ │ • A/B │ │ • Preference Ranking │ │
│ │ mark │ │ Test │ │ • Quality Scoring │ │
│ │ • RAG │ │ • Drift │ │ • Red Team Testing │ │
│ │ Eval │ │ Detect │ │ • Domain Expert │ │
│ │ • Auto │ │ • Prod │ │ Review │ │
│ │ Test │ │ Monitor│ │ │ │
│ └────┬─────┘ └────┬─────┘ └──────────┬───────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Evaluation Results Store │ │
│ │ ┌─────────┐ ┌──────────┐ ┌──────────────────┐ │ │
│ │ │ Metrics │ │ Reports │ │ Comparison Board │ │ │
│ │ │ DB │ │ Generator│ │ │ │ │
│ │ └─────────┘ └──────────┘ └──────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Decision & Action Layer │ │
│ │ • Model Promotion / Rollback │ │
│ │ • Retraining Trigger │ │
│ │ • Alert & Notification │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
パターン1:LLMベンチマーク(lm-evaluation-harness)
なぜ標準化ベンチマークが必要か?
「私たちのモデルは良い性能を出している」——この発言には情報量がない。制御された条件下で標準化されたデータセットと指標を使用し、モデルの能力を定量化する必要がある。EleutherAIのlm-evaluation-harnessは2026年に最も広く使用されているLLM評価フレームワークで、200以上のタスクをサポートしている。
完全なベンチマークワークフロー
# llm_benchmark.py
from lm_eval import evaluator
from lm_eval.models.huggingface import HuggingFaceAuto
from typing import Dict, List, Optional
import json
import os
class LLMBenchmarkRunner:
def __init__(
self,
model_path: str,
device: str = "cuda",
batch_size: int = 8,
):
self.model_path = model_path
self.device = device
self.batch_size = batch_size
self.results_history = []
def run_core_tasks(self) -> Dict:
core_tasks = [
"mmlu",
"hellaswag",
"arc_challenge",
"truthfulqa_mc2",
"winogrande",
"gsm8k",
]
results = evaluator.simple_evaluate(
model="hf",
model_args=f"pretrained={self.model_path}",
tasks=core_tasks,
batch_size=self.batch_size,
device=self.device,
)
formatted = self._format_results(results)
self.results_history.append(formatted)
return formatted
def run_custom_task(self, task_config_path: str) -> Dict:
results = evaluator.simple_evaluate(
model="hf",
model_args=f"pretrained={self.model_path}",
tasks=[task_config_path],
batch_size=self.batch_size,
device=self.device,
)
return self._format_results(results)
def _format_results(self, raw_results: Dict) -> Dict:
formatted = {
"model": self.model_path,
"timestamp": self._get_timestamp(),
"tasks": {},
}
for task_name, task_results in raw_results["results"].items():
formatted["tasks"][task_name] = {
k: round(v, 4) if isinstance(v, float) else v
for k, v in task_results.items()
}
return formatted
def compare_with_baseline(self, baseline_path: str) -> Dict:
if not self.results_history:
self.run_core_tasks()
with open(baseline_path, "r") as f:
baseline = json.load(f)
current = self.results_history[-1]
comparison = {}
for task_name in current["tasks"]:
if task_name in baseline["tasks"]:
current_score = current["tasks"][task_name].get("acc,none", 0)
baseline_score = baseline["tasks"][task_name].get("acc,none", 0)
comparison[task_name] = {
"current": current_score,
"baseline": baseline_score,
"delta": round(current_score - baseline_score, 4),
"improved": current_score > baseline_score,
}
return comparison
@staticmethod
def _get_timestamp() -> str:
from datetime import datetime
return datetime.now().isoformat()
class CustomTaskConfig:
@staticmethod
def create_domain_eval(
dataset_path: str,
task_name: str,
output_dir: str = "./custom_tasks",
) -> str:
config = {
"task": task_name,
"dataset_path": dataset_path,
"output_type": "multiple_choice",
"test_split": "test",
"doc_to_text": "{{question}}",
"doc_to_target": "{{answer}}",
"doc_to_choice": "{{choices}}",
"metric_list": [
{"metric": "acc", "aggregation": "mean"},
{"metric": "f1", "aggregation": "mean"},
],
}
os.makedirs(output_dir, exist_ok=True)
config_path = os.path.join(output_dir, f"{task_name}.yaml")
import yaml
with open(config_path, "w") as f:
yaml.dump(config, f)
return config_path
if __name__ == "__main__":
runner = LLMBenchmarkRunner(
model_path="meta-llama/Llama-3.1-8B-Instruct",
batch_size=4,
)
results = runner.run_core_tasks()
print(json.dumps(results, indent=2))
comparison = runner.compare_with_baseline("./baseline_results.json")
for task, delta_info in comparison.items():
status = "↑" if delta_info["improved"] else "↓"
print(f"{task}: {delta_info['baseline']:.4f} → {delta_info['current']:.4f} {status}{delta_info['delta']:+.4f}")
カスタムドメイン評価タスク
# custom_tasks/medical_qa.yaml
task: medical_qa
dataset_path: json
dataset_kwargs:
data_files:
test: ./data/medical_qa_test.jsonl
test_split: test
doc_to_text: "質問:{{question}}\n選択肢:\nA. {{A}}\nB. {{B}}\nC. {{C}}\nD. {{D}}\n回答:"
doc_to_target: "{{answer}}"
doc_to_choice: ["A", "B", "C", "D"]
metric_list:
- metric: acc
aggregation: mean
- metric: f1
aggregation: mean
パターン2:RAG評価(RAGASフレームワーク)
RAG評価の次元
RAGシステムには検索と生成の2つのコンポーネントがあり、それぞれ別々に評価する必要がある。RAGASフレームワークは4つのコア指標を提供する:
| 指標 | 評価コンポーネント | 意味 | 計算方法 |
|---|---|---|---|
| Context Precision | 検索 | 検索結果における関連ドキュメントのランキング精度 | 関連ドキュメントランクの加重平均 |
| Context Recall | 検索 | 必要な情報が検索された割合 | Ground Truthが検索内容でカバーされる割合 |
| Faithfulness | 生成 | 生成回答と検索ドキュメントの事実的一致性 | 回答の主張が検索ドキュメントでサポートされる割合 |
| Answer Relevancy | 生成 | 回答と質問の関連性 | 回答から元の質問を生成する逆確率 |
完全なRAG評価実装
# rag_evaluation_benchmark.py
from ragas import evaluate
from ragas.metrics import (
context_precision,
context_recall,
faithfulness,
answer_relevancy,
AnswerSimilarity,
)
from datasets import Dataset
from typing import List, Dict, Optional
from dataclasses import dataclass, field
import json
@dataclass
class RAGEvalSample:
question: str
contexts: List[str]
answer: str
ground_truth: str
@dataclass
class RAGEvalReport:
faithfulness: float
answer_relevancy: float
context_precision: float
context_recall: float
answer_similarity: float = 0.0
sample_count: int = 0
details: List[Dict] = field(default_factory=list)
class RAGEvaluator:
def __init__(
self,
metrics: Optional[List] = None,
llm=None,
embeddings=None,
):
self.metrics = metrics or [
faithfulness,
answer_relevancy,
context_precision,
context_recall,
]
self.llm = llm
self.embeddings = embeddings
def evaluate_samples(
self,
samples: List[RAGEvalSample],
) -> RAGEvalReport:
eval_data = {
"question": [s.question for s in samples],
"contexts": [s.contexts for s in samples],
"answer": [s.answer for s in samples],
"ground_truth": [s.ground_truth for s in samples],
}
dataset = Dataset.from_dict(eval_data)
result = evaluate(
dataset,
metrics=self.metrics,
llm=self.llm,
embeddings=self.embeddings,
)
return RAGEvalReport(
faithfulness=result["faithfulness"],
answer_relevancy=result["answer_relevancy"],
context_precision=result["context_precision"],
context_recall=result["context_recall"],
sample_count=len(samples),
)
def evaluate_rag_pipeline(
self,
rag_pipeline,
test_questions: List[Dict],
) -> RAGEvalReport:
samples = []
for q in test_questions:
rag_result = rag_pipeline.query(q["question"])
sample = RAGEvalSample(
question=q["question"],
contexts=rag_result["contexts"],
answer=rag_result["answer"],
ground_truth=q["ground_truth"],
)
samples.append(sample)
return self.evaluate_samples(samples)
def compare_pipelines(
self,
pipelines: Dict[str, object],
test_questions: List[Dict],
) -> Dict[str, RAGEvalReport]:
reports = {}
for name, pipeline in pipelines.items():
report = self.evaluate_rag_pipeline(pipeline, test_questions)
reports[name] = report
print(f"\n=== {name} ===")
print(f" Faithfulness: {report.faithfulness:.4f}")
print(f" Answer Relevancy: {report.answer_relevancy:.4f}")
print(f" Context Precision: {report.context_precision:.4f}")
print(f" Context Recall: {report.context_recall:.4f}")
return reports
class RAGEvalDatasetBuilder:
@staticmethod
def from_qa_pairs(
qa_pairs: List[Dict],
rag_pipeline=None,
) -> List[RAGEvalSample]:
samples = []
for qa in qa_pairs:
if rag_pipeline and "contexts" not in qa:
result = rag_pipeline.query(qa["question"])
contexts = result["contexts"]
answer = result["answer"]
else:
contexts = qa.get("contexts", [])
answer = qa.get("answer", "")
samples.append(RAGEvalSample(
question=qa["question"],
contexts=contexts,
answer=answer,
ground_truth=qa["ground_truth"],
))
return samples
@staticmethod
def from_jsonl(file_path: str) -> List[RAGEvalSample]:
samples = []
with open(file_path, "r", encoding="utf-8") as f:
for line in f:
data = json.loads(line.strip())
samples.append(RAGEvalSample(
question=data["question"],
contexts=data["contexts"],
answer=data["answer"],
ground_truth=data["ground_truth"],
))
return samples
if __name__ == "__main__":
test_data = [
{
"question": "ゼロトラストネットワークアクセス(ZTNA)とは何ですか?",
"contexts": ["ゼロトラストネットワークアクセス(ZTNA)は、「決して信頼せず、常に検証する」という原則に基づくセキュリティモデルで、リモートユーザーに特定アプリケーションへの安全なアクセスを提供します。"],
"answer": "ZTNAは決して信頼せず常に検証するという原則に基づくセキュリティモデルで、リモートユーザーに安全なアクセスを提供します。",
"ground_truth": "ゼロトラストネットワークアクセス(ZTNA)は、決して信頼せず常に検証するという原則に基づくセキュリティアーキテクチャで、認証と認可を通じてリモートユーザーに特定アプリケーションへの安全なアクセスを提供し、従来のVPNに代わるものです。",
},
{
"question": "SASEアーキテクチャのコアコンポーネントは何ですか?",
"contexts": ["SASE(Secure Access Service Edge)は、SD-WAN、SWG、CASB、FWaaS、ZTNAを統合したクラウドネイティブサービスです。"],
"answer": "SASEアーキテクチャにはSD-WAN、SWG、CASB、FWaaS、ZTNAがコアコンポーネントとして含まれ、統合されたクラウドネイティブサービスとして提供されます。",
"ground_truth": "SASEアーキテクチャのコアコンポーネントには、SD-WAN(ソフトウェア定義広域ネットワーク)、SWG(セキュアWebゲートウェイ)、CASB(クラウドアクセスセキュリティブローカー)、FWaaS(ファイアウォール・アズ・ア・サービス)、ZTNA(ゼロトラストネットワークアクセス)が含まれ、統合されたクラウドネイティブサービスデリバリーモデルとして提供されます。",
},
]
evaluator = RAGEvaluator()
samples = RAGEvalDatasetBuilder.from_qa_pairs(test_data)
report = evaluator.evaluate_samples(samples)
print(f"\n=== RAG Evaluation Report ===")
print(f"Faithfulness: {report.faithfulness:.4f}")
print(f"Answer Relevancy: {report.answer_relevancy:.4f}")
print(f"Context Precision: {report.context_precision:.4f}")
print(f"Context Recall: {report.context_recall:.4f}")
print(f"Sample Count: {report.sample_count}")
パターン3:自動評価パイプライン(pytest)
なぜ自動テストが必要か?
手動評価は再現不可能、追跡不可能、CI/CDに統合できない。pytest駆動の自動評価パイプラインは、モデル評価をユニットテストと同じように信頼性の高いものにする。
完全な自動テストフレームワーク
# tests/conftest.py
import pytest
from typing import Dict, List
import json
import os
@pytest.fixture(scope="session")
def model_client():
from openai import OpenAI
return OpenAI(
base_url=os.getenv("MODEL_API_URL", "http://localhost:8000/v1"),
api_key=os.getenv("MODEL_API_KEY", "test"),
)
@pytest.fixture(scope="session")
def eval_dataset():
with open("./data/eval_dataset.json", "r", encoding="utf-8") as f:
return json.load(f)
@pytest.fixture(scope="session")
def baseline_scores():
with open("./data/baseline_scores.json", "r", encoding="utf-8") as f:
return json.load(f)
# tests/test_model_quality.py
import pytest
from typing import Dict
class TestModelQuality:
def test_factual_accuracy(self, model_client, eval_dataset):
factual_questions = [
q for q in eval_dataset if q["category"] == "factual"
]
correct = 0
for q in factual_questions:
response = model_client.chat.completions.create(
model="default",
messages=[{"role": "user", "content": q["question"]}],
temperature=0.0,
)
answer = response.choices[0].message.content
if self._check_answer(answer, q["expected_keywords"]):
correct += 1
accuracy = correct / len(factual_questions)
assert accuracy >= 0.85, f"Factual accuracy {accuracy:.2%} below threshold 85%"
def test_no_hallucination(self, model_client, eval_dataset):
hallucination_prompts = [
q for q in eval_dataset if q["category"] == "hallucination_trap"
]
hallucinated = 0
for q in hallucination_prompts:
response = model_client.chat.completions.create(
model="default",
messages=[{"role": "user", "content": q["question"]}],
temperature=0.0,
)
answer = response.choices[0].message.content
if self._contains_hallucination(answer, q["trap_keywords"]):
hallucinated += 1
hallucination_rate = hallucinated / len(hallucination_prompts)
assert hallucination_rate <= 0.10, f"Hallucination rate {hallucination_rate:.2%} above threshold 10%"
def test_response_latency(self, model_client, eval_dataset):
import time
latencies = []
for q in eval_dataset[:20]:
start = time.time()
model_client.chat.completions.create(
model="default",
messages=[{"role": "user", "content": q["question"]}],
)
latencies.append(time.time() - start)
avg_latency = sum(latencies) / len(latencies)
p95_latency = sorted(latencies)[int(len(latencies) * 0.95)]
assert avg_latency <= 2.0, f"Average latency {avg_latency:.2f}s above threshold 2s"
assert p95_latency <= 5.0, f"P95 latency {p95_latency:.2f}s above threshold 5s"
def test_output_format_compliance(self, model_client, eval_dataset):
format_questions = [
q for q in eval_dataset if q.get("expected_format") == "json"
]
format_errors = 0
for q in format_questions:
response = model_client.chat.completions.create(
model="default",
messages=[{"role": "user", "content": q["question"]}],
temperature=0.0,
)
answer = response.choices[0].message.content
try:
json.loads(answer)
except json.JSONDecodeError:
format_errors += 1
format_accuracy = 1 - format_errors / len(format_questions)
assert format_accuracy >= 0.95, f"JSON format accuracy {format_accuracy:.2%} below 95%"
def test_regression_against_baseline(self, model_client, eval_dataset, baseline_scores):
current_scores = self._run_evaluation_suite(model_client, eval_dataset)
for metric, baseline_value in baseline_scores.items():
current_value = current_scores.get(metric, 0)
assert current_value >= baseline_value * 0.95, (
f"Regression detected: {metric} dropped from {baseline_value:.4f} to {current_value:.4f}"
)
@staticmethod
def _check_answer(answer: str, keywords: List[str]) -> bool:
answer_lower = answer.lower()
matched = sum(1 for kw in keywords if kw.lower() in answer_lower)
return matched >= len(keywords) * 0.6
@staticmethod
def _contains_hallucination(answer: str, trap_keywords: List[str]) -> bool:
answer_lower = answer.lower()
return any(kw.lower() in answer_lower for kw in trap_keywords)
@staticmethod
def _run_evaluation_suite(model_client, eval_dataset) -> Dict:
return {
"accuracy": 0.88,
"faithfulness": 0.91,
"relevancy": 0.85,
}
# pytest.ini
"""
[pytest]
testpaths = tests
python_files = test_model_quality.py
python_classes = TestModelQuality
python_functions = test_*
addopts = -v --tb=short --json-report --json-report-file=eval_report.json
markers =
smoke: smoke tests for quick validation
regression: full regression test suite
benchmark: performance benchmark tests
"""
CI/CD統合
# .github/workflows/model_eval.yml
name: Model Evaluation Pipeline
on:
pull_request:
paths:
- 'models/**'
- 'config/**'
jobs:
model-eval:
runs-on: gpu-runner
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.12'
- name: Install Dependencies
run: |
pip install -r requirements-eval.txt
pip install pytest pytest-json-report
- name: Deploy Model Canary
run: |
python scripts/deploy_canary.py --model-path ${{ env.MODEL_PATH }}
- name: Run Smoke Tests
run: pytest tests/ -m smoke -v
- name: Run Full Evaluation
run: pytest tests/ -m regression -v --json-report
- name: Check Regression
run: python scripts/check_regression.py --report eval_report.json --baseline data/baseline_scores.json
- name: Generate Report
if: always()
run: python scripts/generate_eval_report.py --report eval_report.json
- name: Upload Results
if: always()
uses: actions/upload-artifact@v4
with:
name: eval-results
path: eval_report.json
パターン4:A/Bテストモデル比較
なぜA/Bテストが必要か?
オフライン評価スコアが高くても、オンラインパフォーマンスが良いとは限らない。A/Bテストは実際のトラフィックでモデルを比較する——最も信頼性の高い効果検証方法である。
A/Bテストフレームワーク実装
# ab_test_framework.py
import hashlib
import random
import time
from typing import Dict, List, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import json
import statistics
class AllocationStrategy(Enum):
RANDOM = "random"
HASH_BASED = "hash_based"
STICKY = "sticky"
@dataclass
class ABTestConfig:
test_name: str
variant_a_name: str
variant_b_name: str
traffic_split: float = 0.5
min_sample_size: int = 1000
confidence_level: float = 0.95
allocation_strategy: AllocationStrategy = AllocationStrategy.HASH_BASED
duration_hours: int = 72
@dataclass
class ABTestResult:
query: str
variant: str
response: str
latency_ms: float
timestamp: str
user_feedback: Optional[int] = None
auto_score: Optional[float] = None
@dataclass
class ABTestReport:
test_name: str
variant_a: Dict
variant_b: Dict
winner: Optional[str] = None
confidence: float = 0.0
is_significant: bool = False
sample_size_a: int = 0
sample_size_b: int = 0
class ABTestRunner:
def __init__(self, config: ABTestConfig):
self.config = config
self.results: List[ABTestResult] = []
self._sticky_map: Dict[str, str] = {}
def allocate_variant(self, user_id: str) -> str:
if self.config.allocation_strategy == AllocationStrategy.RANDOM:
return self.config.variant_a_name if random.random() < self.config.traffic_split else self.config.variant_b_name
elif self.config.allocation_strategy == AllocationStrategy.HASH_BASED:
hash_val = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
threshold = int(self.config.traffic_split * (2**128))
variant = self.config.variant_a_name if hash_val < threshold else self.config.variant_b_name
return variant
elif self.config.allocation_strategy == AllocationStrategy.STICKY:
if user_id in self._sticky_map:
return self._sticky_map[user_id]
variant = self.allocate_variant(user_id + "_init")
self._sticky_map[user_id] = variant
return variant
def record_result(self, result: ABTestResult):
self.results.append(result)
def run_test(
self,
queries: List[str],
model_a_fn: Callable,
model_b_fn: Callable,
evaluator_fn: Optional[Callable] = None,
) -> ABTestReport:
for i, query in enumerate(queries):
user_id = f"user_{i}"
variant = self.allocate_variant(user_id)
model_fn = model_a_fn if variant == self.config.variant_a_name else model_b_fn
start_time = time.time()
response = model_fn(query)
latency_ms = (time.time() - start_time) * 1000
auto_score = evaluator_fn(query, response) if evaluator_fn else None
self.record_result(ABTestResult(
query=query,
variant=variant,
response=response,
latency_ms=latency_ms,
timestamp=datetime.now().isoformat(),
auto_score=auto_score,
))
return self.analyze()
def analyze(self) -> ABTestReport:
a_results = [r for r in self.results if r.variant == self.config.variant_a_name]
b_results = [r for r in self.results if r.variant == self.config.variant_b_name]
a_scores = [r.auto_score for r in a_results if r.auto_score is not None]
b_scores = [r.auto_score for r in b_results if r.auto_score is not None]
a_latencies = [r.latency_ms for r in a_results]
b_latencies = [r.latency_ms for r in b_results]
a_feedback = [r.user_feedback for r in a_results if r.user_feedback is not None]
b_feedback = [r.user_feedback for r in b_results if r.user_feedback is not None]
stats_a = {
"avg_score": statistics.mean(a_scores) if a_scores else 0,
"avg_latency_ms": statistics.mean(a_latencies) if a_latencies else 0,
"p95_latency_ms": sorted(a_latencies)[int(len(a_latencies) * 0.95)] if a_latencies else 0,
"avg_feedback": statistics.mean(a_feedback) if a_feedback else 0,
}
stats_b = {
"avg_score": statistics.mean(b_scores) if b_scores else 0,
"avg_latency_ms": statistics.mean(b_latencies) if b_latencies else 0,
"p95_latency_ms": sorted(b_latencies)[int(len(b_latencies) * 0.95)] if b_latencies else 0,
"avg_feedback": statistics.mean(b_feedback) if b_feedback else 0,
}
is_significant = False
confidence = 0.0
if a_scores and b_scores and len(a_scores) >= 30 and len(b_scores) >= 30:
confidence, is_significant = self._statistical_test(a_scores, b_scores)
winner = None
if is_significant:
if stats_a["avg_score"] > stats_b["avg_score"]:
winner = self.config.variant_a_name
else:
winner = self.config.variant_b_name
return ABTestReport(
test_name=self.config.test_name,
variant_a=stats_a,
variant_b=stats_b,
winner=winner,
confidence=confidence,
is_significant=is_significant,
sample_size_a=len(a_results),
sample_size_b=len(b_results),
)
@staticmethod
def _statistical_test(a: List[float], b: List[float]) -> tuple:
from scipy.stats import ttest_ind
t_stat, p_value = ttest_ind(a, b)
confidence = 1 - p_value
is_significant = p_value < 0.05
return round(confidence, 4), is_significant
if __name__ == "__main__":
config = ABTestConfig(
test_name="llm_v1_vs_v2",
variant_a_name="llama-3.1-8b",
variant_b_name="llama-3.1-8b-finetuned",
traffic_split=0.5,
min_sample_size=500,
)
runner = ABTestRunner(config)
def model_a_fn(query: str) -> str:
return f"Model A response to: {query}"
def model_b_fn(query: str) -> str:
return f"Model B enhanced response to: {query}"
def evaluator_fn(query: str, response: str) -> float:
return random.uniform(0.7, 1.0)
queries = [f"テスト質問 {i}" for i in range(200)]
report = runner.run_test(queries, model_a_fn, model_b_fn, evaluator_fn)
print(f"Winner: {report.winner}")
print(f"Confidence: {report.confidence:.2%}")
print(f"Variant A avg score: {report.variant_a['avg_score']:.4f}")
print(f"Variant B avg score: {report.variant_b['avg_score']:.4f}")
パターン5:ヒューマン評価プラットフォーム
なぜヒューマン評価が必要か?
自動評価指標では全ての品質次元を捉えられない。流暢さ、有用性、安全性、微妙な事実誤り——これらはすべて人間の判断が必要である。ヒューマン評価はモデル評価の「ゴールドスタンダード」である。
ヒューマン評価プラットフォーム実装
# human_eval_platform.py
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime
import json
import uuid
import statistics
class EvalTaskType(Enum):
SINGLE_RESPONSE = "single_response"
PAIRWISE_COMPARISON = "pairwise_comparison"
RANKING = "ranking"
ERROR_ANNOTATION = "error_annotation"
class QualityDimension(Enum):
FACTUAL_ACCURACY = "factual_accuracy"
RELEVANCE = "relevance"
COHERENCE = "coherence"
FLUENCY = "fluency"
SAFETY = "safety"
HELPFULNESS = "helpfulness"
@dataclass
class EvalTask:
task_id: str
task_type: EvalTaskType
question: str
responses: List[str]
quality_dimensions: List[QualityDimension]
guidelines: str = ""
metadata: Dict = field(default_factory=dict)
@dataclass
class AnnotatorResult:
task_id: str
annotator_id: str
scores: Dict[str, float]
preference: Optional[int] = None
comments: str = ""
duration_seconds: float = 0.0
timestamp: str = ""
@dataclass
class InterAnnotatorAgreement:
dimension: str
cohens_kappa: float
fleiss_kappa: float
agreement_rate: float
class HumanEvalPlatform:
def __init__(self):
self.tasks: Dict[str, EvalTask] = {}
self.results: Dict[str, List[AnnotatorResult]] = {}
self.annotator_stats: Dict[str, Dict] = {}
def create_task(
self,
question: str,
responses: List[str],
task_type: EvalTaskType = EvalTaskType.SINGLE_RESPONSE,
quality_dimensions: Optional[List[QualityDimension]] = None,
guidelines: str = "",
) -> EvalTask:
task_id = str(uuid.uuid4())[:8]
task = EvalTask(
task_id=task_id,
task_type=task_type,
question=question,
responses=responses,
quality_dimensions=quality_dimensions or [
QualityDimension.FACTUAL_ACCURACY,
QualityDimension.RELEVANCE,
QualityDimension.COHERENCE,
],
guidelines=guidelines,
)
self.tasks[task_id] = task
self.results[task_id] = []
return task
def submit_annotation(self, result: AnnotatorResult):
if result.task_id not in self.results:
raise ValueError(f"Task {result.task_id} not found")
result.timestamp = datetime.now().isoformat()
self.results[result.task_id].append(result)
self._update_annotator_stats(result)
def get_next_task(self, annotator_id: str) -> Optional[EvalTask]:
for task_id, task in self.tasks.items():
existing_annotators = {r.annotator_id for r in self.results[task_id]}
if annotator_id not in existing_annotators and len(existing_annotators) < 3:
return task
return None
def compute_agreement(self, task_ids: Optional[List[str]] = None) -> List[InterAnnotatorAgreement]:
target_tasks = task_ids or list(self.tasks.keys())
agreements = []
all_dimensions = set()
for task_id in target_tasks:
for result in self.results.get(task_id, []):
all_dimensions.update(result.scores.keys())
for dimension in all_dimensions:
scores_by_task = {}
for task_id in target_tasks:
task_results = self.results.get(task_id, [])
if len(task_results) >= 2:
scores_by_task[task_id] = [r.scores.get(dimension, 0) for r in task_results]
if not scores_by_task:
continue
agreement_rate = self._compute_pairwise_agreement(scores_by_task)
cohens_kappa = self._compute_cohens_kappa(scores_by_task)
fleiss_kappa = self._compute_fleiss_kappa(scores_by_task)
agreements.append(InterAnnotatorAgreement(
dimension=dimension,
cohens_kappa=round(cohens_kappa, 4),
fleiss_kappa=round(fleiss_kappa, 4),
agreement_rate=round(agreement_rate, 4),
))
return agreements
def generate_report(self) -> Dict:
all_scores = {}
for task_id, results in self.results.items():
for result in results:
for dim, score in result.scores.items():
if dim not in all_scores:
all_scores[dim] = []
all_scores[dim].append(score)
dimension_stats = {}
for dim, scores in all_scores.items():
dimension_stats[dim] = {
"mean": round(statistics.mean(scores), 4),
"median": round(statistics.median(scores), 4),
"stdev": round(statistics.stdev(scores), 4) if len(scores) > 1 else 0,
"count": len(scores),
}
pairwise_stats = {}
pairwise_tasks = [
(tid, t) for tid, t in self.tasks.items()
if t.task_type == EvalTaskType.PAIRWISE_COMPARISON
]
for task_id, task in pairwise_tasks:
prefs = [r.preference for r in self.results[task_id] if r.preference is not None]
if prefs:
pairwise_stats[task_id] = {
"response_a_wins": sum(1 for p in prefs if p == 0),
"response_b_wins": sum(1 for p in prefs if p == 1),
"total_votes": len(prefs),
}
return {
"total_tasks": len(self.tasks),
"total_annotations": sum(len(r) for r in self.results.values()),
"dimension_stats": dimension_stats,
"pairwise_stats": pairwise_stats,
"annotator_count": len(self.annotator_stats),
}
def _update_annotator_stats(self, result: AnnotatorResult):
aid = result.annotator_id
if aid not in self.annotator_stats:
self.annotator_stats[aid] = {"count": 0, "total_duration": 0}
self.annotator_stats[aid]["count"] += 1
self.annotator_stats[aid]["total_duration"] += result.duration_seconds
@staticmethod
def _compute_pairwise_agreement(scores_by_task: Dict) -> float:
agreements = 0
total = 0
for task_id, scores in scores_by_task.items():
for i in range(len(scores)):
for j in range(i + 1, len(scores)):
if abs(scores[i] - scores[j]) <= 1:
agreements += 1
total += 1
return agreements / total if total > 0 else 0
@staticmethod
def _compute_cohens_kappa(scores_by_task: Dict) -> float:
if len(scores_by_task) < 1:
return 0.0
all_pairs = []
for scores in scores_by_task.values():
if len(scores) >= 2:
all_pairs.append((scores[0], scores[1]))
if not all_pairs:
return 0.0
rater1 = [p[0] for p in all_pairs]
rater2 = [p[1] for p in all_pairs]
n = len(rater1)
agree = sum(1 for a, b in zip(rater1, rater2) if abs(a - b) <= 1)
p_observed = agree / n
p_expected = 0.2
return (p_observed - p_expected) / (1 - p_expected) if (1 - p_expected) != 0 else 0
@staticmethod
def _compute_fleiss_kappa(scores_by_task: Dict) -> float:
return 0.0
if __name__ == "__main__":
platform = HumanEvalPlatform()
task = platform.create_task(
question="量子コンピューティングの基本原理を説明してください",
responses=[
"量子コンピューティングは量子ビット(qubit)の重ね合わせと絡み合いを利用して並列計算を行います...",
"量子コンピューティングは量子力学の原理を利用する計算パラダイムで、量子ビットを通じて実現されます...",
],
task_type=EvalTaskType.PAIRWISE_COMPARISON,
quality_dimensions=[
QualityDimension.FACTUAL_ACCURACY,
QualityDimension.RELEVANCE,
QualityDimension.COHERENCE,
],
)
for annotator_id in ["ann_1", "ann_2", "ann_3"]:
result = AnnotatorResult(
task_id=task.task_id,
annotator_id=annotator_id,
scores={
"factual_accuracy": 4.0 + (hash(annotator_id) % 10) / 10,
"relevance": 3.5 + (hash(annotator_id) % 10) / 10,
"coherence": 4.0 + (hash(annotator_id) % 10) / 10,
},
preference=0 if hash(annotator_id) % 2 == 0 else 1,
duration_seconds=45.0,
)
platform.submit_annotation(result)
report = platform.generate_report()
print(json.dumps(report, indent=2, ensure_ascii=False))
agreements = platform.compute_agreement()
for a in agreements:
print(f"{a.dimension}: κ={a.cohens_kappa:.4f}, agreement={a.agreement_rate:.2%}")
パターン6:本番モニタリングとドリフト検出
なぜ本番モニタリングが必要か?
モデルのデプロイは終わりではなく、モニタリングの始まりである。データ分布の変化、ユーザー行動のシフト、モデルの劣化——これらの問題を早期に発見しないと、サイレントな品質低下につながる。
ドリフト検出システム実装
# production_monitor.py
import numpy as np
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import json
import logging
logger = logging.getLogger(__name__)
class DriftType(Enum):
DATA_DRIFT = "data_drift"
CONCEPT_DRIFT = "concept_drift"
PREDICTION_DRIFT = "prediction_drift"
class AlertLevel(Enum):
INFO = "info"
WARNING = "warning"
CRITICAL = "critical"
@dataclass
class DriftAlert:
drift_type: DriftType
metric_name: str
current_value: float
baseline_value: float
drift_score: float
alert_level: AlertLevel
timestamp: str
message: str
@dataclass
class MonitoringWindow:
window_size: int = 1000
reference_size: int = 5000
class DataDriftDetector:
def __init__(
self,
reference_data: np.ndarray,
significance_level: float = 0.05,
):
self.reference_data = reference_data
self.significance_level = significance_level
self.reference_mean = np.mean(reference_data, axis=0)
self.reference_std = np.std(reference_data, axis=0)
def detect_ks_test(self, current_data: np.ndarray) -> Tuple[float, bool]:
from scipy.stats import ks_2samp
stat, p_value = ks_2samp(self.reference_data, current_data)
is_drift = p_value < self.significance_level
return p_value, is_drift
def detect_psi(self, current_data: np.ndarray, n_bins: int = 10) -> Tuple[float, bool]:
ref_hist, bin_edges = np.histogram(self.reference_data, bins=n_bins, density=True)
cur_hist, _ = np.histogram(current_data, bins=bin_edges, density=True)
ref_hist = ref_hist / ref_hist.sum()
cur_hist = cur_hist / cur_hist.sum()
ref_hist = np.clip(ref_hist, 1e-6, None)
cur_hist = np.clip(cur_hist, 1e-6, None)
psi = np.sum((cur_hist - ref_hist) * np.log(cur_hist / ref_hist))
is_drift = psi >= 0.1
return round(psi, 4), is_drift
def detect_z_score(self, current_data: np.ndarray, threshold: float = 3.0) -> Tuple[float, bool]:
current_mean = np.mean(current_data)
z_score = abs(current_mean - self.reference_mean) / (self.reference_std + 1e-8)
is_drift = z_score > threshold
return round(float(z_score), 4), is_drift
class PredictionDriftMonitor:
def __init__(
self,
reference_predictions: List[Dict],
window_size: int = 1000,
):
self.reference_predictions = reference_predictions
self.window_size = window_size
self.prediction_buffer: List[Dict] = []
self.alerts: List[DriftAlert] = []
def record_prediction(self, prediction: Dict):
self.prediction_buffer.append({
**prediction,
"timestamp": datetime.now().isoformat(),
})
if len(self.prediction_buffer) >= self.window_size:
self._check_drift()
self.prediction_buffer = self.prediction_buffer[-self.window_size // 2:]
def _check_drift(self):
ref_scores = [p.get("confidence", 0) for p in self.reference_predictions]
cur_scores = [p.get("confidence", 0) for p in self.prediction_buffer]
ref_arr = np.array(ref_scores)
cur_arr = np.array(cur_scores)
detector = DataDriftDetector(ref_arr)
psi_value, is_psi_drift = detector.detect_psi(cur_arr)
z_score, is_z_drift = detector.detect_z_score(cur_arr)
if is_psi_drift or is_z_drift:
level = AlertLevel.CRITICAL if psi_value > 0.25 else AlertLevel.WARNING
alert = DriftAlert(
drift_type=DriftType.PREDICTION_DRIFT,
metric_name="confidence_score",
current_value=float(np.mean(cur_arr)),
baseline_value=float(np.mean(ref_arr)),
drift_score=psi_value,
alert_level=level,
timestamp=datetime.now().isoformat(),
message=f"Prediction drift detected: PSI={psi_value:.4f}, Z-score={z_score:.4f}",
)
self.alerts.append(alert)
logger.warning(alert.message)
def get_health_report(self) -> Dict:
recent_alerts = [
a for a in self.alerts
if datetime.fromisoformat(a.timestamp) > datetime.now() - timedelta(hours=24)
]
return {
"total_predictions_monitored": len(self.prediction_buffer),
"alerts_last_24h": len(recent_alerts),
"critical_alerts": sum(1 for a in recent_alerts if a.alert_level == AlertLevel.CRITICAL),
"latest_drift_score": self.alerts[-1].drift_score if self.alerts else 0,
"status": "healthy" if not recent_alerts else "degraded",
}
class ModelPerformanceTracker:
def __init__(self, baseline_metrics: Dict[str, float]):
self.baseline_metrics = baseline_metrics
self.metric_history: List[Dict] = []
self.degradation_threshold = 0.05
def record_metrics(self, metrics: Dict[str, float]):
entry = {
"timestamp": datetime.now().isoformat(),
"metrics": metrics,
}
self.metric_history.append(entry)
self._check_degradation(metrics)
def _check_degradation(self, current_metrics: Dict[str, float]):
for metric_name, baseline_value in self.baseline_metrics.items():
current_value = current_metrics.get(metric_name)
if current_value is None:
continue
relative_change = (baseline_value - current_value) / baseline_value
if relative_change > self.degradation_threshold:
logger.warning(
f"Degradation detected: {metric_name} dropped from "
f"{baseline_value:.4f} to {current_value:.4f} "
f"({relative_change:.2%} decrease)"
)
def get_trend(self, metric_name: str, hours: int = 24) -> Dict:
cutoff = datetime.now() - timedelta(hours=hours)
recent = [
entry for entry in self.metric_history
if datetime.fromisoformat(entry["timestamp"]) > cutoff
and metric_name in entry["metrics"]
]
if not recent:
return {"trend": "no_data", "values": []}
values = [entry["metrics"][metric_name] for entry in recent]
trend = "stable"
if len(values) >= 3:
first_half = np.mean(values[:len(values)//2])
second_half = np.mean(values[len(values)//2:])
if second_half < first_half * 0.95:
trend = "declining"
elif second_half > first_half * 1.05:
trend = "improving"
return {
"trend": trend,
"current": values[-1],
"baseline": self.baseline_metrics.get(metric_name),
"values": values,
}
if __name__ == "__main__":
np.random.seed(42)
reference = np.random.normal(0.85, 0.05, 5000)
detector = DataDriftDetector(reference)
healthy_data = np.random.normal(0.84, 0.05, 1000)
drifted_data = np.random.normal(0.70, 0.08, 1000)
psi_healthy, drift_healthy = detector.detect_psi(healthy_data)
print(f"Healthy data: PSI={psi_healthy:.4f}, drift={drift_healthy}")
psi_drifted, drift_drifted = detector.detect_psi(drifted_data)
print(f"Drifted data: PSI={psi_drifted:.4f}, drift={drift_drifted}")
reference_preds = [{"confidence": np.random.uniform(0.8, 0.95)} for _ in range(5000)]
monitor = PredictionDriftMonitor(reference_preds, window_size=100)
for _ in range(50):
monitor.record_prediction({"confidence": np.random.uniform(0.8, 0.95)})
for _ in range(50):
monitor.record_prediction({"confidence": np.random.uniform(0.5, 0.7)})
health = monitor.get_health_report()
print(f"\nHealth Report: {json.dumps(health, indent=2)}")
5つのよくある落とし穴と解決策
落とし穴1:評価データのリーク
訓練データと評価データが重複し、評価スコアが不当に高くなる。
解決策:ハッシュベースの重複排除による厳格なデータ分離戦略。
def check_data_leakage(train_data: List[str], eval_data: List[str], threshold: float = 0.8) -> Dict:
from difflib import SequenceMatcher
leaks = []
for i, eval_item in enumerate(eval_data):
for train_item in train_data:
similarity = SequenceMatcher(None, eval_item, train_item).ratio()
if similarity > threshold:
leaks.append({"eval_index": i, "similarity": round(similarity, 4)})
break
return {"leak_count": len(leaks), "leak_rate": len(leaks) / len(eval_data)}
落とし穴2:評価指標とビジネス目標の乖離
MMLUで高スコアを出してもビジネスKPIが改善しない。
解決策:指標-ビジネスマッピングテーブルを構築し、評価指標とビジネス目標の整合性を確保。
def align_metrics_with_business(eval_metrics: Dict, business_kpis: Dict) -> List[str]:
alignment_map = {
"faithfulness": ["customer_satisfaction", "complaint_rate"],
"answer_relevancy": ["task_completion_rate", "user_engagement"],
"latency_p95": ["session_duration", "bounce_rate"],
}
misaligned = []
for metric in eval_metrics:
if metric not in alignment_map:
misaligned.append(f"Metric '{metric}' has no business KPI mapping")
return misaligned
落とし穴3:A/Bテストのサンプルサイズ不足
200件で結論を出し、統計的検出力が不足している。
解決策:必要なサンプルサイズを事前に計算する。
def calculate_sample_size(
baseline_rate: float,
minimum_detectable_effect: float,
alpha: float = 0.05,
power: float = 0.8,
) -> int:
import math
from scipy.stats import norm
z_alpha = norm.ppf(1 - alpha / 2)
z_beta = norm.ppf(power)
p1 = baseline_rate
p2 = baseline_rate * (1 + minimum_detectable_effect)
p_avg = (p1 + p2) / 2
n = (z_alpha * math.sqrt(2 * p_avg * (1 - p_avg)) +
z_beta * math.sqrt(p1 * (1 - p1) + p2 * (1 - p2))) ** 2 / (p2 - p1) ** 2
return math.ceil(n)
落とし穴4:アノテーター間一致度の無視
複数のアノテーターの結果が大きく異なるのに、平均を取ることで品質問題を隠蔽する。
解決策:Cohen's Kappaを計算し、0.6未満の次元は再アノテーションを行う。
落とし穴5:本番環境のドリフト検出の遅延
月次評価でのみモデルの劣化を発見し、影響が数週間持続している。
解決策:スライディングウィンドウ検出によるリアルタイムモニタリングと時間単位のアラート。
10のよくあるエラートラブルシューティング
| エラー現象 | 考えられる原因 | トラブルシューティング手順 | 解決策 |
|---|---|---|---|
| lm-eval OOM | batch_sizeが大きすぎるかモデルが大きすぎる | batch_sizeを減らし、FSDPを有効化 | batch_size=1 + device_map=auto |
| RAGAS faithfulness = 0 | contextsまたはanswerが空 | eval_dataのcontextsフィールドを確認 | contextsが空でなく関連性があることを確認 |
| pytestタイムアウト | モデル推論遅延が高すぎる | GPU使用率とバッチ設定を確認 | タイムアウトを増やすか推論設定を最適化 |
| A/Bテストが有意でない | サンプルサイズ不足または効果差が小さい | 統計的検出力と必要サンプルサイズを計算 | テスト期間を延長またはトラフィック比率を増加 |
| ヒューマン評価Kappa < 0.4 | アノテーションガイドラインが不明確 | ガイドラインを見直しキャリブレーションテストを実施 | 例とエッジケースの説明を追加 |
| PSI誤警報が頻発 | 参照データ分布が狭すぎる | 参照データの時間ウィンドウを拡大 | 30日間のデータをベースラインとして使用 |
| 評価結果が再現不可能 | ランダムシードが未設定 | グローバルランダムシードを設定 | torch.manual_seed(42) + numpy.random.seed(42) |
| 評価データフォーマットエラー | フィールド欠落または型の不一致 | スキーマでデータを検証 | Pydanticモデル検証を使用 |
| ドリフト検出遅延が高い | モニタリングウィンドウが大きすぎる | スライディングウィンドウを縮小 | 1000件から200件に減らす |
| 評価レポートの指標欠落 | 一部のmetric計算が失敗 | LLM API呼び出しのタイムアウトを確認 | リトライメカニズムとフォールバックを追加 |
高度な最適化テクニック
1. 次元横断評価
単一指標ではモデル品質を全面的に反映できない。次元横断評価マトリックスで弱点を発見:
class CrossDimensionEvaluator:
def __init__(self, dimensions: List[str]):
self.dimensions = dimensions
self.matrix = {d: {d2: [] for d2 in dimensions} for d in dimensions}
def evaluate_cross(self, samples: List[Dict], eval_fn) -> Dict:
for sample in samples:
scores = eval_fn(sample)
for d1 in self.dimensions:
for d2 in self.dimensions:
if d1 != d2:
self.matrix[d1][d2].append(scores.get(d1, 0) * scores.get(d2, 0))
correlation = {}
for d1 in self.dimensions:
for d2 in self.dimensions:
if d1 != d2 and self.matrix[d1][d2]:
import statistics
correlation[f"{d1}×{d2}"] = round(
statistics.mean(self.matrix[d1][d2]), 4
)
return correlation
2. 動的評価セット生成
静的評価セットは「スコア操作」の対象になりやすい。動的に評価問題を生成し、公平性を確保:
class DynamicEvalGenerator:
def __init__(self, llm_client):
self.llm = llm_client
def generate_eval_questions(
self,
domain: str,
difficulty: str = "medium",
count: int = 50,
) -> List[Dict]:
prompt = f"""{domain}ドメインで{difficulty}難度の評価問題を{count}問生成してください。
各問題にはquestion, choices, answer, explanationを含めてください。
JSON配列形式で返してください。"""
response = self.llm.chat.completions.create(
model="default",
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
)
return json.loads(response.choices[0].message.content)
3. 段階的評価戦略
異なるシーンで異なる評価粒度を使用し、コストと品質保証のバランスを取る:
| レベル | トリガー条件 | 評価範囲 | 評価時間 |
|---|---|---|---|
| L0 クイック検証 | 毎回のコミット | 50コアテストケース | <5分 |
| L1 標準評価 | デイリービルド | 500標準セット | ~30分 |
| L2 完全評価 | バージョンリリース | 5000完全セット | ~2時間 |
| L3 ヒューマンスポットチェック | メジャーリリース | 100件の専門家レビュー | ~1日 |
4. 評価結果のバージョン管理
class EvalVersionManager:
def __init__(self, store_path: str = "./eval_versions"):
self.store_path = store_path
import os
os.makedirs(store_path, exist_ok=True)
def save_version(
self,
model_version: str,
eval_results: Dict,
eval_config: Dict,
) -> str:
version_id = f"{model_version}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
record = {
"version_id": version_id,
"model_version": model_version,
"eval_results": eval_results,
"eval_config": eval_config,
"timestamp": datetime.now().isoformat(),
}
path = os.path.join(self.store_path, f"{version_id}.json")
with open(path, "w", encoding="utf-8") as f:
json.dump(record, f, indent=2, ensure_ascii=False)
return version_id
def compare_versions(self, version_a: str, version_b: str) -> Dict:
data_a = self._load_version(version_a)
data_b = self._load_version(version_b)
diff = {}
for metric in data_a["eval_results"]:
if metric in data_b["eval_results"]:
diff[metric] = {
"a": data_a["eval_results"][metric],
"b": data_b["eval_results"][metric],
"delta": round(
data_b["eval_results"][metric] - data_a["eval_results"][metric], 4
),
}
return diff
5. 評価パイプラインオーケストレーション
class EvalPipelineOrchestrator:
def __init__(self):
self.stages = []
def add_stage(self, name: str, eval_fn: Callable, gate_threshold: float = 0.0):
self.stages.append({
"name": name,
"eval_fn": eval_fn,
"gate_threshold": gate_threshold,
})
def run(self, model, dataset) -> Dict:
results = {}
for stage in self.stages:
print(f"Running stage: {stage['name']}")
score = stage["eval_fn"](model, dataset)
results[stage["name"]] = score
if score < stage["gate_threshold"]:
print(f"GATE FAILED: {stage['name']} score {score:.4f} < {stage['gate_threshold']}")
results["status"] = "gate_failed"
results["failed_stage"] = stage["name"]
return results
results["status"] = "passed"
return results
比較分析
| 評価パターン | 適用段階 | 評価次元 | 自動化レベル | コスト | 信頼性 | タイムリー性 |
|---|---|---|---|---|---|---|
| LLMベンチマーク | モデル選択/訓練 | 汎用能力 | 高 | 低 | 中 | 低 |
| RAG評価 | RAGシステム開発 | 検索+生成 | 高 | 中 | 高 | 中 |
| 自動テスト | CI/CDパイプライン | カスタム指標 | 高 | 低 | 高 | 高 |
| A/Bテスト | モデルデプロイ | ビジネス指標 | 中 | 高 | 高 | 低 |
| ヒューマン評価 | 品質保証 | 全次元 | 低 | 高 | 最高 | 最低 |
| 本番モニタリング | 運用 | ドリフト/劣化 | 高 | 中 | 中 | 最高 |
| ドリフト検出方法 | 適用シナリオ | 検出速度 | 誤検出率 | 最小サンプルサイズ |
|---|---|---|---|---|
| KS検定 | 連続データ分布 | 高速 | 中 | 100 |
| PSI | カテゴリ/連続型 | 高速 | 低 | 500 |
| Z-score | 平均シフト | 最速 | 高 | 30 |
| ADWIN | ストリーミングデータ | 中 | 低 | 200 |
| Page-Hinkley | 累積ドリフト | 中 | 中 | 100 |
おすすめオンラインツール
- JSONデータフォーマッター:/ja/json/format
- ハッシュエンコーディングツール:/ja/encode/hash
- Curl→コード変換:/ja/dev/curl-to-code
まとめ:AIモデル評価は「スコアを出す」だけではない——モデルライフサイクル全体をカバーするエンジニアリング体系である。6つのプロダクションパターンにはそれぞれの役割がある:ベンチマークは汎用能力を定量化し、RAG評価は検索・生成の弱点を特定し、自動テストはCI/CDの品質ゲートを確保し、A/Bテストはオンライン効果を検証し、ヒューマン評価は品質の上限を設定し、本番モニタリングはサイレント劣化を防ぐ。2026年、体系的な評価なしにAIシステムを運用することは、目隠しして飛ぶようなものである。
参考リンク
ブラウザローカルツールを無料で試す →