Python AI模型评估实战:从基准测试到自动化评测的6种生产模式

AI与大数据

Python AI模型评估实战:从基准测试到自动化评测的6种生产模式

你的AI模型上线后,准确率到底怎么样?LLM回答质量如何量化?RAG系统的检索和生成到底哪个环节拖后腿?模型上线3个月后效果有没有退化?大多数团队还在用"人肉看几条结果"来评估模型——这就像用肉眼检测芯片良率,既不靠谱也不可复现。2026年,AI模型评估已经形成完整的工程体系:从lm-evaluation-harness基准测试、RAGAS框架评估RAG、pytest自动化评测流水线、A/B测试模型对比、人工评估平台到生产环境漂移检测,6种生产模式覆盖模型全生命周期。


核心收获

  • 掌握lm-evaluation-harness进行LLM标准化基准测试的完整流程
  • 使用RAGAS框架量化评估RAG系统的检索质量和生成质量
  • 构建pytest驱动的自动化评测流水线,实现CI/CD中的模型质量门禁
  • 设计科学的A/B测试方案对比不同模型版本的效果差异
  • 搭建人工评估平台收集高质量的人类反馈数据
  • 实现生产环境的模型漂移检测和自动化告警机制
  • 了解6种评估模式的适用场景、优缺点和组合使用策略

目录

  1. 架构总览:AI模型评估全景图
  2. Pattern 1:LLM基准测试(lm-evaluation-harness)
  3. Pattern 2:RAG评估(RAGAS框架)
  4. Pattern 3:自动化评测流水线(pytest)
  5. Pattern 4:A/B测试模型对比
  6. Pattern 5:人工评估平台
  7. Pattern 6:生产监控与漂移检测
  8. 5个常见陷阱与解决方案
  9. 10个常见错误排查
  10. 高级优化技巧
  11. 对比分析
  12. 在线工具推荐

架构总览:AI模型评估全景图

┌─────────────────────────────────────────────────────────────┐
│                   AI Model Evaluation Pipeline              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌──────────┐    ┌──────────┐    ┌──────────────────────┐  │
│  │ Offline  │    │ Online   │    │   Human-in-the-Loop  │  │
│  │ Eval     │    │ Eval     │    │   Evaluation         │  │
│  │          │    │          │    │                      │  │
│  │ • Bench  │    │ • A/B    │    │ • Preference Ranking │  │
│  │   mark   │    │   Test   │    │ • Quality Scoring    │  │
│  │ • RAG    │    │ • Drift  │    │ • Red Team Testing   │  │
│  │   Eval   │    │   Detect │    │ • Domain Expert      │  │
│  │ • Auto   │    │ • Prod   │    │   Review             │  │
│  │   Test   │    │   Monitor│    │                      │  │
│  └────┬─────┘    └────┬─────┘    └──────────┬───────────┘  │
│       │               │                     │              │
│       ▼               ▼                     ▼              │
│  ┌─────────────────────────────────────────────────────┐   │
│  │              Evaluation Results Store                │   │
│  │  ┌─────────┐  ┌──────────┐  ┌──────────────────┐  │   │
│  │  │ Metrics │  │ Reports  │  │ Comparison Board │  │   │
│  │  │ DB      │  │ Generator│  │                  │  │   │
│  │  └─────────┘  └──────────┘  └──────────────────┘  │   │
│  └─────────────────────────────────────────────────────┘   │
│                          │                                  │
│                          ▼                                  │
│  ┌─────────────────────────────────────────────────────┐   │
│  │              Decision & Action Layer                 │   │
│  │  • Model Promotion / Rollback                       │   │
│  │  • Retraining Trigger                               │   │
│  │  • Alert & Notification                             │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Pattern 1:LLM基准测试(lm-evaluation-harness)

为什么需要标准化基准测试?

"我们模型效果不错"——这句话没有任何信息量。你需要用标准化的数据集和指标,在可控条件下量化模型能力。EleutherAI的lm-evaluation-harness是2026年最广泛使用的LLM评测框架,支持200+任务。

完整评测流程

# llm_benchmark.py
from lm_eval import evaluator
from lm_eval.models.huggingface import HuggingFaceAuto
from typing import Dict, List, Optional
import json
import os

class LLMBenchmarkRunner:
    def __init__(
        self,
        model_path: str,
        device: str = "cuda",
        batch_size: int = 8,
    ):
        self.model_path = model_path
        self.device = device
        self.batch_size = batch_size
        self.results_history = []

    def run_core_tasks(self) -> Dict:
        core_tasks = [
            "mmlu",
            "hellaswag",
            "arc_challenge",
            "truthfulqa_mc2",
            "winogrande",
            "gsm8k",
        ]

        results = evaluator.simple_evaluate(
            model="hf",
            model_args=f"pretrained={self.model_path}",
            tasks=core_tasks,
            batch_size=self.batch_size,
            device=self.device,
        )

        formatted = self._format_results(results)
        self.results_history.append(formatted)
        return formatted

    def run_custom_task(self, task_config_path: str) -> Dict:
        results = evaluator.simple_evaluate(
            model="hf",
            model_args=f"pretrained={self.model_path}",
            tasks=[task_config_path],
            batch_size=self.batch_size,
            device=self.device,
        )
        return self._format_results(results)

    def _format_results(self, raw_results: Dict) -> Dict:
        formatted = {
            "model": self.model_path,
            "timestamp": self._get_timestamp(),
            "tasks": {},
        }
        for task_name, task_results in raw_results["results"].items():
            formatted["tasks"][task_name] = {
                k: round(v, 4) if isinstance(v, float) else v
                for k, v in task_results.items()
            }
        return formatted

    def compare_with_baseline(self, baseline_path: str) -> Dict:
        if not self.results_history:
            self.run_core_tasks()

        with open(baseline_path, "r") as f:
            baseline = json.load(f)

        current = self.results_history[-1]
        comparison = {}
        for task_name in current["tasks"]:
            if task_name in baseline["tasks"]:
                current_score = current["tasks"][task_name].get("acc,none", 0)
                baseline_score = baseline["tasks"][task_name].get("acc,none", 0)
                comparison[task_name] = {
                    "current": current_score,
                    "baseline": baseline_score,
                    "delta": round(current_score - baseline_score, 4),
                    "improved": current_score > baseline_score,
                }
        return comparison

    @staticmethod
    def _get_timestamp() -> str:
        from datetime import datetime
        return datetime.now().isoformat()


