Python LLM结构化输出:从JSON Schema到函数调用的6种生产模式

AI与大数据

LLM输出一坨自由文本,你的下游系统全崩了

你让GPT返回JSON,它给你带注释的JSON;你指定字段类型为整数,它返回字符串"42";你要求列表长度为3,它给你5个。LLM结构化输出是2026年AI工程最核心的基础能力——没有它,你的RAG管道、Agent工具调用、数据提取流水线全是定时炸弹。

本文将从JSON Schema约束出发,带你完成JSON Schema验证→OpenAI函数调用→Instructor自动重试→多模型适配→流式结构化输出→生产可靠性保障的6种生产模式,从概念到落地,一步到位。

核心收获

  • 理解LLM结构化输出的3种核心机制:Prompt约束、JSON Schema、函数调用协议
  • 掌握6种从简单到复杂的Python结构化输出模式
  • 学会Instructor库的自动重试和验证策略
  • 实现跨模型(OpenAI/Anthropic/Gemini)的结构化输出适配
  • 构建生产级可靠性保障体系

目录

  1. LLM结构化输出核心概念
  2. 模式1:JSON Schema约束输出
  3. 模式2:OpenAI函数调用协议
  4. 模式3:Instructor库自动重试
  5. 模式4:多模型结构化输出适配
  6. 模式5:流式结构化输出
  7. 模式6:生产级可靠性保障
  8. 5个常见坑及解决方案
  9. 10个常见报错排查
  10. 进阶优化技巧
  11. 对比分析:3种结构化输出方案
  12. 在线工具推荐

LLM结构化输出核心概念

概念 说明
Structured Output LLM输出符合预定义Schema的结构化数据(JSON/XML)
JSON Schema 描述JSON数据结构的规范,用于约束和验证LLM输出
Function Calling OpenAI提出的协议,让LLM输出符合函数参数Schema的JSON
Tool Use Anthropic/Gemini对函数调用的实现,语义相同
Constrained Decoding 推理时约束token选择,保证输出100%符合Schema
Instructor Python库,基于Pydantic模型自动生成Schema+验证+重试

为什么LLM结构化输出如此重要

传统LLM输出流程:
  用户Prompt → LLM自由生成 → 字符串 → 正则/JSON解析 → 可能失败 → 重试

结构化输出流程:
  用户Prompt + Schema → LLM受约束生成 → 合法JSON → Pydantic验证 → 成功

关键差异:
  1. 传统方式:输出不可预测,解析脆弱,重试成本高
  2. 结构化输出:输出可预测,验证可靠,重试有保障

3种结构化输出机制对比

机制 原理 可靠性 延迟 兼容性
Prompt约束 在提示词中描述输出格式 ⭐低 无额外 所有模型
JSON Schema 通过Schema约束输出结构 ⭐⭐中 轻微 部分模型
函数调用协议 专用API通道+Constrained Decoding ⭐⭐⭐高 轻微 特定模型

模式1:JSON Schema约束输出

最基础的结构化输出方式:在Prompt中描述格式要求,用JSON Schema验证结果。

import json
import re
from typing import Optional
from pydantic import BaseModel, Field, ValidationError


class MovieReview(BaseModel):
    title: str = Field(description="电影名称")
    rating: int = Field(ge=1, le=10, description="评分1-10")
    sentiment: str = Field(pattern="^(positive|negative|neutral)$")
    summary: str = Field(max_length=200, description="简短评价")
    recommended: bool = Field(description="是否推荐")


MOVIE_REVIEW_SCHEMA = MovieReview.model_json_schema()

STRUCTURED_PROMPT = """你是一个专业的电影评论分析器。

请分析以下评论,并严格按照JSON Schema返回结果。

JSON Schema:
{schema}

评论内容:
{review}

重要要求:
1. 必须返回合法JSON
2. rating必须是1-10的整数
3. sentiment只能是positive/negative/neutral
4. 不要添加任何JSON以外的内容
5. 不要用```json```包裹
"""


def extract_json_from_response(text: str) -> Optional[dict]:
    patterns = [
        r'```json\s*(.*?)\s*```',
        r'```\s*(.*?)\s*```',
        r'(\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\})',
    ]
    for pattern in patterns:
        match = re.search(pattern, text, re.DOTALL)
        if match:
            try:
                return json.loads(match.group(1))
            except json.JSONDecodeError:
                continue
    try:
        return json.loads(text.strip())
    except json.JSONDecodeError:
        return None


def parse_structured_output(raw_text: str) -> Optional[MovieReview]:
    parsed_json = extract_json_from_response(raw_text)
    if parsed_json is None:
        return None
    try:
        return MovieReview.model_validate(parsed_json)
    except ValidationError as e:
        print(f"验证失败: {e}")
        return None


