Rust Axumストリーミングレスポンス:SSEからWebSocketまでの6つのプロダクションパターン

编程语言

全データの読み込みを待ってからレスポンス?ユーザーはもうページを閉じてる

10万件のログを返すAPIを作ったら、クライアントは30秒待って完全なJSONを受け取り、メモリは2GBに急増;リアルタイム通知のためにポーリングで毎秒リクエストしたら、サーバーのQPSが10倍に;WebSocketで一方向プッシュをしたら、80%の接続がデータ受信だけに使われていて、全二重は無駄だった。2026年、Rust Axumストリーミングレスポンスこそが正解——SSEで一方向プッシュ、WebSocketで双方向通信、NDJSONでストリーミング転送、Tokioのゼロコスト非同期と組み合わせてメモリ安全かつ高性能。

本記事ではAxumストリーミングレスポンスの核心概念から出発し、SSEプッシュ→WebSocket通信→ストリーミングJSON→バックプレッシャー制御→マルチクライアントブロードキャスト→プロダクションデプロイの6つのプロダクションパターンを完了し、リアルタイムデータプッシュを「なんとか動く」から「プロダクション信頼性」にする。


主要ポイント

  • AxumストリーミングレスポンスはBodyストリームとSSEの2つのコア抽象化に基づく
  • SSEは一方向プッシュ(通知、LLMストリーミング出力)に適し、WebSocketは双方向インタラクションに適する
  • NDJSONはストリーミングJSONのデファクトスタンダード——1行に1つの完全なJSONオブジェクト
  • バックプレッシャー制御はストリーミングシステムの生命線、非境界チャネル=時限爆弾
  • tokio::sync::broadcastでマルチクライアントブロードキャスト、watchで状態同期を実現
  • プロダクションデプロイにはグレースフルシャットダウン、接続タイムアウト、ヘルスチェックの3点セットが必要

目次

  1. Axumストリーミングレスポンス核心概念
  2. パターン1: SSEサーバー送信イベント実装
  3. パターン2: WebSocket双方向通信
  4. パターン3: ストリーミングJSON/NDJSONレスポンス
  5. パターン4: バックプレッシャーとフロー制御
  6. パターン5: マルチクライアントブロードキャスト(Pub/Sub)
  7. パターン6: プロダクションデプロイとグレースフルシャットダウン
  8. 5つのよくある落とし穴と解決策
  9. 10のよくあるエラートラブルシューティング
  10. 高度な最適化テクニック
  11. 比較分析:SSE vs WebSocket vs ロングポーリング
  12. オンラインツールおすすめ
  13. まとめ

Axumストリーミングレスポンス核心概念

概念 説明 適用シナリオ
Sse<impl Stream> Server-Sent Events、HTTPベースの単方向プッシュ 通知、LLMストリーミング、リアルタイムログ
WebSocket 全二重通信、プロトコルアップグレード必要 チャット、協調編集、ゲーム
Body Axumレスポンスボディの基盤ストリーム抽象化 カスタムストリーミング転送
NDJSON 改行区切りJSONストリーム 大データエクスポート、イベントストリーム
Backpressure バックプレッシャー、消費者が生産速度を制御 OOM防止、トラフィックシェーピング
broadcast Tokioブロードキャストチャネル マルチクライアント同時サブスクライブ
watch Tokio状態観察チャネル 設定ホットリロード、状態同期
Chunked Encoding HTTPチャンク転送 ストリーミングレスポンスの基盤転送メカニズム

ストリーミングレスポンスアーキテクチャ

┌─────────────────────────────────────────────────┐
│                   Client                         │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐      │
│  │ Browser  │  │  Mobile  │  │   CLI    │      │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘      │
└───────┼──────────────┼──────────────┼───────────┘
        │              │              │
        ▼              ▼              ▼
┌─────────────────────────────────────────────────┐
│              Axum Server                         │
│  ┌─────────────────────────────────────────┐    │
│  │            Router                        │    │
│  │  /sse  →  SSE Handler                   │    │
│  │  /ws   →  WebSocket Handler             │    │
│  │  /stream → NDJSON Handler               │    │
│  └──────────────┬──────────────────────────┘    │
│                 │                                │
│  ┌──────────────▼──────────────────────────┐    │
│  │         Tokio Runtime                    │    │
│  │  ┌────────┐ ┌────────┐ ┌────────┐      │    │
│  │  │  mpsc  │ │broadcast│ │  watch │      │    │
│  │  └────────┘ └────────┘ └────────┘      │    │
│  └─────────────────────────────────────────┘    │
└─────────────────────────────────────────────────┘

パターン1: SSEサーバー送信イベント実装

