Python LLM API 統合開発実践:基礎からプロダクション運用まで

AI与大数据

2026年のLLM APIエコシステム全景

LLM APIは現代アプリケーションのインフラとなりました。2026年の主要プロバイダー:

プロバイダー 代表モデル SDK 特徴
OpenAI GPT-4o, o3 openai エコシステム最充実、Function Calling標準
Anthropic Claude 4 Sonnet/Opus anthropic 200Kコンテキスト、安全性重視
Google Gemini 2.5 Pro/Flash google-genai ネイティブマルチモーダル、無料枠大
DeepSeek DeepSeek-V3/R1 openai互換 コスパ最強、推論力高い
Alibaba Qwen3 openai互換 中国語力抜群、中国国内アクセス高速

多くの中国モデルAPIはOpenAI SDKフォーマットと互換性があり、base_urlを変更するだけで切り替え可能です。


OpenAI SDK 完全ガイド

インストールと初期化

pip install openai pydantic tiktoken httpx
from openai import OpenAI

client = OpenAI(
    api_key="sk-xxxxxxxx",
    base_url="https://api.openai.com/v1"
)

# DeepSeek(OpenAI互換)を使用
deepseekClient = OpenAI(
    api_key="sk-xxxxxxxx",
    base_url="https://api.deepseek.com/v1"
)

基本的なチャット(Chat Completion)

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": "あなたはプロのPython開発アシスタントです。"},
        {"role": "user", "content": "Pythonのデコレータの仕組みを説明して"}
    ],
    temperature=0.7,
    max_tokens=1024
)

print(response.choices[0].message.content)
print(f"トークン使用量: {response.usage.total_tokens}")

ストリーミング出力

ストリーミングでユーザーは完全な応答を待つ必要がありません:

stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "プログラミングについて詩を書いて"}],
    stream=True
)

for chunk in stream:
    if chunk.choices[0].delta.content is not None:
        print(chunk.choices[0].delta.content, end="", flush=True)

Function Calling(ツール呼び出し)

モデルに外部関数を呼び出してリアルタイムデータを取得させます:

import json

tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "指定都市の天気情報を取得",
            "parameters": {
                "type": "object",
                "properties": {
                    "city": {"type": "string", "description": "都市名"},
                    "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}
                },
                "required": ["city"]
            }
        }
    }
]

def get_weather(city: str, unit: str = "celsius") -> dict:
    return {"city": city, "temperature": 22, "unit": unit, "condition": "晴れ"}

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "東京の今日の天気は?"}],
    tools=tools
)

tool_call = response.choices[0].message.tool_calls[0]
args = json.loads(tool_call.function.arguments)
result = get_weather(**args)

response2 = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "user", "content": "東京の今日の天気は?"},
        response.choices[0].message,
        {"role": "tool", "tool_call_id": tool_call.id, "content": json.dumps(result, ensure_ascii=False)}
    ]
)

print(response2.choices[0].message.content)

ビジョン(Vision)

import base64

def encode_image(image_path: str) -> str:
    with open(image_path, "rb") as f:
        return base64.b64encode(f.read()).decode("utf-8")

base64_image = encode_image("screenshot.png")

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {
            "role": "user",
            "content": [
                {"type": "text", "text": "この画像の内容を説明して"},
                {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}}
            ]
        }
    ],
    max_tokens=512
)

print(response.choices[0].message.content)

Base64エンコードツールで画像を素早くエンコードしてテストできます。


Pydantic構造化出力

Structured Outputsの使用

from pydantic import BaseModel
from openai import OpenAI

class CodeReview(BaseModel):
    score: int
    issues: list[str]
    suggestions: list[str]
    summary: str

client = OpenAI()
response = client.beta.chat.completions.parse(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": "あなたはコードレビューの専門家です。"},
        {"role": "user", "content": "このコードをレビューして:def add(a,b): return a+b"}
    ],
    response_format=CodeReview
)

review = response.choices[0].message.parsed
print(f"スコア: {review.score}/10")
print(f"問題: {review.issues}")
print(f"提案: {review.suggestions}")

複雑なネスト構造

