Python AI模型评估实战:从基准测试到自动化评测的6种生产模式
Python AI模型评估实战:从基准测试到自动化评测的6种生产模式
你的AI模型上线后,准确率到底怎么样?LLM回答质量如何量化?RAG系统的检索和生成到底哪个环节拖后腿?模型上线3个月后效果有没有退化?大多数团队还在用"人肉看几条结果"来评估模型——这就像用肉眼检测芯片良率,既不靠谱也不可复现。2026年,AI模型评估已经形成完整的工程体系:从lm-evaluation-harness基准测试、RAGAS框架评估RAG、pytest自动化评测流水线、A/B测试模型对比、人工评估平台到生产环境漂移检测,6种生产模式覆盖模型全生命周期。
核心收获
- 掌握lm-evaluation-harness进行LLM标准化基准测试的完整流程
- 使用RAGAS框架量化评估RAG系统的检索质量和生成质量
- 构建pytest驱动的自动化评测流水线,实现CI/CD中的模型质量门禁
- 设计科学的A/B测试方案对比不同模型版本的效果差异
- 搭建人工评估平台收集高质量的人类反馈数据
- 实现生产环境的模型漂移检测和自动化告警机制
- 了解6种评估模式的适用场景、优缺点和组合使用策略
目录
- 架构总览:AI模型评估全景图
- Pattern 1:LLM基准测试(lm-evaluation-harness)
- Pattern 2:RAG评估(RAGAS框架)
- Pattern 3:自动化评测流水线(pytest)
- Pattern 4:A/B测试模型对比
- Pattern 5:人工评估平台
- Pattern 6:生产监控与漂移检测
- 5个常见陷阱与解决方案
- 10个常见错误排查
- 高级优化技巧
- 对比分析
- 在线工具推荐
架构总览:AI模型评估全景图
┌─────────────────────────────────────────────────────────────┐
│ AI Model Evaluation Pipeline │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────────────────┐ │
│ │ Offline │ │ Online │ │ Human-in-the-Loop │ │
│ │ Eval │ │ Eval │ │ Evaluation │ │
│ │ │ │ │ │ │ │
│ │ • Bench │ │ • A/B │ │ • Preference Ranking │ │
│ │ mark │ │ Test │ │ • Quality Scoring │ │
│ │ • RAG │ │ • Drift │ │ • Red Team Testing │ │
│ │ Eval │ │ Detect │ │ • Domain Expert │ │
│ │ • Auto │ │ • Prod │ │ Review │ │
│ │ Test │ │ Monitor│ │ │ │
│ └────┬─────┘ └────┬─────┘ └──────────┬───────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Evaluation Results Store │ │
│ │ ┌─────────┐ ┌──────────┐ ┌──────────────────┐ │ │
│ │ │ Metrics │ │ Reports │ │ Comparison Board │ │ │
│ │ │ DB │ │ Generator│ │ │ │ │
│ │ └─────────┘ └──────────┘ └──────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Decision & Action Layer │ │
│ │ • Model Promotion / Rollback │ │
│ │ • Retraining Trigger │ │
│ │ • Alert & Notification │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Pattern 1:LLM基准测试(lm-evaluation-harness)
为什么需要标准化基准测试?
"我们模型效果不错"——这句话没有任何信息量。你需要用标准化的数据集和指标,在可控条件下量化模型能力。EleutherAI的lm-evaluation-harness是2026年最广泛使用的LLM评测框架,支持200+任务。
完整评测流程
# llm_benchmark.py
from lm_eval import evaluator
from lm_eval.models.huggingface import HuggingFaceAuto
from typing import Dict, List, Optional
import json
import os
class LLMBenchmarkRunner:
    """Run lm-evaluation-harness benchmarks for one model and compare runs.

    Every ``run_core_tasks`` call appends its formatted result to
    ``results_history`` so the latest run can be compared with a stored
    baseline file.
    """

    def __init__(
        self,
        model_path: str,
        device: str = "cuda",
        batch_size: int = 8,
    ):
        """Store the evaluation settings.

        Args:
            model_path: Value passed to the harness as ``pretrained=<model_path>``.
            device: Device string forwarded to ``evaluator.simple_evaluate``.
            batch_size: Batch size forwarded to ``evaluator.simple_evaluate``.
        """
        self.model_path = model_path
        self.device = device
        self.batch_size = batch_size
        self.results_history: List[Dict] = []

    def run_core_tasks(self) -> Dict:
        """Evaluate the fixed core task list and record the formatted result."""
        core_tasks = [
            "mmlu",
            "hellaswag",
            "arc_challenge",
            "truthfulqa_mc2",
            "winogrande",
            "gsm8k",
        ]
        results = evaluator.simple_evaluate(
            model="hf",
            model_args=f"pretrained={self.model_path}",
            tasks=core_tasks,
            batch_size=self.batch_size,
            device=self.device,
        )
        formatted = self._format_results(results)
        self.results_history.append(formatted)
        return formatted

    def run_custom_task(self, task_config_path: str) -> Dict:
        """Evaluate a single custom task; the result is not added to history."""
        results = evaluator.simple_evaluate(
            model="hf",
            model_args=f"pretrained={self.model_path}",
            tasks=[task_config_path],
            batch_size=self.batch_size,
            device=self.device,
        )
        return self._format_results(results)

    def _format_results(self, raw_results: Dict) -> Dict:
        """Reduce a raw harness result to model, timestamp and per-task metrics.

        Float metric values are rounded to 4 decimals; every other value is
        copied unchanged.
        """
        formatted = {
            "model": self.model_path,
            "timestamp": self._get_timestamp(),
            "tasks": {},
        }
        for task_name, task_results in raw_results["results"].items():
            formatted["tasks"][task_name] = {
                k: round(v, 4) if isinstance(v, float) else v
                for k, v in task_results.items()
            }
        return formatted

    def compare_with_baseline(self, baseline_path: str, metric: str = "acc,none") -> Dict:
        """Compare the latest run with a baseline JSON file on one metric.

        Runs ``run_core_tasks`` first when no result has been recorded yet.

        Args:
            baseline_path: JSON file with the same layout ``_format_results``
                produces (a top-level ``"tasks"`` mapping).
            metric: Metric key to compare. Defaults to ``"acc,none"``.

        Returns:
            Mapping of task name to ``current``, ``baseline``, ``delta`` and
            ``improved``. Tasks absent from the baseline, or lacking a numeric
            value for ``metric`` on either side, are omitted rather than
            reported as a misleading 0-vs-0 comparison.
        """
        if not self.results_history:
            self.run_core_tasks()
        with open(baseline_path, "r", encoding="utf-8") as f:
            baseline = json.load(f)
        current = self.results_history[-1]
        baseline_tasks = baseline["tasks"]
        comparison = {}
        for task_name, task_metrics in current["tasks"].items():
            if task_name not in baseline_tasks:
                continue
            current_score = task_metrics.get(metric)
            baseline_score = baseline_tasks[task_name].get(metric)
            # Not every task reports every metric; skip instead of inventing zeros.
            if not (self._is_number(current_score) and self._is_number(baseline_score)):
                continue
            comparison[task_name] = {
                "current": current_score,
                "baseline": baseline_score,
                "delta": round(current_score - baseline_score, 4),
                "improved": current_score > baseline_score,
            }
        return comparison

    @staticmethod
    def _is_number(value) -> bool:
        """Return True for int/float values, excluding bool."""
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    @staticmethod
    def _get_timestamp() -> str:
        """Return the current local time as an ISO-8601 string."""
        from datetime import datetime

        return datetime.now().isoformat()
class CustomTaskConfig:
    """Writes lm-evaluation-harness task definitions to YAML files."""

    @staticmethod
    def create_domain_eval(
        dataset_path: str,
        task_name: str,
        output_dir: str = "./custom_tasks",
    ) -> str:
        """Write a multiple-choice task config and return the file path.

        Args:
            dataset_path: Stored as the config's ``dataset_path`` value.
            task_name: Task identifier; also the YAML file's base name.
            output_dir: Directory for the file; created when missing.

        Returns:
            Path of the written ``<task_name>.yaml`` file.
        """
        # Both metrics are aggregated with the mean.
        metric_entries = [
            {"metric": metric_name, "aggregation": "mean"}
            for metric_name in ("acc", "f1")
        ]
        task_spec = {
            "task": task_name,
            "dataset_path": dataset_path,
            "output_type": "multiple_choice",
            "test_split": "test",
            "doc_to_text": "{{question}}",
            "doc_to_target": "{{answer}}",
            "doc_to_choice": "{{choices}}",
            "metric_list": metric_entries,
        }

        os.makedirs(output_dir, exist_ok=True)
        target_file = os.path.join(output_dir, f"{task_name}.yaml")

        import yaml

        with open(target_file, "w") as handle:
            yaml.dump(task_spec, handle)
        return target_file
if __name__ == "__main__":
    # Demo: benchmark one model, dump the scores, then diff against a baseline file.
    benchmark = LLMBenchmarkRunner(
        model_path="meta-llama/Llama-3.1-8B-Instruct",
        batch_size=4,
    )
    core_scores = benchmark.run_core_tasks()
    print(json.dumps(core_scores, indent=2, ensure_ascii=False))

    baseline_diff = benchmark.compare_with_baseline("./baseline_results.json")
    for task, delta_info in baseline_diff.items():
        # Arrow direction mirrors the "improved" flag.
        if delta_info["improved"]:
            status = "↑"
        else:
            status = "↓"
        print(f"{task}: {delta_info['baseline']:.4f} → {delta_info['current']:.4f} {status}{delta_info['delta']:+.4f}")
