Python AI Model Evaluation: 6 Production Patterns from Benchmarking to Automated Testing
Your AI model is live, but how good is it really? How do you quantify LLM response quality? Which component in your RAG system is dragging performance down? Has the model degraded after 3 months in production? Most teams still evaluate models by "manually checking a few results", which is like inspecting chip yields with the naked eye: unreliable and non-reproducible. In 2026, AI model evaluation has evolved into a complete engineering discipline. This article covers 6 production patterns that span the model lifecycle: benchmarking with lm-evaluation-harness, RAG evaluation with the RAGAS framework, pytest-driven automated testing pipelines, A/B testing for model comparison, human evaluation platforms, and production drift detection.
Key Takeaways
- Master the complete workflow of LLM standardized benchmarking with lm-evaluation-harness
- Use the RAGAS framework to quantitatively evaluate retrieval and generation quality in RAG systems
- Build pytest-driven automated evaluation pipelines with CI/CD quality gates
- Design scientific A/B testing plans to compare model version performance differences
- Set up human evaluation platforms to collect high-quality human feedback data
- Implement production drift detection and automated alerting mechanisms
- Understand the applicable scenarios, pros/cons, and combination strategies for all 6 evaluation patterns
Table of Contents
- Architecture Overview: AI Model Evaluation Landscape
- Pattern 1: LLM Benchmarking (lm-evaluation-harness)
- Pattern 2: RAG Evaluation (RAGAS Framework)
- Pattern 3: Automated Testing Pipeline (pytest)
- Pattern 4: A/B Testing for Model Comparison
- Pattern 5: Human Evaluation Platform
- Pattern 6: Production Monitoring and Drift Detection
- 5 Common Pitfalls and Solutions
- 10 Common Error Troubleshooting
- Advanced Optimization Techniques
- Comparison Analysis
- Recommended Online Tools
Architecture Overview: AI Model Evaluation Landscape
┌─────────────────────────────────────────────────────────────┐
│ AI Model Evaluation Pipeline │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────────────────┐ │
│ │ Offline │ │ Online │ │ Human-in-the-Loop │ │
│ │ Eval │ │ Eval │ │ Evaluation │ │
│ │ │ │ │ │ │ │
│ │ • Bench │ │ • A/B │ │ • Preference Ranking │ │
│ │ mark │ │ Test │ │ • Quality Scoring │ │
│ │ • RAG │ │ • Drift │ │ • Red Team Testing │ │
│ │ Eval │ │ Detect │ │ • Domain Expert │ │
│ │ • Auto │ │ • Prod │ │ Review │ │
│ │ Test │ │ Monitor│ │ │ │
│ └────┬─────┘ └────┬─────┘ └──────────┬───────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Evaluation Results Store │ │
│ │ ┌─────────┐ ┌──────────┐ ┌──────────────────┐ │ │
│ │ │ Metrics │ │ Reports │ │ Comparison Board │ │ │
│ │ │ DB │ │ Generator│ │ │ │ │
│ │ └─────────┘ └──────────┘ └──────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Decision & Action Layer │ │
│ │ • Model Promotion / Rollback │ │
│ │ • Retraining Trigger │ │
│ │ • Alert & Notification │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Pattern 1: LLM Benchmarking (lm-evaluation-harness)
Why Standardized Benchmarking?
"Our model performs well" carries no information on its own. You need standardized datasets and metrics under controlled conditions to quantify model capabilities. EleutherAI's lm-evaluation-harness is one of the most widely used LLM evaluation frameworks and ships with 200+ tasks.
Complete Benchmarking Workflow
# llm_benchmark.py
from lm_eval import evaluator
# The Hugging Face backend is selected by name (model="hf") below, so no model class import is needed.
from typing import Dict, List, Optional
import json
import os
class LLMBenchmarkRunner:
def __init__(
self,
model_path: str,
device: str = "cuda",
batch_size: int = 8,
):
self.model_path = model_path
self.device = device
self.batch_size = batch_size
self.results_history = []
def run_core_tasks(self) -> Dict:
core_tasks = [
"mmlu",
"hellaswag",
"arc_challenge",
"truthfulqa_mc2",
"winogrande",
"gsm8k",
]
results = evaluator.simple_evaluate(
model="hf",
model_args=f"pretrained={self.model_path}",
tasks=core_tasks,
batch_size=self.batch_size,
device=self.device,
)
formatted = self._format_results(results)
self.results_history.append(formatted)
return formatted
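    def run_custom_task(self, task_config_path: str) -> Dict:
        # simple_evaluate resolves task names, so register the directory that
        # holds the YAML with a TaskManager instead of passing the file path.
        # Assumes the file is named <task>.yaml, as create_domain_eval writes it.
        from lm_eval.tasks import TaskManager

        include_dir = os.path.dirname(os.path.abspath(task_config_path))
        task_name = os.path.splitext(os.path.basename(task_config_path))[0]
        results = evaluator.simple_evaluate(
            model="hf",
            model_args=f"pretrained={self.model_path}",
            tasks=[task_name],
            task_manager=TaskManager(include_path=include_dir),
            batch_size=self.batch_size,
            device=self.device,
        )
        return self._format_results(results)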
def _format_results(self, raw_results: Dict) -> Dict:
formatted = {
"model": self.model_path,
"timestamp": self._get_timestamp(),
"tasks": {},
}
for task_name, task_results in raw_results["results"].items():
formatted["tasks"][task_name] = {
k: round(v, 4) if isinstance(v, float) else v
for k, v in task_results.items()
}
return formatted
def compare_with_baseline(self, baseline_path: str) -> Dict:
if not self.results_history:
self.run_core_tasks()
with open(baseline_path, "r") as f:
baseline = json.load(f)
current = self.results_history[-1]
comparison = {}
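        for task_name in current["tasks"]:
            if task_name in baseline["tasks"]:
                cur_metrics = current["tasks"][task_name]
                base_metrics = baseline["tasks"][task_name]
                # Not every task reports "acc,none" (gsm8k reports exact match),
                # so skip tasks without it rather than comparing 0 against 0.
                if "acc,none" not in cur_metrics or "acc,none" not in base_metrics:
                    continue
                current_score = cur_metrics["acc,none"]
                baseline_score = base_metrics["acc,none"]
                comparison[task_name] = {
                    "current": current_score,
                    "baseline": baseline_score,
                    "delta": round(current_score - baseline_score, 4),
                    "improved": current_score > baseline_score,
                }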
return comparison
@staticmethod
def _get_timestamp() -> str:
from datetime import datetime
return datetime.now().isoformat()
class CustomTaskConfig:
@staticmethod
def create_domain_eval(
dataset_path: str,
task_name: str,
output_dir: str = "./custom_tasks",
) -> str:
config = {
"task": task_name,
"dataset_path": dataset_path,
"output_type": "multiple_choice",
"test_split": "test",
"doc_to_text": "{{question}}",
"doc_to_target": "{{answer}}",
"doc_to_choice": "{{choices}}",
"metric_list": [
{"metric": "acc", "aggregation": "mean"},
{"metric": "f1", "aggregation": "mean"},
],
}
os.makedirs(output_dir, exist_ok=True)
config_path = os.path.join(output_dir, f"{task_name}.yaml")
import yaml
with open(config_path, "w") as f:
yaml.dump(config, f)
return config_path
if __name__ == "__main__":
runner = LLMBenchmarkRunner(
model_path="meta-llama/Llama-3.1-8B-Instruct",
batch_size=4,
)
results = runner.run_core_tasks()
print(json.dumps(results, indent=2))
comparison = runner.compare_with_baseline("./baseline_results.json")
for task, delta_info in comparison.items():
status = "↑" if delta_info["improved"] else "↓"
print(f"{task}: {delta_info['baseline']:.4f} → {delta_info['current']:.4f} {status}{delta_info['delta']:+.4f}")
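The listing reads `./baseline_results.json` in `compare_with_baseline` but never writes that file. A minimal sketch of how a baseline could be stored, assuming it has the same shape that `_format_results` returns (a dict with a `tasks` key). The helper names `save_baseline` and `load_baseline` are hypothetical and are not part of lm-evaluation-harness.

```python
import json
import os
import tempfile
from typing import Dict


def save_baseline(results: Dict, path: str) -> None:
    """Write a formatted results dict to JSON so later runs can compare against it."""
    directory = os.path.dirname(os.path.abspath(path))
    os.makedirs(directory, exist_ok=True)
    # Write to a temp file in the same directory, then rename, so an
    # interrupted run never leaves a half-written baseline behind.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, sort_keys=True)
    os.replace(tmp_path, path)


def load_baseline(path: str) -> Dict:
    """Load a baseline and check it has the expected top-level shape."""
    with open(path, "r", encoding="utf-8") as f:
        baseline = json.load(f)
    if "tasks" not in baseline:
        raise ValueError(f"{path} has no 'tasks' key; not a formatted results file")
    return baseline
```

A typical flow would be to call `save_baseline(runner.run_core_tasks(), "./baseline_results.json")` once for the reference model, then run the comparison for each candidate.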
Custom Domain Evaluation Tasks
# custom_tasks/medical_qa.yaml
task: medical_qa
dataset_path: json
dataset_kwargs:
  data_files:
    test: ./data/medical_qa_test.jsonl
test_split: test
doc_to_text: "Question: {{question}}\nChoices:\nA. {{A}}\nB. {{B}}\nC. {{C}}\nD. {{D}}\nAnswer:"
doc_to_target: "{{answer}}"
doc_to_choice: ["A", "B", "C", "D"]
metric_list:
  - metric: acc
    aggregation: mean
  - metric: f1
    aggregation: mean
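The task template above reads the fields `question`, `A`, `B`, `C`, `D` and `answer` from each JSONL record, so a malformed record only surfaces once the evaluation is running. A small pre-flight check can catch this earlier. The sketch below assumes `answer` holds one of the letters A to D, matching `doc_to_choice`. The function names are hypothetical.

```python
import json
from typing import Dict, List

REQUIRED_FIELDS = ("question", "A", "B", "C", "D", "answer")
VALID_ANSWERS = {"A", "B", "C", "D"}


def validate_record(record: Dict) -> List[str]:
    """Return a list of problems for one record; empty means the record is usable."""
    problems = [f"missing field: {name}" for name in REQUIRED_FIELDS if name not in record]
    if "answer" in record and record["answer"] not in VALID_ANSWERS:
        problems.append(f"answer must be one of A-D, got {record['answer']!r}")
    return problems


def validate_jsonl_lines(lines: List[str]) -> Dict[int, List[str]]:
    """Map 1-based line number to its problems, for lines that have any."""
    report: Dict[int, List[str]] = {}
    for number, line in enumerate(lines, start=1):
        if not line.strip():
            continue  # tolerate blank lines
        try:
            record = json.loads(line)
        except json.JSONDecodeError as exc:
            report[number] = [f"invalid JSON: {exc.msg}"]
            continue
        problems = validate_record(record)
        if problems:
            report[number] = problems
    return report
```

Running `validate_jsonl_lines(open("./data/medical_qa_test.jsonl", encoding="utf-8").readlines())` before the benchmark gives an empty dict when every record fits the template.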
Pattern 2: RAG Evaluation (RAGAS Framework)
RAG Evaluation Dimensions
RAG systems involve both retrieval and generation components that need separate evaluation. The RAGAS framework provides 4 core metrics:
| Metric | Component | Meaning | Calculation |
|---|---|---|---|
| Context Precision | Retrieval | Whether relevant docs are ranked near the top of the results | Mean of precision@k over the ranks k that hold a relevant doc |
| Context Recall | Retrieval | Proportion of required info that was retrieved | Coverage ratio of ground truth by retrieved content |
| Faithfulness | Generation | Factual consistency of answer with retrieved docs | Proportion of answer claims supported by retrieved docs |
| Answer Relevancy | Generation | Relevance of answer to the question | Mean cosine similarity between the original question and questions an LLM generates back from the answer |
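To make two of these calculations concrete, here is a hand-worked sketch of Context Precision and Faithfulness. It follows the metric definitions as commonly documented for RAGAS. In the library itself the relevance judgments and claim checks come from an LLM, whereas here they are supplied by hand, and the function names are illustrative only.

```python
from typing import List


def context_precision(relevance: List[int]) -> float:
    """relevance[i] is 1 if the chunk at rank i+1 is relevant, else 0."""
    hits = 0
    weighted = 0.0
    for rank, is_relevant in enumerate(relevance, start=1):
        if is_relevant:
            hits += 1
            # precision@rank, accumulated only at ranks holding a relevant chunk
            weighted += hits / rank
    return weighted / hits if hits else 0.0


def faithfulness_score(supported_claims: int, total_claims: int) -> float:
    """Share of the answer's claims that the retrieved context supports."""
    return supported_claims / total_claims if total_claims else 0.0
```

With the same two relevant chunks, `context_precision([1, 1, 0])` scores higher than `context_precision([0, 1, 1])`, which is why the metric rewards retrievers that rank relevant material first.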
Complete RAG Evaluation Implementation
# rag_evaluation_benchmark.py
from ragas import evaluate
from ragas.metrics import (
context_precision,
context_recall,
faithfulness,
answer_relevancy,
AnswerSimilarity,
)
from datasets import Dataset
from typing import List, Dict, Optional
from dataclasses import dataclass, field
import json
@dataclass
class RAGEvalSample:
question: str
contexts: List[str]
answer: str
ground_truth: str
@dataclass
class RAGEvalReport:
faithfulness: float
answer_relevancy: float
context_precision: float
context_recall: float
answer_similarity: float = 0.0
sample_count: int = 0
details: List[Dict] = field(default_factory=list)
class RAGEvaluator:
def __init__(
self,
metrics: Optional[List] = None,
llm=None,
embeddings=None,
):
self.metrics = metrics or [
faithfulness,
answer_relevancy,
context_precision,
context_recall,
]
self.llm = llm
self.embeddings = embeddings
def evaluate_samples(
self,
samples: List[RAGEvalSample],
) -> RAGEvalReport:
eval_data = {
"question": [s.question for s in samples],
"contexts": [s.contexts for s in samples],
"answer": [s.answer for s in samples],
"ground_truth": [s.ground_truth for s in samples],
}
dataset = Dataset.from_dict(eval_data)
result = evaluate(
dataset,
metrics=self.metrics,
llm=self.llm,
embeddings=self.embeddings,
)
return RAGEvalReport(
faithfulness=result["faithfulness"],
answer_relevancy=result["answer_relevancy"],
context_precision=result["context_precision"],
context_recall=result["context_recall"],
sample_count=len(samples),
)
def evaluate_rag_pipeline(
self,
rag_pipeline,
test_questions: List[Dict],
) -> RAGEvalReport:
samples = []
for q in test_questions:
rag_result = rag_pipeline.query(q["question"])
sample = RAGEvalSample(
question=q["question"],
contexts=rag_result["contexts"],
answer=rag_result["answer"],
ground_truth=q["ground_truth"],
)
samples.append(sample)
return self.evaluate_samples(samples)
def compare_pipelines(
self,
pipelines: Dict[str, object],
test_questions: List[Dict],
) -> Dict[str, RAGEvalReport]:
reports = {}
for name, pipeline in pipelines.items():
report = self.evaluate_rag_pipeline(pipeline, test_questions)
reports[name] = report
print(f"\n=== {name} ===")
print(f" Faithfulness: {report.faithfulness:.4f}")
print(f" Answer Relevancy: {report.answer_relevancy:.4f}")
print(f" Context Precision: {report.context_precision:.4f}")
print(f" Context Recall: {report.context_recall:.4f}")
return reports
class RAGEvalDatasetBuilder:
@staticmethod
def from_qa_pairs(
qa_pairs: List[Dict],
rag_pipeline=None,
) -> List[RAGEvalSample]:
samples = []
for qa in qa_pairs:
if rag_pipeline and "contexts" not in qa:
result = rag_pipeline.query(qa["question"])
contexts = result["contexts"]
answer = result["answer"]
else:
contexts = qa.get("contexts", [])
answer = qa.get("answer", "")
samples.append(RAGEvalSample(
question=qa["question"],
contexts=contexts,
answer=answer,
ground_truth=qa["ground_truth"],
))
return samples
@staticmethod
def from_jsonl(file_path: str) -> List[RAGEvalSample]:
samples = []
with open(file_path, "r", encoding="utf-8") as f:
for line in f:
data = json.loads(line.strip())
samples.append(RAGEvalSample(
question=data["question"],
contexts=data["contexts"],
answer=data["answer"],
ground_truth=data["ground_truth"],
))
return samples
if __name__ == "__main__":
test_data = [
{
"question": "What is Zero Trust Network Access (ZTNA)?",
"contexts": ["Zero Trust Network Access (ZTNA) is a security model based on the principle of 'never trust, always verify', providing secure access to specific applications for remote users."],
"answer": "ZTNA is a security model with the core principle of never trust, always verify, providing secure access for remote users.",
"ground_truth": "Zero Trust Network Access (ZTNA) is a security architecture based on the never trust, always verify principle, providing secure access to specific applications for remote users through identity verification and authorization, replacing traditional VPN.",
},
{
"question": "What are the core components of SASE architecture?",
"contexts": ["SASE (Secure Access Service Edge) integrates SD-WAN, SWG, CASB, FWaaS, and ZTNA into a unified cloud-native service."],
"answer": "SASE architecture includes SD-WAN, SWG, CASB, FWaaS, and ZTNA as core components, integrated into a unified cloud-native service.",
"ground_truth": "The core components of SASE architecture include SD-WAN (Software-Defined Wide Area Network), SWG (Secure Web Gateway), CASB (Cloud Access Security Broker), FWaaS (Firewall as a Service), and ZTNA (Zero Trust Network Access), integrated into a unified cloud-native service delivery model.",
},
]
evaluator = RAGEvaluator()
samples = RAGEvalDatasetBuilder.from_qa_pairs(test_data)
report = evaluator.evaluate_samples(samples)
print(f"\n=== RAG Evaluation Report ===")
print(f"Faithfulness: {report.faithfulness:.4f}")
print(f"Answer Relevancy: {report.answer_relevancy:.4f}")
print(f"Context Precision: {report.context_precision:.4f}")
print(f"Context Recall: {report.context_recall:.4f}")
print(f"Sample Count: {report.sample_count}")
Pattern 3: Automated Testing Pipeline (pytest)
Why Automated Testing?
Manual evaluation is non-reproducible, non-traceable, and impossible to integrate into CI/CD. pytest-driven automated evaluation pipelines make model assessment as reliable as unit tests.
Complete Automated Testing Framework
# tests/conftest.py
import pytest
from typing import Dict, List
import json
import os
@pytest.fixture(scope="session")
def model_client():
from openai import OpenAI
return OpenAI(
base_url=os.getenv("MODEL_API_URL", "http://localhost:8000/v1"),
api_key=os.getenv("MODEL_API_KEY", "test"),
)
@pytest.fixture(scope="session")
def eval_dataset():
with open("./data/eval_dataset.json", "r", encoding="utf-8") as f:
return json.load(f)
@pytest.fixture(scope="session")
def baseline_scores():
with open("./data/baseline_scores.json", "r", encoding="utf-8") as f:
return json.load(f)
# tests/test_model_quality.py
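import json
import pytest
from typing import Dict, List


@pytest.mark.regression  # the whole class is selected by `pytest -m regression`
class TestModelQuality:
    @pytest.mark.smoke  # also selected by `pytest -m smoke`
    def test_factual_accuracy(self, model_client, eval_dataset):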
factual_questions = [
q for q in eval_dataset if q["category"] == "factual"
]
correct = 0
for q in factual_questions:
response = model_client.chat.completions.create(
model="default",
messages=[{"role": "user", "content": q["question"]}],
temperature=0.0,
)
answer = response.choices[0].message.content
if self._check_answer(answer, q["expected_keywords"]):
correct += 1
accuracy = correct / len(factual_questions)
assert accuracy >= 0.85, f"Factual accuracy {accuracy:.2%} below threshold 85%"
def test_no_hallucination(self, model_client, eval_dataset):
hallucination_prompts = [
q for q in eval_dataset if q["category"] == "hallucination_trap"
]
hallucinated = 0
for q in hallucination_prompts:
response = model_client.chat.completions.create(
model="default",
messages=[{"role": "user", "content": q["question"]}],
temperature=0.0,
)
answer = response.choices[0].message.content
if self._contains_hallucination(answer, q["trap_keywords"]):
hallucinated += 1
hallucination_rate = hallucinated / len(hallucination_prompts)
assert hallucination_rate <= 0.10, f"Hallucination rate {hallucination_rate:.2%} above threshold 10%"
def test_response_latency(self, model_client, eval_dataset):
import time
latencies = []
for q in eval_dataset[:20]:
start = time.time()
model_client.chat.completions.create(
model="default",
messages=[{"role": "user", "content": q["question"]}],
)
latencies.append(time.time() - start)
avg_latency = sum(latencies) / len(latencies)
p95_latency = sorted(latencies)[int(len(latencies) * 0.95)]
assert avg_latency <= 2.0, f"Average latency {avg_latency:.2f}s above threshold 2s"
assert p95_latency <= 5.0, f"P95 latency {p95_latency:.2f}s above threshold 5s"
def test_output_format_compliance(self, model_client, eval_dataset):
format_questions = [
q for q in eval_dataset if q.get("expected_format") == "json"
]
format_errors = 0
for q in format_questions:
response = model_client.chat.completions.create(
model="default",
messages=[{"role": "user", "content": q["question"]}],
temperature=0.0,
)
answer = response.choices[0].message.content
try:
json.loads(answer)
except json.JSONDecodeError:
format_errors += 1
format_accuracy = 1 - format_errors / len(format_questions)
assert format_accuracy >= 0.95, f"JSON format accuracy {format_accuracy:.2%} below 95%"
def test_regression_against_baseline(self, model_client, eval_dataset, baseline_scores):
current_scores = self._run_evaluation_suite(model_client, eval_dataset)
for metric, baseline_value in baseline_scores.items():
current_value = current_scores.get(metric, 0)
assert current_value >= baseline_value * 0.95, (
f"Regression detected: {metric} dropped from {baseline_value:.4f} to {current_value:.4f}"
)
@staticmethod
def _check_answer(answer: str, keywords: List[str]) -> bool:
answer_lower = answer.lower()
matched = sum(1 for kw in keywords if kw.lower() in answer_lower)
return matched >= len(keywords) * 0.6
@staticmethod
def _contains_hallucination(answer: str, trap_keywords: List[str]) -> bool:
answer_lower = answer.lower()
return any(kw.lower() in answer_lower for kw in trap_keywords)
@staticmethod
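    def _run_evaluation_suite(model_client, eval_dataset) -> Dict:
        # Placeholder: these scores are hard-coded, so the regression test above
        # only compares constants. Replace with a real evaluation run over
        # eval_dataset before using this as a quality gate.
        return {
            "accuracy": 0.88,
            "faithfulness": 0.91,
            "relevancy": 0.85,
        }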
# pytest.ini
[pytest]
testpaths = tests
python_files = test_model_quality.py
python_classes = TestModelQuality
python_functions = test_*
addopts = -v --tb=short --json-report --json-report-file=eval_report.json
markers =
    smoke: smoke tests for quick validation
    regression: full regression test suite
    benchmark: performance benchmark tests
CI/CD Integration
# .github/workflows/model_eval.yml
name: Model Evaluation Pipeline
on:
  pull_request:
    paths:
      - 'models/**'
      - 'config/**'
jobs:
  model-eval:
    runs-on: gpu-runner
    steps:
      - uses: actions/checkout@v4
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      - name: Install Dependencies
        run: |
          pip install -r requirements-eval.txt
          pip install pytest pytest-json-report
      - name: Deploy Model Canary
        run: |
          python scripts/deploy_canary.py --model-path ${{ env.MODEL_PATH }}
      - name: Run Smoke Tests
        run: pytest tests/ -m smoke -v
      - name: Run Full Evaluation
        run: pytest tests/ -m regression -v --json-report
      - name: Check Regression
        run: python scripts/check_regression.py --report eval_report.json --baseline data/baseline_scores.json
      - name: Generate Report
        if: always()
        run: python scripts/generate_eval_report.py --report eval_report.json
      - name: Upload Results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: eval-results
          path: eval_report.json
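The workflow calls `scripts/check_regression.py`, which the article does not show. The sketch below is one possible core for such a gate, not the author's script. It assumes current and baseline scores are available as flat metric-to-value dicts and reuses the 5% tolerance from `test_regression_against_baseline`. The name `find_regressions` is hypothetical.

```python
from typing import Dict, List


def find_regressions(
    current: Dict[str, float],
    baseline: Dict[str, float],
    tolerance: float = 0.95,
) -> List[str]:
    """Return one message per metric that is missing or fell below tolerance * baseline."""
    failures: List[str] = []
    for metric, baseline_value in sorted(baseline.items()):
        if metric not in current:
            # A metric that silently disappears should fail the gate too.
            failures.append(f"{metric}: missing from current results")
            continue
        floor = baseline_value * tolerance
        if current[metric] < floor:
            failures.append(
                f"{metric}: {current[metric]:.4f} < {floor:.4f} "
                f"(baseline {baseline_value:.4f})"
            )
    return failures
```

A command-line wrapper could print each message and exit with a non-zero status when the list is non-empty, which is what makes the CI step fail.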
Pattern 4: A/B Testing for Model Comparison
Why A/B Testing?
High offline evaluation scores don't guarantee good online performance. A/B testing compares models in real traffic—it's the most reliable way to validate effectiveness.
A/B Testing Framework Implementation
# ab_test_framework.py
import hashlib
import random
import time
from typing import Dict, List, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import json
import statistics
class AllocationStrategy(Enum):
RANDOM = "random"
HASH_BASED = "hash_based"
STICKY = "sticky"
@dataclass
class ABTestConfig:
test_name: str
variant_a_name: str
variant_b_name: str
traffic_split: float = 0.5
min_sample_size: int = 1000
confidence_level: float = 0.95
allocation_strategy: AllocationStrategy = AllocationStrategy.HASH_BASED
duration_hours: int = 72
@dataclass
class ABTestResult:
query: str
variant: str
response: str
latency_ms: float
timestamp: str
user_feedback: Optional[int] = None
auto_score: Optional[float] = None
@dataclass
class ABTestReport:
test_name: str
variant_a: Dict
variant_b: Dict
winner: Optional[str] = None
confidence: float = 0.0
is_significant: bool = False
sample_size_a: int = 0
sample_size_b: int = 0
class ABTestRunner:
def __init__(self, config: ABTestConfig):
self.config = config
self.results: List[ABTestResult] = []
self._sticky_map: Dict[str, str] = {}
    def allocate_variant(self, user_id: str) -> str:
        strategy = self.config.allocation_strategy
        if strategy == AllocationStrategy.RANDOM:
            return self.config.variant_a_name if random.random() < self.config.traffic_split else self.config.variant_b_name
        if strategy == AllocationStrategy.STICKY and user_id in self._sticky_map:
            return self._sticky_map[user_id]
        # HASH_BASED, and the first STICKY assignment: map the user's 128-bit
        # MD5 digest onto [0, 2**128) and split at the traffic threshold.
        hash_val = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
        threshold = int(self.config.traffic_split * (2**128))
        variant = self.config.variant_a_name if hash_val < threshold else self.config.variant_b_name
        if strategy == AllocationStrategy.STICKY:
            # Cache the assignment so a later change to traffic_split does not move the user.
            self._sticky_map[user_id] = variant
        return variant
def record_result(self, result: ABTestResult):
self.results.append(result)
def run_test(
self,
queries: List[str],
model_a_fn: Callable,
model_b_fn: Callable,
evaluator_fn: Optional[Callable] = None,
) -> ABTestReport:
for i, query in enumerate(queries):
user_id = f"user_{i}"
variant = self.allocate_variant(user_id)
model_fn = model_a_fn if variant == self.config.variant_a_name else model_b_fn
start_time = time.time()
response = model_fn(query)
latency_ms = (time.time() - start_time) * 1000
auto_score = evaluator_fn(query, response) if evaluator_fn else None
self.record_result(ABTestResult(
query=query,
variant=variant,
response=response,
latency_ms=latency_ms,
timestamp=datetime.now().isoformat(),
auto_score=auto_score,
))
return self.analyze()
def analyze(self) -> ABTestReport:
a_results = [r for r in self.results if r.variant == self.config.variant_a_name]
b_results = [r for r in self.results if r.variant == self.config.variant_b_name]
a_scores = [r.auto_score for r in a_results if r.auto_score is not None]
b_scores = [r.auto_score for r in b_results if r.auto_score is not None]
a_latencies = [r.latency_ms for r in a_results]
b_latencies = [r.latency_ms for r in b_results]
a_feedback = [r.user_feedback for r in a_results if r.user_feedback is not None]
b_feedback = [r.user_feedback for r in b_results if r.user_feedback is not None]
stats_a = {
"avg_score": statistics.mean(a_scores) if a_scores else 0,
"avg_latency_ms": statistics.mean(a_latencies) if a_latencies else 0,
"p95_latency_ms": sorted(a_latencies)[int(len(a_latencies) * 0.95)] if a_latencies else 0,
"avg_feedback": statistics.mean(a_feedback) if a_feedback else 0,
}
stats_b = {
"avg_score": statistics.mean(b_scores) if b_scores else 0,
"avg_latency_ms": statistics.mean(b_latencies) if b_latencies else 0,
"p95_latency_ms": sorted(b_latencies)[int(len(b_latencies) * 0.95)] if b_latencies else 0,
"avg_feedback": statistics.mean(b_feedback) if b_feedback else 0,
}
is_significant = False
confidence = 0.0
if a_scores and b_scores and len(a_scores) >= 30 and len(b_scores) >= 30:
confidence, is_significant = self._statistical_test(a_scores, b_scores)
winner = None
if is_significant:
if stats_a["avg_score"] > stats_b["avg_score"]:
winner = self.config.variant_a_name
else:
winner = self.config.variant_b_name
return ABTestReport(
test_name=self.config.test_name,
variant_a=stats_a,
variant_b=stats_b,
winner=winner,
confidence=confidence,
is_significant=is_significant,
sample_size_a=len(a_results),
sample_size_b=len(b_results),
)
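    @staticmethod
    def _statistical_test(a: List[float], b: List[float], alpha: float = 0.05) -> tuple:
        from scipy.stats import ttest_ind
        # Welch's t-test: does not assume both variants have equal variance.
        t_stat, p_value = ttest_ind(a, b, equal_var=False)
        # 1 - p is reported as a rough "confidence" figure only; it is not the
        # probability that the winning variant is truly better.
        confidence = 1 - p_value
        is_significant = bool(p_value < alpha)
        return round(float(confidence), 4), is_significant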
if __name__ == "__main__":
config = ABTestConfig(
test_name="llm_v1_vs_v2",
variant_a_name="llama-3.1-8b",
variant_b_name="llama-3.1-8b-finetuned",
traffic_split=0.5,
min_sample_size=500,
)
runner = ABTestRunner(config)
def model_a_fn(query: str) -> str:
return f"Model A response to: {query}"
def model_b_fn(query: str) -> str:
return f"Model B enhanced response to: {query}"
def evaluator_fn(query: str, response: str) -> float:
return random.uniform(0.7, 1.0)
queries = [f"Test question {i}" for i in range(200)]
report = runner.run_test(queries, model_a_fn, model_b_fn, evaluator_fn)
print(f"Winner: {report.winner}")
print(f"Confidence: {report.confidence:.2%}")
print(f"Variant A avg score: {report.variant_a['avg_score']:.4f}")
print(f"Variant B avg score: {report.variant_b['avg_score']:.4f}")
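`ABTestConfig` carries a `min_sample_size`, but `analyze()` only requires 30 scores per variant, and the listing does not say how the minimum should be chosen. One common way is a power calculation for comparing two means. The sketch below uses the standard normal-approximation formula for a two-sided test with equal allocation. It assumes `sigma` (the per-variant standard deviation of the score) has been estimated from earlier data, and the function name is hypothetical.

```python
import math
from statistics import NormalDist


def required_sample_size(
    sigma: float,
    min_detectable_diff: float,
    alpha: float = 0.05,
    power: float = 0.8,
) -> int:
    """Samples needed per variant to detect a mean difference of min_detectable_diff."""
    if sigma <= 0 or min_detectable_diff <= 0:
        raise ValueError("sigma and min_detectable_diff must be positive")
    z_alpha = NormalDist().inv_cdf(1 - alpha / 2)  # two-sided significance level
    z_beta = NormalDist().inv_cdf(power)           # desired statistical power
    n = 2 * ((z_alpha + z_beta) * sigma / min_detectable_diff) ** 2
    return math.ceil(n)
```

For scores with a standard deviation of 0.1, detecting a 0.02 difference at 95% confidence and 80% power needs `required_sample_size(0.1, 0.02)` samples per variant, and halving the detectable difference roughly quadruples that number.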
Pattern 5: Human Evaluation Platform
Why Human Evaluation?
Automated metrics can't capture all quality dimensions. Fluency, helpfulness, safety, subtle factual errors—these all require human judgment. Human evaluation is the "gold standard" of model assessment.
Human Evaluation Platform Implementation
# human_eval_platform.py
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime
import json
import uuid
import statistics
class EvalTaskType(Enum):
SINGLE_RESPONSE = "single_response"
PAIRWISE_COMPARISON = "pairwise_comparison"
RANKING = "ranking"
ERROR_ANNOTATION = "error_annotation"
class QualityDimension(Enum):
FACTUAL_ACCURACY = "factual_accuracy"
RELEVANCE = "relevance"
COHERENCE = "coherence"
FLUENCY = "fluency"
SAFETY = "safety"
HELPFULNESS = "helpfulness"
@dataclass
class EvalTask:
task_id: str
task_type: EvalTaskType
question: str
responses: List[str]
quality_dimensions: List[QualityDimension]
guidelines: str = ""
metadata: Dict = field(default_factory=dict)
@dataclass
class AnnotatorResult:
task_id: str
annotator_id: str
scores: Dict[str, float]
preference: Optional[int] = None
comments: str = ""
duration_seconds: float = 0.0
timestamp: str = ""
@dataclass
class InterAnnotatorAgreement:
dimension: str
cohens_kappa: float
fleiss_kappa: float
agreement_rate: float
class HumanEvalPlatform:
def __init__(self):
self.tasks: Dict[str, EvalTask] = {}
self.results: Dict[str, List[AnnotatorResult]] = {}
self.annotator_stats: Dict[str, Dict] = {}
def create_task(
self,
question: str,
responses: List[str],
task_type: EvalTaskType = EvalTaskType.SINGLE_RESPONSE,
quality_dimensions: Optional[List[QualityDimension]] = None,
guidelines: str = "",
) -> EvalTask:
task_id = str(uuid.uuid4())[:8]
task = EvalTask(
task_id=task_id,
task_type=task_type,
question=question,
responses=responses,
quality_dimensions=quality_dimensions or [
QualityDimension.FACTUAL_ACCURACY,
QualityDimension.RELEVANCE,
QualityDimension.COHERENCE,
],
guidelines=guidelines,
)
self.tasks[task_id] = task
self.results[task_id] = []
return task
def submit_annotation(self, result: AnnotatorResult):
if result.task_id not in self.results:
raise ValueError(f"Task {result.task_id} not found")
result.timestamp = datetime.now().isoformat()
self.results[result.task_id].append(result)
self._update_annotator_stats(result)
def get_next_task(self, annotator_id: str) -> Optional[EvalTask]:
for task_id, task in self.tasks.items():
existing_annotators = {r.annotator_id for r in self.results[task_id]}
if annotator_id not in existing_annotators and len(existing_annotators) < 3:
return task
return None
def compute_agreement(self, task_ids: Optional[List[str]] = None) -> List[InterAnnotatorAgreement]:
target_tasks = task_ids or list(self.tasks.keys())
agreements = []
all_dimensions = set()
for task_id in target_tasks:
for result in self.results.get(task_id, []):
all_dimensions.update(result.scores.keys())
for dimension in all_dimensions:
scores_by_task = {}
for task_id in target_tasks:
task_results = self.results.get(task_id, [])
if len(task_results) >= 2:
scores_by_task[task_id] = [r.scores.get(dimension, 0) for r in task_results]
if not scores_by_task:
continue
agreement_rate = self._compute_pairwise_agreement(scores_by_task)
cohens_kappa = self._compute_cohens_kappa(scores_by_task)
fleiss_kappa = self._compute_fleiss_kappa(scores_by_task)
agreements.append(InterAnnotatorAgreement(
dimension=dimension,
cohens_kappa=round(cohens_kappa, 4),
fleiss_kappa=round(fleiss_kappa, 4),
agreement_rate=round(agreement_rate, 4),
))
return agreements
def generate_report(self) -> Dict:
all_scores = {}
for task_id, results in self.results.items():
for result in results:
for dim, score in result.scores.items():
if dim not in all_scores:
all_scores[dim] = []
all_scores[dim].append(score)
dimension_stats = {}
for dim, scores in all_scores.items():
dimension_stats[dim] = {
"mean": round(statistics.mean(scores), 4),
"median": round(statistics.median(scores), 4),
"stdev": round(statistics.stdev(scores), 4) if len(scores) > 1 else 0,
"count": len(scores),
}
pairwise_stats = {}
pairwise_tasks = [
(tid, t) for tid, t in self.tasks.items()
if t.task_type == EvalTaskType.PAIRWISE_COMPARISON
]
for task_id, task in pairwise_tasks:
prefs = [r.preference for r in self.results[task_id] if r.preference is not None]
if prefs:
pairwise_stats[task_id] = {
"response_a_wins": sum(1 for p in prefs if p == 0),
"response_b_wins": sum(1 for p in prefs if p == 1),
"total_votes": len(prefs),
}
return {
"total_tasks": len(self.tasks),
"total_annotations": sum(len(r) for r in self.results.values()),
"dimension_stats": dimension_stats,
"pairwise_stats": pairwise_stats,
"annotator_count": len(self.annotator_stats),
}
def _update_annotator_stats(self, result: AnnotatorResult):
aid = result.annotator_id
if aid not in self.annotator_stats:
self.annotator_stats[aid] = {"count": 0, "total_duration": 0}
self.annotator_stats[aid]["count"] += 1
self.annotator_stats[aid]["total_duration"] += result.duration_seconds
@staticmethod
def _compute_pairwise_agreement(scores_by_task: Dict) -> float:
agreements = 0
total = 0
for task_id, scores in scores_by_task.items():
for i in range(len(scores)):
for j in range(i + 1, len(scores)):
if abs(scores[i] - scores[j]) <= 1:
agreements += 1
total += 1
return agreements / total if total > 0 else 0
@staticmethod
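    def _compute_cohens_kappa(scores_by_task: Dict) -> float:
        # Cohen's kappa over the first two annotations of each task. This is only
        # meaningful if those come from the same two annotators throughout.
        # Scores are rounded to integer categories, and expected agreement comes
        # from each rater's own category frequencies rather than a fixed constant.
        pairs = [
            (round(s[0]), round(s[1]))
            for s in scores_by_task.values()
            if len(s) >= 2
        ]
        if not pairs:
            return 0.0
        n = len(pairs)
        p_observed = sum(1 for a, b in pairs if a == b) / n
        categories = {c for pair in pairs for c in pair}
        p_expected = sum(
            (sum(1 for a, _ in pairs if a == c) / n)
            * (sum(1 for _, b in pairs if b == c) / n)
            for c in categories
        )
        if p_expected == 1.0:
            return 0.0  # kappa is undefined when both raters use a single category
        return (p_observed - p_expected) / (1 - p_expected)
    @staticmethod
    def _compute_fleiss_kappa(scores_by_task: Dict) -> float:
        # Placeholder: Fleiss' kappa is not implemented here and always reports 0.0.
        return 0.0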
if __name__ == "__main__":
platform = HumanEvalPlatform()
task = platform.create_task(
question="Explain the basic principles of quantum computing",
responses=[
"Quantum computing leverages superposition and entanglement of qubits for parallel computation...",
"Quantum computing is a computing paradigm that uses quantum mechanics principles through qubits...",
],
task_type=EvalTaskType.PAIRWISE_COMPARISON,
quality_dimensions=[
QualityDimension.FACTUAL_ACCURACY,
QualityDimension.RELEVANCE,
QualityDimension.COHERENCE,
],
)
for annotator_id in ["ann_1", "ann_2", "ann_3"]:
result = AnnotatorResult(
task_id=task.task_id,
annotator_id=annotator_id,
scores={
"factual_accuracy": 4.0 + (hash(annotator_id) % 10) / 10,
"relevance": 3.5 + (hash(annotator_id) % 10) / 10,
"coherence": 4.0 + (hash(annotator_id) % 10) / 10,
},
preference=0 if hash(annotator_id) % 2 == 0 else 1,
duration_seconds=45.0,
)
platform.submit_annotation(result)
report = platform.generate_report()
print(json.dumps(report, indent=2))
agreements = platform.compute_agreement()
for a in agreements:
print(f"{a.dimension}: κ={a.cohens_kappa:.4f}, agreement={a.agreement_rate:.2%}")
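`_compute_fleiss_kappa` in the listing is a stub that returns 0.0. For reference, a minimal standalone sketch of Fleiss' kappa follows, assuming every item was rated by the same number of annotators and that ratings are categorical (the platform's float scores would need rounding to integer categories first). It is an illustration of the standard formula, not the platform's own method.

```python
from collections import Counter
from typing import Hashable, List, Sequence


def fleiss_kappa(ratings: Sequence[Sequence[Hashable]]) -> float:
    """ratings[i] holds the category each annotator assigned to item i."""
    if not ratings:
        raise ValueError("need at least one item")
    n_raters = len(ratings[0])
    if n_raters < 2 or any(len(item) != n_raters for item in ratings):
        raise ValueError("every item needs the same number (>= 2) of ratings")
    n_items = len(ratings)
    category_totals: Counter = Counter()
    per_item_agreement: List[float] = []
    for item in ratings:
        counts = Counter(item)
        category_totals.update(counts)
        # Share of annotator pairs that agree on this item.
        agreeing = sum(c * c for c in counts.values()) - n_raters
        per_item_agreement.append(agreeing / (n_raters * (n_raters - 1)))
    p_observed = sum(per_item_agreement) / n_items
    # Chance agreement from the overall proportion of each category.
    p_expected = sum((t / (n_items * n_raters)) ** 2 for t in category_totals.values())
    if p_expected == 1.0:
        return 0.0  # undefined when only one category is ever used; reported as 0.0 here
    return (p_observed - p_expected) / (1 - p_expected)
```

For example, `fleiss_kappa([[4, 4, 4], [5, 5, 5], [4, 4, 5], [4, 5, 5]])` compares three annotators over four items and yields a value between 0 (chance-level agreement) and 1 (perfect agreement).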
Pattern 6: Production Monitoring and Drift Detection
Why Production Monitoring?
Model deployment is not the end—it's the start of monitoring. Data distribution changes, user behavior shifts, model degradation—these issues, if not detected early, lead to silent quality decline.
Drift Detection System Implementation
# production_monitor.py
import numpy as np
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import json
import logging
logger = logging.getLogger(__name__)
class DriftType(Enum):
    DATA_DRIFT = "data_drift"
    CONCEPT_DRIFT = "concept_drift"
    PREDICTION_DRIFT = "prediction_drift"


class AlertLevel(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"


@dataclass
class DriftAlert:
    drift_type: DriftType
    metric_name: str
    current_value: float
    baseline_value: float
    drift_score: float
    alert_level: AlertLevel
    timestamp: str
    message: str


@dataclass
class MonitoringWindow:
    window_size: int = 1000
    reference_size: int = 5000
class DataDriftDetector:
    def __init__(
        self,
        reference_data: np.ndarray,
        significance_level: float = 0.05,
    ):
        self.reference_data = reference_data
        self.significance_level = significance_level
        self.reference_mean = np.mean(reference_data, axis=0)
        self.reference_std = np.std(reference_data, axis=0)

    def detect_ks_test(self, current_data: np.ndarray) -> Tuple[float, bool]:
        # Two-sample Kolmogorov-Smirnov test; expects 1-D arrays.
        from scipy.stats import ks_2samp
        stat, p_value = ks_2samp(self.reference_data, current_data)
        is_drift = p_value < self.significance_level
        return float(p_value), bool(is_drift)

    def detect_psi(self, current_data: np.ndarray, n_bins: int = 10) -> Tuple[float, bool]:
        # Bin edges come from the reference data so both samples share the same bins.
        ref_counts, bin_edges = np.histogram(self.reference_data, bins=n_bins)
        # Clip so values outside the reference range land in the outermost bins
        # instead of being dropped, which would understate the drift.
        clipped = np.clip(current_data, bin_edges[0], bin_edges[-1])
        cur_counts, _ = np.histogram(clipped, bins=bin_edges)
        # Convert counts to proportions; the floor avoids log(0) for empty bins.
        ref_pct = np.clip(ref_counts / ref_counts.sum(), 1e-6, None)
        cur_pct = np.clip(cur_counts / cur_counts.sum(), 1e-6, None)
        psi = float(np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct)))
        # Rule of thumb: PSI >= 0.1 is a moderate shift, > 0.25 a major one.
        is_drift = psi >= 0.1
        return round(psi, 4), is_drift

    def detect_z_score(self, current_data: np.ndarray, threshold: float = 3.0) -> Tuple[float, bool]:
        # Shift of the current mean, measured in reference standard deviations.
        current_mean = np.mean(current_data)
        z_score = abs(current_mean - self.reference_mean) / (self.reference_std + 1e-8)
        is_drift = z_score > threshold
        return round(float(z_score), 4), bool(is_drift)
class PredictionDriftMonitor:
    def __init__(
        self,
        reference_predictions: List[Dict],
        window_size: int = 1000,
    ):
        self.reference_predictions = reference_predictions
        self.window_size = window_size
        self.prediction_buffer: List[Dict] = []
        self.alerts: List[DriftAlert] = []
        # Running total; the buffer alone undercounts because it is trimmed.
        self.total_predictions = 0

    def record_prediction(self, prediction: Dict):
        self.prediction_buffer.append({
            **prediction,
            "timestamp": datetime.now().isoformat(),
        })
        self.total_predictions += 1
        if len(self.prediction_buffer) >= self.window_size:
            self._check_drift()
            # Keep the newest half so consecutive windows overlap.
            self.prediction_buffer = self.prediction_buffer[-self.window_size // 2:]

    def _check_drift(self):
        ref_scores = [p.get("confidence", 0) for p in self.reference_predictions]
        cur_scores = [p.get("confidence", 0) for p in self.prediction_buffer]
        ref_arr = np.array(ref_scores)
        cur_arr = np.array(cur_scores)
        detector = DataDriftDetector(ref_arr)
        psi_value, is_psi_drift = detector.detect_psi(cur_arr)
        z_score, is_z_drift = detector.detect_z_score(cur_arr)
        if is_psi_drift or is_z_drift:
            level = AlertLevel.CRITICAL if psi_value > 0.25 else AlertLevel.WARNING
            alert = DriftAlert(
                drift_type=DriftType.PREDICTION_DRIFT,
                metric_name="confidence_score",
                current_value=float(np.mean(cur_arr)),
                baseline_value=float(np.mean(ref_arr)),
                drift_score=psi_value,
                alert_level=level,
                timestamp=datetime.now().isoformat(),
                message=f"Prediction drift detected: PSI={psi_value:.4f}, Z-score={z_score:.4f}",
            )
            self.alerts.append(alert)
            logger.warning(alert.message)

    def get_health_report(self) -> Dict:
        recent_alerts = [
            a for a in self.alerts
            if datetime.fromisoformat(a.timestamp) > datetime.now() - timedelta(hours=24)
        ]
        return {
            "total_predictions_monitored": self.total_predictions,
            "alerts_last_24h": len(recent_alerts),
            "critical_alerts": sum(1 for a in recent_alerts if a.alert_level == AlertLevel.CRITICAL),
            "latest_drift_score": self.alerts[-1].drift_score if self.alerts else 0,
            "status": "healthy" if not recent_alerts else "degraded",
        }
class ModelPerformanceTracker:
    def __init__(self, baseline_metrics: Dict[str, float]):
        self.baseline_metrics = baseline_metrics
        self.metric_history: List[Dict] = []
        # Relative drop from baseline that counts as degradation (5%).
        self.degradation_threshold = 0.05

    def record_metrics(self, metrics: Dict[str, float]):
        entry = {
            "timestamp": datetime.now().isoformat(),
            "metrics": metrics,
        }
        self.metric_history.append(entry)
        self._check_degradation(metrics)

    def _check_degradation(self, current_metrics: Dict[str, float]):
        # Assumes higher is better; lower-is-better metrics such as latency
        # need the sign of the comparison flipped.
        for metric_name, baseline_value in self.baseline_metrics.items():
            current_value = current_metrics.get(metric_name)
            if current_value is None or baseline_value == 0:
                continue  # missing metric, or relative change is undefined
            relative_change = (baseline_value - current_value) / baseline_value
            if relative_change > self.degradation_threshold:
                logger.warning(
                    f"Degradation detected: {metric_name} dropped from "
                    f"{baseline_value:.4f} to {current_value:.4f} "
                    f"({relative_change:.2%} decrease)"
                )

    def get_trend(self, metric_name: str, hours: int = 24) -> Dict:
        cutoff = datetime.now() - timedelta(hours=hours)
        recent = [
            entry for entry in self.metric_history
            if datetime.fromisoformat(entry["timestamp"]) > cutoff
            and metric_name in entry["metrics"]
        ]
        if not recent:
            return {"trend": "no_data", "values": []}
        values = [entry["metrics"][metric_name] for entry in recent]
        trend = "stable"
        if len(values) >= 3:
            # Compare the mean of the older half with the mean of the newer half.
            first_half = np.mean(values[:len(values)//2])
            second_half = np.mean(values[len(values)//2:])
            if second_half < first_half * 0.95:
                trend = "declining"
            elif second_half > first_half * 1.05:
                trend = "improving"
        return {
            "trend": trend,
            "current": values[-1],
            "baseline": self.baseline_metrics.get(metric_name),
            "values": values,
        }
if __name__ == "__main__":
    np.random.seed(42)
    reference = np.random.normal(0.85, 0.05, 5000)
    detector = DataDriftDetector(reference)
    healthy_data = np.random.normal(0.84, 0.05, 1000)
    drifted_data = np.random.normal(0.70, 0.08, 1000)
    psi_healthy, drift_healthy = detector.detect_psi(healthy_data)
    print(f"Healthy data: PSI={psi_healthy:.4f}, drift={drift_healthy}")
    psi_drifted, drift_drifted = detector.detect_psi(drifted_data)
    print(f"Drifted data: PSI={psi_drifted:.4f}, drift={drift_drifted}")

    reference_preds = [{"confidence": np.random.uniform(0.8, 0.95)} for _ in range(5000)]
    monitor = PredictionDriftMonitor(reference_preds, window_size=100)
    for _ in range(50):
        monitor.record_prediction({"confidence": np.random.uniform(0.8, 0.95)})
    for _ in range(50):
        monitor.record_prediction({"confidence": np.random.uniform(0.5, 0.7)})
    health = monitor.get_health_report()
    print(f"\nHealth Report: {json.dumps(health, indent=2)}")
5 Common Pitfalls and Solutions
Pitfall 1: Evaluation Data Leakage
Training and evaluation data overlap, inflating evaluation scores.
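Solution: Keep training and evaluation data strictly isolated, using hash-based deduplication for exact copies and a similarity check for near-duplicates, as in the function below.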
def check_data_leakage(train_data: List[str], eval_data: List[str], threshold: float = 0.8) -> Dict:
    # Pairwise fuzzy matching costs O(len(eval) * len(train)); sample large corpora first.
    from difflib import SequenceMatcher
    if not eval_data:
        return {"leak_count": 0, "leak_rate": 0.0}
    leaks = []
    for i, eval_item in enumerate(eval_data):
        for train_item in train_data:
            similarity = SequenceMatcher(None, eval_item, train_item).ratio()
            if similarity > threshold:
                leaks.append({"eval_index": i, "similarity": round(similarity, 4)})
                break  # one match is enough to flag this eval item
    return {"leak_count": len(leaks), "leak_rate": len(leaks) / len(eval_data)}
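The solution mentions hash-based deduplication, while the function above relies on pairwise similarity. A minimal sketch of the hash-based step follows, assuming that "duplicate" means identical text up to case and whitespace. The names `normalize_text`, `text_fingerprint`, and `find_exact_leaks` are illustrative, not from any library.

```python
import hashlib
from typing import Dict, List


def normalize_text(text: str) -> str:
    # Lowercase and collapse whitespace so trivial formatting changes hash identically.
    return " ".join(text.lower().split())


def text_fingerprint(text: str) -> str:
    # SHA-256 of the normalized text; any stable hash would do for this purpose.
    return hashlib.sha256(normalize_text(text).encode("utf-8")).hexdigest()


def find_exact_leaks(train_data: List[str], eval_data: List[str]) -> Dict:
    # One pass over each dataset: O(n + m) instead of O(n * m) comparisons.
    train_hashes = {text_fingerprint(t) for t in train_data}
    leaked = [
        i for i, item in enumerate(eval_data)
        if text_fingerprint(item) in train_hashes
    ]
    rate = len(leaked) / len(eval_data) if eval_data else 0.0
    return {"leaked_indices": leaked, "leak_count": len(leaked), "leak_rate": rate}
```

Running the cheap hash check first and reserving the similarity scan for the remaining items keeps the expensive comparison to a smaller set.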
Pitfall 2: Evaluation Metrics Misaligned with Business Goals
Model scores high on MMLU but business KPIs don't improve.
Solution: Build a metric-to-business mapping table to ensure evaluation metrics align with business objectives.
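def align_metrics_with_business(eval_metrics: Dict, business_kpis: Dict) -> List[str]:
    # Example mapping from evaluation metrics to the business KPIs they should move.
    alignment_map = {
        "faithfulness": ["customer_satisfaction", "complaint_rate"],
        "answer_relevancy": ["task_completion_rate", "user_engagement"],
        "latency_p95": ["session_duration", "bounce_rate"],
    }
    misaligned = []
    for metric in eval_metrics:
        mapped_kpis = alignment_map.get(metric)
        if mapped_kpis is None:
            misaligned.append(f"Metric '{metric}' has no business KPI mapping")
        elif not any(kpi in business_kpis for kpi in mapped_kpis):
            # The metric is mapped, but none of its KPIs are actually tracked.
            misaligned.append(f"Metric '{metric}' maps to untracked KPIs: {mapped_kpis}")
    return misaligned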
Pitfall 3: Insufficient A/B Test Sample Size
Drawing conclusions after 200 queries with insufficient statistical power.
Solution: Pre-calculate required sample size.
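def calculate_sample_size(
    baseline_rate: float,
    minimum_detectable_effect: float,
    alpha: float = 0.05,
    power: float = 0.8,
) -> int:
    """Per-group sample size for a two-sided two-proportion z-test.

    minimum_detectable_effect is relative: 0.10 means a 10% lift over baseline_rate.
    """
    import math
    from scipy.stats import norm
    z_alpha = norm.ppf(1 - alpha / 2)
    z_beta = norm.ppf(power)
    p1 = baseline_rate
    p2 = baseline_rate * (1 + minimum_detectable_effect)
    if not 0 < p2 < 1 or p1 == p2:
        raise ValueError("baseline_rate and effect must give two distinct rates in (0, 1)")
    p_avg = (p1 + p2) / 2
    n = (z_alpha * math.sqrt(2 * p_avg * (1 - p_avg)) +
         z_beta * math.sqrt(p1 * (1 - p1) + p2 * (1 - p2))) ** 2 / (p2 - p1) ** 2
    return math.ceil(n)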
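To see why 200 queries is too few, it helps to invert the same formula and ask how much power a given sample size buys. The sketch below is a normal approximation that ignores the negligible opposite tail. It uses only the standard library, and `approximate_power` is an illustrative name.

```python
import math
from statistics import NormalDist


def approximate_power(p1: float, p2: float, n_per_group: int, alpha: float = 0.05) -> float:
    """Approximate power of a two-sided two-proportion z-test."""
    z_alpha = NormalDist().inv_cdf(1 - alpha / 2)
    p_avg = (p1 + p2) / 2
    # Standard error terms under the null and the alternative hypothesis.
    se_null = math.sqrt(2 * p_avg * (1 - p_avg))
    se_alt = math.sqrt(p1 * (1 - p1) + p2 * (1 - p2))
    z = (abs(p2 - p1) * math.sqrt(n_per_group) - z_alpha * se_null) / se_alt
    return NormalDist().cdf(z)


# Baseline 10% success rate, 10% relative lift (10% -> 11%).
power_at_200 = approximate_power(0.10, 0.11, 200)
power_at_15k = approximate_power(0.10, 0.11, 15000)
```

With these example rates, 200 queries per group leaves the test with very little chance of detecting the lift, while roughly 15,000 per group brings power to about the conventional 0.8 target.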
Pitfall 4: Ignoring Inter-Annotator Agreement
Multiple annotators produce vastly different results, but averaging masks quality issues.
Solution: Calculate Cohen's Kappa; dimensions below 0.6 need re-annotation.
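A minimal sketch of that check, assuming two annotators assign integer category labels to the same items for each quality dimension. The function names and the data layout are illustrative, and the 0.6 cutoff is the one stated above.

```python
from collections import Counter
from typing import Dict, List, Sequence, Tuple


def cohens_kappa(rater_a: Sequence[int], rater_b: Sequence[int]) -> float:
    if len(rater_a) != len(rater_b) or not rater_a:
        raise ValueError("Both raters must label the same non-empty set of items")
    n = len(rater_a)
    # Observed agreement: fraction of items with identical labels.
    p_observed = sum(a == b for a, b in zip(rater_a, rater_b)) / n
    # Chance agreement from each rater's marginal label frequencies.
    counts_a, counts_b = Counter(rater_a), Counter(rater_b)
    p_expected = sum(counts_a[c] * counts_b[c] for c in counts_a) / (n * n)
    if p_expected == 1.0:
        # Kappa is undefined when both raters use one identical category;
        # 0.0 is a conservative convention chosen for this sketch.
        return 0.0
    return (p_observed - p_expected) / (1 - p_expected)


def dimensions_needing_reannotation(
    labels: Dict[str, Tuple[Sequence[int], Sequence[int]]],
    threshold: float = 0.6,
) -> List[str]:
    # Return every dimension whose kappa falls below the threshold.
    return [dim for dim, (a, b) in labels.items() if cohens_kappa(a, b) < threshold]
```

Averaged scores can look healthy even when kappa is near zero, which is why the check runs per dimension before any aggregation.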
Pitfall 5: Delayed Drift Detection in Production
Only discovering model degradation during monthly evaluations when impact has persisted for weeks.
Solution: Real-time monitoring with sliding window detection and hourly alerting.
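One way to combine a sliding window with hourly alerting is sketched below, assuming a higher-is-better quality score and a fixed baseline mean. The class name, the 5% drop limit, and the one-hour cooldown are illustrative choices rather than values from the monitoring code earlier in the article.

```python
from collections import deque
from datetime import datetime, timedelta
from typing import Deque, Optional


class SlidingWindowAlerter:
    def __init__(
        self,
        baseline_mean: float,
        window_size: int = 200,
        max_relative_drop: float = 0.05,
        cooldown: timedelta = timedelta(hours=1),
    ):
        self.baseline_mean = baseline_mean
        self.max_relative_drop = max_relative_drop
        self.cooldown = cooldown
        # deque(maxlen=...) discards the oldest score automatically.
        self.window: Deque[float] = deque(maxlen=window_size)
        self.last_alert_at: Optional[datetime] = None

    def observe(self, value: float, now: datetime) -> bool:
        """Record one score; return True when an alert should be sent."""
        self.window.append(value)
        if len(self.window) < self.window.maxlen:
            return False  # wait until the window is full
        window_mean = sum(self.window) / len(self.window)
        drop = (self.baseline_mean - window_mean) / self.baseline_mean
        if drop <= self.max_relative_drop:
            return False
        if self.last_alert_at is not None and now - self.last_alert_at < self.cooldown:
            return False  # already alerted within the cooldown period
        self.last_alert_at = now
        return True
```

Passing `now` explicitly keeps the class easy to test. In production the caller would supply the current time on each observation.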
10 Common Error Troubleshooting
| Error Symptom | Possible Cause | Troubleshooting Steps | Solution |
|---|---|---|---|
| lm-eval OOM | batch_size too large or model too big | Reduce batch_size, enable FSDP | batch_size=1 + device_map=auto |
| RAGAS faithfulness = 0 | Empty contexts or answer | Check eval_data contexts field | Ensure contexts are non-empty and relevant |
| pytest timeout | High model inference latency | Check GPU utilization and batch config | Increase timeout or optimize inference config |
| A/B test not significant | Insufficient sample size or small effect | Calculate statistical power and required sample | Extend test duration or increase traffic ratio |
| Human eval Kappa < 0.4 | Unclear annotation guidelines | Review guidelines and run calibration test | Add examples and edge case descriptions |
| PSI false alerts | Narrow reference data distribution | Expand reference data time window | Use 30-day data as baseline |
| Non-reproducible results | Random seed not fixed | Set global random seed | torch.manual_seed(42) + numpy.random.seed(42) |
| Eval data format error | Missing fields or type mismatch | Validate data with schema | Use Pydantic model validation |
| High drift detection latency | Monitoring window too large | Reduce sliding window | From 1000 to 200 records |
| Missing metrics in report | Some metric calculations failed | Check LLM API call timeouts | Add retry mechanism and fallback |
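For the last row, a retry-with-fallback wrapper might look like the following sketch. `call_with_retry` is an illustrative helper, and the attempt count and delays are assumptions to tune for your LLM provider.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def call_with_retry(
    fn: Callable[[], T],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    fallback: Optional[T] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[T]:
    """Call fn, retrying with exponential backoff; return fallback if every attempt fails."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts:
                break
            # Delays grow as base_delay, 2 * base_delay, 4 * base_delay, ...
            sleep(base_delay * 2 ** (attempt - 1))
    return fallback
```

Returning an explicit fallback such as `None` lets the report mark a metric as unavailable instead of silently omitting it.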
Advanced Optimization Techniques
1. Cross-Dimensional Evaluation
Single metrics can't fully reflect model quality. Use cross-evaluation matrices to identify weaknesses:
import statistics
from typing import Dict, List


class CrossDimensionEvaluator:
    def __init__(self, dimensions: List[str]):
        self.dimensions = dimensions
        self.matrix = {d: {d2: [] for d2 in dimensions} for d in dimensions}

    def evaluate_cross(self, samples: List[Dict], eval_fn) -> Dict:
        # eval_fn(sample) must return a dict mapping dimension name to score.
        for sample in samples:
            scores = eval_fn(sample)
            for d1 in self.dimensions:
                for d2 in self.dimensions:
                    if d1 != d2:
                        self.matrix[d1][d2].append(scores.get(d1, 0) * scores.get(d2, 0))
        # Mean pairwise product per dimension pair. This is a joint score,
        # not a Pearson correlation: it is low when either dimension is weak.
        joint_scores = {}
        for d1 in self.dimensions:
            for d2 in self.dimensions:
                if d1 != d2 and self.matrix[d1][d2]:
                    joint_scores[f"{d1}×{d2}"] = round(
                        statistics.mean(self.matrix[d1][d2]), 4
                    )
        return joint_scores
2. Dynamic Evaluation Set Generation
Static evaluation sets are vulnerable to "score gaming." Dynamically generate evaluation questions to ensure fairness:
import json
from typing import Dict, List


class DynamicEvalGenerator:
    def __init__(self, llm_client):
        # llm_client is expected to expose an OpenAI-style chat.completions API.
        self.llm = llm_client

    def generate_eval_questions(
        self,
        domain: str,
        difficulty: str = "medium",
        count: int = 50,
    ) -> List[Dict]:
        prompt = (
            f"Generate {count} {difficulty}-difficulty evaluation questions in the {domain} domain.\n"
            "Each question should include: question, choices, answer, explanation.\n"
            "Return as a JSON array."
        )
        response = self.llm.chat.completions.create(
            model="default",  # placeholder; replace with your deployed model name
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
        )
        # json.loads raises json.JSONDecodeError if the model returns anything but bare JSON.
        return json.loads(response.choices[0].message.content)
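Model output is not guaranteed to be bare JSON, so generated questions are worth validating before they enter an evaluation set. The sketch below assumes the four fields requested in the prompt. `parse_generated_questions` is an illustrative helper that drops malformed items instead of raising.

```python
import json
from typing import Dict, List

REQUIRED_FIELDS = {"question", "choices", "answer", "explanation"}
FENCE = "`" * 3  # Markdown code-fence marker that models often wrap JSON in


def parse_generated_questions(raw: str) -> List[Dict]:
    text = raw.strip()
    if text.startswith(FENCE):
        lines = text.splitlines()
        # Drop the opening fence line and, if present, the closing one.
        lines = lines[1:-1] if lines[-1].strip() == FENCE else lines[1:]
        text = "\n".join(lines)
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        return []
    if not isinstance(data, list):
        return []
    # Keep only objects that carry every required field.
    return [
        item for item in data
        if isinstance(item, dict) and REQUIRED_FIELDS <= item.keys()
    ]
```

Comparing the number of returned items against the requested `count` gives a quick signal of how often the generator produces unusable output.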
3. Tiered Evaluation Strategy
Different scenarios use different evaluation granularity, balancing cost and quality assurance:
| Tier | Trigger Condition | Evaluation Scope | Evaluation Time |
|---|---|---|---|
| L0 Quick Validation | Every commit | 50 core test cases | <5 minutes |
| L1 Standard Eval | Daily build | 500 standard set | ~30 minutes |
| L2 Full Eval | Version release | 5000 complete set | ~2 hours |
| L3 Human Spot Check | Major release | 100 expert reviews | ~1 day |
4. Evaluation Result Versioning
import json
import os
from datetime import datetime
from typing import Dict


class EvalVersionManager:
    def __init__(self, store_path: str = "./eval_versions"):
        self.store_path = store_path
        os.makedirs(store_path, exist_ok=True)

    def save_version(
        self,
        model_version: str,
        eval_results: Dict,
        eval_config: Dict,
    ) -> str:
        version_id = f"{model_version}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        record = {
            "version_id": version_id,
            "model_version": model_version,
            "eval_results": eval_results,
            "eval_config": eval_config,
            "timestamp": datetime.now().isoformat(),
        }
        path = os.path.join(self.store_path, f"{version_id}.json")
        with open(path, "w", encoding="utf-8") as f:
            json.dump(record, f, indent=2, ensure_ascii=False)
        return version_id

    def _load_version(self, version_id: str) -> Dict:
        # Read back a record written by save_version.
        path = os.path.join(self.store_path, f"{version_id}.json")
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)

    def compare_versions(self, version_a: str, version_b: str) -> Dict:
        data_a = self._load_version(version_a)
        data_b = self._load_version(version_b)
        diff = {}
        # Only metrics present in both versions are compared; values must be numeric.
        for metric in data_a["eval_results"]:
            if metric in data_b["eval_results"]:
                diff[metric] = {
                    "a": data_a["eval_results"][metric],
                    "b": data_b["eval_results"][metric],
                    "delta": round(
                        data_b["eval_results"][metric] - data_a["eval_results"][metric], 4
                    ),
                }
        return diff
5. Evaluation Pipeline Orchestration
from typing import Callable, Dict


class EvalPipelineOrchestrator:
    def __init__(self):
        self.stages = []

    def add_stage(self, name: str, eval_fn: Callable, gate_threshold: float = 0.0):
        # eval_fn(model, dataset) must return a single numeric score.
        self.stages.append({
            "name": name,
            "eval_fn": eval_fn,
            "gate_threshold": gate_threshold,
        })

    def run(self, model, dataset) -> Dict:
        results = {}
        for stage in self.stages:
            print(f"Running stage: {stage['name']}")
            score = stage["eval_fn"](model, dataset)
            results[stage["name"]] = score
            # Stop at the first stage that misses its gate; later stages are skipped.
            if score < stage["gate_threshold"]:
                print(f"GATE FAILED: {stage['name']} score {score:.4f} < {stage['gate_threshold']}")
                results["status"] = "gate_failed"
                results["failed_stage"] = stage["name"]
                return results
        results["status"] = "passed"
        return results
Comparison Analysis
| Evaluation Pattern | Applicable Stage | Evaluation Dimensions | Automation Level | Cost | Reliability | Timeliness |
|---|---|---|---|---|---|---|
| LLM Benchmarking | Model selection/training | General capabilities | High | Low | Medium | Low |
| RAG Evaluation | RAG system development | Retrieval + Generation | High | Medium | High | Medium |
| Automated Testing | CI/CD pipeline | Custom metrics | High | Low | High | High |
| A/B Testing | Model deployment | Business metrics | Medium | High | High | Low |
| Human Evaluation | Quality assurance | All dimensions | Low | High | Highest | Lowest |
| Production Monitoring | Operations | Drift/degradation | High | Medium | Medium | Highest |
| Drift Detection Method | Applicable Scenario | Detection Speed | False Positive Rate | Minimum Sample Size |
|---|---|---|---|---|
| KS Test | Continuous data distribution | Fast | Medium | 100 |
| PSI | Categorical/continuous | Fast | Low | 500 |
| Z-score | Mean shift | Fastest | High | 30 |
| ADWIN | Streaming data | Medium | Low | 200 |
| Page-Hinkley | Cumulative drift | Medium | Medium | 100 |
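The table lists Page-Hinkley, which the monitoring code above does not implement. The following is a minimal sketch of the test for a downward shift in the mean, using only the standard library. The `delta`, `threshold`, and `min_samples` defaults are illustrative and would need tuning to the scale and noise of your metric.

```python
import random
from typing import Iterable, Optional


class PageHinkleyDetector:
    def __init__(self, delta: float = 0.005, threshold: float = 1.0, min_samples: int = 30):
        self.delta = delta            # tolerated magnitude of change
        self.threshold = threshold    # alarm level for the test statistic
        self.min_samples = min_samples
        self.n = 0
        self.mean = 0.0
        self.cumulative = 0.0
        self.min_cumulative = 0.0

    def update(self, value: float) -> bool:
        """Feed one observation; return True when a drop in the mean is detected."""
        self.n += 1
        self.mean += (value - self.mean) / self.n  # running mean
        # Accumulate how far the value falls below the running mean.
        self.cumulative += self.mean - value - self.delta
        self.min_cumulative = min(self.min_cumulative, self.cumulative)
        if self.n < self.min_samples:
            return False
        return (self.cumulative - self.min_cumulative) > self.threshold


def first_alarm_index(values: Iterable[float], detector: PageHinkleyDetector) -> Optional[int]:
    for i, value in enumerate(values):
        if detector.update(value):
            return i
    return None


# Synthetic stream: 200 stable scores, then a drop in the mean at index 200.
rng = random.Random(0)
stream = [rng.gauss(0.85, 0.02) for _ in range(200)] + [rng.gauss(0.60, 0.02) for _ in range(50)]
alarm_at = first_alarm_index(stream, PageHinkleyDetector())
```

Because the statistic accumulates deviations, a sustained drop triggers an alarm within a few observations, while isolated noisy points are absorbed by `delta`.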
Summary: AI model evaluation isn't just about "running a score"—it's an engineering system covering the entire model lifecycle. The 6 production patterns each have their place: benchmarking quantifies general capabilities, RAG evaluation pinpoints retrieval/generation weaknesses, automated testing ensures CI/CD quality gates, A/B testing validates online effectiveness, human evaluation sets quality ceilings, and production monitoring prevents silent degradation. In 2026, running AI systems without systematic evaluation is flying blind.