Python OpenTelemetry LLM鏈路追蹤:從Span到智慧告警的6種生產模式

AI与大数据

你的LLM應用是個黑盒嗎?

2026年,LLM應用已經深入生產環境。但大多數團隊面臨同一個問題:LLM呼叫鏈路不可見。一個使用者請求可能經過Prompt構造、Embedding檢索、多輪對話、模型推理、後處理等多個環節——任何一環出問題,你都只能靠猜。

傳統APM工具無法理解LLM的特殊語意:Token用量、首Token延遲、Prompt洩露、幻覺檢測。OpenTelemetry憑藉其語意約定(Semantic Conventions)可擴展的Span屬性,成為LLM鏈路追蹤的最佳選擇。

本文核心要點:

  • LLM Span埋點與屬性標註的標準模式
  • Token用量追蹤與成本即時監控
  • Prompt/Response結構化日誌與脫敏
  • 多模型鏈路追蹤(RAG + Agent + Tool)
  • 錯誤監控與異常檢測
  • 智慧告警與SLA保障體系

目錄

  • OpenTelemetry LLM核心概念
  • Pattern 1: LLM Span埋點與屬性標註
  • Pattern 2: Token用量追蹤與成本監控
  • Pattern 3: Prompt/Response結構化日誌
  • Pattern 4: 多模型鏈路追蹤
  • Pattern 5: 錯誤監控與異常檢測
  • Pattern 6: 智慧告警與SLA保障
  • 5個常見坑及解決方案
  • 10個常見報錯排查
  • 進階最佳化技巧
  • 對比分析:OpenTelemetry vs LangSmith vs Promptflow
  • 線上工具推薦

OpenTelemetry LLM核心概念

為什麼選擇OpenTelemetry做LLM鏈路追蹤

┌─────────────────────────────────────────────────────────┐
│                  LLM 應用鏈路追蹤架構                      │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  使用者請求 ──► API Gateway ──► LLM Service              │
│                                   │                     │
│                     ┌─────────────┼─────────────┐       │
│                     ▼             ▼             ▼       │
│               Embedding      Chat Model    Tool Call    │
│               Service        (GPT-4o)      Service      │
│                     │             │             │       │
│                     ▼             ▼             ▼       │
│               Vector DB      Reranker      External API │
│                                                         │
│  每一跳 = 1個 Span,攜帶 LLM 語意屬性                      │
│  整條鏈路 = 1個 Trace,端到端可追蹤                         │
│                                                         │
│  ──► OTel Collector ──► Jaeger/Tempo (Trace)            │
│                      ──► Prometheus (Metrics)            │
│                      ──► Loki/ELK (Logs)                 │
└─────────────────────────────────────────────────────────┘

OpenTelemetry為LLM鏈路追蹤提供了三大核心能力:

能力 說明 LLM場景價值
Trace 跨服務鏈路追蹤 追蹤從使用者請求到LLM回應的完整呼叫鏈
Span 單次操作記錄 記錄每次LLM呼叫的模型、參數、Token用量
Metric 指標採集 即時監控Token消耗、延遲分佈、錯誤率
Baggage 跨服務傳遞元資料 傳遞使用者ID、會話ID、A/B實驗標識

OpenTelemetry LLM語意約定(2026版)

Span Kind: CLIENT
Span Name:  gen_ai.client.chat

標準屬性:
  gen_ai.system              = "openai"        # LLM提供商
  gen_ai.request.model       = "gpt-4o"        # 請求模型
  gen_ai.request.max_tokens  = 4096            # 最大輸出Token
  gen_ai.request.temperature = 0.7             # 溫度參數
  gen_ai.response.model      = "gpt-4o-2026-05-13"  # 實際模型
  gen_ai.response.finish_reasons = ["stop"]    # 結束原因
  gen_ai.usage.input_tokens  = 1523            # 輸入Token
  gen_ai.usage.output_tokens = 847             # 輸出Token

事件(Span Events):
  gen_ai.content.prompt     = "..."            # Prompt內容
  gen_ai.content.completion = "..."            # 回應內容

Pattern 1: LLM Span埋點與屬性標註

基礎Span建立

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource

resource = Resource.create({
    "service.name": "llm-chat-service",
    "service.version": "2.1.0",
    "deployment.environment": "production",
})

