Python AI Model Evaluation: 6 Production Patterns from Benchmarking to Automated Testing

Your AI model is live, but how good is it really? How do you quantify LLM response quality? Which component in your RAG system is dragging performance down? Has the model degraded after 3 months in production? Most teams still evaluate models by manually checking a few results, which is like inspecting chip yields with the naked eye: unreliable and non-reproducible. By 2026, AI model evaluation has matured into an engineering discipline of its own. This article covers 6 production patterns that span the model lifecycle: benchmarking with lm-evaluation-harness, RAG evaluation with the RAGAS framework, pytest-driven automated testing pipelines, A/B testing for model comparison, human evaluation platforms, and production drift detection.


Key Takeaways

  • Master the complete workflow of LLM standardized benchmarking with lm-evaluation-harness
  • Use the RAGAS framework to quantitatively evaluate retrieval and generation quality in RAG systems
  • Build pytest-driven automated evaluation pipelines with CI/CD quality gates
  • Design scientific A/B testing plans to compare model version performance differences
  • Set up human evaluation platforms to collect high-quality human feedback data
  • Implement production drift detection and automated alerting mechanisms
  • Understand the applicable scenarios, pros/cons, and combination strategies for all 6 evaluation patterns

Table of Contents

  1. Architecture Overview: AI Model Evaluation Landscape
  2. Pattern 1: LLM Benchmarking (lm-evaluation-harness)
  3. Pattern 2: RAG Evaluation (RAGAS Framework)
  4. Pattern 3: Automated Testing Pipeline (pytest)
  5. Pattern 4: A/B Testing for Model Comparison
  6. Pattern 5: Human Evaluation Platform
  7. Pattern 6: Production Monitoring and Drift Detection
  8. 5 Common Pitfalls and Solutions
  9. 10 Common Error Troubleshooting
  10. Advanced Optimization Techniques
  11. Comparison Analysis
  12. Recommended Online Tools

Architecture Overview: AI Model Evaluation Landscape

┌─────────────────────────────────────────────────────────────┐
│                   AI Model Evaluation Pipeline              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌──────────┐    ┌──────────┐    ┌──────────────────────┐  │
│  │ Offline  │    │ Online   │    │   Human-in-the-Loop  │  │
│  │ Eval     │    │ Eval     │    │   Evaluation         │  │
│  │          │    │          │    │                      │  │
│  │ • Bench  │    │ • A/B    │    │ • Preference Ranking │  │
│  │   mark   │    │   Test   │    │ • Quality Scoring    │  │
│  │ • RAG    │    │ • Drift  │    │ • Red Team Testing   │  │
│  │   Eval   │    │   Detect │    │ • Domain Expert      │  │
│  │ • Auto   │    │ • Prod   │    │   Review             │  │
│  │   Test   │    │   Monitor│    │                      │  │
│  └────┬─────┘    └────┬─────┘    └──────────┬───────────┘  │
│       │               │                     │              │
│       ▼               ▼                     ▼              │
│  ┌─────────────────────────────────────────────────────┐   │
│  │              Evaluation Results Store                │   │
│  │  ┌─────────┐  ┌──────────┐  ┌──────────────────┐  │   │
│  │  │ Metrics │  │ Reports  │  │ Comparison Board │  │   │
│  │  │ DB      │  │ Generator│  │                  │  │   │
│  │  └─────────┘  └──────────┘  └──────────────────┘  │   │
│  └─────────────────────────────────────────────────────┘   │
│                          │                                  │
│                          ▼                                  │
│  ┌─────────────────────────────────────────────────────┐   │
│  │              Decision & Action Layer                 │   │
│  │  • Model Promotion / Rollback                       │   │
│  │  • Retraining Trigger                               │   │
│  │  • Alert & Notification                             │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Pattern 1: LLM Benchmarking (lm-evaluation-harness)

Why Standardized Benchmarking?

"Our model performs well"—this statement carries no information. You need standardized datasets and metrics under controlled conditions to quantify model capabilities. EleutherAI's lm-evaluation-harness is the most widely used LLM evaluation framework in 2026, supporting 200+ tasks.

Complete Benchmarking Workflow

# llm_benchmark.py
# Runs a fixed set of lm-evaluation-harness tasks and compares them to a saved baseline.
import json
import os
from typing import Dict

from lm_eval import evaluator

class LLMBenchmarkRunner:
    def __init__(
        self,
        model_path: str,
        device: str = "cuda",
        batch_size: int = 8,
    ):
        self.model_path = model_path
        self.device = device
        self.batch_size = batch_size
        self.results_history = []

    def run_core_tasks(self) -> Dict:
        core_tasks = [
            "mmlu",
            "hellaswag",
            "arc_challenge",
            "truthfulqa_mc2",
            "winogrande",
            "gsm8k",
        ]

        results = evaluator.simple_evaluate(
            model="hf",
            model_args=f"pretrained={self.model_path}",
            tasks=core_tasks,
            batch_size=self.batch_size,
            device=self.device,
        )

        formatted = self._format_results(results)
        self.results_history.append(formatted)
        return formatted

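    def run_custom_task(self, task_config_path: str) -> Dict:
        # simple_evaluate() resolves string entries as registered task names,
        # so register the YAML's directory with a TaskManager and pass the name.
        # Assumes the file is named <task_name>.yaml, as create_domain_eval() writes it.
        from lm_eval.tasks import TaskManager

        task_dir = os.path.dirname(os.path.abspath(task_config_path))
        task_name = os.path.splitext(os.path.basename(task_config_path))[0]
        results = evaluator.simple_evaluate(
            model="hf",
            model_args=f"pretrained={self.model_path}",
            tasks=[task_name],
            task_manager=TaskManager(include_path=task_dir),
            batch_size=self.batch_size,
            device=self.device,
        )
        return self._format_results(results)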

    def _format_results(self, raw_results: Dict) -> Dict:
        formatted = {
            "model": self.model_path,
            "timestamp": self._get_timestamp(),
            "tasks": {},
        }
        for task_name, task_results in raw_results["results"].items():
            formatted["tasks"][task_name] = {
                k: round(v, 4) if isinstance(v, float) else v
                for k, v in task_results.items()
            }
        return formatted

    def compare_with_baseline(self, baseline_path: str) -> Dict:
        if not self.results_history:
            self.run_core_tasks()

        with open(baseline_path, "r") as f:
            baseline = json.load(f)

        current = self.results_history[-1]
        comparison = {}
        for task_name in current["tasks"]:
            if task_name in baseline["tasks"]:
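                # "acc,none" is the accuracy key for most multiple-choice tasks.
                # Tasks that report a different metric (gsm8k, for example,
                # reports exact_match) fall back to 0 on both sides here.
                current_score = current["tasks"][task_name].get("acc,none", 0)
                baseline_score = baseline["tasks"][task_name].get("acc,none", 0)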
                comparison[task_name] = {
                    "current": current_score,
                    "baseline": baseline_score,
                    "delta": round(current_score - baseline_score, 4),
                    "improved": current_score > baseline_score,
                }
        return comparison

    @staticmethod
    def _get_timestamp() -> str:
        from datetime import datetime
        return datetime.now().isoformat()


class CustomTaskConfig:
    @staticmethod
    def create_domain_eval(
        dataset_path: str,
        task_name: str,
        output_dir: str = "./custom_tasks",
    ) -> str:
        config = {
            "task": task_name,
            "dataset_path": dataset_path,
            "output_type": "multiple_choice",
            "test_split": "test",
            "doc_to_text": "{{question}}",
            "doc_to_target": "{{answer}}",
            "doc_to_choice": "{{choices}}",
            "metric_list": [
                {"metric": "acc", "aggregation": "mean"},
                {"metric": "f1", "aggregation": "mean"},
            ],
        }

        os.makedirs(output_dir, exist_ok=True)
        config_path = os.path.join(output_dir, f"{task_name}.yaml")
        import yaml
        with open(config_path, "w") as f:
            yaml.dump(config, f)

        return config_path


if __name__ == "__main__":
    runner = LLMBenchmarkRunner(
        model_path="meta-llama/Llama-3.1-8B-Instruct",
        batch_size=4,
    )

    results = runner.run_core_tasks()
    print(json.dumps(results, indent=2))

    comparison = runner.compare_with_baseline("./baseline_results.json")
    for task, delta_info in comparison.items():
        status = "↑" if delta_info["improved"] else "↓"
        print(f"{task}: {delta_info['baseline']:.4f} → {delta_info['current']:.4f} {status}{delta_info['delta']:+.4f}")
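
A raw delta between two runs can be noise. lm-evaluation-harness reports a standard error next to each metric (for accuracy, under the key `acc_stderr,none`), so one hedged way to read the comparison is to ask whether the delta exceeds the combined uncertainty. The sketch below is not part of the runner above: `is_meaningful_delta` and the 1.96 threshold are illustrative choices, the scores are made-up numbers, and the two runs are treated as independent samples.

```python
import math
from typing import Dict


def is_meaningful_delta(
    current: Dict[str, float],
    baseline: Dict[str, float],
    metric: str = "acc",
    z_threshold: float = 1.96,  # roughly a 95% two-sided interval
) -> bool:
    """Return True if the score gap exceeds z_threshold combined standard errors."""
    score_key = f"{metric},none"
    stderr_key = f"{metric}_stderr,none"
    delta = current[score_key] - baseline[score_key]
    combined_se = math.sqrt(current[stderr_key] ** 2 + baseline[stderr_key] ** 2)
    if combined_se == 0:
        # No reported uncertainty: any non-zero gap counts as a difference.
        return delta != 0
    return abs(delta) / combined_se > z_threshold


# Illustrative per-task result dicts, shaped like one entry of results["results"].
current = {"acc,none": 0.652, "acc_stderr,none": 0.004}
baseline = {"acc,none": 0.648, "acc_stderr,none": 0.004}
print(is_meaningful_delta(current, baseline))  # gap is below the noise floor
```

With these numbers a 0.4-point gain is smaller than the combined standard error, so it should not be reported as an improvement on its own.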

Custom Domain Evaluation Tasks

# custom_tasks/medical_qa.yaml
task: medical_qa
dataset_path: json
dataset_kwargs:
  data_files:
    test: ./data/medical_qa_test.jsonl
output_type: multiple_choice
test_split: test
doc_to_text: "Question: {{question}}\nChoices:\nA. {{A}}\nB. {{B}}\nC. {{C}}\nD. {{D}}\nAnswer:"
doc_to_target: "{{answer}}"
doc_to_choice: ["A", "B", "C", "D"]
metric_list:
  - metric: acc
    aggregation: mean
  - metric: f1
    aggregation: mean
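
The YAML above expects every record in `medical_qa_test.jsonl` to carry the fields `question`, `A`, `B`, `C`, `D`, and `answer`. A malformed record usually surfaces only as a template error deep inside a run, so it can help to check the file first. The following is a minimal sketch under that assumption; `validate_mcq_lines` is a hypothetical helper, not part of lm-evaluation-harness.

```python
import json
from typing import Iterable, List

REQUIRED_FIELDS = ("question", "A", "B", "C", "D", "answer")
VALID_ANSWERS = {"A", "B", "C", "D"}


def validate_mcq_lines(lines: Iterable[str]) -> List[str]:
    """Return a list of problems found in JSONL lines; an empty list means OK."""
    problems = []
    for line_no, line in enumerate(lines, start=1):
        if not line.strip():
            continue  # ignore blank lines
        try:
            record = json.loads(line)
        except json.JSONDecodeError as exc:
            problems.append(f"line {line_no}: invalid JSON ({exc.msg})")
            continue
        if not isinstance(record, dict):
            problems.append(f"line {line_no}: expected a JSON object")
            continue
        missing = [name for name in REQUIRED_FIELDS if name not in record]
        if missing:
            problems.append(f"line {line_no}: missing fields {missing}")
        elif record["answer"] not in VALID_ANSWERS:
            problems.append(f"line {line_no}: answer {record['answer']!r} is not one of A-D")
    return problems


# Illustrative records; a real check would pass an open file handle instead.
sample_lines = [
    json.dumps({"question": "q1", "A": "a", "B": "b", "C": "c", "D": "d", "answer": "B"}),
    json.dumps({"question": "q2", "A": "a", "B": "b", "C": "c", "D": "d", "answer": "E"}),
]
for problem in validate_mcq_lines(sample_lines):
    print(problem)
```

Because the function accepts any iterable of strings, `validate_mcq_lines(open(path, encoding="utf-8"))` works on the file itself.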

Pattern 2: RAG Evaluation (RAGAS Framework)

RAG Evaluation Dimensions

RAG systems involve both retrieval and generation components that need separate evaluation. The RAGAS framework provides 4 core metrics:

| Metric | Component | Meaning | Calculation |
| --- | --- | --- | --- |
| Context Precision | Retrieval | Whether relevant documents are ranked near the top of the results | Mean of precision@k over the ranks that hold a relevant document |
| Context Recall | Retrieval | Proportion of required information that was retrieved | Share of ground-truth statements that can be attributed to the retrieved content |
| Faithfulness | Generation | Factual consistency of the answer with the retrieved documents | Proportion of claims in the answer that are supported by the retrieved documents |
| Answer Relevancy | Generation | Relevance of the answer to the question | Mean cosine similarity between the original question and questions regenerated from the answer |
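
To make the arithmetic behind two of these metrics concrete, here is a small sketch that computes context precision and faithfulness from judgments that are already known. In RAGAS the judgments themselves come from an LLM; this code only shows the aggregation step, and `context_precision_at_k` and `faithfulness_score` are illustrative names rather than RAGAS functions.

```python
from typing import List, Sequence


def context_precision_at_k(relevance: Sequence[int]) -> float:
    """Mean of precision@k taken at every rank k that holds a relevant chunk.

    relevance[i] is 1 if the chunk at rank i + 1 is relevant, else 0.
    """
    hits = 0
    weighted_sum = 0.0
    for k, is_relevant in enumerate(relevance, start=1):
        if is_relevant:
            hits += 1
            weighted_sum += hits / k  # precision@k at this relevant rank
    return weighted_sum / hits if hits else 0.0


def faithfulness_score(claim_supported: List[bool]) -> float:
    """Share of answer claims that the retrieved context supports."""
    if not claim_supported:
        return 0.0
    return sum(claim_supported) / len(claim_supported)


# Relevant chunks at ranks 1 and 3: (1/1 + 2/3) / 2
print(round(context_precision_at_k([1, 0, 1]), 4))  # 0.8333
# Same two relevant chunks pushed down to ranks 2 and 3: (1/2 + 2/3) / 2
print(round(context_precision_at_k([0, 1, 1]), 4))  # 0.5833
# Three of four claims supported
print(faithfulness_score([True, True, False, True]))  # 0.75
```

The first two calls retrieve the same relevant chunks, yet the score drops when they rank lower, which is why context precision is sensitive to reranking changes.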

Complete RAG Evaluation Implementation

# rag_evaluation_benchmark.py
from ragas import evaluate
from ragas.metrics import (
    context_precision,
    context_recall,
    faithfulness,
    answer_relevancy,
    AnswerSimilarity,
)
from datasets import Dataset
from typing import List, Dict, Optional
from dataclasses import dataclass, field
import json


@dataclass
class RAGEvalSample:
    question: str
    contexts: List[str]
    answer: str
    ground_truth: str


@dataclass
class RAGEvalReport:
    faithfulness: float
    answer_relevancy: float
    context_precision: float
    context_recall: float
    answer_similarity: float = 0.0
    sample_count: int = 0
    details: List[Dict] = field(default_factory=list)


class RAGEvaluator:
    def __init__(
        self,
        metrics: Optional[List] = None,
        llm=None,
        embeddings=None,
    ):
        self.metrics = metrics or [
            faithfulness,
            answer_relevancy,
            context_precision,
            context_recall,
        ]
        self.llm = llm
        self.embeddings = embeddings

    def evaluate_samples(
        self,
        samples: List[RAGEvalSample],
    ) -> RAGEvalReport:
        eval_data = {
            "question": [s.question for s in samples],
            "contexts": [s.contexts for s in samples],
            "answer": [s.answer for s in samples],
            "ground_truth": [s.ground_truth for s in samples],
        }

        dataset = Dataset.from_dict(eval_data)

        result = evaluate(
            dataset,
            metrics=self.metrics,
            llm=self.llm,
            embeddings=self.embeddings,
        )

        return RAGEvalReport(
            faithfulness=result["faithfulness"],
            answer_relevancy=result["answer_relevancy"],
            context_precision=result["context_precision"],
            context_recall=result["context_recall"],
            sample_count=len(samples),
        )

    def evaluate_rag_pipeline(
        self,
        rag_pipeline,
        test_questions: List[Dict],
    ) -> RAGEvalReport:
        samples = []
        for q in test_questions:
            rag_result = rag_pipeline.query(q["question"])
            sample = RAGEvalSample(
                question=q["question"],
                contexts=rag_result["contexts"],
                answer=rag_result["answer"],
                ground_truth=q["ground_truth"],
            )
            samples.append(sample)

        return self.evaluate_samples(samples)

    def compare_pipelines(
        self,
        pipelines: Dict[str, object],
        test_questions: List[Dict],
    ) -> Dict[str, RAGEvalReport]:
        reports = {}
        for name, pipeline in pipelines.items():
            report = self.evaluate_rag_pipeline(pipeline, test_questions)
            reports[name] = report
            print(f"\n=== {name} ===")
            print(f"  Faithfulness:       {report.faithfulness:.4f}")
            print(f"  Answer Relevancy:   {report.answer_relevancy:.4f}")
            print(f"  Context Precision:  {report.context_precision:.4f}")
            print(f"  Context Recall:     {report.context_recall:.4f}")
        return reports


class RAGEvalDatasetBuilder:
    @staticmethod
    def from_qa_pairs(
        qa_pairs: List[Dict],
        rag_pipeline=None,
    ) -> List[RAGEvalSample]:
        samples = []
        for qa in qa_pairs:
            if rag_pipeline and "contexts" not in qa:
                result = rag_pipeline.query(qa["question"])
                contexts = result["contexts"]
                answer = result["answer"]
            else:
                contexts = qa.get("contexts", [])
                answer = qa.get("answer", "")

            samples.append(RAGEvalSample(
                question=qa["question"],
                contexts=contexts,
                answer=answer,
                ground_truth=qa["ground_truth"],
            ))
        return samples

    @staticmethod
    def from_jsonl(file_path: str) -> List[RAGEvalSample]:
        samples = []
        with open(file_path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue  # skip blank lines instead of failing in json.loads
                data = json.loads(line)
                samples.append(RAGEvalSample(
                    question=data["question"],
                    contexts=data["contexts"],
                    answer=data["answer"],
                    ground_truth=data["ground_truth"],
                ))
        return samples


if __name__ == "__main__":
    test_data = [
        {
            "question": "What is Zero Trust Network Access (ZTNA)?",
            "contexts": ["Zero Trust Network Access (ZTNA) is a security model based on the principle of 'never trust, always verify', providing secure access to specific applications for remote users."],
            "answer": "ZTNA is a security model with the core principle of never trust, always verify, providing secure access for remote users.",
            "ground_truth": "Zero Trust Network Access (ZTNA) is a security architecture based on the never trust, always verify principle, providing secure access to specific applications for remote users through identity verification and authorization, replacing traditional VPN.",
        },
        {
            "question": "What are the core components of SASE architecture?",
            "contexts": ["SASE (Secure Access Service Edge) integrates SD-WAN, SWG, CASB, FWaaS, and ZTNA into a unified cloud-native service."],
            "answer": "SASE architecture includes SD-WAN, SWG, CASB, FWaaS, and ZTNA as core components, integrated into a unified cloud-native service.",
            "ground_truth": "The core components of SASE architecture include SD-WAN (Software-Defined Wide Area Network), SWG (Secure Web Gateway), CASB (Cloud Access Security Broker), FWaaS (Firewall as a Service), and ZTNA (Zero Trust Network Access), integrated into a unified cloud-native service delivery model.",
        },
    ]

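    # With llm=None and embeddings=None, ragas falls back to its default
    # OpenAI-backed models, so OPENAI_API_KEY must be set for this to run.
    evaluator = RAGEvaluator()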
    samples = RAGEvalDatasetBuilder.from_qa_pairs(test_data)
    report = evaluator.evaluate_samples(samples)

    print(f"\n=== RAG Evaluation Report ===")
    print(f"Faithfulness:       {report.faithfulness:.4f}")
    print(f"Answer Relevancy:   {report.answer_relevancy:.4f}")
    print(f"Context Precision:  {report.context_precision:.4f}")
    print(f"Context Recall:     {report.context_recall:.4f}")
    print(f"Sample Count:       {report.sample_count}")
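
`evaluate_rag_pipeline` and `from_qa_pairs` above assume that a pipeline object exposes a `query(question)` method returning a dict with `contexts` and `answer` keys. That contract is implicit in the code, so the sketch below spells it out as a `Protocol` and pairs it with a toy keyword-matching pipeline that can stand in for a real one in tests. `RAGPipeline`, `KeywordRAGPipeline`, and `_tokens` are hypothetical names introduced only for illustration.

```python
from typing import Dict, List, Protocol, Set


class RAGPipeline(Protocol):
    """Interface the evaluator relies on: query() returns contexts and an answer."""

    def query(self, question: str) -> Dict[str, object]: ...


def _tokens(text: str) -> Set[str]:
    # Lowercased words with surrounding punctuation stripped.
    return {word.strip("?.,()'\"").lower() for word in text.split()}


class KeywordRAGPipeline:
    """Toy pipeline: retrieves documents that share at least one word with the question."""

    def __init__(self, documents: List[str]):
        self.documents = documents

    def query(self, question: str) -> Dict[str, object]:
        question_words = _tokens(question)
        contexts = [doc for doc in self.documents if question_words & _tokens(doc)]
        # No generation step: the "answer" is simply the top retrieved document.
        answer = contexts[0] if contexts else "I don't know."
        return {"contexts": contexts, "answer": answer}


pipeline: RAGPipeline = KeywordRAGPipeline([
    "ZTNA means never trust, always verify.",
    "SASE integrates SD-WAN and ZTNA.",
])
result = pipeline.query("What does SASE include?")
print(result["contexts"])
```

A stand-in like this lets the evaluation plumbing be exercised without a vector store or model endpoint; the scores it yields say nothing about a real system.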

Pattern 3: Automated Testing Pipeline (pytest)

Why Automated Testing?

Manual evaluation is non-reproducible, non-traceable, and impossible to integrate into CI/CD. pytest-driven automated evaluation pipelines make model assessment as reliable as unit tests.

Complete Automated Testing Framework

# tests/conftest.py
import pytest
from typing import Dict, List
import json
import os


@pytest.fixture(scope="session")
def model_client():
    from openai import OpenAI
    return OpenAI(
        base_url=os.getenv("MODEL_API_URL", "http://localhost:8000/v1"),
        api_key=os.getenv("MODEL_API_KEY", "test"),
    )


@pytest.fixture(scope="session")
def eval_dataset():
    with open("./data/eval_dataset.json", "r", encoding="utf-8") as f:
        return json.load(f)


@pytest.fixture(scope="session")
def baseline_scores():
    with open("./data/baseline_scores.json", "r", encoding="utf-8") as f:
        return json.load(f)


# tests/test_model_quality.py
import json
from typing import Dict, List

import pytest


@pytest.mark.regression  # selected by `pytest -m regression`
class TestModelQuality:
    def test_factual_accuracy(self, model_client, eval_dataset):
        factual_questions = [
            q for q in eval_dataset if q["category"] == "factual"
        ]
        correct = 0
        for q in factual_questions:
            response = model_client.chat.completions.create(
                model="default",
                messages=[{"role": "user", "content": q["question"]}],
                temperature=0.0,
            )
            answer = response.choices[0].message.content
            if self._check_answer(answer, q["expected_keywords"]):
                correct += 1

        assert factual_questions, "No 'factual' questions found in eval dataset"
        accuracy = correct / len(factual_questions)
        assert accuracy >= 0.85, f"Factual accuracy {accuracy:.2%} below threshold 85%"

    def test_no_hallucination(self, model_client, eval_dataset):
        hallucination_prompts = [
            q for q in eval_dataset if q["category"] == "hallucination_trap"
        ]
        hallucinated = 0
        for q in hallucination_prompts:
            response = model_client.chat.completions.create(
                model="default",
                messages=[{"role": "user", "content": q["question"]}],
                temperature=0.0,
            )
            answer = response.choices[0].message.content
            if self._contains_hallucination(answer, q["trap_keywords"]):
                hallucinated += 1

        assert hallucination_prompts, "No 'hallucination_trap' questions found in eval dataset"
        hallucination_rate = hallucinated / len(hallucination_prompts)
        assert hallucination_rate <= 0.10, f"Hallucination rate {hallucination_rate:.2%} above threshold 10%"

    @pytest.mark.smoke  # quick check, selected by `pytest -m smoke`
    def test_response_latency(self, model_client, eval_dataset):
        import time
        latencies = []
        for q in eval_dataset[:20]:
            start = time.time()
            model_client.chat.completions.create(
                model="default",
                messages=[{"role": "user", "content": q["question"]}],
            )
            latencies.append(time.time() - start)

        avg_latency = sum(latencies) / len(latencies)
        p95_latency = sorted(latencies)[int(len(latencies) * 0.95)]
        assert avg_latency <= 2.0, f"Average latency {avg_latency:.2f}s above threshold 2s"
        assert p95_latency <= 5.0, f"P95 latency {p95_latency:.2f}s above threshold 5s"

    def test_output_format_compliance(self, model_client, eval_dataset):
        format_questions = [
            q for q in eval_dataset if q.get("expected_format") == "json"
        ]
        format_errors = 0
        for q in format_questions:
            response = model_client.chat.completions.create(
                model="default",
                messages=[{"role": "user", "content": q["question"]}],
                temperature=0.0,
            )
            answer = response.choices[0].message.content
            try:
                json.loads(answer)
            except json.JSONDecodeError:
                format_errors += 1

        if not format_questions:
            pytest.skip("No JSON-format questions in eval dataset")
        format_accuracy = 1 - format_errors / len(format_questions)
        assert format_accuracy >= 0.95, f"JSON format accuracy {format_accuracy:.2%} below 95%"

    def test_regression_against_baseline(self, model_client, eval_dataset, baseline_scores):
        current_scores = self._run_evaluation_suite(model_client, eval_dataset)
        for metric, baseline_value in baseline_scores.items():
            current_value = current_scores.get(metric, 0)
            assert current_value >= baseline_value * 0.95, (
                f"Regression detected: {metric} dropped from {baseline_value:.4f} to {current_value:.4f}"
            )

    @staticmethod
    def _check_answer(answer: str, keywords: List[str]) -> bool:
        answer_lower = answer.lower()
        matched = sum(1 for kw in keywords if kw.lower() in answer_lower)
        return matched >= len(keywords) * 0.6

    @staticmethod
    def _contains_hallucination(answer: str, trap_keywords: List[str]) -> bool:
        answer_lower = answer.lower()
        return any(kw.lower() in answer_lower for kw in trap_keywords)

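    @staticmethod
    def _run_evaluation_suite(model_client, eval_dataset) -> Dict:
        # Placeholder: returns fixed scores so the example runs end to end.
        # Replace with real metric computation over eval_dataset before
        # relying on the regression check above.
        return {
            "accuracy": 0.88,
            "faithfulness": 0.91,
            "relevancy": 0.85,
        }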


# pytest.ini
[pytest]
testpaths = tests
python_files = test_model_quality.py
python_classes = TestModelQuality
python_functions = test_*
addopts = -v --tb=short --json-report --json-report-file=eval_report.json
markers =
    smoke: smoke tests for quick validation
    regression: full regression test suite
    benchmark: performance benchmark tests

CI/CD Integration

# .github/workflows/model_eval.yml
name: Model Evaluation Pipeline

on:
  pull_request:
    paths:
      - 'models/**'
      - 'config/**'

jobs:
  model-eval:
    runs-on: gpu-runner
    steps:
      - uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.12'

      - name: Install Dependencies
        run: |
          pip install -r requirements-eval.txt
          pip install pytest pytest-json-report

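      - name: Deploy Model Canary
        # MODEL_PATH is not defined in this workflow; declare it in an `env:`
        # block at the workflow, job, or step level.
        run: |
          python scripts/deploy_canary.py --model-path ${{ env.MODEL_PATH }}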

      - name: Run Smoke Tests
        run: pytest tests/ -m smoke -v

      - name: Run Full Evaluation
        run: pytest tests/ -m regression -v --json-report

      - name: Check Regression
        run: python scripts/check_regression.py --report eval_report.json --baseline data/baseline_scores.json

      - name: Generate Report
        if: always()
        run: python scripts/generate_eval_report.py --report eval_report.json

      - name: Upload Results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: eval-results
          path: eval_report.json
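
The workflow calls `scripts/check_regression.py`, which is not shown in this article. As a rough sketch of what such a gate could do, the function below compares current metrics against the baseline file using the same 5% tolerance as `test_regression_against_baseline`. It assumes both inputs are flat `metric -> value` dicts, which matches how `baseline_scores.json` is read in the tests; the pytest JSON report would need its metrics exported in that shape first. `find_regressions` is a hypothetical name.

```python
from typing import Dict, List


def find_regressions(
    current: Dict[str, float],
    baseline: Dict[str, float],
    tolerance: float = 0.05,
) -> List[str]:
    """Return one message per metric that is missing or below its allowed floor."""
    failures = []
    for metric, baseline_value in baseline.items():
        if metric not in current:
            # Treat a missing metric as a failure rather than silently scoring it 0.
            failures.append(f"{metric}: missing from current results")
            continue
        floor = baseline_value * (1 - tolerance)
        if current[metric] < floor:
            failures.append(
                f"{metric}: {current[metric]:.4f} is below floor {floor:.4f} "
                f"(baseline {baseline_value:.4f})"
            )
    return failures


# Illustrative values only.
baseline = {"accuracy": 0.90, "faithfulness": 0.91, "relevancy": 0.85}
current = {"accuracy": 0.88, "faithfulness": 0.80}
for message in find_regressions(current, baseline):
    print(message)
```

In a CI script, a non-empty result would translate into a non-zero exit code so that the job fails and blocks the merge.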

Pattern 4: A/B Testing for Model Comparison

Why A/B Testing?

High offline evaluation scores don't guarantee good online performance. A/B testing compares models in real traffic—it's the most reliable way to validate effectiveness.
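
How much traffic an A/B test needs depends on the smallest score difference worth detecting. A common approximation for comparing two means is n = 2 (z_{1-α/2} + z_{power})² σ² / δ² samples per variant. The sketch below applies it using only the standard library; the function name and the example values (a score standard deviation of 0.1 and a minimum detectable difference of 0.02) are assumptions for illustration, not figures from a real test.

```python
import math
from statistics import NormalDist


def samples_per_variant(
    std_dev: float,
    min_detectable_diff: float,
    alpha: float = 0.05,
    power: float = 0.80,
) -> int:
    """Approximate per-variant sample size for a two-sided, two-sample mean test."""
    if std_dev <= 0 or min_detectable_diff <= 0:
        raise ValueError("std_dev and min_detectable_diff must be positive")
    z_alpha = NormalDist().inv_cdf(1 - alpha / 2)  # critical value for significance
    z_power = NormalDist().inv_cdf(power)          # critical value for desired power
    n = 2 * (z_alpha + z_power) ** 2 * std_dev ** 2 / min_detectable_diff ** 2
    return math.ceil(n)


print(samples_per_variant(std_dev=0.1, min_detectable_diff=0.02))  # 393
```

Halving the detectable difference roughly quadruples the required sample size, which is a useful sanity check on a `min_sample_size` setting before a test starts.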

A/B Testing Framework Implementation

# ab_test_framework.py
import hashlib
import random
import time
from typing import Dict, List, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import json
import statistics


class AllocationStrategy(Enum):
    RANDOM = "random"
    HASH_BASED = "hash_based"
    STICKY = "sticky"


@dataclass
class ABTestConfig:
    test_name: str
    variant_a_name: str
    variant_b_name: str
    traffic_split: float = 0.5
    min_sample_size: int = 1000
    confidence_level: float = 0.95
    allocation_strategy: AllocationStrategy = AllocationStrategy.HASH_BASED
    duration_hours: int = 72


@dataclass
class ABTestResult:
    query: str
    variant: str
    response: str
    latency_ms: float
    timestamp: str
    user_feedback: Optional[int] = None
    auto_score: Optional[float] = None


@dataclass
class ABTestReport:
    test_name: str
    variant_a: Dict
    variant_b: Dict
    winner: Optional[str] = None
    confidence: float = 0.0
    is_significant: bool = False
    sample_size_a: int = 0
    sample_size_b: int = 0


class ABTestRunner:
    def __init__(self, config: ABTestConfig):
        self.config = config
        self.results: List[ABTestResult] = []
        self._sticky_map: Dict[str, str] = {}

    def _hash_variant(self, user_id: str) -> str:
        # Deterministic split: place the 128-bit MD5 digest on [0, 2**128).
        hash_val = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
        threshold = int(self.config.traffic_split * (2**128))
        return self.config.variant_a_name if hash_val < threshold else self.config.variant_b_name

    def allocate_variant(self, user_id: str) -> str:
        strategy = self.config.allocation_strategy
        if strategy == AllocationStrategy.RANDOM:
            return self.config.variant_a_name if random.random() < self.config.traffic_split else self.config.variant_b_name

        if strategy == AllocationStrategy.HASH_BASED:
            return self._hash_variant(user_id)

        if strategy == AllocationStrategy.STICKY:
            # Assign on first sight, then keep returning the stored variant.
            # Calling allocate_variant() again here would recurse without end,
            # so the first assignment goes through the hash helper instead.
            if user_id not in self._sticky_map:
                self._sticky_map[user_id] = self._hash_variant(user_id)
            return self._sticky_map[user_id]

        raise ValueError(f"Unknown allocation strategy: {strategy}")

    def record_result(self, result: ABTestResult):
        self.results.append(result)

    def run_test(
        self,
        queries: List[str],
        model_a_fn: Callable,
        model_b_fn: Callable,
        evaluator_fn: Optional[Callable] = None,
    ) -> ABTestReport:
        for i, query in enumerate(queries):
            user_id = f"user_{i}"
            variant = self.allocate_variant(user_id)
            model_fn = model_a_fn if variant == self.config.variant_a_name else model_b_fn

            start_time = time.time()
            response = model_fn(query)
            latency_ms = (time.time() - start_time) * 1000

            auto_score = evaluator_fn(query, response) if evaluator_fn else None

            self.record_result(ABTestResult(
                query=query,
                variant=variant,
                response=response,
                latency_ms=latency_ms,
                timestamp=datetime.now().isoformat(),
                auto_score=auto_score,
            ))

        return self.analyze()

    def analyze(self) -> ABTestReport:
        a_results = [r for r in self.results if r.variant == self.config.variant_a_name]
        b_results = [r for r in self.results if r.variant == self.config.variant_b_name]

        a_scores = [r.auto_score for r in a_results if r.auto_score is not None]
        b_scores = [r.auto_score for r in b_results if r.auto_score is not None]

        a_latencies = [r.latency_ms for r in a_results]
        b_latencies = [r.latency_ms for r in b_results]

        a_feedback = [r.user_feedback for r in a_results if r.user_feedback is not None]
        b_feedback = [r.user_feedback for r in b_results if r.user_feedback is not None]

        stats_a = {
            "avg_score": statistics.mean(a_scores) if a_scores else 0,
            "avg_latency_ms": statistics.mean(a_latencies) if a_latencies else 0,
            "p95_latency_ms": sorted(a_latencies)[int(len(a_latencies) * 0.95)] if a_latencies else 0,
            "avg_feedback": statistics.mean(a_feedback) if a_feedback else 0,
        }
        stats_b = {
            "avg_score": statistics.mean(b_scores) if b_scores else 0,
            "avg_latency_ms": statistics.mean(b_latencies) if b_latencies else 0,
            "p95_latency_ms": sorted(b_latencies)[int(len(b_latencies) * 0.95)] if b_latencies else 0,
            "avg_feedback": statistics.mean(b_feedback) if b_feedback else 0,
        }

        is_significant = False
        confidence = 0.0
        if a_scores and b_scores and len(a_scores) >= 30 and len(b_scores) >= 30:
            confidence, is_significant = self._statistical_test(a_scores, b_scores)

        winner = None
        if is_significant:
            if stats_a["avg_score"] > stats_b["avg_score"]:
                winner = self.config.variant_a_name
            else:
                winner = self.config.variant_b_name

        return ABTestReport(
            test_name=self.config.test_name,
            variant_a=stats_a,
            variant_b=stats_b,
            winner=winner,
            confidence=confidence,
            is_significant=is_significant,
            sample_size_a=len(a_results),
            sample_size_b=len(b_results),
        )

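    @staticmethod
    def _statistical_test(a: List[float], b: List[float], alpha: float = 0.05) -> tuple:
        from scipy.stats import ttest_ind
        # Welch's t-test: does not assume the two variants share a variance.
        t_stat, p_value = ttest_ind(a, b, equal_var=False)
        # 1 - p is kept as a rough "confidence" figure for the report; it is
        # not the probability that one variant is truly better.
        confidence = 1 - float(p_value)
        is_significant = bool(p_value < alpha)
        return round(confidence, 4), is_significant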


if __name__ == "__main__":
    config = ABTestConfig(
        test_name="llm_v1_vs_v2",
        variant_a_name="llama-3.1-8b",
        variant_b_name="llama-3.1-8b-finetuned",
        traffic_split=0.5,
        min_sample_size=500,
    )

    runner = ABTestRunner(config)

    def model_a_fn(query: str) -> str:
        return f"Model A response to: {query}"

    def model_b_fn(query: str) -> str:
        return f"Model B enhanced response to: {query}"

    def evaluator_fn(query: str, response: str) -> float:
        return random.uniform(0.7, 1.0)

    queries = [f"Test question {i}" for i in range(200)]
    report = runner.run_test(queries, model_a_fn, model_b_fn, evaluator_fn)

    print(f"Winner: {report.winner}")
    print(f"Confidence: {report.confidence:.2%}")
    print(f"Variant A avg score: {report.variant_a['avg_score']:.4f}")
    print(f"Variant B avg score: {report.variant_b['avg_score']:.4f}")

Pattern 5: Human Evaluation Platform

Why Human Evaluation?

Automated metrics can't capture all quality dimensions. Fluency, helpfulness, safety, subtle factual errors—these all require human judgment. Human evaluation is the "gold standard" of model assessment.
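
Human judgment only works as a gold standard when annotators agree with each other, which is why the platform code below tracks a `cohens_kappa` field. As a minimal sketch of that statistic for two annotators, the function here computes kappa = (p_o - p_e) / (1 - p_e), where p_o is the observed agreement rate and p_e is the agreement expected by chance. It is an independent illustration and not the platform's own implementation; the labels are made up.

```python
from collections import Counter
from typing import Hashable, Sequence


def cohens_kappa(labels_a: Sequence[Hashable], labels_b: Sequence[Hashable]) -> float:
    """Cohen's kappa for two annotators who labeled the same items in the same order."""
    if len(labels_a) != len(labels_b) or len(labels_a) == 0:
        raise ValueError("both annotators must label the same non-empty set of items")
    n = len(labels_a)
    # Observed agreement: share of items given the same label by both annotators.
    observed = sum(a == b for a, b in zip(labels_a, labels_b)) / n
    # Chance agreement: product of each annotator's label frequencies, summed over labels.
    freq_a, freq_b = Counter(labels_a), Counter(labels_b)
    expected = sum(freq_a[label] * freq_b[label] for label in freq_a) / (n * n)
    if expected == 1.0:
        return 1.0  # both annotators used one identical label for every item
    return (observed - expected) / (1 - expected)


# Two annotators rating four responses as acceptable (1) or not (0).
annotator_1 = [1, 1, 0, 0]
annotator_2 = [1, 0, 0, 0]
print(cohens_kappa(annotator_1, annotator_2))  # 0.5
```

Here the annotators agree on 3 of 4 items, but half of that agreement is expected by chance, so kappa is 0.5 rather than 0.75.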

Human Evaluation Platform Implementation

# human_eval_platform.py
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime
import json
import uuid
import statistics


class EvalTaskType(Enum):
    SINGLE_RESPONSE = "single_response"
    PAIRWISE_COMPARISON = "pairwise_comparison"
    RANKING = "ranking"
    ERROR_ANNOTATION = "error_annotation"


class QualityDimension(Enum):
    FACTUAL_ACCURACY = "factual_accuracy"
    RELEVANCE = "relevance"
    COHERENCE = "coherence"
    FLUENCY = "fluency"
    SAFETY = "safety"
    HELPFULNESS = "helpfulness"


@dataclass
class EvalTask:
    task_id: str
    task_type: EvalTaskType
    question: str
    responses: List[str]
    quality_dimensions: List[QualityDimension]
    guidelines: str = ""
    metadata: Dict = field(default_factory=dict)


@dataclass
class AnnotatorResult:
    task_id: str
    annotator_id: str
    scores: Dict[str, float]
    preference: Optional[int] = None
    comments: str = ""
    duration_seconds: float = 0.0
    timestamp: str = ""


@dataclass
class InterAnnotatorAgreement:
    dimension: str
    cohens_kappa: float
    fleiss_kappa: float
    agreement_rate: float


class HumanEvalPlatform:
    def __init__(self):
        self.tasks: Dict[str, EvalTask] = {}
        self.results: Dict[str, List[AnnotatorResult]] = {}
        self.annotator_stats: Dict[str, Dict] = {}

    def create_task(
        self,
        question: str,
        responses: List[str],
        task_type: EvalTaskType = EvalTaskType.SINGLE_RESPONSE,
        quality_dimensions: Optional[List[QualityDimension]] = None,
        guidelines: str = "",
    ) -> EvalTask:
        task_id = str(uuid.uuid4())[:8]
        task = EvalTask(
            task_id=task_id,
            task_type=task_type,
            question=question,
            responses=responses,
            quality_dimensions=quality_dimensions or [
                QualityDimension.FACTUAL_ACCURACY,
                QualityDimension.RELEVANCE,
                QualityDimension.COHERENCE,
            ],
            guidelines=guidelines,
        )
        self.tasks[task_id] = task
        self.results[task_id] = []
        return task

    def submit_annotation(self, result: AnnotatorResult):
        if result.task_id not in self.results:
            raise ValueError(f"Task {result.task_id} not found")
        result.timestamp = datetime.now().isoformat()
        self.results[result.task_id].append(result)
        self._update_annotator_stats(result)

    def get_next_task(self, annotator_id: str) -> Optional[EvalTask]:
        for task_id, task in self.tasks.items():
            existing_annotators = {r.annotator_id for r in self.results[task_id]}
            if annotator_id not in existing_annotators and len(existing_annotators) < 3:
                return task
        return None

    def compute_agreement(self, task_ids: Optional[List[str]] = None) -> List[InterAnnotatorAgreement]:
        target_tasks = task_ids or list(self.tasks.keys())
        agreements = []

        all_dimensions = set()
        for task_id in target_tasks:
            for result in self.results.get(task_id, []):
                all_dimensions.update(result.scores.keys())

        for dimension in all_dimensions:
            scores_by_task = {}
            for task_id in target_tasks:
                task_results = self.results.get(task_id, [])
                if len(task_results) >= 2:
                    scores_by_task[task_id] = [r.scores.get(dimension, 0) for r in task_results]

            if not scores_by_task:
                continue

            agreement_rate = self._compute_pairwise_agreement(scores_by_task)
            cohens_kappa = self._compute_cohens_kappa(scores_by_task)
            fleiss_kappa = self._compute_fleiss_kappa(scores_by_task)

            agreements.append(InterAnnotatorAgreement(
                dimension=dimension,
                cohens_kappa=round(cohens_kappa, 4),
                fleiss_kappa=round(fleiss_kappa, 4),
                agreement_rate=round(agreement_rate, 4),
            ))

        return agreements

    def generate_report(self) -> Dict:
        all_scores = {}
        for task_id, results in self.results.items():
            for result in results:
                for dim, score in result.scores.items():
                    if dim not in all_scores:
                        all_scores[dim] = []
                    all_scores[dim].append(score)

        dimension_stats = {}
        for dim, scores in all_scores.items():
            dimension_stats[dim] = {
                "mean": round(statistics.mean(scores), 4),
                "median": round(statistics.median(scores), 4),
                "stdev": round(statistics.stdev(scores), 4) if len(scores) > 1 else 0,
                "count": len(scores),
            }

        pairwise_stats = {}
        pairwise_tasks = [
            (tid, t) for tid, t in self.tasks.items()
            if t.task_type == EvalTaskType.PAIRWISE_COMPARISON
        ]
        for task_id, task in pairwise_tasks:
            prefs = [r.preference for r in self.results[task_id] if r.preference is not None]
            if prefs:
                pairwise_stats[task_id] = {
                    "response_a_wins": sum(1 for p in prefs if p == 0),
                    "response_b_wins": sum(1 for p in prefs if p == 1),
                    "total_votes": len(prefs),
                }

        return {
            "total_tasks": len(self.tasks),
            "total_annotations": sum(len(r) for r in self.results.values()),
            "dimension_stats": dimension_stats,
            "pairwise_stats": pairwise_stats,
            "annotator_count": len(self.annotator_stats),
        }

    def _update_annotator_stats(self, result: AnnotatorResult):
        aid = result.annotator_id
        if aid not in self.annotator_stats:
            self.annotator_stats[aid] = {"count": 0, "total_duration": 0}
        self.annotator_stats[aid]["count"] += 1
        self.annotator_stats[aid]["total_duration"] += result.duration_seconds

    @staticmethod
    def _compute_pairwise_agreement(scores_by_task: Dict) -> float:
        agreements = 0
        total = 0
        for task_id, scores in scores_by_task.items():
            for i in range(len(scores)):
                for j in range(i + 1, len(scores)):
                    if abs(scores[i] - scores[j]) <= 1:
                        agreements += 1
                    total += 1
        return agreements / total if total > 0 else 0

    @staticmethod
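    def _compute_cohens_kappa(scores_by_task: Dict) -> float:
        # Cohen's kappa is defined for exactly two raters, so only the first
        # two annotations per task are used (this assumes they come from the
        # same two annotators across tasks). Scores are rounded to the nearest
        # integer so they can be treated as rating categories.
        pairs = [
            (round(scores[0]), round(scores[1]))
            for scores in scores_by_task.values()
            if len(scores) >= 2
        ]
        if not pairs:
            return 0.0
        n = len(pairs)
        p_observed = sum(1 for a, b in pairs if a == b) / n
        # Chance agreement, estimated from each rater's own category frequencies
        # instead of a hard-coded constant.
        categories = {c for pair in pairs for c in pair}
        p_expected = sum(
            (sum(1 for a, _ in pairs if a == c) / n)
            * (sum(1 for _, b in pairs if b == c) / n)
            for c in categories
        )
        if p_expected == 1:
            # Both raters used a single category; kappa is undefined, report 0.0.
            return 0.0
        return (p_observed - p_expected) / (1 - p_expected)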

    @staticmethod
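    def _compute_fleiss_kappa(scores_by_task: Dict) -> float:
        # Fleiss' kappa needs the same number of ratings per task, so every task
        # is truncated to the smallest rater count. Scores are rounded to the
        # nearest integer to form rating categories.
        ratings = [[round(s) for s in scores] for scores in scores_by_task.values()]
        if not ratings:
            return 0.0
        n_raters = min(len(r) for r in ratings)
        if n_raters < 2:
            return 0.0
        ratings = [r[:n_raters] for r in ratings]
        categories = sorted({c for r in ratings for c in r})
        n_items = len(ratings)
        # counts[i][j] = number of raters who assigned task i to category j
        counts = [[r.count(c) for c in categories] for r in ratings]
        # Per-task agreement P_i and overall category proportions p_j.
        p_items = [
            (sum(k * k for k in row) - n_raters) / (n_raters * (n_raters - 1))
            for row in counts
        ]
        p_cats = [
            sum(row[j] for row in counts) / (n_items * n_raters)
            for j in range(len(categories))
        ]
        p_bar = sum(p_items) / n_items
        p_expected = sum(p * p for p in p_cats)
        if p_expected == 1:
            # All ratings fell into one category; kappa is undefined, report 0.0.
            return 0.0
        return (p_bar - p_expected) / (1 - p_expected)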


if __name__ == "__main__":
    platform = HumanEvalPlatform()

    task = platform.create_task(
        question="Explain the basic principles of quantum computing",
        responses=[
            "Quantum computing leverages superposition and entanglement of qubits for parallel computation...",
            "Quantum computing is a computing paradigm that uses quantum mechanics principles through qubits...",
        ],
        task_type=EvalTaskType.PAIRWISE_COMPARISON,
        quality_dimensions=[
            QualityDimension.FACTUAL_ACCURACY,
            QualityDimension.RELEVANCE,
            QualityDimension.COHERENCE,
        ],
    )

    # Simulated annotations. A fixed per-annotator offset keeps the demo
    # reproducible; hash() on strings varies between runs unless
    # PYTHONHASHSEED is set.
    for idx, annotator_id in enumerate(["ann_1", "ann_2", "ann_3"]):
        offset = idx * 0.3
        result = AnnotatorResult(
            task_id=task.task_id,
            annotator_id=annotator_id,
            scores={
                "factual_accuracy": 4.0 + offset,
                "relevance": 3.5 + offset,
                "coherence": 4.0 + offset,
            },
            preference=idx % 2,
            duration_seconds=45.0,
        )
        platform.submit_annotation(result)

    report = platform.generate_report()
    print(json.dumps(report, indent=2))

    agreements = platform.compute_agreement()
    for a in agreements:
        print(f"{a.dimension}: κ={a.cohens_kappa:.4f}, agreement={a.agreement_rate:.2%}")

Pattern 6: Production Monitoring and Drift Detection

Why Production Monitoring?

Model deployment is not the end—it's the start of monitoring. Data distribution changes, user behavior shifts, model degradation—these issues, if not detected early, lead to silent quality decline.

Drift Detection System Implementation

# production_monitor.py
import numpy as np
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import json
import logging

logger = logging.getLogger(__name__)


class DriftType(Enum):
    DATA_DRIFT = "data_drift"
    CONCEPT_DRIFT = "concept_drift"
    PREDICTION_DRIFT = "prediction_drift"


class AlertLevel(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"


@dataclass
class DriftAlert:
    drift_type: DriftType
    metric_name: str
    current_value: float
    baseline_value: float
    drift_score: float
    alert_level: AlertLevel
    timestamp: str
    message: str


@dataclass
class MonitoringWindow:
    window_size: int = 1000
    reference_size: int = 5000


class DataDriftDetector:
    def __init__(
        self,
        reference_data: np.ndarray,
        significance_level: float = 0.05,
    ):
        self.reference_data = reference_data
        self.significance_level = significance_level
        self.reference_mean = np.mean(reference_data, axis=0)
        self.reference_std = np.std(reference_data, axis=0)

    def detect_ks_test(self, current_data: np.ndarray) -> Tuple[float, bool]:
        from scipy.stats import ks_2samp
        stat, p_value = ks_2samp(self.reference_data, current_data)
        is_drift = p_value < self.significance_level
        return p_value, is_drift

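    def detect_psi(self, current_data: np.ndarray, n_bins: int = 10) -> Tuple[float, bool]:
        # Bin edges come from the reference data. Current values outside that
        # range are clipped into the outermost bins; otherwise np.histogram
        # would silently drop them and understate the drift.
        ref_counts, bin_edges = np.histogram(self.reference_data, bins=n_bins)
        clipped = np.clip(current_data, bin_edges[0], bin_edges[-1])
        cur_counts, _ = np.histogram(clipped, bins=bin_edges)

        # Convert counts to proportions; the floor avoids log(0) on empty bins.
        ref_hist = np.clip(ref_counts / ref_counts.sum(), 1e-6, None)
        cur_hist = np.clip(cur_counts / cur_counts.sum(), 1e-6, None)

        psi = float(np.sum((cur_hist - ref_hist) * np.log(cur_hist / ref_hist)))

        # Common rule of thumb: < 0.1 stable, 0.1 to 0.25 moderate shift, > 0.25 major shift.
        is_drift = psi >= 0.1
        return round(psi, 4), is_drift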

    def detect_z_score(self, current_data: np.ndarray, threshold: float = 3.0) -> Tuple[float, bool]:
        current_mean = np.mean(current_data)
        z_score = abs(current_mean - self.reference_mean) / (self.reference_std + 1e-8)
        is_drift = z_score > threshold
        return round(float(z_score), 4), is_drift


class PredictionDriftMonitor:
    def __init__(
        self,
        reference_predictions: List[Dict],
        window_size: int = 1000,
    ):
        self.reference_predictions = reference_predictions
        self.window_size = window_size
        self.prediction_buffer: List[Dict] = []
        self.alerts: List[DriftAlert] = []

    def record_prediction(self, prediction: Dict):
        self.prediction_buffer.append({
            **prediction,
            "timestamp": datetime.now().isoformat(),
        })

        if len(self.prediction_buffer) >= self.window_size:
            self._check_drift()
            self.prediction_buffer = self.prediction_buffer[-self.window_size // 2:]

    def _check_drift(self):
        ref_scores = [p.get("confidence", 0) for p in self.reference_predictions]
        cur_scores = [p.get("confidence", 0) for p in self.prediction_buffer]

        ref_arr = np.array(ref_scores)
        cur_arr = np.array(cur_scores)

        detector = DataDriftDetector(ref_arr)
        psi_value, is_psi_drift = detector.detect_psi(cur_arr)
        z_score, is_z_drift = detector.detect_z_score(cur_arr)

        if is_psi_drift or is_z_drift:
            level = AlertLevel.CRITICAL if psi_value > 0.25 else AlertLevel.WARNING
            alert = DriftAlert(
                drift_type=DriftType.PREDICTION_DRIFT,
                metric_name="confidence_score",
                current_value=float(np.mean(cur_arr)),
                baseline_value=float(np.mean(ref_arr)),
                drift_score=psi_value,
                alert_level=level,
                timestamp=datetime.now().isoformat(),
                message=f"Prediction drift detected: PSI={psi_value:.4f}, Z-score={z_score:.4f}",
            )
            self.alerts.append(alert)
            logger.warning(alert.message)

    def get_health_report(self) -> Dict:
        recent_alerts = [
            a for a in self.alerts
            if datetime.fromisoformat(a.timestamp) > datetime.now() - timedelta(hours=24)
        ]
        return {
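            # The buffer is trimmed after every drift check, so this is the
            # current window size, not a lifetime total.
            "predictions_in_buffer": len(self.prediction_buffer),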
            "alerts_last_24h": len(recent_alerts),
            "critical_alerts": sum(1 for a in recent_alerts if a.alert_level == AlertLevel.CRITICAL),
            "latest_drift_score": self.alerts[-1].drift_score if self.alerts else 0,
            "status": "healthy" if not recent_alerts else "degraded",
        }


class ModelPerformanceTracker:
    def __init__(self, baseline_metrics: Dict[str, float]):
        self.baseline_metrics = baseline_metrics
        self.metric_history: List[Dict] = []
        self.degradation_threshold = 0.05

    def record_metrics(self, metrics: Dict[str, float]):
        entry = {
            "timestamp": datetime.now().isoformat(),
            "metrics": metrics,
        }
        self.metric_history.append(entry)
        self._check_degradation(metrics)

    def _check_degradation(self, current_metrics: Dict[str, float]):
        for metric_name, baseline_value in self.baseline_metrics.items():
            current_value = current_metrics.get(metric_name)
            if current_value is None:
                continue
            relative_change = (baseline_value - current_value) / baseline_value
            if relative_change > self.degradation_threshold:
                logger.warning(
                    f"Degradation detected: {metric_name} dropped from "
                    f"{baseline_value:.4f} to {current_value:.4f} "
                    f"({relative_change:.2%} decrease)"
                )

    def get_trend(self, metric_name: str, hours: int = 24) -> Dict:
        cutoff = datetime.now() - timedelta(hours=hours)
        recent = [
            entry for entry in self.metric_history
            if datetime.fromisoformat(entry["timestamp"]) > cutoff
            and metric_name in entry["metrics"]
        ]
        if not recent:
            return {"trend": "no_data", "values": []}

        values = [entry["metrics"][metric_name] for entry in recent]
        trend = "stable"
        if len(values) >= 3:
            first_half = np.mean(values[:len(values)//2])
            second_half = np.mean(values[len(values)//2:])
            if second_half < first_half * 0.95:
                trend = "declining"
            elif second_half > first_half * 1.05:
                trend = "improving"

        return {
            "trend": trend,
            "current": values[-1],
            "baseline": self.baseline_metrics.get(metric_name),
            "values": values,
        }


if __name__ == "__main__":
    np.random.seed(42)
    reference = np.random.normal(0.85, 0.05, 5000)

    detector = DataDriftDetector(reference)

    healthy_data = np.random.normal(0.84, 0.05, 1000)
    drifted_data = np.random.normal(0.70, 0.08, 1000)

    psi_healthy, drift_healthy = detector.detect_psi(healthy_data)
    print(f"Healthy data: PSI={psi_healthy:.4f}, drift={drift_healthy}")

    psi_drifted, drift_drifted = detector.detect_psi(drifted_data)
    print(f"Drifted data: PSI={psi_drifted:.4f}, drift={drift_drifted}")

    reference_preds = [{"confidence": np.random.uniform(0.8, 0.95)} for _ in range(5000)]
    monitor = PredictionDriftMonitor(reference_preds, window_size=100)

    for _ in range(50):
        monitor.record_prediction({"confidence": np.random.uniform(0.8, 0.95)})

    for _ in range(50):
        monitor.record_prediction({"confidence": np.random.uniform(0.5, 0.7)})

    health = monitor.get_health_report()
    print(f"\nHealth Report: {json.dumps(health, indent=2)}")

5 Common Pitfalls and Solutions

Pitfall 1: Evaluation Data Leakage

Training and evaluation data overlap, inflating evaluation scores.

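Solution: Enforce strict train/eval isolation and scan for overlap before every evaluation run. Exact duplicates can be caught cheaply by hashing; the check below goes further and flags near-duplicates by string similarity, at a cost that grows with the product of the two dataset sizes.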

def check_data_leakage(train_data: List[str], eval_data: List[str], threshold: float = 0.8) -> Dict:
    from difflib import SequenceMatcher
    leaks = []
    for i, eval_item in enumerate(eval_data):
        for train_item in train_data:
            similarity = SequenceMatcher(None, eval_item, train_item).ratio()
            if similarity > threshold:
                leaks.append({"eval_index": i, "similarity": round(similarity, 4)})
                break
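    # Guard against an empty evaluation set to avoid division by zero.
    leak_rate = len(leaks) / len(eval_data) if eval_data else 0.0
    return {"leak_count": len(leaks), "leak_rate": leak_rate}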
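
The hash-based side of deduplication can be sketched as follows. This is a minimal illustration, not part of the original pipeline: the function names `_fingerprint` and `find_exact_leaks` are hypothetical, and the normalization (lowercasing and collapsing whitespace) is an assumption about what counts as "the same" item. It only catches exact matches after normalization, so it complements the fuzzy check rather than replacing it.

```python
import hashlib
import re
from typing import Dict, List


def _fingerprint(text: str) -> str:
    # Normalize case and whitespace so trivially reformatted copies collide.
    normalized = re.sub(r"\s+", " ", text.strip().lower())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


def find_exact_leaks(train_data: List[str], eval_data: List[str]) -> Dict:
    # One pass over each dataset: hash the training items into a set,
    # then look up each evaluation item.
    train_hashes = {_fingerprint(item) for item in train_data}
    leaked = [i for i, item in enumerate(eval_data) if _fingerprint(item) in train_hashes]
    rate = len(leaked) / len(eval_data) if eval_data else 0.0
    return {"leak_indices": leaked, "leak_rate": rate}


if __name__ == "__main__":
    train = ["What is  Python?", "Explain RAG"]
    evals = ["what is python?", "Define drift"]
    print(find_exact_leaks(train, evals))
```

Running the hash pass first and the similarity scan only on the remaining items keeps the expensive comparison to a smaller set.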

Pitfall 2: Evaluation Metrics Misaligned with Business Goals

Model scores high on MMLU but business KPIs don't improve.

Solution: Build a metric-to-business mapping table to ensure evaluation metrics align with business objectives.

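def align_metrics_with_business(eval_metrics: Dict, business_kpis: Dict) -> List[str]:
    # Example mapping; replace it with the KPIs your product actually tracks.
    alignment_map = {
        "faithfulness": ["customer_satisfaction", "complaint_rate"],
        "answer_relevancy": ["task_completion_rate", "user_engagement"],
        "latency_p95": ["session_duration", "bounce_rate"],
    }
    misaligned = []
    for metric in eval_metrics:
        mapped_kpis = alignment_map.get(metric)
        if mapped_kpis is None:
            misaligned.append(f"Metric '{metric}' has no business KPI mapping")
        elif not any(kpi in business_kpis for kpi in mapped_kpis):
            # The metric is mapped, but none of its KPIs are being tracked.
            misaligned.append(f"Metric '{metric}' maps to untracked KPIs: {mapped_kpis}")
    return misaligned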

Pitfall 3: Insufficient A/B Test Sample Size

Drawing conclusions from 200 queries, which rarely provides enough statistical power to detect a small difference between variants.

Solution: Calculate the required sample size per variant before the test starts.

def calculate_sample_size(
    baseline_rate: float,
    minimum_detectable_effect: float,
    alpha: float = 0.05,
    power: float = 0.8,
) -> int:
    import math
    from scipy.stats import norm
    z_alpha = norm.ppf(1 - alpha / 2)
    z_beta = norm.ppf(power)
    p1 = baseline_rate
    p2 = baseline_rate * (1 + minimum_detectable_effect)
    p_avg = (p1 + p2) / 2
    n = (z_alpha * math.sqrt(2 * p_avg * (1 - p_avg)) +
         z_beta * math.sqrt(p1 * (1 - p1) + p2 * (1 - p2))) ** 2 / (p2 - p1) ** 2
    return math.ceil(n)
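
To get a feel for the numbers, the sketch below applies the same two-proportion formula using only the standard library (`statistics.NormalDist` stands in for `scipy.stats.norm`). The function name `sample_size_per_variant` and the example rates are illustrative assumptions, not values from a real test.

```python
import math
from statistics import NormalDist


def sample_size_per_variant(baseline_rate, relative_lift, alpha=0.05, power=0.8):
    # Two-sided test on the difference between two proportions.
    z_alpha = NormalDist().inv_cdf(1 - alpha / 2)
    z_beta = NormalDist().inv_cdf(power)
    p1 = baseline_rate
    p2 = baseline_rate * (1 + relative_lift)
    p_avg = (p1 + p2) / 2
    numerator = (
        z_alpha * math.sqrt(2 * p_avg * (1 - p_avg))
        + z_beta * math.sqrt(p1 * (1 - p1) + p2 * (1 - p2))
    ) ** 2
    return math.ceil(numerator / (p2 - p1) ** 2)


# Hypothetical scenario: 80% baseline success rate.
n_small_lift = sample_size_per_variant(0.80, 0.05)  # detect a 5% relative lift
n_large_lift = sample_size_per_variant(0.80, 0.10)  # detect a 10% relative lift
print(n_small_lift, n_large_lift)
```

With these assumed inputs, detecting a 5% relative lift needs well over a thousand queries per variant, which is why a 200-query test is usually inconclusive. The requirement shrinks quickly as the effect you care about gets larger.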

Pitfall 4: Ignoring Inter-Annotator Agreement

Multiple annotators produce vastly different results, but averaging masks quality issues.

Solution: Calculate Cohen's Kappa; dimensions below 0.6 need re-annotation.
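
A minimal sketch of the calculation for two annotators who rated the same items on an integer scale. The function name `cohens_kappa` and the sample ratings are made up for illustration; the 0.6 cut-off is the one stated above.

```python
from collections import Counter
from typing import Sequence


def cohens_kappa(rater_a: Sequence[int], rater_b: Sequence[int]) -> float:
    if len(rater_a) != len(rater_b) or not rater_a:
        raise ValueError("Both raters must label the same non-empty set of items")
    n = len(rater_a)
    # Observed agreement: share of items where both raters chose the same label.
    p_observed = sum(a == b for a, b in zip(rater_a, rater_b)) / n
    # Expected agreement by chance, from each rater's label frequencies.
    counts_a, counts_b = Counter(rater_a), Counter(rater_b)
    p_expected = sum(counts_a[c] * counts_b[c] for c in counts_a) / (n * n)
    if p_expected == 1.0:
        # Both raters used one identical label throughout; kappa is undefined.
        return 0.0
    return (p_observed - p_expected) / (1 - p_expected)


# Hypothetical 1-5 ratings from two annotators on eight responses.
ann_1 = [5, 4, 4, 3, 5, 2, 4, 3]
ann_2 = [5, 4, 3, 3, 5, 2, 4, 4]
kappa = cohens_kappa(ann_1, ann_2)
print(f"kappa={kappa:.4f}, needs_reannotation={kappa < 0.6}")
```

Raw agreement here is 75%, but kappa is lower because some of that agreement would occur by chance. That gap is what averaging the scores hides.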

Pitfall 5: Delayed Drift Detection in Production

Degradation is discovered only at the monthly evaluation, after it has already affected users for weeks.

Solution: Real-time monitoring with sliding window detection and hourly alerting.
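
One way to sketch sliding-window detection is a fixed-size buffer of recent quality scores whose rolling mean is compared against a baseline on every new observation. The class name `SlidingWindowMonitor`, the window size, and the 5% tolerance are assumptions for illustration; a production system would route the alert to a pager or chat channel rather than return a string.

```python
from collections import deque
from statistics import mean
from typing import Deque, Optional


class SlidingWindowMonitor:
    def __init__(self, baseline: float, window_size: int = 200, max_relative_drop: float = 0.05):
        self.baseline = baseline
        self.max_relative_drop = max_relative_drop
        # deque with maxlen discards the oldest score automatically.
        self.window: Deque[float] = deque(maxlen=window_size)

    def observe(self, score: float) -> Optional[str]:
        self.window.append(score)
        if len(self.window) < self.window.maxlen:
            return None  # Not enough data to judge yet.
        current = mean(self.window)
        drop = (self.baseline - current) / self.baseline
        if drop > self.max_relative_drop:
            return (
                f"ALERT: rolling mean {current:.3f} is {drop:.1%} "
                f"below baseline {self.baseline:.3f}"
            )
        return None


if __name__ == "__main__":
    monitor = SlidingWindowMonitor(baseline=0.85, window_size=5)
    for score in [0.85, 0.85, 0.85, 0.85, 0.85, 0.60]:
        message = monitor.observe(score)
        if message:
            print(message)
```

Recomputing the mean on every call is fine for small windows; for large ones, keeping a running sum avoids the repeated pass over the buffer.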


10 Common Error Troubleshooting

| Error Symptom | Possible Cause | Troubleshooting Steps | Solution |
| --- | --- | --- | --- |
| lm-eval OOM | batch_size too large or model too big | Reduce batch_size, enable FSDP | batch_size=1 + device_map=auto |
| RAGAS faithfulness = 0 | Empty contexts or answer | Check eval_data contexts field | Ensure contexts are non-empty and relevant |
| pytest timeout | High model inference latency | Check GPU utilization and batch config | Increase timeout or optimize inference config |
| A/B test not significant | Insufficient sample size or small effect | Calculate statistical power and required sample | Extend test duration or increase traffic ratio |
| Human eval Kappa < 0.4 | Unclear annotation guidelines | Review guidelines and run calibration test | Add examples and edge case descriptions |
| PSI false alerts | Narrow reference data distribution | Expand reference data time window | Use 30-day data as baseline |
| Non-reproducible results | Random seed not fixed | Set global random seed | torch.manual_seed(42) + numpy.random.seed(42) |
| Eval data format error | Missing fields or type mismatch | Validate data with schema | Use Pydantic model validation |
| High drift detection latency | Monitoring window too large | Reduce sliding window | From 1000 to 200 records |
| Missing metrics in report | Some metric calculations failed | Check LLM API call timeouts | Add retry mechanism and fallback |
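
The "retry mechanism and fallback" suggested for missing metrics could look roughly like this. It is a sketch under assumptions: `call_with_retry` is a hypothetical helper, `metric_fn` stands for any zero-argument callable that computes one metric (for example an LLM-judged score), and the broad `except Exception` should be narrowed to the timeout or rate-limit errors your client actually raises.

```python
import time
from typing import Callable, Optional


def call_with_retry(
    metric_fn: Callable[[], float],
    retries: int = 3,
    base_delay: float = 1.0,
    fallback: Optional[float] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[float]:
    for attempt in range(retries):
        try:
            return metric_fn()
        except Exception:  # Assumption: narrow this to transient errors in real use.
            if attempt < retries - 1:
                # Exponential backoff: base_delay, 2x, 4x, ...
                sleep(base_delay * 2 ** attempt)
    # Every attempt failed: return the fallback so the report can mark the
    # metric as unavailable instead of dropping it silently.
    return fallback


if __name__ == "__main__":
    attempts = {"count": 0}

    def flaky_metric() -> float:
        # Simulated metric that times out twice before succeeding.
        attempts["count"] += 1
        if attempts["count"] < 3:
            raise TimeoutError("simulated LLM API timeout")
        return 0.9

    print(call_with_retry(flaky_metric, base_delay=0.01))
```

Passing `sleep` as a parameter is a small design choice that lets tests skip the real delays.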

Advanced Optimization Techniques

1. Cross-Dimensional Evaluation

Single metrics can't fully reflect model quality. Use cross-evaluation matrices to identify weaknesses:

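import statistics
from itertools import combinations
from typing import Callable, Dict, List


class CrossDimensionEvaluator:
    def __init__(self, dimensions: List[str]):
        self.dimensions = dimensions
        # Per-dimension score lists, aligned by sample index.
        self.scores: Dict[str, List[float]] = {d: [] for d in dimensions}

    def evaluate_cross(self, samples: List[Dict], eval_fn: Callable[[Dict], Dict]) -> Dict:
        for sample in samples:
            scores = eval_fn(sample)
            for d in self.dimensions:
                self.scores[d].append(scores.get(d, 0))

        # Pearson correlation for each unordered pair of dimensions. The mean
        # of score products is not a correlation, so it is not used here.
        # statistics.correlation requires Python 3.10+.
        correlation = {}
        for d1, d2 in combinations(self.dimensions, 2):
            xs, ys = self.scores[d1], self.scores[d2]
            if len(xs) < 2:
                continue
            try:
                correlation[f"{d1}×{d2}"] = round(statistics.correlation(xs, ys), 4)
            except statistics.StatisticsError:
                # Raised when one dimension has constant scores.
                correlation[f"{d1}×{d2}"] = None
        return correlation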

2. Dynamic Evaluation Set Generation

Static evaluation sets are vulnerable to "score gaming." Dynamically generate evaluation questions to ensure fairness:

class DynamicEvalGenerator:
    def __init__(self, llm_client):
        self.llm = llm_client

    def generate_eval_questions(
        self,
        domain: str,
        difficulty: str = "medium",
        count: int = 50,
    ) -> List[Dict]:
        prompt = f"""Generate {count} {difficulty}-difficulty evaluation questions in the {domain} domain.
        Each question should include: question, choices, answer, explanation.
        Return as a JSON array."""
        response = self.llm.chat.completions.create(
            model="default",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
        )
        return json.loads(response.choices[0].message.content)

3. Tiered Evaluation Strategy

Different scenarios use different evaluation granularity, balancing cost and quality assurance:

| Tier | Trigger Condition | Evaluation Scope | Evaluation Time |
| --- | --- | --- | --- |
| L0 Quick Validation | Every commit | 50 core test cases | <5 minutes |
| L1 Standard Eval | Daily build | 500 standard set | ~30 minutes |
| L2 Full Eval | Version release | 5000 complete set | ~2 hours |
| L3 Human Spot Check | Major release | 100 expert reviews | ~1 day |
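
In a CI setup, the tier table can be encoded as data so the pipeline picks the evaluation scope from the triggering event. The sketch below mirrors the table's tier names and sample counts; the trigger keys (`commit`, `daily_build`, and so on) and the `EvalTier` structure are hypothetical names chosen for illustration.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class EvalTier:
    name: str
    sample_count: int
    automated: bool


# Sample counts mirror the tier table; the trigger keys are assumed event names.
TIERS: Dict[str, EvalTier] = {
    "commit": EvalTier("L0 Quick Validation", 50, True),
    "daily_build": EvalTier("L1 Standard Eval", 500, True),
    "release": EvalTier("L2 Full Eval", 5000, True),
    "major_release": EvalTier("L3 Human Spot Check", 100, False),
}


def select_tier(trigger: str) -> EvalTier:
    try:
        return TIERS[trigger]
    except KeyError:
        raise ValueError(f"Unknown trigger: {trigger!r}") from None


if __name__ == "__main__":
    tier = select_tier("daily_build")
    print(f"{tier.name}: {tier.sample_count} samples, automated={tier.automated}")
```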

4. Evaluation Result Versioning

import json
import os
from datetime import datetime
from typing import Dict


class EvalVersionManager:
    def __init__(self, store_path: str = "./eval_versions"):
        self.store_path = store_path
        os.makedirs(store_path, exist_ok=True)

    def save_version(
        self,
        model_version: str,
        eval_results: Dict,
        eval_config: Dict,
    ) -> str:
        version_id = f"{model_version}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        record = {
            "version_id": version_id,
            "model_version": model_version,
            "eval_results": eval_results,
            "eval_config": eval_config,
            "timestamp": datetime.now().isoformat(),
        }
        path = os.path.join(self.store_path, f"{version_id}.json")
        with open(path, "w", encoding="utf-8") as f:
            json.dump(record, f, indent=2, ensure_ascii=False)
        return version_id

    def _load_version(self, version_id: str) -> Dict:
        # Reads back a record written by save_version.
        path = os.path.join(self.store_path, f"{version_id}.json")
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)

    def compare_versions(self, version_a: str, version_b: str) -> Dict:
        # Compares metrics present in both versions; values must be numeric.
        data_a = self._load_version(version_a)
        data_b = self._load_version(version_b)
        diff = {}
        for metric in data_a["eval_results"]:
            if metric in data_b["eval_results"]:
                diff[metric] = {
                    "a": data_a["eval_results"][metric],
                    "b": data_b["eval_results"][metric],
                    "delta": round(
                        data_b["eval_results"][metric] - data_a["eval_results"][metric], 4
                    ),
                }
        return diff

5. Evaluation Pipeline Orchestration

from typing import Callable, Dict


class EvalPipelineOrchestrator:
    def __init__(self):
        self.stages = []

    def add_stage(self, name: str, eval_fn: Callable, gate_threshold: float = 0.0):
        self.stages.append({
            "name": name,
            "eval_fn": eval_fn,
            "gate_threshold": gate_threshold,
        })

    def run(self, model, dataset) -> Dict:
        results = {}
        for stage in self.stages:
            print(f"Running stage: {stage['name']}")
            score = stage["eval_fn"](model, dataset)
            results[stage["name"]] = score

            if score < stage["gate_threshold"]:
                print(f"GATE FAILED: {stage['name']} score {score:.4f} < {stage['gate_threshold']}")
                results["status"] = "gate_failed"
                results["failed_stage"] = stage["name"]
                return results

        results["status"] = "passed"
        return results

Comparison Analysis

| Evaluation Pattern | Applicable Stage | Evaluation Dimensions | Automation Level | Cost | Reliability | Timeliness |
| --- | --- | --- | --- | --- | --- | --- |
| LLM Benchmarking | Model selection/training | General capabilities | High | Low | Medium | Low |
| RAG Evaluation | RAG system development | Retrieval + Generation | High | Medium | High | Medium |
| Automated Testing | CI/CD pipeline | Custom metrics | High | Low | High | High |
| A/B Testing | Model deployment | Business metrics | Medium | High | High | Low |
| Human Evaluation | Quality assurance | All dimensions | Low | High | Highest | Lowest |
| Production Monitoring | Operations | Drift/degradation | High | Medium | Medium | Highest |

| Drift Detection Method | Applicable Scenario | Detection Speed | False Positive Rate | Minimum Sample Size |
| --- | --- | --- | --- | --- |
| KS Test | Continuous data distribution | Fast | Medium | 100 |
| PSI | Categorical/continuous | Fast | Low | 500 |
| Z-score | Mean shift | Fastest | High | 30 |
| ADWIN | Streaming data | Medium | Low | 200 |
| Page-Hinkley | Cumulative drift | Medium | Medium | 100 |
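
Page-Hinkley appears in the comparison but not in the code earlier in this article, so here is a minimal sketch of the test, set up to detect a sustained drop in a quality score. The class name and the `delta`, `threshold`, and `min_samples` defaults are illustrative assumptions and need tuning against real traffic.

```python
class PageHinkley:
    def __init__(self, delta: float = 0.005, threshold: float = 1.0, min_samples: int = 30):
        self.delta = delta            # Tolerated deviation per observation.
        self.threshold = threshold    # Alarm level for the cumulative statistic.
        self.min_samples = min_samples
        self.n = 0
        self.mean = 0.0
        self.cumulative = 0.0
        self.min_cumulative = 0.0

    def update(self, value: float) -> bool:
        """Feed one observation; returns True when a downward drift is flagged."""
        self.n += 1
        # Incremental running mean of everything seen so far.
        self.mean += (value - self.mean) / self.n
        # Accumulate how far the value falls below the running mean.
        self.cumulative += self.mean - value - self.delta
        self.min_cumulative = min(self.min_cumulative, self.cumulative)
        if self.n < self.min_samples:
            return False
        return self.cumulative - self.min_cumulative > self.threshold


if __name__ == "__main__":
    detector = PageHinkley()
    stream = [0.85] * 100 + [0.60] * 20  # Synthetic scores with a sudden drop.
    for i, score in enumerate(stream):
        if detector.update(score):
            print(f"Drift flagged at observation {i}")
            break
```

Because the statistic accumulates small deviations, it reacts to a persistent shift within a few observations while ignoring isolated outliers, which matches the "cumulative drift" use case in the table.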


Summary: AI model evaluation isn't just about "running a score"—it's an engineering system covering the entire model lifecycle. The 6 production patterns each have their place: benchmarking quantifies general capabilities, RAG evaluation pinpoints retrieval/generation weaknesses, automated testing ensures CI/CD quality gates, A/B testing validates online effectiveness, human evaluation sets quality ceilings, and production monitoring prevents silent degradation. In 2026, running AI systems without systematic evaluation is flying blind.

