Python SSE Streaming LLM Output: 5 Implementation Patterns from Server-Sent Events to Real-time Chat


The LLM Writes One Token at a Time, but Users Won't Wait for the Whole Answer

You call the OpenAI API, wait 5 seconds for one giant JSON blob, and users assume the page has frozen. You switch to polling, hitting the server every second, and the server buckles under the request volume. You try WebSocket, and now you are running a separate, stateful protocol (with its own upgrade handshake, proxy configuration, and scaling story) for traffic that only ever flows one way. In 2026, SSE (Server-Sent Events) remains the best fit for streaming LLM output: one-way server push, automatic reconnection built into the browser, and plain HTTP that works through most proxies and CDNs once response buffering is turned off.

This article starts from the SSE protocol itself and walks through five implementation patterns, taking you from protocol basics to production: FastAPI SSE → Flask SSE → LLM streaming chat → error recovery → backpressure control.


SSE Protocol Core Concepts

| Concept | Description |
|---|---|
| EventSource | Browser-native SSE client API with automatic reconnection |
| text/event-stream | The SSE Content-Type. It tells the browser the response is a stream of events |
| data field | Event payload. Each line of the payload is prefixed with data: |
| event field | Custom event type. Defaults to message when omitted |
| id field | Event ID. On automatic reconnection the browser sends the last ID back in the Last-Event-ID request header so the server can resume |
| retry field | Suggested client reconnection delay, in milliseconds |
| Chunked transfer | On HTTP/1.1, the chunked transfer encoding that carries the open-ended response body (HTTP/2 uses its own DATA frames instead) |
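To see how these fields fit together on the wire, here is a minimal sketch of a serializer. `format_sse` is a hypothetical helper, not part of FastAPI or Flask. It writes the optional id/event/retry fields, splits a multi-line payload into several `data:` lines, and ends the event with the blank line that makes the browser dispatch it.

```python
import json
from typing import Optional


def format_sse(data: str, event: Optional[str] = None,
               event_id: Optional[str] = None,
               retry_ms: Optional[int] = None) -> str:
    """Serialize one event in text/event-stream format (hypothetical helper)."""
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    if retry_ms is not None:
        lines.append(f"retry: {retry_ms}")
    # A payload containing newlines becomes several data: lines;
    # the browser joins them back together with "\n".
    for line in data.split("\n"):
        lines.append(f"data: {line}")
    # The trailing blank line is what terminates (dispatches) the event.
    return "\n".join(lines) + "\n\n"


frame = format_sse(json.dumps({"content": "Hi"}), event="message", event_id="7")
# frame == 'id: 7\nevent: message\ndata: {"content": "Hi"}\n\n'
```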

SSE vs WebSocket Comparison

SSE Advantages:
1. HTTP-based with no protocol upgrade, so it works through most HTTP proxies and CDNs as long as they don't buffer the response
2. Browser-native EventSource API with auto-reconnection
3. Unidirectional push, perfect for LLM output scenarios
4. Text protocol, easy to debug

WebSocket Advantages:
1. Full-duplex communication, suitable for bidirectional real-time interaction
2. Supports binary data
3. Lower framing overhead: a compact binary frame header (as little as 2 bytes) instead of SSE's text field names and blank-line separators

Conclusion: LLM streaming output is a classic unidirectional push scenario. The prompt goes up in an ordinary HTTP request and the tokens stream back, which makes SSE the best choice.

Problem Analysis: 5 Major LLM Streaming Challenges

  1. Connection stability: LLM inference can run for 10-60 s, and network jitter during that window drops connections. The stream therefore needs automatic reconnection and the ability to resume from the last event.
  2. Backpressure control: when the LLM produces tokens faster than a slow client reads them, an unbounded buffer keeps growing and risks exhausting memory.
  3. Error recovery: upstream LLM APIs time out, rate-limit (HTTP 429), and fail mid-stream. Errors must be reported to the client and handled with graceful degradation.
  4. Multi-model adaptation: OpenAI, Anthropic, and local models use different streaming formats, so a unified abstraction is needed.
  5. Concurrency management: every open SSE connection holds server resources, so connection limits and idle timeouts are needed.

Step-by-Step: 5 SSE Streaming Implementation Patterns

Pattern 1: Basic FastAPI SSE

import asyncio
import json
from datetime import datetime
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()

async def event_generator(request: Request):
    count = 0
    while True:
        # Stop producing as soon as the client has gone away
        if await request.is_disconnected():
            break
        now = datetime.now()
        data = json.dumps({
            "id": count,
            "message": f"Event at {now.isoformat()}",
            "timestamp": now.timestamp()
        }, ensure_ascii=False)
        # id: lets the browser resume via Last-Event-ID; the blank line ends the event
        yield f"id: {count}\ndata: {data}\n\n"
        count += 1
        await asyncio.sleep(1)

@app.get("/sse/events")
async def sse_events(request: Request):
    return StreamingResponse(
        event_generator(request),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        }
    )

Pattern 2: Flask SSE Implementation

import json
import time
from datetime import datetime
from flask import Flask, Response

app = Flask(__name__)

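def stream_events():
    # Sync generator: each open stream occupies one worker thread for its lifetime
    count = 0
    while True:
        data = json.dumps({
            "id": count,
            "message": f"Flask SSE event at {datetime.now().isoformat()}",
        }, ensure_ascii=False)
        yield f"id: {count}\nevent: message\ndata: {data}\n\n"
        count += 1
        time.sleep(1)

@app.route("/sse/events")
def sse_events():
    return Response(
        stream_events(),
        mimetype="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            # No "Connection" header here: WSGI (PEP 3333) forbids applications
            # from setting hop-by-hop headers; the server manages keep-alive
            "X-Accel-Buffering": "no",
        }
    )

if __name__ == "__main__":
    # threaded=True (already the default for app.run) lets several streams run at once
    app.run(threaded=True)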
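The Flask generator has no explicit disconnect check. PEP 3333 requires the server to call `close()` on the response iterable when a request ends, including when the browser disconnects early. Calling `close()` raises `GeneratorExit` at the paused `yield`, so a `try/finally` is a natural place for cleanup. The sketch below assumes that behavior. It uses a hypothetical variant, `stream_events_with_cleanup`, and simulates the server by calling `close()` directly. If the generator also needs `request`, Flask's `stream_with_context` wrapper keeps the request context alive while it runs.

```python
import time
from typing import Iterator, List


def stream_events_with_cleanup(log: List[str], interval: float = 1.0) -> Iterator[str]:
    """Variant of stream_events() that records when the stream is torn down."""
    count = 0
    try:
        while True:
            yield f"id: {count}\ndata: tick {count}\n\n"
            count += 1
            time.sleep(interval)
    finally:
        # Runs when the server calls close() on the response iterable, which
        # raises GeneratorExit at the paused yield
        log.append("closed")


# Simulate what a WSGI server does: pull a couple of events, then close()
log: List[str] = []
gen = stream_events_with_cleanup(log, interval=0)
first = next(gen)
second = next(gen)
gen.close()
# first == "id: 0\ndata: tick 0\n\n", second == "id: 1\ndata: tick 1\n\n", log == ["closed"]
```

The server can only notice the disconnect when a write fails, so cleanup may lag by up to one `interval`.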

Pattern 3: LLM Streaming Chat (OpenAI Compatible)

import json
import os
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()

# Read the key from the environment instead of hard-coding it in source
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "sk-your-api-key")
OPENAI_API_URL = "https://api.openai.com/v1/chat/completions"

async def llm_stream_generator(request: Request, messages: list, model: str = "gpt-4o"):
    headers = {
        "Authorization": f"Bearer {OPENAI_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": model,
        "messages": messages,
        "stream": True,
        "temperature": 0.7,
    }

    try:
        async with httpx.AsyncClient(timeout=120.0) as client:
            async with client.stream("POST", OPENAI_API_URL, json=payload, headers=headers) as response:
                if response.status_code != 200:
                    # Forward upstream errors (401, 429, 5xx) as a named SSE event
                    error_data = await response.aread()
                    error_msg = json.dumps({"error": error_data.decode()}, ensure_ascii=False)
                    yield f"event: error\ndata: {error_msg}\n\n"
                    return

                async for line in response.aiter_lines():
                    if await request.is_disconnected():
                        break
                    if not line.startswith("data: "):
                        continue  # skip blank separator lines
                    data_str = line[6:]
                    if data_str.strip() == "[DONE]":
                        yield "data: [DONE]\n\n"
                        break
                    try:
                        chunk = json.loads(data_str)
                    except json.JSONDecodeError:
                        continue
                    # choices can be an empty list (e.g. a final usage-only chunk)
                    choices = chunk.get("choices") or [{}]
                    content = choices[0].get("delta", {}).get("content") or ""
                    if content:
                        escaped = json.dumps({"content": content}, ensure_ascii=False)
                        yield f"data: {escaped}\n\n"
    except httpx.HTTPError as e:
        # Connection failures and timeouts would otherwise abort the stream silently
        error_msg = json.dumps({"error": str(e)}, ensure_ascii=False)
        yield f"event: error\ndata: {error_msg}\n\n"

@app.post("/sse/chat")
async def sse_chat(request: Request):
    body = await request.json()
    messages = body.get("messages", [])
    model = body.get("model", "gpt-4o")
    return StreamingResponse(
        llm_stream_generator(request, messages, model),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        }
    )
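The per-line parsing inside `llm_stream_generator` can be pulled out into a pure function, which makes the edge cases testable without calling the API. The sketch below uses the hypothetical name `extract_delta` and assumes OpenAI-style chunks of the form `{"choices": [{"delta": {"content": "..."}}]}`. It guards against two cases: `content: null`, and an empty `choices` list, such as the usage-only chunk OpenAI appends when `stream_options.include_usage` is enabled.

```python
import json
from typing import Optional


def extract_delta(line: str) -> Optional[str]:
    """Text delta from one upstream line: "" when there is no text, None at [DONE]."""
    if not line.startswith("data: "):
        return ""  # blank separators, comments, other SSE fields
    payload = line[len("data: "):].strip()
    if payload == "[DONE]":
        return None
    try:
        chunk = json.loads(payload)
    except json.JSONDecodeError:
        return ""
    # An empty choices list would make choices[0] raise IndexError
    choices = chunk.get("choices") or [{}]
    # "content": null appears in role-only and final chunks
    return choices[0].get("delta", {}).get("content") or ""
```

A `None` return tells the caller to emit `data: [DONE]` and stop. An empty string is simply skipped.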

Pattern 4: SSE with Error Recovery

import asyncio
import json
from datetime import datetime
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()

class SSEStateManager:
    # In-memory replay store: single process only, and sessions are never evicted
    def __init__(self):
        self.events_store = {}
        self.max_store_size = 1000

    def store_event(self, session_id: str, event_id: int, data: str):
        if session_id not in self.events_store:
            self.events_store[session_id] = {}
        self.events_store[session_id][event_id] = data
        if len(self.events_store[session_id]) > self.max_store_size:
            oldest = min(self.events_store[session_id].keys())
            del self.events_store[session_id][oldest]

    def get_events_after(self, session_id: str, last_event_id: int):
        if session_id not in self.events_store:
            return []
        return [
            (eid, self.events_store[session_id][eid])
            for eid in sorted(self.events_store[session_id].keys())
            if eid > last_event_id
        ]

state_manager = SSEStateManager()

async def resilient_sse_generator(request: Request, session_id: str, last_event_id: int = 0):
    retry_count = 0
    max_retries = 3
    current_id = last_event_id

    missed_events = state_manager.get_events_after(session_id, last_event_id)
    for eid, data in missed_events:
        yield f"id: {eid}\ndata: {data}\n\n"
        current_id = eid

    while True:
        if await request.is_disconnected():
            break
        try:
            current_id += 1
            data = json.dumps({
                "id": current_id,
                "message": f"Resilient event at {datetime.now().isoformat()}",
                "retry_count": retry_count,
            }, ensure_ascii=False)
            state_manager.store_event(session_id, current_id, data)
            yield f"id: {current_id}\ndata: {data}\n\n"
            retry_count = 0
            await asyncio.sleep(1)
        except Exception as e:
            retry_count += 1
            if retry_count > max_retries:
                error_data = json.dumps({"error": str(e), "fatal": True}, ensure_ascii=False)
                yield f"event: error\ndata: {error_data}\n\n"
                break
            retry_delay = min(2 ** retry_count, 30)
            yield f"retry: {retry_delay * 1000}\n\n"
            await asyncio.sleep(retry_delay)

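@app.get("/sse/resilient/{session_id}")
async def resilient_sse(request: Request, session_id: str, last_event_id: int = 0):
    # Native EventSource reconnects send the last seen id in the Last-Event-ID header;
    # the last_event_id query parameter covers clients that reconnect manually
    header_id = request.headers.get("last-event-id", "")
    if header_id.isdigit():
        last_event_id = int(header_id)
    return StreamingResponse(
        resilient_sse_generator(request, session_id, last_event_id),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        }
    )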
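`SSEStateManager.store_event` finds the oldest key with `min()` on every insert, which is O(n). If event ids increase monotonically within a session, as they do with the `current_id += 1` counter above, a `deque` with `maxlen` gives the same eviction in O(1) and keeps events already sorted. The sketch below is a hypothetical drop-in (`EventLog`). It assumes a single writer per session.

```python
from collections import deque
from typing import Deque, Dict, List, Tuple


class EventLog:
    """Hypothetical replay buffer keeping the last `max_size` events per session."""

    def __init__(self, max_size: int = 1000) -> None:
        self.max_size = max_size
        self._sessions: Dict[str, Deque[Tuple[int, str]]] = {}

    def store(self, session_id: str, event_id: int, data: str) -> None:
        log = self._sessions.get(session_id)
        if log is None:
            log = self._sessions[session_id] = deque(maxlen=self.max_size)
        # Once full, deque(maxlen=...) discards the oldest entry in O(1)
        log.append((event_id, data))

    def after(self, session_id: str, last_event_id: int) -> List[Tuple[int, str]]:
        # Assumes ids were appended in increasing order, so no sorting is needed
        return [(eid, data) for eid, data in self._sessions.get(session_id, ())
                if eid > last_event_id]


events = EventLog(max_size=3)
for i in range(1, 6):
    events.store("s1", i, f"event {i}")
replay = events.after("s1", 3)
# replay == [(4, "event 4"), (5, "event 5")]; ids 1 and 2 were evicted
```

The replay can have a gap: the client's Last-Event-ID may be older than the oldest retained id, which shows up as the first replayed id being greater than `last_event_id + 1`. When that happens, the server could tell the client to reload the full conversation instead of resuming silently.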

Pattern 5: SSE with Backpressure Control

import asyncio
import json
from collections import deque
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()

class BackpressureBuffer:
    def __init__(self, max_size: int = 100, high_watermark: float = 0.8, low_watermark: float = 0.3):
        self.max_size = max_size
        self.high_watermark = int(max_size * high_watermark)
        self.low_watermark = int(max_size * low_watermark)
        self.buffer = deque(maxlen=max_size)
        self.paused = False
        self._not_empty = asyncio.Event()
        self._not_full = asyncio.Event()
        self._not_full.set()

    async def put(self, item: str):
        while len(self.buffer) >= self.max_size:
            self._not_full.clear()
            await self._not_full.wait()
        self.buffer.append(item)
        self._not_empty.set()
        if len(self.buffer) >= self.high_watermark:
            self.paused = True

    async def get(self) -> str:
        while not self.buffer:
            self._not_empty.clear()
            await self._not_empty.wait()
        item = self.buffer.popleft()
        self._not_full.set()
        if len(self.buffer) <= self.low_watermark:
            self.paused = False
        return item

    @property
    def is_paused(self) -> bool:
        return self.paused

    @property
    def size(self) -> int:
        return len(self.buffer)

async def llm_producer(buffer: BackpressureBuffer, messages: list):
    import httpx
    headers = {
        "Authorization": "Bearer sk-your-api-key",  # placeholder: load from the environment in practice
        "Content-Type": "application/json",
    }
    payload = {"model": "gpt-4o", "messages": messages, "stream": True}
    async with httpx.AsyncClient(timeout=120.0) as client:
        async with client.stream("POST", "https://api.openai.com/v1/chat/completions", json=payload, headers=headers) as response:
            response.raise_for_status()  # surface 4xx/5xx as an exception in the producer task
            async for line in response.aiter_lines():
                # Stop reading from upstream while the buffer is above the high watermark
                while buffer.is_paused:
                    await asyncio.sleep(0.1)
                if line.startswith("data: ") and line[6:].strip() != "[DONE]":
                    try:
                        chunk = json.loads(line[6:])
                    except json.JSONDecodeError:
                        continue
                    choices = chunk.get("choices") or [{}]
                    content = choices[0].get("delta", {}).get("content") or ""
                    if content:
                        await buffer.put(content)
    await buffer.put("[DONE]")

async def backpressure_sse_generator(request: Request, messages: list):
    buffer = BackpressureBuffer(max_size=100, high_watermark=0.8, low_watermark=0.3)
    producer_task = asyncio.create_task(llm_producer(buffer, messages))

    try:
        while True:
            if await request.is_disconnected():
                break
            try:
                item = await asyncio.wait_for(buffer.get(), timeout=5.0)
            except asyncio.TimeoutError:
                # Producer failed without sending [DONE]: report it instead of heartbeating forever
                if producer_task.done() and producer_task.exception() is not None:
                    error = json.dumps({"error": str(producer_task.exception())}, ensure_ascii=False)
                    yield f"event: error\ndata: {error}\n\n"
                    break
                yield ": heartbeat\n\n"  # SSE comment line keeps idle proxies from closing the connection
                continue
            if item == "[DONE]":
                yield "data: [DONE]\n\n"
                break
            data = json.dumps({
                "content": item,
                "buffer_size": buffer.size,
                "paused": buffer.is_paused,
            }, ensure_ascii=False)
            yield f"data: {data}\n\n"
    finally:
        # Covers client disconnects, errors, and normal completion
        if not producer_task.done():
            producer_task.cancel()

@app.post("/sse/backpressure-chat")
async def backpressure_chat(request: Request):
    body = await request.json()
    messages = body.get("messages", [])
    return StreamingResponse(
        backpressure_sse_generator(request, messages),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        }
    )
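The standard library's `asyncio.Queue(maxsize=...)` already provides a bounded `put()` that suspends while the queue is full. That gives the same core backpressure as `BackpressureBuffer` without manual `asyncio.Event` bookkeeping. The sketch below is a minimal demonstration under simplifying assumptions: a list of fake tokens stands in for the LLM stream, a short sleep simulates a slow client, and `None` serves as an end-of-stream sentinel that cannot collide with real text.

```python
import asyncio
from typing import List, Optional, Tuple

DONE: Optional[str] = None  # end-of-stream sentinel (cannot collide with real text)


async def produce(queue: "asyncio.Queue[Optional[str]]", tokens: List[str], stats: dict) -> None:
    for token in tokens:
        # put() suspends while the queue is full: that suspension is the backpressure
        await queue.put(token)
        stats["max_qsize"] = max(stats["max_qsize"], queue.qsize())
    await queue.put(DONE)


async def consume(queue: "asyncio.Queue[Optional[str]]", delay: float) -> List[str]:
    received: List[str] = []
    while (token := await queue.get()) is not DONE:
        received.append(token)
        await asyncio.sleep(delay)  # simulate a slow client
    return received


async def main() -> Tuple[List[str], int, List[str]]:
    queue: "asyncio.Queue[Optional[str]]" = asyncio.Queue(maxsize=4)
    stats = {"max_qsize": 0}
    tokens = [f"tok{i}" for i in range(20)]  # stand-in for LLM deltas
    producer = asyncio.create_task(produce(queue, tokens, stats))
    received = await consume(queue, delay=0.001)
    await producer
    return received, stats["max_qsize"], tokens


received, max_qsize, tokens = asyncio.run(main())
# received == tokens, and max_qsize never exceeds 4
```

What `Queue` does not provide is the high/low watermark hysteresis. If you need the producer to pause early and resume only after the buffer drains, that can be layered on top of `qsize()`.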

Pitfall Guide

Pitfall 1: Forgetting X-Accel-Buffering: no

# ❌ Wrong: Nginx buffers proxied responses by default (proxy_buffering on), so the client receives events late and in bursts
return StreamingResponse(generator(), media_type="text/event-stream")

# ✅ Correct: Disable Nginx buffering
return StreamingResponse(
    generator(),
    media_type="text/event-stream",
    headers={
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",
    }
)

Pitfall 2: Non-standard SSE Data Format

# ❌ Wrong: no blank line after the data field, so the next data: line is merged into this event
yield f"data: {json.dumps(data)}\n"

# ✅ Correct: End every event with a blank line (\n\n)
yield f"data: {json.dumps(data)}\n\n"
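To make the consequence concrete, here is a minimal sketch of the client-side parsing rules, simplified from the WHATWG algorithm: `id` and `retry` are ignored, and `parse_sse` is a hypothetical name. Consecutive `data:` lines belong to one event and are joined with `"\n"`. When the blank line is missing, two JSON objects therefore arrive as one event that no longer parses.

```python
import json
import re
from typing import Dict, List


def parse_sse(text: str) -> List[Dict[str, str]]:
    """Simplified SSE parser for a complete stream; ignores id/retry for brevity."""
    events: List[Dict[str, str]] = []
    data_lines: List[str] = []
    event_type = "message"
    # The spec accepts \r\n, \r, or \n; the last piece after the final
    # terminator is an incomplete line, so it is dropped
    for line in re.split(r"\r\n|\r|\n", text)[:-1]:
        if line == "":
            # Blank line: dispatch the pending event if it carried any data
            if data_lines:
                events.append({"event": event_type, "data": "\n".join(data_lines)})
            data_lines, event_type = [], "message"
            continue
        if line.startswith(":"):
            continue  # comment line, e.g. ": heartbeat"
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space after the colon is dropped
        if field == "data":
            data_lines.append(value)
        elif field == "event":
            event_type = value
    return events


good = 'data: {"n": 1}\n\ndata: {"n": 2}\n\n'
bad = 'data: {"n": 1}\ndata: {"n": 2}\n\n'  # first event lacks its blank line

merged = parse_sse(bad)
# merged == [{"event": "message", "data": '{"n": 1}\n{"n": 2}'}]
try:
    json.loads(merged[0]["data"])
    merged_is_valid_json = True
except json.JSONDecodeError:  # "Extra data": two objects in one payload
    merged_is_valid_json = False
```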

Pitfall 3: No Heartbeat Keep-Alive

# ❌ Wrong: Long idle periods cause proxy/load balancer to drop connections
async def generator():
    while True:
        await asyncio.sleep(60)
        yield f"data: {json.dumps(data)}\n\n"

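# ✅ Correct: Send an SSE comment line as a heartbeat whenever no data arrives for 15 s
async def generator(queue: asyncio.Queue):
    while True:
        try:
            data = await asyncio.wait_for(queue.get(), timeout=15)
        except asyncio.TimeoutError:
            # Lines starting with ":" are comments: EventSource ignores them,
            # but they keep proxies and load balancers from timing out the connection
            yield ": heartbeat\n\n"
            continue
        yield f"data: {json.dumps(data)}\n\n"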

Pitfall 4: EventSource Doesn't Support POST

// ❌ Wrong: EventSource only supports GET, can't send request body
const es = new EventSource('/sse/chat', {
    method: 'POST',
    body: JSON.stringify({ messages })
});

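// ✅ Correct: Use fetch + ReadableStream for POST scenarios
const response = await fetch('/sse/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ messages }),
});
if (!response.ok) throw new Error(`HTTP ${response.status}`);
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // stream: true keeps a multi-byte UTF-8 character split across reads intact
    buffer += decoder.decode(value, { stream: true });
    // A read can end mid-event, so only handle events terminated by a blank line
    const events = buffer.split('\n\n');
    buffer = events.pop();
    for (const event of events) {
        for (const line of event.split('\n')) {
            if (!line.startsWith('data: ')) continue;  // skips event:, id:, and comment lines
            const payload = line.slice(6);
            if (payload === '[DONE]') continue;  // end-of-stream marker, not JSON
            processChunk(JSON.parse(payload));
        }
    }
}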
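The same chunk-boundary problem exists for any non-browser client. Network reads can end in the middle of an event or in the middle of a multi-byte character, so text must be buffered until a blank line arrives. The Python sketch below mirrors the fetch loop. `SSEChunkAssembler` is a hypothetical name, and the code assumes the server separates events with `"\n\n"` (LF line endings), as every server example in this article does.

```python
import codecs
from typing import List


class SSEChunkAssembler:
    """Hypothetical helper: turns arbitrary byte chunks into complete event blocks.

    Assumes events are separated by a blank line using LF line endings.
    """

    def __init__(self) -> None:
        # The incremental decoder holds back a partial UTF-8 sequence until the next chunk
        self._decoder = codecs.getincrementaldecoder("utf-8")()
        self._buffer = ""

    def feed(self, chunk: bytes) -> List[str]:
        self._buffer += self._decoder.decode(chunk)
        parts = self._buffer.split("\n\n")
        # The last part is an incomplete event (possibly ""); keep it for the next chunk
        self._buffer = parts.pop()
        return parts


payload = "data: 你好\n\ndata: [DONE]\n\n".encode("utf-8")
assembler = SSEChunkAssembler()
blocks: List[str] = []
for i in range(0, len(payload), 5):  # 5-byte reads split both a character and an event
    blocks.extend(assembler.feed(payload[i:i + 5]))
# blocks == ["data: 你好", "data: [DONE]"]
```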

Pitfall 5: Not Handling Client Disconnection

# ❌ Wrong: Client disconnected, generator still running wasting resources
async def generator():
    while True:
        yield f"data: hello\n\n"
        await asyncio.sleep(1)

# ✅ Correct: Check client connection status
async def generator(request: Request):
    while True:
        if await request.is_disconnected():
            break
        yield f"data: hello\n\n"
        await asyncio.sleep(1)
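The corrected generator only calls `request.is_disconnected()`, so its shutdown behavior can be checked without running a server by passing in a stub. In the sketch below, `StubRequest` is a hypothetical test double and the sleep interval is made configurable. The check is a poll, so the generator notices a disconnect at the top of the next loop iteration rather than immediately.

```python
import asyncio
from typing import AsyncIterator, List


class StubRequest:
    """Hypothetical test double for Request: connected for `alive_checks` polls, then gone."""

    def __init__(self, alive_checks: int) -> None:
        self.remaining = alive_checks

    async def is_disconnected(self) -> bool:
        self.remaining -= 1
        return self.remaining < 0


async def generator(request: StubRequest, interval: float = 0.0) -> AsyncIterator[str]:
    # Same loop as the corrected generator, with the sleep made configurable for tests
    while True:
        if await request.is_disconnected():
            break
        yield "data: hello\n\n"
        await asyncio.sleep(interval)


async def collect(request: StubRequest) -> List[str]:
    return [frame async for frame in generator(request)]


frames = asyncio.run(collect(StubRequest(alive_checks=3)))
# frames == ["data: hello\n\n"] * 3: the loop exits on the first poll after the "disconnect"
```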

Error Troubleshooting

| # | Error Message | Cause | Solution |
|---|---|---|---|
| 1 | EventSource's response has a MIME type that is not "text/event-stream" | Incorrect response Content-Type | Set media_type="text/event-stream" |
| 2 | net::ERR_INCOMPLETE_CHUNKED_ENCODING | The stream was cut off before the final chunk: Nginx buffering problems, a proxy timeout, or a server error mid-stream | Add the X-Accel-Buffering: no header, check proxy timeouts, and check server logs for exceptions |
| 3 | 502 Bad Gateway | The backend SSE connection timed out and the gateway dropped it | Increase Nginx proxy_read_timeout and send heartbeats |
| 4 | Connection refused | FastAPI/Flask is not running, or the port is wrong | Check the service port and firewall |
| 5 | 429 Too Many Requests | LLM API rate limiting | Retry with exponential backoff; add a request queue |
| 6 | httpx.ReadTimeout / TimeoutError | LLM inference exceeded the client timeout | Increase the httpx timeout; add heartbeat keep-alive |
| 7 | JSONDecodeError | An SSE data line is not valid JSON | Skip non-data lines; wrap JSON parsing in error handling |
| 8 | RuntimeError: Event loop is closed | Running an async generator inside the synchronous Flask framework | Use asgiref.sync.async_to_sync, or switch to FastAPI |
| 9 | BrokenPipeError | Writing after the client disconnected | Check request.is_disconnected(); catch BrokenPipeError |
| 10 | MemoryError | No backpressure control, so the buffer grows without bound | Implement BackpressureBuffer to cap the queue size |
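For row 5, here is a minimal sketch of capped exponential backoff with full jitter. `retry_with_backoff` is a hypothetical helper, and `RateLimited` is a hypothetical stand-in for whatever exception your HTTP client raises on a 429. Retries like this belong before the first token is forwarded to the client. Retrying after the stream has started would duplicate output. If the upstream sends a Retry-After header, honoring it is usually better than a computed delay.

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    call: Callable[[], Awaitable[T]],
    retry_on: Tuple[Type[BaseException], ...],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
) -> T:
    """Retry `call` on the given exceptions with capped exponential backoff and full jitter."""
    for attempt in range(max_retries + 1):
        try:
            return await call()
        except retry_on:
            if attempt == max_retries:
                raise  # out of retries: let the caller report the error
            # Full jitter: sleep a random time up to base_delay * 2^attempt, capped
            delay = min(max_delay, base_delay * 2 ** attempt)
            await asyncio.sleep(random.uniform(0, delay))
    raise AssertionError("unreachable")


class RateLimited(Exception):
    """Hypothetical stand-in for an HTTP 429 from the upstream API."""


attempts = {"n": 0}


async def flaky_upstream() -> str:
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RateLimited()
    return "ok"


result = asyncio.run(retry_with_backoff(flaky_upstream, (RateLimited,), base_delay=0.01))
# result == "ok" after 3 attempts
```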

Advanced Optimization

1. Multi-Model Unified Streaming Adapter

from abc import ABC, abstractmethod
from typing import AsyncIterator
import json
import httpx

class LLMStreamAdapter(ABC):
    # Declared as a plain method returning an async iterator; subclasses
    # implement it as async generators (async def ... yield)
    @abstractmethod
    def stream(self, messages: list, model: str) -> AsyncIterator[str]:
        ...

class OpenAIStreamAdapter(LLMStreamAdapter):
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.openai.com/v1/chat/completions"

    async def stream(self, messages: list, model: str = "gpt-4o") -> AsyncIterator[str]:
        headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}
        payload = {"model": model, "messages": messages, "stream": True}
        async with httpx.AsyncClient(timeout=120.0) as client:
            async with client.stream("POST", self.base_url, json=payload, headers=headers) as resp:
                resp.raise_for_status()  # raise httpx.HTTPStatusError on 4xx/5xx
                async for line in resp.aiter_lines():
                    if not line.startswith("data: ") or line[6:].strip() == "[DONE]":
                        continue
                    try:
                        chunk = json.loads(line[6:])
                    except json.JSONDecodeError:
                        continue
                    # choices can be empty (e.g. a final usage-only chunk)
                    choices = chunk.get("choices") or [{}]
                    content = choices[0].get("delta", {}).get("content") or ""
                    if content:
                        yield content

class AnthropicStreamAdapter(LLMStreamAdapter):
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.anthropic.com/v1/messages"

    async def stream(self, messages: list, model: str = "claude-sonnet-4-20250514") -> AsyncIterator[str]:
        headers = {
            "x-api-key": self.api_key,
            "anthropic-version": "2023-06-01",
            "Content-Type": "application/json",
        }
        # Anthropic takes the system prompt as a top-level "system" field, not as a message
        system_parts = []
        user_messages = []
        for m in messages:
            if m["role"] == "system":
                system_parts.append(m["content"])
            else:
                user_messages.append(m)
        payload = {"model": model, "messages": user_messages, "stream": True, "max_tokens": 4096}
        if system_parts:
            payload["system"] = "\n\n".join(system_parts)
        async with httpx.AsyncClient(timeout=120.0) as client:
            async with client.stream("POST", self.base_url, json=payload, headers=headers) as resp:
                resp.raise_for_status()  # raise httpx.HTTPStatusError on 4xx/5xx
                async for line in resp.aiter_lines():
                    if not line.startswith("data: "):
                        continue  # skip "event:" lines and blank separators
                    try:
                        chunk = json.loads(line[6:])
                    except json.JSONDecodeError:
                        continue
                    # Generated text arrives in content_block_delta events
                    if chunk.get("type") == "content_block_delta":
                        text = chunk.get("delta", {}).get("text", "")
                        if text:
                            yield text

class LLMStreamFactory:
    _adapters = {
        "openai": OpenAIStreamAdapter,
        "anthropic": AnthropicStreamAdapter,
    }

    @classmethod
    def create(cls, provider: str, api_key: str) -> LLMStreamAdapter:
        adapter_cls = cls._adapters.get(provider)
        if not adapter_cls:
            raise ValueError(f"Unsupported provider: {provider}")
        return adapter_cls(api_key)
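Every adapter yields plain text deltas, so the SSE layer only needs to depend on `stream()`. That also means a fake provider can stand in during tests. The sketch below restates a minimal `StreamAdapter` ABC so it runs without the classes above. `FakeStreamAdapter` and `to_sse` are hypothetical names, and the framing (a JSON `content` field plus a final `[DONE]`) follows the convention used in Pattern 3.

```python
import asyncio
import json
from abc import ABC, abstractmethod
from typing import AsyncIterator, List


class StreamAdapter(ABC):
    """Minimal mirror of LLMStreamAdapter's interface so this sketch runs standalone."""

    @abstractmethod
    def stream(self, messages: list, model: str) -> AsyncIterator[str]:
        ...


class FakeStreamAdapter(StreamAdapter):
    """Hypothetical test double: replays canned chunks instead of calling a provider."""

    def __init__(self, chunks: List[str]) -> None:
        self.chunks = chunks

    async def stream(self, messages: list, model: str = "fake") -> AsyncIterator[str]:
        for chunk in self.chunks:
            await asyncio.sleep(0)  # yield control, as a real network read would
            yield chunk


async def to_sse(adapter: StreamAdapter, messages: list, model: str) -> AsyncIterator[str]:
    # Provider-agnostic SSE framing: every adapter yields plain text deltas
    async for text in adapter.stream(messages, model):
        yield f"data: {json.dumps({'content': text}, ensure_ascii=False)}\n\n"
    yield "data: [DONE]\n\n"


async def collect() -> List[str]:
    adapter = FakeStreamAdapter(["Hel", "lo"])
    return [frame async for frame in to_sse(adapter, [{"role": "user", "content": "hi"}], "fake")]


frames = asyncio.run(collect())
```

In an endpoint, `to_sse(LLMStreamFactory.create(provider, key), messages, model)` would take the place of the fake adapter.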

2. SSE Connection Pool Management

import asyncio
import time
from dataclasses import dataclass, field
from typing import Dict, Optional

@dataclass
class SSEConnection:
    session_id: str
    created_at: float = field(default_factory=time.time)
    last_active: float = field(default_factory=time.time)
    is_active: bool = True

class SSEConnectionPool:
    def __init__(self, max_connections: int = 1000, idle_timeout: int = 300):
        self.max_connections = max_connections
        self.idle_timeout = idle_timeout
        self.connections: Dict[str, SSEConnection] = {}
        self._cleanup_task: Optional[asyncio.Task] = None

    async def start(self):
        self._cleanup_task = asyncio.create_task(self._cleanup_loop())

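    async def stop(self):
        if self._cleanup_task:
            self._cleanup_task.cancel()
            # Wait for the cancellation to finish so the task is not left pending
            try:
                await self._cleanup_task
            except asyncio.CancelledError:
                pass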

    def register(self, session_id: str) -> bool:
        if len(self.connections) >= self.max_connections:
            return False
        self.connections[session_id] = SSEConnection(session_id=session_id)
        return True

    def heartbeat(self, session_id: str):
        if session_id in self.connections:
            self.connections[session_id].last_active = time.time()

    def disconnect(self, session_id: str):
        if session_id in self.connections:
            self.connections[session_id].is_active = False
            del self.connections[session_id]

    @property
    def active_count(self) -> int:
        return sum(1 for c in self.connections.values() if c.is_active)

    async def _cleanup_loop(self):
        while True:
            now = time.time()
            expired = [
                sid for sid, conn in self.connections.items()
                if now - conn.last_active > self.idle_timeout
            ]
            for sid in expired:
                self.disconnect(sid)
            await asyncio.sleep(30)
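`SSEConnectionPool` only tracks sessions. The endpoint still has to call `register()` before streaming and `disconnect()` when the stream ends, however it ends. One way to guarantee the release is a `try/finally` wrapper around the response generator. So that the sketch runs standalone, it uses `StreamLimiter`, a hypothetical minimal counter, in place of the pool. In FastAPI the endpoint would return a 503 when `try_acquire()` fails, and otherwise wrap its generator in `limiter.guard(...)`. One caveat: if the generator is never iterated at all, `finally` never runs. The pool's idle-timeout cleanup covers that case.

```python
import asyncio
from typing import AsyncIterator, List, Tuple


class StreamLimiter:
    """Hypothetical minimal stand-in for SSEConnectionPool: a bounded counter."""

    def __init__(self, max_streams: int) -> None:
        self.max_streams = max_streams
        self.active = 0

    def try_acquire(self) -> bool:
        # Check and increment with no await in between, so no other task can interleave
        if self.active >= self.max_streams:
            return False
        self.active += 1
        return True

    async def guard(self, source: AsyncIterator[str]) -> AsyncIterator[str]:
        # finally runs on normal completion, on errors, and when the server
        # closes or cancels the generator after a client disconnect
        try:
            async for frame in source:
                yield frame
        finally:
            self.active -= 1


async def ticks(n: int) -> AsyncIterator[str]:
    for i in range(n):
        yield f"data: {i}\n\n"


async def demo() -> Tuple[bool, bool, List[str], bool]:
    limiter = StreamLimiter(max_streams=1)
    first = limiter.try_acquire()
    second = limiter.try_acquire()  # rejected: the single slot is taken
    frames = [f async for f in limiter.guard(ticks(2))]
    third = limiter.try_acquire()  # slot freed once the first stream finished
    return first, second, frames, third


first, second, frames, third = asyncio.run(demo())
```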

3. Frontend SSE Client Wrapper

class SSEClient {
    constructor(url, options = {}) {
        this.url = url;
        this.options = {
            retryInterval: 1000,
            maxRetries: 5,
            heartbeatInterval: 15000,
            ...options,
        };
        this.eventSource = null;
        this.retryCount = 0;
        this.lastEventId = null;
        this.handlers = new Map();
        this._heartbeatTimer = null;
    }

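    connect() {
        // Build the URL with the URL API so existing query strings and encoding are handled
        const url = new URL(this.url, window.location.href);
        if (this.lastEventId) {
            url.searchParams.set('last_event_id', this.lastEventId);
        }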

        this.eventSource = new EventSource(url);

        this.eventSource.onopen = () => {
            this.retryCount = 0;
            this._startHeartbeat();
            this._emit('open', {});
        };

        this.eventSource.onmessage = (event) => {
            this.lastEventId = event.lastEventId;
            try {
                const data = JSON.parse(event.data);
                this._emit('message', data);
            } catch {
                this._emit('message', { raw: event.data });
            }
        };

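        this.eventSource.onerror = (event) => {
            // A server-sent "event: error" (as emitted by Pattern 4) is also delivered
            // to onerror, as a MessageEvent carrying data; 'server-error' is an illustrative name
            if (event instanceof MessageEvent) {
                this._emit('server-error', event.data);
                return;
            }
            this._stopHeartbeat();
            // Closing disables EventSource's built-in reconnect so the backoff below is used instead
            this.eventSource.close();

            if (this.retryCount < this.options.maxRetries) {
                const delay = Math.min(
                    this.options.retryInterval * Math.pow(2, this.retryCount),
                    30000
                );
                this.retryCount++;
                this._retryTimer = setTimeout(() => this.connect(), delay);
            } else {
                this._emit('error', { message: 'Max retries exceeded' });
            }
        };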
    }

    on(event, handler) {
        if (!this.handlers.has(event)) {
            this.handlers.set(event, []);
        }
        this.handlers.get(event).push(handler);
        return this;
    }

    _emit(event, data) {
        const handlers = this.handlers.get(event) || [];
        for (const handler of handlers) {
            handler(data);
        }
    }

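    _startHeartbeat() {
        // Emits a local 'heartbeat' event while the connection is open; the server's
        // ": heartbeat" comment lines themselves are never exposed to JavaScript
        this._stopHeartbeat();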
        this._heartbeatTimer = setInterval(() => {
            if (this.eventSource?.readyState === EventSource.OPEN) {
                this._emit('heartbeat', { timestamp: Date.now() });
            }
        }, this.options.heartbeatInterval);
    }

    _stopHeartbeat() {
        if (this._heartbeatTimer) {
            clearInterval(this._heartbeatTimer);
            this._heartbeatTimer = null;
        }
    }

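    disconnect() {
        this._stopHeartbeat();
        // Cancel a pending reconnect so an explicit disconnect stays disconnected
        clearTimeout(this._retryTimer);
        this._retryTimer = null;
        if (this.eventSource) {
            this.eventSource.close();
            this.eventSource = null;
        }
    }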
}

Comparison Analysis

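| Dimension | SSE | WebSocket | Long Polling | Short Polling | gRPC Stream |
|---|---|---|---|---|---|
| Direction | Unidirectional (server→client) | Bidirectional | Unidirectional | Unidirectional | Bidirectional |
| Protocol | HTTP/1.1+ | WS | HTTP | HTTP | HTTP/2 |
| Auto-reconnect | ✅ Native | ❌ Manual | N/A (new request each cycle) | N/A (new request each cycle) | ❌ Manual |
| Browser API | EventSource | WebSocket | fetch | fetch | Requires gRPC-Web |
| CDN/Proxy friendly | ✅ (with buffering disabled) | ⚠️ Needs Upgrade support | ✅ | ✅ | ⚠️ Needs HTTP/2 end to end or a gRPC-Web proxy |
| Binary support | ❌ Text only | ✅ | ✅ | ✅ | ✅ |
| LLM fit | Best | Good | Poor | Worst | Good |
| Complexity | Low | Medium | Low | Lowest | High |
| Resource usage | Low | Medium | High | Highest | Low |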

Summary: SSE is the best fit for LLM streaming output. It is an HTTP-native protocol, the browser's EventSource reconnects automatically, and one-way push matches how LLMs generate text. The five implementation patterns, from simple to complex, are FastAPI basic SSE → Flask SSE → LLM streaming chat → error recovery → backpressure control. In production, focus on three things: 1) disable proxy buffering (X-Accel-Buffering: no for Nginx), 2) send heartbeats so idle connections aren't dropped, and 3) apply backpressure so slow clients can't exhaust server memory. Use the adapter pattern to abstract over multiple model providers, and a connection pool to manage concurrency.



#Python #SSE #StreamingOutput #LLM #Server-Sent Events #2026 #RealTimeChat