Rust Axum Streaming Response: 6 Production Patterns from SSE to WebSocket

Waiting for All Data to Load Before Responding? Users Already Left

Suppose you built an API that returns 100K log entries: the client waits 30 seconds for the complete JSON while server memory spikes to 2 GB. You try polling for real-time notifications, hitting the server every second, and QPS jumps 10x. You switch to WebSocket for one-way push, only to find that 80% of connections only ever receive data, so full duplex is wasted. In 2026, Rust Axum streaming responses address all three problems: SSE for one-way push, WebSocket for bidirectional communication, and NDJSON for streaming JSON, all running on Tokio's async runtime for memory-safe, high-performance real-time data delivery.

This article starts from Axum streaming response fundamentals and walks through six production patterns (SSE push, WebSocket communication, streaming JSON, backpressure control, multi-client broadcast, and production deployment) to take real-time data push from "barely working" to "production reliable".


Key Takeaways

  • Axum streaming responses are built on Body streams and SSE as two core abstractions
  • SSE is ideal for one-way push (notifications, LLM streaming output), WebSocket for bidirectional interaction
  • NDJSON is the de facto standard for streaming JSON — one complete JSON object per line
  • Backpressure control is the lifeline of streaming systems; unbounded channels = ticking time bombs
  • tokio::sync::broadcast enables multi-client broadcast, watch enables state synchronization
  • Production deployment requires graceful shutdown, connection timeouts, and health checks

Table of Contents

  1. Axum Streaming Response Core Concepts
  2. Pattern 1: SSE Server-Sent Events Implementation
  3. Pattern 2: WebSocket Bidirectional Communication
  4. Pattern 3: Streaming JSON/NDJSON Response
  5. Pattern 4: Backpressure and Flow Control
  6. Pattern 5: Multi-Client Broadcast (Pub/Sub)
  7. Pattern 6: Production Deployment and Graceful Shutdown
  8. 5 Common Pitfalls and Solutions
  9. 10 Common Error Troubleshooting
  10. Advanced Optimization Techniques
  11. Comparison: SSE vs WebSocket vs Long Polling
  12. Recommended Online Tools
  13. Summary

Axum Streaming Response Core Concepts

Concept          | Description                                          | Use Case
Sse<impl Stream> | Server-Sent Events, HTTP-based one-way push          | Notifications, LLM streaming, real-time logs
WebSocket        | Full-duplex communication, requires protocol upgrade | Chat, collaborative editing, games
Body             | Axum's underlying response body stream abstraction   | Custom streaming transmission
NDJSON           | Newline-delimited JSON stream                        | Large data export, event streams
Backpressure     | Consumer controls producer speed                     | OOM prevention, traffic shaping
broadcast        | Tokio broadcast channel                              | Multi-client simultaneous subscription
watch            | Tokio state observation channel                      | Hot config reload, state sync
Chunked Encoding | HTTP chunked transfer encoding                       | Underlying mechanism for streaming responses

Streaming Response Architecture

┌─────────────────────────────────────────────────┐
│                   Client                         │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐      │
│  │ Browser  │  │  Mobile  │  │   CLI    │      │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘      │
└───────┼──────────────┼──────────────┼───────────┘
        │              │              │
        ▼              ▼              ▼
┌─────────────────────────────────────────────────┐
│              Axum Server                         │
│  ┌─────────────────────────────────────────┐    │
│  │            Router                        │    │
│  │  /sse  →  SSE Handler                   │    │
│  │  /ws   →  WebSocket Handler             │    │
│  │  /stream → NDJSON Handler               │    │
│  └──────────────┬──────────────────────────┘    │
│                 │                                │
│  ┌──────────────▼──────────────────────────┐    │
│  │         Tokio Runtime                    │    │
│  │  ┌────────┐ ┌────────┐ ┌────────┐      │    │
│  │  │  mpsc  │ │broadcast│ │  watch │      │    │
│  │  └────────┘ └────────┘ └────────┘      │    │
│  └─────────────────────────────────────────┘    │
└─────────────────────────────────────────────────┘

Pattern 1: SSE Server-Sent Events Implementation

SSE is the most common pattern for Axum streaming responses: it implements one-way push over a long-lived HTTP connection, and the browser's native EventSource API reconnects automatically.
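To make the protocol concrete, here is a minimal std-only sketch of the text/event-stream wire format that an SSE response carries. The helper names `sse_frame` and `sse_comment` are hypothetical and are not part of Axum; Axum's `Event` builder produces equivalent frames for you, though its exact byte output may differ in whitespace.

```rust
/// Serialize one event in the text/event-stream wire format.
/// `event` and `id` are optional; multi-line data becomes one `data:` line per line.
fn sse_frame(event: Option<&str>, id: Option<&str>, data: &str) -> String {
    let mut out = String::new();
    if let Some(name) = event {
        out.push_str("event: ");
        out.push_str(name);
        out.push('\n');
    }
    if let Some(id) = id {
        out.push_str("id: ");
        out.push_str(id);
        out.push('\n');
    }
    for line in data.split('\n') {
        out.push_str("data: ");
        out.push_str(line);
        out.push('\n');
    }
    // A blank line terminates the event and tells the client to dispatch it.
    out.push('\n');
    out
}

/// A comment line (leading colon) is ignored by clients, which makes it a
/// cheap keep-alive that stops idle connections from timing out.
fn sse_comment(text: &str) -> String {
    format!(": {text}\n\n")
}
```

The `id` field matters for reconnection: the browser remembers the last id it saw and sends it back in the `Last-Event-ID` request header when it reconnects.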

Basic SSE Implementation

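[dependencies]
# axum has no "sse" feature; axum::response::sse is available with the default features
axum = "0.8"
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
futures = "0.3"
# Used by the later examples in this article
async-stream = "0.3"
tokio-util = "0.7"
chrono = "0.4"
tracing = "0.1"

use axum::{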
    Router,
    response::sse::{Event, Sse},
    routing::get,
};
use futures::stream::{self, Stream};
use std::convert::Infallible;
use tokio_stream::StreamExt as _;
use std::time::Duration;

async fn sse_handler() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let stream = stream::iter(0..100).map(|i| {
        Ok(Event::default()
            .data(format!("Message {}", i))
            .event("update")
            .id(i.to_string()))
    });

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(Duration::from_secs(5))
            .text("ping"),
    )
}

#[tokio::main]
async fn main() {
    let app = Router::new().route("/sse", get(sse_handler));
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

SSE with JSON Data

use axum::{
    Router,
    extract::State,
    response::sse::{Event, Sse},
    routing::get,
};
use futures::stream::Stream;
use serde::Serialize;
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;
use tokio_stream::StreamExt as _;

#[derive(Serialize, Clone)]
struct Notification {
    id: u64,
    title: String,
    body: String,
    timestamp: i64,
}

struct AppState {
    notification_tx: tokio::sync::broadcast::Sender<Notification>,
}

async fn sse_notifications(
    State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let mut rx = state.notification_tx.subscribe();

    let stream = async_stream::stream! {
        loop {
            match rx.recv().await {
                Ok(notification) => {
                    let data = serde_json::to_string(&notification).unwrap();
                    yield Ok(Event::default()
                        .event("notification")
                        .data(data)
                        .id(notification.id.to_string()));
                }
                Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                    let data = serde_json::json!({"missed": n}).to_string();
                    yield Ok(Event::default()
                        .event("lagged")
                        .data(data));
                }
                Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                    break;
                }
            }
        }
    };

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(Duration::from_secs(15))
            .text("keepalive"),
    )
}

#[tokio::main]
async fn main() {
    let (tx, _) = tokio::sync::broadcast::channel::<Notification>(100);
    let state = Arc::new(AppState { notification_tx: tx });

    let app = Router::new()
        .route("/sse/notifications", get(sse_notifications))
        .with_state(state);

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

SSE Client Disconnect Detection

use axum::response::sse::{Event, KeepAlive, Sse};
use futures::stream::Stream;
use std::convert::Infallible;
use std::time::Duration;
use tokio_util::sync::CancellationToken;

async fn sse_with_disconnect() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let cancel_token = CancellationToken::new();
    let cancel_clone = cancel_token.clone();
    // The guard is moved into the stream below. When the client disconnects,
    // the response body (and with it the stream) is dropped, which drops the
    // guard and cancels the token. Detection happens on the next failed write,
    // so the keep-alive interval bounds how long it can take.
    let disconnect_guard = cancel_token.clone().drop_guard();

    // Watcher task: logs the disconnect, or ends the stream after one hour.
    tokio::spawn(async move {
        tokio::select! {
            _ = cancel_clone.cancelled() => {
                tracing::info!("Client disconnected, SSE stream stopped");
            }
            _ = tokio::time::sleep(Duration::from_secs(3600)) => {
                tracing::info!("SSE stream timeout");
                cancel_clone.cancel();
            }
        }
    });

    let stream = async_stream::stream! {
        let _disconnect_guard = disconnect_guard;
        let mut interval = tokio::time::interval(Duration::from_secs(1));
        loop {
            tokio::select! {
                _ = interval.tick() => {
                    let data = serde_json::json!({
                        "time": chrono::Utc::now().to_rfc3339()
                    });
                    yield Ok(Event::default().data(data.to_string()));
                }
                // Fires when the watcher task hits the one-hour timeout.
                _ = cancel_token.cancelled() => {
                    break;
                }
            }
        }
    };

    Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(10)))
}

Pattern 2: WebSocket Bidirectional Communication

WebSocket provides full-duplex communication, suitable for chat, collaborative editing, real-time games, and other bidirectional interaction scenarios.

Basic WebSocket Implementation

[dependencies]
axum = { version = "0.8", features = ["ws"] }
tokio = { version = "1", features = ["full"] }
futures = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
chrono = "0.4"
tracing = "0.1"

use axum::{
    Router,
    extract::ws::{Message, WebSocket, WebSocketUpgrade},
    response::IntoResponse,
    routing::get,
};
use futures::{SinkExt, StreamExt};
use std::time::Duration;

async fn ws_handler(ws: WebSocketUpgrade) -> impl IntoResponse {
    ws.on_upgrade(handle_socket)
}

async fn handle_socket(socket: WebSocket) {
    let (mut sender, mut receiver) = socket.split();

    // Push a heartbeat to the client every 5 seconds.
    let mut send_task = tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(5));
        loop {
            interval.tick().await;
            let msg = serde_json::json!({
                "type": "heartbeat",
                "timestamp": chrono::Utc::now().to_rfc3339()
            });
            if sender.send(Message::Text(msg.to_string().into())).await.is_err() {
                break;
            }
        }
    });

    // Log incoming messages until the client closes the connection.
    let mut recv_task = tokio::spawn(async move {
        while let Some(Ok(msg)) = receiver.next().await {
            match msg {
                Message::Text(text) => {
                    tracing::info!("Received: {}", text);
                }
                Message::Close(_) => {
                    tracing::info!("Client closed connection");
                    break;
                }
                _ => {}
            }
        }
    });

    // When either side finishes, abort the other. Dropping a JoinHandle
    // alone would leave that task running in the background.
    tokio::select! {
        _ = &mut send_task => recv_task.abort(),
        _ = &mut recv_task => send_task.abort(),
    }
}

#[tokio::main]
async fn main() {
    let app = Router::new().route("/ws", get(ws_handler));
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

WebSocket with Authentication and State Management

use axum::{
    Router,
    extract::{State, WebSocketUpgrade, ws::{Message, WebSocket}},
    response::IntoResponse,
    routing::get,
};
use futures::{SinkExt, StreamExt};
use serde::Serialize;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};

// Serialize is required because messages are sent to clients as JSON.
#[derive(Debug, Clone, Serialize)]
struct ChatMessage {
    user: String,
    content: String,
    timestamp: i64,
}

struct AppState {
    // Unbounded senders keep the example short, but a slow client can grow its
    // queue without limit; prefer bounded channels in production (see Pattern 4).
    rooms: RwLock<HashMap<String, Vec<mpsc::UnboundedSender<ChatMessage>>>>,
}

async fn ws_chat(
    ws: WebSocketUpgrade,
    State(state): State<Arc<AppState>>,
) -> impl IntoResponse {
    ws.on_upgrade(move |socket| handle_chat(socket, state))
}

async fn handle_chat(socket: WebSocket, state: Arc<AppState>) {
    let (mut ws_sender, mut ws_receiver) = socket.split();
    let (tx, mut rx) = mpsc::unbounded_channel::<ChatMessage>();

    let room_id = "general".to_string();
    {
        let mut rooms = state.rooms.write().await;
        rooms.entry(room_id.clone()).or_default().push(tx);
    }

    // Forward room messages to this client.
    let mut send_task = tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            let text = serde_json::to_string(&msg).unwrap();
            if ws_sender.send(Message::Text(text.into())).await.is_err() {
                break;
            }
        }
    });

    // Fan incoming client messages out to every member of the room.
    // The task gets its own clones so `state` and `room_id` stay usable below.
    let recv_state = state.clone();
    let recv_room = room_id.clone();
    let mut recv_task = tokio::spawn(async move {
        while let Some(Ok(msg)) = ws_receiver.next().await {
            if let Message::Text(text) = msg {
                let chat_msg = ChatMessage {
                    user: "anonymous".to_string(),
                    content: text.to_string(),
                    timestamp: chrono::Utc::now().timestamp(),
                };
                let rooms = recv_state.rooms.read().await;
                if let Some(senders) = rooms.get(&recv_room) {
                    for sender in senders {
                        let _ = sender.send(chat_msg.clone());
                    }
                }
            }
        }
    });

    // When either task finishes, abort the other; a dropped JoinHandle
    // would leave it running.
    tokio::select! {
        _ = &mut send_task => recv_task.abort(),
        _ = &mut recv_task => {
            send_task.abort();
            // Wait for the aborted task so its receiver is dropped before pruning.
            let _ = (&mut send_task).await;
        }
    }

    // Remove this client's sender (its receiver is gone, so it reports closed).
    let mut rooms = state.rooms.write().await;
    if let Some(senders) = rooms.get_mut(&room_id) {
        senders.retain(|s| !s.is_closed());
    }
}

Pattern 3: Streaming JSON/NDJSON Response

NDJSON (Newline Delimited JSON) is the de facto standard for streaming JSON transmission. Each line is a complete JSON object, allowing clients to parse incrementally without waiting for the full response.
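The client side of that incremental parsing can be sketched as follows. This is a std-only illustration, and `NdjsonSplitter` is a hypothetical name: it buffers raw chunks exactly as they arrive from the network and returns each line as soon as its newline shows up, so a JSON object split across two chunks is only handed to the JSON parser once it is complete.

```rust
/// Accumulates raw chunks and hands back every complete line seen so far.
#[derive(Default)]
struct NdjsonSplitter {
    buf: Vec<u8>,
}

impl NdjsonSplitter {
    /// Feed one network chunk; returns the complete lines it finished.
    fn push(&mut self, chunk: &[u8]) -> Vec<String> {
        self.buf.extend_from_slice(chunk);
        let mut lines = Vec::new();
        while let Some(pos) = self.buf.iter().position(|&b| b == b'\n') {
            // Remove the line, including its trailing newline, from the buffer.
            let line: Vec<u8> = self.buf.drain(..=pos).collect();
            let text = String::from_utf8_lossy(&line[..pos])
                .trim_end_matches('\r')
                .to_string();
            // Blank lines carry no record and are skipped.
            if !text.is_empty() {
                lines.push(text);
            }
        }
        lines
    }
}
```

Each returned string is one complete JSON document and can be passed to a parser such as `serde_json::from_str`. Buffering bytes rather than `str` avoids breaking on a chunk boundary that falls inside a multi-byte UTF-8 character.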

NDJSON Streaming Response

use axum::{
    Router,
    extract::Query,
    response::IntoResponse,
    routing::get,
};
use serde::{Deserialize, Serialize};
use std::time::Duration;

#[derive(Serialize, Deserialize)]
struct LogEntry {
    id: u64,
    level: String,
    message: String,
    timestamp: String,
    service: String,
}

#[derive(Deserialize)]
struct LogQuery {
    service: Option<String>,
    level: Option<String>,
    limit: Option<usize>,
}

async fn stream_logs(
    Query(params): Query<LogQuery>,
) -> impl IntoResponse {
    let limit = params.limit.unwrap_or(1000);
    let service_filter = params.service.clone();

    let stream = async_stream::stream! {
        let mut count = 0u64;
        let mut interval = tokio::time::interval(Duration::from_millis(100));

        while count < limit as u64 {
            interval.tick().await;

            let entry = LogEntry {
                id: count,
                level: if count % 10 == 0 { "ERROR".to_string() } else { "INFO".to_string() },
                message: format!("Log entry #{}", count),
                timestamp: chrono::Utc::now().to_rfc3339(),
                service: service_filter.clone().unwrap_or_else(|| "api".to_string()),
            };

            let json_line = serde_json::to_string(&entry).unwrap();
            yield Ok::<_, std::convert::Infallible>(format!("{}\n", json_line));
            count += 1;
        }
    };

    (
        [
            ("content-type", "application/x-ndjson"),
            ("cache-control", "no-cache"),
            ("x-stream-format", "ndjson"),
        ],
        axum::body::Body::from_stream(stream),
    )
}

#[tokio::main]
async fn main() {
    let app = Router::new().route("/logs/stream", get(stream_logs));
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

Database Query Streaming Output

use axum::{
    Router,
    extract::State,
    response::IntoResponse,
    routing::get,
};
use futures::StreamExt; // provides `.next()` on the sqlx row stream
use sqlx::postgres::PgPool;
use std::sync::Arc;

struct AppState {
    pool: PgPool,
}

async fn stream_query_results(
    State(state): State<Arc<AppState>>,
) -> impl IntoResponse {
    let pool = state.pool.clone();

    let stream = async_stream::stream! {
        let mut stream = sqlx::query_as::<_, (i64, String, String)>(
            "SELECT id, name, email FROM users ORDER BY id"
        )
        .fetch(&pool);

        while let Some(row) = stream.next().await {
            match row {
                Ok((id, name, email)) => {
                    let json = serde_json::json!({
                        "id": id,
                        "name": name,
                        "email": email,
                    });
                    yield Ok::<_, std::convert::Infallible>(
                        format!("{}\n", json)
                    );
                }
                Err(e) => {
                    tracing::error!("Query error: {}", e);
                    break;
                }
            }
        }
    };

    (
        [("content-type", "application/x-ndjson")],
        axum::body::Body::from_stream(stream),
    )
}

Pattern 4: Backpressure and Flow Control

Backpressure is the lifeline of streaming systems. When consumers process data slower than producers generate it, lack of backpressure control leads to continuous memory growth until OOM.
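The effect of a bounded buffer can be seen without any async machinery. The sketch below uses the standard library's `sync_channel` as a stand-in for `tokio::sync::mpsc::channel`; the function name `offer_without_consumer` is hypothetical. With no consumer draining the queue, only `capacity` items are accepted and every further attempt is refused, which is the signal a producer needs in order to slow down.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Try to push `attempts` items into a bounded queue that nobody drains.
/// Returns (accepted, rejected).
fn offer_without_consumer(capacity: usize, attempts: usize) -> (usize, usize) {
    // `_rx` stays alive for the whole function, so the channel is not disconnected.
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    let mut rejected = 0;
    for i in 0..attempts {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // The buffer is full: this is the backpressure signal.
            Err(TrySendError::Full(_)) => rejected += 1,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    (accepted, rejected)
}
```

In async code the bounded Tokio channel expresses the same idea differently: `send(..).await` suspends the producer until a slot frees up instead of returning an error, so memory stays capped at the buffer size.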

Backpressure Control Architecture

┌──────────┐     ┌──────────┐     ┌──────────┐
│ Producer  │────▶│  Buffer  │────▶│ Consumer │
│ (fast)    │     │ (bounded)│     │ (slow)   │
└──────────┘     └──────────┘     └──────────┘
     │                │                  │
     │   backpressure │                  │
     │◀───────────────┤                  │
     │  (wait when    │                  │
     │   full)        │                  │

Bounded Channel Backpressure

use axum::{
    Router,
    extract::State,
    response::sse::{Event, Sse},
    routing::get,
};
use futures::stream::Stream;
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;

struct AppState {
    event_tx: mpsc::Sender<String>,
}

async fn sse_with_backpressure(
    State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let (tx, mut rx) = mpsc::channel::<String>(32);

    let producer_tx = state.event_tx.clone();
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_millis(10));
        for i in 0..10000 {
            interval.tick().await;
            let msg = format!("Event {}", i);
            if tx.send(msg).await.is_err() {
                tracing::warn!("Consumer dropped, stopping producer");
                break;
            }
        }
    });

    let stream = async_stream::stream! {
        while let Some(msg) = rx.recv().await {
            yield Ok(Event::default().data(msg));
        }
    };

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(Duration::from_secs(15)),
    )
}

Traffic Shaping (Rate Limiting Stream)

use axum::{
    Router,
    response::IntoResponse,
    routing::get,
};
use tokio::time::{self, Duration};
use tokio_stream::StreamExt;

#[derive(Clone)]
struct ThrottleConfig {
    max_per_second: u64,
    burst_size: usize,
}

async fn throttled_stream() -> impl IntoResponse {
    let config = ThrottleConfig {
        max_per_second: 100,
        burst_size: 50,
    };

    let stream = async_stream::stream! {
        let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(config.burst_size);
        let mut tokens = config.burst_size as f64;
        let max_tokens = config.burst_size as f64;
        let refill_rate = config.max_per_second as f64 / 1000.0;
        let mut last_refill = tokio::time::Instant::now();

        tokio::spawn(async move {
            let mut count = 0u64;
            loop {
                tokio::time::sleep(Duration::from_millis(10)).await;
                let msg = format!("Item {}", count);
                if tx.send(msg).await.is_err() {
                    break;
                }
                count += 1;
            }
        });

        while let Some(item) = rx.recv().await {
            let now = tokio::time::Instant::now();
            let elapsed = now.duration_since(last_refill).as_millis() as f64;
            tokens = (tokens + elapsed * refill_rate).min(max_tokens);
            last_refill = now;

            if tokens < 1.0 {
                let wait_ms = ((1.0 - tokens) / refill_rate) as u64;
                tokio::time::sleep(Duration::from_millis(wait_ms)).await;
                tokens = 0.0;
                last_refill = tokio::time::Instant::now();
            } else {
                tokens -= 1.0;
            }

            yield Ok::<_, std::convert::Infallible>(format!("{}\n", item));
        }
    };

    (
        [("content-type", "application/x-ndjson")],
        axum::body::Body::from_stream(stream),
    )
}
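The token-bucket arithmetic in the handler above is easier to check in isolation. The following std-only sketch pulls it into a hypothetical `TokenBucket` type that takes elapsed time as an argument instead of reading a clock; it mirrors the refill rule used above (tokens per millisecond = `max_per_second / 1000`, capped at `burst_size`).

```rust
struct TokenBucket {
    tokens: f64,
    capacity: f64,
    refill_per_ms: f64,
}

impl TokenBucket {
    fn new(max_per_second: u64, burst_size: usize) -> Self {
        Self {
            tokens: burst_size as f64,
            capacity: burst_size as f64,
            refill_per_ms: max_per_second as f64 / 1000.0,
        }
    }

    /// Advance the clock by `elapsed_ms`, then try to take one token.
    /// Returns Ok(()) if an item may be sent now, or Err(wait_ms) with the
    /// time until the next token is available.
    fn try_take(&mut self, elapsed_ms: f64) -> Result<(), f64> {
        self.tokens = (self.tokens + elapsed_ms * self.refill_per_ms).min(self.capacity);
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            Ok(())
        } else {
            Err((1.0 - self.tokens) / self.refill_per_ms)
        }
    }
}
```

With `max_per_second = 100` and `burst_size = 2`, two items pass immediately, the third is told to wait about 10 ms, and it passes once that time has elapsed.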

Pattern 5: Multi-Client Broadcast (Pub/Sub)

Multi-client broadcast is the core scenario for real-time push: one event source produces messages, multiple clients receive them simultaneously.
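Before the Axum code, it may help to see what a broadcast channel does when one client falls behind. The sketch below is a simplified, single-threaded model and not Tokio's implementation; `Ring` and `Recv` are hypothetical names. It reproduces the documented behaviour of `tokio::sync::broadcast`: the channel keeps a fixed number of messages, and a receiver whose unread messages were overwritten first gets a lag notice with the number missed, then resumes from the oldest retained message.

```rust
use std::collections::VecDeque;

/// Result of one receive attempt.
#[derive(Debug, PartialEq)]
enum Recv<T> {
    Item(T),
    Lagged(u64), // this many messages were overwritten before being read
    Empty,
}

/// Fixed-capacity buffer shared by all subscribers; each keeps its own cursor.
struct Ring<T> {
    buf: VecDeque<T>,
    capacity: usize,
    next_seq: u64, // sequence number of the next message to be published
}

impl<T: Clone> Ring<T> {
    fn new(capacity: usize) -> Self {
        Self { buf: VecDeque::new(), capacity, next_seq: 0 }
    }

    fn publish(&mut self, item: T) {
        if self.buf.len() == self.capacity {
            self.buf.pop_front(); // the oldest message is lost to slow readers
        }
        self.buf.push_back(item);
        self.next_seq += 1;
    }

    /// A new subscriber only sees messages published after it subscribed.
    fn subscribe(&self) -> u64 {
        self.next_seq
    }

    fn recv(&self, cursor: &mut u64) -> Recv<T> {
        let oldest = self.next_seq - self.buf.len() as u64;
        if *cursor < oldest {
            let missed = oldest - *cursor;
            *cursor = oldest; // skip ahead to the oldest retained message
            return Recv::Lagged(missed);
        }
        if *cursor == self.next_seq {
            return Recv::Empty;
        }
        let item = self.buf[(*cursor - oldest) as usize].clone();
        *cursor += 1;
        Recv::Item(item)
    }
}
```

The key property is that a slow subscriber never blocks the publisher or the other subscribers; it pays with lost messages instead, which is why the handlers in this section handle the lagged case explicitly.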

Broadcast Channel-Based Broadcasting

use axum::{
    Router,
    extract::State,
    response::{IntoResponse, sse::{Event, Sse}},
    routing::{get, post},
};
use futures::stream::Stream;
use serde::{Deserialize, Serialize};
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;

#[derive(Serialize, Deserialize, Clone, Debug)]
struct BroadcastEvent {
    event_type: String,
    payload: serde_json::Value,
    timestamp: i64,
}

struct AppState {
    broadcast_tx: tokio::sync::broadcast::Sender<BroadcastEvent>,
    client_count: Arc<std::sync::atomic::AtomicUsize>,
}

async fn subscribe(
    State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let mut rx = state.broadcast_tx.subscribe();
    let client_id = state.client_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

    tracing::info!("Client {} subscribed", client_id);

    let stream = async_stream::stream! {
        loop {
            match rx.recv().await {
                Ok(event) => {
                    let data = serde_json::to_string(&event).unwrap();
                    yield Ok(Event::default()
                        .event(&event.event_type)
                        .data(data)
                        .id(event.timestamp.to_string()));
                }
                Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                    tracing::warn!("Client {} lagged {} messages", client_id, n);
                    yield Ok(Event::default()
                        .event("lagged")
                        .data(format!("Missed {} messages", n)));
                }
                Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                    break;
                }
            }
        }
    };

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(Duration::from_secs(10)),
    )
}

async fn publish(
    State(state): State<Arc<AppState>>,
    axum::Json(event): axum::Json<BroadcastEvent>,
) -> impl IntoResponse {
    let receiver_count = state.broadcast_tx.receiver_count();
    match state.broadcast_tx.send(event) {
        Ok(_) => axum::Json(serde_json::json!({
            "status": "published",
            "receivers": receiver_count,
        })),
        Err(_) => axum::Json(serde_json::json!({
            "status": "no_receivers",
        })),
    }
}

#[tokio::main]
async fn main() {
    let (broadcast_tx, _) = tokio::sync::broadcast::channel::<BroadcastEvent>(256);
    let state = Arc::new(AppState {
        broadcast_tx,
        client_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
    });

    let app = Router::new()
        .route("/subscribe", get(subscribe))
        .route("/publish", post(publish))
        .with_state(state);

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

Topic-Based Subscription

use axum::{
    Router,
    extract::{Path, State},
    response::{IntoResponse, sse::{Event, Sse}},
    routing::{get, post},
};
use futures::stream::Stream;
use serde::Serialize;
use std::collections::HashMap;
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, RwLock};

#[derive(Clone, Serialize, Debug)]
struct TopicEvent {
    topic: String,
    data: serde_json::Value,
}

struct AppState {
    topics: RwLock<HashMap<String, broadcast::Sender<TopicEvent>>>,
}

impl AppState {
    fn new() -> Self {
        Self {
            topics: RwLock::new(HashMap::new()),
        }
    }

    async fn get_or_create_topic(&self, topic: &str) -> broadcast::Sender<TopicEvent> {
        let mut topics = self.topics.write().await;
        topics
            .entry(topic.to_string())
            .or_insert_with(|| {
                let (tx, _) = broadcast::channel::<TopicEvent>(256);
                tx
            })
            .clone()
    }
}

async fn subscribe_topic(
    Path(topic): Path<String>,
    State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let tx = state.get_or_create_topic(&topic).await;
    let mut rx = tx.subscribe();

    let stream = async_stream::stream! {
        loop {
            match rx.recv().await {
                Ok(event) => {
                    let data = serde_json::to_string(&event).unwrap();
                    yield Ok(Event::default()
                        .event(&event.topic)
                        .data(data));
                }
                Err(broadcast::error::RecvError::Lagged(n)) => {
                    yield Ok(Event::default()
                        .event("lagged")
                        .data(format!("Missed {}", n)));
                }
                Err(broadcast::error::RecvError::Closed) => break,
            }
        }
    };

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(Duration::from_secs(15)),
    )
}

async fn publish_topic(
    Path(topic): Path<String>,
    State(state): State<Arc<AppState>>,
    axum::Json(data): axum::Json<serde_json::Value>,
) -> impl IntoResponse {
    let tx = state.get_or_create_topic(&topic).await;
    let event = TopicEvent {
        topic: topic.clone(),
        data,
    };
    let receivers = tx.receiver_count();
    let _ = tx.send(event);
    axum::Json(serde_json::json!({"delivered_to": receivers}))
}

Pattern 6: Production Deployment and Graceful Shutdown

Production streaming services need connection management, graceful shutdown, and health checks.
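Connection management usually comes down to knowing how many streams are alive at any moment. One common way to keep that number honest is an RAII guard that lives exactly as long as the connection. The sketch below is std-only and `ConnectionGuard` is a hypothetical name; in a handler, the guard would be moved into the response stream so that it is dropped when the stream ends or the client disconnects.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Counts one live connection for as long as the guard exists.
struct ConnectionGuard {
    active: Arc<AtomicUsize>,
}

impl ConnectionGuard {
    fn new(active: Arc<AtomicUsize>) -> Self {
        active.fetch_add(1, Ordering::Relaxed);
        Self { active }
    }
}

impl Drop for ConnectionGuard {
    fn drop(&mut self) {
        // Runs on every exit path: normal completion, error, or client disconnect.
        self.active.fetch_sub(1, Ordering::Relaxed);
    }
}
```

A health endpoint can then report `active.load(Ordering::Relaxed)` without taking a lock, and the count cannot drift because no code path has to remember to decrement it.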

Complete Production-Grade Service

use axum::{
    Router,
    extract::State,
    response::{IntoResponse, sse::{Event, Sse}},
    routing::get,
};
use futures::stream::Stream;
use serde::Serialize;
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, RwLock};
use tokio_util::sync::CancellationToken;

#[derive(Serialize, Clone)]
struct ServerEvent {
    event_type: String,
    data: serde_json::Value,
}

struct AppState {
    broadcast_tx: broadcast::Sender<ServerEvent>,
    cancel_token: CancellationToken,
    connections: RwLock<Vec<String>>,
}

async fn sse_production(
    State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let mut rx = state.broadcast_tx.subscribe();
    let cancel = state.cancel_token.clone();
    let conn_id = uuid::Uuid::new_v4().to_string();

    {
        let mut conns = state.connections.write().await;
        conns.push(conn_id.clone());
    }

    // Per-connection token. The guard moves into the stream, so the token is
    // cancelled when the stream is dropped (client disconnect) or finishes.
    let conn_token = CancellationToken::new();
    let conn_guard = conn_token.clone().drop_guard();

    let stream = async_stream::stream! {
        let _conn_guard = conn_guard;
        loop {
            tokio::select! {
                event = rx.recv() => {
                    match event {
                        Ok(ev) => {
                            let data = serde_json::to_string(&ev).unwrap();
                            yield Ok(Event::default()
                                .event(&ev.event_type)
                                .data(data));
                        }
                        Err(broadcast::error::RecvError::Lagged(n)) => {
                            yield Ok(Event::default()
                                .event("lagged")
                                .data(format!("Missed {}", n)));
                        }
                        Err(broadcast::error::RecvError::Closed) => break,
                    }
                }
                // Server-wide shutdown: notify the client, then end the stream.
                _ = cancel.cancelled() => {
                    yield Ok(Event::default()
                        .event("shutdown")
                        .data("Server is shutting down"));
                    break;
                }
            }
        }
    };

    // Deregister the connection once its stream is gone. `RwLock` is not
    // `Clone`, so the task holds a clone of the shared state instead.
    let cleanup_state = state.clone();
    tokio::spawn(async move {
        conn_token.cancelled().await;
        let mut conns = cleanup_state.connections.write().await;
        conns.retain(|c| c != &conn_id);
    });

    Sse::new(stream).keep_alive(
        axum::response::sse::KeepAlive::new()
            .interval(Duration::from_secs(10)),
    )
}

async fn health_check(State(state): State<Arc<AppState>>) -> impl IntoResponse {
    let conn_count = state.connections.read().await.len();
    let is_shutting_down = state.cancel_token.is_cancelled();
    axum::Json(serde_json::json!({
        "status": if is_shutting_down { "shutting_down" } else { "healthy" },
        "active_connections": conn_count,
        "uptime_secs": 0,
    }))
}

#[tokio::main]
async fn main() {
    let (broadcast_tx, _) = broadcast::channel::<ServerEvent>(512);
    let cancel_token = CancellationToken::new();

    let state = Arc::new(AppState {
        broadcast_tx,
        cancel_token: cancel_token.clone(),
        connections: RwLock::new(Vec::new()),
    });

    let app = Router::new()
        .route("/events", get(sse_production))
        .route("/health", get(health_check))
        .with_state(state.clone());

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();

    // Graceful shutdown waits for open connections to finish. The SSE streams
    // end themselves once the token is cancelled inside `shutdown_signal`.
    axum::serve(listener, app)
        .with_graceful_shutdown(shutdown_signal(cancel_token))
        .await
        .unwrap();
    tracing::info!("Graceful shutdown complete");
}

async fn shutdown_signal(cancel_token: CancellationToken) {
    tokio::signal::ctrl_c().await.unwrap();
    tracing::info!("Shutdown signal received, draining connections...");
    cancel_token.cancel();

    // Hard deadline in case some connection never drains.
    tokio::spawn(async {
        tokio::time::sleep(Duration::from_secs(10)).await;
        tracing::warn!("Drain deadline exceeded, forcing exit");
        std::process::exit(1);
    });
}

Docker Deployment Configuration

FROM rust:1.85 AS builder
WORKDIR /app
COPY Cargo.toml Cargo.lock ./
COPY src ./src
RUN cargo build --release

FROM debian:bookworm-slim
# curl is required by the compose healthcheck; the slim image does not ship it
RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates curl && rm -rf /var/lib/apt/lists/*
COPY --from=builder /app/target/release/streaming-server /usr/local/bin/
EXPOSE 3000
CMD ["streaming-server"]

# docker-compose.yml
services:
  streaming-server:
    build: .
    ports:
      - "3000:3000"
    environment:
      - RUST_LOG=info
      - STREAM_BUFFER_SIZE=256
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:3000/health"]
      interval: 10s
      timeout: 5s
      retries: 3
    deploy:
      resources:
        limits:
          memory: 512M

5 Common Pitfalls and Solutions

Pitfall 1: SSE Stream Not Closing Causes Connection Leaks

Symptom: After client disconnects, the server stream keeps running, memory grows continuously.

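Cause: When the client disconnects, Axum drops the response stream, but nothing notifies code that lives outside it. Spawned tasks, channel senders, and registry entries keep running unless they are tied to the stream's lifetime.

Solution:

use tokio_util::sync::CancellationToken;

async fn sse_safe() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let cancel = CancellationToken::new();
    let cancel_clone = cancel.clone();
    // Moved into the stream: dropping the stream drops the guard,
    // which cancels the token for everything else that watches it.
    let guard = cancel.clone().drop_guard();

    let stream = async_stream::stream! {
        let _guard = guard;
        let mut interval = tokio::time::interval(Duration::from_secs(1));
        loop {
            tokio::select! {
                _ = interval.tick() => {
                    yield Ok(Event::default().data("tick"));
                }
                // Fires when the maximum lifetime below is reached.
                _ = cancel_clone.cancelled() => {
                    break;
                }
            }
        }
    };

    // Lives outside the stream, so it watches the token instead of
    // running for the full hour after the client has gone.
    tokio::spawn(async move {
        tokio::select! {
            _ = cancel.cancelled() => {
                tracing::info!("Stream dropped, client disconnected");
            }
            _ = tokio::time::sleep(Duration::from_secs(3600)) => {
                cancel.cancel();
            }
        }
    });

    Sse::new(stream)
}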

Pitfall 2: Broadcast Channel Lagged Causes Message Loss

Symptom: RecvError::Lagged(n) appears frequently, clients lose messages.

Cause: Broadcast channel capacity insufficient, slow consumers can't keep up with fast producers.

Solution: Increase channel capacity + log missed messages + resend on client reconnect.

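use tokio::sync::broadcast::{self, error::RecvError};

// A larger buffer gives slow consumers more room before they lag
let (tx, _) = broadcast::channel::<Event>(1024);

// Inside the SSE stream loop, with `rx` obtained from `tx.subscribe()`
match rx.recv().await {
    Ok(_event) => {
        // Forward the event to the client as usual
    }
    Err(RecvError::Lagged(missed)) => {
        // `missed` messages were overwritten before this receiver read them
        tracing::warn!("Lagged {}, requesting catch-up", missed);
        yield Ok(Event::default()
            .event("lagged")
            .data(serde_json::json!({"missed": missed}).to_string()));
    }
    // All senders are gone, so end the stream
    Err(RecvError::Closed) => break,
}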
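
The "resend on client reconnect" part needs some record of recent events on the server. One possible shape, shown here as a std-only sketch rather than the article's implementation, is a small replay buffer keyed by event id: the id goes out in the SSE `id:` field, and a reconnecting browser reports the last one it saw through the `Last-Event-ID` header. The `ReplayBuffer` type and its method names are hypothetical.

```rust
use std::collections::VecDeque;

/// Hypothetical replay buffer: keeps the last `capacity` events with
/// monotonically increasing ids so a reconnecting client can catch up.
pub struct ReplayBuffer {
    capacity: usize,
    next_id: u64,
    events: VecDeque<(u64, String)>,
}

impl ReplayBuffer {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be at least 1");
        Self { capacity, next_id: 1, events: VecDeque::with_capacity(capacity) }
    }

    /// Stores a payload and returns the id to send along with the event.
    pub fn push(&mut self, payload: impl Into<String>) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        if self.events.len() == self.capacity {
            self.events.pop_front(); // evict the oldest event
        }
        self.events.push_back((id, payload.into()));
        id
    }

    /// Returns the events newer than `last_seen_id`, or `None` when some of
    /// them were already evicted and the client needs a full resync.
    pub fn since(&self, last_seen_id: u64) -> Option<Vec<(u64, String)>> {
        // Id of the oldest event still retained (equals next_id when empty)
        let oldest = self.next_id - self.events.len() as u64;
        if last_seen_id.saturating_add(1) < oldest {
            return None;
        }
        Some(
            self.events
                .iter()
                .filter(|(id, _)| *id > last_seen_id)
                .cloned()
                .collect(),
        )
    }
}
```

A handler would call `since(last_id)` before subscribing to the live channel and fall back to a full snapshot when it returns `None`. The buffer capacity is a trade-off between memory and how long a client may stay disconnected.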

Pitfall 3: WebSocket Message Too Large Gets Rejected

Symptom: Sending large messages causes WebSocket errors or disconnection.

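Cause: The message exceeds a size limit. Axum's defaults (inherited from tungstenite) are 64 MiB per message and 16 MiB per frame; limits set on the handler, a reverse proxy, or the client may be lower.

Solution: Set the limits explicitly to what your protocol needs, and split anything larger into multiple messages:

use axum::{extract::ws::WebSocketUpgrade, response::IntoResponse};

async fn ws_large_messages(ws: WebSocketUpgrade) -> impl IntoResponse {
    // Cap a single frame at 1 MiB and a reassembled message at 4 MiB
    ws.max_frame_size(1024 * 1024)
      .max_message_size(4 * 1024 * 1024)
      .on_upgrade(handle_socket)
}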

Pitfall 4: Streaming Response Missing Content-Type

Symptom: Client receives streaming data but can't parse it correctly.

Cause: Forgot to set the correct Content-Type header.

Solution:

async fn ndjson_response() -> impl IntoResponse {
    let stream = create_stream();
    (
        [
            ("content-type", "application/x-ndjson; charset=utf-8"),
            ("cache-control", "no-cache"),
            ("connection", "keep-alive"),
        ],
        axum::body::Body::from_stream(stream),
    )
}
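
Once the header is in place, the consumer still has to split the byte stream back into lines, because network chunks rarely align with NDJSON record boundaries. The following std-only sketch illustrates that client-side step; `NdjsonFramer` is a hypothetical helper, not part of Axum, and it leaves JSON decoding of each line to the caller.

```rust
/// Hypothetical helper: buffers raw chunks and returns every complete NDJSON line.
#[derive(Default)]
pub struct NdjsonFramer {
    pending: Vec<u8>,
}

impl NdjsonFramer {
    /// Appends a chunk and drains all lines that are now complete.
    /// A trailing partial line stays buffered until its newline arrives.
    pub fn feed(&mut self, chunk: &[u8]) -> Vec<String> {
        self.pending.extend_from_slice(chunk);
        let mut lines = Vec::new();
        while let Some(pos) = self.pending.iter().position(|&b| b == b'\n') {
            // Take the line including its '\n', then decode without it
            let line: Vec<u8> = self.pending.drain(..=pos).collect();
            let text = String::from_utf8_lossy(&line[..line.len() - 1]);
            let text = text.trim_end_matches('\r');
            if !text.is_empty() {
                lines.push(text.to_string()); // blank lines are skipped
            }
        }
        lines
    }
}
```

Buffering bytes rather than strings means a multi-byte UTF-8 character split across two chunks is only decoded once its line is complete.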

Pitfall 5: Graceful Shutdown Forcibly Interrupts Streams

Symptom: When the service shuts down, clients receive connection reset instead of normal closure.

Cause: Killing the process directly without waiting for active streams to complete.

Solution: Use CancellationToken to notify all streams to gracefully end (see Pattern 6).


Troubleshooting 10 Common Errors

| # | Error Message | Cause | Solution |
|---|---------------|-------|----------|
| 1 | the trait bound impl Stream: IntoResponse is not satisfied | A bare stream was returned from the handler | Wrap it in Sse::new(...) (Item = Result<Event, E>) or Body::from_stream(...) |
| 2 | WebSocket upgrade failed: Connection header is not upgrade | Client didn't send the Upgrade header | Confirm the request includes Connection: Upgrade and Upgrade: websocket |
| 3 | channel send error: channel closed | Receiver already dropped | Check whether the receiver was dropped early; treat send().await.is_err() as the signal to stop producing |
| 4 | RecvError::Lagged(n) | Consumer can't keep up with producer | Increase broadcast capacity, or use bounded mpsc for backpressure |
| 5 | body write aborted: connection closed | Client disconnected | Add client disconnect detection, clean up resources promptly |
| 6 | SSE stream timeout | Keep-Alive timeout | Adjust the KeepAlive::new().interval() duration |
| 7 | WebSocket message too large | Message exceeds the configured limit (defaults: 64 MiB per message, 16 MiB per frame) | Adjust ws.max_frame_size() and ws.max_message_size(), or split the payload |
| 8 | task hung detected | Async task blocking | Check for synchronous blocking operations, use tokio::task::spawn_blocking |
| 9 | too many open files | Connection count exceeds system limit | Raise ulimit -n, or limit max connections |
| 10 | connection reset by peer | Client closed abnormally | Handle Message::Close and errors in the WebSocket handler |

Advanced Optimization Techniques

1. Connection Limiting

use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

use axum::{
    extract::{Request, State},
    http::StatusCode,
    middleware::Next,
    response::{IntoResponse, Response},
};

struct ConnectionLimiter {
    current: AtomicUsize,
    max: usize,
}

async fn limit_connections(
    State(limiter): State<Arc<ConnectionLimiter>>,
    request: Request,
    next: Next,
) -> Response {
    // Reserve a slot first, then roll back if the limit was already reached
    let current = limiter.current.fetch_add(1, Ordering::Relaxed);
    if current >= limiter.max {
        limiter.current.fetch_sub(1, Ordering::Relaxed);
        return StatusCode::SERVICE_UNAVAILABLE.into_response();
    }

    // Note: next.run() resolves once the response head is ready, so for
    // streaming bodies the slot is released while data may still be flowing
    let response = next.run(request).await;
    limiter.current.fetch_sub(1, Ordering::Relaxed);
    response
}
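
For long-lived streams, the counter is more accurate when the slot is released by whatever owns the stream rather than by the middleware. A minimal std-only sketch of that idea uses an RAII guard; `ConnectionGuard`, `try_acquire`, and `active` are hypothetical names introduced for illustration, and wiring the guard into an actual Axum stream is left out.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

pub struct ConnectionLimiter {
    current: AtomicUsize,
    max: usize,
}

/// Holds one connection slot and gives it back when dropped.
pub struct ConnectionGuard {
    limiter: Arc<ConnectionLimiter>,
}

impl ConnectionLimiter {
    pub fn new(max: usize) -> Arc<Self> {
        Arc::new(Self { current: AtomicUsize::new(0), max })
    }

    /// Returns a guard if a slot is free, or `None` when the limit is reached.
    pub fn try_acquire(self: &Arc<Self>) -> Option<ConnectionGuard> {
        let previous = self.current.fetch_add(1, Ordering::AcqRel);
        if previous >= self.max {
            // Roll back the optimistic reservation
            self.current.fetch_sub(1, Ordering::AcqRel);
            return None;
        }
        Some(ConnectionGuard { limiter: Arc::clone(self) })
    }

    /// Number of slots currently held.
    pub fn active(&self) -> usize {
        self.current.load(Ordering::Acquire)
    }
}

impl Drop for ConnectionGuard {
    fn drop(&mut self) {
        self.limiter.current.fetch_sub(1, Ordering::AcqRel);
    }
}
```

Moving the guard into the stream (for example as a local inside `async_stream::stream!`) would release the slot exactly when the stream is dropped, whether it finished normally or the client went away.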

2. Streaming Compression

use async_compression::tokio::bufread::GzipEncoder;
use axum::{body::Body, response::IntoResponse};
use tokio_util::io::{ReaderStream, StreamReader};

async fn compressed_stream() -> impl IntoResponse {
    // Assumes create_data_stream() yields Result<Bytes, std::io::Error> items
    let stream = create_data_stream();
    // StreamReader adapts the stream into an AsyncBufRead
    let reader = StreamReader::new(stream);

    // The bufread encoder compresses the data as it is read
    let encoder = GzipEncoder::new(reader);
    // ReaderStream turns the compressed reader back into a stream of Bytes
    let compressed_stream = ReaderStream::new(encoder);

    (
        [
            ("content-type", "application/x-ndjson"),
            ("content-encoding", "gzip"),
        ],
        Body::from_stream(compressed_stream),
    )
}

3. Streaming Response Monitoring

use prometheus::{IntCounter, IntGauge, Registry};

lazy_static::lazy_static! {
    static ref REGISTRY: Registry = Registry::new();
    static ref SSE_CONNECTIONS: IntGauge = IntGauge::new(
        "sse_connections_active",
        "Active SSE connections"
    ).unwrap();
    static ref SSE_EVENTS_SENT: IntCounter = IntCounter::new(
        "sse_events_sent_total",
        "Total SSE events sent"
    ).unwrap();
    static ref WS_CONNECTIONS: IntGauge = IntGauge::new(
        "ws_connections_active",
        "Active WebSocket connections"
    ).unwrap();
}

/// Decrements the gauge when dropped, which also covers client disconnects
struct SseConnectionGuard;

impl Drop for SseConnectionGuard {
    fn drop(&mut self) {
        SSE_CONNECTIONS.dec();
    }
}

async fn sse_monitored(
    State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    SSE_CONNECTIONS.inc();
    let guard = SseConnectionGuard;
    let mut rx = state.broadcast_tx.subscribe();

    let stream = async_stream::stream! {
        // Moved into the stream so it is dropped together with it
        let _guard = guard;
        loop {
            match rx.recv().await {
                Ok(event) => {
                    SSE_EVENTS_SENT.inc();
                    let data = serde_json::to_string(&event).unwrap();
                    yield Ok(Event::default().data(data));
                }
                Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                    break;
                }
                // Lagged: skip the missed messages and keep receiving
                Err(_) => continue,
            }
        }
    };

    Sse::new(stream)
}

Comparison: SSE vs WebSocket vs Long Polling

| Dimension | SSE | WebSocket | Long Polling |
|-----------|-----|-----------|--------------|
| Direction | One-way (server→client) | Bidirectional | One-way (simulated) |
| Protocol | HTTP | WS/WSS | HTTP |
| Auto-reconnect | Native browser support | Manual implementation | Naturally supported |
| Browser API | EventSource | WebSocket API | fetch/XHR |
| Data format | Text | Text + Binary | Any |
| Proxy/CDN | Friendly | Requires special config | Friendly |
| Connection overhead | Low | Medium | High (frequent connections) |
| Use cases | Notifications, LLM streaming, logs | Chat, collaboration, games | High compatibility requirements |
| Memory usage | Low | Medium | High |
| Implementation complexity | Low | Medium | Low |

Decision Tree

Need real-time push?
├── Only server→client?
│   ├── Need auto-reconnect? → SSE
│   └── Small data volume, low frequency? → Long Polling
└── Need bidirectional communication?
    ├── Need binary data? → WebSocket
    └── Text only? → WebSocket
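
The same tree can be written as a small function, which makes the branch order explicit. This is only a sketch: the `Requirements` fields and the `choose_transport` name are hypothetical, and the final fallback to SSE for one-way traffic is an assumption, since the tree above does not cover that case.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Transport {
    Sse,
    WebSocket,
    LongPolling,
}

/// Hypothetical inputs mirroring the questions in the decision tree.
pub struct Requirements {
    pub bidirectional: bool,
    pub needs_auto_reconnect: bool,
    pub low_volume_low_frequency: bool,
}

pub fn choose_transport(req: &Requirements) -> Transport {
    // Bidirectional traffic goes to WebSocket, for text and binary alike
    if req.bidirectional {
        return Transport::WebSocket;
    }
    // One-way push with reconnect handled by the browser
    if req.needs_auto_reconnect {
        return Transport::Sse;
    }
    // Infrequent, small updates can tolerate repeated requests
    if req.low_volume_low_frequency {
        return Transport::LongPolling;
    }
    // Assumption: default to SSE for any other one-way case
    Transport::Sse
}
```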



Summary

Rust Axum streaming responses provide high-performance, memory-safe solutions for real-time web applications. Each of the 6 production patterns has its ideal use case:

  1. SSE: Best for one-way push scenarios, simple implementation, native browser support
  2. WebSocket: First choice for bidirectional communication, ideal for chat and collaboration
  3. NDJSON: Standard format for large data streaming, parse line-by-line without waiting
  4. Backpressure Control: Lifeline of streaming systems, bounded channels + token buckets are essential
  5. Multi-Client Broadcast: broadcast channels for Pub/Sub, topic subscriptions for targeted push
  6. Production Deployment: Graceful shutdown + health checks + connection limiting as the essential trio

Remember: unbounded channels are ticking time bombs; streaming systems without backpressure will eventually OOM. Choose the right channel type and capacity, combined with Tokio's zero-cost async, and your Axum streaming service will run stably in production.