tracer_provider = TracerProvider(resource=resource)
tracer_provider.add_span_processor(
    BatchSpanProcessor(
        OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True)
    )
)
trace.set_tracer_provider(tracer_provider)
tracer = trace.get_tracer("llm-chat-service", "2.1.0")

標準LLM Span埋點

import time
from opentelemetry import trace

tracer = trace.get_tracer(__name__)

def trace_llm_call(model: str, prompt: str, **kwargs):
    with tracer.start_as_current_span(
        "gen_ai.client.chat",
        kind=trace.SpanKind.CLIENT,
        attributes={
            "gen_ai.system": "openai",
            "gen_ai.request.model": model,
            "gen_ai.request.max_tokens": kwargs.get("max_tokens", 4096),
            "gen_ai.request.temperature": kwargs.get("temperature", 0.7),
        },
    ) as span:
        start_time = time.time()

        try:
            response = call_openai_api(model, prompt, **kwargs)

            span.set_attributes({
                "gen_ai.response.model": response.model,
                "gen_ai.response.finish_reasons": [
                    choice.finish_reason for choice in response.choices
                ],
                "gen_ai.usage.input_tokens": response.usage.prompt_tokens,
                "gen_ai.usage.output_tokens": response.usage.completion_tokens,
                "llm.time_to_first_token_ms": kwargs.get("ttft_ms", 0),
                "llm.total_duration_ms": (time.time() - start_time) * 1000,
            })

            span.add_event(
                "gen_ai.content.completion",
                attributes={
                    "gen_ai.content": response.choices[0].message.content[:500],
                },
            )

            return response

        except Exception as e:
            span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
            span.record_exception(e)
            raise

帶事件標註的Span

def trace_llm_with_events(model: str, messages: list, **kwargs):
    with tracer.start_as_current_span(
        "gen_ai.client.chat",
        kind=trace.SpanKind.CLIENT,
        attributes={
            "gen_ai.system": "openai",
            "gen_ai.request.model": model,
            "gen_ai.request.max_tokens": kwargs.get("max_tokens", 4096),
        },
    ) as span:
        for i, msg in enumerate(messages):
            span.add_event(
                f"gen_ai.content.prompt.{i}",
                attributes={
                    "gen_ai.content.role": msg["role"],
                    "gen_ai.content": msg["content"][:1000],
                },
            )

        start_time = time.time()
        first_token_time = None

        response = call_openai_streaming(model, messages, **kwargs)

        chunks = []
        for chunk in response:
            if first_token_time is None:
                first_token_time = time.time()
                ttft_ms = (first_token_time - start_time) * 1000
                span.set_attribute("llm.time_to_first_token_ms", ttft_ms)
            chunks.append(chunk)

        full_content = "".join(chunks)
        span.set_attributes({
            "gen_ai.usage.output_tokens": len(full_content) // 4,
            "llm.total_duration_ms": (time.time() - start_time) * 1000,
        })

        span.add_event(
            "gen_ai.content.completion",
            attributes={"gen_ai.content": full_content[:500]},
        )

        return full_content

Pattern 2: Token用量追蹤與成本監控

Token計數器與成本計算

from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter

metric_reader = PeriodicExportingMetricReader(
    OTLPMetricExporter(endpoint="http://localhost:4317", insecure=True),
    export_interval_millis=60000,
)
meter_provider = MeterProvider(metric_readers=[metric_reader])
metrics.set_meter_provider(meter_provider)

meter = metrics.get_meter("llm-chat-service", "2.1.0")

token_counter = meter.create_counter(
    name="gen_ai.client.token.usage",
    description="Token usage counter for LLM calls",
    unit="tokens",
)

cost_counter = meter.create_counter(
    name="llm.cost.usd",
    description="LLM API cost in USD",
    unit="USD",
)

MODEL_PRICING = {
    "gpt-4o": {"input": 2.50 / 1_000_000, "output": 10.00 / 1_000_000},
    "gpt-4o-mini": {"input": 0.15 / 1_000_000, "output": 0.60 / 1_000_000},
    "claude-sonnet-4-20250514": {"input": 3.00 / 1_000_000, "output": 15.00 / 1_000_000},
    "deepseek-r1": {"input": 0.55 / 1_000_000, "output": 2.19 / 1_000_000},
}

