Python LLM構造化出力:JSON SchemaからFunction Callingまで6つのプロダクションパターン

AI与大数据

LLMが自由テキストを出力し、ダウンストリームシステムが全崩壊

GPTにJSONを返させると、コメント付きJSONが返ってくる。整数フィールドを指定すると、文字列"42"が返ってくる。リスト長3を要求すると、5個返ってくる。LLM構造化出力は2026年のAIエンジニアリングで最も重要な基盤能力——これなしでは、RAGパイプライン、エージェントツール呼び出し、データ抽出ワークフローはすべて時限爆弾です。

本記事はJSON Schema制約から出発し、JSON Schema検証→OpenAI Function Calling→Instructor自動リトライ→マルチモデル適応→ストリーミング構造化出力→プロダクション信頼性保障の6つのプロダクションパターンを完全ガイドします。

主要な収穫

  • LLM構造化出力の3つのコアメカニズムを理解:Prompt制約、JSON Schema、Function Callingプロトコル
  • シンプルから複雑まで6つのPython構造化出力パターンを習得
  • Instructorライブラリの自動リトライと検証戦略を学習
  • クロスモデル(OpenAI/Anthropic/Gemini)構造化出力適応を実装
  • プロダクション級信頼性保障システムを構築

目次

  1. LLM構造化出力コア概念
  2. パターン1:JSON Schema制約出力
  3. パターン2:OpenAI Function Callingプロトコル
  4. パターン3:Instructorライブラリ自動リトライ
  5. パターン4:マルチモデル構造化出力適応
  6. パターン5:ストリーミング構造化出力
  7. パターン6:プロダクション級信頼性保障
  8. 5つのよくある落とし穴と解決策
  9. 10のよくあるエラートラブルシューティング
  10. 高度な最適化テクニック
  11. 比較分析:3つの構造化出力アプローチ
  12. オンラインツール推奨

LLM構造化出力コア概念

概念 説明
Structured Output LLMが事前定義Schemaに準拠した構造化データ(JSON/XML)を出力
JSON Schema JSONデータ構造を記述する仕様、LLM出力の制約と検証に使用
Function Calling OpenAIが提案したプロトコル、LLMが関数パラメータSchemaに準拠したJSONを出力
Tool Use Anthropic/GeminiのFunction Calling実装、意味は同じ
Constrained Decoding 推論時のトークン選択を制約、100% Schema準拠を保証
Instructor Pythonライブラリ、Pydanticモデルに基づきSchema+検証+リトライを自動生成

なぜLLM構造化出力が重要なのか

従来のLLM出力フロー:
  ユーザーPrompt → LLM自由生成 → 文字列 → 正規表現/JSON解析 → 失敗の可能性 → リトライ

構造化出力フロー:
  ユーザーPrompt + Schema → LLM制約付き生成 → 有効なJSON → Pydantic検証 → 成功

主な違い:
  1. 従来方式:出力が予測不可能、解析が脆弱、リトライコストが高い
  2. 構造化出力:出力が予測可能、検証が信頼性あり、リトライが保証される

3つの構造化出力メカニズム比較

メカニズム 原理 信頼性 レイテンシ 互換性
Prompt制約 プロンプトで出力形式を記述 ⭐低 追加なし 全モデル
JSON Schema Schemaで出力構造を制約 ⭐⭐中 わずか 一部モデル
Function Calling 専用APIチャネル+Constrained Decoding ⭐⭐⭐高 わずか 特定モデル

パターン1:JSON Schema制約出力

最も基本的な構造化出力アプローチ:Promptでフォーマット要件を記述し、JSON Schemaで結果を検証。

import json
import re
from typing import Optional
from pydantic import BaseModel, Field, ValidationError


class MovieReview(BaseModel):
    title: str = Field(description="映画タイトル")
    rating: int = Field(ge=1, le=10, description="評価1-10")
    sentiment: str = Field(pattern="^(positive|negative|neutral)$")
    summary: str = Field(max_length=200, description="短いレビュー")
    recommended: bool = Field(description="おすすめかどうか")


MOVIE_REVIEW_SCHEMA = MovieReview.model_json_schema()

STRUCTURED_PROMPT = """あなたはプロの映画レビュー分析器です。

以下のレビューを分析し、JSON Schemaに厳密に従って結果を返してください。

JSON Schema:
{schema}

レビュー内容:
{review}

重要な要件:
1. 有効なJSONを返す必要があります
2. ratingは1-10の整数である必要があります
3. sentimentはpositive/negative/neutralのみ
4. JSON以外のコンテンツを追加しないでください
5. ```json```でラップしないでください
"""


def extract_json_from_response(text: str) -> Optional[dict]:
    patterns = [
        r'```json\s*(.*?)\s*```',
        r'```\s*(.*?)\s*```',
        r'(\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\})',
    ]
    for pattern in patterns:
        match = re.search(pattern, text, re.DOTALL)
        if match:
            try:
                return json.loads(match.group(1))
            except json.JSONDecodeError:
                continue
    try:
        return json.loads(text.strip())
    except json.JSONDecodeError:
        return None


