Python 大模型 API 集成开发实战:从入门到生产级部署

AI与大数据

2026 年大模型 API 生态全景

大模型 API 已经成为现代应用的基础设施。2026 年主流 API 提供商包括:

提供商 代表模型 SDK 特点
OpenAI GPT-4o, o3 openai 生态最完善,Function Calling 标杆
Anthropic Claude 4 Sonnet/Opus anthropic 长上下文(200K),安全对齐强
Google Gemini 2.5 Pro/Flash google-genai 多模态原生支持,免费额度大
DeepSeek DeepSeek-V3/R1 openai 兼容 性价比极高,推理能力强
阿里云 Qwen3 openai 兼容 中文能力突出,国内访问快

大多数国内模型 API 兼容 OpenAI SDK 格式,只需更换 base_url 即可切换。


OpenAI SDK 完整使用指南

安装与初始化

pip install openai pydantic tiktoken httpx
from openai import OpenAI

client = OpenAI(
    api_key="sk-xxxxxxxx",
    base_url="https://api.openai.com/v1"
)

# 使用国内模型(如 DeepSeek)
deepseekClient = OpenAI(
    api_key="sk-xxxxxxxx",
    base_url="https://api.deepseek.com/v1"
)

基础对话(Chat Completion)

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": "你是一个专业的 Python 开发助手。"},
        {"role": "user", "content": "解释 Python 装饰器的工作原理"}
    ],
    temperature=0.7,
    max_tokens=1024
)

print(response.choices[0].message.content)
print(f"Token 用量: {response.usage.total_tokens}")

流式输出(Streaming)

流式输出让用户无需等待完整响应,大幅提升体验:

stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "写一首关于编程的诗"}],
    stream=True
)

for chunk in stream:
    if chunk.choices[0].delta.content is not None:
        print(chunk.choices[0].delta.content, end="", flush=True)

Function Calling(工具调用)

让模型调用外部函数获取实时数据:

import json

tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "获取指定城市的天气信息",
            "parameters": {
                "type": "object",
                "properties": {
                    "city": {"type": "string", "description": "城市名称"},
                    "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}
                },
                "required": ["city"]
            }
        }
    }
]

def get_weather(city: str, unit: str = "celsius") -> dict:
    return {"city": city, "temperature": 26, "unit": unit, "condition": "晴"}

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "北京今天天气怎么样?"}],
    tools=tools
)

tool_call = response.choices[0].message.tool_calls[0]
args = json.loads(tool_call.function.arguments)
result = get_weather(**args)

# 将工具结果返回给模型
response2 = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "user", "content": "北京今天天气怎么样?"},
        response.choices[0].message,
        {"role": "tool", "tool_call_id": tool_call.id, "content": json.dumps(result, ensure_ascii=False)}
    ]
)

print(response2.choices[0].message.content)

视觉能力(Vision)

import base64

def encode_image(image_path: str) -> str:
    with open(image_path, "rb") as f:
        return base64.b64encode(f.read()).decode("utf-8")

base64_image = encode_image("screenshot.png")

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {
            "role": "user",
            "content": [
                {"type": "text", "text": "描述这张图片的内容"},
                {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}}
            ]
        }
    ],
    max_tokens=512
)

print(response.choices[0].message.content)

使用 Base64 编码工具 快速编码图片进行测试。


Pydantic 结构化输出

使用 Structured Outputs

from pydantic import BaseModel
from openai import OpenAI

class CodeReview(BaseModel):
    score: int
    issues: list[str]
    suggestions: list[str]
    summary: str

client = OpenAI()
response = client.beta.chat.completions.parse(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": "你是一个代码审查专家。"},
        {"role": "user", "content": "审查这段代码:def add(a,b): return a+b"}
    ],
    response_format=CodeReview
)

review = response.choices[0].message.parsed
print(f"评分: {review.score}/10")
print(f"问题: {review.issues}")
print(f"建议: {review.suggestions}")

复杂嵌套结构

