Rust Tokio Channelパターン実践:mpscからBroadcastまで6つのプロダクションパターン

编程语言

非同期Rustコードが「喧嘩」し始めたら

こんな経験はありませんか——複数の非同期タスクが状態を共有する必要があり、Arc<Mutex<T>> を追加したら、ロック競合でパフォーマンスが急低下。RwLock に切り替えたら、書き込みロックの飢餓でデッドロックが発生。そして気づくのです——非同期の世界では、メッセージパッシングこそが正解だと。

Tokioのチャネルシステムはまさにこの問題のために作られました:異なる通信パターンが異なる並行シナリオに対応します。正しいチャネルを選べば、非同期アーキテクチャは「互いに待ち合う」状態から「効率的に協調する」状態へと変わります。


コア概念一覧

概念 説明 典型的なユースケース
mpsc マルチプロデューサ・シングルコンシューマチャネル タスクキュー、イベントストリーム
oneshot 一回限りの単一値チャネル リクエスト・レスポンス、Future通知
broadcast マルチコンシューマブロードキャストチャネル イベントファンアウト、ログ配信
watch 単一値状態監視チャネル 設定ホットリロード、状態同期
Backpressure バックプレッシャー機構 フロー制御、OOM防止
select! マルチチャネル多重化マクロ マルチソースイベント処理
Stream 非同期イテレータ チャネル合成、ストリーム処理

非同期通信の5つの課題

1. 共有可変状態のロック地獄

Arc<Mutex<T>> は同期コードでは機能しますが、非同期ランタイムでは長時間ロックを保持するとスレッド全体がブロックされ、他のタスクがスケジューリングされなくなります。

2. タスク間通信パターンのミスマッチ

mpsc でブロードキャストを、broadcast でリクエスト・レスポンスを実装する——間違ったチャネルタイプの選択は、パフォーマンスが悪いだけでなく、論理バグを引き起こす可能性があります。

3. バックプレッシャー欠如によるメモリ爆発

非バインドチャネルは便利に見えますが、生産速度が消費速度を大幅に上回ると、メモリはOOMに至るまで増え続けます。

4. グレースフルシャットダウンの複雑性

すべてのメッセージが処理されてからシャットダウンするにはどうすればよいか?全プロデューサーに停止を通知するには?これには慎重なシャットダウン設計が必要です。

5. マルチチャネル調整の複雑さ

実際のサービスは複数のチャネルタイプを同時に使用することが多く、select! でエレガントに組み合わせるには、コードがスパゲッティにならないよう工夫が必要です。


6つのプロダクション級チャネルパターン

パターン1:mpscチャネル——プロデューサ・コンシューマパターン

最も古典的なチャネルパターン:複数のプロデューサーが1つのコンシューマにメッセージを送信。タスクキューとイベントストリーム処理に最適。

use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<String>(100);

    for i in 0..3 {
        let tx = tx.clone();
        tokio::spawn(async move {
            for j in 0..5 {
                let msg = format!("producer-{} msg-{}", i, j);
                if tx.send(msg).await.is_err() {
                    println!("receiver dropped");
                    return;
                }
            }
        });
    }

    drop(tx);

    while let Some(msg) = rx.recv().await {
        println!("received: {}", msg);
    }

    println!("all messages processed");
}

重要ポイント

  • tx.clone() で新しいプロデューサーハンドルを作成、元の tx をドロップしてもチャネルは閉じない
  • drop(tx) でクローンされた全プロデューサーを閉じる、コンシューマは残りのメッセージを受信後に終了
  • バインドチャネル容量100が自然なバックプレッシャーを提供

パターン2:oneshotチャネル——リクエスト・レスポンスパターン

一回限りのチャネル、単一の値を送信後にチャネルが閉じる——RPCスタイルのリクエスト・レスポンスに最適。

use tokio::sync::{mpsc, oneshot};
use std::collections::HashMap;

#[derive(Debug)]
enum Command {
    Get {
        key: String,
        responder: oneshot::Sender<Option<String>>,
    },
    Set {
        key: String,
        value: String,
        responder: oneshot::Sender<()>,
    },
}