def record_token_usage(model: str, input_tokens: int, output_tokens: int):
    token_counter.add(input_tokens, {
        "gen_ai.system": "openai",
        "gen_ai.request.model": model,
        "token.type": "input",
    })
    token_counter.add(output_tokens, {
        "gen_ai.system": "openai",
        "gen_ai.request.model": model,
        "token.type": "output",
    })

    pricing = MODEL_PRICING.get(model, {"input": 0, "output": 0})
    cost = input_tokens * pricing["input"] + output_tokens * pricing["output"]
    cost_counter.add(cost, {
        "gen_ai.request.model": model,
        "cost.type": "api_call",
    })

成本配額與預算控制

import threading
from datetime import datetime
from collections import defaultdict

class TokenBudgetManager:
    def __init__(self, daily_budget_usd: float = 100.0):
        self.daily_budget = daily_budget_usd
        self.current_spend = defaultdict(float)
        self.lock = threading.Lock()
        self._reset_date = datetime.now().date()

    def check_and_record(self, model: str, input_tokens: int, output_tokens: int) -> bool:
        with self.lock:
            today = datetime.now().date()
            if today != self._reset_date:
                self.current_spend.clear()
                self._reset_date = today

            pricing = MODEL_PRICING.get(model, {"input": 0, "output": 0})
            cost = input_tokens * pricing["input"] + output_tokens * pricing["output"]

            total_spend = sum(self.current_spend.values()) + cost
            if total_spend > self.daily_budget:
                return False

            self.current_spend[model] += cost
            return True

    def get_remaining_budget(self) -> float:
        with self.lock:
            return self.daily_budget - sum(self.current_spend.values())

budget_manager = TokenBudgetManager(daily_budget_usd=200.0)

帶預算檢查的LLM呼叫

def call_llm_with_budget(model: str, messages: list, **kwargs):
    with tracer.start_as_current_span(
        "gen_ai.client.chat",
        kind=trace.SpanKind.CLIENT,
        attributes={
            "gen_ai.system": "openai",
            "gen_ai.request.model": model,
        },
    ) as span:
        response = call_openai_api(model, messages, **kwargs)

        input_tokens = response.usage.prompt_tokens
        output_tokens = response.usage.completion_tokens

        allowed = budget_manager.check_and_record(model, input_tokens, output_tokens)
        if not allowed:
            span.set_attribute("llm.budget.exceeded", True)
            raise BudgetExceededError(
                f"Daily budget exceeded. Remaining: ${budget_manager.get_remaining_budget():.2f}"
            )

        record_token_usage(model, input_tokens, output_tokens)

        span.set_attributes({
            "gen_ai.usage.input_tokens": input_tokens,
            "gen_ai.usage.output_tokens": output_tokens,
            "llm.cost.usd": calculate_cost(model, input_tokens, output_tokens),
            "llm.budget.remaining_usd": budget_manager.get_remaining_budget(),
        })

        return response

Pattern 3: Prompt/Response結構化日誌

脫敏與結構化記錄

import re
import json
from dataclasses import dataclass, field, asdict

@dataclass
class LLMCallLog:
    trace_id: str
    span_id: str
    model: str
    prompt_hash: str
    prompt_length: int
    response_hash: str
    response_length: int
    input_tokens: int
    output_tokens: int
    duration_ms: float
    ttft_ms: float = 0.0
    finish_reason: str = ""
    sanitized_prompt: str = ""
    sanitized_response: str = ""
    metadata: dict = field(default_factory=dict)