async def call_llm_with_schema(prompt: str) -> Optional[MovieReview]:
    from openai import AsyncOpenAI

    client = AsyncOpenAI()
    formatted_prompt = STRUCTURED_PROMPT.format(
        schema=json.dumps(MOVIE_REVIEW_SCHEMA, ensure_ascii=False, indent=2),
        review=prompt
    )

    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": formatted_prompt}],
        temperature=0.1,
    )

    raw_text = response.choices[0].message.content or ""
    return parse_structured_output(raw_text)

JSON Schema验证的局限

问题1:LLM可能返回不合法的JSON
  → 需要extract_json_from_response做容错提取

问题2:LLM可能忽略Schema约束
  → rating返回"9分"而不是9
  → sentiment返回"很积极"而不是positive

问题3:嵌套结构容易出错
  → 列表长度不可控
  → 可选字段可能缺失

问题4:每次都要手写Prompt
  → 维护成本高,容易遗漏

模式2:OpenAI函数调用协议

OpenAI的Function Calling协议是结构化输出的标准方案,通过专用API通道让LLM输出符合Schema的JSON。

import json
from typing import Optional
from pydantic import BaseModel, Field
from openai import AsyncOpenAI


class SentimentAnalysis(BaseModel):
    text: str = Field(description="被分析的文本")
    sentiment: str = Field(description="情感倾向: positive/negative/neutral")
    confidence: float = Field(ge=0.0, le=1.0, description="置信度0-1")
    keywords: list[str] = Field(description="关键词列表")
    language: str = Field(description="检测到的语言")


class EntityExtraction(BaseModel):
    entities: list[dict] = Field(description="提取的实体列表")
    relationships: list[dict] = Field(default_factory=list, description="实体间关系")
    summary: str = Field(description="文本摘要")


def pydantic_to_function_schema(model_class: type[BaseModel]) -> dict:
    schema = model_class.model_json_schema()
    return {
        "type": "function",
        "function": {
            "name": model_class.__name__,
            "description": model_class.__doc__ or f"Extract {model_class.__name__}",
            "parameters": {
                "type": "object",
                "properties": schema.get("properties", {}),
                "required": schema.get("required", []),
            }
        }
    }


async def function_calling_extract(
    text: str,
    model_class: type[BaseModel],
    model: str = "gpt-4o"
) -> Optional[BaseModel]:
    client = AsyncOpenAI()

    function_schema = pydantic_to_function_schema(model_class)

    response = await client.chat.completions.create(
        model=model,
        messages=[
            {
                "role": "system",
                "content": "你是一个精确的数据提取助手。使用提供的函数来结构化输出结果。"
            },
            {
                "role": "user",
                "content": text
            }
        ],
        tools=[function_schema],
        tool_choice={"type": "function", "function": {"name": model_class.__name__}},
    )

    message = response.choices[0].message

    if message.tool_calls and len(message.tool_calls) > 0:
        tool_call = message.tool_calls[0]
        try:
            args = json.loads(tool_call.function.arguments)
            return model_class.model_validate(args)
        except (json.JSONDecodeError, Exception) as e:
            print(f"解析函数调用结果失败: {e}")
            return None

    return None


async def multi_function_calling(
    text: str,
    model_classes: list[type[BaseModel]],
    model: str = "gpt-4o"
) -> dict[str, BaseModel]:
    client = AsyncOpenAI()

    tool_schemas = [pydantic_to_function_schema(cls) for cls in model_classes]

    response = await client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": "你是一个多任务数据提取助手。"},
            {"role": "user", "content": text}
        ],
        tools=tool_schemas,
        tool_choice="auto",
    )

    results = {}
    message = response.choices[0].message

    if message.tool_calls:
        for tool_call in message.tool_calls:
            for cls in model_classes:
                if tool_call.function.name == cls.__name__:
                    try:
                        args = json.loads(tool_call.function.arguments)
                        results[cls.__name__] = cls.model_validate(args)
                    except Exception as e:
                        print(f"解析{cls.__name__}失败: {e}")

    return results

函数调用协议的Strict Mode

from openai import AsyncOpenAI


async def strict_structured_output(
    text: str,
    model_class: type[BaseModel],
    model: str = "gpt-4o-2024-08-06"
) -> Optional[BaseModel]:
    client = AsyncOpenAI()

    schema = model_class.model_json_schema()

    response = await client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": "提取结构化数据"},
            {"role": "user", "content": text}
        ],
        response_format={
            "type": "json_schema",
            "json_schema": {
                "name": model_class.__name__,
                "strict": True,
                "schema": schema,
            }
        }
    )

    raw = response.choices[0].message.content
    if raw:
        try:
            return model_class.model_validate(json.loads(raw))
        except Exception as e:
            print(f"Strict mode解析失败: {e}")
    return None

