Python AI模型評估實戰:從基準測試到自動化評測的6種生產模式
Python AI模型評估實戰:從基準測試到自動化評測的6種生產模式
你的AI模型上線後,準確率到底怎麼樣?LLM回答品質如何量化?RAG系統的檢索和生成到底哪個環節拖後腿?模型上線3個月後效果有沒有退化?大多數團隊還在用「人肉看幾條結果」來評估模型——這就像用肉眼檢測晶片良率,既不靠譜也不可復現。2026年,AI模型評估已經形成完整的工程體系:從lm-evaluation-harness基準測試、RAGAS框架評估RAG、pytest自動化評測流水線、A/B測試模型對比、人工評估平台到生產環境漂移檢測,6種生產模式覆蓋模型全生命週期。
核心收穫
- 掌握lm-evaluation-harness進行LLM標準化基準測試的完整流程
- 使用RAGAS框架量化評估RAG系統的檢索品質和生成品質
- 建構pytest驅動的自動化評測流水線,實現CI/CD中的模型品質門禁
- 設計科學的A/B測試方案對比不同模型版本的效果差異
- 搭建人工評估平台收集高品質的人類回饋資料
- 實現生產環境的模型漂移檢測和自動化告警機制
- 了解6種評估模式的適用場景、優缺點和組合使用策略
目錄
- 架構總覽:AI模型評估全景圖
- Pattern 1:LLM基準測試(lm-evaluation-harness)
- Pattern 2:RAG評估(RAGAS框架)
- Pattern 3:自動化評測流水線(pytest)
- Pattern 4:A/B測試模型對比
- Pattern 5:人工評估平台
- Pattern 6:生產監控與漂移檢測
- 5個常見陷阱與解決方案
- 10個常見錯誤排查
- 進階最佳化技巧
- 對比分析
- 線上工具推薦
架構總覽:AI模型評估全景圖
┌─────────────────────────────────────────────────────────────┐
│ AI Model Evaluation Pipeline │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────────────────┐ │
│ │ Offline │ │ Online │ │ Human-in-the-Loop │ │
│ │ Eval │ │ Eval │ │ Evaluation │ │
│ │ │ │ │ │ │ │
│ │ • Bench │ │ • A/B │ │ • Preference Ranking │ │
│ │ mark │ │ Test │ │ • Quality Scoring │ │
│ │ • RAG │ │ • Drift │ │ • Red Team Testing │ │
│ │ Eval │ │ Detect │ │ • Domain Expert │ │
│ │ • Auto │ │ • Prod │ │ Review │ │
│ │ Test │ │ Monitor│ │ │ │
│ └────┬─────┘ └────┬─────┘ └──────────┬───────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Evaluation Results Store │ │
│ │ ┌─────────┐ ┌──────────┐ ┌──────────────────┐ │ │
│ │ │ Metrics │ │ Reports │ │ Comparison Board │ │ │
│ │ │ DB │ │ Generator│ │ │ │ │
│ │ └─────────┘ └──────────┘ └──────────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Decision & Action Layer │ │
│ │ • Model Promotion / Rollback │ │
│ │ • Retraining Trigger │ │
│ │ • Alert & Notification │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
Pattern 1:LLM基準測試(lm-evaluation-harness)
為什麼需要標準化基準測試?
「我們模型效果不錯」——這句話沒有任何資訊量。你需要用標準化的資料集和指標,在可控條件下量化模型能力。EleutherAI的lm-evaluation-harness是2026年最廣泛使用的LLM評測框架,支援200+任務。
完整評測流程
# llm_benchmark.py
from lm_eval import evaluator
from lm_eval.models.huggingface import HuggingFaceAuto
from typing import Dict, List, Optional
import json
import os
class LLMBenchmarkRunner:
    """Run lm-evaluation-harness tasks for one model and diff them against a baseline.

    Formatted results are dicts with ``model``, ``timestamp`` and ``tasks``
    keys. Results produced by ``run_core_tasks`` are appended to
    ``results_history``.
    """

    # Task names handed to ``evaluator.simple_evaluate`` by ``run_core_tasks``.
    CORE_TASKS = (
        "mmlu",
        "hellaswag",
        "arc_challenge",
        "truthfulqa_mc2",
        "winogrande",
        "gsm8k",
    )

    def __init__(
        self,
        model_path: str,
        device: str = "cuda",
        batch_size: int = 8,
    ):
        """Store the model identifier and the evaluation settings."""
        self.model_path = model_path
        self.device = device
        self.batch_size = batch_size
        self.results_history = []

    def _evaluate(self, tasks: List[str]) -> Dict:
        """Call ``evaluator.simple_evaluate`` for ``tasks`` with this runner's settings."""
        return evaluator.simple_evaluate(
            model="hf",
            model_args=f"pretrained={self.model_path}",
            tasks=tasks,
            batch_size=self.batch_size,
            device=self.device,
        )

    def run_core_tasks(self) -> Dict:
        """Evaluate ``CORE_TASKS``, record the formatted result and return it."""
        formatted = self._format_results(self._evaluate(list(self.CORE_TASKS)))
        self.results_history.append(formatted)
        return formatted

    def run_custom_task(self, task_config_path: str) -> Dict:
        """Evaluate a single custom task and return the formatted result.

        The result is NOT appended to ``results_history``.
        """
        # NOTE(review): the path is forwarded unchanged as a task identifier;
        # confirm the installed lm-eval version resolves YAML paths passed
        # this way to ``simple_evaluate``.
        return self._format_results(self._evaluate([task_config_path]))

    def _format_results(self, raw_results: Dict) -> Dict:
        """Copy ``raw_results["results"]`` into the report layout.

        Float metric values are rounded to 4 decimals; other values are
        copied unchanged.
        """
        formatted = {
            "model": self.model_path,
            "timestamp": self._get_timestamp(),
            "tasks": {},
        }
        for task_name, task_results in raw_results["results"].items():
            formatted["tasks"][task_name] = {
                k: round(v, 4) if isinstance(v, float) else v
                for k, v in task_results.items()
            }
        return formatted

    def compare_with_baseline(
        self,
        baseline_path: str,
        metric: str = "acc,none",
    ) -> Dict:
        """Compare the latest recorded result with a baseline JSON file.

        Args:
            baseline_path: JSON file in the layout produced by
                ``_format_results`` (must contain a ``tasks`` mapping).
            metric: Metric key to compare for every task.

        Returns:
            ``{task: {"current", "baseline", "delta", "improved"}}`` for every
            task present on both sides that reports ``metric`` on both sides.
            Tasks lacking the metric are skipped rather than being reported
            as a misleading ``0 -> 0`` row.

        Runs ``run_core_tasks`` first when no result has been recorded yet.
        """
        if not self.results_history:
            self.run_core_tasks()
        with open(baseline_path, "r", encoding="utf-8") as f:
            baseline = json.load(f)
        baseline_tasks = baseline["tasks"]
        current_tasks = self.results_history[-1]["tasks"]

        comparison = {}
        for task_name, current_metrics in current_tasks.items():
            baseline_metrics = baseline_tasks.get(task_name)
            if baseline_metrics is None:
                continue
            if metric not in current_metrics or metric not in baseline_metrics:
                continue
            current_score = current_metrics[metric]
            baseline_score = baseline_metrics[metric]
            comparison[task_name] = {
                "current": current_score,
                "baseline": baseline_score,
                "delta": round(current_score - baseline_score, 4),
                "improved": current_score > baseline_score,
            }
        return comparison

    @staticmethod
    def _get_timestamp() -> str:
        """Return the current local time as an ISO-8601 string."""
        from datetime import datetime

        return datetime.now().isoformat()
class CustomTaskConfig:
    """Helpers that write lm-evaluation-harness task configuration files."""

    @staticmethod
    def create_domain_eval(
        dataset_path: str,
        task_name: str,
        output_dir: str = "./custom_tasks",
    ) -> str:
        """Write a multiple-choice task config and return the path of the YAML file.

        Args:
            dataset_path: Value stored under ``dataset_path`` in the config.
            task_name: Task name; also used as the file name (``<task_name>.yaml``).
            output_dir: Directory for the file; created when missing.

        Returns:
            Path of the written YAML file.
        """
        # Local import: PyYAML is only needed when a config is actually written.
        import yaml

        # NOTE(review): ``dataset_path`` is stored verbatim; the hand-written
        # medical_qa.yaml example uses ``dataset_path: json`` plus
        # ``dataset_kwargs.data_files`` for local files — confirm which form
        # the caller needs.
        config = {
            "task": task_name,
            "dataset_path": dataset_path,
            "output_type": "multiple_choice",
            "test_split": "test",
            "doc_to_text": "{{question}}",
            "doc_to_target": "{{answer}}",
            "doc_to_choice": "{{choices}}",
            "metric_list": [
                {"metric": "acc", "aggregation": "mean"},
                {"metric": "f1", "aggregation": "mean"},
            ],
        }
        os.makedirs(output_dir, exist_ok=True)
        config_path = os.path.join(output_dir, f"{task_name}.yaml")
        # Explicit UTF-8 plus allow_unicode keeps non-ASCII values readable
        # instead of \uXXXX escapes written in the locale encoding.
        with open(config_path, "w", encoding="utf-8") as f:
            yaml.safe_dump(config, f, allow_unicode=True)
        return config_path
if __name__ == "__main__":
    # Demo: benchmark one model, dump the scores, then diff against a stored baseline.
    benchmark = LLMBenchmarkRunner(
        model_path="meta-llama/Llama-3.1-8B-Instruct",
        batch_size=4,
    )
    core_scores = benchmark.run_core_tasks()
    print(json.dumps(core_scores, indent=2, ensure_ascii=False))

    baseline_diff = benchmark.compare_with_baseline("./baseline_results.json")
    for task_name in baseline_diff:
        info = baseline_diff[task_name]
        if info["improved"]:
            arrow = "↑"
        else:
            arrow = "↓"
        print(f"{task_name}: {info['baseline']:.4f} → {info['current']:.4f} {arrow}{info['delta']:+.4f}")
