Rust Axumストリーミングレスポンス:SSEからWebSocketまでの6つのプロダクションパターン
全データの読み込みを待ってからレスポンス?ユーザーはもうページを閉じてる
10万件のログを返すAPIを作ったら、クライアントは30秒待って完全なJSONを受け取り、メモリは2GBに急増;リアルタイム通知のためにポーリングで毎秒リクエストしたら、サーバーのQPSが10倍に;WebSocketで一方向プッシュをしたら、80%の接続がデータ受信だけに使われていて、全二重は無駄だった。2026年、Rust Axumストリーミングレスポンスこそが正解——SSEで一方向プッシュ、WebSocketで双方向通信、NDJSONでストリーミング転送、Tokioのゼロコスト非同期と組み合わせてメモリ安全かつ高性能。
本記事ではAxumストリーミングレスポンスの核心概念から出発し、SSEプッシュ→WebSocket通信→ストリーミングJSON→バックプレッシャー制御→マルチクライアントブロードキャスト→プロダクションデプロイの6つのプロダクションパターンを完了し、リアルタイムデータプッシュを「なんとか動く」から「プロダクション信頼性」にする。
主要ポイント
- Axumストリーミングレスポンスは
BodyストリームとSSEの2つのコア抽象化に基づく - SSEは一方向プッシュ(通知、LLMストリーミング出力)に適し、WebSocketは双方向インタラクションに適する
- NDJSONはストリーミングJSONのデファクトスタンダード——1行に1つの完全なJSONオブジェクト
- バックプレッシャー制御はストリーミングシステムの生命線、非境界チャネル=時限爆弾
tokio::sync::broadcastでマルチクライアントブロードキャスト、watchで状態同期を実現- プロダクションデプロイにはグレースフルシャットダウン、接続タイムアウト、ヘルスチェックの3点セットが必要
目次
- Axumストリーミングレスポンス核心概念
- パターン1: SSEサーバー送信イベント実装
- パターン2: WebSocket双方向通信
- パターン3: ストリーミングJSON/NDJSONレスポンス
- パターン4: バックプレッシャーとフロー制御
- パターン5: マルチクライアントブロードキャスト(Pub/Sub)
- パターン6: プロダクションデプロイとグレースフルシャットダウン
- 5つのよくある落とし穴と解決策
- 10のよくあるエラートラブルシューティング
- 高度な最適化テクニック
- 比較分析:SSE vs WebSocket vs ロングポーリング
- オンラインツールおすすめ
- まとめ
Axumストリーミングレスポンス核心概念
| 概念 | 説明 | 適用シナリオ |
|---|---|---|
Sse<impl Stream> |
Server-Sent Events、HTTPベースの単方向プッシュ | 通知、LLMストリーミング、リアルタイムログ |
WebSocket |
全二重通信、プロトコルアップグレード必要 | チャット、協調編集、ゲーム |
Body |
Axumレスポンスボディの基盤ストリーム抽象化 | カスタムストリーミング転送 |
| NDJSON | 改行区切りJSONストリーム | 大データエクスポート、イベントストリーム |
| Backpressure | バックプレッシャー、消費者が生産速度を制御 | OOM防止、トラフィックシェーピング |
broadcast |
Tokioブロードキャストチャネル | マルチクライアント同時サブスクライブ |
watch |
Tokio状態観察チャネル | 設定ホットリロード、状態同期 |
| Chunked Encoding | HTTPチャンク転送 | ストリーミングレスポンスの基盤転送メカニズム |
ストリーミングレスポンスアーキテクチャ
┌─────────────────────────────────────────────────┐
│ Client │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Browser │ │ Mobile │ │ CLI │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
└───────┼──────────────┼──────────────┼───────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────┐
│ Axum Server │
│ ┌─────────────────────────────────────────┐ │
│ │ Router │ │
│ │ /sse → SSE Handler │ │
│ │ /ws → WebSocket Handler │ │
│ │ /stream → NDJSON Handler │ │
│ └──────────────┬──────────────────────────┘ │
│ │ │
│ ┌──────────────▼──────────────────────────┐ │
│ │ Tokio Runtime │ │
│ │ ┌────────┐ ┌────────┐ ┌────────┐ │ │
│ │ │ mpsc │ │broadcast│ │ watch │ │ │
│ │ └────────┘ └────────┘ └────────┘ │ │
│ └─────────────────────────────────────────┘ │
└─────────────────────────────────────────────────┘
パターン1: SSEサーバー送信イベント実装
SSEはAxumストリーミングレスポンスで最も一般的なパターン、HTTP長接続で単方向プッシュを実現し、ブラウザネイティブのEventSource APIが自動再接続をサポート。
基本的なSSE実装
[dependencies]
axum = { version = "0.8", features = ["sse"] }
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
futures = "0.3"
use axum::{
Router,
response::sse::{Event, Sse},
routing::get,
};
use futures::stream::{self, Stream};
use std::convert::Infallible;
use tokio_stream::StreamExt as _;
use std::time::Duration;
async fn sse_handler() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let stream = stream::iter(0..100).map(|i| {
Ok(Event::default()
.data(format!("Message {}", i))
.event("update")
.id(i.to_string()))
});
Sse::new(stream).keep_alive(
axum::response::sse::KeepAlive::new()
.interval(Duration::from_secs(5))
.text("ping"),
)
}
#[tokio::main]
async fn main() {
let app = Router::new().route("/sse", get(sse_handler));
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
}
JSONデータ付きSSE
use axum::{
Router,
extract::State,
response::sse::{Event, Sse},
routing::get,
};
use futures::stream::Stream;
use serde::Serialize;
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;
use tokio_stream::StreamExt as _;
#[derive(Serialize, Clone)]
struct Notification {
id: u64,
title: String,
body: String,
timestamp: i64,
}
struct AppState {
notification_tx: tokio::sync::broadcast::Sender<Notification>,
}
async fn sse_notifications(
State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let mut rx = state.notification_tx.subscribe();
let stream = async_stream::stream! {
loop {
match rx.recv().await {
Ok(notification) => {
let data = serde_json::to_string(¬ification).unwrap();
yield Ok(Event::default()
.event("notification")
.data(data)
.id(notification.id.to_string()));
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
let data = serde_json::json!({"missed": n}).to_string();
yield Ok(Event::default()
.event("lagged")
.data(data));
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
break;
}
}
}
};
Sse::new(stream).keep_alive(
axum::response::sse::KeepAlive::new()
.interval(Duration::from_secs(15))
.text("keepalive"),
)
}
#[tokio::main]
async fn main() {
let (tx, _) = tokio::sync::broadcast::channel::<Notification>(100);
let state = Arc::new(AppState { notification_tx: tx });
let app = Router::new()
.route("/sse/notifications", get(sse_notifications))
.with_state(state);
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
}
SSEクライアント切断検出
use axum::{
extract::Request,
response::sse::{Event, Sse},
};
use futures::stream::Stream;
use std::convert::Infallible;
use tokio_stream::StreamExt as _;
async fn sse_with_disconnect(request: Request) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let cancel_token = tokio_util::sync::CancellationToken::new();
let cancel_clone = cancel_token.clone();
tokio::spawn(async move {
tokio::select! {
_ = cancel_clone.cancelled() => {
tracing::info!("Client disconnected, stopping SSE stream");
}
_ = tokio::time::sleep(Duration::from_secs(3600)) => {
tracing::info!("SSE stream timeout");
}
}
});
let stream = async_stream::stream! {
let mut interval = tokio::time::interval(Duration::from_secs(1));
loop {
tokio::select! {
_ = interval.tick() => {
let data = serde_json::json!({
"time": chrono::Utc::now().to_rfc3339()
});
yield Ok(Event::default().data(data.to_string()));
}
_ = cancel_token.cancelled() => {
break;
}
}
}
};
Sse::new(stream).keep_alive(
axum::response::sse::KeepAlive::new()
.interval(Duration::from_secs(10)),
)
}
パターン2: WebSocket双方向通信
WebSocketは全二重通信を提供し、チャット、協調編集、リアルタイムゲームなどの双方向インタラクションシナリオに適する。
基本的なWebSocket実装
[dependencies]
axum = { version = "0.8", features = ["ws"] }
tokio = { version = "1", features = ["full"] }
futures = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
use axum::{
Router,
extract::ws::{Message, WebSocket, WebSocketUpgrade},
response::IntoResponse,
routing::get,
};
use futures::{SinkExt, StreamExt};
async fn ws_handler(ws: WebSocketUpgrade) -> impl IntoResponse {
ws.on_upgrade(handle_socket)
}
async fn handle_socket(mut socket: WebSocket) {
let (mut sender, mut receiver) = socket.split();
let send_task = tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(5));
loop {
interval.tick().await;
let msg = serde_json::json!({
"type": "heartbeat",
"timestamp": chrono::Utc::now().to_rfc3339()
});
if sender.send(Message::Text(msg.to_string().into())).await.is_err() {
break;
}
}
});
let recv_task = tokio::spawn(async move {
while let Some(Ok(msg)) = receiver.next().await {
match msg {
Message::Text(text) => {
tracing::info!("Received: {}", text);
}
Message::Close(_) => {
tracing::info!("Client closed connection");
break;
}
_ => {}
}
}
});
tokio::select! {
_ = send_task => {},
_ = recv_task => {},
}
}
#[tokio::main]
async fn main() {
let app = Router::new().route("/ws", get(ws_handler));
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
}
認証と状態管理付きWebSocket
use axum::{
Router,
extract::{State, WebSocketUpgrade, ws::WebSocket},
response::IntoResponse,
routing::get,
};
use futures::{SinkExt, StreamExt};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
#[derive(Debug, Clone)]
struct ChatMessage {
user: String,
content: String,
timestamp: i64,
}
struct AppState {
rooms: RwLock<HashMap<String, Vec<mpsc::UnboundedSender<ChatMessage>>>>,
}
async fn ws_chat(
ws: WebSocketUpgrade,
State(state): State<Arc<AppState>>,
) -> impl IntoResponse {
ws.on_upgrade(move |socket| handle_chat(socket, state))
}
async fn handle_chat(socket: WebSocket, state: Arc<AppState>) {
let (mut ws_sender, mut ws_receiver) = socket.split();
let (tx, mut rx) = mpsc::unbounded_channel::<ChatMessage>();
let room_id = "general".to_string();
{
let mut rooms = state.rooms.write().await;
rooms.entry(room_id.clone()).or_default().push(tx);
}
let send_task = tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
let text = serde_json::to_string(&msg).unwrap();
if ws_sender.send(axum::extract::ws::Message::Text(text.into())).await.is_err() {
break;
}
}
});
let recv_task = tokio::spawn(async move {
while let Some(Ok(msg)) = ws_receiver.next().await {
if let axum::extract::ws::Message::Text(text) = msg {
let chat_msg = ChatMessage {
user: "anonymous".to_string(),
content: text.to_string(),
timestamp: chrono::Utc::now().timestamp(),
};
let rooms = state.rooms.read().await;
if let Some(senders) = rooms.get(&room_id) {
for sender in senders {
let _ = sender.send(chat_msg.clone());
}
}
}
}
});
tokio::select! {
_ = send_task => {},
_ = recv_task => {},
}
let mut rooms = state.rooms.write().await;
if let Some(senders) = rooms.get_mut(&room_id) {
senders.retain(|s| !s.is_closed());
}
}
パターン3: ストリーミングJSON/NDJSONレスポンス
NDJSON(Newline Delimited JSON)はストリーミングJSON転送のデファクトスタンダード。1行に1つの完全なJSONオブジェクトがあり、クライアントは完全なレスポンスを待つことなく行ごとにパースできる。
NDJSONストリーミングレスポンス
use axum::{
Router,
extract::{Path, Query},
response::IntoResponse,
routing::get,
};
use serde::{Deserialize, Serialize};
use tokio_stream::StreamExt;
#[derive(Serialize, Deserialize)]
struct LogEntry {
id: u64,
level: String,
message: String,
timestamp: String,
service: String,
}
#[derive(Deserialize)]
struct LogQuery {
service: Option<String>,
level: Option<String>,
limit: Option<usize>,
}
async fn stream_logs(
Query(params): Query<LogQuery>,
) -> impl IntoResponse {
let limit = params.limit.unwrap_or(1000);
let service_filter = params.service.clone();
let stream = async_stream::stream! {
let mut count = 0u64;
let mut interval = tokio::time::interval(Duration::from_millis(100));
while count < limit as u64 {
interval.tick().await;
let entry = LogEntry {
id: count,
level: if count % 10 == 0 { "ERROR".to_string() } else { "INFO".to_string() },
message: format!("Log entry #{}", count),
timestamp: chrono::Utc::now().to_rfc3339(),
service: service_filter.clone().unwrap_or_else(|| "api".to_string()),
};
let json_line = serde_json::to_string(&entry).unwrap();
yield Ok::<_, std::convert::Infallible>(format!("{}\n", json_line));
count += 1;
}
};
(
[
("content-type", "application/x-ndjson"),
("cache-control", "no-cache"),
("x-stream-format", "ndjson"),
],
axum::body::Body::from_stream(stream),
)
}
#[tokio::main]
async fn main() {
let app = Router::new().route("/logs/stream", get(stream_logs));
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
}
データベースクエリストリーミング出力
use axum::{
Router,
extract::State,
response::IntoResponse,
routing::get,
};
use sqlx::postgres::PgPool;
use std::sync::Arc;
struct AppState {
pool: PgPool,
}
async fn stream_query_results(
State(state): State<Arc<AppState>>,
) -> impl IntoResponse {
let pool = state.pool.clone();
let stream = async_stream::stream! {
let mut stream = sqlx::query_as::<_, (i64, String, String)>(
"SELECT id, name, email FROM users ORDER BY id"
)
.fetch(&pool);
while let Some(row) = stream.next().await {
match row {
Ok((id, name, email)) => {
let json = serde_json::json!({
"id": id,
"name": name,
"email": email,
});
yield Ok::<_, std::convert::Infallible>(
format!("{}\n", json)
);
}
Err(e) => {
tracing::error!("Query error: {}", e);
break;
}
}
}
};
(
[("content-type", "application/x-ndjson")],
axum::body::Body::from_stream(stream),
)
}
パターン4: バックプレッシャーとフロー制御
バックプレッシャーはストリーミングシステムの生命線。消費者の処理速度が生産者の生成速度に追いつかない場合、バックプレッシャー制御がないとメモリが継続的に増加しOOMに至る。
バックプレッシャー制御アーキテクチャ
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Producer │────▶│ Buffer │────▶│ Consumer │
│ (fast) │ │ (bounded)│ │ (slow) │
└──────────┘ └──────────┘ └──────────┘
│ │ │
│ backpressure │ │
│◀───────────────┤ │
│ (wait when │ │
│ full) │ │
境界付きチャネルでバックプレッシャー実装
use axum::{
Router,
extract::State,
response::sse::{Event, Sse},
routing::get,
};
use futures::stream::Stream;
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
struct AppState {
event_tx: mpsc::Sender<String>,
}
async fn sse_with_backpressure(
State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let (tx, mut rx) = mpsc::channel::<String>(32);
let producer_tx = state.event_tx.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_millis(10));
for i in 0..10000 {
interval.tick().await;
let msg = format!("Event {}", i);
if tx.send(msg).await.is_err() {
tracing::warn!("Consumer dropped, stopping producer");
break;
}
}
});
let stream = async_stream::stream! {
while let Some(msg) = rx.recv().await {
yield Ok(Event::default().data(msg));
}
};
Sse::new(stream).keep_alive(
axum::response::sse::KeepAlive::new()
.interval(Duration::from_secs(15)),
)
}
トラフィックシェーピング(レートリミットストリーム)
use axum::{
Router,
response::IntoResponse,
routing::get,
};
use tokio::time::{self, Duration};
use tokio_stream::StreamExt;
#[derive(Clone)]
struct ThrottleConfig {
max_per_second: u64,
burst_size: usize,
}
async fn throttled_stream() -> impl IntoResponse {
let config = ThrottleConfig {
max_per_second: 100,
burst_size: 50,
};
let stream = async_stream::stream! {
let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(config.burst_size);
let mut tokens = config.burst_size as f64;
let max_tokens = config.burst_size as f64;
let refill_rate = config.max_per_second as f64 / 1000.0;
let mut last_refill = tokio::time::Instant::now();
tokio::spawn(async move {
let mut count = 0u64;
loop {
tokio::time::sleep(Duration::from_millis(10)).await;
let msg = format!("Item {}", count);
if tx.send(msg).await.is_err() {
break;
}
count += 1;
}
});
while let Some(item) = rx.recv().await {
let now = tokio::time::Instant::now();
let elapsed = now.duration_since(last_refill).as_millis() as f64;
tokens = (tokens + elapsed * refill_rate).min(max_tokens);
last_refill = now;
if tokens < 1.0 {
let wait_ms = ((1.0 - tokens) / refill_rate) as u64;
tokio::time::sleep(Duration::from_millis(wait_ms)).await;
tokens = 0.0;
last_refill = tokio::time::Instant::now();
} else {
tokens -= 1.0;
}
yield Ok::<_, std::convert::Infallible>(format!("{}\n", item));
}
};
(
[("content-type", "application/x-ndjson")],
axum::body::Body::from_stream(stream),
)
}
パターン5: マルチクライアントブロードキャスト(Pub/Sub)
マルチクライアントブロードキャストはリアルタイムプッシュのコアシナリオ:1つのイベントソースがメッセージを生成し、複数のクライアントが同時に受信する。
ブロードキャストチャネルベースのブロードキャスト
use axum::{
Router,
extract::State,
response::sse::{Event, Sse},
routing::{get, post},
};
use futures::stream::Stream;
use serde::{Deserialize, Serialize};
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;
#[derive(Serialize, Deserialize, Clone, Debug)]
struct BroadcastEvent {
event_type: String,
payload: serde_json::Value,
timestamp: i64,
}
struct AppState {
broadcast_tx: tokio::sync::broadcast::Sender<BroadcastEvent>,
client_count: Arc<std::sync::atomic::AtomicUsize>,
}
async fn subscribe(
State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let mut rx = state.broadcast_tx.subscribe();
let client_id = state.client_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
tracing::info!("Client {} subscribed", client_id);
let stream = async_stream::stream! {
loop {
match rx.recv().await {
Ok(event) => {
let data = serde_json::to_string(&event).unwrap();
yield Ok(Event::default()
.event(&event.event_type)
.data(data)
.id(event.timestamp.to_string()));
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!("Client {} lagged {} messages", client_id, n);
yield Ok(Event::default()
.event("lagged")
.data(format!("Missed {} messages", n)));
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
break;
}
}
}
};
Sse::new(stream).keep_alive(
axum::response::sse::KeepAlive::new()
.interval(Duration::from_secs(10)),
)
}
async fn publish(
State(state): State<Arc<AppState>>,
axum::Json(event): axum::Json<BroadcastEvent>,
) -> impl IntoResponse {
let receiver_count = state.broadcast_tx.receiver_count();
match state.broadcast_tx.send(event) {
Ok(_) => axum::Json(serde_json::json!({
"status": "published",
"receivers": receiver_count,
})),
Err(_) => axum::Json(serde_json::json!({
"status": "no_receivers",
})),
}
}
#[tokio::main]
async fn main() {
let (broadcast_tx, _) = tokio::sync::broadcast::channel::<BroadcastEvent>(256);
let state = Arc::new(AppState {
broadcast_tx,
client_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
});
let app = Router::new()
.route("/subscribe", get(subscribe))
.route("/publish", post(publish))
.with_state(state);
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
}
トピックベースのサブスクリプション
use axum::{
Router,
extract::{Path, State},
response::sse::{Event, Sse},
routing::{get, post},
};
use futures::stream::Stream;
use std::collections::HashMap;
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, RwLock};
#[derive(Clone, Serialize, Debug)]
struct TopicEvent {
topic: String,
data: serde_json::Value,
}
struct AppState {
topics: RwLock<HashMap<String, broadcast::Sender<TopicEvent>>>,
}
impl AppState {
fn new() -> Self {
Self {
topics: RwLock::new(HashMap::new()),
}
}
async fn get_or_create_topic(&self, topic: &str) -> broadcast::Sender<TopicEvent> {
let mut topics = self.topics.write().await;
topics
.entry(topic.to_string())
.or_insert_with(|| {
let (tx, _) = broadcast::channel::<TopicEvent>(256);
tx
})
.clone()
}
}
async fn subscribe_topic(
Path(topic): Path<String>,
State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let tx = state.get_or_create_topic(&topic).await;
let mut rx = tx.subscribe();
let stream = async_stream::stream! {
loop {
match rx.recv().await {
Ok(event) => {
let data = serde_json::to_string(&event).unwrap();
yield Ok(Event::default()
.event(&event.topic)
.data(data));
}
Err(broadcast::error::RecvError::Lagged(n)) => {
yield Ok(Event::default()
.event("lagged")
.data(format!("Missed {}", n)));
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
};
Sse::new(stream).keep_alive(
axum::response::sse::KeepAlive::new()
.interval(Duration::from_secs(15)),
)
}
async fn publish_topic(
Path(topic): Path<String>,
State(state): State<Arc<AppState>>,
axum::Json(data): axum::Json<serde_json::Value>,
) -> impl IntoResponse {
let tx = state.get_or_create_topic(&topic).await;
let event = TopicEvent {
topic: topic.clone(),
data,
};
let receivers = tx.receiver_count();
let _ = tx.send(event);
axum::Json(serde_json::json!({"delivered_to": receivers}))
}
パターン6: プロダクションデプロイとグレースフルシャットダウン
プロダクション環境のストリーミングサービスは接続管理、グレースフルシャットダウン、ヘルスチェックを処理する必要がある。
完全なプロダクショングレードサービス
use axum::{
Router,
extract::State,
response::sse::{Event, Sse},
routing::{get, post},
};
use futures::stream::Stream;
use serde::Serialize;
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, RwLock};
use tokio_util::sync::CancellationToken;
#[derive(Serialize, Clone)]
struct ServerEvent {
event_type: String,
data: serde_json::Value,
}
struct AppState {
broadcast_tx: broadcast::Sender<ServerEvent>,
cancel_token: CancellationToken,
connections: RwLock<Vec<String>>,
}
async fn sse_production(
State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let mut rx = state.broadcast_tx.subscribe();
let cancel = state.cancel_token.clone();
let conn_id = uuid::Uuid::new_v4().to_string();
{
let mut conns = state.connections.write().await;
conns.push(conn_id.clone());
}
let conn_id_clone = conn_id.clone();
let connections = state.connections.clone();
let stream = async_stream::stream! {
loop {
tokio::select! {
event = rx.recv() => {
match event {
Ok(ev) => {
let data = serde_json::to_string(&ev).unwrap();
yield Ok(Event::default()
.event(&ev.event_type)
.data(data));
}
Err(broadcast::error::RecvError::Lagged(n)) => {
yield Ok(Event::default()
.event("lagged")
.data(format!("Missed {}", n)));
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
_ = cancel.cancelled() => {
yield Ok(Event::default()
.event("shutdown")
.data("Server is shutting down"));
break;
}
}
}
};
let conns = connections.clone();
let id = conn_id_clone.clone();
tokio::spawn(async move {
cancel.cancelled().await;
let mut conns = conns.write().await;
conns.retain(|c| c != &id);
});
Sse::new(stream).keep_alive(
axum::response::sse::KeepAlive::new()
.interval(Duration::from_secs(10)),
)
}
async fn health_check(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let conn_count = state.connections.read().await.len();
let is_shutting_down = state.cancel_token.is_cancelled();
axum::Json(serde_json::json!({
"status": if is_shutting_down { "shutting_down" } else { "healthy" },
"active_connections": conn_count,
"uptime_secs": 0,
}))
}
#[tokio::main]
async fn main() {
let (broadcast_tx, _) = broadcast::channel::<ServerEvent>(512);
let cancel_token = CancellationToken::new();
let state = Arc::new(AppState {
broadcast_tx,
cancel_token: cancel_token.clone(),
connections: RwLock::new(Vec::new()),
});
let app = Router::new()
.route("/events", get(sse_production))
.route("/health", get(health_check))
.with_state(state.clone());
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
tokio::spawn(async move {
tokio::signal::ctrl_c().await.unwrap();
tracing::info!("Shutdown signal received, draining connections...");
cancel_token.cancel();
tokio::time::sleep(Duration::from_secs(10)).await;
tracing::info!("Graceful shutdown complete");
std::process::exit(0);
});
axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal())
.await
.unwrap();
}
async fn shutdown_signal() {
tokio::signal::ctrl_c().await.unwrap();
}
Dockerデプロイ設定
FROM rust:1.85 as builder
WORKDIR /app
COPY Cargo.toml Cargo.lock ./
COPY src ./src
RUN cargo build --release
FROM debian:bookworm-slim
RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/*
COPY --from=builder /app/target/release/streaming-server /usr/local/bin/
EXPOSE 3000
CMD ["streaming-server"]
# docker-compose.yml
services:
streaming-server:
build: .
ports:
- "3000:3000"
environment:
- RUST_LOG=info
- STREAM_BUFFER_SIZE=256
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:3000/health"]
interval: 10s
timeout: 5s
retries: 3
deploy:
resources:
limits:
memory: 512M
5つのよくある落とし穴と解決策
落とし穴1: SSEストリームが閉じず接続リーク
症状: クライアント切断後、サーバーのストリームが実行を続け、メモリが継続的に増加。
原因: AxumのSSEストリームはクライアントの切断を自動検出しない。
解決策:
use axum::extract::Request;
async fn sse_safe(request: Request) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let cancel = tokio_util::sync::CancellationToken::new();
let cancel_clone = cancel.clone();
let stream = async_stream::stream! {
let mut interval = tokio::time::interval(Duration::from_secs(1));
loop {
tokio::select! {
_ = interval.tick() => {
yield Ok(Event::default().data("tick"));
}
_ = cancel_clone.cancelled() => {
tracing::info!("Stream cancelled, client disconnected");
break;
}
}
}
};
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(3600)).await;
cancel.cancel();
});
Sse::new(stream)
}
落とし穴2: ブロードキャストチャネルのLaggedでメッセージ損失
症状: RecvError::Lagged(n) が頻繁に発生、クライアントがメッセージを損失。
原因: ブロードキャストチャネルの容量不足、遅いコンシューマーが速いプロデューサーに追いつけない。
解決策: チャネル容量を増やす + 損失メッセージ数を記録 + クライアント再接続時に再送。
let (tx, _) = tokio::sync::broadcast::channel::<Event>(1024);
match rx.recv().await {
Err(tokio::sync::broadcast::error::RecvError::Lagged(missed)) => {
tracing::warn!("Lagged {}, requesting catch-up", missed);
yield Ok(Event::default()
.event("lagged")
.data(serde_json::json!({"missed": missed}).to_string()));
}
_ => {}
}
落とし穴3: WebSocketメッセージが大きすぎて拒否される
症状: 大きなメッセージを送信するとWebSocketエラーまたは切断。
原因: AxumのデフォルトWebSocketメッセージサイズ制限は64KB。
解決策:
use axum::extract::ws::WebSocketUpgrade;
async fn ws_large_messages(ws: WebSocketUpgrade) -> impl IntoResponse {
ws.max_frame_size(1024 * 1024)
.max_message_size(4 * 1024 * 1024)
.on_upgrade(handle_socket)
}
落とし穴4: ストリーミングレスポンスにContent-Typeがない
症状: クライアントがストリーミングデータを受信するが正しくパースできない。
原因: 正しいContent-Typeヘッダーの設定を忘れた。
解決策:
async fn ndjson_response() -> impl IntoResponse {
let stream = create_stream();
(
[
("content-type", "application/x-ndjson; charset=utf-8"),
("cache-control", "no-cache"),
("connection", "keep-alive"),
],
axum::body::Body::from_stream(stream),
)
}
落とし穴5: グレースフルシャットダウン時にストリームが強制中断
症状: サービスシャットダウン時にクライアントが接続リセットを受信し、正常な終了ではない。
原因: プロセスを直接killし、アクティブなストリームの完了を待っていない。
解決策: CancellationTokenを使用してすべてのストリームにグレースフルな終了を通知(パターン6を参照)。
10のよくあるエラートラブルシューティング
| # | エラーメッセージ | 原因 | 解決策 |
|---|---|---|---|
| 1 | the trait bound impl Stream: IntoResponse is not satisfied |
Stream型の不一致 | StreamのItem型がResult<Event, Infallible>であることを確認 |
| 2 | WebSocket upgrade failed: Connection header is not upgrade |
クライアントがUpgradeヘッダーを送信していない | リクエストにConnection: UpgradeとUpgrade: websocketが含まれていることを確認 |
| 3 | channel send error: channel closed |
レシーバーが既にクローズされている | レシーバーが早期にdropされていないか確認、send().await.is_err()で処理 |
| 4 | RecvError::Lagged(n) |
消費速度が生産速度に追いつかない | ブロードキャスト容量を増やす、または境界付きmpscでバックプレッシャーを実装 |
| 5 | body write aborted: connection closed |
クライアントが切断 | クライアント切断検出を追加、リソースを速やかにクリーンアップ |
| 6 | SSE stream timeout |
Keep-Aliveタイムアウト | KeepAlive::new().interval()の時間を調整 |
| 7 | WebSocket message too large |
メッセージがデフォルト64KB制限を超過 | ws.max_frame_size()とws.max_message_size()を使用 |
| 8 | task hung detected |
非同期タスクのブロッキング | 同期ブロッキング操作がないか確認、tokio::task::spawn_blockingを使用 |
| 9 | too many open files |
接続数がシステム制限を超過 | ulimit -nを調整、または最大接続数を制限 |
| 10 | connection reset by peer |
クライアントが異常終了 | WebSocketハンドラーでMessage::Closeとエラーを処理 |
高度な最適化テクニック
1. 接続数制限
use axum::middleware;
use std::sync::atomic::{AtomicUsize, Ordering};
struct ConnectionLimiter {
current: AtomicUsize,
max: usize,
}
async fn limit_connections(
State(limiter): State<Arc<ConnectionLimiter>>,
request: axum::extract::Request,
next: axum::middleware::Next,
) -> impl IntoResponse {
let current = limiter.current.fetch_add(1, Ordering::Relaxed);
if current >= limiter.max {
limiter.current.fetch_sub(1, Ordering::Relaxed);
return axum::http::StatusCode::SERVICE_UNAVAILABLE.into_response();
}
let response = next.run(request).await;
limiter.current.fetch_sub(1, Ordering::Relaxed);
response
}
2. ストリーミング圧縮
use axum::response::IntoResponse;
use tokio_util::io::StreamReader;
async fn compressed_stream() -> impl IntoResponse {
let stream = create_data_stream();
let reader = StreamReader::new(stream);
let mut encoder = async_compression::tokio::write::GzipEncoder::new(tokio::io::BufWriter::new(reader));
let compressed_stream = tokio_util::io::ReaderStream::new(encoder);
(
[
("content-type", "application/x-ndjson"),
("content-encoding", "gzip"),
],
axum::body::Body::from_stream(compressed_stream),
)
}
3. ストリーミングレスポンスモニタリング
use prometheus::{IntCounter, IntGauge, Registry};
lazy_static::lazy_static! {
static ref REGISTRY: Registry = Registry::new();
static ref SSE_CONNECTIONS: IntGauge = IntGauge::new(
"sse_connections_active",
"Active SSE connections"
).unwrap();
static ref SSE_EVENTS_SENT: IntCounter = IntCounter::new(
"sse_events_sent_total",
"Total SSE events sent"
).unwrap();
static ref WS_CONNECTIONS: IntGauge = IntGauge::new(
"ws_connections_active",
"Active WebSocket connections"
).unwrap();
}
async fn sse_monitored(
State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
SSE_CONNECTIONS.inc();
let mut rx = state.broadcast_tx.subscribe();
let stream = async_stream::stream! {
loop {
match rx.recv().await {
Ok(event) => {
SSE_EVENTS_SENT.inc();
let data = serde_json::to_string(&event).unwrap();
yield Ok(Event::default().data(data));
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
break;
}
Err(_) => continue,
}
}
};
let stream = stream.chain(futures::stream::once(async {
SSE_CONNECTIONS.dec();
Ok(Event::default())
}));
Sse::new(stream)
}
比較分析:SSE vs WebSocket vs ロングポーリング
| 次元 | SSE | WebSocket | ロングポーリング |
|---|---|---|---|
| 通信方向 | 単方向(サーバー→クライアント) | 双方向 | 単方向(シミュレート) |
| プロトコル | HTTP | WS/WSS | HTTP |
| 自動再接続 | ブラウザネイティブサポート | 手動実装必要 | 自然にサポート |
| ブラウザAPI | EventSource | WebSocket API | fetch/XHR |
| データ形式 | テキスト | テキスト+バイナリ | 任意 |
| プロキシ/CDN | フレンドリー | 特別な設定が必要 | フレンドリー |
| 接続オーバーヘッド | 低 | 中 | 高(頻繁な接続確立) |
| 適用シナリオ | 通知、LLMストリーミング、ログ | チャット、協調、ゲーム | 高い互換性要件 |
| メモリ使用量 | 低 | 中 | 高 |
| 実装複雑度 | 低 | 中 | 低 |
選択の決定ツリー
リアルタイムプッシュが必要?
├── サーバー→クライアントのみ?
│ ├── 自動再接続が必要? → SSE
│ └── データ量が少なく、頻度が低い? → ロングポーリング
└── 双方向通信が必要?
├── バイナリデータが必要? → WebSocket
└── テキストのみ? → WebSocket
オンラインツールおすすめ
Rust Axumストリーミングレスポンスの開発時に、以下のオンラインツールが生産性を大幅に向上:
- JSONフォーマッター - SSEとWebSocketのJSONメッセージをフォーマット、ストリーミングデータをデバッグ
- Base64エンコード/デコード - WebSocketバイナリフレームのBase64データをエンコード/デコード
- Curl to Code - SSE/WebSocketのcurlテストコマンドをRustコードに変換
まとめ
Rust Axumストリーミングレスポンスは、リアルタイムWebアプリケーションに高性能でメモリ安全なソリューションを提供する。6つのプロダクションパターンにはそれぞれ理想的なユースケースがある:
- SSE: 単方向プッシュシナリオに最適、実装がシンプル、ブラウザネイティブサポート
- WebSocket: 双方向通信の第一選択、チャットや協調に最適
- NDJSON: 大データストリーミングの標準フォーマット、行ごとにパース、待機不要
- バックプレッシャー制御: ストリーミングシステムの生命線、境界付きチャネル+トークンバケットが必須
- マルチクライアントブロードキャスト: ブロードキャストチャネルでPub/Sub、トピックサブスクリプションで的確なプッシュ
- プロダクションデプロイ: グレースフルシャットダウン+ヘルスチェック+接続制限の3点セット
覚えておいて:非境界チャネルは時限爆弾、バックプレッシャーのないストリーミングシステムはいつかOOMする。適切なチャネルタイプと容量を選択し、Tokioのゼロコスト非同期と組み合わせることで、Axumストリーミングサービスはプロダクション環境で安定稼働する。
関連記事:
- Rust Axum Webフレームワーク:ルート設計からミドルウェアまでの5つのプロダクショングレードパターン
- Rust Tokio Channelパターン実践:mpscからBroadcastまでの6つのプロダクションパターン
- Rust Tokioグレースフルシャットダウン:シグナル処理からリソースクリーンアップまでの完全実践
参考リソース:
ブラウザローカルツールを無料で試す →