class CustomTaskConfig:
    @staticmethod
    def create_domain_eval(
        dataset_path: str,
        task_name: str,
        output_dir: str = "./custom_tasks",
    ) -> str:
        config = {
            "task": task_name,
            "dataset_path": dataset_path,
            "output_type": "multiple_choice",
            "test_split": "test",
            "doc_to_text": "{{question}}",
            "doc_to_target": "{{answer}}",
            "doc_to_choice": "{{choices}}",
            "metric_list": [
                {"metric": "acc", "aggregation": "mean"},
                {"metric": "f1", "aggregation": "mean"},
            ],
        }

        os.makedirs(output_dir, exist_ok=True)
        config_path = os.path.join(output_dir, f"{task_name}.yaml")
        import yaml
        with open(config_path, "w") as f:
            yaml.dump(config, f)

        return config_path


if __name__ == "__main__":
    runner = LLMBenchmarkRunner(
        model_path="meta-llama/Llama-3.1-8B-Instruct",
        batch_size=4,
    )

    results = runner.run_core_tasks()
    print(json.dumps(results, indent=2, ensure_ascii=False))

    comparison = runner.compare_with_baseline("./baseline_results.json")
    for task, delta_info in comparison.items():
        status = "↑" if delta_info["improved"] else "↓"
        print(f"{task}: {delta_info['baseline']:.4f} → {delta_info['current']:.4f} {status}{delta_info['delta']:+.4f}")

自定义领域评测任务

# custom_tasks/medical_qa.yaml
task: medical_qa
dataset_path: json
dataset_kwargs:
  data_files:
    test: ./data/medical_qa_test.jsonl
test_split: test
doc_to_text: "问题:{{question}}\n选项:\nA. {{A}}\nB. {{B}}\nC. {{C}}\nD. {{D}}\n答案:"
doc_to_target: "{{answer}}"
doc_to_choice: ["A", "B", "C", "D"]
metric_list:
  - metric: acc
    aggregation: mean
  - metric: f1
    aggregation: mean

Pattern 2:RAG评估(RAGAS框架)

RAG系统的评估维度

RAG系统涉及检索和生成两个环节,需要分别评估。RAGAS框架提供了4个核心指标:

指标 评估环节 含义 计算方式
Context Precision 检索 检索结果中相关文档的排名精度 相关文档排名的加权平均
Context Recall 检索 答案所需信息被检索到的比例 Ground Truth被检索内容覆盖的比例
Faithfulness 生成 生成答案与检索文档的事实一致性 答案声明在检索文档中的支持比例
Answer Relevancy 生成 答案与问题的相关性 答案生成原问题的逆向概率

完整RAG评估实现

# rag_evaluation_benchmark.py
from ragas import evaluate
from ragas.metrics import (
    context_precision,
    context_recall,
    faithfulness,
    answer_relevancy,
    AnswerSimilarity,
)
from datasets import Dataset
from typing import List, Dict, Optional
from dataclasses import dataclass, field
import json


@dataclass
class RAGEvalSample:
    question: str
    contexts: List[str]
    answer: str
    ground_truth: str


@dataclass
class RAGEvalReport:
    faithfulness: float
    answer_relevancy: float
    context_precision: float
    context_recall: float
    answer_similarity: float = 0.0
    sample_count: int = 0
    details: List[Dict] = field(default_factory=list)


class RAGEvaluator:
    def __init__(
        self,
        metrics: Optional[List] = None,
        llm=None,
        embeddings=None,
    ):
        self.metrics = metrics or [
            faithfulness,
            answer_relevancy,
            context_precision,
            context_recall,
        ]
        self.llm = llm
        self.embeddings = embeddings

    def evaluate_samples(
        self,
        samples: List[RAGEvalSample],
    ) -> RAGEvalReport:
        eval_data = {
            "question": [s.question for s in samples],
            "contexts": [s.contexts for s in samples],
            "answer": [s.answer for s in samples],
            "ground_truth": [s.ground_truth for s in samples],
        }

        dataset = Dataset.from_dict(eval_data)

        result = evaluate(
            dataset,
            metrics=self.metrics,
            llm=self.llm,
            embeddings=self.embeddings,
        )

        return RAGEvalReport(
            faithfulness=result["faithfulness"],
            answer_relevancy=result["answer_relevancy"],
            context_precision=result["context_precision"],
            context_recall=result["context_recall"],
            sample_count=len(samples),
        )

    def evaluate_rag_pipeline(
        self,
        rag_pipeline,
        test_questions: List[Dict],
    ) -> RAGEvalReport:
        samples = []
        for q in test_questions:
            rag_result = rag_pipeline.query(q["question"])
            sample = RAGEvalSample(
                question=q["question"],
                contexts=rag_result["contexts"],
                answer=rag_result["answer"],
                ground_truth=q["ground_truth"],
            )
            samples.append(sample)

        return self.evaluate_samples(samples)

    def compare_pipelines(
        self,
        pipelines: Dict[str, object],
        test_questions: List[Dict],
    ) -> Dict[str, RAGEvalReport]:
        reports = {}
        for name, pipeline in pipelines.items():
            report = self.evaluate_rag_pipeline(pipeline, test_questions)
            reports[name] = report
            print(f"\n=== {name} ===")
            print(f"  Faithfulness:       {report.faithfulness:.4f}")
            print(f"  Answer Relevancy:   {report.answer_relevancy:.4f}")
            print(f"  Context Precision:  {report.context_precision:.4f}")
            print(f"  Context Recall:     {report.context_recall:.4f}")
        return reports


class RAGEvalDatasetBuilder:
    @staticmethod
    def from_qa_pairs(
        qa_pairs: List[Dict],
        rag_pipeline=None,
    ) -> List[RAGEvalSample]:
        samples = []
        for qa in qa_pairs:
            if rag_pipeline and "contexts" not in qa:
                result = rag_pipeline.query(qa["question"])
                contexts = result["contexts"]
                answer = result["answer"]
            else:
                contexts = qa.get("contexts", [])
                answer = qa.get("answer", "")

            samples.append(RAGEvalSample(
                question=qa["question"],
                contexts=contexts,
                answer=answer,
                ground_truth=qa["ground_truth"],
            ))
        return samples

    @staticmethod
    def from_jsonl(file_path: str) -> List[RAGEvalSample]:
        samples = []
        with open(file_path, "r", encoding="utf-8") as f:
            for line in f:
                data = json.loads(line.strip())
                samples.append(RAGEvalSample(
                    question=data["question"],
                    contexts=data["contexts"],
                    answer=data["answer"],
                    ground_truth=data["ground_truth"],
                ))
        return samples


