Graceful Shutdown in Rust Tokio: A Hands-On Guide to 5 Signal-Handling Patterns and Resource Cleanup

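Your Rust service just got killed abruptly again

During a Kubernetes rolling update, the Pod receives SIGTERM and exits immediately. The 1,000 in-flight requests are cut off, database transactions roll back, and messages in the queue are lost. Users see a wall of 502s and the alerts go off again. You added a tokio::signal listener, only to find that at shutdown the goroutines (sorry, Tokio tasks) are still running, connections have not been drained, and resources have not been released.

Graceful shutdown in Rust Tokio takes more than a single ctrlc::set_handler line. How is the signal caught? How is the runtime stopped? How are connections drained? How are resources cleaned up? How is state persisted? Until these questions are answered, every deployment is a small incident.

Starting from 5 signal-handling patterns, this article walks through the full chain: signal capture → runtime shutdown → connection draining → resource cleanup → state persistence.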


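Core concepts of Tokio graceful shutdown

Concept | Description
tokio::signal | Tokio's asynchronous signal-listening module; supports SIGINT/SIGTERM/SIGHUP
tokio::sync::broadcast | Broadcast channel, used to propagate the shutdown signal to all workers
tokio::sync::watch | Single-value watch channel, used to announce changes in shutdown state
CancellationToken | Cooperative cancellation token provided by tokio_util
tokio::task::JoinSet | A set of tasks that can be awaited until every task has finished
Graceful Shutdown | Stop accepting new requests, wait for in-flight requests to finish, then exit
Connection Draining | After the listening port is closed, wait for active connections to finish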

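Graceful shutdown flow

1. Receive the SIGTERM signal
2. Stop accepting new connections (close the Listener)
3. Notify all workers to begin graceful shutdown
4. Wait for active requests to finish (Connection Draining)
5. Close the database connection pool
6. Persist any required state (if that state is written to the database, do this before step 5)
7. Exit the process (exit 0)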
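
Because the steps above must run in a fixed order, it can help to encode that order explicitly. The following is a minimal std-only sketch; the `ShutdownPhase` enum, its `next` method, and `shutdown_sequence` are hypothetical names introduced here for illustration and are not part of Tokio.

```rust
/// Hypothetical phases mirroring the shutdown flow listed above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ShutdownPhase {
    Running,          // normal operation until SIGTERM arrives (step 1)
    StopAccepting,    // step 2: close the listener
    NotifyWorkers,    // step 3: tell workers to wind down
    DrainConnections, // step 4: wait for in-flight requests
    ClosePools,       // step 5: close the database pool
    PersistState,     // step 6: write required state
    Exit,             // step 7: leave the process
}

impl ShutdownPhase {
    /// Returns the phase that follows `self`, or `None` once the process should exit.
    pub fn next(self) -> Option<ShutdownPhase> {
        use ShutdownPhase::*;
        match self {
            Running => Some(StopAccepting),
            StopAccepting => Some(NotifyWorkers),
            NotifyWorkers => Some(DrainConnections),
            DrainConnections => Some(ClosePools),
            ClosePools => Some(PersistState),
            PersistState => Some(Exit),
            Exit => None,
        }
    }
}

/// Walks every phase in order, starting from `Running`.
pub fn shutdown_sequence() -> Vec<ShutdownPhase> {
    let mut phases = vec![ShutdownPhase::Running];
    let mut current = ShutdownPhase::Running;
    while let Some(next) = current.next() {
        phases.push(next);
        current = next;
    }
    phases
}
```

Driving the shutdown code from a single `match` on such an enum makes it harder to skip a step or run two of them out of order.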

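Problem analysis: the 5 main challenges of Tokio graceful shutdown

  1. Signal timing: a signal can arrive at any moment, so critical operations must not be interrupted halfway
  2. Task waiting and timeouts: waiting for every task to finish can block forever, so a grace period is required
  3. Connection draining: HTTP/gRPC servers must stop accepting new requests first, then wait for existing requests to finish
  4. Resource release order: database connections, file handles, temporary files and the like must be released in the correct order
  5. State persistence: in-memory state must be persisted to disk or a database before exiting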
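
Challenge 4 (release order) is commonly handled by releasing resources in the reverse of the order in which they were acquired. The sketch below shows that idea with plain synchronous closures and only the standard library; `CleanupStack` and its methods are hypothetical names chosen for this illustration.

```rust
/// A hypothetical synchronous cleanup stack: steps run in the reverse of the
/// order they were registered (last acquired, first released).
pub struct CleanupStack {
    tasks: Vec<(String, Box<dyn FnOnce() -> Result<(), String>>)>,
}

impl CleanupStack {
    pub fn new() -> Self {
        Self { tasks: Vec::new() }
    }

    /// Register a cleanup step right after the matching resource is acquired.
    pub fn register<F>(&mut self, name: &str, f: F)
    where
        F: FnOnce() -> Result<(), String> + 'static,
    {
        self.tasks.push((name.to_string(), Box::new(f)));
    }

    /// Runs every step in reverse registration order and returns the names in
    /// the order they ran. A failing step is logged and does not stop the rest.
    pub fn run_all(mut self) -> Vec<String> {
        let mut order = Vec::new();
        while let Some((name, f)) = self.tasks.pop() {
            if let Err(e) = f() {
                eprintln!("cleanup failed for {}: {}", name, e);
            }
            order.push(name);
        }
        order
    }
}
```

If the database pool is registered first and the HTTP listener last, `run_all` closes the listener first and the pool last, so anything that still needs the database during shutdown can use it.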

Step by step: a complete graceful shutdown implementation

Step 1: Basic signal handling

use tokio::signal;
use tokio::sync::broadcast;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (shutdown_tx, _) = broadcast::channel::<()>(1);

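    // Subscribe before spawning the signal task: a broadcast message sent while
    // no receiver exists is lost, so the receiver must exist first.
    let mut shutdown_rx = shutdown_tx.subscribe();

    tokio::spawn(handle_shutdown_signal(shutdown_tx.clone()));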

    tokio::select! {
        _ = run_server() => {
            println!("Server exited normally");
        }
        _ = shutdown_rx.recv() => {
            println!("Received shutdown signal");
        }
    }

    Ok(())
}

async fn handle_shutdown_signal(shutdown_tx: broadcast::Sender<()>) {
    let ctrl_c = async {
        signal::ctrl_c()
            .await
            .expect("Failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let terminate = async {
        signal::unix::signal(signal::unix::SignalKind::terminate())
            .expect("Failed to install SIGTERM handler")
            .recv()
            .await;
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = ctrl_c => {
            println!("Received Ctrl+C");
        }
        _ = terminate => {
            println!("Received SIGTERM");
        }
    }

    let _ = shutdown_tx.send(());
}

async fn run_server() {
    loop {
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    }
}

Step 2: The CancellationToken pattern

use tokio_util::sync::CancellationToken;
use std::time::Duration;

struct AppServer {
    cancellation_token: CancellationToken,
    workers: Vec<tokio::task::JoinHandle<()>>,
}

impl AppServer {
    fn new() -> Self {
        Self {
            cancellation_token: CancellationToken::new(),
            workers: Vec::new(),
        }
    }

    async fn start(&mut self) {
        for id in 0..4 {
            let token = self.cancellation_token.child_token();
            let handle = tokio::spawn(async move {
                worker_loop(id, token).await;
            });
            self.workers.push(handle);
        }
    }

    async fn graceful_shutdown(self, timeout: Duration) {
        self.cancellation_token.cancel();

        let deadline = tokio::time::Instant::now() + timeout;
        for mut handle in self.workers {
            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
            // Await by mutable reference so the handle is still available for abort() on timeout.
            match tokio::time::timeout(remaining, &mut handle).await {
                Ok(Ok(())) => {}
                Ok(Err(e)) => {
                    eprintln!("Worker panicked: {}", e);
                }
                Err(_) => {
                    eprintln!("Worker did not shut down within timeout, aborting");
                    handle.abort();
                }
            }
        }
    }
}

async fn worker_loop(id: usize, token: CancellationToken) {
    loop {
        tokio::select! {
            _ = token.cancelled() => {
                println!("Worker {} shutting down gracefully", id);
                break;
            }
            _ = tokio::time::sleep(Duration::from_secs(1)) => {
                println!("Worker {} processing task", id);
            }
        }
    }
    println!("Worker {} cleanup complete", id);
}

Step 3: Connection draining for an HTTP service

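// Note: `axum::Server` is the axum 0.6 API (a re-export of hyper 0.14's server).
// On axum 0.7+, use `axum::serve(listener, app).with_graceful_shutdown(..)` instead.
use axum::{Router, Server};
use axum::routing::get;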
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::watch;

struct GracefulServer {
    shutdown_tx: watch::Sender<bool>,
    shutdown_rx: watch::Receiver<bool>,
}

impl GracefulServer {
    fn new() -> Self {
        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        Self {
            shutdown_tx,
            shutdown_rx,
        }
    }

    async fn run(self: Arc<Self>, addr: SocketAddr) {
        let app = Router::new()
            .route("/health", get(|| async { "ok" }))
            .route("/api/data", get(handle_data));

        let server = Server::bind(&addr)
            .serve(app.into_make_service());

        let graceful = server.with_graceful_shutdown(async {
            let mut rx = self.shutdown_rx.clone();
            while !*rx.borrow_and_update() {
                if rx.changed().await.is_err() {
                    break;
                }
            }
        });

        if let Err(e) = graceful.await {
            eprintln!("Server error: {}", e);
        }
    }

    fn trigger_shutdown(&self) {
        let _ = self.shutdown_tx.send(true);
    }
}

async fn handle_data() -> &'static str {
    tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
    "data response"
}

Step 4: Resource cleanup manager

use std::future::Future;
use std::pin::Pin;
use std::time::Duration;
use tokio::sync::Mutex;

// A boxed `dyn Future` must be pinned before it can be awaited, hence Pin<Box<...>>.
type CleanupFuture = Pin<Box<dyn Future<Output = Result<(), String>> + Send>>;

struct CleanupTask {
    name: String,
    cleanup_fn: Box<dyn FnOnce() -> CleanupFuture + Send>,
}

pub struct ResourceManager {
    cleanup_tasks: Mutex<Vec<CleanupTask>>,
    shutdown_timeout: Duration,
}

impl ResourceManager {
    pub fn new(shutdown_timeout: Duration) -> Self {
        Self {
            cleanup_tasks: Mutex::new(Vec::new()),
            shutdown_timeout,
        }
    }

    // Register a cleanup step; steps run later in reverse registration order.
    pub async fn register<F, Fut>(&self, name: impl Into<String>, cleanup_fn: F)
    where
        F: FnOnce() -> Fut + Send + 'static,
        Fut: Future<Output = Result<(), String>> + Send + 'static,
    {
        let task = CleanupTask {
            name: name.into(),
            cleanup_fn: Box::new(move || Box::pin(cleanup_fn()) as CleanupFuture),
        };
        self.cleanup_tasks.lock().await.push(task);
    }

    pub async fn cleanup_all(&self) {
        // Take the task list and release the lock right away, so the mutex is not held across the cleanup awaits.
        let tasks = std::mem::take(&mut *self.cleanup_tasks.lock().await);

        for task in tasks.into_iter().rev() {
            let result = tokio::time::timeout(
                self.shutdown_timeout,
                (task.cleanup_fn)(),
            ).await;

            match result {
                Ok(Ok(())) => {
                    println!("✓ Cleaned up: {}", task.name);
                }
                Ok(Err(e)) => {
                    eprintln!("✗ Cleanup failed for {}: {}", task.name, e);
                }
                Err(_) => {
                    eprintln!("✗ Cleanup timed out for {}", task.name);
                }
            }
        }
    }
}

Step 5: Full graceful shutdown orchestration

use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
use std::time::Duration;

pub struct GracefulShutdown {
    shutdown_signal: broadcast::Sender<()>,
    cancellation_token: CancellationToken,
    resource_manager: ResourceManager,
    grace_period: Duration,
}

impl GracefulShutdown {
    pub fn new(grace_period: Duration) -> Self {
        let (shutdown_signal, _) = broadcast::channel(1);
        Self {
            shutdown_signal,
            cancellation_token: CancellationToken::new(),
            resource_manager: ResourceManager::new(Duration::from_secs(5)),
            grace_period,
        }
    }

    pub fn shutdown_signal(&self) -> broadcast::Sender<()> {
        self.shutdown_signal.clone()
    }

    pub fn cancellation_token(&self) -> CancellationToken {
        self.cancellation_token.clone()
    }

    pub fn resource_manager(&self) -> &ResourceManager {
        &self.resource_manager
    }

    pub async fn run(self) {
        let mut rx = self.shutdown_signal.subscribe();

        rx.recv().await.ok();

        println!("🛑 Shutdown signal received, starting graceful shutdown...");

        self.cancellation_token.cancel();

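        println!("⏳ Waiting up to {:?} for tasks to complete...", self.grace_period);
        // Caveat: this sleeps for the full grace period even if every task has already
        // finished. In production, await the tasks themselves (e.g. via a JoinSet) under a timeout.
        tokio::time::sleep(self.grace_period).await;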

        println!("🧹 Cleaning up resources...");
        self.resource_manager.cleanup_all().await;

        println!("✅ Graceful shutdown complete. Goodbye!");
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let shutdown = GracefulShutdown::new(Duration::from_secs(30));

    let signal_tx = shutdown.shutdown_signal();
    tokio::spawn(async move {
        tokio::signal::ctrl_c().await.ok();
        let _ = signal_tx.send(());
    });

    let token = shutdown.cancellation_token();
    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = token.cancelled() => {
                    println!("Background task shutting down");
                    break;
                }
                _ = tokio::time::sleep(Duration::from_secs(1)) => {
                    println!("Background task working...");
                }
            }
        }
    });

    shutdown.run().await;
    Ok(())
}

Pitfalls

Pitfall 1: Using std::sync::mpsc instead of tokio::sync

// ❌ Wrong: a standard-library channel inside an async context
let (tx, rx) = std::sync::mpsc::channel();
tokio::spawn(async move {
    let msg = rx.recv().unwrap(); // blocks the current worker thread!
});

// ✅ Right: use Tokio's async channel
let (tx, mut rx) = tokio::sync::mpsc::channel(100);
tokio::spawn(async move {
    while let Some(msg) = rx.recv().await {
        println!("Received: {}", msg);
    }
});

Pitfall 2: Doing slow work inside the signal handler

// ❌ Wrong: doing the cleanup inside the signal callback
ctrlc::set_handler(|| {
    std::thread::sleep(Duration::from_secs(5)); // never block inside a signal handler!
    cleanup_database();
})?;

// ✅ Right: the handler only notifies; cleanup happens in the main loop
let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
let tx_clone = tx.clone();
ctrlc::set_handler(move || {
    let _ = tx_clone.try_send(());
})?;

rx.recv().await;
cleanup_database().await;

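Pitfall 3: Not awaiting the JoinHandle, so the task is killed

// ❌ Wrong: spawn without waiting; when main returns the task is killed
tokio::spawn(async {
    long_running_task().await;
});
// main returns, the runtime is dropped, and the task is cancelled

// ✅ Right: collect the tasks in a JoinSet and wait for them to finish
let mut tasks = tokio::task::JoinSet::new();
for i in 0..4 {
    tasks.spawn(async move { worker(i).await });
}
while let Some(result) = tasks.join_next().await {
    result?;
}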

Pitfall 4: The CancellationToken is not propagated to child tasks

// ❌ Wrong: the child task never observes the cancellation
let token = CancellationToken::new();
tokio::spawn(async {
    loop {
        do_work().await; // never stops
    }
});
token.cancel();

// ✅ Right: pass a child_token into the child task
let token = CancellationToken::new();
let child_token = token.child_token();
tokio::spawn(async move {
    loop {
        tokio::select! {
            _ = child_token.cancelled() => break,
            // On cancel, the do_work() future is dropped mid-flight, so it must be cancel-safe.
            _ = do_work() => {}
        }
    }
});
token.cancel();

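Pitfall 5: An unreasonable grace period

// ❌ Wrong: the grace period is too short, so requests are killed before they finish
let grace_period = Duration::from_millis(100); // too short!

// ✅ Right: choose the grace period from the service's P99 latency
let grace_period = Duration::from_secs(30); // greater than P99 latency
// In K8s, terminationGracePeriodSeconds should be greater than grace_period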
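
To make the relationship between the application's grace period and terminationGracePeriodSeconds concrete, the budget can be added up in code. This is a minimal std-only sketch under stated assumptions: cleanup tasks run one after another, each with its own timeout (as in the ResourceManager of Step 4), and any preStop hook time counts against the Pod's termination grace period. The function names are hypothetical.

```rust
use std::time::Duration;

/// Hypothetical helper: the worst-case time the process needs after Pod
/// termination begins, assuming cleanup tasks run sequentially.
pub fn required_shutdown_budget(
    pre_stop: Duration,        // time spent in a preStop hook, if any
    grace_period: Duration,    // time allowed for in-flight work to finish
    cleanup_timeout: Duration, // timeout applied to each cleanup task
    cleanup_tasks: u32,        // number of cleanup tasks
) -> Duration {
    pre_stop + grace_period + cleanup_timeout * cleanup_tasks
}

/// True when the Kubernetes setting leaves room for the whole shutdown sequence.
pub fn fits_budget(termination_grace_period: Duration, required: Duration) -> bool {
    termination_grace_period > required
}
```

For example, a 5 s preStop hook, a 30 s grace period, and three cleanup tasks with a 5 s timeout each add up to 50 s, so the default terminationGracePeriodSeconds of 30 would be too small and the Pod could be killed with SIGKILL in the middle of cleanup.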

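Troubleshooting errors

No. | Error message | Cause | Fix
1 | task 42 was cancelled | The task was cancelled while running | Check the CancellationToken or timeout settings and confirm whether this is expected
2 | channel closed | The sender was dropped while the receiver was still waiting | Make sure the sender notifies the receiver before it is dropped
3 | JoinError::Panic(...) | The task panicked internally | Review the task logic and add error handling
4 | deadline has elapsed | A tokio::time::timeout expired | Increase the timeout or speed up the work
5 | connection reset by peer | The connection was reset while the server was shutting down | Implement connection draining: stop new connections first, then wait for existing ones
6 | Address in use | The port was not released after the service shut down | Make sure the Listener is dropped properly and set SO_REUSEADDR
7 | broken pipe | Data was written to a connection that is already closed | Check the connection state before writing
8 | database connection closed | The database pool had already been released during shutdown | Adjust the release order so database connections are closed last
9 | runtime dropped | The Tokio runtime was dropped before its tasks finished | Make sure the runtime outlives every task
10 | signal handler already registered | The signal handler was registered more than once | Use once_cell or lazy_static to guarantee a single registration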

Advanced optimizations

1. Shutdown-aware health checks

use axum::{Json, extract::State};
use serde_json::{json, Value};
use std::sync::Arc;
use tokio::sync::watch;

#[derive(Clone)]
struct AppState {
    shutting_down: watch::Receiver<bool>,
}

async fn health_check(State(state): State<Arc<AppState>>) -> Json<Value> {
    if *state.shutting_down.borrow() {
        Json(json!({
            "status": "draining",
            "ready": false
        }))
    } else {
        Json(json!({
            "status": "healthy",
            "ready": true
        }))
    }
}

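async fn readiness_check(State(state): State<Arc<AppState>>) -> (axum::http::StatusCode, &'static str) {
    // Return a real 503 while draining so the readiness probe fails and
    // Kubernetes stops routing new traffic to this Pod.
    if *state.shutting_down.borrow() {
        (axum::http::StatusCode::SERVICE_UNAVAILABLE, "Service Unavailable")
    } else {
        (axum::http::StatusCode::OK, "OK")
    }
}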

2. Connection-count draining

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::Notify;

#[derive(Clone)]
pub struct ConnectionTracker {
    active_connections: Arc<AtomicU64>,
    drain_notify: Arc<Notify>,
}

impl ConnectionTracker {
    pub fn new() -> Self {
        Self {
            active_connections: Arc::new(AtomicU64::new(0)),
            drain_notify: Arc::new(Notify::new()),
        }
    }

    pub fn connection_enter(&self) -> ConnectionGuard {
        self.active_connections.fetch_add(1, Ordering::Relaxed);
        ConnectionGuard {
            tracker: self.clone(),
        }
    }

    pub fn connection_exit(&self) {
        let prev = self.active_connections.fetch_sub(1, Ordering::Relaxed);
        if prev == 1 {
            self.drain_notify.notify_waiters();
        }
    }

    pub async fn drain(&self, timeout: std::time::Duration) {
        let deadline = tokio::time::Instant::now() + timeout;
        loop {
            // Create the Notified future before checking the counter, so a
            // notify_waiters() call between the check and the await is not lost.
            let notified = self.drain_notify.notified();
            if self.active_connections.load(Ordering::Relaxed) == 0 {
                return;
            }
            tokio::select! {
                _ = notified => {}
                _ = tokio::time::sleep_until(deadline) => {
                    let still_active = self.active_connections.load(Ordering::Relaxed);
                    eprintln!("Drain timeout, {} connections still active", still_active);
                    return;
                }
            }
        }
    }
}

pub struct ConnectionGuard {
    tracker: ConnectionTracker,
}

impl Drop for ConnectionGuard {
    fn drop(&mut self) {
        self.tracker.connection_exit();
    }
}
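
The point of the guard is that the counter is decremented on every exit path, including early returns. The following std-only sketch isolates that behavior; it leaves out the Notify part of ConnectionTracker, and `InFlight`, `InFlightGuard`, and `handle` are hypothetical names used only for this illustration.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Simplified in-flight counter (no wake-up mechanism, only the counting).
#[derive(Clone, Default)]
pub struct InFlight(Arc<AtomicU64>);

/// Decrements the counter when dropped.
pub struct InFlightGuard(Arc<AtomicU64>);

impl InFlight {
    pub fn enter(&self) -> InFlightGuard {
        self.0.fetch_add(1, Ordering::Relaxed);
        InFlightGuard(Arc::clone(&self.0))
    }

    pub fn count(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }
}

impl Drop for InFlightGuard {
    fn drop(&mut self) {
        self.0.fetch_sub(1, Ordering::Relaxed);
    }
}

/// Hypothetical request handler with an early-return error path.
/// On success it returns the in-flight count observed while it was running.
pub fn handle(in_flight: &InFlight, fail: bool) -> Result<u64, String> {
    let _guard = in_flight.enter();
    if fail {
        // The guard is dropped here as well, so the counter stays accurate.
        return Err("request failed".to_string());
    }
    Ok(in_flight.count())
}
```

Binding the guard to a named variable such as `_guard` matters: writing `let _ = in_flight.enter();` would drop the guard immediately and the request would never be counted.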

3. State persistence hooks

use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::Mutex;

type PersistFn = Box<dyn Fn() -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>> + Send + Sync>;

pub struct StatePersistence {
    hooks: Arc<Mutex<Vec<(String, PersistFn)>>>,
}

impl StatePersistence {
    pub fn new() -> Self {
        Self {
            hooks: Arc::new(Mutex::new(Vec::new())),
        }
    }

    pub async fn register<F, Fut>(&self, name: impl Into<String>, persist_fn: F)
    where
        F: Fn() -> Fut + Send + Sync + 'static,
        Fut: Future<Output = Result<(), String>> + Send + 'static,
    {
        let name = name.into();
        let hook: PersistFn = Box::new(move || Box::pin(persist_fn()));
        self.hooks.lock().await.push((name, hook));
    }

    pub async fn persist_all(&self, timeout: std::time::Duration) -> Vec<(String, Result<(), String>)> {
        let hooks = self.hooks.lock().await;
        let mut results = Vec::new();

        for (name, hook) in hooks.iter() {
            let result = tokio::time::timeout(timeout, hook()).await;
            match result {
                Ok(Ok(())) => {
                    println!("✓ State persisted: {}", name);
                    results.push((name.clone(), Ok(())));
                }
                Ok(Err(e)) => {
                    eprintln!("✗ State persist failed for {}: {}", name, e);
                    results.push((name.clone(), Err(e)));
                }
                Err(_) => {
                    let err = format!("State persist timed out for {}", name);
                    eprintln!("✗ {}", err);
                    results.push((name.clone(), Err(err)));
                }
            }
        }

        results
    }
}

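Comparison

Dimension | broadcast channel | watch channel | CancellationToken | JoinSet | mpsc channel
Propagation model | One-to-many broadcast | One-to-many observation | Tree-shaped cascade | Task set | Many-to-one
Cancellation semantics | Notification | State | Cooperative | Waiting | Message
Multiple consumers | ✅ Every receiver gets it | ✅ Every receiver can observe | ✅ Cascades to child tokens | ❌ Single waiter | ❌ Single consumer
Backpressure | ❌ Slow consumers lose messages | ✅ Only the latest value is kept | N/A | N/A | ✅ Buffered
Resource overhead | Very low
Use case | Global shutdown notification | State-change notification | Cooperative task cancellation | Waiting for tasks to finish | Imperative shutdown commands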

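Summary: the heart of graceful shutdown in Rust Tokio is cooperation. Unlike a C signal handler, which can interrupt execution asynchronously, a Tokio shutdown only works if every task actively takes part. Of the 5 patterns, CancellationToken is the most general: it supports tree-shaped cascading cancellation, a child_token is cancelled automatically along with its parent, and cancellation is idempotent. Recommended combination for production: CancellationToken (task cancellation) + watch channel (state notification) + JoinSet (waiting for completion) + ConnectionTracker (connection draining) + StatePersistence (state persistence). And remember the iron rule for K8s: terminationGracePeriodSeconds must be greater than your grace period plus the resource cleanup time.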