自定義領域評測任務
# custom_tasks/medical_qa.yaml
# Custom multiple-choice task read from a local JSONL file.
task: medical_qa
# "json" loader; the actual file is supplied through dataset_kwargs.data_files.
dataset_path: json
dataset_kwargs:
  data_files:
    test: ./data/medical_qa_test.jsonl
# Matches the output_type that CustomTaskConfig.create_domain_eval writes.
output_type: multiple_choice
test_split: test
doc_to_text: "問題:{{question}}\n選項:\nA. {{A}}\nB. {{B}}\nC. {{C}}\nD. {{D}}\n答案:"
doc_to_target: "{{answer}}"
doc_to_choice: ["A", "B", "C", "D"]
metric_list:
  - metric: acc
    aggregation: mean
  - metric: f1
    aggregation: mean
Pattern 2:RAG評估(RAGAS框架)
RAG系統的評估維度
RAG系統涉及檢索和生成兩個環節,需要分別評估。RAGAS框架提供了4個核心指標:
| 指標 | 評估環節 | 含義 | 計算方式 |
|---|---|---|---|
| Context Precision | 檢索 | 檢索結果中相關文件的排名精度 | 各相關文件所在排名位置的 Precision@k 平均值 |
| Context Recall | 檢索 | 答案所需資訊被檢索到的比例 | Ground Truth被檢索內容覆蓋的比例 |
| Faithfulness | 生成 | 生成答案與檢索文件的事實一致性 | 答案聲明在檢索文件中的支援比例 |
| Answer Relevancy | 生成 | 答案與問題的相關性 | 由答案反向生成若干問題,計算其與原問題嵌入向量的平均餘弦相似度 |
完整RAG評估實作
# rag_evaluation_benchmark.py
from ragas import evaluate
from ragas.metrics import (
context_precision,
context_recall,
faithfulness,
answer_relevancy,
AnswerSimilarity,
)
from datasets import Dataset
from typing import List, Dict, Optional
from dataclasses import dataclass, field
import json
@dataclass
class RAGEvalSample:
    """One RAG evaluation record."""

    # The question sent to the RAG pipeline.
    question: str
    # Context passages associated with the question.
    contexts: List[str]
    # The generated answer being evaluated.
    answer: str
    # Reference answer used by ground-truth based metrics.
    ground_truth: str
@dataclass
class RAGEvalReport:
    """Aggregated RAG metric scores for one evaluation run."""

    # The four metric scores every report must carry.
    faithfulness: float
    answer_relevancy: float
    context_precision: float
    context_recall: float

    # Optional extras; each instance gets its own ``details`` list.
    answer_similarity: float = 0.0
    sample_count: int = 0
    details: List[Dict] = field(default_factory=list)
class RAGEvaluator:
    """Score RAG samples or whole pipelines with RAGAS metrics."""

    def __init__(
        self,
        metrics: Optional[List] = None,
        llm=None,
        embeddings=None,
    ):
        """Store the metric list and the optional judge LLM / embeddings.

        When ``metrics`` is empty or ``None`` the four default metrics
        (faithfulness, answer relevancy, context precision, context recall)
        are used.
        """
        self.metrics = metrics or [
            faithfulness,
            answer_relevancy,
            context_precision,
            context_recall,
        ]
        self.llm = llm
        self.embeddings = embeddings

    @staticmethod
    def _metric_score(result, name: str, default: float = float("nan")) -> float:
        """Read metric ``name`` from ``result`` as a single float.

        A per-sample sequence is averaged (``None`` entries ignored); a
        metric that was not computed yields ``default`` instead of raising
        ``KeyError``.
        """
        try:
            value = result[name]
        except KeyError:
            return default
        if isinstance(value, (int, float)):
            return float(value)
        scores = [float(v) for v in value if v is not None]
        return sum(scores) / len(scores) if scores else default

    def evaluate_samples(
        self,
        samples: List[RAGEvalSample],
    ) -> RAGEvalReport:
        """Run ``evaluate`` on ``samples`` and return the aggregated report.

        Metrics absent from the result are reported as NaN, except
        ``answer_similarity`` which keeps the report default of 0.0.
        """
        # NOTE(review): the column names below and the result access pattern
        # depend on the installed ragas version — confirm against it.
        eval_data = {
            "question": [s.question for s in samples],
            "contexts": [s.contexts for s in samples],
            "answer": [s.answer for s in samples],
            "ground_truth": [s.ground_truth for s in samples],
        }
        dataset = Dataset.from_dict(eval_data)
        result = evaluate(
            dataset,
            metrics=self.metrics,
            llm=self.llm,
            embeddings=self.embeddings,
        )
        return RAGEvalReport(
            faithfulness=self._metric_score(result, "faithfulness"),
            answer_relevancy=self._metric_score(result, "answer_relevancy"),
            context_precision=self._metric_score(result, "context_precision"),
            context_recall=self._metric_score(result, "context_recall"),
            answer_similarity=self._metric_score(result, "answer_similarity", default=0.0),
            sample_count=len(samples),
        )

    def evaluate_rag_pipeline(
        self,
        rag_pipeline,
        test_questions: List[Dict],
    ) -> RAGEvalReport:
        """Query ``rag_pipeline`` for every test question and score the outputs.

        Each question dict needs ``question`` and ``ground_truth``;
        ``rag_pipeline.query`` must return a mapping with ``contexts`` and
        ``answer``.
        """
        samples = []
        for q in test_questions:
            rag_result = rag_pipeline.query(q["question"])
            samples.append(RAGEvalSample(
                question=q["question"],
                contexts=rag_result["contexts"],
                answer=rag_result["answer"],
                ground_truth=q["ground_truth"],
            ))
        return self.evaluate_samples(samples)

    def compare_pipelines(
        self,
        pipelines: Dict[str, object],
        test_questions: List[Dict],
    ) -> Dict[str, RAGEvalReport]:
        """Evaluate every named pipeline on the same questions, print and return the reports."""
        reports = {}
        for name, pipeline in pipelines.items():
            report = self.evaluate_rag_pipeline(pipeline, test_questions)
            reports[name] = report
            print(f"\n=== {name} ===")
            print(f" Faithfulness: {report.faithfulness:.4f}")
            print(f" Answer Relevancy: {report.answer_relevancy:.4f}")
            print(f" Context Precision: {report.context_precision:.4f}")
            print(f" Context Recall: {report.context_recall:.4f}")
        return reports
class RAGEvalDatasetBuilder:
    """Build lists of ``RAGEvalSample`` from in-memory QA pairs or a JSONL file."""

    @staticmethod
    def from_qa_pairs(
        qa_pairs: List[Dict],
        rag_pipeline=None,
    ) -> List[RAGEvalSample]:
        """Convert QA dicts to samples.

        When ``rag_pipeline`` is given and a pair has no ``contexts`` key, the
        pipeline is queried for both contexts and answer. Otherwise the
        pair's own ``contexts`` / ``answer`` are used, defaulting to ``[]``
        and ``""``. ``question`` and ``ground_truth`` are required.
        """
        samples = []
        for qa in qa_pairs:
            if rag_pipeline and "contexts" not in qa:
                result = rag_pipeline.query(qa["question"])
                contexts = result["contexts"]
                answer = result["answer"]
            else:
                contexts = qa.get("contexts", [])
                answer = qa.get("answer", "")
            samples.append(RAGEvalSample(
                question=qa["question"],
                contexts=contexts,
                answer=answer,
                ground_truth=qa["ground_truth"],
            ))
        return samples

    @staticmethod
    def from_jsonl(file_path: str) -> List[RAGEvalSample]:
        """Load samples from a UTF-8 JSONL file (one JSON object per line).

        Every object needs ``question``, ``contexts``, ``answer`` and
        ``ground_truth``. Blank lines (e.g. a trailing newline at the end of
        the file) are skipped instead of raising ``JSONDecodeError``.
        """
        samples = []
        with open(file_path, "r", encoding="utf-8") as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line:
                    continue
                data = json.loads(line)
                samples.append(RAGEvalSample(
                    question=data["question"],
                    contexts=data["contexts"],
                    answer=data["answer"],
                    ground_truth=data["ground_truth"],
                ))
        return samples
if __name__ == "__main__":
    # Demo: score two hand-written QA records with the default metric set.
    zt_record = {
        "question": "什麼是零信任網路存取(ZTNA)?",
        "contexts": ["零信任網路存取(ZTNA)是一種安全模型,基於「永不信任,始終驗證」的原則,為遠端使用者提供對特定應用程式的安全存取。"],
        "answer": "零信任網路存取是一種安全模型,核心原則是永不信任始終驗證,為遠端使用者提供安全存取。",
        "ground_truth": "零信任網路存取(ZTNA)是一種安全架構,基於永不信任始終驗證的原則,透過身分驗證和授權為遠端使用者提供對特定應用的安全存取,替代傳統VPN。",
    }
    sase_record = {
        "question": "SASE架構包含哪些核心元件?",
        "contexts": ["SASE(安全存取服務邊緣)將SD-WAN、SWG、CASB、FWaaS和ZTNA整合為統一的雲原生服務。"],
        "answer": "SASE架構包含SD-WAN、SWG、CASB、FWaaS和ZTNA等核心元件,整合為統一的雲原生服務。",
        "ground_truth": "SASE架構的核心元件包括SD-WAN(軟體定義廣域網路)、SWG(安全Web閘道)、CASB(雲端存取安全代理)、FWaaS(防火牆即服務)和ZTNA(零信任網路存取),整合為統一的雲原生服務交付模型。",
    }
    test_data = [zt_record, sase_record]

    evaluator = RAGEvaluator()
    samples = RAGEvalDatasetBuilder.from_qa_pairs(test_data)
    report = evaluator.evaluate_samples(samples)

    print("\n=== RAG Evaluation Report ===")
    metric_lines = (
        ("Faithfulness", report.faithfulness),
        ("Answer Relevancy", report.answer_relevancy),
        ("Context Precision", report.context_precision),
        ("Context Recall", report.context_recall),
    )
    for label, score in metric_lines:
        print(f"{label}: {score:.4f}")
    print(f"Sample Count: {report.sample_count}")