if __name__ == "__main__":
    test_data = [
        {
            "question": "什么是零信任网络访问(ZTNA)?",
            "contexts": ["零信任网络访问(ZTNA)是一种安全模型,基于'永不信任,始终验证'的原则,为远程用户提供对特定应用程序的安全访问。"],
            "answer": "零信任网络访问是一种安全模型,核心原则是永不信任始终验证,为远程用户提供安全访问。",
            "ground_truth": "零信任网络访问(ZTNA)是一种安全架构,基于永不信任始终验证的原则,通过身份验证和授权为远程用户提供对特定应用的安全访问,替代传统VPN。",
        },
        {
            "question": "SASE架构包含哪些核心组件?",
            "contexts": ["SASE(安全访问服务边缘)将SD-WAN、SWG、CASB、FWaaS和ZTNA整合为统一的云原生服务。"],
            "answer": "SASE架构包含SD-WAN、SWG、CASB、FWaaS和ZTNA等核心组件,整合为统一的云原生服务。",
            "ground_truth": "SASE架构的核心组件包括SD-WAN(软件定义广域网)、SWG(安全Web网关)、CASB(云访问安全代理)、FWaaS(防火墙即服务)和ZTNA(零信任网络访问),整合为统一的云原生服务交付模型。",
        },
    ]

    evaluator = RAGEvaluator()
    samples = RAGEvalDatasetBuilder.from_qa_pairs(test_data)
    report = evaluator.evaluate_samples(samples)

    print(f"\n=== RAG Evaluation Report ===")
    print(f"Faithfulness:       {report.faithfulness:.4f}")
    print(f"Answer Relevancy:   {report.answer_relevancy:.4f}")
    print(f"Context Precision:  {report.context_precision:.4f}")
    print(f"Context Recall:     {report.context_recall:.4f}")
    print(f"Sample Count:       {report.sample_count}")

Pattern 3:自动化评测流水线(pytest)

为什么需要自动化评测?

手动评测不可复现、不可追溯、无法集成到CI/CD。pytest驱动的自动化评测流水线让模型评估像单元测试一样可靠。

完整自动化评测框架

# tests/conftest.py
import pytest
from typing import Dict, List
import json
import os


@pytest.fixture(scope="session")
def model_client():
    from openai import OpenAI
    return OpenAI(
        base_url=os.getenv("MODEL_API_URL", "http://localhost:8000/v1"),
        api_key=os.getenv("MODEL_API_KEY", "test"),
    )


@pytest.fixture(scope="session")
def eval_dataset():
    with open("./data/eval_dataset.json", "r", encoding="utf-8") as f:
        return json.load(f)


@pytest.fixture(scope="session")
def baseline_scores():
    with open("./data/baseline_scores.json", "r", encoding="utf-8") as f:
        return json.load(f)


# tests/test_model_quality.py
import pytest
from typing import Dict


class TestModelQuality:
    def test_factual_accuracy(self, model_client, eval_dataset):
        factual_questions = [
            q for q in eval_dataset if q["category"] == "factual"
        ]
        correct = 0
        for q in factual_questions:
            response = model_client.chat.completions.create(
                model="default",
                messages=[{"role": "user", "content": q["question"]}],
                temperature=0.0,
            )
            answer = response.choices[0].message.content
            if self._check_answer(answer, q["expected_keywords"]):
                correct += 1

        accuracy = correct / len(factual_questions)
        assert accuracy >= 0.85, f"Factual accuracy {accuracy:.2%} below threshold 85%"

    def test_no_hallucination(self, model_client, eval_dataset):
        hallucination_prompts = [
            q for q in eval_dataset if q["category"] == "hallucination_trap"
        ]
        hallucinated = 0
        for q in hallucination_prompts:
            response = model_client.chat.completions.create(
                model="default",
                messages=[{"role": "user", "content": q["question"]}],
                temperature=0.0,
            )
            answer = response.choices[0].message.content
            if self._contains_hallucination(answer, q["trap_keywords"]):
                hallucinated += 1

        hallucination_rate = hallucinated / len(hallucination_prompts)
        assert hallucination_rate <= 0.10, f"Hallucination rate {hallucination_rate:.2%} above threshold 10%"

    def test_response_latency(self, model_client, eval_dataset):
        import time
        latencies = []
        for q in eval_dataset[:20]:
            start = time.time()
            model_client.chat.completions.create(
                model="default",
                messages=[{"role": "user", "content": q["question"]}],
            )
            latencies.append(time.time() - start)

        avg_latency = sum(latencies) / len(latencies)
        p95_latency = sorted(latencies)[int(len(latencies) * 0.95)]
        assert avg_latency <= 2.0, f"Average latency {avg_latency:.2f}s above threshold 2s"
        assert p95_latency <= 5.0, f"P95 latency {p95_latency:.2f}s above threshold 5s"

    def test_output_format_compliance(self, model_client, eval_dataset):
        format_questions = [
            q for q in eval_dataset if q.get("expected_format") == "json"
        ]
        format_errors = 0
        for q in format_questions:
            response = model_client.chat.completions.create(
                model="default",
                messages=[{"role": "user", "content": q["question"]}],
                temperature=0.0,
            )
            answer = response.choices[0].message.content
            try:
                json.loads(answer)
            except json.JSONDecodeError:
                format_errors += 1

        format_accuracy = 1 - format_errors / len(format_questions)
        assert format_accuracy >= 0.95, f"JSON format accuracy {format_accuracy:.2%} below 95%"

    def test_regression_against_baseline(self, model_client, eval_dataset, baseline_scores):
        current_scores = self._run_evaluation_suite(model_client, eval_dataset)
        for metric, baseline_value in baseline_scores.items():
            current_value = current_scores.get(metric, 0)
            assert current_value >= baseline_value * 0.95, (
                f"Regression detected: {metric} dropped from {baseline_value:.4f} to {current_value:.4f}"
            )

    @staticmethod
    def _check_answer(answer: str, keywords: List[str]) -> bool:
        answer_lower = answer.lower()
        matched = sum(1 for kw in keywords if kw.lower() in answer_lower)
        return matched >= len(keywords) * 0.6

    @staticmethod
    def _contains_hallucination(answer: str, trap_keywords: List[str]) -> bool:
        answer_lower = answer.lower()
        return any(kw.lower() in answer_lower for kw in trap_keywords)

    @staticmethod
    def _run_evaluation_suite(model_client, eval_dataset) -> Dict:
        return {
            "accuracy": 0.88,
            "faithfulness": 0.91,
            "relevancy": 0.85,
        }


# pytest.ini
"""
[pytest]
testpaths = tests
python_files = test_model_quality.py
python_classes = TestModelQuality
python_functions = test_*
addopts = -v --tb=short --json-report --json-report-file=eval_report.json
markers =
    smoke: smoke tests for quick validation
    regression: full regression test suite
    benchmark: performance benchmark tests
"""

