Python OpenTelemetry LLM Tracing: 6 Production Patterns from Spans to Smart Alerting
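Is your LLM application a black box?
By 2026, LLM applications are running deep in production, yet most teams share one problem: the LLM call chain is invisible. A single user request may pass through prompt construction, embedding retrieval, multi-turn dialogue, model inference, and post-processing, and when any one of those stages misbehaves you are left guessing.
Traditional APM tools do not model LLM-specific semantics such as token usage, time to first token (TTFT), prompt leakage, or hallucination detection. With its semantic conventions and extensible span attributes, OpenTelemetry is a strong fit for LLM tracing.
Key points covered in this article:
- Standard patterns for LLM span instrumentation and attribute annotation
- Token usage tracking and real-time cost monitoring
- Structured, sanitized logging of prompts and responses
- Tracing multi-model pipelines (RAG + Agent + Tool)
- Error monitoring and anomaly detection
- Smart alerting and SLA enforcement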
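Contents
- OpenTelemetry LLM core concepts
- Pattern 1: LLM span instrumentation and attribute annotation
- Pattern 2: Token usage tracking and cost monitoring
- Pattern 3: Structured prompt/response logging
- Pattern 4: Multi-model tracing
- Pattern 5: Error monitoring and anomaly detection
- Pattern 6: Smart alerting and SLA enforcement
- 5 common pitfalls and how to fix them
- Troubleshooting 10 common errors
- Advanced optimization techniques
- Comparison: OpenTelemetry vs LangSmith vs Promptflow
- Recommended online tools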
OpenTelemetry LLM core concepts
Why use OpenTelemetry for LLM tracing
┌────────────────────────────────────────────────────────────┐
│            LLM application tracing architecture            │
├────────────────────────────────────────────────────────────┤
│                                                            │
│  User request ──► API Gateway ──► LLM Service              │
│                                        │                   │
│                          ┌─────────────┼─────────────┐     │
│                          ▼             ▼             ▼     │
│                      Embedding    Chat Model     Tool Call │
│                       Service      (GPT-4o)       Service  │
│                          │             │             │     │
│                          ▼             ▼             ▼     │
│                      Vector DB     Reranker    External API│
│                                                            │
│  Each hop = 1 span carrying LLM semantic attributes        │
│  Whole chain = 1 trace, traceable end to end               │
│                                                            │
│  ──► OTel Collector ──► Jaeger/Tempo (Trace)               │
│                     ──► Prometheus (Metrics)               │
│                     ──► Loki/ELK (Logs)                    │
└────────────────────────────────────────────────────────────┘
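OpenTelemetry brings four core capabilities to LLM tracing:
| Capability | Description | Value in LLM scenarios |
|---|---|---|
| Trace | Cross-service request tracing | Follow the full call chain from user request to LLM response |
| Span | Record of a single operation | Capture the model, parameters, and token usage of each LLM call |
| Metric | Metric collection | Monitor token consumption, latency distribution, and error rate in real time |
| Baggage | Metadata passed across services | Carry user ID, session ID, and A/B experiment identifiers |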
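OpenTelemetry LLM semantic conventions (2026 edition)
Span Kind: CLIENT
Span Name: gen_ai.client.chat # name used in this article; the GenAI conventions recommend "{gen_ai.operation.name} {gen_ai.request.model}", e.g. "chat gpt-4o"
Standard attributes:
gen_ai.system = "openai" # LLM provider
gen_ai.request.model = "gpt-4o" # requested model
gen_ai.request.max_tokens = 4096 # maximum output tokens
gen_ai.request.temperature = 0.7 # sampling temperature
gen_ai.response.model = "gpt-4o-2024-05-13" # model that actually served the request
gen_ai.response.finish_reasons = ["stop"] # finish reasons
gen_ai.usage.input_tokens = 1523 # input tokens
gen_ai.usage.output_tokens = 847 # output tokens
Events (span events):
gen_ai.content.prompt = "..." # prompt content
gen_ai.content.completion = "..." # completion content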
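The attribute list above can be assembled with two small helpers. This is a minimal sketch using only the standard library: `genai_span_name` and `genai_request_attributes` are hypothetical helper names, not part of any OpenTelemetry package. It follows the "operation plus model" span naming that the GenAI conventions suggest and skips parameters whose value is `None`, since `None` is not a valid attribute value.

```python
from typing import Any, Dict, Optional


def genai_span_name(operation: str, model: Optional[str] = None) -> str:
    """Build a span name of the form '<operation> <model>' (hypothetical helper)."""
    return f"{operation} {model}" if model else operation


def genai_request_attributes(system: str, model: str, operation: str = "chat",
                             **params: Any) -> Dict[str, Any]:
    """Collect gen_ai.* request attributes, dropping parameters set to None."""
    attrs: Dict[str, Any] = {
        "gen_ai.system": system,
        "gen_ai.operation.name": operation,
        "gen_ai.request.model": model,
    }
    for key, value in params.items():
        if value is not None:
            attrs[f"gen_ai.request.{key}"] = value
    return attrs


name = genai_span_name("chat", "gpt-4o")
attrs = genai_request_attributes("openai", "gpt-4o", max_tokens=4096, temperature=None)
```

The resulting dictionary can be passed as the `attributes` argument when a span is started, which keeps attribute naming in one place instead of repeating string literals at every call site.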
Pattern 1: LLM span instrumentation and attribute annotation
Creating a basic span
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
resource = Resource.create({
"service.name": "llm-chat-service",
"service.version": "2.1.0",
"deployment.environment": "production",
})
tracer_provider = TracerProvider(resource=resource)
tracer_provider.add_span_processor(
BatchSpanProcessor(
OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True)
)
)
trace.set_tracer_provider(tracer_provider)
tracer = trace.get_tracer("llm-chat-service", "2.1.0")
Standard LLM span instrumentation
import time
from opentelemetry import trace

tracer = trace.get_tracer(__name__)

def trace_llm_call(model: str, prompt: str, **kwargs):
    # call_openai_api is assumed to be an application helper wrapping the provider SDK.
    with tracer.start_as_current_span(
        "gen_ai.client.chat",
        kind=trace.SpanKind.CLIENT,
        attributes={
            "gen_ai.system": "openai",
            "gen_ai.request.model": model,
            "gen_ai.request.max_tokens": kwargs.get("max_tokens", 4096),
            "gen_ai.request.temperature": kwargs.get("temperature", 0.7),
        },
    ) as span:
        start_time = time.time()
        try:
            response = call_openai_api(model, prompt, **kwargs)
            span.set_attributes({
                "gen_ai.response.model": response.model,
                "gen_ai.response.finish_reasons": [
                    choice.finish_reason for choice in response.choices
                ],
                "gen_ai.usage.input_tokens": response.usage.prompt_tokens,
                "gen_ai.usage.output_tokens": response.usage.completion_tokens,
                "llm.time_to_first_token_ms": kwargs.get("ttft_ms", 0),
                "llm.total_duration_ms": (time.time() - start_time) * 1000,
            })
            # content is None when the model replies with tool calls only.
            content = response.choices[0].message.content or ""
            span.add_event(
                "gen_ai.content.completion",
                attributes={"gen_ai.content": content[:500]},
            )
            return response
        except Exception as e:
            span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
            span.record_exception(e)
            raise
Span with event annotations
def trace_llm_with_events(model: str, messages: list, **kwargs):
    with tracer.start_as_current_span(
        "gen_ai.client.chat",
        kind=trace.SpanKind.CLIENT,
        attributes={
            "gen_ai.system": "openai",
            "gen_ai.request.model": model,
            "gen_ai.request.max_tokens": kwargs.get("max_tokens", 4096),
        },
    ) as span:
        # Record each prompt message as an event, truncated to 1000 characters.
        for i, msg in enumerate(messages):
            span.add_event(
                f"gen_ai.content.prompt.{i}",
                attributes={
                    "gen_ai.content.role": msg["role"],
                    "gen_ai.content": msg["content"][:1000],
                },
            )
        start_time = time.time()
        first_token_time = None
        # call_openai_streaming is assumed to yield text chunks (str).
        response = call_openai_streaming(model, messages, **kwargs)
        chunks = []
        for chunk in response:
            if first_token_time is None:
                first_token_time = time.time()
                ttft_ms = (first_token_time - start_time) * 1000
                span.set_attribute("llm.time_to_first_token_ms", ttft_ms)
            chunks.append(chunk)
        full_content = "".join(chunks)
        span.set_attributes({
            # Character-based estimate only; prefer the usage figures returned by
            # the API when the stream provides them (see Pitfall 2).
            "llm.usage.output_tokens_estimated": len(full_content) // 4,
            "llm.total_duration_ms": (time.time() - start_time) * 1000,
        })
        span.add_event(
            "gen_ai.content.completion",
            attributes={"gen_ai.content": full_content[:500]},
        )
        return full_content
Pattern 2: Token usage tracking and cost monitoring
Token counters and cost calculation
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
metric_reader = PeriodicExportingMetricReader(
OTLPMetricExporter(endpoint="http://localhost:4317", insecure=True),
export_interval_millis=60000,
)
meter_provider = MeterProvider(metric_readers=[metric_reader])
metrics.set_meter_provider(meter_provider)
meter = metrics.get_meter("llm-chat-service", "2.1.0")
token_counter = meter.create_counter(
name="gen_ai.client.token.usage",
description="Token usage counter for LLM calls",
unit="tokens",
)
cost_counter = meter.create_counter(
name="llm.cost.usd",
description="LLM API cost in USD",
unit="USD",
)
llm_duration = meter.create_histogram(
name="llm.request.duration",
description="LLM request duration in milliseconds",
unit="ms",
)
MODEL_PRICING = {
"gpt-4o": {"input": 2.50 / 1_000_000, "output": 10.00 / 1_000_000},
"gpt-4o-mini": {"input": 0.15 / 1_000_000, "output": 0.60 / 1_000_000},
"claude-sonnet-4-20250514": {"input": 3.00 / 1_000_000, "output": 15.00 / 1_000_000},
"deepseek-r1": {"input": 0.55 / 1_000_000, "output": 2.19 / 1_000_000},
}
def record_token_usage(model: str, input_tokens: int, output_tokens: int,
                       system: str = "openai"):
    # system defaults to "openai"; pass the actual provider for the other
    # models listed in MODEL_PRICING.
    token_counter.add(input_tokens, {
        "gen_ai.system": system,
        "gen_ai.request.model": model,
        "token.type": "input",
    })
    token_counter.add(output_tokens, {
        "gen_ai.system": system,
        "gen_ai.request.model": model,
        "token.type": "output",
    })
    # Models missing from MODEL_PRICING are priced at 0 and add nothing to the cost counter.
    pricing = MODEL_PRICING.get(model, {"input": 0, "output": 0})
    cost = input_tokens * pricing["input"] + output_tokens * pricing["output"]
    cost_counter.add(cost, {
        "gen_ai.request.model": model,
        "cost.type": "api_call",
    })
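As a worked example of the cost formula, the sketch below prices one call using the gpt-4o rates from the pricing table above (USD 2.50 per million input tokens, USD 10.00 per million output tokens) and the token counts used earlier in the article (1523 in, 847 out). `estimate_cost_usd` is a hypothetical helper name, and the prices are the article's figures, which may not match current provider pricing.

```python
# Prices in USD per one million tokens, copied from the article's pricing table.
PRICE_PER_MILLION = {
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
}


def estimate_cost_usd(model: str, input_tokens: int, output_tokens: int) -> float:
    """Return the cost of one call; raises KeyError for a model without a price."""
    price = PRICE_PER_MILLION[model]
    return (input_tokens * price["input"] + output_tokens * price["output"]) / 1_000_000


# 1523 * 2.50 + 847 * 10.00 = 3807.5 + 8470 = 12277.5 micro-dollars
cost = estimate_cost_usd("gpt-4o", 1523, 847)
```

Raising on an unknown model is a deliberate difference from a silent zero price: a model missing from the table then shows up as an error rather than as free usage.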
Cost quotas and budget control
import threading
from datetime import datetime, timedelta
from collections import defaultdict
class TokenBudgetManager:
def __init__(self, daily_budget_usd: float = 100.0):
self.daily_budget = daily_budget_usd
self.current_spend = defaultdict(float)
self.lock = threading.Lock()
self._reset_date = datetime.now().date()
def check_and_record(self, model: str, input_tokens: int, output_tokens: int) -> bool:
with self.lock:
today = datetime.now().date()
if today != self._reset_date:
self.current_spend.clear()
self._reset_date = today
pricing = MODEL_PRICING.get(model, {"input": 0, "output": 0})
cost = input_tokens * pricing["input"] + output_tokens * pricing["output"]
total_spend = sum(self.current_spend.values()) + cost
if total_spend > self.daily_budget:
return False
self.current_spend[model] += cost
return True
def get_remaining_budget(self) -> float:
with self.lock:
return self.daily_budget - sum(self.current_spend.values())
budget_manager = TokenBudgetManager(daily_budget_usd=200.0)
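LLM call with budget check
class BudgetExceededError(RuntimeError):
    """Raised when the daily LLM budget is exhausted."""

def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    pricing = MODEL_PRICING.get(model, {"input": 0, "output": 0})
    return input_tokens * pricing["input"] + output_tokens * pricing["output"]

def call_llm_with_budget(model: str, messages: list, **kwargs):
    with tracer.start_as_current_span(
        "gen_ai.client.chat",
        kind=trace.SpanKind.CLIENT,
        attributes={
            "gen_ai.system": "openai",
            "gen_ai.request.model": model,
        },
    ) as span:
        # Refuse before calling the API once the budget is already used up; the
        # check after the call only catches the request that crosses the limit.
        if budget_manager.get_remaining_budget() <= 0:
            span.set_attribute("llm.budget.exceeded", True)
            raise BudgetExceededError("Daily budget exhausted")
        response = call_openai_api(model, messages, **kwargs)
        input_tokens = response.usage.prompt_tokens
        output_tokens = response.usage.completion_tokens
        allowed = budget_manager.check_and_record(model, input_tokens, output_tokens)
        if not allowed:
            span.set_attribute("llm.budget.exceeded", True)
            raise BudgetExceededError(
                f"Daily budget exceeded. Remaining: ${budget_manager.get_remaining_budget():.2f}"
            )
        record_token_usage(model, input_tokens, output_tokens)
        span.set_attributes({
            "gen_ai.usage.input_tokens": input_tokens,
            "gen_ai.usage.output_tokens": output_tokens,
            "llm.cost.usd": calculate_cost(model, input_tokens, output_tokens),
            "llm.budget.remaining_usd": budget_manager.get_remaining_budget(),
        })
        return response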
Pattern 3: Structured prompt/response logging
Sanitization and structured records
import re
import json
from dataclasses import dataclass, field, asdict
@dataclass
class LLMCallLog:
trace_id: str
span_id: str
model: str
prompt_hash: str
prompt_length: int
response_hash: str
response_length: int
input_tokens: int
output_tokens: int
duration_ms: float
ttft_ms: float = 0.0
finish_reason: str = ""
sanitized_prompt: str = ""
sanitized_response: str = ""
metadata: dict = field(default_factory=dict)
import hashlib

PII_PATTERNS = [
    (re.compile(r'\b\d{3}[-.]?\d{4}\b'), "[PHONE]"),
    # [A-Za-z], not [A-Z|a-z]: inside a character class "|" is a literal pipe.
    (re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'), "[EMAIL]"),
    (re.compile(r'\b\d{17}[\dXx]\b'), "[ID_CARD]"),
    (re.compile(r'\b(?:\d[ -]*?){13,19}\b'), "[CREDIT_CARD]"),
    (re.compile(r'api[_-]?key[=:]\s*\S+', re.IGNORECASE), "[API_KEY]"),
]

def sanitize_text(text: str, max_length: int = 500) -> str:
    # Redact first, then truncate, so PII straddling the cut-off is still matched.
    sanitized = text
    for pattern, replacement in PII_PATTERNS:
        sanitized = pattern.sub(replacement, sanitized)
    return sanitized[:max_length]

def hash_content(content: str) -> str:
    # First 16 hex characters of the SHA-256 digest, enough to correlate identical prompts.
    return hashlib.sha256(content.encode()).hexdigest()[:16]
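Whether redaction runs before or after truncation changes what can leak. The sketch below, using only the standard library and a single email pattern, compares the two orders on a string where the cut-off falls in the middle of an address. The function names and the sample text are illustrative only.

```python
import re

EMAIL = re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b")


def truncate_then_redact(text: str, max_length: int) -> str:
    # The cut can split an address so that the pattern no longer matches it.
    return EMAIL.sub("[EMAIL]", text[:max_length])


def redact_then_truncate(text: str, max_length: int) -> str:
    # The pattern sees the whole address before anything is cut.
    return EMAIL.sub("[EMAIL]", text)[:max_length]


text = "Contact: alice@example.com for access"
leaky = truncate_then_redact(text, 20)   # keeps the fragment "alice@examp"
safe = redact_then_truncate(text, 20)    # "Contact: [EMAIL] for"
```

Redacting the full text costs more regex work on very long prompts, so a compromise is to truncate with a generous margin first and apply the final limit after redaction.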
Structured logger
import logging
import json
from datetime import datetime, timezone
class LLMStructuredLogger:
def __init__(self, service_name: str = "llm-chat-service"):
self.logger = logging.getLogger(f"{service_name}.llm_calls")
self.sanitize_enabled = True
self.max_content_length = 500
def log_llm_call(self, span, model: str, prompt: str, response: str,
input_tokens: int, output_tokens: int,
duration_ms: float, ttft_ms: float = 0,
finish_reason: str = "", **metadata):
log_entry = LLMCallLog(
trace_id=format(span.get_span_context().trace_id, "032x"),
span_id=format(span.get_span_context().span_id, "016x"),
model=model,
prompt_hash=hash_content(prompt),
prompt_length=len(prompt),
response_hash=hash_content(response),
response_length=len(response),
input_tokens=input_tokens,
output_tokens=output_tokens,
duration_ms=round(duration_ms, 2),
ttft_ms=round(ttft_ms, 2),
finish_reason=finish_reason,
sanitized_prompt=sanitize_text(prompt, self.max_content_length) if self.sanitize_enabled else "",
sanitized_response=sanitize_text(response, self.max_content_length) if self.sanitize_enabled else "",
metadata=metadata,
)
self.logger.info(
"LLM call completed",
extra={"llm_call": asdict(log_entry)},
)
span.add_event("gen_ai.content.prompt", attributes={
"gen_ai.content": sanitize_text(prompt, 1000),
"gen_ai.content.hash": hash_content(prompt),
})
span.add_event("gen_ai.content.completion", attributes={
"gen_ai.content": sanitize_text(response, 1000),
"gen_ai.content.hash": hash_content(response),
})
llm_logger = LLMStructuredLogger()
Complete LLM call with logging
def call_llm_with_logging(model: str, messages: list, **kwargs):
with tracer.start_as_current_span(
"gen_ai.client.chat",
kind=trace.SpanKind.CLIENT,
attributes={
"gen_ai.system": "openai",
"gen_ai.request.model": model,
"gen_ai.request.temperature": kwargs.get("temperature", 0.7),
},
) as span:
start_time = time.time()
prompt_text = json.dumps(messages, ensure_ascii=False)
try:
response = call_openai_api(model, messages, **kwargs)
first_token_time = time.time()
content = response.choices[0].message.content
finish_reason = response.choices[0].finish_reason
input_tokens = response.usage.prompt_tokens
output_tokens = response.usage.completion_tokens
duration_ms = (time.time() - start_time) * 1000
llm_logger.log_llm_call(
span=span,
model=response.model,
prompt=prompt_text,
response=content,
input_tokens=input_tokens,
output_tokens=output_tokens,
duration_ms=duration_ms,
finish_reason=finish_reason,
user_id=kwargs.get("user_id", "anonymous"),
session_id=kwargs.get("session_id", ""),
)
span.set_attributes({
"gen_ai.usage.input_tokens": input_tokens,
"gen_ai.usage.output_tokens": output_tokens,
"gen_ai.response.finish_reasons": [finish_reason],
})
return response
except Exception as e:
duration_ms = (time.time() - start_time) * 1000
llm_logger.log_llm_call(
span=span, model=model, prompt=prompt_text,
response="", input_tokens=0, output_tokens=0,
duration_ms=duration_ms, error=str(e),
)
span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
span.record_exception(e)
raise
Pattern 4: Multi-model tracing
RAG pipeline tracing
def trace_rag_pipeline(query: str, user_id: str = ""):
with tracer.start_as_current_span(
"rag.pipeline",
kind=trace.SpanKind.INTERNAL,
attributes={
"rag.query": sanitize_text(query),
"rag.user_id": user_id,
},
) as pipeline_span:
embedding = trace_embedding(query, pipeline_span)
documents = trace_vector_search(embedding, pipeline_span)
context = trace_reranker(query, documents, pipeline_span)
answer = trace_llm_generation(query, context, pipeline_span)
pipeline_span.set_attributes({
"rag.documents_retrieved": len(documents),
"rag.context_length": len(context),
"rag.answer_length": len(answer),
})
return answer
def trace_embedding(query: str, parent_span):
with tracer.start_as_current_span(
"gen_ai.client.embedding",
kind=trace.SpanKind.CLIENT,
attributes={
"gen_ai.system": "openai",
"gen_ai.request.model": "text-embedding-3-large",
"gen_ai.request.encoding_format": "float",
},
) as span:
start_time = time.time()
response = call_embedding_api(query)
span.set_attributes({
"gen_ai.usage.input_tokens": response.usage.prompt_tokens,
"llm.duration_ms": (time.time() - start_time) * 1000,
})
return response.data[0].embedding
def trace_vector_search(embedding: list, parent_span):
with tracer.start_as_current_span(
"vector.search",
kind=trace.SpanKind.CLIENT,
attributes={
"db.system": "pgvector",
"db.operation": "similarity_search",
"db.vector.dimension": len(embedding),
},
) as span:
start_time = time.time()
results = search_pgvector(embedding, top_k=10)
span.set_attributes({
"db.results.count": len(results),
"db.results.top_score": results[0].score if results else 0,
"llm.duration_ms": (time.time() - start_time) * 1000,
})
for i, doc in enumerate(results[:3]):
span.add_event(f"vector.search.result.{i}", attributes={
"document.id": doc.id,
"document.score": doc.score,
"document.content_preview": doc.content[:200],
})
return results
def trace_reranker(query: str, documents: list, parent_span):
with tracer.start_as_current_span(
"gen_ai.client.rerank",
kind=trace.SpanKind.CLIENT,
attributes={
"gen_ai.system": "cohere",
"gen_ai.request.model": "rerank-v3.5",
"rerank.documents_count": len(documents),
},
) as span:
start_time = time.time()
reranked = call_reranker_api(query, documents, top_n=3)
span.set_attributes({
"rerank.results_count": len(reranked),
"llm.duration_ms": (time.time() - start_time) * 1000,
})
return "\n".join([doc.content for doc in reranked])
def trace_llm_generation(query: str, context: str, parent_span):
with tracer.start_as_current_span(
"gen_ai.client.chat",
kind=trace.SpanKind.CLIENT,
attributes={
"gen_ai.system": "openai",
"gen_ai.request.model": "gpt-4o",
},
) as span:
start_time = time.time()
messages = [
{"role": "system", "content": f"Answer the question using the following context:\n{context}"},
{"role": "user", "content": query},
]
response = call_openai_api("gpt-4o", messages)
span.set_attributes({
"gen_ai.usage.input_tokens": response.usage.prompt_tokens,
"gen_ai.usage.output_tokens": response.usage.completion_tokens,
"llm.duration_ms": (time.time() - start_time) * 1000,
})
return response.choices[0].message.content
Agent tracing
def trace_agent_execution(user_query: str, max_iterations: int = 5):
    with tracer.start_as_current_span(
        "agent.execution",
        kind=trace.SpanKind.INTERNAL,
        attributes={
            "agent.type": "react",
            "agent.max_iterations": max_iterations,
            "agent.query": sanitize_text(user_query),
        },
    ) as agent_span:
        messages = [{"role": "user", "content": user_query}]
        iteration = 0
        tool_calls_log = []
        while iteration < max_iterations:
            iteration += 1
            with tracer.start_as_current_span(
                f"agent.iteration.{iteration}",
                kind=trace.SpanKind.INTERNAL,
            ) as iter_span:
                # trace_llm_call takes the conversation as its second positional argument.
                llm_response = trace_llm_call("gpt-4o", messages, temperature=0.0)
                choice = llm_response.choices[0]
                if choice.finish_reason == "stop":
                    agent_span.set_attributes({
                        "agent.iterations": iteration,
                        "agent.final_answer_length": len(choice.message.content or ""),
                    })
                    return choice.message.content
                # The assistant message that carries tool_calls must come before
                # the tool results that answer it.
                messages.append(choice.message.model_dump())
                if choice.message.tool_calls:
                    for tool_call in choice.message.tool_calls:
                        tool_result = trace_tool_call(tool_call, iter_span)
                        tool_calls_log.append({
                            "tool": tool_call.function.name,
                            "iteration": iteration,
                        })
                        messages.append({
                            "role": "tool",
                            "tool_call_id": tool_call.id,
                            "content": str(tool_result),
                        })
                iter_span.set_attribute("agent.iteration.tools_called", len(
                    choice.message.tool_calls or []
                ))
        agent_span.set_attribute("agent.status", "max_iterations_reached")
        return "Agent reached maximum iterations without a final answer."
def trace_tool_call(tool_call, parent_span):
with tracer.start_as_current_span(
f"tool.{tool_call.function.name}",
kind=trace.SpanKind.CLIENT,
attributes={
"tool.name": tool_call.function.name,
"tool.call_id": tool_call.id,
},
) as span:
start_time = time.time()
try:
import json
arguments = json.loads(tool_call.function.arguments)
span.set_attribute("tool.arguments_hash", hash_content(
tool_call.function.arguments
))
result = execute_tool(tool_call.function.name, arguments)
span.set_attributes({
"tool.result_length": len(str(result)),
"llm.duration_ms": (time.time() - start_time) * 1000,
"tool.status": "success",
})
return result
except Exception as e:
span.set_attributes({
"tool.status": "error",
"tool.error": str(e),
})
span.record_exception(e)
return f"Tool error: {e}"
Visualizing a multi-model trace
Trace: rag-agent-pipeline-trace-abc123
┌─ rag.pipeline (1280ms)
│ │
│ ├─ gen_ai.client.embedding (45ms)
│ │ model=text-embedding-3-large
│ │ tokens=23
│ │
│ ├─ vector.search (32ms)
│ │ db=pgvector, results=10
│ │
│ ├─ gen_ai.client.rerank (180ms)
│ │ model=rerank-v3.5
│ │ documents=10 → top_n=3
│ │
│ ├─ agent.execution (1023ms)
│ │ │
│ │ ├─ agent.iteration.1 (520ms)
│ │ │ ├─ gen_ai.client.chat (480ms)
│ │ │ │ model=gpt-4o, tokens=1523/847
│ │ │ └─ tool.search_database (40ms)
│ │ │
│ │ ├─ agent.iteration.2 (503ms)
│ │ │ ├─ gen_ai.client.chat (460ms)
│ │ │ │ model=gpt-4o, tokens=2100/620
│ │ │ └─ tool.calculate (43ms)
│ │ │
│ │ └─ agent.iteration.3 (final, 0ms)
│ │ final_answer_length=342
│ │
│ └─ Total: 2 chat calls, 2 tool calls, 3623 chat input tokens (1523 + 2100), 1467 output tokens (847 + 620)
Pattern 5: Error monitoring and anomaly detection
LLM-specific error classification
from enum import Enum
from dataclasses import dataclass
class LLMErrorType(Enum):
RATE_LIMIT = "rate_limit"
CONTEXT_LENGTH_EXCEEDED = "context_length_exceeded"
INVALID_API_KEY = "invalid_api_key"
MODEL_NOT_FOUND = "model_not_found"
TIMEOUT = "timeout"
CONTENT_FILTER = "content_filter"
HALLUCINATION = "hallucination"
EMPTY_RESPONSE = "empty_response"
BUDGET_EXCEEDED = "budget_exceeded"
SERVICE_UNAVAILABLE = "service_unavailable"
@dataclass
class LLMErrorEvent:
error_type: LLMErrorType
model: str
error_message: str
trace_id: str
span_id: str
retry_count: int
is_retryable: bool
def classify_llm_error(error: Exception) -> LLMErrorType:
error_str = str(error).lower()
if "rate_limit" in error_str or "429" in error_str:
return LLMErrorType.RATE_LIMIT
elif "context_length" in error_str or "maximum context" in error_str:
return LLMErrorType.CONTEXT_LENGTH_EXCEEDED
elif "invalid_api_key" in error_str or "401" in error_str:
return LLMErrorType.INVALID_API_KEY
elif "model_not_found" in error_str or "404" in error_str:
return LLMErrorType.MODEL_NOT_FOUND
elif "timeout" in error_str:
return LLMErrorType.TIMEOUT
elif "content_filter" in error_str or "content_policy" in error_str:
return LLMErrorType.CONTENT_FILTER
else:
return LLMErrorType.SERVICE_UNAVAILABLE
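`classify_llm_error` matches substrings of `str(error)` and falls back to `SERVICE_UNAVAILABLE`, so any unrecognized exception ends up in a retryable category. A possible refinement, sketched below with the standard library only, classifies on a numeric status code first and keeps a separate `UNKNOWN` bucket. It assumes the exception exposes a `status_code` attribute, which is common for HTTP client errors but is not guaranteed; `ErrorKind`, `classify_by_status`, and `FakeHTTPError` are hypothetical names for illustration.

```python
from enum import Enum


class ErrorKind(Enum):
    RATE_LIMIT = "rate_limit"
    INVALID_API_KEY = "invalid_api_key"
    MODEL_NOT_FOUND = "model_not_found"
    TIMEOUT = "timeout"
    SERVICE_UNAVAILABLE = "service_unavailable"
    UNKNOWN = "unknown"


_STATUS_TO_KIND = {
    401: ErrorKind.INVALID_API_KEY,
    404: ErrorKind.MODEL_NOT_FOUND,
    408: ErrorKind.TIMEOUT,
    429: ErrorKind.RATE_LIMIT,
}

RETRYABLE = {ErrorKind.RATE_LIMIT, ErrorKind.TIMEOUT, ErrorKind.SERVICE_UNAVAILABLE}


def classify_by_status(error: Exception) -> ErrorKind:
    """Classify by status code when present; unknown errors are not retryable."""
    status = getattr(error, "status_code", None)  # assumed attribute
    if status in _STATUS_TO_KIND:
        return _STATUS_TO_KIND[status]
    if isinstance(status, int) and 500 <= status < 600:
        return ErrorKind.SERVICE_UNAVAILABLE
    if isinstance(error, TimeoutError):
        return ErrorKind.TIMEOUT
    return ErrorKind.UNKNOWN


class FakeHTTPError(Exception):
    """Stand-in for a provider SDK error, used only to exercise the classifier."""

    def __init__(self, status_code: int):
        super().__init__(f"HTTP {status_code}")
        self.status_code = status_code


kinds = [classify_by_status(FakeHTTPError(code)) for code in (429, 401, 503)]
```

With this split, a programming error such as a `ValueError` is reported as `unknown` and is not retried, while 5xx responses and timeouts still are.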
Error tracking and retries
import time
from functools import wraps

error_counter = meter.create_counter(
    name="llm.errors",
    description="LLM call errors by type",
    unit="errors",
)

def llm_retry_with_tracing(max_retries: int = 3, base_delay: float = 1.0):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                with tracer.start_as_current_span(
                    f"llm.call.attempt.{attempt}",
                    kind=trace.SpanKind.CLIENT,
                ) as attempt_span:
                    attempt_span.set_attributes({
                        "llm.retry.attempt": attempt,
                        "llm.retry.max_retries": max_retries,
                    })
                    try:
                        result = func(*args, **kwargs)
                        if attempt > 0:
                            attempt_span.set_attribute("llm.retry.succeeded", True)
                        return result
                    except Exception as e:
                        error_type = classify_llm_error(e)
                        is_retryable = error_type in {
                            LLMErrorType.RATE_LIMIT,
                            LLMErrorType.TIMEOUT,
                            LLMErrorType.SERVICE_UNAVAILABLE,
                        }
                        error_counter.add(1, {
                            "error.type": error_type.value,
                            # Only sees the model when it is passed as a keyword argument.
                            "gen_ai.request.model": kwargs.get("model", "unknown"),
                        })
                        attempt_span.set_attributes({
                            "error.type": error_type.value,
                            "error.message": str(e)[:200],
                            "error.retryable": is_retryable,
                        })
                        attempt_span.record_exception(e)
                        attempt_span.set_status(
                            trace.Status(trace.StatusCode.ERROR, str(e))
                        )
                        # The last attempt always re-raises, so the loop never falls through.
                        if not is_retryable or attempt == max_retries:
                            raise
                # Back off outside the span so the sleep does not inflate the
                # recorded duration of the failed attempt.
                time.sleep(base_delay * (2 ** attempt))
        return wrapper
    return decorator
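The retry decorator waits `base_delay * 2 ** attempt` between attempts, which gives 1 s, 2 s, and 4 s with the defaults. The sketch below computes such a schedule in isolation and adds two common refinements that are not in the code above: an upper cap and optional random jitter, so that many clients hitting the same rate limit do not retry in lockstep. `backoff_delays` and its parameters are hypothetical names.

```python
import random
from typing import List, Optional


def backoff_delays(max_retries: int, base_delay: float = 1.0, cap: float = 30.0,
                   jitter: float = 0.0, rng: Optional[random.Random] = None) -> List[float]:
    """Delay before each retry: exponential growth, capped, with +/- jitter fraction."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        delay = min(cap, base_delay * (2 ** attempt))
        if jitter:
            # Scale by a random factor in [1 - jitter, 1 + jitter].
            delay *= 1 + rng.uniform(-jitter, jitter)
        delays.append(delay)
    return delays


plain = backoff_delays(3)                      # [1.0, 2.0, 4.0]
capped = backoff_delays(6, cap=10.0)           # growth stops at the cap
jittered = backoff_delays(3, jitter=0.25, rng=random.Random(7))
```

Passing a seeded `random.Random` keeps the schedule reproducible in tests while production code can rely on the default generator.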
Hallucination-check span
def trace_hallucination_check(
query: str, answer: str, context: str, model: str = "gpt-4o-mini"
):
with tracer.start_as_current_span(
"llm.hallucination_check",
kind=trace.SpanKind.INTERNAL,
attributes={
"llm.check.type": "hallucination",
"llm.check.model": model,
},
) as span:
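check_prompt = f"""Decide whether the answer below is grounded in the given context. If the answer contains information that is not present in the context, flag it as a hallucination.
Context:
{context}
Answer:
{answer}
Return JSON: {{"is_hallucination": true/false, "confidence": 0.0-1.0, "reason": "..."}}"""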
start_time = time.time()
response = call_openai_api(model, [
{"role": "user", "content": check_prompt}
], temperature=0.0)
import json
result = json.loads(response.choices[0].message.content)
span.set_attributes({
"llm.hallucination.detected": result.get("is_hallucination", False),
"llm.hallucination.confidence": result.get("confidence", 0.0),
"llm.hallucination.reason": result.get("reason", "")[:200],
"llm.duration_ms": (time.time() - start_time) * 1000,
})
return result
Pattern 6: Smart alerting and SLA enforcement
Defining SLA metrics
from dataclasses import dataclass
@dataclass
class LLMSLA:
p50_latency_ms: float = 2000.0
p95_latency_ms: float = 5000.0
p99_latency_ms: float = 10000.0
ttft_p95_ms: float = 1000.0
error_rate_threshold: float = 0.01
daily_budget_usd: float = 200.0
hallucination_rate_threshold: float = 0.05
min_success_rate: float = 0.99
sla = LLMSLA()
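latency_histogram = meter.create_histogram(
    name="llm.request.latency",
    description="LLM request latency distribution",
    unit="ms",
)
ttft_histogram = meter.create_histogram(
    name="llm.time_to_first_token",
    description="Time to first token distribution",
    unit="ms",
)

from collections import deque
from opentelemetry.metrics import CallbackOptions, Observation

# Rolling window of call outcomes (1 = failed, 0 = succeeded); the call path is
# expected to append to it after every LLM request.
recent_outcomes = deque(maxlen=1000)

def observe_error_rate(options: CallbackOptions):
    # Observable-gauge callback: reports the error rate over the rolling window.
    rate = sum(recent_outcomes) / len(recent_outcomes) if recent_outcomes else 0.0
    return [Observation(rate)]

error_rate_gauge = meter.create_observable_gauge(
    name="llm.error_rate",
    description="Current LLM error rate",
    callbacks=[observe_error_rate],
)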
Smart alerting rules
from collections import deque
import threading

class LLMAlertManager:
    def __init__(self, sla_config: LLMSLA):
        self.sla = sla_config
        self.recent_latencies = deque(maxlen=1000)
        self.recent_errors = deque(maxlen=1000)
        self.recent_ttft = deque(maxlen=1000)
        self.lock = threading.Lock()
        self.alert_callbacks = []

    def add_alert_callback(self, callback):
        self.alert_callbacks.append(callback)

    def record_latency(self, latency_ms: float, ttft_ms: float = 0, is_error: bool = False):
        with self.lock:
            self.recent_latencies.append(latency_ms)
            self.recent_ttft.append(ttft_ms)
            self.recent_errors.append(1 if is_error else 0)
        # Check outside the lock: _check_alerts acquires self.lock itself and
        # threading.Lock is not reentrant, so nesting the calls would deadlock.
        self._check_alerts(latency_ms, ttft_ms, is_error)

    def _check_alerts(self, latency_ms: float, ttft_ms: float, is_error: bool):
        alerts = []
        # These two checks compare a single request against the percentile targets.
        if latency_ms > self.sla.p99_latency_ms:
            alerts.append({
                "level": "CRITICAL",
                "type": "latency_p99_breach",
                "message": f"LLM latency {latency_ms:.0f}ms exceeds P99 SLA {self.sla.p99_latency_ms:.0f}ms",
                "value": latency_ms,
                "threshold": self.sla.p99_latency_ms,
            })
        if ttft_ms > self.sla.ttft_p95_ms:
            alerts.append({
                "level": "WARNING",
                "type": "ttft_breach",
                "message": f"TTFT {ttft_ms:.0f}ms exceeds P95 SLA {self.sla.ttft_p95_ms:.0f}ms",
                "value": ttft_ms,
                "threshold": self.sla.ttft_p95_ms,
            })
        with self.lock:
            # Require at least 100 samples before judging the error rate.
            if len(self.recent_errors) >= 100:
                error_rate = sum(self.recent_errors) / len(self.recent_errors)
                if error_rate > self.sla.error_rate_threshold:
                    alerts.append({
                        "level": "CRITICAL",
                        "type": "error_rate_breach",
                        "message": f"Error rate {error_rate:.2%} exceeds threshold {self.sla.error_rate_threshold:.2%}",
                        "value": error_rate,
                        "threshold": self.sla.error_rate_threshold,
                    })
        for alert in alerts:
            self._fire_alert(alert)

    def _fire_alert(self, alert: dict):
        with tracer.start_as_current_span(
            "llm.alert",
            kind=trace.SpanKind.INTERNAL,
            attributes={
                "alert.level": alert["level"],
                "alert.type": alert["type"],
                "alert.message": alert["message"],
                "alert.value": str(alert["value"]),
                "alert.threshold": str(alert["threshold"]),
            },
        ):
            for callback in self.alert_callbacks:
                try:
                    callback(alert)
                except Exception:
                    # A failing callback must not break the request path.
                    pass

alert_manager = LLMAlertManager(sla)
alert_manager.add_alert_callback(lambda a: print(f"[{a['level']}] {a['message']}"))
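The SLA fields are expressed as percentiles (P95, P99), and a percentile can only be evaluated over a window of observations, not a single request. The sketch below shows one way to compute a nearest-rank percentile over a bounded sliding window with the standard library. `percentile` is a hypothetical helper, and nearest-rank is just one of several percentile definitions.

```python
import math
from collections import deque
from typing import Iterable


def percentile(values: Iterable[float], q: int) -> float:
    """Nearest-rank percentile for an integer q in 1..100."""
    ordered = sorted(values)
    if not ordered:
        raise ValueError("percentile of an empty window is undefined")
    # Integer arithmetic before the division avoids float rounding surprises.
    rank = math.ceil(q * len(ordered) / 100)
    return ordered[max(rank, 1) - 1]


# Bounded window, like the deques used by the alert manager.
window = deque(maxlen=1000)
for ms in range(1, 101):
    window.append(float(ms))

p95 = percentile(window, 95)
p99 = percentile(window, 99)
breach = p99 > 10000.0  # compare the windowed P99 against the SLA target
```

Sorting a 1000-element window on every check is cheap, but for higher volumes the histogram instruments defined earlier, queried from the metrics backend, are the more scalable source for percentile alerts.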
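Alert rule configuration (Prometheus)
# Metric names below assume the OTLP-to-Prometheus translation in use does not
# append unit suffixes; adjust them to the names your backend actually exposes.
groups:
  - name: llm_sla_alerts
    rules:
      - alert: LLMLatencyP99Breach
        expr: histogram_quantile(0.99, rate(llm_request_latency_bucket[5m])) > 10000
        for: 2m
        labels:
          severity: critical
          team: ai-platform
        annotations:
          summary: "LLM P99 latency exceeds the SLA threshold"
          description: "P99 latency for model {{ $labels.gen_ai_request_model }} is {{ $value }}ms"
      - alert: LLMErrorRateHigh
        # Uses the latency histogram's _count series as the request total,
        # assuming every call records a latency sample.
        expr: sum by (gen_ai_request_model) (rate(llm_errors_total[5m])) / sum by (gen_ai_request_model) (rate(llm_request_latency_count[5m])) > 0.01
        for: 3m
        labels:
          severity: critical
          team: ai-platform
        annotations:
          summary: "LLM error rate above 1%"
          description: "Error rate for model {{ $labels.gen_ai_request_model }} is {{ $value | humanizePercentage }}"
      - alert: LLMTokenBudgetApproaching
        # A counter is cumulative since process start, so measure spend over the last 24h.
        expr: sum(increase(llm_cost_usd_total[24h])) > 180
        for: 5m
        labels:
          severity: warning
          team: ai-platform
        annotations:
          summary: "LLM spend over the last 24h is approaching the budget limit"
          description: "Current spend ${{ $value }}, budget limit $200"
      - alert: LLMTTFTHigh
        expr: histogram_quantile(0.95, rate(llm_time_to_first_token_bucket[5m])) > 1000
        for: 3m
        labels:
          severity: warning
          team: ai-platform
        annotations:
          summary: "LLM time to first token P95 above 1 second"
          description: "TTFT P95 = {{ $value }}ms"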
5 common pitfalls and how to fix them
Pitfall 1: Oversized span attributes get truncated or dropped
BAD_PRACTICE = {
"gen_ai.content.full_prompt": prompt[:10000],
"gen_ai.content.full_response": response[:10000],
"gen_ai.content.all_tool_calls": json.dumps(tool_calls),
}
GOOD_PRACTICE = {
"gen_ai.usage.input_tokens": input_tokens,
"gen_ai.usage.output_tokens": output_tokens,
"gen_ai.response.finish_reasons": [finish_reason],
}
span.add_event("gen_ai.content.prompt", attributes={
"gen_ai.content": sanitize_text(prompt, 1000),
})
Fix: keep core metrics in span attributes, put long text in span events, and truncate or hash anything over 1 KB.
Pitfall 2: Inaccurate token counts
BAD_PRACTICE = {
"gen_ai.usage.input_tokens": len(prompt) // 4,
}
GOOD_PRACTICE = {
"gen_ai.usage.input_tokens": response.usage.prompt_tokens,
}
Fix: always use the usage field returned by the API; do not estimate tokens from character counts.
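Pitfall 3: Ignoring context propagation
from opentelemetry import baggage, context, propagate

# BAD: calling a downstream service without forwarding any trace context,
# so its spans start a new, disconnected trace.

# GOOD: attach baggage to the current context, then inject trace context and
# baggage into the outgoing request headers.
token = context.attach(baggage.set_baggage("user.id", user_id))
headers = {}
propagate.inject(headers)  # adds the traceparent and baggage headers
# Send the request with these headers, then call context.detach(token).
# On the receiving side: ctx = propagate.extract(request.headers)
Fix: when calling across services, use propagate.inject() and propagate.extract() to carry the trace context.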
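What actually travels between services is a small HTTP header. Under the W3C Trace Context format, `traceparent` is four hyphen-separated lowercase hex fields: version, 16-byte trace ID, 8-byte parent span ID, and flags. The sketch below formats and parses that header with the standard library purely to illustrate the wire format; in an application the OpenTelemetry propagators do this for you, and `format_traceparent` / `parse_traceparent` are hypothetical helper names.

```python
import re
from typing import Optional

_TRACEPARENT = re.compile(r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def format_traceparent(trace_id: int, span_id: int, sampled: bool = True) -> str:
    """Render a version-00 traceparent header value."""
    return f"00-{trace_id:032x}-{span_id:016x}-{'01' if sampled else '00'}"


def parse_traceparent(header: str) -> Optional[dict]:
    """Return the decoded fields, or None when the header is malformed."""
    match = _TRACEPARENT.match(header.strip())
    if not match:
        return None
    _version, trace_hex, span_hex, flags = match.groups()
    # All-zero IDs are invalid.
    if trace_hex == "0" * 32 or span_hex == "0" * 16:
        return None
    return {
        "trace_id": int(trace_hex, 16),
        "span_id": int(span_hex, 16),
        "sampled": bool(int(flags, 16) & 0x01),
    }


header = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
parsed = parse_traceparent(header)
roundtrip = format_traceparent(parsed["trace_id"], parsed["span_id"], parsed["sampled"])
```

Logging this header on both sides of a call is a quick way to confirm that a downstream service really received the caller's trace ID.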
Pitfall 4: Streaming responses without TTFT
BAD_PRACTICE = {
    "streaming_without_ttft": "records only total latency and ignores time to first token",
}
GOOD_PRACTICE = """
with tracer.start_as_current_span("gen_ai.client.chat") as span:
    start = time.time()
    first_token_time = None
    for chunk in stream:
        if first_token_time is None:
            first_token_time = time.time()
            span.set_attribute("llm.time_to_first_token_ms",
                               (first_token_time - start) * 1000)
"""
Fix: always record TTFT for streaming responses; it is a core user-experience metric.
Pitfall 5: Poorly chosen sampling strategy
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, ParentBased
BAD_SAMPLER = TraceIdRatioBased(rate=1.0)
GOOD_SAMPLER = ParentBased(
root=TraceIdRatioBased(rate=0.1),
)
from opentelemetry.sdk.trace.sampling import Sampler, SamplingResult, Decision
class LLMAwareSampler(Sampler):
def should_sample(self, parent_context, trace_id, name, kind,
attributes, links, trace_state):
if attributes:
system = attributes.get("gen_ai.system")
if system:
return SamplingResult(
decision=Decision.RECORD_AND_SAMPLE,
attributes=attributes,
trace_state=trace_state,
)
return SamplingResult(
decision=Decision.RECORD_AND_SAMPLE if hash(trace_id) % 10 == 0 else Decision.DROP,
attributes=attributes,
trace_state=trace_state,
)
def get_description(self):
return "LLMAwareSampler"
Fix: sample LLM spans at 100%; non-LLM spans can be sampled at a ratio.
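Ratio-based sampling works because the decision is a pure function of the trace ID, so every service that sees the same trace reaches the same verdict. The sketch below illustrates that idea with the standard library: it compares the low 64 bits of the trace ID against a bound derived from the rate. It is an illustration of the principle, not a copy of any SDK's implementation, and `should_sample` is a hypothetical name.

```python
import random

_MASK_64 = (1 << 64) - 1


def should_sample(trace_id: int, rate: float) -> bool:
    """Deterministic decision: the same trace_id and rate always give the same answer."""
    if not 0.0 <= rate <= 1.0:
        raise ValueError("rate must be between 0 and 1")
    bound = round(rate * (1 << 64))
    return (trace_id & _MASK_64) < bound


# Random 128-bit trace IDs from a seeded generator, for a reproducible check.
rng = random.Random(42)
trace_ids = [rng.getrandbits(128) for _ in range(10_000)]
sampled_fraction = sum(should_sample(t, 0.1) for t in trace_ids) / len(trace_ids)
```

Because trace IDs are generated randomly, the sampled fraction lands close to the configured rate, and an "always sample LLM spans" rule can be layered on top by checking span attributes before falling back to this ratio check.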
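Troubleshooting 10 common errors
| # | Error message | Cause | Fix |
|---|---|---|---|
| 1 | StatusCode.UNAVAILABLE (OTLP export failed) | Collector not running or wrong port | Check that http://localhost:4317 is reachable |
| 2 | context_length_exceeded | Prompt exceeds the model's context window | Pre-count tokens with tiktoken, then truncate or chunk |
| 3 | rate_limit_exceeded 429 | API call rate limit exceeded | Retry with exponential backoff and add token-bucket rate limiting |
| 4 | Span is not being recorded | TracerProvider not configured | Confirm trace.set_tracer_provider() has been called |
| 5 | Baggage not propagated | Context not propagated across services | Use propagate.inject()/extract() |
| 6 | Metric reader timeout | Metric export timed out | Increase export_timeout_millis or export less frequently |
| 7 | content_filter_triggered | Prompt triggered the safety filter | Pre-screen content, for example with a moderation API |
| 8 | Empty response from LLM | Model returned empty content | Check finish_reason and add retry logic |
| 9 | Memory leak in span processor | BatchSpanProcessor backlog | Tune max_queue_size and schedule_delay_millis |
| 10 | Duplicate span attributes | The same attribute is set more than once | Record metric-like values with span.set_attribute() rather than add_event() |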
Diagnostic script
def diagnose_otel_setup():
    from opentelemetry import trace, metrics
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.metrics import MeterProvider

    issues = []
    provider = trace.get_tracer_provider()
    if not isinstance(provider, TracerProvider):
        issues.append("TracerProvider is not configured; the default no-op provider may be in use")
    else:
        # Relies on private SDK attributes, which may change between versions.
        if not provider._active_span_processor._span_processors:
            issues.append("TracerProvider has no SpanProcessor configured")
    meter_provider = metrics.get_meter_provider()
    if not isinstance(meter_provider, MeterProvider):
        issues.append("MeterProvider is not configured")
    try:
        import opentelemetry.instrumentation.openai
    except ImportError:
        issues.append("opentelemetry-instrumentation-openai is not installed")
    try:
        import tiktoken
    except ImportError:
        issues.append("tiktoken is not installed; token counts cannot be pre-computed")
    if issues:
        print("Diagnostics found problems:")
        for i, issue in enumerate(issues, 1):
            print(f"  {i}. {issue}")
    else:
        print("OpenTelemetry configuration looks fine")

diagnose_otel_setup()
Advanced optimization techniques
Automatic instrumentation
from opentelemetry.instrumentation.openai import OpenAIInstrumentor
OpenAIInstrumentor().instrument()
from opentelemetry.instrumentation.requests import RequestsInstrumentor
RequestsInstrumentor().instrument()
from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor
AsyncioInstrumentor().instrument()
Custom propagator
from opentelemetry.propagate import set_global_textmap
from opentelemetry.propagators.composite import CompositePropagator
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from opentelemetry.baggage.propagation import W3CBaggagePropagator

# Propagate both W3C Trace Context and W3C Baggage headers.
set_global_textmap(
    CompositePropagator([
        TraceContextTextMapPropagator(),
        W3CBaggagePropagator(),
    ])
)
Dynamic sampling strategy
from opentelemetry.sdk.trace.sampling import Sampler, SamplingResult, Decision
class AdaptiveLLMSampler(Sampler):
def __init__(self, base_rate: float = 0.1, error_sample_rate: float = 1.0,
slow_threshold_ms: float = 5000.0):
self.base_rate = base_rate
self.error_sample_rate = error_sample_rate
self.slow_threshold_ms = slow_threshold_ms
def should_sample(self, parent_context, trace_id, name, kind,
attributes, links, trace_state):
if attributes:
is_llm = attributes.get("gen_ai.system") is not None
if is_llm:
return SamplingResult(
decision=Decision.RECORD_AND_SAMPLE,
attributes=attributes,
trace_state=trace_state,
)
if parent_context:
from opentelemetry.trace import get_current_span
parent_span = get_current_span(parent_context)
if parent_span.is_recording():
parent_attrs = parent_span.attributes or {}
if parent_attrs.get("error.type"):
return SamplingResult(
decision=Decision.RECORD_AND_SAMPLE,
attributes=attributes,
trace_state=trace_state,
)
should_sample = (hash(trace_id) % 10000) < (self.base_rate * 10000)
return SamplingResult(
decision=Decision.RECORD_AND_SAMPLE if should_sample else Decision.DROP,
attributes=attributes,
trace_state=trace_state,
)
def get_description(self):
return f"AdaptiveLLMSampler(base={self.base_rate})"
Grafana dashboard configuration
{
"dashboard": {
"title": "LLM Observability Dashboard",
"panels": [
{
"title": "Token Usage (Input vs Output)",
"type": "timeseries",
"targets": [{
"expr": "sum(rate(gen_ai_client_token_usage[5m])) by (token_type, gen_ai_request_model)"
}]
},
{
"title": "LLM Request Latency P50/P95/P99",
"type": "timeseries",
"targets": [
{"expr": "histogram_quantile(0.50, rate(llm_request_latency_bucket[5m]))"},
{"expr": "histogram_quantile(0.95, rate(llm_request_latency_bucket[5m]))"},
{"expr": "histogram_quantile(0.99, rate(llm_request_latency_bucket[5m]))"}
]
},
{
"title": "Daily Cost by Model",
"type": "piechart",
"targets": [{
"expr": "sum(increase(llm_cost_usd_total[24h])) by (gen_ai_request_model)"
}]
},
{
"title": "Error Rate by Type",
"type": "barchart",
"targets": [{
"expr": "sum(rate(llm_errors_total[5m])) by (error_type)"
}]
}
]
}
}
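Comparison: OpenTelemetry vs LangSmith vs Promptflow
| Dimension | OpenTelemetry | LangSmith | Promptflow |
|---|---|---|---|
| Open source | Yes (CNCF project) | No (SaaS) | Yes (Microsoft) |
| Vendor lock-in | None | High (LangChain ecosystem) | Medium (Azure ecosystem) |
| Tracing | Native, standard protocol | Proprietary trace system | Proprietary trace system |
| Token tracking | Manual instrumentation required | Automatic | Automatic |
| Prompt management | Not supported | Supported | Supported |
| Cost monitoring | Requires custom metrics | Built in | Built in |
| Language support | 11+ languages | Python/JS | Python |
| Integration with existing APM | Native (Jaeger/Tempo/Grafana) | Requires export | Requires export |
| Customizability | Very high | Medium | Medium |
| Learning curve | Steep | Gentle | Medium |
| Production readiness | High (CNCF project) | High | Medium |
| Data sovereignty | Fully self-controlled | Data held by LangSmith | Data held in Azure |
Recommendations:
- Several APM stacks already in place → OpenTelemetry (one unified observability stack)
- Heavy LangChain user → LangSmith (works out of the box)
- Azure ecosystem → Promptflow (deep integration with Azure AI Studio)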
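Recommended online tools
When building an LLM tracing system, the following online tools can save time:
- JSON formatter: pretty-print OpenTelemetry span JSON to spot attribute problems quickly
- Base64 encoder/decoder: encode or decode Base64 payloads encountered while debugging
- cURL-to-code converter: turn API debugging cURL commands into Python code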
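Summary
OpenTelemetry-based LLM tracing in Python is a foundation for observability in production AI applications. The six production patterns cover the full path from basic span instrumentation to smart alerting:
- LLM span instrumentation: standardize attribute annotation with the gen_ai.* semantic conventions
- Token tracking: monitor usage and cost in real time to prevent budget overruns
- Structured logging: record sanitized prompts and responses, balancing debugging with compliance
- Multi-model tracing: follow complex call chains such as RAG and agents
- Error monitoring: classify exceptions, retry automatically, and detect hallucinations
- Smart alerting: SLA enforcement, dynamic sampling, and multi-level alerts
Key principles: sample LLM spans at 100%, take token usage from the API response, sanitize prompt content, and treat TTFT as a core user-experience metric.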