模式3:Instructor库自动重试

Instructor库是Python LLM结构化输出的最佳实践,基于Pydantic模型自动生成Schema、验证输出、自动重试。

import instructor
from pydantic import BaseModel, Field, field_validator
from openai import AsyncOpenAI


class ProductInfo(BaseModel):
    name: str = Field(description="产品名称")
    price: float = Field(gt=0, description="价格,必须大于0")
    category: str = Field(description="产品分类")
    features: list[str] = Field(description="产品特性列表", min_length=1, max_length=5)
    in_stock: bool = Field(description="是否有库存")

    @field_validator("price")
    @classmethod
    def round_price(cls, v: float) -> float:
        return round(v, 2)

    @field_validator("category")
    @classmethod
    def normalize_category(cls, v: str) -> str:
        return v.strip().lower()


class ArticleMetadata(BaseModel):
    title: str = Field(description="文章标题")
    author: str = Field(description="作者")
    publish_date: str = Field(description="发布日期,格式YYYY-MM-DD")
    tags: list[str] = Field(description="标签列表")
    word_count: int = Field(ge=0, description="字数")
    reading_time_minutes: int = Field(ge=1, description="预计阅读时间(分钟)")

    @field_validator("publish_date")
    @classmethod
    def validate_date_format(cls, v: str) -> str:
        import re
        if not re.match(r'^\d{4}-\d{2}-\d{2}$', v):
            raise ValueError(f"日期格式错误: {v},需要YYYY-MM-DD")
        return v


async def instructor_extract_product(text: str) -> ProductInfo:
    client = instructor.from_openai(AsyncOpenAI())

    result = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "user", "content": f"从以下文本中提取产品信息:\n\n{text}"}
        ],
        response_model=ProductInfo,
        max_retries=3,
        temperature=0.1,
    )

    return result


async def instructor_extract_with_mode(
    text: str,
    mode: instructor.Mode = instructor.Mode.TOOLS
) -> ArticleMetadata:
    client = instructor.from_openai(AsyncOpenAI(), mode=mode)

    result = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "user", "content": f"提取文章元数据:\n\n{text}"}
        ],
        response_model=ArticleMetadata,
        max_retries=3,
    )

    return result


async def instructor_partial_streaming(text: str):
    client = instructor.from_openai(AsyncOpenAI())

    article = await client.chat.completions.create_partial(
        model="gpt-4o",
        messages=[
            {"role": "user", "content": f"提取文章元数据:\n\n{text}"}
        ],
        response_model=ArticleMetadata,
        max_retries=3,
    )

    async for partial in article:
        print(f"部分结果: {partial.model_dump_json(exclude_none=True)}")

Instructor的Mode选择

Mode.JSON_SCHEMA    → OpenAI的response_format=json_schema(推荐,最可靠)
Mode.TOOLS          → OpenAI的function calling(兼容性好)
Mode.JSON           → 在Prompt中要求JSON输出(最通用,可靠性最低)
Mode.ANTHROPIC_TOOLS→ Anthropic的tool_use
Mode.GEMINI_JSON    → Gemini的JSON模式

Instructor重试策略详解

import instructor
from pydantic import BaseModel, Field, ValidationError
from openai import AsyncOpenAI
from tenacity import retry, stop_after_attempt, wait_exponential


class StrictUser(BaseModel):
    name: str = Field(min_length=2, max_length=50)
    age: int = Field(ge=0, le=150)
    email: str = Field(pattern=r'^[\w\.-]+@[\w\.-]+\.\w+$')


async def instructor_with_custom_retry(text: str) -> StrictUser:
    client = instructor.from_openai(
        AsyncOpenAI(),
        mode=instructor.Mode.JSON_SCHEMA,
    )

    result, completion = await client.chat.completions.create_with_completion(
        model="gpt-4o",
        messages=[{"role": "user", "content": text}],
        response_model=StrictUser,
        max_retries=3,
        validation_context={"strict": True},
    )

    print(f"Token使用: prompt={completion.usage.prompt_tokens}, "
          f"completion={completion.usage.completion_tokens}")

    return result


async def instructor_batch_extract(
    texts: list[str],
) -> list[StrictUser]:
    client = instructor.from_openai(AsyncOpenAI())

    results = []
    for text in texts:
        try:
            result = await client.chat.completions.create(
                model="gpt-4o",
                messages=[{"role": "user", "content": text}],
                response_model=StrictUser,
                max_retries=2,
            )
            results.append(result)
        except instructor.exceptions.InstructorRetryException as e:
            print(f"批量提取失败,跳过: {e}")
            results.append(None)

    return results

模式4:多模型结构化输出适配