def parse_structured_output(raw_text: str) -> Optional[MovieReview]:
    parsed_json = extract_json_from_response(raw_text)
    if parsed_json is None:
        return None
    try:
        return MovieReview.model_validate(parsed_json)
    except ValidationError as e:
        print(f"検証失敗: {e}")
        return None


async def call_llm_with_schema(prompt: str) -> Optional[MovieReview]:
    from openai import AsyncOpenAI

    client = AsyncOpenAI()
    formatted_prompt = STRUCTURED_PROMPT.format(
        schema=json.dumps(MOVIE_REVIEW_SCHEMA, ensure_ascii=False, indent=2),
        review=prompt
    )

    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": formatted_prompt}],
        temperature=0.1,
    )

    raw_text = response.choices[0].message.content or ""
    return parse_structured_output(raw_text)

JSON Schema検証の限界

問題1:LLMが無効なJSONを返す可能性
  → extract_json_from_responseでフォールトトレラント抽出が必要

問題2:LLMがSchema制約を無視する可能性
  → ratingが"9点"ではなく9を返す
  → sentimentが"とてもポジティブ"ではなくpositiveを返す

問題3:ネスト構造がエラーになりやすい
  → リスト長が制御不能
  → オプションフィールドが欠落する可能性

問題4:毎回Promptを手書きする必要がある
  → メンテナンスコストが高く、見落としがち

パターン2:OpenAI Function Callingプロトコル

OpenAIのFunction Callingプロトコルは構造化出力の標準アプローチで、専用APIチャネルでLLMがSchemaに準拠したJSONを出力。

import json
from typing import Optional
from pydantic import BaseModel, Field
from openai import AsyncOpenAI


class SentimentAnalysis(BaseModel):
    text: str = Field(description="分析対象テキスト")
    sentiment: str = Field(description="感情: positive/negative/neutral")
    confidence: float = Field(ge=0.0, le=1.0, description="信頼度0-1")
    keywords: list[str] = Field(description="キーワードリスト")
    language: str = Field(description="検出言語")


class EntityExtraction(BaseModel):
    entities: list[dict] = Field(description="抽出エンティティリスト")
    relationships: list[dict] = Field(default_factory=list, description="エンティティ間関係")
    summary: str = Field(description="テキスト要約")


def pydantic_to_function_schema(model_class: type[BaseModel]) -> dict:
    schema = model_class.model_json_schema()
    return {
        "type": "function",
        "function": {
            "name": model_class.__name__,
            "description": model_class.__doc__ or f"Extract {model_class.__name__}",
            "parameters": {
                "type": "object",
                "properties": schema.get("properties", {}),
                "required": schema.get("required", []),
            }
        }
    }


async def function_calling_extract(
    text: str,
    model_class: type[BaseModel],
    model: str = "gpt-4o"
) -> Optional[BaseModel]:
    client = AsyncOpenAI()

    function_schema = pydantic_to_function_schema(model_class)

    response = await client.chat.completions.create(
        model=model,
        messages=[
            {
                "role": "system",
                "content": "あなたは正確なデータ抽出アシスタントです。提供された関数で構造化出力してください。"
            },
            {
                "role": "user",
                "content": text
            }
        ],
        tools=[function_schema],
        tool_choice={"type": "function", "function": {"name": model_class.__name__}},
    )

    message = response.choices[0].message

    if message.tool_calls and len(message.tool_calls) > 0:
        tool_call = message.tool_calls[0]
        try:
            args = json.loads(tool_call.function.arguments)
            return model_class.model_validate(args)
        except (json.JSONDecodeError, Exception) as e:
            print(f"関数呼び出し結果の解析に失敗: {e}")
            return None

    return None


async def multi_function_calling(
    text: str,
    model_classes: list[type[BaseModel]],
    model: str = "gpt-4o"
) -> dict[str, BaseModel]:
    client = AsyncOpenAI()

    tool_schemas = [pydantic_to_function_schema(cls) for cls in model_classes]

    response = await client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": "あなたはマルチタスクデータ抽出アシスタントです。"},
            {"role": "user", "content": text}
        ],
        tools=tool_schemas,
        tool_choice="auto",
    )

    results = {}
    message = response.choices[0].message

    if message.tool_calls:
        for tool_call in message.tool_calls:
            for cls in model_classes:
                if tool_call.function.name == cls.__name__:
                    try:
                        args = json.loads(tool_call.function.arguments)
                        results[cls.__name__] = cls.model_validate(args)
                    except Exception as e:
                        print(f"{cls.__name__}の解析に失敗: {e}")

    return results

Function Calling Strict Mode

from openai import AsyncOpenAI