CI/CD集成

# .github/workflows/model_eval.yml
name: Model Evaluation Pipeline

on:
  pull_request:
    paths:
      - 'models/**'
      - 'config/**'

jobs:
  model-eval:
    runs-on: gpu-runner
    steps:
      - uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.12'

      - name: Install Dependencies
        run: |
          pip install -r requirements-eval.txt
          pip install pytest pytest-json-report

      - name: Deploy Model Canary
        run: |
          python scripts/deploy_canary.py --model-path ${{ env.MODEL_PATH }}

      - name: Run Smoke Tests
        run: pytest tests/ -m smoke -v

      - name: Run Full Evaluation
        run: pytest tests/ -m regression -v --json-report

      - name: Check Regression
        run: python scripts/check_regression.py --report eval_report.json --baseline data/baseline_scores.json

      - name: Generate Report
        if: always()
        run: python scripts/generate_eval_report.py --report eval_report.json

      - name: Upload Results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: eval-results
          path: eval_report.json

Pattern 4:A/B测试模型对比

为什么需要A/B测试?

离线评测分数高不等于线上效果好。A/B测试在真实流量中对比模型,是最可靠的效果验证方式。

A/B测试框架实现

# ab_test_framework.py
import hashlib
import random
import time
from typing import Dict, List, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import json
import statistics


class AllocationStrategy(Enum):
    RANDOM = "random"
    HASH_BASED = "hash_based"
    STICKY = "sticky"


@dataclass
class ABTestConfig:
    test_name: str
    variant_a_name: str
    variant_b_name: str
    traffic_split: float = 0.5
    min_sample_size: int = 1000
    confidence_level: float = 0.95
    allocation_strategy: AllocationStrategy = AllocationStrategy.HASH_BASED
    duration_hours: int = 72


@dataclass
class ABTestResult:
    query: str
    variant: str
    response: str
    latency_ms: float
    timestamp: str
    user_feedback: Optional[int] = None
    auto_score: Optional[float] = None


@dataclass
class ABTestReport:
    test_name: str
    variant_a: Dict
    variant_b: Dict
    winner: Optional[str] = None
    confidence: float = 0.0
    is_significant: bool = False
    sample_size_a: int = 0
    sample_size_b: int = 0


class ABTestRunner:
    def __init__(self, config: ABTestConfig):
        self.config = config
        self.results: List[ABTestResult] = []
        self._sticky_map: Dict[str, str] = {}

    def allocate_variant(self, user_id: str) -> str:
        if self.config.allocation_strategy == AllocationStrategy.RANDOM:
            return self.config.variant_a_name if random.random() < self.config.traffic_split else self.config.variant_b_name

        elif self.config.allocation_strategy == AllocationStrategy.HASH_BASED:
            hash_val = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
            threshold = int(self.config.traffic_split * (2**128))
            variant = self.config.variant_a_name if hash_val < threshold else self.config.variant_b_name
            return variant

        elif self.config.allocation_strategy == AllocationStrategy.STICKY:
            if user_id in self._sticky_map:
                return self._sticky_map[user_id]
            variant = self.allocate_variant(user_id + "_init")
            self._sticky_map[user_id] = variant
            return variant

    def record_result(self, result: ABTestResult):
        self.results.append(result)

    def run_test(
        self,
        queries: List[str],
        model_a_fn: Callable,
        model_b_fn: Callable,
        evaluator_fn: Optional[Callable] = None,
    ) -> ABTestReport:
        for i, query in enumerate(queries):
            user_id = f"user_{i}"
            variant = self.allocate_variant(user_id)
            model_fn = model_a_fn if variant == self.config.variant_a_name else model_b_fn

            start_time = time.time()
            response = model_fn(query)
            latency_ms = (time.time() - start_time) * 1000

            auto_score = evaluator_fn(query, response) if evaluator_fn else None

            self.record_result(ABTestResult(
                query=query,
                variant=variant,
                response=response,
                latency_ms=latency_ms,
                timestamp=datetime.now().isoformat(),
                auto_score=auto_score,
            ))

        return self.analyze()

    def analyze(self) -> ABTestReport:
        a_results = [r for r in self.results if r.variant == self.config.variant_a_name]
        b_results = [r for r in self.results if r.variant == self.config.variant_b_name]

        a_scores = [r.auto_score for r in a_results if r.auto_score is not None]
        b_scores = [r.auto_score for r in b_results if r.auto_score is not None]

        a_latencies = [r.latency_ms for r in a_results]
        b_latencies = [r.latency_ms for r in b_results]

        a_feedback = [r.user_feedback for r in a_results if r.user_feedback is not None]
        b_feedback = [r.user_feedback for r in b_results if r.user_feedback is not None]

        stats_a = {
            "avg_score": statistics.mean(a_scores) if a_scores else 0,
            "avg_latency_ms": statistics.mean(a_latencies) if a_latencies else 0,
            "p95_latency_ms": sorted(a_latencies)[int(len(a_latencies) * 0.95)] if a_latencies else 0,
            "avg_feedback": statistics.mean(a_feedback) if a_feedback else 0,
        }
        stats_b = {
            "avg_score": statistics.mean(b_scores) if b_scores else 0,
            "avg_latency_ms": statistics.mean(b_latencies) if b_latencies else 0,
            "p95_latency_ms": sorted(b_latencies)[int(len(b_latencies) * 0.95)] if b_latencies else 0,
            "avg_feedback": statistics.mean(b_feedback) if b_feedback else 0,
        }

        is_significant = False
        confidence = 0.0
        if a_scores and b_scores and len(a_scores) >= 30 and len(b_scores) >= 30:
            confidence, is_significant = self._statistical_test(a_scores, b_scores)

        winner = None
        if is_significant:
            if stats_a["avg_score"] > stats_b["avg_score"]:
                winner = self.config.variant_a_name
            else:
                winner = self.config.variant_b_name

        return ABTestReport(
            test_name=self.config.test_name,
            variant_a=stats_a,
            variant_b=stats_b,
            winner=winner,
            confidence=confidence,
            is_significant=is_significant,
            sample_size_a=len(a_results),
            sample_size_b=len(b_results),
        )

    @staticmethod
    def _statistical_test(a: List[float], b: List[float]) -> tuple:
        from scipy import stats
        t_stat, p_value = stats.ttest_ind(a, b)
        confidence = 1 - p_value
        is_significant = p_value < 0.05
        return round(confidence, 4), is_significant