Pattern 3:自動化評測流水線(pytest)
為什麼需要自動化評測?
手動評測不可復現、不可追溯、無法整合到CI/CD。pytest驅動的自動化評測流水線讓模型評估像單元測試一樣可靠。
完整自動化評測框架
# tests/conftest.py
import pytest
from typing import Dict, List
import json
import os
@pytest.fixture(scope="session")
def model_client():
    """Session-wide OpenAI client configured from MODEL_API_URL / MODEL_API_KEY."""
    from openai import OpenAI

    endpoint = os.getenv("MODEL_API_URL", "http://localhost:8000/v1")
    credential = os.getenv("MODEL_API_KEY", "test")
    return OpenAI(base_url=endpoint, api_key=credential)
@pytest.fixture(scope="session")
def eval_dataset():
    """Parse ./data/eval_dataset.json once per test session and return its content."""
    with open("./data/eval_dataset.json", "r", encoding="utf-8") as dataset_file:
        records = json.load(dataset_file)
    return records
@pytest.fixture(scope="session")
def baseline_scores():
    """Parse ./data/baseline_scores.json once per test session and return its content."""
    with open("./data/baseline_scores.json", "r", encoding="utf-8") as baseline_file:
        scores = json.load(baseline_file)
    return scores
# tests/test_model_quality.py
import json
from typing import Dict, List

import pytest
@pytest.mark.regression
class TestModelQuality:
    """Quality gates run against the model served behind ``model_client``.

    The whole class carries the ``regression`` marker and the two quick
    checks additionally carry ``smoke``. Both markers are declared in
    pytest.ini and the CI workflow selects tests with ``-m smoke`` and
    ``-m regression``, so unmarked tests would never be selected there.

    A test whose dataset category is empty is skipped instead of failing
    with ``ZeroDivisionError``.
    """

    def test_factual_accuracy(self, model_client, eval_dataset):
        """At least 85% of ``factual`` questions must hit enough expected keywords."""
        factual_questions = [
            q for q in eval_dataset if q["category"] == "factual"
        ]
        self._require_samples(factual_questions, "factual")
        correct = 0
        for q in factual_questions:
            answer = self._ask(model_client, q["question"])
            if self._check_answer(answer, q["expected_keywords"]):
                correct += 1
        accuracy = correct / len(factual_questions)
        assert accuracy >= 0.85, f"Factual accuracy {accuracy:.2%} below threshold 85%"

    def test_no_hallucination(self, model_client, eval_dataset):
        """At most 10% of ``hallucination_trap`` prompts may contain a trap keyword."""
        hallucination_prompts = [
            q for q in eval_dataset if q["category"] == "hallucination_trap"
        ]
        self._require_samples(hallucination_prompts, "hallucination_trap")
        hallucinated = 0
        for q in hallucination_prompts:
            answer = self._ask(model_client, q["question"])
            if self._contains_hallucination(answer, q["trap_keywords"]):
                hallucinated += 1
        hallucination_rate = hallucinated / len(hallucination_prompts)
        assert hallucination_rate <= 0.10, f"Hallucination rate {hallucination_rate:.2%} above threshold 10%"

    @pytest.mark.smoke
    def test_response_latency(self, model_client, eval_dataset):
        """Average latency <= 2s and p95 latency <= 5s over the first 20 questions."""
        import time

        probe_questions = eval_dataset[:20]
        self._require_samples(probe_questions, "latency")
        latencies = []
        for q in probe_questions:
            # perf_counter is monotonic, so wall-clock adjustments cannot skew timings.
            start = time.perf_counter()
            model_client.chat.completions.create(
                model="default",
                messages=[{"role": "user", "content": q["question"]}],
            )
            latencies.append(time.perf_counter() - start)
        avg_latency = sum(latencies) / len(latencies)
        p95_latency = self._percentile(latencies, 0.95)
        assert avg_latency <= 2.0, f"Average latency {avg_latency:.2f}s above threshold 2s"
        assert p95_latency <= 5.0, f"P95 latency {p95_latency:.2f}s above threshold 5s"

    @pytest.mark.smoke
    def test_output_format_compliance(self, model_client, eval_dataset):
        """At least 95% of answers to JSON-format questions must parse as JSON."""
        format_questions = [
            q for q in eval_dataset if q.get("expected_format") == "json"
        ]
        self._require_samples(format_questions, "json-format")
        format_errors = 0
        for q in format_questions:
            answer = self._ask(model_client, q["question"])
            try:
                json.loads(answer)
            except (json.JSONDecodeError, TypeError):
                # TypeError covers a response whose content is None.
                format_errors += 1
        format_accuracy = 1 - format_errors / len(format_questions)
        assert format_accuracy >= 0.95, f"JSON format accuracy {format_accuracy:.2%} below 95%"

    def test_regression_against_baseline(self, model_client, eval_dataset, baseline_scores):
        """Every baseline metric must stay within 5% of its baseline value."""
        current_scores = self._run_evaluation_suite(model_client, eval_dataset)
        for metric, baseline_value in baseline_scores.items():
            current_value = current_scores.get(metric, 0)
            assert current_value >= baseline_value * 0.95, (
                f"Regression detected: {metric} dropped from {baseline_value:.4f} to {current_value:.4f}"
            )

    @staticmethod
    def _ask(model_client, question: str) -> str:
        """Send one deterministic (temperature 0) chat request and return the reply text."""
        response = model_client.chat.completions.create(
            model="default",
            messages=[{"role": "user", "content": question}],
            temperature=0.0,
        )
        return response.choices[0].message.content

    @staticmethod
    def _require_samples(samples, label: str) -> None:
        """Skip the calling test when ``samples`` is empty."""
        if not samples:
            pytest.skip(f"no {label} samples in the evaluation dataset")

    @staticmethod
    def _percentile(values: List[float], fraction: float) -> float:
        """Return the nearest-rank percentile of a non-empty list.

        ``int(n * fraction)`` as an index selects one rank too high; for 20
        samples it returns the maximum instead of the 19th value.
        """
        import math

        ordered = sorted(values)
        rank = max(1, math.ceil(fraction * len(ordered)))
        return ordered[rank - 1]

    @staticmethod
    def _check_answer(answer: str, keywords: List[str]) -> bool:
        """True when at least 60% of ``keywords`` occur in ``answer`` (case-insensitive)."""
        answer_lower = answer.lower()
        matched = sum(1 for kw in keywords if kw.lower() in answer_lower)
        return matched >= len(keywords) * 0.6

    @staticmethod
    def _contains_hallucination(answer: str, trap_keywords: List[str]) -> bool:
        """True when any trap keyword occurs in ``answer`` (case-insensitive)."""
        answer_lower = answer.lower()
        return any(kw.lower() in answer_lower for kw in trap_keywords)

    @staticmethod
    def _run_evaluation_suite(model_client, eval_dataset) -> Dict:
        """Return the current metric scores.

        NOTE(review): placeholder — the values are hard-coded and neither
        argument is used, so the regression test does not yet measure the
        deployed model.
        """
        return {
            "accuracy": 0.88,
            "faithfulness": 0.91,
            "relevancy": 0.85,
        }
# pytest.ini
"""
[pytest]
testpaths = tests
python_files = test_model_quality.py
python_classes = TestModelQuality
python_functions = test_*
addopts = -v --tb=short --json-report --json-report-file=eval_report.json
# Continuation lines must be indented to belong to the "markers" value.
markers =
    smoke: smoke tests for quick validation
    regression: full regression test suite
    benchmark: performance benchmark tests
"""
CI/CD整合
# .github/workflows/model_eval.yml
name: Model Evaluation Pipeline

# Evaluate only when a pull request touches model or config files.
on:
  pull_request:
    paths:
      - 'models/**'
      - 'config/**'

jobs:
  model-eval:
    runs-on: gpu-runner
    steps:
      - uses: actions/checkout@v4
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      - name: Install Dependencies
        run: |
          pip install -r requirements-eval.txt
          pip install pytest pytest-json-report
      # NOTE(review): env.MODEL_PATH is not defined anywhere in this workflow;
      # it must come from an `env:` block or repository/organization settings.
      - name: Deploy Model Canary
        run: |
          python scripts/deploy_canary.py --model-path ${{ env.MODEL_PATH }}
      - name: Run Smoke Tests
        run: pytest tests/ -m smoke -v
      - name: Run Full Evaluation
        run: pytest tests/ -m regression -v --json-report
      - name: Check Regression
        run: python scripts/check_regression.py --report eval_report.json --baseline data/baseline_scores.json
      # The report and the artifact upload run even when an earlier step failed.
      - name: Generate Report
        if: always()
        run: python scripts/generate_eval_report.py --report eval_report.json
      - name: Upload Results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: eval-results
          path: eval_report.json
