Rust Axum Streaming Responses: 6 Production Patterns from SSE to WebSocket
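Return only after all the data has loaded, and your users have already closed the page
You write an API that returns 100,000 log entries. The client waits 30 seconds for the complete JSON, and server memory shoots up to 2 GB. You want real-time notifications, so you poll once a second, and server QPS grows tenfold. You use WebSocket for one-way push, only to find that 80% of the connections just receive data, so full duplex is wasted. In 2026, Rust Axum streaming responses are the right answer: SSE for one-way push, WebSocket for bidirectional communication, and NDJSON for streaming transfer. All of it runs on Tokio's zero-cost async and is memory-safe and fast.
This article starts from the core concepts of Axum streaming responses and walks through six production patterns: SSE push, WebSocket communication, streaming JSON, backpressure control, multi-client broadcast, and production deployment. The goal is to take real-time data push from "barely works" to "production-ready".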
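Key takeaways
- Axum streaming responses build on two core abstractions: a streaming `Body` and `Sse`.
- SSE suits one-way push (notifications, LLM streaming output). WebSocket suits bidirectional interaction.
- NDJSON is the de facto standard for streaming JSON: one complete JSON object per line.
- Backpressure is the lifeline of a streaming system. An unbounded channel is a time bomb.
- `tokio::sync::broadcast` implements multi-client broadcast, and `watch` implements state synchronization.
- A production deployment needs three essentials: graceful shutdown, connection timeouts, and health checks.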
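Contents
- Axum streaming response core concepts
- Pattern 1: SSE server push
- Pattern 2: WebSocket bidirectional communication
- Pattern 3: Streaming JSON/NDJSON responses
- Pattern 4: Backpressure and flow control
- Pattern 5: Multi-client broadcast (Pub/Sub)
- Pattern 6: Production deployment and graceful shutdown
- 5 common pitfalls and fixes
- 10 common errors and troubleshooting
- Advanced optimization tips
- Comparison: SSE vs WebSocket vs long polling
- Online tools
- Summary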
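Axum streaming response core concepts
| Concept | Description | Use cases |
|---|---|---|
| `Sse<impl Stream>` | Server-Sent Events: one-way push over HTTP | Notifications, LLM streaming output, live logs |
| WebSocket | Full-duplex communication; requires a protocol upgrade | Chat, collaborative editing, games |
| `Body` | The underlying streaming abstraction for Axum response bodies | Custom streaming |
| NDJSON | Newline-delimited JSON stream | Bulk data export, event streams |
| Backpressure | The consumer controls the producer's rate | Preventing OOM, traffic shaping |
| `broadcast` | Tokio broadcast channel | Many clients subscribing at once |
| `watch` | Tokio watch channel (latest value only) | Hot config reload, state sync |
| Chunked Encoding | HTTP/1.1 chunked transfer | Underlying transport for streaming responses |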
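At the wire level, an HTTP/1.1 streaming body is a sequence of chunks. The sketch below shows the chunked transfer coding framing: size in hex, CRLF, data, CRLF, then a zero-length last chunk. It is a hypothetical std-only helper, not something axum exposes. hyper applies this framing for you when a body has no Content-Length. HTTP/2 uses DATA frames instead of chunked encoding.

```rust
/// Frames one chunk in HTTP/1.1 chunked transfer coding: size in hex, CRLF, data, CRLF.
/// A zero-length chunk would be read as the end of the body, so callers should skip empty data.
pub fn encode_chunk(data: &[u8]) -> Vec<u8> {
    let mut out = format!("{:X}\r\n", data.len()).into_bytes();
    out.extend_from_slice(data);
    out.extend_from_slice(b"\r\n");
    out
}

/// The zero-length last chunk that terminates the body (no trailer fields).
pub fn encode_last_chunk() -> &'static [u8] {
    b"0\r\n\r\n"
}
```

Chunk boundaries are not guaranteed to line up with application messages. Clients must therefore buffer partial lines or events.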
Streaming response architecture
┌─────────────────────────────────────────────────┐
│ Client │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Browser │ │ Mobile │ │ CLI │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
└───────┼──────────────┼──────────────┼───────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────┐
│ Axum Server │
│ ┌─────────────────────────────────────────┐ │
│ │ Router │ │
│ │ /sse → SSE Handler │ │
│ │ /ws → WebSocket Handler │ │
│ │ /stream → NDJSON Handler │ │
│ └──────────────┬──────────────────────────┘ │
│ │ │
│ ┌──────────────▼──────────────────────────┐ │
│ │ Tokio Runtime │ │
│ │ ┌────────┐ ┌────────┐ ┌────────┐ │ │
│ │ │ mpsc │ │broadcast│ │ watch │ │ │
│ │ └────────┘ └────────┘ └────────┘ │ │
│ └─────────────────────────────────────────┘ │
└─────────────────────────────────────────────────┘
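Pattern 1: SSE server push
SSE is the most common streaming pattern in Axum. It pushes data one way over a long-lived HTTP connection, and the browser's native EventSource API reconnects automatically.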
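For reference, an SSE response is plain text in the `text/event-stream` format defined by the HTML standard:

- optional `event:` and `id:` fields
- one `data:` line per line of payload
- a blank line that ends each event

Lines starting with `:` are comments that clients ignore, which makes them handy as keep-alives. The browser remembers the last `id` and sends it back in the `Last-Event-ID` header when it reconnects. The helper below is a hypothetical std-only sketch of that encoding. axum's `Event` builder produces this format for you, though its exact spacing may differ.

```rust
/// Encodes one event in the `text/event-stream` wire format.
/// Assumes `event` and `id` contain no newlines (hypothetical helper, not axum's API).
pub fn encode_sse(event: Option<&str>, id: Option<&str>, data: &str) -> String {
    let mut out = String::new();
    if let Some(name) = event {
        out.push_str(&format!("event: {}\n", name));
    }
    if let Some(id) = id {
        // The browser echoes this back as Last-Event-ID when it reconnects
        out.push_str(&format!("id: {}\n", id));
    }
    // Each payload line gets its own `data:` field; the client rejoins them with '\n'
    for line in data.split('\n') {
        out.push_str(&format!("data: {}\n", line));
    }
    out.push('\n'); // a blank line dispatches the event
    out
}

/// A comment line, ignored by EventSource; useful as a keep-alive.
pub fn encode_sse_comment(text: &str) -> String {
    format!(": {}\n\n", text)
}
```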
Basic SSE implementation
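[dependencies]
# SSE lives in axum::response::sse and is enabled by default; axum has no "sse" feature flag
axum = "0.8"
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
futures = "0.3"
# Also used by the later examples in this article
async-stream = "0.3"
tokio-util = "0.7"
chrono = "0.4"
tracing = "0.1"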
use axum::{
Router,
response::sse::{Event, Sse},
routing::get,
};
use futures::stream::{self, Stream};
use std::convert::Infallible;
use tokio_stream::StreamExt as _;
use std::time::Duration;
async fn sse_handler() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let stream = stream::iter(0..100).map(|i| {
Ok(Event::default()
.data(format!("Message {}", i))
.event("update")
.id(i.to_string()))
});
Sse::new(stream).keep_alive(
axum::response::sse::KeepAlive::new()
.interval(Duration::from_secs(5))
.text("ping"),
)
}
#[tokio::main]
async fn main() {
let app = Router::new().route("/sse", get(sse_handler));
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
}
SSE with JSON payloads
use axum::{
Router,
extract::State,
response::sse::{Event, Sse},
routing::get,
};
use futures::stream::Stream;
use serde::Serialize;
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;
use tokio_stream::StreamExt as _;
#[derive(Serialize, Clone)]
struct Notification {
id: u64,
title: String,
body: String,
timestamp: i64,
}
struct AppState {
notification_tx: tokio::sync::broadcast::Sender<Notification>,
}
async fn sse_notifications(
State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let mut rx = state.notification_tx.subscribe();
let stream = async_stream::stream! {
loop {
match rx.recv().await {
Ok(notification) => {
let data = serde_json::to_string(&notification).unwrap();
yield Ok(Event::default()
.event("notification")
.data(data)
.id(notification.id.to_string()));
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
let data = serde_json::json!({"missed": n}).to_string();
yield Ok(Event::default()
.event("lagged")
.data(data));
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
break;
}
}
}
};
Sse::new(stream).keep_alive(
axum::response::sse::KeepAlive::new()
.interval(Duration::from_secs(15))
.text("keepalive"),
)
}
#[tokio::main]
async fn main() {
let (tx, _) = tokio::sync::broadcast::channel::<Notification>(100);
let state = Arc::new(AppState { notification_tx: tx });
let app = Router::new()
.route("/sse/notifications", get(sse_notifications))
.with_state(state);
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
}
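Detecting SSE client disconnects
use axum::response::sse::{Event, KeepAlive, Sse};
use futures::stream::Stream;
use std::convert::Infallible;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
// When the client disconnects, hyper drops the response body and this stream with it,
// so cleanup belongs in a Drop impl on something the stream owns.
struct DisconnectGuard {
    token: CancellationToken,
}
impl Drop for DisconnectGuard {
    fn drop(&mut self) {
        tracing::info!("SSE stream dropped (client disconnected or stream ended)");
        // Stop any background work tied to this connection
        self.token.cancel();
    }
}
async fn sse_with_disconnect() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let cancel_token = CancellationToken::new();
    let guard = DisconnectGuard { token: cancel_token.clone() };
    // Background task bound to this connection: stops on disconnect, or ends the stream after 1 hour
    let task_token = cancel_token.clone();
    tokio::spawn(async move {
        tokio::select! {
            _ = task_token.cancelled() => {
                tracing::info!("Client disconnected, stopping background work");
            }
            _ = tokio::time::sleep(Duration::from_secs(3600)) => {
                tracing::info!("SSE stream timeout");
                task_token.cancel();
            }
        }
    });
    let stream = async_stream::stream! {
        let _guard = guard; // owned by the stream, dropped together with it
        let mut interval = tokio::time::interval(Duration::from_secs(1));
        loop {
            tokio::select! {
                _ = interval.tick() => {
                    let data = serde_json::json!({
                        "time": chrono::Utc::now().to_rfc3339()
                    });
                    yield Ok(Event::default().data(data.to_string()));
                }
                // Fires on the 1-hour timeout above
                _ = cancel_token.cancelled() => {
                    break;
                }
            }
        }
    };
    Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(10)))
}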
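Pattern 2: WebSocket bidirectional communication
WebSocket provides full-duplex communication. It suits bidirectional scenarios such as chat, collaborative editing, and real-time games.
Basic WebSocket implementation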
[dependencies]
axum = { version = "0.8", features = ["ws"] }
tokio = { version = "1", features = ["full"] }
futures = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
use axum::{
    Router,
    extract::ws::{Message, WebSocket, WebSocketUpgrade},
    response::IntoResponse,
    routing::get,
};
use futures::{SinkExt, StreamExt};
use std::time::Duration;
async fn ws_handler(ws: WebSocketUpgrade) -> impl IntoResponse {
    ws.on_upgrade(handle_socket)
}
async fn handle_socket(socket: WebSocket) {
    // Split the socket into independent send and receive halves
    let (mut sender, mut receiver) = socket.split();
    // Send side: push an application-level heartbeat every 5 seconds
    let mut send_task = tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(5));
        loop {
            interval.tick().await;
            let msg = serde_json::json!({
                "type": "heartbeat",
                "timestamp": chrono::Utc::now().to_rfc3339()
            });
            if sender.send(Message::Text(msg.to_string().into())).await.is_err() {
                break; // client is gone
            }
        }
    });
    // Receive side: log text messages until the client closes the connection
    let mut recv_task = tokio::spawn(async move {
        while let Some(Ok(msg)) = receiver.next().await {
            match msg {
                Message::Text(text) => {
                    tracing::info!("Received: {}", text.as_str());
                }
                Message::Close(_) => {
                    tracing::info!("Client closed connection");
                    break;
                }
                _ => {}
            }
        }
    });
    // When either task finishes, abort the other; otherwise it keeps running after the connection ends
    tokio::select! {
        _ = &mut send_task => recv_task.abort(),
        _ = &mut recv_task => send_task.abort(),
    }
}
#[tokio::main]
async fn main() {
let app = Router::new().route("/ws", get(ws_handler));
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
}
WebSocket with state management (chat room; authentication is not shown, every user is "anonymous")
use axum::{
    Router,
    extract::{State, WebSocketUpgrade, ws::{Message, WebSocket}},
    response::IntoResponse,
    routing::get,
};
use futures::{SinkExt, StreamExt};
use serde::Serialize;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
#[derive(Debug, Clone, Serialize)]
struct ChatMessage {
    user: String,
    content: String,
    timestamp: i64,
}
struct AppState {
    // room ID -> one sender per connected client.
    // Unbounded for simplicity; a bounded channel is safer in production (see Pattern 4)
    rooms: RwLock<HashMap<String, Vec<mpsc::UnboundedSender<ChatMessage>>>>,
}
async fn ws_chat(
    ws: WebSocketUpgrade,
    State(state): State<Arc<AppState>>,
) -> impl IntoResponse {
    ws.on_upgrade(move |socket| handle_chat(socket, state))
}
async fn handle_chat(socket: WebSocket, state: Arc<AppState>) {
    let (mut ws_sender, mut ws_receiver) = socket.split();
    let (tx, mut rx) = mpsc::unbounded_channel::<ChatMessage>();
    let room_id = "general".to_string();
    // Keep a handle to our own sender so exactly this client can be removed on exit
    let my_tx = tx.clone();
    state.rooms.write().await.entry(room_id.clone()).or_default().push(tx);
    // Forward messages addressed to this client to its WebSocket
    let mut send_task = tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            let text = serde_json::to_string(&msg).unwrap();
            if ws_sender.send(Message::Text(text.into())).await.is_err() {
                break;
            }
        }
    });
    // The receive task gets its own clones; the originals are needed for cleanup below
    let recv_state = state.clone();
    let recv_room = room_id.clone();
    let mut recv_task = tokio::spawn(async move {
        while let Some(Ok(msg)) = ws_receiver.next().await {
            if let Message::Text(text) = msg {
                let chat_msg = ChatMessage {
                    user: "anonymous".to_string(),
                    content: text.as_str().to_string(),
                    timestamp: chrono::Utc::now().timestamp(),
                };
                // Fan the message out to everyone in the room
                let rooms = recv_state.rooms.read().await;
                if let Some(senders) = rooms.get(&recv_room) {
                    for sender in senders {
                        let _ = sender.send(chat_msg.clone());
                    }
                }
            }
        }
    });
    tokio::select! {
        _ = &mut send_task => recv_task.abort(),
        _ = &mut recv_task => send_task.abort(),
    }
    // Remove this client's sender and any others whose receiver is already gone
    let mut rooms = state.rooms.write().await;
    if let Some(senders) = rooms.get_mut(&room_id) {
        senders.retain(|s| !s.same_channel(&my_tx) && !s.is_closed());
    }
}
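Pattern 3: Streaming JSON/NDJSON responses
NDJSON (Newline Delimited JSON) is the de facto standard for streaming JSON. Each line is one complete JSON object, so the client can parse line by line without waiting for the full response.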
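On the client side, network chunks do not respect line boundaries, so an NDJSON reader has to buffer until it sees `\n`. The decoder below is a minimal std-only sketch; the type name `NdjsonDecoder` is hypothetical. Each returned line would then be handed to a JSON parser such as `serde_json::from_str`.

```rust
/// Reassembles NDJSON lines from arbitrarily split network chunks.
#[derive(Default)]
pub struct NdjsonDecoder {
    buf: Vec<u8>,
}

impl NdjsonDecoder {
    /// Appends a chunk and returns every line it completed; a trailing partial line stays buffered.
    pub fn push(&mut self, chunk: &[u8]) -> Vec<String> {
        self.buf.extend_from_slice(chunk);
        let mut lines = Vec::new();
        // b'\n' never appears inside a multi-byte UTF-8 sequence, so splitting raw bytes is safe
        while let Some(pos) = self.buf.iter().position(|&b| b == b'\n') {
            let mut line: Vec<u8> = self.buf.drain(..=pos).collect();
            line.pop(); // drop the '\n'
            if line.last() == Some(&b'\r') {
                line.pop(); // tolerate CRLF line endings
            }
            if !line.is_empty() {
                lines.push(String::from_utf8_lossy(&line).into_owned());
            }
        }
        lines
    }
}
```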
NDJSON streaming response
use axum::{
    Router,
    extract::Query,
    response::IntoResponse,
    routing::get,
};
use serde::{Deserialize, Serialize};
use std::time::Duration;
#[derive(Serialize, Deserialize)]
struct LogEntry {
    id: u64,
    level: String,
    message: String,
    timestamp: String,
    service: String,
}
#[derive(Deserialize)]
struct LogQuery {
    service: Option<String>,
    level: Option<String>,
    limit: Option<usize>,
}
async fn stream_logs(
    Query(params): Query<LogQuery>,
) -> impl IntoResponse {
    let limit = params.limit.unwrap_or(1000) as u64;
    let service = params.service.unwrap_or_else(|| "api".to_string());
    let level_filter = params.level;
    let stream = async_stream::stream! {
        // Simulated log source: one entry every 100 ms, every 10th entry is an ERROR
        let mut interval = tokio::time::interval(Duration::from_millis(100));
        for count in 0..limit {
            interval.tick().await;
            let level = if count % 10 == 0 { "ERROR" } else { "INFO" };
            // Apply the optional ?level= filter
            if let Some(wanted) = &level_filter {
                if !wanted.eq_ignore_ascii_case(level) {
                    continue;
                }
            }
            let entry = LogEntry {
                id: count,
                level: level.to_string(),
                message: format!("Log entry #{}", count),
                timestamp: chrono::Utc::now().to_rfc3339(),
                service: service.clone(),
            };
            let json_line = serde_json::to_string(&entry).unwrap();
            // One complete JSON object per line, terminated by '\n'
            yield Ok::<_, std::convert::Infallible>(format!("{}\n", json_line));
        }
    };
    (
        [
            ("content-type", "application/x-ndjson"),
            ("cache-control", "no-cache"),
            ("x-stream-format", "ndjson"),
        ],
        axum::body::Body::from_stream(stream),
    )
}
#[tokio::main]
async fn main() {
let app = Router::new().route("/logs/stream", get(stream_logs));
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
}
Streaming database query results
use axum::{
Router,
extract::State,
response::IntoResponse,
routing::get,
};
use futures::StreamExt; // provides .next() on the sqlx row stream
use sqlx::postgres::PgPool;
use std::sync::Arc;
struct AppState {
pool: PgPool,
}
async fn stream_query_results(
State(state): State<Arc<AppState>>,
) -> impl IntoResponse {
let pool = state.pool.clone();
let stream = async_stream::stream! {
let mut stream = sqlx::query_as::<_, (i64, String, String)>(
"SELECT id, name, email FROM users ORDER BY id"
)
.fetch(&pool);
while let Some(row) = stream.next().await {
match row {
Ok((id, name, email)) => {
let json = serde_json::json!({
"id": id,
"name": name,
"email": email,
});
yield Ok::<_, std::convert::Infallible>(
format!("{}\n", json)
);
}
Err(e) => {
tracing::error!("Query error: {}", e);
break;
}
}
}
};
(
[("content-type", "application/x-ndjson")],
axum::body::Body::from_stream(stream),
)
}
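Pattern 4: Backpressure and flow control
Backpressure is the lifeline of a streaming system. Suppose the consumer is slower than the producer and nothing applies backpressure. Memory then keeps growing until the process runs out of memory (OOM).
Backpressure architecture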
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Producer │────▶│ Buffer │────▶│ Consumer │
│ (fast) │ │ (bounded)│ │ (slow) │
└──────────┘ └──────────┘ └──────────┘
│ │ │
│ backpressure │ │
│◀───────────────┤ │
│ (wait when │ │
│ full) │ │
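The same principle can be seen with the standard library's bounded `std::sync::mpsc::sync_channel`, used here as a synchronous stand-in for Tokio's bounded `mpsc::channel`. A full buffer makes `try_send` report `Full`, and makes `send` block the producer until the consumer catches up. Below is a minimal sketch; the function names are illustrative.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;
use std::time::Duration;

/// Counts how many messages a bounded channel buffers before `try_send` reports `Full`
/// (nobody is receiving, so nothing drains the buffer).
pub fn accepted_before_full(capacity: usize) -> usize {
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    loop {
        match tx.try_send(accepted) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) => return accepted,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is still alive"),
        }
    }
}

/// Fast producer, slow consumer: `send` blocks while the buffer is full,
/// so at most `capacity` items are buffered and none are dropped.
pub fn run_pipeline(capacity: usize, items: u32) -> Vec<u32> {
    let (tx, rx) = sync_channel::<u32>(capacity);
    let producer = thread::spawn(move || {
        for i in 0..items {
            tx.send(i).expect("consumer hung up");
        }
        // `tx` is dropped here, which ends the consumer loop below
    });
    let mut received = Vec::new();
    for item in rx.iter() {
        thread::sleep(Duration::from_millis(1)); // simulate a slow consumer
        received.push(item);
    }
    producer.join().unwrap();
    received
}
```

With `tokio::sync::mpsc`, the blocking `send` becomes `send().await`, which suspends the producing task instead of blocking a thread.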
Implementing backpressure with a bounded channel
use axum::response::sse::{Event, KeepAlive, Sse};
use futures::stream::Stream;
use std::convert::Infallible;
use std::time::Duration;
use tokio::sync::mpsc;
async fn sse_with_backpressure() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // Bounded channel: at most 32 events wait between the producer and this client
    let (tx, mut rx) = mpsc::channel::<String>(32);
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_millis(10));
        for i in 0..10000 {
            interval.tick().await;
            let msg = format!("Event {}", i);
            // send().await suspends the producer while the buffer is full: that is the backpressure.
            // It returns Err once the SSE stream (the receiver) has been dropped, e.g. on disconnect.
            if tx.send(msg).await.is_err() {
                tracing::warn!("Consumer dropped, stopping producer");
                break;
            }
        }
    });
    let stream = async_stream::stream! {
        // hyper polls the body roughly as fast as the socket accepts data,
        // so a slow client slows rx.recv(), which in turn holds back the producer
        while let Some(msg) = rx.recv().await {
            yield Ok(Event::default().data(msg));
        }
    };
    Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
}
Traffic shaping (rate-limited stream)
use axum::{
Router,
response::IntoResponse,
routing::get,
};
use tokio::time::{self, Duration};
use tokio_stream::StreamExt;
#[derive(Clone)]
struct ThrottleConfig {
max_per_second: u64,
burst_size: usize,
}
async fn throttled_stream() -> impl IntoResponse {
let config = ThrottleConfig {
max_per_second: 100,
burst_size: 50,
};
let stream = async_stream::stream! {
let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(config.burst_size);
let mut tokens = config.burst_size as f64;
let max_tokens = config.burst_size as f64;
let refill_rate = config.max_per_second as f64 / 1000.0;
let mut last_refill = tokio::time::Instant::now();
tokio::spawn(async move {
let mut count = 0u64;
loop {
tokio::time::sleep(Duration::from_millis(10)).await;
let msg = format!("Item {}", count);
if tx.send(msg).await.is_err() {
break;
}
count += 1;
}
});
while let Some(item) = rx.recv().await {
let now = tokio::time::Instant::now();
let elapsed = now.duration_since(last_refill).as_millis() as f64;
tokens = (tokens + elapsed * refill_rate).min(max_tokens);
last_refill = now;
if tokens < 1.0 {
let wait_ms = ((1.0 - tokens) / refill_rate) as u64;
tokio::time::sleep(Duration::from_millis(wait_ms)).await;
tokens = 0.0;
last_refill = tokio::time::Instant::now();
} else {
tokens -= 1.0;
}
yield Ok::<_, std::convert::Infallible>(format!("{}\n", item));
}
};
(
[("content-type", "application/x-ndjson")],
axum::body::Body::from_stream(stream),
)
}
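The token-bucket arithmetic in the stream above can be pulled out into a small std-only type. Passing the clock in explicitly lets you test it without a runtime. This is a sketch, not a drop-in replacement:

- `TokenBucket` is a hypothetical name.
- Unlike the stream above, a failed `try_take` does not consume a token. It only reports how many milliseconds to wait.

```rust
/// A token bucket driven by an explicit millisecond clock so it can be tested deterministically.
pub struct TokenBucket {
    capacity: f64,
    tokens: f64,
    refill_per_ms: f64,
    last_ms: u64,
}

impl TokenBucket {
    /// `max_per_second` (> 0) is the sustained rate and `burst` the bucket size; it starts full.
    pub fn new(max_per_second: u64, burst: usize, now_ms: u64) -> Self {
        TokenBucket {
            capacity: burst as f64,
            tokens: burst as f64,
            refill_per_ms: max_per_second as f64 / 1000.0,
            last_ms: now_ms,
        }
    }

    /// Refills for the elapsed time, then tries to take one token.
    /// `Ok(())` means "send now"; `Err(ms)` is roughly how long until a token is available.
    pub fn try_take(&mut self, now_ms: u64) -> Result<(), u64> {
        let elapsed = now_ms.saturating_sub(self.last_ms) as f64;
        self.tokens = (self.tokens + elapsed * self.refill_per_ms).min(self.capacity);
        self.last_ms = now_ms;
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            Ok(())
        } else {
            Err(((1.0 - self.tokens) / self.refill_per_ms).ceil() as u64)
        }
    }
}
```

Inside the stream, `Err(ms)` would become `tokio::time::sleep(Duration::from_millis(ms)).await` followed by another `try_take`.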
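Pattern 5: Multi-client broadcast (Pub/Sub)
Multi-client broadcast is the core real-time push scenario: one event source produces messages and many clients receive them at the same time.
Broadcasting with a broadcast channel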
use axum::{
    Router,
    extract::State,
    response::{IntoResponse, sse::{Event, Sse}},
    routing::{get, post},
};
use futures::stream::Stream;
use serde::{Deserialize, Serialize};
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;
#[derive(Serialize, Deserialize, Clone, Debug)]
struct BroadcastEvent {
event_type: String,
payload: serde_json::Value,
timestamp: i64,
}
struct AppState {
broadcast_tx: tokio::sync::broadcast::Sender<BroadcastEvent>,
client_count: Arc<std::sync::atomic::AtomicUsize>,
}
async fn subscribe(
State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let mut rx = state.broadcast_tx.subscribe();
let client_id = state.client_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
tracing::info!("Client {} subscribed", client_id);
let stream = async_stream::stream! {
loop {
match rx.recv().await {
Ok(event) => {
let data = serde_json::to_string(&event).unwrap();
yield Ok(Event::default()
.event(&event.event_type)
.data(data)
.id(event.timestamp.to_string()));
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!("Client {} lagged {} messages", client_id, n);
yield Ok(Event::default()
.event("lagged")
.data(format!("Missed {} messages", n)));
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
break;
}
}
}
};
Sse::new(stream).keep_alive(
axum::response::sse::KeepAlive::new()
.interval(Duration::from_secs(10)),
)
}
async fn publish(
State(state): State<Arc<AppState>>,
axum::Json(event): axum::Json<BroadcastEvent>,
) -> impl IntoResponse {
let receiver_count = state.broadcast_tx.receiver_count();
match state.broadcast_tx.send(event) {
Ok(_) => axum::Json(serde_json::json!({
"status": "published",
"receivers": receiver_count,
})),
Err(_) => axum::Json(serde_json::json!({
"status": "no_receivers",
})),
}
}
#[tokio::main]
async fn main() {
let (broadcast_tx, _) = tokio::sync::broadcast::channel::<BroadcastEvent>(256);
let state = Arc::new(AppState {
broadcast_tx,
client_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
});
let app = Router::new()
.route("/subscribe", get(subscribe))
.route("/publish", post(publish))
.with_state(state);
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
}
Topic-based subscriptions
use axum::{
    Router,
    extract::{Path, State},
    response::{IntoResponse, sse::{Event, Sse}},
    routing::{get, post},
};
use futures::stream::Stream;
use serde::Serialize;
use std::collections::HashMap;
use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, RwLock};
#[derive(Clone, Serialize, Debug)]
struct TopicEvent {
topic: String,
data: serde_json::Value,
}
struct AppState {
topics: RwLock<HashMap<String, broadcast::Sender<TopicEvent>>>,
}
impl AppState {
fn new() -> Self {
Self {
topics: RwLock::new(HashMap::new()),
}
}
async fn get_or_create_topic(&self, topic: &str) -> broadcast::Sender<TopicEvent> {
let mut topics = self.topics.write().await;
topics
.entry(topic.to_string())
.or_insert_with(|| {
let (tx, _) = broadcast::channel::<TopicEvent>(256);
tx
})
.clone()
}
}
async fn subscribe_topic(
Path(topic): Path<String>,
State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let tx = state.get_or_create_topic(&topic).await;
let mut rx = tx.subscribe();
let stream = async_stream::stream! {
loop {
match rx.recv().await {
Ok(event) => {
let data = serde_json::to_string(&event).unwrap();
yield Ok(Event::default()
.event(&event.topic)
.data(data));
}
Err(broadcast::error::RecvError::Lagged(n)) => {
yield Ok(Event::default()
.event("lagged")
.data(format!("Missed {}", n)));
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
};
Sse::new(stream).keep_alive(
axum::response::sse::KeepAlive::new()
.interval(Duration::from_secs(15)),
)
}
async fn publish_topic(
Path(topic): Path<String>,
State(state): State<Arc<AppState>>,
axum::Json(data): axum::Json<serde_json::Value>,
) -> impl IntoResponse {
let tx = state.get_or_create_topic(&topic).await;
let event = TopicEvent {
topic: topic.clone(),
data,
};
let receivers = tx.receiver_count();
let _ = tx.send(event);
axum::Json(serde_json::json!({"delivered_to": receivers}))
}
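The topic example never removes a topic or its sender, so the map grows with every topic name that is ever used. One way to bound it is to prune on publish: drop subscribers whose receiving end is gone, and delete topics that end up empty. The sketch below shows that bookkeeping with std channels standing in for `broadcast::Sender`. `Topics` and its methods are hypothetical names. With Tokio, the equivalent check would be `receiver_count() == 0` on the topic's `broadcast::Sender`.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Topic name -> live subscribers (std channels stand in for broadcast senders).
#[derive(Default)]
pub struct Topics {
    subs: HashMap<String, Vec<Sender<String>>>,
}

impl Topics {
    pub fn subscribe(&mut self, topic: &str) -> Receiver<String> {
        let (tx, rx) = channel();
        self.subs.entry(topic.to_string()).or_default().push(tx);
        rx
    }

    /// Delivers `msg` to every live subscriber, forgets subscribers whose receiver was dropped,
    /// and removes the topic once it has none left. Returns the number of deliveries.
    pub fn publish(&mut self, topic: &str, msg: &str) -> usize {
        let Some(senders) = self.subs.get_mut(topic) else {
            return 0;
        };
        // `send` only fails when the matching Receiver is gone
        senders.retain(|tx| tx.send(msg.to_string()).is_ok());
        let delivered = senders.len();
        if delivered == 0 {
            self.subs.remove(topic);
        }
        delivered
    }

    pub fn topic_count(&self) -> usize {
        self.subs.len()
    }
}
```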
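Pattern 6: Production deployment and graceful shutdown
A streaming service in production needs connection tracking, graceful shutdown, and health checks.
A complete production-grade service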
use axum::{
    Router,
    extract::State,
    response::{IntoResponse, sse::{Event, KeepAlive, Sse}},
    routing::get,
};
use futures::stream::Stream;
use serde::Serialize;
use std::collections::HashSet;
use std::convert::Infallible;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
#[derive(Serialize, Clone)]
struct ServerEvent {
    event_type: String,
    data: serde_json::Value,
}
struct AppState {
    broadcast_tx: broadcast::Sender<ServerEvent>,
    cancel_token: CancellationToken,
    // Shared with each connection's guard; a std Mutex is fine because it is never held across .await
    connections: Arc<Mutex<HashSet<String>>>,
    started_at: Instant,
}
// Removes the connection ID when the SSE stream is dropped:
// on client disconnect as well as when the stream ends at shutdown
struct ConnGuard {
    id: String,
    connections: Arc<Mutex<HashSet<String>>>,
}
impl Drop for ConnGuard {
    fn drop(&mut self) {
        self.connections.lock().unwrap().remove(&self.id);
    }
}
async fn sse_production(
    State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let mut rx = state.broadcast_tx.subscribe();
    let cancel = state.cancel_token.clone();
    // Requires uuid = { version = "1", features = ["v4"] }
    let conn_id = uuid::Uuid::new_v4().to_string();
    state.connections.lock().unwrap().insert(conn_id.clone());
    let guard = ConnGuard {
        id: conn_id,
        connections: state.connections.clone(),
    };
    let stream = async_stream::stream! {
        let _guard = guard; // dropped together with the stream
        loop {
            tokio::select! {
                event = rx.recv() => {
                    match event {
                        Ok(ev) => {
                            let data = serde_json::to_string(&ev).unwrap();
                            yield Ok(Event::default()
                                .event(&ev.event_type)
                                .data(data));
                        }
                        Err(broadcast::error::RecvError::Lagged(n)) => {
                            yield Ok(Event::default()
                                .event("lagged")
                                .data(format!("Missed {}", n)));
                        }
                        Err(broadcast::error::RecvError::Closed) => break,
                    }
                }
                _ = cancel.cancelled() => {
                    // Tell the client why the stream ends, then finish the response cleanly
                    yield Ok(Event::default()
                        .event("shutdown")
                        .data("Server is shutting down"));
                    break;
                }
            }
        }
    };
    Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(10)))
}
async fn health_check(State(state): State<Arc<AppState>>) -> impl IntoResponse {
    let conn_count = state.connections.lock().unwrap().len();
    let is_shutting_down = state.cancel_token.is_cancelled();
    axum::Json(serde_json::json!({
        "status": if is_shutting_down { "shutting_down" } else { "healthy" },
        "active_connections": conn_count,
        "uptime_secs": state.started_at.elapsed().as_secs(),
    }))
}
#[tokio::main]
async fn main() {
    let (broadcast_tx, _) = broadcast::channel::<ServerEvent>(512);
    let cancel_token = CancellationToken::new();
    let state = Arc::new(AppState {
        broadcast_tx,
        cancel_token: cancel_token.clone(),
        connections: Arc::new(Mutex::new(HashSet::new())),
        started_at: Instant::now(),
    });
    let app = Router::new()
        .route("/events", get(sse_production))
        .route("/health", get(health_check))
        .with_state(state);
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    // A single signal handler: cancel the token (SSE streams send "shutdown" and end),
    // then enforce a hard 10-second deadline in case some connections never close
    let signal_token = cancel_token.clone();
    tokio::spawn(async move {
        tokio::signal::ctrl_c().await.unwrap();
        tracing::info!("Shutdown signal received, draining connections...");
        signal_token.cancel();
        tokio::time::sleep(Duration::from_secs(10)).await;
        tracing::warn!("Drain deadline reached, forcing exit");
        std::process::exit(1);
    });
    // axum stops accepting new connections once the token is cancelled,
    // and serve() returns after the remaining connections have finished
    axum::serve(listener, app)
        .with_graceful_shutdown(cancel_token.cancelled_owned())
        .await
        .unwrap();
    tracing::info!("Graceful shutdown complete");
}
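The shutdown sequence above boils down to three steps:

1. Stop accepting new connections.
2. Wait for in-flight streams to finish.
3. Never wait longer than a deadline.

Below is a minimal std-only sketch of the waiting part, using a mutex-protected counter and `Condvar::wait_timeout_while`. The `Drain` type is hypothetical.

```rust
use std::sync::{Condvar, Mutex};
use std::time::Duration;

/// Counts in-flight streams so shutdown can wait for them with a deadline.
#[derive(Default)]
pub struct Drain {
    active: Mutex<usize>,
    idle: Condvar,
}

impl Drain {
    /// Call when a stream starts.
    pub fn start(&self) {
        *self.active.lock().unwrap() += 1;
    }

    /// Call when a stream ends; wakes waiters once nothing is active.
    pub fn finish(&self) {
        let mut n = self.active.lock().unwrap();
        *n -= 1;
        if *n == 0 {
            self.idle.notify_all();
        }
    }

    /// Blocks until no stream is active or `timeout` passes; returns true if fully drained.
    pub fn wait(&self, timeout: Duration) -> bool {
        let guard = self.active.lock().unwrap();
        let (n, _timeout_result) = self
            .idle
            .wait_timeout_while(guard, timeout, |n| *n > 0)
            .unwrap();
        *n == 0
    }
}
```

In the axum service, the counter corresponds to the set of active connections, and the deadline corresponds to the 10-second forced exit.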
Docker deployment configuration
FROM rust:1.85 as builder
WORKDIR /app
COPY Cargo.toml Cargo.lock ./
COPY src ./src
RUN cargo build --release
FROM debian:bookworm-slim
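# curl is needed by the docker-compose healthcheck below; the slim image does not ship it
RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates curl && rm -rf /var/lib/apt/lists/*
# Assumes the Cargo package (and therefore the binary) is named streaming-server
COPY --from=builder /app/target/release/streaming-server /usr/local/bin/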
EXPOSE 3000
CMD ["streaming-server"]
# docker-compose.yml
services:
streaming-server:
build: .
ports:
- "3000:3000"
environment:
- RUST_LOG=info
- STREAM_BUFFER_SIZE=256
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:3000/health"]
interval: 10s
timeout: 5s
retries: 3
deploy:
resources:
limits:
memory: 512M
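5 common pitfalls and fixes
Pitfall 1: SSE streams that never end leak resources
Symptom: after a client disconnects, work started for its stream keeps running on the server and memory keeps growing.
Cause: axum drops the SSE stream once hyper notices the connection is closed, typically when the next event or keep-alive write fails. Anything the handler started outside the stream, such as a spawned task or timer, is not tied to the stream and keeps running. Cleanup code placed after the stream's loop does not run either.
Solution: keep per-connection work inside the stream, or stop it when a channel send fails. Put disconnect cleanup in a Drop guard owned by the stream. Give long-lived streams a maximum lifetime: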
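use axum::response::sse::{Event, KeepAlive, Sse};
use futures::stream::Stream;
use std::convert::Infallible;
use std::time::Duration;
async fn sse_safe() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let stream = async_stream::stream! {
        let mut interval = tokio::time::interval(Duration::from_secs(1));
        // The maximum lifetime lives inside the stream, so no extra task outlives the connection
        let deadline = tokio::time::sleep(Duration::from_secs(3600));
        tokio::pin!(deadline);
        loop {
            tokio::select! {
                _ = interval.tick() => {
                    yield Ok(Event::default().data("tick"));
                }
                _ = &mut deadline => {
                    tracing::info!("Stream reached its maximum lifetime");
                    break;
                }
            }
        }
        // On client disconnect the stream is dropped mid-await and this point is never reached,
        // so disconnect cleanup belongs in a Drop guard owned by the stream
    };
    // Keep-alive writes help hyper notice a dead connection even when no events are sent
    Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
}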
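A `Drop` guard is the reliable place for per-connection cleanup. A consumer that stops early simply drops the stream, so code after the loop never runs, but the destructors of everything the stream owns do run. The sketch below models this with a std `Iterator` standing in for a `Stream`. `CleanupGuard` and `Ticks` are hypothetical names.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Stands in for "unsubscribe / remove connection ID": records that cleanup ran.
pub struct CleanupGuard {
    cleaned_up: Arc<AtomicBool>,
}

impl Drop for CleanupGuard {
    fn drop(&mut self) {
        self.cleaned_up.store(true, Ordering::SeqCst);
    }
}

/// An endless event source that owns its guard, as a stream owns its resources.
pub struct Ticks {
    n: u64,
    _guard: CleanupGuard,
}

impl Iterator for Ticks {
    type Item = u64;
    fn next(&mut self) -> Option<u64> {
        self.n += 1;
        Some(self.n)
    }
}

pub fn ticks(flag: Arc<AtomicBool>) -> Ticks {
    Ticks { n: 0, _guard: CleanupGuard { cleaned_up: flag } }
}
```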
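Pitfall 2: broadcast `Lagged` errors lose messages
Symptom: `RecvError::Lagged(n)` shows up frequently and clients miss messages.
Cause: the broadcast channel keeps only the latest `capacity` messages. A slow consumer that falls further behind than that skips the overwritten ones.
Solution: increase the channel capacity and tell the client how many messages it missed. Then replay them when it reconnects; for SSE, the browser sends the last seen `id` back in the `Last-Event-ID` header.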
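// Capacity 1024: the channel retains the latest 1024 messages, so a receiver can fall that far behind.
// `AppEvent` stands for your own event type (Clone + Serialize)
let (tx, _) = tokio::sync::broadcast::channel::<AppEvent>(1024);
// Fragment: inside the async_stream::stream! loop that reads from `rx`
match rx.recv().await {
    Ok(ev) => {
        yield Ok(Event::default().data(serde_json::to_string(&ev).unwrap()));
    }
    Err(tokio::sync::broadcast::error::RecvError::Lagged(missed)) => {
        tracing::warn!("Lagged {}, requesting catch-up", missed);
        // Tell the client how many messages it missed so it can request a catch-up
        yield Ok(Event::default()
            .event("lagged")
            .data(serde_json::json!({"missed": missed}).to_string()));
    }
    // All senders dropped: end the stream instead of looping forever
    Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}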
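To see where `Lagged(n)` comes from, here is a simplified, single-threaded model of a broadcast ring buffer. The channel keeps only the newest `capacity` messages, and each receiver tracks the sequence number it wants next. If that message has already been overwritten, the receiver is told how many it missed and jumps to the oldest retained one. This mirrors the observable behavior of Tokio's `RecvError::Lagged`. It is an illustration only; Tokio's real implementation is thread-safe and differs internally.

```rust
use std::collections::VecDeque;

/// What a receive can yield in this simplified model.
#[derive(Debug, PartialEq)]
pub enum Recv<T> {
    Msg(T),
    Lagged(u64),
    Empty,
}

/// Keeps only the newest `capacity` (> 0) messages.
pub struct Ring<T> {
    capacity: usize,
    buf: VecDeque<T>,
    next_seq: u64, // sequence number the next sent message will get
}

/// A receiver only remembers the sequence number it wants next.
pub struct Cursor {
    next: u64,
}

impl<T: Clone> Ring<T> {
    pub fn new(capacity: usize) -> Self {
        Ring { capacity, buf: VecDeque::with_capacity(capacity), next_seq: 0 }
    }

    pub fn subscribe(&self) -> Cursor {
        Cursor { next: self.next_seq }
    }

    pub fn send(&mut self, msg: T) {
        if self.buf.len() == self.capacity {
            self.buf.pop_front(); // overwrite the oldest message
        }
        self.buf.push_back(msg);
        self.next_seq += 1;
    }

    pub fn recv(&self, cur: &mut Cursor) -> Recv<T> {
        let oldest = self.next_seq - self.buf.len() as u64;
        if cur.next < oldest {
            // Our next message was overwritten: report the gap and skip to the oldest retained one
            let missed = oldest - cur.next;
            cur.next = oldest;
            return Recv::Lagged(missed);
        }
        if cur.next == self.next_seq {
            return Recv::Empty;
        }
        let msg = self.buf[(cur.next - oldest) as usize].clone();
        cur.next += 1;
        Recv::Msg(msg)
    }
}
```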
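Pitfall 3: oversized WebSocket messages are rejected
Symptom: sending a large message makes the WebSocket return an error or disconnect.
Cause: axum limits the size of incoming WebSocket data. By default, a message may be at most 64 MiB and a single frame at most 16 MiB; anything larger is rejected.
Solution: set the limits explicitly to match your protocol. Raise them if you really need larger payloads. More often you should lower them, as below, so a single client cannot force the server into very large allocations: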
use axum::extract::ws::WebSocketUpgrade;
async fn ws_large_messages(ws: WebSocketUpgrade) -> impl IntoResponse {
ws.max_frame_size(1024 * 1024)
.max_message_size(4 * 1024 * 1024)
.on_upgrade(handle_socket)
}
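A server can reject an oversized frame before buffering its payload, because the WebSocket frame header declares the length up front (RFC 6455, section 5.2). The length takes one of three forms, all big-endian:

- a 7-bit length
- the marker 126 followed by a 16-bit length
- the marker 127 followed by a 64-bit length

The std-only sketch below decodes that field. `payload_len` is a hypothetical helper; the underlying WebSocket library performs its own equivalent checks against the configured limits.

```rust
/// Reads the payload length from a WebSocket frame header (RFC 6455, section 5.2).
/// Returns `(payload_len, offset)`, where `offset` is the byte right after the length field
/// (a 4-byte masking key follows it if the mask bit is set), or `None` if the header is incomplete.
pub fn payload_len(header: &[u8]) -> Option<(u64, usize)> {
    let second = *header.get(1)?;
    match second & 0x7F {
        // 126: the real length is in the next 2 bytes (big-endian)
        126 => {
            let b = header.get(2..4)?;
            Some((u16::from_be_bytes([b[0], b[1]]) as u64, 4))
        }
        // 127: the real length is in the next 8 bytes (big-endian)
        127 => {
            let b: [u8; 8] = header.get(2..10)?.try_into().ok()?;
            Some((u64::from_be_bytes(b), 10))
        }
        // 0..=125: the length fits directly in the 7 bits
        n => Some((n as u64, 2)),
    }
}

/// Whether a frame can be rejected from its header alone, before reading the payload.
pub fn exceeds_limit(header: &[u8], max_frame_size: u64) -> Option<bool> {
    payload_len(header).map(|(len, _)| len > max_frame_size)
}
```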
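Pitfall 4: streaming responses without a Content-Type
Symptom: the client receives streamed data but cannot parse it correctly.
Cause: the correct Content-Type header was never set.
Solution: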
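async fn ndjson_response() -> impl IntoResponse {
    // `create_stream()` stands for any Stream<Item = Result<String, Infallible>> of NDJSON lines
    let stream = create_stream();
    (
        [
            ("content-type", "application/x-ndjson; charset=utf-8"),
            ("cache-control", "no-cache"),
            // No "connection: keep-alive": it is a hop-by-hop header that hyper manages itself,
            // and connection-specific headers are not allowed in HTTP/2 responses
        ],
        axum::body::Body::from_stream(stream),
    )
}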
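Pitfall 5: streams are cut off abruptly during shutdown
Symptom: when the service shuts down, clients see a connection reset instead of a clean end of stream.
Cause: the process is killed outright without waiting for active streams to finish.
Solution: use a CancellationToken to tell every stream to finish gracefully (see Pattern 6).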
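10 common errors and troubleshooting
| # | Error message | Cause | Fix |
|---|---|---|---|
| 1 | the trait bound impl Stream: IntoResponse is not satisfied | The stream type does not match | Make sure the stream's Item type is Result<Event, Infallible> and wrap it in Sse::new |
| 2 | WebSocket upgrade failed: Connection header is not upgrade | The client did not send the upgrade headers | Make sure the request includes Connection: Upgrade and Upgrade: websocket |
| 3 | channel send error: channel closed | The receiving side has already been dropped | Check whether the receiver was dropped early; handle it with send().await.is_err() |
| 4 | RecvError::Lagged(n) | The consumer cannot keep up with the producer | Increase the broadcast capacity, or use a bounded mpsc for backpressure |
| 5 | body write aborted: connection closed | The client disconnected | Detect disconnects and release resources promptly |
| 6 | SSE stream timeout | Keep-alive timeout on an idle connection | Adjust the KeepAlive::new().interval() period |
| 7 | WebSocket message too large | The message exceeds the limit (axum defaults: 64 MiB per message, 16 MiB per frame) | Set ws.max_frame_size() and ws.max_message_size() explicitly |
| 8 | task hung detected | An async task is blocked | Look for blocking synchronous calls and move them to tokio::task::spawn_blocking |
| 9 | too many open files | Connections exceed the OS file-descriptor limit | Raise ulimit -n, or cap the number of connections |
| 10 | connection reset by peer | The client closed the connection abnormally | Handle Message::Close and errors in the WebSocket handler |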
Advanced optimization tips
1. Connection limits
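use axum::{
    body::Body,
    extract::{Request, State},
    http::StatusCode,
    middleware::Next,
    response::{IntoResponse, Response},
};
use futures::StreamExt;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
struct ConnectionLimiter {
    current: AtomicUsize,
    max: usize,
}
// Releases the slot when dropped
struct SlotGuard(Arc<ConnectionLimiter>);
impl Drop for SlotGuard {
    fn drop(&mut self) {
        self.0.current.fetch_sub(1, Ordering::Relaxed);
    }
}
// Register with axum::middleware::from_fn_with_state(limiter, limit_connections)
async fn limit_connections(
    State(limiter): State<Arc<ConnectionLimiter>>,
    request: Request,
    next: Next,
) -> Response {
    let current = limiter.current.fetch_add(1, Ordering::Relaxed);
    if current >= limiter.max {
        limiter.current.fetch_sub(1, Ordering::Relaxed);
        return StatusCode::SERVICE_UNAVAILABLE.into_response();
    }
    let guard = SlotGuard(limiter.clone());
    // next.run() returns once the response headers are ready, long before an SSE/NDJSON body
    // finishes, so the slot moves into the body and is released only when the body is dropped
    let (parts, body) = next.run(request).await.into_parts();
    let body = body.into_data_stream().map(move |chunk| {
        let _hold = &guard;
        chunk
    });
    Response::from_parts(parts, Body::from_stream(body))
}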
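The counting logic can also be written so the counter never exceeds the limit, even briefly. Here the slot is released by a guard rather than by code placed after `next.run(...)`. Below is a minimal std-only sketch with hypothetical names. In the middleware, the returned `Slot` would be moved into the response body so it lives as long as the stream.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Caps concurrent streams; each admitted stream holds a `Slot`.
pub struct Limiter {
    current: AtomicUsize,
    max: usize,
}

/// Frees its slot when dropped (e.g. when the response body is dropped).
pub struct Slot {
    limiter: Arc<Limiter>,
}

impl Limiter {
    pub fn new(max: usize) -> Arc<Self> {
        Arc::new(Limiter { current: AtomicUsize::new(0), max })
    }

    pub fn active(&self) -> usize {
        self.current.load(Ordering::Acquire)
    }

    /// Compare-and-swap loop: only increments while below `max`, so the count never overshoots.
    pub fn try_acquire(this: &Arc<Self>) -> Option<Slot> {
        let mut cur = this.current.load(Ordering::Acquire);
        loop {
            if cur >= this.max {
                return None;
            }
            match this
                .current
                .compare_exchange_weak(cur, cur + 1, Ordering::AcqRel, Ordering::Acquire)
            {
                Ok(_) => return Some(Slot { limiter: Arc::clone(this) }),
                Err(actual) => cur = actual,
            }
        }
    }
}

impl Drop for Slot {
    fn drop(&mut self) {
        self.limiter.current.fetch_sub(1, Ordering::AcqRel);
    }
}
```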
2. Streaming compression
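// Requires async-compression = { version = "0.4", features = ["tokio", "gzip"] }
// and tokio-util with the "io" feature
use async_compression::tokio::bufread::GzipEncoder;
use axum::response::IntoResponse;
use tokio_util::io::{ReaderStream, StreamReader};
async fn compressed_stream() -> impl IntoResponse {
    // Hypothetical helper returning a Stream<Item = Result<bytes::Bytes, std::io::Error>>
    let stream = create_data_stream();
    // Stream -> AsyncBufRead -> gzip encoder (AsyncRead) -> Stream of compressed chunks
    let reader = StreamReader::new(stream);
    let encoder = GzipEncoder::new(reader);
    let compressed_stream = ReaderStream::new(encoder);
    // The encoder buffers internally, so small writes may reach the client with a delay
    (
        [
            ("content-type", "application/x-ndjson"),
            ("content-encoding", "gzip"),
        ],
        axum::body::Body::from_stream(compressed_stream),
    )
}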
3. Monitoring streaming responses
use prometheus::{IntCounter, IntGauge, Registry};
lazy_static::lazy_static! {
static ref REGISTRY: Registry = Registry::new();
static ref SSE_CONNECTIONS: IntGauge = IntGauge::new(
"sse_connections_active",
"Active SSE connections"
).unwrap();
static ref SSE_EVENTS_SENT: IntCounter = IntCounter::new(
"sse_events_sent_total",
"Total SSE events sent"
).unwrap();
static ref WS_CONNECTIONS: IntGauge = IntGauge::new(
"ws_connections_active",
"Active WebSocket connections"
).unwrap();
}
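// The metrics above must also be registered once at startup to be exported, e.g.
// REGISTRY.register(Box::new(SSE_CONNECTIONS.clone())).unwrap();
// Decrements the gauge whenever the stream is dropped, including on client disconnect.
// Chaining a final item onto the stream would never run in that case, and would send an extra empty event.
struct SseConnectionGuard;
impl Drop for SseConnectionGuard {
    fn drop(&mut self) {
        SSE_CONNECTIONS.dec();
    }
}
async fn sse_monitored(
    State(state): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    SSE_CONNECTIONS.inc();
    let guard = SseConnectionGuard;
    let mut rx = state.broadcast_tx.subscribe();
    let stream = async_stream::stream! {
        let _guard = guard;
        loop {
            match rx.recv().await {
                Ok(event) => {
                    SSE_EVENTS_SENT.inc();
                    let data = serde_json::to_string(&event).unwrap();
                    yield Ok(Event::default().data(data));
                }
                Err(tokio::sync::broadcast::error::RecvError::Closed) => {
                    break;
                }
                // Lagged: skip the missed messages and keep going
                Err(_) => continue,
            }
        }
    };
    Sse::new(stream)
}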
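Comparison: SSE vs WebSocket vs long polling
| Dimension | SSE | WebSocket | Long polling |
|---|---|---|---|
| Direction | One-way (server → client) | Bidirectional | One-way (emulated) |
| Protocol | HTTP | WS/WSS | HTTP |
| Automatic reconnection | Built into browsers | Must be implemented yourself | Inherent (each poll is a new request) |
| Browser API | EventSource | WebSocket API | fetch/XHR |
| Data format | Text | Text + binary | Any |
| Proxy/CDN | Friendly | Needs special configuration | Friendly |
| Connection overhead | Low | Medium | High (frequent reconnects) |
| Use cases | Notifications, LLM streaming, logs | Chat, collaboration, games | When broad compatibility matters |
| Memory usage | Low | Medium | High |
| Implementation complexity | Low | Medium | Low |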
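Decision tree
Need real-time push?
├── Only server → client?
│ ├── Need automatic reconnection? → SSE
│ └── Small data volume, low frequency? → Long polling
└── Need bidirectional communication? → WebSocket (text and binary frames)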
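Online tools
These online tools can speed up development of Rust Axum streaming responses:
- JSON formatter: format SSE and WebSocket JSON messages while debugging streamed data
- Base64 encoder/decoder: encode and decode Base64 data in WebSocket binary frames
- Curl-to-code converter: turn curl commands used to test SSE/WebSocket endpoints into Rust code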
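Summary
Rust Axum streaming responses give real-time web applications a high-performance, memory-safe foundation. Each of the six production patterns has its own niche:
- SSE: best for one-way push; simple to implement and natively supported by browsers
- WebSocket: the first choice for bidirectional communication, such as chat and collaboration
- NDJSON: the standard format for streaming large data sets; clients parse line by line without waiting for the whole response
- Backpressure: the lifeline of a streaming system; bounded channels plus a token bucket are the standard toolkit
- Multi-client broadcast: a broadcast channel implements Pub/Sub, and topic subscriptions enable targeted push
- Production deployment: graceful shutdown, health checks, and connection limits
Remember: an unbounded channel is a time bomb, and a streaming system without backpressure will eventually run out of memory. Choose the right channel type and capacity and build on Tokio's zero-cost async, and your Axum streaming service can run reliably in production.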
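Related reading:
- Rust Axum Web Framework: 5 Production Patterns from Routing Design to Middleware
- Rust Tokio Channel Patterns in Practice: 6 Production Patterns from mpsc to Broadcast
- Graceful Shutdown in Rust Tokio: From Signal Handling to Resource Cleanup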