if __name__ == "__main__":
    config = ABTestConfig(
        test_name="llm_v1_vs_v2",
        variant_a_name="llama-3.1-8b",
        variant_b_name="llama-3.1-8b-finetuned",
        traffic_split=0.5,
        min_sample_size=500,
    )

    runner = ABTestRunner(config)

    def model_a_fn(query: str) -> str:
        return f"Model A response to: {query}"

    def model_b_fn(query: str) -> str:
        return f"Model B enhanced response to: {query}"

    def evaluator_fn(query: str, response: str) -> float:
        return random.uniform(0.7, 1.0)

    queries = [f"测试问题 {i}" for i in range(200)]
    report = runner.run_test(queries, model_a_fn, model_b_fn, evaluator_fn)

    print(f"Winner: {report.winner}")
    print(f"Confidence: {report.confidence:.2%}")
    print(f"Variant A avg score: {report.variant_a['avg_score']:.4f}")
    print(f"Variant B avg score: {report.variant_b['avg_score']:.4f}")

Pattern 5:人工评估平台

为什么需要人工评估?

自动化指标无法捕捉所有质量维度。流畅性、有用性、安全性、细微的事实错误——这些都需要人类判断。人工评估是模型评估的"金标准"。

人工评估平台实现

# human_eval_platform.py
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime
import json
import uuid
import statistics


class EvalTaskType(Enum):
    SINGLE_RESPONSE = "single_response"
    PAIRWISE_COMPARISON = "pairwise_comparison"
    RANKING = "ranking"
    ERROR_ANNOTATION = "error_annotation"


class QualityDimension(Enum):
    FACTUAL_ACCURACY = "factual_accuracy"
    RELEVANCE = "relevance"
    COHERENCE = "coherence"
    FLUENCY = "fluency"
    SAFETY = "safety"
    HELPFULNESS = "helpfulness"


@dataclass
class EvalTask:
    task_id: str
    task_type: EvalTaskType
    question: str
    responses: List[str]
    quality_dimensions: List[QualityDimension]
    guidelines: str = ""
    metadata: Dict = field(default_factory=dict)


@dataclass
class AnnotatorResult:
    task_id: str
    annotator_id: str
    scores: Dict[str, float]
    preference: Optional[int] = None
    comments: str = ""
    duration_seconds: float = 0.0
    timestamp: str = ""


@dataclass
class InterAnnotatorAgreement:
    dimension: str
    cohens_kappa: float
    fleiss_kappa: float
    agreement_rate: float


class HumanEvalPlatform:
    def __init__(self):
        self.tasks: Dict[str, EvalTask] = {}
        self.results: Dict[str, List[AnnotatorResult]] = {}
        self.annotator_stats: Dict[str, Dict] = {}

    def create_task(
        self,
        question: str,
        responses: List[str],
        task_type: EvalTaskType = EvalTaskType.SINGLE_RESPONSE,
        quality_dimensions: Optional[List[QualityDimension]] = None,
        guidelines: str = "",
    ) -> EvalTask:
        task_id = str(uuid.uuid4())[:8]
        task = EvalTask(
            task_id=task_id,
            task_type=task_type,
            question=question,
            responses=responses,
            quality_dimensions=quality_dimensions or [
                QualityDimension.FACTUAL_ACCURACY,
                QualityDimension.RELEVANCE,
                QualityDimension.COHERENCE,
            ],
            guidelines=guidelines,
        )
        self.tasks[task_id] = task
        self.results[task_id] = []
        return task

    def submit_annotation(self, result: AnnotatorResult):
        if result.task_id not in self.results:
            raise ValueError(f"Task {result.task_id} not found")
        result.timestamp = datetime.now().isoformat()
        self.results[result.task_id].append(result)
        self._update_annotator_stats(result)

    def get_next_task(self, annotator_id: str) -> Optional[EvalTask]:
        for task_id, task in self.tasks.items():
            existing_annotators = {r.annotator_id for r in self.results[task_id]}
            if annotator_id not in existing_annotators and len(existing_annotators) < 3:
                return task
        return None

    def compute_agreement(self, task_ids: Optional[List[str]] = None) -> List[InterAnnotatorAgreement]:
        target_tasks = task_ids or list(self.tasks.keys())
        agreements = []

        all_dimensions = set()
        for task_id in target_tasks:
            for result in self.results.get(task_id, []):
                all_dimensions.update(result.scores.keys())

        for dimension in all_dimensions:
            scores_by_task = {}
            for task_id in target_tasks:
                task_results = self.results.get(task_id, [])
                if len(task_results) >= 2:
                    scores_by_task[task_id] = [r.scores.get(dimension, 0) for r in task_results]

            if not scores_by_task:
                continue

            agreement_rate = self._compute_pairwise_agreement(scores_by_task)
            cohens_kappa = self._compute_cohens_kappa(scores_by_task)
            fleiss_kappa = self._compute_fleiss_kappa(scores_by_task)

            agreements.append(InterAnnotatorAgreement(
                dimension=dimension,
                cohens_kappa=round(cohens_kappa, 4),
                fleiss_kappa=round(fleiss_kappa, 4),
                agreement_rate=round(agreement_rate, 4),
            ))

        return agreements

    def generate_report(self) -> Dict:
        all_scores = {}
        for task_id, results in self.results.items():
            for result in results:
                for dim, score in result.scores.items():
                    if dim not in all_scores:
                        all_scores[dim] = []
                    all_scores[dim].append(score)

        dimension_stats = {}
        for dim, scores in all_scores.items():
            dimension_stats[dim] = {
                "mean": round(statistics.mean(scores), 4),
                "median": round(statistics.median(scores), 4),
                "stdev": round(statistics.stdev(scores), 4) if len(scores) > 1 else 0,
                "count": len(scores),
            }

        pairwise_stats = {}
        pairwise_tasks = [
            (tid, t) for tid, t in self.tasks.items()
            if t.task_type == EvalTaskType.PAIRWISE_COMPARISON
        ]
        for task_id, task in pairwise_tasks:
            prefs = [r.preference for r in self.results[task_id] if r.preference is not None]
            if prefs:
                pairwise_stats[task_id] = {
                    "response_a_wins": sum(1 for p in prefs if p == 0),
                    "response_b_wins": sum(1 for p in prefs if p == 1),
                    "total_votes": len(prefs),
                }

        return {
            "total_tasks": len(self.tasks),
            "total_annotations": sum(len(r) for r in self.results.values()),
            "dimension_stats": dimension_stats,
            "pairwise_stats": pairwise_stats,
            "annotator_count": len(self.annotator_stats),
        }

    def _update_annotator_stats(self, result: AnnotatorResult):
        aid = result.annotator_id
        if aid not in self.annotator_stats:
            self.annotator_stats[aid] = {"count": 0, "total_duration": 0}
        self.annotator_stats[aid]["count"] += 1
        self.annotator_stats[aid]["total_duration"] += result.duration_seconds

    @staticmethod
    def _compute_pairwise_agreement(scores_by_task: Dict) -> float:
        agreements = 0
        total = 0
        for task_id, scores in scores_by_task.items():
            for i in range(len(scores)):
                for j in range(i + 1, len(scores)):
                    if abs(scores[i] - scores[j]) <= 1:
                        agreements += 1
                    total += 1
        return agreements / total if total > 0 else 0

    @staticmethod
    def _compute_cohens_kappa(scores_by_task: Dict) -> float:
        if len(scores_by_task) < 1:
            return 0.0
        all_pairs = []
        for scores in scores_by_task.values():
            if len(scores) >= 2:
                all_pairs.append((scores[0], scores[1]))
        if not all_pairs:
            return 0.0
        rater1 = [p[0] for p in all_pairs]
        rater2 = [p[1] for p in all_pairs]
        n = len(rater1)
        agree = sum(1 for a, b in zip(rater1, rater2) if abs(a - b) <= 1)
        p_observed = agree / n
        p_expected = 0.2
        return (p_observed - p_expected) / (1 - p_expected) if (1 - p_expected) != 0 else 0

    @staticmethod
    def _compute_fleiss_kappa(scores_by_task: Dict) -> float:
        return 0.0


