Python SSEストリーミングLLM出力:Server-Sent Eventsからリアルタイムチャットまで5つの実装パターン

AI与大数据

LLMが一文字ずつ吐き出すのを待つ間、ユーザーはもういない

ChatGPT APIを呼び出し、5秒待って巨大なJSONが返ってくる——ユーザーはページがフリーズしたと思う。ポーリングに切り替えると毎秒リクエスト、サーバーが耐えられない。WebSocketを試すと、各会話で持続的接続が必要、スケーラビリティが最悪。2026年、SSE(Server-Sent Events) こそがLLMストリーミング出力の最適解——単方向プッシュ、自動再接続、HTTPネイティブサポート、CDDフレンドリー。

本記事はSSEプロトコル原理から出発し、FastAPI SSE→Flask SSE→LLMストリーミングチャット→エラー回復→バックプレッシャー制御の5つの実装パターンを完全ガイドします。


SSEプロトコルコア概念

概念 説明
EventSource ブラウザネイティブSSEクライアントAPI、自動再接続
text/event-stream SSEのContent-Type、ブラウザにストリームイベントであることを通知
dataフィールド イベントデータ、各行がdata:で始まる
eventフィールド カスタムイベントタイプ、デフォルトはmessage
idフィールド イベントID、再接続時に前回位置から再開するため
retryフィールド クライアント再接続間隔の提案(ミリ秒)
Chunked Transfer HTTPチャンク転送エンコーディング、SSEの基盤転送メカニズム

SSE vs WebSocket比較

SSEの利点:
1. HTTPベース、プロトコルアップグレード不要、CDN/プロキシフレンドリー
2. ブラウザネイティブEventSource API、自動再接続
3. 単方向プッシュ、LLM出力シナリオに完璧にマッチ
4. テキストプロトコル、デバッグが簡単

WebSocketの利点:
1. 全二重通信、双方向リアルタイムインタラクションに適する
2. バイナリデータサポート
3. より低いフレームオーバーヘッド(2バイト vs SSEテキスト形式)

結論:LLMストリーミング出力は典型的な単方向プッシュシナリオ——SSEが最適

問題分析:LLMストリーミング出力の5つの課題

  1. 接続安定性:LLM推論に長時間(10-60秒)、ネットワークジッターで切断——自動再接続とレジュームが必要
  2. バックプレッシャー制御:LLM生成速度がクライアント消費速度を上回り、メモリオーバーフローリスク
  3. エラー回復:上流LLM APIタイムアウト、レート制限、エラー返却——グレースフルデグラデーションが必要
  4. マルチモデル適応:OpenAI/Anthropic/ローカルモデルのストリーミング形式が異なる——統一抽象化が必要
  5. 並行管理:多数のSSE接続がサーバーリソースを消費——接続プールとタイムアウト管理が必要

ステップバイステップ:5つのSSEストリーミング実装パターン

パターン1:FastAPI基本SSE

import asyncio
import json
from datetime import datetime
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()

async def event_generator(request: Request):
    count = 0
    while True:
        if await request.is_disconnected():
            break
        data = json.dumps({
            "id": count,
            "message": f"Event at {datetime.now().isoformat()}",
            "timestamp": datetime.now().timestamp()
        }, ensure_ascii=False)
        yield f"id: {count}\ndata: {data}\n\n"
        count += 1
        await asyncio.sleep(1)

@app.get("/sse/events")
async def sse_events(request: Request):
    return StreamingResponse(
        event_generator(request),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        }
    )

パターン2:Flask SSE実装

import json
import time
from datetime import datetime
from flask import Flask, Response

app = Flask(__name__)

def stream_events():
    count = 0
    while True:
        data = json.dumps({
            "id": count,
            "message": f"Flask SSE event at {datetime.now().isoformat()}",
        }, ensure_ascii=False)
        yield f"id: {count}\nevent: message\ndata: {data}\n\n"
        count += 1
        time.sleep(1)

@app.route("/sse/events")
def sse_events():
    return Response(
        stream_events(),
        mimetype="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        }
    )

パターン3:LLMストリーミングチャット(OpenAI互換)

import asyncio
import json
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()

OPENAI_API_KEY = "sk-your-api-key"
OPENAI_API_URL = "https://api.openai.com/v1/chat/completions"