SSEはAxumストリーミングレスポンスで最も一般的なパターン、HTTP長接続で単方向プッシュを実現し、ブラウザネイティブのEventSource APIが自動再接続をサポート。

基本的なSSE実装

[dependencies]
axum = { version = "0.8", features = ["sse"] }
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
futures = "0.3"
use axum::{
    Router,
    response::sse::{Event, Sse},
    routing::get,
};
use futures::stream::{self, Stream};
use std::convert::Infallible;
use tokio_stream::StreamExt as _;
use std::time::Duration;

async fn sse_handler() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let stream = stream::iter(0..100).map(|i| {
        Ok(Event::default()
            .data(format!("Message {}", i))
            .event("update")
            .id(i.to_string()))
    });

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(Duration::from_secs(5))
            .text("ping"),
    )
}

#[tokio::main]
async fn main() {
    let app = Router::new().route("/sse", get(sse_handler));
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

JSONデータ付きSSE

use axum::{
    Router,
    extract::State,
    response::sse::{Event, Sse},
    routing::get,
};
use futures::stream::Stream;
use serde::Serialize;
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;
use tokio_stream::StreamExt as _;

#[derive(Serialize, Clone)]
struct Notification {
    id: u64,
    title: String,
    body: String,
    timestamp: i64,
}

struct AppState {
    notification_tx: tokio::sync::broadcast::Sender<Notification>,
}

async fn sse_notifications(
    State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let mut rx = state.notification_tx.subscribe();

    let stream = async_stream::stream! {
        loop {
            match rx.recv().await {
                Ok(notification) => {
                    let data = serde_json::to_string(&notification).unwrap();
                    yield Ok(Event::default()
                        .event("notification")
                        .data(data)
                        .id(notification.id.to_string()));
                }
                Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                    let data = serde_json::json!({"missed": n}).to_string();
                    yield Ok(Event::default()
                        .event("lagged")
                        .data(data));
                }
                Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                    break;
                }
            }
        }
    };

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(Duration::from_secs(15))
            .text("keepalive"),
    )
}

#[tokio::main]
async fn main() {
    let (tx, _) = tokio::sync::broadcast::channel::<Notification>(100);
    let state = Arc::new(AppState { notification_tx: tx });

    let app = Router::new()
        .route("/sse/notifications", get(sse_notifications))
        .with_state(state);

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

SSEクライアント切断検出

use axum::{
    extract::Request,
    response::sse::{Event, Sse},
};
use futures::stream::Stream;
use std::convert::Infallible;
use tokio_stream::StreamExt as _;

async fn sse_with_disconnect(request: Request) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let cancel_token = tokio_util::sync::CancellationToken::new();
    let cancel_clone = cancel_token.clone();

    tokio::spawn(async move {
        tokio::select! {
            _ = cancel_clone.cancelled() => {
                tracing::info!("Client disconnected, stopping SSE stream");
            }
            _ = tokio::time::sleep(Duration::from_secs(3600)) => {
                tracing::info!("SSE stream timeout");
            }
        }
    });

    let stream = async_stream::stream! {
        let mut interval = tokio::time::interval(Duration::from_secs(1));
        loop {
            tokio::select! {
                _ = interval.tick() => {
                    let data = serde_json::json!({
                        "time": chrono::Utc::now().to_rfc3339()
                    });
                    yield Ok(Event::default().data(data.to_string()));
                }
                _ = cancel_token.cancelled() => {
                    break;
                }
            }
        }
    };

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(Duration::from_secs(10)),
    )
}

パターン2: WebSocket双方向通信

WebSocketは全二重通信を提供し、チャット、協調編集、リアルタイムゲームなどの双方向インタラクションシナリオに適する。

基本的なWebSocket実装

[dependencies]
axum = { version = "0.8", features = ["ws"] }
tokio = { version = "1", features = ["full"] }
futures = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
use axum::{
    Router,
    extract::ws::{Message, WebSocket, WebSocketUpgrade},
    response::IntoResponse,
    routing::get,
};
use futures::{SinkExt, StreamExt};

async fn ws_handler(ws: WebSocketUpgrade) -> impl IntoResponse {
    ws.on_upgrade(handle_socket)
}

async fn handle_socket(mut socket: WebSocket) {
    let (mut sender, mut receiver) = socket.split();

    let send_task = tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(5));
        loop {
            interval.tick().await;
            let msg = serde_json::json!({
                "type": "heartbeat",
                "timestamp": chrono::Utc::now().to_rfc3339()
            });
            if sender.send(Message::Text(msg.to_string().into())).await.is_err() {
                break;
            }
        }
    });

    let recv_task = tokio::spawn(async move {
        while let Some(Ok(msg)) = receiver.next().await {
            match msg {
                Message::Text(text) => {
                    tracing::info!("Received: {}", text);
                }
                Message::Close(_) => {
                    tracing::info!("Client closed connection");
                    break;
                }
                _ => {}
            }
        }
    });

    tokio::select! {
        _ = send_task => {},
        _ = recv_task => {},
    }
}

