Rust Tokioグレースフルシャットダウン:5つのシグナル処理パターンとリソースクリーンアップガイド

编程语言

またRustサービスが暴力的にシャットダウンした

K8sローリングアップデート時、PodがSIGTERMを受信して即座に終了、処理中の1000リクエストがすべて切断、データベーストランザクションがロールバック、メッセージキューのメッセージが消失。ユーザーは502エラーの画面を見、アラートが再爆発。tokio::signalリスニングを追加したが、シャットダウン時にtokioタスクがまだ実行中、コネクションがドレインされていない、リソースが解放されていない。

Rust Tokioグレースフルシャットダウンctrlc::set_handlerを1行追加するだけではない。シグナルをどうキャプチャするか?ランタイムをどう停止するか?コネクションをどうドレインするか?リソースをどうクリーンアップするか?状態をどう永続化するか?これらを理解しないと、毎回のデプロイが「小さな事故」になる。

本記事は5つのシグナル処理パターンから出発し、シグナルキャプチャ→ランタイムシャットダウン→コネクションドレイニング→リソースクリーンアップ→状態永続化のフルパイプラインを実践する。


Tokioグレースフルシャットダウンコア概念

概念 説明
tokio::signal Tokio非同期シグナルリスニングモジュール、SIGINT/SIGTERM/SIGHUP対応
tokio::sync::broadcast ブロードキャストチャネル、全ワーカーにシャットダウン信号を伝播
tokio::sync::watch 単値観察チャネル、シャットダウン状態変更を通知
CancellationToken tokio_util提供の協調的キャンセレーショントークン
tokio::task::JoinSet タスクコレクション、全タスク完了後にシャットダウン可能
Graceful Shutdown グレースフルシャットダウン:新規リクエスト受信停止→既存リクエスト完了待ち→終了
Connection Draining コネクションドレイニング:リスナークローズ後、アクティブコネクションの処理完了を待機

グレースフルシャットダウンフロー

1. SIGTERMシグナル受信
2. 新規接続の受信停止(Listenerクローズ)
3. 全ワーカーにグレースフルシャットダウン開始を通知
4. アクティブリクエストの処理完了を待機(Connection Draining)
5. データベース接続プールのクローズ
6. 必要な状態の永続化
7. プロセス終了(exit 0)

問題分析:Tokioグレースフルシャットダウンの5つの課題

  1. シグナル処理タイミング:シグナルは任意のタイミングで到達、重要操作の途中で中断されないことを保証
  2. タスク待機とタイムアウト:全タスク完了待ちが無限待機になる可能性、grace periodが必要
  3. コネクションドレイニング:HTTP/gRPC接続は新規リクエスト停止→既存リクエスト完了待ちの順序が必要
  4. リソース解放順序:DB接続、ファイルハンドル、一時ファイルは正しい順序で解放
  5. 状態永続化:シャットダウン前にメモリ内状態をディスクまたはデータベースに永続化

ステップバイステップ:完全グレースフルシャットダウン実装

Step 1:基本シグナル処理

use tokio::signal;
use tokio::sync::broadcast;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (shutdown_tx, _) = broadcast::channel::<()>(1);

    tokio::spawn(handle_shutdown_signal(shutdown_tx.clone()));

    let mut shutdown_rx = shutdown_tx.subscribe();

    tokio::select! {
        _ = run_server() => {
            println!("Server exited normally");
        }
        _ = shutdown_rx.recv() => {
            println!("Received shutdown signal");
        }
    }

    Ok(())
}