async def llm_stream_generator(request: Request, messages: list, model: str = "gpt-4o"):
    headers = {
        "Authorization": f"Bearer {OPENAI_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": model,
        "messages": messages,
        "stream": True,
        "temperature": 0.7,
    }

    async with httpx.AsyncClient(timeout=120.0) as client:
        async with client.stream("POST", OPENAI_API_URL, json=payload, headers=headers) as response:
            if response.status_code != 200:
                error_data = await response.aread()
                error_msg = json.dumps({"error": error_data.decode()}, ensure_ascii=False)
                yield f"event: error\ndata: {error_msg}\n\n"
                return

            async for line in response.aiter_lines():
                if await request.is_disconnected():
                    break
                if line.startswith("data: "):
                    data_str = line[6:]
                    if data_str.strip() == "[DONE]":
                        yield f"data: [DONE]\n\n"
                        break
                    try:
                        chunk = json.loads(data_str)
                        delta = chunk.get("choices", [{}])[0].get("delta", {})
                        content = delta.get("content", "")
                        if content:
                            escaped = json.dumps({"content": content}, ensure_ascii=False)
                            yield f"data: {escaped}\n\n"
                    except json.JSONDecodeError:
                        continue

@app.post("/sse/chat")
async def sse_chat(request: Request):
    body = await request.json()
    messages = body.get("messages", [])
    model = body.get("model", "gpt-4o")
    return StreamingResponse(
        llm_stream_generator(request, messages, model),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        }
    )

パターン4:エラー回復付きSSE

import asyncio
import json
from datetime import datetime
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()

class SSEStateManager:
    def __init__(self):
        self.events_store = {}
        self.max_store_size = 1000

    def store_event(self, session_id: str, event_id: int, data: str):
        if session_id not in self.events_store:
            self.events_store[session_id] = {}
        self.events_store[session_id][event_id] = data
        if len(self.events_store[session_id]) > self.max_store_size:
            oldest = min(self.events_store[session_id].keys())
            del self.events_store[session_id][oldest]

    def get_events_after(self, session_id: str, last_event_id: int):
        if session_id not in self.events_store:
            return []
        return [
            (eid, self.events_store[session_id][eid])
            for eid in sorted(self.events_store[session_id].keys())
            if eid > last_event_id
        ]

state_manager = SSEStateManager()

async def resilient_sse_generator(request: Request, session_id: str, last_event_id: int = 0):
    retry_count = 0
    max_retries = 3
    current_id = last_event_id

    missed_events = state_manager.get_events_after(session_id, last_event_id)
    for eid, data in missed_events:
        yield f"id: {eid}\ndata: {data}\n\n"
        current_id = eid

    while True:
        if await request.is_disconnected():
            break
        try:
            current_id += 1
            data = json.dumps({
                "id": current_id,
                "message": f"Resilient event at {datetime.now().isoformat()}",
                "retry_count": retry_count,
            }, ensure_ascii=False)
            state_manager.store_event(session_id, current_id, data)
            yield f"id: {current_id}\ndata: {data}\n\n"
            retry_count = 0
            await asyncio.sleep(1)
        except Exception as e:
            retry_count += 1
            if retry_count > max_retries:
                error_data = json.dumps({"error": str(e), "fatal": True}, ensure_ascii=False)
                yield f"event: error\ndata: {error_data}\n\n"
                break
            retry_delay = min(2 ** retry_count, 30)
            yield f"retry: {retry_delay * 1000}\n\n"
            await asyncio.sleep(retry_delay)

@app.get("/sse/resilient/{session_id}")
async def resilient_sse(request: Request, session_id: str, last_event_id: int = 0):
    return StreamingResponse(
        resilient_sse_generator(request, session_id, last_event_id),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        }
    )

パターン5:バックプレッシャー制御付きSSE

import asyncio
import json
from collections import deque
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()

class BackpressureBuffer:
    def __init__(self, max_size: int = 100, high_watermark: float = 0.8, low_watermark: float = 0.3):
        self.max_size = max_size
        self.high_watermark = int(max_size * high_watermark)
        self.low_watermark = int(max_size * low_watermark)
        self.buffer = deque(maxlen=max_size)
        self.paused = False
        self._not_empty = asyncio.Event()
        self._not_full = asyncio.Event()
        self._not_full.set()

    async def put(self, item: str):
        while len(self.buffer) >= self.max_size:
            self._not_full.clear()
            await self._not_full.wait()
        self.buffer.append(item)
        self._not_empty.set()
        if len(self.buffer) >= self.high_watermark:
            self.paused = True

    async def get(self) -> str:
        while not self.buffer:
            self._not_empty.clear()
            await self._not_empty.wait()
        item = self.buffer.popleft()
        self._not_full.set()
        if len(self.buffer) <= self.low_watermark:
            self.paused = False
        return item

    @property
    def is_paused(self) -> bool:
        return self.paused

    @property
    def size(self) -> int:
        return len(self.buffer)