Pattern 4:A/B測試模型對比
為什麼需要A/B測試?
離線評測分數高不等於線上效果好。A/B測試在真實流量中對比模型,是最可靠的效果驗證方式。
A/B測試框架實作
# ab_test_framework.py
import hashlib
import random
import time
from typing import Dict, List, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import json
import statistics
class AllocationStrategy(Enum):
    """Ways an A/B test can map a user to a variant."""

    RANDOM = "random"          # independent draw on every call
    HASH_BASED = "hash_based"  # derived from a hash of the user id
    STICKY = "sticky"          # remembered per user after the first allocation
@dataclass
class ABTestConfig:
    """Settings for one A/B test."""

    # Identification of the test and its two variants.
    test_name: str
    variant_a_name: str
    variant_b_name: str

    # Share of traffic routed to variant A (0.5 = even split).
    traffic_split: float = 0.5
    min_sample_size: int = 1000
    confidence_level: float = 0.95
    allocation_strategy: AllocationStrategy = AllocationStrategy.HASH_BASED
    duration_hours: int = 72
@dataclass
class ABTestResult:
    """One served request recorded during an A/B test."""

    query: str
    variant: str
    response: str
    latency_ms: float
    timestamp: str

    # Both stay None until feedback / an automatic score is available.
    user_feedback: Optional[int] = None
    auto_score: Optional[float] = None
@dataclass
class ABTestReport:
    """Outcome of analysing an A/B test."""

    test_name: str
    # Per-variant summary statistics.
    variant_a: Dict
    variant_b: Dict

    # ``winner`` stays None unless a significant difference was found.
    winner: Optional[str] = None
    confidence: float = 0.0
    is_significant: bool = False
    sample_size_a: int = 0
    sample_size_b: int = 0
class ABTestRunner:
    """Allocate traffic between two variants, record results and analyse them.

    NOTE(review): ``config.min_sample_size`` and ``config.duration_hours``
    are not consulted anywhere in this class — confirm whether ``analyze``
    should honour them.
    """

    # Scored samples each variant needs before a significance test is attempted.
    _MIN_SCORES_FOR_TEST = 30

    def __init__(self, config: ABTestConfig):
        """Start with no recorded results and no sticky allocations."""
        self.config = config
        self.results: List[ABTestResult] = []
        self._sticky_map: Dict[str, str] = {}

    def _hash_variant(self, key: str) -> str:
        """Deterministically map ``key`` to a variant using its 128-bit MD5 value.

        MD5 is used only for bucketing, not for security.
        """
        hash_val = int(hashlib.md5(key.encode()).hexdigest(), 16)
        threshold = int(self.config.traffic_split * (2**128))
        if hash_val < threshold:
            return self.config.variant_a_name
        return self.config.variant_b_name

    def allocate_variant(self, user_id: str) -> str:
        """Return the variant name for ``user_id`` under the configured strategy.

        Raises:
            ValueError: If the configured strategy is not an
                ``AllocationStrategy`` member handled here.
        """
        strategy = self.config.allocation_strategy
        if strategy == AllocationStrategy.RANDOM:
            if random.random() < self.config.traffic_split:
                return self.config.variant_a_name
            return self.config.variant_b_name
        if strategy == AllocationStrategy.HASH_BASED:
            return self._hash_variant(user_id)
        if strategy == AllocationStrategy.STICKY:
            # The first allocation is hash-derived and then remembered. Calling
            # allocate_variant() again here would re-enter this branch with a
            # new key on every call and never terminate.
            if user_id not in self._sticky_map:
                self._sticky_map[user_id] = self._hash_variant(user_id + "_init")
            return self._sticky_map[user_id]
        raise ValueError(f"Unsupported allocation strategy: {strategy!r}")

    def record_result(self, result: ABTestResult):
        """Append one result to the in-memory log."""
        self.results.append(result)

    def run_test(
        self,
        queries: List[str],
        model_a_fn: Callable,
        model_b_fn: Callable,
        evaluator_fn: Optional[Callable] = None,
    ) -> ABTestReport:
        """Serve every query with its allocated variant, record it and analyse.

        Query ``i`` is attributed to the synthetic user ``user_<i>``.
        ``evaluator_fn(query, response)`` supplies the automatic score when
        given.
        """
        for i, query in enumerate(queries):
            variant = self.allocate_variant(f"user_{i}")
            model_fn = model_a_fn if variant == self.config.variant_a_name else model_b_fn
            # perf_counter is monotonic, so clock adjustments cannot skew latencies.
            start_time = time.perf_counter()
            response = model_fn(query)
            latency_ms = (time.perf_counter() - start_time) * 1000
            auto_score = evaluator_fn(query, response) if evaluator_fn else None
            self.record_result(ABTestResult(
                query=query,
                variant=variant,
                response=response,
                latency_ms=latency_ms,
                timestamp=datetime.now().isoformat(),
                auto_score=auto_score,
            ))
        return self.analyze()

    @staticmethod
    def _p95(values: List[float]) -> float:
        """Nearest-rank 95th percentile of a non-empty list."""
        import math

        ordered = sorted(values)
        rank = max(1, math.ceil(0.95 * len(ordered)))
        return ordered[rank - 1]

    @classmethod
    def _summarize(cls, results: List[ABTestResult]) -> Dict:
        """Compute the per-variant statistics stored in the report (0 when empty)."""
        scores = [r.auto_score for r in results if r.auto_score is not None]
        latencies = [r.latency_ms for r in results]
        feedback = [r.user_feedback for r in results if r.user_feedback is not None]
        return {
            "avg_score": statistics.mean(scores) if scores else 0,
            "avg_latency_ms": statistics.mean(latencies) if latencies else 0,
            "p95_latency_ms": cls._p95(latencies) if latencies else 0,
            "avg_feedback": statistics.mean(feedback) if feedback else 0,
        }

    def analyze(self) -> ABTestReport:
        """Summarise both variants and pick a winner if the difference is significant.

        The significance test runs only when both variants have at least
        ``_MIN_SCORES_FOR_TEST`` automatic scores. It uses
        ``1 - config.confidence_level`` as the p-value threshold.
        """
        a_results = [r for r in self.results if r.variant == self.config.variant_a_name]
        b_results = [r for r in self.results if r.variant == self.config.variant_b_name]
        a_scores = [r.auto_score for r in a_results if r.auto_score is not None]
        b_scores = [r.auto_score for r in b_results if r.auto_score is not None]
        stats_a = self._summarize(a_results)
        stats_b = self._summarize(b_results)

        is_significant = False
        confidence = 0.0
        enough = min(len(a_scores), len(b_scores)) >= self._MIN_SCORES_FOR_TEST
        if enough:
            confidence, is_significant = self._statistical_test(
                a_scores, b_scores, alpha=1 - self.config.confidence_level
            )

        winner = None
        if is_significant:
            if stats_a["avg_score"] > stats_b["avg_score"]:
                winner = self.config.variant_a_name
            else:
                winner = self.config.variant_b_name

        return ABTestReport(
            test_name=self.config.test_name,
            variant_a=stats_a,
            variant_b=stats_b,
            winner=winner,
            confidence=confidence,
            is_significant=is_significant,
            sample_size_a=len(a_results),
            sample_size_b=len(b_results),
        )

    @staticmethod
    def _statistical_test(a: List[float], b: List[float], alpha: float = 0.05) -> tuple:
        """Two-sample t-test; return ``(1 - p_value rounded to 4 places, p_value < alpha)``."""
        from scipy.stats import ttest_ind

        _, p_value = ttest_ind(a, b)
        confidence = float(1 - p_value)
        return round(confidence, 4), bool(p_value < alpha)
if __name__ == "__main__":
    # Demo: compare two stub models on 200 synthetic queries with random scores.
    def model_a_fn(query: str) -> str:
        return f"Model A response to: {query}"

    def model_b_fn(query: str) -> str:
        return f"Model B enhanced response to: {query}"

    def evaluator_fn(query: str, response: str) -> float:
        return random.uniform(0.7, 1.0)

    config = ABTestConfig(
        test_name="llm_v1_vs_v2",
        variant_a_name="llama-3.1-8b",
        variant_b_name="llama-3.1-8b-finetuned",
        traffic_split=0.5,
        min_sample_size=500,
    )
    runner = ABTestRunner(config)

    queries = []
    for i in range(200):
        queries.append(f"測試問題 {i}")

    report = runner.run_test(queries, model_a_fn, model_b_fn, evaluator_fn)
    print(f"Winner: {report.winner}")
    print(f"Confidence: {report.confidence:.2%}")
    for label, stats in (("A", report.variant_a), ("B", report.variant_b)):
        print(f"Variant {label} avg score: {stats['avg_score']:.4f}")
Pattern 5:人工評估平台
為什麼需要人工評估?
自動化指標無法捕捉所有品質維度。流暢性、有用性、安全性、細微的事實錯誤——這些都需要人類判斷。人工評估是模型評估的「金標準」。
人工評估平台實作
# human_eval_platform.py
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime
import json
import uuid
import statistics
class EvalTaskType(Enum):
    """Kinds of annotation task the platform can hand out."""

    SINGLE_RESPONSE = "single_response"
    PAIRWISE_COMPARISON = "pairwise_comparison"
    RANKING = "ranking"
    ERROR_ANNOTATION = "error_annotation"
class QualityDimension(Enum):
    """Quality aspects an annotator can score."""

    FACTUAL_ACCURACY = "factual_accuracy"
    RELEVANCE = "relevance"
    COHERENCE = "coherence"
    FLUENCY = "fluency"
    SAFETY = "safety"
    HELPFULNESS = "helpfulness"
@dataclass
class EvalTask:
    """A unit of annotation work: one question plus the responses to judge."""

    task_id: str
    task_type: EvalTaskType
    question: str
    responses: List[str]
    quality_dimensions: List[QualityDimension]

    # Optional annotator instructions and free-form extra data.
    guidelines: str = ""
    metadata: Dict = field(default_factory=dict)