#[tokio::main]
async fn main() {
    let app = Router::new().route("/ws", get(ws_handler));
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

認証と状態管理付きWebSocket

use axum::{
    Router,
    extract::{State, WebSocketUpgrade, ws::WebSocket},
    response::IntoResponse,
    routing::get,
};
use futures::{SinkExt, StreamExt};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};

#[derive(Debug, Clone)]
struct ChatMessage {
    user: String,
    content: String,
    timestamp: i64,
}

struct AppState {
    rooms: RwLock<HashMap<String, Vec<mpsc::UnboundedSender<ChatMessage>>>>,
}

async fn ws_chat(
    ws: WebSocketUpgrade,
    State(state): State<Arc<AppState>>,
) -> impl IntoResponse {
    ws.on_upgrade(move |socket| handle_chat(socket, state))
}

async fn handle_chat(socket: WebSocket, state: Arc<AppState>) {
    let (mut ws_sender, mut ws_receiver) = socket.split();
    let (tx, mut rx) = mpsc::unbounded_channel::<ChatMessage>();

    let room_id = "general".to_string();
    {
        let mut rooms = state.rooms.write().await;
        rooms.entry(room_id.clone()).or_default().push(tx);
    }

    let send_task = tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            let text = serde_json::to_string(&msg).unwrap();
            if ws_sender.send(axum::extract::ws::Message::Text(text.into())).await.is_err() {
                break;
            }
        }
    });

    let recv_task = tokio::spawn(async move {
        while let Some(Ok(msg)) = ws_receiver.next().await {
            if let axum::extract::ws::Message::Text(text) = msg {
                let chat_msg = ChatMessage {
                    user: "anonymous".to_string(),
                    content: text.to_string(),
                    timestamp: chrono::Utc::now().timestamp(),
                };
                let rooms = state.rooms.read().await;
                if let Some(senders) = rooms.get(&room_id) {
                    for sender in senders {
                        let _ = sender.send(chat_msg.clone());
                    }
                }
            }
        }
    });

    tokio::select! {
        _ = send_task => {},
        _ = recv_task => {},
    }

    let mut rooms = state.rooms.write().await;
    if let Some(senders) = rooms.get_mut(&room_id) {
        senders.retain(|s| !s.is_closed());
    }
}

パターン3: ストリーミングJSON/NDJSONレスポンス

NDJSON(Newline Delimited JSON)はストリーミングJSON転送のデファクトスタンダード。1行に1つの完全なJSONオブジェクトがあり、クライアントは完全なレスポンスを待つことなく行ごとにパースできる。

NDJSONストリーミングレスポンス

use axum::{
    Router,
    extract::{Path, Query},
    response::IntoResponse,
    routing::get,
};
use serde::{Deserialize, Serialize};
use tokio_stream::StreamExt;

#[derive(Serialize, Deserialize)]
struct LogEntry {
    id: u64,
    level: String,
    message: String,
    timestamp: String,
    service: String,
}

#[derive(Deserialize)]
struct LogQuery {
    service: Option<String>,
    level: Option<String>,
    limit: Option<usize>,
}

async fn stream_logs(
    Query(params): Query<LogQuery>,
) -> impl IntoResponse {
    let limit = params.limit.unwrap_or(1000);
    let service_filter = params.service.clone();

    let stream = async_stream::stream! {
        let mut count = 0u64;
        let mut interval = tokio::time::interval(Duration::from_millis(100));

        while count < limit as u64 {
            interval.tick().await;

            let entry = LogEntry {
                id: count,
                level: if count % 10 == 0 { "ERROR".to_string() } else { "INFO".to_string() },
                message: format!("Log entry #{}", count),
                timestamp: chrono::Utc::now().to_rfc3339(),
                service: service_filter.clone().unwrap_or_else(|| "api".to_string()),
            };

            let json_line = serde_json::to_string(&entry).unwrap();
            yield Ok::<_, std::convert::Infallible>(format!("{}\n", json_line));
            count += 1;
        }
    };

    (
        [
            ("content-type", "application/x-ndjson"),
            ("cache-control", "no-cache"),
            ("x-stream-format", "ndjson"),
        ],
        axum::body::Body::from_stream(stream),
    )
}

