Python OpenTelemetry LLM Tracing: 6 Production Patterns from Spans to Intelligent Alerting
Is Your LLM Application a Black Box?
In 2026, LLM applications are deep in production. But most teams face the same problem: LLM call chains are invisible. A single user request might pass through prompt construction, embedding retrieval, multi-turn dialogue, model inference, and post-processing — when anything goes wrong, you're left guessing.
Traditional APM tools don't model LLM-specific signals such as token usage, time to first token, prompt leakage, or hallucination checks. OpenTelemetry's GenAI Semantic Conventions and extensible Span attributes make it a strong, vendor-neutral choice for LLM tracing.
Key Takeaways:
- LLM Span instrumentation and attribute annotation patterns
- Token usage tracking and real-time cost monitoring
- Prompt/Response structured logging with sanitization
- Multi-model chain tracing (RAG + Agent + Tool)
- Error monitoring and anomaly detection
- Intelligent alerting and SLA guarantee systems
Table of Contents
- OpenTelemetry LLM Core Concepts
- Pattern 1: LLM Span Instrumentation & Attribute Annotation
- Pattern 2: Token Usage Tracking & Cost Monitoring
- Pattern 3: Prompt/Response Structured Logging
- Pattern 4: Multi-Model Chain Tracing
- Pattern 5: Error Monitoring & Anomaly Detection
- Pattern 6: Intelligent Alerting & SLA Guarantees
- 5 Common Pitfalls & Solutions
- 10 Common Error Troubleshooting
- Advanced Optimization Tips
- Comparison: OpenTelemetry vs LangSmith vs Promptflow
- Recommended Online Tools
OpenTelemetry LLM Core Concepts
Why OpenTelemetry for LLM Tracing
┌─────────────────────────────────────────────────────────┐
│ LLM Application Tracing Architecture │
├─────────────────────────────────────────────────────────┤
│ │
│ User Request ──► API Gateway ──► LLM Service │
│ │ │
│ ┌─────────────┼─────────────┐ │
│ ▼ ▼ ▼ │
│ Embedding Chat Model Tool Call │
│ Service (GPT-4o) Service │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ Vector DB Reranker External API │
│ │
│ Each hop = 1 Span with LLM semantic attributes │
│ Full chain = 1 Trace, end-to-end traceable │
│ │
│ ──► OTel Collector ──► Jaeger/Tempo (Trace) │
│ ──► Prometheus (Metrics) │
│ ──► Loki/ELK (Logs) │
└─────────────────────────────────────────────────────────┘
OpenTelemetry provides four core capabilities for LLM tracing:
| Capability | Description | LLM Value |
|---|---|---|
| Trace | Cross-service chain tracing | Track full call chain from user request to LLM response |
| Span | Single operation recording | Record model, parameters, token usage per LLM call |
| Metric | Metric collection | Real-time monitoring of token consumption, latency distribution, error rate |
| Baggage | Cross-service metadata propagation | Propagate user ID, session ID, A/B experiment labels |
OpenTelemetry LLM Semantic Conventions (2026)
Span Kind: CLIENT
Span Name: gen_ai.client.chat
Standard Attributes:
gen_ai.system = "openai" # LLM provider
gen_ai.request.model = "gpt-4o" # Requested model
gen_ai.request.max_tokens = 4096 # Max output tokens
gen_ai.request.temperature = 0.7 # Temperature parameter
gen_ai.response.model = "gpt-4o-2024-05-13" # Actual model snapshot that served the request
gen_ai.response.finish_reasons = ["stop"] # Finish reasons
gen_ai.usage.input_tokens = 1523 # Input tokens
gen_ai.usage.output_tokens = 847 # Output tokens
Events (Span Events):
gen_ai.content.prompt = "..." # Prompt content
gen_ai.content.completion = "..." # Response content
Pattern 1: LLM Span Instrumentation & Attribute Annotation
Basic Span Creation
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
resource = Resource.create({
"service.name": "llm-chat-service",
"service.version": "2.1.0",
"deployment.environment": "production",
})
tracer_provider = TracerProvider(resource=resource)
tracer_provider.add_span_processor(
BatchSpanProcessor(
OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True)
)
)
trace.set_tracer_provider(tracer_provider)
tracer = trace.get_tracer("llm-chat-service", "2.1.0")
Standard LLM Span Instrumentation
import time
from opentelemetry import trace
tracer = trace.get_tracer(__name__)
def trace_llm_call(model: str, prompt, **kwargs):
    # prompt may be a plain string or a chat messages list; it is passed through unchanged.
    # ttft_ms is tracing metadata from the caller, not an API parameter, so it is
    # removed before kwargs are forwarded to the provider call.
    ttft_ms = kwargs.pop("ttft_ms", 0)
    with tracer.start_as_current_span(
        "gen_ai.client.chat",
        kind=trace.SpanKind.CLIENT,
        attributes={
            "gen_ai.system": "openai",
            "gen_ai.request.model": model,
            "gen_ai.request.max_tokens": kwargs.get("max_tokens", 4096),
            "gen_ai.request.temperature": kwargs.get("temperature", 0.7),
        },
    ) as span:
        start_time = time.time()
        try:
            response = call_openai_api(model, prompt, **kwargs)
            span.set_attributes({
                "gen_ai.response.model": response.model,
                "gen_ai.response.finish_reasons": [
                    choice.finish_reason for choice in response.choices
                ],
                "gen_ai.usage.input_tokens": response.usage.prompt_tokens,
                "gen_ai.usage.output_tokens": response.usage.completion_tokens,
                "llm.time_to_first_token_ms": ttft_ms,
                "llm.total_duration_ms": (time.time() - start_time) * 1000,
            })
            span.add_event(
                "gen_ai.content.completion",
                attributes={
                    # content is None when the model returns only tool calls
                    "gen_ai.content": (response.choices[0].message.content or "")[:500],
                },
            )
            return response
        except Exception as e:
            span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
            span.record_exception(e)
            raise
Span with Event Annotations
def trace_llm_with_events(model: str, messages: list, **kwargs):
    with tracer.start_as_current_span(
        "gen_ai.client.chat",
        kind=trace.SpanKind.CLIENT,
        attributes={
            "gen_ai.system": "openai",
            "gen_ai.request.model": model,
            "gen_ai.request.max_tokens": kwargs.get("max_tokens", 4096),
        },
    ) as span:
        # Record each input message as a Span Event, truncated to 1,000 characters.
        for i, msg in enumerate(messages):
            span.add_event(
                f"gen_ai.content.prompt.{i}",
                attributes={
                    "gen_ai.content.role": msg["role"],
                    "gen_ai.content": msg["content"][:1000],
                },
            )
        start_time = time.time()
        first_token_time = None
        # call_openai_streaming is expected to yield text chunks (str).
        response = call_openai_streaming(model, messages, **kwargs)
        chunks = []
        for chunk in response:
            if first_token_time is None:
                # The first chunk marks time to first token (TTFT).
                first_token_time = time.time()
                ttft_ms = (first_token_time - start_time) * 1000
                span.set_attribute("llm.time_to_first_token_ms", ttft_ms)
            chunks.append(chunk)
        full_content = "".join(chunks)
        span.set_attributes({
            # Character-based estimate only (about 4 characters per token), so it is
            # kept out of gen_ai.usage.output_tokens. Prefer the provider's usage
            # data whenever the stream reports it.
            "llm.usage.output_tokens_estimated": len(full_content) // 4,
            "llm.total_duration_ms": (time.time() - start_time) * 1000,
        })
        span.add_event(
            "gen_ai.content.completion",
            attributes={"gen_ai.content": full_content[:500]},
        )
        return full_content
Pattern 2: Token Usage Tracking & Cost Monitoring
Token Counter & Cost Calculation
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
metric_reader = PeriodicExportingMetricReader(
OTLPMetricExporter(endpoint="http://localhost:4317", insecure=True),
export_interval_millis=60000,
)
meter_provider = MeterProvider(metric_readers=[metric_reader])
metrics.set_meter_provider(meter_provider)
meter = metrics.get_meter("llm-chat-service", "2.1.0")
token_counter = meter.create_counter(
name="gen_ai.client.token.usage",
description="Token usage counter for LLM calls",
unit="tokens",
)
cost_counter = meter.create_counter(
name="llm.cost.usd",
description="LLM API cost in USD",
unit="USD",
)
llm_duration = meter.create_histogram(
name="llm.request.duration",
description="LLM request duration in milliseconds",
unit="ms",
)
MODEL_PRICING = {
"gpt-4o": {"input": 2.50 / 1_000_000, "output": 10.00 / 1_000_000},
"gpt-4o-mini": {"input": 0.15 / 1_000_000, "output": 0.60 / 1_000_000},
"claude-sonnet-4-20250514": {"input": 3.00 / 1_000_000, "output": 15.00 / 1_000_000},
"deepseek-r1": {"input": 0.55 / 1_000_000, "output": 2.19 / 1_000_000},
}
def record_token_usage(model: str, input_tokens: int, output_tokens: int):
token_counter.add(input_tokens, {
"gen_ai.system": "openai",
"gen_ai.request.model": model,
"token.type": "input",
})
token_counter.add(output_tokens, {
"gen_ai.system": "openai",
"gen_ai.request.model": model,
"token.type": "output",
})
pricing = MODEL_PRICING.get(model, {"input": 0, "output": 0})
cost = input_tokens * pricing["input"] + output_tokens * pricing["output"]
cost_counter.add(cost, {
"gen_ai.request.model": model,
"cost.type": "api_call",
})
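As a worked check of the cost formula above, here is a minimal stdlib-only sketch that uses two of the prices from MODEL_PRICING, expressed in USD per million tokens. The names PRICE_PER_MILLION and estimate_cost_usd are illustrative and not part of any library; unknown models are priced at zero, mirroring the fallback in record_token_usage.

```python
import math

# Prices in USD per 1M tokens, copied from the MODEL_PRICING table in this article.
PRICE_PER_MILLION = {
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
}


def estimate_cost_usd(model: str, input_tokens: int, output_tokens: int) -> float:
    """Return the estimated cost of one call; unknown models cost 0.0."""
    price = PRICE_PER_MILLION.get(model, {"input": 0.0, "output": 0.0})
    return (input_tokens * price["input"] + output_tokens * price["output"]) / 1_000_000


# 1,523 input tokens and 847 output tokens on gpt-4o:
# (1523 * 2.50 + 847 * 10.00) / 1e6 = (3807.5 + 8470.0) / 1e6 = 0.0122775 USD
cost = estimate_cost_usd("gpt-4o", 1523, 847)
print(f"{cost:.7f}")
```

Comparing with math.isclose rather than == is the safer choice when asserting on float costs in tests.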
Cost Quota & Budget Control
import threading
from datetime import datetime
from collections import defaultdict
class TokenBudgetManager:
def __init__(self, daily_budget_usd: float = 100.0):
self.daily_budget = daily_budget_usd
self.current_spend = defaultdict(float)
self.lock = threading.Lock()
self._reset_date = datetime.now().date()
def check_and_record(self, model: str, input_tokens: int, output_tokens: int) -> bool:
with self.lock:
today = datetime.now().date()
if today != self._reset_date:
self.current_spend.clear()
self._reset_date = today
pricing = MODEL_PRICING.get(model, {"input": 0, "output": 0})
cost = input_tokens * pricing["input"] + output_tokens * pricing["output"]
total_spend = sum(self.current_spend.values()) + cost
if total_spend > self.daily_budget:
return False
self.current_spend[model] += cost
return True
def get_remaining_budget(self) -> float:
with self.lock:
return self.daily_budget - sum(self.current_spend.values())
budget_manager = TokenBudgetManager(daily_budget_usd=200.0)
LLM Call with Budget Check
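class BudgetExceededError(RuntimeError):
    """Raised when a call would push spend past the daily budget."""


def call_llm_with_budget(model: str, messages: list, **kwargs):
    with tracer.start_as_current_span(
        "gen_ai.client.chat",
        kind=trace.SpanKind.CLIENT,
        attributes={
            "gen_ai.system": "openai",
            "gen_ai.request.model": model,
        },
    ) as span:
        # Fail fast when the budget is already spent, before paying for another call.
        if budget_manager.get_remaining_budget() <= 0:
            span.set_attribute("llm.budget.exceeded", True)
            raise BudgetExceededError("Daily budget exhausted")
        response = call_openai_api(model, messages, **kwargs)
        input_tokens = response.usage.prompt_tokens
        output_tokens = response.usage.completion_tokens
        # Note: this check runs after the provider has already billed the call.
        allowed = budget_manager.check_and_record(model, input_tokens, output_tokens)
        if not allowed:
            span.set_attribute("llm.budget.exceeded", True)
            raise BudgetExceededError(
                f"Daily budget exceeded. Remaining: ${budget_manager.get_remaining_budget():.2f}"
            )
        record_token_usage(model, input_tokens, output_tokens)
        pricing = MODEL_PRICING.get(model, {"input": 0, "output": 0})
        span.set_attributes({
            "gen_ai.usage.input_tokens": input_tokens,
            "gen_ai.usage.output_tokens": output_tokens,
            "llm.cost.usd": input_tokens * pricing["input"] + output_tokens * pricing["output"],
            "llm.budget.remaining_usd": budget_manager.get_remaining_budget(),
        })
        return response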
Pattern 3: Prompt/Response Structured Logging
Sanitization & Structured Recording
import re
import json
from dataclasses import dataclass, field, asdict
@dataclass
class LLMCallLog:
trace_id: str
span_id: str
model: str
prompt_hash: str
prompt_length: int
response_hash: str
response_length: int
input_tokens: int
output_tokens: int
duration_ms: float
ttft_ms: float = 0.0
finish_reason: str = ""
sanitized_prompt: str = ""
sanitized_response: str = ""
metadata: dict = field(default_factory=dict)
PII_PATTERNS = [
    (re.compile(r'\b\d{3}[-.]?\d{4}\b'), "[PHONE]"),
    # [A-Za-z] rather than [A-Z|a-z]: inside a character class, | is a literal pipe
    (re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'), "[EMAIL]"),
    (re.compile(r'\b\d{17}[\dXx]\b'), "[ID_CARD]"),
    (re.compile(r'\b(?:\d[ -]*?){13,19}\b'), "[CREDIT_CARD]"),
    (re.compile(r'api[_-]?key[=:]\s*\S+', re.IGNORECASE), "[API_KEY]"),
]
def sanitize_text(text: str, max_length: int = 500) -> str:
    # Redact first, then truncate, so PII that straddles the cut-off
    # cannot slip through as an unmatched fragment.
    sanitized = text
    for pattern, replacement in PII_PATTERNS:
        sanitized = pattern.sub(replacement, sanitized)
    return sanitized[:max_length]
def hash_content(content: str) -> str:
    import hashlib
    # Short, stable fingerprint for correlating identical prompts without storing them.
    return hashlib.sha256(content.encode()).hexdigest()[:16]
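The order of redaction and truncation matters. The following is a small stdlib-only sketch, using just an email pattern, that compares both orders on a string whose email address straddles the cut-off. The function names are hypothetical and exist only for this comparison.

```python
import re

EMAIL = re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b")


def redact_then_truncate(text: str, max_length: int = 500) -> str:
    """Redact on the full text, then cut to max_length."""
    return EMAIL.sub("[EMAIL]", text)[:max_length]


def truncate_then_redact(text: str, max_length: int = 500) -> str:
    """Cut first; an address split by the cut no longer matches the pattern."""
    return EMAIL.sub("[EMAIL]", text[:max_length])


sample = "x" * 10 + " alice@example.com"

safe = redact_then_truncate(sample, 20)    # the whole address is replaced
leaky = truncate_then_redact(sample, 20)   # "alice@exa" survives the cut
print(safe)
print(leaky)
```

Redacting first costs a regex pass over the full text, which is usually acceptable for prompts but worth measuring for very large documents.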
Structured Logger
import logging
from datetime import datetime, timezone
class LLMStructuredLogger:
def __init__(self, service_name: str = "llm-chat-service"):
self.logger = logging.getLogger(f"{service_name}.llm_calls")
self.sanitize_enabled = True
self.max_content_length = 500
def log_llm_call(self, span, model: str, prompt: str, response: str,
input_tokens: int, output_tokens: int,
duration_ms: float, ttft_ms: float = 0,
finish_reason: str = "", **metadata):
log_entry = LLMCallLog(
trace_id=format(span.get_span_context().trace_id, "032x"),
span_id=format(span.get_span_context().span_id, "016x"),
model=model,
prompt_hash=hash_content(prompt),
prompt_length=len(prompt),
response_hash=hash_content(response),
response_length=len(response),
input_tokens=input_tokens,
output_tokens=output_tokens,
duration_ms=round(duration_ms, 2),
ttft_ms=round(ttft_ms, 2),
finish_reason=finish_reason,
sanitized_prompt=sanitize_text(prompt, self.max_content_length) if self.sanitize_enabled else "",
sanitized_response=sanitize_text(response, self.max_content_length) if self.sanitize_enabled else "",
metadata=metadata,
)
self.logger.info(
"LLM call completed",
extra={"llm_call": asdict(log_entry)},
)
span.add_event("gen_ai.content.prompt", attributes={
"gen_ai.content": sanitize_text(prompt, 1000),
"gen_ai.content.hash": hash_content(prompt),
})
span.add_event("gen_ai.content.completion", attributes={
"gen_ai.content": sanitize_text(response, 1000),
"gen_ai.content.hash": hash_content(response),
})
llm_logger = LLMStructuredLogger()
Complete LLM Call with Logging
def call_llm_with_logging(model: str, messages: list, **kwargs):
with tracer.start_as_current_span(
"gen_ai.client.chat",
kind=trace.SpanKind.CLIENT,
attributes={
"gen_ai.system": "openai",
"gen_ai.request.model": model,
"gen_ai.request.temperature": kwargs.get("temperature", 0.7),
},
) as span:
start_time = time.time()
prompt_text = json.dumps(messages, ensure_ascii=False)
try:
response = call_openai_api(model, messages, **kwargs)
content = response.choices[0].message.content
finish_reason = response.choices[0].finish_reason
input_tokens = response.usage.prompt_tokens
output_tokens = response.usage.completion_tokens
duration_ms = (time.time() - start_time) * 1000
llm_logger.log_llm_call(
span=span,
model=response.model,
prompt=prompt_text,
response=content,
input_tokens=input_tokens,
output_tokens=output_tokens,
duration_ms=duration_ms,
finish_reason=finish_reason,
user_id=kwargs.get("user_id", "anonymous"),
session_id=kwargs.get("session_id", ""),
)
span.set_attributes({
"gen_ai.usage.input_tokens": input_tokens,
"gen_ai.usage.output_tokens": output_tokens,
"gen_ai.response.finish_reasons": [finish_reason],
})
return response
except Exception as e:
duration_ms = (time.time() - start_time) * 1000
llm_logger.log_llm_call(
span=span, model=model, prompt=prompt_text,
response="", input_tokens=0, output_tokens=0,
duration_ms=duration_ms, error=str(e),
)
span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
span.record_exception(e)
raise
Pattern 4: Multi-Model Chain Tracing
RAG Pipeline Tracing
def trace_rag_pipeline(query: str, user_id: str = ""):
with tracer.start_as_current_span(
"rag.pipeline",
kind=trace.SpanKind.INTERNAL,
attributes={
"rag.query": sanitize_text(query),
"rag.user_id": user_id,
},
) as pipeline_span:
embedding = trace_embedding(query, pipeline_span)
documents = trace_vector_search(embedding, pipeline_span)
context = trace_reranker(query, documents, pipeline_span)
answer = trace_llm_generation(query, context, pipeline_span)
pipeline_span.set_attributes({
"rag.documents_retrieved": len(documents),
"rag.context_length": len(context),
"rag.answer_length": len(answer),
})
return answer
def trace_embedding(query: str, parent_span):
with tracer.start_as_current_span(
"gen_ai.client.embedding",
kind=trace.SpanKind.CLIENT,
attributes={
"gen_ai.system": "openai",
"gen_ai.request.model": "text-embedding-3-large",
"gen_ai.request.encoding_format": "float",
},
) as span:
start_time = time.time()
response = call_embedding_api(query)
span.set_attributes({
"gen_ai.usage.input_tokens": response.usage.prompt_tokens,
"llm.duration_ms": (time.time() - start_time) * 1000,
})
return response.data[0].embedding
def trace_vector_search(embedding: list, parent_span):
with tracer.start_as_current_span(
"vector.search",
kind=trace.SpanKind.CLIENT,
attributes={
"db.system": "pgvector",
"db.operation": "similarity_search",
"db.vector.dimension": len(embedding),
},
) as span:
start_time = time.time()
results = search_pgvector(embedding, top_k=10)
span.set_attributes({
"db.results.count": len(results),
"db.results.top_score": results[0].score if results else 0,
"llm.duration_ms": (time.time() - start_time) * 1000,
})
for i, doc in enumerate(results[:3]):
span.add_event(f"vector.search.result.{i}", attributes={
"document.id": doc.id,
"document.score": doc.score,
"document.content_preview": doc.content[:200],
})
return results
def trace_reranker(query: str, documents: list, parent_span):
with tracer.start_as_current_span(
"gen_ai.client.rerank",
kind=trace.SpanKind.CLIENT,
attributes={
"gen_ai.system": "cohere",
"gen_ai.request.model": "rerank-v3.5",
"rerank.documents_count": len(documents),
},
) as span:
start_time = time.time()
reranked = call_reranker_api(query, documents, top_n=3)
span.set_attributes({
"rerank.results_count": len(reranked),
"llm.duration_ms": (time.time() - start_time) * 1000,
})
return "\n".join([doc.content for doc in reranked])
def trace_llm_generation(query: str, context: str, parent_span):
with tracer.start_as_current_span(
"gen_ai.client.chat",
kind=trace.SpanKind.CLIENT,
attributes={
"gen_ai.system": "openai",
"gen_ai.request.model": "gpt-4o",
},
) as span:
start_time = time.time()
messages = [
{"role": "system", "content": f"Answer based on this context:\n{context}"},
{"role": "user", "content": query},
]
response = call_openai_api("gpt-4o", messages)
span.set_attributes({
"gen_ai.usage.input_tokens": response.usage.prompt_tokens,
"gen_ai.usage.output_tokens": response.usage.completion_tokens,
"llm.duration_ms": (time.time() - start_time) * 1000,
})
return response.choices[0].message.content
Agent Chain Tracing
def trace_agent_execution(user_query: str, max_iterations: int = 5):
    with tracer.start_as_current_span(
        "agent.execution",
        kind=trace.SpanKind.INTERNAL,
        attributes={
            "agent.type": "react",
            "agent.max_iterations": max_iterations,
            "agent.query": sanitize_text(user_query),
        },
    ) as agent_span:
        messages = [{"role": "user", "content": user_query}]
        iteration = 0
        while iteration < max_iterations:
            iteration += 1
            with tracer.start_as_current_span(
                f"agent.iteration.{iteration}",
                kind=trace.SpanKind.INTERNAL,
            ) as iter_span:
                # trace_llm_call takes the prompt positionally; the messages list is passed through.
                llm_response = trace_llm_call("gpt-4o", messages, temperature=0.0)
                choice = llm_response.choices[0]
                if choice.finish_reason == "stop":
                    agent_span.set_attributes({
                        "agent.iterations": iteration,
                        "agent.final_answer_length": len(choice.message.content or ""),
                    })
                    return choice.message.content
                # The assistant message that requested the tools must come before the tool results.
                messages.append(choice.message.model_dump())
                tool_calls = choice.message.tool_calls or []
                for tool_call in tool_calls:
                    tool_result = trace_tool_call(tool_call, iter_span)
                    messages.append({
                        "role": "tool",
                        "tool_call_id": tool_call.id,
                        "content": str(tool_result),
                    })
                iter_span.set_attribute("agent.iteration.tools_called", len(tool_calls))
        agent_span.set_attribute("agent.status", "max_iterations_reached")
        return "Agent reached maximum iterations without a final answer."
def trace_tool_call(tool_call, parent_span):
with tracer.start_as_current_span(
f"tool.{tool_call.function.name}",
kind=trace.SpanKind.CLIENT,
attributes={
"tool.name": tool_call.function.name,
"tool.call_id": tool_call.id,
},
) as span:
start_time = time.time()
try:
import json
arguments = json.loads(tool_call.function.arguments)
span.set_attribute("tool.arguments_hash", hash_content(
tool_call.function.arguments
))
result = execute_tool(tool_call.function.name, arguments)
span.set_attributes({
"tool.result_length": len(str(result)),
"llm.duration_ms": (time.time() - start_time) * 1000,
"tool.status": "success",
})
return result
except Exception as e:
span.set_attributes({
"tool.status": "error",
"tool.error": str(e),
})
span.record_exception(e)
return f"Tool error: {e}"
Multi-Model Chain Visualization
Trace: rag-agent-pipeline-trace-abc123
┌─ rag.pipeline (1280ms)
│ │
│ ├─ gen_ai.client.embedding (45ms)
│ │ model=text-embedding-3-large
│ │ tokens=23
│ │
│ ├─ vector.search (32ms)
│ │ db=pgvector, results=10
│ │
│ ├─ gen_ai.client.rerank (180ms)
│ │ model=rerank-v3.5
│ │ documents=10 → top_n=3
│ │
│ ├─ agent.execution (1023ms)
│ │ │
│ │ ├─ agent.iteration.1 (520ms)
│ │ │ ├─ gen_ai.client.chat (480ms)
│ │ │ │ model=gpt-4o, tokens=1523/847
│ │ │ └─ tool.search_database (40ms)
│ │ │
│ │ ├─ agent.iteration.2 (503ms)
│ │ │ ├─ gen_ai.client.chat (460ms)
│ │ │ │ model=gpt-4o, tokens=2100/620
│ │ │ └─ tool.calculate (43ms)
│ │ │
│ │ └─ agent.iteration.3 (final, 0ms)
│ │ final_answer_length=342
│ │
│ └─ Total: 3 LLM calls, 2 tool calls, 3646 input tokens, 1467 output tokens
Pattern 5: Error Monitoring & Anomaly Detection
LLM-Specific Error Classification
from enum import Enum
from dataclasses import dataclass
class LLMErrorType(Enum):
RATE_LIMIT = "rate_limit"
CONTEXT_LENGTH_EXCEEDED = "context_length_exceeded"
INVALID_API_KEY = "invalid_api_key"
MODEL_NOT_FOUND = "model_not_found"
TIMEOUT = "timeout"
CONTENT_FILTER = "content_filter"
HALLUCINATION = "hallucination"
EMPTY_RESPONSE = "empty_response"
BUDGET_EXCEEDED = "budget_exceeded"
SERVICE_UNAVAILABLE = "service_unavailable"
@dataclass
class LLMErrorEvent:
error_type: LLMErrorType
model: str
error_message: str
trace_id: str
span_id: str
retry_count: int
is_retryable: bool
def classify_llm_error(error: Exception) -> LLMErrorType:
error_str = str(error).lower()
if "rate_limit" in error_str or "429" in error_str:
return LLMErrorType.RATE_LIMIT
elif "context_length" in error_str or "maximum context" in error_str:
return LLMErrorType.CONTEXT_LENGTH_EXCEEDED
elif "invalid_api_key" in error_str or "401" in error_str:
return LLMErrorType.INVALID_API_KEY
elif "model_not_found" in error_str or "404" in error_str:
return LLMErrorType.MODEL_NOT_FOUND
elif "timeout" in error_str:
return LLMErrorType.TIMEOUT
elif "content_filter" in error_str or "content_policy" in error_str:
return LLMErrorType.CONTENT_FILTER
else:
return LLMErrorType.SERVICE_UNAVAILABLE
Error Tracing with Retry
import time
from functools import wraps
error_counter = meter.create_counter(
name="llm.errors",
description="LLM call errors by type",
unit="errors",
)
def llm_retry_with_tracing(max_retries: int = 3, base_delay: float = 1.0):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                with tracer.start_as_current_span(
                    f"llm.call.attempt.{attempt}",
                    kind=trace.SpanKind.CLIENT,
                ) as attempt_span:
                    attempt_span.set_attributes({
                        "llm.retry.attempt": attempt,
                        "llm.retry.max_retries": max_retries,
                    })
                    try:
                        result = func(*args, **kwargs)
                        if attempt > 0:
                            attempt_span.set_attribute("llm.retry.succeeded", True)
                        return result
                    except Exception as e:
                        error_type = classify_llm_error(e)
                        is_retryable = error_type in {
                            LLMErrorType.RATE_LIMIT,
                            LLMErrorType.TIMEOUT,
                            LLMErrorType.SERVICE_UNAVAILABLE,
                        }
                        error_counter.add(1, {
                            "error.type": error_type.value,
                            "gen_ai.request.model": kwargs.get("model", "unknown"),
                        })
                        attempt_span.set_attributes({
                            "error.type": error_type.value,
                            "error.message": str(e)[:200],
                            "error.retryable": is_retryable,
                        })
                        attempt_span.record_exception(e)
                        attempt_span.set_status(
                            trace.Status(trace.StatusCode.ERROR, str(e))
                        )
                        # Non-retryable errors and the final attempt propagate to the caller.
                        if not is_retryable or attempt == max_retries:
                            raise
                # Sleep after the attempt span has ended so backoff time is not
                # counted as call latency.
                time.sleep(base_delay * (2 ** attempt))
        return wrapper
    return decorator
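To make the backoff schedule of the decorator concrete, here is a stdlib-only sketch of the delays it sleeps between attempts. The helper names are hypothetical. The jittered variant is an optional addition that the decorator above does not use; it is a common way to keep many clients from retrying in lockstep.

```python
import random


def backoff_delays(max_retries: int = 3, base_delay: float = 1.0) -> list[float]:
    """Delays slept after failed attempts 0..max_retries-1: base_delay * 2**attempt."""
    return [base_delay * (2 ** attempt) for attempt in range(max_retries)]


def jittered_delay(attempt: int, base_delay: float, rng: random.Random) -> float:
    """Full jitter: a uniform draw between 0 and the exponential delay."""
    return rng.uniform(0.0, base_delay * (2 ** attempt))


# With the defaults, three sleeps separate the four attempts.
print(backoff_delays())  # [1.0, 2.0, 4.0]
```

With the defaults, a call that fails every time therefore spends 7 seconds sleeping on top of the four request durations.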
Hallucination Detection Span
def trace_hallucination_check(
query: str, answer: str, context: str, model: str = "gpt-4o-mini"
):
with tracer.start_as_current_span(
"llm.hallucination_check",
kind=trace.SpanKind.INTERNAL,
attributes={
"llm.check.type": "hallucination",
"llm.check.model": model,
},
) as span:
check_prompt = f"""Determine if the following answer is based on the given context. If the answer contains information not present in the context, flag it as hallucination.
Context:
{context}
Answer:
{answer}
Return in JSON format: {{"is_hallucination": true/false, "confidence": 0.0-1.0, "reason": "..."}}"""
start_time = time.time()
response = call_openai_api(model, [
{"role": "user", "content": check_prompt}
], temperature=0.0)
import json
result = json.loads(response.choices[0].message.content)
span.set_attributes({
"llm.hallucination.detected": result.get("is_hallucination", False),
"llm.hallucination.confidence": result.get("confidence", 0.0),
"llm.hallucination.reason": result.get("reason", "")[:200],
"llm.duration_ms": (time.time() - start_time) * 1000,
})
return result
Pattern 6: Intelligent Alerting & SLA Guarantees
SLA Metric Definitions
from dataclasses import dataclass
@dataclass
class LLMSLA:
p50_latency_ms: float = 2000.0
p95_latency_ms: float = 5000.0
p99_latency_ms: float = 10000.0
ttft_p95_ms: float = 1000.0
error_rate_threshold: float = 0.01
daily_budget_usd: float = 200.0
hallucination_rate_threshold: float = 0.05
min_success_rate: float = 0.99
sla = LLMSLA()
latency_histogram = meter.create_histogram(
name="llm.request.latency",
description="LLM request latency distribution",
unit="ms",
)
ttft_histogram = meter.create_histogram(
name="llm.time_to_first_token",
description="Time to first token distribution",
unit="ms",
)
Intelligent Alert Rules
from collections import deque
import threading
class LLMAlertManager:
def __init__(self, sla_config: LLMSLA):
self.sla = sla_config
self.recent_latencies = deque(maxlen=1000)
self.recent_errors = deque(maxlen=1000)
self.recent_ttft = deque(maxlen=1000)
self.lock = threading.RLock()  # re-entrant, so _check_alerts can take it again if called while record_latency still holds it
self.alert_callbacks = []
def add_alert_callback(self, callback):
self.alert_callbacks.append(callback)
def record_latency(self, latency_ms: float, ttft_ms: float = 0, is_error: bool = False):
with self.lock:
self.recent_latencies.append(latency_ms)
self.recent_ttft.append(ttft_ms)
self.recent_errors.append(1 if is_error else 0)
self._check_alerts(latency_ms, ttft_ms, is_error)
def _check_alerts(self, latency_ms: float, ttft_ms: float, is_error: bool):
alerts = []
if latency_ms > self.sla.p99_latency_ms:
alerts.append({
"level": "CRITICAL",
"type": "latency_p99_breach",
"message": f"LLM latency {latency_ms:.0f}ms exceeds P99 SLA {self.sla.p99_latency_ms:.0f}ms",
"value": latency_ms,
"threshold": self.sla.p99_latency_ms,
})
if ttft_ms > self.sla.ttft_p95_ms:
alerts.append({
"level": "WARNING",
"type": "ttft_breach",
"message": f"TTFT {ttft_ms:.0f}ms exceeds P95 SLA {self.sla.ttft_p95_ms:.0f}ms",
"value": ttft_ms,
"threshold": self.sla.ttft_p95_ms,
})
with self.lock:
if len(self.recent_errors) >= 100:
error_rate = sum(self.recent_errors) / len(self.recent_errors)
if error_rate > self.sla.error_rate_threshold:
alerts.append({
"level": "CRITICAL",
"type": "error_rate_breach",
"message": f"Error rate {error_rate:.2%} exceeds threshold {self.sla.error_rate_threshold:.2%}",
"value": error_rate,
"threshold": self.sla.error_rate_threshold,
})
for alert in alerts:
self._fire_alert(alert)
def _fire_alert(self, alert: dict):
with tracer.start_as_current_span(
"llm.alert",
kind=trace.SpanKind.INTERNAL,
attributes={
"alert.level": alert["level"],
"alert.type": alert["type"],
"alert.message": alert["message"],
"alert.value": str(alert["value"]),
"alert.threshold": str(alert["threshold"]),
},
):
for callback in self.alert_callbacks:
try:
callback(alert)
except Exception:
pass
alert_manager = LLMAlertManager(sla)
alert_manager.add_alert_callback(lambda a: print(f"[{a['level']}] {a['message']}"))
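The alert manager above compares each individual request against the P99 and P95 thresholds. A percentile SLA is normally evaluated over a window of requests, so here is a minimal stdlib-only sketch of a nearest-rank percentile over the same kind of bounded deque. The percentile helper is an illustrative assumption, not part of OpenTelemetry.

```python
from collections import deque


def percentile(values, pct: int) -> float:
    """Nearest-rank percentile (pct in 1..100) of a non-empty collection."""
    ordered = sorted(values)
    if not ordered:
        raise ValueError("percentile() needs at least one value")
    rank = (pct * len(ordered) + 99) // 100  # ceil(pct / 100 * n) in integer math
    return ordered[max(rank, 1) - 1]


# Sliding window of recent latencies, as in LLMAlertManager.recent_latencies.
window = deque(maxlen=1000)
window.extend(float(ms) for ms in range(1, 101))  # 1 ms .. 100 ms

p95 = percentile(window, 95)
p99 = percentile(window, 99)
breach = p99 > 10_000.0  # compare the window's P99, not a single request
print(p95, p99, breach)
```

Sorting a 1,000-element window on every request is cheap, but for larger windows a periodic check is a reasonable alternative.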
Prometheus Alert Rules
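groups:
  - name: llm_sla_alerts
    rules:
      # Metric names below assume the OTel-to-Prometheus translation keeps them as shown;
      # some exporter configurations append unit suffixes (for example _milliseconds).
      - alert: LLMLatencyP99Breach
        # Aggregate buckets by le (and model) before taking the quantile.
        expr: histogram_quantile(0.99, sum by (le, gen_ai_request_model) (rate(llm_request_latency_bucket[5m]))) > 10000
        for: 2m
        labels:
          severity: critical
          team: ai-platform
        annotations:
          summary: "LLM P99 latency exceeds SLA threshold"
          description: "Model {{ $labels.gen_ai_request_model }} P99 latency is {{ $value }}ms"
      - alert: LLMErrorRateHigh
        # llm_calls_total must be exported as a separate request counter. Both sides are
        # aggregated to the same label set so the division can match series.
        expr: sum by (gen_ai_request_model) (rate(llm_errors_total[5m])) / sum by (gen_ai_request_model) (rate(llm_calls_total[5m])) > 0.01
        for: 3m
        labels:
          severity: critical
          team: ai-platform
        annotations:
          summary: "LLM error rate exceeds 1%"
          description: "Model {{ $labels.gen_ai_request_model }} error rate is {{ $value | humanizePercentage }}"
      - alert: LLMTokenBudgetApproaching
        # The cost counter is cumulative, so alert on the increase over the last 24h
        # rather than on the raw total.
        expr: sum(increase(llm_cost_usd_total[24h])) > 180
        for: 5m
        labels:
          severity: warning
          team: ai-platform
        annotations:
          summary: "LLM daily spend approaching budget limit"
          description: "Spend over the last 24h is ${{ $value }}, budget limit $200"
      - alert: LLMTTFTHigh
        expr: histogram_quantile(0.95, sum by (le) (rate(llm_time_to_first_token_bucket[5m]))) > 1000
        for: 3m
        labels:
          severity: warning
          team: ai-platform
        annotations:
          summary: "LLM TTFT P95 exceeds 1 second"
          description: "TTFT P95 = {{ $value }}ms"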
5 Common Pitfalls & Solutions
Pitfall 1: Too Many Span Attributes Causing Sampling Loss
BAD_PRACTICE = {
"gen_ai.content.full_prompt": prompt[:10000],
"gen_ai.content.full_response": response[:10000],
}
GOOD_PRACTICE = {
"gen_ai.usage.input_tokens": input_tokens,
"gen_ai.usage.output_tokens": output_tokens,
}
span.add_event("gen_ai.content.prompt", attributes={
"gen_ai.content": sanitize_text(prompt, 1000),
})
Solution: Use Span attributes for core metrics, Span Events for long text, truncate or hash content over 1KB.
Pitfall 2: Inaccurate Token Counting
BAD_PRACTICE = {
"gen_ai.usage.input_tokens": len(prompt) // 4,
}
GOOD_PRACTICE = {
"gen_ai.usage.input_tokens": response.usage.prompt_tokens,
}
Solution: Always use the usage field from the API response, never estimate tokens from character count.
Pitfall 3: Ignoring Context Propagation
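from opentelemetry import baggage, context
# BAD: calling baggage.set_baggage() and discarding the returned context changes nothing.
# GOOD: attach the returned context for the duration of the request, then detach it.
token = context.attach(baggage.set_baggage("user.id", user_id))
try:
    handle_request()  # placeholder for your handler; spans and outgoing headers now carry user.id
finally:
    context.detach(token)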
Solution: Use propagate.inject()/extract() for cross-service context propagation.
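By default, inject() and extract() carry trace context between services in the W3C Trace Context traceparent header. The following stdlib-only sketch shows the shape of that header so it is easier to recognize in request logs. The two function names are hypothetical; in application code, rely on propagate.inject() and propagate.extract() instead of hand-rolling this.

```python
def format_traceparent(trace_id: int, span_id: int, sampled: bool) -> str:
    """Build a version-00 traceparent value: version-traceid-spanid-flags."""
    flags = 0x01 if sampled else 0x00
    return f"00-{trace_id:032x}-{span_id:016x}-{flags:02x}"


def parse_traceparent(header: str) -> tuple[int, int, bool]:
    """Parse a version-00 traceparent value; raise ValueError if malformed."""
    parts = header.strip().split("-")
    if len(parts) != 4 or parts[0] != "00":
        raise ValueError(f"unsupported traceparent: {header!r}")
    _, trace_hex, span_hex, flags_hex = parts
    if len(trace_hex) != 32 or len(span_hex) != 16 or len(flags_hex) != 2:
        raise ValueError(f"malformed traceparent: {header!r}")
    trace_id, span_id = int(trace_hex, 16), int(span_hex, 16)
    if trace_id == 0 or span_id == 0:
        raise ValueError("all-zero trace or span IDs are invalid")
    return trace_id, span_id, bool(int(flags_hex, 16) & 0x01)


header = format_traceparent(
    0x4BF92F3577B34DA6A3CE929D0E0E4736, 0x00F067AA0BA902B7, sampled=True
)
print(header)
```

If a downstream service starts a new trace instead of continuing this one, a missing or malformed traceparent header on the outgoing request is the first thing to check.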
Pitfall 4: Not Recording TTFT for Streaming Responses
BAD_PRACTICE = {
"streaming_without_ttft": "Only recording total latency, ignoring time to first token",
}
GOOD_PRACTICE = """
with tracer.start_as_current_span("gen_ai.client.chat") as span:
    start = time.time()
    first_token_time = None
    for chunk in stream:
        if first_token_time is None:
            first_token_time = time.time()
            span.set_attribute("llm.time_to_first_token_ms",
                               (first_token_time - start) * 1000)
"""
Solution: Streaming responses must record TTFT — it's the core metric for user experience.
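The TTFT measurement can be separated from the tracing code so it is easy to unit test. Here is a minimal stdlib-only sketch, assuming the stream yields text chunks; the name consume_stream and the injectable clock parameter are illustrative choices, not part of any SDK.

```python
import time
from typing import Callable, Iterable, Optional


def consume_stream(
    stream: Iterable[str],
    clock: Callable[[], float] = time.perf_counter,
) -> tuple[str, Optional[float], float]:
    """Drain a stream of text chunks; return (text, ttft_ms, total_ms).

    ttft_ms is None when the stream produced no chunks at all.
    """
    start = clock()
    first_chunk_at = None
    parts = []
    for chunk in stream:
        if first_chunk_at is None:
            first_chunk_at = clock()  # time to first token is fixed here
        parts.append(chunk)
    end = clock()
    ttft_ms = None if first_chunk_at is None else (first_chunk_at - start) * 1000
    return "".join(parts), ttft_ms, (end - start) * 1000


text, ttft_ms, total_ms = consume_stream(iter(["Hel", "lo"]))
print(text, ttft_ms is not None, total_ms >= 0)
```

The returned values can then be written to the span with set_attribute, and passing a fake clock makes the timing deterministic in tests.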
Pitfall 5: Improper Sampling Strategy
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, ParentBased, Sampler, SamplingResult, Decision
BAD_SAMPLER = TraceIdRatioBased(rate=1.0)  # keeps every span, LLM or not
class LLMAwareSampler(Sampler):
    def should_sample(self, parent_context, trace_id, name, kind=None,
                      attributes=None, links=None, trace_state=None):
        # Always keep spans that carry gen_ai.system at creation time.
        if attributes and attributes.get("gen_ai.system"):
            return SamplingResult(
                decision=Decision.RECORD_AND_SAMPLE,
                attributes=attributes,
                trace_state=trace_state,
            )
        # Keep roughly 1 in 10 of the remaining traces, keyed on the trace ID
        # so every span of a trace gets the same decision.
        keep = trace_id % 10 == 0
        return SamplingResult(
            decision=Decision.RECORD_AND_SAMPLE if keep else Decision.DROP,
            attributes=attributes,
            trace_state=trace_state,
        )
    def get_description(self):
        return "LLMAwareSampler"
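Solution: Sample LLM Spans at 100% and apply ratio-based sampling to non-LLM Spans. Because this decision is made per Span, an LLM Span can be kept while its non-LLM parent is dropped, leaving a partial trace; tail-based sampling in the Collector avoids that if complete traces matter.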
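The sampling rule can be sketched without the SDK to show why a trace-ID-based ratio is deterministic. This is an illustration in the spirit of ratio-based sampling, not the SDK's exact implementation; the names keep_by_ratio and should_keep are hypothetical.

```python
TRACE_ID_MASK = (1 << 64) - 1  # use the low 64 bits of the 128-bit trace ID


def keep_by_ratio(trace_id: int, ratio: float) -> bool:
    """Deterministic ratio sampling: the same trace ID always gets the same answer."""
    bound = round(ratio * (1 << 64))
    return (trace_id & TRACE_ID_MASK) < bound


def should_keep(trace_id: int, is_llm_span: bool, ratio: float = 0.1) -> bool:
    """Keep every LLM span; keep other spans for about `ratio` of trace IDs."""
    return is_llm_span or keep_by_ratio(trace_id, ratio)


print(should_keep(0, False), should_keep(TRACE_ID_MASK, False), should_keep(TRACE_ID_MASK, True))
```

Because the decision depends only on the trace ID, every service that applies the same ratio keeps or drops the same traces without coordinating.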
10 Common Error Troubleshooting
| # | Error Message | Cause | Solution |
|---|---|---|---|
| 1 | StatusCode.UNAVAILABLE OTLP export failed | Collector not running or wrong port | Check that the Collector is reachable at localhost:4317 (OTLP/gRPC) or localhost:4318 (OTLP/HTTP) |
| 2 | context_length_exceeded | Prompt exceeds model context window | Use tiktoken to pre-calculate tokens, then truncate or split |
| 3 | rate_limit_exceeded 429 | API call rate exceeds limit | Implement exponential backoff retry, add token bucket rate limiting |
| 4 | Span is not being recorded | TracerProvider not configured | Confirm trace.set_tracer_provider() has been called |
| 5 | Baggage not propagated | Context not propagated across services | Use propagate.inject()/extract() |
| 6 | Metric reader timeout | Metric export timeout | Increase export_timeout_millis or reduce collection frequency |
| 7 | content_filter_triggered | Prompt triggered safety filter | Add content pre-check, use moderation API |
| 8 | Empty response from LLM | Model returned empty content | Check finish_reason, add retry logic |
| 9 | Memory leak in span processor | BatchSpanProcessor backlog | Adjust max_queue_size and schedule_delay_millis |
| 10 | Duplicate span attributes | Same attribute set multiple times | Use span.set_attribute(), not add_event(), for metrics |
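For row 3, the two remedies can be sketched with the standard library alone. The block below is an illustrative outline, not a provider-specific client: `RateLimitError`, `call_with_retry`, and `TokenBucket` are hypothetical names, and the delay parameters are assumptions to tune per provider.

```python
import random
import time
from typing import Optional


class RateLimitError(Exception):
    """Stand-in for a provider's HTTP 429 error (hypothetical)."""


def backoff_delays(max_retries: int, base: float = 0.5, cap: float = 30.0,
                   rng: Optional[random.Random] = None):
    """Yield one 'full jitter' delay per retry: uniform in [0, min(cap, base * 2**n)]."""
    rng = rng or random.Random()
    for attempt in range(max_retries):
        yield rng.uniform(0.0, min(cap, base * (2 ** attempt)))


def call_with_retry(fn, max_retries: int = 5, sleep=time.sleep, **backoff_kwargs):
    """Call fn(); on RateLimitError wait and retry, re-raising once retries run out."""
    delays = backoff_delays(max_retries, **backoff_kwargs)
    while True:
        try:
            return fn()
        except RateLimitError:
            delay = next(delays, None)
            if delay is None:
                raise  # retries exhausted: surface the original error
            sleep(delay)


class TokenBucket:
    """Client-side limiter: `rate` tokens refill per second, up to `capacity`."""

    def __init__(self, rate: float, capacity: float, clock=time.monotonic):
        self.rate, self.capacity, self.clock = rate, capacity, clock
        self.tokens = capacity
        self.updated = clock()

    def try_acquire(self, cost: float = 1.0) -> bool:
        """Refill based on elapsed time, then spend `cost` tokens if available."""
        now = self.clock()
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False
```

The `sleep` and `clock` parameters are injectable so the logic can be tested without real waiting. In a traced application, each retry would typically be recorded on the LLM Span so that retry storms show up in the trace.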
Diagnostic Script
def diagnose_otel_setup():
    """Print common OpenTelemetry setup problems found in the current process."""
    from opentelemetry import trace, metrics
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.metrics import MeterProvider

    issues = []

    # Until trace.set_tracer_provider() is called, the API returns a non-SDK provider.
    provider = trace.get_tracer_provider()
    if not isinstance(provider, TracerProvider):
        issues.append("TracerProvider not configured, may be using default NoOpProvider")
    else:
        # These are private SDK attributes and may change between SDK versions.
        if not provider._active_span_processor._span_processors:
            issues.append("TracerProvider has no SpanProcessor configured")

    meter_provider = metrics.get_meter_provider()
    if not isinstance(meter_provider, MeterProvider):
        issues.append("MeterProvider not configured")

    # Optional dependencies used elsewhere in this guide.
    try:
        import opentelemetry.instrumentation.openai
    except ImportError:
        issues.append("opentelemetry-instrumentation-openai not installed")
    try:
        import tiktoken
    except ImportError:
        issues.append("tiktoken not installed, cannot pre-calculate token count")

    if issues:
        print("Diagnosis found issues:")
        for i, issue in enumerate(issues, 1):
            print(f" {i}. {issue}")
    else:
        print("OpenTelemetry configuration is healthy")

diagnose_otel_setup()
Advanced Optimization Tips
Auto-Instrumentation
from opentelemetry.instrumentation.openai import OpenAIInstrumentor
OpenAIInstrumentor().instrument()
from opentelemetry.instrumentation.requests import RequestsInstrumentor
RequestsInstrumentor().instrument()
from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor
AsyncioInstrumentor().instrument()
Custom Propagator
from opentelemetry.propagate import set_global_textmap
from opentelemetry.propagators.composite import CompositePropagator
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from opentelemetry.baggage.propagation import W3CBaggagePropagator

# Propagate both W3C trace context (traceparent) and baggage headers.
set_global_textmap(
    CompositePropagator([
        TraceContextTextMapPropagator(),
        W3CBaggagePropagator(),
    ])
)
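When debugging propagation between services, it helps to know what the trace context propagator puts on the wire: a `traceparent` header in the W3C Trace Context format `version-traceid-parentid-flags`. The sketch below parses and formats that header using only the standard library. `TraceParent`, `parse_traceparent`, and `format_traceparent` are hypothetical helper names for inspection and tests, not OpenTelemetry API, and the sketch only handles the four-field layout of version 00.

```python
import re
from typing import NamedTuple, Optional

# version (2 hex) - trace-id (32 hex) - parent-id (16 hex) - flags (2 hex), lowercase
_TRACEPARENT_RE = re.compile(
    r"^(?P<version>[0-9a-f]{2})-(?P<trace_id>[0-9a-f]{32})-"
    r"(?P<span_id>[0-9a-f]{16})-(?P<flags>[0-9a-f]{2})$"
)


class TraceParent(NamedTuple):
    trace_id: int
    span_id: int
    sampled: bool


def parse_traceparent(header: str) -> Optional[TraceParent]:
    """Return the parsed header, or None if it is malformed or invalid."""
    match = _TRACEPARENT_RE.match(header.strip())
    if not match or match["version"] == "ff":  # version ff is forbidden by the spec
        return None
    trace_id = int(match["trace_id"], 16)
    span_id = int(match["span_id"], 16)
    if trace_id == 0 or span_id == 0:  # all-zero IDs are invalid
        return None
    sampled = bool(int(match["flags"], 16) & 0x01)  # lowest flag bit = sampled
    return TraceParent(trace_id, span_id, sampled)


def format_traceparent(tp: TraceParent) -> str:
    """Render a version-00 traceparent header value."""
    return f"00-{tp.trace_id:032x}-{tp.span_id:016x}-{int(tp.sampled):02x}"


if __name__ == "__main__":
    header = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
    print(parse_traceparent(header))
```

If an incoming request carries no header that parses this way, the downstream service starts a new trace, which is the usual symptom behind row 5 of the troubleshooting table.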
Dynamic Sampling Strategy
from opentelemetry.sdk.trace.sampling import Sampler, SamplingResult, Decision
from opentelemetry.trace import get_current_span

class AdaptiveLLMSampler(Sampler):
    def __init__(self, base_rate: float = 0.1, error_sample_rate: float = 1.0,
                 slow_threshold_ms: float = 5000.0):
        self.base_rate = base_rate
        # Not used in should_sample(): errors and latency are unknown when a span starts.
        self.error_sample_rate = error_sample_rate
        self.slow_threshold_ms = slow_threshold_ms

    # Defaults matter: the SDK may call should_sample() without trace_state.
    def should_sample(self, parent_context, trace_id, name, kind=None,
                      attributes=None, links=None, trace_state=None):
        # 1. Always keep LLM spans (the attribute must be passed at span creation).
        if attributes:
            is_llm = attributes.get("gen_ai.system") is not None
            if is_llm:
                return SamplingResult(
                    decision=Decision.RECORD_AND_SAMPLE,
                    attributes=attributes,
                    trace_state=trace_state,
                )
        # 2. Keep children of a recording parent that already carries an error.
        if parent_context:
            parent_span = get_current_span(parent_context)
            if parent_span.is_recording():
                parent_attrs = parent_span.attributes or {}
                if parent_attrs.get("error.type"):
                    return SamplingResult(
                        decision=Decision.RECORD_AND_SAMPLE,
                        attributes=attributes,
                        trace_state=trace_state,
                    )
        # 3. Otherwise keep a deterministic base_rate fraction of traces.
        should_sample = (hash(trace_id) % 10000) < (self.base_rate * 10000)
        return SamplingResult(
            decision=Decision.RECORD_AND_SAMPLE if should_sample else Decision.DROP,
            attributes=attributes,
            trace_state=trace_state,
        )

    def get_description(self):
        return f"AdaptiveLLMSampler(base={self.base_rate})"
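A head-based sampler decides before a span has run, so thresholds such as error_sample_rate and slow_threshold_ms can only be applied once the span has finished, for example in a tail-sampling stage. The sketch below shows that decision as a plain function under stated assumptions: `FinishedSpan` and `keep_after_completion` are hypothetical names, not SDK types, and where such a function would run (custom processor, Collector, or backend) is left open.

```python
import random
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class FinishedSpan:
    """Hypothetical summary of a completed span (not an SDK type)."""
    trace_id: int
    duration_ms: float
    error_type: Optional[str] = None
    is_llm: bool = False


def keep_after_completion(span: FinishedSpan,
                          base_rate: float = 0.1,
                          error_sample_rate: float = 1.0,
                          slow_threshold_ms: float = 5000.0,
                          rng: Callable[[], float] = random.random) -> bool:
    """Decide whether to keep a span once its outcome is known."""
    if span.is_llm:
        return True  # LLM spans are always kept
    if span.error_type is not None:
        return rng() < error_sample_rate  # keep a configurable share of errors
    if span.duration_ms >= slow_threshold_ms:
        return True  # slow spans are always kept
    # Everything else: deterministic base-rate sampling on the trace ID.
    return (span.trace_id % 10_000) < base_rate * 10_000


if __name__ == "__main__":
    slow = FinishedSpan(trace_id=9_999, duration_ms=7_200.0)
    fast = FinishedSpan(trace_id=9_999, duration_ms=40.0)
    print(keep_after_completion(slow), keep_after_completion(fast))
```

The trade-off is that spans must be buffered until they finish, which costs memory that a head-based sampler avoids.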
Grafana Dashboard Configuration
{
  "dashboard": {
    "title": "LLM Observability Dashboard",
    "panels": [
      {
        "title": "Token Usage (Input vs Output)",
        "type": "timeseries",
        "targets": [{
          "expr": "sum(rate(gen_ai_client_token_usage[5m])) by (token_type, gen_ai_request_model)"
        }]
      },
      {
        "title": "LLM Request Latency P50/P95/P99",
        "type": "timeseries",
        "targets": [
          {"expr": "histogram_quantile(0.50, sum(rate(llm_request_latency_bucket[5m])) by (le))"},
          {"expr": "histogram_quantile(0.95, sum(rate(llm_request_latency_bucket[5m])) by (le))"},
          {"expr": "histogram_quantile(0.99, sum(rate(llm_request_latency_bucket[5m])) by (le))"}
        ]
      },
      {
        "title": "Daily Cost by Model",
        "type": "piechart",
        "targets": [{
          "expr": "sum(increase(llm_cost_usd_total[24h])) by (gen_ai_request_model)"
        }]
      },
      {
        "title": "Error Rate by Type",
        "type": "barchart",
        "targets": [{
          "expr": "sum(rate(llm_errors_total[5m])) by (error_type)"
        }]
      }
    ]
  }
}
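The latency panel relies on histogram_quantile, which estimates a percentile from cumulative bucket counts rather than from raw samples. The following is a simplified sketch of that linear interpolation, assuming the lowest bucket starts at 0 and ignoring several edge cases the real PromQL function handles. `estimate_quantile` is a hypothetical name and the bucket values are made up for illustration.

```python
import math
from typing import List, Tuple


def estimate_quantile(q: float, buckets: List[Tuple[float, float]]) -> float:
    """Estimate the q-quantile from cumulative (upper_bound, count) buckets.

    Buckets must be sorted by upper bound and end with (math.inf, total_count).
    """
    if not 0.0 <= q <= 1.0:
        raise ValueError("q must be in [0, 1]")
    total = buckets[-1][1]
    if total == 0:
        return math.nan  # no observations, no estimate
    rank = q * total
    prev_bound, prev_count = 0.0, 0.0
    for upper, count in buckets:
        if count >= rank:
            if math.isinf(upper) or count == prev_count:
                # Cannot interpolate inside the +Inf bucket (or an empty one):
                # fall back to the highest bound seen so far.
                return prev_bound
            fraction = (rank - prev_count) / (count - prev_count)
            return prev_bound + (upper - prev_bound) * fraction
        prev_bound, prev_count = upper, count
    return prev_bound


if __name__ == "__main__":
    # Illustrative latency buckets in milliseconds (le=100, 500, 1000, +Inf).
    latency_buckets = [(100.0, 10), (500.0, 60), (1000.0, 90), (math.inf, 100)]
    for q in (0.50, 0.95):
        print(q, estimate_quantile(q, latency_buckets))
```

With these buckets the P50 estimate is 420 ms, while P95 is clamped to 1000 ms because the 95th observation falls in the +Inf bucket. This is why bucket boundaries for LLM latency should extend well past typical generation times, otherwise the P95 and P99 lines flatten at the highest finite boundary.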
Comparison: OpenTelemetry vs LangSmith vs Promptflow
| Dimension | OpenTelemetry | LangSmith | Promptflow |
|---|---|---|---|
| Open Source | Yes (CNCF project) | No (SaaS) | Yes (Microsoft) |
| Vendor Lock-in | None | High (LangChain ecosystem) | Medium (Azure ecosystem) |
| Tracing | Native, standard protocol | Proprietary trace system | Proprietary trace system |
| Token Tracking | Manual instrumentation | Automatic | Automatic |
| Prompt Management | Not supported | Supported | Supported |
| Cost Monitoring | Custom metrics required | Built-in | Built-in |
| Multi-language Support | 11+ languages | Python/JS | Python |
| APM Integration | Native (Jaeger/Tempo/Grafana) | Requires export | Requires export |
| Customization | Extremely flexible | Moderate | Moderate |
| Learning Curve | Steep | Gentle | Moderate |
| Production Readiness | High (CNCF project) | High | Moderate |
| Data Sovereignty | Full control | Data on LangSmith | Data on Azure |
Selection Guide:
- Existing APM infrastructure → OpenTelemetry (unified observability stack)
- Heavy LangChain user → LangSmith (out-of-the-box)
- Azure ecosystem → Promptflow (deep integration with Azure AI Studio)
Recommended Online Tools
When building LLM tracing systems, these online tools can boost your productivity:
- JSON Formatter — Format OpenTelemetry Span JSON data for quick attribute troubleshooting
- Base64 Encoder — Encode/decode Base64 data in Trace Context propagation
- cURL to Code Converter — Convert API debugging cURL commands to Python code
Summary
Python OpenTelemetry LLM tracing is the cornerstone of building production-grade AI application observability. The 6 production patterns cover the complete chain from basic Span instrumentation to intelligent alerting:
- LLM Span Instrumentation: Use gen_ai.* semantic conventions for standardized attribute annotation
- Token Tracking: Real-time usage and cost monitoring to prevent budget overruns
- Structured Logging: Sanitized Prompt/Response recording balancing debugging and compliance
- Multi-Model Chains: Trace complex call chains like RAG and Agent pipelines
- Error Monitoring: Classified exceptions, automatic retries, hallucination detection
- Intelligent Alerting: SLA guarantees, dynamic sampling, multi-level alerting
Key Principles: LLM Spans must be 100% sampled, token counts must come from API responses, prompt content must be sanitized, and TTFT is the core metric for user experience.
Related Reading
- OpenTelemetry LLM Observability Guide — Deep dive into LLM semantic conventions and Grafana dashboards
- Go OpenTelemetry Distributed Tracing — Go language OTel tracing practices
- Python AI Production Deployment Guide — Complete path from development to production for LLM applications
References
- OpenTelemetry LLM Semantic Conventions — Official LLM semantic conventions specification
- OpenTelemetry Python Documentation — Python SDK documentation