Python OpenTelemetry LLM Tracing: 6 Production Patterns from Spans to Intelligent Alerting

Is Your LLM Application a Black Box?

In 2026, LLM applications are deep in production. But most teams face the same problem: LLM call chains are invisible. A single user request might pass through prompt construction, embedding retrieval, multi-turn dialogue, model inference, and post-processing. When any one of these stages goes wrong, you are left guessing which one failed.

Traditional APM tools can't understand LLM-specific semantics: token usage, time-to-first-token, prompt leakage, hallucination detection. OpenTelemetry, with its GenAI Semantic Conventions and extensible Span attributes, is a strong, vendor-neutral choice for LLM tracing.

Key Takeaways:

  • LLM Span instrumentation and attribute annotation patterns
  • Token usage tracking and real-time cost monitoring
  • Prompt/Response structured logging with sanitization
  • Multi-model chain tracing (RAG + Agent + Tool)
  • Error monitoring and anomaly detection
  • Intelligent alerting and SLA guarantee systems

Table of Contents

  • OpenTelemetry LLM Core Concepts
  • Pattern 1: LLM Span Instrumentation & Attribute Annotation
  • Pattern 2: Token Usage Tracking & Cost Monitoring
  • Pattern 3: Prompt/Response Structured Logging
  • Pattern 4: Multi-Model Chain Tracing
  • Pattern 5: Error Monitoring & Anomaly Detection
  • Pattern 6: Intelligent Alerting & SLA Guarantees
  • 5 Common Pitfalls & Solutions
  • 10 Common Error Troubleshooting
  • Advanced Optimization Tips
  • Comparison: OpenTelemetry vs LangSmith vs Promptflow
  • Recommended Online Tools

OpenTelemetry LLM Core Concepts

Why OpenTelemetry for LLM Tracing

┌─────────────────────────────────────────────────────────┐
│              LLM Application Tracing Architecture        │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  User Request ──► API Gateway ──► LLM Service           │
│                                    │                    │
│                      ┌─────────────┼─────────────┐      │
│                      ▼             ▼             ▼      │
│                Embedding      Chat Model    Tool Call   │
│                Service        (GPT-4o)      Service     │
│                      │             │             │      │
│                      ▼             ▼             ▼      │
│                Vector DB      Reranker      External API │
│                                                         │
│  Each hop = 1 Span with LLM semantic attributes         │
│  Full chain = 1 Trace, end-to-end traceable             │
│                                                         │
│  ──► OTel Collector ──► Jaeger/Tempo (Trace)            │
│                      ──► Prometheus (Metrics)            │
│                      ──► Loki/ELK (Logs)                 │
└─────────────────────────────────────────────────────────┘

OpenTelemetry provides four core capabilities for LLM tracing:

Capability   Description                          LLM Value
Trace        Cross-service chain tracing          Track the full call chain from user request to LLM response
Span         Single operation recording           Record model, parameters, and token usage per LLM call
Metric       Metric collection                    Monitor token consumption, latency distribution, and error rate in real time
Baggage      Cross-service metadata propagation   Propagate user ID, session ID, and A/B experiment labels

OpenTelemetry LLM Semantic Conventions (2026)

Span Kind: CLIENT
Span Name:  gen_ai.client.chat

Standard Attributes:
  gen_ai.system              = "openai"        # LLM provider
  gen_ai.request.model       = "gpt-4o"        # Requested model
  gen_ai.request.max_tokens  = 4096            # Max output tokens
  gen_ai.request.temperature = 0.7             # Temperature parameter
  gen_ai.response.model      = "gpt-4o-2024-05-13"  # Actual model snapshot that served the request
  gen_ai.response.finish_reasons = ["stop"]    # Finish reasons
  gen_ai.usage.input_tokens  = 1523            # Input tokens
  gen_ai.usage.output_tokens = 847             # Output tokens

Events (Span Events):
  gen_ai.content.prompt     = "..."            # Prompt content
  gen_ai.content.completion = "..."            # Response content
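
The request-side attributes above can be assembled in one place so every span uses the same keys. The following is a minimal sketch, not part of any OpenTelemetry package: the helper name `build_genai_request_attributes` is hypothetical. It drops unset parameters because OpenTelemetry attribute values may not be None.

```python
from typing import Any, Optional


def build_genai_request_attributes(
    system: str,
    model: str,
    max_tokens: Optional[int] = None,
    temperature: Optional[float] = None,
) -> dict[str, Any]:
    """Build the request-side gen_ai.* attributes listed above (hypothetical helper)."""
    candidates = {
        "gen_ai.system": system,
        "gen_ai.request.model": model,
        "gen_ai.request.max_tokens": max_tokens,
        "gen_ai.request.temperature": temperature,
    }
    # Attribute values may not be None, so parameters the caller did not set are dropped.
    return {key: value for key, value in candidates.items() if value is not None}


attrs = build_genai_request_attributes("openai", "gpt-4o", temperature=0.7)
print(attrs)
```

The resulting dict can be passed as the `attributes` argument when a span is started, which keeps the span from claiming a `max_tokens` value the request never sent.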

Pattern 1: LLM Span Instrumentation & Attribute Annotation

Basic Span Creation

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource

resource = Resource.create({
    "service.name": "llm-chat-service",
    "service.version": "2.1.0",
    "deployment.environment": "production",
})

tracer_provider = TracerProvider(resource=resource)
tracer_provider.add_span_processor(
    BatchSpanProcessor(
        OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True)
    )
)
trace.set_tracer_provider(tracer_provider)
tracer = trace.get_tracer("llm-chat-service", "2.1.0")

Standard LLM Span Instrumentation

import time
from opentelemetry import trace

tracer = trace.get_tracer(__name__)

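def trace_llm_call(model: str, prompt, **kwargs):
    """Call the chat API inside a CLIENT span annotated with gen_ai.* attributes.

    `prompt` is passed through to call_openai_api unchanged (a prompt string
    or a chat message list).
    """
    # Tracing-only option: pop it so it is not forwarded to the API call.
    ttft_ms = kwargs.pop("ttft_ms", 0)

    with tracer.start_as_current_span(
        "gen_ai.client.chat",
        kind=trace.SpanKind.CLIENT,
        attributes={
            "gen_ai.system": "openai",
            "gen_ai.request.model": model,
            "gen_ai.request.max_tokens": kwargs.get("max_tokens", 4096),
            "gen_ai.request.temperature": kwargs.get("temperature", 0.7),
        },
        # The except block below records the exception and status itself;
        # disabling the defaults avoids a duplicate exception event.
        record_exception=False,
        set_status_on_exception=False,
    ) as span:
        # perf_counter is monotonic, so durations are immune to clock changes.
        start_time = time.perf_counter()

        try:
            response = call_openai_api(model, prompt, **kwargs)

            span.set_attributes({
                "gen_ai.response.model": response.model,
                "gen_ai.response.finish_reasons": [
                    choice.finish_reason for choice in response.choices
                ],
                "gen_ai.usage.input_tokens": response.usage.prompt_tokens,
                "gen_ai.usage.output_tokens": response.usage.completion_tokens,
                "llm.time_to_first_token_ms": ttft_ms,
                "llm.total_duration_ms": (time.perf_counter() - start_time) * 1000,
            })

            # content is None for tool-call responses, so fall back to "".
            content = response.choices[0].message.content or ""
            span.add_event(
                "gen_ai.content.completion",
                attributes={"gen_ai.content": content[:500]},
            )

            return response

        except Exception as e:
            span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
            span.record_exception(e)
            raise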

Span with Event Annotations

def trace_llm_with_events(model: str, messages: list, **kwargs):
    with tracer.start_as_current_span(
        "gen_ai.client.chat",
        kind=trace.SpanKind.CLIENT,
        attributes={
            "gen_ai.system": "openai",
            "gen_ai.request.model": model,
            "gen_ai.request.max_tokens": kwargs.get("max_tokens", 4096),
        },
    ) as span:
        for i, msg in enumerate(messages):
            span.add_event(
                f"gen_ai.content.prompt.{i}",
                attributes={
                    "gen_ai.content.role": msg["role"],
                    "gen_ai.content": msg["content"][:1000],
                },
            )

        start_time = time.time()
        first_token_time = None

        response = call_openai_streaming(model, messages, **kwargs)

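        chunks = []
        for chunk in response:
            if first_token_time is None:
                # First chunk received: record time-to-first-token once.
                first_token_time = time.time()
                ttft_ms = (first_token_time - start_time) * 1000
                span.set_attribute("llm.time_to_first_token_ms", ttft_ms)
            # Assumes call_openai_streaming yields plain text chunks.
            chunks.append(chunk)

        full_content = "".join(chunks)
        span.set_attributes({
            # Rough estimate (about 4 characters per token for English text);
            # prefer provider-reported usage when the stream includes it.
            "gen_ai.usage.output_tokens": len(full_content) // 4,
            "llm.total_duration_ms": (time.time() - start_time) * 1000,
        })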

        span.add_event(
            "gen_ai.content.completion",
            attributes={"gen_ai.content": full_content[:500]},
        )

        return full_content
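
The time-to-first-token measurement can also be separated from the span code, which makes it easy to test without a live model. The sketch below wraps any iterator of text chunks; `timed_stream` and `fake_stream` are hypothetical names, and the fake stream stands in for a real streaming response.

```python
import time
from typing import Iterable, Iterator


def timed_stream(chunks: Iterable[str], timings: dict) -> Iterator[str]:
    """Yield chunks unchanged while recording TTFT and total duration in `timings`."""
    # The clock starts when iteration begins, not when this function is called.
    start = time.perf_counter()
    for chunk in chunks:
        if "ttft_ms" not in timings:
            timings["ttft_ms"] = (time.perf_counter() - start) * 1000
        yield chunk
    timings["total_ms"] = (time.perf_counter() - start) * 1000


def fake_stream() -> Iterator[str]:
    # Stand-in for a real streaming response: a short delay before the first chunk.
    time.sleep(0.02)
    yield "Hello"
    yield ", world"


timings: dict = {}
text = "".join(timed_stream(fake_stream(), timings))
print(text, timings)
```

After the stream is consumed, `timings["ttft_ms"]` and `timings["total_ms"]` can be copied onto the span as attributes.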

Pattern 2: Token Usage Tracking & Cost Monitoring

Token Counter & Cost Calculation

from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter

metric_reader = PeriodicExportingMetricReader(
    OTLPMetricExporter(endpoint="http://localhost:4317", insecure=True),
    export_interval_millis=60000,
)
meter_provider = MeterProvider(metric_readers=[metric_reader])
metrics.set_meter_provider(meter_provider)

meter = metrics.get_meter("llm-chat-service", "2.1.0")

token_counter = meter.create_counter(
    name="gen_ai.client.token.usage",
    description="Token usage counter for LLM calls",
    unit="tokens",
)

cost_counter = meter.create_counter(
    name="llm.cost.usd",
    description="LLM API cost in USD",
    unit="USD",
)

llm_duration = meter.create_histogram(
    name="llm.request.duration",
    description="LLM request duration in milliseconds",
    unit="ms",
)

MODEL_PRICING = {
    "gpt-4o": {"input": 2.50 / 1_000_000, "output": 10.00 / 1_000_000},
    "gpt-4o-mini": {"input": 0.15 / 1_000_000, "output": 0.60 / 1_000_000},
    "claude-sonnet-4-20250514": {"input": 3.00 / 1_000_000, "output": 15.00 / 1_000_000},
    "deepseek-r1": {"input": 0.55 / 1_000_000, "output": 2.19 / 1_000_000},
}

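def record_token_usage(model: str, input_tokens: int, output_tokens: int,
                       system: str = "openai"):
    # `system` names the provider; pass e.g. "anthropic" for Claude models so
    # their tokens are not attributed to OpenAI.
    token_counter.add(input_tokens, {
        "gen_ai.system": system,
        "gen_ai.request.model": model,
        "token.type": "input",
    })
    token_counter.add(output_tokens, {
        "gen_ai.system": system,
        "gen_ai.request.model": model,
        "token.type": "output",
    })

    # Models missing from MODEL_PRICING are priced at 0, so keep the table in
    # sync with the models you call or the cost metric will under-report.
    pricing = MODEL_PRICING.get(model, {"input": 0, "output": 0})
    cost = input_tokens * pricing["input"] + output_tokens * pricing["output"]
    cost_counter.add(cost, {
        "gen_ai.request.model": model,
        "cost.type": "api_call",
    })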
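
As a worked example of the cost formula, take the token counts from the semantic-conventions listing (1523 input, 847 output) and the gpt-4o prices from the table above. This standalone sketch uses a hypothetical `estimate_cost_usd` helper that raises on unknown models instead of pricing them at zero, which is one possible design choice rather than the article's.

```python
# Prices in USD per one million tokens, copied from the pricing table above.
PRICE_PER_MILLION_USD = {
    "gpt-4o": {"input": 2.50, "output": 10.00},
}


def estimate_cost_usd(model: str, input_tokens: int, output_tokens: int) -> float:
    """Return the API cost in USD; raises KeyError for models without a price."""
    price = PRICE_PER_MILLION_USD[model]
    return (input_tokens * price["input"] + output_tokens * price["output"]) / 1_000_000


# 1523 * 2.50 + 847 * 10.00 = 3807.5 + 8470 = 12277.5 per-million units,
# i.e. roughly 0.0123 USD for the call.
cost = estimate_cost_usd("gpt-4o", 1523, 847)
print(cost)
```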

Cost Quota & Budget Control

import threading
from datetime import datetime
from collections import defaultdict

class TokenBudgetManager:
    def __init__(self, daily_budget_usd: float = 100.0):
        self.daily_budget = daily_budget_usd
        self.current_spend = defaultdict(float)
        self.lock = threading.Lock()
        self._reset_date = datetime.now().date()

    def check_and_record(self, model: str, input_tokens: int, output_tokens: int) -> bool:
        with self.lock:
            today = datetime.now().date()
            if today != self._reset_date:
                self.current_spend.clear()
                self._reset_date = today

            pricing = MODEL_PRICING.get(model, {"input": 0, "output": 0})
            cost = input_tokens * pricing["input"] + output_tokens * pricing["output"]

            total_spend = sum(self.current_spend.values()) + cost
            if total_spend > self.daily_budget:
                return False

            self.current_spend[model] += cost
            return True

    def get_remaining_budget(self) -> float:
        with self.lock:
            return self.daily_budget - sum(self.current_spend.values())

budget_manager = TokenBudgetManager(daily_budget_usd=200.0)

LLM Call with Budget Check

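class BudgetExceededError(RuntimeError):
    """Raised when the daily LLM budget is exhausted."""


def calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    # Same pricing table and formula as record_token_usage.
    pricing = MODEL_PRICING.get(model, {"input": 0, "output": 0})
    return input_tokens * pricing["input"] + output_tokens * pricing["output"]


def call_llm_with_budget(model: str, messages: list, **kwargs):
    with tracer.start_as_current_span(
        "gen_ai.client.chat",
        kind=trace.SpanKind.CLIENT,
        attributes={
            "gen_ai.system": "openai",
            "gen_ai.request.model": model,
        },
    ) as span:
        # Refuse before calling the API once the budget is already used up.
        if budget_manager.get_remaining_budget() <= 0:
            span.set_attribute("llm.budget.exceeded", True)
            raise BudgetExceededError("Daily budget exhausted; request not sent.")

        response = call_openai_api(model, messages, **kwargs)

        input_tokens = response.usage.prompt_tokens
        output_tokens = response.usage.completion_tokens

        # The call has already been billed at this point, so a False result
        # means this request pushed spend over the daily budget.
        allowed = budget_manager.check_and_record(model, input_tokens, output_tokens)
        if not allowed:
            span.set_attribute("llm.budget.exceeded", True)
            raise BudgetExceededError(
                f"Daily budget exceeded. Remaining: ${budget_manager.get_remaining_budget():.2f}"
            )

        record_token_usage(model, input_tokens, output_tokens)

        span.set_attributes({
            "gen_ai.usage.input_tokens": input_tokens,
            "gen_ai.usage.output_tokens": output_tokens,
            "llm.cost.usd": calculate_cost(model, input_tokens, output_tokens),
            "llm.budget.remaining_usd": budget_manager.get_remaining_budget(),
        })

        return response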

Pattern 3: Prompt/Response Structured Logging

Sanitization & Structured Recording

import re
import json
from dataclasses import dataclass, field, asdict

@dataclass
class LLMCallLog:
    trace_id: str
    span_id: str
    model: str
    prompt_hash: str
    prompt_length: int
    response_hash: str
    response_length: int
    input_tokens: int
    output_tokens: int
    duration_ms: float
    ttft_ms: float = 0.0
    finish_reason: str = ""
    sanitized_prompt: str = ""
    sanitized_response: str = ""
    metadata: dict = field(default_factory=dict)

PII_PATTERNS = [
    # Matches 7-digit local numbers only; extend for full national formats.
    (re.compile(r'\b\d{3}[-.]?\d{4}\b'), "[PHONE]"),
    (re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'), "[EMAIL]"),
    (re.compile(r'\b\d{17}[\dXx]\b'), "[ID_CARD]"),
    (re.compile(r'\b(?:\d[ -]*?){13,19}\b'), "[CREDIT_CARD]"),
    (re.compile(r'api[_-]?key[=:]\s*\S+', re.IGNORECASE), "[API_KEY]"),
]

def sanitize_text(text: str, max_length: int = 500) -> str:
    # Redact first, then truncate, so PII cut in half at the length boundary
    # cannot slip past the patterns.
    sanitized = text
    for pattern, replacement in PII_PATTERNS:
        sanitized = pattern.sub(replacement, sanitized)
    return sanitized[:max_length]

import hashlib

def hash_content(content: str) -> str:
    # Short SHA-256 prefix: enough to correlate identical prompts without
    # storing their text.
    return hashlib.sha256(content.encode()).hexdigest()[:16]
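
The order of redaction and truncation matters when content is shortened for logging. The sketch below compares the two orders on a sample string; it uses its own reduced pattern set and hypothetical function names so it can run standalone.

```python
import re

EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
API_KEY = re.compile(r"api[_-]?key[=:]\s*\S+", re.IGNORECASE)


def redact_then_truncate(text: str, max_length: int) -> str:
    """Redact on the full text, then cut to length."""
    redacted = EMAIL.sub("[EMAIL]", text)
    redacted = API_KEY.sub("[API_KEY]", redacted)
    return redacted[:max_length]


def truncate_then_redact(text: str, max_length: int) -> str:
    """Cut to length first; a half-cut address no longer matches the pattern."""
    cut = text[:max_length]
    cut = EMAIL.sub("[EMAIL]", cut)
    return API_KEY.sub("[API_KEY]", cut)


sample = "Contact alice@example.com for access"
safe = redact_then_truncate(sample, 20)
leaky = truncate_then_redact(sample, 20)
print(repr(safe))
print(repr(leaky))
```

With a 20-character limit the cut falls inside the address, so the truncate-first variant keeps the local part and part of the domain, while the redact-first variant does not.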

Structured Logger

import logging
from datetime import datetime, timezone

class LLMStructuredLogger:
    def __init__(self, service_name: str = "llm-chat-service"):
        self.logger = logging.getLogger(f"{service_name}.llm_calls")
        self.sanitize_enabled = True
        self.max_content_length = 500

    def log_llm_call(self, span, model: str, prompt: str, response: str,
                     input_tokens: int, output_tokens: int,
                     duration_ms: float, ttft_ms: float = 0,
                     finish_reason: str = "", **metadata):
        log_entry = LLMCallLog(
            trace_id=format(span.get_span_context().trace_id, "032x"),
            span_id=format(span.get_span_context().span_id, "016x"),
            model=model,
            prompt_hash=hash_content(prompt),
            prompt_length=len(prompt),
            response_hash=hash_content(response),
            response_length=len(response),
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            duration_ms=round(duration_ms, 2),
            ttft_ms=round(ttft_ms, 2),
            finish_reason=finish_reason,
            sanitized_prompt=sanitize_text(prompt, self.max_content_length) if self.sanitize_enabled else "",
            sanitized_response=sanitize_text(response, self.max_content_length) if self.sanitize_enabled else "",
            metadata=metadata,
        )

        self.logger.info(
            "LLM call completed",
            extra={"llm_call": asdict(log_entry)},
        )

        span.add_event("gen_ai.content.prompt", attributes={
            "gen_ai.content": sanitize_text(prompt, 1000),
            "gen_ai.content.hash": hash_content(prompt),
        })
        span.add_event("gen_ai.content.completion", attributes={
            "gen_ai.content": sanitize_text(response, 1000),
            "gen_ai.content.hash": hash_content(response),
        })

llm_logger = LLMStructuredLogger()
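
Fields passed through `extra` become attributes on the log record, and the default `logging` formatter does not print them. A minimal sketch of a JSON formatter that emits the `llm_call` field is shown below; the class name `LLMJsonFormatter` is hypothetical, and a production setup may prefer an existing JSON logging library.

```python
import json
import logging


class LLMJsonFormatter(logging.Formatter):
    """Render a log record, including the `llm_call` extra field, as one JSON line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # extra={"llm_call": ...} is stored as an attribute on the record.
        llm_call = getattr(record, "llm_call", None)
        if llm_call is not None:
            payload["llm_call"] = llm_call
        # default=str keeps the formatter from failing on non-JSON types.
        return json.dumps(payload, ensure_ascii=False, default=str)


handler = logging.StreamHandler()
handler.setFormatter(LLMJsonFormatter())
logger = logging.getLogger("llm-chat-service.llm_calls")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.info("LLM call completed", extra={"llm_call": {"model": "gpt-4o", "input_tokens": 12}})
```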

Complete LLM Call with Logging

def call_llm_with_logging(model: str, messages: list, **kwargs):
    with tracer.start_as_current_span(
        "gen_ai.client.chat",
        kind=trace.SpanKind.CLIENT,
        attributes={
            "gen_ai.system": "openai",
            "gen_ai.request.model": model,
            "gen_ai.request.temperature": kwargs.get("temperature", 0.7),
        },
    ) as span:
        start_time = time.time()
        prompt_text = json.dumps(messages, ensure_ascii=False)

        try:
            response = call_openai_api(model, messages, **kwargs)

            # content is None for tool-call responses; hashing and sanitizing need a str.
            content = response.choices[0].message.content or ""
            finish_reason = response.choices[0].finish_reason
            input_tokens = response.usage.prompt_tokens
            output_tokens = response.usage.completion_tokens
            duration_ms = (time.time() - start_time) * 1000

            llm_logger.log_llm_call(
                span=span,
                model=response.model,
                prompt=prompt_text,
                response=content,
                input_tokens=input_tokens,
                output_tokens=output_tokens,
                duration_ms=duration_ms,
                finish_reason=finish_reason,
                user_id=kwargs.get("user_id", "anonymous"),
                session_id=kwargs.get("session_id", ""),
            )

            span.set_attributes({
                "gen_ai.usage.input_tokens": input_tokens,
                "gen_ai.usage.output_tokens": output_tokens,
                "gen_ai.response.finish_reasons": [finish_reason],
            })

            return response

        except Exception as e:
            duration_ms = (time.time() - start_time) * 1000
            llm_logger.log_llm_call(
                span=span, model=model, prompt=prompt_text,
                response="", input_tokens=0, output_tokens=0,
                duration_ms=duration_ms, error=str(e),
            )
            span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
            span.record_exception(e)
            raise

Pattern 4: Multi-Model Chain Tracing

RAG Pipeline Tracing

def trace_rag_pipeline(query: str, user_id: str = ""):
    with tracer.start_as_current_span(
        "rag.pipeline",
        kind=trace.SpanKind.INTERNAL,
        attributes={
            "rag.query": sanitize_text(query),
            "rag.user_id": user_id,
        },
    ) as pipeline_span:
        embedding = trace_embedding(query, pipeline_span)
        documents = trace_vector_search(embedding, pipeline_span)
        context = trace_reranker(query, documents, pipeline_span)
        answer = trace_llm_generation(query, context, pipeline_span)

        pipeline_span.set_attributes({
            "rag.documents_retrieved": len(documents),
            "rag.context_length": len(context),
            "rag.answer_length": len(answer),
        })

        return answer

def trace_embedding(query: str, parent_span):
    with tracer.start_as_current_span(
        "gen_ai.client.embedding",
        kind=trace.SpanKind.CLIENT,
        attributes={
            "gen_ai.system": "openai",
            "gen_ai.request.model": "text-embedding-3-large",
            "gen_ai.request.encoding_format": "float",
        },
    ) as span:
        start_time = time.time()
        response = call_embedding_api(query)

        span.set_attributes({
            "gen_ai.usage.input_tokens": response.usage.prompt_tokens,
            "llm.duration_ms": (time.time() - start_time) * 1000,
        })

        return response.data[0].embedding

def trace_vector_search(embedding: list, parent_span):
    with tracer.start_as_current_span(
        "vector.search",
        kind=trace.SpanKind.CLIENT,
        attributes={
            "db.system": "pgvector",
            "db.operation": "similarity_search",
            "db.vector.dimension": len(embedding),
        },
    ) as span:
        start_time = time.time()
        results = search_pgvector(embedding, top_k=10)

        span.set_attributes({
            "db.results.count": len(results),
            "db.results.top_score": results[0].score if results else 0,
            "llm.duration_ms": (time.time() - start_time) * 1000,
        })

        for i, doc in enumerate(results[:3]):
            span.add_event(f"vector.search.result.{i}", attributes={
                "document.id": doc.id,
                "document.score": doc.score,
                "document.content_preview": doc.content[:200],
            })

        return results

def trace_reranker(query: str, documents: list, parent_span):
    with tracer.start_as_current_span(
        "gen_ai.client.rerank",
        kind=trace.SpanKind.CLIENT,
        attributes={
            "gen_ai.system": "cohere",
            "gen_ai.request.model": "rerank-v3.5",
            "rerank.documents_count": len(documents),
        },
    ) as span:
        start_time = time.time()
        reranked = call_reranker_api(query, documents, top_n=3)

        span.set_attributes({
            "rerank.results_count": len(reranked),
            "llm.duration_ms": (time.time() - start_time) * 1000,
        })

        return "\n".join([doc.content for doc in reranked])

def trace_llm_generation(query: str, context: str, parent_span):
    with tracer.start_as_current_span(
        "gen_ai.client.chat",
        kind=trace.SpanKind.CLIENT,
        attributes={
            "gen_ai.system": "openai",
            "gen_ai.request.model": "gpt-4o",
        },
    ) as span:
        start_time = time.time()
        messages = [
            {"role": "system", "content": f"Answer based on this context:\n{context}"},
            {"role": "user", "content": query},
        ]
        response = call_openai_api("gpt-4o", messages)

        span.set_attributes({
            "gen_ai.usage.input_tokens": response.usage.prompt_tokens,
            "gen_ai.usage.output_tokens": response.usage.completion_tokens,
            "llm.duration_ms": (time.time() - start_time) * 1000,
        })

        return response.choices[0].message.content

Agent Chain Tracing

def trace_agent_execution(user_query: str, max_iterations: int = 5):
    with tracer.start_as_current_span(
        "agent.execution",
        kind=trace.SpanKind.INTERNAL,
        attributes={
            "agent.type": "react",
            "agent.max_iterations": max_iterations,
            "agent.query": sanitize_text(user_query),
        },
    ) as agent_span:
        messages = [{"role": "user", "content": user_query}]
        iteration = 0

        while iteration < max_iterations:
            iteration += 1

            with tracer.start_as_current_span(
                f"agent.iteration.{iteration}",
                kind=trace.SpanKind.INTERNAL,
            ) as iter_span:
                # trace_llm_call takes the model and the prompt/messages as
                # positional arguments.
                llm_response = trace_llm_call(
                    "gpt-4o",
                    messages,
                    temperature=0.0,
                )

                choice = llm_response.choices[0]

                if choice.finish_reason == "stop":
                    agent_span.set_attributes({
                        "agent.iterations": iteration,
                        "agent.final_answer_length": len(choice.message.content or ""),
                    })
                    return choice.message.content

                # The assistant message that requested the tools must precede
                # the tool results in the conversation history.
                messages.append(choice.message.model_dump())

                if choice.message.tool_calls:
                    for tool_call in choice.message.tool_calls:
                        tool_result = trace_tool_call(tool_call, iter_span)
                        messages.append({
                            "role": "tool",
                            "tool_call_id": tool_call.id,
                            "content": str(tool_result),
                        })

                iter_span.set_attribute("agent.iteration.tools_called", len(
                    choice.message.tool_calls or []
                ))

        agent_span.set_attribute("agent.status", "max_iterations_reached")
        return "Agent reached maximum iterations without a final answer."

def trace_tool_call(tool_call, parent_span):
    with tracer.start_as_current_span(
        f"tool.{tool_call.function.name}",
        kind=trace.SpanKind.CLIENT,
        attributes={
            "tool.name": tool_call.function.name,
            "tool.call_id": tool_call.id,
        },
    ) as span:
        start_time = time.time()

        try:
            import json
            arguments = json.loads(tool_call.function.arguments)
            span.set_attribute("tool.arguments_hash", hash_content(
                tool_call.function.arguments
            ))

            result = execute_tool(tool_call.function.name, arguments)

            span.set_attributes({
                "tool.result_length": len(str(result)),
                "llm.duration_ms": (time.time() - start_time) * 1000,
                "tool.status": "success",
            })

            return result

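        except Exception as e:
            span.set_attributes({
                "tool.status": "error",
                "tool.error": str(e)[:200],
                "llm.duration_ms": (time.time() - start_time) * 1000,
            })
            span.record_exception(e)
            # The exception is swallowed so the agent can continue, so the
            # span has to be marked as failed explicitly.
            span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
            return f"Tool error: {e}"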

Multi-Model Chain Visualization

Trace: rag-agent-pipeline-trace-abc123

┌─ rag.pipeline (1280ms)
│  │
│  ├─ gen_ai.client.embedding (45ms)
│  │    model=text-embedding-3-large
│  │    tokens=23
│  │
│  ├─ vector.search (32ms)
│  │    db=pgvector, results=10
│  │
│  ├─ gen_ai.client.rerank (180ms)
│  │    model=rerank-v3.5
│  │    documents=10 → top_n=3
│  │
│  ├─ agent.execution (1023ms)
│  │  │
│  │  ├─ agent.iteration.1 (520ms)
│  │  │    ├─ gen_ai.client.chat (480ms)
│  │  │    │    model=gpt-4o, tokens=1523/847
│  │  │    └─ tool.search_database (40ms)
│  │  │
│  │  ├─ agent.iteration.2 (503ms)
│  │  │    ├─ gen_ai.client.chat (460ms)
│  │  │    │    model=gpt-4o, tokens=2100/620
│  │  │    └─ tool.calculate (43ms)
│  │  │
│  │  └─ agent.iteration.3 (final, 0ms)
│  │       final_answer_length=342
│  │
│  └─ Total: 2 LLM chat calls, 2 tool calls, 3623 chat input tokens, 1467 output tokens

Pattern 5: Error Monitoring & Anomaly Detection

LLM-Specific Error Classification

from enum import Enum
from dataclasses import dataclass

class LLMErrorType(Enum):
    RATE_LIMIT = "rate_limit"
    CONTEXT_LENGTH_EXCEEDED = "context_length_exceeded"
    INVALID_API_KEY = "invalid_api_key"
    MODEL_NOT_FOUND = "model_not_found"
    TIMEOUT = "timeout"
    CONTENT_FILTER = "content_filter"
    HALLUCINATION = "hallucination"
    EMPTY_RESPONSE = "empty_response"
    BUDGET_EXCEEDED = "budget_exceeded"
    SERVICE_UNAVAILABLE = "service_unavailable"

@dataclass
class LLMErrorEvent:
    error_type: LLMErrorType
    model: str
    error_message: str
    trace_id: str
    span_id: str
    retry_count: int
    is_retryable: bool

def classify_llm_error(error: Exception) -> LLMErrorType:
    error_str = str(error).lower()
    if "rate_limit" in error_str or "429" in error_str:
        return LLMErrorType.RATE_LIMIT
    elif "context_length" in error_str or "maximum context" in error_str:
        return LLMErrorType.CONTEXT_LENGTH_EXCEEDED
    elif "invalid_api_key" in error_str or "401" in error_str:
        return LLMErrorType.INVALID_API_KEY
    elif "model_not_found" in error_str or "404" in error_str:
        return LLMErrorType.MODEL_NOT_FOUND
    elif "timeout" in error_str:
        return LLMErrorType.TIMEOUT
    elif "content_filter" in error_str or "content_policy" in error_str:
        return LLMErrorType.CONTENT_FILTER
    else:
        return LLMErrorType.SERVICE_UNAVAILABLE
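
Substring matching on the error message can misfire, for example when an unrelated message happens to contain "404". Where the client exception exposes a numeric status code, classifying on that value is more robust. The sketch below assumes the exception carries an integer `status_code` attribute; the `FakeApiError` class, the mapping, and the "unknown" category are hypothetical and not part of the enum above.

```python
from typing import Optional

# Hypothetical mapping from HTTP status to the error categories used above.
STATUS_TO_ERROR_TYPE = {
    401: "invalid_api_key",
    404: "model_not_found",
    408: "timeout",
    429: "rate_limit",
}


def classify_by_status(error: Exception) -> str:
    """Classify on a numeric status code when present, not on message text."""
    status: Optional[int] = getattr(error, "status_code", None)
    if status in STATUS_TO_ERROR_TYPE:
        return STATUS_TO_ERROR_TYPE[status]
    if status is not None and 500 <= status < 600:
        return "service_unavailable"
    if isinstance(error, TimeoutError):
        return "timeout"
    # Unrecognized errors get their own bucket so they are not retried blindly.
    return "unknown"


class FakeApiError(Exception):
    """Stand-in for a client library exception that carries a status code."""

    def __init__(self, message: str, status_code: int):
        super().__init__(message)
        self.status_code = status_code


print(classify_by_status(FakeApiError("too many requests", 429)))
print(classify_by_status(ValueError("order 404 not found")))
```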

Error Tracing with Retry

import time
from functools import wraps

error_counter = meter.create_counter(
    name="llm.errors",
    description="LLM call errors by type",
    unit="errors",
)

def llm_retry_with_tracing(max_retries: int = 3, base_delay: float = 1.0):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_error = None
            for attempt in range(max_retries + 1):
                with tracer.start_as_current_span(
                    f"llm.call.attempt.{attempt}",
                    kind=trace.SpanKind.CLIENT,
                ) as attempt_span:
                    attempt_span.set_attributes({
                        "llm.retry.attempt": attempt,
                        "llm.retry.max_retries": max_retries,
                    })

                    try:
                        result = func(*args, **kwargs)
                        if attempt > 0:
                            attempt_span.set_attribute("llm.retry.succeeded", True)
                        return result

                    except Exception as e:
                        error_type = classify_llm_error(e)
                        is_retryable = error_type in {
                            LLMErrorType.RATE_LIMIT,
                            LLMErrorType.TIMEOUT,
                            LLMErrorType.SERVICE_UNAVAILABLE,
                        }

                        error_counter.add(1, {
                            "error.type": error_type.value,
                            "gen_ai.request.model": kwargs.get("model", "unknown"),
                        })

                        attempt_span.set_attributes({
                            "error.type": error_type.value,
                            "error.message": str(e)[:200],
                            "error.retryable": is_retryable,
                        })
                        attempt_span.record_exception(e)
                        attempt_span.set_status(
                            trace.Status(trace.StatusCode.ERROR, str(e))
                        )

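                        if not is_retryable or attempt == max_retries:
                            raise

                        last_error = e
                        delay = base_delay * (2 ** attempt)
                        attempt_span.set_attribute("llm.retry.delay_s", delay)

                # Back off after the attempt span has ended so the wait is not
                # counted in the span's duration.
                time.sleep(delay)

            # Defensive fallback; not reached when max_retries >= 0.
            raise last_error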
        return wrapper
    return decorator
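
The delay schedule `base_delay * 2 ** attempt` makes all clients that failed together retry together. A common variation adds random jitter and an upper cap. The sketch below computes such a schedule; it swaps in "full jitter" backoff, which is not what the decorator above does, and the `backoff_delays` name and `cap` parameter are assumptions for illustration.

```python
import random
from typing import List, Optional


def backoff_delays(
    max_retries: int,
    base_delay: float = 1.0,
    cap: float = 30.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return one randomized delay per retry, each in [0, min(cap, base * 2**attempt)]."""
    if rng is None:
        rng = random.Random()
    delays = []
    for attempt in range(max_retries):
        ceiling = min(cap, base_delay * (2 ** attempt))
        delays.append(rng.uniform(0, ceiling))
    return delays


# A seeded generator keeps the schedule reproducible in tests.
delays = backoff_delays(4, rng=random.Random(42))
print(delays)
```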

Hallucination Detection Span

def trace_hallucination_check(
    query: str, answer: str, context: str, model: str = "gpt-4o-mini"
):
    with tracer.start_as_current_span(
        "llm.hallucination_check",
        kind=trace.SpanKind.INTERNAL,
        attributes={
            "llm.check.type": "hallucination",
            "llm.check.model": model,
        },
    ) as span:
        check_prompt = f"""Determine if the following answer is based on the given context. If the answer contains information not present in the context, flag it as hallucination.

Context:
{context}

Answer:
{answer}

Return in JSON format: {{"is_hallucination": true/false, "confidence": 0.0-1.0, "reason": "..."}}"""

        start_time = time.time()
        response = call_openai_api(model, [
            {"role": "user", "content": check_prompt}
        ], temperature=0.0)

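        import json
        try:
            result = json.loads(response.choices[0].message.content)
        except (json.JSONDecodeError, TypeError):
            # The checker may return malformed or empty output; record that
            # instead of failing the request.
            result = {"is_hallucination": False, "confidence": 0.0,
                      "reason": "checker output was not valid JSON"}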

        span.set_attributes({
            "llm.hallucination.detected": result.get("is_hallucination", False),
            "llm.hallucination.confidence": result.get("confidence", 0.0),
            "llm.hallucination.reason": result.get("reason", "")[:200],
            "llm.duration_ms": (time.time() - start_time) * 1000,
        })

        return result

Pattern 6: Intelligent Alerting & SLA Guarantees

SLA Metric Definitions

from dataclasses import dataclass

@dataclass
class LLMSLA:
    p50_latency_ms: float = 2000.0
    p95_latency_ms: float = 5000.0
    p99_latency_ms: float = 10000.0
    ttft_p95_ms: float = 1000.0
    error_rate_threshold: float = 0.01
    daily_budget_usd: float = 200.0
    hallucination_rate_threshold: float = 0.05
    min_success_rate: float = 0.99

sla = LLMSLA()

latency_histogram = meter.create_histogram(
    name="llm.request.latency",
    description="LLM request latency distribution",
    unit="ms",
)

ttft_histogram = meter.create_histogram(
    name="llm.time_to_first_token",
    description="Time to first token distribution",
    unit="ms",
)
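
The latency targets in `LLMSLA` are percentiles, so they are meaningful when compared against a percentile computed over a window of recent requests rather than against a single request. A minimal sketch using the nearest-rank method follows; the `percentile` helper is hypothetical, and in production the percentile is usually computed by the metrics backend from the histogram.

```python
import math
from collections import deque


def percentile(values, q: float) -> float:
    """Nearest-rank percentile: smallest sample with at least q% of samples at or below it."""
    if not values:
        raise ValueError("percentile of empty window")
    ordered = sorted(values)
    rank = max(1, math.ceil(q * len(ordered) / 100))
    return ordered[rank - 1]


# Sliding window of the most recent latencies, here filled with 1..100 ms.
window = deque(maxlen=1000)
for latency_ms in range(1, 101):
    window.append(float(latency_ms))

p95 = percentile(window, 95)
# 5000.0 is the p95_latency_ms default from the SLA definition above.
p95_breached = p95 > 5000.0
print(p95, p95_breached)
```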

Intelligent Alert Rules

from collections import deque
import threading

class LLMAlertManager:
    def __init__(self, sla_config: LLMSLA):
        self.sla = sla_config
        self.recent_latencies = deque(maxlen=1000)
        self.recent_errors = deque(maxlen=1000)
        self.recent_ttft = deque(maxlen=1000)
        self.lock = threading.Lock()
        self.alert_callbacks = []

    def add_alert_callback(self, callback):
        self.alert_callbacks.append(callback)

    def record_latency(self, latency_ms: float, ttft_ms: float = 0, is_error: bool = False):
        with self.lock:
            self.recent_latencies.append(latency_ms)
            self.recent_ttft.append(ttft_ms)
            self.recent_errors.append(1 if is_error else 0)

        self._check_alerts(latency_ms, ttft_ms, is_error)

    def _check_alerts(self, latency_ms: float, ttft_ms: float, is_error: bool):
        alerts = []

        if latency_ms > self.sla.p99_latency_ms:
            alerts.append({
                "level": "CRITICAL",
                "type": "latency_p99_breach",
                "message": f"LLM latency {latency_ms:.0f}ms exceeds P99 SLA {self.sla.p99_latency_ms:.0f}ms",
                "value": latency_ms,
                "threshold": self.sla.p99_latency_ms,
            })

        if ttft_ms > self.sla.ttft_p95_ms:
            alerts.append({
                "level": "WARNING",
                "type": "ttft_breach",
                "message": f"TTFT {ttft_ms:.0f}ms exceeds P95 SLA {self.sla.ttft_p95_ms:.0f}ms",
                "value": ttft_ms,
                "threshold": self.sla.ttft_p95_ms,
            })

        with self.lock:
            if len(self.recent_errors) >= 100:
                error_rate = sum(self.recent_errors) / len(self.recent_errors)
                if error_rate > self.sla.error_rate_threshold:
                    alerts.append({
                        "level": "CRITICAL",
                        "type": "error_rate_breach",
                        "message": f"Error rate {error_rate:.2%} exceeds threshold {self.sla.error_rate_threshold:.2%}",
                        "value": error_rate,
                        "threshold": self.sla.error_rate_threshold,
                    })

        for alert in alerts:
            self._fire_alert(alert)

    def _fire_alert(self, alert: dict):
        with tracer.start_as_current_span(
            "llm.alert",
            kind=trace.SpanKind.INTERNAL,
            attributes={
                "alert.level": alert["level"],
                "alert.type": alert["type"],
                "alert.message": alert["message"],
                "alert.value": str(alert["value"]),
                "alert.threshold": str(alert["threshold"]),
            },
        ):
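            for callback in self.alert_callbacks:
                try:
                    callback(alert)
                except Exception as exc:
                    # A failing callback must not block the others, but should stay visible.
                    print(f"[alert callback failed] {exc!r}")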

alert_manager = LLMAlertManager(sla)
alert_manager.add_alert_callback(lambda a: print(f"[{a['level']}] {a['message']}"))
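
Note that _check_alerts compares each single request against the P99 threshold, so one slow outlier fires a CRITICAL alert even when the window as a whole is healthy. A minimal sketch of a windowed percentile check, using only the standard library, might look like the following. The function names percentile and window_breaches are illustrative and not part of the class above; the default limits mirror the LLMSLA defaults.

```python
from statistics import quantiles


def percentile(values, pct):
    """Return the pct-th percentile (1..99) with linear interpolation."""
    return quantiles(values, n=100, method="inclusive")[pct - 1]


def window_breaches(latencies_ms, p95_limit_ms=5000.0, p99_limit_ms=10000.0,
                    min_samples=100):
    """Compare window percentiles (not single requests) with the SLA limits."""
    if len(latencies_ms) < min_samples:
        return []  # too few samples for a meaningful percentile
    breaches = []
    for pct, limit in ((95, p95_limit_ms), (99, p99_limit_ms)):
        observed = percentile(latencies_ms, pct)
        if observed > limit:
            breaches.append({
                "type": f"latency_p{pct}_breach",
                "value": observed,
                "threshold": limit,
            })
    return breaches


# One 20 s outlier among 100 requests does not breach the window percentiles.
print(window_breaches([100.0] * 99 + [20000.0]))
# Ten slow requests out of 100 breach both P95 and P99.
print(window_breaches([100.0] * 90 + [20000.0] * 10))
```

A function like this could be called with list(self.recent_latencies) while holding the lock, replacing or complementing the per-request comparison.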

Prometheus Alert Rules

groups:
  - name: llm_sla_alerts
    rules:
      - alert: LLMLatencyP99Breach
        expr: histogram_quantile(0.99, sum by (le, gen_ai_request_model) (rate(llm_request_latency_bucket[5m]))) > 10000
        for: 2m
        labels:
          severity: critical
          team: ai-platform
        annotations:
          summary: "LLM P99 latency exceeds SLA threshold"
          description: "Model {{ $labels.gen_ai_request_model }} P99 latency is {{ $value }}ms"

      - alert: LLMErrorRateHigh
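        # Aggregate both sides to the same labels; otherwise extra labels on the error counter (e.g. error_type) prevent the division from matching.
        expr: sum by (gen_ai_request_model) (rate(llm_errors_total[5m])) / sum by (gen_ai_request_model) (rate(llm_calls_total[5m])) > 0.01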
        for: 3m
        labels:
          severity: critical
          team: ai-platform
        annotations:
          summary: "LLM error rate exceeds 1%"
          description: "Model {{ $labels.gen_ai_request_model }} error rate is {{ $value | humanizePercentage }}"

      - alert: LLMTokenBudgetApproaching
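        # The counter is cumulative, so compare its increase over the last 24h (a rolling window, not a calendar day).
        expr: sum(increase(llm_cost_usd_total[24h])) > 180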
        for: 5m
        labels:
          severity: warning
          team: ai-platform
        annotations:
          summary: "LLM daily spend approaching budget limit"
          description: "Current spend ${{ $value }}, budget limit $200"

      - alert: LLMTTFTHigh
        expr: histogram_quantile(0.95, sum by (le) (rate(llm_time_to_first_token_bucket[5m]))) > 1000
        for: 3m
        labels:
          severity: warning
          team: ai-platform
        annotations:
          summary: "LLM TTFT P95 exceeds 1 second"
          description: "TTFT P95 = {{ $value }}ms"
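
One detail worth checking before relying on the latency rules above: histogram_quantile can only interpolate inside finite buckets. When the requested quantile falls into the +Inf bucket, Prometheus returns the upper bound of the highest finite bucket. The sketch below re-implements that calculation in plain Python to show the effect. It assumes the histogram uses bucket boundaries that top out at 10000 (the OpenTelemetry SDK's default explicit boundaries as far as I know; verify against your SDK version), in which case a rule of the form "> 10000" cannot fire. The helper names are illustrative.

```python
from bisect import bisect_left

# Assumed default explicit bucket boundaries (unit: ms for these histograms).
DEFAULT_BOUNDS = [0, 5, 10, 25, 50, 75, 100, 250, 500, 750,
                  1000, 2500, 5000, 7500, 10000]


def bucket_counts(samples, bounds):
    """Cumulative counts per upper bound, with a final +Inf bucket."""
    counts = [0] * (len(bounds) + 1)
    for s in samples:
        counts[bisect_left(bounds, s)] += 1  # first bound with s <= bound
    cumulative, running = [], 0
    for c in counts:
        running += c
        cumulative.append(running)
    return cumulative


def histogram_quantile(q, bounds, cumulative):
    """Simplified version of the Prometheus bucket interpolation."""
    total = cumulative[-1]
    if total == 0:
        return float("nan")
    rank = q * total
    idx = next(i for i, cum in enumerate(cumulative) if cum >= rank)
    if idx == len(bounds):
        # Quantile lies in +Inf: capped at the highest finite bound.
        return float(bounds[-1])
    lower = float(bounds[idx - 1]) if idx > 0 else 0.0
    below = cumulative[idx - 1] if idx > 0 else 0
    in_bucket = cumulative[idx] - below
    if in_bucket == 0:
        return lower
    return lower + (bounds[idx] - lower) * (rank - below) / in_bucket


slow = [20000] * 100  # every request takes 20 s
capped = histogram_quantile(0.99, DEFAULT_BOUNDS, bucket_counts(slow, DEFAULT_BOUNDS))
wide_bounds = DEFAULT_BOUNDS + [15000, 30000]
visible = histogram_quantile(0.99, wide_bounds, bucket_counts(slow, wide_bounds))
print(capped > 10000, visible > 10000)
```

If this applies to your setup, adding boundaries above the SLA threshold (for example through an SDK View) lets the P99 rule observe values beyond 10000 ms.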

5 Common Pitfalls & Solutions

Pitfall 1: Oversized Span Attributes Causing Dropped or Truncated Spans

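# BAD: full prompt/response text stored as span attributes bloats every span.
BAD_PRACTICE = {
    "gen_ai.content.full_prompt": prompt[:10000],
    "gen_ai.content.full_response": response[:10000],
}

# GOOD: keep attributes to small, queryable values.
GOOD_PRACTICE = {
    "gen_ai.usage.input_tokens": input_tokens,
    "gen_ai.usage.output_tokens": output_tokens,
}

# Long text goes into a span event, sanitized and truncated to 1000 characters.
span.add_event("gen_ai.content.prompt", attributes={
    "gen_ai.content": sanitize_text(prompt, 1000),
})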

Solution: Use Span attributes for core metrics, Span Events for long text, truncate or hash content over 1KB.
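
The "truncate or hash content over 1KB" rule can be sketched as a small helper. This is only an illustration: compact_for_span and the keys it returns are hypothetical names, not part of OpenTelemetry or of the sanitize_text helper used above.

```python
import hashlib

MAX_ATTR_BYTES = 1024  # the 1 KB limit suggested above


def compact_for_span(text: str, max_bytes: int = MAX_ATTR_BYTES) -> dict:
    """Return the text unchanged if small, else a preview plus a SHA-256 digest."""
    raw = text.encode("utf-8")
    if len(raw) <= max_bytes:
        return {"content": text, "truncated": False}
    # Cut on a byte boundary and drop any partial multi-byte character at the end.
    preview = raw[:max_bytes].decode("utf-8", errors="ignore")
    return {
        "content": preview,
        "truncated": True,
        "sha256": hashlib.sha256(raw).hexdigest(),  # lets you correlate with external logs
        "original_bytes": len(raw),
    }


small = compact_for_span("What is OpenTelemetry?")
large = compact_for_span("a" * 5000)
print(small["truncated"], large["truncated"], large["original_bytes"])
```

The digest makes it possible to match a truncated event against the full text stored elsewhere without putting that text on the span.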

Pitfall 2: Inaccurate Token Counting

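# BAD: estimating tokens as characters / 4 can be far off, especially for code and non-English text.
BAD_PRACTICE = {
    "gen_ai.usage.input_tokens": len(prompt) // 4,
}

# GOOD: take the count the provider actually billed.
GOOD_PRACTICE = {
    "gen_ai.usage.input_tokens": response.usage.prompt_tokens,
}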

Solution: Always use the usage field from the API response, never estimate tokens from character count.
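
A minimal sketch of reading usage defensively is shown below. It assumes an OpenAI-style response object with usage.prompt_tokens and usage.completion_tokens; the helper name usage_attributes is illustrative. When usage is missing (for example on streamed responses where usage reporting was not requested), it returns None instead of falling back to an estimate.

```python
from types import SimpleNamespace
from typing import Optional


def usage_attributes(response) -> Optional[dict]:
    """Map the provider-reported usage to span attributes, or None if absent."""
    usage = getattr(response, "usage", None)
    if usage is None:
        return None  # do not guess; record nothing rather than a wrong number
    return {
        "gen_ai.usage.input_tokens": usage.prompt_tokens,
        "gen_ai.usage.output_tokens": usage.completion_tokens,
    }


# Stand-ins for real API responses, for illustration only.
with_usage = SimpleNamespace(usage=SimpleNamespace(prompt_tokens=812, completion_tokens=64))
without_usage = SimpleNamespace(usage=None)

print(usage_attributes(with_usage))
print(usage_attributes(without_usage))
```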

Pitfall 3: Ignoring Context Propagation

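from opentelemetry import baggage, context, propagate

# BAD: calling another service without injecting headers starts a disconnected trace there.

# GOOD (caller): put user.id into baggage, then inject the current context into the headers.
token = context.attach(baggage.set_baggage("user.id", user_id))
try:
    headers = {}
    propagate.inject(headers)  # with the default propagators: trace context + baggage
    # send the outgoing request with `headers` here
finally:
    context.detach(token)

# GOOD (callee): restore the caller's context from the incoming headers.
# ctx = propagate.extract(incoming_headers)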

Solution: Use propagate.inject()/extract() for cross-service context propagation.
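
To make it concrete what travels between services, here is a standard-library sketch of the W3C traceparent header that the trace context propagator reads and writes. It is an illustration of the wire format only, not a replacement for propagate.inject()/extract(); the function names are made up for this example.

```python
import re

# version - trace-id (32 hex) - parent span-id (16 hex) - flags (2 hex)
TRACEPARENT_RE = re.compile(
    r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$"
)


def build_traceparent(trace_id: int, span_id: int, sampled: bool) -> str:
    """Format ids as a version-00 traceparent header value."""
    return f"00-{trace_id:032x}-{span_id:016x}-{'01' if sampled else '00'}"


def parse_traceparent(header: str):
    """Return the decoded fields, or None if the header is malformed."""
    match = TRACEPARENT_RE.match(header.strip())
    if not match:
        return None
    version, trace_hex, span_hex, flags = match.groups()
    # Version ff and all-zero ids are invalid per the specification.
    if version == "ff" or int(trace_hex, 16) == 0 or int(span_hex, 16) == 0:
        return None
    return {
        "trace_id": int(trace_hex, 16),
        "span_id": int(span_hex, 16),
        "sampled": bool(int(flags, 16) & 0x01),
    }


example = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
parsed = parse_traceparent(example)
print(parsed["sampled"], build_traceparent(parsed["trace_id"], parsed["span_id"], True) == example)
```

If a downstream service logs requests without this header, its spans will start a new trace, which is the symptom described in this pitfall.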

Pitfall 4: Not Recording TTFT for Streaming Responses

BAD_PRACTICE = {
    "streaming_without_ttft": "Only recording total latency, ignoring time to first token",
}

GOOD_PRACTICE = """
with tracer.start_as_current_span("gen_ai.client.chat") as span:
    start = time.time()
    first_token_time = None
    for chunk in stream:
        if first_token_time is None:
            first_token_time = time.time()
            span.set_attribute("llm.time_to_first_token_ms",
                             (first_token_time - start) * 1000)
"""

Solution: Streaming responses must record TTFT — it's the core metric for user experience.
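
The measurement itself does not depend on any SDK. A minimal sketch follows, with a fake stream and an injectable clock so the timing logic can be checked in isolation; consume_with_ttft is an illustrative name. In real code the returned values would be written to the span, as in the snippet above.

```python
import time
from typing import Callable, Iterable, Optional, Tuple


def consume_with_ttft(
    stream: Iterable[str],
    clock: Callable[[], float] = time.perf_counter,
) -> Tuple[str, Optional[float], float]:
    """Consume a chunk stream; return (text, ttft_ms, total_ms)."""
    start = clock()
    ttft_ms = None
    chunks = []
    for chunk in stream:
        if ttft_ms is None:
            # First chunk arrived: this is the time to first token.
            ttft_ms = (clock() - start) * 1000
        chunks.append(chunk)
    total_ms = (clock() - start) * 1000
    return "".join(chunks), ttft_ms, total_ms


# Deterministic demo: the fake clock returns 0.0 s, 0.25 s, then 1.0 s.
ticks = iter([0.0, 0.25, 1.0])
text, ttft_ms, total_ms = consume_with_ttft(["Hel", "lo"], clock=lambda: next(ticks))
print(text, ttft_ms, total_ms)
```

An empty stream yields ttft_ms = None, which is worth recording as its own condition rather than as 0.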

Pitfall 5: Improper Sampling Strategy

from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, Sampler, SamplingResult, Decision

# BAD: a fixed rate of 1.0 keeps every span of every trace, which gets expensive at scale.
BAD_SAMPLER = TraceIdRatioBased(rate=1.0)

class LLMAwareSampler(Sampler):
    # Only attributes passed at span creation (attributes=...) are visible here;
    # values added later with span.set_attribute() are not.
    def should_sample(self, parent_context, trace_id, name, kind=None,
                      attributes=None, links=None, trace_state=None):
        if attributes:
            system = attributes.get("gen_ai.system")
            if system:
                # Always keep LLM spans.
                return SamplingResult(
                    decision=Decision.RECORD_AND_SAMPLE,
                    attributes=attributes,
                    trace_state=trace_state,
                )
        # Keep roughly 10% of everything else; trace_id is already a random integer.
        return SamplingResult(
            decision=Decision.RECORD_AND_SAMPLE if trace_id % 10 == 0 else Decision.DROP,
            attributes=attributes,
            trace_state=trace_state,
        )

    def get_description(self):
        return "LLMAwareSampler"

Solution: LLM Spans must be 100% sampled; non-LLM Spans can use ratio-based sampling.
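
The decision rule can be tested without the SDK. The sketch below isolates it as a pure function: LLM spans are always kept, everything else is kept when the low 64 bits of the trace id fall below a rate-derived bound. This is similar in spirit to ratio-based sampling on the trace id, but it is written for illustration; keep_trace is a hypothetical name.

```python
import random

TRACE_ID_MASK = (1 << 64) - 1  # use the low 64 bits of the 128-bit trace id


def keep_trace(trace_id: int, is_llm_span: bool, rate: float = 0.1) -> bool:
    """Deterministic head-sampling decision for one span."""
    if is_llm_span:
        return True  # LLM spans are never dropped
    bound = round(rate * (1 << 64))
    # The same trace id always gives the same answer, so all services agree.
    return (trace_id & TRACE_ID_MASK) < bound


rng = random.Random(0)
ids = [rng.getrandbits(128) for _ in range(10_000)]
kept = sum(keep_trace(t, is_llm_span=False, rate=0.1) for t in ids)
print(f"kept {kept / len(ids):.1%} of non-LLM spans")
```

Because the decision depends only on the trace id, every span of a non-LLM trace gets the same verdict, which avoids traces with missing pieces.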


10 Common Error Troubleshooting

# | Error Message | Cause | Solution
---|---|---|---
1 | StatusCode.UNAVAILABLE OTLP export failed | Collector not running or wrong port | Check that http://localhost:4317 is reachable
2 | context_length_exceeded | Prompt exceeds model context window | Use tiktoken to pre-calculate tokens, truncate or split
3 | rate_limit_exceeded 429 | API call rate exceeds limit | Implement exponential backoff retry, add token bucket rate limiting
4 | Span is not being recorded | TracerProvider not configured | Confirm trace.set_tracer_provider() has been called
5 | Baggage not propagated | Context not propagated across services | Use propagate.inject()/extract()
6 | Metric reader timeout | Metric export timeout | Increase export_timeout_millis or reduce collection frequency
7 | content_filter_triggered | Prompt triggered safety filter | Add content pre-check, use moderation API
8 | Empty response from LLM | Model returned empty content | Check finish_reason, add retry logic
9 | Memory leak in span processor | BatchSpanProcessor backlog | Adjust max_queue_size and schedule_delay_millis
10 | Duplicate span attributes | Same attribute set multiple times | Use span.set_attribute() not add_event() for metrics
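
For row 3, exponential backoff with jitter can be sketched as below. RateLimitError here is a local stand-in for whatever exception your client raises on HTTP 429, and call_with_backoff is an illustrative name; the sleep and random functions are injectable so the logic can be tested without waiting.

```python
import random
import time


class RateLimitError(Exception):
    """Stand-in for the client's 429 exception (assumption for this sketch)."""


def call_with_backoff(fn, max_attempts=5, base_delay=1.0, max_delay=30.0,
                      sleep=time.sleep, rng=random.random):
    """Call fn(), retrying on RateLimitError with capped exponential backoff."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except RateLimitError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error to the caller
            delay = min(max_delay, base_delay * 2 ** attempt)
            # Jitter: sleep between 50% and 100% of the computed delay.
            sleep(delay * (0.5 + rng() / 2))


# Demo: fail twice, then succeed; record the sleeps instead of waiting.
attempts = {"n": 0}
slept = []


def flaky():
    attempts["n"] += 1
    if attempts["n"] <= 2:
        raise RateLimitError("429")
    return "ok"


result = call_with_backoff(flaky, sleep=slept.append, rng=lambda: 1.0)
print(result, slept)
```

Recording each retry as a span event (attempt number and delay) keeps the retries visible in the trace instead of hiding them inside one long span.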

Diagnostic Script

def diagnose_otel_setup():
    from opentelemetry import trace, metrics
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.metrics import MeterProvider

    issues = []

    provider = trace.get_tracer_provider()
    if not isinstance(provider, TracerProvider):
        issues.append("TracerProvider not configured, may be using default NoOpProvider")
    else:
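        # _active_span_processor is a private SDK attribute and may change between versions.
        active = getattr(provider, "_active_span_processor", None)
        if active is not None and not getattr(active, "_span_processors", ()):
            issues.append("TracerProvider has no SpanProcessor configured")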

    meter_provider = metrics.get_meter_provider()
    if not isinstance(meter_provider, MeterProvider):
        issues.append("MeterProvider not configured")

    try:
        import opentelemetry.instrumentation.openai
    except ImportError:
        issues.append("opentelemetry-instrumentation-openai not installed")

    try:
        import tiktoken
    except ImportError:
        issues.append("tiktoken not installed, cannot pre-calculate token count")

    if issues:
        print("Diagnosis found issues:")
        for i, issue in enumerate(issues, 1):
            print(f"  {i}. {issue}")
    else:
        print("OpenTelemetry configuration is healthy")

diagnose_otel_setup()

Advanced Optimization Tips

Auto-Instrumentation

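# Each instrumentor ships as a separate package and must be installed first.

# OpenAI SDK calls
from opentelemetry.instrumentation.openai import OpenAIInstrumentor
OpenAIInstrumentor().instrument()

# Outgoing HTTP calls made with requests
from opentelemetry.instrumentation.requests import RequestsInstrumentor
RequestsInstrumentor().instrument()

# asyncio tasks and coroutines
from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor
AsyncioInstrumentor().instrument()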

Custom Propagator

from opentelemetry.propagate import set_global_textmap
from opentelemetry.propagators.composite import CompositePropagator
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from opentelemetry.baggage.propagation import W3CBaggagePropagator

# Propagate both W3C trace context and baggage on every inject()/extract().
set_global_textmap(
    CompositePropagator([
        TraceContextTextMapPropagator(),
        W3CBaggagePropagator(),
    ])
)

Dynamic Sampling Strategy

from opentelemetry.sdk.trace.sampling import Sampler, SamplingResult, Decision

class AdaptiveLLMSampler(Sampler):
    def __init__(self, base_rate: float = 0.1, error_sample_rate: float = 1.0,
                 slow_threshold_ms: float = 5000.0):
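        self.base_rate = base_rate
        # A head sampler runs at span start, before errors or latency are known,
        # so the two values below are stored but not used in should_sample.
        self.error_sample_rate = error_sample_rate
        self.slow_threshold_ms = slow_threshold_ms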

    def should_sample(self, parent_context, trace_id, name, kind,
                      attributes, links, trace_state):
        if attributes:
            is_llm = attributes.get("gen_ai.system") is not None
            if is_llm:
                return SamplingResult(
                    decision=Decision.RECORD_AND_SAMPLE,
                    attributes=attributes,
                    trace_state=trace_state,
                )

        if parent_context:
            from opentelemetry.trace import get_current_span
            parent_span = get_current_span(parent_context)
            if parent_span.is_recording():
                # `attributes` exists on SDK spans but is not part of the API Span interface.
                parent_attrs = getattr(parent_span, "attributes", None) or {}
                if parent_attrs.get("error.type"):
                    return SamplingResult(
                        decision=Decision.RECORD_AND_SAMPLE,
                        attributes=attributes,
                        trace_state=trace_state,
                    )

        should_sample = (hash(trace_id) % 10000) < (self.base_rate * 10000)
        return SamplingResult(
            decision=Decision.RECORD_AND_SAMPLE if should_sample else Decision.DROP,
            attributes=attributes,
            trace_state=trace_state,
        )

    def get_description(self):
        return f"AdaptiveLLMSampler(base={self.base_rate})"
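
Because should_sample runs when a span starts, it cannot know whether the span will fail or be slow, which is why error_sample_rate and slow_threshold_ms go unused above. Decisions based on outcome have to be made after the span ends, typically with tail sampling in the OpenTelemetry Collector. The pure-Python sketch below only illustrates the decision logic such a step might apply; keep_finished_span is a hypothetical helper, not an SDK or Collector API.

```python
def keep_finished_span(duration_ms: float, has_error: bool, trace_id: int,
                       base_rate: float = 0.1, error_sample_rate: float = 1.0,
                       slow_threshold_ms: float = 5000.0) -> bool:
    """Decide whether to export a span once its outcome is known."""
    # Map the trace id to a stable value in [0, 1) so the choice is repeatable.
    bucket = (trace_id % 10_000) / 10_000
    if has_error:
        return bucket < error_sample_rate  # 1.0 keeps every failed span
    if duration_ms >= slow_threshold_ms:
        return True  # always keep slow spans
    return bucket < base_rate  # ordinary spans: keep a fraction


print(keep_finished_span(120.0, has_error=True, trace_id=9_999))
print(keep_finished_span(6_000.0, has_error=False, trace_id=9_999))
print(keep_finished_span(120.0, has_error=False, trace_id=9_999))
```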

Grafana Dashboard Configuration

{
  "dashboard": {
    "title": "LLM Observability Dashboard",
    "panels": [
      {
        "title": "Token Usage (Input vs Output)",
        "type": "timeseries",
        "targets": [{
          "expr": "sum(rate(gen_ai_client_token_usage[5m])) by (token_type, gen_ai_request_model)"
        }]
      },
      {
        "title": "LLM Request Latency P50/P95/P99",
        "type": "timeseries",
        "targets": [
          {"expr": "histogram_quantile(0.50, sum by (le) (rate(llm_request_latency_bucket[5m])))"},
          {"expr": "histogram_quantile(0.95, sum by (le) (rate(llm_request_latency_bucket[5m])))"},
          {"expr": "histogram_quantile(0.99, sum by (le) (rate(llm_request_latency_bucket[5m])))"}
        ]
      },
      {
        "title": "Daily Cost by Model",
        "type": "piechart",
        "targets": [{
          "expr": "sum(increase(llm_cost_usd_total[24h])) by (gen_ai_request_model)"
        }]
      },
      {
        "title": "Error Rate by Type",
        "type": "barchart",
        "targets": [{
          "expr": "sum(rate(llm_errors_total[5m])) by (error_type)"
        }]
      }
    ]
  }
}

Comparison: OpenTelemetry vs LangSmith vs Promptflow

Dimension | OpenTelemetry | LangSmith | Promptflow
---|---|---|---
Open Source | Yes (CNCF project) | No (SaaS) | Yes (Microsoft)
Vendor Lock-in | None | High (LangChain ecosystem) | Medium (Azure ecosystem)
Tracing | Native, standard protocol | Proprietary trace system | Proprietary trace system
Token Tracking | Manual instrumentation | Automatic | Automatic
Prompt Management | Not supported | Supported | Supported
Cost Monitoring | Custom metrics required | Built-in | Built-in
Multi-language Support | 11+ languages | Python/JS | Python
APM Integration | Native (Jaeger/Tempo/Grafana) | Requires export | Requires export
Customization | Extremely flexible | Moderate | Moderate
Learning Curve | Steep | Gentle | Moderate
Production Readiness | High (CNCF project) | High | Moderate
Data Sovereignty | Full control | Data on LangSmith | Data on Azure

Selection Guide:

  • Existing APM infrastructure → OpenTelemetry (unified observability stack)
  • Heavy LangChain user → LangSmith (out-of-the-box)
  • Azure ecosystem → Promptflow (deep integration with Azure AI Studio)


Summary

OpenTelemetry tracing is the foundation for production-grade observability in Python LLM applications. The 6 production patterns cover the full path from basic Span instrumentation to intelligent alerting:

  1. LLM Span Instrumentation — Use gen_ai.* semantic conventions for standardized attribute annotation
  2. Token Tracking — Real-time usage and cost monitoring to prevent budget overruns
  3. Structured Logging — Sanitized Prompt/Response recording balancing debugging and compliance
  4. Multi-Model Chains — Trace complex call chains like RAG and Agent pipelines
  5. Error Monitoring — Classified exceptions, automatic retries, hallucination detection
  6. Intelligent Alerting — SLA guarantees, dynamic sampling, multi-level alerting

Key Principles: LLM Spans must be 100% sampled, token counts must come from API responses, prompt content must be sanitized, and TTFT is the core metric for user experience.