from pydantic import BaseModel
from typing import Optional

class ApiEndpoint(BaseModel):
    path: str
    method: str
    description: str
    request_body: Optional[dict] = None

class ApiSpec(BaseModel):
    title: str
    version: str
    endpoints: list[ApiEndpoint]

response = client.beta.chat.completions.parse(
    model="gpt-4o",
    messages=[{"role": "user", "content": "ユーザー管理システムのAPI仕様を設計して"}],
    response_format=ApiSpec
)

spec = response.choices[0].message.parsed
for ep in spec.endpoints:
    print(f"{ep.method} {ep.path}: {ep.description}")

非同期バッチ処理

AsyncOpenAIの基本

import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI()

async def translate_text(text: str, target_lang: str) -> str:
    response = await async_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": f"{target_lang}に翻訳して、翻訳結果のみ返して"},
            {"role": "user", "content": text}
        ],
        temperature=0.3
    )
    return response.choices[0].message.content

async def batch_translate(texts: list[str], target_lang: str) -> list[str]:
    tasks = [translate_text(t, target_lang) for t in texts]
    return await asyncio.gather(*tasks)

texts = ["Hello World", "Good morning", "Thank you"]
results = asyncio.run(batch_translate(texts, "日本語"))
for orig, trans in zip(texts, results):
    print(f"{orig} -> {trans}")

同時接続制限付きバッチ処理

from asyncio import Semaphore

async def batch_with_concurrency(
    texts: list[str],
    target_lang: str,
    max_concurrent: int = 5
) -> list[str]:
    sem = Semaphore(max_concurrent)

    async def limited_translate(text: str) -> str:
        async with sem:
            return await translate_text(text, target_lang)

    tasks = [limited_translate(t) for t in texts]
    return await asyncio.gather(*tasks)

レート制限とリトライ戦略

指数バックオフリトライ

import time
from openai import APITimeoutError, RateLimitError, APIConnectionError

def call_with_retry(client, max_retries: int = 3, **kwargs):
    for attempt in range(max_retries):
        try:
            return client.chat.completions.create(**kwargs)
        except RateLimitError:
            wait = 2 ** attempt + 1
            print(f"レート制限、{wait}秒待機後にリトライ...")
            time.sleep(wait)
        except APITimeoutError:
            print(f"リクエストタイムアウト、{attempt + 1}回目のリトライ")
        except APIConnectionError:
            print(f"接続エラー、{attempt + 1}回目のリトライ")
            time.sleep(1)
    raise Exception(f"{max_retries}回リトライ後も失敗")

tenacityライブラリの使用

from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=60),
    retry=retry_if_exception_type((RateLimitError, APITimeoutError))
)
def call_api(client, **kwargs):
    return client.chat.completions.create(**kwargs)

トークンカウントとコスト最適化

tiktokenでトークン数を計算

import tiktoken

def count_tokens(text: str, model: str = "gpt-4o") -> int:
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))

prompt = "PythonのGILメカニズムについて詳しく説明して"
print(f"トークン数: {count_tokens(prompt)}")

コスト見積もり

PRICING = {
    "gpt-4o": {"input": 2.50 / 1_000_000, "output": 10.00 / 1_000_000},
    "gpt-4o-mini": {"input": 0.15 / 1_000_000, "output": 0.60 / 1_000_000},
    "deepseek-chat": {"input": 0.27 / 1_000_000, "output": 1.10 / 1_000_000},
}

def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    pricing = PRICING.get(model, PRICING["gpt-4o"])
    return input_tokens * pricing["input"] + output_tokens * pricing["output"]

cost = estimate_cost("gpt-4o", 1000, 500)
print(f"推定コスト: ${cost:.6f}")

最適化戦略

  • 安いモデルを使う:簡単なタスクはgpt-4o-miniで十分
  • プロンプトを圧縮:冗長な説明を削除、system promptを短く
  • 結果をキャッシュ:同じリクエストの重複呼び出しを回避
  • max_tokensを制御:合理的な最大出力長を設定
  • バッチ処理:複数の小さなリクエストを1つにまとめる

マルチモデルルーティング