async fn handle_shutdown_signal(shutdown_tx: broadcast::Sender<()>) {
    let ctrl_c = async {
        signal::ctrl_c()
            .await
            .expect("Failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let terminate = async {
        signal::unix::signal(signal::unix::SignalKind::terminate())
            .expect("Failed to install SIGTERM handler")
            .recv()
            .await;
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = ctrl_c => {
            println!("Received Ctrl+C");
        }
        _ = terminate => {
            println!("Received SIGTERM");
        }
    }

    let _ = shutdown_tx.send(());
}

async fn run_server() {
    loop {
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    }
}

Step 2:CancellationTokenパターン

use tokio_util::sync::CancellationToken;
use std::time::Duration;

struct AppServer {
    cancellation_token: CancellationToken,
    workers: Vec<tokio::task::JoinHandle<()>>,
}

impl AppServer {
    fn new() -> Self {
        Self {
            cancellation_token: CancellationToken::new(),
            workers: Vec::new(),
        }
    }

    async fn start(&mut self) {
        for id in 0..4 {
            let token = self.cancellation_token.child_token();
            let handle = tokio::spawn(async move {
                worker_loop(id, token).await;
            });
            self.workers.push(handle);
        }
    }

    async fn graceful_shutdown(self, timeout: Duration) {
        self.cancellation_token.cancel();

        let deadline = tokio::time::Instant::now() + timeout;
        for handle in self.workers {
            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
            match tokio::time::timeout(remaining, handle).await {
                Ok(Ok(())) => {}
                Ok(Err(e)) => {
                    eprintln!("Worker panicked: {}", e);
                }
                Err(_) => {
                    eprintln!("Worker did not shut down within timeout, aborting");
                }
            }
        }
    }
}

async fn worker_loop(id: usize, token: CancellationToken) {
    loop {
        tokio::select! {
            _ = token.cancelled() => {
                println!("Worker {} shutting down gracefully", id);
                break;
            }
            _ = tokio::time::sleep(Duration::from_secs(1)) => {
                println!("Worker {} processing task", id);
            }
        }
    }
    println!("Worker {} cleanup complete", id);
}

Step 3:HTTPサーバーコネクションドレイニング

use axum::{Router, Server};
use axum::routing::get;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::watch;

struct GracefulServer {
    shutdown_tx: watch::Sender<bool>,
    shutdown_rx: watch::Receiver<bool>,
}

impl GracefulServer {
    fn new() -> Self {
        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        Self {
            shutdown_tx,
            shutdown_rx,
        }
    }

    async fn run(self: Arc<Self>, addr: SocketAddr) {
        let app = Router::new()
            .route("/health", get(|| async { "ok" }))
            .route("/api/data", get(handle_data));

        let server = Server::bind(&addr)
            .serve(app.into_make_service());

        let graceful = server.with_graceful_shutdown(async {
            let mut rx = self.shutdown_rx.clone();
            while !*rx.borrow_and_update() {
                if rx.changed().await.is_err() {
                    break;
                }
            }
        });

        if let Err(e) = graceful.await {
            eprintln!("Server error: {}", e);
        }
    }

    fn trigger_shutdown(&self) {
        let _ = self.shutdown_tx.send(true);
    }
}

async fn handle_data() -> &'static str {
    tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
    "data response"
}

Step 4:リソースクリーンアップマネージャー

use std::future::Future;
use std::time::Duration;
use tokio::sync::Mutex;

struct CleanupTask {
    name: String,
    cleanup_fn: Box<dyn FnOnce() -> Box<dyn Future<Output = Result<(), String>> + Send> + Send>,
}

pub struct ResourceManager {
    cleanup_tasks: Mutex<Vec<CleanupTask>>,
    shutdown_timeout: Duration,
}

impl ResourceManager {
    pub fn new(shutdown_timeout: Duration) -> Self {
        Self {
            cleanup_tasks: Mutex::new(Vec::new()),
            shutdown_timeout,
        }
    }

    pub async fn register<F, Fut>(&self, name: impl Into<String>, cleanup_fn: F)
    where
        F: FnOnce() -> Fut + Send + 'static,
        Fut: Future<Output = Result<(), String>> + Send + 'static,
    {
        let task = CleanupTask {
            name: name.into(),
            cleanup_fn: Box::new(move || Box::new(cleanup_fn()) as _),
        };
        self.cleanup_tasks.lock().await.push(task);
    }