async fn kv_server(mut rx: mpsc::Receiver<Command>) {
    let mut store: HashMap<String, String> = HashMap::new();
    while let Some(cmd) = rx.recv().await {
        match cmd {
            Command::Get { key, responder } => {
                let val = store.get(&key).cloned();
                let _ = responder.send(val);
            }
            Command::Set { key, value, responder } => {
                store.insert(key, value);
                let _ = responder.send(());
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<Command>(32);
    tokio::spawn(kv_server(rx));

    let (resp_tx, resp_rx) = oneshot::channel();
    tx.send(Command::Set {
        key: "hello".into(),
        value: "world".into(),
        responder: resp_tx,
    }).await.unwrap();
    resp_rx.await.unwrap();

    let (resp_tx, resp_rx) = oneshot::channel();
    tx.send(Command::Get {
        key: "hello".into(),
        responder: resp_tx,
    }).await.unwrap();
    let result = resp_rx.await.unwrap();
    println!("got: {:?}", result);
}

重要ポイント

  • mpsc + oneshot の組み合わせでリクエスト・レスポンスを実現:mpsc でコマンドを送信、oneshot で結果を返す
  • サーバーは単一のオーナー、データ競合を自然に回避
  • responder をCommand列挙型に埋め込み、型安全性を確保

パターン3:broadcastチャネル——ファンアウトパターン

1つのメッセージを全レシーバーに配信。イベント通知、ログ配信など1対多のシナリオに最適。

use tokio::sync::broadcast;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let (tx, _) = broadcast::channel::<String>(16);

    for i in 0..3 {
        let mut rx = tx.subscribe();
        tokio::spawn(async move {
            loop {
                match rx.recv().await {
                    Ok(msg) => println!("subscriber-{}: {}", i, msg),
                    Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                        println!("subscriber-{} lagged {} messages", i, n);
                    }
                    Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
                }
            }
        });
    }

    for i in 0..5 {
        tx.send(format!("event-{}", i)).unwrap();
    }

    drop(tx);
    sleep(Duration::from_millis(100)).await;
}

重要ポイント

  • tx.subscribe() で独立したレシーバーを作成、各レシーバーが全メッセージを受信
  • 遅いコンシューマは Lagged エラーを引き起こす——これはバックプレッシャーシグナルであり、致命的エラーではない
  • チャネル容量はレシーバーごとのバッファサイズであり、総容量ではない

パターン4:watchチャネル——状態ブロードキャストパターン

最新の値のみを保持、コンシューマは常に最新の状態を読み取る。設定のホットリロード、進捗同期などのシナリオに最適。

use tokio::sync::watch;
use std::time::Duration;

#[derive(Debug, Clone)]
struct Config {
    max_connections: usize,
    timeout_ms: u64,
}

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(Config {
        max_connections: 100,
        timeout_ms: 5000,
    });

    for i in 0..2 {
        let mut rx = rx.clone();
        tokio::spawn(async move {
            loop {
                if rx.changed().await.is_err() {
                    break;
                }
                let config = rx.borrow();
                println!("worker-{} config updated: {:?}", i, *config);
            }
        });
    }

    tx.send(Config {
        max_connections: 200,
        timeout_ms: 3000,
    }).unwrap();

    tokio::time::sleep(Duration::from_millis(50)).await;

    tx.send(Config {
        max_connections: 50,
        timeout_ms: 10000,
    }).unwrap();

    tokio::time::sleep(Duration::from_millis(50)).await;
}

重要ポイント

  • watch は最新の値のみを保持、中間値は上書きされる——これは機能であり、欠陥ではない
  • rx.changed().await は値が変化するまでブロック、ポーリングを回避
  • rx.borrow() は現在の値の参照を取得、await 不要

パターン5:バインドチャネルとバックプレッシャー処理

プロダクションシステムにはバックプレッシャーが必須。バインドチャネル + send の await が自然にバックプレッシャーを実装。

use tokio::sync::mpsc;
use tokio::time::{sleep, Duration, Instant};

async fn fast_producer(tx: mpsc::Sender<u64>) {
    for i in 0..1000 {
        match tx.send(i).await {
            Ok(()) => {}
            Err(_) => {
                println!("producer: receiver dropped at {}", i);
                return;
            }
        }
        if i % 100 == 0 {
            println!("producer sent: {}", i);
        }
    }
}

