Python LLM API Integration in Practice: From Basics to Production
The LLM API Landscape in 2026
LLM APIs have become essential infrastructure for modern applications. Major providers in 2026:
| Provider | Models | SDK | Highlights |
|---|---|---|---|
| OpenAI | GPT-4o, o3 | openai | Best ecosystem, Function Calling standard |
| Anthropic | Claude 4 Sonnet/Opus | anthropic | 200K context, strong safety alignment |
| Google | Gemini 2.5 Pro/Flash | google-genai | Native multimodal, generous free tier |
| DeepSeek | DeepSeek-V3/R1 | openai compatible | Excellent cost-performance, strong reasoning |
| Alibaba | Qwen3 | openai compatible | Strong Chinese capabilities, fast in China |
Most Chinese model APIs are compatible with the OpenAI SDK format: point base_url at the provider's endpoint and reuse the same client code.
OpenAI SDK Complete Usage Guide
Installation & Initialization
pip install openai pydantic tiktoken httpx
from openai import OpenAI

# "sk-xxxxxxxx" is a placeholder; load real keys from an environment variable
client = OpenAI(
    api_key="sk-xxxxxxxx",
    base_url="https://api.openai.com/v1",
)

# Using DeepSeek (OpenAI-compatible): same SDK, different base_url
deepseek_client = OpenAI(
    api_key="sk-xxxxxxxx",
    base_url="https://api.deepseek.com/v1",
)
Basic Chat Completion
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": "You are a professional Python development assistant."},
        {"role": "user", "content": "Explain how Python decorators work"},
    ],
    temperature=0.7,
    max_tokens=1024,
)
print(response.choices[0].message.content)
print(f"Token usage: {response.usage.total_tokens}")
Streaming Output
Streaming lets users see responses in real-time without waiting for the full output:
stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Write a poem about programming"}],
    stream=True,
)
for chunk in stream:
    # Some chunks carry no choices (for example a trailing usage chunk), so guard first
    if chunk.choices and chunk.choices[0].delta.content is not None:
        print(chunk.choices[0].delta.content, end="", flush=True)
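Printing deltas is enough for a terminal demo, but the pieces usually also need to be joined into the complete reply, for example to log or cache it. The following is a minimal sketch of that step, assuming chunks shaped like the SDK's (objects exposing `choices[0].delta.content`). `collect_stream` and `fake_chunk` are hypothetical helpers introduced here so the example runs without a network call.

```python
from types import SimpleNamespace
from typing import Iterable


def collect_stream(chunks: Iterable) -> str:
    """Join the text deltas of a chat-completion stream into one string."""
    parts = []
    for chunk in chunks:
        # Skip chunks that carry no choices at all
        if not chunk.choices:
            continue
        piece = chunk.choices[0].delta.content
        if piece is not None:
            parts.append(piece)
    return "".join(parts)


def fake_chunk(text):
    """Hypothetical stand-in for a streamed chunk, used only for this demo."""
    delta = SimpleNamespace(content=text)
    return SimpleNamespace(choices=[SimpleNamespace(delta=delta)])


demo = [
    fake_chunk("Hello"),
    fake_chunk(None),               # delta without text
    SimpleNamespace(choices=[]),    # chunk without choices
    fake_chunk(", world"),
]
full_text = collect_stream(demo)
print(full_text)
```

With a real stream, the same function could be called as `collect_stream(stream)` in place of the print loop.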
Function Calling
Let the model invoke external functions to fetch real-time data:
import json

tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get weather information for a city",
            "parameters": {
                "type": "object",
                "properties": {
                    "city": {"type": "string", "description": "City name"},
                    "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
                },
                "required": ["city"],
            },
        },
    }
]


def get_weather(city: str, unit: str = "celsius") -> dict:
    # Stubbed data; a real implementation would call a weather service
    return {"city": city, "temperature": 22, "unit": unit, "condition": "Sunny"}


# First call: the model decides whether it needs the tool
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "What's the weather in New York?"}],
    tools=tools,
)
message = response.choices[0].message

if message.tool_calls:
    # Only the first tool call is handled here; loop over tool_calls if there are several
    tool_call = message.tool_calls[0]
    args = json.loads(tool_call.function.arguments)
    result = get_weather(**args)

    # Second call: send the tool result back so the model can write the answer
    response2 = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "user", "content": "What's the weather in New York?"},
            message,
            {"role": "tool", "tool_call_id": tool_call.id, "content": json.dumps(result)},
        ],
    )
    print(response2.choices[0].message.content)
else:
    # The model answered directly without calling the tool
    print(message.content)
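Once an application exposes more than one tool, calling each function by hand stops scaling. One possible approach is a small registry that maps tool names to Python callables and turns failures into a JSON error the model can read. This is a sketch under assumptions: `TOOL_REGISTRY` and `run_tool` are hypothetical names, not part of the OpenAI SDK, and `get_weather` is the same kind of stub as above.

```python
import json
from typing import Callable


def get_weather(city: str, unit: str = "celsius") -> dict:
    # Stubbed result; a real implementation would query a weather service
    return {"city": city, "unit": unit, "condition": "Sunny"}


# Hypothetical registry mapping tool names to Python callables
TOOL_REGISTRY: dict[str, Callable[..., dict]] = {"get_weather": get_weather}


def run_tool(name: str, arguments_json: str) -> str:
    """Execute one tool call and return a JSON string for the tool message."""
    func = TOOL_REGISTRY.get(name)
    if func is None:
        return json.dumps({"error": f"unknown tool: {name}"})
    try:
        args = json.loads(arguments_json)
        return json.dumps(func(**args))
    except (json.JSONDecodeError, TypeError) as exc:
        # Malformed JSON or unexpected argument names from the model
        return json.dumps({"error": str(exc)})


ok = run_tool("get_weather", '{"city": "New York"}')
missing = run_tool("get_time", "{}")
bad = run_tool("get_weather", "{not json")
print(ok, missing, bad, sep="\n")
```

In the flow above, the string returned by `run_tool(tool_call.function.name, tool_call.function.arguments)` would become the `content` of the `tool` message.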
Vision
import base64


def encode_image(image_path: str) -> str:
    with open(image_path, "rb") as f:
        return base64.b64encode(f.read()).decode("utf-8")


base64_image = encode_image("screenshot.png")
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {
            "role": "user",
            "content": [
                {"type": "text", "text": "Describe this image"},
                {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}},
            ],
        }
    ],
    max_tokens=512,
)
print(response.choices[0].message.content)
Use the Base64 Encode tool to quickly encode images for testing.
Pydantic Structured Output
Using Structured Outputs
from pydantic import BaseModel
from openai import OpenAI


class CodeReview(BaseModel):
    score: int
    issues: list[str]
    suggestions: list[str]
    summary: str


client = OpenAI()
response = client.beta.chat.completions.parse(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": "You are a code review expert."},
        {"role": "user", "content": "Review this code: def add(a,b): return a+b"},
    ],
    response_format=CodeReview,
)
review = response.choices[0].message.parsed
print(f"Score: {review.score}/10")
print(f"Issues: {review.issues}")
print(f"Suggestions: {review.suggestions}")
Complex Nested Structures
from pydantic import BaseModel
from typing import Optional


class ApiEndpoint(BaseModel):
    path: str
    method: str
    description: str
    # Caveat: strict Structured Outputs expects closed schemas, so a free-form
    # dict may be rejected or come back empty; a typed nested model is safer
    request_body: Optional[dict] = None


class ApiSpec(BaseModel):
    title: str
    version: str
    endpoints: list[ApiEndpoint]


response = client.beta.chat.completions.parse(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Design an API spec for a user management system"}],
    response_format=ApiSpec,
)
spec = response.choices[0].message.parsed
for ep in spec.endpoints:
    print(f"{ep.method} {ep.path}: {ep.description}")
Async Batch Processing
AsyncOpenAI Basics
import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI()


async def translate_text(text: str, target_lang: str) -> str:
    response = await async_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": f"Translate to {target_lang}, return only the translation"},
            {"role": "user", "content": text},
        ],
        temperature=0.3,
    )
    return response.choices[0].message.content


async def batch_translate(texts: list[str], target_lang: str) -> list[str]:
    # gather() runs all requests concurrently and preserves input order
    tasks = [translate_text(t, target_lang) for t in texts]
    return await asyncio.gather(*tasks)


texts = ["Hello World", "Good morning", "Thank you"]
results = asyncio.run(batch_translate(texts, "Spanish"))
for orig, trans in zip(texts, results):
    print(f"{orig} -> {trans}")
Concurrency-Limited Batch Processing
from asyncio import Semaphore


async def batch_with_concurrency(
    texts: list[str],
    target_lang: str,
    max_concurrent: int = 5,
) -> list[str]:
    sem = Semaphore(max_concurrent)

    async def limited_translate(text: str) -> str:
        # At most max_concurrent requests are in flight at any moment
        async with sem:
            return await translate_text(text, target_lang)

    tasks = [limited_translate(t) for t in texts]
    return await asyncio.gather(*tasks)
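To see that the semaphore really caps the number of in-flight calls, and to keep one failed item from discarding the rest of the batch, the pattern can be exercised without any API access. The sketch below is an illustration, not the article's implementation: `run_limited` and `fake_translate` are hypothetical names, and the fake worker only sleeps briefly while tracking how many copies run at once.

```python
import asyncio


async def run_limited(items, worker, max_concurrent: int = 5):
    """Run worker(item) for every item with at most max_concurrent in flight."""
    sem = asyncio.Semaphore(max_concurrent)

    async def guarded(item):
        async with sem:
            return await worker(item)

    # return_exceptions=True returns failures in place instead of raising the first one
    return await asyncio.gather(*(guarded(i) for i in items), return_exceptions=True)


async def demo():
    in_flight = 0
    peak = 0

    async def fake_translate(text: str) -> str:
        # Hypothetical stand-in for an API call; tracks concurrency
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)
        in_flight -= 1
        if text == "boom":
            raise ValueError("simulated failure")
        return text.upper()

    results = await run_limited(["a", "b", "boom", "c", "d"], fake_translate, max_concurrent=2)
    return results, peak


results, peak = asyncio.run(demo())
print(results, "peak concurrency:", peak)
```

The failed item appears in the result list as an exception object at its original position, so callers can retry just that entry.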
Rate Limiting & Retry Strategies
Exponential Backoff Retry
import time

from openai import APIConnectionError, APITimeoutError, RateLimitError


def call_with_retry(client, max_retries: int = 3, **kwargs):
    for attempt in range(max_retries):
        try:
            return client.chat.completions.create(**kwargs)
        except RateLimitError:
            wait = 2 ** attempt + 1  # 2s, 3s, 5s, ...
            print(f"Rate limited, waiting {wait}s before retry...")
            time.sleep(wait)
        except APITimeoutError:
            # Must be caught before APIConnectionError, which is its parent class
            print(f"Request timeout, attempt {attempt + 1}")
        except APIConnectionError:
            print(f"Connection error, attempt {attempt + 1}")
            time.sleep(1)
    raise RuntimeError(f"Failed after {max_retries} attempts")
Using tenacity
# Requires: pip install tenacity
from openai import APITimeoutError, RateLimitError
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential


@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=60),
    retry=retry_if_exception_type((RateLimitError, APITimeoutError)),
)
def call_api(client, **kwargs):
    return client.chat.completions.create(**kwargs)
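Plain exponential backoff waits a predictable time, so many workers that hit a rate limit together tend to retry together as well. A common remedy is to add random jitter. The sketch below shows the "full jitter" variant; `backoff_delay` and its `base` and `cap` parameters are hypothetical names chosen for this illustration (tenacity offers `wait_random_exponential` for a similar effect).

```python
import random


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0, rng=random) -> float:
    """Full-jitter delay: a random value in [0, min(cap, base * 2**attempt)]."""
    ceiling = min(cap, base * (2 ** attempt))
    return rng.uniform(0, ceiling)


# Upper bounds for attempts 0..7: doubling from 1s until the 60s cap applies
ceilings = [min(60.0, 1.0 * 2 ** a) for a in range(8)]

# A seeded generator keeps this demo reproducible; use the default in real code
seeded = random.Random(42)
delays = [backoff_delay(a, rng=seeded) for a in range(8)]
print(ceilings)
```

Inside a retry loop, `time.sleep(backoff_delay(attempt))` would replace a fixed wait.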
Token Counting & Cost Optimization
Counting Tokens with tiktoken
import tiktoken


def count_tokens(text: str, model: str = "gpt-4o") -> int:
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))

prompt = "Explain Python's GIL mechanism in detail"
print(f"Token count: {count_tokens(prompt)}")
Cost Estimation
# USD per token (listed per 1M tokens); check each provider's current price list
PRICING = {
    "gpt-4o": {"input": 2.50 / 1_000_000, "output": 10.00 / 1_000_000},
    "gpt-4o-mini": {"input": 0.15 / 1_000_000, "output": 0.60 / 1_000_000},
    "deepseek-chat": {"input": 0.27 / 1_000_000, "output": 1.10 / 1_000_000},
}


def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    # Unknown models fall back to gpt-4o pricing, the most expensive entry
    pricing = PRICING.get(model, PRICING["gpt-4o"])
    return input_tokens * pricing["input"] + output_tokens * pricing["output"]

cost = estimate_cost("gpt-4o", 1000, 500)
print(f"Estimated cost: ${cost:.6f}")
Optimization Strategies
- Use cheaper models: Simple tasks can use gpt-4o-mini instead of gpt-4o
- Compress prompts: Remove redundant descriptions, shorten system prompts
- Cache results: Avoid duplicate API calls for identical requests
- Control max_tokens: Set reasonable maximum output lengths
- Batch processing: Combine multiple small requests into one
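To put the first strategy in numbers, the per-token prices from the Cost Estimation section can be applied to a daily workload. The workload itself (10,000 requests per day, 1,000 input and 500 output tokens each) is a hypothetical figure chosen for illustration, and `daily_cost` is a helper introduced here.

```python
# Prices in USD per token, copied from the Cost Estimation section
PRICING = {
    "gpt-4o": {"input": 2.50 / 1_000_000, "output": 10.00 / 1_000_000},
    "gpt-4o-mini": {"input": 0.15 / 1_000_000, "output": 0.60 / 1_000_000},
}


def daily_cost(model: str, requests: int, input_tokens: int, output_tokens: int) -> float:
    """Cost of `requests` identical calls per day for one model."""
    price = PRICING[model]
    per_request = input_tokens * price["input"] + output_tokens * price["output"]
    return requests * per_request


# Hypothetical workload: 10,000 requests/day, 1,000 input and 500 output tokens each
large = daily_cost("gpt-4o", 10_000, 1_000, 500)
small = daily_cost("gpt-4o-mini", 10_000, 1_000, 500)
print(f"gpt-4o: ${large:.2f}/day, gpt-4o-mini: ${small:.2f}/day")
```

Under these assumed numbers the smaller model costs about $4.50 per day against $75 for gpt-4o, which is why routing simple tasks to it is usually the first optimization to try.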
Multi-Model Routing
Smart Router
from openai import OpenAI


class ModelRouter:
    def __init__(self):
        # "sk-xxx" values are placeholders; load real keys from the environment
        self.clients = {
            "openai": OpenAI(api_key="sk-xxx"),
            "deepseek": OpenAI(api_key="sk-xxx", base_url="https://api.deepseek.com/v1"),
            "qwen": OpenAI(api_key="sk-xxx", base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"),
        }
        self.model_map = {
            "openai": "gpt-4o",
            "deepseek": "deepseek-chat",
            "qwen": "qwen-plus",
        }

    def route(self, prompt: str) -> str:
        # Simple keyword routing; the first matching rule wins
        text = prompt.lower()
        if any(kw in text for kw in ["code", "debug", "programming"]):
            return "deepseek"
        if any(kw in text for kw in ["translate", "chinese", "writing"]):
            return "qwen"
        return "openai"

    def chat(self, prompt: str, system: str = "") -> str:
        provider = self.route(prompt)
        client = self.clients[provider]
        model = self.model_map[provider]
        messages = []
        if system:
            messages.append({"role": "system", "content": system})
        messages.append({"role": "user", "content": prompt})
        response = client.chat.completions.create(model=model, messages=messages)
        return response.choices[0].message.content


router = ModelRouter()
print(router.chat("Write a quicksort implementation in Python"))
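The router above picks a single provider and gives up if that provider fails. One way to make routing more resilient is an ordered fallback chain that tries the next provider on error. The following is a sketch under assumptions: `chat_with_fallback` is a hypothetical helper, and the two provider functions are stand-ins for real client calls so the example runs offline.

```python
from typing import Callable, Sequence


def chat_with_fallback(
    prompt: str,
    providers: Sequence[tuple[str, Callable[[str], str]]],
) -> tuple[str, str]:
    """Try each (name, call) pair in order; return (name, reply) of the first success."""
    errors = []
    for name, call in providers:
        try:
            return name, call(prompt)
        except Exception as exc:  # in real code, catch the SDK's error types instead
            errors.append(f"{name}: {exc}")
    raise RuntimeError("all providers failed: " + "; ".join(errors))


def flaky_provider(prompt: str) -> str:
    # Hypothetical stand-in for a provider that is currently down
    raise ConnectionError("simulated outage")


def healthy_provider(prompt: str) -> str:
    # Hypothetical stand-in for a working provider
    return f"echo: {prompt}"


used, reply = chat_with_fallback(
    "ping", [("deepseek", flaky_provider), ("openai", healthy_provider)]
)
print(used, reply)
```

In `ModelRouter`, the provider returned by `route()` could head the list, with the remaining providers appended as fallbacks.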
Error Handling Best Practices
Common Error Types
| Error | HTTP Status | Cause | Solution |
|---|---|---|---|
| RateLimitError | 429 | Too many requests | Exponential backoff retry |
| BadRequestError | 400 | Invalid parameters | Check request params |
| AuthenticationError | 401 | Invalid API Key | Verify key configuration |
| NotFoundError | 404 | Model not found | Confirm model name |
| InternalServerError (an APIStatusError subclass) | 500+ | Server error | Retry or switch model |
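The table splits into two groups: errors where the same request can succeed later (429 and 500+) and errors where the request itself must change (400, 401, 404). A minimal sketch of that decision follows. `is_retryable` is a hypothetical helper, and treating 408 (request timeout) as retryable is an assumption added here, since the table does not list it.

```python
# 429 comes from the table above; 408 is an added assumption
RETRYABLE_STATUS = {408, 429}


def is_retryable(status_code: int) -> bool:
    """Return True for statuses where retrying the same request can succeed."""
    return status_code in RETRYABLE_STATUS or status_code >= 500


for code in (400, 401, 404, 429, 500, 503):
    print(code, "retry" if is_retryable(code) else "fix the request")
```

With the OpenAI SDK, the status code is available as `status_code` on a caught `APIStatusError`, so a retry loop can call this check before sleeping.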
Context Length Exceeded
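from openai import BadRequestError


def safe_chat(client, model: str, messages: list, max_context: int = 128000) -> str:
    """Retry once with trimmed history if the context window is exceeded."""
    try:
        response = client.chat.completions.create(model=model, messages=messages)
    except BadRequestError as e:
        if "context_length_exceeded" not in str(e):
            raise
        # Rough heuristic: character count stands in for token count here;
        # use tiktoken when an accurate budget is needed
        while messages and len(str(messages)) > max_context:
            if len(messages) > 2:
                messages.pop(1)  # drop the oldest message after the system prompt
            else:
                messages[-1]["content"] = messages[-1]["content"][:max_context // 2]
                break
        response = client.chat.completions.create(model=model, messages=messages)
    return response.choices[0].message.content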
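Trimming after the API has already rejected the request costs a round trip. An alternative is to trim before sending, against a token budget. The sketch below assumes plain string message contents and uses a crude character-based counter so it runs without extra packages. `trim_history` and `rough_token_count` are hypothetical helpers, and the four-characters-per-token ratio is only a rule of thumb for English text; a tiktoken-based counter can be passed in as `count` instead.

```python
from typing import Callable


def rough_token_count(text: str) -> int:
    # Crude stand-in (about 4 characters per token); swap in tiktoken for real counts
    return max(1, len(text) // 4)


def trim_history(
    messages: list[dict],
    budget: int,
    count: Callable[[str], int] = rough_token_count,
) -> list[dict]:
    """Drop the oldest non-system messages until the total fits the budget."""
    system = [m for m in messages if m["role"] == "system"]
    rest = [m for m in messages if m["role"] != "system"]

    def total(msgs: list[dict]) -> int:
        return sum(count(m["content"]) for m in msgs)

    # Always keep the most recent message so the model has something to answer
    while len(rest) > 1 and total(system + rest) > budget:
        rest.pop(0)
    return system + rest


history = [
    {"role": "system", "content": "You are helpful."},
    {"role": "user", "content": "a" * 400},
    {"role": "assistant", "content": "b" * 400},
    {"role": "user", "content": "c" * 40},
]
trimmed = trim_history(history, budget=120)
print([m["role"] for m in trimmed])
```

The input list is left untouched, so the full history stays available for logging or summarization.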
Building a Production API with FastAPI
Project Structure
llm-api/
├── main.py
├── config.py
├── routers/
│ └── chat.py
├── services/
│ ├── llm_service.py
│ └── cache_service.py
└── requirements.txt
Main Application
from fastapi import FastAPI
from routers.chat import router as chat_router

app = FastAPI(title="LLM API Service", version="1.0.0")
app.include_router(chat_router, prefix="/api/v1")


@app.get("/health")
async def health_check():
    return {"status": "ok"}
Chat Router
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

router = APIRouter()


class ChatRequest(BaseModel):
    message: str
    model: str = "gpt-4o-mini"
    temperature: float = 0.7
    max_tokens: int = 1024
    stream: bool = False


class ChatResponse(BaseModel):
    reply: str
    model: str
    tokens: int


@router.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    try:
        from services.llm_service import llm_service

        result = await llm_service.chat(
            message=request.message,
            model=request.model,
            temperature=request.temperature,
            max_tokens=request.max_tokens,
        )
        return ChatResponse(**result)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
LLM Service Layer
from openai import AsyncOpenAI
from services.cache_service import cache_service


class LLMService:
    def __init__(self):
        self.client = AsyncOpenAI()

    async def chat(self, message: str, model: str = "gpt-4o-mini",
                   temperature: float = 0.7, max_tokens: int = 1024) -> dict:
        # Serve identical requests from the cache before calling the API
        cache_key = cache_service.make_key(message, model, temperature)
        cached = await cache_service.get(cache_key)
        if cached:
            return cached

        response = await self.client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": message}],
            temperature=temperature,
            max_tokens=max_tokens,
        )
        result = {
            "reply": response.choices[0].message.content,
            "model": response.model,
            "tokens": response.usage.total_tokens,
        }
        await cache_service.set(cache_key, result, ttl=3600)
        return result


llm_service = LLMService()
Caching Strategies
Redis Cache Implementation
import hashlib
import json


class CacheService:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        import redis.asyncio as aioredis

        self.redis = aioredis.from_url(redis_url)

    def make_key(self, message: str, model: str, temperature: float) -> str:
        raw = f"{message}:{model}:{temperature}"
        return f"llm:cache:{hashlib.md5(raw.encode()).hexdigest()}"

    async def get(self, key: str) -> dict | None:
        data = await self.redis.get(key)
        return json.loads(data) if data else None

    async def set(self, key: str, value: dict, ttl: int = 3600):
        await self.redis.setex(key, ttl, json.dumps(value))


cache_service = CacheService()
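Note that `make_key` hashes only the message, model, and temperature, so two requests that differ in `max_tokens` or in a system prompt would share a cache entry. A possible alternative is to hash a canonical JSON form of every parameter that can change the response. This is a sketch, not the article's implementation: `make_cache_key` is a hypothetical function, and SHA-256 is swapped in for MD5 here as a matter of preference.

```python
import hashlib
import json


def make_cache_key(model: str, messages: list[dict], **params) -> str:
    """Build a cache key from every parameter that can change the response."""
    payload = {"model": model, "messages": messages, "params": params}
    # sort_keys gives a stable serialization regardless of keyword order
    raw = json.dumps(payload, sort_keys=True, ensure_ascii=False)
    return f"llm:cache:{hashlib.sha256(raw.encode('utf-8')).hexdigest()}"


msgs = [{"role": "user", "content": "hello"}]
key_a = make_cache_key("gpt-4o-mini", msgs, temperature=0.7, max_tokens=1024)
key_b = make_cache_key("gpt-4o-mini", msgs, max_tokens=1024, temperature=0.7)
key_c = make_cache_key("gpt-4o-mini", msgs, temperature=0.7, max_tokens=256)
print(key_a == key_b, key_a == key_c)
```

Keyword order does not affect the key, while any change to a parameter value produces a different one.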
Use the Hash Calculator to understand cache key hashing.
Monitoring & Logging
Structured Logging
import json
import logging
from datetime import datetime, timezone


class LLMLogger:
    def __init__(self):
        self.logger = logging.getLogger("llm_api")
        # Attach the handler only once, or repeated instantiation duplicates every line
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter("%(message)s"))
            self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    @staticmethod
    def _now() -> str:
        # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp
        return datetime.now(timezone.utc).isoformat()

    def log_request(self, model: str, prompt: str, tokens_in: int):
        # The prompt text itself is deliberately not logged
        self.logger.info(json.dumps({
            "event": "llm_request",
            "model": model,
            "tokens_in": tokens_in,
            "timestamp": self._now(),
        }))

    def log_response(self, model: str, tokens_out: int, latency_ms: float, cost: float):
        self.logger.info(json.dumps({
            "event": "llm_response",
            "model": model,
            "tokens_out": tokens_out,
            "latency_ms": round(latency_ms, 2),
            "cost_usd": round(cost, 6),
            "timestamp": self._now(),
        }))

    def log_error(self, model: str, error_type: str, error_msg: str):
        self.logger.error(json.dumps({
            "event": "llm_error",
            "model": model,
            "error_type": error_type,
            "error_msg": error_msg,
            "timestamp": self._now(),
        }))
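The `latency_ms` value passed to `log_response` has to be measured somewhere. One lightweight option is a context manager wrapped around the API call. The sketch below uses only the standard library; `timed` is a hypothetical helper, and the `time.sleep` call stands in for the real request.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed():
    """Yield a dict whose 'latency_ms' is filled in when the block exits."""
    result = {"latency_ms": None}
    start = time.perf_counter()
    try:
        yield result
    finally:
        # Recorded even if the wrapped call raises
        result["latency_ms"] = (time.perf_counter() - start) * 1000


with timed() as t:
    time.sleep(0.01)  # stand-in for the API call

print(f"latency: {t['latency_ms']:.2f} ms")
```

`time.perf_counter()` is used rather than wall-clock time because it is monotonic and unaffected by system clock adjustments.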
Common Errors & Debugging
Issue 1: API Key Configuration Error
import os

from openai import OpenAI

# Validate the key before building the client so the failure is explicit
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
    raise ValueError("Please set the OPENAI_API_KEY environment variable")
client = OpenAI(api_key=api_key)
Issue 2: Empty Response Content
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "hello"}],
    max_tokens=10,
)
# finish_reason "length" means the reply was cut off at max_tokens
if response.choices[0].finish_reason == "length":
    print("Output truncated, increase max_tokens")
Issue 3: JSON Parsing Failure
import json
import re


def extract_json(text: str) -> dict:
    # Try fenced ```json blocks first, then bare fences, then the outermost braces
    patterns = [r"```json\n(.*?)\n```", r"```\n(.*?)\n```", r"(\{.*\})"]
    for pattern in patterns:
        match = re.search(pattern, text, re.DOTALL)
        if match:
            try:
                return json.loads(match.group(1))
            except json.JSONDecodeError:
                continue
    raise ValueError("Cannot extract JSON from response")
FAQ
How to choose the right model?
Use gpt-4o-mini for simple classification/extraction, gpt-4o or o3 for complex reasoning, and consider qwen-plus or deepseek-chat for Chinese-heavy tasks.
How to handle long contexts?
Use tiktoken to pre-calculate token counts. When exceeding limits, truncate history or use summarization.
How to reduce API costs?
Prefer mini models, cache repeated requests, control max_tokens, compress prompts, and batch process.
How to access OpenAI API from China?
Use Azure OpenAI Service or switch to OpenAI-compatible Chinese models (DeepSeek, Qwen).
Summary
Python LLM API integration requires mastering SDK usage, structured output, async processing, error retry, and cost optimization. Production environments also need caching, monitoring, and multi-model routing. Use ToolsKu's JSON Formatter, Base64 Encode, and Hash Calculator to assist with API development and debugging.