    pub async fn cleanup_all(&self) {
        let mut tasks = self.cleanup_tasks.lock().await;
        let tasks = std::mem::take(&mut *tasks);

        for task in tasks.into_iter().rev() {
            let result = tokio::time::timeout(
                self.shutdown_timeout,
                (task.cleanup_fn)(),
            ).await;

            match result {
                Ok(Ok(())) => {
                    println!("✓ Cleaned up: {}", task.name);
                }
                Ok(Err(e)) => {
                    eprintln!("✗ Cleanup failed for {}: {}", task.name, e);
                }
                Err(_) => {
                    eprintln!("✗ Cleanup timed out for {}", task.name);
                }
            }
        }
    }
}

Step 5:完全グレースフルシャットダウンオーケストレーション

use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
use std::time::Duration;

pub struct GracefulShutdown {
    shutdown_signal: broadcast::Sender<()>,
    cancellation_token: CancellationToken,
    resource_manager: ResourceManager,
    grace_period: Duration,
}

impl GracefulShutdown {
    pub fn new(grace_period: Duration) -> Self {
        let (shutdown_signal, _) = broadcast::channel(1);
        Self {
            shutdown_signal,
            cancellation_token: CancellationToken::new(),
            resource_manager: ResourceManager::new(Duration::from_secs(5)),
            grace_period,
        }
    }

    pub fn shutdown_signal(&self) -> broadcast::Sender<()> {
        self.shutdown_signal.clone()
    }

    pub fn cancellation_token(&self) -> CancellationToken {
        self.cancellation_token.clone()
    }

    pub fn resource_manager(&self) -> &ResourceManager {
        &self.resource_manager
    }

    pub async fn run(self) {
        let mut rx = self.shutdown_signal.subscribe();

        rx.recv().await.ok();

        println!("🛑 Shutdown signal received, starting graceful shutdown...");

        self.cancellation_token.cancel();

        println!("⏳ Waiting up to {:?} for tasks to complete...", self.grace_period);
        tokio::time::sleep(self.grace_period).await;

        println!("🧹 Cleaning up resources...");
        self.resource_manager.cleanup_all().await;

        println!("✅ Graceful shutdown complete. Goodbye!");
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let shutdown = GracefulShutdown::new(Duration::from_secs(30));

    let signal_tx = shutdown.shutdown_signal();
    tokio::spawn(async move {
        tokio::signal::ctrl_c().await.ok();
        let _ = signal_tx.send(());
    });

    let token = shutdown.cancellation_token();
    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = token.cancelled() => {
                    println!("Background task shutting down");
                    break;
                }
                _ = tokio::time::sleep(Duration::from_secs(1)) => {
                    println!("Background task working...");
                }
            }
        }
    });

    shutdown.run().await;
    Ok(())
}

落とし穴ガイド

落とし穴1:std::sync::mpscの代わりにtokio::syncを使用

// ❌ 誤り:非同期コンテキストで標準ライブラリチャネルを使用
let (tx, rx) = std::sync::mpsc::channel();
tokio::spawn(async move {
    let msg = rx.recv().unwrap(); // 現在のスレッドをブロック!
});

// ✅ 正しい:tokio非同期チャネルを使用
let (tx, mut rx) = tokio::sync::mpsc::channel(100);
tokio::spawn(async move {
    while let Some(msg) = rx.recv().await {
        println!("Received: {}", msg);
    }
});

落とし穴2:シグナルハンドラで重い操作を実行

// ❌ 誤り:シグナルコールバックでクリーンアップ
ctrlc::set_handler(|| {
    std::thread::sleep(Duration::from_secs(5)); // シグナルハンドラでブロック不可!
    cleanup_database();
})?;

// ✅ 正しい:シグナルは通知のみ、クリーンアップはメインループで
let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
let tx_clone = tx.clone();
ctrlc::set_handler(move || {
    let _ = tx_clone.try_send(());
})?;

rx.recv().await;
cleanup_database().await;

落とし穴3:JoinHandleを待機せずタスクが強制終了

// ❌ 誤り:spawn後待機なし、main終了でタスクがキルされる
tokio::spawn(async {
    long_running_task().await;
});
// main終了、タスクがキルされる

