Rust Axum Streaming Responses: 6 Production Patterns from SSE to WebSocket

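By the time all the data has loaded, the user has already closed the page

You wrote an API that returns 100,000 log entries: the client waits 30 seconds for the complete JSON and memory spikes to 2 GB. You wanted real-time notifications, so you polled once per second and server QPS grew tenfold. You used WebSocket for one-way push, then found that 80% of connections only ever receive data, so full duplex is pure waste. In 2026, streaming responses in Rust Axum are the better answer: SSE for one-way push, WebSocket for two-way communication, and NDJSON for streamed data, built on Tokio and Rust's zero-cost async abstractions, with memory safety and low per-connection overhead.

Starting from the core concepts of Axum streaming responses, this article walks through six production patterns: SSE push, WebSocket communication, streaming JSON, backpressure control, multi-client broadcast, and production deployment. The goal is to take real-time push from "barely works" to "production-ready".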


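Key takeaways

  • Axum streaming responses rest on two core abstractions: the streaming Body and Sse
  • SSE fits one-way push (notifications, LLM token streaming); WebSocket fits two-way interaction
  • NDJSON is the de facto standard for streaming JSON: one complete JSON object per line
  • Backpressure is the lifeline of a streaming system; an unbounded channel is a time bomb
  • tokio::sync::broadcast handles multi-client broadcast; watch handles state synchronization
  • Production deployment needs three things: graceful shutdown, connection timeouts, and health checks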

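Contents

  1. Core concepts of Axum streaming responses
  2. Pattern 1: Server push with SSE
  3. Pattern 2: Two-way communication with WebSocket
  4. Pattern 3: Streaming JSON/NDJSON responses
  5. Pattern 4: Backpressure and flow control
  6. Pattern 5: Multi-client broadcast (Pub/Sub)
  7. Pattern 6: Production deployment and graceful shutdown
  8. 5 common pitfalls and their fixes
  9. Troubleshooting 10 common errors
  10. Advanced optimization techniques
  11. Comparison: SSE vs WebSocket vs long polling
  12. Summary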

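Core concepts of Axum streaming responses

| Concept | Description | Typical use |
| --- | --- | --- |
| Sse<impl Stream> | Server-Sent Events: one-way push over HTTP | Notifications, LLM token streaming, live logs |
| WebSocket | Full-duplex communication; requires a protocol upgrade | Chat, collaborative editing, games |
| Body | The low-level stream abstraction behind Axum response bodies | Custom streaming transfers |
| NDJSON | Newline-delimited stream of JSON objects | Large data exports, event streams |
| Backpressure | The consumer controls the producer's pace | Preventing OOM, traffic shaping |
| broadcast | Tokio broadcast channel | Many clients subscribing at once |
| watch | Tokio state-watching channel | Config hot reload, state synchronization |
| Chunked Encoding | HTTP/1.1 chunked transfer encoding | Underlying transport for streaming responses over HTTP/1.1 (HTTP/2 uses DATA frames instead) |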

Streaming response architecture

┌─────────────────────────────────────────────────┐
│                   Client                         │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐      │
│  │ Browser  │  │  Mobile  │  │   CLI    │      │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘      │
└───────┼──────────────┼──────────────┼───────────┘
        │              │              │
        ▼              ▼              ▼
┌─────────────────────────────────────────────────┐
│              Axum Server                         │
│  ┌─────────────────────────────────────────┐    │
│  │            Router                        │    │
│  │  /sse  →  SSE Handler                   │    │
│  │  /ws   →  WebSocket Handler             │    │
│  │  /stream → NDJSON Handler               │    │
│  └──────────────┬──────────────────────────┘    │
│                 │                                │
│  ┌──────────────▼──────────────────────────┐    │
│  │         Tokio Runtime                    │    │
│  │  ┌────────┐ ┌────────┐ ┌────────┐      │    │
│  │  │  mpsc  │ │broadcast│ │  watch │      │    │
│  │  └────────┘ └────────┘ └────────┘      │    │
│  └─────────────────────────────────────────┘    │
└─────────────────────────────────────────────────┘

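Pattern 1: Server push with SSE

SSE is the most commonly used Axum streaming pattern. It delivers one-way push over a long-lived HTTP connection, and the browser's native EventSource API reconnects automatically when the connection drops.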
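To make the protocol concrete before the Axum code, here is a minimal std-only sketch of how one event is framed on the wire in the `text/event-stream` format. The helper name `format_sse_event` is hypothetical and exists only for illustration; in a real handler axum's `Event` builder takes care of this framing.

```rust
/// Formats one Server-Sent Event in the `text/event-stream` wire format.
/// Hypothetical helper for illustration only.
fn format_sse_event(event: Option<&str>, id: Option<&str>, data: &str) -> String {
    let mut out = String::new();
    if let Some(name) = event {
        out.push_str("event: ");
        out.push_str(name);
        out.push('\n');
    }
    if let Some(id) = id {
        out.push_str("id: ");
        out.push_str(id);
        out.push('\n');
    }
    // A multi-line payload becomes one `data:` field per line.
    for line in data.split('\n') {
        out.push_str("data: ");
        out.push_str(line);
        out.push('\n');
    }
    // A blank line terminates the event.
    out.push('\n');
    out
}
```

The `id` field is what the browser echoes back in the `Last-Event-ID` request header when EventSource reconnects, which is why the handlers in this section set it on every event.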

Basic SSE implementation

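[dependencies]
# SSE support ships with axum's default features; no extra feature flag is needed.
axum = "0.8"
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
# Used by the later examples that build streams with `async_stream::stream!`.
async-stream = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
futures = "0.3"

use axum::{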
    Router,
    response::sse::{Event, Sse},
    routing::get,
};
use futures::stream::{self, Stream};
use std::convert::Infallible;
use tokio_stream::StreamExt as _;
use std::time::Duration;

async fn sse_handler() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let stream = stream::iter(0..100).map(|i| {
        Ok(Event::default()
            .data(format!("Message {}", i))
            .event("update")
            .id(i.to_string()))
    });

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(Duration::from_secs(5))
            .text("ping"),
    )
}

#[tokio::main]
async fn main() {
    let app = Router::new().route("/sse", get(sse_handler));
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

SSE with JSON payloads

use axum::{
    Router,
    extract::State,
    response::sse::{Event, Sse},
    routing::get,
};
use futures::stream::Stream;
use serde::Serialize;
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;
use tokio_stream::StreamExt as _;

#[derive(Serialize, Clone)]
struct Notification {
    id: u64,
    title: String,
    body: String,
    timestamp: i64,
}

struct AppState {
    notification_tx: tokio::sync::broadcast::Sender<Notification>,
}

async fn sse_notifications(
    State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let mut rx = state.notification_tx.subscribe();

    let stream = async_stream::stream! {
        loop {
            match rx.recv().await {
                Ok(notification) => {
                    let data = serde_json::to_string(&notification).unwrap();
                    yield Ok(Event::default()
                        .event("notification")
                        .data(data)
                        .id(notification.id.to_string()));
                }
                Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                    let data = serde_json::json!({"missed": n}).to_string();
                    yield Ok(Event::default()
                        .event("lagged")
                        .data(data));
                }
                Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                    break;
                }
            }
        }
    };

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(Duration::from_secs(15))
            .text("keepalive"),
    )
}

#[tokio::main]
async fn main() {
    let (tx, _) = tokio::sync::broadcast::channel::<Notification>(100);
    let state = Arc::new(AppState { notification_tx: tx });

    let app = Router::new()
        .route("/sse/notifications", get(sse_notifications))
        .with_state(state);

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

Detecting SSE client disconnects

use axum::response::sse::{Event, KeepAlive, Sse};
use futures::stream::Stream;
use std::convert::Infallible;
use std::time::Duration;
use tokio_util::sync::CancellationToken;

async fn sse_with_disconnect() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let cancel_token = CancellationToken::new();
    let cancel_clone = cancel_token.clone();

    // Per-connection background work: it stops when the token is cancelled
    // or after a one-hour safety timeout.
    tokio::spawn(async move {
        tokio::select! {
            _ = cancel_clone.cancelled() => {
                tracing::info!("Client disconnected, stopping SSE background work");
            }
            _ = tokio::time::sleep(Duration::from_secs(3600)) => {
                tracing::info!("SSE stream timeout");
            }
        }
    });

    let stream = async_stream::stream! {
        // When the client goes away, the response body (and with it this
        // stream) is dropped. Dropping the guard cancels the token above.
        let _guard = cancel_token.drop_guard();
        let mut interval = tokio::time::interval(Duration::from_secs(1));
        loop {
            interval.tick().await;
            let data = serde_json::json!({
                "time": chrono::Utc::now().to_rfc3339()
            });
            yield Ok(Event::default().data(data.to_string()));
        }
    };

    Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(10)))
}

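Pattern 2: Two-way communication with WebSocket

WebSocket provides full-duplex communication and fits two-way interaction such as chat, collaborative editing, and real-time games.

Basic WebSocket implementation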

[dependencies]
axum = { version = "0.8", features = ["ws"] }
tokio = { version = "1", features = ["full"] }
futures = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
chrono = "0.4"
tracing = "0.1"

use axum::{
    Router,
    extract::ws::{Message, WebSocket, WebSocketUpgrade},
    response::IntoResponse,
    routing::get,
};
use futures::{SinkExt, StreamExt};
use std::time::Duration;

async fn ws_handler(ws: WebSocketUpgrade) -> impl IntoResponse {
    ws.on_upgrade(handle_socket)
}

async fn handle_socket(socket: WebSocket) {
    let (mut sender, mut receiver) = socket.split();

    // Push a heartbeat to the client every 5 seconds.
    let mut send_task = tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(5));
        loop {
            interval.tick().await;
            let msg = serde_json::json!({
                "type": "heartbeat",
                "timestamp": chrono::Utc::now().to_rfc3339()
            });
            if sender.send(Message::Text(msg.to_string().into())).await.is_err() {
                break;
            }
        }
    });

    // Log incoming text messages until the client closes the connection.
    let mut recv_task = tokio::spawn(async move {
        while let Some(Ok(msg)) = receiver.next().await {
            match msg {
                Message::Text(text) => {
                    tracing::info!("Received: {}", text);
                }
                Message::Close(_) => {
                    tracing::info!("Client closed connection");
                    break;
                }
                _ => {}
            }
        }
    });

    // When either half finishes, abort the other so no task outlives the connection.
    tokio::select! {
        _ = &mut send_task => recv_task.abort(),
        _ = &mut recv_task => send_task.abort(),
    }
}

#[tokio::main]
async fn main() {
    let app = Router::new().route("/ws", get(ws_handler));
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

WebSocket with authentication and state management

use axum::{
    Router,
    extract::{State, WebSocketUpgrade, ws::WebSocket},
    response::IntoResponse,
    routing::get,
};
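use futures::{SinkExt, StreamExt};
use serde::Serialize;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};

// `Serialize` is required because messages are sent to clients as JSON.
#[derive(Debug, Clone, Serialize)]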
struct ChatMessage {
    user: String,
    content: String,
    timestamp: i64,
}

struct AppState {
    rooms: RwLock<HashMap<String, Vec<mpsc::UnboundedSender<ChatMessage>>>>,
}

async fn ws_chat(
    ws: WebSocketUpgrade,
    State(state): State<Arc<AppState>>,
) -> impl IntoResponse {
    ws.on_upgrade(move |socket| handle_chat(socket, state))
}

async fn handle_chat(socket: WebSocket, state: Arc<AppState>) {
    let (mut ws_sender, mut ws_receiver) = socket.split();
    let (tx, mut rx) = mpsc::unbounded_channel::<ChatMessage>();

    let room_id = "general".to_string();
    {
        let mut rooms = state.rooms.write().await;
        rooms.entry(room_id.clone()).or_default().push(tx);
    }

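    // Forward messages from this client's channel to its WebSocket.
    let mut send_task = tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            let text = serde_json::to_string(&msg).unwrap();
            if ws_sender.send(axum::extract::ws::Message::Text(text.into())).await.is_err() {
                break;
            }
        }
    });

    // Clone what the receive task needs so `state` and `room_id` stay
    // available for the cleanup below.
    let recv_state = state.clone();
    let recv_room = room_id.clone();
    let mut recv_task = tokio::spawn(async move {
        while let Some(Ok(msg)) = ws_receiver.next().await {
            if let axum::extract::ws::Message::Text(text) = msg {
                let chat_msg = ChatMessage {
                    // Authentication is not shown here; a real handler would
                    // take the user from a verified token or session.
                    user: "anonymous".to_string(),
                    content: text.to_string(),
                    timestamp: chrono::Utc::now().timestamp(),
                };
                let rooms = recv_state.rooms.read().await;
                if let Some(senders) = rooms.get(&recv_room) {
                    for sender in senders {
                        let _ = sender.send(chat_msg.clone());
                    }
                }
            }
        }
    });

    // When either half finishes, abort the other one.
    tokio::select! {
        _ = &mut send_task => recv_task.abort(),
        _ = &mut recv_task => send_task.abort(),
    }

    // Prune senders whose receiving side has already been dropped.
    let mut rooms = state.rooms.write().await;
    if let Some(senders) = rooms.get_mut(&room_id) {
        senders.retain(|s| !s.is_closed());
    }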
}

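Pattern 3: Streaming JSON/NDJSON responses

NDJSON (Newline Delimited JSON) is the de facto standard for streaming JSON. Each line is one complete JSON object, so the client can parse line by line without waiting for the full response.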
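Line-by-line parsing has one catch on the client side: network chunks do not respect line boundaries. The sketch below shows one way to reassemble lines from arbitrary chunks using only the standard library. `NdjsonSplitter` is a hypothetical helper, not part of any crate; each returned line would then go to a JSON parser.

```rust
/// Incrementally splits a byte stream into NDJSON lines.
/// Hypothetical client-side helper for illustration only.
#[derive(Default)]
struct NdjsonSplitter {
    pending: Vec<u8>,
}

impl NdjsonSplitter {
    /// Feeds one network chunk and returns every line it completed.
    fn push(&mut self, chunk: &[u8]) -> Vec<String> {
        self.pending.extend_from_slice(chunk);
        let mut lines = Vec::new();
        while let Some(pos) = self.pending.iter().position(|&b| b == b'\n') {
            // Remove the line, including its trailing newline, from the buffer.
            let line: Vec<u8> = self.pending.drain(..=pos).collect();
            let text = String::from_utf8_lossy(&line[..pos]);
            let text = text.trim_end_matches('\r');
            // Blank lines carry no record and are skipped.
            if !text.is_empty() {
                lines.push(text.to_string());
            }
        }
        lines
    }
}
```

Buffering bytes rather than strings matters here: a chunk may end in the middle of a multi-byte UTF-8 character, and decoding only complete lines avoids corrupting it.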

NDJSON streaming response

use axum::{
    Router,
    extract::Query,
    response::IntoResponse,
    routing::get,
};
use serde::{Deserialize, Serialize};
use std::time::Duration;

#[derive(Serialize, Deserialize)]
struct LogEntry {
    id: u64,
    level: String,
    message: String,
    timestamp: String,
    service: String,
}

#[derive(Deserialize)]
struct LogQuery {
    service: Option<String>,
    level: Option<String>,
    limit: Option<usize>,
}

async fn stream_logs(
    Query(params): Query<LogQuery>,
) -> impl IntoResponse {
    let limit = params.limit.unwrap_or(1000);
    let service_filter = params.service.clone();

    let stream = async_stream::stream! {
        let mut count = 0u64;
        let mut interval = tokio::time::interval(Duration::from_millis(100));

        while count < limit as u64 {
            interval.tick().await;

            let entry = LogEntry {
                id: count,
                level: if count % 10 == 0 { "ERROR".to_string() } else { "INFO".to_string() },
                message: format!("Log entry #{}", count),
                timestamp: chrono::Utc::now().to_rfc3339(),
                service: service_filter.clone().unwrap_or_else(|| "api".to_string()),
            };

            let json_line = serde_json::to_string(&entry).unwrap();
            yield Ok::<_, std::convert::Infallible>(format!("{}\n", json_line));
            count += 1;
        }
    };

    (
        [
            ("content-type", "application/x-ndjson"),
            ("cache-control", "no-cache"),
            ("x-stream-format", "ndjson"),
        ],
        axum::body::Body::from_stream(stream),
    )
}

#[tokio::main]
async fn main() {
    let app = Router::new().route("/logs/stream", get(stream_logs));
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

Streaming database query results

use axum::{
    Router,
    extract::State,
    response::IntoResponse,
    routing::get,
};
use futures::StreamExt; // provides `next()` on the sqlx row stream
use sqlx::postgres::PgPool;
use std::sync::Arc;

struct AppState {
    pool: PgPool,
}

async fn stream_query_results(
    State(state): State<Arc<AppState>>,
) -> impl IntoResponse {
    let pool = state.pool.clone();

    let stream = async_stream::stream! {
        let mut stream = sqlx::query_as::<_, (i64, String, String)>(
            "SELECT id, name, email FROM users ORDER BY id"
        )
        .fetch(&pool);

        while let Some(row) = stream.next().await {
            match row {
                Ok((id, name, email)) => {
                    let json = serde_json::json!({
                        "id": id,
                        "name": name,
                        "email": email,
                    });
                    yield Ok::<_, std::convert::Infallible>(
                        format!("{}\n", json)
                    );
                }
                Err(e) => {
                    tracing::error!("Query error: {}", e);
                    break;
                }
            }
        }
    };

    (
        [("content-type", "application/x-ndjson")],
        axum::body::Body::from_stream(stream),
    )
}

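Pattern 4: Backpressure and flow control

Backpressure is the lifeline of a streaming system. When the consumer processes data more slowly than the producer generates it, the absence of backpressure lets memory grow until the process runs out of memory.

Backpressure architecture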

┌──────────┐     ┌──────────┐     ┌──────────┐
│ Producer  │────▶│  Buffer  │────▶│ Consumer │
│ (fast)    │     │ (bounded)│     │ (slow)   │
└──────────┘     └──────────┘     └──────────┘
     │                │                  │
     │   backpressure │                  │
     │◀───────────────┤                  │
     │  (wait when    │                  │
     │   full)        │                  │
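The "wait when full" behavior in the diagram can be observed with the standard library's bounded channel, without any async runtime. This is a minimal sketch; `accepted_before_full` is a hypothetical helper, and Tokio's bounded `mpsc::channel` behaves analogously except that `send(...).await` suspends the task instead of blocking a thread.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Counts how many messages a bounded channel accepts while nobody is reading.
/// Hypothetical helper for illustration only.
fn accepted_before_full(capacity: usize) -> usize {
    // Keep the receiver alive; dropping it would yield `Disconnected` instead of `Full`.
    let (tx, _rx) = sync_channel::<u64>(capacity);
    let mut accepted = 0;
    loop {
        match tx.try_send(accepted as u64) {
            Ok(()) => accepted += 1,
            // The buffer is full: a blocking `send` would park the producer here.
            Err(TrySendError::Full(_)) => return accepted,
            Err(TrySendError::Disconnected(_)) => return accepted,
        }
    }
}
```

With a capacity of 32, exactly 32 messages are buffered before the producer has to wait, so memory use is bounded by the channel size rather than by how far the producer runs ahead.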

Backpressure with a bounded channel

use axum::{
    Router,
    extract::State,
    response::sse::{Event, Sse},
    routing::get,
};
use futures::stream::Stream;
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;

struct AppState {
    event_tx: mpsc::Sender<String>,
}

async fn sse_with_backpressure(
    State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let (tx, mut rx) = mpsc::channel::<String>(32);

    let producer_tx = state.event_tx.clone();
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_millis(10));
        for i in 0..10000 {
            interval.tick().await;
            let msg = format!("Event {}", i);
            if tx.send(msg).await.is_err() {
                tracing::warn!("Consumer dropped, stopping producer");
                break;
            }
        }
    });

    let stream = async_stream::stream! {
        while let Some(msg) = rx.recv().await {
            yield Ok(Event::default().data(msg));
        }
    };

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(Duration::from_secs(15)),
    )
}

Traffic shaping (rate-limited stream)

use axum::{
    Router,
    response::IntoResponse,
    routing::get,
};
use tokio::time::{self, Duration};
use tokio_stream::StreamExt;

#[derive(Clone)]
struct ThrottleConfig {
    max_per_second: u64,
    burst_size: usize,
}

async fn throttled_stream() -> impl IntoResponse {
    let config = ThrottleConfig {
        max_per_second: 100,
        burst_size: 50,
    };

    let stream = async_stream::stream! {
        let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(config.burst_size);
        let mut tokens = config.burst_size as f64;
        let max_tokens = config.burst_size as f64;
        let refill_rate = config.max_per_second as f64 / 1000.0;
        let mut last_refill = tokio::time::Instant::now();

        tokio::spawn(async move {
            let mut count = 0u64;
            loop {
                tokio::time::sleep(Duration::from_millis(10)).await;
                let msg = format!("Item {}", count);
                if tx.send(msg).await.is_err() {
                    break;
                }
                count += 1;
            }
        });

        while let Some(item) = rx.recv().await {
            let now = tokio::time::Instant::now();
            let elapsed = now.duration_since(last_refill).as_millis() as f64;
            tokens = (tokens + elapsed * refill_rate).min(max_tokens);
            last_refill = now;

            if tokens < 1.0 {
                let wait_ms = ((1.0 - tokens) / refill_rate) as u64;
                tokio::time::sleep(Duration::from_millis(wait_ms)).await;
                tokens = 0.0;
                last_refill = tokio::time::Instant::now();
            } else {
                tokens -= 1.0;
            }

            yield Ok::<_, std::convert::Infallible>(format!("{}\n", item));
        }
    };

    (
        [("content-type", "application/x-ndjson")],
        axum::body::Body::from_stream(stream),
    )
}
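The token-bucket arithmetic in the handler above is easier to reason about, and to unit test, when it is pulled out of the stream and driven by an explicit clock. The following is a sketch under that assumption; `TokenBucket` and its millisecond `now_ms` parameter are hypothetical names, not part of Tokio or Axum.

```rust
/// A token bucket driven by an explicit millisecond clock.
/// Hypothetical helper for illustration only.
struct TokenBucket {
    capacity: f64,
    tokens: f64,
    refill_per_ms: f64,
    last_ms: u64,
}

impl TokenBucket {
    /// Starts with a full bucket of `burst_size` tokens.
    fn new(max_per_second: u64, burst_size: usize, now_ms: u64) -> Self {
        Self {
            capacity: burst_size as f64,
            tokens: burst_size as f64,
            refill_per_ms: max_per_second as f64 / 1000.0,
            last_ms: now_ms,
        }
    }

    /// Refills for the elapsed time, then takes one token if available.
    fn try_acquire(&mut self, now_ms: u64) -> bool {
        let elapsed = now_ms.saturating_sub(self.last_ms) as f64;
        self.tokens = (self.tokens + elapsed * self.refill_per_ms).min(self.capacity);
        self.last_ms = now_ms;
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}
```

Passing the time in as a parameter keeps the logic deterministic: a test can replay any sequence of timestamps without sleeping, and the stream handler only has to supply the current time.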

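Pattern 5: Multi-client broadcast (Pub/Sub)

Multi-client broadcast is the core scenario of real-time push: one event source produces messages and many clients receive them at the same time.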
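A broadcast channel has a fixed capacity, so a receiver that falls too far behind loses the oldest messages and is told how many it missed. The std-only model below illustrates that behavior in simplified form. `BroadcastLog` and `Recv` are hypothetical types written for this sketch; they are loosely modelled on the semantics of `tokio::sync::broadcast` and do not reproduce its implementation.

```rust
use std::collections::VecDeque;

/// Outcome of one receive attempt in this simplified model.
#[derive(Debug, PartialEq)]
enum Recv {
    Message(u64),
    Lagged(u64),
    Empty,
}

/// A fixed-capacity log that overwrites its oldest entry when full.
/// Hypothetical model for illustration only.
struct BroadcastLog {
    capacity: usize,
    next_seq: u64,
    entries: VecDeque<u64>,
}

impl BroadcastLog {
    fn new(capacity: usize) -> Self {
        let capacity = capacity.max(1);
        Self { capacity, next_seq: 0, entries: VecDeque::with_capacity(capacity) }
    }

    /// Appends a value, evicting the oldest one when the log is full.
    fn send(&mut self, value: u64) {
        if self.entries.len() == self.capacity {
            self.entries.pop_front();
        }
        self.entries.push_back(value);
        self.next_seq += 1;
    }

    /// `cursor` is the sequence number this receiver expects next.
    fn recv(&self, cursor: &mut u64) -> Recv {
        let oldest = self.next_seq - self.entries.len() as u64;
        if *cursor < oldest {
            // Messages were overwritten: report the gap and jump to the oldest retained one.
            let missed = oldest - *cursor;
            *cursor = oldest;
            return Recv::Lagged(missed);
        }
        if *cursor >= self.next_seq {
            return Recv::Empty;
        }
        let value = self.entries[(*cursor - oldest) as usize];
        *cursor += 1;
        Recv::Message(value)
    }
}
```

With a capacity of 2 and five messages sent before the receiver reads anything, the receiver first sees a lag of 3 and then the last two messages, which is why the handlers in this section treat the lagged case as a recoverable event rather than a fatal error.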

Broadcasting with a broadcast channel

use axum::{
    Router,
    extract::State,
    response::{
        IntoResponse,
        sse::{Event, Sse},
    },
    routing::{get, post},
};
use futures::stream::Stream;
use serde::{Deserialize, Serialize};
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;

#[derive(Serialize, Deserialize, Clone, Debug)]
struct BroadcastEvent {
    event_type: String,
    payload: serde_json::Value,
    timestamp: i64,
}

struct AppState {
    broadcast_tx: tokio::sync::broadcast::Sender<BroadcastEvent>,
    client_count: Arc<std::sync::atomic::AtomicUsize>,
}

async fn subscribe(
    State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let mut rx = state.broadcast_tx.subscribe();
    let client_id = state.client_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

    tracing::info!("Client {} subscribed", client_id);

    let stream = async_stream::stream! {
        loop {
            match rx.recv().await {
                Ok(event) => {
                    let data = serde_json::to_string(&event).unwrap();
                    yield Ok(Event::default()
                        .event(&event.event_type)
                        .data(data)
                        .id(event.timestamp.to_string()));
                }
                Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                    tracing::warn!("Client {} lagged {} messages", client_id, n);
                    yield Ok(Event::default()
                        .event("lagged")
                        .data(format!("Missed {} messages", n)));
                }
                Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                    break;
                }
            }
        }
    };

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(Duration::from_secs(10)),
    )
}

async fn publish(
    State(state): State<Arc<AppState>>,
    axum::Json(event): axum::Json<BroadcastEvent>,
) -> impl IntoResponse {
    // `send` returns the number of receivers the event was delivered to.
    match state.broadcast_tx.send(event) {
        Ok(receivers) => axum::Json(serde_json::json!({
            "status": "published",
            "receivers": receivers,
        })),
        Err(_) => axum::Json(serde_json::json!({
            "status": "no_receivers",
        })),
    }
}

#[tokio::main]
async fn main() {
    let (broadcast_tx, _) = tokio::sync::broadcast::channel::<BroadcastEvent>(256);
    let state = Arc::new(AppState {
        broadcast_tx,
        client_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
    });

    let app = Router::new()
        .route("/subscribe", get(subscribe))
        .route("/publish", post(publish))
        .with_state(state);

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

Topic-based subscription

use axum::{
    Router,
    extract::{Path, State},
    response::{
        IntoResponse,
        sse::{Event, Sse},
    },
    routing::{get, post},
};
use futures::stream::Stream;
use serde::Serialize;
use std::collections::HashMap;
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, RwLock};

#[derive(Clone, Serialize, Debug)]
struct TopicEvent {
    topic: String,
    data: serde_json::Value,
}

struct AppState {
    topics: RwLock<HashMap<String, broadcast::Sender<TopicEvent>>>,
}

impl AppState {
    fn new() -> Self {
        Self {
            topics: RwLock::new(HashMap::new()),
        }
    }

    async fn get_or_create_topic(&self, topic: &str) -> broadcast::Sender<TopicEvent> {
        let mut topics = self.topics.write().await;
        topics
            .entry(topic.to_string())
            .or_insert_with(|| {
                let (tx, _) = broadcast::channel::<TopicEvent>(256);
                tx
            })
            .clone()
    }
}

async fn subscribe_topic(
    Path(topic): Path<String>,
    State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let tx = state.get_or_create_topic(&topic).await;
    let mut rx = tx.subscribe();

    let stream = async_stream::stream! {
        loop {
            match rx.recv().await {
                Ok(event) => {
                    let data = serde_json::to_string(&event).unwrap();
                    yield Ok(Event::default()
                        .event(&event.topic)
                        .data(data));
                }
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    yield Ok(Event::default()
                        .event("lagged")
                        .data(format!("Missed {}", n)));
                }
                Err(broadcast::error::RecvError::Closed) => break,
            }
        }
    };

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(Duration::from_secs(15)),
    )
}

async fn publish_topic(
    Path(topic): Path<String>,
    State(state): State<Arc<AppState>>,
    axum::Json(data): axum::Json<serde_json::Value>,
) -> impl IntoResponse {
    let tx = state.get_or_create_topic(&topic).await;
    let event = TopicEvent {
        topic: topic.clone(),
        data,
    };
    let receivers = tx.receiver_count();
    let _ = tx.send(event);
    axum::Json(serde_json::json!({"delivered_to": receivers}))
}

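Pattern 6: Production deployment and graceful shutdown

A streaming service in production has to handle connection management, graceful shutdown, and health checks.

A complete production-grade service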

use axum::{
    Router,
    extract::State,
    response::{
        IntoResponse,
        sse::{Event, Sse},
    },
    routing::{get, post},
};
use futures::stream::Stream;
use serde::Serialize;
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, RwLock};
use tokio_util::sync::CancellationToken;

#[derive(Serialize, Clone)]
struct ServerEvent {
    event_type: String,
    data: serde_json::Value,
}

struct AppState {
    broadcast_tx: broadcast::Sender<ServerEvent>,
    cancel_token: CancellationToken,
    connections: RwLock<Vec<String>>,
}

async fn sse_production(
    State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let mut rx = state.broadcast_tx.subscribe();
    let cancel = state.cancel_token.clone();
    let conn_id = uuid::Uuid::new_v4().to_string();

    {
        let mut conns = state.connections.write().await;
        conns.push(conn_id.clone());
    }

    // Clone the `Arc` for cleanup; the `RwLock` inside it is not `Clone`.
    let cleanup_state = state.clone();
    let cleanup_id = conn_id.clone();

    // Cancelled when the stream is dropped: on client disconnect, on normal
    // end of stream, or after the shutdown event has been sent.
    let conn_token = CancellationToken::new();
    let conn_guard = conn_token.clone().drop_guard();

    let stream = async_stream::stream! {
        let _conn_guard = conn_guard;
        loop {
            tokio::select! {
                event = rx.recv() => {
                    match event {
                        Ok(ev) => {
                            let data = serde_json::to_string(&ev).unwrap();
                            yield Ok(Event::default()
                                .event(&ev.event_type)
                                .data(data));
                        }
                        Err(broadcast::error::RecvError::Lagged(n)) => {
                            yield Ok(Event::default()
                                .event("lagged")
                                .data(format!("Missed {}", n)));
                        }
                        Err(broadcast::error::RecvError::Closed) => break,
                    }
                }
                _ = cancel.cancelled() => {
                    yield Ok(Event::default()
                        .event("shutdown")
                        .data("Server is shutting down"));
                    break;
                }
            }
        }
    };

    // Remove this connection from the registry once its stream is gone.
    tokio::spawn(async move {
        conn_token.cancelled().await;
        let mut conns = cleanup_state.connections.write().await;
        conns.retain(|c| c != &cleanup_id);
    });

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(Duration::from_secs(10)),
    )
}

async fn health_check(State(state): State<Arc<AppState>>) -> impl IntoResponse {
    let conn_count = state.connections.read().await.len();
    let is_shutting_down = state.cancel_token.is_cancelled();
    axum::Json(serde_json::json!({
        "status": if is_shutting_down { "shutting_down" } else { "healthy" },
        "active_connections": conn_count,
        "uptime_secs": 0,
    }))
}

#[tokio::main]
async fn main() {
    let (broadcast_tx, _) = broadcast::channel::<ServerEvent>(512);
    let cancel_token = CancellationToken::new();

    let state = Arc::new(AppState {
        broadcast_tx,
        cancel_token: cancel_token.clone(),
        connections: RwLock::new(Vec::new()),
    });

    let app = Router::new()
        .route("/events", get(sse_production))
        .route("/health", get(health_check))
        .with_state(state.clone());

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();

    tokio::spawn(async move {
        tokio::signal::ctrl_c().await.unwrap();
        tracing::info!("Shutdown signal received, draining connections...");
        cancel_token.cancel();
        tokio::time::sleep(Duration::from_secs(10)).await;
        tracing::info!("Graceful shutdown complete");
        std::process::exit(0);
    });

    axum::serve(listener, app)
        .with_graceful_shutdown(shutdown_signal())
        .await
        .unwrap();
}

async fn shutdown_signal() {
    tokio::signal::ctrl_c().await.unwrap();
}

Docker deployment configuration

FROM rust:1.85 as builder
WORKDIR /app
COPY Cargo.toml Cargo.lock ./
COPY src ./src
RUN cargo build --release

FROM debian:bookworm-slim
# curl is installed because the compose healthcheck below calls it.
RUN apt-get update && apt-get install -y ca-certificates curl && rm -rf /var/lib/apt/lists/*
COPY --from=builder /app/target/release/streaming-server /usr/local/bin/
EXPOSE 3000
CMD ["streaming-server"]

# docker-compose.yml
services:
  streaming-server:
    build: .
    ports:
      - "3000:3000"
    environment:
      - RUST_LOG=info
      - STREAM_BUFFER_SIZE=256
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:3000/health"]
      interval: 10s
      timeout: 5s
      retries: 3
    deploy:
      resources:
        limits:
          memory: 512M

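5 common pitfalls and their fixes

Pitfall 1: An SSE stream that never shuts down leaks resources

Symptom: after the client disconnects, server-side work for the stream keeps running and memory keeps growing.

Cause: the stream itself is dropped once the server notices the closed connection, but tasks spawned for it and resources registered outside it are not tied to its lifetime unless you wire that up.

Fix: hold a drop guard inside the stream so that dropping the stream cancels everything that belongs to it.

use tokio_util::sync::CancellationToken;

async fn sse_safe() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let cancel = CancellationToken::new();
    let cancel_clone = cancel.clone();

    // Background work for this connection ends as soon as the token is
    // cancelled, or after a one-hour safety timeout.
    tokio::spawn(async move {
        tokio::select! {
            _ = cancel_clone.cancelled() => {
                tracing::info!("Stream dropped, client disconnected");
            }
            _ = tokio::time::sleep(Duration::from_secs(3600)) => {}
        }
    });

    let stream = async_stream::stream! {
        // Dropped together with the stream; dropping it cancels the token.
        let _guard = cancel.drop_guard();
        let mut interval = tokio::time::interval(Duration::from_secs(1));
        loop {
            interval.tick().await;
            yield Ok(Event::default().data("tick"));
        }
    };

    Sse::new(stream)
}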
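The mechanism behind this kind of cleanup is plain RAII: a value owned by the stream runs its `Drop` implementation when the stream goes away. The std-only sketch below shows the idea with an iterator standing in for the stream. `DisconnectGuard`, `Ticker`, and `ticker` are hypothetical names used only for this illustration.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Flips a shared flag when dropped.
struct DisconnectGuard {
    closed: Arc<AtomicBool>,
}

impl Drop for DisconnectGuard {
    fn drop(&mut self) {
        self.closed.store(true, Ordering::Release);
    }
}

/// Stand-in for a stream: an endless iterator that owns the guard.
struct Ticker {
    next: u64,
    _guard: DisconnectGuard,
}

impl Iterator for Ticker {
    type Item = u64;

    fn next(&mut self) -> Option<u64> {
        let n = self.next;
        self.next += 1;
        Some(n)
    }
}

/// Builds a ticker whose drop is observable through `closed`.
fn ticker(closed: Arc<AtomicBool>) -> Ticker {
    Ticker { next: 0, _guard: DisconnectGuard { closed } }
}
```

Dropping the ticker part-way through, which is what happens to a response stream when the connection closes, sets the flag without the ticker itself having to notice anything.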

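Pitfall 2: Lagged broadcast receivers lose messages

Symptom: RecvError::Lagged(n) shows up frequently and clients miss messages.

Cause: the broadcast channel's capacity is too small, so slow consumers cannot keep up with a fast producer.

Fix: increase the channel capacity, record the number of missed messages, and replay them when the client reconnects.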

let (tx, _) = tokio::sync::broadcast::channel::<Event>(1024);

match rx.recv().await {
    Err(tokio::sync::broadcast::error::RecvError::Lagged(missed)) => {
        tracing::warn!("Lagged {}, requesting catch-up", missed);
        yield Ok(Event::default()
            .event("lagged")
            .data(serde_json::json!({"missed": missed}).to_string()));
    }
    _ => {}
}
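The "replay on reconnect" part of the fix needs somewhere to keep recent events. One possible shape is a small in-memory ring of the latest events, queried with the id the browser sends in the `Last-Event-ID` header. This is a sketch under two assumptions: event ids are strictly increasing integers, and `ReplayBuffer` is a hypothetical helper, not an Axum or Tokio type.

```rust
use std::collections::VecDeque;

/// Keeps the most recent events so a reconnecting client can catch up.
/// Hypothetical helper; event ids are assumed to be strictly increasing.
struct ReplayBuffer {
    capacity: usize,
    events: VecDeque<(u64, String)>,
}

impl ReplayBuffer {
    fn new(capacity: usize) -> Self {
        // A capacity of at least 1 keeps the eviction logic simple.
        let capacity = capacity.max(1);
        Self { capacity, events: VecDeque::with_capacity(capacity) }
    }

    /// Records an event, evicting the oldest one when the buffer is full.
    fn push(&mut self, id: u64, payload: String) {
        if self.events.len() == self.capacity {
            self.events.pop_front();
        }
        self.events.push_back((id, payload));
    }

    /// Returns the retained events newer than `last_event_id`.
    fn since(&self, last_event_id: u64) -> Vec<(u64, String)> {
        self.events
            .iter()
            .filter(|(id, _)| *id > last_event_id)
            .cloned()
            .collect()
    }
}
```

A handler would send the result of `since(...)` first and then switch to the live broadcast receiver. Events older than the buffer are still lost, so the capacity should cover the longest reconnect gap you want to tolerate.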

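Pitfall 3: Oversized WebSocket messages are rejected

Symptom: sending a large message makes the WebSocket return an error or disconnect.

Cause: the message exceeds the configured size limit. Axum's defaults are 64 MiB per message and 16 MiB per frame; limits set in your own code or by a proxy in front of the server may be much lower.

Fix: set both limits explicitly to match the largest payload you expect, and split anything bigger into several messages.

use axum::extract::ws::WebSocketUpgrade;

async fn ws_large_messages(ws: WebSocketUpgrade) -> impl IntoResponse {
    // Explicit caps: 1 MiB per frame, 4 MiB per message. Adjust to your payloads.
    ws.max_frame_size(1024 * 1024)
      .max_message_size(4 * 1024 * 1024)
      .on_upgrade(handle_socket)
}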

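Pitfall 4: Streaming response without a Content-Type

Symptom: the client receives the streamed data but cannot parse it correctly.

Cause: the correct Content-Type header was never set.

Fix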

async fn ndjson_response() -> impl IntoResponse {
    let stream = create_stream();
    (
        [
            ("content-type", "application/x-ndjson; charset=utf-8"),
            ("cache-control", "no-cache"),
            ("connection", "keep-alive"),
        ],
        axum::body::Body::from_stream(stream),
    )
}

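Pitfall 5: Streams are cut off during shutdown

Symptom: when the service shuts down, clients see a connection reset instead of a clean end of stream.

Cause: the process is killed directly, without waiting for active streams to finish.

Fix: use a CancellationToken to tell every stream to finish gracefully (see Pattern 6).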
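The shape of that shutdown sequence can be shown with threads and an atomic flag, independent of any async runtime: signal, let each stream send a final message, then wait for all of them. This is a simplified sketch; `drain_streams` is a hypothetical stand-in for long-lived SSE handlers, and the flag plays the role of the CancellationToken.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Runs `workers` simulated streams and returns the last message each one sent.
/// Hypothetical helper for illustration only.
fn drain_streams(workers: usize) -> Vec<String> {
    let shutdown = Arc::new(AtomicBool::new(false));
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let shutdown = Arc::clone(&shutdown);
            thread::spawn(move || {
                while !shutdown.load(Ordering::Acquire) {
                    // Stand-in for sending a regular event.
                    thread::sleep(Duration::from_millis(1));
                }
                // Final message before a clean end of stream.
                "shutdown".to_string()
            })
        })
        .collect();

    // The shutdown signal: every stream sees it on its next iteration.
    shutdown.store(true, Ordering::Release);
    // Wait for all streams to finish instead of killing them mid-write.
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

The important property is the ordering: the signal goes out first, and the process only exits after every stream has had the chance to end on its own terms.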


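Troubleshooting 10 common errors

| # | Error message | Cause | Fix |
| --- | --- | --- | --- |
| 1 | the trait bound impl Stream: IntoResponse is not satisfied | A bare stream was returned, or its Item type does not match | Wrap the stream in Sse::new(...) with an Item such as Result<Event, Infallible>, or in Body::from_stream(...) |
| 2 | WebSocket upgrade failed: Connection header is not upgrade | The client did not send the upgrade headers | Make sure the request includes Connection: Upgrade and Upgrade: websocket |
| 3 | channel send error: channel closed | The receiver has been closed | Check whether the receiver was dropped early; handle it with send().await.is_err() |
| 4 | RecvError::Lagged(n) | Consumption cannot keep up with production | Increase the broadcast capacity, or use a bounded mpsc channel for backpressure |
| 5 | body write aborted: connection closed | The client disconnected | Detect client disconnects and clean up resources promptly |
| 6 | SSE stream timeout | Keep-alive timed out | Adjust the KeepAlive::new().interval() duration |
| 7 | WebSocket message too large | The message exceeds the configured limit (64 MiB by default) | Adjust ws.max_frame_size() and ws.max_message_size() |
| 8 | task hung detected | An async task is blocked | Look for synchronous blocking calls and move them to tokio::task::spawn_blocking |
| 9 | too many open files | Connection count exceeds the system limit | Raise ulimit -n, or cap the maximum number of connections |
| 10 | connection reset by peer | The client closed abnormally | Handle Message::Close and errors in the WebSocket handler |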

Advanced optimization techniques

1. Limiting the number of connections

use axum::middleware;
use std::sync::atomic::{AtomicUsize, Ordering};

struct ConnectionLimiter {
    current: AtomicUsize,
    max: usize,
}

async fn limit_connections(
    State(limiter): State<Arc<ConnectionLimiter>>,
    request: axum::extract::Request,
    next: axum::middleware::Next,
) -> impl IntoResponse {
    let current = limiter.current.fetch_add(1, Ordering::Relaxed);
    if current >= limiter.max {
        limiter.current.fetch_sub(1, Ordering::Relaxed);
        return axum::http::StatusCode::SERVICE_UNAVAILABLE.into_response();
    }

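    // Note: `next.run` resolves once the response head is ready, so for a
    // streaming response the slot is released before the stream finishes.
    // To count live streams, hold a guard inside the stream body instead.
    let response = next.run(request).await;
    limiter.current.fetch_sub(1, Ordering::Relaxed);
    response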
}
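For long-lived streams, a slot should stay occupied for as long as the stream is alive, not just until the handler returns. One way to get that is an RAII permit that is moved into the stream and releases its slot on drop. The sketch below uses only the standard library; `StreamLimiter` and `StreamPermit` are hypothetical names, not Axum or Tower types.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Caps the number of concurrently held permits.
struct StreamLimiter {
    current: AtomicUsize,
    max: usize,
}

/// Holds one slot; the slot is released when the permit is dropped.
struct StreamPermit {
    limiter: Arc<StreamLimiter>,
}

impl StreamLimiter {
    fn new(max: usize) -> Arc<Self> {
        Arc::new(Self { current: AtomicUsize::new(0), max })
    }

    /// Returns a permit, or `None` when all slots are taken.
    fn try_acquire(self: &Arc<Self>) -> Option<StreamPermit> {
        let previous = self.current.fetch_add(1, Ordering::AcqRel);
        if previous >= self.max {
            // Over the limit: undo the reservation.
            self.current.fetch_sub(1, Ordering::AcqRel);
            return None;
        }
        Some(StreamPermit { limiter: Arc::clone(self) })
    }

    /// Number of slots currently in use.
    fn active(&self) -> usize {
        self.current.load(Ordering::Acquire)
    }
}

impl Drop for StreamPermit {
    fn drop(&mut self) {
        self.limiter.current.fetch_sub(1, Ordering::AcqRel);
    }
}
```

In a handler, the permit would be acquired before building the stream and then bound to a local variable inside the stream body, so that a client disconnect frees the slot automatically.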

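2. Streaming compression

use axum::response::IntoResponse;
use tokio_util::io::{ReaderStream, StreamReader};

async fn compressed_stream() -> impl IntoResponse {
    // `create_data_stream` must yield `Result<Bytes, std::io::Error>` items.
    let stream = create_data_stream();
    let reader = StreamReader::new(stream);

    // The bufread encoder wraps an `AsyncBufRead` and is itself an `AsyncRead`.
    // Compression buffers its input, so small events may be delayed.
    let encoder = async_compression::tokio::bufread::GzipEncoder::new(reader);
    let compressed_stream = ReaderStream::new(encoder);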

    (
        [
            ("content-type", "application/x-ndjson"),
            ("content-encoding", "gzip"),
        ],
        axum::body::Body::from_stream(compressed_stream),
    )
}

3. Monitoring streaming responses

use prometheus::{IntCounter, IntGauge, Registry};

lazy_static::lazy_static! {
    static ref REGISTRY: Registry = Registry::new();
    static ref SSE_CONNECTIONS: IntGauge = IntGauge::new(
        "sse_connections_active",
        "Active SSE connections"
    ).unwrap();
    static ref SSE_EVENTS_SENT: IntCounter = IntCounter::new(
        "sse_events_sent_total",
        "Total SSE events sent"
    ).unwrap();
    static ref WS_CONNECTIONS: IntGauge = IntGauge::new(
        "ws_connections_active",
        "Active WebSocket connections"
    ).unwrap();
}

async fn sse_monitored(
    State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    SSE_CONNECTIONS.inc();
    let mut rx = state.broadcast_tx.subscribe();

    // Decrements the gauge when the stream is dropped, whether it ends
    // normally or the client disconnects mid-stream.
    struct GaugeGuard;
    impl Drop for GaugeGuard {
        fn drop(&mut self) {
            SSE_CONNECTIONS.dec();
        }
    }

    let stream = async_stream::stream! {
        let _guard = GaugeGuard;
        loop {
            match rx.recv().await {
                Ok(event) => {
                    SSE_EVENTS_SENT.inc();
                    let data = serde_json::to_string(&event).unwrap();
                    yield Ok(Event::default().data(data));
                }
                Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                    break;
                }
                // Lagged: skip the missed messages and keep streaming.
                Err(_) => continue,
            }
        }
    };

    Sse::new(stream)
}

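Comparison: SSE vs WebSocket vs long polling

| Dimension | SSE | WebSocket | Long polling |
| --- | --- | --- | --- |
| Direction | One-way (server → client) | Two-way | One-way (emulated) |
| Protocol | HTTP | WS/WSS | HTTP |
| Auto-reconnect | Native in the browser | Must be implemented manually | Inherent |
| Browser API | EventSource | WebSocket API | fetch/XHR |
| Data format | Text | Text + binary | Any |
| Proxy/CDN | Friendly | Needs special configuration | Friendly |
| Connection overhead | | | High (frequent reconnects) |
| Typical use | Notifications, LLM streaming, logs | Chat, collaboration, games | Strict compatibility requirements |
| Memory footprint | | | |
| Implementation complexity | | | |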

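Decision tree

Need real-time push?
├── Server → client only?
│   ├── Need auto-reconnect? → SSE
│   └── Small data, low frequency? → Long polling
└── Need two-way communication?
    ├── Need binary data? → WebSocket
    └── Text only? → WebSocket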

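Summary

Rust Axum streaming responses give real-time web applications a high-performance, memory-safe foundation. Each of the six production patterns has its own use case:

  1. SSE: the best fit for one-way push; simple to implement and natively supported by browsers
  2. WebSocket: the first choice for two-way communication; suited to chat and collaboration
  3. NDJSON: the standard format for streaming large datasets; parse line by line without waiting
  4. Backpressure: the lifeline of a streaming system; bounded channels plus a token bucket are the baseline
  5. Multi-client broadcast: broadcast channels implement Pub/Sub, and topic subscriptions deliver targeted push
  6. Production deployment: graceful shutdown, health checks, and connection limits

Remember: an unbounded channel is a time bomb, and a streaming system without backpressure will run out of memory sooner or later. Choose the right channel type and capacity, build on Tokio's async runtime, and your Axum streaming service can run reliably in production.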

