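Python LLM Structured Output: 6 Production Patterns from JSON Schema to Function Calling
The LLM returns a blob of free text, and your downstream systems fall over
You ask GPT for JSON and it hands back JSON with comments; you declare a field as an integer and it returns the string "42"; you ask for a list of 3 items and it gives you 5. Structured output is arguably the most fundamental capability in AI engineering in 2026: without it, your RAG pipelines, agent tool calls, and data-extraction pipelines are all ticking time bombs.
This article starts from JSON Schema constraints and walks through six production patterns: JSON Schema validation → OpenAI function calling → automatic retries with Instructor → multi-model adaptation → streaming structured output → production reliability, taking each from concept to working code.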
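Key takeaways
- Understand the three core mechanisms behind structured LLM output: prompt constraints, JSON Schema, and the function calling protocol
- Work through six Python structured-output patterns, from simple to complex
- Learn Instructor's automatic retry and validation strategies
- Implement structured-output adapters across models (OpenAI/Anthropic/Gemini)
- Build a production-grade reliability layer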
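Contents
- Core concepts of structured LLM output
- Pattern 1: JSON Schema constrained output
- Pattern 2: OpenAI function calling protocol
- Pattern 3: Automatic retries with Instructor
- Pattern 4: Multi-model structured output adapters
- Pattern 5: Streaming structured output
- Pattern 6: Production-grade reliability
- 5 common pitfalls and how to fix them
- Troubleshooting 10 common errors
- Advanced optimization tips
- Comparison: 3 structured output approaches
- Recommended online tools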
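Core concepts of structured LLM output
| Concept | Description |
|---|---|
| Structured Output | LLM output that conforms to a predefined schema (JSON/XML) |
| JSON Schema | A specification for describing JSON data structures, used to constrain and validate LLM output |
| Function Calling | A protocol introduced by OpenAI in which the LLM emits JSON that matches a function's parameter schema |
| Tool Use | The Anthropic/Gemini counterpart to function calling, with the same semantics |
| Constrained Decoding | Restricts token selection at inference time so the output is guaranteed to conform to the schema |
| Instructor | A Python library that generates the schema from a Pydantic model, then validates and retries automatically |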
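Why structured LLM output matters
Traditional LLM output flow:
User prompt → free-form LLM generation → string → regex/JSON parsing → may fail → retry
Structured output flow:
User prompt + schema → constrained LLM generation → valid JSON → Pydantic validation → success
Key differences:
1. Traditional: unpredictable output, brittle parsing, expensive retries
2. Structured: predictable output, reliable validation, dependable retries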
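To make the contrast concrete, here is a minimal standard-library sketch of the checks a validation layer performs on a raw reply. The field names `count` and `items` and the helper `check_payload` are hypothetical, chosen to mirror the failure cases from the introduction; in the rest of this article Pydantic does this work.

```python
import json


def check_payload(raw: str) -> list[str]:
    """Return the problems found in a raw LLM reply (an empty list means OK)."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc.msg}"]
    if not isinstance(data, dict):
        return ["top-level value must be an object"]

    problems = []
    count = data.get("count")
    # bool is a subclass of int in Python, so exclude it explicitly.
    if not isinstance(count, int) or isinstance(count, bool):
        problems.append("count must be an integer")
    items = data.get("items")
    if not isinstance(items, list) or len(items) != 3:
        problems.append("items must be a list of exactly 3 elements")
    return problems


good = check_payload('{"count": 42, "items": [1, 2, 3]}')
bad = check_payload('{"count": "42", "items": [1, 2, 3, 4, 5]}')
commented = check_payload('{"count": 1, // note\n "items": [1, 2, 3]}')
```

Each returned problem is a message you could send back to the model on a retry, which is the idea the later patterns automate.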
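Comparing the 3 structured output mechanisms
| Mechanism | How it works | Reliability | Latency | Compatibility |
|---|---|---|---|---|
| Prompt constraints | Describe the output format in the prompt | ⭐ Low | No overhead | All models |
| JSON Schema | Constrain the output structure with a schema | ⭐⭐ Medium | Slight | Some models |
| Function calling protocol | Dedicated API channel, plus constrained decoding when strict mode is enabled | ⭐⭐⭐ High | Slight | Specific models |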
Pattern 1: JSON Schema constrained output
The most basic approach to structured output: describe the required format in the prompt, then validate the result against a JSON Schema.
import json
import re
from typing import Optional

from openai import AsyncOpenAI
from pydantic import BaseModel, Field, ValidationError


class MovieReview(BaseModel):
    title: str = Field(description="Movie title")
    rating: int = Field(ge=1, le=10, description="Rating from 1 to 10")
    sentiment: str = Field(pattern="^(positive|negative|neutral)$")
    summary: str = Field(max_length=200, description="Short review summary")
    recommended: bool = Field(description="Whether the movie is recommended")


MOVIE_REVIEW_SCHEMA = MovieReview.model_json_schema()

STRUCTURED_PROMPT = """You are a professional movie review analyzer.
Analyze the review below and return the result strictly following the JSON Schema.
JSON Schema:
{schema}
Review:
{review}
Requirements:
1. Return valid JSON
2. rating must be an integer from 1 to 10
3. sentiment must be one of positive/negative/neutral
4. Do not add anything outside the JSON
5. Do not wrap the JSON in ```json``` fences
"""


def extract_json_from_response(text: str) -> Optional[dict]:
    """Best-effort extraction of a JSON object from raw LLM text."""
    # Try the whole response first, so a deeply nested object is not
    # truncated by the fallback regexes below.
    candidates = [text.strip()]
    patterns = [
        r'```json\s*(.*?)\s*```',               # fenced json block
        r'```\s*(.*?)\s*```',                   # generic fenced block
        r'(\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\})',   # bare object, one nesting level
    ]
    for pattern in patterns:
        match = re.search(pattern, text, re.DOTALL)
        if match:
            candidates.append(match.group(1))
    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, dict):
            return parsed
    return None


def parse_structured_output(raw_text: str) -> Optional[MovieReview]:
    parsed_json = extract_json_from_response(raw_text)
    if parsed_json is None:
        return None
    try:
        return MovieReview.model_validate(parsed_json)
    except ValidationError as e:
        print(f"Validation failed: {e}")
        return None


async def call_llm_with_schema(prompt: str) -> Optional[MovieReview]:
    client = AsyncOpenAI()
    formatted_prompt = STRUCTURED_PROMPT.format(
        schema=json.dumps(MOVIE_REVIEW_SCHEMA, ensure_ascii=False, indent=2),
        review=prompt,
    )
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": formatted_prompt}],
        temperature=0.1,
    )
    raw_text = response.choices[0].message.content or ""
    return parse_structured_output(raw_text)
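Limitations of JSON Schema validation
Problem 1: The LLM may return invalid JSON
→ extract_json_from_response is needed for tolerant extraction
Problem 2: The LLM may ignore schema constraints
→ rating comes back as "9分" instead of 9
→ sentiment comes back as "很积极" ("very positive") instead of positive
Problem 3: Nested structures are error-prone
→ List lengths cannot be controlled
→ Optional fields may be missing
Problem 4: The prompt has to be hand-written every time
→ High maintenance cost, and constraints are easy to miss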
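For Problem 2, one possible mitigation is a lenient coercion step that runs before strict validation. The sketch below is an illustration rather than part of the original pipeline; `coerce_rating` is a hypothetical helper, and it assumes the first run of digits in the value is the intended rating (so "8.5" would be read as 8).

```python
import re
from typing import Optional


def coerce_rating(value: object) -> Optional[int]:
    """Pull an integer from 1 to 10 out of values such as 9, "9", or "9分"."""
    if isinstance(value, bool):
        # bool is an int subclass; True/False are never valid ratings.
        return None
    if isinstance(value, int):
        rating = value
    elif isinstance(value, str):
        match = re.search(r"\d+", value)
        if match is None:
            return None
        rating = int(match.group())
    else:
        return None
    # Out-of-range numbers are rejected instead of being clamped.
    return rating if 1 <= rating <= 10 else None


print(coerce_rating("9分"))
print(coerce_rating("very good"))
```

Returning `None` instead of guessing keeps the decision (retry, default, or reject) with the caller.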
Pattern 2: OpenAI function calling protocol
OpenAI's function calling protocol is the standard approach to structured output: the schema is sent through the dedicated tools parameter of the API, and the LLM returns JSON arguments that match it.
import json
from typing import Optional

from openai import AsyncOpenAI
from pydantic import BaseModel, Field, ValidationError


class SentimentAnalysis(BaseModel):
    text: str = Field(description="The text that was analyzed")
    sentiment: str = Field(description="Sentiment: positive/negative/neutral")
    confidence: float = Field(ge=0.0, le=1.0, description="Confidence from 0 to 1")
    keywords: list[str] = Field(description="List of keywords")
    language: str = Field(description="Detected language")


class EntityExtraction(BaseModel):
    entities: list[dict] = Field(description="List of extracted entities")
    relationships: list[dict] = Field(default_factory=list, description="Relationships between entities")
    summary: str = Field(description="Summary of the text")


def pydantic_to_function_schema(model_class: type[BaseModel]) -> dict:
    schema = model_class.model_json_schema()
    parameters = {
        "type": "object",
        "properties": schema.get("properties", {}),
        "required": schema.get("required", []),
    }
    if "$defs" in schema:
        # Keep nested model definitions so "$ref" pointers still resolve.
        parameters["$defs"] = schema["$defs"]
    return {
        "type": "function",
        "function": {
            "name": model_class.__name__,
            "description": model_class.__doc__ or f"Extract {model_class.__name__}",
            "parameters": parameters,
        },
    }


async def function_calling_extract(
    text: str,
    model_class: type[BaseModel],
    model: str = "gpt-4o",
) -> Optional[BaseModel]:
    client = AsyncOpenAI()
    function_schema = pydantic_to_function_schema(model_class)
    response = await client.chat.completions.create(
        model=model,
        messages=[
            {
                "role": "system",
                "content": "You are a precise data extraction assistant. Use the provided function to return structured results.",
            },
            {"role": "user", "content": text},
        ],
        tools=[function_schema],
        # Force the model to call this specific function.
        tool_choice={"type": "function", "function": {"name": model_class.__name__}},
    )
    message = response.choices[0].message
    if message.tool_calls:
        tool_call = message.tool_calls[0]
        try:
            args = json.loads(tool_call.function.arguments)
            return model_class.model_validate(args)
        except (json.JSONDecodeError, ValidationError) as e:
            print(f"Failed to parse function call result: {e}")
            return None
    return None


async def multi_function_calling(
    text: str,
    model_classes: list[type[BaseModel]],
    model: str = "gpt-4o",
) -> dict[str, BaseModel]:
    client = AsyncOpenAI()
    tool_schemas = [pydantic_to_function_schema(cls) for cls in model_classes]
    response = await client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": "You are a multi-task data extraction assistant."},
            {"role": "user", "content": text},
        ],
        tools=tool_schemas,
        tool_choice="auto",
    )
    # Look tools up by name instead of scanning every class per tool call.
    classes_by_name = {cls.__name__: cls for cls in model_classes}
    results: dict[str, BaseModel] = {}
    message = response.choices[0].message
    for tool_call in message.tool_calls or []:
        cls = classes_by_name.get(tool_call.function.name)
        if cls is None:
            continue
        try:
            args = json.loads(tool_call.function.arguments)
            results[cls.__name__] = cls.model_validate(args)
        except (json.JSONDecodeError, ValidationError) as e:
            print(f"Failed to parse {cls.__name__}: {e}")
    return results
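Strict mode (response_format with json_schema)
import json
from typing import Optional

from openai import AsyncOpenAI
from pydantic import BaseModel, ValidationError


async def strict_structured_output(
    text: str,
    model_class: type[BaseModel],
    model: str = "gpt-4o-2024-08-06",
) -> Optional[BaseModel]:
    client = AsyncOpenAI()
    # Strict mode expects "additionalProperties": false and every property
    # listed in "required"; a plain model_json_schema() can be rejected
    # unless the model is declared accordingly (see pitfall 5).
    schema = model_class.model_json_schema()
    response = await client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": "Extract structured data"},
            {"role": "user", "content": text},
        ],
        response_format={
            "type": "json_schema",
            "json_schema": {
                "name": model_class.__name__,
                "strict": True,
                "schema": schema,
            },
        },
    )
    raw = response.choices[0].message.content
    if raw:
        try:
            return model_class.model_validate(json.loads(raw))
        except (json.JSONDecodeError, ValidationError) as e:
            print(f"Failed to parse strict mode output: {e}")
    return None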
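A schema produced by `model_json_schema()` usually needs adjusting before strict mode accepts it. The following is a minimal sketch, assuming the two strict-mode rules mentioned in this article (every object sets `additionalProperties: false`, and every property is listed in `required`); `make_strict` is a hypothetical helper, not an OpenAI or Pydantic API, and it does not address unsupported keywords.

```python
import copy


def make_strict(schema: dict) -> dict:
    """Return a copy of a JSON Schema adjusted for strict-mode style rules."""
    result = copy.deepcopy(schema)

    def visit(node: object) -> None:
        if isinstance(node, dict):
            if node.get("type") == "object" and "properties" in node:
                # Forbid unknown keys and mark every property as required.
                node["additionalProperties"] = False
                node["required"] = list(node["properties"])
            for value in node.values():
                visit(value)
        elif isinstance(node, list):
            for item in node:
                visit(item)

    visit(result)
    return result


schema = {
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "address": {"$ref": "#/$defs/Address"},
    },
    "required": ["name"],
    "$defs": {
        "Address": {"type": "object", "properties": {"city": {"type": "string"}}},
    },
}
strict_schema = make_strict(schema)
```

Note the trade-off: marking everything required means fields that had defaults must now always be produced by the model, so optional values are better expressed as nullable types.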
Pattern 3: Automatic retries with Instructor
Instructor is the go-to library for structured LLM output in Python: it generates the schema from a Pydantic model, validates the output, and retries automatically.
import re

import instructor
from openai import AsyncOpenAI
from pydantic import BaseModel, Field, field_validator


class ProductInfo(BaseModel):
    name: str = Field(description="Product name")
    price: float = Field(gt=0, description="Price, must be greater than 0")
    category: str = Field(description="Product category")
    features: list[str] = Field(description="List of product features", min_length=1, max_length=5)
    in_stock: bool = Field(description="Whether the product is in stock")

    @field_validator("price")
    @classmethod
    def round_price(cls, v: float) -> float:
        return round(v, 2)

    @field_validator("category")
    @classmethod
    def normalize_category(cls, v: str) -> str:
        return v.strip().lower()


class ArticleMetadata(BaseModel):
    title: str = Field(description="Article title")
    author: str = Field(description="Author")
    publish_date: str = Field(description="Publication date in YYYY-MM-DD format")
    tags: list[str] = Field(description="List of tags")
    word_count: int = Field(ge=0, description="Word count")
    reading_time_minutes: int = Field(ge=1, description="Estimated reading time in minutes")

    @field_validator("publish_date")
    @classmethod
    def validate_date_format(cls, v: str) -> str:
        if not re.match(r'^\d{4}-\d{2}-\d{2}$', v):
            raise ValueError(f"Invalid date format: {v}, expected YYYY-MM-DD")
        return v


async def instructor_extract_product(text: str) -> ProductInfo:
    client = instructor.from_openai(AsyncOpenAI())
    result = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "user", "content": f"Extract the product information from the following text:\n\n{text}"}
        ],
        response_model=ProductInfo,
        max_retries=3,
        temperature=0.1,
    )
    return result


async def instructor_extract_with_mode(
    text: str,
    mode: instructor.Mode = instructor.Mode.TOOLS,
) -> ArticleMetadata:
    client = instructor.from_openai(AsyncOpenAI(), mode=mode)
    result = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "user", "content": f"Extract the article metadata:\n\n{text}"}
        ],
        response_model=ArticleMetadata,
        max_retries=3,
    )
    return result


async def instructor_partial_streaming(text: str):
    client = instructor.from_openai(AsyncOpenAI())
    # With an async client, create_partial returns an async generator,
    # so it is iterated with "async for" and not awaited.
    article = client.chat.completions.create_partial(
        model="gpt-4o",
        messages=[
            {"role": "user", "content": f"Extract the article metadata:\n\n{text}"}
        ],
        response_model=ArticleMetadata,
        max_retries=3,
    )
    async for partial in article:
        print(f"Partial result: {partial.model_dump_json(exclude_none=True)}")
Choosing an Instructor mode
Mode.JSON_SCHEMA     → OpenAI response_format=json_schema (recommended, most reliable)
Mode.TOOLS           → OpenAI function calling (good compatibility)
Mode.JSON            → asks for JSON in the prompt (most portable, least reliable)
Mode.ANTHROPIC_TOOLS → Anthropic tool_use
Mode.GEMINI_JSON     → Gemini JSON mode
Instructor retry strategies in detail
from typing import Optional

import instructor
from openai import AsyncOpenAI
from pydantic import BaseModel, Field


class StrictUser(BaseModel):
    name: str = Field(min_length=2, max_length=50)
    age: int = Field(ge=0, le=150)
    email: str = Field(pattern=r'^[\w\.-]+@[\w\.-]+\.\w+$')


async def instructor_with_custom_retry(text: str) -> StrictUser:
    client = instructor.from_openai(
        AsyncOpenAI(),
        mode=instructor.Mode.JSON_SCHEMA,
    )
    # create_with_completion also returns the raw completion, which
    # exposes token usage for cost tracking.
    result, completion = await client.chat.completions.create_with_completion(
        model="gpt-4o",
        messages=[{"role": "user", "content": text}],
        response_model=StrictUser,
        max_retries=3,
        # Newer Instructor releases name this parameter "context".
        validation_context={"strict": True},
    )
    print(f"Token usage: prompt={completion.usage.prompt_tokens}, "
          f"completion={completion.usage.completion_tokens}")
    return result


async def instructor_batch_extract(
    texts: list[str],
) -> list[Optional[StrictUser]]:
    client = instructor.from_openai(AsyncOpenAI())
    results: list[Optional[StrictUser]] = []
    for text in texts:
        try:
            result = await client.chat.completions.create(
                model="gpt-4o",
                messages=[{"role": "user", "content": text}],
                response_model=StrictUser,
                max_retries=2,
            )
            results.append(result)
        except instructor.exceptions.InstructorRetryException as e:
            print(f"Batch extraction failed, skipping: {e}")
            # Keep a placeholder so results stay aligned with texts.
            results.append(None)
    return results
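The validate-and-retry behavior used throughout this pattern can be sketched without any LLM dependency. This is a simplified illustration of the idea, not Instructor's actual implementation; `extract_with_retries`, `fake_llm`, and `validate_age` are hypothetical names, and the fake model simply replays canned replies.

```python
import json
from typing import Callable


def extract_with_retries(
    ask: Callable[[list[dict]], str],
    validate: Callable[[dict], list[str]],
    prompt: str,
    max_retries: int = 3,
) -> dict:
    """Call the model, validate the reply, and re-ask with the errors attached."""
    messages = [{"role": "user", "content": prompt}]
    for _ in range(max_retries):
        reply = ask(messages)
        try:
            data = json.loads(reply)
            errors = validate(data)
        except json.JSONDecodeError as exc:
            data, errors = None, [f"invalid JSON: {exc.msg}"]
        if not errors:
            return data
        # Feed the failed reply and its errors back so the model can self-correct.
        messages.append({"role": "assistant", "content": reply})
        messages.append({
            "role": "user",
            "content": "Fix these errors and answer again: " + "; ".join(errors),
        })
    raise ValueError(f"no valid output after {max_retries} attempts")


# A stand-in model: the first reply is wrong, the second is correct.
replies = iter(['{"age": "thirty"}', '{"age": 30}'])


def fake_llm(messages: list[dict]) -> str:
    return next(replies)


def validate_age(data: dict) -> list[str]:
    return [] if isinstance(data.get("age"), int) else ["age must be an integer"]


result = extract_with_retries(fake_llm, validate_age, "Extract the age.")
```

Every retry costs another full model call plus a longer prompt, which is why a small `max_retries` and a simple schema tend to go together.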
Pattern 4: Multi-model structured output adapters
Each LLM vendor exposes a different structured output API, so an adapter layer is needed to handle them uniformly.
import asyncio
import json
import os
from abc import ABC, abstractmethod
from typing import Optional, TypeVar

from openai import AsyncOpenAI
from pydantic import BaseModel

T = TypeVar("T", bound=BaseModel)


class StructuredOutputAdapter(ABC):
    @abstractmethod
    async def extract(self, text: str, model_class: type[T]) -> Optional[T]:
        pass


class OpenAIStructuredAdapter(StructuredOutputAdapter):
    def __init__(self, model: str = "gpt-4o"):
        self.client = AsyncOpenAI()
        self.model = model

    async def extract(self, text: str, model_class: type[T]) -> Optional[T]:
        import instructor
        client = instructor.from_openai(self.client, mode=instructor.Mode.JSON_SCHEMA)
        return await client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": text}],
            response_model=model_class,
            max_retries=3,
        )


class AnthropicStructuredAdapter(StructuredOutputAdapter):
    def __init__(self, model: str = "claude-sonnet-4-20250514"):
        try:
            import anthropic
            self.client = anthropic.AsyncAnthropic()
        except ImportError:
            raise ImportError("Please install anthropic: pip install anthropic")
        self.model = model

    async def extract(self, text: str, model_class: type[T]) -> Optional[T]:
        import instructor
        client = instructor.from_anthropic(
            self.client,
            mode=instructor.Mode.ANTHROPIC_TOOLS,
        )
        return await client.chat.completions.create(
            model=self.model,
            max_tokens=4096,
            messages=[{"role": "user", "content": text}],
            response_model=model_class,
            max_retries=3,
        )


class GeminiStructuredAdapter(StructuredOutputAdapter):
    def __init__(self, model: str = "gemini-2.0-flash"):
        self.model_name = model

    async def extract(self, text: str, model_class: type[T]) -> Optional[T]:
        import google.generativeai as genai
        genai.configure(api_key=os.environ.get("GEMINI_API_KEY"))
        model = genai.GenerativeModel(self.model_name)
        schema = model_class.model_json_schema()
        prompt = f"""Extract structured data from the following text.
Return the result strictly following the JSON Schema, with no extra content.
JSON Schema:
{json.dumps(schema, ensure_ascii=False, indent=2)}
Text:
{text}"""
        response = await model.generate_content_async(prompt)
        raw = response.text.strip()
        # Strip a surrounding code fence, with or without the "json" tag.
        if raw.startswith("```json"):
            raw = raw[7:]
        elif raw.startswith("```"):
            raw = raw[3:]
        if raw.endswith("```"):
            raw = raw[:-3]
        raw = raw.strip()
        try:
            return model_class.model_validate(json.loads(raw))
        except Exception as e:
            print(f"Failed to parse Gemini output: {e}")
            return None


class MultiModelStructuredExtractor:
    def __init__(self):
        self.adapters: dict[str, StructuredOutputAdapter] = {}

    def register(self, name: str, adapter: StructuredOutputAdapter):
        self.adapters[name] = adapter

    async def extract(
        self,
        text: str,
        model_class: type[T],
        preferred: str = "openai",
        fallback: bool = True,
    ) -> Optional[T]:
        # Try the preferred adapter first, then the others in registration order.
        order = [preferred]
        if fallback:
            order.extend([k for k in self.adapters if k != preferred])
        for model_name in order:
            adapter = self.adapters.get(model_name)
            if adapter is None:
                continue
            try:
                result = await adapter.extract(text, model_class)
                if result is not None:
                    return result
            except Exception as e:
                print(f"[{model_name}] extraction failed: {e}")
                continue
        return None

    async def extract_consensus(
        self,
        text: str,
        model_class: type[T],
        min_agreement: int = 2,
    ) -> Optional[T]:
        names = list(self.adapters)
        results = await asyncio.gather(
            *(self.adapters[name].extract(text, model_class) for name in names),
            return_exceptions=True,
        )
        # Group identical outputs by their JSON form. Exact matching is
        # strict: free-text fields rarely agree word for word.
        votes: dict[str, list[T]] = {}
        for name, result in zip(names, results):
            if isinstance(result, BaseException):
                print(f"[{name}] error: {result}")
                continue
            if result is not None:
                votes.setdefault(result.model_dump_json(), []).append(result)
        if not votes:
            return None
        best = max(votes.values(), key=len)
        # Only return a result that at least min_agreement models agree on.
        return best[0] if len(best) >= min_agreement else None
Multi-model adapter architecture
                  ┌─────────────────────┐
                  │ MultiModelExtractor │
                  │ (unified interface) │
                  └──────────┬──────────┘
                             │
         ┌───────────────────┼───────────────────┐
         │                   │                   │
┌────────▼────────┐ ┌────────▼────────┐ ┌────────▼────────┐
│ OpenAI Adapter  │ │Anthropic Adapter│ │ Gemini Adapter  │
│ JSON_SCHEMA     │ │ ANTHROPIC_TOOLS │ │ Prompt + parse  │
│ Instructor      │ │ Instructor      │ │ Manual parsing  │
└─────────────────┘ └─────────────────┘ └─────────────────┘
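The fallback path through this architecture can be exercised without any vendor SDK. Below is a minimal sketch using stand-in adapters; `extract_with_fallback`, `failing_adapter`, and `working_adapter` are hypothetical names, and plain dicts stand in for Pydantic models.

```python
import asyncio
from typing import Awaitable, Callable, Optional

Adapter = Callable[[str], Awaitable[Optional[dict]]]


async def extract_with_fallback(
    text: str,
    adapters: dict[str, Adapter],
    preferred: str,
) -> tuple[Optional[str], Optional[dict]]:
    """Try the preferred adapter first, then the rest in registration order."""
    order = [preferred] + [name for name in adapters if name != preferred]
    for name in order:
        adapter = adapters.get(name)
        if adapter is None:
            continue
        try:
            result = await adapter(text)
        except Exception as exc:
            print(f"[{name}] extraction failed: {exc}")
            continue
        if result is not None:
            return name, result
    return None, None


async def failing_adapter(text: str) -> Optional[dict]:
    raise RuntimeError("provider unavailable")


async def working_adapter(text: str) -> Optional[dict]:
    return {"summary": text[:10]}


used, data = asyncio.run(
    extract_with_fallback(
        "hello structured world",
        {"primary": failing_adapter, "backup": working_adapter},
        preferred="primary",
    )
)
```

Returning the adapter name alongside the data makes it easy to log which provider actually served each request.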
Pattern 5: Streaming structured output
Combining structured output with streaming enables real-time parsing and progressive display.
import json
from typing import AsyncIterator, Optional

from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
from pydantic import BaseModel, Field


class StreamingJsonParser:
    """Emits each complete top-level JSON object found in a chunked stream."""

    def __init__(self):
        self.buffer = ""
        self.depth = 0
        self.in_string = False
        self.escape_next = False
        self.started = False

    def feed(self, chunk: str) -> list[dict]:
        results = []
        for char in chunk:
            if not self.started:
                if char != '{':
                    continue  # skip any text before the first object
                self.started = True
            # Buffer character by character so text after a completed
            # object in the same chunk is not lost.
            self.buffer += char
            if self.escape_next:
                self.escape_next = False
                continue
            if self.in_string:
                if char == '\\':
                    self.escape_next = True
                elif char == '"':
                    self.in_string = False
                continue
            if char == '"':
                self.in_string = True
            elif char == '{':
                self.depth += 1
            elif char == '}':
                self.depth -= 1
                if self.depth == 0:
                    try:
                        results.append(json.loads(self.buffer))
                    except json.JSONDecodeError:
                        pass
                    self.buffer = ""
                    self.started = False
        return results


class PartialModelBuilder:
    def __init__(self, model_class: type[BaseModel]):
        self.model_class = model_class
        self.current_json = {}
        self.last_valid = None

    def update(self, json_data: dict) -> Optional[BaseModel]:
        self.current_json.update(json_data)
        try:
            self.last_valid = self.model_class.model_validate(self.current_json)
            return self.last_valid
        except Exception:
            # Not valid yet: keep returning the last snapshot that was.
            return self.last_valid


class StreamingEvent(BaseModel):
    event_type: str = Field(description="Event type")
    data: dict = Field(description="Event data")
    confidence: float = Field(ge=0.0, le=1.0, description="Confidence")


async def stream_structured_output(
    prompt: str,
    model_class: type[BaseModel],
    model: str = "gpt-4o",
) -> AsyncIterator[BaseModel]:
    import instructor
    client = instructor.from_openai(AsyncOpenAI())
    # create_partial on an async client returns an async generator (no await).
    stream = client.chat.completions.create_partial(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        response_model=model_class,
        max_retries=2,
    )
    async for partial in stream:
        yield partial


async def stream_with_raw_parser(
    prompt: str,
    model: str = "gpt-4o",
) -> AsyncIterator[dict]:
    client = AsyncOpenAI()
    schema_prompt = f"""Return the result as JSON. Return only JSON, nothing else.
{prompt}"""
    stream = await client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": schema_prompt}],
        stream=True,
        temperature=0.1,
    )
    parser = StreamingJsonParser()
    full_content = ""
    emitted = False
    async for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            full_content += content
            for result in parser.feed(content):
                emitted = True
                yield result
    # Fall back to the full text if the incremental parser found nothing.
    if not emitted and full_content:
        try:
            yield json.loads(full_content)
        except json.JSONDecodeError:
            pass


async def stream_sse_structured(
    prompt: str,
    model_class: type[BaseModel],
) -> StreamingResponse:
    # Call this from a FastAPI route handler and return its result.
    async def generate():
        async for partial in stream_structured_output(prompt, model_class):
            data = partial.model_dump_json(exclude_none=True)
            yield f"data: {data}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={"X-Accel-Buffering": "no"},  # disable proxy buffering
    )
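Streaming structured output architecture
Client request
    │
    ▼
FastAPI SSE endpoint
    │
    ▼
Instructor create_partial()
    │
    ├──→ chunk1: {"name": "Product A"...}
    ├──→ chunk2: {"name": "Product A", "price": 99...}
    ├──→ chunk3: {"name": "Product A", "price": 99.0, "category": "Electronics"...}
    └──→ final: the complete Pydantic model instance
Each chunk is pushed to the client over SSE
The client renders the UI progressively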
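On the receiving side, each SSE event carries a progressively more complete snapshot. As a rough sketch of what a client does with the stream, the hypothetical `parse_sse` helper below splits raw event-stream text into JSON snapshots; it assumes every event consists only of `data:` lines, as in the endpoint above.

```python
import json


def parse_sse(stream_text: str) -> list[dict]:
    """Split raw text/event-stream content into decoded JSON payloads."""
    events = []
    # Events are separated by a blank line.
    for block in stream_text.split("\n\n"):
        data_lines = [
            line[len("data:"):].strip()
            for line in block.splitlines()
            if line.startswith("data:")
        ]
        if data_lines:
            events.append(json.loads("\n".join(data_lines)))
    return events


stream = (
    'data: {"name": "Product A"}\n\n'
    'data: {"name": "Product A", "price": 99.0}\n\n'
)
snapshots = parse_sse(stream)
latest = snapshots[-1]  # the UI only needs to render the newest snapshot
```

Because each snapshot supersedes the previous one, the client can simply re-render from `latest` and never has to merge fragments itself.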
Pattern 6: Production-grade reliability
This pattern combines the previous ones into a structured output service that is ready for production.
import hashlib
import logging
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional

import instructor
from openai import AsyncOpenAI
from pydantic import BaseModel

logger = logging.getLogger(__name__)


class OutputStatus(str, Enum):
    SUCCESS = "success"
    VALIDATION_FAILED = "validation_failed"
    PARSE_FAILED = "parse_failed"
    LLM_ERROR = "llm_error"
    TIMEOUT = "timeout"
    RETRY_EXHAUSTED = "retry_exhausted"


@dataclass
class ExtractionResult:
    data: Optional[BaseModel] = None
    status: OutputStatus = OutputStatus.SUCCESS
    attempts: int = 0
    latency_ms: float = 0.0
    error_message: str = ""
    model_used: str = ""
    tokens_used: dict = field(default_factory=dict)


class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time: Optional[float] = None
        self.is_open = False

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.is_open = True

    def record_success(self):
        self.failure_count = 0
        self.is_open = False

    def can_execute(self) -> bool:
        if not self.is_open:
            return True
        # After the recovery timeout, close the breaker and allow traffic again.
        if self.last_failure_time and \
                time.time() - self.last_failure_time > self.recovery_timeout:
            self.is_open = False
            self.failure_count = 0
            return True
        return False


class StructuredOutputService:
    def __init__(
        self,
        max_retries: int = 3,
        timeout: float = 30.0,
        fallback_models: Optional[list[str]] = None,
    ):
        self.client = AsyncOpenAI()
        self.max_retries = max_retries
        self.timeout = timeout
        self.fallback_models = fallback_models or ["gpt-4o", "gpt-4o-mini"]
        self.circuit_breakers: dict[str, CircuitBreaker] = {}

    def _get_breaker(self, model: str) -> CircuitBreaker:
        if model not in self.circuit_breakers:
            self.circuit_breakers[model] = CircuitBreaker()
        return self.circuit_breakers[model]

    async def extract(
        self,
        text: str,
        model_class: type[BaseModel],
        preferred_model: Optional[str] = None,
    ) -> ExtractionResult:
        models = [preferred_model] if preferred_model else self.fallback_models
        # Skip models whose circuit breaker is currently open.
        models = [m for m in models if self._get_breaker(m).can_execute()]
        if not models:
            return ExtractionResult(
                status=OutputStatus.RETRY_EXHAUSTED,
                error_message="All model circuit breakers are open",
            )
        for model in models:
            result = await self._try_extract(text, model_class, model)
            if result.status == OutputStatus.SUCCESS:
                self._get_breaker(model).record_success()
                return result
            else:
                self._get_breaker(model).record_failure()
                logger.warning(f"Extraction with model {model} failed: {result.error_message}")
        # Every model failed: return the last failure.
        return result

    async def _try_extract(
        self,
        text: str,
        model_class: type[BaseModel],
        model: str,
    ) -> ExtractionResult:
        start_time = time.time()
        client = instructor.from_openai(self.client, mode=instructor.Mode.JSON_SCHEMA)
        for attempt in range(1, self.max_retries + 1):
            try:
                result = await client.chat.completions.create(
                    model=model,
                    messages=[{"role": "user", "content": text}],
                    response_model=model_class,
                    max_retries=0,  # no Instructor-level retries; this loop retries
                    timeout=self.timeout,
                )
                latency = (time.time() - start_time) * 1000
                return ExtractionResult(
                    data=result,
                    status=OutputStatus.SUCCESS,
                    attempts=attempt,
                    latency_ms=latency,
                    model_used=model,
                )
            except instructor.exceptions.InstructorRetryException as e:
                logger.warning(f"Attempt {attempt} failed validation: {e}")
                continue
            except Exception as e:
                error_msg = str(e)
                if "timeout" in error_msg.lower():
                    return ExtractionResult(
                        status=OutputStatus.TIMEOUT,
                        attempts=attempt,
                        latency_ms=(time.time() - start_time) * 1000,
                        error_message=error_msg,
                        model_used=model,
                    )
                logger.error(f"Attempt {attempt} LLM error: {e}")
                continue
        return ExtractionResult(
            status=OutputStatus.RETRY_EXHAUSTED,
            attempts=self.max_retries,
            latency_ms=(time.time() - start_time) * 1000,
            error_message=f"Still failing after {self.max_retries} attempts",
            model_used=model,
        )


class StructuredOutputCache:
    def __init__(self, ttl: float = 3600.0, max_size: int = 1000):
        self.ttl = ttl
        self.max_size = max_size
        self._cache: dict[str, tuple[float, BaseModel]] = {}

    def _make_key(self, text: str, model_class: type[BaseModel]) -> str:
        content_hash = hashlib.sha256(text.encode()).hexdigest()[:16]
        return f"{model_class.__name__}:{content_hash}"

    def get(self, text: str, model_class: type[BaseModel]) -> Optional[BaseModel]:
        key = self._make_key(text, model_class)
        if key in self._cache:
            timestamp, data = self._cache[key]
            if time.time() - timestamp < self.ttl:
                return data
            del self._cache[key]  # expired
        return None

    def set(self, text: str, model_class: type[BaseModel], data: BaseModel):
        if len(self._cache) >= self.max_size:
            # Evict the oldest entry to stay within max_size.
            oldest_key = min(self._cache, key=lambda k: self._cache[k][0])
            del self._cache[oldest_key]
        key = self._make_key(text, model_class)
        self._cache[key] = (time.time(), data)
Production architecture overview
                ┌─────────────────────────┐
                │ StructuredOutputService │
                │  (single entry point)   │
                └────────────┬────────────┘
                             │
                ┌────────────▼────────────┐
                │     CircuitBreaker      │
                │   (per-model breaker)   │
                └────────────┬────────────┘
                             │
         ┌───────────────────┼───────────────────┐
         │                   │                   │
┌────────▼────────┐ ┌────────▼────────┐ ┌────────▼────────┐
│ gpt-4o          │ │ gpt-4o-mini     │ │ fallback        │
│ JSON_SCHEMA     │ │ JSON_SCHEMA     │ │ Prompt + parse  │
│ + Instructor    │ │ + Instructor    │ │ + retries       │
└────────┬────────┘ └────────┬────────┘ └────────┬────────┘
         │                   │                   │
         └───────────────────┼───────────────────┘
                             │
                ┌────────────▼────────────┐
                │  StructuredOutputCache  │
                │     (result cache)      │
                └─────────────────────────┘
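One way to wire the cache into the request path is a cache-aside wrapper: look up first, call the model only on a miss, and store only successful results. The sketch below uses a plain dict and a stand-in extractor; `cached_extract`, `fake_extract`, and `call_log` are hypothetical names, and TTL and eviction are left out for brevity.

```python
import asyncio
import hashlib
from typing import Awaitable, Callable, Optional


async def cached_extract(
    cache: dict[str, dict],
    text: str,
    schema_name: str,
    extract: Callable[[str], Awaitable[Optional[dict]]],
) -> Optional[dict]:
    """Return a cached result if present, otherwise extract and store it."""
    key = f"{schema_name}:{hashlib.sha256(text.encode()).hexdigest()[:16]}"
    if key in cache:
        return cache[key]
    result = await extract(text)
    if result is not None:
        # Never cache failures, or they would be replayed until expiry.
        cache[key] = result
    return result


call_log: list[str] = []


async def fake_extract(text: str) -> Optional[dict]:
    call_log.append(text)  # records how often the "model" is really called
    return {"length": len(text)}


async def demo() -> tuple[Optional[dict], Optional[dict]]:
    cache: dict[str, dict] = {}
    first = await cached_extract(cache, "same input", "Doc", fake_extract)
    second = await cached_extract(cache, "same input", "Doc", fake_extract)
    return first, second


first, second = asyncio.run(demo())
```

Including the schema name in the key matters: the same text extracted into two different models must not share a cache entry.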
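5 common pitfalls and how to fix them
Pitfall 1: The LLM returns JSON with comments
import json
import re

# Match string literals first so that "//" inside a value (for example a
# URL) is left alone; only comments outside strings are removed.
_COMMENT_OR_STRING = re.compile(
    r'"(?:\\.|[^"\\])*"|//[^\n]*|/\*.*?\*/',
    re.DOTALL,
)


def strip_json_comments(text: str) -> str:
    return _COMMENT_OR_STRING.sub(
        lambda m: m.group(0) if m.group(0).startswith('"') else '',
        text,
    )


raw = '''{
    "name": "Product A", // this is a comment
    "price": 99.0
    /* multi-line
    comment */
}'''
clean = strip_json_comments(raw)
data = json.loads(clean)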
Pitfall 2: Nested schemas cause truncated output
from pydantic import BaseModel, Field


class Address(BaseModel):
    street: str
    city: str
    zip_code: str


class PersonFlat(BaseModel):
    name: str
    street: str = Field(description="Street address")
    city: str = Field(description="City")
    zip_code: str = Field(description="Postal code")


class PersonNested(BaseModel):
    name: str
    address: Address


# Recommendation: keep nesting to at most 2 levels; flatten anything deeper
# Not recommended: Person → Address → GeoLocation → Coordinates
# Recommended: PersonFlat (all fields at the same level)
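If the rest of your application still expects the nested shape, the flat output can be regrouped after extraction. A minimal sketch, assuming plain dicts and a hypothetical `regroup` helper; the field names follow the `PersonFlat` example above.

```python
def regroup(flat: dict, groups: dict[str, list[str]]) -> dict:
    """Move the listed flat fields under their parent key."""
    nested = dict(flat)  # shallow copy so the input is left untouched
    for parent, fields in groups.items():
        nested[parent] = {
            name: nested.pop(name) for name in fields if name in nested
        }
    return nested


flat_person = {
    "name": "Ada",
    "street": "1 Main St",
    "city": "London",
    "zip_code": "N1",
}
person = regroup(flat_person, {"address": ["street", "city", "zip_code"]})
```

This keeps the schema the model sees shallow while the nested structure is rebuilt deterministically in code.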
Pitfall 3: The LLM ignores enum values
from enum import Enum

from pydantic import BaseModel, field_validator


class Sentiment(str, Enum):
    POSITIVE = "positive"
    NEGATIVE = "negative"
    NEUTRAL = "neutral"


class ReviewWithEnum(BaseModel):
    text: str
    sentiment: Sentiment

    # mode="before" runs ahead of enum validation, so free-form labels
    # can be mapped onto the allowed values first.
    @field_validator("sentiment", mode="before")
    @classmethod
    def normalize_sentiment(cls, v):
        if isinstance(v, str):
            v = v.strip().lower()
            # Chinese labels the model tends to return instead of the enum values.
            mapping = {
                "积极": "positive", "正面": "positive", "好": "positive",
                "消极": "negative", "负面": "negative", "差": "negative",
                "中性": "neutral", "一般": "neutral",
            }
            return mapping.get(v, v)
        return v
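Pitfall 4: List length cannot be controlled
from pydantic import BaseModel, Field, field_validator


class TaggedContent(BaseModel):
    content: str
    tags: list[str] = Field(min_length=1, max_length=5)

    # mode="before" runs ahead of the length check, so an over-long list is
    # deduplicated and truncated instead of being rejected outright.
    @field_validator("tags", mode="before")
    @classmethod
    def deduplicate_tags(cls, v):
        if not isinstance(v, list) or not all(isinstance(tag, str) for tag in v):
            return v  # let normal validation report the type error
        seen = set()
        result = []
        for tag in v:
            normalized = tag.strip().lower()
            if normalized not in seen:
                seen.add(normalized)
                result.append(tag.strip())
        return result[:5]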
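Pitfall 5: Strict mode does not support every schema feature
from typing import Optional

from pydantic import BaseModel, ConfigDict, Field

# Strict mode restrictions to keep in mind (details vary by model and API version):
# 1. additionalProperties: false must be set explicitly
# 2. Every field must be listed as required; express an optional field as
#    nullable (Optional[...]) instead of relying on a default value
# 3. Union types are not supported (on some models)
# 4. Complex regex patterns are not supported
# Workaround: simplify the schema and compensate with field_validator


class SimpleProduct(BaseModel):
    # extra="forbid" makes Pydantic emit additionalProperties: false
    model_config = ConfigDict(extra="forbid")

    name: str
    price: float = Field(gt=0)
    category: Optional[str] = Field(description="Product category, or null if unknown")
    tags: list[str] = Field(description="Tags; an empty list if there are none")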
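Troubleshooting 10 common errors
| # | Error | Cause | Fix |
|---|---|---|---|
| 1 | json.decoder.JSONDecodeError | The LLM returned invalid JSON | Use extract_json_from_response for tolerant extraction |
| 2 | ValidationError: field required | The LLM omitted a required field | Add a default value or use Instructor's automatic retries |
| 3 | InstructorRetryException: max retries | Validation still fails after 3 retries | Check whether the schema is too complex; simplify nesting |
| 4 | TypeError: 'NoneType' object | tool_calls is empty; the LLM did not call the function | Check the tool_choice setting and confirm the model supports function calling |
| 5 | RateLimitError: 429 | API rate limit exceeded | Add retries with exponential backoff; lower concurrency |
| 6 | Timeout: request timed out | LLM inference timed out | Reduce schema complexity; raise the timeout parameter |
| 7 | BadRequestError: Invalid schema | The schema does not meet the model's requirements | Check strict mode restrictions; simplify the schema |
| 8 | ValidationError: string too long | The LLM returned a string longer than max_length | State the length limit in the field description, or truncate in a field_validator |
| 9 | KeyError: 'tool_calls' | The model does not support function calling | Switch to JSON Schema mode or prompt mode |
| 10 | RecursionError: maximum depth | The schema is nested too deeply | Flatten the nested structure to at most 2 levels |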
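For row 5, exponential backoff is usually combined with random jitter so that concurrent workers do not retry in lockstep. The sketch below computes the wait times only; `backoff_delays` is a hypothetical helper, and the base and cap values are illustrative assumptions, not recommendations from any vendor.

```python
import random
from typing import Optional


def backoff_delays(
    attempts: int,
    base: float = 1.0,
    cap: float = 30.0,
    rng: Optional[random.Random] = None,
) -> list[float]:
    """Compute "full jitter" delays: uniform in [0, min(cap, base * 2**n)]."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(attempts):
        ceiling = min(cap, base * 2 ** attempt)
        delays.append(rng.uniform(0, ceiling))
    return delays


# A seeded generator keeps the example reproducible.
delays = backoff_delays(6, rng=random.Random(0))
for attempt, delay in enumerate(delays, start=1):
    print(f"attempt {attempt}: wait up to {delay:.2f}s before retrying")
```

In an async service you would `await asyncio.sleep(delay)` between attempts and give up once the list is exhausted.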
Advanced optimization tips
Tip 1: Few-shot examples improve accuracy
import instructor
from openai import AsyncOpenAI
from pydantic import BaseModel, Field


class Classification(BaseModel):
    category: str = Field(description="Category")
    confidence: float = Field(ge=0.0, le=1.0)


async def few_shot_extract(text: str) -> Classification:
    client = instructor.from_openai(AsyncOpenAI())
    # Each user/assistant pair shows the model one input and its expected JSON.
    examples = [
        {"role": "user", "content": "This product is fantastic, highly recommended!"},
        {"role": "assistant", "content": '{"category": "positive", "confidence": 0.95}'},
        {"role": "user", "content": "Quality is average and the price is on the high side"},
        {"role": "assistant", "content": '{"category": "neutral", "confidence": 0.7}'},
    ]
    return await client.chat.completions.create(
        model="gpt-4o",
        messages=examples + [{"role": "user", "content": text}],
        response_model=Classification,
        max_retries=2,
    )
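Tip 2: Write better schema descriptions
from pydantic import BaseModel, Field


# No descriptions: the model has to guess what each field means.
class BadSchema(BaseModel):
    type: str
    value: str


# Descriptions list the allowed values and the expected normalization.
class GoodSchema(BaseModel):
    type: str = Field(
        description="Entity type, one of: person, organization, location, date"
    )
    value: str = Field(
        description="Normalized entity value. Use the full name for person, the official name "
        "for organization, YYYY-MM-DD for date, and city + country for location"
    )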
Tip 3: Extract complex structures step by step
import instructor
from openai import AsyncOpenAI
from pydantic import BaseModel


class BasicInfo(BaseModel):
    title: str
    summary: str


class DetailedInfo(BasicInfo):
    key_points: list[str]
    entities: list[str]
    sentiment: str


async def progressive_extract(text: str) -> DetailedInfo:
    client = instructor.from_openai(AsyncOpenAI())
    # Step 1: extract the small, easy schema first.
    basic = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": f"Extract the basic information:\n{text}"}],
        response_model=BasicInfo,
        max_retries=2,
    )
    # Step 2: feed the basic result back in as context for the detailed schema.
    detailed = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "user", "content": f"Based on the basic information below, extract a detailed analysis:\n"
                                        f"Title: {basic.title}\nSummary: {basic.summary}\n\nOriginal text:\n{text}"}
        ],
        response_model=DetailedInfo,
        max_retries=2,
    )
    return detailed
Tip 4: Self-check output quality
from pydantic import BaseModel, Field, model_validator


class SelfValidatingOutput(BaseModel):
    question: str
    answer: str
    sources: list[str] = Field(min_length=1)
    confidence: float = Field(ge=0.0, le=1.0)

    # Cross-field checks: a failure here triggers a retry when used with Instructor.
    @model_validator(mode="after")
    def check_answer_quality(self):
        if len(self.answer) < 10:
            raise ValueError("The answer is too short and may be incomplete")
        if self.confidence > 0.9 and len(self.sources) < 2:
            raise ValueError("High confidence but too few sources; please re-verify")
        return self
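Comparison: 3 structured output approaches
| Dimension | Prompt + JSON parsing | Function calling protocol | Instructor |
|---|---|---|---|
| Reliability (rough estimate) | ⭐⭐ 60-80% | ⭐⭐⭐⭐ 90-95% | ⭐⭐⭐⭐⭐ 95-99% |
| Implementation complexity | Low | Medium | Low (once wrapped) |
| Model compatibility | All models | OpenAI/some models | OpenAI/Anthropic/Gemini |
| Automatic retries | ❌ Manual | ❌ Manual | ✅ Built in |
| Streaming support | ❌ Difficult | ⚠️ Limited | ✅ create_partial |
| Schema validation | ❌ Manual | ⚠️ Partial | ✅ Automatic via Pydantic |
| Debugging difficulty | High | Medium | Low |
| Production recommendation | ⭐ Not recommended | ⭐⭐⭐ Recommended | ⭐⭐⭐⭐⭐ Strongly recommended |
| Token overhead | Low | Medium (+ tool definitions) | Medium (+ tool definitions) |
| Nesting depth | Unlimited | Limited | Limited |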
Decision tree
Do you need structured output?
├── No → use Chat Completion directly
└── Yes → which models do you use?
    ├── OpenAI only → Instructor + Mode.JSON_SCHEMA
    ├── OpenAI + Anthropic → Instructor + adapter pattern
    ├── Any model → Prompt + JSON parsing + strict validation
    └── Need streaming → Instructor + create_partial
Recommended online tools
- JSON formatter and validator: /zh-CN/json/format
- JSONPath query: /zh-CN/json/jsonpath
- cURL to code converter: /zh-CN/dev/curl-to-code
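Summary: structured LLM output in Python is core infrastructure for AI engineering. The six patterns run from simple to complex: JSON Schema constraints → function calling protocol → automatic retries with Instructor → multi-model adapters → streaming structured output → production reliability. For production, Instructor is the first choice; combined with Pydantic validation and automatic retries, it reaches a reliability of roughly 95% or higher by the estimate in the comparison above. Key points to remember: 1) keep nested schemas to at most 2 levels, 2) normalize enum values with field_validator, 3) use circuit breakers to stop calling models that keep failing, 4) cache results to avoid repeated calls. Use the adapter pattern to unify the interface in multi-model setups, and create_partial for progressive output when streaming.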