from pydantic import BaseModel
from typing import Optional

class ApiEndpoint(BaseModel):
    path: str
    method: str
    description: str
    request_body: Optional[dict] = None

class ApiSpec(BaseModel):
    title: str
    version: str
    endpoints: list[ApiEndpoint]

response = client.beta.chat.completions.parse(
    model="gpt-4o",
    messages=[{"role": "user", "content": "设计一个用户管理系统的 API 规范"}],
    response_format=ApiSpec
)

spec = response.choices[0].message.parsed
for ep in spec.endpoints:
    print(f"{ep.method} {ep.path}: {ep.description}")

异步批处理

AsyncOpenAI 基础

import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI()

async def translate_text(text: str, target_lang: str) -> str:
    response = await async_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": f"翻译为{target_lang},只返回翻译结果"},
            {"role": "user", "content": text}
        ],
        temperature=0.3
    )
    return response.choices[0].message.content

async def batch_translate(texts: list[str], target_lang: str) -> list[str]:
    tasks = [translate_text(t, target_lang) for t in texts]
    return await asyncio.gather(*tasks)

texts = ["Hello World", "Good morning", "Thank you"]
results = asyncio.run(batch_translate(texts, "中文"))
for orig, trans in zip(texts, results):
    print(f"{orig} -> {trans}")

带并发控制的批处理

from asyncio import Semaphore

async def batch_with_concurrency(
    texts: list[str],
    target_lang: str,
    max_concurrent: int = 5
) -> list[str]:
    sem = Semaphore(max_concurrent)

    async def limited_translate(text: str) -> str:
        async with sem:
            return await translate_text(text, target_lang)

    tasks = [limited_translate(t) for t in texts]
    return await asyncio.gather(*tasks)

速率限制与重试策略

指数退避重试

import time
from openai import APITimeoutError, RateLimitError, APIConnectionError

def call_with_retry(client, max_retries: int = 3, **kwargs):
    for attempt in range(max_retries):
        try:
            return client.chat.completions.create(**kwargs)
        except RateLimitError:
            wait = 2 ** attempt + 1
            print(f"速率限制,等待 {wait}s 后重试...")
            time.sleep(wait)
        except APITimeoutError:
            print(f"请求超时,第 {attempt + 1} 次重试")
        except APIConnectionError:
            print(f"连接错误,第 {attempt + 1} 次重试")
            time.sleep(1)
    raise Exception(f"重试 {max_retries} 次后仍然失败")

使用 tenacity 库

from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=60),
    retry=retry_if_exception_type((RateLimitError, APITimeoutError))
)
def call_api(client, **kwargs):
    return client.chat.completions.create(**kwargs)

Token 计数与成本优化

使用 tiktoken 计算 Token

import tiktoken

def count_tokens(text: str, model: str = "gpt-4o") -> int:
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))

prompt = "请详细解释 Python 的 GIL 机制"
print(f"Token 数量: {count_tokens(prompt)}")

成本估算

PRICING = {
    "gpt-4o": {"input": 2.50 / 1_000_000, "output": 10.00 / 1_000_000},
    "gpt-4o-mini": {"input": 0.15 / 1_000_000, "output": 0.60 / 1_000_000},
    "deepseek-chat": {"input": 0.27 / 1_000_000, "output": 1.10 / 1_000_000},
}

def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    pricing = PRICING.get(model, PRICING["gpt-4o"])
    return input_tokens * pricing["input"] + output_tokens * pricing["output"]

cost = estimate_cost("gpt-4o", 1000, 500)
print(f"预估费用: ${cost:.6f}")

优化策略

  • 使用更便宜的模型:简单任务用 gpt-4o-mini 代替 gpt-4o
  • 压缩 Prompt:移除冗余描述,减少 system prompt 长度
  • 缓存结果:相同请求不重复调用
  • 控制 max_tokens:设置合理的最大输出长度
  • 批量处理:合并多个小请求为一个大请求

多模型路由

