Rust Tokioグレースフルシャットダウン:5つのシグナル処理パターンとリソースクリーンアップガイド
またRustサービスが暴力的にシャットダウンした
K8sローリングアップデート時、PodがSIGTERMを受信して即座に終了、処理中の1000リクエストがすべて切断、データベーストランザクションがロールバック、メッセージキューのメッセージが消失。ユーザーは502エラーの画面を見、アラートが再爆発。tokio::signalリスニングを追加したが、シャットダウン時にtokioタスクがまだ実行中、コネクションがドレインされていない、リソースが解放されていない。
Rust Tokioグレースフルシャットダウンはctrlc::set_handlerを1行追加するだけではない。シグナルをどうキャプチャするか?ランタイムをどう停止するか?コネクションをどうドレインするか?リソースをどうクリーンアップするか?状態をどう永続化するか?これらを理解しないと、毎回のデプロイが「小さな事故」になる。
本記事は5つのシグナル処理パターンから出発し、シグナルキャプチャ→ランタイムシャットダウン→コネクションドレイニング→リソースクリーンアップ→状態永続化のフルパイプラインを実践する。
Tokioグレースフルシャットダウンコア概念
| 概念 | 説明 |
|---|---|
| tokio::signal | Tokio非同期シグナルリスニングモジュール、SIGINT/SIGTERM/SIGHUP対応 |
| tokio::sync::broadcast | ブロードキャストチャネル、全ワーカーにシャットダウン信号を伝播 |
| tokio::sync::watch | 単値観察チャネル、シャットダウン状態変更を通知 |
| CancellationToken | tokio_util提供の協調的キャンセレーショントークン |
| tokio::task::JoinSet | タスクコレクション、全タスク完了後にシャットダウン可能 |
| Graceful Shutdown | グレースフルシャットダウン:新規リクエスト受信停止→既存リクエスト完了待ち→終了 |
| Connection Draining | コネクションドレイニング:リスナークローズ後、アクティブコネクションの処理完了を待機 |
グレースフルシャットダウンフロー
1. SIGTERMシグナル受信
2. 新規接続の受信停止(Listenerクローズ)
3. 全ワーカーにグレースフルシャットダウン開始を通知
4. アクティブリクエストの処理完了を待機(Connection Draining)
5. データベース接続プールのクローズ
6. 必要な状態の永続化
7. プロセス終了(exit 0)
問題分析:Tokioグレースフルシャットダウンの5つの課題
- シグナル処理タイミング:シグナルは任意のタイミングで到達、重要操作の途中で中断されないことを保証
- タスク待機とタイムアウト:全タスク完了待ちが無限待機になる可能性、grace periodが必要
- コネクションドレイニング:HTTP/gRPC接続は新規リクエスト停止→既存リクエスト完了待ちの順序が必要
- リソース解放順序:DB接続、ファイルハンドル、一時ファイルは正しい順序で解放
- 状態永続化:シャットダウン前にメモリ内状態をディスクまたはデータベースに永続化
ステップバイステップ:完全グレースフルシャットダウン実装
Step 1:基本シグナル処理
use tokio::signal;
use tokio::sync::broadcast;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (shutdown_tx, _) = broadcast::channel::<()>(1);
tokio::spawn(handle_shutdown_signal(shutdown_tx.clone()));
let mut shutdown_rx = shutdown_tx.subscribe();
tokio::select! {
_ = run_server() => {
println!("Server exited normally");
}
_ = shutdown_rx.recv() => {
println!("Received shutdown signal");
}
}
Ok(())
}
async fn handle_shutdown_signal(shutdown_tx: broadcast::Sender<()>) {
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("Failed to install Ctrl+C handler");
};
#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("Failed to install SIGTERM handler")
.recv()
.await;
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
_ = ctrl_c => {
println!("Received Ctrl+C");
}
_ = terminate => {
println!("Received SIGTERM");
}
}
let _ = shutdown_tx.send(());
}
async fn run_server() {
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
Step 2:CancellationTokenパターン
use tokio_util::sync::CancellationToken;
use std::time::Duration;
struct AppServer {
cancellation_token: CancellationToken,
workers: Vec<tokio::task::JoinHandle<()>>,
}
impl AppServer {
fn new() -> Self {
Self {
cancellation_token: CancellationToken::new(),
workers: Vec::new(),
}
}
async fn start(&mut self) {
for id in 0..4 {
let token = self.cancellation_token.child_token();
let handle = tokio::spawn(async move {
worker_loop(id, token).await;
});
self.workers.push(handle);
}
}
async fn graceful_shutdown(self, timeout: Duration) {
self.cancellation_token.cancel();
let deadline = tokio::time::Instant::now() + timeout;
for handle in self.workers {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
match tokio::time::timeout(remaining, handle).await {
Ok(Ok(())) => {}
Ok(Err(e)) => {
eprintln!("Worker panicked: {}", e);
}
Err(_) => {
eprintln!("Worker did not shut down within timeout, aborting");
}
}
}
}
}
async fn worker_loop(id: usize, token: CancellationToken) {
loop {
tokio::select! {
_ = token.cancelled() => {
println!("Worker {} shutting down gracefully", id);
break;
}
_ = tokio::time::sleep(Duration::from_secs(1)) => {
println!("Worker {} processing task", id);
}
}
}
println!("Worker {} cleanup complete", id);
}
Step 3:HTTPサーバーコネクションドレイニング
use axum::{Router, Server};
use axum::routing::get;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::watch;
struct GracefulServer {
shutdown_tx: watch::Sender<bool>,
shutdown_rx: watch::Receiver<bool>,
}
impl GracefulServer {
fn new() -> Self {
let (shutdown_tx, shutdown_rx) = watch::channel(false);
Self {
shutdown_tx,
shutdown_rx,
}
}
async fn run(self: Arc<Self>, addr: SocketAddr) {
let app = Router::new()
.route("/health", get(|| async { "ok" }))
.route("/api/data", get(handle_data));
let server = Server::bind(&addr)
.serve(app.into_make_service());
let graceful = server.with_graceful_shutdown(async {
let mut rx = self.shutdown_rx.clone();
while !*rx.borrow_and_update() {
if rx.changed().await.is_err() {
break;
}
}
});
if let Err(e) = graceful.await {
eprintln!("Server error: {}", e);
}
}
fn trigger_shutdown(&self) {
let _ = self.shutdown_tx.send(true);
}
}
async fn handle_data() -> &'static str {
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
"data response"
}
Step 4:リソースクリーンアップマネージャー
use std::future::Future;
use std::time::Duration;
use tokio::sync::Mutex;
struct CleanupTask {
name: String,
cleanup_fn: Box<dyn FnOnce() -> Box<dyn Future<Output = Result<(), String>> + Send> + Send>,
}
pub struct ResourceManager {
cleanup_tasks: Mutex<Vec<CleanupTask>>,
shutdown_timeout: Duration,
}
impl ResourceManager {
pub fn new(shutdown_timeout: Duration) -> Self {
Self {
cleanup_tasks: Mutex::new(Vec::new()),
shutdown_timeout,
}
}
pub async fn register<F, Fut>(&self, name: impl Into<String>, cleanup_fn: F)
where
F: FnOnce() -> Fut + Send + 'static,
Fut: Future<Output = Result<(), String>> + Send + 'static,
{
let task = CleanupTask {
name: name.into(),
cleanup_fn: Box::new(move || Box::new(cleanup_fn()) as _),
};
self.cleanup_tasks.lock().await.push(task);
}
pub async fn cleanup_all(&self) {
let mut tasks = self.cleanup_tasks.lock().await;
let tasks = std::mem::take(&mut *tasks);
for task in tasks.into_iter().rev() {
let result = tokio::time::timeout(
self.shutdown_timeout,
(task.cleanup_fn)(),
).await;
match result {
Ok(Ok(())) => {
println!("✓ Cleaned up: {}", task.name);
}
Ok(Err(e)) => {
eprintln!("✗ Cleanup failed for {}: {}", task.name, e);
}
Err(_) => {
eprintln!("✗ Cleanup timed out for {}", task.name);
}
}
}
}
}
Step 5:完全グレースフルシャットダウンオーケストレーション
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
use std::time::Duration;
pub struct GracefulShutdown {
shutdown_signal: broadcast::Sender<()>,
cancellation_token: CancellationToken,
resource_manager: ResourceManager,
grace_period: Duration,
}
impl GracefulShutdown {
pub fn new(grace_period: Duration) -> Self {
let (shutdown_signal, _) = broadcast::channel(1);
Self {
shutdown_signal,
cancellation_token: CancellationToken::new(),
resource_manager: ResourceManager::new(Duration::from_secs(5)),
grace_period,
}
}
pub fn shutdown_signal(&self) -> broadcast::Sender<()> {
self.shutdown_signal.clone()
}
pub fn cancellation_token(&self) -> CancellationToken {
self.cancellation_token.clone()
}
pub fn resource_manager(&self) -> &ResourceManager {
&self.resource_manager
}
pub async fn run(self) {
let mut rx = self.shutdown_signal.subscribe();
rx.recv().await.ok();
println!("🛑 Shutdown signal received, starting graceful shutdown...");
self.cancellation_token.cancel();
println!("⏳ Waiting up to {:?} for tasks to complete...", self.grace_period);
tokio::time::sleep(self.grace_period).await;
println!("🧹 Cleaning up resources...");
self.resource_manager.cleanup_all().await;
println!("✅ Graceful shutdown complete. Goodbye!");
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let shutdown = GracefulShutdown::new(Duration::from_secs(30));
let signal_tx = shutdown.shutdown_signal();
tokio::spawn(async move {
tokio::signal::ctrl_c().await.ok();
let _ = signal_tx.send(());
});
let token = shutdown.cancellation_token();
tokio::spawn(async move {
loop {
tokio::select! {
_ = token.cancelled() => {
println!("Background task shutting down");
break;
}
_ = tokio::time::sleep(Duration::from_secs(1)) => {
println!("Background task working...");
}
}
}
});
shutdown.run().await;
Ok(())
}
落とし穴ガイド
落とし穴1:std::sync::mpscの代わりにtokio::syncを使用
// ❌ 誤り:非同期コンテキストで標準ライブラリチャネルを使用
let (tx, rx) = std::sync::mpsc::channel();
tokio::spawn(async move {
let msg = rx.recv().unwrap(); // 現在のスレッドをブロック!
});
// ✅ 正しい:tokio非同期チャネルを使用
let (tx, mut rx) = tokio::sync::mpsc::channel(100);
tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
println!("Received: {}", msg);
}
});
落とし穴2:シグナルハンドラで重い操作を実行
// ❌ 誤り:シグナルコールバックでクリーンアップ
ctrlc::set_handler(|| {
std::thread::sleep(Duration::from_secs(5)); // シグナルハンドラでブロック不可!
cleanup_database();
})?;
// ✅ 正しい:シグナルは通知のみ、クリーンアップはメインループで
let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
let tx_clone = tx.clone();
ctrlc::set_handler(move || {
let _ = tx_clone.try_send(());
})?;
rx.recv().await;
cleanup_database().await;
落とし穴3:JoinHandleを待機せずタスクが強制終了
// ❌ 誤り:spawn後待機なし、main終了でタスクがキルされる
tokio::spawn(async {
long_running_task().await;
});
// main終了、タスクがキルされる
// ✅ 正しい:JoinHandleを収集して完了を待機
let mut tasks = tokio::task::JoinSet::new();
for i in 0..4 {
tasks.spawn(async move { worker(i).await });
}
while let Some(result) = tasks.join_next().await {
result?;
}
落とし穴4:CancellationTokenが子タスクに伝播されない
// ❌ 誤り:子タスクがキャンセル信号を受信しない
let token = CancellationToken::new();
tokio::spawn(async {
loop {
do_work().await; // 永遠に停止しない
}
});
token.cancel();
// ✅ 正しい:child_tokenを子タスクに渡す
let token = CancellationToken::new();
let child_token = token.child_token();
tokio::spawn(async move {
loop {
tokio::select! {
_ = child_token.cancelled() => break,
_ = do_work() => {}
}
}
});
token.cancel();
落とし穴5:不当なgrace period設定
// ❌ 誤り:grace periodが短すぎる、リクエスト完了前にキルされる
let grace_period = Duration::from_millis(100); // 短すぎる!
// ✅ 正しい:P99レイテンシに基づいて合理的なgrace periodを設定
let grace_period = Duration::from_secs(30); // P99レイテンシより大きく
// K8sのterminationGracePeriodSecondsはgrace_periodより大きく
エラートラブルシューティング
| # | エラーメッセージ | 原因 | 解決方法 |
|---|---|---|---|
| 1 | task 42 was cancelled |
タスクが実行中にキャンセルされた | CancellationTokenやタイムアウトを確認、期待される動作か確認 |
| 2 | channel closed |
送信側がドロップ、受信側が待機中 | クローズ前に送信側が受信側に通知することを保証 |
| 3 | JoinError::Panic(...) |
タスク内部でパニック | タスクロジックを確認、エラーハンドリングを追加 |
| 4 | deadline has elapsed |
tokio::time::timeout期限切れ | タイムアウトを増やすか処理速度を最適化 |
| 5 | connection reset by peer |
クライアントがサーバーシャットダウン時に接続をリセット | コネクションドレイニングを実装、新規接続を先に停止 |
| 6 | Address in use |
シャットダウン後ポートが解放されない | Listenerが正しくドロップされることを確認、SO_REUSEADDRを設定 |
| 7 | broken pipe |
クローズされた接続に書き込み | 書き込み前に接続状態を確認 |
| 8 | database connection closed |
シャットダウン時にDB接続プールが解放済み | リソース解放順序を調整、DB接続を最後にクローズ |
| 9 | runtime dropped |
Tokioランタイムがタスク完了前にドロップ | ランタイムのライフタイムが全タスクをカバーすることを保証 |
| 10 | signal handler already registered |
シグナルハンドラが複数回登録 | once_cellやlazy_staticで1回のみ登録 |
高度な最適化
1. ヘルスチェック対応シャットダウン
use axum::{Json, extract::State};
use serde_json::{json, Value};
use std::sync::Arc;
use tokio::sync::watch;
#[derive(Clone)]
struct AppState {
shutting_down: watch::Receiver<bool>,
}
async fn health_check(State(state): State<Arc<AppState>>) -> Json<Value> {
if *state.shutting_down.borrow() {
Json(json!({
"status": "draining",
"ready": false
}))
} else {
Json(json!({
"status": "healthy",
"ready": true
}))
}
}
async fn readiness_check(State(state): State<Arc<AppState>>) -> &'static str {
if *state.shutting_down.borrow() {
"Service Unavailable"
} else {
"OK"
}
}
2. コネクションカウントドレイニング
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::Notify;
#[derive(Clone)]
pub struct ConnectionTracker {
active_connections: Arc<AtomicU64>,
drain_notify: Arc<Notify>,
}
impl ConnectionTracker {
pub fn new() -> Self {
Self {
active_connections: Arc::new(AtomicU64::new(0)),
drain_notify: Arc::new(Notify::new()),
}
}
pub fn connection_enter(&self) -> ConnectionGuard {
self.active_connections.fetch_add(1, Ordering::Relaxed);
ConnectionGuard {
tracker: self.clone(),
}
}
pub fn connection_exit(&self) {
let prev = self.active_connections.fetch_sub(1, Ordering::Relaxed);
if prev == 1 {
self.drain_notify.notify_waiters();
}
}
pub async fn drain(&self, timeout: std::time::Duration) {
let deadline = tokio::time::Instant::now() + timeout;
while self.active_connections.load(Ordering::Relaxed) > 0 {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
let remaining = self.active_connections.load(Ordering::Relaxed);
eprintln!("Drain timeout, {} connections still active", remaining);
break;
}
tokio::select! {
_ = self.drain_notify.notified() => {}
_ = tokio::time::sleep(remaining) => break,
}
}
}
}
pub struct ConnectionGuard {
tracker: ConnectionTracker,
}
impl Drop for ConnectionGuard {
fn drop(&mut self) {
self.tracker.connection_exit();
}
}
3. 状態永続化フック
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::Mutex;
type PersistFn = Box<dyn Fn() -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>> + Send + Sync>;
pub struct StatePersistence {
hooks: Arc<Mutex<Vec<(String, PersistFn)>>>,
}
impl StatePersistence {
pub fn new() -> Self {
Self {
hooks: Arc::new(Mutex::new(Vec::new())),
}
}
pub async fn register<F, Fut>(&self, name: impl Into<String>, persist_fn: F)
where
F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<(), String>> + Send + 'static,
{
let name = name.into();
let hook: PersistFn = Box::new(move || Box::pin(persist_fn()));
self.hooks.lock().await.push((name, hook));
}
pub async fn persist_all(&self, timeout: std::time::Duration) -> Vec<(String, Result<(), String>)> {
let hooks = self.hooks.lock().await;
let mut results = Vec::new();
for (name, hook) in hooks.iter() {
let result = tokio::time::timeout(timeout, hook()).await;
match result {
Ok(Ok(())) => {
println!("✓ State persisted: {}", name);
results.push((name.clone(), Ok(())));
}
Ok(Err(e)) => {
eprintln!("✗ State persist failed for {}: {}", name, e);
results.push((name.clone(), Err(e)));
}
Err(_) => {
let err = format!("State persist timed out for {}", name);
eprintln!("✗ {}", err);
results.push((name.clone(), Err(err)));
}
}
}
results
}
}
比較分析
| 次元 | broadcastチャネル | watchチャネル | CancellationToken | JoinSet | mpscチャネル |
|---|---|---|---|---|---|
| 伝播モード | 1対多ブロードキャスト | 1対多観察 | ツリーカスケード | タスクコレクション | 1対1/多 |
| キャンセルセマンティクス | 通知型 | 状態型 | 協調型 | 待機型 | メッセージ型 |
| マルチコンシューマー | ✅全員受信 | ✅全員観察 | ✅子トークンカスケード | ❌単一待機者 | ❌競合消費 |
| バックプレッシャー | ❌遅いコンシューマーはメッセージ損失 | ✅最新値のみ保持 | N/A | N/A | ✅バッファあり |
| リソースオーバーヘッド | 低 | 極低 | 低 | 中 | 中 |
| ユースケース | グローバルシャットダウン通知 | 状態変更通知 | タスク協調キャンセル | タスク完了待機 | コマンド式シャットダウン |
まとめ:Rust Tokioグレースフルシャットダウンの核心は「協調的」であること——Cのシグナルハンドラのように非同期に中断できるわけではなく、Tokioのシャットダウンは各タスクが能動的に協力する必要がある。5つのパターンのうち、CancellationTokenが最も汎用的:ツリーカスケードキャンセルをサポート、child_tokenは親トークンと共に自動キャンセル、キャンセルは冪等。本番推奨の組み合わせ:CancellationToken(タスクキャンセル)+ watchチャネル(状態通知)+ JoinSet(完了待機)+ ConnectionTracker(コネクションドレイニング)+ StatePersistence(状態永続化)。K8sの鉄則を忘れずに:
terminationGracePeriodSecondsはgrace period + リソースクリーンアップ時間より大きくすること。
オンラインツール推奨
- JSONフォーマッター:/ja/json/format
- Base64エンコード/デコード:/ja/encode/base64
- Hash計算:/ja/encode/hash
- JWTデコード:/ja/encode/jwt-decode
ブラウザローカルツールを無料で試す →