async def strict_structured_output(
    text: str,
    model_class: type[BaseModel],
    model: str = "gpt-4o-2024-08-06"
) -> Optional[BaseModel]:
    client = AsyncOpenAI()

    schema = model_class.model_json_schema()

    response = await client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": "構造化データを抽出"},
            {"role": "user", "content": text}
        ],
        response_format={
            "type": "json_schema",
            "json_schema": {
                "name": model_class.__name__,
                "strict": True,
                "schema": schema,
            }
        }
    )

    raw = response.choices[0].message.content
    if raw:
        try:
            return model_class.model_validate(json.loads(raw))
        except Exception as e:
            print(f"Strict mode解析失敗: {e}")
    return None

パターン3:Instructorライブラリ自動リトライ

InstructorはPython LLM構造化出力のベストプラクティスで、Pydanticモデルに基づきSchema自動生成、出力検証、自動リトライを行います。

import instructor
from pydantic import BaseModel, Field, field_validator
from openai import AsyncOpenAI


class ProductInfo(BaseModel):
    name: str = Field(description="製品名")
    price: float = Field(gt=0, description="価格、0より大きい必要がある")
    category: str = Field(description="製品カテゴリ")
    features: list[str] = Field(description="製品機能リスト", min_length=1, max_length=5)
    in_stock: bool = Field(description="在庫ありかどうか")

    @field_validator("price")
    @classmethod
    def round_price(cls, v: float) -> float:
        return round(v, 2)

    @field_validator("category")
    @classmethod
    def normalize_category(cls, v: str) -> str:
        return v.strip().lower()


class ArticleMetadata(BaseModel):
    title: str = Field(description="記事タイトル")
    author: str = Field(description="著者")
    publish_date: str = Field(description="公開日、YYYY-MM-DD形式")
    tags: list[str] = Field(description="タグリスト")
    word_count: int = Field(ge=0, description="文字数")
    reading_time_minutes: int = Field(ge=1, description="推定読了時間(分)")

    @field_validator("publish_date")
    @classmethod
    def validate_date_format(cls, v: str) -> str:
        import re
        if not re.match(r'^\d{4}-\d{2}-\d{2}$', v):
            raise ValueError(f"日付形式エラー: {v}、YYYY-MM-DDが必要")
        return v


async def instructor_extract_product(text: str) -> ProductInfo:
    client = instructor.from_openai(AsyncOpenAI())

    result = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "user", "content": f"以下のテキストから製品情報を抽出:\n\n{text}"}
        ],
        response_model=ProductInfo,
        max_retries=3,
        temperature=0.1,
    )

    return result


async def instructor_extract_with_mode(
    text: str,
    mode: instructor.Mode = instructor.Mode.TOOLS
) -> ArticleMetadata:
    client = instructor.from_openai(AsyncOpenAI(), mode=mode)

    result = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "user", "content": f"記事メタデータを抽出:\n\n{text}"}
        ],
        response_model=ArticleMetadata,
        max_retries=3,
    )

    return result


async def instructor_partial_streaming(text: str):
    client = instructor.from_openai(AsyncOpenAI())

    article = await client.chat.completions.create_partial(
        model="gpt-4o",
        messages=[
            {"role": "user", "content": f"記事メタデータを抽出:\n\n{text}"}
        ],
        response_model=ArticleMetadata,
        max_retries=3,
    )

    async for partial in article:
        print(f"部分結果: {partial.model_dump_json(exclude_none=True)}")

Instructor Mode選択

Mode.JSON_SCHEMA    → OpenAIのresponse_format=json_schema(推奨、最も信頼性が高い)
Mode.TOOLS          → OpenAIのfunction calling(互換性が良い)
Mode.JSON           → PromptでJSON出力を要求(最も汎用的、信頼性は最低)
Mode.ANTHROPIC_TOOLS→ Anthropicのtool_use
Mode.GEMINI_JSON    → GeminiのJSONモード

Instructorリトライ戦略詳細

import instructor
from pydantic import BaseModel, Field, ValidationError
from openai import AsyncOpenAI
from tenacity import retry, stop_after_attempt, wait_exponential


class StrictUser(BaseModel):
    name: str = Field(min_length=2, max_length=50)
    age: int = Field(ge=0, le=150)
    email: str = Field(pattern=r'^[\w\.-]+@[\w\.-]+\.\w+$')


async def instructor_with_custom_retry(text: str) -> StrictUser:
    client = instructor.from_openai(
        AsyncOpenAI(),
        mode=instructor.Mode.JSON_SCHEMA,
    )

    result, completion = await client.chat.completions.create_with_completion(
        model="gpt-4o",
        messages=[{"role": "user", "content": text}],
        response_model=StrictUser,
        max_retries=3,
        validation_context={"strict": True},
    )

    print(f"Token使用量: prompt={completion.usage.prompt_tokens}, "
          f"completion={completion.usage.completion_tokens}")

    return result


async def instructor_batch_extract(
    texts: list[str],
) -> list[StrictUser]:
    client = instructor.from_openai(AsyncOpenAI())

    results = []
    for text in texts:
        try:
            result = await client.chat.completions.create(
                model="gpt-4o",
                messages=[{"role": "user", "content": text}],
                response_model=StrictUser,
                max_retries=2,
            )
            results.append(result)
        except instructor.exceptions.InstructorRetryException as e:
            print(f"バッチ抽出失敗、スキップ: {e}")
            results.append(None)

    return results