@dataclass
class AnnotatorResult:
    """One annotator's judgement of one task."""

    task_id: str
    annotator_id: str
    # Score per dimension name.
    scores: Dict[str, float]

    # Preferred response index for comparison tasks; None when not applicable.
    preference: Optional[int] = None
    comments: str = ""
    duration_seconds: float = 0.0
    timestamp: str = ""
@dataclass
class InterAnnotatorAgreement:
    """Agreement statistics for a single scoring dimension."""

    dimension: str
    cohens_kappa: float
    fleiss_kappa: float
    # Share of annotator pairs counted as agreeing.
    agreement_rate: float
class HumanEvalPlatform:
    """In-memory store for annotation tasks, annotator submissions and reports."""

    # A task is offered to annotators until this many distinct ones have answered.
    ANNOTATORS_PER_TASK = 3

    def __init__(self):
        """Start with no tasks, no submissions and no annotator statistics."""
        self.tasks: Dict[str, EvalTask] = {}
        self.results: Dict[str, List[AnnotatorResult]] = {}
        self.annotator_stats: Dict[str, Dict] = {}

    def create_task(
        self,
        question: str,
        responses: List[str],
        task_type: EvalTaskType = EvalTaskType.SINGLE_RESPONSE,
        quality_dimensions: Optional[List[QualityDimension]] = None,
        guidelines: str = "",
    ) -> EvalTask:
        """Register a new task and return it.

        Without ``quality_dimensions`` the task is scored on factual
        accuracy, relevance and coherence.
        """
        # An 8-character UUID prefix can collide; regenerate instead of
        # silently overwriting an existing task and its annotations.
        task_id = str(uuid.uuid4())[:8]
        while task_id in self.tasks:
            task_id = str(uuid.uuid4())[:8]
        task = EvalTask(
            task_id=task_id,
            task_type=task_type,
            question=question,
            responses=responses,
            quality_dimensions=quality_dimensions or [
                QualityDimension.FACTUAL_ACCURACY,
                QualityDimension.RELEVANCE,
                QualityDimension.COHERENCE,
            ],
            guidelines=guidelines,
        )
        self.tasks[task_id] = task
        self.results[task_id] = []
        return task

    def submit_annotation(self, result: AnnotatorResult):
        """Store a submission, stamping it with the current time.

        Raises:
            ValueError: If ``result.task_id`` is unknown.
        """
        if result.task_id not in self.results:
            raise ValueError(f"Task {result.task_id} not found")
        result.timestamp = datetime.now().isoformat()
        self.results[result.task_id].append(result)
        self._update_annotator_stats(result)

    def get_next_task(self, annotator_id: str) -> Optional[EvalTask]:
        """Return the first task this annotator has not done that still needs annotators."""
        for task_id, task in self.tasks.items():
            existing_annotators = {r.annotator_id for r in self.results[task_id]}
            if annotator_id in existing_annotators:
                continue
            if len(existing_annotators) < self.ANNOTATORS_PER_TASK:
                return task
        return None

    def compute_agreement(self, task_ids: Optional[List[str]] = None) -> List[InterAnnotatorAgreement]:
        """Compute agreement statistics per scoring dimension.

        Args:
            task_ids: Tasks to include; all tasks when empty or ``None``.

        Returns:
            One entry per dimension, in sorted dimension order, for
            dimensions that have at least one task scored by two or more
            annotators. An annotator who did not score a dimension is left
            out for that dimension rather than counted as a score of 0.
        """
        target_tasks = task_ids or list(self.tasks.keys())
        all_dimensions = set()
        for task_id in target_tasks:
            for result in self.results.get(task_id, []):
                all_dimensions.update(result.scores.keys())

        agreements = []
        for dimension in sorted(all_dimensions):
            scores_by_task = {}
            for task_id in target_tasks:
                scored = [
                    r.scores[dimension]
                    for r in self.results.get(task_id, [])
                    if dimension in r.scores
                ]
                if len(scored) >= 2:
                    scores_by_task[task_id] = scored
            if not scores_by_task:
                continue
            agreements.append(InterAnnotatorAgreement(
                dimension=dimension,
                cohens_kappa=round(self._compute_cohens_kappa(scores_by_task), 4),
                fleiss_kappa=round(self._compute_fleiss_kappa(scores_by_task), 4),
                agreement_rate=round(self._compute_pairwise_agreement(scores_by_task), 4),
            ))
        return agreements

    def generate_report(self) -> Dict:
        """Summarise all submissions.

        Returns per-dimension mean / median / stdev / count, vote counts for
        pairwise-comparison tasks, and overall totals.
        """
        all_scores = {}
        for results in self.results.values():
            for result in results:
                for dim, score in result.scores.items():
                    all_scores.setdefault(dim, []).append(score)

        dimension_stats = {}
        for dim, scores in all_scores.items():
            dimension_stats[dim] = {
                "mean": round(statistics.mean(scores), 4),
                "median": round(statistics.median(scores), 4),
                # stdev needs two data points; report 0 for a single score.
                "stdev": round(statistics.stdev(scores), 4) if len(scores) > 1 else 0,
                "count": len(scores),
            }

        pairwise_stats = {}
        for task_id, task in self.tasks.items():
            if task.task_type != EvalTaskType.PAIRWISE_COMPARISON:
                continue
            prefs = [r.preference for r in self.results[task_id] if r.preference is not None]
            if prefs:
                pairwise_stats[task_id] = {
                    "response_a_wins": sum(1 for p in prefs if p == 0),
                    "response_b_wins": sum(1 for p in prefs if p == 1),
                    "total_votes": len(prefs),
                }

        return {
            "total_tasks": len(self.tasks),
            "total_annotations": sum(len(r) for r in self.results.values()),
            "dimension_stats": dimension_stats,
            "pairwise_stats": pairwise_stats,
            "annotator_count": len(self.annotator_stats),
        }

    def _update_annotator_stats(self, result: AnnotatorResult):
        """Accumulate the submission count and total time for the annotator."""
        stats = self.annotator_stats.setdefault(
            result.annotator_id, {"count": 0, "total_duration": 0}
        )
        stats["count"] += 1
        stats["total_duration"] += result.duration_seconds

    @staticmethod
    def _compute_pairwise_agreement(scores_by_task: Dict) -> float:
        """Share of within-task annotator pairs whose scores differ by at most 1."""
        agreements = 0
        total = 0
        for scores in scores_by_task.values():
            for i in range(len(scores)):
                for j in range(i + 1, len(scores)):
                    if abs(scores[i] - scores[j]) <= 1:
                        agreements += 1
                    total += 1
        return agreements / total if total > 0 else 0

    @staticmethod
    def _compute_cohens_kappa(scores_by_task: Dict) -> float:
        """Chance-corrected agreement between the first two scores of every task.

        Two scores agree when they differ by at most 1. The chance term is
        estimated from the two score distributions (every first score paired
        with every second score) instead of a fixed constant, giving
        ``(p_observed - p_expected) / (1 - p_expected)``. Returns 0.0 when
        there are no pairs or chance agreement is already 1.

        NOTE(review): the first two submissions are treated as "rater 1" and
        "rater 2"; this is only a true two-rater kappa if the same two
        annotators submit first on every task — confirm.
        """
        pairs = [(s[0], s[1]) for s in scores_by_task.values() if len(s) >= 2]
        if not pairs:
            return 0.0
        n = len(pairs)
        p_observed = sum(1 for a, b in pairs if abs(a - b) <= 1) / n
        first_scores = [a for a, _ in pairs]
        second_scores = [b for _, b in pairs]
        chance_hits = sum(
            1 for a in first_scores for b in second_scores if abs(a - b) <= 1
        )
        p_expected = chance_hits / (n * n)
        if p_expected >= 1:
            return 0.0
        return (p_observed - p_expected) / (1 - p_expected)

    @staticmethod
    def _compute_fleiss_kappa(scores_by_task: Dict) -> float:
        """Placeholder: always returns 0.0 (Fleiss' kappa is not implemented)."""
        return 0.0
if __name__ == "__main__":
    # Demo: one pairwise task scored by three annotators, then report and agreement.
    platform = HumanEvalPlatform()
    candidate_responses = [
        "量子運算利用量子位元(qubit)的疊加態和糾纏態進行平行運算...",
        "量子運算是一種利用量子力學原理的運算方式,透過量子位元實現...",
    ]
    scored_dimensions = [
        QualityDimension.FACTUAL_ACCURACY,
        QualityDimension.RELEVANCE,
        QualityDimension.COHERENCE,
    ]
    task = platform.create_task(
        question="解釋量子運算的基本原理",
        responses=candidate_responses,
        task_type=EvalTaskType.PAIRWISE_COMPARISON,
        quality_dimensions=scored_dimensions,
    )

    for annotator_id in ("ann_1", "ann_2", "ann_3"):
        # Per-annotator offset derived from hash() of the id, as a stand-in for real scores.
        id_hash = hash(annotator_id)
        offset = (id_hash % 10) / 10
        if id_hash % 2 == 0:
            preferred = 0
        else:
            preferred = 1
        platform.submit_annotation(AnnotatorResult(
            task_id=task.task_id,
            annotator_id=annotator_id,
            scores={
                "factual_accuracy": 4.0 + offset,
                "relevance": 3.5 + offset,
                "coherence": 4.0 + offset,
            },
            preference=preferred,
            duration_seconds=45.0,
        ))

    report = platform.generate_report()
    print(json.dumps(report, indent=2, ensure_ascii=False))

    agreements = platform.compute_agreement()
    for entry in agreements:
        print(f"{entry.dimension}: κ={entry.cohens_kappa:.4f}, agreement={entry.agreement_rate:.2%}")