自定义领域评测任务
# custom_tasks/medical_qa.yaml
# Multiple-choice medical QA task loaded from a local JSONL test split.
task: medical_qa
dataset_path: json
dataset_kwargs:
  data_files:
    test: ./data/medical_qa_test.jsonl
# Required for choice-based scoring; matches what CustomTaskConfig generates.
output_type: multiple_choice
test_split: test
doc_to_text: "问题:{{question}}\n选项:\nA. {{A}}\nB. {{B}}\nC. {{C}}\nD. {{D}}\n答案:"
doc_to_target: "{{answer}}"
doc_to_choice: ["A", "B", "C", "D"]
metric_list:
  - metric: acc
    aggregation: mean
  - metric: f1
    aggregation: mean
Pattern 2:RAG评估(RAGAS框架)
RAG系统的评估维度
RAG系统涉及检索和生成两个环节,需要分别评估。RAGAS框架提供了4个核心指标:
| 指标 | 评估环节 | 含义 | 计算方式 |
|---|---|---|---|
| Context Precision | 检索 | 检索结果中相关文档的排名精度 | 相关文档排名的加权平均 |
| Context Recall | 检索 | 答案所需信息被检索到的比例 | Ground Truth被检索内容覆盖的比例 |
| Faithfulness | 生成 | 生成答案与检索文档的事实一致性 | 答案声明在检索文档中的支持比例 |
| Answer Relevancy | 生成 | 答案与问题的相关性 | 由答案反向生成若干问题,取其与原问题嵌入向量余弦相似度的均值 |
完整RAG评估实现
# rag_evaluation_benchmark.py
from ragas import evaluate
from ragas.metrics import (
context_precision,
context_recall,
faithfulness,
answer_relevancy,
AnswerSimilarity,
)
from datasets import Dataset
from typing import List, Dict, Optional
from dataclasses import dataclass, field
import json
@dataclass
class RAGEvalSample:
    """One RAG interaction together with its reference answer."""

    # The user question sent to the pipeline.
    question: str
    # Retrieved passages supplied to the generator.
    contexts: List[str]
    # Answer produced by the pipeline.
    answer: str
    # Reference answer used for scoring.
    ground_truth: str
@dataclass
class RAGEvalReport:
    """Aggregate scores from one RAG evaluation run."""

    # The four core metric scores (required).
    faithfulness: float
    answer_relevancy: float
    context_precision: float
    context_recall: float
    # Optional extras; each instance gets its own ``details`` list.
    answer_similarity: float = 0.0
    sample_count: int = 0
    details: List[Dict] = field(default_factory=list)
class RAGEvaluator:
    """Scores RAG samples or whole pipelines with RAGAS metrics."""

    def __init__(
        self,
        metrics: Optional[List] = None,
        llm=None,
        embeddings=None,
    ):
        """Configure the metric set and optional judge models.

        Args:
            metrics: RAGAS metric objects; defaults to faithfulness, answer
                relevancy, context precision and context recall.
            llm: Forwarded to ``ragas.evaluate`` as ``llm``.
            embeddings: Forwarded to ``ragas.evaluate`` as ``embeddings``.
        """
        self.metrics = metrics or [
            faithfulness,
            answer_relevancy,
            context_precision,
            context_recall,
        ]
        self.llm = llm
        self.embeddings = embeddings

    @staticmethod
    def _score(result, name: str, default: float = float("nan")) -> float:
        """Return ``result[name]``, or ``default`` when that metric was not computed."""
        try:
            return result[name]
        except KeyError:
            return default

    def evaluate_samples(
        self,
        samples: List[RAGEvalSample],
    ) -> RAGEvalReport:
        """Evaluate pre-collected samples and return the aggregate report.

        Metrics absent from the configured metric list are reported as NaN
        (``answer_similarity`` as 0.0, its dataclass default) instead of
        raising ``KeyError``.

        Raises:
            ValueError: If ``samples`` is empty.
        """
        if not samples:
            raise ValueError("samples must not be empty")
        eval_data = {
            "question": [s.question for s in samples],
            "contexts": [s.contexts for s in samples],
            "answer": [s.answer for s in samples],
            "ground_truth": [s.ground_truth for s in samples],
        }
        dataset = Dataset.from_dict(eval_data)
        result = evaluate(
            dataset,
            metrics=self.metrics,
            llm=self.llm,
            embeddings=self.embeddings,
        )
        return RAGEvalReport(
            faithfulness=self._score(result, "faithfulness"),
            answer_relevancy=self._score(result, "answer_relevancy"),
            context_precision=self._score(result, "context_precision"),
            context_recall=self._score(result, "context_recall"),
            answer_similarity=self._score(result, "answer_similarity", 0.0),
            sample_count=len(samples),
        )

    def evaluate_rag_pipeline(
        self,
        rag_pipeline,
        test_questions: List[Dict],
    ) -> RAGEvalReport:
        """Query ``rag_pipeline`` for every test question and score the outputs.

        Each question dict must provide ``"question"`` and ``"ground_truth"``;
        ``rag_pipeline.query`` must return a mapping with ``"contexts"`` and
        ``"answer"``.
        """
        samples = []
        for q in test_questions:
            rag_result = rag_pipeline.query(q["question"])
            samples.append(
                RAGEvalSample(
                    question=q["question"],
                    contexts=rag_result["contexts"],
                    answer=rag_result["answer"],
                    ground_truth=q["ground_truth"],
                )
            )
        return self.evaluate_samples(samples)

    def compare_pipelines(
        self,
        pipelines: Dict[str, object],
        test_questions: List[Dict],
    ) -> Dict[str, RAGEvalReport]:
        """Evaluate each named pipeline on the same questions and print a summary."""
        reports = {}
        for name, pipeline in pipelines.items():
            report = self.evaluate_rag_pipeline(pipeline, test_questions)
            reports[name] = report
            print(f"\n=== {name} ===")
            print(f" Faithfulness: {report.faithfulness:.4f}")
            print(f" Answer Relevancy: {report.answer_relevancy:.4f}")
            print(f" Context Precision: {report.context_precision:.4f}")
            print(f" Context Recall: {report.context_recall:.4f}")
        return reports