パターン4:マルチモデル構造化出力適応

異なるLLMベンダーの構造化出力APIはそれぞれ異なり、適応レイヤーで統一処理が必要です。

import json
from abc import ABC, abstractmethod
from typing import Optional, TypeVar
from pydantic import BaseModel
from openai import AsyncOpenAI

T = TypeVar("T", bound=BaseModel)


class StructuredOutputAdapter(ABC):
    @abstractmethod
    async def extract(self, text: str, model_class: type[T]) -> Optional[T]:
        pass


class OpenAIStructuredAdapter(StructuredOutputAdapter):
    def __init__(self, model: str = "gpt-4o"):
        self.client = AsyncOpenAI()
        self.model = model

    async def extract(self, text: str, model_class: type[T]) -> Optional[T]:
        import instructor
        client = instructor.from_openai(self.client, mode=instructor.Mode.JSON_SCHEMA)

        return await client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": text}],
            response_model=model_class,
            max_retries=3,
        )


class AnthropicStructuredAdapter(StructuredOutputAdapter):
    def __init__(self, model: str = "claude-sonnet-4-20250514"):
        try:
            import anthropic
            self.client = anthropic.AsyncAnthropic()
        except ImportError:
            raise ImportError("anthropicをインストール: pip install anthropic")
        self.model = model

    async def extract(self, text: str, model_class: type[T]) -> Optional[T]:
        import anthropic
        import instructor

        client = instructor.from_anthropic(
            self.client,
            mode=instructor.Mode.ANTHROPIC_TOOLS,
        )

        return await client.chat.completions.create(
            model=self.model,
            max_tokens=4096,
            messages=[{"role": "user", "content": text}],
            response_model=model_class,
            max_retries=3,
        )


class GeminiStructuredAdapter(StructuredOutputAdapter):
    def __init__(self, model: str = "gemini-2.0-flash"):
        self.model_name = model

    async def extract(self, text: str, model_class: type[T]) -> Optional[T]:
        import google.generativeai as genai
        import os

        genai.configure(api_key=os.environ.get("GEMINI_API_KEY"))
        model = genai.GenerativeModel(self.model_name)

        schema = model_class.model_json_schema()
        prompt = f"""以下のテキストから構造化データを抽出してください。
JSON Schemaに厳密に従って結果を返してください。追加コンテンツは不要です。

JSON Schema:
{json.dumps(schema, ensure_ascii=False, indent=2)}

テキスト:
{text}"""

        response = await model.generate_content_async(prompt)
        raw = response.text.strip()

        if raw.startswith("```json"):
            raw = raw[7:]
        if raw.endswith("```"):
            raw = raw[:-3]
        raw = raw.strip()

        try:
            return model_class.model_validate(json.loads(raw))
        except Exception as e:
            print(f"Gemini解析失敗: {e}")
            return None


class MultiModelStructuredExtractor:
    def __init__(self):
        self.adapters: dict[str, StructuredOutputAdapter] = {}

    def register(self, name: str, adapter: StructuredOutputAdapter):
        self.adapters[name] = adapter

    async def extract(
        self,
        text: str,
        model_class: type[T],
        preferred: str = "openai",
        fallback: bool = True,
    ) -> Optional[T]:
        order = [preferred]
        if fallback:
            order.extend([k for k in self.adapters if k != preferred])

        for model_name in order:
            adapter = self.adapters.get(model_name)
            if adapter is None:
                continue
            try:
                result = await adapter.extract(text, model_class)
                if result is not None:
                    return result
            except Exception as e:
                print(f"[{model_name}] 抽出失敗: {e}")
                continue

        return None

    async def extract_consensus(
        self,
        text: str,
        model_class: type[T],
        min_agreement: int = 2,
    ) -> Optional[T]:
        import asyncio

        tasks = {
            name: adapter.extract(text, model_class)
            for name, adapter in self.adapters.items()
        }

        results = await asyncio.gather(*tasks.values(), return_exceptions=True)

        valid_results = []
        for (name, _), result in zip(tasks.items(), results):
            if isinstance(result, Exception):
                print(f"[{name}] 例外: {result}")
                continue
            if result is not None:
                valid_results.append(result)

        if len(valid_results) >= min_agreement:
            return valid_results[0]

        return valid_results[0] if valid_results else None

マルチモデル適応アーキテクチャ

                    ┌─────────────────────┐
                    │  MultiModelExtractor │
                    │  (統一インターフェース) │
                    └──────────┬──────────┘
                               │
              ┌────────────────┼────────────────┐
              │                │                │
    ┌─────────▼──────┐ ┌──────▼───────┐ ┌──────▼───────┐
    │ OpenAI Adapter │ │Anthropic Adp │ │ Gemini Adptr │
    │ JSON_SCHEMA    │ │ANTHROPIC_TOOLS│ │ Prompt+Parse │
    │ Instructor     │ │ Instructor   │ │ 手動解析      │
    └────────────────┘ └──────────────┘ └──────────────┘