async def llm_producer(buffer: BackpressureBuffer, messages: list):
    import httpx
    headers = {
        "Authorization": f"Bearer sk-your-api-key",
        "Content-Type": "application/json",
    }
    payload = {"model": "gpt-4o", "messages": messages, "stream": True}
    async with httpx.AsyncClient(timeout=120.0) as client:
        async with client.stream("POST", "https://api.openai.com/v1/chat/completions", json=payload, headers=headers) as response:
            async for line in response.aiter_lines():
                if buffer.is_paused:
                    while buffer.is_paused:
                        await asyncio.sleep(0.1)
                if line.startswith("data: ") and line[6:].strip() != "[DONE]":
                    try:
                        chunk = json.loads(line[6:])
                        content = chunk.get("choices", [{}])[0].get("delta", {}).get("content", "")
                        if content:
                            await buffer.put(content)
                    except json.JSONDecodeError:
                        continue
    await buffer.put("[DONE]")

async def backpressure_sse_generator(request: Request, messages: list):
    buffer = BackpressureBuffer(max_size=100, high_watermark=0.8, low_watermark=0.3)
    producer_task = asyncio.create_task(llm_producer(buffer, messages))

    try:
        while True:
            if await request.is_disconnected():
                producer_task.cancel()
                break
            try:
                item = await asyncio.wait_for(buffer.get(), timeout=5.0)
                if item == "[DONE]":
                    yield "data: [DONE]\n\n"
                    break
                data = json.dumps({
                    "content": item,
                    "buffer_size": buffer.size,
                    "paused": buffer.is_paused,
                }, ensure_ascii=False)
                yield f"data: {data}\n\n"
            except asyncio.TimeoutError:
                yield ": heartbeat\n\n"
    finally:
        if not producer_task.done():
            producer_task.cancel()

@app.post("/sse/backpressure-chat")
async def backpressure_chat(request: Request):
    body = await request.json()
    messages = body.get("messages", [])
    return StreamingResponse(
        backpressure_sse_generator(request, messages),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        }
    )

落とし穴ガイド

落とし穴1:X-Accel-Buffering: noの設定忘れ

# ❌ 誤り:NginxがデフォルトでSSEレスポンスをバッファ、クライアントがリアルタイムデータを受信できない
return StreamingResponse(generator(), media_type="text/event-stream")

# ✅ 正しい:Nginxバッファリングを無効化
return StreamingResponse(
    generator(),
    media_type="text/event-stream",
    headers={
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",
    }
)

落とし穴2:SSEデータ形式が非標準

# ❌ 誤り:dataフィールドがダブル改行で終わっていない
yield f"data: {json.dumps(data)}\n"

# ✅ 正しい:各イベントはダブル改行\n\nで終了する必要がある
yield f"data: {json.dumps(data)}\n\n"

落とし穴3:ハートビートキープアライブがない

# ❌ 誤り:長時間データなしでプロキシ/ロードバランサーが接続を切断
async def generator():
    while True:
        await asyncio.sleep(60)
        yield f"data: {json.dumps(data)}\n\n"

# ✅ 正しい:SSEコメント行をハートビートとして定期的に送信
async def generator():
    last_heartbeat = time.time()
    while True:
        if time.time() - last_heartbeat > 15:
            yield ": heartbeat\n\n"
            last_heartbeat = time.time()
        await asyncio.sleep(1)

落とし穴4:EventSourceはPOSTリクエストをサポートしない

// ❌ 誤り:EventSourceはGETのみ、リクエストボディを送信できない
const es = new EventSource('/sse/chat', {
    method: 'POST',
    body: JSON.stringify({ messages })
});

// ✅ 正しい:POSTシナリオではfetch + ReadableStreamを使用
const response = await fetch('/sse/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ messages }),
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    const text = decoder.decode(value);
    const lines = text.split('\n');
    for (const line of lines) {
        if (line.startsWith('data: ')) {
            const data = JSON.parse(line.slice(6));
            processChunk(data);
        }
    }
}

落とし穴5:クライアント切断の処理なし

# ❌ 誤り:クライアントが切断してもジェネレーターが実行されリソースを浪費
async def generator():
    while True:
        yield f"data: hello\n\n"
        await asyncio.sleep(1)

# ✅ 正しい:クライアント接続状態をチェック
async def generator(request: Request):
    while True:
        if await request.is_disconnected():
            break
        yield f"data: hello\n\n"
        await asyncio.sleep(1)

エラートラブルシューティング