智能路由器

from openai import OpenAI
from typing import Optional
import re

class ModelRouter:
    def __init__(self):
        self.clients = {
            "openai": OpenAI(api_key="sk-xxx"),
            "deepseek": OpenAI(api_key="sk-xxx", base_url="https://api.deepseek.com/v1"),
            "qwen": OpenAI(api_key="sk-xxx", base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"),
        }
        self.model_map = {
            "openai": "gpt-4o",
            "deepseek": "deepseek-chat",
            "qwen": "qwen-plus",
        }

    def route(self, prompt: str) -> str:
        if any(kw in prompt for kw in ["代码", "编程", "debug", "code"]):
            return "deepseek"
        if any(kw in prompt for kw in ["翻译", "中文", "写作"]):
            return "qwen"
        return "openai"

    def chat(self, prompt: str, system: str = "") -> str:
        provider = self.route(prompt)
        client = self.clients[provider]
        model = self.model_map[provider]
        messages = []
        if system:
            messages.append({"role": "system", "content": system})
        messages.append({"role": "user", "content": prompt})
        response = client.chat.completions.create(model=model, messages=messages)
        return response.choices[0].message.content

router = ModelRouter()
print(router.chat("帮我写一个快速排序的 Python 实现"))

错误处理最佳实践

常见错误类型

错误 HTTP 状态码 原因 处理方式
RateLimitError 429 请求频率过高 指数退避重试
BadRequestError 400 参数错误 检查请求参数
AuthenticationError 401 API Key 无效 检查密钥配置
NotFoundError 404 模型不存在 确认模型名称
APIStatusError 500+ 服务端错误 重试或切换模型

Context Length 超限处理

from openai import BadRequestError

def safe_chat(client, model: str, messages: list, max_context: int = 128000) -> str:
    try:
        return client.chat.completions.create(model=model, messages=messages)
    except BadRequestError as e:
        if "context_length_exceeded" in str(e):
            while messages and len(str(messages)) > max_context:
                if len(messages) > 2:
                    messages.pop(1)
                else:
                    messages[-1]["content"] = messages[-1]["content"][:max_context // 2]
                    break
            return client.chat.completions.create(model=model, messages=messages)
        raise

FastAPI 生产级 API 搭建

完整项目结构

llm-api/
├── main.py
├── config.py
├── routers/
│   └── chat.py
├── services/
│   ├── llm_service.py
│   └── cache_service.py
└── requirements.txt

主应用入口

from fastapi import FastAPI
from routers.chat import router as chat_router

app = FastAPI(title="LLM API Service", version="1.0.0")
app.include_router(chat_router, prefix="/api/v1")

@app.get("/health")
async def health_check():
    return {"status": "ok"}

对话路由

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

router = APIRouter()

class ChatRequest(BaseModel):
    message: str
    model: str = "gpt-4o-mini"
    temperature: float = 0.7
    max_tokens: int = 1024
    stream: bool = False

class ChatResponse(BaseModel):
    reply: str
    model: str
    tokens: int

@router.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    try:
        from services.llm_service import llm_service
        result = await llm_service.chat(
            message=request.message,
            model=request.model,
            temperature=request.temperature,
            max_tokens=request.max_tokens
        )
        return ChatResponse(**result)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

LLM 服务层

from openai import AsyncOpenAI
from services.cache_service import cache_service

class LLMService:
    def __init__(self):
        self.client = AsyncOpenAI()

    async def chat(self, message: str, model: str = "gpt-4o-mini",
                   temperature: float = 0.7, max_tokens: int = 1024) -> dict:
        cache_key = cache_service.make_key(message, model, temperature)
        cached = await cache_service.get(cache_key)
        if cached:
            return cached

        response = await self.client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": message}],
            temperature=temperature,
            max_tokens=max_tokens
        )

        result = {
            "reply": response.choices[0].message.content,
            "model": response.model,
            "tokens": response.usage.total_tokens
        }

        await cache_service.set(cache_key, result, ttl=3600)
        return result

llm_service = LLMService()

缓存策略

Redis 缓存实现

import hashlib
import json

class CacheService:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        import redis.asyncio as aioredis
        self.redis = aioredis.from_url(redis_url)

    def make_key(self, message: str, model: str, temperature: float) -> str:
        raw = f"{message}:{model}:{temperature}"
        return f"llm:cache:{hashlib.md5(raw.encode()).hexdigest()}"

    async def get(self, key: str) -> dict | None:
        data = await self.redis.get(key)
        return json.loads(data) if data else None

    async def set(self, key: str, value: dict, ttl: int = 3600):
        await self.redis.setex(key, ttl, json.dumps(value, ensure_ascii=False))

cache_service = CacheService()

使用 Hash 计算工具 了解缓存键的哈希原理。


监控与日志

结构化日志

import logging
import json
from datetime import datetime

class LLMLogger:
    def __init__(self):
        self.logger = logging.getLogger("llm_api")
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(message)s"))
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def log_request(self, model: str, prompt: str, tokens_in: int):
        self.logger.info(json.dumps({
            "event": "llm_request",
            "model": model,
            "tokens_in": tokens_in,
            "timestamp": datetime.utcnow().isoformat()
        }, ensure_ascii=False))

    def log_response(self, model: str, tokens_out: int, latency_ms: float, cost: float):
        self.logger.info(json.dumps({
            "event": "llm_response",
            "model": model,
            "tokens_out": tokens_out,
            "latency_ms": round(latency_ms, 2),
            "cost_usd": round(cost, 6),
            "timestamp": datetime.utcnow().isoformat()
        }, ensure_ascii=False))

    def log_error(self, model: str, error_type: str, error_msg: str):
        self.logger.error(json.dumps({
            "event": "llm_error",
            "model": model,
            "error_type": error_type,
            "error_msg": error_msg,
            "timestamp": datetime.utcnow().isoformat()
        }, ensure_ascii=False))

常见错误与调试

问题 1:API Key 配置错误

import os
from openai import AuthenticationError

client = OpenAI(api_key=os.getenv("OPENAI_API_KEY", ""))
if not client.api_key:
    raise ValueError("请设置 OPENAI_API_KEY 环境变量")

问题 2:响应内容为空

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "hello"}],
    max_tokens=10
)