パターン5:ストリーミング構造化出力

LLM構造化出力とストリーミングを組み合わせ、リアルタイム解析とプログレッシブ表示を実現。

import json
import asyncio
from typing import AsyncIterator, Optional
from pydantic import BaseModel, Field
from openai import AsyncOpenAI


class StreamingJsonParser:
    def __init__(self):
        self.buffer = ""
        self.depth = 0
        self.in_string = False
        self.escape_next = False
        self.started = False

    def feed(self, chunk: str) -> list[dict]:
        self.buffer += chunk
        results = []

        for char in chunk:
            if self.escape_next:
                self.escape_next = False
                continue

            if char == '\\' and self.in_string:
                self.escape_next = True
                continue

            if char == '"' and not self.escape_next:
                self.in_string = not self.in_string
                continue

            if self.in_string:
                continue

            if char == '{':
                if not self.started:
                    self.started = True
                    idx = self.buffer.rfind('{')
                    self.buffer = self.buffer[idx:]
                self.depth += 1
            elif char == '}':
                self.depth -= 1
                if self.depth == 0 and self.started:
                    try:
                        parsed = json.loads(self.buffer)
                        results.append(parsed)
                    except json.JSONDecodeError:
                        pass
                    self.buffer = ""
                    self.started = False

        return results


class PartialModelBuilder:
    def __init__(self, model_class: type[BaseModel]):
        self.model_class = model_class
        self.current_json = {}
        self.last_valid = None

    def update(self, json_data: dict) -> Optional[BaseModel]:
        self.current_json.update(json_data)
        try:
            self.last_valid = self.model_class.model_validate(self.current_json)
            return self.last_valid
        except Exception:
            return self.last_valid


class StreamingEvent(BaseModel):
    event_type: str = Field(description="イベントタイプ")
    data: dict = Field(description="イベントデータ")
    confidence: float = Field(ge=0.0, le=1.0, description="信頼度")


async def stream_structured_output(
    prompt: str,
    model_class: type[BaseModel],
    model: str = "gpt-4o",
) -> AsyncIterator[BaseModel]:
    import instructor

    client = instructor.from_openai(AsyncOpenAI())

    stream = await client.chat.completions.create_partial(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        response_model=model_class,
        max_retries=2,
    )

    async for partial in stream:
        yield partial


async def stream_with_raw_parser(
    prompt: str,
    model: str = "gpt-4o",
) -> AsyncIterator[dict]:
    client = AsyncOpenAI()

    schema_prompt = f"""JSON形式で結果を返してください。JSONのみ、他は不要。
{prompt}"""

    stream = await client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": schema_prompt}],
        stream=True,
        temperature=0.1,
    )

    parser = StreamingJsonParser()
    full_content = ""

    async for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            full_content += content
            parsed_results = parser.feed(content)
            for result in parsed_results:
                yield result

    if not parsed_results and full_content:
        try:
            yield json.loads(full_content)
        except json.JSONDecodeError:
            pass


async def stream_sse_structured(
    prompt: str,
    model_class: type[BaseModel],
):
    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse

    app = FastAPI()

    async def generate():
        async for partial in stream_structured_output(prompt, model_class):
            data = partial.model_dump_json(exclude_none=True)
            yield f"data: {data}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={"X-Accel-Buffering": "no"},
    )

ストリーミング構造化出力アーキテクチャ

クライアントリクエスト
    │
    ▼
FastAPI SSEエンドポイント
    │
    ▼
Instructor create_partial()
    │
    ├──→ chunk1: {"name": "製品A"...}
    ├──→ chunk2: {"name": "製品A", "price": 99...}
    ├──→ chunk3: {"name": "製品A", "price": 99.0, "category": "電子"...}
    └──→ 最終: 完全なPydanticモデルインスタンス

各chunkはSSE経由でクライアントにプッシュ
クライアントはプログレッシブにUIをレンダリング

パターン6:プロダクション級信頼性保障

すべてのパターンをプロダクション対応の構造化出力サービスに統合。

import json
import time
import logging
from typing import Optional
from dataclasses import dataclass, field
from enum import Enum
from pydantic import BaseModel, Field
from openai import AsyncOpenAI

logger = logging.getLogger(__name__)


class OutputStatus(str, Enum):
    SUCCESS = "success"
    VALIDATION_FAILED = "validation_failed"
    PARSE_FAILED = "parse_failed"
    LLM_ERROR = "llm_error"
    TIMEOUT = "timeout"
    RETRY_EXHAUSTED = "retry_exhausted"


@dataclass
class ExtractionResult:
    data: Optional[BaseModel] = None
    status: OutputStatus = OutputStatus.SUCCESS
    attempts: int = 0
    latency_ms: float = 0.0
    error_message: str = ""
    model_used: str = ""
    tokens_used: dict = field(default_factory=dict)