スマートルーター

from openai import OpenAI
from typing import Optional

class ModelRouter:
    def __init__(self):
        self.clients = {
            "openai": OpenAI(api_key="sk-xxx"),
            "deepseek": OpenAI(api_key="sk-xxx", base_url="https://api.deepseek.com/v1"),
            "qwen": OpenAI(api_key="sk-xxx", base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"),
        }
        self.model_map = {
            "openai": "gpt-4o",
            "deepseek": "deepseek-chat",
            "qwen": "qwen-plus",
        }

    def route(self, prompt: str) -> str:
        if any(kw in prompt for kw in ["コード", "プログラミング", "debug", "code"]):
            return "deepseek"
        if any(kw in prompt for kw in ["翻訳", "中国語", "執筆"]):
            return "qwen"
        return "openai"

    def chat(self, prompt: str, system: str = "") -> str:
        provider = self.route(prompt)
        client = self.clients[provider]
        model = self.model_map[provider]
        messages = []
        if system:
            messages.append({"role": "system", "content": system})
        messages.append({"role": "user", "content": prompt})
        response = client.chat.completions.create(model=model, messages=messages)
        return response.choices[0].message.content

router = ModelRouter()
print(router.chat("Pythonでクイックソートを実装して"))

エラーハンドリングのベストプラクティス

よくあるエラータイプ

エラー HTTPステータス 原因 対処法
RateLimitError 429 リクエスト頻度過多 指数バックオフリトライ
BadRequestError 400 パラメータエラー リクエストパラメータ確認
AuthenticationError 401 API Key無効 キー設定を確認
NotFoundError 404 モデルが存在しない モデル名を確認
APIStatusError 500+ サーバーエラー リトライまたはモデル切替

コンテキスト長超過の処理

from openai import BadRequestError

def safe_chat(client, model: str, messages: list, max_context: int = 128000) -> str:
    try:
        return client.chat.completions.create(model=model, messages=messages)
    except BadRequestError as e:
        if "context_length_exceeded" in str(e):
            while messages and len(str(messages)) > max_context:
                if len(messages) > 2:
                    messages.pop(1)
                else:
                    messages[-1]["content"] = messages[-1]["content"][:max_context // 2]
                    break
            return client.chat.completions.create(model=model, messages=messages)
        raise

FastAPIでプロダクションAPIを構築

プロジェクト構造

llm-api/
├── main.py
├── config.py
├── routers/
│   └── chat.py
├── services/
│   ├── llm_service.py
│   └── cache_service.py
└── requirements.txt

メインアプリケーション

from fastapi import FastAPI
from routers.chat import router as chat_router

app = FastAPI(title="LLM API Service", version="1.0.0")
app.include_router(chat_router, prefix="/api/v1")

@app.get("/health")
async def health_check():
    return {"status": "ok"}

チャットルーター

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

router = APIRouter()

class ChatRequest(BaseModel):
    message: str
    model: str = "gpt-4o-mini"
    temperature: float = 0.7
    max_tokens: int = 1024
    stream: bool = False

class ChatResponse(BaseModel):
    reply: str
    model: str
    tokens: int

@router.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    try:
        from services.llm_service import llm_service
        result = await llm_service.chat(
            message=request.message,
            model=request.model,
            temperature=request.temperature,
            max_tokens=request.max_tokens
        )
        return ChatResponse(**result)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

LLMサービス層

from openai import AsyncOpenAI
from services.cache_service import cache_service

class LLMService:
    def __init__(self):
        self.client = AsyncOpenAI()

    async def chat(self, message: str, model: str = "gpt-4o-mini",
                   temperature: float = 0.7, max_tokens: int = 1024) -> dict:
        cache_key = cache_service.make_key(message, model, temperature)
        cached = await cache_service.get(cache_key)
        if cached:
            return cached

        response = await self.client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": message}],
            temperature=temperature,
            max_tokens=max_tokens
        )

        result = {
            "reply": response.choices[0].message.content,
            "model": response.model,
            "tokens": response.usage.total_tokens
        }

        await cache_service.set(cache_key, result, ttl=3600)
        return result

llm_service = LLMService()

キャッシュ戦略

Redisキャッシュ実装

import hashlib
import json

class CacheService:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        import redis.asyncio as aioredis
        self.redis = aioredis.from_url(redis_url)

    def make_key(self, message: str, model: str, temperature: float) -> str:
        raw = f"{message}:{model}:{temperature}"
        return f"llm:cache:{hashlib.md5(raw.encode()).hexdigest()}"

    async def get(self, key: str) -> dict | None:
        data = await self.redis.get(key)
        return json.loads(data) if data else None

    async def set(self, key: str, value: dict, ttl: int = 3600):
        await self.redis.setex(key, ttl, json.dumps(value, ensure_ascii=False))

cache_service = CacheService()

ハッシュ計算ツールでキャッシュキーのハッシュ原理を理解できます。


監視とログ

構造化ログ

import logging
import json
from datetime import datetime

class LLMLogger:
    def __init__(self):
        self.logger = logging.getLogger("llm_api")
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(message)s"))
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def log_request(self, model: str, prompt: str, tokens_in: int):
        self.logger.info(json.dumps({
            "event": "llm_request",
            "model": model,
            "tokens_in": tokens_in,
            "timestamp": datetime.utcnow().isoformat()
        }, ensure_ascii=False))

    def log_response(self, model: str, tokens_out: int, latency_ms: float, cost: float):
        self.logger.info(json.dumps({
            "event": "llm_response",
            "model": model,
            "tokens_out": tokens_out,
            "latency_ms": round(latency_ms, 2),
            "cost_usd": round(cost, 6),
            "timestamp": datetime.utcnow().isoformat()
        }, ensure_ascii=False))

    def log_error(self, model: str, error_type: str, error_msg: str):
        self.logger.error(json.dumps({
            "event": "llm_error",
            "model": model,
            "error_type": error_type,
            "error_msg": error_msg,
            "timestamp": datetime.utcnow().isoformat()
        }, ensure_ascii=False))

よくあるエラーとデバッグ

問題1:API Key設定エラー

import os
from openai import AuthenticationError

client = OpenAI(api_key=os.getenv("OPENAI_API_KEY", ""))
if not client.api_key:
    raise ValueError("OPENAI_API_KEY環境変数を設定してください")

問題2:レスポンス内容が空

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "hello"}],
    max_tokens=10
)