不同LLM厂商的结构化输出API各不相同,需要适配层统一处理。

import json
from abc import ABC, abstractmethod
from typing import Optional, TypeVar
from pydantic import BaseModel
from openai import AsyncOpenAI

T = TypeVar("T", bound=BaseModel)


class StructuredOutputAdapter(ABC):
    @abstractmethod
    async def extract(self, text: str, model_class: type[T]) -> Optional[T]:
        pass


class OpenAIStructuredAdapter(StructuredOutputAdapter):
    def __init__(self, model: str = "gpt-4o"):
        self.client = AsyncOpenAI()
        self.model = model

    async def extract(self, text: str, model_class: type[T]) -> Optional[T]:
        import instructor
        client = instructor.from_openai(self.client, mode=instructor.Mode.JSON_SCHEMA)

        return await client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": text}],
            response_model=model_class,
            max_retries=3,
        )


class AnthropicStructuredAdapter(StructuredOutputAdapter):
    def __init__(self, model: str = "claude-sonnet-4-20250514"):
        try:
            import anthropic
            self.client = anthropic.AsyncAnthropic()
        except ImportError:
            raise ImportError("请安装anthropic: pip install anthropic")
        self.model = model

    async def extract(self, text: str, model_class: type[T]) -> Optional[T]:
        import anthropic
        import instructor

        client = instructor.from_anthropic(
            self.client,
            mode=instructor.Mode.ANTHROPIC_TOOLS,
        )

        return await client.chat.completions.create(
            model=self.model,
            max_tokens=4096,
            messages=[{"role": "user", "content": text}],
            response_model=model_class,
            max_retries=3,
        )


class GeminiStructuredAdapter(StructuredOutputAdapter):
    def __init__(self, model: str = "gemini-2.0-flash"):
        self.model_name = model

    async def extract(self, text: str, model_class: type[T]) -> Optional[T]:
        import google.generativeai as genai
        import os

        genai.configure(api_key=os.environ.get("GEMINI_API_KEY"))
        model = genai.GenerativeModel(self.model_name)

        schema = model_class.model_json_schema()
        prompt = f"""从以下文本中提取结构化数据。
严格按照JSON Schema返回结果,不要添加任何额外内容。

JSON Schema:
{json.dumps(schema, ensure_ascii=False, indent=2)}

文本:
{text}"""

        response = await model.generate_content_async(prompt)
        raw = response.text.strip()

        if raw.startswith("```json"):
            raw = raw[7:]
        if raw.endswith("```"):
            raw = raw[:-3]
        raw = raw.strip()

        try:
            return model_class.model_validate(json.loads(raw))
        except Exception as e:
            print(f"Gemini解析失败: {e}")
            return None


class MultiModelStructuredExtractor:
    def __init__(self):
        self.adapters: dict[str, StructuredOutputAdapter] = {}

    def register(self, name: str, adapter: StructuredOutputAdapter):
        self.adapters[name] = adapter

    async def extract(
        self,
        text: str,
        model_class: type[T],
        preferred: str = "openai",
        fallback: bool = True,
    ) -> Optional[T]:
        order = [preferred]
        if fallback:
            order.extend([k for k in self.adapters if k != preferred])

        for model_name in order:
            adapter = self.adapters.get(model_name)
            if adapter is None:
                continue
            try:
                result = await adapter.extract(text, model_class)
                if result is not None:
                    return result
            except Exception as e:
                print(f"[{model_name}] 提取失败: {e}")
                continue

        return None

    async def extract_consensus(
        self,
        text: str,
        model_class: type[T],
        min_agreement: int = 2,
    ) -> Optional[T]:
        import asyncio

        tasks = {
            name: adapter.extract(text, model_class)
            for name, adapter in self.adapters.items()
        }

        results = await asyncio.gather(*tasks.values(), return_exceptions=True)

        valid_results = []
        for (name, _), result in zip(tasks.items(), results):
            if isinstance(result, Exception):
                print(f"[{name}] 异常: {result}")
                continue
            if result is not None:
                valid_results.append(result)

        if len(valid_results) >= min_agreement:
            return valid_results[0]

        return valid_results[0] if valid_results else None

多模型适配架构

                    ┌─────────────────────┐
                    │  MultiModelExtractor │
                    │  (统一接口)          │
                    └──────────┬──────────┘
                               │
              ┌────────────────┼────────────────┐
              │                │                │
    ┌─────────▼──────┐ ┌──────▼───────┐ ┌──────▼───────┐
    │ OpenAI Adapter │ │Anthropic Adp │ │ Gemini Adptr │
    │ JSON_SCHEMA    │ │ANTHROPIC_TOOLS│ │ Prompt+Parse │
    │ Instructor     │ │ Instructor   │ │ 手动解析      │
    └────────────────┘ └──────────────┘ └──────────────┘

