Rust Axum Streaming Responses: 6 Production Patterns from SSE to WebSocket


If you wait until all the data is loaded before responding, your users will have closed the page long before it arrives

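You write an API that returns 100,000 log entries: the client waits 30 seconds for the complete JSON, and server memory shoots up to 2 GB. You want real-time notifications, so clients poll once per second, and server QPS grows tenfold. You use WebSocket for one-way push and discover that 80% of the connections only ever receive data, so the full-duplex channel goes to waste. In 2026, streaming responses in Rust Axum are the better answer: SSE for one-way push, WebSocket for two-way communication, and NDJSON for streamed data, all built on Rust's zero-cost async and the Tokio runtime, with memory safety and excellent performance.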

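This article starts from the core concepts of Axum streaming responses and then works through six production patterns, SSE push → WebSocket communication → streaming JSON → backpressure control → multi-client broadcast → production deployment, to take real-time data push from "barely works" to "reliable in production".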


Key Takeaways

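  • Axum streaming responses build on two core abstractions: a streaming Body and SSE
  • SSE suits one-way push (notifications, streamed LLM output); WebSocket suits two-way interaction
  • NDJSON is the de facto standard for streaming JSON: one complete JSON object per line
  • Backpressure is the lifeline of any streaming system; an unbounded channel is a time bomb
  • tokio::sync::broadcast implements multi-client broadcast; watch implements state synchronization
  • Production deployment needs three things: graceful shutdown, connection timeouts, and health checks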

Contents

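  1. Core Concepts of Axum Streaming Responses
  2. Pattern 1: Server-Sent Events (SSE) Push
  3. Pattern 2: Bidirectional WebSocket Communication
  4. Pattern 3: Streaming JSON/NDJSON Responses
  5. Pattern 4: Backpressure and Flow Control
  6. Pattern 5: Multi-Client Broadcast (Pub/Sub)
  7. Pattern 6: Production Deployment and Graceful Shutdown
  8. 5 Common Pitfalls and Fixes
  9. Troubleshooting 10 Common Errors
  10. Advanced Optimization Techniques
  11. Comparison: SSE vs WebSocket vs Long Polling
  12. Summary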

Core Concepts of Axum Streaming Responses

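Concept | Description | Typical use
Sse<impl Stream> | Server-Sent Events: one-way push over HTTP | Notifications, streamed LLM output, live logs
WebSocket | Full-duplex communication; requires a protocol upgrade | Chat, collaborative editing, games
Body | Axum's low-level streaming abstraction for response bodies | Custom streaming transfers
NDJSON | Newline-delimited JSON stream | Large data exports, event streams
Backpressure | The consumer's pace limits how fast the producer runs | Preventing OOM, traffic shaping
broadcast | Tokio broadcast channel | Many clients subscribing at once
watch | Tokio channel that holds only the latest value | Config hot reload, state sync
Chunked Encoding | HTTP/1.1 chunked transfer coding | The wire mechanism underneath streaming responses (HTTP/2 uses DATA frames instead)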
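The Chunked Encoding row is what carries a streaming Body over HTTP/1.1: roughly, each piece the stream yields goes out as its own chunk, and a zero-length chunk ends the response. Hyper does this framing for you; the std-only sketch below (encode_chunk and encode_last_chunk are illustrative names, not part of any library) only shows the bytes that end up on the wire.

```rust
/// Frames one body chunk as HTTP/1.1 chunked transfer coding does:
/// size in hexadecimal, CRLF, the bytes themselves, CRLF.
/// An empty `data` slice must not be sent this way: a zero-size chunk
/// is reserved for marking the end of the body.
fn encode_chunk(data: &[u8]) -> Vec<u8> {
    let mut out = format!("{:x}\r\n", data.len()).into_bytes();
    out.extend_from_slice(data);
    out.extend_from_slice(b"\r\n");
    out
}

/// The last-chunk marker followed by an empty trailer section.
fn encode_last_chunk() -> Vec<u8> {
    b"0\r\n\r\n".to_vec()
}
```

If an NDJSON stream yields each line separately, every line becomes one such chunk, and encode_last_chunk() follows once the stream is exhausted.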

Streaming Response Architecture

┌─────────────────────────────────────────────────┐
│                   Client                         │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐      │
│  │ Browser  │  │  Mobile  │  │   CLI    │      │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘      │
└───────┼──────────────┼──────────────┼───────────┘
        │              │              │
        ▼              ▼              ▼
┌─────────────────────────────────────────────────┐
│              Axum Server                         │
│  ┌─────────────────────────────────────────┐    │
│  │            Router                        │    │
│  │  /sse  →  SSE Handler                   │    │
│  │  /ws   →  WebSocket Handler             │    │
│  │  /stream → NDJSON Handler               │    │
│  └──────────────┬──────────────────────────┘    │
│                 │                                │
│  ┌──────────────▼──────────────────────────┐    │
│  │         Tokio Runtime                    │    │
│  │  ┌────────┐ ┌────────┐ ┌────────┐      │    │
│  │  │  mpsc  │ │broadcast│ │  watch │      │    │
│  │  └────────┘ └────────┘ └────────┘      │    │
│  └─────────────────────────────────────────┘    │
└─────────────────────────────────────────────────┘

Pattern 1: Server-Sent Events (SSE) Push

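SSE is the most common Axum streaming pattern: a long-lived HTTP response carries one-way push from server to client, and the browser's built-in EventSource API reconnects automatically.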
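What travels over that long-lived response is plain text in the text/event-stream format: named fields such as event:, id: and data:, with a blank line closing each event. The std-only sketch below writes the format by hand to show what an Event turns into; axum's Event builder does this for you, and encode_sse / encode_sse_comment are illustrative names, not axum APIs. It assumes payloads use "\n" line endings.

```rust
/// Serializes one Server-Sent Event into its text/event-stream wire form.
fn encode_sse(event: Option<&str>, id: Option<&str>, data: &str) -> String {
    let mut out = String::new();
    if let Some(name) = event {
        out.push_str("event: ");
        out.push_str(name);
        out.push('\n');
    }
    if let Some(id) = id {
        out.push_str("id: ");
        out.push_str(id);
        out.push('\n');
    }
    // Each line of the payload becomes its own `data:` field;
    // the browser joins them back together with '\n'.
    for line in data.split('\n') {
        out.push_str("data: ");
        out.push_str(line);
        out.push('\n');
    }
    out.push('\n'); // blank line = end of this event
    out
}

/// A comment line: EventSource ignores it, which is why it works as a keep-alive.
fn encode_sse_comment(text: &str) -> String {
    format!(": {}\n\n", text)
}
```

For example, encode_sse(Some("update"), Some("7"), "Message 7") yields event, id and data lines plus the closing blank line. The id matters later: after a reconnect, EventSource sends the last id it saw in the Last-Event-ID request header.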

Basic SSE implementation

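[dependencies]
axum = "0.8"  # axum::response::sse ships with the default features; there is no "sse" feature
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
futures = "0.3"
# Also used by the later examples
async-stream = "0.3"
tokio-util = { version = "0.7", features = ["io"] }
chrono = "0.4"
tracing = "0.1"
uuid = { version = "1", features = ["v4"] }
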
use axum::{
    Router,
    response::sse::{Event, Sse},
    routing::get,
};
use futures::stream::{self, Stream};
use std::convert::Infallible;
use tokio_stream::StreamExt as _;
use std::time::Duration;

async fn sse_handler() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let stream = stream::iter(0..100).map(|i| {
        Ok(Event::default()
            .data(format!("Message {}", i))
            .event("update")
            .id(i.to_string()))
    });

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(Duration::from_secs(5))
            .text("ping"),
    )
}

#[tokio::main]
async fn main() {
    let app = Router::new().route("/sse", get(sse_handler));
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

SSE with JSON payloads

use axum::{
    Router,
    extract::State,
    response::sse::{Event, Sse},
    routing::get,
};
use futures::stream::Stream;
use serde::Serialize;
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;
use tokio_stream::StreamExt as _;

#[derive(Serialize, Clone)]
struct Notification {
    id: u64,
    title: String,
    body: String,
    timestamp: i64,
}

struct AppState {
    notification_tx: tokio::sync::broadcast::Sender<Notification>,
}

async fn sse_notifications(
    State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let mut rx = state.notification_tx.subscribe();

    let stream = async_stream::stream! {
        loop {
            match rx.recv().await {
                Ok(notification) => {
                    let data = serde_json::to_string(&notification).unwrap();
                    yield Ok(Event::default()
                        .event("notification")
                        .data(data)
                        .id(notification.id.to_string()));
                }
                Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                    let data = serde_json::json!({"missed": n}).to_string();
                    yield Ok(Event::default()
                        .event("lagged")
                        .data(data));
                }
                Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                    break;
                }
            }
        }
    };

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(Duration::from_secs(15))
            .text("keepalive"),
    )
}

#[tokio::main]
async fn main() {
    let (tx, _) = tokio::sync::broadcast::channel::<Notification>(100);
    let state = Arc::new(AppState { notification_tx: tx });

    let app = Router::new()
        .route("/sse/notifications", get(sse_notifications))
        .with_state(state);

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

Detecting SSE client disconnects

use axum::response::sse::{Event, KeepAlive, Sse};
use futures::stream::Stream;
use std::convert::Infallible;
use std::time::Duration;

// When the client goes away, hyper notices the closed connection (at the
// latest on its next write) and drops the response body, i.e. this stream.
// Anything the stream owns is dropped with it, so a Drop impl is the
// disconnect hook. A CancellationToken that nobody cancels detects nothing.
struct DisconnectGuard;

impl Drop for DisconnectGuard {
    fn drop(&mut self) {
        tracing::info!("SSE stream dropped (client disconnected or stream ended)");
    }
}

async fn sse_with_disconnect() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let stream = async_stream::stream! {
        // Owned by the stream: lives exactly as long as the connection
        let _guard = DisconnectGuard;
        // Optional upper bound on the lifetime of a single stream
        let deadline = tokio::time::Instant::now() + Duration::from_secs(3600);
        let mut interval = tokio::time::interval(Duration::from_secs(1));
        while tokio::time::Instant::now() < deadline {
            interval.tick().await;
            let data = serde_json::json!({
                "time": chrono::Utc::now().to_rfc3339()
            });
            yield Ok(Event::default().data(data.to_string()));
        }
        tracing::info!("SSE stream reached its maximum lifetime");
    };

    // Keep-alive comments give hyper something to write while idle,
    // so a dead connection is noticed within one interval
    Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(10)))
}

Pattern 2: Bidirectional WebSocket Communication

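WebSocket provides full-duplex communication and suits two-way interactive scenarios such as chat, collaborative editing, and real-time games.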

Basic WebSocket implementation

[dependencies]
axum = { version = "0.8", features = ["ws"] }
tokio = { version = "1", features = ["full"] }
futures = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
chrono = "0.4"
tracing = "0.1"

use axum::{
    Router,
    extract::ws::{Message, WebSocket, WebSocketUpgrade},
    response::IntoResponse,
    routing::get,
};
use futures::{SinkExt, StreamExt};
use std::time::Duration;

async fn ws_handler(ws: WebSocketUpgrade) -> impl IntoResponse {
    ws.on_upgrade(handle_socket)
}

async fn handle_socket(socket: WebSocket) {
    // Split into independent sending (Sink) and receiving (Stream) halves
    let (mut sender, mut receiver) = socket.split();

    let mut send_task = tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(5));
        loop {
            interval.tick().await;
            let msg = serde_json::json!({
                "type": "heartbeat",
                "timestamp": chrono::Utc::now().to_rfc3339()
            });
            // An error means the connection is gone
            if sender.send(Message::Text(msg.to_string().into())).await.is_err() {
                break;
            }
        }
    });

    let mut recv_task = tokio::spawn(async move {
        while let Some(Ok(msg)) = receiver.next().await {
            match msg {
                Message::Text(text) => {
                    tracing::info!("Received: {}", text.as_str());
                }
                Message::Close(_) => {
                    tracing::info!("Client closed connection");
                    break;
                }
                // Binary, Ping and Pong frames are ignored in this example
                _ => {}
            }
        }
    });

    // When either half finishes, abort the other so no task is leaked
    tokio::select! {
        _ = &mut send_task => recv_task.abort(),
        _ = &mut recv_task => send_task.abort(),
    }
}

#[tokio::main]
async fn main() {
    let app = Router::new().route("/ws", get(ws_handler));
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

WebSocket with shared state (chat rooms)

use axum::{
    Router,
    extract::{
        State,
        ws::{Message, WebSocket, WebSocketUpgrade},
    },
    response::IntoResponse,
    routing::get,
};
use futures::{SinkExt, StreamExt};
use serde::Serialize;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{RwLock, mpsc};

#[derive(Debug, Clone, Serialize)]
struct ChatMessage {
    user: String,
    content: String,
    timestamp: i64,
}

struct AppState {
    // One bounded sender per connected client, grouped by room
    rooms: RwLock<HashMap<String, Vec<mpsc::Sender<ChatMessage>>>>,
}

async fn ws_chat(
    ws: WebSocketUpgrade,
    State(state): State<Arc<AppState>>,
) -> impl IntoResponse {
    ws.on_upgrade(move |socket| handle_chat(socket, state))
}

async fn handle_chat(socket: WebSocket, state: Arc<AppState>) {
    let (mut ws_sender, mut ws_receiver) = socket.split();
    // Bounded per-client queue: a slow client cannot grow memory without limit
    let (tx, mut rx) = mpsc::channel::<ChatMessage>(64);
    let my_tx = tx.clone(); // identifies this client's entry during cleanup

    let room_id = "general".to_string();
    state.rooms.write().await.entry(room_id.clone()).or_default().push(tx);

    let mut send_task = tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            let text = serde_json::to_string(&msg).unwrap();
            if ws_sender.send(Message::Text(text.into())).await.is_err() {
                break;
            }
        }
    });

    // The receive task gets its own handles; the originals are needed for cleanup
    let recv_state = state.clone();
    let recv_room = room_id.clone();
    let mut recv_task = tokio::spawn(async move {
        while let Some(Ok(msg)) = ws_receiver.next().await {
            if let Message::Text(text) = msg {
                let chat_msg = ChatMessage {
                    user: "anonymous".to_string(), // no authentication in this example
                    content: text.as_str().to_owned(),
                    timestamp: chrono::Utc::now().timestamp(),
                };
                let rooms = recv_state.rooms.read().await;
                if let Some(senders) = rooms.get(&recv_room) {
                    for sender in senders {
                        // try_send never waits: a full queue drops the message for
                        // that one slow client instead of stalling the whole room
                        let _ = sender.try_send(chat_msg.clone());
                    }
                }
            }
        }
    });

    tokio::select! {
        _ = &mut send_task => recv_task.abort(),
        _ = &mut recv_task => send_task.abort(),
    }

    // Remove this client's sender, plus any others whose receivers are gone
    let mut rooms = state.rooms.write().await;
    if let Some(senders) = rooms.get_mut(&room_id) {
        senders.retain(|s| !s.same_channel(&my_tx) && !s.is_closed());
    }
}

Pattern 3: Streaming JSON/NDJSON Responses

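NDJSON (Newline Delimited JSON) is the de facto standard for streaming JSON: each line holds one complete JSON object, so the client can parse the response line by line without waiting for it to finish.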
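On the client side, "parse line by line" means buffering: network chunks do not respect line boundaries, so one record can arrive split across two reads. The hypothetical NdjsonSplitter below (std only) keeps the unfinished tail until its newline arrives; each returned line would then go to a JSON parser such as serde_json::from_str. Splitting on the byte b'\n' is safe for UTF-8 input, because that byte never occurs inside a multi-byte character.

```rust
/// Turns arbitrarily split network chunks into complete NDJSON lines.
/// The unfinished tail of a chunk stays buffered until its newline arrives.
struct NdjsonSplitter {
    buf: Vec<u8>,
}

impl NdjsonSplitter {
    fn new() -> Self {
        Self { buf: Vec::new() }
    }

    /// Appends one chunk and returns every line it completed, without the
    /// trailing "\n" (or "\r\n"); blank lines are skipped.
    fn push(&mut self, chunk: &[u8]) -> Vec<String> {
        self.buf.extend_from_slice(chunk);
        let mut lines = Vec::new();
        while let Some(pos) = self.buf.iter().position(|&b| b == b'\n') {
            let mut line: Vec<u8> = self.buf.drain(..=pos).collect();
            line.pop(); // the '\n' itself
            if line.last() == Some(&b'\r') {
                line.pop();
            }
            if !line.is_empty() {
                lines.push(String::from_utf8_lossy(&line).into_owned());
            }
        }
        lines
    }
}
```

Feeding it {"id":1}\n{"id" and then :2}\n returns the first record after the first chunk and the second record only once its closing newline has arrived.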

NDJSON streaming response

use axum::{
    Router,
    extract::Query,
    response::IntoResponse,
    routing::get,
};
use serde::{Deserialize, Serialize};
use std::time::Duration;

#[derive(Serialize, Deserialize)]
struct LogEntry {
    id: u64,
    level: String,
    message: String,
    timestamp: String,
    service: String,
}

#[derive(Deserialize)]
struct LogQuery {
    service: Option<String>,
    level: Option<String>,
    limit: Option<usize>,
}

async fn stream_logs(
    Query(params): Query<LogQuery>,
) -> impl IntoResponse {
    // Cap the client-supplied limit (the 10,000 ceiling is an arbitrary choice)
    let limit = params.limit.unwrap_or(1000).min(10_000);
    let service_filter = params.service.clone();

    let stream = async_stream::stream! {
        let mut count = 0u64;
        let mut interval = tokio::time::interval(Duration::from_millis(100));

        while count < limit as u64 {
            interval.tick().await;

            let entry = LogEntry {
                id: count,
                level: if count % 10 == 0 { "ERROR".to_string() } else { "INFO".to_string() },
                message: format!("Log entry #{}", count),
                timestamp: chrono::Utc::now().to_rfc3339(),
                service: service_filter.clone().unwrap_or_else(|| "api".to_string()),
            };

            let json_line = serde_json::to_string(&entry).unwrap();
            yield Ok::<_, std::convert::Infallible>(format!("{}\n", json_line));
            count += 1;
        }
    };

    (
        [
            ("content-type", "application/x-ndjson"),
            ("cache-control", "no-cache"),
            ("x-stream-format", "ndjson"),
        ],
        axum::body::Body::from_stream(stream),
    )
}

#[tokio::main]
async fn main() {
    let app = Router::new().route("/logs/stream", get(stream_logs));
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

Streaming database query results

use axum::{
    Router,
    extract::State,
    response::IntoResponse,
    routing::get,
};
use futures::StreamExt; // provides .next() on the row stream
use sqlx::postgres::PgPool;
use std::sync::Arc;

struct AppState {
    pool: PgPool,
}

async fn stream_query_results(
    State(state): State<Arc<AppState>>,
) -> impl IntoResponse {
    let pool = state.pool.clone();

    let stream = async_stream::stream! {
        let mut stream = sqlx::query_as::<_, (i64, String, String)>(
            "SELECT id, name, email FROM users ORDER BY id"
        )
        .fetch(&pool);

        while let Some(row) = stream.next().await {
            match row {
                Ok((id, name, email)) => {
                    let json = serde_json::json!({
                        "id": id,
                        "name": name,
                        "email": email,
                    });
                    yield Ok::<_, std::convert::Infallible>(
                        format!("{}\n", json)
                    );
                }
                Err(e) => {
                    tracing::error!("Query error: {}", e);
                    break;
                }
            }
        }
    };

    (
        [("content-type", "application/x-ndjson")],
        axum::body::Body::from_stream(stream),
    )
}

Pattern 4: Backpressure and Flow Control

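Backpressure is the lifeline of a streaming system. When the consumer is slower than the producer and nothing pushes back, buffered data keeps growing until the process runs out of memory (OOM).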

Backpressure architecture

┌──────────┐     ┌──────────┐     ┌──────────┐
│ Producer  │────▶│  Buffer  │────▶│ Consumer │
│ (fast)    │     │ (bounded)│     │ (slow)   │
└──────────┘     └──────────┘     └──────────┘
     │                │                  │
     │   backpressure │                  │
     │◀───────────────┤                  │
     │  (wait when    │                  │
     │   full)        │                  │

Implementing backpressure with a bounded channel

use axum::response::sse::{Event, KeepAlive, Sse};
use futures::stream::Stream;
use std::convert::Infallible;
use std::time::Duration;
use tokio::sync::mpsc;

async fn sse_with_backpressure() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // At most 32 messages are buffered for this client
    let (tx, mut rx) = mpsc::channel::<String>(32);

    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_millis(10));
        for i in 0..10000 {
            interval.tick().await;
            let msg = format!("Event {}", i);
            // send().await suspends while the buffer is full, so the producer
            // slows down to the pace at which hyper writes to the socket.
            // It fails once the receiver (the SSE stream) has been dropped.
            if tx.send(msg).await.is_err() {
                tracing::warn!("Consumer dropped, stopping producer");
                break;
            }
        }
    });

    let stream = async_stream::stream! {
        while let Some(msg) = rx.recv().await {
            yield Ok(Event::default().data(msg));
        }
    };

    Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
}

Traffic shaping (rate-limited stream)

use axum::response::IntoResponse;
use tokio::time::{Duration, Instant};

#[derive(Clone)]
struct ThrottleConfig {
    max_per_second: u64,
    burst_size: usize,
}

async fn throttled_stream() -> impl IntoResponse {
    let config = ThrottleConfig {
        max_per_second: 100,
        burst_size: 50,
    };

    let stream = async_stream::stream! {
        let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(config.burst_size);
        // Token bucket: holds up to burst_size tokens, refilled at max_per_second
        let max_tokens = config.burst_size as f64;
        let refill_per_sec = config.max_per_second as f64;
        let mut tokens = max_tokens;
        let mut last_refill = Instant::now();

        // Producer, faster than the limit; it stops once the stream (and rx) is dropped
        tokio::spawn(async move {
            let mut count = 0u64;
            loop {
                tokio::time::sleep(Duration::from_millis(5)).await;
                // Each item is a valid JSON object, as NDJSON requires
                if tx.send(format!("{{\"item\":{}}}", count)).await.is_err() {
                    break;
                }
                count += 1;
            }
        });

        while let Some(item) = rx.recv().await {
            let now = Instant::now();
            let elapsed = now.duration_since(last_refill).as_secs_f64();
            tokens = (tokens + elapsed * refill_per_sec).min(max_tokens);
            last_refill = now;

            if tokens < 1.0 {
                // Wait until one whole token has accumulated, then spend it
                let wait = Duration::from_secs_f64((1.0 - tokens) / refill_per_sec);
                tokio::time::sleep(wait).await;
                tokens = 0.0;
                last_refill = Instant::now();
            } else {
                tokens -= 1.0;
            }

            yield Ok::<_, std::convert::Infallible>(format!("{}\n", item));
        }
    };

    (
        [("content-type", "application/x-ndjson")],
        axum::body::Body::from_stream(stream),
    )
}
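The refill arithmetic inside that loop is easier to check when it is separated from the clock. The std-only TokenBucket below is a hypothetical refactoring of the same logic, not a library type: refill() adds elapsed × rate tokens up to the burst size, and take() either spends one token or returns how long the caller should sleep before one is available.

```rust
use std::time::Duration;

/// Token bucket driven by explicit elapsed times, so it can be tested
/// without sleeping. Capacity = burst size, refill = rate per second.
struct TokenBucket {
    tokens: f64,
    capacity: f64,
    rate_per_sec: f64,
}

impl TokenBucket {
    fn new(burst: usize, rate_per_sec: f64) -> Self {
        Self { tokens: burst as f64, capacity: burst as f64, rate_per_sec }
    }

    /// Adds the tokens earned during `elapsed`, capped at capacity.
    fn refill(&mut self, elapsed: Duration) {
        self.tokens = (self.tokens + elapsed.as_secs_f64() * self.rate_per_sec).min(self.capacity);
    }

    /// Spends one token, or returns how long to wait until one is available.
    fn take(&mut self) -> Result<(), Duration> {
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            Ok(())
        } else {
            Err(Duration::from_secs_f64((1.0 - self.tokens) / self.rate_per_sec))
        }
    }
}
```

In the stream above, each item would call refill(now - last_refill) and then, on Err(wait), sleep for wait before sending. At 100 items/s, an empty bucket asks for a 10 ms wait.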

Pattern 5: Multi-Client Broadcast (Pub/Sub)

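Multi-client broadcast is the central real-time push scenario: one event source produces messages, and many clients receive them at the same time.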
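What happens to a slow subscriber is easiest to see in a small model. The sketch below is a simplified, single-threaded stand-in for tokio::sync::broadcast (BroadcastLog and Recv are illustrative names, and capacity is assumed to be at least 1): the sender keeps only the newest capacity messages, each subscriber tracks its own position, and a subscriber that fell too far behind first gets Lagged(n) and then resumes from the oldest message still buffered. That is the situation the Lagged branches in the handlers of this section react to.

```rust
use std::collections::VecDeque;

/// Single-threaded model of a bounded broadcast log. It mimics what a
/// tokio::sync::broadcast subscriber observes, not how tokio implements it.
struct BroadcastLog<T> {
    capacity: usize,
    buf: VecDeque<T>,
    next_seq: u64, // sequence number the next sent message will get
}

#[derive(Debug, PartialEq)]
enum Recv<T> {
    Msg(T),
    Lagged(u64),
    Empty,
}

impl<T: Clone> BroadcastLog<T> {
    fn new(capacity: usize) -> Self {
        Self { capacity, buf: VecDeque::with_capacity(capacity), next_seq: 0 }
    }

    /// A new subscriber only sees messages sent after it subscribed.
    fn subscribe(&self) -> u64 {
        self.next_seq
    }

    fn send(&mut self, msg: T) {
        if self.buf.len() == self.capacity {
            self.buf.pop_front(); // the oldest message is overwritten
        }
        self.buf.push_back(msg);
        self.next_seq += 1;
    }

    /// `cursor` is the subscriber's next expected sequence number.
    fn recv(&self, cursor: &mut u64) -> Recv<T> {
        let oldest = self.next_seq - self.buf.len() as u64;
        if *cursor < oldest {
            let missed = oldest - *cursor;
            *cursor = oldest; // skip to the oldest message still buffered
            return Recv::Lagged(missed);
        }
        match self.buf.get((*cursor - oldest) as usize) {
            Some(msg) => {
                *cursor += 1;
                Recv::Msg(msg.clone())
            }
            None => Recv::Empty,
        }
    }
}
```

With capacity 2 and five messages sent before a subscriber reads anything, it sees Lagged(3) and then messages 4 and 5: a bigger capacity only buys time, it does not stop a permanently slow consumer from losing data.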

Broadcasting with a broadcast channel

use axum::{
    Router,
    extract::State,
    response::{
        IntoResponse,
        sse::{Event, Sse},
    },
    routing::{get, post},
};
use futures::stream::Stream;
use serde::{Deserialize, Serialize};
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;

#[derive(Serialize, Deserialize, Clone, Debug)]
struct BroadcastEvent {
    event_type: String,
    payload: serde_json::Value,
    timestamp: i64,
}

struct AppState {
    broadcast_tx: tokio::sync::broadcast::Sender<BroadcastEvent>,
    client_count: Arc<std::sync::atomic::AtomicUsize>,
}

async fn subscribe(
    State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let mut rx = state.broadcast_tx.subscribe();
    let client_id = state.client_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

    tracing::info!("Client {} subscribed", client_id);

    let stream = async_stream::stream! {
        loop {
            match rx.recv().await {
                Ok(event) => {
                    let data = serde_json::to_string(&event).unwrap();
                    yield Ok(Event::default()
                        .event(&event.event_type)
                        .data(data)
                        .id(event.timestamp.to_string()));
                }
                Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                    tracing::warn!("Client {} lagged {} messages", client_id, n);
                    yield Ok(Event::default()
                        .event("lagged")
                        .data(format!("Missed {} messages", n)));
                }
                Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                    break;
                }
            }
        }
    };

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(Duration::from_secs(10)),
    )
}

async fn publish(
    State(state): State<Arc<AppState>>,
    axum::Json(event): axum::Json<BroadcastEvent>,
) -> impl IntoResponse {
    // send() fails only when there are no subscribers at all;
    // on success it reports how many receivers will see the event
    match state.broadcast_tx.send(event) {
        Ok(receivers) => axum::Json(serde_json::json!({
            "status": "published",
            "receivers": receivers,
        })),
        Err(_) => axum::Json(serde_json::json!({
            "status": "no_receivers",
        })),
    }
}

#[tokio::main]
async fn main() {
    let (broadcast_tx, _) = tokio::sync::broadcast::channel::<BroadcastEvent>(256);
    let state = Arc::new(AppState {
        broadcast_tx,
        client_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
    });

    let app = Router::new()
        .route("/subscribe", get(subscribe))
        .route("/publish", post(publish))
        .with_state(state);

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

Topic-based subscriptions

use axum::{
    Router,
    extract::{Path, State},
    response::{
        IntoResponse,
        sse::{Event, Sse},
    },
    routing::{get, post},
};
use futures::stream::Stream;
use serde::Serialize;
use std::collections::HashMap;
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, RwLock};

#[derive(Clone, Serialize, Debug)]
struct TopicEvent {
    topic: String,
    data: serde_json::Value,
}

struct AppState {
    topics: RwLock<HashMap<String, broadcast::Sender<TopicEvent>>>,
}

impl AppState {
    fn new() -> Self {
        Self {
            topics: RwLock::new(HashMap::new()),
        }
    }

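    async fn get_or_create_topic(&self, topic: &str) -> broadcast::Sender<TopicEvent> {
        // Fast path: an existing topic only needs the shared read lock
        if let Some(tx) = self.topics.read().await.get(topic) {
            return tx.clone();
        }
        // Topics are never removed here; with client-chosen names, consider
        // validating them or pruning senders whose receiver_count() is 0
        let mut topics = self.topics.write().await;
        topics
            .entry(topic.to_string())
            .or_insert_with(|| broadcast::channel::<TopicEvent>(256).0)
            .clone()
    }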
}

async fn subscribe_topic(
    Path(topic): Path<String>,
    State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let tx = state.get_or_create_topic(&topic).await;
    let mut rx = tx.subscribe();

    let stream = async_stream::stream! {
        loop {
            match rx.recv().await {
                Ok(event) => {
                    let data = serde_json::to_string(&event).unwrap();
                    yield Ok(Event::default()
                        .event(&event.topic)
                        .data(data));
                }
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    yield Ok(Event::default()
                        .event("lagged")
                        .data(format!("Missed {}", n)));
                }
                Err(broadcast::error::RecvError::Closed) => break,
            }
        }
    };

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(Duration::from_secs(15)),
    )
}

async fn publish_topic(
    Path(topic): Path<String>,
    State(state): State<Arc<AppState>>,
    axum::Json(data): axum::Json<serde_json::Value>,
) -> impl IntoResponse {
    let tx = state.get_or_create_topic(&topic).await;
    let event = TopicEvent {
        topic: topic.clone(),
        data,
    };
    let receivers = tx.receiver_count();
    let _ = tx.send(event);
    axum::Json(serde_json::json!({"delivered_to": receivers}))
}

Pattern 6: Production Deployment and Graceful Shutdown

A streaming service in production has to handle connection tracking, graceful shutdown, and health checks.

A complete production-grade service

use axum::{
    Router,
    extract::State,
    response::{
        IntoResponse,
        sse::{Event, KeepAlive, Sse},
    },
    routing::get,
};
use futures::stream::Stream;
use serde::Serialize;
use std::collections::HashSet;
use std::convert::Infallible;
use std::future::IntoFuture;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;

#[derive(Serialize, Clone)]
struct ServerEvent {
    event_type: String,
    data: serde_json::Value,
}

struct AppState {
    broadcast_tx: broadcast::Sender<ServerEvent>,
    cancel_token: CancellationToken,
    // std Mutex: never held across an .await, so it can be locked from Drop
    connections: Arc<Mutex<HashSet<String>>>,
    started_at: Instant,
}

// Unregisters a connection when its SSE stream is dropped: client
// disconnect, normal end of stream, or server shutdown alike
struct ConnectionGuard {
    id: String,
    connections: Arc<Mutex<HashSet<String>>>,
}

impl Drop for ConnectionGuard {
    fn drop(&mut self) {
        if let Ok(mut conns) = self.connections.lock() {
            conns.remove(&self.id);
        }
    }
}

async fn sse_production(
    State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let mut rx = state.broadcast_tx.subscribe();
    let cancel = state.cancel_token.clone();
    let conn_id = uuid::Uuid::new_v4().to_string();
    state.connections.lock().unwrap().insert(conn_id.clone());
    let guard = ConnectionGuard {
        id: conn_id,
        connections: state.connections.clone(),
    };

    let stream = async_stream::stream! {
        let _guard = guard; // owned by the stream, dropped together with it
        loop {
            // Wait for the next event, or for shutdown (None)
            let next = tokio::select! {
                ev = rx.recv() => Some(ev),
                _ = cancel.cancelled() => None,
            };
            match next {
                Some(Ok(ev)) => {
                    let data = serde_json::to_string(&ev).unwrap();
                    yield Ok(Event::default()
                        .event(&ev.event_type)
                        .data(data));
                }
                Some(Err(broadcast::error::RecvError::Lagged(n))) => {
                    yield Ok(Event::default()
                        .event("lagged")
                        .data(format!("Missed {}", n)));
                }
                Some(Err(broadcast::error::RecvError::Closed)) => break,
                None => {
                    yield Ok(Event::default()
                        .event("shutdown")
                        .data("Server is shutting down"));
                    break;
                }
            }
        }
    };

    Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(10)))
}

async fn health_check(State(state): State<Arc<AppState>>) -> impl IntoResponse {
    let conn_count = state.connections.lock().unwrap().len();
    let is_shutting_down = state.cancel_token.is_cancelled();
    axum::Json(serde_json::json!({
        "status": if is_shutting_down { "shutting_down" } else { "healthy" },
        "active_connections": conn_count,
        "uptime_secs": state.started_at.elapsed().as_secs(),
    }))
}

#[tokio::main]
async fn main() {
    let (broadcast_tx, _) = broadcast::channel::<ServerEvent>(512);
    let cancel_token = CancellationToken::new();

    let state = Arc::new(AppState {
        broadcast_tx,
        cancel_token: cancel_token.clone(),
        connections: Arc::new(Mutex::new(HashSet::new())),
        started_at: Instant::now(),
    });

    let app = Router::new()
        .route("/events", get(sse_production))
        .route("/health", get(health_check))
        .with_state(state);

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();

    // On SIGINT/SIGTERM: stop accepting connections and tell every SSE
    // stream to send a final "shutdown" event and end
    let shutdown_token = cancel_token.clone();
    let server = axum::serve(listener, app)
        .with_graceful_shutdown(async move {
            shutdown_signal().await;
            tracing::info!("Shutdown signal received, draining connections...");
            shutdown_token.cancel();
        })
        .into_future();

    // Upper bound on draining: give up 10 seconds after shutdown started
    let drain_deadline = async {
        cancel_token.cancelled().await;
        tokio::time::sleep(Duration::from_secs(10)).await;
    };

    tokio::select! {
        result = server => {
            result.expect("server error");
            tracing::info!("Graceful shutdown complete");
        }
        _ = drain_deadline => {
            tracing::warn!("Drain timeout reached, exiting with connections still open");
        }
    }
}

// Completes on Ctrl+C (SIGINT) or, on Unix, SIGTERM, which is what docker stop sends
async fn shutdown_signal() {
    let ctrl_c = async {
        tokio::signal::ctrl_c()
            .await
            .expect("failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let terminate = async {
        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
            .expect("failed to install SIGTERM handler")
            .recv()
            .await;
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = ctrl_c => {},
        _ = terminate => {},
    }
}

Docker deployment configuration

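# Dockerfile
FROM rust:1.85 AS builder
WORKDIR /app
COPY Cargo.toml Cargo.lock ./
COPY src ./src
RUN cargo build --release

FROM debian:bookworm-slim
# curl is used by the compose healthcheck below; the slim image does not include it
RUN apt-get update \
    && apt-get install -y --no-install-recommends ca-certificates curl \
    && rm -rf /var/lib/apt/lists/*
# Assumes the Cargo package (and so the binary) is named streaming-server
COPY --from=builder /app/target/release/streaming-server /usr/local/bin/
EXPOSE 3000
# Exec form: the server runs as PID 1 and receives docker stop's SIGTERM directly
CMD ["streaming-server"]

# docker-compose.yml
services:
  streaming-server:
    build: .
    ports:
      - "3000:3000"
    # Neither variable is read by the examples above: RUST_LOG needs a
    # tracing-subscriber EnvFilter, STREAM_BUFFER_SIZE your own config code
    environment:
      - RUST_LOG=info
      - STREAM_BUFFER_SIZE=256
    # Docker sends SIGKILL 10s after SIGTERM by default; leave room for the 10s drain
    stop_grace_period: 15s
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:3000/health"]
      interval: 10s
      timeout: 5s
      retries: 3
    deploy:
      resources:
        limits:
          memory: 512M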

5 Common Pitfalls and Fixes

Pitfall 1: Work that keeps running after an SSE client disconnects

Symptom: after the client disconnects, the server keeps doing work for it and memory keeps growing.

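Cause: Axum does drop the SSE stream once hyper notices the connection is gone, which happens at the latest on its next write. Work running outside the stream, such as a spawned producer task, keeps going unless it checks whether its receiver still exists, and a stream that writes nothing for a long time delays the detection.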

Fix:

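use axum::response::sse::{Event, KeepAlive, Sse};
use futures::stream::Stream;
use std::convert::Infallible;
use std::time::Duration;
use tokio::sync::mpsc;

async fn sse_safe() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let (tx, mut rx) = mpsc::channel::<String>(16);

    // Work done outside the stream must notice on its own that the client left
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(1));
        loop {
            tokio::select! {
                _ = interval.tick() => {
                    // Fails once the stream, and with it rx, has been dropped
                    if tx.send("tick".to_string()).await.is_err() {
                        break;
                    }
                }
                // Resolves as soon as rx is dropped, even between ticks
                _ = tx.closed() => break,
            }
        }
        tracing::info!("Client disconnected, producer stopped");
    });

    let stream = async_stream::stream! {
        while let Some(msg) = rx.recv().await {
            yield Ok(Event::default().data(msg));
        }
    };

    // Keep-alive writes let hyper notice a dead connection while idle
    Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
}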

Pitfall 2: Lagged broadcast receivers lose messages

Symptom: RecvError::Lagged(n) shows up frequently and clients miss messages.

Cause: the broadcast channel is too small, so slow consumers cannot keep up with fast producers.

Fix: increase the channel capacity, log how many messages were missed, and replay the missed messages when the client reconnects.

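// A larger capacity gives slow subscribers more headroom before they lag
// (Notification is the payload type from the "SSE with JSON payloads" example)
let (tx, _) = tokio::sync::broadcast::channel::<Notification>(1024);

// Inside the async_stream::stream! loop of the SSE handler:
match rx.recv().await {
    Ok(notification) => {
        // serialize `notification` and yield it as usual
    }
    Err(tokio::sync::broadcast::error::RecvError::Lagged(missed)) => {
        tracing::warn!("Lagged {}, requesting catch-up", missed);
        yield Ok(Event::default()
            .event("lagged")
            .data(serde_json::json!({"missed": missed}).to_string()));
    }
    Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}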
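The "replay on reconnect" part of the fix needs more than a bigger channel: the server has to remember recent events by id. The sketch below is one way to do it, under assumptions that go beyond the original: events get increasing numeric ids that are written into the SSE id: field, and when EventSource reconnects, the handler reads the Last-Event-ID request header and calls replay_after with it. ReplayBuffer is a hypothetical std-only type; in a real handler it would sit behind a lock in the shared state.

```rust
use std::collections::VecDeque;

/// Remembers the last `capacity` events (id, payload) for replay.
struct ReplayBuffer {
    capacity: usize,
    events: VecDeque<(u64, String)>,
    next_id: u64,
}

impl ReplayBuffer {
    fn new(capacity: usize) -> Self {
        Self { capacity, events: VecDeque::new(), next_id: 1 }
    }

    /// Stores an event and returns the id assigned to it.
    fn push(&mut self, data: String) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        if self.events.len() == self.capacity {
            self.events.pop_front();
        }
        self.events.push_back((id, data));
        id
    }

    /// Events newer than `last_id`. `None` means the gap reaches back further
    /// than the buffer, so the client has to reload its full state instead.
    fn replay_after(&self, last_id: u64) -> Option<Vec<(u64, String)>> {
        match self.events.front() {
            Some(&(oldest, _)) if last_id + 1 < oldest => None,
            _ => Some(
                self.events
                    .iter()
                    .filter(|(id, _)| *id > last_id)
                    .cloned()
                    .collect(),
            ),
        }
    }
}
```

The None case is the important design choice: when the gap is too large to fill, telling the client to resynchronize is safer than silently sending a partial history.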

Pitfall 3: Large WebSocket messages are rejected

Symptom: sending a large message makes the WebSocket return an error or disconnect.

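Cause: the message, or one of its frames, exceeds the configured size limit. Axum takes its defaults from tungstenite: 64 MiB per message and 16 MiB per frame (not 64 KB).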

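Fix: set explicit limits that match your workload. The values below are lower than the defaults, which also caps how much data a single client can make the server buffer.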

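use axum::{extract::ws::WebSocketUpgrade, response::IntoResponse};

// Reuses handle_socket from the basic WebSocket example
async fn ws_large_messages(ws: WebSocketUpgrade) -> impl IntoResponse {
    ws.max_frame_size(1024 * 1024)          // 1 MiB per frame
        .max_message_size(4 * 1024 * 1024)  // 4 MiB per (possibly fragmented) message
        .on_upgrade(handle_socket)
}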
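The two limits refer to different things: max_message_size caps a whole, possibly fragmented, message, while max_frame_size caps a single frame, whose length is announced in the frame header before any payload arrives. The std-only function below, a hypothetical helper following RFC 6455 section 5.2, decodes that announced length: 7 bits directly, or a 16-bit or 64-bit big-endian extension when the 7-bit value is 126 or 127. Because the length is known up front, an oversized frame can be rejected before its payload is buffered.

```rust
/// Decodes the payload length announced in a WebSocket frame header
/// (RFC 6455, section 5.2). Returns (payload_len, header_len), or None if
/// `header` does not yet contain the complete header.
fn frame_payload_len(header: &[u8]) -> Option<(u64, usize)> {
    if header.len() < 2 {
        return None;
    }
    // Second byte: MASK bit (set on client-to-server frames) + 7-bit length
    let mask_len = if header[1] & 0x80 != 0 { 4 } else { 0 };
    let (len, ext_len) = match header[1] & 0x7F {
        126 => {
            // 16-bit big-endian extended length
            let b = header.get(2..4)?;
            (u16::from_be_bytes([b[0], b[1]]) as u64, 2)
        }
        127 => {
            // 64-bit big-endian extended length
            let b = header.get(2..10)?;
            let mut arr = [0u8; 8];
            arr.copy_from_slice(b);
            (u64::from_be_bytes(arr), 8)
        }
        n => (n as u64, 0),
    };
    let header_len = 2 + ext_len + mask_len;
    if header.len() < header_len {
        return None; // masking key not complete yet
    }
    Some((len, header_len))
}
```

A 1 MiB frame, for instance, has to use the 64-bit form (127 followed by 0x0000000000100000), so it is already identifiable as oversized after ten header bytes.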

Pitfall 4: Streaming responses without a Content-Type

Symptom: the client receives the streamed data but cannot parse it correctly.

Cause: the correct Content-Type header was never set.

Fix:

// create_stream() is a placeholder for any Stream<Item = Result<String, E>>
async fn ndjson_response() -> impl IntoResponse {
    let stream = create_stream();
    (
        [
            ("content-type", "application/x-ndjson; charset=utf-8"),
            ("cache-control", "no-cache"),
            // No "connection" header: it is hop-by-hop and invalid over HTTP/2
        ],
        axum::body::Body::from_stream(stream),
    )
}

Pitfall 5: Streams are cut off abruptly during shutdown

Symptom: when the service shuts down, clients see a connection reset instead of a clean end of stream.

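Cause: the process is killed without waiting for active streams to finish. Note that docker stop sends SIGTERM, which tokio::signal::ctrl_c() does not handle; it only catches SIGINT.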

Fix: use a CancellationToken to tell every stream to finish cleanly (see Pattern 6).


Troubleshooting 10 Common Errors

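# | Error message | Cause | Fix
1 | the trait bound impl Stream: IntoResponse is not satisfied | A bare stream was returned, or its type does not match what the response expects | Wrap the stream in Sse::new(...) and make its Item type Result<Event, Infallible>
2 | WebSocket upgrade failed: Connection header is not upgrade | The client (or a proxy in between) did not send the upgrade headers | Make sure the request includes Connection: Upgrade and Upgrade: websocket, and that any reverse proxy forwards them
3 | channel send error: channel closed | The receiving side has been dropped | Check whether the receiver was dropped early; handle it with send().await.is_err()
4 | RecvError::Lagged(n) | Consumers are slower than the producer | Increase the broadcast capacity, or use a bounded mpsc channel for backpressure
5 | body write aborted: connection closed | The client disconnected | Detect disconnects and release resources promptly
6 | SSE stream timeout | An idle connection was closed by a timeout, typically a proxy or load balancer idle timeout | Set KeepAlive::new().interval() shorter than that idle timeout
7 | WebSocket message too large | The message exceeds the configured limit (defaults: 64 MiB per message, 16 MiB per frame) | Adjust ws.max_message_size() and ws.max_frame_size()
8 | task hung detected | An async task is blocked by synchronous work | Look for blocking calls and move them to tokio::task::spawn_blocking
9 | too many open files | The connection count exceeds the OS file-descriptor limit | Raise ulimit -n, or cap the maximum number of connections
10 | connection reset by peer | The client closed the connection abruptly | Handle Message::Close and receive errors in the WebSocket handler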

Advanced Optimization Techniques

1. Limiting concurrent connections

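use axum::{
    body::Body,
    extract::{Request, State},
    http::StatusCode,
    middleware::Next,
    response::{IntoResponse, Response},
};
use futures::StreamExt;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

struct ConnectionLimiter {
    current: AtomicUsize,
    max: usize,
}

// Gives the slot back when dropped
struct ConnectionGuard(Arc<ConnectionLimiter>);

impl Drop for ConnectionGuard {
    fn drop(&mut self) {
        self.0.current.fetch_sub(1, Ordering::Relaxed);
    }
}

// Register with axum::middleware::from_fn_with_state(limiter, limit_connections)
async fn limit_connections(
    State(limiter): State<Arc<ConnectionLimiter>>,
    request: Request,
    next: Next,
) -> Response {
    let current = limiter.current.fetch_add(1, Ordering::Relaxed);
    let guard = ConnectionGuard(limiter.clone());
    if current >= limiter.max {
        return StatusCode::SERVICE_UNAVAILABLE.into_response(); // guard drops here
    }
    // next.run() returns once the headers are ready, while a streaming body
    // is still being sent, so the guard must live as long as the body
    let (parts, body) = next.run(request).await.into_parts();
    let body = Body::from_stream(body.into_data_stream().map(move |chunk| {
        let _held = &guard;
        chunk
    }));
    Response::from_parts(parts, body)
}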
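The counting logic is easy to get subtly wrong, especially on the rejection path, so here it is isolated in a std-only sketch (Limiter, Slot and try_acquire are illustrative names). The counter is incremented up front and a guard is created immediately, so every exit path, including the rejected one, decrements exactly once when the guard is dropped.

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Counts live connections; a slot is held for as long as its guard lives.
struct Limiter {
    current: AtomicUsize,
    max: usize,
}

/// RAII guard: dropping it frees the slot.
struct Slot(Arc<Limiter>);

impl Drop for Slot {
    fn drop(&mut self) {
        self.0.current.fetch_sub(1, Ordering::AcqRel);
    }
}

/// Returns a guard if a slot is free. On rejection the guard created here
/// is dropped before returning, which undoes the increment.
fn try_acquire(limiter: &Arc<Limiter>) -> Option<Slot> {
    let prev = limiter.current.fetch_add(1, Ordering::AcqRel);
    let slot = Slot(Arc::clone(limiter));
    if prev >= limiter.max {
        None
    } else {
        Some(slot)
    }
}
```

In the middleware above, the guard that the body stream owns plays the role of Slot: the count goes down when the client finishes or disconnects, not when the handler returns.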

2. Streaming compression

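// Cargo: async-compression = { version = "0.4", features = ["tokio", "gzip"] },
// bytes = "1", tokio-util = { version = "0.7", features = ["io"] }
use async_compression::tokio::bufread::GzipEncoder;
use axum::{body::Body, response::IntoResponse};
use bytes::Bytes;
use futures::stream::Stream;
use tokio_util::io::{ReaderStream, StreamReader};

// Hypothetical data source: NDJSON lines as io::Result<Bytes>
fn create_data_stream() -> impl Stream<Item = std::io::Result<Bytes>> + Send + 'static {
    futures::stream::iter((0..1000).map(|i| Ok(Bytes::from(format!("{{\"id\":{}}}\n", i)))))
}

async fn compressed_stream() -> impl IntoResponse {
    // Stream of bytes -> AsyncBufRead -> gzip -> AsyncRead -> stream of bytes
    let reader = StreamReader::new(create_data_stream());
    let encoder = GzipEncoder::new(reader);
    let compressed = ReaderStream::new(encoder);

    // The encoder buffers output, so small writes are not flushed one by one:
    // a good fit for bulk exports, a poor one for low-latency event streams
    (
        [
            ("content-type", "application/x-ndjson"),
            ("content-encoding", "gzip"),
        ],
        Body::from_stream(compressed),
    )
}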

3. Monitoring streaming responses

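// Assumes the AppState and BroadcastEvent types from Pattern 5
use axum::{
    extract::State,
    response::sse::{Event, Sse},
};
use futures::stream::Stream;
use prometheus::{IntCounter, IntGauge, register_int_counter, register_int_gauge};
use std::convert::Infallible;
use std::sync::Arc;

lazy_static::lazy_static! {
    // register_* also adds each metric to prometheus's default registry,
    // which is what prometheus::gather() reads when serving /metrics
    static ref SSE_CONNECTIONS: IntGauge =
        register_int_gauge!("sse_connections_active", "Active SSE connections").unwrap();
    static ref SSE_EVENTS_SENT: IntCounter =
        register_int_counter!("sse_events_sent_total", "Total SSE events sent").unwrap();
    static ref WS_CONNECTIONS: IntGauge =
        register_int_gauge!("ws_connections_active", "Active WebSocket connections").unwrap();
}

// Decrements the gauge when dropped. A chain()ed final item would only run
// if the stream finished normally, never when the client disconnects.
struct ConnectionGauge;

impl Drop for ConnectionGauge {
    fn drop(&mut self) {
        SSE_CONNECTIONS.dec();
    }
}

async fn sse_monitored(
    State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    SSE_CONNECTIONS.inc();
    let gauge = ConnectionGauge;
    let mut rx = state.broadcast_tx.subscribe();

    let stream = async_stream::stream! {
        let _gauge = gauge; // lives exactly as long as the stream
        loop {
            match rx.recv().await {
                Ok(event) => {
                    SSE_EVENTS_SENT.inc();
                    let data = serde_json::to_string(&event).unwrap();
                    yield Ok(Event::default().data(data));
                }
                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
                // Lagged: skip ahead to the oldest message still buffered
                Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
            }
        }
    };

    Sse::new(stream)
}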

Comparison: SSE vs WebSocket vs Long Polling

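Dimension | SSE | WebSocket | Long polling
Direction | One-way (server → client) | Bidirectional | One-way (emulated)
Protocol | HTTP | WS/WSS | HTTP
Automatic reconnect | Built into browsers | Must be implemented manually | Built in (every poll is a new request)
Browser API | EventSource | WebSocket API | fetch/XHR
Data format | Text | Text + binary | Any
Proxies/CDNs | Friendly | Need special configuration | Friendly
Connection overhead | | | High (frequent reconnects)
Typical use | Notifications, streamed LLM output, logs | Chat, collaboration, games | Strict compatibility requirements
Memory footprint | | |
Implementation complexity | | |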

Decision tree

Need real-time push?
├── Only server → client?
│   ├── Need automatic reconnection? → SSE
│   └── Small volume, low frequency? → Long polling
└── Need bidirectional communication? → WebSocket (text or binary)


Summary

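Rust Axum streaming responses give real-time web applications a high-performance, memory-safe foundation. Each of the six production patterns fits its own scenarios: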

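  1. SSE: best for one-way push; simple to implement and natively supported by browsers
  2. WebSocket: the first choice for two-way communication such as chat and collaboration
  3. NDJSON: the standard format for streaming large datasets; clients parse it line by line without waiting for the end
  4. Backpressure: the lifeline of a streaming system; bounded channels plus a token bucket are standard equipment
  5. Multi-client broadcast: broadcast channels implement Pub/Sub, and topic subscriptions deliver events only to interested clients
  6. Production deployment: graceful shutdown, health checks, and connection limits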

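Remember: an unbounded channel is a time bomb, and a streaming system without backpressure will run out of memory sooner or later. Choose the right channel type and capacity, build on Tokio's zero-cost async, and your Axum streaming service will run reliably in production.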