if response.choices[0].finish_reason == "length":
    print("出力が途切れています。max_tokensを増やしてください")

問題3:JSONパース失敗

import json
import re

def extract_json(text: str) -> dict:
    patterns = [r"```json\n(.*?)\n```", r"```\n(.*?)\n```", r"(\{.*\})"]
    for pattern in patterns:
        match = re.search(pattern, text, re.DOTALL)
        if match:
            try:
                return json.loads(match.group(1))
            except json.JSONDecodeError:
                continue
    raise ValueError("レスポンスからJSONを抽出できません")

よくある質問

適切なモデルの選び方は?

簡単な分類・抽出はgpt-4o-mini、複雑な推論はgpt-4oo3、日本語・中国語メインならqwen-plusdeepseek-chatを検討。

長いコンテキストの処理方法は?

tiktokenでトークン数を事前計算し、制限を超える場合は履歴メッセージを切り詰めるか要約で圧縮。

APIコストを下げるには?

miniモデルの優先使用、重複リクエストのキャッシュ、max_tokensの制御、プロンプト圧縮、バッチ処理が有効。

中国からOpenAI APIに安定アクセスするには?

Azure OpenAI Serviceを利用するか、OpenAI互換の中国モデル(DeepSeek、Qwen)に切り替え。


まとめ

Python LLM API統合開発では、SDKの使い方、構造化出力、非同期処理、エラーリトライ、コスト最適化の習得が不可欠です。本番環境ではキャッシュ、監視、マルチモデルルーティングも重要です。ToolsKuのJSONフォーマッターBase64エンコードハッシュ計算などのツールでAPI開発・デバッグをサポートできます。

ブラウザローカルツールを無料で試す →

#Python#大模型#LLM#API#OpenAI#教程