Python LLM構造化出力:JSON SchemaからFunction Callingまで6つのプロダクションパターン
LLMが自由テキストを出力し、ダウンストリームシステムが全崩壊
GPTにJSONを返させると、コメント付きJSONが返ってくる。整数フィールドを指定すると、文字列"42"が返ってくる。リスト長3を要求すると、5個返ってくる。LLM構造化出力は2026年のAIエンジニアリングで最も重要な基盤能力——これなしでは、RAGパイプライン、エージェントツール呼び出し、データ抽出ワークフローはすべて時限爆弾です。
本記事はJSON Schema制約から出発し、JSON Schema検証→OpenAI Function Calling→Instructor自動リトライ→マルチモデル適応→ストリーミング構造化出力→プロダクション信頼性保障の6つのプロダクションパターンを完全ガイドします。
主要な収穫
- LLM構造化出力の3つのコアメカニズムを理解:Prompt制約、JSON Schema、Function Callingプロトコル
- シンプルから複雑まで6つのPython構造化出力パターンを習得
- Instructorライブラリの自動リトライと検証戦略を学習
- クロスモデル(OpenAI/Anthropic/Gemini)構造化出力適応を実装
- プロダクション級信頼性保障システムを構築
目次
- LLM構造化出力コア概念
- パターン1:JSON Schema制約出力
- パターン2:OpenAI Function Callingプロトコル
- パターン3:Instructorライブラリ自動リトライ
- パターン4:マルチモデル構造化出力適応
- パターン5:ストリーミング構造化出力
- パターン6:プロダクション級信頼性保障
- 5つのよくある落とし穴と解決策
- 10のよくあるエラートラブルシューティング
- 高度な最適化テクニック
- 比較分析:3つの構造化出力アプローチ
- オンラインツール推奨
LLM構造化出力コア概念
| 概念 | 説明 |
|---|---|
| Structured Output | LLMが事前定義Schemaに準拠した構造化データ(JSON/XML)を出力 |
| JSON Schema | JSONデータ構造を記述する仕様、LLM出力の制約と検証に使用 |
| Function Calling | OpenAIが提案したプロトコル、LLMが関数パラメータSchemaに準拠したJSONを出力 |
| Tool Use | Anthropic/GeminiのFunction Calling実装、意味は同じ |
| Constrained Decoding | 推論時のトークン選択を制約、100% Schema準拠を保証 |
| Instructor | Pythonライブラリ、Pydanticモデルに基づきSchema+検証+リトライを自動生成 |
なぜLLM構造化出力が重要なのか
従来のLLM出力フロー:
ユーザーPrompt → LLM自由生成 → 文字列 → 正規表現/JSON解析 → 失敗の可能性 → リトライ
構造化出力フロー:
ユーザーPrompt + Schema → LLM制約付き生成 → 有効なJSON → Pydantic検証 → 成功
主な違い:
1. 従来方式:出力が予測不可能、解析が脆弱、リトライコストが高い
2. 構造化出力:出力が予測可能、検証が信頼性あり、リトライが保証される
3つの構造化出力メカニズム比較
| メカニズム | 原理 | 信頼性 | レイテンシ | 互換性 |
|---|---|---|---|---|
| Prompt制約 | プロンプトで出力形式を記述 | ⭐低 | 追加なし | 全モデル |
| JSON Schema | Schemaで出力構造を制約 | ⭐⭐中 | わずか | 一部モデル |
| Function Calling | 専用APIチャネル+Constrained Decoding | ⭐⭐⭐高 | わずか | 特定モデル |
パターン1:JSON Schema制約出力
最も基本的な構造化出力アプローチ:Promptでフォーマット要件を記述し、JSON Schemaで結果を検証。
import json
import re
from typing import Optional
from pydantic import BaseModel, Field, ValidationError
class MovieReview(BaseModel):
title: str = Field(description="映画タイトル")
rating: int = Field(ge=1, le=10, description="評価1-10")
sentiment: str = Field(pattern="^(positive|negative|neutral)$")
summary: str = Field(max_length=200, description="短いレビュー")
recommended: bool = Field(description="おすすめかどうか")
MOVIE_REVIEW_SCHEMA = MovieReview.model_json_schema()
STRUCTURED_PROMPT = """あなたはプロの映画レビュー分析器です。
以下のレビューを分析し、JSON Schemaに厳密に従って結果を返してください。
JSON Schema:
{schema}
レビュー内容:
{review}
重要な要件:
1. 有効なJSONを返す必要があります
2. ratingは1-10の整数である必要があります
3. sentimentはpositive/negative/neutralのみ
4. JSON以外のコンテンツを追加しないでください
5. ```json```でラップしないでください
"""
def extract_json_from_response(text: str) -> Optional[dict]:
patterns = [
r'```json\s*(.*?)\s*```',
r'```\s*(.*?)\s*```',
r'(\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\})',
]
for pattern in patterns:
match = re.search(pattern, text, re.DOTALL)
if match:
try:
return json.loads(match.group(1))
except json.JSONDecodeError:
continue
try:
return json.loads(text.strip())
except json.JSONDecodeError:
return None
def parse_structured_output(raw_text: str) -> Optional[MovieReview]:
parsed_json = extract_json_from_response(raw_text)
if parsed_json is None:
return None
try:
return MovieReview.model_validate(parsed_json)
except ValidationError as e:
print(f"検証失敗: {e}")
return None
async def call_llm_with_schema(prompt: str) -> Optional[MovieReview]:
from openai import AsyncOpenAI
client = AsyncOpenAI()
formatted_prompt = STRUCTURED_PROMPT.format(
schema=json.dumps(MOVIE_REVIEW_SCHEMA, ensure_ascii=False, indent=2),
review=prompt
)
response = await client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": formatted_prompt}],
temperature=0.1,
)
raw_text = response.choices[0].message.content or ""
return parse_structured_output(raw_text)
JSON Schema検証の限界
問題1:LLMが無効なJSONを返す可能性
→ extract_json_from_responseでフォールトトレラント抽出が必要
問題2:LLMがSchema制約を無視する可能性
→ ratingが"9点"ではなく9を返す
→ sentimentが"とてもポジティブ"ではなくpositiveを返す
問題3:ネスト構造がエラーになりやすい
→ リスト長が制御不能
→ オプションフィールドが欠落する可能性
問題4:毎回Promptを手書きする必要がある
→ メンテナンスコストが高く、見落としがち
パターン2:OpenAI Function Callingプロトコル
OpenAIのFunction Callingプロトコルは構造化出力の標準アプローチで、専用APIチャネルでLLMがSchemaに準拠したJSONを出力。
import json
from typing import Optional
from pydantic import BaseModel, Field
from openai import AsyncOpenAI
class SentimentAnalysis(BaseModel):
text: str = Field(description="分析対象テキスト")
sentiment: str = Field(description="感情: positive/negative/neutral")
confidence: float = Field(ge=0.0, le=1.0, description="信頼度0-1")
keywords: list[str] = Field(description="キーワードリスト")
language: str = Field(description="検出言語")
class EntityExtraction(BaseModel):
entities: list[dict] = Field(description="抽出エンティティリスト")
relationships: list[dict] = Field(default_factory=list, description="エンティティ間関係")
summary: str = Field(description="テキスト要約")
def pydantic_to_function_schema(model_class: type[BaseModel]) -> dict:
schema = model_class.model_json_schema()
return {
"type": "function",
"function": {
"name": model_class.__name__,
"description": model_class.__doc__ or f"Extract {model_class.__name__}",
"parameters": {
"type": "object",
"properties": schema.get("properties", {}),
"required": schema.get("required", []),
}
}
}
async def function_calling_extract(
text: str,
model_class: type[BaseModel],
model: str = "gpt-4o"
) -> Optional[BaseModel]:
client = AsyncOpenAI()
function_schema = pydantic_to_function_schema(model_class)
response = await client.chat.completions.create(
model=model,
messages=[
{
"role": "system",
"content": "あなたは正確なデータ抽出アシスタントです。提供された関数で構造化出力してください。"
},
{
"role": "user",
"content": text
}
],
tools=[function_schema],
tool_choice={"type": "function", "function": {"name": model_class.__name__}},
)
message = response.choices[0].message
if message.tool_calls and len(message.tool_calls) > 0:
tool_call = message.tool_calls[0]
try:
args = json.loads(tool_call.function.arguments)
return model_class.model_validate(args)
except (json.JSONDecodeError, Exception) as e:
print(f"関数呼び出し結果の解析に失敗: {e}")
return None
return None
async def multi_function_calling(
text: str,
model_classes: list[type[BaseModel]],
model: str = "gpt-4o"
) -> dict[str, BaseModel]:
client = AsyncOpenAI()
tool_schemas = [pydantic_to_function_schema(cls) for cls in model_classes]
response = await client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": "あなたはマルチタスクデータ抽出アシスタントです。"},
{"role": "user", "content": text}
],
tools=tool_schemas,
tool_choice="auto",
)
results = {}
message = response.choices[0].message
if message.tool_calls:
for tool_call in message.tool_calls:
for cls in model_classes:
if tool_call.function.name == cls.__name__:
try:
args = json.loads(tool_call.function.arguments)
results[cls.__name__] = cls.model_validate(args)
except Exception as e:
print(f"{cls.__name__}の解析に失敗: {e}")
return results
Function Calling Strict Mode
from openai import AsyncOpenAI
async def strict_structured_output(
text: str,
model_class: type[BaseModel],
model: str = "gpt-4o-2024-08-06"
) -> Optional[BaseModel]:
client = AsyncOpenAI()
schema = model_class.model_json_schema()
response = await client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": "構造化データを抽出"},
{"role": "user", "content": text}
],
response_format={
"type": "json_schema",
"json_schema": {
"name": model_class.__name__,
"strict": True,
"schema": schema,
}
}
)
raw = response.choices[0].message.content
if raw:
try:
return model_class.model_validate(json.loads(raw))
except Exception as e:
print(f"Strict mode解析失敗: {e}")
return None
パターン3:Instructorライブラリ自動リトライ
InstructorはPython LLM構造化出力のベストプラクティスで、Pydanticモデルに基づきSchema自動生成、出力検証、自動リトライを行います。
import instructor
from pydantic import BaseModel, Field, field_validator
from openai import AsyncOpenAI
class ProductInfo(BaseModel):
name: str = Field(description="製品名")
price: float = Field(gt=0, description="価格、0より大きい必要がある")
category: str = Field(description="製品カテゴリ")
features: list[str] = Field(description="製品機能リスト", min_length=1, max_length=5)
in_stock: bool = Field(description="在庫ありかどうか")
@field_validator("price")
@classmethod
def round_price(cls, v: float) -> float:
return round(v, 2)
@field_validator("category")
@classmethod
def normalize_category(cls, v: str) -> str:
return v.strip().lower()
class ArticleMetadata(BaseModel):
title: str = Field(description="記事タイトル")
author: str = Field(description="著者")
publish_date: str = Field(description="公開日、YYYY-MM-DD形式")
tags: list[str] = Field(description="タグリスト")
word_count: int = Field(ge=0, description="文字数")
reading_time_minutes: int = Field(ge=1, description="推定読了時間(分)")
@field_validator("publish_date")
@classmethod
def validate_date_format(cls, v: str) -> str:
import re
if not re.match(r'^\d{4}-\d{2}-\d{2}$', v):
raise ValueError(f"日付形式エラー: {v}、YYYY-MM-DDが必要")
return v
async def instructor_extract_product(text: str) -> ProductInfo:
client = instructor.from_openai(AsyncOpenAI())
result = await client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "user", "content": f"以下のテキストから製品情報を抽出:\n\n{text}"}
],
response_model=ProductInfo,
max_retries=3,
temperature=0.1,
)
return result
async def instructor_extract_with_mode(
text: str,
mode: instructor.Mode = instructor.Mode.TOOLS
) -> ArticleMetadata:
client = instructor.from_openai(AsyncOpenAI(), mode=mode)
result = await client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "user", "content": f"記事メタデータを抽出:\n\n{text}"}
],
response_model=ArticleMetadata,
max_retries=3,
)
return result
async def instructor_partial_streaming(text: str):
client = instructor.from_openai(AsyncOpenAI())
article = await client.chat.completions.create_partial(
model="gpt-4o",
messages=[
{"role": "user", "content": f"記事メタデータを抽出:\n\n{text}"}
],
response_model=ArticleMetadata,
max_retries=3,
)
async for partial in article:
print(f"部分結果: {partial.model_dump_json(exclude_none=True)}")
Instructor Mode選択
Mode.JSON_SCHEMA → OpenAIのresponse_format=json_schema(推奨、最も信頼性が高い)
Mode.TOOLS → OpenAIのfunction calling(互換性が良い)
Mode.JSON → PromptでJSON出力を要求(最も汎用的、信頼性は最低)
Mode.ANTHROPIC_TOOLS→ Anthropicのtool_use
Mode.GEMINI_JSON → GeminiのJSONモード
Instructorリトライ戦略詳細
import instructor
from pydantic import BaseModel, Field, ValidationError
from openai import AsyncOpenAI
from tenacity import retry, stop_after_attempt, wait_exponential
class StrictUser(BaseModel):
name: str = Field(min_length=2, max_length=50)
age: int = Field(ge=0, le=150)
email: str = Field(pattern=r'^[\w\.-]+@[\w\.-]+\.\w+$')
async def instructor_with_custom_retry(text: str) -> StrictUser:
client = instructor.from_openai(
AsyncOpenAI(),
mode=instructor.Mode.JSON_SCHEMA,
)
result, completion = await client.chat.completions.create_with_completion(
model="gpt-4o",
messages=[{"role": "user", "content": text}],
response_model=StrictUser,
max_retries=3,
validation_context={"strict": True},
)
print(f"Token使用量: prompt={completion.usage.prompt_tokens}, "
f"completion={completion.usage.completion_tokens}")
return result
async def instructor_batch_extract(
texts: list[str],
) -> list[StrictUser]:
client = instructor.from_openai(AsyncOpenAI())
results = []
for text in texts:
try:
result = await client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": text}],
response_model=StrictUser,
max_retries=2,
)
results.append(result)
except instructor.exceptions.InstructorRetryException as e:
print(f"バッチ抽出失敗、スキップ: {e}")
results.append(None)
return results
パターン4:マルチモデル構造化出力適応
異なるLLMベンダーの構造化出力APIはそれぞれ異なり、適応レイヤーで統一処理が必要です。
import json
from abc import ABC, abstractmethod
from typing import Optional, TypeVar
from pydantic import BaseModel
from openai import AsyncOpenAI
T = TypeVar("T", bound=BaseModel)
class StructuredOutputAdapter(ABC):
@abstractmethod
async def extract(self, text: str, model_class: type[T]) -> Optional[T]:
pass
class OpenAIStructuredAdapter(StructuredOutputAdapter):
def __init__(self, model: str = "gpt-4o"):
self.client = AsyncOpenAI()
self.model = model
async def extract(self, text: str, model_class: type[T]) -> Optional[T]:
import instructor
client = instructor.from_openai(self.client, mode=instructor.Mode.JSON_SCHEMA)
return await client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": text}],
response_model=model_class,
max_retries=3,
)
class AnthropicStructuredAdapter(StructuredOutputAdapter):
def __init__(self, model: str = "claude-sonnet-4-20250514"):
try:
import anthropic
self.client = anthropic.AsyncAnthropic()
except ImportError:
raise ImportError("anthropicをインストール: pip install anthropic")
self.model = model
async def extract(self, text: str, model_class: type[T]) -> Optional[T]:
import anthropic
import instructor
client = instructor.from_anthropic(
self.client,
mode=instructor.Mode.ANTHROPIC_TOOLS,
)
return await client.chat.completions.create(
model=self.model,
max_tokens=4096,
messages=[{"role": "user", "content": text}],
response_model=model_class,
max_retries=3,
)
class GeminiStructuredAdapter(StructuredOutputAdapter):
def __init__(self, model: str = "gemini-2.0-flash"):
self.model_name = model
async def extract(self, text: str, model_class: type[T]) -> Optional[T]:
import google.generativeai as genai
import os
genai.configure(api_key=os.environ.get("GEMINI_API_KEY"))
model = genai.GenerativeModel(self.model_name)
schema = model_class.model_json_schema()
prompt = f"""以下のテキストから構造化データを抽出してください。
JSON Schemaに厳密に従って結果を返してください。追加コンテンツは不要です。
JSON Schema:
{json.dumps(schema, ensure_ascii=False, indent=2)}
テキスト:
{text}"""
response = await model.generate_content_async(prompt)
raw = response.text.strip()
if raw.startswith("```json"):
raw = raw[7:]
if raw.endswith("```"):
raw = raw[:-3]
raw = raw.strip()
try:
return model_class.model_validate(json.loads(raw))
except Exception as e:
print(f"Gemini解析失敗: {e}")
return None
class MultiModelStructuredExtractor:
def __init__(self):
self.adapters: dict[str, StructuredOutputAdapter] = {}
def register(self, name: str, adapter: StructuredOutputAdapter):
self.adapters[name] = adapter
async def extract(
self,
text: str,
model_class: type[T],
preferred: str = "openai",
fallback: bool = True,
) -> Optional[T]:
order = [preferred]
if fallback:
order.extend([k for k in self.adapters if k != preferred])
for model_name in order:
adapter = self.adapters.get(model_name)
if adapter is None:
continue
try:
result = await adapter.extract(text, model_class)
if result is not None:
return result
except Exception as e:
print(f"[{model_name}] 抽出失敗: {e}")
continue
return None
async def extract_consensus(
self,
text: str,
model_class: type[T],
min_agreement: int = 2,
) -> Optional[T]:
import asyncio
tasks = {
name: adapter.extract(text, model_class)
for name, adapter in self.adapters.items()
}
results = await asyncio.gather(*tasks.values(), return_exceptions=True)
valid_results = []
for (name, _), result in zip(tasks.items(), results):
if isinstance(result, Exception):
print(f"[{name}] 例外: {result}")
continue
if result is not None:
valid_results.append(result)
if len(valid_results) >= min_agreement:
return valid_results[0]
return valid_results[0] if valid_results else None
マルチモデル適応アーキテクチャ
┌─────────────────────┐
│ MultiModelExtractor │
│ (統一インターフェース) │
└──────────┬──────────┘
│
┌────────────────┼────────────────┐
│ │ │
┌─────────▼──────┐ ┌──────▼───────┐ ┌──────▼───────┐
│ OpenAI Adapter │ │Anthropic Adp │ │ Gemini Adptr │
│ JSON_SCHEMA │ │ANTHROPIC_TOOLS│ │ Prompt+Parse │
│ Instructor │ │ Instructor │ │ 手動解析 │
└────────────────┘ └──────────────┘ └──────────────┘
パターン5:ストリーミング構造化出力
LLM構造化出力とストリーミングを組み合わせ、リアルタイム解析とプログレッシブ表示を実現。
import json
import asyncio
from typing import AsyncIterator, Optional
from pydantic import BaseModel, Field
from openai import AsyncOpenAI
class StreamingJsonParser:
def __init__(self):
self.buffer = ""
self.depth = 0
self.in_string = False
self.escape_next = False
self.started = False
def feed(self, chunk: str) -> list[dict]:
self.buffer += chunk
results = []
for char in chunk:
if self.escape_next:
self.escape_next = False
continue
if char == '\\' and self.in_string:
self.escape_next = True
continue
if char == '"' and not self.escape_next:
self.in_string = not self.in_string
continue
if self.in_string:
continue
if char == '{':
if not self.started:
self.started = True
idx = self.buffer.rfind('{')
self.buffer = self.buffer[idx:]
self.depth += 1
elif char == '}':
self.depth -= 1
if self.depth == 0 and self.started:
try:
parsed = json.loads(self.buffer)
results.append(parsed)
except json.JSONDecodeError:
pass
self.buffer = ""
self.started = False
return results
class PartialModelBuilder:
def __init__(self, model_class: type[BaseModel]):
self.model_class = model_class
self.current_json = {}
self.last_valid = None
def update(self, json_data: dict) -> Optional[BaseModel]:
self.current_json.update(json_data)
try:
self.last_valid = self.model_class.model_validate(self.current_json)
return self.last_valid
except Exception:
return self.last_valid
class StreamingEvent(BaseModel):
event_type: str = Field(description="イベントタイプ")
data: dict = Field(description="イベントデータ")
confidence: float = Field(ge=0.0, le=1.0, description="信頼度")
async def stream_structured_output(
prompt: str,
model_class: type[BaseModel],
model: str = "gpt-4o",
) -> AsyncIterator[BaseModel]:
import instructor
client = instructor.from_openai(AsyncOpenAI())
stream = await client.chat.completions.create_partial(
model=model,
messages=[{"role": "user", "content": prompt}],
response_model=model_class,
max_retries=2,
)
async for partial in stream:
yield partial
async def stream_with_raw_parser(
prompt: str,
model: str = "gpt-4o",
) -> AsyncIterator[dict]:
client = AsyncOpenAI()
schema_prompt = f"""JSON形式で結果を返してください。JSONのみ、他は不要。
{prompt}"""
stream = await client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": schema_prompt}],
stream=True,
temperature=0.1,
)
parser = StreamingJsonParser()
full_content = ""
async for chunk in stream:
if chunk.choices and chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
full_content += content
parsed_results = parser.feed(content)
for result in parsed_results:
yield result
if not parsed_results and full_content:
try:
yield json.loads(full_content)
except json.JSONDecodeError:
pass
async def stream_sse_structured(
prompt: str,
model_class: type[BaseModel],
):
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
async def generate():
async for partial in stream_structured_output(prompt, model_class):
data = partial.model_dump_json(exclude_none=True)
yield f"data: {data}\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={"X-Accel-Buffering": "no"},
)
ストリーミング構造化出力アーキテクチャ
クライアントリクエスト
│
▼
FastAPI SSEエンドポイント
│
▼
Instructor create_partial()
│
├──→ chunk1: {"name": "製品A"...}
├──→ chunk2: {"name": "製品A", "price": 99...}
├──→ chunk3: {"name": "製品A", "price": 99.0, "category": "電子"...}
└──→ 最終: 完全なPydanticモデルインスタンス
各chunkはSSE経由でクライアントにプッシュ
クライアントはプログレッシブにUIをレンダリング
パターン6:プロダクション級信頼性保障
すべてのパターンをプロダクション対応の構造化出力サービスに統合。
import json
import time
import logging
from typing import Optional
from dataclasses import dataclass, field
from enum import Enum
from pydantic import BaseModel, Field
from openai import AsyncOpenAI
logger = logging.getLogger(__name__)
class OutputStatus(str, Enum):
SUCCESS = "success"
VALIDATION_FAILED = "validation_failed"
PARSE_FAILED = "parse_failed"
LLM_ERROR = "llm_error"
TIMEOUT = "timeout"
RETRY_EXHAUSTED = "retry_exhausted"
@dataclass
class ExtractionResult:
data: Optional[BaseModel] = None
status: OutputStatus = OutputStatus.SUCCESS
attempts: int = 0
latency_ms: float = 0.0
error_message: str = ""
model_used: str = ""
tokens_used: dict = field(default_factory=dict)
class CircuitBreaker:
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: float = 60.0,
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time: Optional[float] = None
self.is_open = False
def record_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.is_open = True
def record_success(self):
self.failure_count = 0
self.is_open = False
def can_execute(self) -> bool:
if not self.is_open:
return True
if self.last_failure_time and \
time.time() - self.last_failure_time > self.recovery_timeout:
self.is_open = False
self.failure_count = 0
return True
return False
class StructuredOutputService:
def __init__(
self,
max_retries: int = 3,
timeout: float = 30.0,
fallback_models: Optional[list[str]] = None,
):
self.client = AsyncOpenAI()
self.max_retries = max_retries
self.timeout = timeout
self.fallback_models = fallback_models or ["gpt-4o", "gpt-4o-mini"]
self.circuit_breakers: dict[str, CircuitBreaker] = {}
def _get_breaker(self, model: str) -> CircuitBreaker:
if model not in self.circuit_breakers:
self.circuit_breakers[model] = CircuitBreaker()
return self.circuit_breakers[model]
async def extract(
self,
text: str,
model_class: type[BaseModel],
preferred_model: Optional[str] = None,
) -> ExtractionResult:
import instructor
models = [preferred_model] if preferred_model else self.fallback_models
models = [m for m in models if self._get_breaker(m).can_execute()]
if not models:
return ExtractionResult(
status=OutputStatus.RETRY_EXHAUSTED,
error_message="全モデルのサーキットブレーカーがオープン",
)
for model in models:
result = await self._try_extract(text, model_class, model)
if result.status == OutputStatus.SUCCESS:
self._get_breaker(model).record_success()
return result
else:
self._get_breaker(model).record_failure()
logger.warning(f"モデル{model}抽出失敗: {result.error_message}")
return result
async def _try_extract(
self,
text: str,
model_class: type[BaseModel],
model: str,
) -> ExtractionResult:
import instructor
start_time = time.time()
client = instructor.from_openai(self.client, mode=instructor.Mode.JSON_SCHEMA)
for attempt in range(1, self.max_retries + 1):
try:
result = await client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": text}],
response_model=model_class,
max_retries=0,
timeout=self.timeout,
)
latency = (time.time() - start_time) * 1000
return ExtractionResult(
data=result,
status=OutputStatus.SUCCESS,
attempts=attempt,
latency_ms=latency,
model_used=model,
)
except instructor.exceptions.InstructorRetryException as e:
logger.warning(f"試行{attempt}検証失敗: {e}")
continue
except Exception as e:
error_msg = str(e)
if "timeout" in error_msg.lower():
return ExtractionResult(
status=OutputStatus.TIMEOUT,
attempts=attempt,
latency_ms=(time.time() - start_time) * 1000,
error_message=error_msg,
model_used=model,
)
logger.error(f"試行{attempt}LLMエラー: {e}")
continue
return ExtractionResult(
status=OutputStatus.RETRY_EXHAUSTED,
attempts=self.max_retries,
latency_ms=(time.time() - start_time) * 1000,
error_message=f"{self.max_retries}回リトライ後も失敗",
model_used=model,
)
class StructuredOutputCache:
def __init__(self, ttl: float = 3600.0, max_size: int = 1000):
self.ttl = ttl
self.max_size = max_size
self._cache: dict[str, tuple[float, BaseModel]] = {}
def _make_key(self, text: str, model_class: type[BaseModel]) -> str:
import hashlib
content_hash = hashlib.sha256(text.encode()).hexdigest()[:16]
return f"{model_class.__name__}:{content_hash}"
def get(self, text: str, model_class: type[BaseModel]) -> Optional[BaseModel]:
key = self._make_key(text, model_class)
if key in self._cache:
timestamp, data = self._cache[key]
if time.time() - timestamp < self.ttl:
return data
del self._cache[key]
return None
def set(self, text: str, model_class: type[BaseModel], data: BaseModel):
if len(self._cache) >= self.max_size:
oldest_key = min(self._cache, key=lambda k: self._cache[k][0])
del self._cache[oldest_key]
key = self._make_key(text, model_class)
self._cache[key] = (time.time(), data)
プロダクションアーキテクチャ全体像
┌────────────────────────────┐
│ StructuredOutputService │
│ (統一エントリーポイント) │
└──────────┬─────────────────┘
│
┌──────────▼─────────────────┐
│ CircuitBreaker │
│ (モデルレベルサーキットブレーカー) │
└──────────┬─────────────────┘
│
┌────────────────┼────────────────┐
│ │ │
┌─────────▼──────┐ ┌──────▼───────┐ ┌──────▼───────┐
│ gpt-4o │ │ gpt-4o-mini │ │ fallback │
│ JSON_SCHEMA │ │ JSON_SCHEMA │ │ Prompt+Parse│
│ +Instructor │ │ +Instructor │ │ +リトライ │
└────────────────┘ └──────────────┘ └──────────────┘
│ │ │
└────────────────┼────────────────┘
│
┌──────────▼─────────────────┐
│ StructuredOutputCache │
│ (結果キャッシュ) │
└────────────────────────────┘
5つのよくある落とし穴と解決策
落とし穴1:LLMがコメント付きJSONを返す
import json
import re
def strip_json_comments(text: str) -> str:
text = re.sub(r'//.*?$', '', text, flags=re.MULTILINE)
text = re.sub(r'/\*.*?\*/', '', text, flags=re.DOTALL)
return text
raw = '''{
"name": "製品A", // コメント
"price": 99.0
/* 複数行
コメント */
}'''
clean = strip_json_comments(raw)
data = json.loads(clean)
落とし穴2:ネストSchemaで出力が切り詰められる
from pydantic import BaseModel, Field
class Address(BaseModel):
street: str
city: str
zip_code: str
class PersonFlat(BaseModel):
name: str
street: str = Field(description="番地")
city: str = Field(description="都市")
zip_code: str = Field(description="郵便番号")
class PersonNested(BaseModel):
name: str
address: Address
# 推奨:ネスト深度は2レベル以下、それ以上はフラット化
# 非推奨:Person → Address → GeoLocation → Coordinates
# 推奨:PersonFlat(全フィールドを同じレベルに)
落とし穴3:列挙値をLLMが守らない
from enum import Enum
from pydantic import BaseModel, Field, field_validator
class Sentiment(str, Enum):
POSITIVE = "positive"
NEGATIVE = "negative"
NEUTRAL = "neutral"
class ReviewWithEnum(BaseModel):
text: str
sentiment: Sentiment
@field_validator("sentiment", mode="before")
@classmethod
def normalize_sentiment(cls, v):
if isinstance(v, str):
v = v.strip().lower()
mapping = {
"ポジティブ": "positive", "良い": "positive", "素晴らしい": "positive",
"ネガティブ": "negative", "悪い": "negative", "ひどい": "negative",
"ニュートラル": "neutral", "普通": "neutral", "まあまあ": "neutral",
}
return mapping.get(v, v)
return v
落とし穴4:リスト長が制御不能
from pydantic import BaseModel, Field, field_validator
class TaggedContent(BaseModel):
content: str
tags: list[str] = Field(min_length=1, max_length=5)
@field_validator("tags")
@classmethod
def deduplicate_tags(cls, v: list[str]) -> list[str]:
seen = set()
result = []
for tag in v:
normalized = tag.strip().lower()
if normalized not in seen:
seen.add(normalized)
result.append(tag.strip())
return result[:5]
落とし穴5:Strict ModeがすべてのSchema機能をサポートしない
from pydantic import BaseModel, Field
# Strict Modeでサポートされない機能:
# 1. additionalProperties: false を明示的に設定する必要がある
# 2. オプションフィールドにはdefault値が必要
# 3. union型はサポートされない(一部モデル)
# 4. 複雑な正規表現patternはサポートされない
# 解決策:Schemaを簡素化 + field_validatorで補完
class SimpleProduct(BaseModel):
name: str
price: float = Field(gt=0)
category: str = Field(default="other")
tags: list[str] = Field(default_factory=list)
10のよくあるエラートラブルシューティング
| # | エラーメッセージ | 原因 | 解決策 |
|---|---|---|---|
| 1 | json.decoder.JSONDecodeError |
LLMが無効なJSONを返した | extract_json_from_responseでフォールトトレラント抽出 |
| 2 | ValidationError: field required |
LLMが必須フィールドを省略 | default値を追加またはInstructor自動リトライを使用 |
| 3 | InstructorRetryException: max retries |
3回リトライしても検証に通らない | Schemaが複雑すぎないか確認、ネストを簡素化 |
| 4 | TypeError: 'NoneType' object |
tool_callsが空、LLMが関数を呼び出していない | tool_choice設定を確認、モデルがFunction Callingをサポートしているか確認 |
| 5 | RateLimitError: 429 |
API呼び出しレート超過 | 指数バックオフリトライを追加、並行度を下げる |
| 6 | Timeout: request timed out |
LLM推論タイムアウト | Schema複雑度を下げる、timeoutパラメータを増やす |
| 7 | BadRequestError: Invalid schema |
Schemaがモデル要件を満たしていない | strict mode制限を確認、Schemaを簡素化 |
| 8 | ValidationError: string too long |
LLMが長すぎる文字列を返した | max_length制約を追加 |
| 9 | KeyError: 'tool_calls' |
モデルがFunction Callingをサポートしていない | JSON SchemaモードまたはPromptモードに切り替え |
| 10 | RecursionError: maximum depth |
Schemaネストが深すぎる | ネスト構造をフラット化、最大2レベル |
高度な最適化テクニック
テクニック1:Few-shot例で精度向上
from pydantic import BaseModel, Field
from openai import AsyncOpenAI
import instructor
class Classification(BaseModel):
category: str = Field(description="カテゴリ")
confidence: float = Field(ge=0.0, le=1.0)
async def few_shot_extract(text: str) -> Classification:
client = instructor.from_openai(AsyncOpenAI())
examples = [
{"role": "user", "content": "この製品は素晴らしい、強くお勧めします!"},
{"role": "assistant", "content": '{"category": "positive", "confidence": 0.95}'},
{"role": "user", "content": "品質は普通、価格が高すぎる"},
{"role": "assistant", "content": '{"category": "neutral", "confidence": 0.7}'},
]
return await client.chat.completions.create(
model="gpt-4o",
messages=examples + [{"role": "user", "content": text}],
response_model=Classification,
max_retries=2,
)
テクニック2:Schema記述の最適化
from pydantic import BaseModel, Field
class BadSchema(BaseModel):
type: str
value: str
class GoodSchema(BaseModel):
type: str = Field(
description="エンティティタイプ、次のいずれか: person, organization, location, date"
)
value: str = Field(
description="エンティティの標準化値。personはフルネーム、organizationは正式名称、"
"dateはYYYY-MM-DD形式、locationは都市+国"
)
テクニック3:段階的抽出で複雑構造に対応
from pydantic import BaseModel, Field
from openai import AsyncOpenAI
import instructor
class BasicInfo(BaseModel):
title: str
summary: str
class DetailedInfo(BasicInfo):
key_points: list[str]
entities: list[str]
sentiment: str
async def progressive_extract(text: str) -> DetailedInfo:
client = instructor.from_openai(AsyncOpenAI())
basic = await client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": f"基本情報を抽出:\n{text}"}],
response_model=BasicInfo,
max_retries=2,
)
detailed = await client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "user", "content": f"以下の基本情報に基づき、詳細分析を抽出:\n"
f"タイトル: {basic.title}\n要約: {basic.summary}\n\n原文:\n{text}"}
],
response_model=DetailedInfo,
max_retries=2,
)
return detailed
テクニック4:出力品質自己チェック
from pydantic import BaseModel, Field, model_validator
class SelfValidatingOutput(BaseModel):
question: str
answer: str
sources: list[str] = Field(min_length=1)
confidence: float = Field(ge=0.0, le=1.0)
@model_validator(mode="after")
def check_answer_quality(self):
if len(self.answer) < 10:
raise ValueError("回答が短すぎます、不完全な可能性があります")
if self.confidence > 0.9 and len(self.sources) < 2:
raise ValueError("高信頼度だがソースが不足、再検証してください")
return self
比較分析:3つの構造化出力アプローチ
| 次元 | Prompt+JSON解析 | Function Callingプロトコル | Instructorライブラリ |
|---|---|---|---|
| 信頼性 | ⭐⭐ 60-80% | ⭐⭐⭐⭐ 90-95% | ⭐⭐⭐⭐⭐ 95-99% |
| 実装複雑さ | 低 | 中 | 低(ラップ後) |
| モデル互換性 | 全モデル | OpenAI/一部モデル | OpenAI/Anthropic/Gemini |
| 自動リトライ | ❌手動 | ❌手動 | ✅内蔵 |
| ストリーミング対応 | ❌困難 | ⚠️制限あり | ✅create_partial |
| Schema検証 | ❌手動 | ⚠️一部 | ✅Pydantic自動 |
| デバッグ難易度 | 高 | 中 | 低 |
| プロダクション推奨度 | ⭐非推奨 | ⭐⭐⭐推奨 | ⭐⭐⭐⭐⭐強く推奨 |
| Tokenオーバーヘッド | 低 | 中(+tool定義) | 中(+tool定義) |
| ネスト深度 | 無制限 | 制限あり | 制限あり |
選択判定ツリー
構造化出力が必要か?
├── いいえ → Chat Completionを直接使用
└── はい → どのモデルを使用?
├── OpenAIのみ → Instructor + Mode.JSON_SCHEMA
├── OpenAI + Anthropic → Instructor + アダプターパターン
├── 任意のモデル → Prompt+JSON解析 + 厳格な検証
└── ストリーミングが必要 → Instructor + create_partial
オンラインツール推奨
- JSONフォーマッター&バリデーター:/ja/json/format
- JSONPathクエリ:/ja/json/jsonpath
- cURL to Code:/ja/dev/curl-to-code
まとめ:Python LLM構造化出力はAIエンジニアリングのコアインフラです。6つのパターンはシンプルから複雑へ:JSON Schema制約→Function Callingプロトコル→Instructor自動リトライ→マルチモデル適応→ストリーミング構造化出力→プロダクション信頼性保障。プロダクションではInstructorライブラリを推奨、Pydantic検証と自動リトライで95%以上の信頼性を実現。重要ポイント:1)ネストSchemaは2レベル以下、2)列挙値はfield_validatorで正規化、3)サーキットブレーカーで下流モデルを保護、4)キャッシュで重複呼び出しを削減。マルチモデルシナリオではアダプターパターンで統一インターフェース、ストリーミングシナリオではcreate_partialでプログレッシブ出力。
関連記事
ブラウザローカルツールを無料で試す →