PII_PATTERNS = [
    (re.compile(r'\b\d{3}[-.]?\d{4}\b'), "[PHONE]"),
    (re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'), "[EMAIL]"),
    (re.compile(r'\b\d{17}[\dXx]\b'), "[ID_CARD]"),
    (re.compile(r'\b(?:\d[ -]*?){13,19}\b'), "[CREDIT_CARD]"),
    (re.compile(r'api[_-]?key[=:]\s*\S+', re.IGNORECASE), "[API_KEY]"),
]

def sanitize_text(text: str, max_length: int = 500) -> str:
    sanitized = text[:max_length]
    for pattern, replacement in PII_PATTERNS:
        sanitized = pattern.sub(replacement, sanitized)
    return sanitized

def hash_content(content: str) -> str:
    import hashlib
    return hashlib.sha256(content.encode()).hexdigest()[:16]

結構化日誌記錄器

import logging
from datetime import datetime, timezone

class LLMStructuredLogger:
    def __init__(self, service_name: str = "llm-chat-service"):
        self.logger = logging.getLogger(f"{service_name}.llm_calls")
        self.sanitize_enabled = True
        self.max_content_length = 500

    def log_llm_call(self, span, model: str, prompt: str, response: str,
                     input_tokens: int, output_tokens: int,
                     duration_ms: float, ttft_ms: float = 0,
                     finish_reason: str = "", **metadata):
        log_entry = LLMCallLog(
            trace_id=format(span.get_span_context().trace_id, "032x"),
            span_id=format(span.get_span_context().span_id, "016x"),
            model=model,
            prompt_hash=hash_content(prompt),
            prompt_length=len(prompt),
            response_hash=hash_content(response),
            response_length=len(response),
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            duration_ms=round(duration_ms, 2),
            ttft_ms=round(ttft_ms, 2),
            finish_reason=finish_reason,
            sanitized_prompt=sanitize_text(prompt, self.max_content_length) if self.sanitize_enabled else "",
            sanitized_response=sanitize_text(response, self.max_content_length) if self.sanitize_enabled else "",
            metadata=metadata,
        )

        self.logger.info(
            "LLM call completed",
            extra={"llm_call": asdict(log_entry)},
        )

        span.add_event("gen_ai.content.prompt", attributes={
            "gen_ai.content": sanitize_text(prompt, 1000),
            "gen_ai.content.hash": hash_content(prompt),
        })
        span.add_event("gen_ai.content.completion", attributes={
            "gen_ai.content": sanitize_text(response, 1000),
            "gen_ai.content.hash": hash_content(response),
        })

llm_logger = LLMStructuredLogger()

Pattern 4: 多模型鏈路追蹤

RAG鏈路追蹤

def trace_rag_pipeline(query: str, user_id: str = ""):
    with tracer.start_as_current_span(
        "rag.pipeline",
        kind=trace.SpanKind.INTERNAL,
        attributes={
            "rag.query": sanitize_text(query),
            "rag.user_id": user_id,
        },
    ) as pipeline_span:
        embedding = trace_embedding(query, pipeline_span)
        documents = trace_vector_search(embedding, pipeline_span)
        context = trace_reranker(query, documents, pipeline_span)
        answer = trace_llm_generation(query, context, pipeline_span)

        pipeline_span.set_attributes({
            "rag.documents_retrieved": len(documents),
            "rag.context_length": len(context),
            "rag.answer_length": len(answer),
        })

        return answer

def trace_embedding(query: str, parent_span):
    with tracer.start_as_current_span(
        "gen_ai.client.embedding",
        kind=trace.SpanKind.CLIENT,
        attributes={
            "gen_ai.system": "openai",
            "gen_ai.request.model": "text-embedding-3-large",
        },
    ) as span:
        start_time = time.time()
        response = call_embedding_api(query)

        span.set_attributes({
            "gen_ai.usage.input_tokens": response.usage.prompt_tokens,
            "llm.duration_ms": (time.time() - start_time) * 1000,
        })

        return response.data[0].embedding

def trace_vector_search(embedding: list, parent_span):
    with tracer.start_as_current_span(
        "vector.search",
        kind=trace.SpanKind.CLIENT,
        attributes={
            "db.system": "pgvector",
            "db.operation": "similarity_search",
        },
    ) as span:
        start_time = time.time()
        results = search_pgvector(embedding, top_k=10)

        span.set_attributes({
            "db.results.count": len(results),
            "db.results.top_score": results[0].score if results else 0,
            "llm.duration_ms": (time.time() - start_time) * 1000,
        })

        return results

def trace_llm_generation(query: str, context: str, parent_span):
    with tracer.start_as_current_span(
        "gen_ai.client.chat",
        kind=trace.SpanKind.CLIENT,
        attributes={
            "gen_ai.system": "openai",
            "gen_ai.request.model": "gpt-4o",
        },
    ) as span:
        start_time = time.time()
        messages = [
            {"role": "system", "content": f"基於以下上下文回答問題:\n{context}"},
            {"role": "user", "content": query},
        ]
        response = call_openai_api("gpt-4o", messages)

        span.set_attributes({
            "gen_ai.usage.input_tokens": response.usage.prompt_tokens,
            "gen_ai.usage.output_tokens": response.usage.completion_tokens,
            "llm.duration_ms": (time.time() - start_time) * 1000,
        })

        return response.choices[0].message.content

Agent鏈路追蹤

def trace_agent_execution(user_query: str, max_iterations: int = 5):
    with tracer.start_as_current_span(
        "agent.execution",
        kind=trace.SpanKind.INTERNAL,
        attributes={
            "agent.type": "react",
            "agent.max_iterations": max_iterations,
            "agent.query": sanitize_text(user_query),
        },
    ) as agent_span:
        messages = [{"role": "user", "content": user_query}]
        iteration = 0

        while iteration < max_iterations:
            iteration += 1

            with tracer.start_as_current_span(
                f"agent.iteration.{iteration}",
                kind=trace.SpanKind.INTERNAL,
            ) as iter_span:
                llm_response = trace_llm_call(
                    model="gpt-4o",
                    messages=messages,
                    temperature=0.0,
                )

                choice = llm_response.choices[0]

                if choice.finish_reason == "stop":
                    agent_span.set_attributes({
                        "agent.iterations": iteration,
                        "agent.final_answer_length": len(choice.message.content),
                    })
                    return choice.message.content

                if choice.message.tool_calls:
                    for tool_call in choice.message.tool_calls:
                        tool_result = trace_tool_call(tool_call, iter_span)
                        messages.append({
                            "role": "tool",
                            "tool_call_id": tool_call.id,
                            "content": str(tool_result),
                        })

                iter_span.set_attribute("agent.iteration.tools_called", len(
                    choice.message.tool_calls or []
                ))

                messages.append(choice.message.model_dump())

        agent_span.set_attribute("agent.status", "max_iterations_reached")
        return "Agent reached maximum iterations without a final answer."

多模型鏈路視覺化

Trace: rag-agent-pipeline-trace-abc123

┌─ rag.pipeline (1280ms)
│  │
│  ├─ gen_ai.client.embedding (45ms)
│  │    model=text-embedding-3-large
│  │    tokens=23
│  │
│  ├─ vector.search (32ms)
│  │    db=pgvector, results=10
│  │
│  ├─ gen_ai.client.rerank (180ms)
│  │    model=rerank-v3.5
│  │    documents=10 → top_n=3
│  │
│  ├─ agent.execution (1023ms)
│  │  │
│  │  ├─ agent.iteration.1 (520ms)
│  │  │    ├─ gen_ai.client.chat (480ms)
│  │  │    │    model=gpt-4o, tokens=1523/847
│  │  │    └─ tool.search_database (40ms)
│  │  │
│  │  ├─ agent.iteration.2 (503ms)
│  │  │    ├─ gen_ai.client.chat (460ms)
│  │  │    │    model=gpt-4o, tokens=2100/620
│  │  │    └─ tool.calculate (43ms)
│  │  │
│  │  └─ agent.iteration.3 (final, 0ms)
│  │       final_answer_length=342
│  │
│  └─ Total: 3 LLM calls, 2 tool calls, 4247 input tokens, 1467 output tokens

Pattern 5: 錯誤監控與異常檢測

LLM專用異常分類

from enum import Enum
from dataclasses import dataclass

class LLMErrorType(Enum):
    RATE_LIMIT = "rate_limit"
    CONTEXT_LENGTH_EXCEEDED = "context_length_exceeded"
    INVALID_API_KEY = "invalid_api_key"
    MODEL_NOT_FOUND = "model_not_found"
    TIMEOUT = "timeout"
    CONTENT_FILTER = "content_filter"
    HALLUCINATION = "hallucination"
    EMPTY_RESPONSE = "empty_response"
    BUDGET_EXCEEDED = "budget_exceeded"
    SERVICE_UNAVAILABLE = "service_unavailable"

def classify_llm_error(error: Exception) -> LLMErrorType:
    error_str = str(error).lower()
    if "rate_limit" in error_str or "429" in error_str:
        return LLMErrorType.RATE_LIMIT
    elif "context_length" in error_str or "maximum context" in error_str:
        return LLMErrorType.CONTEXT_LENGTH_EXCEEDED
    elif "invalid_api_key" in error_str or "401" in error_str:
        return LLMErrorType.INVALID_API_KEY
    elif "model_not_found" in error_str or "404" in error_str:
        return LLMErrorType.MODEL_NOT_FOUND
    elif "timeout" in error_str:
        return LLMErrorType.TIMEOUT
    elif "content_filter" in error_str or "content_policy" in error_str:
        return LLMErrorType.CONTENT_FILTER
    else:
        return LLMErrorType.SERVICE_UNAVAILABLE

錯誤追蹤與重試

import time
from functools import wraps

error_counter = meter.create_counter(
    name="llm.errors",
    description="LLM call errors by type",
    unit="errors",
)

def llm_retry_with_tracing(max_retries: int = 3, base_delay: float = 1.0):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_error = None
            for attempt in range(max_retries + 1):
                with tracer.start_as_current_span(
                    f"llm.call.attempt.{attempt}",
                    kind=trace.SpanKind.CLIENT,
                ) as attempt_span:
                    attempt_span.set_attributes({
                        "llm.retry.attempt": attempt,
                        "llm.retry.max_retries": max_retries,
                    })

                    try:
                        result = func(*args, **kwargs)
                        if attempt > 0:
                            attempt_span.set_attribute("llm.retry.succeeded", True)
                        return result

                    except Exception as e:
                        error_type = classify_llm_error(e)
                        is_retryable = error_type in {
                            LLMErrorType.RATE_LIMIT,
                            LLMErrorType.TIMEOUT,
                            LLMErrorType.SERVICE_UNAVAILABLE,
                        }

                        error_counter.add(1, {
                            "error.type": error_type.value,
                            "gen_ai.request.model": kwargs.get("model", "unknown"),
                        })

                        attempt_span.set_attributes({
                            "error.type": error_type.value,
                            "error.message": str(e)[:200],
                            "error.retryable": is_retryable,
                        })
                        attempt_span.record_exception(e)
                        attempt_span.set_status(
                            trace.Status(trace.StatusCode.ERROR, str(e))
                        )

                        if not is_retryable or attempt == max_retries:
                            raise

                        delay = base_delay * (2 ** attempt)
                        time.sleep(delay)
                        last_error = e

            raise last_error
        return wrapper
    return decorator

幻覺檢測Span

def trace_hallucination_check(
    query: str, answer: str, context: str, model: str = "gpt-4o-mini"
):
    with tracer.start_as_current_span(
        "llm.hallucination_check",
        kind=trace.SpanKind.INTERNAL,
        attributes={
            "llm.check.type": "hallucination",
            "llm.check.model": model,
        },
    ) as span:
        check_prompt = f"""請判斷以下回答是否基於給定的上下文。如果回答包含上下文中不存在的資訊,標記為幻覺。

上下文:
{context}

回答:
{answer}

請以JSON格式回傳:{{"is_hallucination": true/false, "confidence": 0.0-1.0, "reason": "..."}}"""

        start_time = time.time()
        response = call_openai_api(model, [
            {"role": "user", "content": check_prompt}
        ], temperature=0.0)

        import json
        result = json.loads(response.choices[0].message.content)

        span.set_attributes({
            "llm.hallucination.detected": result.get("is_hallucination", False),
            "llm.hallucination.confidence": result.get("confidence", 0.0),
            "llm.hallucination.reason": result.get("reason", "")[:200],
            "llm.duration_ms": (time.time() - start_time) * 1000,
        })

        return result

Pattern 6: 智慧告警與SLA保障

SLA指標定義

from dataclasses import dataclass

@dataclass
class LLMSLA:
    p50_latency_ms: float = 2000.0
    p95_latency_ms: float = 5000.0
    p99_latency_ms: float = 10000.0
    ttft_p95_ms: float = 1000.0
    error_rate_threshold: float = 0.01
    daily_budget_usd: float = 200.0
    hallucination_rate_threshold: float = 0.05
    min_success_rate: float = 0.99

sla = LLMSLA()

智慧告警規則

from collections import deque
import threading

class LLMAlertManager:
    def __init__(self, sla_config: LLMSLA):
        self.sla = sla_config
        self.recent_latencies = deque(maxlen=1000)
        self.recent_errors = deque(maxlen=1000)
        self.recent_ttft = deque(maxlen=1000)
        self.lock = threading.Lock()
        self.alert_callbacks = []

    def add_alert_callback(self, callback):
        self.alert_callbacks.append(callback)

    def record_latency(self, latency_ms: float, ttft_ms: float = 0, is_error: bool = False):
        with self.lock:
            self.recent_latencies.append(latency_ms)
            self.recent_ttft.append(ttft_ms)
            self.recent_errors.append(1 if is_error else 0)

        self._check_alerts(latency_ms, ttft_ms, is_error)

    def _check_alerts(self, latency_ms: float, ttft_ms: float, is_error: bool):
        alerts = []

        if latency_ms > self.sla.p99_latency_ms:
            alerts.append({
                "level": "CRITICAL",
                "type": "latency_p99_breach",
                "message": f"LLM延遲 {latency_ms:.0f}ms 超過P99 SLA {self.sla.p99_latency_ms:.0f}ms",
                "value": latency_ms,
                "threshold": self.sla.p99_latency_ms,
            })

        if ttft_ms > self.sla.ttft_p95_ms:
            alerts.append({
                "level": "WARNING",
                "type": "ttft_breach",
                "message": f"TTFT {ttft_ms:.0f}ms 超過P95 SLA {self.sla.ttft_p95_ms:.0f}ms",
                "value": ttft_ms,
                "threshold": self.sla.ttft_p95_ms,
            })

        with self.lock:
            if len(self.recent_errors) >= 100:
                error_rate = sum(self.recent_errors) / len(self.recent_errors)
                if error_rate > self.sla.error_rate_threshold:
                    alerts.append({
                        "level": "CRITICAL",
                        "type": "error_rate_breach",
                        "message": f"錯誤率 {error_rate:.2%} 超過閾值 {self.sla.error_rate_threshold:.2%}",
                        "value": error_rate,
                        "threshold": self.sla.error_rate_threshold,
                    })

        for alert in alerts:
            self._fire_alert(alert)

    def _fire_alert(self, alert: dict):
        with tracer.start_as_current_span(
            "llm.alert",
            kind=trace.SpanKind.INTERNAL,
            attributes={
                "alert.level": alert["level"],
                "alert.type": alert["type"],
                "alert.message": alert["message"],
            },
        ):
            for callback in self.alert_callbacks:
                try:
                    callback(alert)
                except Exception:
                    pass

alert_manager = LLMAlertManager(sla)

Prometheus告警規則

groups:
  - name: llm_sla_alerts
    rules:
      - alert: LLMLatencyP99Breach
        expr: histogram_quantile(0.99, rate(llm_request_latency_bucket[5m])) > 10000
        for: 2m
        labels:
          severity: critical
          team: ai-platform
        annotations:
          summary: "LLM P99延遲超過SLA閾值"

      - alert: LLMErrorRateHigh
        expr: rate(llm_errors_total[5m]) / rate(llm_calls_total[5m]) > 0.01
        for: 3m
        labels:
          severity: critical
        annotations:
          summary: "LLM錯誤率超過1%"

      - alert: LLMTokenBudgetApproaching
        expr: llm_cost_usd_total > 180
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "LLM日消費接近預算上限"

5個常見坑及解決方案

坑1:Span屬性過多導致採樣丟失

核心指標用Span屬性,長文字用Span Event,超過1KB的內容截斷或雜湊。

坑2:Token計數不準

始終使用API回傳的usage欄位,不要用字元數估算Token。

坑3:忽略Context Propagation

跨服務呼叫時,使用propagate注入和提取Trace Context。

坑4:串流回應不記錄TTFT

串流回應必須記錄TTFT,這是使用者體驗的核心指標。

坑5:採樣策略不當

LLM Span必須100%採樣,非LLM Span可以按比例採樣。


10個常見報錯排查

# 報錯資訊 原因 解決方案
1 StatusCode.UNAVAILABLE Collector未啟動 檢查localhost:4317是否可達
2 context_length_exceeded Prompt超過上下文視窗 使用tiktoken預計算Token數
3 rate_limit_exceeded 429 API呼叫頻率超限 實作指數退避重試
4 Span is not being recorded TracerProvider未配置 確認set_tracer_provider()已呼叫
5 Baggage not propagated 跨服務未傳播上下文 使用propagate.inject()/extract()
6 Metric reader timeout Metric匯出超時 增大export_timeout_millis
7 content_filter_triggered Prompt觸發安全過濾 新增內容預檢
8 Empty response from LLM 模型回傳空內容 檢查finish_reason,新增重試邏輯
9 Memory leak in span processor BatchSpanProcessor積壓 調整max_queue_size
10 Duplicate span attributes 同一屬性重複設定 使用set_attribute()而非add_event()

進階最佳化技巧

自動埋點Instrumentation

from opentelemetry.instrumentation.openai import OpenAIInstrumentor

OpenAIInstrumentor().instrument()

from opentelemetry.instrumentation.requests import RequestsInstrumentor
RequestsInstrumentor().instrument()

自定義Propagator

from opentelemetry.propagate import composite, global_set_text_map_propagator
from opentelemetry.trace.propagation.tracecontext import TraceContextPropagator
from opentelemetry.baggage.propagation import W3CBaggagePropagator

global_set_text_map_propagator(
    composite.CompositePropagator([
        TraceContextPropagator(),
        W3CBaggagePropagator(),
    ])
)

動態採樣策略

from opentelemetry.sdk.trace.sampling import Sampler, SamplingResult, Decision

class AdaptiveLLMSampler(Sampler):
    def __init__(self, base_rate: float = 0.1):
        self.base_rate = base_rate

    def should_sample(self, parent_context, trace_id, name, kind,
                      attributes, links, trace_state):
        if attributes and attributes.get("gen_ai.system"):
            return SamplingResult(
                decision=Decision.RECORD_AND_SAMPLE,
                attributes=attributes,
                trace_state=trace_state,
            )

        should_sample = (hash(trace_id) % 10000) < (self.base_rate * 10000)
        return SamplingResult(
            decision=Decision.RECORD_AND_SAMPLE if should_sample else Decision.DROP,
            attributes=attributes,
            trace_state=trace_state,
        )

    def get_description(self):
        return f"AdaptiveLLMSampler(base={self.base_rate})"

對比分析:OpenTelemetry vs LangSmith vs Promptflow

維度 OpenTelemetry LangSmith Promptflow
開源 是(CNCF專案) 否(SaaS) 是(微軟)
廠商鎖定 高(LangChain生態) 中(Azure生態)
鏈路追蹤 原生支援,標準協議 自有Trace系統 自有Trace系統
Token追蹤 需手動埋點 自動 自動
成本監控 需自定義Metric 內建 內建
多語言支援 11+語言 Python/JS Python
與現有APM整合 原生(Jaeger/Tempo/Grafana) 需匯出 需匯出
自定義能力 極強 中等 中等
學習曲線 陡峭 平緩 中等
生產就緒度 高(CNCF畢業專案) 中等
資料主權 完全自控 資料在LangSmith 資料在Azure

選擇建議

  • 已有多套APM基礎設施 → OpenTelemetry(統一可觀測性堆疊)
  • LangChain重度使用者 → LangSmith(開箱即用)
  • Azure生態 → Promptflow(與Azure AI Studio深度整合)

線上工具推薦


總結

Python OpenTelemetry LLM鏈路追蹤是構建生產級AI應用可觀測性的基石。6種生產模式涵蓋了從基礎Span埋點到智慧告警的完整鏈路:

  1. LLM Span埋點 — 使用gen_ai.*語意約定標準化屬性標註
  2. Token追蹤 — 即時監控用量與成本,防止預算超支
  3. 結構化日誌 — 脫敏記錄Prompt/Response,兼顧除錯與合規
  4. 多模型鏈路 — 追蹤RAG、Agent等複雜呼叫鏈
  5. 錯誤監控 — 分類異常、自動重試、幻覺檢測
  6. 智慧告警 — SLA保障、動態採樣、多級告警

關鍵原則:LLM Span必須100%採樣,Token用量必須來自API回傳值,Prompt內容必須脫敏,TTFT是使用者體驗的核心指標。

相關閱讀

參考資源

本站提供瀏覽器本地工具,免註冊即可試用 →

#Python#OpenTelemetry#LLM#链路追踪#可观测性#监控#2026#AI与大数据