# エラーメッセージ 原因 解決方法
1 EventSource's response has a MIME type that is not text/event-stream レスポンスContent-Typeが不正 media_type="text/event-stream"を設定
2 net::ERR_INCOMPLETE_CHUNKED_ENCODING Nginxバッファリングでチャンク転送が不完全 X-Accel-Buffering: noヘッダーを追加
3 502 Bad Gateway バックエンドSSE接続タイムアウト、ゲートウェイ切断 Nginxのproxy_read_timeoutを増加
4 Connection refused FastAPI/Flaskが未起動またはポートエラー サービスポートとファイアウォールを確認
5 429 Too Many Requests LLM APIレート制限 指数バックオフリトライを実装、リクエストキューを追加
6 TimeoutError LLM推論タイムアウト httpx timeoutを増加、ハートビートキープアライブを追加
7 JSONDecodeError SSEデータ行が有効なJSONではない 非data行をスキップ、JSONパースエラー処理を追加
8 RuntimeError: Event loop is closed Flask同期フレームワークでasyncジェネレーター使用 asgiref.sync.async_to_syncを使用またはFastAPIに切り替え
9 BrokenPipeError クライアント切断後に書き込み継続 request.is_disconnected()を確認、BrokenPipeErrorをキャッチ
10 MemoryError バックプレッシャー制御なし、バッファが無限成長 BackpressureBufferを実装、キューサイズを制限

高度な最適化

1. マルチモデル統一ストリーミングアダプター

from abc import ABC, abstractmethod
from typing import AsyncIterator
import json
import httpx

class LLMStreamAdapter(ABC):
    @abstractmethod
    async def stream(self, messages: list, model: str) -> AsyncIterator[str]:
        pass

class OpenAIStreamAdapter(LLMStreamAdapter):
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.openai.com/v1/chat/completions"

    async def stream(self, messages: list, model: str = "gpt-4o") -> AsyncIterator[str]:
        headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}
        payload = {"model": model, "messages": messages, "stream": True}
        async with httpx.AsyncClient(timeout=120.0) as client:
            async with client.stream("POST", self.base_url, json=payload, headers=headers) as resp:
                async for line in resp.aiter_lines():
                    if line.startswith("data: ") and line[6:].strip() != "[DONE]":
                        chunk = json.loads(line[6:])
                        content = chunk.get("choices", [{}])[0].get("delta", {}).get("content", "")
                        if content:
                            yield content

class AnthropicStreamAdapter(LLMStreamAdapter):
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.anthropic.com/v1/messages"

    async def stream(self, messages: list, model: str = "claude-sonnet-4-20250514") -> AsyncIterator[str]:
        headers = {
            "x-api-key": self.api_key,
            "anthropic-version": "2023-06-01",
            "Content-Type": "application/json",
        }
        system_msg = ""
        user_messages = []
        for m in messages:
            if m["role"] == "system":
                system_msg = m["content"]
            else:
                user_messages.append(m)
        payload = {"model": model, "messages": user_messages, "stream": True, "max_tokens": 4096}
        if system_msg:
            payload["system"] = system_msg
        async with httpx.AsyncClient(timeout=120.0) as client:
            async with client.stream("POST", self.base_url, json=payload, headers=headers) as resp:
                async for line in resp.aiter_lines():
                    if line.startswith("data: "):
                        try:
                            chunk = json.loads(line[6:])
                            if chunk.get("type") == "content_block_delta":
                                text = chunk.get("delta", {}).get("text", "")
                                if text:
                                    yield text
                        except json.JSONDecodeError:
                            continue

class LLMStreamFactory:
    _adapters = {
        "openai": OpenAIStreamAdapter,
        "anthropic": AnthropicStreamAdapter,
    }

    @classmethod
    def create(cls, provider: str, api_key: str) -> LLMStreamAdapter:
        adapter_cls = cls._adapters.get(provider)
        if not adapter_cls:
            raise ValueError(f"Unsupported provider: {provider}")
        return adapter_cls(api_key)

2. SSE接続プール管理

import asyncio
import time
from dataclasses import dataclass, field
from typing import Dict, Optional

@dataclass
class SSEConnection:
    session_id: str
    created_at: float = field(default_factory=time.time)
    last_active: float = field(default_factory=time.time)
    is_active: bool = True