class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time: Optional[float] = None
        self.is_open = False

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.is_open = True

    def record_success(self):
        self.failure_count = 0
        self.is_open = False

    def can_execute(self) -> bool:
        if not self.is_open:
            return True
        if self.last_failure_time and \
           time.time() - self.last_failure_time > self.recovery_timeout:
            self.is_open = False
            self.failure_count = 0
            return True
        return False


class StructuredOutputService:
    def __init__(
        self,
        max_retries: int = 3,
        timeout: float = 30.0,
        fallback_models: Optional[list[str]] = None,
    ):
        self.client = AsyncOpenAI()
        self.max_retries = max_retries
        self.timeout = timeout
        self.fallback_models = fallback_models or ["gpt-4o", "gpt-4o-mini"]
        self.circuit_breakers: dict[str, CircuitBreaker] = {}

    def _get_breaker(self, model: str) -> CircuitBreaker:
        if model not in self.circuit_breakers:
            self.circuit_breakers[model] = CircuitBreaker()
        return self.circuit_breakers[model]

    async def extract(
        self,
        text: str,
        model_class: type[BaseModel],
        preferred_model: Optional[str] = None,
    ) -> ExtractionResult:
        import instructor

        models = [preferred_model] if preferred_model else self.fallback_models
        models = [m for m in models if self._get_breaker(m).can_execute()]

        if not models:
            return ExtractionResult(
                status=OutputStatus.RETRY_EXHAUSTED,
                error_message="全モデルのサーキットブレーカーがオープン",
            )

        for model in models:
            result = await self._try_extract(text, model_class, model)
            if result.status == OutputStatus.SUCCESS:
                self._get_breaker(model).record_success()
                return result
            else:
                self._get_breaker(model).record_failure()
                logger.warning(f"モデル{model}抽出失敗: {result.error_message}")

        return result

    async def _try_extract(
        self,
        text: str,
        model_class: type[BaseModel],
        model: str,
    ) -> ExtractionResult:
        import instructor

        start_time = time.time()
        client = instructor.from_openai(self.client, mode=instructor.Mode.JSON_SCHEMA)

        for attempt in range(1, self.max_retries + 1):
            try:
                result = await client.chat.completions.create(
                    model=model,
                    messages=[{"role": "user", "content": text}],
                    response_model=model_class,
                    max_retries=0,
                    timeout=self.timeout,
                )

                latency = (time.time() - start_time) * 1000
                return ExtractionResult(
                    data=result,
                    status=OutputStatus.SUCCESS,
                    attempts=attempt,
                    latency_ms=latency,
                    model_used=model,
                )

            except instructor.exceptions.InstructorRetryException as e:
                logger.warning(f"試行{attempt}検証失敗: {e}")
                continue

            except Exception as e:
                error_msg = str(e)
                if "timeout" in error_msg.lower():
                    return ExtractionResult(
                        status=OutputStatus.TIMEOUT,
                        attempts=attempt,
                        latency_ms=(time.time() - start_time) * 1000,
                        error_message=error_msg,
                        model_used=model,
                    )
                logger.error(f"試行{attempt}LLMエラー: {e}")
                continue

        return ExtractionResult(
            status=OutputStatus.RETRY_EXHAUSTED,
            attempts=self.max_retries,
            latency_ms=(time.time() - start_time) * 1000,
            error_message=f"{self.max_retries}回リトライ後も失敗",
            model_used=model,
        )


class StructuredOutputCache:
    def __init__(self, ttl: float = 3600.0, max_size: int = 1000):
        self.ttl = ttl
        self.max_size = max_size
        self._cache: dict[str, tuple[float, BaseModel]] = {}

    def _make_key(self, text: str, model_class: type[BaseModel]) -> str:
        import hashlib
        content_hash = hashlib.sha256(text.encode()).hexdigest()[:16]
        return f"{model_class.__name__}:{content_hash}"

    def get(self, text: str, model_class: type[BaseModel]) -> Optional[BaseModel]:
        key = self._make_key(text, model_class)
        if key in self._cache:
            timestamp, data = self._cache[key]
            if time.time() - timestamp < self.ttl:
                return data
            del self._cache[key]
        return None

    def set(self, text: str, model_class: type[BaseModel], data: BaseModel):
        if len(self._cache) >= self.max_size:
            oldest_key = min(self._cache, key=lambda k: self._cache[k][0])
            del self._cache[oldest_key]

        key = self._make_key(text, model_class)
        self._cache[key] = (time.time(), data)

プロダクションアーキテクチャ全体像

                    ┌────────────────────────────┐
                    │  StructuredOutputService    │
                    │  (統一エントリーポイント)    │
                    └──────────┬─────────────────┘
                               │
                    ┌──────────▼─────────────────┐
                    │  CircuitBreaker             │
                    │  (モデルレベルサーキットブレーカー) │
                    └──────────┬─────────────────┘
                               │
              ┌────────────────┼────────────────┐
              │                │                │
    ┌─────────▼──────┐ ┌──────▼───────┐ ┌──────▼───────┐
    │  gpt-4o        │ │  gpt-4o-mini │ │  fallback    │
    │  JSON_SCHEMA   │ │  JSON_SCHEMA │ │  Prompt+Parse│
    │  +Instructor   │ │  +Instructor │ │  +リトライ    │
    └────────────────┘ └──────────────┘ └──────────────┘
              │                │                │
              └────────────────┼────────────────┘
                               │
                    ┌──────────▼─────────────────┐
                    │  StructuredOutputCache      │
                    │  (結果キャッシュ)            │
                    └────────────────────────────┘

