Python 大模型 API 整合開發實戰:從入門到生產級部署
AI与大数据
2026 年大模型 API 生態全景
大模型 API 已成為現代應用的基礎設施。2026 年主流 API 供應商包括:
| 供應商 | 代表模型 | SDK | 特點 |
|---|---|---|---|
| OpenAI | GPT-4o, o3 | openai |
生態最完善,Function Calling 標竿 |
| Anthropic | Claude 4 Sonnet/Opus | anthropic |
長上下文(200K),安全對齊強 |
| Gemini 2.5 Pro/Flash | google-genai |
多模態原生支援,免費額度大 | |
| DeepSeek | DeepSeek-V3/R1 | openai 相容 |
性價比極高,推理能力強 |
| 阿里雲 | Qwen3 | openai 相容 |
中文能力突出,國內存取快 |
大多數國內模型 API 相容 OpenAI SDK 格式,只需更換 base_url 即可切換。
OpenAI SDK 完整使用指南
安裝與初始化
pip install openai pydantic tiktoken httpx
from openai import OpenAI
client = OpenAI(
api_key="sk-xxxxxxxx",
base_url="https://api.openai.com/v1"
)
# 使用國內模型(如 DeepSeek)
deepseekClient = OpenAI(
api_key="sk-xxxxxxxx",
base_url="https://api.deepseek.com/v1"
)
基礎對話(Chat Completion)
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "你是一個專業的 Python 開發助手。"},
{"role": "user", "content": "解釋 Python 裝飾器的工作原理"}
],
temperature=0.7,
max_tokens=1024
)
print(response.choices[0].message.content)
print(f"Token 用量: {response.usage.total_tokens}")
串流輸出(Streaming)
串流輸出讓使用者無需等待完整回應,大幅提升體驗:
stream = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": "寫一首關於程式設計的詩"}],
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.content is not None:
print(chunk.choices[0].delta.content, end="", flush=True)
Function Calling(工具呼叫)
讓模型呼叫外部函式取得即時資料:
import json
tools = [
{
"type": "function",
"function": {
"name": "get_weather",
"description": "取得指定城市的天氣資訊",
"parameters": {
"type": "object",
"properties": {
"city": {"type": "string", "description": "城市名稱"},
"unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}
},
"required": ["city"]
}
}
}
]
def get_weather(city: str, unit: str = "celsius") -> dict:
return {"city": city, "temperature": 26, "unit": unit, "condition": "晴"}
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": "台北今天天氣怎麼樣?"}],
tools=tools
)
tool_call = response.choices[0].message.tool_calls[0]
args = json.loads(tool_call.function.arguments)
result = get_weather(**args)
response2 = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "user", "content": "台北今天天氣怎麼樣?"},
response.choices[0].message,
{"role": "tool", "tool_call_id": tool_call.id, "content": json.dumps(result, ensure_ascii=False)}
]
)
print(response2.choices[0].message.content)
視覺能力(Vision)
import base64
def encode_image(image_path: str) -> str:
with open(image_path, "rb") as f:
return base64.b64encode(f.read()).decode("utf-8")
base64_image = encode_image("screenshot.png")
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "user",
"content": [
{"type": "text", "text": "描述這張圖片的內容"},
{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}}
]
}
],
max_tokens=512
)
print(response.choices[0].message.content)
使用 Base64 編碼工具 快速編碼圖片進行測試。
Pydantic 結構化輸出
使用 Structured Outputs
from pydantic import BaseModel
from openai import OpenAI
class CodeReview(BaseModel):
score: int
issues: list[str]
suggestions: list[str]
summary: str
client = OpenAI()
response = client.beta.chat.completions.parse(
model="gpt-4o",
messages=[
{"role": "system", "content": "你是一個程式碼審查專家。"},
{"role": "user", "content": "審查這段程式碼:def add(a,b): return a+b"}
],
response_format=CodeReview
)
review = response.choices[0].message.parsed
print(f"評分: {review.score}/10")
print(f"問題: {review.issues}")
print(f"建議: {review.suggestions}")
複雜巢狀結構
from pydantic import BaseModel
from typing import Optional
class ApiEndpoint(BaseModel):
path: str
method: str
description: str
request_body: Optional[dict] = None
class ApiSpec(BaseModel):
title: str
version: str
endpoints: list[ApiEndpoint]
response = client.beta.chat.completions.parse(
model="gpt-4o",
messages=[{"role": "user", "content": "設計一個使用者管理系統的 API 規範"}],
response_format=ApiSpec
)
spec = response.choices[0].message.parsed
for ep in spec.endpoints:
print(f"{ep.method} {ep.path}: {ep.description}")
非同步批次處理
AsyncOpenAI 基礎
import asyncio
from openai import AsyncOpenAI
async_client = AsyncOpenAI()
async def translate_text(text: str, target_lang: str) -> str:
response = await async_client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": f"翻譯為{target_lang},只回傳翻譯結果"},
{"role": "user", "content": text}
],
temperature=0.3
)
return response.choices[0].message.content
async def batch_translate(texts: list[str], target_lang: str) -> list[str]:
tasks = [translate_text(t, target_lang) for t in texts]
return await asyncio.gather(*tasks)
texts = ["Hello World", "Good morning", "Thank you"]
results = asyncio.run(batch_translate(texts, "中文"))
for orig, trans in zip(texts, results):
print(f"{orig} -> {trans}")
帶並發控制的批次處理
from asyncio import Semaphore
async def batch_with_concurrency(
texts: list[str],
target_lang: str,
max_concurrent: int = 5
) -> list[str]:
sem = Semaphore(max_concurrent)
async def limited_translate(text: str) -> str:
async with sem:
return await translate_text(text, target_lang)
tasks = [limited_translate(t) for t in texts]
return await asyncio.gather(*tasks)
速率限制與重試策略
指數退避重試
import time
from openai import APITimeoutError, RateLimitError, APIConnectionError
def call_with_retry(client, max_retries: int = 3, **kwargs):
for attempt in range(max_retries):
try:
return client.chat.completions.create(**kwargs)
except RateLimitError:
wait = 2 ** attempt + 1
print(f"速率限制,等待 {wait}s 後重試...")
time.sleep(wait)
except APITimeoutError:
print(f"請求逾時,第 {attempt + 1} 次重試")
except APIConnectionError:
print(f"連線錯誤,第 {attempt + 1} 次重試")
time.sleep(1)
raise Exception(f"重試 {max_retries} 次後仍然失敗")
使用 tenacity 函式庫
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=60),
retry=retry_if_exception_type((RateLimitError, APITimeoutError))
)
def call_api(client, **kwargs):
return client.chat.completions.create(**kwargs)
Token 計數與成本最佳化
使用 tiktoken 計算 Token
import tiktoken
def count_tokens(text: str, model: str = "gpt-4o") -> int:
encoding = tiktoken.encoding_for_model(model)
return len(encoding.encode(text))
prompt = "請詳細解釋 Python 的 GIL 機制"
print(f"Token 數量: {count_tokens(prompt)}")
成本估算
PRICING = {
"gpt-4o": {"input": 2.50 / 1_000_000, "output": 10.00 / 1_000_000},
"gpt-4o-mini": {"input": 0.15 / 1_000_000, "output": 0.60 / 1_000_000},
"deepseek-chat": {"input": 0.27 / 1_000_000, "output": 1.10 / 1_000_000},
}
def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
pricing = PRICING.get(model, PRICING["gpt-4o"])
return input_tokens * pricing["input"] + output_tokens * pricing["output"]
cost = estimate_cost("gpt-4o", 1000, 500)
print(f"預估費用: ${cost:.6f}")
最佳化策略
- 使用更便宜的模型:簡單任務用
gpt-4o-mini代替gpt-4o - 壓縮 Prompt:移除冗餘描述,減少 system prompt 長度
- 快取結果:相同請求不重複呼叫
- 控制 max_tokens:設定合理的最大輸出長度
- 批次處理:合併多個小請求為一個大請求
多模型路由
智慧路由器
from openai import OpenAI
from typing import Optional
class ModelRouter:
def __init__(self):
self.clients = {
"openai": OpenAI(api_key="sk-xxx"),
"deepseek": OpenAI(api_key="sk-xxx", base_url="https://api.deepseek.com/v1"),
"qwen": OpenAI(api_key="sk-xxx", base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"),
}
self.model_map = {
"openai": "gpt-4o",
"deepseek": "deepseek-chat",
"qwen": "qwen-plus",
}
def route(self, prompt: str) -> str:
if any(kw in prompt for kw in ["程式碼", "程式設計", "debug", "code"]):
return "deepseek"
if any(kw in prompt for kw in ["翻譯", "中文", "寫作"]):
return "qwen"
return "openai"
def chat(self, prompt: str, system: str = "") -> str:
provider = self.route(prompt)
client = self.clients[provider]
model = self.model_map[provider]
messages = []
if system:
messages.append({"role": "system", "content": system})
messages.append({"role": "user", "content": prompt})
response = client.chat.completions.create(model=model, messages=messages)
return response.choices[0].message.content
router = ModelRouter()
print(router.chat("幫我寫一個快速排序的 Python 實作"))
錯誤處理最佳實踐
常見錯誤類型
| 錯誤 | HTTP 狀態碼 | 原因 | 處理方式 |
|---|---|---|---|
| RateLimitError | 429 | 請求頻率過高 | 指數退避重試 |
| BadRequestError | 400 | 參數錯誤 | 檢查請求參數 |
| AuthenticationError | 401 | API Key 無效 | 檢查金鑰設定 |
| NotFoundError | 404 | 模型不存在 | 確認模型名稱 |
| APIStatusError | 500+ | 伺服器端錯誤 | 重試或切換模型 |
Context Length 超限處理
from openai import BadRequestError
def safe_chat(client, model: str, messages: list, max_context: int = 128000) -> str:
try:
return client.chat.completions.create(model=model, messages=messages)
except BadRequestError as e:
if "context_length_exceeded" in str(e):
while messages and len(str(messages)) > max_context:
if len(messages) > 2:
messages.pop(1)
else:
messages[-1]["content"] = messages[-1]["content"][:max_context // 2]
break
return client.chat.completions.create(model=model, messages=messages)
raise
FastAPI 生產級 API 搭建
完整專案結構
llm-api/
├── main.py
├── config.py
├── routers/
│ └── chat.py
├── services/
│ ├── llm_service.py
│ └── cache_service.py
└── requirements.txt
主應用入口
from fastapi import FastAPI
from routers.chat import router as chat_router
app = FastAPI(title="LLM API Service", version="1.0.0")
app.include_router(chat_router, prefix="/api/v1")
@app.get("/health")
async def health_check():
return {"status": "ok"}
對話路由
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
router = APIRouter()
class ChatRequest(BaseModel):
message: str
model: str = "gpt-4o-mini"
temperature: float = 0.7
max_tokens: int = 1024
stream: bool = False
class ChatResponse(BaseModel):
reply: str
model: str
tokens: int
@router.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
try:
from services.llm_service import llm_service
result = await llm_service.chat(
message=request.message,
model=request.model,
temperature=request.temperature,
max_tokens=request.max_tokens
)
return ChatResponse(**result)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
LLM 服務層
from openai import AsyncOpenAI
from services.cache_service import cache_service
class LLMService:
def __init__(self):
self.client = AsyncOpenAI()
async def chat(self, message: str, model: str = "gpt-4o-mini",
temperature: float = 0.7, max_tokens: int = 1024) -> dict:
cache_key = cache_service.make_key(message, model, temperature)
cached = await cache_service.get(cache_key)
if cached:
return cached
response = await self.client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": message}],
temperature=temperature,
max_tokens=max_tokens
)
result = {
"reply": response.choices[0].message.content,
"model": response.model,
"tokens": response.usage.total_tokens
}
await cache_service.set(cache_key, result, ttl=3600)
return result
llm_service = LLMService()
快取策略
Redis 快取實作
import hashlib
import json
class CacheService:
def __init__(self, redis_url: str = "redis://localhost:6379"):
import redis.asyncio as aioredis
self.redis = aioredis.from_url(redis_url)
def make_key(self, message: str, model: str, temperature: float) -> str:
raw = f"{message}:{model}:{temperature}"
return f"llm:cache:{hashlib.md5(raw.encode()).hexdigest()}"
async def get(self, key: str) -> dict | None:
data = await self.redis.get(key)
return json.loads(data) if data else None
async def set(self, key: str, value: dict, ttl: int = 3600):
await self.redis.setex(key, ttl, json.dumps(value, ensure_ascii=False))
cache_service = CacheService()
使用 Hash 計算工具 了解快取鍵的雜湊原理。
監控與日誌
結構化日誌
import logging
import json
from datetime import datetime
class LLMLogger:
def __init__(self):
self.logger = logging.getLogger("llm_api")
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(message)s"))
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)
def log_request(self, model: str, prompt: str, tokens_in: int):
self.logger.info(json.dumps({
"event": "llm_request",
"model": model,
"tokens_in": tokens_in,
"timestamp": datetime.utcnow().isoformat()
}, ensure_ascii=False))
def log_response(self, model: str, tokens_out: int, latency_ms: float, cost: float):
self.logger.info(json.dumps({
"event": "llm_response",
"model": model,
"tokens_out": tokens_out,
"latency_ms": round(latency_ms, 2),
"cost_usd": round(cost, 6),
"timestamp": datetime.utcnow().isoformat()
}, ensure_ascii=False))
def log_error(self, model: str, error_type: str, error_msg: str):
self.logger.error(json.dumps({
"event": "llm_error",
"model": model,
"error_type": error_type,
"error_msg": error_msg,
"timestamp": datetime.utcnow().isoformat()
}, ensure_ascii=False))
常見錯誤與除錯
問題 1:API Key 設定錯誤
import os
from openai import AuthenticationError
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY", ""))
if not client.api_key:
raise ValueError("請設定 OPENAI_API_KEY 環境變數")
問題 2:回應內容為空
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": "hello"}],
max_tokens=10
)
if response.choices[0].finish_reason == "length":
print("輸出被截斷,請增大 max_tokens")
問題 3:JSON 解析失敗
import json
import re
def extract_json(text: str) -> dict:
patterns = [r"```json\n(.*?)\n```", r"```\n(.*?)\n```", r"(\{.*\})"]
for pattern in patterns:
match = re.search(pattern, text, re.DOTALL)
if match:
try:
return json.loads(match.group(1))
except json.JSONDecodeError:
continue
raise ValueError("無法從回應中提取 JSON")
常見問題
如何選擇合適的模型?
簡單分類/擷取用 gpt-4o-mini,複雜推理用 gpt-4o 或 o3,中文場景優先考慮 qwen-plus 或 deepseek-chat。
如何處理超長上下文?
使用 tiktoken 預先計算 Token 數,超出限制時截斷歷史訊息或使用摘要壓縮。
如何降低 API 成本?
優先使用 mini 模型、快取重複請求、控制 max_tokens、壓縮 Prompt、批次處理。
國內如何穩定存取 OpenAI API?
可使用 Azure OpenAI 服務或透過相容 OpenAI 格式的國內模型(DeepSeek、Qwen)替代。
總結
Python 大模型 API 整合開發需要掌握 SDK 使用、結構化輸出、非同步處理、錯誤重試、成本最佳化等核心技能。生產環境還需要考慮快取、監控、多模型路由等工程實踐。使用工具庫的 JSON 格式化、Base64 編碼、Hash 計算 等工具可以輔助 API 開發除錯。
本站提供瀏覽器本地工具,免註冊即可試用 →
#Python#大模型#LLM#API#OpenAI#教程