Pattern 6:生產監控與漂移檢測
為什麼需要生產監控?
模型上線不是終點,而是監控的起點。資料分佈變化、使用者行為偏移、模型退化——這些問題如果不及早發現,會導致沉默的品質下降。
漂移檢測系統實作
# production_monitor.py
import numpy as np
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import json
import logging
logger = logging.getLogger(__name__)
class DriftType(Enum):
    """Categories of drift an alert can describe."""

    DATA_DRIFT = "data_drift"
    CONCEPT_DRIFT = "concept_drift"
    PREDICTION_DRIFT = "prediction_drift"
class AlertLevel(Enum):
    """Severity attached to a drift alert."""

    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"
@dataclass
class DriftAlert:
    """A single drift finding."""

    # What drifted.
    drift_type: DriftType
    metric_name: str

    # The measured values and the resulting drift score.
    current_value: float
    baseline_value: float
    drift_score: float

    # Severity, creation time and human-readable text.
    alert_level: AlertLevel
    timestamp: str
    message: str
@dataclass
class MonitoringWindow:
    """Sample counts for the current window and the reference set."""

    window_size: int = 1000
    reference_size: int = 5000
class DataDriftDetector:
    """Compare a window of current values against fixed reference data.

    Every detector returns ``(score, is_drift)`` as plain Python ``float``
    and ``bool``.
    """

    def __init__(
        self,
        reference_data: np.ndarray,
        significance_level: float = 0.05,
    ):
        """Store the reference data and precompute its mean / std along axis 0."""
        self.reference_data = reference_data
        self.significance_level = significance_level
        self.reference_mean = np.mean(reference_data, axis=0)
        self.reference_std = np.std(reference_data, axis=0)

    def detect_ks_test(self, current_data: np.ndarray) -> Tuple[float, bool]:
        """Two-sample Kolmogorov-Smirnov test.

        Returns:
            ``(p_value, is_drift)`` where drift means
            ``p_value < significance_level``.
        """
        from scipy.stats import ks_2samp

        _, p_value = ks_2samp(self.reference_data, current_data)
        return float(p_value), bool(p_value < self.significance_level)

    def detect_psi(
        self,
        current_data: np.ndarray,
        n_bins: int = 10,
        threshold: float = 0.1,
    ) -> Tuple[float, bool]:
        """Population Stability Index over ``n_bins`` bins of the reference range.

        Args:
            current_data: Values of the current window.
            n_bins: Number of equal-width bins built from the reference data.
            threshold: PSI value from which drift is reported.

        Returns:
            ``(psi rounded to 4 decimals, psi >= threshold)``.

        Raises:
            ValueError: If ``current_data`` is empty.
        """
        current = np.asarray(current_data, dtype=float).ravel()
        if current.size == 0:
            raise ValueError("current_data must contain at least one value")
        reference = np.asarray(self.reference_data, dtype=float).ravel()

        ref_counts, bin_edges = np.histogram(reference, bins=n_bins)
        # np.histogram ignores values outside the edges. Clipping moves them
        # into the outermost bins so the most strongly shifted samples still
        # count instead of vanishing (which produced NaN for a window lying
        # entirely outside the reference range).
        clipped = np.clip(current, bin_edges[0], bin_edges[-1])
        cur_counts, _ = np.histogram(clipped, bins=bin_edges)

        # Floor the proportions so empty bins do not give log(0) or division by zero.
        ref_share = np.clip(ref_counts / ref_counts.sum(), 1e-6, None)
        cur_share = np.clip(cur_counts / cur_counts.sum(), 1e-6, None)
        psi = float(np.sum((cur_share - ref_share) * np.log(cur_share / ref_share)))
        return round(psi, 4), psi >= threshold

    def detect_z_score(self, current_data: np.ndarray, threshold: float = 3.0) -> Tuple[float, bool]:
        """Shift of the current mean, measured in reference standard deviations.

        For multi-feature data the largest per-feature value is reported.

        Returns:
            ``(z rounded to 4 decimals, z > threshold)``.
        """
        current_mean = np.mean(current_data, axis=0)
        # The epsilon keeps the division finite for constant reference data.
        z_scores = np.abs(current_mean - self.reference_mean) / (self.reference_std + 1e-8)
        z_score = float(np.max(z_scores))
        return round(z_score, 4), z_score > threshold