if response.choices[0].finish_reason == "length":
    print("输出被截断,请增大 max_tokens")

问题 3:JSON 解析失败

import json

def extract_json(text: str) -> dict:
    patterns = [r"```json\n(.*?)\n```", r"```\n(.*?)\n```", r"(\{.*\})"]
    for pattern in patterns:
        match = re.search(pattern, text, re.DOTALL)
        if match:
            try:
                return json.loads(match.group(1))
            except json.JSONDecodeError:
                continue
    raise ValueError("无法从响应中提取 JSON")

常见问题

如何选择合适的模型?

简单分类/提取用 gpt-4o-mini,复杂推理用 gpt-4oo3,中文场景优先考虑 qwen-plusdeepseek-chat

如何处理超长上下文?

使用 tiktoken 预计算 Token 数,超出限制时截断历史消息或使用摘要压缩。

如何降低 API 成本?

优先使用 mini 模型、缓存重复请求、控制 max_tokens、压缩 Prompt、批量处理。

国内如何稳定访问 OpenAI API?

可使用 Azure OpenAI 服务或通过兼容 OpenAI 格式的国内模型(DeepSeek、Qwen)替代。


总结

Python 大模型 API 集成开发需要掌握 SDK 使用、结构化输出、异步处理、错误重试、成本优化等核心技能。生产环境还需要考虑缓存、监控、多模型路由等工程实践。使用工具库的 JSON 格式化Base64 编码Hash 计算 等工具可以辅助 API 开发调试。

本站提供浏览器本地工具,免注册即可试用 →

#Python#大模型#LLM#API#OpenAI#教程