Streaming LLM Output with SSE in Python: 5 Implementation Patterns, from Server-Sent Events to Real-Time Chat

Users leave long before the LLM finishes its answer

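You call the ChatGPT API, wait 5 seconds, and get back one big JSON blob, by which time the user assumes the page has frozen. You switch to polling once per second and the server buckles under the request load. You try WebSocket and find that every conversation holds a long-lived bidirectional connection that needs a protocol upgrade and its own reconnect logic, which makes scaling harder. For LLM streaming output in 2026, SSE (Server-Sent Events) is usually the better fit: one-way push, automatic reconnection, plain HTTP, and friendly to CDNs and proxies.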

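This article starts from how the SSE protocol works and walks through five implementation patterns: basic SSE in FastAPI, SSE in Flask, LLM streaming chat, error recovery, and backpressure control, covering the path from protocol to production.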


Core Concepts of the SSE Protocol

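Concept | Description
EventSource | Browser-native SSE client API; reconnects automatically
text/event-stream | The SSE Content-Type; tells the browser that the response is an event stream
data field | Event payload; each line starts with data:
event field | Custom event type; defaults to message
id field | Event ID; on reconnect the browser sends the last one in the Last-Event-ID header so the server can resume from that position
retry field | Suggested client reconnection delay in milliseconds
Chunked Transfer | HTTP/1.1 chunked transfer encoding, which carries the SSE body on HTTP/1.1 connections (HTTP/2 uses its own framing)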
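
The field rules in the table are easy to get subtly wrong when frames are assembled by hand with f-strings. As a minimal sketch, the helper below serializes one event; the name `format_sse` and its parameters are illustrative and do not come from any library.

```python
from typing import Optional


def format_sse(data: str, event: Optional[str] = None,
               event_id: Optional[str] = None,
               retry_ms: Optional[int] = None) -> str:
    """Serialize one SSE event; multi-line data becomes several data: lines."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if retry_ms is not None:
        lines.append(f"retry: {retry_ms}")
    # Each line of the payload needs its own "data:" prefix.
    for part in data.split("\n"):
        lines.append(f"data: {part}")
    # A blank line terminates the event.
    return "\n".join(lines) + "\n\n"


frame = format_sse("hello\nworld", event="token", event_id="7")
print(frame, end="")
```

Splitting the payload matters because a raw newline inside a single data: line would otherwise end the field early and corrupt the event.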

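SSE vs. WebSocket

SSE advantages:
1. Built on HTTP with no protocol upgrade, so it works well with CDNs and proxies
2. Browser-native EventSource API with automatic reconnection
3. One-way push, which matches the LLM output scenario well
4. Text protocol that is easy to debug

WebSocket advantages:
1. Full-duplex communication, suited to two-way real-time interaction
2. Binary data support
3. Lower per-frame overhead (a frame header can be as small as 2 bytes, versus SSE's text framing)

Conclusion: LLM streaming output is a typical one-way push scenario, so SSE is usually the best choice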

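Problem Analysis: 5 Challenges of LLM Streaming Output

  1. Connection stability: LLM inference takes a long time (10-60 seconds) and network jitter can drop the connection, so automatic reconnection and resumable streams are needed
  2. Backpressure: when the LLM generates faster than the client consumes, unbounded buffering risks running out of memory
  3. Error recovery: upstream LLM API timeouts, rate limits, and error responses call for graceful degradation
  4. Multi-model adaptation: OpenAI, Anthropic, and local models use different streaming formats, which calls for a unified abstraction
  5. Concurrency management: a large number of SSE connections consumes server resources, so connection pooling and timeout management are needed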

Step by Step: 5 SSE Streaming Implementation Patterns

Pattern 1: Basic SSE with FastAPI

import asyncio
import json
from datetime import datetime
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()

async def event_generator(request: Request):
    count = 0
    while True:
        if await request.is_disconnected():
            break
        data = json.dumps({
            "id": count,
            "message": f"Event at {datetime.now().isoformat()}",
            "timestamp": datetime.now().timestamp()
        }, ensure_ascii=False)
        yield f"id: {count}\ndata: {data}\n\n"
        count += 1
        await asyncio.sleep(1)

@app.get("/sse/events")
async def sse_events(request: Request):
    return StreamingResponse(
        event_generator(request),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        }
    )
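
To check what this endpoint emits without a browser, it helps to see how a client groups the raw lines back into events. The parser below is a simplified sketch (it ignores the retry field and does not carry the last event ID across events as a full EventSource implementation would); `parse_sse` is a hypothetical helper name.

```python
from typing import Dict, Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[Dict[str, str]]:
    """Group raw SSE lines into events (simplified: no retry handling)."""
    event: Dict[str, str] = {}
    data_parts = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # Blank line: dispatch the event if it carried any data.
            if data_parts:
                event["data"] = "\n".join(data_parts)
                yield event
            event, data_parts = {}, []
            continue
        if line.startswith(":"):
            continue  # comment line, e.g. a heartbeat
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # one leading space after the colon is dropped
        if field == "data":
            data_parts.append(value)
        elif field in ("id", "event"):
            event[field] = value


stream = ["id: 0", 'data: {"id": 0}', "", ": heartbeat", "", "data: a", "data: b", ""]
events = list(parse_sse(stream))
print(events)
```

Feeding it the lines of a real response (for example from an HTTP client that iterates over lines) should yield one dictionary per event emitted by `event_generator`.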

Pattern 2: SSE with Flask

import json
import time
import threading
from datetime import datetime
from flask import Flask, Response, request

app = Flask(__name__)

def stream_events():
    count = 0
    while True:
        data = json.dumps({
            "id": count,
            "message": f"Flask SSE event at {datetime.now().isoformat()}",
        }, ensure_ascii=False)
        yield f"id: {count}\nevent: message\ndata: {data}\n\n"
        count += 1
        time.sleep(1)

@app.route("/sse/events")
def sse_events():
    return Response(
        stream_events(),
        mimetype="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        }
    )
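
Unlike the FastAPI version, the Flask generator has no `request.is_disconnected()` to poll. Under WSGI the server is expected to call `close()` on the response iterable once the request ends, which raises `GeneratorExit` at the paused `yield`, so a `try/finally` is the natural place for cleanup. The sketch below simulates that call by hand; `cleanup_log` is a stand-in for real cleanup work.

```python
import time
from typing import Iterator, List

cleanup_log: List[str] = []  # stand-in for real cleanup such as closing a cursor


def stream_with_cleanup(interval: float = 0.0) -> Iterator[str]:
    count = 0
    try:
        while True:
            yield f"id: {count}\ndata: tick\n\n"
            count += 1
            time.sleep(interval)
    finally:
        # Runs when close() is called on the generator.
        cleanup_log.append(f"closed after {count} events")


gen = stream_with_cleanup()
first = next(gen)
second = next(gen)
gen.close()  # simulates the server closing the iterable after a disconnect
print(cleanup_log)
```

In practice a disconnect is usually noticed only when the next write fails, so a stream with long silent gaps may keep its generator alive until the next event or heartbeat is sent.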

Pattern 3: LLM Streaming Chat (OpenAI-Compatible)

import asyncio
import json
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()

OPENAI_API_KEY = "sk-your-api-key"
OPENAI_API_URL = "https://api.openai.com/v1/chat/completions"

async def llm_stream_generator(request: Request, messages: list, model: str = "gpt-4o"):
    headers = {
        "Authorization": f"Bearer {OPENAI_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": model,
        "messages": messages,
        "stream": True,
        "temperature": 0.7,
    }

    async with httpx.AsyncClient(timeout=120.0) as client:
        async with client.stream("POST", OPENAI_API_URL, json=payload, headers=headers) as response:
            if response.status_code != 200:
                error_data = await response.aread()
                error_msg = json.dumps({"error": error_data.decode()}, ensure_ascii=False)
                yield f"event: error\ndata: {error_msg}\n\n"
                return

            async for line in response.aiter_lines():
                if await request.is_disconnected():
                    break
                if line.startswith("data: "):
                    data_str = line[6:]
                    if data_str.strip() == "[DONE]":
                        yield f"data: [DONE]\n\n"
                        break
                    try:
                        chunk = json.loads(data_str)
                        # "choices" can be present but empty, so avoid indexing an empty list
                        delta = (chunk.get("choices") or [{}])[0].get("delta") or {}
                        content = delta.get("content", "")
                        if content:
                            escaped = json.dumps({"content": content}, ensure_ascii=False)
                            yield f"data: {escaped}\n\n"
                    except json.JSONDecodeError:
                        continue

@app.post("/sse/chat")
async def sse_chat(request: Request):
    body = await request.json()
    messages = body.get("messages", [])
    model = body.get("model", "gpt-4o")
    return StreamingResponse(
        llm_stream_generator(request, messages, model),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        }
    )
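
Upstream chunks are not always shaped like the happy path: some OpenAI-compatible backends send chunks whose `choices` list is empty (for example a usage-only chunk), and a `delta` may carry no `content`. As a sketch under those assumptions, the line handling can be isolated in a small pure function that is easy to unit test; `extract_delta` and the `DONE` sentinel are illustrative names.

```python
import json

DONE = object()  # sentinel returned for the terminating "[DONE]" line


def extract_delta(line: str):
    """Return text content, DONE, or None for lines that carry no text."""
    if not line.startswith("data:"):
        return None
    payload = line[len("data:"):].strip()
    if payload == "[DONE]":
        return DONE
    try:
        chunk = json.loads(payload)
    except json.JSONDecodeError:
        return None
    if not isinstance(chunk, dict):
        return None
    choices = chunk.get("choices") or []
    if not choices:
        return None  # e.g. a chunk with "choices": []
    return (choices[0].get("delta") or {}).get("content") or None


print(extract_delta('data: {"choices": [{"delta": {"content": "Hi"}}]}'))
```

The generator can then yield a frame only when the result is a string and stop when it is `DONE`, which keeps the parsing rules in one place.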

Pattern 4: SSE with Error Recovery

import asyncio
import json
import time
from datetime import datetime
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()

class SSEStateManager:
    def __init__(self):
        self.events_store = {}
        self.max_store_size = 1000

    def store_event(self, session_id: str, event_id: int, data: str):
        if session_id not in self.events_store:
            self.events_store[session_id] = {}
        self.events_store[session_id][event_id] = data
        if len(self.events_store[session_id]) > self.max_store_size:
            oldest = min(self.events_store[session_id].keys())
            del self.events_store[session_id][oldest]

    def get_events_after(self, session_id: str, last_event_id: int):
        if session_id not in self.events_store:
            return []
        return [
            (eid, self.events_store[session_id][eid])
            for eid in sorted(self.events_store[session_id].keys())
            if eid > last_event_id
        ]

state_manager = SSEStateManager()

async def resilient_sse_generator(request: Request, session_id: str, last_event_id: int = 0):
    retry_count = 0
    max_retries = 3
    current_id = last_event_id

    missed_events = state_manager.get_events_after(session_id, last_event_id)
    for eid, data in missed_events:
        yield f"id: {eid}\ndata: {data}\n\n"
        current_id = eid

    while True:
        if await request.is_disconnected():
            break
        try:
            current_id += 1
            data = json.dumps({
                "id": current_id,
                "message": f"Resilient event at {datetime.now().isoformat()}",
                "retry_count": retry_count,
            }, ensure_ascii=False)
            state_manager.store_event(session_id, current_id, data)
            yield f"id: {current_id}\ndata: {data}\n\n"
            retry_count = 0
            await asyncio.sleep(1)
        except Exception as e:
            retry_count += 1
            if retry_count > max_retries:
                error_data = json.dumps({"error": str(e), "fatal": True}, ensure_ascii=False)
                yield f"event: error\ndata: {error_data}\n\n"
                break
            retry_delay = min(2 ** retry_count, 30)
            yield f"retry: {retry_delay * 1000}\n\n"
            await asyncio.sleep(retry_delay)

@app.get("/sse/resilient/{session_id}")
async def resilient_sse(request: Request, session_id: str, last_event_id: int = 0):
    return StreamingResponse(
        resilient_sse_generator(request, session_id, last_event_id),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        }
    )
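
The endpoint above takes the resume position from a `last_event_id` query parameter. A browser's native EventSource reconnect sends it in the `Last-Event-ID` request header instead, so a server may want to accept both. The helper below is a minimal sketch of that lookup; `resolve_last_event_id` is a hypothetical name, and it takes a plain mapping of headers rather than a framework request object.

```python
from typing import Mapping, Optional


def resolve_last_event_id(headers: Mapping[str, str],
                          query_value: Optional[str] = None) -> int:
    """Prefer the Last-Event-ID header, fall back to a query value, else 0."""
    # HTTP header names are case-insensitive, so normalize before lookup.
    lowered = {k.lower(): v for k, v in headers.items()}
    for candidate in (lowered.get("last-event-id"), query_value):
        if candidate is None:
            continue
        try:
            return max(int(candidate), 0)
        except ValueError:
            continue  # ignore malformed IDs instead of failing the request
    return 0


print(resolve_last_event_id({"Last-Event-ID": "41"}))  # 41
```

Clamping to zero and ignoring malformed values means a bad header degrades to a full replay rather than an error response.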

Pattern 5: SSE with Backpressure Control

import asyncio
import json
from collections import deque
from datetime import datetime
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()

class BackpressureBuffer:
    def __init__(self, max_size: int = 100, high_watermark: float = 0.8, low_watermark: float = 0.3):
        self.max_size = max_size
        self.high_watermark = int(max_size * high_watermark)
        self.low_watermark = int(max_size * low_watermark)
        self.buffer = deque(maxlen=max_size)
        self.paused = False
        self._not_empty = asyncio.Event()
        self._not_full = asyncio.Event()
        self._not_full.set()

    async def put(self, item: str):
        while len(self.buffer) >= self.max_size:
            self._not_full.clear()
            await self._not_full.wait()
        self.buffer.append(item)
        self._not_empty.set()
        if len(self.buffer) >= self.high_watermark:
            self.paused = True

    async def get(self) -> str:
        while not self.buffer:
            self._not_empty.clear()
            await self._not_empty.wait()
        item = self.buffer.popleft()
        self._not_full.set()
        if len(self.buffer) <= self.low_watermark:
            self.paused = False
        return item

    @property
    def is_paused(self) -> bool:
        return self.paused

    @property
    def size(self) -> int:
        return len(self.buffer)

async def llm_producer(buffer: BackpressureBuffer, messages: list):
    headers = {
        "Authorization": f"Bearer sk-your-api-key",
        "Content-Type": "application/json",
    }
    payload = {
        "model": "gpt-4o",
        "messages": messages,
        "stream": True,
    }
    import httpx
    async with httpx.AsyncClient(timeout=120.0) as client:
        async with client.stream("POST", "https://api.openai.com/v1/chat/completions", json=payload, headers=headers) as response:
            async for line in response.aiter_lines():
                if buffer.is_paused:
                    while buffer.is_paused:
                        await asyncio.sleep(0.1)
                if line.startswith("data: ") and line[6:].strip() != "[DONE]":
                    try:
                        chunk = json.loads(line[6:])
                        # "choices" can be present but empty, so avoid indexing an empty list
                        content = ((chunk.get("choices") or [{}])[0].get("delta") or {}).get("content") or ""
                        if content:
                            await buffer.put(content)
                    except json.JSONDecodeError:
                        continue
    await buffer.put("[DONE]")

async def backpressure_sse_generator(request: Request, messages: list):
    buffer = BackpressureBuffer(max_size=100, high_watermark=0.8, low_watermark=0.3)
    producer_task = asyncio.create_task(llm_producer(buffer, messages))

    try:
        while True:
            if await request.is_disconnected():
                producer_task.cancel()
                break
            try:
                item = await asyncio.wait_for(buffer.get(), timeout=5.0)
                if item == "[DONE]":
                    yield "data: [DONE]\n\n"
                    break
                data = json.dumps({
                    "content": item,
                    "buffer_size": buffer.size,
                    "paused": buffer.is_paused,
                }, ensure_ascii=False)
                yield f"data: {data}\n\n"
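            except asyncio.TimeoutError:
                # Stop if the producer ended (e.g. upstream error) without queuing "[DONE]"
                if producer_task.done() and buffer.size == 0:
                    break
                yield ": heartbeat\n\n"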
    finally:
        if not producer_task.done():
            producer_task.cancel()

@app.post("/sse/backpressure-chat")
async def backpressure_chat(request: Request):
    body = await request.json()
    messages = body.get("messages", [])
    return StreamingResponse(
        backpressure_sse_generator(request, messages),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        }
    )
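
For comparison, the standard library already offers a bounded buffer: `asyncio.Queue(maxsize=...)` suspends `put()` while the queue is full. The sketch below swaps it in for the custom buffer to show the blocking behavior; it has no high/low watermarks, and the token list and sleep are made-up stand-ins for an LLM stream and a slow client.

```python
import asyncio
from typing import List


async def producer(queue: asyncio.Queue, tokens: List[str], log: List[str]) -> None:
    for tok in tokens:
        await queue.put(tok)  # suspends here while the queue is full
        log.append(f"put {tok}")
    await queue.put(None)  # None marks the end of the stream


async def consumer(queue: asyncio.Queue, log: List[str]) -> List[str]:
    received = []
    while True:
        item = await queue.get()
        if item is None:
            return received
        log.append(f"got {item}")
        received.append(item)
        await asyncio.sleep(0.01)  # simulate a slow client


async def main():
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    log: List[str] = []
    prod = asyncio.create_task(producer(queue, list("abcde"), log))
    received = await consumer(queue, log)
    await prod
    return received, log


received, log = asyncio.run(main())
print(received)
print(log)
```

With a capacity of two, the producer cannot enqueue the fourth token until the consumer has taken at least two, which is the property the watermark logic above approximates by polling.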

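Pitfalls to Avoid

Pitfall 1: Forgetting to set X-Accel-Buffering: no

# ❌ Wrong: Nginx buffers proxied responses by default, so the client does not receive data in real time
return StreamingResponse(generator(), media_type="text/event-stream")

# ✅ Right: disable Nginx buffering for this response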
return StreamingResponse(
    generator(),
    media_type="text/event-stream",
    headers={
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",
    }
)

Pitfall 2: Malformed SSE data

# ❌ Wrong: the event does not end with a blank line
yield f"data: {json.dumps(data)}\n"

# ✅ Right: every event must end with a double newline \n\n
yield f"data: {json.dumps(data)}\n\n"

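Pitfall 3: No heartbeat keep-alive

# ❌ Wrong: long silences let proxies and load balancers drop the connection
async def generator():
    while True:
        await asyncio.sleep(60)
        yield f"data: {json.dumps(data)}\n\n"

# ✅ Right: periodically send an SSE comment line as a heartbeat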
async def generator():
    last_heartbeat = time.time()
    while True:
        if time.time() - last_heartbeat > 15:
            yield ": heartbeat\n\n"
            last_heartbeat = time.time()
        await asyncio.sleep(1)
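
The heartbeat loop above emits only heartbeats. In a real stream they have to be interleaved with data from a source that may stay silent for a while. One way to do that, sketched below, is to wrap any async iterator and emit a comment line whenever it has produced nothing for `interval` seconds; `with_heartbeat` and `slow_tokens` are illustrative names.

```python
import asyncio
from typing import AsyncIterator


async def _next_item(iterator):
    return await iterator.__anext__()


async def with_heartbeat(source: AsyncIterator[str], interval: float) -> AsyncIterator[str]:
    """Yield items from source; emit an SSE comment when it stays silent."""
    iterator = source.__aiter__()
    pending = None
    try:
        while True:
            if pending is None:
                pending = asyncio.ensure_future(_next_item(iterator))
            # asyncio.wait does not cancel the task on timeout, so the
            # in-flight read survives across heartbeats.
            done, _ = await asyncio.wait({pending}, timeout=interval)
            if not done:
                yield ": heartbeat\n\n"
                continue
            task, pending = pending, None
            try:
                item = task.result()
            except StopAsyncIteration:
                return
            yield item
    finally:
        if pending is not None:
            pending.cancel()


async def slow_tokens() -> AsyncIterator[str]:
    yield "data: a\n\n"
    await asyncio.sleep(0.25)  # silent gap longer than the heartbeat interval
    yield "data: b\n\n"


async def main():
    return [frame async for frame in with_heartbeat(slow_tokens(), interval=0.1)]


frames = asyncio.run(main())
print(frames)
```

`asyncio.wait` is used rather than `asyncio.wait_for` because the latter cancels the awaited read on timeout, which would terminate the underlying generator.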

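Pitfall 4: EventSource does not support POST requests

// ❌ Wrong: EventSource only supports GET and cannot send a request body
const es = new EventSource('/sse/chat', {
    method: 'POST',
    body: JSON.stringify({ messages })
});

// ✅ Right: for POST, use fetch + ReadableStream
const response = await fetch('/sse/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ messages }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // stream: true keeps multi-byte characters that are split across chunks intact
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    // The last element may be an incomplete line; keep it for the next read
    buffer = lines.pop();
    for (const line of lines) {
        if (!line.startsWith('data: ')) continue;
        const payload = line.slice(6);
        if (payload === '[DONE]') continue; // the end marker is not JSON
        processChunk(JSON.parse(payload));
    }
}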

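Pitfall 5: Not handling client disconnects

# ❌ Wrong: the client is gone but the generator keeps running and wastes resources
async def generator():
    while True:
        yield f"data: hello\n\n"
        await asyncio.sleep(1)

# ✅ Right: check the client connection state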
async def generator(request: Request):
    while True:
        if await request.is_disconnected():
            break
        yield f"data: hello\n\n"
        await asyncio.sleep(1)

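Troubleshooting Errors

No. | Error message | Cause | Fix
1 | EventSource's response has a MIME type that is not text/event-stream | Wrong response Content-Type | Set media_type="text/event-stream"
2 | net::ERR_INCOMPLETE_CHUNKED_ENCODING | The chunked response was cut off before it completed, often because of Nginx buffering or a proxy timeout | Add X-Accel-Buffering: no and check proxy timeouts
3 | 502 Bad Gateway | The backend SSE connection timed out and the gateway dropped it | Increase Nginx proxy_read_timeout
4 | Connection refused | FastAPI/Flask is not running or the port is wrong | Check the service port and firewall configuration
5 | 429 Too Many Requests | LLM API rate limiting | Retry with exponential backoff and add a request queue
6 | TimeoutError | LLM inference timed out | Increase the httpx timeout and add heartbeat keep-alives
7 | JSONDecodeError | An SSE data line is not valid JSON | Skip non-data lines and handle JSON parsing exceptions
8 | RuntimeError: Event loop is closed | An async generator is used inside Flask's synchronous framework | Use asgiref.sync.async_to_sync or switch to FastAPI
9 | BrokenPipeError | Writing continues after the client disconnected | Check request.is_disconnected() and catch BrokenPipeError
10 | MemoryError | No backpressure control, so the buffer grows without bound | Implement BackpressureBuffer and cap the queue size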
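
Row 5 recommends exponential backoff for 429 responses. As a minimal sketch, the delay schedule can be computed separately from the retry loop so it is easy to test; the function name, the 1-second base, and the 30-second cap are assumptions chosen to mirror the `min(2 ** retry_count, 30)` expression in Pattern 4.

```python
import random
from typing import List, Optional


def backoff_delays(max_retries: int, base: float = 1.0, cap: float = 30.0,
                   rng: Optional[random.Random] = None) -> List[float]:
    """Exponential backoff with optional full jitter, capped at `cap` seconds."""
    delays = []
    for attempt in range(max_retries):
        ceiling = min(cap, base * (2 ** attempt))
        # Full jitter: pick uniformly in [0, ceiling] to spread out retries.
        delays.append(rng.uniform(0, ceiling) if rng is not None else ceiling)
    return delays


print(backoff_delays(6))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]
```

Passing a `random.Random` instance enables jitter, which helps keep many clients that were rate-limited at the same moment from retrying in lockstep.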

Advanced Optimizations

1. Unified Streaming Adapter for Multiple Models

from abc import ABC, abstractmethod
from typing import AsyncIterator
import json
import httpx

class LLMStreamAdapter(ABC):
    @abstractmethod
    async def stream(self, messages: list, model: str) -> AsyncIterator[str]:
        pass

class OpenAIStreamAdapter(LLMStreamAdapter):
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.openai.com/v1/chat/completions"

    async def stream(self, messages: list, model: str = "gpt-4o") -> AsyncIterator[str]:
        headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}
        payload = {"model": model, "messages": messages, "stream": True}
        async with httpx.AsyncClient(timeout=120.0) as client:
            async with client.stream("POST", self.base_url, json=payload, headers=headers) as resp:
                async for line in resp.aiter_lines():
                    if line.startswith("data: ") and line[6:].strip() != "[DONE]":
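                        try:
                            chunk = json.loads(line[6:])
                        except json.JSONDecodeError:
                            continue  # skip malformed lines, as the Anthropic adapter does
                        # "choices" can be present but empty, so avoid indexing an empty list
                        content = ((chunk.get("choices") or [{}])[0].get("delta") or {}).get("content") or ""
                        if content:
                            yield content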

class AnthropicStreamAdapter(LLMStreamAdapter):
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.anthropic.com/v1/messages"

    async def stream(self, messages: list, model: str = "claude-sonnet-4-20250514") -> AsyncIterator[str]:
        headers = {
            "x-api-key": self.api_key,
            "anthropic-version": "2023-06-01",
            "Content-Type": "application/json",
        }
        system_msg = ""
        user_messages = []
        for m in messages:
            if m["role"] == "system":
                system_msg = m["content"]
            else:
                user_messages.append(m)
        payload = {"model": model, "messages": user_messages, "stream": True, "max_tokens": 4096}
        if system_msg:
            payload["system"] = system_msg
        async with httpx.AsyncClient(timeout=120.0) as client:
            async with client.stream("POST", self.base_url, json=payload, headers=headers) as resp:
                async for line in resp.aiter_lines():
                    if line.startswith("data: "):
                        try:
                            chunk = json.loads(line[6:])
                            if chunk.get("type") == "content_block_delta":
                                text = chunk.get("delta", {}).get("text", "")
                                if text:
                                    yield text
                        except json.JSONDecodeError:
                            continue

class LLMStreamFactory:
    _adapters = {
        "openai": OpenAIStreamAdapter,
        "anthropic": AnthropicStreamAdapter,
    }

    @classmethod
    def create(cls, provider: str, api_key: str) -> LLMStreamAdapter:
        adapter_cls = cls._adapters.get(provider)
        if not adapter_cls:
            raise ValueError(f"Unsupported provider: {provider}")
        return adapter_cls(api_key)
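
The adapters yield plain text tokens, so one more step is needed before they can feed a streaming response. The sketch below wraps any token iterator in the SSE framing used by the patterns above; `to_sse` is a hypothetical helper, and `fake_adapter_stream` stands in for a call such as `adapter.stream(messages, model)` so the example runs without network access.

```python
import asyncio
import json
from typing import AsyncIterator, List


async def to_sse(tokens: AsyncIterator[str]) -> AsyncIterator[str]:
    """Wrap plain text tokens in SSE frames and finish with [DONE]."""
    event_id = 0
    try:
        async for token in tokens:
            event_id += 1
            payload = json.dumps({"content": token}, ensure_ascii=False)
            yield f"id: {event_id}\ndata: {payload}\n\n"
    except Exception as exc:  # surface upstream failures to the client
        error = json.dumps({"error": str(exc)}, ensure_ascii=False)
        yield f"event: error\ndata: {error}\n\n"
        return
    yield "data: [DONE]\n\n"


async def fake_adapter_stream() -> AsyncIterator[str]:
    # Hypothetical stand-in for adapter.stream(messages, model).
    for token in ["Hel", "lo"]:
        yield token


async def collect() -> List[str]:
    return [frame async for frame in to_sse(fake_adapter_stream())]


frames = asyncio.run(collect())
print(frames)
```

Because the wrapper only depends on an async iterator of strings, the same function would serve both the OpenAI and Anthropic adapters.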

2. SSE Connection Pool Management

import asyncio
import time
from dataclasses import dataclass, field
from typing import Dict, Optional

@dataclass
class SSEConnection:
    session_id: str
    created_at: float = field(default_factory=time.time)
    last_active: float = field(default_factory=time.time)
    is_active: bool = True

class SSEConnectionPool:
    def __init__(self, max_connections: int = 1000, idle_timeout: int = 300):
        self.max_connections = max_connections
        self.idle_timeout = idle_timeout
        self.connections: Dict[str, SSEConnection] = {}
        self._cleanup_task: Optional[asyncio.Task] = None

    async def start(self):
        self._cleanup_task = asyncio.create_task(self._cleanup_loop())

    async def stop(self):
        if self._cleanup_task:
            self._cleanup_task.cancel()

    def register(self, session_id: str) -> bool:
        if len(self.connections) >= self.max_connections:
            return False
        self.connections[session_id] = SSEConnection(session_id=session_id)
        return True

    def heartbeat(self, session_id: str):
        if session_id in self.connections:
            self.connections[session_id].last_active = time.time()

    def disconnect(self, session_id: str):
        if session_id in self.connections:
            self.connections[session_id].is_active = False
            del self.connections[session_id]

    @property
    def active_count(self) -> int:
        return sum(1 for c in self.connections.values() if c.is_active)

    async def _cleanup_loop(self):
        while True:
            now = time.time()
            expired = [
                sid for sid, conn in self.connections.items()
                if now - conn.last_active > self.idle_timeout
            ]
            for sid in expired:
                self.disconnect(sid)
            await asyncio.sleep(30)
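
A pool like this only helps if every stream reliably calls `disconnect`, including when the generator fails or is cancelled. One way to guarantee that, sketched below, is an async context manager around the streaming loop. `TinyPool` is a hypothetical stand-in that exposes the same `register`/`disconnect` calls so the example is self-contained, and `tracked` is an illustrative name.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Set


class TinyPool:
    """Hypothetical stand-in exposing the same register/disconnect calls."""

    def __init__(self, max_connections: int):
        self.max_connections = max_connections
        self.sessions: Set[str] = set()

    def register(self, session_id: str) -> bool:
        if len(self.sessions) >= self.max_connections:
            return False
        self.sessions.add(session_id)
        return True

    def disconnect(self, session_id: str) -> None:
        self.sessions.discard(session_id)


@asynccontextmanager
async def tracked(pool: TinyPool, session_id: str):
    if not pool.register(session_id):
        raise RuntimeError("connection limit reached")
    try:
        yield
    finally:
        pool.disconnect(session_id)  # runs on normal exit, errors, and cancellation


async def main():
    pool = TinyPool(max_connections=1)
    rejected = False
    async with tracked(pool, "a"):
        during = len(pool.sessions)
        try:
            async with tracked(pool, "b"):
                pass
        except RuntimeError:
            rejected = True
    return during, rejected, len(pool.sessions)


during, rejected, after = asyncio.run(main())
print(during, rejected, after)
```

In an endpoint, the rejected case would typically be turned into an HTTP 503 response before any streaming starts.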

3. Front-End SSE Client Wrapper

class SSEClient {
    constructor(url, options = {}) {
        this.url = url;
        this.options = {
            retryInterval: 1000,
            maxRetries: 5,
            heartbeatInterval: 15000,
            ...options,
        };
        this.eventSource = null;
        this.retryCount = 0;
        this.lastEventId = null;
        this.handlers = new Map();
        this._heartbeatTimer = null;
    }

    connect() {
        let url = this.url;
        if (this.lastEventId) {
            url += `?last_event_id=${this.lastEventId}`;
        }

        this.eventSource = new EventSource(url);

        this.eventSource.onopen = () => {
            this.retryCount = 0;
            this._startHeartbeat();
            this._emit('open', {});
        };

        this.eventSource.onmessage = (event) => {
            this.lastEventId = event.lastEventId;
            try {
                const data = JSON.parse(event.data);
                this._emit('message', data);
            } catch {
                this._emit('message', { raw: event.data });
            }
        };

        this.eventSource.onerror = () => {
            this._stopHeartbeat();
            this.eventSource.close();

            if (this.retryCount < this.options.maxRetries) {
                const delay = Math.min(
                    this.options.retryInterval * Math.pow(2, this.retryCount),
                    30000
                );
                this.retryCount++;
                setTimeout(() => this.connect(), delay);
            } else {
                this._emit('error', { message: 'Max retries exceeded' });
            }
        };
    }

    on(event, handler) {
        if (!this.handlers.has(event)) {
            this.handlers.set(event, []);
        }
        this.handlers.get(event).push(handler);
        return this;
    }

    _emit(event, data) {
        const handlers = this.handlers.get(event) || [];
        for (const handler of handlers) {
            handler(data);
        }
    }

    _startHeartbeat() {
        this._stopHeartbeat();
        this._heartbeatTimer = setInterval(() => {
            if (this.eventSource?.readyState === EventSource.OPEN) {
                this._emit('heartbeat', { timestamp: Date.now() });
            }
        }, this.options.heartbeatInterval);
    }

    _stopHeartbeat() {
        if (this._heartbeatTimer) {
            clearInterval(this._heartbeatTimer);
            this._heartbeatTimer = null;
        }
    }

    disconnect() {
        this._stopHeartbeat();
        if (this.eventSource) {
            this.eventSource.close();
            this.eventSource = null;
        }
    }
}

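Comparison

Dimension | SSE | WebSocket | Long polling | Short polling | gRPC Stream
Direction | One-way (server → client) | Two-way | One-way | One-way | Two-way
Protocol | HTTP/1.1+ | WS | HTTP | HTTP | HTTP/2
Auto-reconnect | ✅ Native | ❌ Manual | ❌ Manual | |
Browser API | EventSource | WebSocket | fetch | fetch | Requires gRPC-Web
CDN/proxy friendly | ✅ | ⚠️ | | |
Binary support | ❌ | ✅ | | |
LLM fit | Best | Good | Poor | Worst | Good
Implementation complexity | | | | Lowest |
Resource usage | | | | Highest |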

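Summary: SSE is a strong fit for LLM streaming output. It runs over plain HTTP, the browser's EventSource reconnects automatically, and one-way push matches how LLMs generate text. The five implementation patterns go from simple to complex: basic SSE in FastAPI, SSE in Flask, LLM streaming chat, error recovery, and backpressure control. Three points matter in production: 1) disable Nginx buffering (X-Accel-Buffering: no), 2) send heartbeat keep-alives so idle connections are not dropped, and 3) apply backpressure control to avoid running out of memory. For multi-model setups, use the adapter pattern as a unified abstraction, and use connection pool management to keep concurrency under control.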