async fn slow_consumer(mut rx: mpsc::Receiver<u64>) {
    while let Some(val) = rx.recv().await {
        sleep(Duration::from_millis(10)).await;
        if val % 100 == 0 {
            println!("consumer processed: {}", val);
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<u64>(32);

    let start = Instant::now();
    let producer = tokio::spawn(fast_producer(tx));
    let consumer = tokio::spawn(slow_consumer(rx));

    producer.await.unwrap();
    consumer.await.unwrap();
    println!("total time: {:?}", start.elapsed());
}

重要ポイント

  • バインドチャネル容量32、チャネルが満杯の時 send().await が自動的にプロデューサーをブロック
  • 手動バックプレッシャーは不要——send の await がバックプレッシャーそのもの
  • コンシューマが遅いとプロデューサーが自動的に減速、メモリ使用量は常に制御可能

3つのバックプレッシャー戦略の比較

戦略 実装方法 適用シナリオ
待機 send().await ほとんどのシナリオ、メッセージ損失なしを保証
トライ送信 try_send() メッセージ損失を許容できるリアルタイムシステム
タイムアウト送信 timeout(send()).await レイテンシと信頼性のバランスが必要な場合
use tokio::sync::mpsc;
use tokio::time::{timeout, Duration};

async fn backpressure_with_timeout(tx: mpsc::Sender<u64>, val: u64) -> bool {
    match timeout(Duration::from_millis(100), tx.send(val)).await {
        Ok(Ok(())) => true,
        Ok(Err(_)) => false,
        Err(_) => {
            println!("send timeout, dropping {}", val);
            false
        }
    }
}

パターン6:プロダクション級非同期サービス——チャネルオーケストレーション

実際のサービスは複数のチャネルタイプを組み合わせる必要があり、select! でマルチソースイベントをエレガントに処理。

use tokio::sync::{mpsc, oneshot, broadcast, watch};
use tokio::select;
use std::collections::HashMap;

#[derive(Debug)]
enum Request {
    Query {
        key: String,
        responder: oneshot::Sender<Option<String>>,
    },
    Update {
        key: String,
        value: String,
        responder: oneshot::Sender<bool>,
    },
}

#[derive(Debug, Clone)]
struct AppEvent {
    kind: String,
    detail: String,
}

#[derive(Debug, Clone)]
struct AppState {
    version: u64,
    maintenance: bool,
}

struct Service {
    request_rx: mpsc::Receiver<Request>,
    event_tx: broadcast::Sender<AppEvent>,
    state_rx: watch::Receiver<AppState>,
    store: HashMap<String, String>,
}

impl Service {
    async fn run(&mut self) {
        loop {
            select! {
                Some(req) = self.request_rx.recv() => {
                    self.handle_request(req);
                }
                Ok(event) = self.event_tx.subscribe().recv() => {
                    println!("event received: {:?}", event);
                }
                Ok(_) = self.state_rx.changed() => {
                    let state = self.state_rx.borrow();
                    println!("state changed: {:?}", *state);
                }
                else => {
                    println!("all channels closed, shutting down");
                    break;
                }
            }
        }
    }

    fn handle_request(&mut self, req: Request) {
        match req {
            Request::Query { key, responder } => {
                let val = self.store.get(&key).cloned();
                let _ = responder.send(val);
            }
            Request::Update { key, value, responder } => {
                self.store.insert(key.clone(), value);
                let _ = responder.send(true);
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (req_tx, req_rx) = mpsc::channel::<Request>(64);
    let (event_tx, _) = broadcast::channel::<AppEvent>(32);
    let (state_tx, state_rx) = watch::channel(AppState {
        version: 1,
        maintenance: false,
    });

    let mut service = Service {
        request_rx: req_rx,
        event_tx: event_tx.clone(),
        state_rx,
        store: HashMap::new(),
    };

    tokio::spawn(async move { service.run().await });

    let (resp_tx, resp_rx) = oneshot::channel();
    req_tx.send(Request::Update {
        key: "server".into(),
        value: "tokio-1.40".into(),
        responder: resp_tx,
    }).await.unwrap();
    resp_rx.await.unwrap();

    let (resp_tx, resp_rx) = oneshot::channel();
    req_tx.send(Request::Query {
        key: "server".into(),
        responder: resp_tx,
    }).await.unwrap();
    let result = resp_rx.await.unwrap();
    println!("query result: {:?}", result);

    event_tx.send(AppEvent {
        kind: "deploy".into(),
        detail: "v2.0 released".into(),
    }).unwrap();

    state_tx.send(AppState {
        version: 2,
        maintenance: true,
    }).unwrap();

    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}

重要ポイント

  • select! は複数のチャネルを同時に監視、準備できたものから処理
  • mpsc でリクエストを処理、broadcast でイベントを受信、watch で状態変化を感知
  • else ブランチで全チャネルが閉じた場合を処理、グレースフルシャットダウンを実現
  • 各チャネルタイプが役割を分担:リクエストは mpsc、通知は broadcast、状態は watch

5つのよくある落とし穴

落とし穴1:cloneせずにSenderをドロップ

// ❌ 間違い:txがmoveされた後、使用できない
async fn bad_pattern() {
    let (tx, mut rx) = mpsc::channel::<i32>(16);
    tokio::spawn(async move {
        tx.send(42).await.unwrap();
    });
    // txは既にmove済み、cloneや使用ができない
}

// ✅ 正しい:moveする前にclone
async fn good_pattern() {
    let (tx, mut rx) = mpsc::channel::<i32>(16);
    let tx_clone = tx.clone();
    tokio::spawn(async move {
        tx_clone.send(42).await.unwrap();
    });
    // 元のtxはまだ使用可能
    tx.send(99).await.unwrap();
    drop(tx);
    while let Some(v) = rx.recv().await {
        println!("got: {}", v);
    }
}

落とし穴2:broadcastレシーバーでLaggedを処理しない

// ❌ 間違い:Laggedエラーを無視、メッセージが黙って失われる可能性
async fn bad_broadcast(mut rx: broadcast::Receiver<String>) {
    loop {
        let msg = rx.recv().await.unwrap();
        println!("{}", msg);
    }
}

// ✅ 正しい:Laggedを処理し、ログに出力して継続
async fn good_broadcast(mut rx: broadcast::Receiver<String>) {
    loop {
        match rx.recv().await {
            Ok(msg) => println!("{}", msg),
            Err(broadcast::error::RecvError::Lagged(n)) => {
                eprintln!("warning: lagged {} messages, continuing", n);
                continue;
            }
            Err(broadcast::error::RecvError::Closed) => break,
        }
    }
}

落とし穴3:watchチャネルをメッセージキューと勘違いする

// ❌ 間違い:watchは最新の値のみ保持、中間値は失われる
async fn bad_watch_usage() {
    let (tx, mut rx) = watch::channel(0);
    for i in 1..=100 {
        tx.send(i).unwrap();
    }
    // 100しか読めない、1-99はすべて失われる
    rx.changed().await.unwrap();
    assert_eq!(*rx.borrow(), 100);
}

// ✅ 正しい:全メッセージを保持する必要がある場合はmpscを使用
async fn correct_channel_choice() {
    let (tx, mut rx) = mpsc::channel::<i32>(256);
    for i in 1..=100 {
        tx.send(i).await.unwrap();
    }
    drop(tx);
    let mut count = 0;
    while let Some(v) = rx.recv().await {
        count += 1;
    }
    assert_eq!(count, 100);
}

落とし穴4:非同期コードでMutexロックをawaitポイント越えで保持

// ❌ 間違い:MutexGuardがawaitをまたぐ、デッドロックの可能性
async fn bad_mutex() {
    let data = Arc::new(tokio::sync::Mutex::new(vec![]));
    let data_clone = data.clone();
    tokio::spawn(async move {
        let mut guard = data_clone.lock().await;
        guard.push(1);
        some_async_work().await; // ロックを保持したままawait!
        guard.push(2);
    });
}

// ✅ 正しい:チャネルで共有状態を置き換えるか、ロックの保持時間を短縮
async fn good_channel_approach() {
    let (tx, mut rx) = mpsc::channel::<i32>(32);
    tokio::spawn(async move {
        while let Some(val) = rx.recv().await {
            println!("processed: {}", val);
        }
    });
    tx.send(1).await.unwrap();
    some_async_work().await;
    tx.send(2).await.unwrap();
}

async fn some_async_work() {
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}

落とし穴5:select!のブランチ順序によるスターベーション

// ❌ 間違い:高優先度チャネルが低優先度チャネルに餓死される可能性
async fn bad_select(mut rx1: mpsc::Receiver<i32>, mut rx2: mpsc::Receiver<i32>) {
    loop {
        select! {
            Some(v) = rx1.recv() => println!("rx1: {}", v),
            Some(v) = rx2.recv() => println!("rx2: {}", v),
            else => break,
        }
    }
}

// ✅ 正しい:重要なチャネルにbiasedを使用し、優先処理を確保
async fn good_select(mut rx1: mpsc::Receiver<i32>, mut rx2: mpsc::Receiver<i32>) {
    loop {
        select! {
            biased;
            Some(v) = rx1.recv() => println!("rx1: {}", v),
            Some(v) = rx2.recv() => println!("rx2: {}", v),
            else => break,
        }
    }
}

エラートラブルシューティング早見表

エラー現象 考えられる原因 解決策
sendErr(SendError) を返す レシーバーが既に閉じている レシーバーが早期にドロップされていないか確認、シャットダウン通知機構を追加
recvNone を返す 全センダーが閉じている プロデューサーが正常に完了したか、予期せず終了していないか確認
RecvError::Lagged(n) コンシューマの処理が遅すぎる チャネル容量を増やすか、コンシューマのパフォーマンスを最適化
コンパイルエラー use of moved value Senderがmove後に使用された spawnの前に tx.clone() を呼び出す
デッドロック:タスクが永遠にブロック select! のブランチが永遠にレディにならない タイムアウトブランチを追加するか、全チャネルにメッセージソースがあることを確認
メモリが増加し続ける 非バインドチャネルを使用している バインドチャネル mpsc::channel(cap) に切り替え
oneshot RecvError レスポンダーが送信せずに終了 サーバーロジックを確認、全ブランチで responder.send() を呼び出すことを確認
watch changed() が発火しない 値が実際に変更されていない watch は値の変更時のみ通知、同じ値ではトリガーされない
broadcast メッセージが消失 レシーバーが送信後にサブスクライブした 送信前にサブスクライブするか、リプレイ機構を使用
Mutex がawait越えでデッドロック ロック保持中に .await を呼び出した ロックスコープを縮小するか、チャネルベースの通信に切り替え

高度な最適化テクニック

1. チャネル容量プランニング

チャネル容量は「大きければよい」ではなく、生産・消費レート差に基づいて計画する必要があります:

use tokio::sync::mpsc;

fn calculate_channel_capacity(
    produce_rate: u64,
    consume_rate: u64,
    burst_duration_ms: u64,
) -> usize {
    if consume_rate >= produce_rate {
        return 64;
    }
    let backlog = (produce_rate - consume_rate) * burst_duration_ms / 1000;
    (backlog as usize).next_power_of_two().max(64)
}

#[tokio::main]
async fn main() {
    let cap = calculate_channel_capacity(10000, 8000, 2000);
    let (tx, rx) = mpsc::channel::<String>(cap);
    println!("channel capacity: {}", cap);
}

経験則

  • 低頻度シナリオ:32-64
  • 中頻度シナリオ:128-256
  • 高頻度バースト:512-2048
  • 常に2の累乗を使用

2. グレースフルシャットダウンフロー

プロダクションサービスには順序あるシャットダウンが必要:プロデューサーに停止を通知 → 残りのメッセージを処理 → コンシューマを終了

use tokio::sync::{mpsc, watch};
use tokio::time::{timeout, Duration};

struct ShutdownSignal;

async fn graceful_shutdown_example() {
    let (tx, mut rx) = mpsc::channel::<String>(64);
    let (shutdown_tx, shutdown_rx) = watch::channel(false);

    for i in 0..3 {
        let tx = tx.clone();
        let mut shutdown = shutdown_rx.clone();
        tokio::spawn(async move {
            loop {
                select! {
                    _ = shutdown.changed() => {
                        println!("producer-{} shutting down", i);
                        return;
                    }
                    _ = tokio::time::sleep(Duration::from_millis(100)) => {
                        if tx.send(format!("msg-from-{}", i)).await.is_err() {
                            return;
                        }
                    }
                }
            }
        });
    }

    drop(tx);

    let mut shutdown_consumer = shutdown_rx.clone();
    tokio::spawn(async move {
        tokio::signal::ctrl_c().await.ok();
        shutdown_tx.send(true).ok();
    });

    loop {
        select! {
            Some(msg) = rx.recv() => {
                println!("processing: {}", msg);
            }
            _ = shutdown_consumer.changed() => {
                println!("draining remaining messages...");
                while let Ok(Some(msg)) = timeout(Duration::from_secs(1), rx.recv()).await {
                    println!("drained: {}", msg);
                }
                break;
            }
        }
    }
}

3. パフォーマンスモニタリングとメトリクス収集

use tokio::sync::mpsc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;

struct ChannelMetrics {
    sent: AtomicU64,
    received: AtomicU64,
    dropped: AtomicU64,
}

impl ChannelMetrics {
    fn new() -> Self {
        Self {
            sent: AtomicU64::new(0),
            received: AtomicU64::new(0),
            dropped: AtomicU64::new(0),
        }
    }

    fn record_send(&self) {
        self.sent.fetch_add(1, Ordering::Relaxed);
    }

    fn record_recv(&self) {
        self.received.fetch_add(1, Ordering::Relaxed);
    }

    fn record_drop(&self) {
        self.dropped.fetch_add(1, Ordering::Relaxed);
    }

    fn report(&self) {
        let sent = self.sent.load(Ordering::Relaxed);
        let received = self.received.load(Ordering::Relaxed);
        let dropped = self.dropped.load(Ordering::Relaxed);
        println!(
            "metrics - sent: {}, received: {}, dropped: {}",
            sent, received, dropped
        );
    }
}

async fn monitored_producer(
    tx: mpsc::Sender<String>,
    metrics: Arc<ChannelMetrics>,
) {
    for i in 0..1000 {
        match tx.send(format!("msg-{}", i)).await {
            Ok(()) => metrics.record_send(),
            Err(_) => {
                metrics.record_drop();
                return;
            }
        }
    }
}

async fn monitored_consumer(
    mut rx: mpsc::Receiver<String>,
    metrics: Arc<ChannelMetrics>,
) {
    while let Some(_) = rx.recv().await {
        metrics.record_recv();
    }
}

チャネルタイプ比較

特徴 mpsc broadcast watch oneshot crossbeam flume
プロデューサー 複数 複数 単一 単一 複数 複数
コンシューマ 単一 複数 複数 単一 複数 複数
メッセージ保持 FIFOキュー コンシューマごとのキュー 最新値のみ 単一値 FIFOキュー FIFOキュー
バックプレッシャー バインド/アンバインド Laggedエラー 古い値を上書き なし バインド/アンバインド バインド/アンバインド
非同期サポート ネイティブ ネイティブ ネイティブ ネイティブ 同期のみ 非同期+同期
パフォーマンス 高い 中程度 非常に高い 非常に高い 非常に高い 高い
典型的な用途 タスクキュー イベントブロードキャスト 状態同期 RPCレスポンス 同期通信 混合シナリオ
ランタイム依存 tokio tokio tokio tokio なし なし

まとめ

Rust Tokio Channelの中核哲学:メモリを共有して通信するのではなく、通信してメモリを共有する。 正しいチャネルタイプを選ぶことは、間違ったチャネルタイプを最適化することより重要です——mpsc はタスクディスパッチに、oneshot はリクエスト・レスポンスに、broadcast はイベントファンアウトに、watch は状態同期に。プロダクション環境では、常にバインドチャネルでバックプレッシャーを実現し、select! でマルチチャネルを編成し、watch でシャットダウンシグナルを伝播すれば、非同期アーキテクチャは効率的かつ信頼性の高いものになります。


おすすめツール

ブラウザローカルツールを無料で試す →

#Rust#Tokio#Channel#异步编程#mpsc#2026#并发