#[tokio::main]
async fn main() {
    let app = Router::new().route("/logs/stream", get(stream_logs));
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

データベースクエリストリーミング出力

use axum::{
    Router,
    extract::State,
    response::IntoResponse,
    routing::get,
};
use sqlx::postgres::PgPool;
use std::sync::Arc;

struct AppState {
    pool: PgPool,
}

async fn stream_query_results(
    State(state): State<Arc<AppState>>,
) -> impl IntoResponse {
    let pool = state.pool.clone();

    let stream = async_stream::stream! {
        let mut stream = sqlx::query_as::<_, (i64, String, String)>(
            "SELECT id, name, email FROM users ORDER BY id"
        )
        .fetch(&pool);

        while let Some(row) = stream.next().await {
            match row {
                Ok((id, name, email)) => {
                    let json = serde_json::json!({
                        "id": id,
                        "name": name,
                        "email": email,
                    });
                    yield Ok::<_, std::convert::Infallible>(
                        format!("{}\n", json)
                    );
                }
                Err(e) => {
                    tracing::error!("Query error: {}", e);
                    break;
                }
            }
        }
    };

    (
        [("content-type", "application/x-ndjson")],
        axum::body::Body::from_stream(stream),
    )
}

パターン4: バックプレッシャーとフロー制御

バックプレッシャーはストリーミングシステムの生命線。消費者の処理速度が生産者の生成速度に追いつかない場合、バックプレッシャー制御がないとメモリが継続的に増加しOOMに至る。

バックプレッシャー制御アーキテクチャ

┌──────────┐     ┌──────────┐     ┌──────────┐
│ Producer  │────▶│  Buffer  │────▶│ Consumer │
│ (fast)    │     │ (bounded)│     │ (slow)   │
└──────────┘     └──────────┘     └──────────┘
     │                │                  │
     │   backpressure │                  │
     │◀───────────────┤                  │
     │  (wait when    │                  │
     │   full)        │                  │

境界付きチャネルでバックプレッシャー実装

use axum::{
    Router,
    extract::State,
    response::sse::{Event, Sse},
    routing::get,
};
use futures::stream::Stream;
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;

struct AppState {
    event_tx: mpsc::Sender<String>,
}

async fn sse_with_backpressure(
    State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let (tx, mut rx) = mpsc::channel::<String>(32);

    let producer_tx = state.event_tx.clone();
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_millis(10));
        for i in 0..10000 {
            interval.tick().await;
            let msg = format!("Event {}", i);
            if tx.send(msg).await.is_err() {
                tracing::warn!("Consumer dropped, stopping producer");
                break;
            }
        }
    });

    let stream = async_stream::stream! {
        while let Some(msg) = rx.recv().await {
            yield Ok(Event::default().data(msg));
        }
    };

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(Duration::from_secs(15)),
    )
}

トラフィックシェーピング(レートリミットストリーム)

use axum::{
    Router,
    response::IntoResponse,
    routing::get,
};
use tokio::time::{self, Duration};
use tokio_stream::StreamExt;

#[derive(Clone)]
struct ThrottleConfig {
    max_per_second: u64,
    burst_size: usize,
}

async fn throttled_stream() -> impl IntoResponse {
    let config = ThrottleConfig {
        max_per_second: 100,
        burst_size: 50,
    };

    let stream = async_stream::stream! {
        let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(config.burst_size);
        let mut tokens = config.burst_size as f64;
        let max_tokens = config.burst_size as f64;
        let refill_rate = config.max_per_second as f64 / 1000.0;
        let mut last_refill = tokio::time::Instant::now();

        tokio::spawn(async move {
            let mut count = 0u64;
            loop {
                tokio::time::sleep(Duration::from_millis(10)).await;
                let msg = format!("Item {}", count);
                if tx.send(msg).await.is_err() {
                    break;
                }
                count += 1;
            }
        });

        while let Some(item) = rx.recv().await {
            let now = tokio::time::Instant::now();
            let elapsed = now.duration_since(last_refill).as_millis() as f64;
            tokens = (tokens + elapsed * refill_rate).min(max_tokens);
            last_refill = now;

            if tokens < 1.0 {
                let wait_ms = ((1.0 - tokens) / refill_rate) as u64;
                tokio::time::sleep(Duration::from_millis(wait_ms)).await;
                tokens = 0.0;
                last_refill = tokio::time::Instant::now();
            } else {
                tokens -= 1.0;
            }

            yield Ok::<_, std::convert::Infallible>(format!("{}\n", item));
        }
    };

    (
        [("content-type", "application/x-ndjson")],
        axum::body::Body::from_stream(stream),
    )
}

パターン5: マルチクライアントブロードキャスト(Pub/Sub)