模式5:流式结构化输出

LLM结构化输出结合流式传输,实现实时解析和渐进式展示。

import json
import asyncio
from typing import AsyncIterator, Optional
from pydantic import BaseModel, Field
from openai import AsyncOpenAI


class StreamingJsonParser:
    def __init__(self):
        self.buffer = ""
        self.depth = 0
        self.in_string = False
        self.escape_next = False
        self.started = False

    def feed(self, chunk: str) -> list[dict]:
        self.buffer += chunk
        results = []

        for char in chunk:
            if self.escape_next:
                self.escape_next = False
                continue

            if char == '\\' and self.in_string:
                self.escape_next = True
                continue

            if char == '"' and not self.escape_next:
                self.in_string = not self.in_string
                continue

            if self.in_string:
                continue

            if char == '{':
                if not self.started:
                    self.started = True
                    idx = self.buffer.rfind('{')
                    self.buffer = self.buffer[idx:]
                self.depth += 1
            elif char == '}':
                self.depth -= 1
                if self.depth == 0 and self.started:
                    try:
                        parsed = json.loads(self.buffer)
                        results.append(parsed)
                    except json.JSONDecodeError:
                        pass
                    self.buffer = ""
                    self.started = False

        return results


class PartialModelBuilder:
    def __init__(self, model_class: type[BaseModel]):
        self.model_class = model_class
        self.current_json = {}
        self.last_valid = None

    def update(self, json_data: dict) -> Optional[BaseModel]:
        self.current_json.update(json_data)
        try:
            self.last_valid = self.model_class.model_validate(self.current_json)
            return self.last_valid
        except Exception:
            return self.last_valid


class StreamingEvent(BaseModel):
    event_type: str = Field(description="事件类型")
    data: dict = Field(description="事件数据")
    confidence: float = Field(ge=0.0, le=1.0, description="置信度")


async def stream_structured_output(
    prompt: str,
    model_class: type[BaseModel],
    model: str = "gpt-4o",
) -> AsyncIterator[BaseModel]:
    import instructor

    client = instructor.from_openai(AsyncOpenAI())

    stream = await client.chat.completions.create_partial(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        response_model=model_class,
        max_retries=2,
    )

    async for partial in stream:
        yield partial


async def stream_with_raw_parser(
    prompt: str,
    model: str = "gpt-4o",
) -> AsyncIterator[dict]:
    client = AsyncOpenAI()

    schema_prompt = f"""请以JSON格式返回结果。只返回JSON,不要其他内容。
{prompt}"""

    stream = await client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": schema_prompt}],
        stream=True,
        temperature=0.1,
    )

    parser = StreamingJsonParser()
    full_content = ""

    async for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            full_content += content
            parsed_results = parser.feed(content)
            for result in parsed_results:
                yield result

    if not parsed_results and full_content:
        try:
            yield json.loads(full_content)
        except json.JSONDecodeError:
            pass


async def stream_sse_structured(
    prompt: str,
    model_class: type[BaseModel],
):
    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse

    app = FastAPI()

    async def generate():
        async for partial in stream_structured_output(prompt, model_class):
            data = partial.model_dump_json(exclude_none=True)
            yield f"data: {data}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={"X-Accel-Buffering": "no"},
    )

流式结构化输出架构

客户端请求
    │
    ▼
FastAPI SSE端点
    │
    ▼
Instructor create_partial()
    │
    ├──→ chunk1: {"name": "产品A"...}
    ├──→ chunk2: {"name": "产品A", "price": 99...}
    ├──→ chunk3: {"name": "产品A", "price": 99.0, "category": "电子"...}
    └──→ 最终: 完整的Pydantic模型实例

每个chunk通过SSE推送到客户端
客户端渐进式渲染UI

模式6:生产级可靠性保障

将所有模式整合为生产可用的结构化输出服务。

import json
import time
import logging
from typing import Optional
from dataclasses import dataclass, field
from enum import Enum
from pydantic import BaseModel, Field
from openai import AsyncOpenAI

logger = logging.getLogger(__name__)


class OutputStatus(str, Enum):
    SUCCESS = "success"
    VALIDATION_FAILED = "validation_failed"
    PARSE_FAILED = "parse_failed"
    LLM_ERROR = "llm_error"
    TIMEOUT = "timeout"
    RETRY_EXHAUSTED = "retry_exhausted"


@dataclass
class ExtractionResult:
    data: Optional[BaseModel] = None
    status: OutputStatus = OutputStatus.SUCCESS
    attempts: int = 0
    latency_ms: float = 0.0
    error_message: str = ""
    model_used: str = ""
    tokens_used: dict = field(default_factory=dict)