// ✅ 正しい:JoinHandleを収集して完了を待機
let mut tasks = tokio::task::JoinSet::new();
for i in 0..4 {
    tasks.spawn(async move { worker(i).await });
}
while let Some(result) = tasks.join_next().await {
    result?;
}

落とし穴4:CancellationTokenが子タスクに伝播されない

// ❌ 誤り:子タスクがキャンセル信号を受信しない
let token = CancellationToken::new();
tokio::spawn(async {
    loop {
        do_work().await; // 永遠に停止しない
    }
});
token.cancel();

// ✅ 正しい:child_tokenを子タスクに渡す
let token = CancellationToken::new();
let child_token = token.child_token();
tokio::spawn(async move {
    loop {
        tokio::select! {
            _ = child_token.cancelled() => break,
            _ = do_work() => {}
        }
    }
});
token.cancel();

落とし穴5:不当なgrace period設定

// ❌ 誤り:grace periodが短すぎる、リクエスト完了前にキルされる
let grace_period = Duration::from_millis(100); // 短すぎる!

// ✅ 正しい:P99レイテンシに基づいて合理的なgrace periodを設定
let grace_period = Duration::from_secs(30); // P99レイテンシより大きく
// K8sのterminationGracePeriodSecondsはgrace_periodより大きく

エラートラブルシューティング

# エラーメッセージ 原因 解決方法
1 task 42 was cancelled タスクが実行中にキャンセルされた CancellationTokenやタイムアウトを確認、期待される動作か確認
2 channel closed 送信側がドロップ、受信側が待機中 クローズ前に送信側が受信側に通知することを保証
3 JoinError::Panic(...) タスク内部でパニック タスクロジックを確認、エラーハンドリングを追加
4 deadline has elapsed tokio::time::timeout期限切れ タイムアウトを増やすか処理速度を最適化
5 connection reset by peer クライアントがサーバーシャットダウン時に接続をリセット コネクションドレイニングを実装、新規接続を先に停止
6 Address in use シャットダウン後ポートが解放されない Listenerが正しくドロップされることを確認、SO_REUSEADDRを設定
7 broken pipe クローズされた接続に書き込み 書き込み前に接続状態を確認
8 database connection closed シャットダウン時にDB接続プールが解放済み リソース解放順序を調整、DB接続を最後にクローズ
9 runtime dropped Tokioランタイムがタスク完了前にドロップ ランタイムのライフタイムが全タスクをカバーすることを保証
10 signal handler already registered シグナルハンドラが複数回登録 once_cellやlazy_staticで1回のみ登録

高度な最適化

1. ヘルスチェック対応シャットダウン

use axum::{Json, extract::State};
use serde_json::{json, Value};
use std::sync::Arc;
use tokio::sync::watch;

#[derive(Clone)]
struct AppState {
    shutting_down: watch::Receiver<bool>,
}

async fn health_check(State(state): State<Arc<AppState>>) -> Json<Value> {
    if *state.shutting_down.borrow() {
        Json(json!({
            "status": "draining",
            "ready": false
        }))
    } else {
        Json(json!({
            "status": "healthy",
            "ready": true
        }))
    }
}

async fn readiness_check(State(state): State<Arc<AppState>>) -> &'static str {
    if *state.shutting_down.borrow() {
        "Service Unavailable"
    } else {
        "OK"
    }
}

2. コネクションカウントドレイニング

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::Notify;

#[derive(Clone)]
pub struct ConnectionTracker {
    active_connections: Arc<AtomicU64>,
    drain_notify: Arc<Notify>,
}

impl ConnectionTracker {
    pub fn new() -> Self {
        Self {
            active_connections: Arc::new(AtomicU64::new(0)),
            drain_notify: Arc::new(Notify::new()),
        }
    }

    pub fn connection_enter(&self) -> ConnectionGuard {
        self.active_connections.fetch_add(1, Ordering::Relaxed);
        ConnectionGuard {
            tracker: self.clone(),
        }
    }

    pub fn connection_exit(&self) {
        let prev = self.active_connections.fetch_sub(1, Ordering::Relaxed);
        if prev == 1 {
            self.drain_notify.notify_waiters();
        }
    }