マルチクライアントブロードキャストはリアルタイムプッシュのコアシナリオ:1つのイベントソースがメッセージを生成し、複数のクライアントが同時に受信する。

ブロードキャストチャネルベースのブロードキャスト

use axum::{
    Router,
    extract::State,
    response::sse::{Event, Sse},
    routing::{get, post},
};
use futures::stream::Stream;
use serde::{Deserialize, Serialize};
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;

#[derive(Serialize, Deserialize, Clone, Debug)]
struct BroadcastEvent {
    event_type: String,
    payload: serde_json::Value,
    timestamp: i64,
}

struct AppState {
    broadcast_tx: tokio::sync::broadcast::Sender<BroadcastEvent>,
    client_count: Arc<std::sync::atomic::AtomicUsize>,
}

async fn subscribe(
    State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let mut rx = state.broadcast_tx.subscribe();
    let client_id = state.client_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

    tracing::info!("Client {} subscribed", client_id);

    let stream = async_stream::stream! {
        loop {
            match rx.recv().await {
                Ok(event) => {
                    let data = serde_json::to_string(&event).unwrap();
                    yield Ok(Event::default()
                        .event(&event.event_type)
                        .data(data)
                        .id(event.timestamp.to_string()));
                }
                Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                    tracing::warn!("Client {} lagged {} messages", client_id, n);
                    yield Ok(Event::default()
                        .event("lagged")
                        .data(format!("Missed {} messages", n)));
                }
                Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                    break;
                }
            }
        }
    };

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(Duration::from_secs(10)),
    )
}

async fn publish(
    State(state): State<Arc<AppState>>,
    axum::Json(event): axum::Json<BroadcastEvent>,
) -> impl IntoResponse {
    let receiver_count = state.broadcast_tx.receiver_count();
    match state.broadcast_tx.send(event) {
        Ok(_) => axum::Json(serde_json::json!({
            "status": "published",
            "receivers": receiver_count,
        })),
        Err(_) => axum::Json(serde_json::json!({
            "status": "no_receivers",
        })),
    }
}

#[tokio::main]
async fn main() {
    let (broadcast_tx, _) = tokio::sync::broadcast::channel::<BroadcastEvent>(256);
    let state = Arc::new(AppState {
        broadcast_tx,
        client_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
    });

    let app = Router::new()
        .route("/subscribe", get(subscribe))
        .route("/publish", post(publish))
        .with_state(state);

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

トピックベースのサブスクリプション

use axum::{
    Router,
    extract::{Path, State},
    response::sse::{Event, Sse},
    routing::{get, post},
};
use futures::stream::Stream;
use std::collections::HashMap;
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, RwLock};

#[derive(Clone, Serialize, Debug)]
struct TopicEvent {
    topic: String,
    data: serde_json::Value,
}

struct AppState {
    topics: RwLock<HashMap<String, broadcast::Sender<TopicEvent>>>,
}

impl AppState {
    fn new() -> Self {
        Self {
            topics: RwLock::new(HashMap::new()),
        }
    }

    async fn get_or_create_topic(&self, topic: &str) -> broadcast::Sender<TopicEvent> {
        let mut topics = self.topics.write().await;
        topics
            .entry(topic.to_string())
            .or_insert_with(|| {
                let (tx, _) = broadcast::channel::<TopicEvent>(256);
                tx
            })
            .clone()
    }
}

async fn subscribe_topic(
    Path(topic): Path<String>,
    State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let tx = state.get_or_create_topic(&topic).await;
    let mut rx = tx.subscribe();

    let stream = async_stream::stream! {
        loop {
            match rx.recv().await {
                Ok(event) => {
                    let data = serde_json::to_string(&event).unwrap();
                    yield Ok(Event::default()
                        .event(&event.topic)
                        .data(data));
                }
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    yield Ok(Event::default()
                        .event("lagged")
                        .data(format!("Missed {}", n)));
                }
                Err(broadcast::error::RecvError::Closed) => break,
            }
        }
    };

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(Duration::from_secs(15)),
    )
}

async fn publish_topic(
    Path(topic): Path<String>,
    State(state): State<Arc<AppState>>,
    axum::Json(data): axum::Json<serde_json::Value>,
) -> impl IntoResponse {
    let tx = state.get_or_create_topic(&topic).await;
    let event = TopicEvent {
        topic: topic.clone(),
        data,
    };
    let receivers = tx.receiver_count();
    let _ = tx.send(event);
    axum::Json(serde_json::json!({"delivered_to": receivers}))
}

パターン6: プロダクションデプロイとグレースフルシャットダウン

プロダクション環境のストリーミングサービスは接続管理、グレースフルシャットダウン、ヘルスチェックを処理する必要がある。

完全なプロダクショングレードサービス