class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time: Optional[float] = None
        self.is_open = False

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.is_open = True

    def record_success(self):
        self.failure_count = 0
        self.is_open = False

    def can_execute(self) -> bool:
        if not self.is_open:
            return True
        if self.last_failure_time and \
           time.time() - self.last_failure_time > self.recovery_timeout:
            self.is_open = False
            self.failure_count = 0
            return True
        return False


class StructuredOutputService:
    def __init__(
        self,
        max_retries: int = 3,
        timeout: float = 30.0,
        fallback_models: Optional[list[str]] = None,
    ):
        self.client = AsyncOpenAI()
        self.max_retries = max_retries
        self.timeout = timeout
        self.fallback_models = fallback_models or ["gpt-4o", "gpt-4o-mini"]
        self.circuit_breakers: dict[str, CircuitBreaker] = {}

    def _get_breaker(self, model: str) -> CircuitBreaker:
        if model not in self.circuit_breakers:
            self.circuit_breakers[model] = CircuitBreaker()
        return self.circuit_breakers[model]

    async def extract(
        self,
        text: str,
        model_class: type[BaseModel],
        preferred_model: Optional[str] = None,
    ) -> ExtractionResult:
        import instructor

        models = [preferred_model] if preferred_model else self.fallback_models
        models = [m for m in models if self._get_breaker(m).can_execute()]

        if not models:
            return ExtractionResult(
                status=OutputStatus.RETRY_EXHAUSTED,
                error_message="所有模型熔断器已开启",
            )

        for model in models:
            result = await self._try_extract(text, model_class, model)
            if result.status == OutputStatus.SUCCESS:
                self._get_breaker(model).record_success()
                return result
            else:
                self._get_breaker(model).record_failure()
                logger.warning(f"模型{model}提取失败: {result.error_message}")

        return result

    async def _try_extract(
        self,
        text: str,
        model_class: type[BaseModel],
        model: str,
    ) -> ExtractionResult:
        import instructor

        start_time = time.time()
        client = instructor.from_openai(self.client, mode=instructor.Mode.JSON_SCHEMA)

        for attempt in range(1, self.max_retries + 1):
            try:
                result = await client.chat.completions.create(
                    model=model,
                    messages=[{"role": "user", "content": text}],
                    response_model=model_class,
                    max_retries=0,
                    timeout=self.timeout,
                )

                latency = (time.time() - start_time) * 1000
                return ExtractionResult(
                    data=result,
                    status=OutputStatus.SUCCESS,
                    attempts=attempt,
                    latency_ms=latency,
                    model_used=model,
                )

            except instructor.exceptions.InstructorRetryException as e:
                logger.warning(f"尝试{attempt}验证失败: {e}")
                continue

            except Exception as e:
                error_msg = str(e)
                if "timeout" in error_msg.lower():
                    return ExtractionResult(
                        status=OutputStatus.TIMEOUT,
                        attempts=attempt,
                        latency_ms=(time.time() - start_time) * 1000,
                        error_message=error_msg,
                        model_used=model,
                    )
                logger.error(f"尝试{attempt}LLM错误: {e}")
                continue

        return ExtractionResult(
            status=OutputStatus.RETRY_EXHAUSTED,
            attempts=self.max_retries,
            latency_ms=(time.time() - start_time) * 1000,
            error_message=f"重试{self.max_retries}次后仍失败",
            model_used=model,
        )


class StructuredOutputCache:
    def __init__(self, ttl: float = 3600.0, max_size: int = 1000):
        self.ttl = ttl
        self.max_size = max_size
        self._cache: dict[str, tuple[float, BaseModel]] = {}

    def _make_key(self, text: str, model_class: type[BaseModel]) -> str:
        import hashlib
        content_hash = hashlib.sha256(text.encode()).hexdigest()[:16]
        return f"{model_class.__name__}:{content_hash}"

    def get(self, text: str, model_class: type[BaseModel]) -> Optional[BaseModel]:
        key = self._make_key(text, model_class)
        if key in self._cache:
            timestamp, data = self._cache[key]
            if time.time() - timestamp < self.ttl:
                return data
            del self._cache[key]
        return None

    def set(self, text: str, model_class: type[BaseModel], data: BaseModel):
        if len(self._cache) >= self.max_size:
            oldest_key = min(self._cache, key=lambda k: self._cache[k][0])
            del self._cache[oldest_key]

        key = self._make_key(text, model_class)
        self._cache[key] = (time.time(), data)