if __name__ == "__main__":
    platform = HumanEvalPlatform()

    task = platform.create_task(
        question="解释量子计算的基本原理",
        responses=[
            "量子计算利用量子比特(qubit)的叠加态和纠缠态进行并行计算...",
            "量子计算是一种利用量子力学原理的计算方式,通过量子比特实现...",
        ],
        task_type=EvalTaskType.PAIRWISE_COMPARISON,
        quality_dimensions=[
            QualityDimension.FACTUAL_ACCURACY,
            QualityDimension.RELEVANCE,
            QualityDimension.COHERENCE,
        ],
    )

    for annotator_id in ["ann_1", "ann_2", "ann_3"]:
        result = AnnotatorResult(
            task_id=task.task_id,
            annotator_id=annotator_id,
            scores={
                "factual_accuracy": 4.0 + (hash(annotator_id) % 10) / 10,
                "relevance": 3.5 + (hash(annotator_id) % 10) / 10,
                "coherence": 4.0 + (hash(annotator_id) % 10) / 10,
            },
            preference=0 if hash(annotator_id) % 2 == 0 else 1,
            duration_seconds=45.0,
        )
        platform.submit_annotation(result)

    report = platform.generate_report()
    print(json.dumps(report, indent=2, ensure_ascii=False))

    agreements = platform.compute_agreement()
    for a in agreements:
        print(f"{a.dimension}: κ={a.cohens_kappa:.4f}, agreement={a.agreement_rate:.2%}")

Pattern 6:生产监控与漂移检测

为什么需要生产监控?

模型上线不是终点,而是监控的起点。数据分布变化、用户行为偏移、模型退化——这些问题如果不及早发现,会导致沉默的质量下降。

漂移检测系统实现

# production_monitor.py
import numpy as np
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import json
import logging

logger = logging.getLogger(__name__)


class DriftType(Enum):
    DATA_DRIFT = "data_drift"
    CONCEPT_DRIFT = "concept_drift"
    PREDICTION_DRIFT = "prediction_drift"