class SSEConnectionPool:
    def __init__(self, max_connections: int = 1000, idle_timeout: int = 300):
        self.max_connections = max_connections
        self.idle_timeout = idle_timeout
        self.connections: Dict[str, SSEConnection] = {}
        self._cleanup_task: Optional[asyncio.Task] = None

    async def start(self):
        self._cleanup_task = asyncio.create_task(self._cleanup_loop())

    async def stop(self):
        if self._cleanup_task:
            self._cleanup_task.cancel()

    def register(self, session_id: str) -> bool:
        if len(self.connections) >= self.max_connections:
            return False
        self.connections[session_id] = SSEConnection(session_id=session_id)
        return True

    def heartbeat(self, session_id: str):
        if session_id in self.connections:
            self.connections[session_id].last_active = time.time()

    def disconnect(self, session_id: str):
        if session_id in self.connections:
            self.connections[session_id].is_active = False
            del self.connections[session_id]

    @property
    def active_count(self) -> int:
        return sum(1 for c in self.connections.values() if c.is_active)

    async def _cleanup_loop(self):
        while True:
            now = time.time()
            expired = [
                sid for sid, conn in self.connections.items()
                if now - conn.last_active > self.idle_timeout
            ]
            for sid in expired:
                self.disconnect(sid)
            await asyncio.sleep(30)

3. フロントエンドSSEクライアントラッパー

class SSEClient {
    constructor(url, options = {}) {
        this.url = url;
        this.options = {
            retryInterval: 1000,
            maxRetries: 5,
            heartbeatInterval: 15000,
            ...options,
        };
        this.eventSource = null;
        this.retryCount = 0;
        this.lastEventId = null;
        this.handlers = new Map();
        this._heartbeatTimer = null;
    }

    connect() {
        let url = this.url;
        if (this.lastEventId) {
            url += `?last_event_id=${this.lastEventId}`;
        }

        this.eventSource = new EventSource(url);

        this.eventSource.onopen = () => {
            this.retryCount = 0;
            this._startHeartbeat();
            this._emit('open', {});
        };

        this.eventSource.onmessage = (event) => {
            this.lastEventId = event.lastEventId;
            try {
                const data = JSON.parse(event.data);
                this._emit('message', data);
            } catch {
                this._emit('message', { raw: event.data });
            }
        };

        this.eventSource.onerror = () => {
            this._stopHeartbeat();
            this.eventSource.close();

            if (this.retryCount < this.options.maxRetries) {
                const delay = Math.min(
                    this.options.retryInterval * Math.pow(2, this.retryCount),
                    30000
                );
                this.retryCount++;
                setTimeout(() => this.connect(), delay);
            } else {
                this._emit('error', { message: 'Max retries exceeded' });
            }
        };
    }

    on(event, handler) {
        if (!this.handlers.has(event)) {
            this.handlers.set(event, []);
        }
        this.handlers.get(event).push(handler);
        return this;
    }

    _emit(event, data) {
        const handlers = this.handlers.get(event) || [];
        for (const handler of handlers) {
            handler(data);
        }
    }

    _startHeartbeat() {
        this._stopHeartbeat();
        this._heartbeatTimer = setInterval(() => {
            if (this.eventSource?.readyState === EventSource.OPEN) {
                this._emit('heartbeat', { timestamp: Date.now() });
            }
        }, this.options.heartbeatInterval);
    }

    _stopHeartbeat() {
        if (this._heartbeatTimer) {
            clearInterval(this._heartbeatTimer);
            this._heartbeatTimer = null;
        }
    }

    disconnect() {
        this._stopHeartbeat();
        if (this.eventSource) {
            this.eventSource.close();
            this.eventSource = null;
        }
    }
}

比較分析

次元 SSE WebSocket ロングポーリング ショートポーリング gRPC Stream
方向 単方向(サーバー→クライアント) 双方向 単方向 単方向 双方向
プロトコル HTTP/1.1+ WS HTTP HTTP HTTP/2
自動再接続 ✅ネイティブ ❌手動 ❌手動
ブラウザAPI EventSource WebSocket fetch fetch gRPC-Web必要
CDN/プロキシ ⚠️
バイナリ
LLM適合度 ⭐最適 ⭐良い ⭐悪い ⭐最悪 ⭐良い
複雑さ 最低
リソース使用 最高

まとめ:SSEはLLMストリーミング出力の最適解——HTTPネイティブプロトコル、ブラウザEventSource自動再接続、単方向プッシュがLLM生成シナリオに完璧にマッチ。5つの実装パターンはシンプルから複雑へ:FastAPI基本SSE→Flask SSE→LLMストリーミングチャット→エラー回復→バックプレッシャー制御。本番環境で注目すべき3点:1)Nginxバッファリング無効化(X-Accel-Buffering: no)、2)ハートビートキープアライブで接続切断防止、3)バックプレッシャー制御でメモリオーバーフロー防止。マルチモデルシナリオではアダプターパターンで統一抽象化、接続プールで並行管理。


オンラインツール推奨

ブラウザローカルツールを無料で試す →

#Python#SSE#流式输出#LLM#Server-Sent Events#2026#实时对话