Python 大模型 API 集成开发实战:从入门到生产级部署
AI与大数据
2026 年大模型 API 生态全景
大模型 API 已经成为现代应用的基础设施。2026 年主流 API 提供商包括:
| 提供商 | 代表模型 | SDK | 特点 |
|---|---|---|---|
| OpenAI | GPT-4o, o3 | openai |
生态最完善,Function Calling 标杆 |
| Anthropic | Claude 4 Sonnet/Opus | anthropic |
长上下文(200K),安全对齐强 |
| Gemini 2.5 Pro/Flash | google-genai |
多模态原生支持,免费额度大 | |
| DeepSeek | DeepSeek-V3/R1 | openai 兼容 |
性价比极高,推理能力强 |
| 阿里云 | Qwen3 | openai 兼容 |
中文能力突出,国内访问快 |
大多数国内模型 API 兼容 OpenAI SDK 格式,只需更换 base_url 即可切换。
OpenAI SDK 完整使用指南
安装与初始化
pip install openai pydantic tiktoken httpx
from openai import OpenAI
client = OpenAI(
api_key="sk-xxxxxxxx",
base_url="https://api.openai.com/v1"
)
# 使用国内模型(如 DeepSeek)
deepseekClient = OpenAI(
api_key="sk-xxxxxxxx",
base_url="https://api.deepseek.com/v1"
)
基础对话(Chat Completion)
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "你是一个专业的 Python 开发助手。"},
{"role": "user", "content": "解释 Python 装饰器的工作原理"}
],
temperature=0.7,
max_tokens=1024
)
print(response.choices[0].message.content)
print(f"Token 用量: {response.usage.total_tokens}")
流式输出(Streaming)
流式输出让用户无需等待完整响应,大幅提升体验:
stream = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": "写一首关于编程的诗"}],
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.content is not None:
print(chunk.choices[0].delta.content, end="", flush=True)
Function Calling(工具调用)
让模型调用外部函数获取实时数据:
import json
tools = [
{
"type": "function",
"function": {
"name": "get_weather",
"description": "获取指定城市的天气信息",
"parameters": {
"type": "object",
"properties": {
"city": {"type": "string", "description": "城市名称"},
"unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}
},
"required": ["city"]
}
}
}
]
def get_weather(city: str, unit: str = "celsius") -> dict:
return {"city": city, "temperature": 26, "unit": unit, "condition": "晴"}
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": "北京今天天气怎么样?"}],
tools=tools
)
tool_call = response.choices[0].message.tool_calls[0]
args = json.loads(tool_call.function.arguments)
result = get_weather(**args)
# 将工具结果返回给模型
response2 = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "user", "content": "北京今天天气怎么样?"},
response.choices[0].message,
{"role": "tool", "tool_call_id": tool_call.id, "content": json.dumps(result, ensure_ascii=False)}
]
)
print(response2.choices[0].message.content)
视觉能力(Vision)
import base64
def encode_image(image_path: str) -> str:
with open(image_path, "rb") as f:
return base64.b64encode(f.read()).decode("utf-8")
base64_image = encode_image("screenshot.png")
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "user",
"content": [
{"type": "text", "text": "描述这张图片的内容"},
{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}}
]
}
],
max_tokens=512
)
print(response.choices[0].message.content)
使用 Base64 编码工具 快速编码图片进行测试。
Pydantic 结构化输出
使用 Structured Outputs
from pydantic import BaseModel
from openai import OpenAI
class CodeReview(BaseModel):
score: int
issues: list[str]
suggestions: list[str]
summary: str
client = OpenAI()
response = client.beta.chat.completions.parse(
model="gpt-4o",
messages=[
{"role": "system", "content": "你是一个代码审查专家。"},
{"role": "user", "content": "审查这段代码:def add(a,b): return a+b"}
],
response_format=CodeReview
)
review = response.choices[0].message.parsed
print(f"评分: {review.score}/10")
print(f"问题: {review.issues}")
print(f"建议: {review.suggestions}")
复杂嵌套结构
from pydantic import BaseModel
from typing import Optional
class ApiEndpoint(BaseModel):
path: str
method: str
description: str
request_body: Optional[dict] = None
class ApiSpec(BaseModel):
title: str
version: str
endpoints: list[ApiEndpoint]
response = client.beta.chat.completions.parse(
model="gpt-4o",
messages=[{"role": "user", "content": "设计一个用户管理系统的 API 规范"}],
response_format=ApiSpec
)
spec = response.choices[0].message.parsed
for ep in spec.endpoints:
print(f"{ep.method} {ep.path}: {ep.description}")
异步批处理
AsyncOpenAI 基础
import asyncio
from openai import AsyncOpenAI
async_client = AsyncOpenAI()
async def translate_text(text: str, target_lang: str) -> str:
response = await async_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": f"翻译为{target_lang},只返回翻译结果"},
{"role": "user", "content": text}
],
temperature=0.3
)
return response.choices[0].message.content
async def batch_translate(texts: list[str], target_lang: str) -> list[str]:
tasks = [translate_text(t, target_lang) for t in texts]
return await asyncio.gather(*tasks)
texts = ["Hello World", "Good morning", "Thank you"]
results = asyncio.run(batch_translate(texts, "中文"))
for orig, trans in zip(texts, results):
print(f"{orig} -> {trans}")
带并发控制的批处理
from asyncio import Semaphore
async def batch_with_concurrency(
texts: list[str],
target_lang: str,
max_concurrent: int = 5
) -> list[str]:
sem = Semaphore(max_concurrent)
async def limited_translate(text: str) -> str:
async with sem:
return await translate_text(text, target_lang)
tasks = [limited_translate(t) for t in texts]
return await asyncio.gather(*tasks)
速率限制与重试策略
指数退避重试
import time
from openai import APITimeoutError, RateLimitError, APIConnectionError
def call_with_retry(client, max_retries: int = 3, **kwargs):
for attempt in range(max_retries):
try:
return client.chat.completions.create(**kwargs)
except RateLimitError:
wait = 2 ** attempt + 1
print(f"速率限制,等待 {wait}s 后重试...")
time.sleep(wait)
except APITimeoutError:
print(f"请求超时,第 {attempt + 1} 次重试")
except APIConnectionError:
print(f"连接错误,第 {attempt + 1} 次重试")
time.sleep(1)
raise Exception(f"重试 {max_retries} 次后仍然失败")
使用 tenacity 库
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=60),
retry=retry_if_exception_type((RateLimitError, APITimeoutError))
)
def call_api(client, **kwargs):
return client.chat.completions.create(**kwargs)
Token 计数与成本优化
使用 tiktoken 计算 Token
import tiktoken
def count_tokens(text: str, model: str = "gpt-4o") -> int:
encoding = tiktoken.encoding_for_model(model)
return len(encoding.encode(text))
prompt = "请详细解释 Python 的 GIL 机制"
print(f"Token 数量: {count_tokens(prompt)}")
成本估算
PRICING = {
"gpt-4o": {"input": 2.50 / 1_000_000, "output": 10.00 / 1_000_000},
"gpt-4o-mini": {"input": 0.15 / 1_000_000, "output": 0.60 / 1_000_000},
"deepseek-chat": {"input": 0.27 / 1_000_000, "output": 1.10 / 1_000_000},
}
def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
pricing = PRICING.get(model, PRICING["gpt-4o"])
return input_tokens * pricing["input"] + output_tokens * pricing["output"]
cost = estimate_cost("gpt-4o", 1000, 500)
print(f"预估费用: ${cost:.6f}")
优化策略
- 使用更便宜的模型:简单任务用
gpt-4o-mini代替gpt-4o - 压缩 Prompt:移除冗余描述,减少 system prompt 长度
- 缓存结果:相同请求不重复调用
- 控制 max_tokens:设置合理的最大输出长度
- 批量处理:合并多个小请求为一个大请求
多模型路由
智能路由器
from openai import OpenAI
from typing import Optional
import re
class ModelRouter:
def __init__(self):
self.clients = {
"openai": OpenAI(api_key="sk-xxx"),
"deepseek": OpenAI(api_key="sk-xxx", base_url="https://api.deepseek.com/v1"),
"qwen": OpenAI(api_key="sk-xxx", base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"),
}
self.model_map = {
"openai": "gpt-4o",
"deepseek": "deepseek-chat",
"qwen": "qwen-plus",
}
def route(self, prompt: str) -> str:
if any(kw in prompt for kw in ["代码", "编程", "debug", "code"]):
return "deepseek"
if any(kw in prompt for kw in ["翻译", "中文", "写作"]):
return "qwen"
return "openai"
def chat(self, prompt: str, system: str = "") -> str:
provider = self.route(prompt)
client = self.clients[provider]
model = self.model_map[provider]
messages = []
if system:
messages.append({"role": "system", "content": system})
messages.append({"role": "user", "content": prompt})
response = client.chat.completions.create(model=model, messages=messages)
return response.choices[0].message.content
router = ModelRouter()
print(router.chat("帮我写一个快速排序的 Python 实现"))
错误处理最佳实践
常见错误类型
| 错误 | HTTP 状态码 | 原因 | 处理方式 |
|---|---|---|---|
| RateLimitError | 429 | 请求频率过高 | 指数退避重试 |
| BadRequestError | 400 | 参数错误 | 检查请求参数 |
| AuthenticationError | 401 | API Key 无效 | 检查密钥配置 |
| NotFoundError | 404 | 模型不存在 | 确认模型名称 |
| APIStatusError | 500+ | 服务端错误 | 重试或切换模型 |
Context Length 超限处理
from openai import BadRequestError
def safe_chat(client, model: str, messages: list, max_context: int = 128000) -> str:
try:
return client.chat.completions.create(model=model, messages=messages)
except BadRequestError as e:
if "context_length_exceeded" in str(e):
while messages and len(str(messages)) > max_context:
if len(messages) > 2:
messages.pop(1)
else:
messages[-1]["content"] = messages[-1]["content"][:max_context // 2]
break
return client.chat.completions.create(model=model, messages=messages)
raise
FastAPI 生产级 API 搭建
完整项目结构
llm-api/
├── main.py
├── config.py
├── routers/
│ └── chat.py
├── services/
│ ├── llm_service.py
│ └── cache_service.py
└── requirements.txt
主应用入口
from fastapi import FastAPI
from routers.chat import router as chat_router
app = FastAPI(title="LLM API Service", version="1.0.0")
app.include_router(chat_router, prefix="/api/v1")
@app.get("/health")
async def health_check():
return {"status": "ok"}
对话路由
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
router = APIRouter()
class ChatRequest(BaseModel):
message: str
model: str = "gpt-4o-mini"
temperature: float = 0.7
max_tokens: int = 1024
stream: bool = False
class ChatResponse(BaseModel):
reply: str
model: str
tokens: int
@router.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
try:
from services.llm_service import llm_service
result = await llm_service.chat(
message=request.message,
model=request.model,
temperature=request.temperature,
max_tokens=request.max_tokens
)
return ChatResponse(**result)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
LLM 服务层
from openai import AsyncOpenAI
from services.cache_service import cache_service
class LLMService:
def __init__(self):
self.client = AsyncOpenAI()
async def chat(self, message: str, model: str = "gpt-4o-mini",
temperature: float = 0.7, max_tokens: int = 1024) -> dict:
cache_key = cache_service.make_key(message, model, temperature)
cached = await cache_service.get(cache_key)
if cached:
return cached
response = await self.client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": message}],
temperature=temperature,
max_tokens=max_tokens
)
result = {
"reply": response.choices[0].message.content,
"model": response.model,
"tokens": response.usage.total_tokens
}
await cache_service.set(cache_key, result, ttl=3600)
return result
llm_service = LLMService()
缓存策略
Redis 缓存实现
import hashlib
import json
class CacheService:
def __init__(self, redis_url: str = "redis://localhost:6379"):
import redis.asyncio as aioredis
self.redis = aioredis.from_url(redis_url)
def make_key(self, message: str, model: str, temperature: float) -> str:
raw = f"{message}:{model}:{temperature}"
return f"llm:cache:{hashlib.md5(raw.encode()).hexdigest()}"
async def get(self, key: str) -> dict | None:
data = await self.redis.get(key)
return json.loads(data) if data else None
async def set(self, key: str, value: dict, ttl: int = 3600):
await self.redis.setex(key, ttl, json.dumps(value, ensure_ascii=False))
cache_service = CacheService()
使用 Hash 计算工具 了解缓存键的哈希原理。
监控与日志
结构化日志
import logging
import json
from datetime import datetime
class LLMLogger:
def __init__(self):
self.logger = logging.getLogger("llm_api")
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(message)s"))
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)
def log_request(self, model: str, prompt: str, tokens_in: int):
self.logger.info(json.dumps({
"event": "llm_request",
"model": model,
"tokens_in": tokens_in,
"timestamp": datetime.utcnow().isoformat()
}, ensure_ascii=False))
def log_response(self, model: str, tokens_out: int, latency_ms: float, cost: float):
self.logger.info(json.dumps({
"event": "llm_response",
"model": model,
"tokens_out": tokens_out,
"latency_ms": round(latency_ms, 2),
"cost_usd": round(cost, 6),
"timestamp": datetime.utcnow().isoformat()
}, ensure_ascii=False))
def log_error(self, model: str, error_type: str, error_msg: str):
self.logger.error(json.dumps({
"event": "llm_error",
"model": model,
"error_type": error_type,
"error_msg": error_msg,
"timestamp": datetime.utcnow().isoformat()
}, ensure_ascii=False))
常见错误与调试
问题 1:API Key 配置错误
import os
from openai import AuthenticationError
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY", ""))
if not client.api_key:
raise ValueError("请设置 OPENAI_API_KEY 环境变量")
问题 2:响应内容为空
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": "hello"}],
max_tokens=10
)
if response.choices[0].finish_reason == "length":
print("输出被截断,请增大 max_tokens")
问题 3:JSON 解析失败
import json
def extract_json(text: str) -> dict:
patterns = [r"```json\n(.*?)\n```", r"```\n(.*?)\n```", r"(\{.*\})"]
for pattern in patterns:
match = re.search(pattern, text, re.DOTALL)
if match:
try:
return json.loads(match.group(1))
except json.JSONDecodeError:
continue
raise ValueError("无法从响应中提取 JSON")
常见问题
如何选择合适的模型?
简单分类/提取用 gpt-4o-mini,复杂推理用 gpt-4o 或 o3,中文场景优先考虑 qwen-plus 或 deepseek-chat。
如何处理超长上下文?
使用 tiktoken 预计算 Token 数,超出限制时截断历史消息或使用摘要压缩。
如何降低 API 成本?
优先使用 mini 模型、缓存重复请求、控制 max_tokens、压缩 Prompt、批量处理。
国内如何稳定访问 OpenAI API?
可使用 Azure OpenAI 服务或通过兼容 OpenAI 格式的国内模型(DeepSeek、Qwen)替代。
总结
Python 大模型 API 集成开发需要掌握 SDK 使用、结构化输出、异步处理、错误重试、成本优化等核心技能。生产环境还需要考虑缓存、监控、多模型路由等工程实践。使用工具库的 JSON 格式化、Base64 编码、Hash 计算 等工具可以辅助 API 开发调试。
本站提供浏览器本地工具,免注册即可试用 →
#Python#大模型#LLM#API#OpenAI#教程