生产架构全景

                    ┌────────────────────────────┐
                    │  StructuredOutputService    │
                    │  (统一入口)                 │
                    └──────────┬─────────────────┘
                               │
                    ┌──────────▼─────────────────┐
                    │  CircuitBreaker             │
                    │  (模型级熔断)               │
                    └──────────┬─────────────────┘
                               │
              ┌────────────────┼────────────────┐
              │                │                │
    ┌─────────▼──────┐ ┌──────▼───────┐ ┌──────▼───────┐
    │  gpt-4o        │ │  gpt-4o-mini │ │  fallback    │
    │  JSON_SCHEMA   │ │  JSON_SCHEMA │ │  Prompt+Parse│
    │  +Instructor   │ │  +Instructor │ │  +重试       │
    └────────────────┘ └──────────────┘ └──────────────┘
              │                │                │
              └────────────────┼────────────────┘
                               │
                    ┌──────────▼─────────────────┐
                    │  StructuredOutputCache      │
                    │  (结果缓存)                 │
                    └────────────────────────────┘

5个常见坑及解决方案

坑1:LLM返回的JSON带注释

import json
import re


def strip_json_comments(text: str) -> str:
    text = re.sub(r'//.*?$', '', text, flags=re.MULTILINE)
    text = re.sub(r'/\*.*?\*/', '', text, flags=re.DOTALL)
    return text


raw = '''{
    "name": "产品A",  // 这是注释
    "price": 99.0
    /* 多行
       注释 */
}'''

clean = strip_json_comments(raw)
data = json.loads(clean)

坑2:嵌套Schema导致输出截断

from pydantic import BaseModel, Field


class Address(BaseModel):
    street: str
    city: str
    zip_code: str


class PersonFlat(BaseModel):
    name: str
    street: str = Field(description="街道地址")
    city: str = Field(description="城市")
    zip_code: str = Field(description="邮编")


class PersonNested(BaseModel):
    name: str
    address: Address


# 建议:嵌套层级不超过2层,超过则展平
# 不推荐:Person → Address → GeoLocation → Coordinates
# 推荐:PersonFlat(所有字段在同一层级)

坑3:枚举值LLM不遵守

from enum import Enum
from pydantic import BaseModel, Field, field_validator


class Sentiment(str, Enum):
    POSITIVE = "positive"
    NEGATIVE = "negative"
    NEUTRAL = "neutral"


class ReviewWithEnum(BaseModel):
    text: str
    sentiment: Sentiment

    @field_validator("sentiment", mode="before")
    @classmethod
    def normalize_sentiment(cls, v):
        if isinstance(v, str):
            v = v.strip().lower()
            mapping = {
                "积极": "positive", "正面": "positive", "好": "positive",
                "消极": "negative", "负面": "negative", "差": "negative",
                "中性": "neutral", "一般": "neutral",
            }
            return mapping.get(v, v)
        return v

坑4:列表长度不可控

from pydantic import BaseModel, Field, field_validator


class TaggedContent(BaseModel):
    content: str
    tags: list[str] = Field(min_length=1, max_length=5)

    @field_validator("tags")
    @classmethod
    def deduplicate_tags(cls, v: list[str]) -> list[str]:
        seen = set()
        result = []
        for tag in v:
            normalized = tag.strip().lower()
            if normalized not in seen:
                seen.add(normalized)
                result.append(tag.strip())
        return result[:5]

坑5:Strict Mode不支持所有Schema特性

from pydantic import BaseModel, Field


# Strict Mode不支持的特性:
# 1. additionalProperties: false 必须显式设置
# 2. 可选字段必须有default值
# 3. 不支持union类型(部分模型)
# 4. 不支持复杂的正则pattern

# 解决方案:简化Schema + field_validator补偿

class SimpleProduct(BaseModel):
    name: str
    price: float = Field(gt=0)
    category: str = Field(default="other")
    tags: list[str] = Field(default_factory=list)

10个常见报错排查

# 报错信息 原因 解决方案
1 json.decoder.JSONDecodeError LLM返回的不是合法JSON 使用extract_json_from_response容错提取
2 ValidationError: field required LLM遗漏了必填字段 添加default值或使用Instructor自动重试
3 InstructorRetryException: max retries 重试3次仍无法通过验证 检查Schema是否过于复杂,简化嵌套
4 TypeError: 'NoneType' object tool_calls为空,LLM未调用函数 检查tool_choice设置,确认模型支持函数调用
5 RateLimitError: 429 API调用频率超限 添加指数退避重试,降低并发
6 Timeout: request timed out LLM推理超时 减小Schema复杂度,增加timeout参数
7 BadRequestError: Invalid schema Schema不符合模型要求 检查strict mode限制,简化Schema
8 ValidationError: string too long LLM返回超长字符串 添加max_length约束
9 KeyError: 'tool_calls' 模型不支持函数调用 切换到JSON Schema模式或Prompt模式
10 RecursionError: maximum depth Schema嵌套层级过深 展平嵌套结构,最多2层

