Python OpenTelemetry LLM Tracing: 6 Production Patterns from Spans to Intelligent Alerting


Is your LLM application a black box?

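By 2026, LLM applications are running deep in production. Yet most teams face the same problem: the LLM call chain is invisible. A single user request may pass through prompt construction, embedding retrieval, multi-turn dialogue, model inference, and post-processing. When any one of these stages fails, all you can do is guess.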

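Traditional APM tools do not understand LLM-specific semantics: token usage, time to first token, prompt leakage, hallucination detection. With its Semantic Conventions and extensible span attributes, OpenTelemetry is a strong fit for LLM tracing.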

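Key points covered in this article:

  • Standard patterns for LLM span instrumentation and attribute annotation
  • Token usage tracking and real-time cost monitoring
  • Structured prompt/response logging with redaction
  • Multi-model tracing (RAG + Agent + Tool)
  • Error monitoring and anomaly detection
  • Intelligent alerting and SLA enforcement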

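Contents

  • OpenTelemetry LLM core concepts
  • Pattern 1: LLM span instrumentation and attribute annotation
  • Pattern 2: Token usage tracking and cost monitoring
  • Pattern 3: Structured prompt/response logging
  • Pattern 4: Multi-model tracing
  • Pattern 5: Error monitoring and anomaly detection
  • Pattern 6: Intelligent alerting and SLA enforcement
  • 5 common pitfalls and their solutions
  • Troubleshooting 10 common errors
  • Advanced optimization techniques
  • Comparison: OpenTelemetry vs LangSmith vs Promptflow
  • Recommended online tools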

OpenTelemetry LLM core concepts

Why choose OpenTelemetry for LLM tracing

┌─────────────────────────────────────────────────────────┐
│          LLM Application Tracing Architecture           │
├─────────────────────────────────────────────────────────┤
│                                                         │
│ User request ──► API Gateway ──► LLM Service            │
│                                  │                      │
│                    ┌─────────────┼─────────────┐        │
│                    ▼             ▼             ▼        │
│              Embedding      Chat Model    Tool Call     │
│              Service        (GPT-4o)      Service       │
│                    │             │             │        │
│                    ▼             ▼             ▼        │
│              Vector DB      Reranker      External API  │
│                                                         │
│  Each hop = 1 span carrying LLM semantic attributes     │
│  Whole chain = 1 trace, traceable end to end            │
│                                                         │
│  ──► OTel Collector ──► Jaeger/Tempo (Trace)            │
│                      ──► Prometheus (Metrics)            │
│                      ──► Loki/ELK (Logs)                 │
└─────────────────────────────────────────────────────────┘

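OpenTelemetry provides four core capabilities for LLM tracing:

Capability   Description                      Value for LLM workloads
Trace        Cross-service request tracing    Follows the full call chain from user request to LLM response
Span         Record of a single operation     Captures the model, parameters, and token usage of each LLM call
Metric       Metrics collection               Monitors token consumption, latency distribution, and error rate in real time
Baggage      Cross-service metadata passing   Carries user ID, session ID, and A/B experiment flags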

OpenTelemetry LLM Semantic Conventions (2026 edition)

Span Kind: CLIENT
Span Name:  gen_ai.client.chat

Standard attributes:
  gen_ai.system              = "openai"        # LLM provider
  gen_ai.request.model       = "gpt-4o"        # requested model
  gen_ai.request.max_tokens  = 4096            # maximum output tokens
  gen_ai.request.temperature = 0.7             # sampling temperature
  gen_ai.response.model      = "gpt-4o-2024-05-13"  # model that actually served the request
  gen_ai.response.finish_reasons = ["stop"]    # finish reasons
  gen_ai.usage.input_tokens  = 1523            # input tokens
  gen_ai.usage.output_tokens = 847             # output tokens

Events (span events):
  gen_ai.content.prompt     = "..."            # prompt content
  gen_ai.content.completion = "..."            # response content
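
The attribute list above can be gathered in one place before it is attached to a span. The following is a minimal sketch, not part of any OpenTelemetry package: `build_chat_span_attributes` is a hypothetical helper name, and the sample values are the ones from the listing. It uses only the standard library, so the resulting dict can be passed to `span.set_attributes(...)` in the patterns that follow.

```python
from typing import Any, Optional


def build_chat_span_attributes(
    system: str,
    request_model: str,
    response_model: str,
    finish_reasons: list,
    input_tokens: int,
    output_tokens: int,
    max_tokens: Optional[int] = None,
    temperature: Optional[float] = None,
) -> dict:
    """Collect the chat-span attributes listed above into one dict (hypothetical helper)."""
    attributes: dict = {
        "gen_ai.system": system,
        "gen_ai.request.model": request_model,
        "gen_ai.response.model": response_model,
        "gen_ai.response.finish_reasons": list(finish_reasons),
        "gen_ai.usage.input_tokens": input_tokens,
        "gen_ai.usage.output_tokens": output_tokens,
    }
    # Optional request parameters are left out instead of being set to None,
    # since None is not a valid span attribute value.
    if max_tokens is not None:
        attributes["gen_ai.request.max_tokens"] = max_tokens
    if temperature is not None:
        attributes["gen_ai.request.temperature"] = temperature
    return attributes


attrs = build_chat_span_attributes(
    system="openai",
    request_model="gpt-4o",
    response_model="gpt-4o-2024-05-13",
    finish_reasons=["stop"],
    input_tokens=1523,
    output_tokens=847,
    max_tokens=4096,
)
```

Keeping the attribute names in a single function makes it harder for different call sites to drift apart in spelling, which matters once dashboards and alert rules depend on those names.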

Pattern 1: LLM span instrumentation and attribute annotation

Basic span creation

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource

resource = Resource.create({
    "service.name": "llm-chat-service",
    "service.version": "2.1.0",
    "deployment.environment": "production",
})

tracer_provider = TracerProvider(resource=resource)
tracer_provider.add_span_processor(
    BatchSpanProcessor(
        OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True)
    )
)
trace.set_tracer_provider(tracer_provider)
tracer = trace.get_tracer("llm-chat-service", "2.1.0")

Standard LLM span instrumentation

import time
from opentelemetry import trace

tracer = trace.get_tracer(__name__)

def trace_llm_call(model: str, prompt: str, **kwargs):
    with tracer.start_as_current_span(
        "gen_ai.client.chat",
        kind=trace.SpanKind.CLIENT,
        attributes={
            "gen_ai.system": "openai",
            "gen_ai.request.model": model,
            "gen_ai.request.max_tokens": kwargs.get("max_tokens", 4096),
            "gen_ai.request.temperature": kwargs.get("temperature", 0.7),
        },
    ) as span:
        start_time = time.time()

        try:
            response = call_openai_api(model, prompt, **kwargs)

            span.set_attributes({
                "gen_ai.response.model": response.model,
                "gen_ai.response.finish_reasons": [
                    choice.finish_reason for choice in response.choices
                ],
                "gen_ai.usage.input_tokens": response.usage.prompt_tokens,
                "gen_ai.usage.output_tokens": response.usage.completion_tokens,
                # A non-streaming call has no first-token timing, so only the
                # total duration is recorded here.
                "llm.total_duration_ms": (time.time() - start_time) * 1000,
            })

            # message.content is None when the model returns only tool calls.
            content = response.choices[0].message.content or ""
            span.add_event(
                "gen_ai.content.completion",
                attributes={"gen_ai.content": content[:500]},
            )

            return response

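        except Exception as e:
            # start_as_current_span already records the exception and sets
            # ERROR status when it propagates (record_exception=True and
            # set_status_on_exception=True by default), so calling
            # span.record_exception(e) here would log the same exception twice.
            span.set_attribute("error.type", type(e).__name__)
            raise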

Span with event annotations

def trace_llm_with_events(model: str, messages: list, **kwargs):
    with tracer.start_as_current_span(
        "gen_ai.client.chat",
        kind=trace.SpanKind.CLIENT,
        attributes={
            "gen_ai.system": "openai",
            "gen_ai.request.model": model,
            "gen_ai.request.max_tokens": kwargs.get("max_tokens", 4096),
        },
    ) as span:
        for i, msg in enumerate(messages):
            span.add_event(
                f"gen_ai.content.prompt.{i}",
                attributes={
                    "gen_ai.content.role": msg["role"],
                    "gen_ai.content": msg["content"][:1000],
                },
            )

        start_time = time.time()
        first_token_time = None

        # Assumes call_openai_streaming yields plain text deltas (str).
        response = call_openai_streaming(model, messages, **kwargs)

        chunks = []
        for chunk in response:
            if first_token_time is None:
                first_token_time = time.time()
                ttft_ms = (first_token_time - start_time) * 1000
                span.set_attribute("llm.time_to_first_token_ms", ttft_ms)
            chunks.append(chunk)

        full_content = "".join(chunks)
        span.set_attributes({
            # Rough estimate (about 4 characters per token for English text);
            # prefer the usage figures from the API when the stream reports them.
            "gen_ai.usage.output_tokens": len(full_content) // 4,
            "llm.total_duration_ms": (time.time() - start_time) * 1000,
        })

        span.add_event(
            "gen_ai.content.completion",
            attributes={"gen_ai.content": full_content[:500]},
        )

        return full_content
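
The time-to-first-token measurement above can be tried without a model or a tracer. This is a minimal sketch under stated assumptions: `consume_stream` and `fake_stream` are hypothetical names, the stream yields plain strings, and `time.perf_counter` is used because it is monotonic and therefore better suited to measuring intervals than wall-clock time.

```python
import time
from typing import Callable, Iterable, Optional, Tuple


def consume_stream(
    chunks: Iterable[str],
    clock: Callable[[], float] = time.perf_counter,
) -> Tuple[str, Optional[float], float]:
    """Join a text stream and return (text, ttft_ms, total_ms)."""
    start = clock()
    first_chunk_at = None
    parts = []
    for chunk in chunks:
        if first_chunk_at is None:
            # Timestamp taken when the first chunk arrives.
            first_chunk_at = clock()
        parts.append(chunk)
    end = clock()
    # An empty stream has no first token, so TTFT is reported as None.
    ttft_ms = None if first_chunk_at is None else (first_chunk_at - start) * 1000
    return "".join(parts), ttft_ms, (end - start) * 1000


def fake_stream():
    """Stand-in for a streaming model response (illustration only)."""
    time.sleep(0.02)
    yield "Hello"
    time.sleep(0.02)
    yield ", world"


text, ttft_ms, total_ms = consume_stream(fake_stream())
empty_text, empty_ttft, _ = consume_stream(iter([]))
```

Returning `None` for an empty stream avoids recording a misleading TTFT of zero, which would otherwise pull latency percentiles down.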

Pattern 2: Token usage tracking and cost monitoring

Token counters and cost calculation

from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter

metric_reader = PeriodicExportingMetricReader(
    OTLPMetricExporter(endpoint="http://localhost:4317", insecure=True),
    export_interval_millis=60000,
)
meter_provider = MeterProvider(metric_readers=[metric_reader])
metrics.set_meter_provider(meter_provider)

meter = metrics.get_meter("llm-chat-service", "2.1.0")

token_counter = meter.create_counter(
    name="gen_ai.client.token.usage",
    description="Token usage counter for LLM calls",
    unit="tokens",
)

cost_counter = meter.create_counter(
    name="llm.cost.usd",
    description="LLM API cost in USD",
    unit="USD",
)

llm_duration = meter.create_histogram(
    name="llm.request.duration",
    description="LLM request duration in milliseconds",
    unit="ms",
)

MODEL_PRICING = {
    "gpt-4o": {"input": 2.50 / 1_000_000, "output": 10.00 / 1_000_000},
    "gpt-4o-mini": {"input": 0.15 / 1_000_000, "output": 0.60 / 1_000_000},
    "claude-sonnet-4-20250514": {"input": 3.00 / 1_000_000, "output": 15.00 / 1_000_000},
    "deepseek-r1": {"input": 0.55 / 1_000_000, "output": 2.19 / 1_000_000},
}

def record_token_usage(model: str, input_tokens: int, output_tokens: int):
    token_counter.add(input_tokens, {
        "gen_ai.system": "openai",
        "gen_ai.request.model": model,
        "token.type": "input",
    })
    token_counter.add(output_tokens, {
        "gen_ai.system": "openai",
        "gen_ai.request.model": model,
        "token.type": "output",
    })

    pricing = MODEL_PRICING.get(model, {"input": 0, "output": 0})
    cost = input_tokens * pricing["input"] + output_tokens * pricing["output"]
    cost_counter.add(cost, {
        "gen_ai.request.model": model,
        "cost.type": "api_call",
    })
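
As a worked example of the cost formula, the call from the semantic-conventions listing (1523 input tokens, 847 output tokens on gpt-4o) costs 1523 × 2.50 / 1,000,000 + 847 × 10.00 / 1,000,000 = 0.0038075 + 0.00847 = 0.0122775 USD. The sketch below reproduces that arithmetic. `PRICE_PER_MILLION` and `estimate_cost_usd` are hypothetical names, and the prices are copied from the MODEL_PRICING table above as illustrative values, not as current list prices.

```python
# USD per 1M tokens, copied from the pricing table above (illustrative values).
PRICE_PER_MILLION = {
    "gpt-4o": {"input": 2.50, "output": 10.00},
}


def estimate_cost_usd(model: str, input_tokens: int, output_tokens: int) -> float:
    """Return the estimated cost of one call in USD.

    Raises KeyError for an unknown model instead of silently pricing it at
    zero, so a missing table entry is noticed rather than hidden.
    """
    price = PRICE_PER_MILLION[model]
    total = input_tokens * price["input"] + output_tokens * price["output"]
    return total / 1_000_000


cost = estimate_cost_usd("gpt-4o", 1523, 847)
```

Whether an unknown model should raise or fall back to zero is a design choice. Raising is stricter, while the zero fallback used above keeps requests flowing at the price of under-reporting spend.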

Cost quotas and budget control

import threading
from datetime import datetime, timedelta
from collections import defaultdict

class TokenBudgetManager:
    def __init__(self, daily_budget_usd: float = 100.0):
        self.daily_budget = daily_budget_usd
        self.current_spend = defaultdict(float)
        self.lock = threading.Lock()
        self._reset_date = datetime.now().date()

    def check_and_record(self, model: str, input_tokens: int, output_tokens: int) -> bool:
        with self.lock:
            today = datetime.now().date()
            if today != self._reset_date:
                self.current_spend.clear()
                self._reset_date = today

            pricing = MODEL_PRICING.get(model, {"input": 0, "output": 0})
            cost = input_tokens * pricing["input"] + output_tokens * pricing["output"]

            total_spend = sum(self.current_spend.values()) + cost
            if total_spend > self.daily_budget:
                return False

            self.current_spend[model] += cost
            return True

    def get_remaining_budget(self) -> float:
        with self.lock:
            return self.daily_budget - sum(self.current_spend.values())

budget_manager = TokenBudgetManager(daily_budget_usd=200.0)

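LLM call with budget check

class BudgetExceededError(RuntimeError):
    """Raised when a call pushes spend past the daily budget."""

def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    pricing = MODEL_PRICING.get(model, {"input": 0, "output": 0})
    return input_tokens * pricing["input"] + output_tokens * pricing["output"]

def call_llm_with_budget(model: str, messages: list, **kwargs):
    with tracer.start_as_current_span(
        "gen_ai.client.chat",
        kind=trace.SpanKind.CLIENT,
        attributes={
            "gen_ai.system": "openai",
            "gen_ai.request.model": model,
        },
    ) as span:
        response = call_openai_api(model, messages, **kwargs)

        input_tokens = response.usage.prompt_tokens
        output_tokens = response.usage.completion_tokens

        # The provider has already billed this call, so record its usage
        # before the budget check can raise.
        record_token_usage(model, input_tokens, output_tokens)
        span.set_attributes({
            "gen_ai.usage.input_tokens": input_tokens,
            "gen_ai.usage.output_tokens": output_tokens,
            "llm.cost.usd": calculate_cost(model, input_tokens, output_tokens),
        })

        # Note: this check runs after the call, so it withholds the response
        # but does not prevent the spend.
        allowed = budget_manager.check_and_record(model, input_tokens, output_tokens)
        if not allowed:
            span.set_attribute("llm.budget.exceeded", True)
            raise BudgetExceededError(
                f"Daily budget exceeded. Remaining: ${budget_manager.get_remaining_budget():.2f}"
            )

        span.set_attribute(
            "llm.budget.remaining_usd", budget_manager.get_remaining_budget()
        )

        return response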

Pattern 3: Structured prompt/response logging

Redaction and structured records

import re
import json
from dataclasses import dataclass, field, asdict

@dataclass
class LLMCallLog:
    trace_id: str
    span_id: str
    model: str
    prompt_hash: str
    prompt_length: int
    response_hash: str
    response_length: int
    input_tokens: int
    output_tokens: int
    duration_ms: float
    ttft_ms: float = 0.0
    finish_reason: str = ""
    sanitized_prompt: str = ""
    sanitized_response: str = ""
    metadata: dict = field(default_factory=dict)

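# Order matters: the ID card pattern must run before the broader
# credit card pattern, which would otherwise match the same digits.
PII_PATTERNS = [
    (re.compile(r'\b\d{3}[-.]?\d{4}\b'), "[PHONE]"),  # 7-digit local numbers only; adapt to your locale
    (re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'), "[EMAIL]"),
    (re.compile(r'\b\d{17}[\dXx]\b'), "[ID_CARD]"),
    (re.compile(r'\b(?:\d[ -]*?){13,19}\b'), "[CREDIT_CARD]"),
    (re.compile(r'api[_-]?key[=:]\s*\S+', re.IGNORECASE), "[API_KEY]"),
]

def sanitize_text(text: str, max_length: int = 500) -> str:
    # Redact first, then truncate: truncating first can cut a match in half
    # and leave a partial value (e.g. "user@exam") that no pattern catches.
    sanitized = text
    for pattern, replacement in PII_PATTERNS:
        sanitized = pattern.sub(replacement, sanitized)
    return sanitized[:max_length]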

def hash_content(content: str) -> str:
    import hashlib
    return hashlib.sha256(content.encode()).hexdigest()[:16]
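
The order of redaction and truncation affects what ends up in the logs. The sketch below illustrates this with a single email pattern. The function names are hypothetical and the pattern is a simplified stand-in for the PII_PATTERNS list. When the cut falls in the middle of an address, truncating first leaves a fragment that the pattern no longer recognizes.

```python
import re

# Simplified email pattern, for illustration only.
EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")


def truncate_then_redact(text: str, max_length: int) -> str:
    """Cut the text first, then redact what is left."""
    return EMAIL.sub("[EMAIL]", text[:max_length])


def redact_then_truncate(text: str, max_length: int) -> str:
    """Redact the full text first, then cut the result."""
    return EMAIL.sub("[EMAIL]", text)[:max_length]


text = "Contact alice@example.com for access"

# The cut lands inside the address, so "alice@exa" survives unredacted.
leaky = truncate_then_redact(text, 17)

# Redacting the full text first leaves nothing to leak at the cut.
safe = redact_then_truncate(text, 17)
```

Redacting the full text costs more CPU on very long prompts. If that becomes a concern, one option is to truncate with a generous margin before redacting and then truncate again to the final length.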

Structured logger

import logging
import json
from datetime import datetime, timezone

class LLMStructuredLogger:
    def __init__(self, service_name: str = "llm-chat-service"):
        self.logger = logging.getLogger(f"{service_name}.llm_calls")
        self.sanitize_enabled = True
        self.max_content_length = 500

    def log_llm_call(self, span, model: str, prompt: str, response: str,
                     input_tokens: int, output_tokens: int,
                     duration_ms: float, ttft_ms: float = 0,
                     finish_reason: str = "", **metadata):
        log_entry = LLMCallLog(
            trace_id=format(span.get_span_context().trace_id, "032x"),
            span_id=format(span.get_span_context().span_id, "016x"),
            model=model,
            prompt_hash=hash_content(prompt),
            prompt_length=len(prompt),
            response_hash=hash_content(response),
            response_length=len(response),
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            duration_ms=round(duration_ms, 2),
            ttft_ms=round(ttft_ms, 2),
            finish_reason=finish_reason,
            sanitized_prompt=sanitize_text(prompt, self.max_content_length) if self.sanitize_enabled else "",
            sanitized_response=sanitize_text(response, self.max_content_length) if self.sanitize_enabled else "",
            metadata=metadata,
        )

        self.logger.info(
            "LLM call completed",
            extra={"llm_call": asdict(log_entry)},
        )

        span.add_event("gen_ai.content.prompt", attributes={
            "gen_ai.content": sanitize_text(prompt, 1000),
            "gen_ai.content.hash": hash_content(prompt),
        })
        span.add_event("gen_ai.content.completion", attributes={
            "gen_ai.content": sanitize_text(response, 1000),
            "gen_ai.content.hash": hash_content(response),
        })

llm_logger = LLMStructuredLogger()
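
The logger above passes the record through `extra={"llm_call": ...}`. The standard `logging` formatters do not print extra fields unless the format string names them, so the structured payload only reaches the output if a formatter serializes it. A minimal sketch of such a formatter follows. `LLMJsonFormatter` is a hypothetical class name, and the sample record is built by hand to keep the example self-contained.

```python
import json
import logging


class LLMJsonFormatter(logging.Formatter):
    """Hypothetical formatter that emits the `llm_call` extra field as JSON."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Fields passed via `extra=` become attributes on the record.
        llm_call = getattr(record, "llm_call", None)
        if llm_call is not None:
            payload["llm_call"] = llm_call
        # default=str keeps serialization from failing on unexpected types.
        return json.dumps(payload, ensure_ascii=False, default=str)


# Hand-built record standing in for logger.info(..., extra={"llm_call": ...}).
record = logging.makeLogRecord({
    "name": "llm-chat-service.llm_calls",
    "levelname": "INFO",
    "msg": "LLM call completed",
    "llm_call": {"model": "gpt-4o", "input_tokens": 1523, "output_tokens": 847},
})
line = LLMJsonFormatter().format(record)
```

To use it, attach the formatter to a handler with `handler.setFormatter(LLMJsonFormatter())` and add that handler to the `llm_calls` logger.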

Complete LLM call with logging

def call_llm_with_logging(model: str, messages: list, **kwargs):
    with tracer.start_as_current_span(
        "gen_ai.client.chat",
        kind=trace.SpanKind.CLIENT,
        attributes={
            "gen_ai.system": "openai",
            "gen_ai.request.model": model,
            "gen_ai.request.temperature": kwargs.get("temperature", 0.7),
        },
    ) as span:
        start_time = time.time()
        prompt_text = json.dumps(messages, ensure_ascii=False)

        try:
            response = call_openai_api(model, messages, **kwargs)

            # content is None when the model returns only tool calls.
            content = response.choices[0].message.content or ""
            finish_reason = response.choices[0].finish_reason
            input_tokens = response.usage.prompt_tokens
            output_tokens = response.usage.completion_tokens
            duration_ms = (time.time() - start_time) * 1000

            llm_logger.log_llm_call(
                span=span,
                model=response.model,
                prompt=prompt_text,
                response=content,
                input_tokens=input_tokens,
                output_tokens=output_tokens,
                duration_ms=duration_ms,
                finish_reason=finish_reason,
                user_id=kwargs.get("user_id", "anonymous"),
                session_id=kwargs.get("session_id", ""),
            )

            span.set_attributes({
                "gen_ai.usage.input_tokens": input_tokens,
                "gen_ai.usage.output_tokens": output_tokens,
                "gen_ai.response.finish_reasons": [finish_reason],
            })

            return response

        except Exception as e:
            duration_ms = (time.time() - start_time) * 1000
            llm_logger.log_llm_call(
                span=span, model=model, prompt=prompt_text,
                response="", input_tokens=0, output_tokens=0,
                duration_ms=duration_ms, error=str(e),
            )
            # start_as_current_span records the exception and sets ERROR status
            # when it propagates, so an explicit record_exception call here
            # would log the same exception twice.
            raise

Pattern 4: Multi-model tracing

RAG pipeline tracing

def trace_rag_pipeline(query: str, user_id: str = ""):
    with tracer.start_as_current_span(
        "rag.pipeline",
        kind=trace.SpanKind.INTERNAL,
        attributes={
            "rag.query": sanitize_text(query),
            "rag.user_id": user_id,
        },
    ) as pipeline_span:
        embedding = trace_embedding(query, pipeline_span)
        documents = trace_vector_search(embedding, pipeline_span)
        context = trace_reranker(query, documents, pipeline_span)
        answer = trace_llm_generation(query, context, pipeline_span)

        pipeline_span.set_attributes({
            "rag.documents_retrieved": len(documents),
            "rag.context_length": len(context),
            "rag.answer_length": len(answer),
        })

        return answer

def trace_embedding(query: str, parent_span):
    with tracer.start_as_current_span(
        "gen_ai.client.embedding",
        kind=trace.SpanKind.CLIENT,
        attributes={
            "gen_ai.system": "openai",
            "gen_ai.request.model": "text-embedding-3-large",
            "gen_ai.request.encoding_format": "float",
        },
    ) as span:
        start_time = time.time()
        response = call_embedding_api(query)

        span.set_attributes({
            "gen_ai.usage.input_tokens": response.usage.prompt_tokens,
            "llm.duration_ms": (time.time() - start_time) * 1000,
        })

        return response.data[0].embedding

def trace_vector_search(embedding: list, parent_span):
    with tracer.start_as_current_span(
        "vector.search",
        kind=trace.SpanKind.CLIENT,
        attributes={
            "db.system": "pgvector",
            "db.operation": "similarity_search",
            "db.vector.dimension": len(embedding),
        },
    ) as span:
        start_time = time.time()
        results = search_pgvector(embedding, top_k=10)

        span.set_attributes({
            "db.results.count": len(results),
            "db.results.top_score": results[0].score if results else 0,
            "llm.duration_ms": (time.time() - start_time) * 1000,
        })

        for i, doc in enumerate(results[:3]):
            span.add_event(f"vector.search.result.{i}", attributes={
                "document.id": doc.id,
                "document.score": doc.score,
                # Retrieved text can contain PII too, so redact the preview.
                "document.content_preview": sanitize_text(doc.content, 200),
            })

        return results

def trace_reranker(query: str, documents: list, parent_span):
    with tracer.start_as_current_span(
        "gen_ai.client.rerank",
        kind=trace.SpanKind.CLIENT,
        attributes={
            "gen_ai.system": "cohere",
            "gen_ai.request.model": "rerank-v3.5",
            "rerank.documents_count": len(documents),
        },
    ) as span:
        start_time = time.time()
        reranked = call_reranker_api(query, documents, top_n=3)

        span.set_attributes({
            "rerank.results_count": len(reranked),
            "llm.duration_ms": (time.time() - start_time) * 1000,
        })

        return "\n".join([doc.content for doc in reranked])

def trace_llm_generation(query: str, context: str, parent_span):
    with tracer.start_as_current_span(
        "gen_ai.client.chat",
        kind=trace.SpanKind.CLIENT,
        attributes={
            "gen_ai.system": "openai",
            "gen_ai.request.model": "gpt-4o",
        },
    ) as span:
        start_time = time.time()
        messages = [
            {"role": "system", "content": f"Answer the question based on the following context:\n{context}"},
            {"role": "user", "content": query},
        ]
        response = call_openai_api("gpt-4o", messages)

        span.set_attributes({
            "gen_ai.usage.input_tokens": response.usage.prompt_tokens,
            "gen_ai.usage.output_tokens": response.usage.completion_tokens,
            "llm.duration_ms": (time.time() - start_time) * 1000,
        })

        return response.choices[0].message.content

Agent tracing

def trace_agent_execution(user_query: str, max_iterations: int = 5):
    with tracer.start_as_current_span(
        "agent.execution",
        kind=trace.SpanKind.INTERNAL,
        attributes={
            "agent.type": "react",
            "agent.max_iterations": max_iterations,
            "agent.query": sanitize_text(user_query),
        },
    ) as agent_span:
        messages = [{"role": "user", "content": user_query}]
        iteration = 0
        tool_calls_log = []

        while iteration < max_iterations:
            iteration += 1

            with tracer.start_as_current_span(
                f"agent.iteration.{iteration}",
                kind=trace.SpanKind.INTERNAL,
            ) as iter_span:
                # Positional arguments match trace_llm_call(model, prompt, **kwargs).
                llm_response = trace_llm_call("gpt-4o", messages, temperature=0.0)

                choice = llm_response.choices[0]

                if choice.finish_reason == "stop":
                    agent_span.set_attributes({
                        "agent.iterations": iteration,
                        "agent.final_answer_length": len(choice.message.content or ""),
                    })
                    return choice.message.content

                # The assistant message carrying tool_calls must come before
                # the tool results that answer it.
                messages.append(choice.message.model_dump())

                if choice.message.tool_calls:
                    for tool_call in choice.message.tool_calls:
                        tool_result = trace_tool_call(tool_call, iter_span)
                        tool_calls_log.append({
                            "tool": tool_call.function.name,
                            "iteration": iteration,
                        })
                        messages.append({
                            "role": "tool",
                            "tool_call_id": tool_call.id,
                            "content": str(tool_result),
                        })

                iter_span.set_attribute("agent.iteration.tools_called", len(
                    choice.message.tool_calls or []
                ))

        agent_span.set_attribute("agent.status", "max_iterations_reached")
        return "Agent reached maximum iterations without a final answer."

def trace_tool_call(tool_call, parent_span):
    with tracer.start_as_current_span(
        f"tool.{tool_call.function.name}",
        kind=trace.SpanKind.CLIENT,
        attributes={
            "tool.name": tool_call.function.name,
            "tool.call_id": tool_call.id,
        },
    ) as span:
        start_time = time.time()

        try:
            import json
            arguments = json.loads(tool_call.function.arguments)
            span.set_attribute("tool.arguments_hash", hash_content(
                tool_call.function.arguments
            ))

            result = execute_tool(tool_call.function.name, arguments)

            span.set_attributes({
                "tool.result_length": len(str(result)),
                "llm.duration_ms": (time.time() - start_time) * 1000,
                "tool.status": "success",
            })

            return result

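        except Exception as e:
            # The error is returned to the agent instead of being raised, so
            # the context manager will not mark the span; do it explicitly.
            span.set_attributes({
                "tool.status": "error",
                "tool.error": str(e),
            })
            span.record_exception(e)
            span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
            return f"Tool error: {e}"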
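
In the OpenAI-style chat message format used by the agent loop, each tool result refers back to a tool call through `tool_call_id`, and the assistant message that requested the call is expected to appear earlier in the history. A small check like the sketch below can catch ordering mistakes before a request is sent. `find_orphan_tool_messages` is a hypothetical helper, and the message dicts are simplified to the fields the check needs.

```python
def find_orphan_tool_messages(messages: list) -> list:
    """Return tool_call_ids of tool messages that appear before, or without,
    the assistant message that requested them."""
    announced = set()
    orphans = []
    for message in messages:
        role = message.get("role")
        if role == "assistant":
            # Remember every tool call the assistant has asked for so far.
            for call in message.get("tool_calls") or []:
                announced.add(call["id"])
        elif role == "tool":
            if message.get("tool_call_id") not in announced:
                orphans.append(message.get("tool_call_id"))
    return orphans


good = [
    {"role": "user", "content": "What is 2 + 2?"},
    {"role": "assistant", "content": None, "tool_calls": [{"id": "call_1"}]},
    {"role": "tool", "tool_call_id": "call_1", "content": "4"},
]
# Same messages with the tool result placed before the assistant request.
bad = [good[0], good[2], good[1]]

good_orphans = find_orphan_tool_messages(good)
bad_orphans = find_orphan_tool_messages(bad)
```

Recording the number of orphans as a span attribute on the iteration span is one way to surface this class of bug in traces rather than in provider error messages.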

Multi-model trace visualization

Trace: rag-agent-pipeline-trace-abc123

┌─ rag.pipeline (1280ms)
│  │
│  ├─ gen_ai.client.embedding (45ms)
│  │    model=text-embedding-3-large
│  │    tokens=23
│  │
│  ├─ vector.search (32ms)
│  │    db=pgvector, results=10
│  │
│  ├─ gen_ai.client.rerank (180ms)
│  │    model=rerank-v3.5
│  │    documents=10 → top_n=3
│  │
│  ├─ agent.execution (1023ms)
│  │  │
│  │  ├─ agent.iteration.1 (520ms)
│  │  │    ├─ gen_ai.client.chat (480ms)
│  │  │    │    model=gpt-4o, tokens=1523/847
│  │  │    └─ tool.search_database (40ms)
│  │  │
│  │  ├─ agent.iteration.2 (503ms)
│  │  │    ├─ gen_ai.client.chat (460ms)
│  │  │    │    model=gpt-4o, tokens=2100/620
│  │  │    └─ tool.calculate (43ms)
│  │  │
│  │  └─ agent.iteration.3 (final, 0ms)
│  │       final_answer_length=342
│  │
│  └─ Total: 3 LLM calls, 2 tool calls, 3646 input tokens, 1467 output tokens

Pattern 5: Error monitoring and anomaly detection

LLM-specific error classification

from enum import Enum
from dataclasses import dataclass

class LLMErrorType(Enum):
    RATE_LIMIT = "rate_limit"
    CONTEXT_LENGTH_EXCEEDED = "context_length_exceeded"
    INVALID_API_KEY = "invalid_api_key"
    MODEL_NOT_FOUND = "model_not_found"
    TIMEOUT = "timeout"
    CONTENT_FILTER = "content_filter"
    HALLUCINATION = "hallucination"
    EMPTY_RESPONSE = "empty_response"
    BUDGET_EXCEEDED = "budget_exceeded"
    SERVICE_UNAVAILABLE = "service_unavailable"

@dataclass
class LLMErrorEvent:
    error_type: LLMErrorType
    model: str
    error_message: str
    trace_id: str
    span_id: str
    retry_count: int
    is_retryable: bool

def classify_llm_error(error: Exception) -> LLMErrorType:
    error_str = str(error).lower()
    if "rate_limit" in error_str or "429" in error_str:
        return LLMErrorType.RATE_LIMIT
    elif "context_length" in error_str or "maximum context" in error_str:
        return LLMErrorType.CONTEXT_LENGTH_EXCEEDED
    elif "invalid_api_key" in error_str or "401" in error_str:
        return LLMErrorType.INVALID_API_KEY
    elif "model_not_found" in error_str or "404" in error_str:
        return LLMErrorType.MODEL_NOT_FOUND
    elif "timeout" in error_str:
        return LLMErrorType.TIMEOUT
    elif "content_filter" in error_str or "content_policy" in error_str:
        return LLMErrorType.CONTENT_FILTER
    else:
        return LLMErrorType.SERVICE_UNAVAILABLE

Error tracking and retries

import time
from functools import wraps

error_counter = meter.create_counter(
    name="llm.errors",
    description="LLM call errors by type",
    unit="errors",
)

def llm_retry_with_tracing(max_retries: int = 3, base_delay: float = 1.0):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                # Automatic exception recording is disabled because the handler
                # below records failures itself, including retried ones that
                # never propagate out of the span.
                with tracer.start_as_current_span(
                    f"llm.call.attempt.{attempt}",
                    kind=trace.SpanKind.CLIENT,
                    record_exception=False,
                    set_status_on_exception=False,
                ) as attempt_span:
                    attempt_span.set_attributes({
                        "llm.retry.attempt": attempt,
                        "llm.retry.max_retries": max_retries,
                    })

                    try:
                        result = func(*args, **kwargs)
                        if attempt > 0:
                            attempt_span.set_attribute("llm.retry.succeeded", True)
                        return result

                    except Exception as e:
                        error_type = classify_llm_error(e)
                        is_retryable = error_type in {
                            LLMErrorType.RATE_LIMIT,
                            LLMErrorType.TIMEOUT,
                            LLMErrorType.SERVICE_UNAVAILABLE,
                        }

                        error_counter.add(1, {
                            "error.type": error_type.value,
                            "gen_ai.request.model": kwargs.get("model", "unknown"),
                        })

                        attempt_span.set_attributes({
                            "error.type": error_type.value,
                            "error.message": str(e)[:200],
                            "error.retryable": is_retryable,
                        })
                        attempt_span.record_exception(e)
                        attempt_span.set_status(
                            trace.Status(trace.StatusCode.ERROR, str(e))
                        )

                        # The last attempt and non-retryable errors propagate.
                        if not is_retryable or attempt == max_retries:
                            raise

                # Back off outside the span so the sleep does not inflate the
                # recorded duration of the failed attempt.
                time.sleep(base_delay * (2 ** attempt))
        return wrapper
    return decorator
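
The retry decorator uses plain exponential backoff, `base_delay * 2 ** attempt`. When many workers hit a rate limit at the same moment, identical delays make them retry in lockstep. A common refinement is to cap the delay and add random jitter. The sketch below shows one such schedule: `backoff_delays` is a hypothetical helper, and "full jitter" (a uniform draw between zero and the capped delay) is one of several possible jitter strategies.

```python
import random
from typing import Optional


def backoff_delays(
    max_retries: int,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    jitter: bool = True,
    rng: Optional[random.Random] = None,
) -> list:
    """Return the sleep time in seconds before each retry."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        # Exponential growth, capped so late retries do not wait unboundedly.
        ceiling = min(max_delay, base_delay * (2 ** attempt))
        # Full jitter: pick a random point between 0 and the ceiling.
        delays.append(rng.uniform(0, ceiling) if jitter else ceiling)
    return delays


plain = backoff_delays(3, jitter=False)
capped = backoff_delays(6, jitter=False)
jittered = backoff_delays(6, rng=random.Random(42))
```

Passing a seeded `random.Random` keeps tests reproducible while production code can rely on the default generator.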

Hallucination-check span

def trace_hallucination_check(
    query: str, answer: str, context: str, model: str = "gpt-4o-mini"
):
    with tracer.start_as_current_span(
        "llm.hallucination_check",
        kind=trace.SpanKind.INTERNAL,
        attributes={
            "llm.check.type": "hallucination",
            "llm.check.model": model,
        },
    ) as span:
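        check_prompt = f"""Decide whether the following answer is grounded in the given context. If the answer contains information that is not present in the context, mark it as a hallucination.

Context:
{context}

Answer:
{answer}

Return JSON in this format: {{"is_hallucination": true/false, "confidence": 0.0-1.0, "reason": "..."}}"""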

        start_time = time.time()
        response = call_openai_api(model, [
            {"role": "user", "content": check_prompt}
        ], temperature=0.0)

        import json
        result = json.loads(response.choices[0].message.content)

        span.set_attributes({
            "llm.hallucination.detected": result.get("is_hallucination", False),
            "llm.hallucination.confidence": result.get("confidence", 0.0),
            "llm.hallucination.reason": result.get("reason", "")[:200],
            "llm.duration_ms": (time.time() - start_time) * 1000,
        })

        return result
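
The check above feeds the judge model's reply straight into `json.loads`, which raises if the model wraps the JSON in extra prose or returns something malformed. A more tolerant parser might look like the sketch below. `parse_judge_verdict` is a hypothetical helper, and treating an unparseable reply as "unknown" (`None`) rather than "not a hallucination" is an assumption about the desired policy.

```python
import json
import re


def parse_judge_verdict(raw: str) -> dict:
    """Extract the verdict object from a judge reply, tolerating extra text."""
    fallback = {
        "is_hallucination": None,  # unknown, not "false"
        "confidence": 0.0,
        "reason": "unparseable judge output",
    }
    # Take everything from the first "{" to the last "}".
    match = re.search(r"\{.*\}", raw, re.DOTALL)
    if match is None:
        return fallback
    try:
        parsed = json.loads(match.group(0))
    except json.JSONDecodeError:
        return fallback
    try:
        confidence = float(parsed.get("confidence", 0.0))
    except (TypeError, ValueError):
        confidence = 0.0
    return {
        # Only a JSON true counts; strings such as "false" do not.
        "is_hallucination": parsed.get("is_hallucination") is True,
        "confidence": min(1.0, max(0.0, confidence)),
        "reason": str(parsed.get("reason", ""))[:200],
    }


wrapped = 'Here is my verdict: {"is_hallucination": true, "confidence": 0.9, "reason": "date not in context"} Hope this helps.'
verdict = parse_judge_verdict(wrapped)
broken = parse_judge_verdict("I cannot decide.")
```

Keeping `None` distinct from `False` lets the hallucination-rate metric exclude failed checks instead of counting them as clean answers.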

Pattern 6: Intelligent alerting and SLA enforcement

SLA metric definitions

from dataclasses import dataclass

@dataclass
class LLMSLA:
    p50_latency_ms: float = 2000.0
    p95_latency_ms: float = 5000.0
    p99_latency_ms: float = 10000.0
    ttft_p95_ms: float = 1000.0
    error_rate_threshold: float = 0.01
    daily_budget_usd: float = 200.0
    hallucination_rate_threshold: float = 0.05
    min_success_rate: float = 0.99

sla = LLMSLA()

latency_histogram = meter.create_histogram(
    name="llm.request.latency",
    description="LLM request latency distribution",
    unit="ms",
)

ttft_histogram = meter.create_histogram(
    name="llm.time_to_first_token",
    description="Time to first token distribution",
    unit="ms",
)

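from opentelemetry.metrics import CallbackOptions, Observation

def observe_error_rate(options: CallbackOptions):
    # Invoked at collection time; reads the rolling window kept by
    # alert_manager, which is defined in the alerting block that follows.
    errors = list(alert_manager.recent_errors)
    rate = sum(errors) / len(errors) if errors else 0.0
    return [Observation(rate)]

error_rate_gauge = meter.create_observable_gauge(
    name="llm.error_rate",
    description="Current LLM error rate",
    callbacks=[observe_error_rate],
)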

Intelligent alert rules

from datetime import datetime, timedelta
from collections import deque
import threading

class LLMAlertManager:
    def __init__(self, sla_config: LLMSLA):
        self.sla = sla_config
        self.recent_latencies = deque(maxlen=1000)
        self.recent_errors = deque(maxlen=1000)
        self.recent_ttft = deque(maxlen=1000)
        self.lock = threading.Lock()
        self.alert_callbacks = []

    def add_alert_callback(self, callback):
        self.alert_callbacks.append(callback)

    def record_latency(self, latency_ms: float, ttft_ms: float = 0, is_error: bool = False):
        with self.lock:
            self.recent_latencies.append(latency_ms)
            self.recent_ttft.append(ttft_ms)
            self.recent_errors.append(1 if is_error else 0)

        self._check_alerts(latency_ms, ttft_ms, is_error)

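    def _check_alerts(self, latency_ms: float, ttft_ms: float, is_error: bool):
        alerts = []

        # Percentile SLAs are evaluated over the sliding window, not per request:
        # a single slow call is not a P99 breach.
        with self.lock:
            latencies = sorted(self.recent_latencies)
            ttfts = sorted(t for t in self.recent_ttft if t > 0)  # 0 means no TTFT was recorded

        if len(latencies) >= 100:
            p99 = latencies[int(0.99 * (len(latencies) - 1))]
            if p99 > self.sla.p99_latency_ms:
                alerts.append({
                    "level": "CRITICAL",
                    "type": "latency_p99_breach",
                    "message": f"LLM P99 latency {p99:.0f}ms exceeds SLA {self.sla.p99_latency_ms:.0f}ms",
                    "value": p99,
                    "threshold": self.sla.p99_latency_ms,
                })

        if len(ttfts) >= 100:
            p95 = ttfts[int(0.95 * (len(ttfts) - 1))]
            if p95 > self.sla.ttft_p95_ms:
                alerts.append({
                    "level": "WARNING",
                    "type": "ttft_breach",
                    "message": f"TTFT P95 {p95:.0f}ms exceeds SLA {self.sla.ttft_p95_ms:.0f}ms",
                    "value": p95,
                    "threshold": self.sla.ttft_p95_ms,
                })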

        with self.lock:
            if len(self.recent_errors) >= 100:
                error_rate = sum(self.recent_errors) / len(self.recent_errors)
                if error_rate > self.sla.error_rate_threshold:
                    alerts.append({
                        "level": "CRITICAL",
                        "type": "error_rate_breach",
                        "message": f"Error rate {error_rate:.2%} exceeds threshold {self.sla.error_rate_threshold:.2%}",
                        "value": error_rate,
                        "threshold": self.sla.error_rate_threshold,
                    })

        for alert in alerts:
            self._fire_alert(alert)

    def _fire_alert(self, alert: dict):
        with tracer.start_as_current_span(
            "llm.alert",
            kind=trace.SpanKind.INTERNAL,
            attributes={
                "alert.level": alert["level"],
                "alert.type": alert["type"],
                "alert.message": alert["message"],
                "alert.value": str(alert["value"]),
                "alert.threshold": str(alert["threshold"]),
            },
        ):
            for callback in self.alert_callbacks:
                try:
                    callback(alert)
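                except Exception as exc:
                    # One failing callback must not block the others; keep the error on the alert span.
                    trace.get_current_span().record_exception(exc)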

alert_manager = LLMAlertManager(sla)
alert_manager.add_alert_callback(lambda a: print(f"[{a['level']}] {a['message']}"))
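Because the manager evaluates its rules on every recorded request, a sustained breach can invoke the callbacks once per request. One possible way to damp that is to wrap each callback with a per-type cooldown. The sketch below assumes this approach; `with_cooldown` and its `cooldown_s` and `clock` parameters are hypothetical names, not part of the code above or of OpenTelemetry, and the wrapper is not thread-safe as written.

```python
import time
from typing import Callable, Dict


def with_cooldown(callback: Callable[[dict], None],
                  cooldown_s: float = 60.0,
                  clock: Callable[[], float] = time.monotonic) -> Callable[[dict], None]:
    """Wrap an alert callback so each alert type fires at most once per cooldown window."""
    last_fired: Dict[str, float] = {}

    def wrapper(alert: dict) -> None:
        now = clock()
        previous = last_fired.get(alert["type"])
        if previous is not None and now - previous < cooldown_s:
            return  # the same alert type fired recently: suppress it
        last_fired[alert["type"]] = now
        callback(alert)

    return wrapper


# Demo: the second alert of the same type arrives inside the window and is dropped.
notify = with_cooldown(lambda a: print(f"[{a['level']}] {a['message']}"), cooldown_s=60.0)
notify({"level": "CRITICAL", "type": "error_rate_breach", "message": "first alert"})
notify({"level": "CRITICAL", "type": "error_rate_breach", "message": "suppressed alert"})
```

A wrapped callback registers the same way as a plain one, for example `alert_manager.add_alert_callback(with_cooldown(print))`.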

Alert rule configuration (Prometheus)

groups:
  - name: llm_sla_alerts
    rules:
      - alert: LLMLatencyP99Breach
        expr: histogram_quantile(0.99, rate(llm_request_latency_bucket[5m])) > 10000
        for: 2m
        labels:
          severity: critical
          team: ai-platform
        annotations:
          summary: "LLM P99 latency exceeds the SLA threshold"
          description: "P99 latency for model {{ $labels.gen_ai_request_model }} is {{ $value }}ms"

      - alert: LLMErrorRateHigh
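        # Aggregate both sides to the same label set; otherwise extra labels on the
        # error counter (such as error_type) prevent the division from matching.
        expr: |
          sum by (gen_ai_request_model) (rate(llm_errors_total[5m]))
            / sum by (gen_ai_request_model) (rate(llm_calls_total[5m])) > 0.01
        for: 3m
        labels:
          severity: critical
          team: ai-platform
        annotations:
          summary: "LLM error rate exceeds 1%"
          description: "Error rate for model {{ $labels.gen_ai_request_model }} is {{ $value | humanizePercentage }}"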

      - alert: LLMTokenBudgetApproaching
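        # The counter is cumulative, so compare the last 24h of spend with the daily budget.
        expr: sum(increase(llm_cost_usd_total[24h])) > 180
        for: 5m
        labels:
          severity: warning
          team: ai-platform
        annotations:
          summary: "Daily LLM spend is approaching the budget limit"
          description: "Current spend ${{ $value }}, budget limit $200"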

      - alert: LLMTTFTHigh
        expr: histogram_quantile(0.95, rate(llm_time_to_first_token_bucket[5m])) > 1000
        for: 3m
        labels:
          severity: warning
          team: ai-platform
        annotations:
          summary: "LLM time to first token P95 exceeds 1 second"
          description: "TTFT P95 = {{ $value }}ms"

5 common pitfalls and how to fix them

Pitfall 1: Too many span attributes cause spans to be dropped

# BAD: large payloads stored as span attributes bloat every exported span.
BAD_PRACTICE = {
    "gen_ai.content.full_prompt": prompt[:10000],
    "gen_ai.content.full_response": response[:10000],
    "gen_ai.content.all_tool_calls": json.dumps(tool_calls),
}

# GOOD: attributes carry only small, queryable values.
GOOD_PRACTICE = {
    "gen_ai.usage.input_tokens": input_tokens,
    "gen_ai.usage.output_tokens": output_tokens,
    "gen_ai.response.finish_reasons": [finish_reason],
}

# Long text goes into a span event, sanitized and length-limited.
span.add_event("gen_ai.content.prompt", attributes={
    "gen_ai.content": sanitize_text(prompt, 1000),
})

Fix: keep core metrics in span attributes, put long text in span events, and truncate or hash any content over 1 KB.
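As a rough illustration of the truncate-or-hash rule, the helper below caps event content at 1 KB of UTF-8 and always attaches a digest, so a truncated prompt can still be matched against stored copies. `bounded_content` and the attribute names `gen_ai.content.size_bytes`, `gen_ai.content.sha256`, and `gen_ai.content.truncated` are assumptions made for this sketch, not part of any semantic convention.

```python
import hashlib

MAX_EVENT_BYTES = 1024  # the 1 KB limit from the rule of thumb


def bounded_content(text: str, max_bytes: int = MAX_EVENT_BYTES) -> dict:
    """Return span-event attributes that never carry more than max_bytes of text."""
    raw = text.encode("utf-8")
    attrs = {
        "gen_ai.content.size_bytes": len(raw),
        "gen_ai.content.sha256": hashlib.sha256(raw).hexdigest(),
    }
    if len(raw) <= max_bytes:
        attrs["gen_ai.content"] = text
    else:
        # Cut on a byte boundary, then drop any partial UTF-8 character at the end.
        attrs["gen_ai.content"] = raw[:max_bytes].decode("utf-8", errors="ignore")
        attrs["gen_ai.content.truncated"] = True
    return attrs


print(bounded_content("What is OpenTelemetry?"))
```

The result can be passed straight to an event, for example `span.add_event("gen_ai.content.prompt", attributes=bounded_content(prompt))`. Redaction still has to happen before this step; the helper only bounds size.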

Pitfall 2: Inaccurate token counts

BAD_PRACTICE = {
    "gen_ai.usage.input_tokens": len(prompt) // 4,
}

GOOD_PRACTICE = {
    "gen_ai.usage.input_tokens": response.usage.prompt_tokens,
}

Fix: always use the usage field returned by the API; never estimate tokens from character counts.
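A small mapping function keeps this rule in one place. The sketch below assumes an OpenAI-style response whose `usage` object exposes `prompt_tokens` and `completion_tokens`; `usage_attributes` is a hypothetical helper, and the `SimpleNamespace` object only stands in for a real API response.

```python
from types import SimpleNamespace


def usage_attributes(response) -> dict:
    """Map an OpenAI-style `usage` object to gen_ai.usage.* span attributes."""
    usage = getattr(response, "usage", None)
    if usage is None:
        # Some responses carry no usage; omit the attributes rather than guess.
        return {}
    return {
        "gen_ai.usage.input_tokens": usage.prompt_tokens,
        "gen_ai.usage.output_tokens": usage.completion_tokens,
    }


# Stand-in for an API response; a real client returns a richer object.
fake_response = SimpleNamespace(usage=SimpleNamespace(prompt_tokens=120, completion_tokens=45))
print(usage_attributes(fake_response))
```

Returning an empty dict when usage is missing leaves a visible gap in the data, which is easier to spot and fix than a silently wrong estimate.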

Pitfall 3: Ignoring context propagation

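from opentelemetry import baggage, context, propagate

# BAD: calling the downstream service without trace headers starts a new, disconnected trace.
# requests.post(url, json=payload)

# GOOD: put baggage on the current context, then inject trace context and baggage into the headers.
token = context.attach(baggage.set_baggage("user.id", user_id))
try:
    headers = {}
    propagate.inject(headers)  # writes traceparent (when a span is active) and baggage headers
    # requests.post(url, json=payload, headers=headers)
finally:
    context.detach(token)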

Fix: on cross-service calls, use propagate.inject() and propagate.extract() to carry the trace context across the boundary.
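To make the mechanism concrete, the sketch below builds and parses the W3C `traceparent` header by hand using only the standard library. It is for illustration: in real code the OpenTelemetry propagators do this work. `inject_traceparent` and `extract_traceparent` are hypothetical helpers, and the dictionary lookup is case-sensitive, unlike real HTTP headers.

```python
import re
import secrets
from typing import Optional, Tuple

# version "00", 32 hex digits of trace id, 16 hex digits of parent span id, 2 hex digits of flags
_TRACEPARENT = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def inject_traceparent(headers: dict, trace_id: int, span_id: int, sampled: bool = True) -> None:
    """Write a W3C traceparent header for the given ids into an outgoing header dict."""
    flags = 1 if sampled else 0
    headers["traceparent"] = f"00-{trace_id:032x}-{span_id:016x}-{flags:02x}"


def extract_traceparent(headers: dict) -> Optional[Tuple[int, int, bool]]:
    """Parse traceparent; return (trace_id, parent_span_id, sampled) or None if absent or invalid."""
    match = _TRACEPARENT.match(headers.get("traceparent", ""))
    if match is None:
        return None
    trace_id, span_id, flags = (int(group, 16) for group in match.groups())
    if trace_id == 0 or span_id == 0:
        return None  # all-zero ids are invalid
    return trace_id, span_id, bool(flags & 1)


# Caller side: generate ids and inject them into the outgoing request headers.
outgoing = {}
inject_traceparent(outgoing, trace_id=secrets.randbits(128) or 1, span_id=secrets.randbits(64) or 1)
# Callee side: recover the same trace id so its spans join the caller's trace.
print(outgoing["traceparent"], extract_traceparent(outgoing))
```

If the callee never sees this header, it starts a fresh trace id, which is why the LLM call and the request that triggered it show up as unrelated traces.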

Pitfall 4: Streaming responses without TTFT

BAD_PRACTICE = {
    "streaming_without_ttft": "records only total latency and ignores time to first token",
}

GOOD_PRACTICE = """
with tracer.start_as_current_span("gen_ai.client.chat") as span:
    start = time.time()
    first_token_time = None
    for chunk in stream:
        if first_token_time is None:
            first_token_time = time.time()
            span.set_attribute("llm.time_to_first_token_ms",
                             (first_token_time - start) * 1000)
"""

Fix: streaming responses must record TTFT; it is the core user-experience metric.
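The GOOD_PRACTICE snippet is stored as a string and never runs, so here is a runnable sketch of the same measurement. It assumes the stream is any iterable of text chunks; `consume_stream` and `fake_stream` are hypothetical names, and `time.perf_counter()` is used because it is monotonic.

```python
import time
from typing import Iterable, Iterator, Optional, Tuple


def consume_stream(stream: Iterable[str]) -> Tuple[str, Optional[float], float]:
    """Drain a token stream; return (text, ttft_ms, total_ms). ttft_ms is None for an empty stream."""
    start = time.perf_counter()
    ttft_ms: Optional[float] = None
    parts = []
    for chunk in stream:
        if ttft_ms is None:
            # The first chunk has arrived: this is the time to first token.
            ttft_ms = (time.perf_counter() - start) * 1000
        parts.append(chunk)
    total_ms = (time.perf_counter() - start) * 1000
    return "".join(parts), ttft_ms, total_ms


def fake_stream() -> Iterator[str]:
    """Stand-in for a streaming LLM response: slow first token, fast follow-ups."""
    time.sleep(0.05)
    yield "Hello"
    for word in (", ", "world"):
        time.sleep(0.01)
        yield word


text, ttft_ms, total_ms = consume_stream(fake_stream())
print(f"text={text!r} ttft={ttft_ms:.0f}ms total={total_ms:.0f}ms")
```

Inside a real span, the two numbers would go to `span.set_attribute("llm.time_to_first_token_ms", ttft_ms)` and to the `ttft_histogram` and `latency_histogram` instruments defined earlier.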

Pitfall 5: Poor sampling strategy

from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, ParentBased

BAD_SAMPLER = TraceIdRatioBased(rate=1.0)

GOOD_SAMPLER = ParentBased(
    root=TraceIdRatioBased(rate=0.1),
)

from opentelemetry.sdk.trace.sampling import Sampler, SamplingResult, Decision

class LLMAwareSampler(Sampler):
    def should_sample(self, parent_context, trace_id, name, kind,
                      attributes, links, trace_state):
        if attributes:
            system = attributes.get("gen_ai.system")
            if system:
                return SamplingResult(
                    decision=Decision.RECORD_AND_SAMPLE,
                    attributes=attributes,
                    trace_state=trace_state,
                )
        return SamplingResult(
            decision=Decision.RECORD_AND_SAMPLE if hash(trace_id) % 10 == 0 else Decision.DROP,
            attributes=attributes,
            trace_state=trace_state,
        )

    def get_description(self):
        return "LLMAwareSampler"

Fix: sample LLM spans at 100%; non-LLM spans can be sampled at a fixed ratio.
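The ratio part of this rule works best when the decision is a pure function of the trace id, so every service reaches the same verdict for the same trace. The sketch below shows that idea outside the Sampler interface, comparing the low 64 bits of the trace id with a bound, similar in spirit to what ratio-based samplers do. `keep_span` is a hypothetical helper, not an OpenTelemetry API.

```python
import random

TRACE_ID_LIMIT = 1 << 64  # only the low 64 bits of the 128-bit trace id are compared


def keep_span(trace_id: int, is_llm: bool, base_rate: float = 0.1) -> bool:
    """Keep every LLM span; keep other spans for a fixed fraction of trace ids."""
    if is_llm:
        return True
    bound = int(base_rate * TRACE_ID_LIMIT)
    return (trace_id & (TRACE_ID_LIMIT - 1)) < bound


# Demo: with random trace ids, roughly base_rate of the non-LLM spans survive.
rng = random.Random(42)
trace_ids = [rng.getrandbits(128) for _ in range(10_000)]
kept = sum(keep_span(t, is_llm=False, base_rate=0.1) for t in trace_ids)
print(f"kept {kept / len(trace_ids):.1%} of non-LLM spans")
```

Because the same trace id always yields the same answer, a trace is either kept or dropped as a whole, provided every service uses the same rate.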


10 common errors and how to troubleshoot them

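| # | Error message | Cause | Fix |
|---|---|---|---|
| 1 | StatusCode.UNAVAILABLE | OTLP export failed: the Collector is not running or the port is wrong | Check that http://localhost:4317 is reachable |
| 2 | context_length_exceeded | Prompt exceeds the model's context window | Precompute the token count with tiktoken, then truncate or split |
| 3 | rate_limit_exceeded 429 | API call rate limit exceeded | Retry with exponential backoff; add token-bucket rate limiting |
| 4 | Span is not being recorded | TracerProvider is not configured | Confirm that trace.set_tracer_provider() has been called |
| 5 | Baggage not propagated | Context is not propagated across services | Use propagate.inject()/extract() |
| 6 | Metric reader timeout | Metric export timed out | Increase export_timeout_millis or collect less often |
| 7 | content_filter_triggered | Prompt triggered the safety filter | Add a content pre-check; use the moderation API |
| 8 | Empty response from LLM | Model returned empty content | Check finish_reason; add retry logic |
| 9 | Memory leak in span processor | BatchSpanProcessor backlog | Tune max_queue_size and schedule_delay_millis |
| 10 | Duplicate span attributes | The same attribute is set more than once | Record metrics with span.set_attribute() rather than add_event() |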
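For the rate-limit row, exponential backoff can be sketched with the standard library alone. The example assumes the client raises a dedicated exception on HTTP 429; `RateLimitError` here is a hypothetical stand-in for whatever your SDK raises, and `call_with_backoff` with its parameters is illustrative rather than a library API.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class RateLimitError(Exception):
    """Hypothetical stand-in for a provider's HTTP 429 error."""


def call_with_backoff(fn: Callable[[], T], max_attempts: int = 5, base_delay_s: float = 0.5,
                      max_delay_s: float = 8.0, sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn, retrying on RateLimitError with exponentially growing, jittered delays."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    for attempt in range(max_attempts):
        try:
            return fn()
        except RateLimitError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error to the caller
            delay = min(max_delay_s, base_delay_s * (2 ** attempt))
            sleep(random.uniform(0, delay))  # random jitter spreads out concurrent retries
    raise RuntimeError("unreachable")


# Demo: the call fails twice with a rate-limit error, then succeeds.
attempts = {"count": 0}


def flaky_call() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RateLimitError("429")
    return "ok"


print(call_with_backoff(flaky_call, sleep=lambda s: None), "after", attempts["count"], "attempts")
```

The `sleep` parameter is injectable so the retry logic can be tested without waiting; recording each retry as a span event would also make throttling visible in the trace.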

Diagnostic script

def diagnose_otel_setup():
    from opentelemetry import trace, metrics
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.metrics import MeterProvider

    issues = []

    provider = trace.get_tracer_provider()
    if not isinstance(provider, TracerProvider):
        issues.append("TracerProvider is not configured; the default no-op provider may be in use")
    else:
        # _active_span_processor is a private SDK attribute and may change between versions.
        if not provider._active_span_processor._span_processors:
            issues.append("TracerProvider has no SpanProcessor configured")

    meter_provider = metrics.get_meter_provider()
    if not isinstance(meter_provider, MeterProvider):
        issues.append("MeterProvider is not configured")

    try:
        import opentelemetry.instrumentation.openai
    except ImportError:
        issues.append("opentelemetry-instrumentation-openai is not installed")

    try:
        import tiktoken
    except ImportError:
        issues.append("tiktoken is not installed; token counts cannot be precomputed")

    if issues:
        print("Diagnostics found issues:")
        for i, issue in enumerate(issues, 1):
            print(f"  {i}. {issue}")
    else:
        print("OpenTelemetry configuration looks fine")

diagnose_otel_setup()

Advanced optimization tips

Automatic instrumentation

from opentelemetry.instrumentation.openai import OpenAIInstrumentor

OpenAIInstrumentor().instrument()

from opentelemetry.instrumentation.requests import RequestsInstrumentor
RequestsInstrumentor().instrument()

from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor
AsyncioInstrumentor().instrument()

Custom propagator

from opentelemetry.propagate import set_global_textmap
from opentelemetry.propagators.composite import CompositePropagator
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from opentelemetry.baggage.propagation import W3CBaggagePropagator

# Propagate both W3C trace context and baggage on every inject()/extract().
set_global_textmap(
    CompositePropagator([
        TraceContextTextMapPropagator(),
        W3CBaggagePropagator(),
    ])
)

Dynamic sampling strategy

from opentelemetry.sdk.trace.sampling import Sampler, SamplingResult, Decision

class AdaptiveLLMSampler(Sampler):
    def __init__(self, base_rate: float = 0.1, error_sample_rate: float = 1.0,
                 slow_threshold_ms: float = 5000.0):
        self.base_rate = base_rate
        self.error_sample_rate = error_sample_rate
        self.slow_threshold_ms = slow_threshold_ms

    def should_sample(self, parent_context, trace_id, name, kind,
                      attributes, links, trace_state):
        if attributes:
            is_llm = attributes.get("gen_ai.system") is not None
            if is_llm:
                return SamplingResult(
                    decision=Decision.RECORD_AND_SAMPLE,
                    attributes=attributes,
                    trace_state=trace_state,
                )

        if parent_context:
            from opentelemetry.trace import get_current_span
            parent_span = get_current_span(parent_context)
            if parent_span.is_recording():
                parent_attrs = parent_span.attributes or {}
                if parent_attrs.get("error.type"):
                    return SamplingResult(
                        decision=Decision.RECORD_AND_SAMPLE,
                        attributes=attributes,
                        trace_state=trace_state,
                    )

        should_sample = (hash(trace_id) % 10000) < (self.base_rate * 10000)
        return SamplingResult(
            decision=Decision.RECORD_AND_SAMPLE if should_sample else Decision.DROP,
            attributes=attributes,
            trace_state=trace_state,
        )

    def get_description(self):
        return f"AdaptiveLLMSampler(base={self.base_rate})"

Grafana dashboard configuration

{
  "dashboard": {
    "title": "LLM Observability Dashboard",
    "panels": [
      {
        "title": "Token Usage (Input vs Output)",
        "type": "timeseries",
        "targets": [{
          "expr": "sum(rate(gen_ai_client_token_usage[5m])) by (token_type, gen_ai_request_model)"
        }]
      },
      {
        "title": "LLM Request Latency P50/P95/P99",
        "type": "timeseries",
        "targets": [
          {"expr": "histogram_quantile(0.50, rate(llm_request_latency_bucket[5m]))"},
          {"expr": "histogram_quantile(0.95, rate(llm_request_latency_bucket[5m]))"},
          {"expr": "histogram_quantile(0.99, rate(llm_request_latency_bucket[5m]))"}
        ]
      },
      {
        "title": "Daily Cost by Model",
        "type": "piechart",
        "targets": [{
          "expr": "sum(increase(llm_cost_usd_total[24h])) by (gen_ai_request_model)"
        }]
      },
      {
        "title": "Error Rate by Type",
        "type": "barchart",
        "targets": [{
          "expr": "sum(rate(llm_errors_total[5m])) by (error_type)"
        }]
      }
    ]
  }
}

Comparison: OpenTelemetry vs LangSmith vs Promptflow

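| Dimension | OpenTelemetry | LangSmith | Promptflow |
|---|---|---|---|
| Open source | Yes (CNCF project) | No (SaaS) | Yes (Microsoft) |
| Vendor lock-in | | High (LangChain ecosystem) | Medium (Azure ecosystem) |
| Tracing | Native, standard protocol | Proprietary trace system | Proprietary trace system |
| Token tracking | Manual instrumentation required | Automatic | Automatic |
| Prompt management | Not supported | Supported | Supported |
| Cost monitoring | Requires custom metrics | Built in | Built in |
| Language support | 11+ languages | Python/JS | Python |
| Integration with existing APM | Native (Jaeger/Tempo/Grafana) | Requires export | Requires export |
| Customizability | Very high | Medium | Medium |
| Learning curve | Steep | Gentle | Moderate |
| Production readiness | High (CNCF graduated project) | Medium | |
| Data sovereignty | Fully self-controlled | Data held by LangSmith | Data held in Azure |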

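Recommendations

  • Several APM stacks already in place → OpenTelemetry (a unified observability stack)
  • Heavy LangChain users → LangSmith (works out of the box)
  • Azure ecosystem → Promptflow (deep integration with Azure AI Studio)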

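Summary

OpenTelemetry-based LLM tracing in Python is the foundation of observability for production AI applications. The six production patterns cover the full path from basic span instrumentation to smart alerting:

  1. LLM span instrumentation: standardize attribute labeling with the gen_ai.* semantic conventions
  2. Token tracking: monitor usage and cost in real time to prevent budget overruns
  3. Structured logging: record prompts and responses with redaction, balancing debugging and compliance
  4. Multi-model tracing: follow complex call chains such as RAG and agents
  5. Error monitoring: classify exceptions, retry automatically, detect hallucinations
  6. Smart alerting: SLA enforcement, dynamic sampling, multi-level alerts

Key principles: sample LLM spans at 100%, take token usage from the API response, redact prompt content, and treat TTFT as the core user-experience metric.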