5つのよくある落とし穴と解決策

落とし穴1:LLMがコメント付きJSONを返す

import json
import re


def strip_json_comments(text: str) -> str:
    text = re.sub(r'//.*?$', '', text, flags=re.MULTILINE)
    text = re.sub(r'/\*.*?\*/', '', text, flags=re.DOTALL)
    return text


raw = '''{
    "name": "製品A",  // コメント
    "price": 99.0
    /* 複数行
       コメント */
}'''

clean = strip_json_comments(raw)
data = json.loads(clean)

落とし穴2:ネストSchemaで出力が切り詰められる

from pydantic import BaseModel, Field


class Address(BaseModel):
    street: str
    city: str
    zip_code: str


class PersonFlat(BaseModel):
    name: str
    street: str = Field(description="番地")
    city: str = Field(description="都市")
    zip_code: str = Field(description="郵便番号")


class PersonNested(BaseModel):
    name: str
    address: Address


# 推奨:ネスト深度は2レベル以下、それ以上はフラット化
# 非推奨:Person → Address → GeoLocation → Coordinates
# 推奨:PersonFlat(全フィールドを同じレベルに)

落とし穴3:列挙値をLLMが守らない

from enum import Enum
from pydantic import BaseModel, Field, field_validator


class Sentiment(str, Enum):
    POSITIVE = "positive"
    NEGATIVE = "negative"
    NEUTRAL = "neutral"


class ReviewWithEnum(BaseModel):
    text: str
    sentiment: Sentiment

    @field_validator("sentiment", mode="before")
    @classmethod
    def normalize_sentiment(cls, v):
        if isinstance(v, str):
            v = v.strip().lower()
            mapping = {
                "ポジティブ": "positive", "良い": "positive", "素晴らしい": "positive",
                "ネガティブ": "negative", "悪い": "negative", "ひどい": "negative",
                "ニュートラル": "neutral", "普通": "neutral", "まあまあ": "neutral",
            }
            return mapping.get(v, v)
        return v

落とし穴4:リスト長が制御不能

from pydantic import BaseModel, Field, field_validator


class TaggedContent(BaseModel):
    content: str
    tags: list[str] = Field(min_length=1, max_length=5)

    @field_validator("tags")
    @classmethod
    def deduplicate_tags(cls, v: list[str]) -> list[str]:
        seen = set()
        result = []
        for tag in v:
            normalized = tag.strip().lower()
            if normalized not in seen:
                seen.add(normalized)
                result.append(tag.strip())
        return result[:5]

落とし穴5:Strict ModeがすべてのSchema機能をサポートしない

from pydantic import BaseModel, Field


# Strict Modeでサポートされない機能:
# 1. additionalProperties: false を明示的に設定する必要がある
# 2. オプションフィールドにはdefault値が必要
# 3. union型はサポートされない(一部モデル)
# 4. 複雑な正規表現patternはサポートされない

# 解決策:Schemaを簡素化 + field_validatorで補完

class SimpleProduct(BaseModel):
    name: str
    price: float = Field(gt=0)
    category: str = Field(default="other")
    tags: list[str] = Field(default_factory=list)

10のよくあるエラートラブルシューティング

# エラーメッセージ 原因 解決策
1 json.decoder.JSONDecodeError LLMが無効なJSONを返した extract_json_from_responseでフォールトトレラント抽出
2 ValidationError: field required LLMが必須フィールドを省略 default値を追加またはInstructor自動リトライを使用
3 InstructorRetryException: max retries 3回リトライしても検証に通らない Schemaが複雑すぎないか確認、ネストを簡素化
4 TypeError: 'NoneType' object tool_callsが空、LLMが関数を呼び出していない tool_choice設定を確認、モデルがFunction Callingをサポートしているか確認
5 RateLimitError: 429 API呼び出しレート超過 指数バックオフリトライを追加、並行度を下げる
6 Timeout: request timed out LLM推論タイムアウト Schema複雑度を下げる、timeoutパラメータを増やす
7 BadRequestError: Invalid schema Schemaがモデル要件を満たしていない strict mode制限を確認、Schemaを簡素化
8 ValidationError: string too long LLMが長すぎる文字列を返した max_length制約を追加
9 KeyError: 'tool_calls' モデルがFunction Callingをサポートしていない JSON SchemaモードまたはPromptモードに切り替え
10 RecursionError: maximum depth Schemaネストが深すぎる ネスト構造をフラット化、最大2レベル

高度な最適化テクニック

テクニック1:Few-shot例で精度向上

from pydantic import BaseModel, Field
from openai import AsyncOpenAI
import instructor