class PredictionDriftMonitor:
    """Buffer live predictions and raise alerts when their confidence distribution drifts."""

    def __init__(
        self,
        reference_predictions: List[Dict],
        window_size: int = 1000,
    ):
        """Store the reference predictions and start with an empty buffer."""
        self.reference_predictions = reference_predictions
        self.window_size = window_size
        self.prediction_buffer: List[Dict] = []
        self.alerts: List[DriftAlert] = []
        # Running count of every recorded prediction. The buffer is truncated
        # after each drift check, so its length is not the total.
        self._total_recorded = 0

    def record_prediction(self, prediction: Dict):
        """Buffer one prediction (timestamped now); check drift once the window is full."""
        self.prediction_buffer.append({
            **prediction,
            "timestamp": datetime.now().isoformat(),
        })
        self._total_recorded += 1
        if len(self.prediction_buffer) >= self.window_size:
            self._check_drift()
            # Keep the newest part of the window so consecutive checks overlap.
            # The expression parses as (-window_size) // 2, i.e. it rounds
            # away from zero for odd sizes.
            self.prediction_buffer = self.prediction_buffer[-self.window_size // 2:]

    def _check_drift(self):
        """Compare buffered confidences with the reference and record an alert on drift.

        Predictions without a ``confidence`` key count as 0. An alert is
        CRITICAL when PSI exceeds 0.25, otherwise WARNING, and is logged at
        the matching level.
        """
        ref_arr = np.array([p.get("confidence", 0) for p in self.reference_predictions])
        cur_arr = np.array([p.get("confidence", 0) for p in self.prediction_buffer])
        detector = DataDriftDetector(ref_arr)
        psi_value, is_psi_drift = detector.detect_psi(cur_arr)
        z_score, is_z_drift = detector.detect_z_score(cur_arr)
        if not (is_psi_drift or is_z_drift):
            return

        level = AlertLevel.CRITICAL if psi_value > 0.25 else AlertLevel.WARNING
        alert = DriftAlert(
            drift_type=DriftType.PREDICTION_DRIFT,
            metric_name="confidence_score",
            current_value=float(np.mean(cur_arr)),
            baseline_value=float(np.mean(ref_arr)),
            drift_score=psi_value,
            alert_level=level,
            timestamp=datetime.now().isoformat(),
            message=f"Prediction drift detected: PSI={psi_value:.4f}, Z-score={z_score:.4f}",
        )
        self.alerts.append(alert)
        if level is AlertLevel.CRITICAL:
            logger.critical(alert.message)
        else:
            logger.warning(alert.message)

    def get_health_report(self) -> Dict:
        """Summarise monitoring state.

        ``status`` is ``"degraded"`` when any alert was raised in the last
        24 hours, otherwise ``"healthy"``.
        """
        cutoff = datetime.now() - timedelta(hours=24)
        recent_alerts = [
            a for a in self.alerts
            if datetime.fromisoformat(a.timestamp) > cutoff
        ]
        return {
            "total_predictions_monitored": self._total_recorded,
            "alerts_last_24h": len(recent_alerts),
            "critical_alerts": sum(1 for a in recent_alerts if a.alert_level == AlertLevel.CRITICAL),
            "latest_drift_score": self.alerts[-1].drift_score if self.alerts else 0,
            "status": "healthy" if not recent_alerts else "degraded",
        }
class ModelPerformanceTracker:
    """Keep a history of metric snapshots and warn when one falls below its baseline."""

    def __init__(self, baseline_metrics: Dict[str, float]):
        """Store the baseline values that later snapshots are compared against.

        Args:
            baseline_metrics: Metric name -> baseline value.
        """
        self.baseline_metrics = baseline_metrics
        self.metric_history: List[Dict] = []
        # Relative drop (5%) beyond which a warning is logged.
        self.degradation_threshold = 0.05

    def record_metrics(self, metrics: Dict[str, float]):
        """Append a timestamped snapshot and check it for degradation."""
        entry = {
            "timestamp": datetime.now().isoformat(),
            "metrics": metrics,
        }
        self.metric_history.append(entry)
        self._check_degradation(metrics)

    def _check_degradation(self, current_metrics: Dict[str, float]):
        """Log a warning for every baseline metric that dropped by more than the threshold.

        Metrics absent from ``current_metrics`` are ignored. A baseline of 0
        is skipped as well because a relative change against it is undefined.
        """
        for metric_name, baseline_value in self.baseline_metrics.items():
            current_value = current_metrics.get(metric_name)
            if current_value is None:
                continue
            if baseline_value == 0:
                # Avoid ZeroDivisionError: no relative drop can be computed.
                continue
            # Divide by the magnitude so a drop stays positive for negative baselines.
            relative_change = (baseline_value - current_value) / abs(baseline_value)
            if relative_change > self.degradation_threshold:
                logger.warning(
                    f"Degradation detected: {metric_name} dropped from "
                    f"{baseline_value:.4f} to {current_value:.4f} "
                    f"({relative_change:.2%} decrease)"
                )

    def get_trend(self, metric_name: str, hours: int = 24) -> Dict:
        """Classify the recent direction of one metric.

        Args:
            metric_name: Metric to inspect.
            hours: Size of the look-back window.

        Returns:
            ``{"trend": "no_data", "values": []}`` when no snapshot in the
            window contains the metric. Otherwise a dict with ``trend``
            (``"stable"``, ``"declining"`` or ``"improving"``), ``current``,
            ``baseline`` and ``values``. A non-stable trend requires at least
            three values and a change of more than 5% between the mean of the
            older half and the mean of the newer half.
        """
        cutoff = datetime.now() - timedelta(hours=hours)
        recent = [
            entry for entry in self.metric_history
            if datetime.fromisoformat(entry["timestamp"]) > cutoff
            and metric_name in entry["metrics"]
        ]
        if not recent:
            return {"trend": "no_data", "values": []}
        values = [entry["metrics"][metric_name] for entry in recent]
        trend = "stable"
        if len(values) >= 3:
            midpoint = len(values) // 2
            first_half = np.mean(values[:midpoint])
            second_half = np.mean(values[midpoint:])
            if second_half < first_half * 0.95:
                trend = "declining"
            elif second_half > first_half * 1.05:
                trend = "improving"
        return {
            "trend": trend,
            "current": values[-1],
            "baseline": self.baseline_metrics.get(metric_name),
            "values": values,
        }
if __name__ == "__main__":
    # Demo entry point. The seed is fixed so the random draws are reproducible.
    np.random.seed(42)

    # Part 1: run the PSI detector on a near-baseline sample and a shifted one.
    baseline_scores = np.random.normal(0.85, 0.05, 5000)
    psi_detector = DataDriftDetector(baseline_scores)
    stable_sample = np.random.normal(0.84, 0.05, 1000)
    shifted_sample = np.random.normal(0.70, 0.08, 1000)

    stable_psi, stable_flag = psi_detector.detect_psi(stable_sample)
    print(f"Healthy data: PSI={stable_psi:.4f}, drift={stable_flag}")
    shifted_psi, shifted_flag = psi_detector.detect_psi(shifted_sample)
    print(f"Drifted data: PSI={shifted_psi:.4f}, drift={shifted_flag}")

    # Part 2: stream 50 in-range and then 50 low-confidence predictions into
    # the monitor; the 100th record fills the window and triggers a check.
    baseline_predictions = [
        {"confidence": np.random.uniform(0.8, 0.95)} for _ in range(5000)
    ]
    drift_monitor = PredictionDriftMonitor(baseline_predictions, window_size=100)
    for low, high in ((0.8, 0.95), (0.5, 0.7)):
        for _ in range(50):
            drift_monitor.record_prediction({"confidence": np.random.uniform(low, high)})

    report = drift_monitor.get_health_report()
    print(f"\nHealth Report: {json.dumps(report, indent=2)}")
5個常見陷阱與解決方案
陷阱1:評測資料洩露
訓練資料和評測資料有重疊,導致評測分數虛高。
解決方案:嚴格的資料隔離策略:先用hash去除完全重複的樣本,再以相似度比對(如下例)找出近似重複的樣本,確保訓練集與評測集無重疊。
def check_data_leakage(train_data: List[str], eval_data: List[str], threshold: float = 0.8) -> Dict:
    """Find evaluation items that are near-duplicates of a training item.

    Every evaluation string is compared with the training strings using
    ``difflib.SequenceMatcher``; the first training string whose similarity
    ratio is strictly greater than ``threshold`` marks the item as leaked.

    Args:
        train_data: Training texts.
        eval_data: Evaluation texts.
        threshold: Similarity ratio above which an item counts as leaked.

    Returns:
        A dict with ``leak_count``, ``leak_rate`` (leaked items divided by the
        number of evaluation items, ``0.0`` when ``eval_data`` is empty) and
        ``leaks``, a list of ``{"eval_index", "similarity"}`` records.
    """
    from difflib import SequenceMatcher

    if not eval_data:
        # Nothing to evaluate: avoid dividing by zero below.
        return {"leak_count": 0, "leak_rate": 0.0, "leaks": []}

    leaks = []
    for i, eval_item in enumerate(eval_data):
        for train_item in train_data:
            matcher = SequenceMatcher(None, eval_item, train_item)
            # real_quick_ratio() and quick_ratio() are cheap upper bounds on
            # ratio(), so a pair that fails either cannot exceed the threshold.
            if matcher.real_quick_ratio() <= threshold or matcher.quick_ratio() <= threshold:
                continue
            similarity = matcher.ratio()
            if similarity > threshold:
                leaks.append({"eval_index": i, "similarity": round(similarity, 4)})
                break
    return {
        "leak_count": len(leaks),
        "leak_rate": len(leaks) / len(eval_data),
        "leaks": leaks,
    }
陷阱2:評測指標與業務目標脫節
模型在MMLU上得分很高,但業務KPI沒有改善。
解決方案:建立指標-業務映射表,確保評測指標與業務目標對齊。
def align_metrics_with_business(eval_metrics: Dict, business_kpis: Dict) -> List[str]:
    """List the evaluation metrics that have no entry in the metric-to-KPI table.

    Args:
        eval_metrics: Evaluation results keyed by metric name; only the keys
            are inspected.
        business_kpis: Accepted for interface compatibility; this function
            does not read it.

    Returns:
        One message per metric name that is missing from the built-in table,
        in the iteration order of ``eval_metrics``.
    """
    # Built-in table linking each known metric to the KPIs it is meant to move.
    kpi_links = {
        "faithfulness": ["customer_satisfaction", "complaint_rate"],
        "answer_relevancy": ["task_completion_rate", "user_engagement"],
        "latency_p95": ["session_duration", "bounce_rate"],
    }
    return [
        f"Metric '{name}' has no business KPI mapping"
        for name in eval_metrics
        if name not in kpi_links
    ]
陷阱3:A/B測試樣本量不足
跑了200條就下結論,統計功效不足。
解決方案:預先計算所需樣本量。
def calculate_sample_size(
    baseline_rate: float,
    minimum_detectable_effect: float,
    alpha: float = 0.05,
    power: float = 0.8,
) -> int:
    """Return the per-group sample size for a two-sided two-proportion test.

    Args:
        baseline_rate: Success rate of the control variant (``p1``).
        minimum_detectable_effect: Relative change to detect; the treatment
            rate is ``p2 = baseline_rate * (1 + minimum_detectable_effect)``.
        alpha: Two-sided significance level.
        power: Desired statistical power.

    Returns:
        The required number of samples per group, rounded up.

    Raises:
        ValueError: If ``alpha`` or ``power`` is not strictly between 0 and 1,
            if either rate falls outside ``[0, 1]``, or if both rates are
            equal (the formula would divide by zero).
    """
    import math
    from scipy.stats import norm

    if not 0 < alpha < 1:
        raise ValueError("alpha must be strictly between 0 and 1")
    if not 0 < power < 1:
        raise ValueError("power must be strictly between 0 and 1")

    p1 = baseline_rate
    p2 = baseline_rate * (1 + minimum_detectable_effect)
    if not 0 <= p1 <= 1:
        raise ValueError("baseline_rate must be within [0, 1]")
    if not 0 <= p2 <= 1:
        raise ValueError(
            "baseline_rate * (1 + minimum_detectable_effect) must be within [0, 1]"
        )
    if p1 == p2:
        raise ValueError("the effect to detect must change the baseline rate")

    z_alpha = norm.ppf(1 - alpha / 2)
    z_beta = norm.ppf(power)
    p_avg = (p1 + p2) / 2
    # The first term uses the averaged rate for both groups, the second term
    # uses each group's own variance.
    pooled_term = z_alpha * math.sqrt(2 * p_avg * (1 - p_avg))
    unpooled_term = z_beta * math.sqrt(p1 * (1 - p1) + p2 * (1 - p2))
    n = (pooled_term + unpooled_term) ** 2 / (p2 - p1) ** 2
    return math.ceil(n)
陷阱4:忽略評估者間一致性
多個人標註結果差異很大,但直接取平均,掩蓋了標註品質問題。
解決方案:計算Cohen's Kappa(適用於兩位標註者;三位以上可改用Fleiss' Kappa),低於0.6的維度需要重新標註。
陷阱5:生產環境漂移檢測延遲
只在月度評測時才發現模型退化,影響已持續數週。
解決方案:即時監控+滑動視窗檢測,設定小時級告警。
10個常見錯誤排查
| 錯誤現象 | 可能原因 | 排查步驟 | 解決方案 |
|---|---|---|---|
| lm-eval任務報OOM | batch_size過大或模型過大 | 減小batch_size,啟用FSDP | batch_size=1 + device_map=auto |
| RAGAS faithfulness為0 | contexts為空或answer為空 | 檢查eval_data中contexts欄位 | 確保contexts非空且內容相關 |
| pytest超時 | 模型推理延遲過高 | 檢查GPU利用率和batch設定 | 增加timeout或最佳化推理設定 |
| A/B測試結果不顯著 | 樣本量不足或效果差異太小 | 計算統計功效和所需樣本量 | 延長測試週期或提高流量比例 |
| 人工評估Kappa<0.4 | 標註指南不清晰 | 審查指南並做校準測試 | 增加範例和邊界case說明 |
| PSI誤報頻繁 | 參考資料分佈過窄 | 擴大參考資料時間視窗 | 使用30天資料作為基線 |
| 評測結果不可復現 | 隨機種子未固定 | 設定全域隨機種子 | torch.manual_seed(42) + numpy.random.seed(42) |
| 評測資料格式錯誤 | 欄位缺失或類型不匹配 | 用schema校驗資料 | 使用Pydantic模型校驗 |
| 漂移檢測延遲高 | 監控視窗設定過大 | 縮小滑動視窗 | 從1000條減到200條 |
| 評測報告指標缺失 | 某些metric計算失敗 | 檢查LLM API呼叫是否超時 | 新增重試機制和fallback |
進階最佳化技巧
1. 多維度交叉評估
單一指標無法全面反映模型品質。透過交叉評估矩陣發現短板:
class CrossDimensionEvaluator:
    """Accumulate pairwise products of per-dimension scores and report their means."""

    def __init__(self, dimensions: List[str]):
        """Create an empty list of products for every ordered dimension pair."""
        self.dimensions = dimensions
        self.matrix = {row: {col: [] for col in dimensions} for row in dimensions}

    def evaluate_cross(self, samples: List[Dict], eval_fn) -> Dict:
        """Score each sample and return the mean score product per dimension pair.

        Args:
            samples: Items handed one by one to ``eval_fn``.
            eval_fn: Callable returning a mapping of dimension name to score;
                a dimension missing from the mapping counts as 0.

        Returns:
            A dict keyed ``"<d1>×<d2>"`` for every ordered pair of different
            dimensions with at least one recorded product, holding the mean
            product rounded to 4 decimals. Products are stored on the
            instance, so values from earlier calls are included.
        """
        import statistics

        ordered_pairs = [
            (first, second)
            for first in self.dimensions
            for second in self.dimensions
            if first != second
        ]

        for sample in samples:
            scores = eval_fn(sample)
            for first, second in ordered_pairs:
                product = scores.get(first, 0) * scores.get(second, 0)
                self.matrix[first][second].append(product)

        return {
            f"{first}×{second}": round(statistics.mean(self.matrix[first][second]), 4)
            for first, second in ordered_pairs
            if self.matrix[first][second]
        }
