Rust Tokio Graceful Shutdown: A Complete Hands-On Guide to 5 Signal-Handling Patterns and Resource Cleanup


Your Rust service got killed abruptly again

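During a Kubernetes rolling update, the Pod receives SIGTERM and exits immediately. The 1,000 in-flight requests are all cut off, database transactions roll back, and message-queue messages are lost. Users see a screen full of 502s, and the alerts go off again. You added a tokio::signal listener, only to find that at shutdown the goroutines (no wait: the tokio tasks) are still running, connections have not been drained, and resources have not been released.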

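Graceful shutdown in Rust Tokio is not done by adding a single ctrlc::set_handler line. How are signals caught? How is the runtime stopped? How are connections drained? How are resources cleaned up? How is state persisted? Until these questions are answered, every deployment is a small incident.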

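This article starts from 5 signal-handling patterns and walks you through the full chain in practice: signal capture → runtime shutdown → connection draining → resource cleanup → state persistence.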


Core Concepts of Tokio Graceful Shutdown

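| Concept | Description |
| --- | --- |
| tokio::signal | Tokio's async signal-listening module; supports SIGINT/SIGTERM/SIGHUP (the latter two via tokio::signal::unix) |
| tokio::sync::broadcast | Broadcast channel used to propagate the shutdown signal to every worker |
| tokio::sync::watch | Single-value watch channel used to announce changes in shutdown state |
| CancellationToken | Cooperative cancellation token provided by tokio_util |
| tokio::task::JoinSet | A set of tasks; lets you wait for all of them to finish before shutting down |
| Graceful Shutdown | Stop accepting new requests, wait for in-flight requests to finish, then exit |
| Connection Draining | After the listening port is closed, wait for active connections to finish |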

Graceful Shutdown Flow

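1. Receive the SIGTERM signal
2. Stop accepting new connections (close the Listener)
3. Tell all workers to begin graceful shutdown
4. Wait for active requests to finish (connection draining)
5. Persist any required state (while the database pool is still open)
6. Close the database connection pool
7. Exit the process (exit 0)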

Problem Analysis: 5 Challenges of Tokio Graceful Shutdown

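  1. Signal timing: a signal can arrive at any moment, so make sure it cannot interrupt a critical operation midway
  2. Waiting for tasks, with a timeout: waiting for every task to finish can block forever, so set a grace period
  3. Connection draining: HTTP/gRPC servers must first stop accepting new requests, then wait for existing requests to finish
  4. Resource release order: database connections, file handles, temporary files, and so on must be released in the correct order
  5. State persistence: in-memory state must be written to disk or a database before shutdown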

Step-by-Step: A Complete Graceful Shutdown Implementation

Step 1: Basic signal handling

use tokio::signal;
use tokio::sync::broadcast;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
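    let (shutdown_tx, _) = broadcast::channel::<()>(1);

    // Subscribe BEFORE spawning the signal task: a broadcast receiver only sees
    // messages sent after it subscribed, so an early signal would otherwise be lost
    let mut shutdown_rx = shutdown_tx.subscribe();

    tokio::spawn(handle_shutdown_signal(shutdown_tx.clone()));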

    tokio::select! {
        _ = run_server() => {
            println!("Server exited normally");
        }
        _ = shutdown_rx.recv() => {
            println!("Received shutdown signal");
        }
    }

    Ok(())
}

async fn handle_shutdown_signal(shutdown_tx: broadcast::Sender<()>) {
    let ctrl_c = async {
        signal::ctrl_c()
            .await
            .expect("Failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let terminate = async {
        signal::unix::signal(signal::unix::SignalKind::terminate())
            .expect("Failed to install SIGTERM handler")
            .recv()
            .await;
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = ctrl_c => {
            println!("Received Ctrl+C");
        }
        _ = terminate => {
            println!("Received SIGTERM");
        }
    }

    let _ = shutdown_tx.send(());
}

async fn run_server() {
    loop {
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    }
}

Step 2: The CancellationToken pattern

use tokio_util::sync::CancellationToken;
use std::time::Duration;

struct AppServer {
    cancellation_token: CancellationToken,
    workers: Vec<tokio::task::JoinHandle<()>>,
}

impl AppServer {
    fn new() -> Self {
        Self {
            cancellation_token: CancellationToken::new(),
            workers: Vec::new(),
        }
    }

    async fn start(&mut self) {
        for id in 0..4 {
            let token = self.cancellation_token.child_token();
            let handle = tokio::spawn(async move {
                worker_loop(id, token).await;
            });
            self.workers.push(handle);
        }
    }

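    async fn graceful_shutdown(self, timeout: Duration) {
        // Cancelling the parent reaches every child token at once
        self.cancellation_token.cancel();

        // One shared deadline for all workers, not `timeout` per worker
        let deadline = tokio::time::Instant::now() + timeout;
        for mut handle in self.workers {
            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
            // Await `&mut handle` so we still own the handle if the timeout fires
            match tokio::time::timeout(remaining, &mut handle).await {
                Ok(Ok(())) => {}
                Ok(Err(e)) => {
                    eprintln!("Worker panicked or was cancelled: {}", e);
                }
                Err(_) => {
                    eprintln!("Worker did not shut down within timeout, aborting");
                    // Dropping a JoinHandle only detaches the task; abort() actually cancels it
                    handle.abort();
                }
            }
        }
    }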
}

async fn worker_loop(id: usize, token: CancellationToken) {
    loop {
        tokio::select! {
            _ = token.cancelled() => {
                println!("Worker {} shutting down gracefully", id);
                break;
            }
            _ = tokio::time::sleep(Duration::from_secs(1)) => {
                println!("Worker {} processing task", id);
            }
        }
    }
    println!("Worker {} cleanup complete", id);
}

Step 3: Draining HTTP connections

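// axum 0.7+ removed axum::Server: bind a tokio TcpListener and call axum::serve
use axum::{routing::get, Router};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::sync::watch;

struct GracefulServer {
    shutdown_tx: watch::Sender<bool>,
    shutdown_rx: watch::Receiver<bool>,
}

impl GracefulServer {
    fn new() -> Self {
        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        Self {
            shutdown_tx,
            shutdown_rx,
        }
    }

    async fn run(self: Arc<Self>, addr: SocketAddr) -> std::io::Result<()> {
        let app = Router::new()
            .route("/health", get(|| async { "ok" }))
            .route("/api/data", get(handle_data));

        let listener = TcpListener::bind(addr).await?;

        // The shutdown future must be 'static, so move a cloned receiver into it
        let mut rx = self.shutdown_rx.clone();
        let shutdown = async move {
            // Resolves once the flag is true, or once the sender is dropped (Err)
            let _ = rx.wait_for(|&down| down).await;
        };

        // When `shutdown` resolves, axum stops accepting new connections and waits
        // for in-flight requests (such as a slow /api/data) to complete. It applies
        // no time limit itself; wrap this in tokio::time::timeout for a hard cap.
        axum::serve(listener, app)
            .with_graceful_shutdown(shutdown)
            .await
    }

    fn trigger_shutdown(&self) {
        // send_replace stores the value even if every receiver has been dropped
        self.shutdown_tx.send_replace(true);
    }
}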

async fn handle_data() -> &'static str {
    tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
    "data response"
}

Step 4: A resource-cleanup manager

use std::future::Future;
use std::pin::Pin;
use std::time::Duration;
use tokio::sync::Mutex;

// A boxed dyn Future must be pinned (Pin<Box<...>>) before it can be awaited
type CleanupFuture = Pin<Box<dyn Future<Output = Result<(), String>> + Send>>;

struct CleanupTask {
    name: String,
    cleanup_fn: Box<dyn FnOnce() -> CleanupFuture + Send>,
}

pub struct ResourceManager {
    cleanup_tasks: Mutex<Vec<CleanupTask>>,
    shutdown_timeout: Duration,
}

impl ResourceManager {
    pub fn new(shutdown_timeout: Duration) -> Self {
        Self {
            cleanup_tasks: Mutex::new(Vec::new()),
            shutdown_timeout,
        }
    }

    pub async fn register<F, Fut>(&self, name: impl Into<String>, cleanup_fn: F)
    where
        F: FnOnce() -> Fut + Send + 'static,
        Fut: Future<Output = Result<(), String>> + Send + 'static,
    {
        let task = CleanupTask {
            name: name.into(),
            cleanup_fn: Box::new(move || Box::pin(cleanup_fn()) as CleanupFuture),
        };
        self.cleanup_tasks.lock().await.push(task);
    }

    pub async fn cleanup_all(&self) {
        // Take the list and release the lock before running any cleanup
        let tasks = std::mem::take(&mut *self.cleanup_tasks.lock().await);

        // Reverse registration order: resources registered last are released first.
        // Each task gets its own shutdown_timeout, so the worst case is N × timeout.
        for task in tasks.into_iter().rev() {
            let result = tokio::time::timeout(
                self.shutdown_timeout,
                (task.cleanup_fn)(),
            ).await;

            match result {
                Ok(Ok(())) => {
                    println!("✓ Cleaned up: {}", task.name);
                }
                Ok(Err(e)) => {
                    eprintln!("✗ Cleanup failed for {}: {}", task.name, e);
                }
                Err(_) => {
                    eprintln!("✗ Cleanup timed out for {}", task.name);
                }
            }
        }
    }
}

Step 5: Orchestrating the full graceful shutdown

use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
use std::time::Duration;

pub struct GracefulShutdown {
    shutdown_signal: broadcast::Sender<()>,
    // Subscribed in new(), so a signal sent before run() starts is not lost
    shutdown_rx: broadcast::Receiver<()>,
    cancellation_token: CancellationToken,
    // Tracks background tasks so run() can wait for them instead of sleeping blindly
    task_tracker: TaskTracker,
    resource_manager: ResourceManager,
    grace_period: Duration,
}

impl GracefulShutdown {
    pub fn new(grace_period: Duration) -> Self {
        let (shutdown_signal, shutdown_rx) = broadcast::channel(1);
        Self {
            shutdown_signal,
            shutdown_rx,
            cancellation_token: CancellationToken::new(),
            task_tracker: TaskTracker::new(),
            resource_manager: ResourceManager::new(Duration::from_secs(5)),
            grace_period,
        }
    }

    pub fn shutdown_signal(&self) -> broadcast::Sender<()> {
        self.shutdown_signal.clone()
    }

    pub fn cancellation_token(&self) -> CancellationToken {
        self.cancellation_token.clone()
    }

    pub fn task_tracker(&self) -> TaskTracker {
        self.task_tracker.clone()
    }

    pub fn resource_manager(&self) -> &ResourceManager {
        &self.resource_manager
    }

    pub async fn run(mut self) {
        self.shutdown_rx.recv().await.ok();

        println!("🛑 Shutdown signal received, starting graceful shutdown...");

        self.cancellation_token.cancel();

        println!("⏳ Waiting up to {:?} for tasks to complete...", self.grace_period);
        // close() lets wait() resolve as soon as every tracked task has exited,
        // so shutdown takes only as long as the tasks actually need
        self.task_tracker.close();
        if tokio::time::timeout(self.grace_period, self.task_tracker.wait()).await.is_err() {
            eprintln!("Grace period elapsed, {} task(s) still running", self.task_tracker.len());
        }

        println!("🧹 Cleaning up resources...");
        self.resource_manager.cleanup_all().await;

        println!("✅ Graceful shutdown complete. Goodbye!");
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let shutdown = GracefulShutdown::new(Duration::from_secs(30));

    let signal_tx = shutdown.shutdown_signal();
    tokio::spawn(async move {
        tokio::signal::ctrl_c().await.ok();
        let _ = signal_tx.send(());
    });

    let token = shutdown.cancellation_token();
    // Spawn through the tracker so run() can wait for this task to finish
    shutdown.task_tracker().spawn(async move {
        loop {
            tokio::select! {
                _ = token.cancelled() => {
                    println!("Background task shutting down");
                    break;
                }
                _ = tokio::time::sleep(Duration::from_secs(1)) => {
                    println!("Background task working...");
                }
            }
        }
    });

    shutdown.run().await;
    Ok(())
}

Pitfalls to Avoid

Pitfall 1: Using std::sync::mpsc instead of tokio::sync

// ❌ Wrong: using a standard-library channel in async code
let (tx, rx) = std::sync::mpsc::channel::<String>();
tokio::spawn(async move {
    let msg = rx.recv().unwrap(); // blocks the runtime's worker thread!
});

// ✅ Right: use tokio's async channel
let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(100);
tokio::spawn(async move {
    while let Some(msg) = rx.recv().await {
        println!("Received: {}", msg);
    }
});

Pitfall 2: Doing slow work in the signal handler

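// ❌ Wrong: doing cleanup work inside the signal callback
ctrlc::set_handler(|| {
    std::thread::sleep(Duration::from_secs(5)); // don't block inside the signal handler!
    cleanup_database();
})?;

// ✅ Right: the handler only notifies; cleanup happens in the main async flow
let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
let tx_clone = tx.clone();
ctrlc::set_handler(move || {
    // try_send never blocks; if a notification is already pending, extras are dropped
    let _ = tx_clone.try_send(());
})?;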

rx.recv().await;
cleanup_database().await;

Pitfall 3: Not awaiting JoinHandles, so tasks are killed

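// ❌ Wrong: spawn without awaiting; when main returns, the task is killed
tokio::spawn(async {
    long_running_task().await;
});
// main returns here and the runtime drops the task

// ✅ Right: collect the tasks in a JoinSet and wait for them to finish
let mut tasks = tokio::task::JoinSet::new();
for i in 0..4 {
    tasks.spawn(async move { worker(i).await });
}
while let Some(result) = tasks.join_next().await {
    result?; // propagates a JoinError if a task panicked or was cancelled
}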

Pitfall 4: CancellationToken not propagated to child tasks

// ❌ Wrong: the child task never receives the cancellation signal
let token = CancellationToken::new();
tokio::spawn(async {
    loop {
        do_work().await; // never stops
    }
});
token.cancel();

// ✅ Right: pass a child_token into the child task
let token = CancellationToken::new();
let child_token = token.child_token();
tokio::spawn(async move {
    loop {
        tokio::select! {
            _ = child_token.cancelled() => break,
            _ = do_work() => {}
        }
    }
});
token.cancel();

Pitfall 5: An unreasonable grace period

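// ❌ Wrong: grace period too short; requests are killed before they finish
let grace_period = Duration::from_millis(100); // too short!

// ✅ Right: set the grace period from the service's P99 latency
let grace_period = Duration::from_secs(30); // comfortably above P99 latency
// In K8s, terminationGracePeriodSeconds must exceed grace_period plus cleanup time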
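The Kubernetes rule can be checked with simple arithmetic. Kubernetes' default terminationGracePeriodSeconds is 30 s, and time spent in a preStop hook counts against it. The hypothetical helper below adds up the phases plus a safety margin. With ResourceManager::new(Duration::from_secs(5)), each cleanup task gets its own 5 s timeout, so cleanup alone can take up to N × 5 s.

```rust
use std::time::Duration;

/// Hypothetical helper: does terminationGracePeriodSeconds cover every shutdown phase?
pub fn fits_termination_budget(
    termination_grace_secs: u64,
    pre_stop: Duration,
    grace_period: Duration,
    cleanup: Duration,
    margin: Duration,
) -> bool {
    // Phases run back to back, so their durations add up
    let needed = pre_stop + grace_period + cleanup + margin;
    Duration::from_secs(termination_grace_secs) >= needed
}
```

For example, a 30 s grace period plus 5 s of cleanup does not fit the 30 s default, while a 60 s termination period leaves room for a 5 s preStop hook and a 5 s margin.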

Troubleshooting Errors

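| No. | Error message | Cause | Fix |
| --- | --- | --- | --- |
| 1 | task 42 was cancelled | The task was cancelled while running | Check the CancellationToken or timeout settings and confirm this is expected |
| 2 | channel closed | The sender was dropped while the receiver was still waiting | Make sure senders notify receivers before they shut down |
| 3 | JoinError::Panic(...) | The task panicked | Check the task logic and add error handling |
| 4 | deadline has elapsed | tokio::time::timeout expired | Increase the timeout or speed up processing |
| 5 | connection reset by peer | The connection was reset while the server was shutting down | Implement connection draining: stop accepting new connections first, then wait for existing ones |
| 6 | Address already in use | The port is still held after the service stopped | Make sure the Listener is dropped properly; set SO_REUSEADDR |
| 7 | broken pipe | Writing to a connection that is already closed | Check the connection state before writing |
| 8 | database connection closed | The database pool was released during shutdown | Fix the release order so database connections are closed last |
| 9 | runtime dropped | The Tokio runtime was dropped before its tasks finished | Make sure the runtime's lifetime covers every task |
| 10 | signal handler already registered | The signal handler was registered twice | Use once_cell or lazy_static (or std::sync::OnceLock) to register it only once |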

Advanced Optimizations

1. Shutdown-aware health checks

use axum::{Json, extract::State, http::StatusCode};
use serde_json::{json, Value};
use std::sync::Arc;
use tokio::sync::watch;

#[derive(Clone)]
struct AppState {
    shutting_down: watch::Receiver<bool>,
}

async fn health_check(State(state): State<Arc<AppState>>) -> Json<Value> {
    if *state.shutting_down.borrow() {
        Json(json!({
            "status": "draining",
            "ready": false
        }))
    } else {
        Json(json!({
            "status": "healthy",
            "ready": true
        }))
    }
}

// Kubernetes readiness probes judge by HTTP status (200-399 = success), so while
// draining return 503 instead of a 200 whose body merely says "unavailable"
async fn readiness_check(State(state): State<Arc<AppState>>) -> (StatusCode, &'static str) {
    if *state.shutting_down.borrow() {
        (StatusCode::SERVICE_UNAVAILABLE, "Service Unavailable")
    } else {
        (StatusCode::OK, "OK")
    }
}

2. Draining by connection count

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::Notify;

#[derive(Clone)]
pub struct ConnectionTracker {
    active_connections: Arc<AtomicU64>,
    drain_notify: Arc<Notify>,
}

impl ConnectionTracker {
    pub fn new() -> Self {
        Self {
            active_connections: Arc::new(AtomicU64::new(0)),
            drain_notify: Arc::new(Notify::new()),
        }
    }

    pub fn connection_enter(&self) -> ConnectionGuard {
        self.active_connections.fetch_add(1, Ordering::Relaxed);
        ConnectionGuard {
            tracker: self.clone(),
        }
    }

    pub fn connection_exit(&self) {
        // AcqRel pairs with the Acquire loads in drain()
        let prev = self.active_connections.fetch_sub(1, Ordering::AcqRel);
        if prev == 1 {
            self.drain_notify.notify_waiters();
        }
    }

    pub async fn drain(&self, timeout: std::time::Duration) {
        let deadline = tokio::time::Instant::now() + timeout;
        loop {
            // Create the Notified future BEFORE reading the counter: it receives
            // notify_waiters() wakeups from the moment it exists, so a connection
            // that exits right after the check cannot be missed
            let notified = self.drain_notify.notified();
            if self.active_connections.load(Ordering::Acquire) == 0 {
                break;
            }
            tokio::select! {
                _ = notified => {}
                _ = tokio::time::sleep_until(deadline) => {
                    let remaining = self.active_connections.load(Ordering::Acquire);
                    eprintln!("Drain timeout, {} connections still active", remaining);
                    break;
                }
            }
        }
    }
}

pub struct ConnectionGuard {
    tracker: ConnectionTracker,
}

impl Drop for ConnectionGuard {
    fn drop(&mut self) {
        self.tracker.connection_exit();
    }
}

3. State-persistence hooks

use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::Mutex;

type PersistFn = Box<dyn Fn() -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>> + Send + Sync>;

pub struct StatePersistence {
    hooks: Arc<Mutex<Vec<(String, PersistFn)>>>,
}

impl StatePersistence {
    pub fn new() -> Self {
        Self {
            hooks: Arc::new(Mutex::new(Vec::new())),
        }
    }

    pub async fn register<F, Fut>(&self, name: impl Into<String>, persist_fn: F)
    where
        F: Fn() -> Fut + Send + Sync + 'static,
        Fut: Future<Output = Result<(), String>> + Send + 'static,
    {
        let name = name.into();
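        // Annotate the closure's return type so the pinned future coerces to the trait object
        let hook: PersistFn = Box::new(move || -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>> {
            Box::pin(persist_fn())
        });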
        self.hooks.lock().await.push((name, hook));
    }

    pub async fn persist_all(&self, timeout: std::time::Duration) -> Vec<(String, Result<(), String>)> {
        let hooks = self.hooks.lock().await;
        let mut results = Vec::new();

        for (name, hook) in hooks.iter() {
            let result = tokio::time::timeout(timeout, hook()).await;
            match result {
                Ok(Ok(())) => {
                    println!("✓ State persisted: {}", name);
                    results.push((name.clone(), Ok(())));
                }
                Ok(Err(e)) => {
                    eprintln!("✗ State persist failed for {}: {}", name, e);
                    results.push((name.clone(), Err(e)));
                }
                Err(_) => {
                    let err = format!("State persist timed out for {}", name);
                    eprintln!("✗ {}", err);
                    results.push((name.clone(), Err(err)));
                }
            }
        }

        results
    }
}

Comparison

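| Dimension | broadcast channel | watch channel | CancellationToken | JoinSet | mpsc channel |
| --- | --- | --- | --- | --- | --- |
| Propagation | One-to-many broadcast | One-to-many observation | Tree-shaped cascade | Task collection | Many-to-one |
| Cancellation semantics | Notification | State | Cooperative | Waiting | Message |
| Multiple consumers | ✅ Every receiver gets it | ✅ Every receiver can observe | ✅ Cascades to child tokens | ❌ Single waiter | ❌ Single receiver |
| Backpressure | ❌ Slow receivers lag and lose messages | ✅ Keeps only the latest value | N/A | N/A | ✅ Bounded buffer |
| Resource overhead | Very low | | | | |
| Typical use | Global shutdown notification | State-change notification | Cooperative task cancellation | Waiting for tasks to finish | Command-style shutdown |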

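Summary: the core of graceful shutdown in Rust Tokio is that it is cooperative. Unlike a C signal handler, which can interrupt code asynchronously, Tokio shutdown requires every task to take part actively. Of the 5 patterns, CancellationToken is the most general: it supports tree-shaped cascading cancellation, a child_token is cancelled automatically along with its parent, and cancellation is idempotent. Recommended production combination: CancellationToken (task cancellation) + watch channel (state notification) + JoinSet (waiting for completion) + ConnectionTracker (connection draining) + StatePersistence (state persistence). And remember the Kubernetes iron rule: terminationGracePeriodSeconds must be greater than your grace period plus resource-cleanup time.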

