Python SSEストリーミングLLM出力:Server-Sent Eventsからリアルタイムチャットまで5つの実装パターン
AI与大数据
LLMが一文字ずつ吐き出すのを待つ間、ユーザーはもういない
ChatGPT APIを呼び出し、5秒待って巨大なJSONが返ってくる——ユーザーはページがフリーズしたと思う。ポーリングに切り替えると毎秒リクエスト、サーバーが耐えられない。WebSocketを試すと、各会話で持続的接続が必要、スケーラビリティが最悪。2026年、SSE(Server-Sent Events) こそがLLMストリーミング出力の最適解——単方向プッシュ、自動再接続、HTTPネイティブサポート、CDDフレンドリー。
本記事はSSEプロトコル原理から出発し、FastAPI SSE→Flask SSE→LLMストリーミングチャット→エラー回復→バックプレッシャー制御の5つの実装パターンを完全ガイドします。
SSEプロトコルコア概念
| 概念 | 説明 |
|---|---|
| EventSource | ブラウザネイティブSSEクライアントAPI、自動再接続 |
| text/event-stream | SSEのContent-Type、ブラウザにストリームイベントであることを通知 |
| dataフィールド | イベントデータ、各行がdata:で始まる |
| eventフィールド | カスタムイベントタイプ、デフォルトはmessage |
| idフィールド | イベントID、再接続時に前回位置から再開するため |
| retryフィールド | クライアント再接続間隔の提案(ミリ秒) |
| Chunked Transfer | HTTPチャンク転送エンコーディング、SSEの基盤転送メカニズム |
SSE vs WebSocket比較
SSEの利点:
1. HTTPベース、プロトコルアップグレード不要、CDN/プロキシフレンドリー
2. ブラウザネイティブEventSource API、自動再接続
3. 単方向プッシュ、LLM出力シナリオに完璧にマッチ
4. テキストプロトコル、デバッグが簡単
WebSocketの利点:
1. 全二重通信、双方向リアルタイムインタラクションに適する
2. バイナリデータサポート
3. より低いフレームオーバーヘッド(2バイト vs SSEテキスト形式)
結論:LLMストリーミング出力は典型的な単方向プッシュシナリオ——SSEが最適
問題分析:LLMストリーミング出力の5つの課題
- 接続安定性:LLM推論に長時間(10-60秒)、ネットワークジッターで切断——自動再接続とレジュームが必要
- バックプレッシャー制御:LLM生成速度がクライアント消費速度を上回り、メモリオーバーフローリスク
- エラー回復:上流LLM APIタイムアウト、レート制限、エラー返却——グレースフルデグラデーションが必要
- マルチモデル適応:OpenAI/Anthropic/ローカルモデルのストリーミング形式が異なる——統一抽象化が必要
- 並行管理:多数のSSE接続がサーバーリソースを消費——接続プールとタイムアウト管理が必要
ステップバイステップ:5つのSSEストリーミング実装パターン
パターン1:FastAPI基本SSE
import asyncio
import json
from datetime import datetime
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
app = FastAPI()
async def event_generator(request: Request):
count = 0
while True:
if await request.is_disconnected():
break
data = json.dumps({
"id": count,
"message": f"Event at {datetime.now().isoformat()}",
"timestamp": datetime.now().timestamp()
}, ensure_ascii=False)
yield f"id: {count}\ndata: {data}\n\n"
count += 1
await asyncio.sleep(1)
@app.get("/sse/events")
async def sse_events(request: Request):
return StreamingResponse(
event_generator(request),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
}
)
パターン2:Flask SSE実装
import json
import time
from datetime import datetime
from flask import Flask, Response
app = Flask(__name__)
def stream_events():
count = 0
while True:
data = json.dumps({
"id": count,
"message": f"Flask SSE event at {datetime.now().isoformat()}",
}, ensure_ascii=False)
yield f"id: {count}\nevent: message\ndata: {data}\n\n"
count += 1
time.sleep(1)
@app.route("/sse/events")
def sse_events():
return Response(
stream_events(),
mimetype="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
}
)
パターン3:LLMストリーミングチャット(OpenAI互換)
import asyncio
import json
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
app = FastAPI()
OPENAI_API_KEY = "sk-your-api-key"
OPENAI_API_URL = "https://api.openai.com/v1/chat/completions"
async def llm_stream_generator(request: Request, messages: list, model: str = "gpt-4o"):
headers = {
"Authorization": f"Bearer {OPENAI_API_KEY}",
"Content-Type": "application/json",
}
payload = {
"model": model,
"messages": messages,
"stream": True,
"temperature": 0.7,
}
async with httpx.AsyncClient(timeout=120.0) as client:
async with client.stream("POST", OPENAI_API_URL, json=payload, headers=headers) as response:
if response.status_code != 200:
error_data = await response.aread()
error_msg = json.dumps({"error": error_data.decode()}, ensure_ascii=False)
yield f"event: error\ndata: {error_msg}\n\n"
return
async for line in response.aiter_lines():
if await request.is_disconnected():
break
if line.startswith("data: "):
data_str = line[6:]
if data_str.strip() == "[DONE]":
yield f"data: [DONE]\n\n"
break
try:
chunk = json.loads(data_str)
delta = chunk.get("choices", [{}])[0].get("delta", {})
content = delta.get("content", "")
if content:
escaped = json.dumps({"content": content}, ensure_ascii=False)
yield f"data: {escaped}\n\n"
except json.JSONDecodeError:
continue
@app.post("/sse/chat")
async def sse_chat(request: Request):
body = await request.json()
messages = body.get("messages", [])
model = body.get("model", "gpt-4o")
return StreamingResponse(
llm_stream_generator(request, messages, model),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
}
)
パターン4:エラー回復付きSSE
import asyncio
import json
from datetime import datetime
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
app = FastAPI()
class SSEStateManager:
def __init__(self):
self.events_store = {}
self.max_store_size = 1000
def store_event(self, session_id: str, event_id: int, data: str):
if session_id not in self.events_store:
self.events_store[session_id] = {}
self.events_store[session_id][event_id] = data
if len(self.events_store[session_id]) > self.max_store_size:
oldest = min(self.events_store[session_id].keys())
del self.events_store[session_id][oldest]
def get_events_after(self, session_id: str, last_event_id: int):
if session_id not in self.events_store:
return []
return [
(eid, self.events_store[session_id][eid])
for eid in sorted(self.events_store[session_id].keys())
if eid > last_event_id
]
state_manager = SSEStateManager()
async def resilient_sse_generator(request: Request, session_id: str, last_event_id: int = 0):
retry_count = 0
max_retries = 3
current_id = last_event_id
missed_events = state_manager.get_events_after(session_id, last_event_id)
for eid, data in missed_events:
yield f"id: {eid}\ndata: {data}\n\n"
current_id = eid
while True:
if await request.is_disconnected():
break
try:
current_id += 1
data = json.dumps({
"id": current_id,
"message": f"Resilient event at {datetime.now().isoformat()}",
"retry_count": retry_count,
}, ensure_ascii=False)
state_manager.store_event(session_id, current_id, data)
yield f"id: {current_id}\ndata: {data}\n\n"
retry_count = 0
await asyncio.sleep(1)
except Exception as e:
retry_count += 1
if retry_count > max_retries:
error_data = json.dumps({"error": str(e), "fatal": True}, ensure_ascii=False)
yield f"event: error\ndata: {error_data}\n\n"
break
retry_delay = min(2 ** retry_count, 30)
yield f"retry: {retry_delay * 1000}\n\n"
await asyncio.sleep(retry_delay)
@app.get("/sse/resilient/{session_id}")
async def resilient_sse(request: Request, session_id: str, last_event_id: int = 0):
return StreamingResponse(
resilient_sse_generator(request, session_id, last_event_id),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
}
)
パターン5:バックプレッシャー制御付きSSE
import asyncio
import json
from collections import deque
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
app = FastAPI()
class BackpressureBuffer:
def __init__(self, max_size: int = 100, high_watermark: float = 0.8, low_watermark: float = 0.3):
self.max_size = max_size
self.high_watermark = int(max_size * high_watermark)
self.low_watermark = int(max_size * low_watermark)
self.buffer = deque(maxlen=max_size)
self.paused = False
self._not_empty = asyncio.Event()
self._not_full = asyncio.Event()
self._not_full.set()
async def put(self, item: str):
while len(self.buffer) >= self.max_size:
self._not_full.clear()
await self._not_full.wait()
self.buffer.append(item)
self._not_empty.set()
if len(self.buffer) >= self.high_watermark:
self.paused = True
async def get(self) -> str:
while not self.buffer:
self._not_empty.clear()
await self._not_empty.wait()
item = self.buffer.popleft()
self._not_full.set()
if len(self.buffer) <= self.low_watermark:
self.paused = False
return item
@property
def is_paused(self) -> bool:
return self.paused
@property
def size(self) -> int:
return len(self.buffer)
async def llm_producer(buffer: BackpressureBuffer, messages: list):
import httpx
headers = {
"Authorization": f"Bearer sk-your-api-key",
"Content-Type": "application/json",
}
payload = {"model": "gpt-4o", "messages": messages, "stream": True}
async with httpx.AsyncClient(timeout=120.0) as client:
async with client.stream("POST", "https://api.openai.com/v1/chat/completions", json=payload, headers=headers) as response:
async for line in response.aiter_lines():
if buffer.is_paused:
while buffer.is_paused:
await asyncio.sleep(0.1)
if line.startswith("data: ") and line[6:].strip() != "[DONE]":
try:
chunk = json.loads(line[6:])
content = chunk.get("choices", [{}])[0].get("delta", {}).get("content", "")
if content:
await buffer.put(content)
except json.JSONDecodeError:
continue
await buffer.put("[DONE]")
async def backpressure_sse_generator(request: Request, messages: list):
buffer = BackpressureBuffer(max_size=100, high_watermark=0.8, low_watermark=0.3)
producer_task = asyncio.create_task(llm_producer(buffer, messages))
try:
while True:
if await request.is_disconnected():
producer_task.cancel()
break
try:
item = await asyncio.wait_for(buffer.get(), timeout=5.0)
if item == "[DONE]":
yield "data: [DONE]\n\n"
break
data = json.dumps({
"content": item,
"buffer_size": buffer.size,
"paused": buffer.is_paused,
}, ensure_ascii=False)
yield f"data: {data}\n\n"
except asyncio.TimeoutError:
yield ": heartbeat\n\n"
finally:
if not producer_task.done():
producer_task.cancel()
@app.post("/sse/backpressure-chat")
async def backpressure_chat(request: Request):
body = await request.json()
messages = body.get("messages", [])
return StreamingResponse(
backpressure_sse_generator(request, messages),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
}
)
落とし穴ガイド
落とし穴1:X-Accel-Buffering: noの設定忘れ
# ❌ 誤り:NginxがデフォルトでSSEレスポンスをバッファ、クライアントがリアルタイムデータを受信できない
return StreamingResponse(generator(), media_type="text/event-stream")
# ✅ 正しい:Nginxバッファリングを無効化
return StreamingResponse(
generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
}
)
落とし穴2:SSEデータ形式が非標準
# ❌ 誤り:dataフィールドがダブル改行で終わっていない
yield f"data: {json.dumps(data)}\n"
# ✅ 正しい:各イベントはダブル改行\n\nで終了する必要がある
yield f"data: {json.dumps(data)}\n\n"
落とし穴3:ハートビートキープアライブがない
# ❌ 誤り:長時間データなしでプロキシ/ロードバランサーが接続を切断
async def generator():
while True:
await asyncio.sleep(60)
yield f"data: {json.dumps(data)}\n\n"
# ✅ 正しい:SSEコメント行をハートビートとして定期的に送信
async def generator():
last_heartbeat = time.time()
while True:
if time.time() - last_heartbeat > 15:
yield ": heartbeat\n\n"
last_heartbeat = time.time()
await asyncio.sleep(1)
落とし穴4:EventSourceはPOSTリクエストをサポートしない
// ❌ 誤り:EventSourceはGETのみ、リクエストボディを送信できない
const es = new EventSource('/sse/chat', {
method: 'POST',
body: JSON.stringify({ messages })
});
// ✅ 正しい:POSTシナリオではfetch + ReadableStreamを使用
const response = await fetch('/sse/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ messages }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const text = decoder.decode(value);
const lines = text.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = JSON.parse(line.slice(6));
processChunk(data);
}
}
}
落とし穴5:クライアント切断の処理なし
# ❌ 誤り:クライアントが切断してもジェネレーターが実行されリソースを浪費
async def generator():
while True:
yield f"data: hello\n\n"
await asyncio.sleep(1)
# ✅ 正しい:クライアント接続状態をチェック
async def generator(request: Request):
while True:
if await request.is_disconnected():
break
yield f"data: hello\n\n"
await asyncio.sleep(1)
エラートラブルシューティング
| # | エラーメッセージ | 原因 | 解決方法 |
|---|---|---|---|
| 1 | EventSource's response has a MIME type that is not text/event-stream |
レスポンスContent-Typeが不正 | media_type="text/event-stream"を設定 |
| 2 | net::ERR_INCOMPLETE_CHUNKED_ENCODING |
Nginxバッファリングでチャンク転送が不完全 | X-Accel-Buffering: noヘッダーを追加 |
| 3 | 502 Bad Gateway |
バックエンドSSE接続タイムアウト、ゲートウェイ切断 | Nginxのproxy_read_timeoutを増加 |
| 4 | Connection refused |
FastAPI/Flaskが未起動またはポートエラー | サービスポートとファイアウォールを確認 |
| 5 | 429 Too Many Requests |
LLM APIレート制限 | 指数バックオフリトライを実装、リクエストキューを追加 |
| 6 | TimeoutError |
LLM推論タイムアウト | httpx timeoutを増加、ハートビートキープアライブを追加 |
| 7 | JSONDecodeError |
SSEデータ行が有効なJSONではない | 非data行をスキップ、JSONパースエラー処理を追加 |
| 8 | RuntimeError: Event loop is closed |
Flask同期フレームワークでasyncジェネレーター使用 | asgiref.sync.async_to_syncを使用またはFastAPIに切り替え |
| 9 | BrokenPipeError |
クライアント切断後に書き込み継続 | request.is_disconnected()を確認、BrokenPipeErrorをキャッチ |
| 10 | MemoryError |
バックプレッシャー制御なし、バッファが無限成長 | BackpressureBufferを実装、キューサイズを制限 |
高度な最適化
1. マルチモデル統一ストリーミングアダプター
from abc import ABC, abstractmethod
from typing import AsyncIterator
import json
import httpx
class LLMStreamAdapter(ABC):
@abstractmethod
async def stream(self, messages: list, model: str) -> AsyncIterator[str]:
pass
class OpenAIStreamAdapter(LLMStreamAdapter):
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.openai.com/v1/chat/completions"
async def stream(self, messages: list, model: str = "gpt-4o") -> AsyncIterator[str]:
headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}
payload = {"model": model, "messages": messages, "stream": True}
async with httpx.AsyncClient(timeout=120.0) as client:
async with client.stream("POST", self.base_url, json=payload, headers=headers) as resp:
async for line in resp.aiter_lines():
if line.startswith("data: ") and line[6:].strip() != "[DONE]":
chunk = json.loads(line[6:])
content = chunk.get("choices", [{}])[0].get("delta", {}).get("content", "")
if content:
yield content
class AnthropicStreamAdapter(LLMStreamAdapter):
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.anthropic.com/v1/messages"
async def stream(self, messages: list, model: str = "claude-sonnet-4-20250514") -> AsyncIterator[str]:
headers = {
"x-api-key": self.api_key,
"anthropic-version": "2023-06-01",
"Content-Type": "application/json",
}
system_msg = ""
user_messages = []
for m in messages:
if m["role"] == "system":
system_msg = m["content"]
else:
user_messages.append(m)
payload = {"model": model, "messages": user_messages, "stream": True, "max_tokens": 4096}
if system_msg:
payload["system"] = system_msg
async with httpx.AsyncClient(timeout=120.0) as client:
async with client.stream("POST", self.base_url, json=payload, headers=headers) as resp:
async for line in resp.aiter_lines():
if line.startswith("data: "):
try:
chunk = json.loads(line[6:])
if chunk.get("type") == "content_block_delta":
text = chunk.get("delta", {}).get("text", "")
if text:
yield text
except json.JSONDecodeError:
continue
class LLMStreamFactory:
_adapters = {
"openai": OpenAIStreamAdapter,
"anthropic": AnthropicStreamAdapter,
}
@classmethod
def create(cls, provider: str, api_key: str) -> LLMStreamAdapter:
adapter_cls = cls._adapters.get(provider)
if not adapter_cls:
raise ValueError(f"Unsupported provider: {provider}")
return adapter_cls(api_key)
2. SSE接続プール管理
import asyncio
import time
from dataclasses import dataclass, field
from typing import Dict, Optional
@dataclass
class SSEConnection:
session_id: str
created_at: float = field(default_factory=time.time)
last_active: float = field(default_factory=time.time)
is_active: bool = True
class SSEConnectionPool:
def __init__(self, max_connections: int = 1000, idle_timeout: int = 300):
self.max_connections = max_connections
self.idle_timeout = idle_timeout
self.connections: Dict[str, SSEConnection] = {}
self._cleanup_task: Optional[asyncio.Task] = None
async def start(self):
self._cleanup_task = asyncio.create_task(self._cleanup_loop())
async def stop(self):
if self._cleanup_task:
self._cleanup_task.cancel()
def register(self, session_id: str) -> bool:
if len(self.connections) >= self.max_connections:
return False
self.connections[session_id] = SSEConnection(session_id=session_id)
return True
def heartbeat(self, session_id: str):
if session_id in self.connections:
self.connections[session_id].last_active = time.time()
def disconnect(self, session_id: str):
if session_id in self.connections:
self.connections[session_id].is_active = False
del self.connections[session_id]
@property
def active_count(self) -> int:
return sum(1 for c in self.connections.values() if c.is_active)
async def _cleanup_loop(self):
while True:
now = time.time()
expired = [
sid for sid, conn in self.connections.items()
if now - conn.last_active > self.idle_timeout
]
for sid in expired:
self.disconnect(sid)
await asyncio.sleep(30)
3. フロントエンドSSEクライアントラッパー
class SSEClient {
constructor(url, options = {}) {
this.url = url;
this.options = {
retryInterval: 1000,
maxRetries: 5,
heartbeatInterval: 15000,
...options,
};
this.eventSource = null;
this.retryCount = 0;
this.lastEventId = null;
this.handlers = new Map();
this._heartbeatTimer = null;
}
connect() {
let url = this.url;
if (this.lastEventId) {
url += `?last_event_id=${this.lastEventId}`;
}
this.eventSource = new EventSource(url);
this.eventSource.onopen = () => {
this.retryCount = 0;
this._startHeartbeat();
this._emit('open', {});
};
this.eventSource.onmessage = (event) => {
this.lastEventId = event.lastEventId;
try {
const data = JSON.parse(event.data);
this._emit('message', data);
} catch {
this._emit('message', { raw: event.data });
}
};
this.eventSource.onerror = () => {
this._stopHeartbeat();
this.eventSource.close();
if (this.retryCount < this.options.maxRetries) {
const delay = Math.min(
this.options.retryInterval * Math.pow(2, this.retryCount),
30000
);
this.retryCount++;
setTimeout(() => this.connect(), delay);
} else {
this._emit('error', { message: 'Max retries exceeded' });
}
};
}
on(event, handler) {
if (!this.handlers.has(event)) {
this.handlers.set(event, []);
}
this.handlers.get(event).push(handler);
return this;
}
_emit(event, data) {
const handlers = this.handlers.get(event) || [];
for (const handler of handlers) {
handler(data);
}
}
_startHeartbeat() {
this._stopHeartbeat();
this._heartbeatTimer = setInterval(() => {
if (this.eventSource?.readyState === EventSource.OPEN) {
this._emit('heartbeat', { timestamp: Date.now() });
}
}, this.options.heartbeatInterval);
}
_stopHeartbeat() {
if (this._heartbeatTimer) {
clearInterval(this._heartbeatTimer);
this._heartbeatTimer = null;
}
}
disconnect() {
this._stopHeartbeat();
if (this.eventSource) {
this.eventSource.close();
this.eventSource = null;
}
}
}
比較分析
| 次元 | SSE | WebSocket | ロングポーリング | ショートポーリング | gRPC Stream |
|---|---|---|---|---|---|
| 方向 | 単方向(サーバー→クライアント) | 双方向 | 単方向 | 単方向 | 双方向 |
| プロトコル | HTTP/1.1+ | WS | HTTP | HTTP | HTTP/2 |
| 自動再接続 | ✅ネイティブ | ❌手動 | ✅ | ✅ | ❌手動 |
| ブラウザAPI | EventSource | WebSocket | fetch | fetch | gRPC-Web必要 |
| CDN/プロキシ | ✅ | ❌ | ✅ | ✅ | ⚠️ |
| バイナリ | ❌ | ✅ | ❌ | ❌ | ✅ |
| LLM適合度 | ⭐最適 | ⭐良い | ⭐悪い | ⭐最悪 | ⭐良い |
| 複雑さ | 低 | 中 | 低 | 最低 | 高 |
| リソース使用 | 低 | 中 | 高 | 最高 | 低 |
まとめ:SSEはLLMストリーミング出力の最適解——HTTPネイティブプロトコル、ブラウザEventSource自動再接続、単方向プッシュがLLM生成シナリオに完璧にマッチ。5つの実装パターンはシンプルから複雑へ:FastAPI基本SSE→Flask SSE→LLMストリーミングチャット→エラー回復→バックプレッシャー制御。本番環境で注目すべき3点:1)Nginxバッファリング無効化(
X-Accel-Buffering: no)、2)ハートビートキープアライブで接続切断防止、3)バックプレッシャー制御でメモリオーバーフロー防止。マルチモデルシナリオではアダプターパターンで統一抽象化、接続プールで並行管理。
オンラインツール推奨
- JSONフォーマッター:/ja/json/format
- Base64エンコード/デコード:/ja/encode/base64
- Hash計算:/ja/encode/hash
ブラウザローカルツールを無料で試す →
#Python#SSE#流式输出#LLM#Server-Sent Events#2026#实时对话