2. 動態評測集生成
靜態評測集容易被「刷分」。動態生成評測題目,確保評測的公正性:
class DynamicEvalGenerator:
    """Ask an LLM to write fresh evaluation questions for a given domain."""

    def __init__(self, llm_client, model: str = "default"):
        """Store the client and the model name used for generation.

        Args:
            llm_client: Object exposing ``chat.completions.create(...)``.
            model: Model identifier passed to the client on every request.
        """
        self.llm = llm_client
        self.model = model

    def generate_eval_questions(
        self,
        domain: str,
        difficulty: str = "medium",
        count: int = 50,
    ) -> List[Dict]:
        """Request ``count`` questions and parse the reply as JSON.

        Args:
            domain: Subject area of the questions.
            difficulty: Difficulty label inserted into the prompt.
            count: Number of questions to request.

        Returns:
            The decoded JSON payload of the first choice in the response.

        Raises:
            json.JSONDecodeError: If the reply is not valid JSON once an
                optional Markdown code fence has been removed.
        """
        prompt = (
            f"生成{count}道{domain}領域的{difficulty}難度評測題目。\n"
            "每道題包含:question, choices, answer, explanation。\n"
            "以JSON陣列格式回傳。"
        )
        response = self.llm.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
        )
        content = response.choices[0].message.content
        return json.loads(self._strip_code_fence(content))

    @staticmethod
    def _strip_code_fence(text: str) -> str:
        """Remove a surrounding Markdown code fence (with or without a language tag)."""
        stripped = text.strip()
        if not stripped.startswith("```"):
            return stripped
        # Drop the whole opening fence line, which may carry a tag such as "json".
        first_newline = stripped.find("\n")
        stripped = stripped[first_newline + 1:] if first_newline != -1 else stripped[3:]
        stripped = stripped.rstrip()
        if stripped.endswith("```"):
            stripped = stripped[:-3]
        return stripped.strip()
3. 分層評測策略
不同場景使用不同評測粒度,平衡評測成本和品質保證:
| 層級 | 觸發條件 | 評測範圍 | 評測時間 |
|---|---|---|---|
| L0 快速驗證 | 每次提交 | 50條核心用例 | <5分鐘 |
| L1 標準評測 | 每日建構 | 500條標準集 | ~30分鐘 |
| L2 全面評測 | 版本發布 | 5000條完整集 | ~2小時 |
| L3 人工抽檢 | 重大版本 | 100條專家評審 | ~1天 |
4. 評測結果版本管理
class EvalVersionManager:
    """Persist evaluation runs as JSON files and diff the metrics of two runs."""

    def __init__(self, store_path: str = "./eval_versions"):
        """Remember the storage directory and create it if it does not exist."""
        import os

        self.store_path = store_path
        os.makedirs(store_path, exist_ok=True)

    def save_version(
        self,
        model_version: str,
        eval_results: Dict,
        eval_config: Dict,
    ) -> str:
        """Write one evaluation record to ``<store_path>/<version_id>.json``.

        Args:
            model_version: Label of the evaluated model; prefix of the id.
            eval_results: Metric name -> value.
            eval_config: Configuration used for the run.

        Returns:
            The generated version id, ``<model_version>_<YYYYmmdd_HHMMSS>``.
        """
        # Imported here as well: the import in __init__ is local to that method.
        import os

        saved_at = datetime.now()
        version_id = f"{model_version}_{saved_at.strftime('%Y%m%d_%H%M%S')}"
        record = {
            "version_id": version_id,
            "model_version": model_version,
            "eval_results": eval_results,
            "eval_config": eval_config,
            "timestamp": saved_at.isoformat(),
        }
        path = os.path.join(self.store_path, f"{version_id}.json")
        with open(path, "w", encoding="utf-8") as f:
            json.dump(record, f, indent=2, ensure_ascii=False)
        return version_id

    def _load_version(self, version_id: str) -> Dict:
        """Read back the record stored under ``version_id``.

        Raises:
            FileNotFoundError: If no record with that id exists.
        """
        import os

        path = os.path.join(self.store_path, f"{version_id}.json")
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)

    def compare_versions(self, version_a: str, version_b: str) -> Dict:
        """Diff the metrics that both saved versions have in common.

        Args:
            version_a: Id of the first (reference) version.
            version_b: Id of the second version.

        Returns:
            Metric name -> ``{"a", "b", "delta"}`` where ``delta`` is
            ``b - a`` rounded to 4 decimals. Metrics present in only one
            version are omitted.
        """
        results_a = self._load_version(version_a)["eval_results"]
        results_b = self._load_version(version_b)["eval_results"]
        diff = {}
        for metric, value_a in results_a.items():
            if metric not in results_b:
                continue
            value_b = results_b[metric]
            diff[metric] = {
                "a": value_a,
                "b": value_b,
                "delta": round(value_b - value_a, 4),
            }
        return diff
5. 評測流水線編排
class EvalPipelineOrchestrator:
    """Run evaluation stages in order and stop at the first failed quality gate."""

    def __init__(self):
        """Start with an empty stage list."""
        self.stages = []

    def add_stage(self, name: str, eval_fn: Callable, gate_threshold: float = 0.0):
        """Register a stage.

        Args:
            name: Key under which the stage score is reported.
            eval_fn: Called as ``eval_fn(model, dataset)``; returns the score.
            gate_threshold: Minimum score required to continue.
        """
        stage = dict(name=name, eval_fn=eval_fn, gate_threshold=gate_threshold)
        self.stages.append(stage)

    def run(self, model, dataset) -> Dict:
        """Execute the stages in registration order.

        Returns:
            The score of every executed stage keyed by stage name, plus
            ``"status"``: ``"passed"`` when all gates held, otherwise
            ``"gate_failed"`` together with ``"failed_stage"``. Stages after
            a failed gate are not executed.
        """
        outcome = {}
        for stage in self.stages:
            stage_name = stage["name"]
            print(f"Running stage: {stage_name}")
            value = stage["eval_fn"](model, dataset)
            outcome[stage_name] = value
            minimum = stage["gate_threshold"]
            if value < minimum:
                print(f"GATE FAILED: {stage_name} score {value:.4f} < {minimum}")
                outcome.update(status="gate_failed", failed_stage=stage_name)
                return outcome
        outcome["status"] = "passed"
        return outcome
對比分析
| 評估模式 | 適用階段 | 評估維度 | 自動化程度 | 成本 | 可靠性 | 時效性 |
|---|---|---|---|---|---|---|
| LLM基準測試 | 模型選型/訓練 | 通用能力 | 高 | 低 | 中 | 低 |
| RAG評估 | RAG系統開發 | 檢索+生成 | 高 | 中 | 高 | 中 |
| 自動化評測 | CI/CD流水線 | 自定義指標 | 高 | 低 | 高 | 高 |
| A/B測試 | 模型上線 | 業務指標 | 中 | 高 | 高 | 低 |
| 人工評估 | 品質把關 | 全維度 | 低 | 高 | 最高 | 最低 |
| 生產監控 | 運維階段 | 漂移/退化 | 高 | 中 | 中 | 最高 |
| 漂移檢測方法 | 適用場景 | 檢測速度 | 誤報率 | 最低樣本量 |
|---|---|---|---|---|
| KS檢定 | 連續型資料分佈 | 快 | 中 | 100 |
| PSI | 分類型/連續型 | 快 | 低 | 500 |
| Z-score | 均值偏移 | 最快 | 高 | 30 |
| ADWIN | 串流資料 | 中 | 低 | 200 |
| Page-Hinkley | 累積漂移 | 中 | 中 | 100 |
線上工具推薦
- JSON資料格式化:/zh-TW/json/format
- Hash編碼工具:/zh-TW/encode/hash
- Curl轉程式碼:/zh-TW/dev/curl-to-code
總結:AI模型評估不是「跑個分數」就完事——它是一個覆蓋模型全生命週期的工程體系。6種生產模式各有定位:基準測試量化通用能力、RAG評估定位檢索生成短板、自動化評測保證CI/CD品質門禁、A/B測試驗證線上效果、人工評估把關品質上限、生產監控防止沉默退化。2026年,不做系統評估的AI系統,就是在盲飛。
延伸閱讀
本站提供瀏覽器本地工具,免註冊即可試用 →