进阶优化技巧

技巧1:Few-shot示例提升准确率

from pydantic import BaseModel, Field
from openai import AsyncOpenAI
import instructor


class Classification(BaseModel):
    category: str = Field(description="分类")
    confidence: float = Field(ge=0.0, le=1.0)


async def few_shot_extract(text: str) -> Classification:
    client = instructor.from_openai(AsyncOpenAI())

    examples = [
        {"role": "user", "content": "这个产品太棒了,强烈推荐!"},
        {"role": "assistant", "content": '{"category": "positive", "confidence": 0.95}'},
        {"role": "user", "content": "质量一般,价格偏高"},
        {"role": "assistant", "content": '{"category": "neutral", "confidence": 0.7}'},
    ]

    return await client.chat.completions.create(
        model="gpt-4o",
        messages=examples + [{"role": "user", "content": text}],
        response_model=Classification,
        max_retries=2,
    )

技巧2:Schema描述优化

from pydantic import BaseModel, Field


class BadSchema(BaseModel):
    type: str
    value: str


class GoodSchema(BaseModel):
    type: str = Field(
        description="实体类型,只能是: person, organization, location, date"
    )
    value: str = Field(
        description="实体的标准化值。person用全名,organization用官方名称,"
                    "date用YYYY-MM-DD格式,location用城市+国家"
    )

技巧3:分步提取复杂结构

from pydantic import BaseModel, Field
from openai import AsyncOpenAI
import instructor


class BasicInfo(BaseModel):
    title: str
    summary: str


class DetailedInfo(BasicInfo):
    key_points: list[str]
    entities: list[str]
    sentiment: str


async def progressive_extract(text: str) -> DetailedInfo:
    client = instructor.from_openai(AsyncOpenAI())

    basic = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": f"提取基本信息:\n{text}"}],
        response_model=BasicInfo,
        max_retries=2,
    )

    detailed = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "user", "content": f"基于以下基本信息,提取详细分析:\n"
                                        f"标题: {basic.title}\n摘要: {basic.summary}\n\n原文:\n{text}"}
        ],
        response_model=DetailedInfo,
        max_retries=2,
    )

    return detailed

技巧4:输出质量自检

from pydantic import BaseModel, Field, model_validator


class SelfValidatingOutput(BaseModel):
    question: str
    answer: str
    sources: list[str] = Field(min_length=1)
    confidence: float = Field(ge=0.0, le=1.0)

    @model_validator(mode="after")
    def check_answer_quality(self):
        if len(self.answer) < 10:
            raise ValueError("回答太短,可能不完整")
        if self.confidence > 0.9 and len(self.sources) < 2:
            raise ValueError("高置信度但来源不足,请重新验证")
        return self

对比分析:3种结构化输出方案

维度 Prompt+JSON解析 函数调用协议 Instructor库
可靠性 ⭐⭐ 60-80% ⭐⭐⭐⭐ 90-95% ⭐⭐⭐⭐⭐ 95-99%
实现复杂度 低(封装后)
模型兼容性 所有模型 OpenAI/部分模型 OpenAI/Anthropic/Gemini
自动重试 ❌需手动 ❌需手动 ✅内置
流式支持 ❌困难 ⚠️有限 ✅create_partial
Schema验证 ❌需手动 ⚠️部分 ✅Pydantic自动
调试难度
生产推荐度 ⭐不推荐 ⭐⭐⭐推荐 ⭐⭐⭐⭐⭐强烈推荐
Token开销 中(+tool定义) 中(+tool定义)
嵌套深度 无限制 有限制 有限制

选型决策树

是否需要结构化输出?
  ├── 否 → 直接使用Chat Completion
  └── 是 → 使用什么模型?
       ├── 仅OpenAI → Instructor + Mode.JSON_SCHEMA
       ├── OpenAI + Anthropic → Instructor + 适配器模式
       ├── 任意模型 → Prompt+JSON解析 + 严格验证
       └── 需要流式 → Instructor + create_partial

在线工具推荐


总结:Python LLM结构化输出是AI工程的核心基础设施。6种模式从简到繁:JSON Schema约束→函数调用协议→Instructor自动重试→多模型适配→流式结构化输出→生产可靠性保障。生产环境首选Instructor库,配合Pydantic验证和自动重试,可靠率达95%以上。关键注意点:1)嵌套Schema不超过2层,2)枚举值用field_validator归一化,3)熔断器保护下游模型,4)缓存减少重复调用。多模型场景用适配器模式统一接口,流式场景用create_partial渐进式输出。


相关阅读

本站提供浏览器本地工具,免注册即可试用 →

#Python#LLM#结构化输出#JSON Schema#函数调用#Instructor#2026#AI与大数据