class Classification(BaseModel):
    category: str = Field(description="カテゴリ")
    confidence: float = Field(ge=0.0, le=1.0)


async def few_shot_extract(text: str) -> Classification:
    client = instructor.from_openai(AsyncOpenAI())

    examples = [
        {"role": "user", "content": "この製品は素晴らしい、強くお勧めします!"},
        {"role": "assistant", "content": '{"category": "positive", "confidence": 0.95}'},
        {"role": "user", "content": "品質は普通、価格が高すぎる"},
        {"role": "assistant", "content": '{"category": "neutral", "confidence": 0.7}'},
    ]

    return await client.chat.completions.create(
        model="gpt-4o",
        messages=examples + [{"role": "user", "content": text}],
        response_model=Classification,
        max_retries=2,
    )

テクニック2:Schema記述の最適化

from pydantic import BaseModel, Field


class BadSchema(BaseModel):
    type: str
    value: str


class GoodSchema(BaseModel):
    type: str = Field(
        description="エンティティタイプ、次のいずれか: person, organization, location, date"
    )
    value: str = Field(
        description="エンティティの標準化値。personはフルネーム、organizationは正式名称、"
                    "dateはYYYY-MM-DD形式、locationは都市+国"
    )

テクニック3:段階的抽出で複雑構造に対応

from pydantic import BaseModel, Field
from openai import AsyncOpenAI
import instructor


class BasicInfo(BaseModel):
    title: str
    summary: str


class DetailedInfo(BasicInfo):
    key_points: list[str]
    entities: list[str]
    sentiment: str


async def progressive_extract(text: str) -> DetailedInfo:
    client = instructor.from_openai(AsyncOpenAI())

    basic = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": f"基本情報を抽出:\n{text}"}],
        response_model=BasicInfo,
        max_retries=2,
    )

    detailed = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "user", "content": f"以下の基本情報に基づき、詳細分析を抽出:\n"
                                        f"タイトル: {basic.title}\n要約: {basic.summary}\n\n原文:\n{text}"}
        ],
        response_model=DetailedInfo,
        max_retries=2,
    )

    return detailed

テクニック4:出力品質自己チェック

from pydantic import BaseModel, Field, model_validator


class SelfValidatingOutput(BaseModel):
    question: str
    answer: str
    sources: list[str] = Field(min_length=1)
    confidence: float = Field(ge=0.0, le=1.0)

    @model_validator(mode="after")
    def check_answer_quality(self):
        if len(self.answer) < 10:
            raise ValueError("回答が短すぎます、不完全な可能性があります")
        if self.confidence > 0.9 and len(self.sources) < 2:
            raise ValueError("高信頼度だがソースが不足、再検証してください")
        return self

比較分析:3つの構造化出力アプローチ

次元 Prompt+JSON解析 Function Callingプロトコル Instructorライブラリ
信頼性 ⭐⭐ 60-80% ⭐⭐⭐⭐ 90-95% ⭐⭐⭐⭐⭐ 95-99%
実装複雑さ 低(ラップ後)
モデル互換性 全モデル OpenAI/一部モデル OpenAI/Anthropic/Gemini
自動リトライ ❌手動 ❌手動 ✅内蔵
ストリーミング対応 ❌困難 ⚠️制限あり ✅create_partial
Schema検証 ❌手動 ⚠️一部 ✅Pydantic自動
デバッグ難易度
プロダクション推奨度 ⭐非推奨 ⭐⭐⭐推奨 ⭐⭐⭐⭐⭐強く推奨
Tokenオーバーヘッド 中(+tool定義) 中(+tool定義)
ネスト深度 無制限 制限あり 制限あり

選択判定ツリー

構造化出力が必要か?
  ├── いいえ → Chat Completionを直接使用
  └── はい → どのモデルを使用?
       ├── OpenAIのみ → Instructor + Mode.JSON_SCHEMA
       ├── OpenAI + Anthropic → Instructor + アダプターパターン
       ├── 任意のモデル → Prompt+JSON解析 + 厳格な検証
       └── ストリーミングが必要 → Instructor + create_partial

オンラインツール推奨


まとめ:Python LLM構造化出力はAIエンジニアリングのコアインフラです。6つのパターンはシンプルから複雑へ:JSON Schema制約→Function Callingプロトコル→Instructor自動リトライ→マルチモデル適応→ストリーミング構造化出力→プロダクション信頼性保障。プロダクションではInstructorライブラリを推奨、Pydantic検証と自動リトライで95%以上の信頼性を実現。重要ポイント:1)ネストSchemaは2レベル以下、2)列挙値はfield_validatorで正規化、3)サーキットブレーカーで下流モデルを保護、4)キャッシュで重複呼び出しを削減。マルチモデルシナリオではアダプターパターンで統一インターフェース、ストリーミングシナリオではcreate_partialでプログレッシブ出力。


関連記事

ブラウザローカルツールを無料で試す →

#Python#LLM#结构化输出#JSON Schema#函数调用#Instructor#2026#AI与大数据