class AlertLevel(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"


@dataclass
class DriftAlert:
    drift_type: DriftType
    metric_name: str
    current_value: float
    baseline_value: float
    drift_score: float
    alert_level: AlertLevel
    timestamp: str
    message: str


@dataclass
class MonitoringWindow:
    window_size: int = 1000
    reference_size: int = 5000


class DataDriftDetector:
    def __init__(
        self,
        reference_data: np.ndarray,
        significance_level: float = 0.05,
    ):
        self.reference_data = reference_data
        self.significance_level = significance_level
        self.reference_mean = np.mean(reference_data, axis=0)
        self.reference_std = np.std(reference_data, axis=0)

    def detect_ks_test(self, current_data: np.ndarray) -> Tuple[float, bool]:
        from scipy.stats import ks_2samp
        stat, p_value = ks_2samp(self.reference_data, current_data)
        is_drift = p_value < self.significance_level
        return p_value, is_drift

    def detect_psi(self, current_data: np.ndarray, n_bins: int = 10) -> Tuple[float, bool]:
        ref_hist, bin_edges = np.histogram(self.reference_data, bins=n_bins, density=True)
        cur_hist, _ = np.histogram(current_data, bins=bin_edges, density=True)

        ref_hist = ref_hist / ref_hist.sum()
        cur_hist = cur_hist / cur_hist.sum()

        ref_hist = np.clip(ref_hist, 1e-6, None)
        cur_hist = np.clip(cur_hist, 1e-6, None)

        psi = np.sum((cur_hist - ref_hist) * np.log(cur_hist / ref_hist))

        if psi < 0.1:
            is_drift = False
        elif psi < 0.25:
            is_drift = True
        else:
            is_drift = True

        return round(psi, 4), is_drift

    def detect_z_score(self, current_data: np.ndarray, threshold: float = 3.0) -> Tuple[float, bool]:
        current_mean = np.mean(current_data)
        z_score = abs(current_mean - self.reference_mean) / (self.reference_std + 1e-8)
        is_drift = z_score > threshold
        return round(float(z_score), 4), is_drift


class PredictionDriftMonitor:
    def __init__(
        self,
        reference_predictions: List[Dict],
        window_size: int = 1000,
    ):
        self.reference_predictions = reference_predictions
        self.window_size = window_size
        self.prediction_buffer: List[Dict] = []
        self.alerts: List[DriftAlert] = []

    def record_prediction(self, prediction: Dict):
        self.prediction_buffer.append({
            **prediction,
            "timestamp": datetime.now().isoformat(),
        })

        if len(self.prediction_buffer) >= self.window_size:
            self._check_drift()
            self.prediction_buffer = self.prediction_buffer[-self.window_size // 2:]

    def _check_drift(self):
        ref_scores = [p.get("confidence", 0) for p in self.reference_predictions]
        cur_scores = [p.get("confidence", 0) for p in self.prediction_buffer]

        ref_arr = np.array(ref_scores)
        cur_arr = np.array(cur_scores)

        detector = DataDriftDetector(ref_arr)
        psi_value, is_psi_drift = detector.detect_psi(cur_arr)
        z_score, is_z_drift = detector.detect_z_score(cur_arr)

        if is_psi_drift or is_z_drift:
            level = AlertLevel.CRITICAL if psi_value > 0.25 else AlertLevel.WARNING
            alert = DriftAlert(
                drift_type=DriftType.PREDICTION_DRIFT,
                metric_name="confidence_score",
                current_value=float(np.mean(cur_arr)),
                baseline_value=float(np.mean(ref_arr)),
                drift_score=psi_value,
                alert_level=level,
                timestamp=datetime.now().isoformat(),
                message=f"Prediction drift detected: PSI={psi_value:.4f}, Z-score={z_score:.4f}",
            )
            self.alerts.append(alert)
            logger.warning(alert.message)

    def get_health_report(self) -> Dict:
        recent_alerts = [
            a for a in self.alerts
            if datetime.fromisoformat(a.timestamp) > datetime.now() - timedelta(hours=24)
        ]
        return {
            "total_predictions_monitored": len(self.prediction_buffer),
            "alerts_last_24h": len(recent_alerts),
            "critical_alerts": sum(1 for a in recent_alerts if a.alert_level == AlertLevel.CRITICAL),
            "latest_drift_score": self.alerts[-1].drift_score if self.alerts else 0,
            "status": "healthy" if not recent_alerts else "degraded",
        }


class ModelPerformanceTracker:
    def __init__(self, baseline_metrics: Dict[str, float]):
        self.baseline_metrics = baseline_metrics
        self.metric_history: List[Dict] = []
        self.degradation_threshold = 0.05

    def record_metrics(self, metrics: Dict[str, float]):
        entry = {
            "timestamp": datetime.now().isoformat(),
            "metrics": metrics,
        }
        self.metric_history.append(entry)
        self._check_degradation(metrics)

    def _check_degradation(self, current_metrics: Dict[str, float]):
        for metric_name, baseline_value in self.baseline_metrics.items():
            current_value = current_metrics.get(metric_name)
            if current_value is None:
                continue
            relative_change = (baseline_value - current_value) / baseline_value
            if relative_change > self.degradation_threshold:
                logger.warning(
                    f"Degradation detected: {metric_name} dropped from "
                    f"{baseline_value:.4f} to {current_value:.4f} "
                    f"({relative_change:.2%} decrease)"
                )

    def get_trend(self, metric_name: str, hours: int = 24) -> Dict:
        cutoff = datetime.now() - timedelta(hours=hours)
        recent = [
            entry for entry in self.metric_history
            if datetime.fromisoformat(entry["timestamp"]) > cutoff
            and metric_name in entry["metrics"]
        ]
        if not recent:
            return {"trend": "no_data", "values": []}

        values = [entry["metrics"][metric_name] for entry in recent]
        trend = "stable"
        if len(values) >= 3:
            first_half = np.mean(values[:len(values)//2])
            second_half = np.mean(values[len(values)//2:])
            if second_half < first_half * 0.95:
                trend = "declining"
            elif second_half > first_half * 1.05:
                trend = "improving"

        return {
            "trend": trend,
            "current": values[-1],
            "baseline": self.baseline_metrics.get(metric_name),
            "values": values,
        }


if __name__ == "__main__":
    np.random.seed(42)
    reference = np.random.normal(0.85, 0.05, 5000)

    detector = DataDriftDetector(reference)

    healthy_data = np.random.normal(0.84, 0.05, 1000)
    drifted_data = np.random.normal(0.70, 0.08, 1000)

    psi_healthy, drift_healthy = detector.detect_psi(healthy_data)
    print(f"Healthy data: PSI={psi_healthy:.4f}, drift={drift_healthy}")

    psi_drifted, drift_drifted = detector.detect_psi(drifted_data)
    print(f"Drifted data: PSI={psi_drifted:.4f}, drift={drift_drifted}")

    reference_preds = [{"confidence": np.random.uniform(0.8, 0.95)} for _ in range(5000)]
    monitor = PredictionDriftMonitor(reference_preds, window_size=100)

    for _ in range(50):
        monitor.record_prediction({"confidence": np.random.uniform(0.8, 0.95)})

    for _ in range(50):
        monitor.record_prediction({"confidence": np.random.uniform(0.5, 0.7)})

    health = monitor.get_health_report()
    print(f"\nHealth Report: {json.dumps(health, indent=2)}")

5个常见陷阱与解决方案

陷阱1:评测数据泄露

训练数据和评测数据有重叠,导致评测分数虚高。

解决方案:严格的数据隔离策略,使用hash去重确保无重叠。

def check_data_leakage(train_data: List[str], eval_data: List[str], threshold: float = 0.8) -> Dict:
    from difflib import SequenceMatcher
    leaks = []
    for i, eval_item in enumerate(eval_data):
        for train_item in train_data:
            similarity = SequenceMatcher(None, eval_item, train_item).ratio()
            if similarity > threshold:
                leaks.append({"eval_index": i, "similarity": round(similarity, 4)})
                break
    return {"leak_count": len(leaks), "leak_rate": len(leaks) / len(eval_data)}

陷阱2:评测指标与业务目标脱节

模型在MMLU上得分很高,但业务KPI没有改善。

解决方案:建立指标-业务映射表,确保评测指标与业务目标对齐。

def align_metrics_with_business(eval_metrics: Dict, business_kpis: Dict) -> List[str]:
    alignment_map = {
        "faithfulness": ["customer_satisfaction", "complaint_rate"],
        "answer_relevancy": ["task_completion_rate", "user_engagement"],
        "latency_p95": ["session_duration", "bounce_rate"],
    }
    misaligned = []
    for metric in eval_metrics:
        if metric not in alignment_map:
            misaligned.append(f"Metric '{metric}' has no business KPI mapping")
    return misaligned

陷阱3:A/B测试样本量不足

跑了200条就下结论,统计功效不足。

解决方案:预先计算所需样本量。

def calculate_sample_size(
    baseline_rate: float,
    minimum_detectable_effect: float,
    alpha: float = 0.05,
    power: float = 0.8,
) -> int:
    import math
    from scipy.stats import norm
    z_alpha = norm.ppf(1 - alpha / 2)
    z_beta = norm.ppf(power)
    p1 = baseline_rate
    p2 = baseline_rate * (1 + minimum_detectable_effect)
    p_avg = (p1 + p2) / 2
    n = (z_alpha * math.sqrt(2 * p_avg * (1 - p_avg)) +
         z_beta * math.sqrt(p1 * (1 - p1) + p2 * (1 - p2))) ** 2 / (p2 - p1) ** 2
    return math.ceil(n)

陷阱4:忽略评估者间一致性

多个人标注结果差异很大,但直接取平均,掩盖了标注质量问题。

解决方案:计算Cohen's Kappa,低于0.6的维度需要重新标注。

陷阱5:生产环境漂移检测延迟

只在月度评测时才发现模型退化,影响已持续数周。

解决方案:实时监控+滑动窗口检测,设置小时级告警。


10个常见错误排查

错误现象 可能原因 排查步骤 解决方案
lm-eval任务报OOM batch_size过大或模型过大 减小batch_size,启用FSDP batch_size=1 + device_map=auto
RAGAS faithfulness为0 contexts为空或answer为空 检查eval_data中contexts字段 确保contexts非空且内容相关
pytest超时 模型推理延迟过高 检查GPU利用率和batch配置 增加timeout或优化推理配置
A/B测试结果不显著 样本量不足或效果差异太小 计算统计功效和所需样本量 延长测试周期或提高流量比例
人工评估Kappa<0.4 标注指南不清晰 审查指南并做校准测试 增加示例和边界case说明
PSI误报频繁 参考数据分布过窄 扩大参考数据时间窗口 使用30天数据作为基线
评测结果不可复现 随机种子未固定 设置全局随机种子 torch.manual_seed(42) + numpy.random.seed(42)
评测数据格式错误 字段缺失或类型不匹配 用schema校验数据 使用Pydantic模型校验
漂移检测延迟高 监控窗口设置过大 缩小滑动窗口 从1000条减到200条
评测报告指标缺失 某些metric计算失败 检查LLM API调用是否超时 添加重试机制和fallback

高级优化技巧

1. 多维度交叉评估

单一指标无法全面反映模型质量。通过交叉评估矩阵发现短板:

class CrossDimensionEvaluator:
    def __init__(self, dimensions: List[str]):
        self.dimensions = dimensions
        self.matrix = {d: {d2: [] for d2 in dimensions} for d in dimensions}

    def evaluate_cross(self, samples: List[Dict], eval_fn) -> Dict:
        for sample in samples:
            scores = eval_fn(sample)
            for d1 in self.dimensions:
                for d2 in self.dimensions:
                    if d1 != d2:
                        self.matrix[d1][d2].append(scores.get(d1, 0) * scores.get(d2, 0))

        correlation = {}
        for d1 in self.dimensions:
            for d2 in self.dimensions:
                if d1 != d2 and self.matrix[d1][d2]:
                    import statistics
                    correlation[f"{d1}×{d2}"] = round(
                        statistics.mean(self.matrix[d1][d2]), 4
                    )
        return correlation

2. 动态评测集生成

静态评测集容易被"刷分"。动态生成评测题目,确保评测的公正性:

class DynamicEvalGenerator:
    def __init__(self, llm_client):
        self.llm = llm_client

    def generate_eval_questions(
        self,
        domain: str,
        difficulty: str = "medium",
        count: int = 50,
    ) -> List[Dict]:
        prompt = f"""生成{count}道{domain}领域的{difficulty}难度评测题目。
        每道题包含:question, choices, answer, explanation。
        以JSON数组格式返回。"""
        response = self.llm.chat.completions.create(
            model="default",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
        )
        return json.loads(response.choices[0].message.content)

3. 分层评测策略

不同场景使用不同评测粒度,平衡评测成本和质量保证:

层级 触发条件 评测范围 评测时间
L0 快速验证 每次提交 50条核心用例 <5分钟
L1 标准评测 每日构建 500条标准集 ~30分钟
L2 全面评测 版本发布 5000条完整集 ~2小时
L3 人工抽检 重大版本 100条专家评审 ~1天

4. 评测结果版本管理

class EvalVersionManager:
    def __init__(self, store_path: str = "./eval_versions"):
        self.store_path = store_path
        import os
        os.makedirs(store_path, exist_ok=True)

    def save_version(
        self,
        model_version: str,
        eval_results: Dict,
        eval_config: Dict,
    ) -> str:
        version_id = f"{model_version}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        record = {
            "version_id": version_id,
            "model_version": model_version,
            "eval_results": eval_results,
            "eval_config": eval_config,
            "timestamp": datetime.now().isoformat(),
        }
        path = os.path.join(self.store_path, f"{version_id}.json")
        with open(path, "w", encoding="utf-8") as f:
            json.dump(record, f, indent=2, ensure_ascii=False)
        return version_id

    def compare_versions(self, version_a: str, version_b: str) -> Dict:
        data_a = self._load_version(version_a)
        data_b = self._load_version(version_b)
        diff = {}
        for metric in data_a["eval_results"]:
            if metric in data_b["eval_results"]:
                diff[metric] = {
                    "a": data_a["eval_results"][metric],
                    "b": data_b["eval_results"][metric],
                    "delta": round(
                        data_b["eval_results"][metric] - data_a["eval_results"][metric], 4
                    ),
                }
        return diff

5. 评测流水线编排

class EvalPipelineOrchestrator:
    def __init__(self):
        self.stages = []

    def add_stage(self, name: str, eval_fn: Callable, gate_threshold: float = 0.0):
        self.stages.append({
            "name": name,
            "eval_fn": eval_fn,
            "gate_threshold": gate_threshold,
        })

    def run(self, model, dataset) -> Dict:
        results = {}
        for stage in self.stages:
            print(f"Running stage: {stage['name']}")
            score = stage["eval_fn"](model, dataset)
            results[stage["name"]] = score

            if score < stage["gate_threshold"]:
                print(f"GATE FAILED: {stage['name']} score {score:.4f} < {stage['gate_threshold']}")
                results["status"] = "gate_failed"
                results["failed_stage"] = stage["name"]
                return results

        results["status"] = "passed"
        return results

对比分析

评估模式 适用阶段 评估维度 自动化程度 成本 可靠性 时效性
LLM基准测试 模型选型/训练 通用能力
RAG评估 RAG系统开发 检索+生成
自动化评测 CI/CD流水线 自定义指标
A/B测试 模型上线 业务指标
人工评估 质量把关 全维度 最高 最低
生产监控 运维阶段 漂移/退化 最高
漂移检测方法 适用场景 检测速度 误报率 最低样本量
KS检验 连续型数据分布 100
PSI 分类型/连续型 500
Z-score 均值偏移 最快 30
ADWIN 流式数据 200
Page-Hinkley 累积漂移 100

在线工具推荐


总结:AI模型评估不是"跑个分数"就完事——它是一个覆盖模型全生命周期的工程体系。6种生产模式各有定位:基准测试量化通用能力、RAG评估定位检索生成短板、自动化评测保证CI/CD质量门禁、A/B测试验证线上效果、人工评估把控质量上限、生产监控防止沉默退化。2026年,不做系统评估的AI系统,就是在盲飞。


延伸阅读

本站提供浏览器本地工具,免注册即可试用 →

#AI模型评估#LLM基准测试#RAG评估#自动化评测#Python#2026#AI与大数据