    pub async fn drain(&self, timeout: std::time::Duration) {
        let deadline = tokio::time::Instant::now() + timeout;
        while self.active_connections.load(Ordering::Relaxed) > 0 {
            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
            if remaining.is_zero() {
                let remaining = self.active_connections.load(Ordering::Relaxed);
                eprintln!("Drain timeout, {} connections still active", remaining);
                break;
            }
            tokio::select! {
                _ = self.drain_notify.notified() => {}
                _ = tokio::time::sleep(remaining) => break,
            }
        }
    }
}

pub struct ConnectionGuard {
    tracker: ConnectionTracker,
}

impl Drop for ConnectionGuard {
    fn drop(&mut self) {
        self.tracker.connection_exit();
    }
}

3. 状態永続化フック

use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::Mutex;

type PersistFn = Box<dyn Fn() -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>> + Send + Sync>;

pub struct StatePersistence {
    hooks: Arc<Mutex<Vec<(String, PersistFn)>>>,
}

impl StatePersistence {
    pub fn new() -> Self {
        Self {
            hooks: Arc::new(Mutex::new(Vec::new())),
        }
    }

    pub async fn register<F, Fut>(&self, name: impl Into<String>, persist_fn: F)
    where
        F: Fn() -> Fut + Send + Sync + 'static,
        Fut: Future<Output = Result<(), String>> + Send + 'static,
    {
        let name = name.into();
        let hook: PersistFn = Box::new(move || Box::pin(persist_fn()));
        self.hooks.lock().await.push((name, hook));
    }

    pub async fn persist_all(&self, timeout: std::time::Duration) -> Vec<(String, Result<(), String>)> {
        let hooks = self.hooks.lock().await;
        let mut results = Vec::new();

        for (name, hook) in hooks.iter() {
            let result = tokio::time::timeout(timeout, hook()).await;
            match result {
                Ok(Ok(())) => {
                    println!("✓ State persisted: {}", name);
                    results.push((name.clone(), Ok(())));
                }
                Ok(Err(e)) => {
                    eprintln!("✗ State persist failed for {}: {}", name, e);
                    results.push((name.clone(), Err(e)));
                }
                Err(_) => {
                    let err = format!("State persist timed out for {}", name);
                    eprintln!("✗ {}", err);
                    results.push((name.clone(), Err(err)));
                }
            }
        }

        results
    }
}

比較分析

次元 broadcastチャネル watchチャネル CancellationToken JoinSet mpscチャネル
伝播モード 1対多ブロードキャスト 1対多観察 ツリーカスケード タスクコレクション 1対1/多
キャンセルセマンティクス 通知型 状態型 協調型 待機型 メッセージ型
マルチコンシューマー ✅全員受信 ✅全員観察 ✅子トークンカスケード ❌単一待機者 ❌競合消費
バックプレッシャー ❌遅いコンシューマーはメッセージ損失 ✅最新値のみ保持 N/A N/A ✅バッファあり
リソースオーバーヘッド 極低
ユースケース グローバルシャットダウン通知 状態変更通知 タスク協調キャンセル タスク完了待機 コマンド式シャットダウン

まとめ:Rust Tokioグレースフルシャットダウンの核心は「協調的」であること——Cのシグナルハンドラのように非同期に中断できるわけではなく、Tokioのシャットダウンは各タスクが能動的に協力する必要がある。5つのパターンのうち、CancellationTokenが最も汎用的:ツリーカスケードキャンセルをサポート、child_tokenは親トークンと共に自動キャンセル、キャンセルは冪等。本番推奨の組み合わせ:CancellationToken(タスクキャンセル)+ watchチャネル(状態通知)+ JoinSet(完了待機)+ ConnectionTracker(コネクションドレイニング)+ StatePersistence(状態永続化)。K8sの鉄則を忘れずに:terminationGracePeriodSecondsはgrace period + リソースクリーンアップ時間より大きくすること。


オンラインツール推奨

ブラウザローカルツールを無料で試す →

#Rust#Tokio#优雅关闭#异步运行时#信号处理#2026#服务端