Python SSE串流輸出LLM:從Server-Sent Events到即時對話的5種實作模式
AI与大数据
等LLM一個字一個字蹦出來,使用者早跑了
你呼叫ChatGPT的API,等了5秒回傳一大坨JSON,使用者以為頁面卡死了;你改成輪詢,每秒請求一次,伺服器扛不住;你用WebSocket,結果發現一個對話就要維持長連線,擴展性差到爆炸。2026年,SSE(Server-Sent Events) 才是LLM串流輸出的最優解——單向推送、自動重連、HTTP原生支援、CDN友善。
本文將從SSE協定原理出發,帶你完成FastAPI SSE→Flask SSE→LLM串流對話→錯誤恢復→背壓控制的5種實作模式,從協定到生產,一步不落。
SSE協定核心概念
| 概念 | 說明 |
|---|---|
| EventSource | 瀏覽器原生SSE客戶端API,自動重連 |
| text/event-stream | SSE的Content-Type,告訴瀏覽器這是串流事件 |
| data欄位 | 事件資料,每行以data:開頭 |
| event欄位 | 自訂事件型別,預設是message |
| id欄位 | 事件ID,用於斷線重連時從上次位置繼續 |
| retry欄位 | 建議客戶端重連間隔(毫秒) |
| Chunked Transfer | HTTP分塊傳輸編碼,SSE的底層傳輸機制 |
SSE與WebSocket對比
SSE優勢:
1. 基於HTTP,無需升級協定,CDN/代理友善
2. 瀏覽器原生EventSource API,自動重連
3. 單向推送,完美匹配LLM輸出場景
4. 文字協定,除錯簡單
WebSocket優勢:
1. 全雙工通訊,適合雙向即時互動
2. 支援二進位資料
3. 更低的幀開銷(2位元組 vs SSE的文字格式)
結論:LLM串流輸出是典型的單向推送場景,SSE是最佳選擇
問題分析:LLM串流輸出的5大挑戰
- 連線穩定性:LLM推理耗時長(10-60秒),網路抖動導致連線中斷,需要自動重連和斷點續傳
- 背壓控制:LLM生成速度快於客戶端消費速度,記憶體溢位風險
- 錯誤恢復:上游LLM API超時、限流、回傳錯誤,需要優雅降級
- 多模型適配:OpenAI/Anthropic/本地模型串流格式各異,需要統一抽象
- 並行管理:大量SSE連線佔用伺服器資源,需要連線池和超時管理
分步實作:5種SSE串流實作模式
模式1:FastAPI基礎SSE
import asyncio
import json
from datetime import datetime
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
app = FastAPI()
async def event_generator(request: Request):
count = 0
while True:
if await request.is_disconnected():
break
data = json.dumps({
"id": count,
"message": f"Event at {datetime.now().isoformat()}",
"timestamp": datetime.now().timestamp()
}, ensure_ascii=False)
yield f"id: {count}\ndata: {data}\n\n"
count += 1
await asyncio.sleep(1)
@app.get("/sse/events")
async def sse_events(request: Request):
return StreamingResponse(
event_generator(request),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
}
)
模式2:Flask SSE實作
import json
import time
from datetime import datetime
from flask import Flask, Response
app = Flask(__name__)
def stream_events():
count = 0
while True:
data = json.dumps({
"id": count,
"message": f"Flask SSE event at {datetime.now().isoformat()}",
}, ensure_ascii=False)
yield f"id: {count}\nevent: message\ndata: {data}\n\n"
count += 1
time.sleep(1)
@app.route("/sse/events")
def sse_events():
return Response(
stream_events(),
mimetype="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
}
)
模式3:LLM串流對話(OpenAI相容)
import asyncio
import json
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
app = FastAPI()
OPENAI_API_KEY = "sk-your-api-key"
OPENAI_API_URL = "https://api.openai.com/v1/chat/completions"
async def llm_stream_generator(request: Request, messages: list, model: str = "gpt-4o"):
headers = {
"Authorization": f"Bearer {OPENAI_API_KEY}",
"Content-Type": "application/json",
}
payload = {
"model": model,
"messages": messages,
"stream": True,
"temperature": 0.7,
}
async with httpx.AsyncClient(timeout=120.0) as client:
async with client.stream("POST", OPENAI_API_URL, json=payload, headers=headers) as response:
if response.status_code != 200:
error_data = await response.aread()
error_msg = json.dumps({"error": error_data.decode()}, ensure_ascii=False)
yield f"event: error\ndata: {error_msg}\n\n"
return
async for line in response.aiter_lines():
if await request.is_disconnected():
break
if line.startswith("data: "):
data_str = line[6:]
if data_str.strip() == "[DONE]":
yield f"data: [DONE]\n\n"
break
try:
chunk = json.loads(data_str)
delta = chunk.get("choices", [{}])[0].get("delta", {})
content = delta.get("content", "")
if content:
escaped = json.dumps({"content": content}, ensure_ascii=False)
yield f"data: {escaped}\n\n"
except json.JSONDecodeError:
continue
@app.post("/sse/chat")
async def sse_chat(request: Request):
body = await request.json()
messages = body.get("messages", [])
model = body.get("model", "gpt-4o")
return StreamingResponse(
llm_stream_generator(request, messages, model),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
}
)
模式4:帶錯誤恢復的SSE
import asyncio
import json
from datetime import datetime
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
app = FastAPI()
class SSEStateManager:
def __init__(self):
self.events_store = {}
self.max_store_size = 1000
def store_event(self, session_id: str, event_id: int, data: str):
if session_id not in self.events_store:
self.events_store[session_id] = {}
self.events_store[session_id][event_id] = data
if len(self.events_store[session_id]) > self.max_store_size:
oldest = min(self.events_store[session_id].keys())
del self.events_store[session_id][oldest]
def get_events_after(self, session_id: str, last_event_id: int):
if session_id not in self.events_store:
return []
return [
(eid, self.events_store[session_id][eid])
for eid in sorted(self.events_store[session_id].keys())
if eid > last_event_id
]
state_manager = SSEStateManager()
async def resilient_sse_generator(request: Request, session_id: str, last_event_id: int = 0):
retry_count = 0
max_retries = 3
current_id = last_event_id
missed_events = state_manager.get_events_after(session_id, last_event_id)
for eid, data in missed_events:
yield f"id: {eid}\ndata: {data}\n\n"
current_id = eid
while True:
if await request.is_disconnected():
break
try:
current_id += 1
data = json.dumps({
"id": current_id,
"message": f"Resilient event at {datetime.now().isoformat()}",
"retry_count": retry_count,
}, ensure_ascii=False)
state_manager.store_event(session_id, current_id, data)
yield f"id: {current_id}\ndata: {data}\n\n"
retry_count = 0
await asyncio.sleep(1)
except Exception as e:
retry_count += 1
if retry_count > max_retries:
error_data = json.dumps({"error": str(e), "fatal": True}, ensure_ascii=False)
yield f"event: error\ndata: {error_data}\n\n"
break
retry_delay = min(2 ** retry_count, 30)
yield f"retry: {retry_delay * 1000}\n\n"
await asyncio.sleep(retry_delay)
@app.get("/sse/resilient/{session_id}")
async def resilient_sse(request: Request, session_id: str, last_event_id: int = 0):
return StreamingResponse(
resilient_sse_generator(request, session_id, last_event_id),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
}
)
模式5:帶背壓控制的SSE
import asyncio
import json
from collections import deque
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
app = FastAPI()
class BackpressureBuffer:
def __init__(self, max_size: int = 100, high_watermark: float = 0.8, low_watermark: float = 0.3):
self.max_size = max_size
self.high_watermark = int(max_size * high_watermark)
self.low_watermark = int(max_size * low_watermark)
self.buffer = deque(maxlen=max_size)
self.paused = False
self._not_empty = asyncio.Event()
self._not_full = asyncio.Event()
self._not_full.set()
async def put(self, item: str):
while len(self.buffer) >= self.max_size:
self._not_full.clear()
await self._not_full.wait()
self.buffer.append(item)
self._not_empty.set()
if len(self.buffer) >= self.high_watermark:
self.paused = True
async def get(self) -> str:
while not self.buffer:
self._not_empty.clear()
await self._not_empty.wait()
item = self.buffer.popleft()
self._not_full.set()
if len(self.buffer) <= self.low_watermark:
self.paused = False
return item
@property
def is_paused(self) -> bool:
return self.paused
@property
def size(self) -> int:
return len(self.buffer)
async def llm_producer(buffer: BackpressureBuffer, messages: list):
import httpx
headers = {
"Authorization": f"Bearer sk-your-api-key",
"Content-Type": "application/json",
}
payload = {"model": "gpt-4o", "messages": messages, "stream": True}
async with httpx.AsyncClient(timeout=120.0) as client:
async with client.stream("POST", "https://api.openai.com/v1/chat/completions", json=payload, headers=headers) as response:
async for line in response.aiter_lines():
if buffer.is_paused:
while buffer.is_paused:
await asyncio.sleep(0.1)
if line.startswith("data: ") and line[6:].strip() != "[DONE]":
try:
chunk = json.loads(line[6:])
content = chunk.get("choices", [{}])[0].get("delta", {}).get("content", "")
if content:
await buffer.put(content)
except json.JSONDecodeError:
continue
await buffer.put("[DONE]")
async def backpressure_sse_generator(request: Request, messages: list):
buffer = BackpressureBuffer(max_size=100, high_watermark=0.8, low_watermark=0.3)
producer_task = asyncio.create_task(llm_producer(buffer, messages))
try:
while True:
if await request.is_disconnected():
producer_task.cancel()
break
try:
item = await asyncio.wait_for(buffer.get(), timeout=5.0)
if item == "[DONE]":
yield "data: [DONE]\n\n"
break
data = json.dumps({
"content": item,
"buffer_size": buffer.size,
"paused": buffer.is_paused,
}, ensure_ascii=False)
yield f"data: {data}\n\n"
except asyncio.TimeoutError:
yield ": heartbeat\n\n"
finally:
if not producer_task.done():
producer_task.cancel()
@app.post("/sse/backpressure-chat")
async def backpressure_chat(request: Request):
body = await request.json()
messages = body.get("messages", [])
return StreamingResponse(
backpressure_sse_generator(request, messages),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
}
)
避坑指南
坑1:忘記設定X-Accel-Buffering: no
# ❌ 錯誤:Nginx預設緩衝SSE回應,客戶端收不到即時資料
return StreamingResponse(generator(), media_type="text/event-stream")
# ✅ 正確:停用Nginx緩衝
return StreamingResponse(
generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
}
)
坑2:SSE資料格式不規範
# ❌ 錯誤:data欄位沒有用雙換行結尾
yield f"data: {json.dumps(data)}\n"
# ✅ 正確:每個事件必須以雙換行\n\n結尾
yield f"data: {json.dumps(data)}\n\n"
坑3:沒有心跳保活機制
# ❌ 錯誤:長時間無資料導致代理/負載均衡器斷開連線
async def generator():
while True:
await asyncio.sleep(60)
yield f"data: {json.dumps(data)}\n\n"
# ✅ 正確:定期傳送SSE註解行作為心跳
async def generator():
last_heartbeat = time.time()
while True:
if time.time() - last_heartbeat > 15:
yield ": heartbeat\n\n"
last_heartbeat = time.time()
await asyncio.sleep(1)
坑4:EventSource不支援POST請求
// ❌ 錯誤:EventSource只支援GET,無法傳送訊息體
const es = new EventSource('/sse/chat', {
method: 'POST',
body: JSON.stringify({ messages })
});
// ✅ 正確:POST場景使用fetch + ReadableStream
const response = await fetch('/sse/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ messages }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const text = decoder.decode(value);
const lines = text.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = JSON.parse(line.slice(6));
processChunk(data);
}
}
}
坑5:沒有處理客戶端斷連
# ❌ 錯誤:客戶端已斷開,生成器還在執行浪費資源
async def generator():
while True:
yield f"data: hello\n\n"
await asyncio.sleep(1)
# ✅ 正確:檢查客戶端連線狀態
async def generator(request: Request):
while True:
if await request.is_disconnected():
break
yield f"data: hello\n\n"
await asyncio.sleep(1)
報錯排查
| 序號 | 報錯訊息 | 原因 | 解決方法 |
|---|---|---|---|
| 1 | EventSource's response has a MIME type that is not text/event-stream |
回應Content-Type不正確 | 設定media_type="text/event-stream" |
| 2 | net::ERR_INCOMPLETE_CHUNKED_ENCODING |
Nginx緩衝導致分塊傳輸不完整 | 新增X-Accel-Buffering: no標頭 |
| 3 | 502 Bad Gateway |
後端SSE連線超時,閘道斷開 | 增加Nginx的proxy_read_timeout |
| 4 | Connection refused |
FastAPI/Flask未啟動或連接埠錯誤 | 檢查服務連接埠和防火牆設定 |
| 5 | 429 Too Many Requests |
LLM API限流 | 實作指數退避重試,新增請求佇列 |
| 6 | TimeoutError |
LLM推理超時 | 增加httpx timeout,新增心跳保活 |
| 7 | JSONDecodeError |
SSE資料行不是合法JSON | 跳過非data行,新增JSON解析異常處理 |
| 8 | RuntimeError: Event loop is closed |
Flask同步框架中使用async生成器 | 使用asgiref.sync.async_to_sync或切換FastAPI |
| 9 | BrokenPipeError |
客戶端斷開後繼續寫入 | 檢查request.is_disconnected(),捕獲BrokenPipeError |
| 10 | MemoryError |
背壓控制缺失,緩衝區無限增長 | 實作BackpressureBuffer,限制佇列大小 |
進階最佳化
1. 多模型統一串流適配器
from abc import ABC, abstractmethod
from typing import AsyncIterator
import json
import httpx
class LLMStreamAdapter(ABC):
@abstractmethod
async def stream(self, messages: list, model: str) -> AsyncIterator[str]:
pass
class OpenAIStreamAdapter(LLMStreamAdapter):
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.openai.com/v1/chat/completions"
async def stream(self, messages: list, model: str = "gpt-4o") -> AsyncIterator[str]:
headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}
payload = {"model": model, "messages": messages, "stream": True}
async with httpx.AsyncClient(timeout=120.0) as client:
async with client.stream("POST", self.base_url, json=payload, headers=headers) as resp:
async for line in resp.aiter_lines():
if line.startswith("data: ") and line[6:].strip() != "[DONE]":
chunk = json.loads(line[6:])
content = chunk.get("choices", [{}])[0].get("delta", {}).get("content", "")
if content:
yield content
class AnthropicStreamAdapter(LLMStreamAdapter):
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.anthropic.com/v1/messages"
async def stream(self, messages: list, model: str = "claude-sonnet-4-20250514") -> AsyncIterator[str]:
headers = {
"x-api-key": self.api_key,
"anthropic-version": "2023-06-01",
"Content-Type": "application/json",
}
system_msg = ""
user_messages = []
for m in messages:
if m["role"] == "system":
system_msg = m["content"]
else:
user_messages.append(m)
payload = {"model": model, "messages": user_messages, "stream": True, "max_tokens": 4096}
if system_msg:
payload["system"] = system_msg
async with httpx.AsyncClient(timeout=120.0) as client:
async with client.stream("POST", self.base_url, json=payload, headers=headers) as resp:
async for line in resp.aiter_lines():
if line.startswith("data: "):
try:
chunk = json.loads(line[6:])
if chunk.get("type") == "content_block_delta":
text = chunk.get("delta", {}).get("text", "")
if text:
yield text
except json.JSONDecodeError:
continue
class LLMStreamFactory:
_adapters = {
"openai": OpenAIStreamAdapter,
"anthropic": AnthropicStreamAdapter,
}
@classmethod
def create(cls, provider: str, api_key: str) -> LLMStreamAdapter:
adapter_cls = cls._adapters.get(provider)
if not adapter_cls:
raise ValueError(f"Unsupported provider: {provider}")
return adapter_cls(api_key)
2. SSE連線池管理
import asyncio
import time
from dataclasses import dataclass, field
from typing import Dict, Optional
@dataclass
class SSEConnection:
session_id: str
created_at: float = field(default_factory=time.time)
last_active: float = field(default_factory=time.time)
is_active: bool = True
class SSEConnectionPool:
def __init__(self, max_connections: int = 1000, idle_timeout: int = 300):
self.max_connections = max_connections
self.idle_timeout = idle_timeout
self.connections: Dict[str, SSEConnection] = {}
self._cleanup_task: Optional[asyncio.Task] = None
async def start(self):
self._cleanup_task = asyncio.create_task(self._cleanup_loop())
async def stop(self):
if self._cleanup_task:
self._cleanup_task.cancel()
def register(self, session_id: str) -> bool:
if len(self.connections) >= self.max_connections:
return False
self.connections[session_id] = SSEConnection(session_id=session_id)
return True
def heartbeat(self, session_id: str):
if session_id in self.connections:
self.connections[session_id].last_active = time.time()
def disconnect(self, session_id: str):
if session_id in self.connections:
self.connections[session_id].is_active = False
del self.connections[session_id]
@property
def active_count(self) -> int:
return sum(1 for c in self.connections.values() if c.is_active)
async def _cleanup_loop(self):
while True:
now = time.time()
expired = [
sid for sid, conn in self.connections.items()
if now - conn.last_active > self.idle_timeout
]
for sid in expired:
self.disconnect(sid)
await asyncio.sleep(30)
3. 前端SSE客戶端封裝
class SSEClient {
constructor(url, options = {}) {
this.url = url;
this.options = {
retryInterval: 1000,
maxRetries: 5,
heartbeatInterval: 15000,
...options,
};
this.eventSource = null;
this.retryCount = 0;
this.lastEventId = null;
this.handlers = new Map();
this._heartbeatTimer = null;
}
connect() {
let url = this.url;
if (this.lastEventId) {
url += `?last_event_id=${this.lastEventId}`;
}
this.eventSource = new EventSource(url);
this.eventSource.onopen = () => {
this.retryCount = 0;
this._startHeartbeat();
this._emit('open', {});
};
this.eventSource.onmessage = (event) => {
this.lastEventId = event.lastEventId;
try {
const data = JSON.parse(event.data);
this._emit('message', data);
} catch {
this._emit('message', { raw: event.data });
}
};
this.eventSource.onerror = () => {
this._stopHeartbeat();
this.eventSource.close();
if (this.retryCount < this.options.maxRetries) {
const delay = Math.min(
this.options.retryInterval * Math.pow(2, this.retryCount),
30000
);
this.retryCount++;
setTimeout(() => this.connect(), delay);
} else {
this._emit('error', { message: 'Max retries exceeded' });
}
};
}
on(event, handler) {
if (!this.handlers.has(event)) {
this.handlers.set(event, []);
}
this.handlers.get(event).push(handler);
return this;
}
_emit(event, data) {
const handlers = this.handlers.get(event) || [];
for (const handler of handlers) {
handler(data);
}
}
_startHeartbeat() {
this._stopHeartbeat();
this._heartbeatTimer = setInterval(() => {
if (this.eventSource?.readyState === EventSource.OPEN) {
this._emit('heartbeat', { timestamp: Date.now() });
}
}, this.options.heartbeatInterval);
}
_stopHeartbeat() {
if (this._heartbeatTimer) {
clearInterval(this._heartbeatTimer);
this._heartbeatTimer = null;
}
}
disconnect() {
this._stopHeartbeat();
if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;
}
}
}
對比分析
| 維度 | SSE | WebSocket | 長輪詢 | 短輪詢 | gRPC Stream |
|---|---|---|---|---|---|
| 傳輸方向 | 單向(服務端→客戶端) | 雙向 | 單向 | 單向 | 雙向 |
| 協定 | HTTP/1.1+ | WS | HTTP | HTTP | HTTP/2 |
| 自動重連 | ✅原生 | ❌需手動 | ✅ | ✅ | ❌需手動 |
| 瀏覽器API | EventSource | WebSocket | fetch | fetch | 需gRPC-Web |
| CDN/代理友善 | ✅ | ❌ | ✅ | ✅ | ⚠️ |
| 二進位支援 | ❌ | ✅ | ❌ | ❌ | ✅ |
| LLM適配度 | ⭐最佳 | ⭐好 | ⭐差 | ⭐最差 | ⭐好 |
| 實作複雜度 | 低 | 中 | 低 | 最低 | 高 |
| 資源佔用 | 低 | 中 | 高 | 最高 | 低 |
總結:SSE是LLM串流輸出的最優解——基於HTTP原生協定,瀏覽器EventSource自動重連,單向推送完美匹配LLM生成場景。5種實作模式從簡到繁:FastAPI基礎SSE→Flask SSE→LLM串流對話→錯誤恢復→背壓控制。生產環境必須關注三點:1)Nginx緩衝停用(
X-Accel-Buffering: no),2)心跳保活防止連線斷開,3)背壓控制防止記憶體溢位。多模型場景用適配器模式統一抽象,連線池管理控制並行。
線上工具推薦
- JSON格式化:/zh-TW/json/format
- Base64編解碼:/zh-TW/encode/base64
- Hash計算:/zh-TW/encode/hash
本站提供瀏覽器本地工具,免註冊即可試用 →
#Python#SSE#流式输出#LLM#Server-Sent Events#2026#实时对话