Rust Tokio Graceful Shutdown: A Complete Hands-On Guide to 5 Signal-Handling Patterns and Resource Cleanup
Your Rust Service Was Killed Abruptly Again
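During a Kubernetes rolling update, the Pod receives SIGTERM and exits on the spot. The 1,000 requests in flight are all cut off, database transactions roll back, and queued messages are lost. Users see a screen full of 502s, and the alerts go off again. You add a tokio::signal listener, only to find that at shutdown your goroutines (no, wait: Tokio tasks) are still running, connections have not been drained, and resources have not been released.
Graceful shutdown with Rust and Tokio is not something a one-line ctrlc::set_handler call solves. How do you catch the signal? How do you stop the runtime? How do you drain connections? How do you clean up resources? How do you persist state? Until you can answer these questions, every deployment is a small incident.
This article starts from 5 signal-handling patterns and walks through the whole chain in practice: signal capture → runtime shutdown → connection draining → resource cleanup → state persistence.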
Core Concepts of Tokio Graceful Shutdown
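| Concept | Description |
|---|---|
| tokio::signal | Tokio's async signal module: ctrl_c() works on all platforms; SIGTERM, SIGHUP and others via tokio::signal::unix |
| tokio::sync::broadcast | Broadcast channel used to propagate the shutdown signal to every worker |
| tokio::sync::watch | Single-value watch channel used to announce changes in shutdown state |
| CancellationToken | Cooperative cancellation token provided by tokio_util |
| tokio::task::JoinSet | Task collection that lets you wait for all tasks to finish before shutting down |
| Graceful Shutdown | Stop accepting new requests, wait for in-flight requests to finish, then exit |
| Connection Draining | After closing the listening port, wait until active connections have finished |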
Graceful Shutdown Flow
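1. Receive SIGTERM
2. Stop accepting new connections (close the listener)
3. Notify all workers to begin graceful shutdown
4. Wait for in-flight requests to finish (connection draining)
5. Persist any state that must survive (before closing the pools it may be written through)
6. Close database connection pools
7. Exit the process (exit code 0)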
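The flow only works if each step runs after the previous one has finished. One way to make that ordering explicit is a small state machine that refuses to skip phases. The following is a std-only sketch. `ShutdownPhase` and `ShutdownSequence` are hypothetical names, not Tokio APIs. State persistence is placed before closing pools, so state can still be written through a database pool.

```rust
/// Hypothetical shutdown phases, mirroring the flow above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ShutdownPhase {
    Running,
    StopAccepting, // listener closed
    NotifyWorkers, // cancellation sent to workers
    Draining,      // waiting for in-flight requests
    PersistState,  // flush in-memory state (pools still open)
    ClosePools,    // DB pools and other shared resources
    Exited,        // exit code 0
}

impl ShutdownPhase {
    /// The only phase allowed to follow `self`, or None once exited.
    pub fn next(self) -> Option<ShutdownPhase> {
        match self {
            Self::Running => Some(Self::StopAccepting),
            Self::StopAccepting => Some(Self::NotifyWorkers),
            Self::NotifyWorkers => Some(Self::Draining),
            Self::Draining => Some(Self::PersistState),
            Self::PersistState => Some(Self::ClosePools),
            Self::ClosePools => Some(Self::Exited),
            Self::Exited => None,
        }
    }
}

/// Tracks the current phase and rejects out-of-order transitions.
pub struct ShutdownSequence {
    current: ShutdownPhase,
}

impl ShutdownSequence {
    pub fn new() -> Self {
        Self { current: ShutdownPhase::Running }
    }

    pub fn current(&self) -> ShutdownPhase {
        self.current
    }

    /// Moves to `to` only if it is the immediate successor of the current phase.
    pub fn advance(&mut self, to: ShutdownPhase) -> Result<(), String> {
        if self.current.next() == Some(to) {
            self.current = to;
            Ok(())
        } else {
            Err(format!("cannot go from {:?} to {:?}", self.current, to))
        }
    }
}
```

Calling `advance(ShutdownPhase::Draining)` straight from `Running` returns an error. This is exactly the "drain before the listener is closed" mistake the flow is meant to prevent.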
Problem Analysis: The 5 Big Challenges of Tokio Graceful Shutdown
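- Signal timing: a signal can arrive at any moment, so shutdown must not cut a critical operation off halfway
- Waiting for tasks, with a timeout: waiting for every task to finish can block forever, so you need a grace period
- Connection draining: HTTP/gRPC servers must first stop accepting new requests, then wait for in-flight ones to complete
- Resource release order: database connections, file handles, temporary files and the like must be released in the correct order
- State persistence: in-memory state must be written to disk or a database before the process exits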
Step-by-Step: A Complete Graceful Shutdown Implementation
Step 1: Basic Signal Handling
```rust
use tokio::signal;
use tokio::sync::broadcast;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (shutdown_tx, _) = broadcast::channel::<()>(1);
    // Subscribe BEFORE spawning the signal task: broadcast only delivers to
    // receivers that already exist, and send() fails if there are none.
    let mut shutdown_rx = shutdown_tx.subscribe();
    tokio::spawn(handle_shutdown_signal(shutdown_tx.clone()));
    tokio::select! {
        _ = run_server() => {
            println!("Server exited normally");
        }
        // When this branch wins, the run_server() future is dropped immediately;
        // the later steps add draining and cleanup.
        _ = shutdown_rx.recv() => {
            println!("Received shutdown signal");
        }
    }
    Ok(())
}

/// Waits for Ctrl+C (all platforms) or SIGTERM (Unix), then broadcasts shutdown.
async fn handle_shutdown_signal(shutdown_tx: broadcast::Sender<()>) {
    let ctrl_c = async {
        signal::ctrl_c()
            .await
            .expect("Failed to install Ctrl+C handler");
    };
    #[cfg(unix)]
    let terminate = async {
        signal::unix::signal(signal::unix::SignalKind::terminate())
            .expect("Failed to install SIGTERM handler")
            .recv()
            .await;
    };
    // There is no SIGTERM outside Unix: use a future that never completes.
    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();
    tokio::select! {
        _ = ctrl_c => {
            println!("Received Ctrl+C");
        }
        _ = terminate => {
            println!("Received SIGTERM");
        }
    }
    let _ = shutdown_tx.send(());
}

/// Placeholder server loop that never finishes on its own.
async fn run_server() {
    loop {
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    }
}
```
Step 2: The CancellationToken Pattern
```rust
use std::time::Duration;
use tokio_util::sync::CancellationToken;

struct AppServer {
    cancellation_token: CancellationToken,
    workers: Vec<tokio::task::JoinHandle<()>>,
}

impl AppServer {
    fn new() -> Self {
        Self {
            cancellation_token: CancellationToken::new(),
            workers: Vec::new(),
        }
    }

    async fn start(&mut self) {
        for id in 0..4 {
            // Child tokens: cancelling the parent cancels every worker.
            let token = self.cancellation_token.child_token();
            let handle = tokio::spawn(async move {
                worker_loop(id, token).await;
            });
            self.workers.push(handle);
        }
    }

    async fn graceful_shutdown(self, timeout: Duration) {
        self.cancellation_token.cancel();
        // One shared deadline for all workers, not `timeout` per worker.
        let deadline = tokio::time::Instant::now() + timeout;
        for mut handle in self.workers {
            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
            // Await `&mut handle` so we still own the handle if the timeout fires.
            match tokio::time::timeout(remaining, &mut handle).await {
                Ok(Ok(())) => {}
                Ok(Err(e)) => {
                    // JoinError: the worker panicked or was cancelled.
                    eprintln!("Worker failed: {}", e);
                }
                Err(_) => {
                    eprintln!("Worker did not shut down within timeout, aborting");
                    handle.abort();
                }
            }
        }
    }
}

async fn worker_loop(id: usize, token: CancellationToken) {
    loop {
        tokio::select! {
            _ = token.cancelled() => {
                println!("Worker {} shutting down gracefully", id);
                break;
            }
            _ = tokio::time::sleep(Duration::from_secs(1)) => {
                println!("Worker {} processing task", id);
            }
        }
    }
    println!("Worker {} cleanup complete", id);
}
```
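To make the `child_token` semantics concrete, here is a std-only toy model of the three properties involved:

- Cancelling a parent cancels every descendant.
- Cancelling a child leaves its parent and siblings alone.
- `cancel()` can safely be called more than once.

`ToyToken` is a hypothetical type written for illustration only. tokio_util's real `CancellationToken` is implemented differently, and it also lets async code `.await` cancellation.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

/// One node in the toy cancellation tree.
struct Node {
    cancelled: AtomicBool,
    children: Mutex<Vec<Arc<Node>>>,
}

/// Hypothetical toy model of CancellationToken semantics (std only, no async).
#[derive(Clone)]
pub struct ToyToken {
    node: Arc<Node>,
}

impl ToyToken {
    pub fn new() -> Self {
        ToyToken {
            node: Arc::new(Node {
                cancelled: AtomicBool::new(false),
                children: Mutex::new(Vec::new()),
            }),
        }
    }

    /// Returns a child that is cancelled whenever this token is cancelled.
    pub fn child_token(&self) -> ToyToken {
        let child = ToyToken::new();
        // Hold the parent's child list while reading its flag, so a concurrent
        // cancel() either sees the new child or we see the flag already set.
        let mut children = self.node.children.lock().unwrap();
        if self.node.cancelled.load(Ordering::SeqCst) {
            cancel_node(&child.node); // parent already cancelled
        } else {
            children.push(Arc::clone(&child.node));
        }
        child
    }

    pub fn cancel(&self) {
        cancel_node(&self.node);
    }

    pub fn is_cancelled(&self) -> bool {
        self.node.cancelled.load(Ordering::SeqCst)
    }
}

/// Marks `node` cancelled and cascades to its descendants. Only the first
/// call does any work, which is what makes cancel() idempotent.
fn cancel_node(node: &Node) {
    if node.cancelled.swap(true, Ordering::SeqCst) {
        return; // already cancelled
    }
    for child in node.children.lock().unwrap().iter() {
        cancel_node(child);
    }
}
```

Cancellation flows only downward. This is why Step 2 hands each worker `child_token()` and keeps the root token for `graceful_shutdown`.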
Step 3: Connection Draining for an HTTP Service
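```rust
// Written against the axum 0.7+ API (axum::serve); axum 0.6 used axum::Server instead.
use axum::routing::get;
use axum::Router;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::sync::watch;

struct GracefulServer {
    shutdown_tx: watch::Sender<bool>,
    shutdown_rx: watch::Receiver<bool>,
}

impl GracefulServer {
    fn new() -> Self {
        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        Self {
            shutdown_tx,
            shutdown_rx,
        }
    }

    async fn run(self: Arc<Self>, addr: SocketAddr) {
        let app = Router::new()
            .route("/health", get(|| async { "ok" }))
            .route("/api/data", get(handle_data));
        let listener = match TcpListener::bind(addr).await {
            Ok(listener) => listener,
            Err(e) => {
                eprintln!("Failed to bind {}: {}", addr, e);
                return;
            }
        };
        // with_graceful_shutdown needs a 'static future, so move a cloned receiver in.
        let mut rx = self.shutdown_rx.clone();
        let shutdown = async move {
            // Resolves once the value is true, or with Err if the sender is dropped.
            let _ = rx.wait_for(|stop| *stop).await;
        };
        // After `shutdown` resolves, axum stops accepting new connections and
        // waits for open connections (e.g. a slow /api/data call) to finish.
        if let Err(e) = axum::serve(listener, app)
            .with_graceful_shutdown(shutdown)
            .await
        {
            eprintln!("Server error: {}", e);
        }
    }

    fn trigger_shutdown(&self) {
        let _ = self.shutdown_tx.send(true);
    }
}

/// A deliberately slow handler, so there is something in flight to drain.
async fn handle_data() -> &'static str {
    tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
    "data response"
}
```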
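At the OS level, "stop accepting new connections" simply means closing the listening socket. Connections that were already accepted keep working and can be drained, while new connection attempts are refused. The std-only sketch below demonstrates this with a blocking `std::net::TcpListener` on an ephemeral loopback port. `listener_close_demo` is a hypothetical helper. A Tokio listener wraps the same kind of OS socket.

```rust
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};

/// Hypothetical demo. Returns (in_flight_still_served, new_connection_refused).
pub fn listener_close_demo() -> std::io::Result<(bool, bool)> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;

    // An "in-flight" connection, accepted before shutdown begins.
    let mut client = TcpStream::connect(addr)?;
    let (mut server_side, _) = listener.accept()?;

    // Shutdown step 2: stop accepting by dropping the listener.
    drop(listener);

    // The already-accepted connection can still be served (drained).
    server_side.write_all(b"ok")?;
    let mut buf = [0u8; 2];
    client.read_exact(&mut buf)?;
    let in_flight_ok = &buf == b"ok";

    // New connection attempts are now refused by the OS.
    let new_refused = TcpStream::connect(addr).is_err();
    Ok((in_flight_ok, new_refused))
}
```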
Step 4: A Resource Cleanup Manager
```rust
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;
use tokio::sync::Mutex;

// A boxed future must be pinned before it can be awaited,
// so cleanup callbacks return Pin<Box<dyn Future>>.
type CleanupFuture = Pin<Box<dyn Future<Output = Result<(), String>> + Send>>;

struct CleanupTask {
    name: String,
    cleanup_fn: Box<dyn FnOnce() -> CleanupFuture + Send>,
}

pub struct ResourceManager {
    cleanup_tasks: Mutex<Vec<CleanupTask>>,
    shutdown_timeout: Duration,
}

impl ResourceManager {
    pub fn new(shutdown_timeout: Duration) -> Self {
        Self {
            cleanup_tasks: Mutex::new(Vec::new()),
            shutdown_timeout,
        }
    }

    pub async fn register<F, Fut>(&self, name: impl Into<String>, cleanup_fn: F)
    where
        F: FnOnce() -> Fut + Send + 'static,
        Fut: Future<Output = Result<(), String>> + Send + 'static,
    {
        let task = CleanupTask {
            name: name.into(),
            cleanup_fn: Box::new(move || -> CleanupFuture { Box::pin(cleanup_fn()) }),
        };
        self.cleanup_tasks.lock().await.push(task);
    }

    /// Runs every cleanup in reverse registration order (LIFO), each under its own timeout.
    pub async fn cleanup_all(&self) {
        // Take the list and release the lock immediately, so a cleanup
        // that calls register() cannot deadlock.
        let tasks = std::mem::take(&mut *self.cleanup_tasks.lock().await);
        for task in tasks.into_iter().rev() {
            let result = tokio::time::timeout(
                self.shutdown_timeout,
                (task.cleanup_fn)(),
            ).await;
            match result {
                Ok(Ok(())) => {
                    println!("✓ Cleaned up: {}", task.name);
                }
                Ok(Err(e)) => {
                    eprintln!("✗ Cleanup failed for {}: {}", task.name, e);
                }
                Err(_) => {
                    eprintln!("✗ Cleanup timed out for {}", task.name);
                }
            }
        }
    }
}
```
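The `.rev()` in `cleanup_all` is what makes the release order correct. Whatever was registered first (typically the database pool that everything else depends on) is cleaned up last. A failing cleanup is reported, but it does not stop the remaining ones. The sketch below is a synchronous, std-only model of that ordering. `CleanupStack` and the resource names are hypothetical.

```rust
/// Hypothetical synchronous cleanup registry illustrating LIFO ordering.
pub struct CleanupStack {
    tasks: Vec<(String, Box<dyn FnOnce() -> Result<(), String>>)>,
}

impl CleanupStack {
    pub fn new() -> Self {
        Self { tasks: Vec::new() }
    }

    /// Register in dependency order: foundations (e.g. the DB pool) first.
    pub fn register(&mut self, name: &str, f: impl FnOnce() -> Result<(), String> + 'static) {
        self.tasks.push((name.to_string(), Box::new(f)));
    }

    /// Runs cleanups newest-first and returns (name, succeeded) in execution order.
    /// A failure is recorded but does not stop the remaining cleanups.
    pub fn run_all(self) -> Vec<(String, bool)> {
        self.tasks
            .into_iter()
            .rev()
            .map(|(name, f)| {
                let ok = f().is_ok();
                (name, ok)
            })
            .collect()
    }
}
```

With `db_pool`, `cache`, and `http_server` registered in that order, the server is shut down first and the pool last.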
Step 5: Orchestrating the Complete Graceful Shutdown
```rust
// Uses ResourceManager from Step 4. TaskTracker needs tokio-util 0.7.9+ with the "rt" feature.
use std::time::Duration;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;

pub struct GracefulShutdown {
    shutdown_signal: broadcast::Sender<()>,
    // Created together with the channel, so an early signal is never lost.
    shutdown_rx: broadcast::Receiver<()>,
    cancellation_token: CancellationToken,
    task_tracker: TaskTracker,
    resource_manager: ResourceManager,
    grace_period: Duration,
}

impl GracefulShutdown {
    pub fn new(grace_period: Duration) -> Self {
        let (shutdown_signal, shutdown_rx) = broadcast::channel(1);
        Self {
            shutdown_signal,
            shutdown_rx,
            cancellation_token: CancellationToken::new(),
            task_tracker: TaskTracker::new(),
            resource_manager: ResourceManager::new(Duration::from_secs(5)),
            grace_period,
        }
    }
    pub fn shutdown_signal(&self) -> broadcast::Sender<()> {
        self.shutdown_signal.clone()
    }
    pub fn cancellation_token(&self) -> CancellationToken {
        self.cancellation_token.clone()
    }
    /// Spawn work through this tracker so run() can wait for it.
    pub fn task_tracker(&self) -> TaskTracker {
        self.task_tracker.clone()
    }
    pub fn resource_manager(&self) -> &ResourceManager {
        &self.resource_manager
    }
    pub async fn run(mut self) {
        self.shutdown_rx.recv().await.ok();
        println!("🛑 Shutdown signal received, starting graceful shutdown...");
        self.cancellation_token.cancel();
        // wait() resolves once the tracker is closed AND all tracked tasks have
        // finished, so a fast shutdown does not sit out the full grace period.
        self.task_tracker.close();
        println!("⏳ Waiting up to {:?} for tasks to complete...", self.grace_period);
        if tokio::time::timeout(self.grace_period, self.task_tracker.wait()).await.is_err() {
            eprintln!("Grace period elapsed with {} task(s) still running", self.task_tracker.len());
        }
        println!("🧹 Cleaning up resources...");
        self.resource_manager.cleanup_all().await;
        println!("✅ Graceful shutdown complete. Goodbye!");
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let shutdown = GracefulShutdown::new(Duration::from_secs(30));
    let signal_tx = shutdown.shutdown_signal();
    tokio::spawn(async move {
        tokio::signal::ctrl_c().await.ok();
        let _ = signal_tx.send(());
    });
    let token = shutdown.cancellation_token();
    // Tracked, so run() waits for it to finish before cleaning up.
    shutdown.task_tracker().spawn(async move {
        loop {
            tokio::select! {
                _ = token.cancelled() => {
                    println!("Background task shutting down");
                    break;
                }
                _ = tokio::time::sleep(Duration::from_secs(1)) => {
                    println!("Background task working...");
                }
            }
        }
    });
    shutdown.run().await;
    Ok(())
}
```
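Even when every task cooperates, it is worth putting a hard ceiling on the whole shutdown so it finishes before Kubernetes escalates to SIGKILL. In async code, that ceiling amounts to wrapping `run()` in `tokio::time::timeout`. The std-only sketch below shows the same idea with a helper thread and `recv_timeout`. `run_with_budget` and `ShutdownOutcome` are hypothetical names. In a real service, the `TimedOut` branch would typically log and call `std::process::exit` with a non-zero code.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

#[derive(Debug, PartialEq, Eq)]
pub enum ShutdownOutcome {
    Completed,
    TimedOut,
    Panicked,
}

/// Runs `work` on a helper thread and waits at most `budget` for it.
/// On timeout the helper thread is left running (detached); the caller is
/// expected to exit the process at that point.
pub fn run_with_budget<F>(budget: Duration, work: F) -> ShutdownOutcome
where
    F: FnOnce() + Send + 'static,
{
    let (done_tx, done_rx) = mpsc::channel::<()>();
    thread::spawn(move || {
        work();
        // The receiver may already be gone after a timeout; that is fine.
        let _ = done_tx.send(());
    });
    match done_rx.recv_timeout(budget) {
        Ok(()) => ShutdownOutcome::Completed,
        Err(mpsc::RecvTimeoutError::Timeout) => ShutdownOutcome::TimedOut,
        // The sender was dropped without sending: `work` panicked.
        Err(mpsc::RecvTimeoutError::Disconnected) => ShutdownOutcome::Panicked,
    }
}
```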
Pitfall Guide
Pitfall 1: Using std::sync::mpsc instead of tokio::sync
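```rust
// ❌ Wrong: a standard-library channel inside async code
let (tx, rx) = std::sync::mpsc::channel::<String>();
tokio::spawn(async move {
    let msg = rx.recv().unwrap(); // Blocks the runtime's worker thread!
});

// ✅ Right: use Tokio's async channel
let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(100);
tokio::spawn(async move {
    // recv().await yields to the runtime instead of blocking the thread
    while let Some(msg) = rx.recv().await {
        println!("Received: {}", msg);
    }
});
```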
Pitfall 2: Doing slow work inside the signal handler
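```rust
// ❌ Wrong: cleanup inside the ctrlc callback. The callback runs on a thread
// owned by the ctrlc crate, outside the Tokio runtime: async cleanup cannot be
// awaited there, and the rest of the program keeps running unaware of it.
ctrlc::set_handler(|| {
    std::thread::sleep(Duration::from_secs(5)); // Don't block in the handler!
    cleanup_database();
})?;

// ✅ Right: the handler only sends a notification; cleanup happens in the main async flow
let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
let tx_clone = tx.clone();
ctrlc::set_handler(move || {
    // try_send never blocks and can be called from a non-async thread
    let _ = tx_clone.try_send(());
})?;
rx.recv().await;
cleanup_database().await;
// Inside Tokio, tokio::signal::ctrl_c().await achieves the same without the ctrlc crate.
```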
Pitfall 3: Not awaiting JoinHandles, so tasks are killed
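```rust
// ❌ Wrong: spawn without waiting; when main returns, the runtime shuts down
tokio::spawn(async {
    long_running_task().await;
});
// main returns here, and the task is dropped at its next .await point

// ✅ Right: collect the tasks in a JoinSet and wait for them to finish
let mut tasks = tokio::task::JoinSet::new();
for i in 0..4 {
    tasks.spawn(async move { worker(i).await });
}
while let Some(result) = tasks.join_next().await {
    result?; // surfaces a JoinError if a task panicked
}
```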
Pitfall 4: The CancellationToken is not propagated to child tasks
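```rust
// ❌ Wrong: the child task never receives the cancellation
let token = CancellationToken::new();
tokio::spawn(async {
    loop {
        do_work().await; // never stops
    }
});
token.cancel();

// ✅ Right: pass a child_token into the child task
let token = CancellationToken::new();
let child_token = token.child_token();
tokio::spawn(async move {
    loop {
        tokio::select! {
            _ = child_token.cancelled() => break,
            // If cancellation wins, this do_work() future is dropped mid-flight,
            // so do_work must be safe to cancel at any .await point.
            _ = do_work() => {}
        }
    }
});
token.cancel();
```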
Pitfall 5: An unreasonable grace period
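```rust
// ❌ Wrong: grace period too short; requests are killed before they finish
let grace_period = Duration::from_millis(100); // Too short!

// ✅ Right: base the grace period on the service's P99 latency
let grace_period = Duration::from_secs(30); // comfortably above P99 latency
// In K8s, terminationGracePeriodSeconds (default: 30) must exceed grace_period
// plus cleanup time, so with a 30 s grace period it has to be raised.
```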
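The Kubernetes rule can be checked with simple arithmetic. The pod's `terminationGracePeriodSeconds` has to cover three things: any `preStop` hook (it runs first, SIGTERM is sent after it returns, and its time counts against the same budget), the application's grace period, and the cleanup that follows. Kubernetes defaults `terminationGracePeriodSeconds` to 30 seconds, so a 30-second grace period does not fit the default once cleanup is added. `fits_termination_budget` is a hypothetical helper.

```rust
use std::time::Duration;

/// Hypothetical check: does the whole shutdown fit strictly inside the pod's
/// terminationGracePeriodSeconds? Strictly less leaves a little headroom
/// before the kubelet sends SIGKILL.
pub fn fits_termination_budget(
    termination_grace_secs: u64,
    pre_stop: Duration,
    app_grace: Duration,
    cleanup: Duration,
) -> bool {
    let needed = pre_stop + app_grace + cleanup;
    needed < Duration::from_secs(termination_grace_secs)
}
```

For example, a 30 s grace period plus 5 s of cleanup fails against the default of 30. It fits with `terminationGracePeriodSeconds: 45`, even after a 5 s `preStop` delay (5 + 30 + 5 = 40).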
Troubleshooting
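| # | Error message | Cause | Fix |
|---|---|---|---|
| 1 | task 42 was cancelled | The task was cancelled while running | Check the CancellationToken or timeout settings and confirm this is expected |
| 2 | channel closed | The other end of the channel was dropped (e.g. all senders gone while a receiver is still waiting) | Keep senders alive until receivers have been notified, and treat closure as a shutdown signal |
| 3 | JoinError::Panic(...) | The task panicked | Review the task logic and add error handling |
| 4 | deadline has elapsed | A tokio::time::timeout expired | Increase the timeout or speed up the work |
| 5 | connection reset by peer | The connection was reset while the server was shutting down | Implement connection draining: stop accepting new connections first, then wait for existing ones |
| 6 | Address already in use | The port was not released after the previous instance stopped | Make sure the listener is dropped, and set SO_REUSEADDR (e.g. via tokio::net::TcpSocket::set_reuseaddr) |
| 7 | broken pipe | Writing to a connection that has already been closed | Check the connection state before writing and handle the write error |
| 8 | database connection closed | The database pool was released while tasks were still using it | Fix the release order so the database pool is closed last |
| 9 | runtime dropped | The Tokio runtime was dropped before its tasks finished | Make sure the runtime outlives all of its tasks |
| 10 | signal handler already registered | A signal handler (e.g. via ctrlc::set_handler) was registered more than once | Register it exactly once, e.g. guarded by std::sync::OnceLock, once_cell or lazy_static |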
Advanced Optimizations
1. Shutdown-aware health checks
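```rust
use axum::extract::State;
use axum::http::StatusCode;
use axum::Json;
use serde_json::{json, Value};
use std::sync::Arc;
use tokio::sync::watch;

#[derive(Clone)]
struct AppState {
    // Becomes true once shutdown has started.
    shutting_down: watch::Receiver<bool>,
}

/// Status endpoint: always 200; the body reports whether the service is draining.
async fn health_check(State(state): State<Arc<AppState>>) -> Json<Value> {
    if *state.shutting_down.borrow() {
        Json(json!({
            "status": "draining",
            "ready": false
        }))
    } else {
        Json(json!({
            "status": "healthy",
            "ready": true
        }))
    }
}

/// Readiness endpoint: probes judge the status code, not the body,
/// so return 503 while draining to take the pod out of load balancing.
async fn readiness_check(State(state): State<Arc<AppState>>) -> (StatusCode, &'static str) {
    if *state.shutting_down.borrow() {
        (StatusCode::SERVICE_UNAVAILABLE, "Service Unavailable")
    } else {
        (StatusCode::OK, "OK")
    }
}
```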
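Kubernetes HTTP probes judge the response by its status code: anything from 200 to 399 counts as success, whatever the body says. A draining pod therefore has to return a failure code such as 503 before it is removed from Service endpoints. Below is a std-only sketch of that decision, kept separate from axum so it can be tested on its own. `readiness_response` and `probe_succeeds` are hypothetical helpers.

```rust
/// Hypothetical pure function behind a readiness handler: (HTTP status, body).
pub fn readiness_response(shutting_down: bool) -> (u16, &'static str) {
    if shutting_down {
        // 503 fails the probe, so the pod is removed from Service endpoints.
        (503, "draining")
    } else {
        (200, "ok")
    }
}

/// Mirrors the kubelet's HTTP probe rule: 200..=399 is success.
pub fn probe_succeeds(status: u16) -> bool {
    (200..400).contains(&status)
}
```

A liveness endpoint should usually keep returning 200 while draining. Failing liveness makes the kubelet restart the container, which defeats the purpose of a graceful shutdown.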
2. Draining by connection count
```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::Notify;

#[derive(Clone)]
pub struct ConnectionTracker {
    active_connections: Arc<AtomicU64>,
    drain_notify: Arc<Notify>,
}

impl ConnectionTracker {
    pub fn new() -> Self {
        Self {
            active_connections: Arc::new(AtomicU64::new(0)),
            drain_notify: Arc::new(Notify::new()),
        }
    }

    /// Call when a connection starts; the count drops when the guard is dropped.
    pub fn connection_enter(&self) -> ConnectionGuard {
        self.active_connections.fetch_add(1, Ordering::SeqCst);
        ConnectionGuard {
            tracker: self.clone(),
        }
    }

    // Private: only ConnectionGuard::drop may decrement, or the count goes wrong.
    fn connection_exit(&self) {
        let prev = self.active_connections.fetch_sub(1, Ordering::SeqCst);
        if prev == 1 {
            self.drain_notify.notify_waiters();
        }
    }

    /// Waits until no connections remain or `timeout` elapses.
    pub async fn drain(&self, timeout: std::time::Duration) {
        let deadline = tokio::time::Instant::now() + timeout;
        loop {
            // Create the Notified future BEFORE checking the count: notify_waiters()
            // only wakes futures that already exist, so checking first could miss
            // the last connection_exit() and sleep until the deadline.
            let notified = self.drain_notify.notified();
            if self.active_connections.load(Ordering::SeqCst) == 0 {
                return;
            }
            tokio::select! {
                _ = notified => {}
                _ = tokio::time::sleep_until(deadline) => {
                    let still_active = self.active_connections.load(Ordering::SeqCst);
                    eprintln!("Drain timeout, {} connections still active", still_active);
                    return;
                }
            }
        }
    }
}

pub struct ConnectionGuard {
    tracker: ConnectionTracker,
}

impl Drop for ConnectionGuard {
    fn drop(&mut self) {
        self.tracker.connection_exit();
    }
}
```
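The same guard-plus-counter idea can be written for thread-based code with only the standard library, using a `Mutex` and a `Condvar` instead of `Notify`. `Condvar::wait_timeout_while` re-checks the predicate under the lock. That rules out the lost-wakeup problem by construction, because the count cannot change between the check and the wait. This is a std-only sketch with hypothetical names (`SyncTracker`, `SyncGuard`). It is not a replacement for the async version above.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

/// Hypothetical thread-based connection counter.
#[derive(Clone, Default)]
pub struct SyncTracker {
    inner: Arc<(Mutex<usize>, Condvar)>,
}

/// Decrements the count when dropped, like ConnectionGuard above.
pub struct SyncGuard {
    inner: Arc<(Mutex<usize>, Condvar)>,
}

impl SyncTracker {
    /// Counts one active connection until the returned guard is dropped.
    pub fn enter(&self) -> SyncGuard {
        *self.inner.0.lock().unwrap() += 1;
        SyncGuard { inner: Arc::clone(&self.inner) }
    }

    pub fn active(&self) -> usize {
        *self.inner.0.lock().unwrap()
    }

    /// Blocks until no connections remain or `timeout` elapses.
    /// Returns true if fully drained.
    pub fn drain(&self, timeout: Duration) -> bool {
        let (lock, cvar) = &*self.inner;
        let guard = lock.lock().unwrap();
        let (count, _timeout_result) = cvar
            .wait_timeout_while(guard, timeout, |active| *active > 0)
            .unwrap();
        *count == 0
    }
}

impl Drop for SyncGuard {
    fn drop(&mut self) {
        let (lock, cvar) = &*self.inner;
        let mut active = lock.lock().unwrap();
        *active -= 1;
        if *active == 0 {
            cvar.notify_all();
        }
    }
}
```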
3. State persistence hooks
```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::Mutex;

// A registered hook: returns a pinned, boxed future that writes one piece of state.
type PersistFn = Box<dyn Fn() -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>> + Send + Sync>;

pub struct StatePersistence {
    hooks: Arc<Mutex<Vec<(String, PersistFn)>>>,
}

impl StatePersistence {
    pub fn new() -> Self {
        Self {
            hooks: Arc::new(Mutex::new(Vec::new())),
        }
    }

    pub async fn register<F, Fut>(&self, name: impl Into<String>, persist_fn: F)
    where
        F: Fn() -> Fut + Send + Sync + 'static,
        Fut: Future<Output = Result<(), String>> + Send + 'static,
    {
        let name = name.into();
        let hook: PersistFn = Box::new(move || Box::pin(persist_fn()));
        self.hooks.lock().await.push((name, hook));
    }

    /// Runs the hooks one after another in registration order, each with its own
    /// timeout, so the worst case is (number of hooks) × timeout.
    pub async fn persist_all(&self, timeout: std::time::Duration) -> Vec<(String, Result<(), String>)> {
        let hooks = self.hooks.lock().await;
        let mut results = Vec::new();
        for (name, hook) in hooks.iter() {
            let result = tokio::time::timeout(timeout, hook()).await;
            match result {
                Ok(Ok(())) => {
                    println!("✓ State persisted: {}", name);
                    results.push((name.clone(), Ok(())));
                }
                Ok(Err(e)) => {
                    eprintln!("✗ State persist failed for {}: {}", name, e);
                    results.push((name.clone(), Err(e)));
                }
                Err(_) => {
                    let err = format!("State persist timed out for {}", name);
                    eprintln!("✗ {}", err);
                    results.push((name.clone(), Err(err)));
                }
            }
        }
        results
    }
}
```
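A persistence hook is only useful if a timeout or SIGKILL in the middle of a write cannot leave a half-written file behind. A common approach is write-to-temp-then-rename:

1. Write the snapshot to a sibling temporary file.
2. `fsync` it.
3. `rename` it over the target. On POSIX filesystems this atomically replaces the old file.

The std-only sketch below shows one possible body for a hook. `persist_atomically` and the `.tmp` naming are assumptions, not part of the code above.

```rust
use std::fs::{self, File};
use std::io::Write;
use std::path::Path;

/// Hypothetical helper: writes `bytes` to `path` so readers see either the old
/// or the new contents, never a partial file (rename is atomic on POSIX).
pub fn persist_atomically(path: &Path, bytes: &[u8]) -> std::io::Result<()> {
    // Sibling temp file, e.g. state.json -> state.tmp (same directory, same filesystem).
    let tmp = path.with_extension("tmp");
    {
        let mut file = File::create(&tmp)?;
        file.write_all(bytes)?;
        // Flush the contents to disk before the rename makes them visible.
        file.sync_all()?;
    }
    // For full durability the parent directory should also be fsynced; omitted here.
    fs::rename(&tmp, path)?;
    Ok(())
}
```

Inside an async hook registered with `StatePersistence`, blocking file I/O like this would normally be moved off the runtime threads with `tokio::task::spawn_blocking`.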
Comparison
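| Dimension | broadcast channel | watch channel | CancellationToken | JoinSet | mpsc channel |
|---|---|---|---|---|---|
| Propagation | One-to-many broadcast | One-to-many observation | Tree-shaped cascade | Task collection | Many-to-one |
| Cancellation semantics | Notification | State | Cooperative | Waiting | Message |
| Multiple consumers | ✅ every receiver gets each message | ✅ every receiver can observe the value | ✅ cascades to child tokens | ❌ single waiter | ❌ single consumer |
| Slow consumers / backpressure | ❌ lagging receivers miss old messages (RecvError::Lagged) | Only the latest value is kept, so no backpressure is needed | N/A | N/A | ✅ bounded buffer; senders wait when it is full |
| Resource overhead | Low | Very low | Low | Medium | Medium |
| Best suited for | Global shutdown notice | State-change notice | Cooperative task cancellation | Waiting for tasks to finish | Command-style shutdown |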
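Summary: graceful shutdown with Rust and Tokio is fundamentally cooperative. A C signal handler interrupts your code asynchronously; a Tokio shutdown instead needs every task to take part. Of the 5 patterns, CancellationToken is the most general. It supports tree-shaped cascading cancellation, a child_token is cancelled automatically along with its parent, and cancel() is idempotent. A recommended production combination is:

- CancellationToken (task cancellation)
- watch channel (state notification)
- JoinSet (waiting for completion)
- ConnectionTracker (connection draining)
- StatePersistence (state persistence)

And remember the iron rule of K8s: terminationGracePeriodSeconds must be greater than your grace period plus resource-cleanup time.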
#Rust #Tokio #GracefulShutdown #AsyncRuntime #SignalHandling #2026 #Backend