use axum::{
    Router,
    extract::State,
    response::sse::{Event, Sse},
    routing::{get, post},
};
use futures::stream::Stream;
use serde::Serialize;
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, RwLock};
use tokio_util::sync::CancellationToken;

#[derive(Serialize, Clone)]
struct ServerEvent {
    event_type: String,
    data: serde_json::Value,
}

struct AppState {
    broadcast_tx: broadcast::Sender<ServerEvent>,
    cancel_token: CancellationToken,
    connections: RwLock<Vec<String>>,
}

async fn sse_production(
    State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let mut rx = state.broadcast_tx.subscribe();
    let cancel = state.cancel_token.clone();
    let conn_id = uuid::Uuid::new_v4().to_string();

    {
        let mut conns = state.connections.write().await;
        conns.push(conn_id.clone());
    }

    let conn_id_clone = conn_id.clone();
    let connections = state.connections.clone();

    let stream = async_stream::stream! {
        loop {
            tokio::select! {
                event = rx.recv() => {
                    match event {
                        Ok(ev) => {
                            let data = serde_json::to_string(&ev).unwrap();
                            yield Ok(Event::default()
                                .event(&ev.event_type)
                                .data(data));
                        }
                        Err(broadcast::error::RecvError::Lagged(n)) => {
                            yield Ok(Event::default()
                                .event("lagged")
                                .data(format!("Missed {}", n)));
                        }
                        Err(broadcast::error::RecvError::Closed) => break,
                    }
                }
                _ = cancel.cancelled() => {
                    yield Ok(Event::default()
                        .event("shutdown")
                        .data("Server is shutting down"));
                    break;
                }
            }
        }
    };

    let conns = connections.clone();
    let id = conn_id_clone.clone();
    tokio::spawn(async move {
        cancel.cancelled().await;
        let mut conns = conns.write().await;
        conns.retain(|c| c != &id);
    });

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(Duration::from_secs(10)),
    )
}

async fn health_check(State(state): State<Arc<AppState>>) -> impl IntoResponse {
    let conn_count = state.connections.read().await.len();
    let is_shutting_down = state.cancel_token.is_cancelled();
    axum::Json(serde_json::json!({
        "status": if is_shutting_down { "shutting_down" } else { "healthy" },
        "active_connections": conn_count,
        "uptime_secs": 0,
    }))
}

#[tokio::main]
async fn main() {
    let (broadcast_tx, _) = broadcast::channel::<ServerEvent>(512);
    let cancel_token = CancellationToken::new();

    let state = Arc::new(AppState {
        broadcast_tx,
        cancel_token: cancel_token.clone(),
        connections: RwLock::new(Vec::new()),
    });

    let app = Router::new()
        .route("/events", get(sse_production))
        .route("/health", get(health_check))
        .with_state(state.clone());

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();

    tokio::spawn(async move {
        tokio::signal::ctrl_c().await.unwrap();
        tracing::info!("Shutdown signal received, draining connections...");
        cancel_token.cancel();
        tokio::time::sleep(Duration::from_secs(10)).await;
        tracing::info!("Graceful shutdown complete");
        std::process::exit(0);
    });

    axum::serve(listener, app)
        .with_graceful_shutdown(shutdown_signal())
        .await
        .unwrap();
}

async fn shutdown_signal() {
    tokio::signal::ctrl_c().await.unwrap();
}

Dockerデプロイ設定

FROM rust:1.85 as builder
WORKDIR /app
COPY Cargo.toml Cargo.lock ./
COPY src ./src
RUN cargo build --release

FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/*
COPY --from=builder /app/target/release/streaming-server /usr/local/bin/
EXPOSE 3000
CMD ["streaming-server"]
# docker-compose.yml
services:
  streaming-server:
    build: .
    ports:
      - "3000:3000"
    environment:
      - RUST_LOG=info
      - STREAM_BUFFER_SIZE=256
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:3000/health"]
      interval: 10s
      timeout: 5s
      retries: 3
    deploy:
      resources:
        limits:
          memory: 512M

5つのよくある落とし穴と解決策

落とし穴1: SSEストリームが閉じず接続リーク

症状: クライアント切断後、サーバーのストリームが実行を続け、メモリが継続的に増加。

原因: AxumのSSEストリームはクライアントの切断を自動検出しない。

解決策:

use axum::extract::Request;

async fn sse_safe(request: Request) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let cancel = tokio_util::sync::CancellationToken::new();
    let cancel_clone = cancel.clone();

    let stream = async_stream::stream! {
        let mut interval = tokio::time::interval(Duration::from_secs(1));
        loop {
            tokio::select! {
                _ = interval.tick() => {
                    yield Ok(Event::default().data("tick"));
                }
                _ = cancel_clone.cancelled() => {
                    tracing::info!("Stream cancelled, client disconnected");
                    break;
                }
            }
        }
    };

    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_secs(3600)).await;
        cancel.cancel();
    });

    Sse::new(stream)
}

落とし穴2: ブロードキャストチャネルのLaggedでメッセージ損失

症状: RecvError::Lagged(n) が頻繁に発生、クライアントがメッセージを損失。

原因: ブロードキャストチャネルの容量不足、遅いコンシューマーが速いプロデューサーに追いつけない。

解決策: チャネル容量を増やす + 損失メッセージ数を記録 + クライアント再接続時に再送。

let (tx, _) = tokio::sync::broadcast::channel::<Event>(1024);

match rx.recv().await {
    Err(tokio::sync::broadcast::error::RecvError::Lagged(missed)) => {
        tracing::warn!("Lagged {}, requesting catch-up", missed);
        yield Ok(Event::default()
            .event("lagged")
            .data(serde_json::json!({"missed": missed}).to_string()));
    }
    _ => {}
}

落とし穴3: WebSocketメッセージが大きすぎて拒否される

症状: 大きなメッセージを送信するとWebSocketエラーまたは切断。

原因: AxumのデフォルトWebSocketメッセージサイズ制限は64KB。

解決策:

use axum::extract::ws::WebSocketUpgrade;

async fn ws_large_messages(ws: WebSocketUpgrade) -> impl IntoResponse {
    ws.max_frame_size(1024 * 1024)
      .max_message_size(4 * 1024 * 1024)
      .on_upgrade(handle_socket)
}

落とし穴4: ストリーミングレスポンスにContent-Typeがない

症状: クライアントがストリーミングデータを受信するが正しくパースできない。

原因: 正しいContent-Typeヘッダーの設定を忘れた。

解決策:

async fn ndjson_response() -> impl IntoResponse {
    let stream = create_stream();
    (
        [
            ("content-type", "application/x-ndjson; charset=utf-8"),
            ("cache-control", "no-cache"),
            ("connection", "keep-alive"),
        ],
        axum::body::Body::from_stream(stream),
    )
}

落とし穴5: グレースフルシャットダウン時にストリームが強制中断

症状: サービスシャットダウン時にクライアントが接続リセットを受信し、正常な終了ではない。

原因: プロセスを直接killし、アクティブなストリームの完了を待っていない。

解決策: CancellationTokenを使用してすべてのストリームにグレースフルな終了を通知(パターン6を参照)。


10のよくあるエラートラブルシューティング

# エラーメッセージ 原因 解決策
1 the trait bound impl Stream: IntoResponse is not satisfied Stream型の不一致 StreamのItem型がResult<Event, Infallible>であることを確認
2 WebSocket upgrade failed: Connection header is not upgrade クライアントがUpgradeヘッダーを送信していない リクエストにConnection: UpgradeUpgrade: websocketが含まれていることを確認
3 channel send error: channel closed レシーバーが既にクローズされている レシーバーが早期にdropされていないか確認、send().await.is_err()で処理
4 RecvError::Lagged(n) 消費速度が生産速度に追いつかない ブロードキャスト容量を増やす、または境界付きmpscでバックプレッシャーを実装
5 body write aborted: connection closed クライアントが切断 クライアント切断検出を追加、リソースを速やかにクリーンアップ
6 SSE stream timeout Keep-Aliveタイムアウト KeepAlive::new().interval()の時間を調整
7 WebSocket message too large メッセージがデフォルト64KB制限を超過 ws.max_frame_size()ws.max_message_size()を使用
8 task hung detected 非同期タスクのブロッキング 同期ブロッキング操作がないか確認、tokio::task::spawn_blockingを使用
9 too many open files 接続数がシステム制限を超過 ulimit -nを調整、または最大接続数を制限
10 connection reset by peer クライアントが異常終了 WebSocketハンドラーでMessage::Closeとエラーを処理

高度な最適化テクニック

1. 接続数制限

use axum::middleware;
use std::sync::atomic::{AtomicUsize, Ordering};

struct ConnectionLimiter {
    current: AtomicUsize,
    max: usize,
}

async fn limit_connections(
    State(limiter): State<Arc<ConnectionLimiter>>,
    request: axum::extract::Request,
    next: axum::middleware::Next,
) -> impl IntoResponse {
    let current = limiter.current.fetch_add(1, Ordering::Relaxed);
    if current >= limiter.max {
        limiter.current.fetch_sub(1, Ordering::Relaxed);
        return axum::http::StatusCode::SERVICE_UNAVAILABLE.into_response();
    }

    let response = next.run(request).await;
    limiter.current.fetch_sub(1, Ordering::Relaxed);
    response
}

2. ストリーミング圧縮

use axum::response::IntoResponse;
use tokio_util::io::StreamReader;

async fn compressed_stream() -> impl IntoResponse {
    let stream = create_data_stream();
    let reader = StreamReader::new(stream);

    let mut encoder = async_compression::tokio::write::GzipEncoder::new(tokio::io::BufWriter::new(reader));
    let compressed_stream = tokio_util::io::ReaderStream::new(encoder);

    (
        [
            ("content-type", "application/x-ndjson"),
            ("content-encoding", "gzip"),
        ],
        axum::body::Body::from_stream(compressed_stream),
    )
}

3. ストリーミングレスポンスモニタリング

use prometheus::{IntCounter, IntGauge, Registry};

lazy_static::lazy_static! {
    static ref REGISTRY: Registry = Registry::new();
    static ref SSE_CONNECTIONS: IntGauge = IntGauge::new(
        "sse_connections_active",
        "Active SSE connections"
    ).unwrap();
    static ref SSE_EVENTS_SENT: IntCounter = IntCounter::new(
        "sse_events_sent_total",
        "Total SSE events sent"
    ).unwrap();
    static ref WS_CONNECTIONS: IntGauge = IntGauge::new(
        "ws_connections_active",
        "Active WebSocket connections"
    ).unwrap();
}

async fn sse_monitored(
    State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    SSE_CONNECTIONS.inc();
    let mut rx = state.broadcast_tx.subscribe();

    let stream = async_stream::stream! {
        loop {
            match rx.recv().await {
                Ok(event) => {
                    SSE_EVENTS_SENT.inc();
                    let data = serde_json::to_string(&event).unwrap();
                    yield Ok(Event::default().data(data));
                }
                Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                    break;
                }
                Err(_) => continue,
            }
        }
    };

    let stream = stream.chain(futures::stream::once(async {
        SSE_CONNECTIONS.dec();
        Ok(Event::default())
    }));

    Sse::new(stream)
}

比較分析:SSE vs WebSocket vs ロングポーリング

次元 SSE WebSocket ロングポーリング
通信方向 単方向(サーバー→クライアント) 双方向 単方向(シミュレート)
プロトコル HTTP WS/WSS HTTP
自動再接続 ブラウザネイティブサポート 手動実装必要 自然にサポート
ブラウザAPI EventSource WebSocket API fetch/XHR
データ形式 テキスト テキスト+バイナリ 任意
プロキシ/CDN フレンドリー 特別な設定が必要 フレンドリー
接続オーバーヘッド 高(頻繁な接続確立)
適用シナリオ 通知、LLMストリーミング、ログ チャット、協調、ゲーム 高い互換性要件
メモリ使用量
実装複雑度

選択の決定ツリー

リアルタイムプッシュが必要?
├── サーバー→クライアントのみ?
│   ├── 自動再接続が必要? → SSE
│   └── データ量が少なく、頻度が低い? → ロングポーリング
└── 双方向通信が必要?
    ├── バイナリデータが必要? → WebSocket
    └── テキストのみ? → WebSocket

オンラインツールおすすめ

Rust Axumストリーミングレスポンスの開発時に、以下のオンラインツールが生産性を大幅に向上:


まとめ

Rust Axumストリーミングレスポンスは、リアルタイムWebアプリケーションに高性能でメモリ安全なソリューションを提供する。6つのプロダクションパターンにはそれぞれ理想的なユースケースがある:

  1. SSE: 単方向プッシュシナリオに最適、実装がシンプル、ブラウザネイティブサポート
  2. WebSocket: 双方向通信の第一選択、チャットや協調に最適
  3. NDJSON: 大データストリーミングの標準フォーマット、行ごとにパース、待機不要
  4. バックプレッシャー制御: ストリーミングシステムの生命線、境界付きチャネル+トークンバケットが必須
  5. マルチクライアントブロードキャスト: ブロードキャストチャネルでPub/Sub、トピックサブスクリプションで的確なプッシュ
  6. プロダクションデプロイ: グレースフルシャットダウン+ヘルスチェック+接続制限の3点セット

覚えておいて:非境界チャネルは時限爆弾、バックプレッシャーのないストリーミングシステムはいつかOOMする。適切なチャネルタイプと容量を選択し、Tokioのゼロコスト非同期と組み合わせることで、Axumストリーミングサービスはプロダクション環境で安定稼働する。


関連記事

参考リソース

ブラウザローカルツールを無料で試す →

#Rust#Axum#流式响应#SSE#WebSocket#实时推送#2026#编程语言