Python SSE Streaming LLM Output: 5 Implementation Patterns from Server-Sent Events to Real-time Chat
While the LLM Spits Out One Word at a Time, Users Are Already Gone
You call the OpenAI Chat Completions API, wait 5 seconds for one giant JSON blob, and users think the page has frozen. You switch to polling, hitting the server every second, and the server can't keep up with the load. You try WebSocket, but now every conversation needs a stateful bidirectional connection, a protocol upgrade that some proxies mishandle, and hand-written reconnection logic. For streaming LLM output, SSE (Server-Sent Events) is usually the simplest fit. It offers unidirectional push, automatic reconnection in the browser, and plain HTTP that passes through most proxies and CDNs once response buffering is disabled.
This article starts from SSE protocol principles and guides you through FastAPI SSE → Flask SSE → LLM streaming chat → error recovery → backpressure control — 5 implementation patterns from protocol to production.
SSE Protocol Core Concepts
| Concept | Description |
|---|---|
| EventSource | Browser-native SSE client API with auto-reconnection |
| text/event-stream | SSE Content-Type, tells the browser this is a stream of events |
| data field | Event data, each line prefixed with data: |
| event field | Custom event type, defaults to message |
| id field | Event ID, used for resuming from last position on reconnection |
| retry field | Suggested client reconnection interval (milliseconds) |
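| Chunked Transfer | HTTP/1.1 chunked transfer encoding, the usual transport for SSE on HTTP/1.1 (HTTP/2 carries the stream in DATA frames instead) |
| Comment line | A line starting with a colon (e.g. : heartbeat) is ignored by the client; commonly used as a keep-alive |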
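The field rules are easier to see in code. Below is a minimal Python sketch of an event serializer. format_sse_event is a hypothetical helper, not part of FastAPI or Flask. It assumes the payload is already a string (for example the output of json.dumps) and splits any embedded newlines into separate data: lines, as the wire format requires.

```python
from typing import Optional, Union


def format_sse_event(
    data: str,
    event: Optional[str] = None,
    event_id: Optional[Union[str, int]] = None,
    retry_ms: Optional[int] = None,
) -> str:
    """Serialize one event in the text/event-stream wire format (sketch)."""
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    if retry_ms is not None:
        lines.append(f"retry: {retry_ms}")
    # Normalize line endings, then give every payload line its own "data:" prefix;
    # the client joins consecutive data lines back together with "\n".
    normalized = data.replace("\r\n", "\n").replace("\r", "\n")
    for part in normalized.split("\n"):
        lines.append(f"data: {part}")
    # The blank line (the second "\n") terminates and dispatches the event.
    return "\n".join(lines) + "\n\n"
```

Field order inside an event does not matter to the client. Only the terminating blank line does.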
SSE vs WebSocket Comparison
SSE Advantages:
1. HTTP-based, no protocol upgrade needed, CDN/proxy friendly
2. Browser-native EventSource API with auto-reconnection
3. Unidirectional push, perfect for LLM output scenarios
4. Text protocol, easy to debug
WebSocket Advantages:
1. Full-duplex communication, suitable for bidirectional real-time interaction
2. Supports binary data
3. Lower framing overhead (a frame header can be as small as 2 bytes, vs. SSE's per-line text prefixes such as data:)
Conclusion: LLM streaming output is a classic unidirectional push scenario, so SSE is usually the best fit.
Problem Analysis: 5 Major LLM Streaming Challenges
- Connection stability: LLM inference takes long (10-60s), network jitter causes disconnections — needs auto-reconnection and resume
- Backpressure control: LLM generates faster than clients consume, memory overflow risk
- Error recovery: Upstream LLM API timeouts, rate limits, errors — needs graceful degradation
- Multi-model adaptation: OpenAI/Anthropic/local models have different streaming formats — needs unified abstraction
- Concurrency management: Many SSE connections consume server resources — needs connection pooling and timeout management
Step-by-Step: 5 SSE Streaming Implementation Patterns
Pattern 1: Basic FastAPI SSE
```python
import asyncio
import json
from datetime import datetime

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()


async def event_generator(request: Request):
    count = 0
    while True:
        # Stop producing as soon as the client goes away
        if await request.is_disconnected():
            break
        now = datetime.now()
        data = json.dumps({
            "id": count,
            "message": f"Event at {now.isoformat()}",
            "timestamp": now.timestamp(),
        }, ensure_ascii=False)
        # id: lets the browser send Last-Event-ID on reconnect;
        # the blank line (\n\n) terminates the event
        yield f"id: {count}\ndata: {data}\n\n"
        count += 1
        await asyncio.sleep(1)


@app.get("/sse/events")
async def sse_events(request: Request):
    return StreamingResponse(
        event_generator(request),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # tell Nginx not to buffer this response
        },
    )
```
Pattern 2: Flask SSE Implementation
```python
import json
import time
from datetime import datetime

from flask import Flask, Response

app = Flask(__name__)


def stream_events():
    count = 0
    while True:
        data = json.dumps({
            "id": count,
            "message": f"Flask SSE event at {datetime.now().isoformat()}",
        }, ensure_ascii=False)
        yield f"id: {count}\nevent: message\ndata: {data}\n\n"
        count += 1
        # Blocks this worker: each open stream ties up one thread or worker,
        # so run behind a threaded or gevent-based WSGI server
        time.sleep(1)


@app.route("/sse/events")
def sse_events():
    return Response(
        stream_events(),
        mimetype="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            # No "Connection" header: it is hop-by-hop, which WSGI apps must not
            # set (PEP 3333); the server manages keep-alive itself
            "X-Accel-Buffering": "no",
        },
    )
```
Pattern 3: LLM Streaming Chat (OpenAI Compatible)
```python
import json
import os

import httpx
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()
# Read the key from the environment instead of hard-coding it
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
OPENAI_API_URL = "https://api.openai.com/v1/chat/completions"


async def llm_stream_generator(request: Request, messages: list, model: str = "gpt-4o"):
    headers = {
        "Authorization": f"Bearer {OPENAI_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": model,
        "messages": messages,
        "stream": True,
        "temperature": 0.7,
    }
    try:
        async with httpx.AsyncClient(timeout=120.0) as client:
            async with client.stream("POST", OPENAI_API_URL, json=payload, headers=headers) as response:
                if response.status_code != 200:
                    # Forward upstream errors (401, 429, 5xx) as a named SSE event
                    error_data = await response.aread()
                    error_msg = json.dumps(
                        {"error": error_data.decode(), "status": response.status_code},
                        ensure_ascii=False,
                    )
                    yield f"event: error\ndata: {error_msg}\n\n"
                    return
                async for line in response.aiter_lines():
                    if await request.is_disconnected():
                        break
                    if not line.startswith("data: "):
                        continue  # skip blank separator lines and other fields
                    data_str = line[6:]
                    if data_str.strip() == "[DONE]":
                        yield "data: [DONE]\n\n"
                        break
                    try:
                        chunk = json.loads(data_str)
                    except json.JSONDecodeError:
                        continue
                    # "choices" can be an empty list (e.g. a final usage chunk)
                    choices = chunk.get("choices") or [{}]
                    content = (choices[0].get("delta") or {}).get("content")
                    if content:
                        escaped = json.dumps({"content": content}, ensure_ascii=False)
                        yield f"data: {escaped}\n\n"
    except httpx.HTTPError as exc:
        # Network errors and timeouts mid-stream: tell the client instead of just dropping the stream
        error_msg = json.dumps({"error": str(exc)}, ensure_ascii=False)
        yield f"event: error\ndata: {error_msg}\n\n"


@app.post("/sse/chat")
async def sse_chat(request: Request):
    body = await request.json()
    messages = body.get("messages", [])
    model = body.get("model", "gpt-4o")
    return StreamingResponse(
        llm_stream_generator(request, messages, model),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
```
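The per-line parsing inside llm_stream_generator is easier to test in isolation. The sketch below pulls it into a hypothetical pure function, extract_openai_delta. It returns the text delta, a DONE sentinel for the terminal line, or None for anything to skip. It assumes OpenAI's Chat Completions chunk shape (choices[0].delta.content). It also guards against chunks with an empty choices list, such as the usage chunk sent when the include_usage stream option is enabled.

```python
import json

DONE = object()  # sentinel returned for the terminal "data: [DONE]" line


def extract_openai_delta(line: str):
    """Return the text delta from one upstream SSE line, DONE at the end, or None to skip."""
    if not line.startswith("data:"):
        return None  # blank separators, comments, event:/id: fields
    payload = line[5:].strip()
    if payload == "[DONE]":
        return DONE
    try:
        chunk = json.loads(payload)
    except json.JSONDecodeError:
        return None
    if not isinstance(chunk, dict):
        return None
    # Some chunks carry an empty "choices" list, so don't index it blindly
    choices = chunk.get("choices") or []
    if not choices:
        return None
    content = (choices[0].get("delta") or {}).get("content")
    return content or None  # role-only deltas and null content are skipped
```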
Pattern 4: SSE with Error Recovery
```python
import asyncio
import json
from datetime import datetime
from typing import Optional

from fastapi import FastAPI, Header, Request
from fastapi.responses import StreamingResponse

app = FastAPI()


class SSEStateManager:
    """In-memory replay buffer per session (single process only)."""

    def __init__(self):
        self.events_store = {}
        self.max_store_size = 1000

    def store_event(self, session_id: str, event_id: int, data: str):
        if session_id not in self.events_store:
            self.events_store[session_id] = {}
        self.events_store[session_id][event_id] = data
        # Evict the oldest event once the per-session cap is exceeded
        if len(self.events_store[session_id]) > self.max_store_size:
            oldest = min(self.events_store[session_id].keys())
            del self.events_store[session_id][oldest]

    def get_events_after(self, session_id: str, last_event_id: int):
        if session_id not in self.events_store:
            return []
        return [
            (eid, self.events_store[session_id][eid])
            for eid in sorted(self.events_store[session_id].keys())
            if eid > last_event_id
        ]


state_manager = SSEStateManager()


async def resilient_sse_generator(request: Request, session_id: str, last_event_id: int = 0):
    retry_count = 0
    max_retries = 3
    current_id = last_event_id
    # Replay anything the client missed while it was disconnected
    missed_events = state_manager.get_events_after(session_id, last_event_id)
    for eid, data in missed_events:
        yield f"id: {eid}\ndata: {data}\n\n"
        current_id = eid
    while True:
        if await request.is_disconnected():
            break
        try:
            current_id += 1
            data = json.dumps({
                "id": current_id,
                "message": f"Resilient event at {datetime.now().isoformat()}",
                "retry_count": retry_count,
            }, ensure_ascii=False)
            state_manager.store_event(session_id, current_id, data)
            yield f"id: {current_id}\ndata: {data}\n\n"
            retry_count = 0
            await asyncio.sleep(1)
        except Exception as e:
            retry_count += 1
            if retry_count > max_retries:
                error_data = json.dumps({"error": str(e), "fatal": True}, ensure_ascii=False)
                yield f"event: error\ndata: {error_data}\n\n"
                break
            # Exponential backoff capped at 30 s; the retry: field also tells the
            # browser how long to wait before reconnecting if the stream drops
            retry_delay = min(2 ** retry_count, 30)
            yield f"retry: {retry_delay * 1000}\n\n"
            await asyncio.sleep(retry_delay)


@app.get("/sse/resilient/{session_id}")
async def resilient_sse(
    request: Request,
    session_id: str,
    last_event_id: int = 0,
    # EventSource's native auto-reconnect sends the last id in this header,
    # not as a query parameter, so accept both
    last_event_id_header: Optional[str] = Header(default=None, alias="Last-Event-ID"),
):
    if last_event_id_header and last_event_id_header.isdigit():
        last_event_id = int(last_event_id_header)
    return StreamingResponse(
        resilient_sse_generator(request, session_id, last_event_id),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
```
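Once a session is full, SSEStateManager scans every key with min() on each insert, and it never forgets finished sessions. Below is a sketch of a cheaper in-process alternative. It assumes event IDs are appended in increasing order, and it uses collections.deque with maxlen, which discards the oldest entry in O(1). EventLog and its methods are hypothetical names. In a multi-worker deployment the log would need to live in shared storage instead.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, List, Tuple


class EventLog:
    """Per-session replay buffer keeping only the newest `max_events` events (sketch)."""

    def __init__(self, max_events: int = 1000) -> None:
        # deque(maxlen=...) drops the oldest entry automatically when full
        self._events: Dict[str, Deque[Tuple[int, str]]] = defaultdict(
            lambda: deque(maxlen=max_events)
        )

    def append(self, session_id: str, event_id: int, data: str) -> None:
        # Assumes event IDs arrive in increasing order, as in Pattern 4
        self._events[session_id].append((event_id, data))

    def after(self, session_id: str, last_event_id: int) -> List[Tuple[int, str]]:
        # .get() avoids creating an empty deque for unknown sessions
        log = self._events.get(session_id)
        if not log:
            return []
        return [(eid, data) for eid, data in log if eid > last_event_id]

    def drop(self, session_id: str) -> None:
        # Call when a conversation ends so finished sessions don't accumulate
        self._events.pop(session_id, None)
```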
Pattern 5: SSE with Backpressure Control
```python
import asyncio
import json
import os
from collections import deque
from typing import Optional

import httpx
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")


class BackpressureBuffer:
    def __init__(self, max_size: int = 100, high_watermark: float = 0.8, low_watermark: float = 0.3):
        self.max_size = max_size
        self.high_watermark = int(max_size * high_watermark)
        self.low_watermark = int(max_size * low_watermark)
        self.buffer = deque(maxlen=max_size)
        self.paused = False
        self._not_empty = asyncio.Event()
        self._not_full = asyncio.Event()
        self._not_full.set()

    async def put(self, item: Optional[str]):
        # Block the producer while the buffer is full
        while len(self.buffer) >= self.max_size:
            self._not_full.clear()
            await self._not_full.wait()
        self.buffer.append(item)
        self._not_empty.set()
        # Hysteresis: pause above the high watermark...
        if len(self.buffer) >= self.high_watermark:
            self.paused = True

    async def get(self) -> Optional[str]:
        while not self.buffer:
            self._not_empty.clear()
            await self._not_empty.wait()
        item = self.buffer.popleft()
        self._not_full.set()
        # ...and resume only after draining below the low watermark
        if len(self.buffer) <= self.low_watermark:
            self.paused = False
        return item

    @property
    def is_paused(self) -> bool:
        return self.paused

    @property
    def size(self) -> int:
        return len(self.buffer)


async def llm_producer(buffer: BackpressureBuffer, messages: list):
    headers = {
        "Authorization": f"Bearer {OPENAI_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {"model": "gpt-4o", "messages": messages, "stream": True}
    try:
        async with httpx.AsyncClient(timeout=120.0) as client:
            async with client.stream("POST", "https://api.openai.com/v1/chat/completions", json=payload, headers=headers) as response:
                response.raise_for_status()
                async for line in response.aiter_lines():
                    # Stop reading upstream until the consumer catches up
                    while buffer.is_paused:
                        await asyncio.sleep(0.1)
                    if line.startswith("data: ") and line[6:].strip() != "[DONE]":
                        try:
                            chunk = json.loads(line[6:])
                        except json.JSONDecodeError:
                            continue
                        choices = chunk.get("choices") or [{}]
                        content = (choices[0].get("delta") or {}).get("content")
                        if content:
                            await buffer.put(content)
    except Exception:
        await buffer.put(None)  # wake the consumer, then surface the error on the task
        raise
    await buffer.put(None)  # None (not the string "[DONE]") marks a normal end of stream


async def backpressure_sse_generator(request: Request, messages: list):
    buffer = BackpressureBuffer(max_size=100, high_watermark=0.8, low_watermark=0.3)
    producer_task = asyncio.create_task(llm_producer(buffer, messages))
    try:
        while True:
            if await request.is_disconnected():
                break
            try:
                item = await asyncio.wait_for(buffer.get(), timeout=5.0)
            except asyncio.TimeoutError:
                # No tokens for 5 s: an SSE comment keeps proxies from closing the connection
                yield ": heartbeat\n\n"
                continue
            if item is None:
                # End of stream: report an upstream failure, if any, instead of [DONE]
                await asyncio.wait({producer_task})
                exc = producer_task.exception()
                if exc is not None:
                    error = json.dumps({"error": str(exc)}, ensure_ascii=False)
                    yield f"event: error\ndata: {error}\n\n"
                else:
                    yield "data: [DONE]\n\n"
                break
            data = json.dumps({
                "content": item,
                "buffer_size": buffer.size,
                "paused": buffer.is_paused,
            }, ensure_ascii=False)
            yield f"data: {data}\n\n"
    finally:
        # Covers client disconnects and normal completion alike
        if not producer_task.done():
            producer_task.cancel()


@app.post("/sse/backpressure-chat")
async def backpressure_chat(request: Request):
    body = await request.json()
    messages = body.get("messages", [])
    return StreamingResponse(
        backpressure_sse_generator(request, messages),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
```
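The standard library's asyncio.Queue(maxsize=...) already provides the core of this behavior, because await queue.put() suspends the producer while the queue is full. Below is a minimal sketch that relies on it instead of a hand-rolled deque plus Event pair. bounded_relay is a hypothetical name, and the sketch drops the watermark hysteresis in favor of plain blocking puts. Upstream exceptions are handed to the consumer so the stream fails loudly instead of hanging.

```python
import asyncio
from typing import AsyncIterator, List, Optional


async def bounded_relay(source: AsyncIterator[str], maxsize: int = 100) -> AsyncIterator[str]:
    """Relay items from `source` through a bounded queue (sketch).

    queue.put() suspends the producer while the queue holds `maxsize` items,
    so a slow consumer automatically throttles reads from `source`.
    """
    queue: "asyncio.Queue[Optional[str]]" = asyncio.Queue(maxsize=maxsize)
    errors: List[Exception] = []

    async def produce() -> None:
        try:
            async for item in source:
                await queue.put(item)  # waits here while the queue is full
        except Exception as exc:  # remember upstream failures for the consumer
            errors.append(exc)
        await queue.put(None)  # end-of-stream sentinel

    task = asyncio.create_task(produce())
    try:
        while True:
            item = await queue.get()
            if item is None:
                break
            yield item
        if errors:
            raise errors[0]
    finally:
        task.cancel()  # no-op if the producer already finished
```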
Pitfall Guide
Pitfall 1: Forgetting X-Accel-Buffering: no
```python
# ❌ Wrong: Nginx buffers proxied responses by default, so the client doesn't receive data in real time
return StreamingResponse(generator(), media_type="text/event-stream")

# ✅ Correct: disable Nginx buffering for this response
# (alternatively, set "proxy_buffering off;" in the Nginx location block)
return StreamingResponse(
    generator(),
    media_type="text/event-stream",
    headers={
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",
    },
)
```
Pitfall 2: Non-standard SSE Data Format
```python
# ❌ Wrong: the event is not terminated by a blank line, so the client never dispatches it
yield f"data: {json.dumps(data)}\n"

# ✅ Correct: each event must end with a double newline (\n\n)
yield f"data: {json.dumps(data)}\n\n"
```
Pitfall 3: No Heartbeat Keep-Alive
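```python
import asyncio
import json

# ❌ Wrong: long idle periods cause proxies/load balancers to drop the connection
async def generator():
    while True:
        await asyncio.sleep(60)  # e.g. waiting on a slow upstream
        yield f"data: {json.dumps(data)}\n\n"

# ✅ Correct: send an SSE comment line as a heartbeat whenever no data arrives
# for 15 s (queue is a hypothetical asyncio.Queue fed by your producer)
async def generator(queue: asyncio.Queue):
    while True:
        try:
            data = await asyncio.wait_for(queue.get(), timeout=15)
        except asyncio.TimeoutError:
            # Lines starting with ":" are comments; EventSource ignores them
            yield ": heartbeat\n\n"
            continue
        yield f"data: {json.dumps(data)}\n\n"
```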
Pitfall 4: EventSource Doesn't Support POST
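```javascript
// ❌ Wrong: EventSource only supports GET and cannot send a request body
// (its second argument only accepts { withCredentials })
const es = new EventSource('/sse/chat', {
  method: 'POST',
  body: JSON.stringify({ messages })
});

// ✅ Correct: use fetch + ReadableStream for POST requests
const response = await fetch('/sse/chat', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ messages }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  // { stream: true } keeps multi-byte UTF-8 characters that straddle chunks intact
  buffer += decoder.decode(value, { stream: true });
  // A network chunk can end mid-event: handle only complete events
  // (terminated by a blank line) and keep the remainder for the next read
  const events = buffer.split('\n\n');
  buffer = events.pop();
  for (const event of events) {
    for (const line of event.split('\n')) {
      if (!line.startsWith('data: ')) continue;
      const payload = line.slice(6);
      if (payload === '[DONE]') continue; // end-of-stream sentinel, not JSON
      processChunk(JSON.parse(payload));
    }
  }
}
```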
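Whatever the client language, the parser must cope with events split across network chunks. For backend consumers or endpoint tests, here is a minimal incremental parser in Python. It sketches the WHATWG event-stream rules:

- a blank line dispatches the event;
- lines starting with a colon are comments;
- consecutive data lines are joined with a newline;
- the last id persists across events.

SSEParser and SSEEvent are hypothetical names. The sketch handles LF and CRLF line endings but not bare CR or a leading BOM.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class SSEEvent:
    data: str
    event: str = "message"
    id: Optional[str] = None


class SSEParser:
    """Incremental text/event-stream parser: feed() arbitrary chunks, get complete events."""

    def __init__(self) -> None:
        self._buffer = ""
        self._data: List[str] = []
        self._event = ""
        self._last_id: Optional[str] = None

    def feed(self, chunk: str) -> List[SSEEvent]:
        self._buffer += chunk
        # Process only complete lines; keep a trailing partial line buffered
        *lines, self._buffer = self._buffer.split("\n")
        events: List[SSEEvent] = []
        for raw in lines:
            line = raw[:-1] if raw.endswith("\r") else raw
            if line == "":
                # Blank line: dispatch the accumulated event if it has data
                if self._data:
                    events.append(SSEEvent("\n".join(self._data), self._event or "message", self._last_id))
                self._data, self._event = [], ""
                continue
            if line.startswith(":"):
                continue  # comment, e.g. ": heartbeat"
            name, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]  # strip exactly one leading space
            if name == "data":
                self._data.append(value)
            elif name == "event":
                self._event = value
            elif name == "id" and "\0" not in value:
                self._last_id = value
            # "retry" and unknown fields are ignored in this sketch
        return events
```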
Pitfall 5: Not Handling Client Disconnection
```python
import asyncio

from fastapi import Request

# ❌ Wrong: the client disconnected, but the generator keeps running and wasting resources
async def generator():
    while True:
        yield "data: hello\n\n"
        await asyncio.sleep(1)

# ✅ Correct: check the client connection status on every iteration
async def generator(request: Request):
    while True:
        if await request.is_disconnected():
            break
        yield "data: hello\n\n"
        await asyncio.sleep(1)
```
Error Troubleshooting
| # | Error Message | Cause | Solution |
|---|---|---|---|
| 1 | EventSource's response has a MIME type that is not text/event-stream | Incorrect response Content-Type | Set media_type="text/event-stream" |
| 2 | net::ERR_INCOMPLETE_CHUNKED_ENCODING | The stream closed before the final chunk: Nginx buffering problems, a proxy timeout, or an exception inside the generator | Add the X-Accel-Buffering: no header, catch exceptions in the generator, send heartbeats |
| 3 | 502 Bad Gateway / 504 Gateway Timeout | The gateway gave up waiting on a slow or silent backend | Increase Nginx proxy_read_timeout and send heartbeats |
| 4 | Connection refused | FastAPI/Flask not running or wrong port | Check service port and firewall |
| 5 | 429 Too Many Requests | LLM API rate limiting | Implement exponential backoff, add a request queue |
| 6 | TimeoutError / httpx.ReadTimeout | LLM inference timeout | Increase the httpx timeout, add heartbeat keep-alive |
| 7 | JSONDecodeError | An SSE data line is not valid JSON (e.g. data: [DONE]) | Skip non-data lines and sentinels, handle JSON parse errors |
| 8 | RuntimeError: Event loop is closed | Async code (e.g. httpx.AsyncClient) used inside a sync Flask generator | Use a synchronous client such as httpx.Client in Flask, or switch to FastAPI |
| 9 | BrokenPipeError | Writing after the client disconnected | Check request.is_disconnected(), catch BrokenPipeError |
| 10 | MemoryError | No backpressure control, so the buffer grows indefinitely | Implement BackpressureBuffer, limit the queue size |
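Row 5 recommends exponential backoff. Here is a minimal sketch of capped exponential backoff with full jitter: each delay is random between zero and the current ceiling, so many clients hitting the same rate limit don't retry in lockstep. backoff_delays and its defaults are assumptions, not values from any provider's documentation.

```python
import random
from typing import Iterator, Optional


def backoff_delays(retries: int, base: float = 1.0, cap: float = 30.0,
                   rng: Optional[random.Random] = None) -> Iterator[float]:
    """Yield `retries` sleep intervals: capped exponential backoff with full jitter."""
    rng = rng or random.Random()
    for attempt in range(retries):
        # Ceiling doubles each attempt (1, 2, 4, ... seconds) up to `cap`
        ceiling = min(cap, base * 2 ** attempt)
        # Full jitter: pick uniformly in [0, ceiling]
        yield rng.uniform(0, ceiling)
```

Between attempts you would await asyncio.sleep(delay). If the 429 response carries a Retry-After header, honoring it is usually better than the computed delay.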
Advanced Optimization
1. Multi-Model Unified Streaming Adapter
```python
import json
from abc import ABC, abstractmethod
from typing import AsyncIterator

import httpx


class LLMStreamAdapter(ABC):
    """Common interface: every provider yields plain text deltas."""

    @abstractmethod
    def stream(self, messages: list, model: str) -> AsyncIterator[str]:
        """Implemented as an async generator in each subclass."""


class OpenAIStreamAdapter(LLMStreamAdapter):
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.openai.com/v1/chat/completions"

    async def stream(self, messages: list, model: str = "gpt-4o") -> AsyncIterator[str]:
        headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}
        payload = {"model": model, "messages": messages, "stream": True}
        async with httpx.AsyncClient(timeout=120.0) as client:
            async with client.stream("POST", self.base_url, json=payload, headers=headers) as resp:
                resp.raise_for_status()  # surface 401/429/5xx instead of parsing an error body
                async for line in resp.aiter_lines():
                    if not line.startswith("data: ") or line[6:].strip() == "[DONE]":
                        continue
                    try:
                        chunk = json.loads(line[6:])
                    except json.JSONDecodeError:
                        continue
                    choices = chunk.get("choices") or [{}]
                    content = (choices[0].get("delta") or {}).get("content")
                    if content:
                        yield content


class AnthropicStreamAdapter(LLMStreamAdapter):
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.anthropic.com/v1/messages"

    async def stream(self, messages: list, model: str = "claude-sonnet-4-20250514") -> AsyncIterator[str]:
        headers = {
            "x-api-key": self.api_key,
            "anthropic-version": "2023-06-01",
            "Content-Type": "application/json",
        }
        # Anthropic takes the system prompt as a top-level field, not as a message
        system_parts = []
        chat_messages = []
        for m in messages:
            if m["role"] == "system":
                system_parts.append(m["content"])
            else:
                chat_messages.append(m)
        payload = {"model": model, "messages": chat_messages, "stream": True, "max_tokens": 4096}
        if system_parts:
            payload["system"] = "\n\n".join(system_parts)
        async with httpx.AsyncClient(timeout=120.0) as client:
            async with client.stream("POST", self.base_url, json=payload, headers=headers) as resp:
                resp.raise_for_status()
                async for line in resp.aiter_lines():
                    if not line.startswith("data: "):
                        continue  # "event:" lines repeat the type carried in the JSON
                    try:
                        chunk = json.loads(line[6:])
                    except json.JSONDecodeError:
                        continue
                    # Text arrives in content_block_delta events
                    if chunk.get("type") == "content_block_delta":
                        text = chunk.get("delta", {}).get("text", "")
                        if text:
                            yield text


class LLMStreamFactory:
    _adapters = {
        "openai": OpenAIStreamAdapter,
        "anthropic": AnthropicStreamAdapter,
    }

    @classmethod
    def create(cls, provider: str, api_key: str) -> LLMStreamAdapter:
        adapter_cls = cls._adapters.get(provider)
        if not adapter_cls:
            raise ValueError(f"Unsupported provider: {provider}")
        return adapter_cls(api_key)
```
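Because every adapter yields plain text, one function can turn any of them into SSE frames. Below is a sketch using a hypothetical helper, tokens_to_sse. It JSON-encodes each delta, converts upstream exceptions into an event: error frame, and ends with the same data: [DONE] sentinel used in Pattern 3.

```python
import asyncio
import json
from typing import AsyncIterator


async def tokens_to_sse(tokens: AsyncIterator[str]) -> AsyncIterator[str]:
    """Wrap a provider-neutral token stream in SSE frames (sketch)."""
    try:
        async for text in tokens:
            yield f"data: {json.dumps({'content': text}, ensure_ascii=False)}\n\n"
    except Exception as exc:
        # Upstream failures become a named SSE event instead of a dropped stream
        yield f"event: error\ndata: {json.dumps({'error': str(exc)})}\n\n"
        return
    yield "data: [DONE]\n\n"
```

For example: StreamingResponse(tokens_to_sse(LLMStreamFactory.create("openai", api_key).stream(messages, "gpt-4o")), media_type="text/event-stream").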
2. SSE Connection Pool Management
```python
import asyncio
import contextlib
import time
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class SSEConnection:
    session_id: str
    created_at: float = field(default_factory=time.time)
    last_active: float = field(default_factory=time.time)
    is_active: bool = True


class SSEConnectionPool:
    def __init__(self, max_connections: int = 1000, idle_timeout: int = 300):
        self.max_connections = max_connections
        self.idle_timeout = idle_timeout
        self.connections: Dict[str, SSEConnection] = {}
        self._cleanup_task: Optional[asyncio.Task] = None

    async def start(self):
        # Must run inside the event loop, e.g. from a FastAPI lifespan/startup hook
        self._cleanup_task = asyncio.create_task(self._cleanup_loop())

    async def stop(self):
        if self._cleanup_task:
            self._cleanup_task.cancel()
            # Wait for the loop to exit; the CancelledError is expected
            with contextlib.suppress(asyncio.CancelledError):
                await self._cleanup_task

    def register(self, session_id: str) -> bool:
        # Refuse new streams once the cap is reached (the caller should answer 503)
        if len(self.connections) >= self.max_connections:
            return False
        self.connections[session_id] = SSEConnection(session_id=session_id)
        return True

    def heartbeat(self, session_id: str):
        # Call on every event sent so active streams are not reaped as idle
        if session_id in self.connections:
            self.connections[session_id].last_active = time.time()

    def disconnect(self, session_id: str):
        if session_id in self.connections:
            self.connections[session_id].is_active = False
            del self.connections[session_id]

    @property
    def active_count(self) -> int:
        return sum(1 for c in self.connections.values() if c.is_active)

    async def _cleanup_loop(self):
        while True:
            now = time.time()
            # Snapshot expired IDs first, because disconnect() mutates the dict
            expired = [
                sid for sid, conn in self.connections.items()
                if now - conn.last_active > self.idle_timeout
            ]
            for sid in expired:
                self.disconnect(sid)
            await asyncio.sleep(30)
```
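The pool only works if every stream registers before it starts and always deregisters when it ends. Below is a sketch of that wiring. It assumes only the register/heartbeat/disconnect methods shown above, and open_pooled_stream and PoolLike are hypothetical names. Registration happens before the generator is created, because once StreamingResponse has sent headers it is too late to answer 503.

```python
from typing import AsyncIterator, Optional, Protocol


class PoolLike(Protocol):
    def register(self, session_id: str) -> bool: ...
    def heartbeat(self, session_id: str) -> None: ...
    def disconnect(self, session_id: str) -> None: ...


def open_pooled_stream(pool: PoolLike, session_id: str,
                       frames: AsyncIterator[str]) -> Optional[AsyncIterator[str]]:
    """Register the session, then return a generator that always deregisters (sketch)."""
    if not pool.register(session_id):
        return None  # caller should respond with 503 before starting a stream

    async def guarded() -> AsyncIterator[str]:
        try:
            async for frame in frames:
                pool.heartbeat(session_id)  # keep the idle reaper away
                yield frame
        finally:
            # Runs on normal completion, errors, and client-disconnect cancellation
            pool.disconnect(session_id)

    return guarded()
```

The generator body never starts if the client disconnects before the first chunk is pulled, so its finally block never runs. The pool's idle cleanup loop therefore remains a necessary backstop.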
3. Frontend SSE Client Wrapper
```javascript
class SSEClient {
  constructor(url, options = {}) {
    this.url = url;
    this.options = {
      retryInterval: 1000,
      maxRetries: 5,
      heartbeatInterval: 15000,
      ...options,
    };
    this.eventSource = null;
    this.retryCount = 0;
    this.lastEventId = null;
    this.handlers = new Map();
    this._heartbeatTimer = null;
    this._retryTimer = null;
  }

  connect() {
    // The URL API preserves any query string already present in this.url
    const url = new URL(this.url, window.location.href);
    if (this.lastEventId) {
      url.searchParams.set('last_event_id', this.lastEventId);
    }
    this.eventSource = new EventSource(url.toString());
    this.eventSource.onopen = () => {
      this.retryCount = 0;
      this._startHeartbeat();
      this._emit('open', {});
    };
    this.eventSource.onmessage = (event) => {
      // lastEventId stays empty until the server sends an id: field
      if (event.lastEventId) {
        this.lastEventId = event.lastEventId;
      }
      // Parse separately so exceptions thrown by handlers are not swallowed
      let data;
      try {
        data = JSON.parse(event.data);
      } catch {
        data = { raw: event.data }; // non-JSON payloads such as [DONE]
      }
      this._emit('message', data);
    };
    this.eventSource.onerror = () => {
      this._stopHeartbeat();
      // Closing disables EventSource's built-in reconnect (and its Last-Event-ID
      // header), so reconnect manually with exponential backoff and pass the
      // ID as a query parameter instead
      this.eventSource.close();
      if (this.retryCount < this.options.maxRetries) {
        const delay = Math.min(
          this.options.retryInterval * Math.pow(2, this.retryCount),
          30000
        );
        this.retryCount++;
        this._retryTimer = setTimeout(() => this.connect(), delay);
      } else {
        this._emit('error', { message: 'Max retries exceeded' });
      }
    };
  }

  on(event, handler) {
    if (!this.handlers.has(event)) {
      this.handlers.set(event, []);
    }
    this.handlers.get(event).push(handler);
    return this;
  }

  _emit(event, data) {
    const handlers = this.handlers.get(event) || [];
    for (const handler of handlers) {
      handler(data);
    }
  }

  _startHeartbeat() {
    this._stopHeartbeat();
    // Client-side tick only: SSE comment lines (": heartbeat") are consumed
    // by EventSource and never reach JavaScript handlers
    this._heartbeatTimer = setInterval(() => {
      if (this.eventSource?.readyState === EventSource.OPEN) {
        this._emit('heartbeat', { timestamp: Date.now() });
      }
    }, this.options.heartbeatInterval);
  }

  _stopHeartbeat() {
    if (this._heartbeatTimer) {
      clearInterval(this._heartbeatTimer);
      this._heartbeatTimer = null;
    }
  }

  disconnect() {
    this._stopHeartbeat();
    // Cancel a pending reconnect so disconnect() really stops the client
    clearTimeout(this._retryTimer);
    this._retryTimer = null;
    if (this.eventSource) {
      this.eventSource.close();
      this.eventSource = null;
    }
  }
}
```
Comparison Analysis
| Dimension | SSE | WebSocket | Long Polling | Short Polling | gRPC Stream |
|---|---|---|---|---|---|
| Direction | Unidirectional (server→client) | Bidirectional | Unidirectional | Unidirectional | Bidirectional |
| Protocol | HTTP/1.1+ | WS | HTTP | HTTP | HTTP/2 |
| Auto-reconnect | ✅ Native | ❌ Manual | ✅ | ✅ | ❌ Manual |
| Browser API | EventSource | WebSocket | fetch | fetch | Requires gRPC-Web |
| CDN/Proxy friendly | ✅ | ❌ | ✅ | ✅ | ⚠️ |
| Binary support | ❌ | ✅ | ❌ | ❌ | ✅ |
| LLM fit | Best | Good | Poor | Worst | Good |
| Complexity | Low | Medium | Low | Lowest | High |
| Resource usage | Low | Medium | High | Highest | Low |
Summary: SSE is usually the best fit for streaming LLM output. It is plain HTTP, the browser's EventSource reconnects automatically, and unidirectional push matches how LLMs generate tokens. The 5 implementation patterns go from simple to complex: FastAPI basic SSE → Flask SSE → LLM streaming chat → error recovery → backpressure control. In production, focus on three things: 1) disable Nginx buffering (X-Accel-Buffering: no), 2) send heartbeats so idle connections are not dropped, and 3) apply backpressure to keep memory bounded. Use the adapter pattern to abstract over multiple model providers, and a connection pool to manage concurrency.