class RAGEvalDatasetBuilder:
    """Builds ``RAGEvalSample`` lists from in-memory dicts or JSONL files."""

    @staticmethod
    def from_qa_pairs(
        qa_pairs: List[Dict],
        rag_pipeline=None,
    ) -> List[RAGEvalSample]:
        """Convert QA dicts to samples, querying the pipeline when needed.

        The pipeline is queried only for entries that have no ``"contexts"``
        key; otherwise the entry's own ``contexts``/``answer`` are used
        (defaulting to ``[]`` and ``""``).
        """
        samples = []
        for qa in qa_pairs:
            if rag_pipeline and "contexts" not in qa:
                result = rag_pipeline.query(qa["question"])
                contexts = result["contexts"]
                answer = result["answer"]
            else:
                contexts = qa.get("contexts", [])
                answer = qa.get("answer", "")
            samples.append(
                RAGEvalSample(
                    question=qa["question"],
                    contexts=contexts,
                    answer=answer,
                    ground_truth=qa["ground_truth"],
                )
            )
        return samples

    @staticmethod
    def from_jsonl(file_path: str) -> List[RAGEvalSample]:
        """Load samples from a UTF-8 JSONL file, one JSON object per line.

        Blank lines (for example a trailing newline at end of file) are
        skipped instead of raising ``json.JSONDecodeError``.
        """
        samples = []
        with open(file_path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                data = json.loads(line)
                samples.append(
                    RAGEvalSample(
                        question=data["question"],
                        contexts=data["contexts"],
                        answer=data["answer"],
                        ground_truth=data["ground_truth"],
                    )
                )
        return samples
if __name__ == "__main__":
    # Demo: score two hand-written QA records with the default metric set.
    ztna_record = {
        "question": "什么是零信任网络访问(ZTNA)?",
        "contexts": ["零信任网络访问(ZTNA)是一种安全模型,基于'永不信任,始终验证'的原则,为远程用户提供对特定应用程序的安全访问。"],
        "answer": "零信任网络访问是一种安全模型,核心原则是永不信任始终验证,为远程用户提供安全访问。",
        "ground_truth": "零信任网络访问(ZTNA)是一种安全架构,基于永不信任始终验证的原则,通过身份验证和授权为远程用户提供对特定应用的安全访问,替代传统VPN。",
    }
    sase_record = {
        "question": "SASE架构包含哪些核心组件?",
        "contexts": ["SASE(安全访问服务边缘)将SD-WAN、SWG、CASB、FWaaS和ZTNA整合为统一的云原生服务。"],
        "answer": "SASE架构包含SD-WAN、SWG、CASB、FWaaS和ZTNA等核心组件,整合为统一的云原生服务。",
        "ground_truth": "SASE架构的核心组件包括SD-WAN(软件定义广域网)、SWG(安全Web网关)、CASB(云访问安全代理)、FWaaS(防火墙即服务)和ZTNA(零信任网络访问),整合为统一的云原生服务交付模型。",
    }

    rag_evaluator = RAGEvaluator()
    # No pipeline is passed, so contexts and answers come from the records.
    report = rag_evaluator.evaluate_samples(
        RAGEvalDatasetBuilder.from_qa_pairs([ztna_record, sase_record])
    )

    print(f"\n=== RAG Evaluation Report ===")
    print(f"Faithfulness: {report.faithfulness:.4f}")
    print(f"Answer Relevancy: {report.answer_relevancy:.4f}")
    print(f"Context Precision: {report.context_precision:.4f}")
    print(f"Context Recall: {report.context_recall:.4f}")
    print(f"Sample Count: {report.sample_count}")
Pattern 3:自动化评测流水线(pytest)
为什么需要自动化评测?
手动评测不可复现、不可追溯、无法集成到CI/CD。pytest驱动的自动化评测流水线让模型评估像单元测试一样可靠。
完整自动化评测框架
# tests/conftest.py
import pytest
from typing import Dict, List
import json
import os
@pytest.fixture(scope="session")
def model_client():
    """Session-wide OpenAI-compatible client for the model under test.

    Endpoint and key come from ``MODEL_API_URL`` / ``MODEL_API_KEY`` with
    local defaults.
    """
    from openai import OpenAI

    endpoint = os.getenv("MODEL_API_URL", "http://localhost:8000/v1")
    token = os.getenv("MODEL_API_KEY", "test")
    return OpenAI(base_url=endpoint, api_key=token)
@pytest.fixture(scope="session")
def eval_dataset():
    """Session-wide evaluation dataset parsed from ``./data/eval_dataset.json``."""
    with open("./data/eval_dataset.json", "r", encoding="utf-8") as dataset_file:
        dataset = json.load(dataset_file)
    return dataset
@pytest.fixture(scope="session")
def baseline_scores():
    """Session-wide baseline metrics parsed from ``./data/baseline_scores.json``."""
    with open("./data/baseline_scores.json", "r", encoding="utf-8") as scores_file:
        scores = json.load(scores_file)
    return scores
# tests/test_model_quality.py
import json
from typing import Dict, List

import pytest
class TestModelQuality:
    """Model quality gates executed by pytest.

    Tests carry the ``smoke`` or ``regression`` marker so that
    ``pytest -m smoke`` / ``pytest -m regression`` select something. Each
    test fails with an explicit message when its dataset slice is empty,
    instead of crashing with ``ZeroDivisionError``.
    """

    @pytest.mark.regression
    def test_factual_accuracy(self, model_client, eval_dataset):
        """At least 85% of ``factual`` questions must contain enough expected keywords."""
        factual_questions = [
            q for q in eval_dataset if q["category"] == "factual"
        ]
        assert factual_questions, "eval_dataset contains no 'factual' samples"
        correct = 0
        for q in factual_questions:
            answer = self._ask(model_client, q["question"], temperature=0.0)
            if self._check_answer(answer, q["expected_keywords"]):
                correct += 1
        accuracy = correct / len(factual_questions)
        assert accuracy >= 0.85, f"Factual accuracy {accuracy:.2%} below threshold 85%"

    @pytest.mark.regression
    def test_no_hallucination(self, model_client, eval_dataset):
        """At most 10% of ``hallucination_trap`` prompts may trigger a trap keyword."""
        hallucination_prompts = [
            q for q in eval_dataset if q["category"] == "hallucination_trap"
        ]
        assert hallucination_prompts, "eval_dataset contains no 'hallucination_trap' samples"
        hallucinated = 0
        for q in hallucination_prompts:
            answer = self._ask(model_client, q["question"], temperature=0.0)
            if self._contains_hallucination(answer, q["trap_keywords"]):
                hallucinated += 1
        hallucination_rate = hallucinated / len(hallucination_prompts)
        assert hallucination_rate <= 0.10, f"Hallucination rate {hallucination_rate:.2%} above threshold 10%"

    @pytest.mark.smoke
    def test_response_latency(self, model_client, eval_dataset):
        """Average latency must be <= 2s and P95 <= 5s over the first 20 questions."""
        import time

        sample = eval_dataset[:20]
        assert sample, "eval_dataset is empty"
        latencies = []
        for q in sample:
            start = time.time()
            # No temperature override here, matching default serving settings.
            self._ask(model_client, q["question"])
            latencies.append(time.time() - start)
        avg_latency = sum(latencies) / len(latencies)
        p95_latency = sorted(latencies)[int(len(latencies) * 0.95)]
        assert avg_latency <= 2.0, f"Average latency {avg_latency:.2f}s above threshold 2s"
        assert p95_latency <= 5.0, f"P95 latency {p95_latency:.2f}s above threshold 5s"

    @pytest.mark.smoke
    def test_output_format_compliance(self, model_client, eval_dataset):
        """At least 95% of questions expecting JSON must yield parseable JSON."""
        format_questions = [
            q for q in eval_dataset if q.get("expected_format") == "json"
        ]
        assert format_questions, "eval_dataset contains no samples with expected_format == 'json'"
        format_errors = 0
        for q in format_questions:
            answer = self._ask(model_client, q["question"], temperature=0.0)
            try:
                json.loads(answer)
            except json.JSONDecodeError:
                format_errors += 1
        format_accuracy = 1 - format_errors / len(format_questions)
        assert format_accuracy >= 0.95, f"JSON format accuracy {format_accuracy:.2%} below 95%"

    @pytest.mark.regression
    def test_regression_against_baseline(self, model_client, eval_dataset, baseline_scores):
        """No metric may fall below 95% of its baseline value."""
        current_scores = self._run_evaluation_suite(model_client, eval_dataset)
        for metric, baseline_value in baseline_scores.items():
            # A metric missing from the current run counts as 0 and therefore fails.
            current_value = current_scores.get(metric, 0)
            assert current_value >= baseline_value * 0.95, (
                f"Regression detected: {metric} dropped from {baseline_value:.4f} to {current_value:.4f}"
            )

    @staticmethod
    def _ask(model_client, question: str, **params) -> str:
        """Send one user message to the ``default`` model and return the reply text.

        A missing reply (``content`` is None) is returned as an empty string
        so the keyword and JSON checks see a string.
        """
        response = model_client.chat.completions.create(
            model="default",
            messages=[{"role": "user", "content": question}],
            **params,
        )
        return response.choices[0].message.content or ""

    @staticmethod
    def _check_answer(answer: str, keywords: List[str]) -> bool:
        """Return True when at least 60% of ``keywords`` occur in ``answer`` (case-insensitive)."""
        answer_lower = answer.lower()
        matched = sum(1 for kw in keywords if kw.lower() in answer_lower)
        return matched >= len(keywords) * 0.6

    @staticmethod
    def _contains_hallucination(answer: str, trap_keywords: List[str]) -> bool:
        """Return True when any trap keyword occurs in ``answer`` (case-insensitive)."""
        answer_lower = answer.lower()
        return any(kw.lower() in answer_lower for kw in trap_keywords)

    @staticmethod
    def _run_evaluation_suite(model_client, eval_dataset) -> Dict:
        """Return the current metric scores.

        NOTE(review): placeholder - the values are hard-coded and neither
        argument is used, so the regression gate is not exercising the model
        until this is replaced with a real evaluation.
        """
        return {
            "accuracy": 0.88,
            "faithfulness": 0.91,
            "relevancy": 0.85,
        }
# pytest.ini
"""
# pytest settings for the model-evaluation suite.
[pytest]
testpaths = tests
python_files = test_model_quality.py
python_classes = TestModelQuality
python_functions = test_*
addopts = -v --tb=short --json-report --json-report-file=eval_report.json
# Continuation lines of a multi-line value must be indented.
markers =
    smoke: smoke tests for quick validation
    regression: full regression test suite
    benchmark: performance benchmark tests
"""
CI/CD集成
# .github/workflows/model_eval.yml
# Runs the model evaluation suite on pull requests touching models/ or config/.
name: Model Evaluation Pipeline
on:
  pull_request:
    paths:
      - 'models/**'
      - 'config/**'
jobs:
  model-eval:
    runs-on: gpu-runner
    steps:
      - uses: actions/checkout@v4
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      - name: Install Dependencies
        run: |
          pip install -r requirements-eval.txt
          pip install pytest pytest-json-report
      # NOTE(review): MODEL_PATH is not defined in this workflow; it must be
      # supplied through an `env:` block or repository/organization variables.
      - name: Deploy Model Canary
        run: |
          python scripts/deploy_canary.py --model-path ${{ env.MODEL_PATH }}
      - name: Run Smoke Tests
        run: pytest tests/ -m smoke -v
      - name: Run Full Evaluation
        run: pytest tests/ -m regression -v --json-report
      - name: Check Regression
        run: python scripts/check_regression.py --report eval_report.json --baseline data/baseline_scores.json
      # Reporting and upload run even when an earlier step failed.
      - name: Generate Report
        if: always()
        run: python scripts/generate_eval_report.py --report eval_report.json
      - name: Upload Results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: eval-results
          path: eval_report.json
Pattern 4:A/B测试模型对比
为什么需要A/B测试?
离线评测分数高不等于线上效果好。A/B测试在真实流量中对比模型,是最可靠的效果验证方式。
A/B测试框架实现
# ab_test_framework.py
import hashlib
import random
import time
from typing import Dict, List, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import json
import statistics
class AllocationStrategy(Enum):
    """How incoming users are assigned to an A/B variant."""

    # Independent random draw on every call.
    RANDOM = "random"
    # Deterministic assignment derived from a hash of the user id.
    HASH_BASED = "hash_based"
    # Assignment remembered per user id after the first call.
    STICKY = "sticky"
@dataclass
class ABTestConfig:
    """Static settings for one A/B experiment."""

    # Experiment name copied into the final report.
    test_name: str
    # Labels of the two variants being compared.
    variant_a_name: str
    variant_b_name: str
    # Fraction of traffic routed to variant A.
    traffic_split: float = 0.5
    # Planning parameters for the experiment.
    min_sample_size: int = 1000
    confidence_level: float = 0.95
    allocation_strategy: AllocationStrategy = AllocationStrategy.HASH_BASED
    duration_hours: int = 72
@dataclass
class ABTestResult:
    """One served request recorded during an A/B test."""

    # The query and the variant that answered it.
    query: str
    variant: str
    response: str
    # Wall-clock latency of the model call, in milliseconds.
    latency_ms: float
    # ISO-8601 time at which the result was recorded.
    timestamp: str
    # Optional quality signals; None means "not collected".
    user_feedback: Optional[int] = None
    auto_score: Optional[float] = None
@dataclass
class ABTestReport:
    """Outcome of analysing the results recorded for one A/B test."""

    test_name: str
    # Per-variant summary statistics.
    variant_a: Dict
    variant_b: Dict
    # Name of the winning variant, or None when no winner was declared.
    winner: Optional[str] = None
    confidence: float = 0.0
    is_significant: bool = False
    # Number of recorded results per variant.
    sample_size_a: int = 0
    sample_size_b: int = 0
class ABTestRunner:
    """Allocates users to variants, records results and analyses an A/B test."""

    def __init__(self, config: ABTestConfig):
        """Start an empty experiment for ``config``."""
        self.config = config
        self.results: List[ABTestResult] = []
        # user_id -> variant name; only used by the STICKY strategy.
        self._sticky_map: Dict[str, str] = {}

    def _hash_allocate(self, key: str) -> str:
        """Map ``key`` deterministically to a variant using its MD5 digest.

        The 128-bit digest is compared with ``traffic_split * 2**128``, so
        roughly ``traffic_split`` of all keys land in variant A.
        """
        hash_val = int(hashlib.md5(key.encode()).hexdigest(), 16)
        threshold = int(self.config.traffic_split * (2**128))
        if hash_val < threshold:
            return self.config.variant_a_name
        return self.config.variant_b_name

    def allocate_variant(self, user_id: str) -> str:
        """Return the variant name for ``user_id`` under the configured strategy.

        Raises:
            ValueError: If the configured strategy is not supported.
        """
        strategy = self.config.allocation_strategy
        if strategy == AllocationStrategy.RANDOM:
            if random.random() < self.config.traffic_split:
                return self.config.variant_a_name
            return self.config.variant_b_name
        if strategy == AllocationStrategy.HASH_BASED:
            return self._hash_allocate(user_id)
        if strategy == AllocationStrategy.STICKY:
            # The first assignment is hash-based and then remembered. Calling
            # allocate_variant() again here would recurse forever because the
            # strategy is still STICKY.
            if user_id not in self._sticky_map:
                self._sticky_map[user_id] = self._hash_allocate(user_id + "_init")
            return self._sticky_map[user_id]
        raise ValueError(f"Unsupported allocation strategy: {strategy!r}")

    def record_result(self, result: ABTestResult):
        """Append one result to the experiment log."""
        self.results.append(result)

    def run_test(
        self,
        queries: List[str],
        model_a_fn: Callable,
        model_b_fn: Callable,
        evaluator_fn: Optional[Callable] = None,
    ) -> ABTestReport:
        """Serve every query with its allocated variant, then analyse.

        Query ``i`` is attributed to the synthetic user id ``user_<i>``.
        ``evaluator_fn(query, response)``, when given, supplies ``auto_score``.
        """
        for i, query in enumerate(queries):
            user_id = f"user_{i}"
            variant = self.allocate_variant(user_id)
            model_fn = model_a_fn if variant == self.config.variant_a_name else model_b_fn
            start_time = time.time()
            response = model_fn(query)
            latency_ms = (time.time() - start_time) * 1000
            auto_score = evaluator_fn(query, response) if evaluator_fn else None
            self.record_result(ABTestResult(
                query=query,
                variant=variant,
                response=response,
                latency_ms=latency_ms,
                timestamp=datetime.now().isoformat(),
                auto_score=auto_score,
            ))
        return self.analyze()

    @staticmethod
    def _summarize(results: List[ABTestResult]) -> Dict:
        """Return mean score, mean/P95 latency and mean feedback for one variant."""
        scores = [r.auto_score for r in results if r.auto_score is not None]
        latencies = [r.latency_ms for r in results]
        feedback = [r.user_feedback for r in results if r.user_feedback is not None]
        return {
            "avg_score": statistics.mean(scores) if scores else 0,
            "avg_latency_ms": statistics.mean(latencies) if latencies else 0,
            "p95_latency_ms": sorted(latencies)[int(len(latencies) * 0.95)] if latencies else 0,
            "avg_feedback": statistics.mean(feedback) if feedback else 0,
        }

    def analyze(self) -> ABTestReport:
        """Summarise both variants and test the score difference.

        The t-test runs only when each variant has at least 30 auto scores.
        The significance threshold is ``1 - config.confidence_level``. A
        winner is declared only when the difference is significant.
        """
        a_results = [r for r in self.results if r.variant == self.config.variant_a_name]
        b_results = [r for r in self.results if r.variant == self.config.variant_b_name]
        a_scores = [r.auto_score for r in a_results if r.auto_score is not None]
        b_scores = [r.auto_score for r in b_results if r.auto_score is not None]
        stats_a = self._summarize(a_results)
        stats_b = self._summarize(b_results)

        is_significant = False
        confidence = 0.0
        if len(a_scores) >= 30 and len(b_scores) >= 30:
            alpha = 1 - self.config.confidence_level
            confidence, is_significant = self._statistical_test(a_scores, b_scores, alpha)

        winner = None
        if is_significant:
            if stats_a["avg_score"] > stats_b["avg_score"]:
                winner = self.config.variant_a_name
            else:
                winner = self.config.variant_b_name

        return ABTestReport(
            test_name=self.config.test_name,
            variant_a=stats_a,
            variant_b=stats_b,
            winner=winner,
            confidence=confidence,
            is_significant=is_significant,
            sample_size_a=len(a_results),
            sample_size_b=len(b_results),
        )

    @staticmethod
    def _statistical_test(a: List[float], b: List[float], alpha: float = 0.05) -> tuple:
        """Run an independent two-sample t-test on the two score lists.

        Args:
            a: Scores of variant A.
            b: Scores of variant B.
            alpha: Significance threshold for the p-value.

        Returns:
            ``(1 - p_value rounded to 4 decimals, p_value < alpha)``. When the
            p-value is NaN (e.g. both samples are constant) the result is
            ``(0.0, False)``.
        """
        import math

        from scipy import stats

        _, p_value = stats.ttest_ind(a, b)
        p_value = float(p_value)
        if math.isnan(p_value):
            return 0.0, False
        return round(1 - p_value, 4), p_value < alpha
if __name__ == "__main__":
    # Demo: compare two stub models on 200 synthetic queries with random scores.
    def model_a_fn(query: str) -> str:
        return f"Model A response to: {query}"

    def model_b_fn(query: str) -> str:
        return f"Model B enhanced response to: {query}"

    def evaluator_fn(query: str, response: str) -> float:
        return random.uniform(0.7, 1.0)

    demo_config = ABTestConfig(
        test_name="llm_v1_vs_v2",
        variant_a_name="llama-3.1-8b",
        variant_b_name="llama-3.1-8b-finetuned",
        traffic_split=0.5,
        min_sample_size=500,
    )
    ab_runner = ABTestRunner(demo_config)
    demo_queries = [f"测试问题 {i}" for i in range(200)]
    report = ab_runner.run_test(demo_queries, model_a_fn, model_b_fn, evaluator_fn)

    print(f"Winner: {report.winner}")
    print(f"Confidence: {report.confidence:.2%}")
    print(f"Variant A avg score: {report.variant_a['avg_score']:.4f}")
    print(f"Variant B avg score: {report.variant_b['avg_score']:.4f}")
Pattern 5:人工评估平台
为什么需要人工评估?
自动化指标无法捕捉所有质量维度。流畅性、有用性、安全性、细微的事实错误——这些都需要人类判断。人工评估是模型评估的"金标准"。
人工评估平台实现
# human_eval_platform.py
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime
import json
import uuid
import statistics
class EvalTaskType(Enum):
    """Kinds of annotation task the platform can hand out."""

    SINGLE_RESPONSE = "single_response"
    PAIRWISE_COMPARISON = "pairwise_comparison"
    RANKING = "ranking"
    ERROR_ANNOTATION = "error_annotation"
class QualityDimension(Enum):
    """Quality aspects an annotator can be asked to score."""

    FACTUAL_ACCURACY = "factual_accuracy"
    RELEVANCE = "relevance"
    COHERENCE = "coherence"
    FLUENCY = "fluency"
    SAFETY = "safety"
    HELPFULNESS = "helpfulness"
@dataclass
class EvalTask:
    """One unit of work presented to human annotators."""

    task_id: str
    task_type: EvalTaskType
    # The prompt and the candidate responses to judge.
    question: str
    responses: List[str]
    # Dimensions the annotators are asked to score.
    quality_dimensions: List[QualityDimension]
    # Free-form annotation instructions.
    guidelines: str = ""
    # Arbitrary extra data; each instance gets its own dict.
    metadata: Dict = field(default_factory=dict)
@dataclass
class AnnotatorResult:
    """One annotator's judgement of one task."""

    task_id: str
    annotator_id: str
    # Score per quality-dimension name.
    scores: Dict[str, float]
    # Index of the preferred response, or None when no preference was given.
    preference: Optional[int] = None
    comments: str = ""
    # Time spent on the task, in seconds.
    duration_seconds: float = 0.0
    timestamp: str = ""
@dataclass
class InterAnnotatorAgreement:
    """Agreement statistics for a single quality dimension."""

    # Name of the scored dimension.
    dimension: str
    cohens_kappa: float
    fleiss_kappa: float
    # Share of annotator pairs counted as agreeing.
    agreement_rate: float
class HumanEvalPlatform:
    """In-memory store for human-evaluation tasks, annotations and reports."""

    def __init__(self):
        """Create an empty platform."""
        self.tasks: Dict[str, EvalTask] = {}
        # task_id -> annotations submitted for that task.
        self.results: Dict[str, List[AnnotatorResult]] = {}
        # annotator_id -> {"count", "total_duration"}.
        self.annotator_stats: Dict[str, Dict] = {}

    def create_task(
        self,
        question: str,
        responses: List[str],
        task_type: EvalTaskType = EvalTaskType.SINGLE_RESPONSE,
        quality_dimensions: Optional[List[QualityDimension]] = None,
        guidelines: str = "",
    ) -> EvalTask:
        """Register a new task and return it.

        The task id is the first 8 characters of a random UUID. When no
        dimensions are given, factual accuracy, relevance and coherence are used.
        """
        task_id = str(uuid.uuid4())[:8]
        task = EvalTask(
            task_id=task_id,
            task_type=task_type,
            question=question,
            responses=responses,
            quality_dimensions=quality_dimensions or [
                QualityDimension.FACTUAL_ACCURACY,
                QualityDimension.RELEVANCE,
                QualityDimension.COHERENCE,
            ],
            guidelines=guidelines,
        )
        self.tasks[task_id] = task
        self.results[task_id] = []
        return task

    def submit_annotation(self, result: AnnotatorResult):
        """Store an annotation, stamping it with the current time.

        Raises:
            ValueError: If the task id is unknown.
        """
        if result.task_id not in self.results:
            raise ValueError(f"Task {result.task_id} not found")
        result.timestamp = datetime.now().isoformat()
        self.results[result.task_id].append(result)
        self._update_annotator_stats(result)

    def get_next_task(self, annotator_id: str) -> Optional[EvalTask]:
        """Return the first task this annotator has not done and that has fewer than 3 annotators."""
        for task_id, task in self.tasks.items():
            existing_annotators = {r.annotator_id for r in self.results[task_id]}
            if annotator_id not in existing_annotators and len(existing_annotators) < 3:
                return task
        return None

    def compute_agreement(self, task_ids: Optional[List[str]] = None) -> List[InterAnnotatorAgreement]:
        """Compute agreement statistics per scored dimension.

        Only tasks where at least two annotations include the dimension
        contribute. An annotation that did not score a dimension is left out
        rather than counted as a score of 0.

        Args:
            task_ids: Tasks to include; defaults to all tasks.
        """
        target_tasks = task_ids or list(self.tasks.keys())
        agreements = []
        all_dimensions = set()
        for task_id in target_tasks:
            for result in self.results.get(task_id, []):
                all_dimensions.update(result.scores.keys())
        for dimension in all_dimensions:
            scores_by_task = {}
            for task_id in target_tasks:
                scores = [
                    r.scores[dimension]
                    for r in self.results.get(task_id, [])
                    if dimension in r.scores
                ]
                if len(scores) >= 2:
                    scores_by_task[task_id] = scores
            if not scores_by_task:
                continue
            agreement_rate = self._compute_pairwise_agreement(scores_by_task)
            cohens_kappa = self._compute_cohens_kappa(scores_by_task)
            fleiss_kappa = self._compute_fleiss_kappa(scores_by_task)
            agreements.append(InterAnnotatorAgreement(
                dimension=dimension,
                cohens_kappa=round(cohens_kappa, 4),
                fleiss_kappa=round(fleiss_kappa, 4),
                agreement_rate=round(agreement_rate, 4),
            ))
        return agreements

    def generate_report(self) -> Dict:
        """Summarise all annotations.

        Returns per-dimension mean/median/stdev/count, vote counts for
        pairwise-comparison tasks, and overall totals.
        """
        all_scores = {}
        for results in self.results.values():
            for result in results:
                for dim, score in result.scores.items():
                    all_scores.setdefault(dim, []).append(score)
        dimension_stats = {}
        for dim, scores in all_scores.items():
            dimension_stats[dim] = {
                "mean": round(statistics.mean(scores), 4),
                "median": round(statistics.median(scores), 4),
                # stdev needs at least two data points.
                "stdev": round(statistics.stdev(scores), 4) if len(scores) > 1 else 0,
                "count": len(scores),
            }
        pairwise_stats = {}
        for task_id, task in self.tasks.items():
            if task.task_type != EvalTaskType.PAIRWISE_COMPARISON:
                continue
            prefs = [r.preference for r in self.results[task_id] if r.preference is not None]
            if prefs:
                pairwise_stats[task_id] = {
                    "response_a_wins": sum(1 for p in prefs if p == 0),
                    "response_b_wins": sum(1 for p in prefs if p == 1),
                    "total_votes": len(prefs),
                }
        return {
            "total_tasks": len(self.tasks),
            "total_annotations": sum(len(r) for r in self.results.values()),
            "dimension_stats": dimension_stats,
            "pairwise_stats": pairwise_stats,
            "annotator_count": len(self.annotator_stats),
        }

    def _update_annotator_stats(self, result: AnnotatorResult):
        """Accumulate annotation count and total duration per annotator."""
        aid = result.annotator_id
        if aid not in self.annotator_stats:
            self.annotator_stats[aid] = {"count": 0, "total_duration": 0}
        self.annotator_stats[aid]["count"] += 1
        self.annotator_stats[aid]["total_duration"] += result.duration_seconds

    @staticmethod
    def _compute_pairwise_agreement(scores_by_task: Dict) -> float:
        """Return the share of within-task score pairs that differ by at most 1."""
        agreements = 0
        total = 0
        for scores in scores_by_task.values():
            for i, first in enumerate(scores):
                for second in scores[i + 1:]:
                    if abs(first - second) <= 1:
                        agreements += 1
                    total += 1
        return agreements / total if total > 0 else 0

    @staticmethod
    def _compute_cohens_kappa(scores_by_task: Dict) -> float:
        """Return a tolerance-based Cohen's kappa for the first two scores per task.

        Two scores "agree" when they differ by at most 1. Chance agreement is
        estimated from the two score lists themselves: the share of all
        cross pairings (one score from each list) that would agree. This
        replaces the previous fixed value of 0.2.

        NOTE(review): the first two annotations of each task are treated as
        "rater 1" and "rater 2"; this assumes a stable submission order -
        confirm against how annotations are collected.

        Returns 0.0 when there are no pairs or when chance agreement is 1
        (kappa undefined).
        """
        all_pairs = [
            (scores[0], scores[1])
            for scores in scores_by_task.values()
            if len(scores) >= 2
        ]
        if not all_pairs:
            return 0.0
        rater1 = [p[0] for p in all_pairs]
        rater2 = [p[1] for p in all_pairs]
        n = len(all_pairs)
        p_observed = sum(1 for a, b in all_pairs if abs(a - b) <= 1) / n
        p_expected = sum(
            1 for a in rater1 for b in rater2 if abs(a - b) <= 1
        ) / (n * n)
        if p_expected == 1:
            return 0.0
        return (p_observed - p_expected) / (1 - p_expected)

    @staticmethod
    def _compute_fleiss_kappa(scores_by_task: Dict) -> float:
        """Placeholder: Fleiss' kappa is not implemented and always returns 0.0.

        Consumers must not interpret the reported ``fleiss_kappa`` as a
        measured value.
        """
        return 0.0
if __name__ == "__main__":
    # Demo: one pairwise task annotated by three synthetic annotators.
    platform = HumanEvalPlatform()
    candidate_responses = [
        "量子计算利用量子比特(qubit)的叠加态和纠缠态进行并行计算...",
        "量子计算是一种利用量子力学原理的计算方式,通过量子比特实现...",
    ]
    scored_dimensions = [
        QualityDimension.FACTUAL_ACCURACY,
        QualityDimension.RELEVANCE,
        QualityDimension.COHERENCE,
    ]
    task = platform.create_task(
        question="解释量子计算的基本原理",
        responses=candidate_responses,
        task_type=EvalTaskType.PAIRWISE_COMPARISON,
        quality_dimensions=scored_dimensions,
    )

    for annotator_id in ["ann_1", "ann_2", "ann_3"]:
        # Pseudo-variation derived from the id's hash.
        id_hash = hash(annotator_id)
        jitter = (id_hash % 10) / 10
        platform.submit_annotation(
            AnnotatorResult(
                task_id=task.task_id,
                annotator_id=annotator_id,
                scores={
                    "factual_accuracy": 4.0 + jitter,
                    "relevance": 3.5 + jitter,
                    "coherence": 4.0 + jitter,
                },
                preference=0 if id_hash % 2 == 0 else 1,
                duration_seconds=45.0,
            )
        )

    report = platform.generate_report()
    print(json.dumps(report, indent=2, ensure_ascii=False))

    agreements = platform.compute_agreement()
    for a in agreements:
        print(f"{a.dimension}: κ={a.cohens_kappa:.4f}, agreement={a.agreement_rate:.2%}")
Pattern 6:生产监控与漂移检测
为什么需要生产监控?
模型上线不是终点,而是监控的起点。数据分布变化、用户行为偏移、模型退化——这些问题如果不及早发现,质量就会在无人察觉的情况下悄然下降。
漂移检测系统实现
# production_monitor.py
import numpy as np
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import json
import logging
logger = logging.getLogger(__name__)
class DriftType(Enum):
    """Category of drift an alert refers to."""

    DATA_DRIFT = "data_drift"
    CONCEPT_DRIFT = "concept_drift"
    PREDICTION_DRIFT = "prediction_drift"
class AlertLevel(Enum):
    """Severity attached to a drift alert."""

    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"
@dataclass
class DriftAlert:
    """A single drift finding raised by a monitor."""

    drift_type: DriftType
    # Name of the monitored metric and its current vs. baseline values.
    metric_name: str
    current_value: float
    baseline_value: float
    # Numeric drift statistic reported with the alert.
    drift_score: float
    alert_level: AlertLevel
    # ISO-8601 creation time.
    timestamp: str
    # Human-readable description.
    message: str
@dataclass
class MonitoringWindow:
    """Sizes of the sliding window and the reference sample."""

    window_size: int = 1000
    reference_size: int = 5000
class DataDriftDetector:
    """Detects distribution drift of new data against a fixed reference sample."""

    def __init__(
        self,
        reference_data: np.ndarray,
        significance_level: float = 0.05,
    ):
        """Store the reference sample and precompute its mean and std along axis 0.

        Args:
            reference_data: Reference sample.
            significance_level: p-value threshold used by the KS test.
        """
        self.reference_data = reference_data
        self.significance_level = significance_level
        self.reference_mean = np.mean(reference_data, axis=0)
        self.reference_std = np.std(reference_data, axis=0)

    def detect_ks_test(self, current_data: np.ndarray) -> Tuple[float, bool]:
        """Run a two-sample Kolmogorov-Smirnov test.

        Returns:
            ``(p_value, is_drift)`` as plain ``float``/``bool``; drift means
            ``p_value < significance_level``.
        """
        from scipy.stats import ks_2samp

        _, p_value = ks_2samp(self.reference_data, current_data)
        p_value = float(p_value)
        return p_value, p_value < self.significance_level

    def detect_psi(self, current_data: np.ndarray, n_bins: int = 10) -> Tuple[float, bool]:
        """Compute the Population Stability Index against the reference sample.

        Bins are ``n_bins`` equal-width bins over the reference range.
        Current values outside that range are counted in the nearest edge
        bin; dropping them (as ``np.histogram`` does by default) would
        understate exactly the shift this check exists to catch, and yields
        NaN when every value falls outside.

        Returns:
            ``(psi rounded to 4 decimals, is_drift)`` where drift means
            ``psi >= 0.1``.

        Raises:
            ValueError: If ``current_data`` is empty.
        """
        current = np.asarray(current_data, dtype=float).ravel()
        if current.size == 0:
            raise ValueError("current_data must not be empty")
        reference = np.asarray(self.reference_data, dtype=float).ravel()

        ref_counts, bin_edges = np.histogram(reference, bins=n_bins)
        clipped = np.clip(current, bin_edges[0], bin_edges[-1])
        cur_counts, _ = np.histogram(clipped, bins=bin_edges)

        # Floor the proportions so empty bins do not produce log(0).
        ref_frac = np.clip(ref_counts / ref_counts.sum(), 1e-6, None)
        cur_frac = np.clip(cur_counts / cur_counts.sum(), 1e-6, None)
        psi = float(np.sum((cur_frac - ref_frac) * np.log(cur_frac / ref_frac)))
        return round(psi, 4), psi >= 0.1

    def detect_z_score(self, current_data: np.ndarray, threshold: float = 3.0) -> Tuple[float, bool]:
        """Compare the current mean with the reference mean in reference-std units.

        Means are taken along axis 0, matching the reference statistics.
        When that yields several values (multi-column data) the largest
        z-score is reported.

        Returns:
            ``(z_score rounded to 4 decimals, z_score > threshold)``.
        """
        current_mean = np.mean(current_data, axis=0)
        z_values = np.abs(current_mean - self.reference_mean) / (self.reference_std + 1e-8)
        z_score = float(np.max(z_values))
        return round(z_score, 4), z_score > threshold
class PredictionDriftMonitor:
    """Watches live prediction confidences for drift against a reference set."""

    def __init__(
        self,
        reference_predictions: List[Dict],
        window_size: int = 1000,
    ):
        """Start monitoring.

        Args:
            reference_predictions: Baseline predictions; each dict's
                ``"confidence"`` value is used (0 when missing).
            window_size: Buffer length that triggers a drift check.
        """
        self.reference_predictions = reference_predictions
        self.window_size = window_size
        self.prediction_buffer: List[Dict] = []
        self.alerts: List[DriftAlert] = []
        # Running total of recorded predictions; the buffer is trimmed after
        # every check, so its length is not the total.
        self.total_predictions = 0

    def record_prediction(self, prediction: Dict):
        """Buffer one timestamped prediction and check drift when the window is full.

        After a check, roughly the newest half of the window is kept.
        """
        self.prediction_buffer.append({
            **prediction,
            "timestamp": datetime.now().isoformat(),
        })
        self.total_predictions += 1
        if len(self.prediction_buffer) >= self.window_size:
            self._check_drift()
            # Same slice as the original `-self.window_size // 2`, with explicit precedence.
            self.prediction_buffer = self.prediction_buffer[(-self.window_size) // 2:]

    def _check_drift(self):
        """Compare buffered confidences with the reference using PSI and z-score.

        An alert is stored and logged when either check reports drift. It is
        CRITICAL when PSI exceeds 0.25, otherwise WARNING.
        """
        ref_scores = [p.get("confidence", 0) for p in self.reference_predictions]
        cur_scores = [p.get("confidence", 0) for p in self.prediction_buffer]
        ref_arr = np.array(ref_scores)
        cur_arr = np.array(cur_scores)
        detector = DataDriftDetector(ref_arr)
        psi_value, is_psi_drift = detector.detect_psi(cur_arr)
        z_score, is_z_drift = detector.detect_z_score(cur_arr)
        if is_psi_drift or is_z_drift:
            level = AlertLevel.CRITICAL if psi_value > 0.25 else AlertLevel.WARNING
            alert = DriftAlert(
                drift_type=DriftType.PREDICTION_DRIFT,
                metric_name="confidence_score",
                current_value=float(np.mean(cur_arr)),
                baseline_value=float(np.mean(ref_arr)),
                drift_score=psi_value,
                alert_level=level,
                timestamp=datetime.now().isoformat(),
                message=f"Prediction drift detected: PSI={psi_value:.4f}, Z-score={z_score:.4f}",
            )
            self.alerts.append(alert)
            logger.warning(alert.message)

    def get_health_report(self) -> Dict:
        """Summarise monitoring state.

        ``total_predictions_monitored`` is the number of predictions recorded
        since construction. ``status`` is ``"degraded"`` when any alert was
        raised in the last 24 hours.
        """
        cutoff = datetime.now() - timedelta(hours=24)
        recent_alerts = [
            a for a in self.alerts
            if datetime.fromisoformat(a.timestamp) > cutoff
        ]
        return {
            "total_predictions_monitored": self.total_predictions,
            "alerts_last_24h": len(recent_alerts),
            "critical_alerts": sum(1 for a in recent_alerts if a.alert_level == AlertLevel.CRITICAL),
            "latest_drift_score": self.alerts[-1].drift_score if self.alerts else 0,
            "status": "healthy" if not recent_alerts else "degraded",
        }
class ModelPerformanceTracker:
    """Record metric snapshots over time and warn when they fall below a baseline."""

    def __init__(self, baseline_metrics: Dict[str, float]):
        """Store the baseline values each recorded snapshot is compared against.

        Args:
            baseline_metrics: Metric name to baseline value.
        """
        self.baseline_metrics = baseline_metrics
        self.metric_history: List[Dict] = []
        # A relative drop larger than this fraction of the baseline is logged.
        self.degradation_threshold = 0.05

    def record_metrics(self, metrics: Dict[str, float]):
        """Append a timestamped snapshot and check it against the baseline."""
        entry = {
            "timestamp": datetime.now().isoformat(),
            "metrics": metrics,
        }
        self.metric_history.append(entry)
        self._check_degradation(metrics)

    def _check_degradation(self, current_metrics: Dict[str, float]):
        """Log a warning for every baseline metric that dropped past the threshold.

        Metrics missing from ``current_metrics`` are skipped, as are metrics
        whose baseline is exactly 0, for which a relative change is undefined.
        """
        for metric_name, baseline_value in self.baseline_metrics.items():
            current_value = current_metrics.get(metric_name)
            if current_value is None:
                continue
            if baseline_value == 0:
                # Dividing by a zero baseline would raise ZeroDivisionError.
                continue
            # Normalise by the magnitude of the baseline so that a decrease is
            # positive for negative baselines as well.
            relative_change = (baseline_value - current_value) / abs(baseline_value)
            if relative_change > self.degradation_threshold:
                logger.warning(
                    f"Degradation detected: {metric_name} dropped from "
                    f"{baseline_value:.4f} to {current_value:.4f} "
                    f"({relative_change:.2%} decrease)"
                )

    def get_trend(self, metric_name: str, hours: int = 24) -> Dict:
        """Classify the recent direction of ``metric_name``.

        Args:
            metric_name: Metric to inspect.
            hours: Only snapshots newer than this many hours are considered.

        Returns:
            ``{"trend": "no_data", "values": []}`` when no snapshot matches.
            Otherwise a dict with ``trend``, ``current`` (latest value),
            ``baseline`` and ``values``. With at least three values the trend
            is ``"declining"`` / ``"improving"`` when the mean of the second
            half is more than 5% below / above the mean of the first half;
            in every other case it is ``"stable"``.
        """
        cutoff = datetime.now() - timedelta(hours=hours)
        recent = [
            entry for entry in self.metric_history
            if datetime.fromisoformat(entry["timestamp"]) > cutoff
            and metric_name in entry["metrics"]
        ]
        if not recent:
            return {"trend": "no_data", "values": []}

        values = [entry["metrics"][metric_name] for entry in recent]
        trend = "stable"
        if len(values) >= 3:
            midpoint = len(values) // 2
            first_half = np.mean(values[:midpoint])
            second_half = np.mean(values[midpoint:])
            if second_half < first_half * 0.95:
                trend = "declining"
            elif second_half > first_half * 1.05:
                trend = "improving"
        return {
            "trend": trend,
            "current": values[-1],
            "baseline": self.baseline_metrics.get(metric_name),
            "values": values,
        }
if __name__ == "__main__":
    # Demo: run the PSI detector on synthetic scores, then feed the streaming
    # monitor half healthy and half degraded confidences.
    np.random.seed(42)  # fixed seed so the printed numbers are repeatable

    baseline_scores = np.random.normal(0.85, 0.05, 5000)
    detector = DataDriftDetector(baseline_scores)
    stable_scores = np.random.normal(0.84, 0.05, 1000)
    shifted_scores = np.random.normal(0.70, 0.08, 1000)

    psi_healthy, drift_healthy = detector.detect_psi(stable_scores)
    print(f"Healthy data: PSI={psi_healthy:.4f}, drift={drift_healthy}")

    psi_drifted, drift_drifted = detector.detect_psi(shifted_scores)
    print(f"Drifted data: PSI={psi_drifted:.4f}, drift={drift_drifted}")

    reference_preds = [{"confidence": np.random.uniform(0.8, 0.95)} for _ in range(5000)]
    monitor = PredictionDriftMonitor(reference_preds, window_size=100)

    # 50 predictions in the reference range followed by 50 clearly lower ones;
    # together they fill exactly one 100-entry window.
    for low, high in ((0.8, 0.95), (0.5, 0.7)):
        for _ in range(50):
            monitor.record_prediction({"confidence": np.random.uniform(low, high)})

    health = monitor.get_health_report()
    print(f"\nHealth Report: {json.dumps(health, indent=2)}")
5个常见陷阱与解决方案
陷阱1:评测数据泄露
训练数据和评测数据有重叠,导致评测分数虚高。
解决方案:严格的数据隔离策略——先用hash去重排除完全重复的样本,再用相似度匹配(如下方的SequenceMatcher)排查近似重复,确保训练集与评测集无重叠。
def check_data_leakage(train_data: List[str], eval_data: List[str], threshold: float = 0.8) -> Dict:
    """Count evaluation items that closely match some training item.

    Every evaluation string is compared with the training strings using
    ``difflib.SequenceMatcher``; it counts as leaked as soon as one ratio
    exceeds ``threshold``. The comparison is pairwise, so the cost grows with
    ``len(eval_data) * len(train_data)``.

    Args:
        train_data: Training texts.
        eval_data: Evaluation texts.
        threshold: Similarity ratio above which an item counts as leaked.

    Returns:
        ``{"leak_count": int, "leak_rate": float}``. An empty ``eval_data``
        yields a count of 0 and a rate of 0.0 instead of dividing by zero.
    """
    from difflib import SequenceMatcher

    if not eval_data:
        return {"leak_count": 0, "leak_rate": 0.0}

    # ``any`` stops at the first sufficiently similar training item.
    leak_count = sum(
        1
        for eval_item in eval_data
        if any(
            SequenceMatcher(None, eval_item, train_item).ratio() > threshold
            for train_item in train_data
        )
    )
    return {"leak_count": leak_count, "leak_rate": leak_count / len(eval_data)}
陷阱2:评测指标与业务目标脱节
模型在MMLU上得分很高,但业务KPI没有改善。
解决方案:建立指标-业务映射表,确保评测指标与业务目标对齐。
def align_metrics_with_business(eval_metrics: Dict, business_kpis: Dict) -> List[str]:
    """List the evaluation metrics that have no entry in the KPI mapping table.

    Args:
        eval_metrics: Evaluation metrics keyed by name; only the keys are read.
        business_kpis: Accepted for interface compatibility; not consulted here.

    Returns:
        One message per unmapped metric, in the iteration order of
        ``eval_metrics``.
    """
    alignment_map = {
        "faithfulness": ["customer_satisfaction", "complaint_rate"],
        "answer_relevancy": ["task_completion_rate", "user_engagement"],
        "latency_p95": ["session_duration", "bounce_rate"],
    }
    return [
        f"Metric '{metric}' has no business KPI mapping"
        for metric in eval_metrics
        if metric not in alignment_map
    ]
陷阱3:A/B测试样本量不足
跑了200条就下结论,统计功效不足。
解决方案:预先计算所需样本量。
def calculate_sample_size(
    baseline_rate: float,
    minimum_detectable_effect: float,
    alpha: float = 0.05,
    power: float = 0.8,
) -> int:
    """Estimate the sample size needed to detect a relative lift in a rate.

    Args:
        baseline_rate: Current success rate.
        minimum_detectable_effect: Relative change to detect; the target rate
            is ``baseline_rate * (1 + minimum_detectable_effect)``.
        alpha: Significance level; the critical value uses ``1 - alpha / 2``.
        power: Desired statistical power.

    Returns:
        The computed size, rounded up to an integer.
        NOTE(review): this looks like the usual two-proportion formula, which
        gives a per-variant size; confirm against the test design.
    """
    import math
    from scipy.stats import norm

    control_rate = baseline_rate
    treatment_rate = baseline_rate * (1 + minimum_detectable_effect)
    pooled_rate = (control_rate + treatment_rate) / 2

    critical_z = norm.ppf(1 - alpha / 2)
    power_z = norm.ppf(power)

    null_term = critical_z * math.sqrt(2 * pooled_rate * (1 - pooled_rate))
    alt_term = power_z * math.sqrt(
        control_rate * (1 - control_rate) + treatment_rate * (1 - treatment_rate)
    )
    required = (null_term + alt_term) ** 2 / (treatment_rate - control_rate) ** 2
    return math.ceil(required)
陷阱4:忽略评估者间一致性
多个人标注结果差异很大,但直接取平均,掩盖了标注质量问题。
解决方案:计算评估者间一致性系数——两名标注者用Cohen's Kappa,三名及以上用Fleiss' Kappa;低于0.6的维度需要重新标注。
陷阱5:生产环境漂移检测延迟
只在月度评测时才发现模型退化,影响已持续数周。
解决方案:实时监控+滑动窗口检测,设置小时级告警。
10个常见错误排查
| 错误现象 | 可能原因 | 排查步骤 | 解决方案 |
|---|---|---|---|
| lm-eval任务报OOM | batch_size过大或模型过大 | 减小batch_size,启用FSDP | batch_size=1 + device_map=auto |
| RAGAS faithfulness为0 | contexts为空或answer为空 | 检查eval_data中contexts字段 | 确保contexts非空且内容相关 |
| pytest超时 | 模型推理延迟过高 | 检查GPU利用率和batch配置 | 增加timeout或优化推理配置 |
| A/B测试结果不显著 | 样本量不足或效果差异太小 | 计算统计功效和所需样本量 | 延长测试周期或提高流量比例 |
| 人工评估Kappa<0.4 | 标注指南不清晰 | 审查指南并做校准测试 | 增加示例和边界case说明 |
| PSI误报频繁 | 参考数据分布过窄 | 扩大参考数据时间窗口 | 使用30天数据作为基线 |
| 评测结果不可复现 | 随机种子未固定 | 设置全局随机种子 | torch.manual_seed(42) + numpy.random.seed(42) |
| 评测数据格式错误 | 字段缺失或类型不匹配 | 用schema校验数据 | 使用Pydantic模型校验 |
| 漂移检测延迟高 | 监控窗口设置过大 | 缩小滑动窗口 | 从1000条减到200条 |
| 评测报告指标缺失 | 某些metric计算失败 | 检查LLM API调用是否超时 | 添加重试机制和fallback |
高级优化技巧
1. 多维度交叉评估
单一指标无法全面反映模型质量。通过交叉评估矩阵发现短板:
class CrossDimensionEvaluator:
    """Accumulate products of per-dimension scores for every ordered pair of dimensions."""

    def __init__(self, dimensions: List[str]):
        """Create an empty product list for each (row, column) pair of dimensions."""
        self.dimensions = dimensions
        self.matrix = {row: {col: [] for col in dimensions} for row in dimensions}

    def evaluate_cross(self, samples: List[Dict], eval_fn) -> Dict:
        """Score each sample and return the mean score product per dimension pair.

        Args:
            samples: Items passed one by one to ``eval_fn``.
            eval_fn: Callable returning a mapping of dimension name to score;
                dimensions missing from it count as 0.

        Returns:
            A dict keyed ``"{d1}×{d2}"`` for every ordered pair of distinct
            dimensions with recorded products; each value is the mean product
            rounded to 4 decimals. Products persist on the instance, so
            repeated calls average over all samples seen so far.
        """
        import statistics

        ordered_pairs = [
            (d1, d2)
            for d1 in self.dimensions
            for d2 in self.dimensions
            if d1 != d2
        ]

        for sample in samples:
            scores = eval_fn(sample)
            for d1, d2 in ordered_pairs:
                self.matrix[d1][d2].append(scores.get(d1, 0) * scores.get(d2, 0))

        correlation = {}
        for d1, d2 in ordered_pairs:
            products = self.matrix[d1][d2]
            if products:
                correlation[f"{d1}×{d2}"] = round(statistics.mean(products), 4)
        return correlation
2. 动态评测集生成
静态评测集容易被"刷分"。动态生成评测题目,确保评测的公正性:
class DynamicEvalGenerator:
    """Ask an LLM to produce fresh evaluation questions on demand."""

    def __init__(self, llm_client):
        """Keep the client; it must expose ``chat.completions.create(...)``."""
        self.llm = llm_client

    def generate_eval_questions(
        self,
        domain: str,
        difficulty: str = "medium",
        count: int = 50,
    ) -> List[Dict]:
        """Request ``count`` questions for ``domain`` and parse the JSON reply.

        Args:
            domain: Subject area inserted into the prompt.
            difficulty: Difficulty label inserted into the prompt.
            count: Number of questions requested.

        Returns:
            The decoded JSON content of the first choice.

        Raises:
            json.JSONDecodeError: If the reply is not valid JSON once an
                optional Markdown code fence has been removed.
        """
        prompt = (
            f"生成{count}道{domain}领域的{difficulty}难度评测题目。\n"
            "每道题包含:question, choices, answer, explanation。\n"
            "以JSON数组格式返回。"
        )
        response = self.llm.chat.completions.create(
            model="default",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
        )
        content = response.choices[0].message.content
        return json.loads(self._strip_code_fence(content))

    @staticmethod
    def _strip_code_fence(text):
        """Remove a surrounding Markdown code fence (with optional language tag)."""
        fence = "`" * 3
        if not isinstance(text, str):
            # Leave non-string content untouched so json.loads reports it.
            return text
        stripped = text.strip()
        if not stripped.startswith(fence):
            return stripped
        # Drop the opening fence line, which may carry a tag such as "json".
        first_newline = stripped.find("\n")
        stripped = stripped[first_newline + 1:] if first_newline != -1 else stripped[len(fence):]
        stripped = stripped.rstrip()
        if stripped.endswith(fence):
            stripped = stripped[:-len(fence)]
        return stripped.strip()
3. 分层评测策略
不同场景使用不同评测粒度,平衡评测成本和质量保证:
| 层级 | 触发条件 | 评测范围 | 评测时间 |
|---|---|---|---|
| L0 快速验证 | 每次提交 | 50条核心用例 | <5分钟 |
| L1 标准评测 | 每日构建 | 500条标准集 | ~30分钟 |
| L2 全面评测 | 版本发布 | 5000条完整集 | ~2小时 |
| L3 人工抽检 | 重大版本 | 100条专家评审 | ~1天 |
4. 评测结果版本管理
class EvalVersionManager:
    """Persist evaluation runs as JSON files and compare two saved runs."""

    def __init__(self, store_path: str = "./eval_versions"):
        """Remember the storage directory and create it if it is missing."""
        import os

        self.store_path = store_path
        os.makedirs(store_path, exist_ok=True)

    def _version_path(self, version_id: str) -> str:
        """Return the JSON file path used for ``version_id``."""
        # Imported locally: the original imported ``os`` only inside
        # ``__init__``, which left the name undefined in the other methods.
        import os

        return os.path.join(self.store_path, f"{version_id}.json")

    def save_version(
        self,
        model_version: str,
        eval_results: Dict,
        eval_config: Dict,
    ) -> str:
        """Write one evaluation record to disk.

        Args:
            model_version: Model identifier; prefixes the generated id.
            eval_results: Metric name to value.
            eval_config: Configuration the evaluation was run with.

        Returns:
            The generated version id,
            ``"{model_version}_{YYYYmmdd_HHMMSS}"``.
        """
        now = datetime.now()
        version_id = f"{model_version}_{now.strftime('%Y%m%d_%H%M%S')}"
        record = {
            "version_id": version_id,
            "model_version": model_version,
            "eval_results": eval_results,
            "eval_config": eval_config,
            "timestamp": now.isoformat(),
        }
        with open(self._version_path(version_id), "w", encoding="utf-8") as f:
            json.dump(record, f, indent=2, ensure_ascii=False)
        return version_id

    def _load_version(self, version_id: str) -> Dict:
        """Read back the record stored for ``version_id``.

        Raises:
            FileNotFoundError: If no record was saved under that id.
        """
        with open(self._version_path(version_id), "r", encoding="utf-8") as f:
            return json.load(f)

    def compare_versions(self, version_a: str, version_b: str) -> Dict:
        """Diff the metrics two saved versions have in common.

        Returns:
            Metric name to ``{"a": ..., "b": ..., "delta": b - a}`` with the
            delta rounded to 4 decimals. Metrics present in only one of the
            versions are omitted.
        """
        results_a = self._load_version(version_a)["eval_results"]
        results_b = self._load_version(version_b)["eval_results"]
        diff = {}
        for metric, value_a in results_a.items():
            if metric not in results_b:
                continue
            value_b = results_b[metric]
            diff[metric] = {
                "a": value_a,
                "b": value_b,
                "delta": round(value_b - value_a, 4),
            }
        return diff
5. 评测流水线编排
class EvalPipelineOrchestrator:
    """Run evaluation stages in order and stop at the first failed quality gate."""

    def __init__(self):
        """Start with no registered stages."""
        self.stages = []

    def add_stage(self, name: str, eval_fn: Callable, gate_threshold: float = 0.0):
        """Register a stage.

        Args:
            name: Key under which the stage score is reported.
            eval_fn: Called as ``eval_fn(model, dataset)``; returns the score.
            gate_threshold: The stage fails when its score is below this value.
        """
        stage = dict(name=name, eval_fn=eval_fn, gate_threshold=gate_threshold)
        self.stages.append(stage)

    def run(self, model, dataset) -> Dict:
        """Execute the stages sequentially.

        Returns:
            Stage name to score for every stage that ran, plus ``"status"``
            (``"passed"`` or ``"gate_failed"``) and, on failure,
            ``"failed_stage"``. Stages after a failed gate are not executed.
        """
        results = {}
        for stage in self.stages:
            print(f"Running stage: {stage['name']}")
            score = stage["eval_fn"](model, dataset)
            results[stage["name"]] = score
            if not score < stage["gate_threshold"]:
                continue
            print(f"GATE FAILED: {stage['name']} score {score:.4f} < {stage['gate_threshold']}")
            results.update(status="gate_failed", failed_stage=stage["name"])
            return results
        results["status"] = "passed"
        return results
对比分析
| 评估模式 | 适用阶段 | 评估维度 | 自动化程度 | 成本 | 可靠性 | 时效性 |
|---|---|---|---|---|---|---|
| LLM基准测试 | 模型选型/训练 | 通用能力 | 高 | 低 | 中 | 低 |
| RAG评估 | RAG系统开发 | 检索+生成 | 高 | 中 | 高 | 中 |
| 自动化评测 | CI/CD流水线 | 自定义指标 | 高 | 低 | 高 | 高 |
| A/B测试 | 模型上线 | 业务指标 | 中 | 高 | 高 | 低 |
| 人工评估 | 质量把关 | 全维度 | 低 | 高 | 最高 | 最低 |
| 生产监控 | 运维阶段 | 漂移/退化 | 高 | 中 | 中 | 最高 |
| 漂移检测方法 | 适用场景 | 检测速度 | 误报率 | 最低样本量 |
|---|---|---|---|---|
| KS检验 | 连续型数据分布 | 快 | 中 | 100 |
| PSI | 分类型/连续型 | 快 | 低 | 500 |
| Z-score | 均值偏移 | 最快 | 高 | 30 |
| ADWIN | 流式数据 | 中 | 低 | 200 |
| Page-Hinkley | 累积漂移 | 中 | 中 | 100 |
在线工具推荐
- JSON数据格式化:/zh-CN/json/format
- Hash编码工具:/zh-CN/encode/hash
- Curl转代码:/zh-CN/dev/curl-to-code
总结:AI模型评估不是"跑个分数"就完事——它是一个覆盖模型全生命周期的工程体系。6种生产模式各有定位:基准测试量化通用能力、RAG评估定位检索生成短板、自动化评测保证CI/CD质量门禁、A/B测试验证线上效果、人工评估把控质量上限、生产监控防止沉默退化。2026年,不做系统评估的AI系统,就是在盲飞。
延伸阅读
本站提供浏览器本地工具,免注册即可试用 →