Rust Tokio Channelパターン実践:mpscからBroadcastまで6つのプロダクションパターン
非同期Rustコードが「喧嘩」し始めたら
こんな経験はありませんか——複数の非同期タスクが状態を共有する必要があり、Arc<Mutex<T>> を追加したら、ロック競合でパフォーマンスが急低下。RwLock に切り替えたら、書き込みロックの飢餓でデッドロックが発生。そして気づくのです——非同期の世界では、メッセージパッシングこそが正解だと。
Tokioのチャネルシステムはまさにこの問題のために作られました:異なる通信パターンが異なる並行シナリオに対応します。正しいチャネルを選べば、非同期アーキテクチャは「互いに待ち合う」状態から「効率的に協調する」状態へと変わります。
コア概念一覧
| 概念 | 説明 | 典型的なユースケース |
|---|---|---|
mpsc |
マルチプロデューサ・シングルコンシューマチャネル | タスクキュー、イベントストリーム |
oneshot |
一回限りの単一値チャネル | リクエスト・レスポンス、Future通知 |
broadcast |
マルチコンシューマブロードキャストチャネル | イベントファンアウト、ログ配信 |
watch |
単一値状態監視チャネル | 設定ホットリロード、状態同期 |
| Backpressure | バックプレッシャー機構 | フロー制御、OOM防止 |
select! |
マルチチャネル多重化マクロ | マルチソースイベント処理 |
Stream |
非同期イテレータ | チャネル合成、ストリーム処理 |
非同期通信の5つの課題
1. 共有可変状態のロック地獄
Arc<Mutex<T>> は同期コードでは機能しますが、非同期ランタイムでは長時間ロックを保持するとスレッド全体がブロックされ、他のタスクがスケジューリングされなくなります。
2. タスク間通信パターンのミスマッチ
mpsc でブロードキャストを、broadcast でリクエスト・レスポンスを実装する——間違ったチャネルタイプの選択は、パフォーマンスが悪いだけでなく、論理バグを引き起こす可能性があります。
3. バックプレッシャー欠如によるメモリ爆発
非バインドチャネルは便利に見えますが、生産速度が消費速度を大幅に上回ると、メモリはOOMに至るまで増え続けます。
4. グレースフルシャットダウンの複雑性
すべてのメッセージが処理されてからシャットダウンするにはどうすればよいか?全プロデューサーに停止を通知するには?これには慎重なシャットダウン設計が必要です。
5. マルチチャネル調整の複雑さ
実際のサービスは複数のチャネルタイプを同時に使用することが多く、select! でエレガントに組み合わせるには、コードがスパゲッティにならないよう工夫が必要です。
6つのプロダクション級チャネルパターン
パターン1:mpscチャネル——プロデューサ・コンシューマパターン
最も古典的なチャネルパターン:複数のプロデューサーが1つのコンシューマにメッセージを送信。タスクキューとイベントストリーム処理に最適。
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel::<String>(100);
for i in 0..3 {
let tx = tx.clone();
tokio::spawn(async move {
for j in 0..5 {
let msg = format!("producer-{} msg-{}", i, j);
if tx.send(msg).await.is_err() {
println!("receiver dropped");
return;
}
}
});
}
drop(tx);
while let Some(msg) = rx.recv().await {
println!("received: {}", msg);
}
println!("all messages processed");
}
重要ポイント:
tx.clone()で新しいプロデューサーハンドルを作成、元のtxをドロップしてもチャネルは閉じないdrop(tx)でクローンされた全プロデューサーを閉じる、コンシューマは残りのメッセージを受信後に終了- バインドチャネル容量100が自然なバックプレッシャーを提供
パターン2:oneshotチャネル——リクエスト・レスポンスパターン
一回限りのチャネル、単一の値を送信後にチャネルが閉じる——RPCスタイルのリクエスト・レスポンスに最適。
use tokio::sync::{mpsc, oneshot};
use std::collections::HashMap;
#[derive(Debug)]
enum Command {
Get {
key: String,
responder: oneshot::Sender<Option<String>>,
},
Set {
key: String,
value: String,
responder: oneshot::Sender<()>,
},
}
async fn kv_server(mut rx: mpsc::Receiver<Command>) {
let mut store: HashMap<String, String> = HashMap::new();
while let Some(cmd) = rx.recv().await {
match cmd {
Command::Get { key, responder } => {
let val = store.get(&key).cloned();
let _ = responder.send(val);
}
Command::Set { key, value, responder } => {
store.insert(key, value);
let _ = responder.send(());
}
}
}
}
#[tokio::main]
async fn main() {
let (tx, rx) = mpsc::channel::<Command>(32);
tokio::spawn(kv_server(rx));
let (resp_tx, resp_rx) = oneshot::channel();
tx.send(Command::Set {
key: "hello".into(),
value: "world".into(),
responder: resp_tx,
}).await.unwrap();
resp_rx.await.unwrap();
let (resp_tx, resp_rx) = oneshot::channel();
tx.send(Command::Get {
key: "hello".into(),
responder: resp_tx,
}).await.unwrap();
let result = resp_rx.await.unwrap();
println!("got: {:?}", result);
}
重要ポイント:
mpsc+oneshotの組み合わせでリクエスト・レスポンスを実現:mpscでコマンドを送信、oneshotで結果を返す- サーバーは単一のオーナー、データ競合を自然に回避
responderをCommand列挙型に埋め込み、型安全性を確保
パターン3:broadcastチャネル——ファンアウトパターン
1つのメッセージを全レシーバーに配信。イベント通知、ログ配信など1対多のシナリオに最適。
use tokio::sync::broadcast;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
let (tx, _) = broadcast::channel::<String>(16);
for i in 0..3 {
let mut rx = tx.subscribe();
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(msg) => println!("subscriber-{}: {}", i, msg),
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
println!("subscriber-{} lagged {} messages", i, n);
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
});
}
for i in 0..5 {
tx.send(format!("event-{}", i)).unwrap();
}
drop(tx);
sleep(Duration::from_millis(100)).await;
}
重要ポイント:
tx.subscribe()で独立したレシーバーを作成、各レシーバーが全メッセージを受信- 遅いコンシューマは
Laggedエラーを引き起こす——これはバックプレッシャーシグナルであり、致命的エラーではない - チャネル容量はレシーバーごとのバッファサイズであり、総容量ではない
パターン4:watchチャネル——状態ブロードキャストパターン
最新の値のみを保持、コンシューマは常に最新の状態を読み取る。設定のホットリロード、進捗同期などのシナリオに最適。
use tokio::sync::watch;
use std::time::Duration;
#[derive(Debug, Clone)]
struct Config {
max_connections: usize,
timeout_ms: u64,
}
#[tokio::main]
async fn main() {
let (tx, rx) = watch::channel(Config {
max_connections: 100,
timeout_ms: 5000,
});
for i in 0..2 {
let mut rx = rx.clone();
tokio::spawn(async move {
loop {
if rx.changed().await.is_err() {
break;
}
let config = rx.borrow();
println!("worker-{} config updated: {:?}", i, *config);
}
});
}
tx.send(Config {
max_connections: 200,
timeout_ms: 3000,
}).unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
tx.send(Config {
max_connections: 50,
timeout_ms: 10000,
}).unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
}
重要ポイント:
watchは最新の値のみを保持、中間値は上書きされる——これは機能であり、欠陥ではないrx.changed().awaitは値が変化するまでブロック、ポーリングを回避rx.borrow()は現在の値の参照を取得、await不要
パターン5:バインドチャネルとバックプレッシャー処理
プロダクションシステムにはバックプレッシャーが必須。バインドチャネル + send の await が自然にバックプレッシャーを実装。
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration, Instant};
async fn fast_producer(tx: mpsc::Sender<u64>) {
for i in 0..1000 {
match tx.send(i).await {
Ok(()) => {}
Err(_) => {
println!("producer: receiver dropped at {}", i);
return;
}
}
if i % 100 == 0 {
println!("producer sent: {}", i);
}
}
}
async fn slow_consumer(mut rx: mpsc::Receiver<u64>) {
while let Some(val) = rx.recv().await {
sleep(Duration::from_millis(10)).await;
if val % 100 == 0 {
println!("consumer processed: {}", val);
}
}
}
#[tokio::main]
async fn main() {
let (tx, rx) = mpsc::channel::<u64>(32);
let start = Instant::now();
let producer = tokio::spawn(fast_producer(tx));
let consumer = tokio::spawn(slow_consumer(rx));
producer.await.unwrap();
consumer.await.unwrap();
println!("total time: {:?}", start.elapsed());
}
重要ポイント:
- バインドチャネル容量32、チャネルが満杯の時
send().awaitが自動的にプロデューサーをブロック - 手動バックプレッシャーは不要——
sendの await がバックプレッシャーそのもの - コンシューマが遅いとプロデューサーが自動的に減速、メモリ使用量は常に制御可能
3つのバックプレッシャー戦略の比較:
| 戦略 | 実装方法 | 適用シナリオ |
|---|---|---|
| 待機 | send().await |
ほとんどのシナリオ、メッセージ損失なしを保証 |
| トライ送信 | try_send() |
メッセージ損失を許容できるリアルタイムシステム |
| タイムアウト送信 | timeout(send()).await |
レイテンシと信頼性のバランスが必要な場合 |
use tokio::sync::mpsc;
use tokio::time::{timeout, Duration};
async fn backpressure_with_timeout(tx: mpsc::Sender<u64>, val: u64) -> bool {
match timeout(Duration::from_millis(100), tx.send(val)).await {
Ok(Ok(())) => true,
Ok(Err(_)) => false,
Err(_) => {
println!("send timeout, dropping {}", val);
false
}
}
}
パターン6:プロダクション級非同期サービス——チャネルオーケストレーション
実際のサービスは複数のチャネルタイプを組み合わせる必要があり、select! でマルチソースイベントをエレガントに処理。
use tokio::sync::{mpsc, oneshot, broadcast, watch};
use tokio::select;
use std::collections::HashMap;
#[derive(Debug)]
enum Request {
Query {
key: String,
responder: oneshot::Sender<Option<String>>,
},
Update {
key: String,
value: String,
responder: oneshot::Sender<bool>,
},
}
#[derive(Debug, Clone)]
struct AppEvent {
kind: String,
detail: String,
}
#[derive(Debug, Clone)]
struct AppState {
version: u64,
maintenance: bool,
}
struct Service {
request_rx: mpsc::Receiver<Request>,
event_tx: broadcast::Sender<AppEvent>,
state_rx: watch::Receiver<AppState>,
store: HashMap<String, String>,
}
impl Service {
async fn run(&mut self) {
loop {
select! {
Some(req) = self.request_rx.recv() => {
self.handle_request(req);
}
Ok(event) = self.event_tx.subscribe().recv() => {
println!("event received: {:?}", event);
}
Ok(_) = self.state_rx.changed() => {
let state = self.state_rx.borrow();
println!("state changed: {:?}", *state);
}
else => {
println!("all channels closed, shutting down");
break;
}
}
}
}
fn handle_request(&mut self, req: Request) {
match req {
Request::Query { key, responder } => {
let val = self.store.get(&key).cloned();
let _ = responder.send(val);
}
Request::Update { key, value, responder } => {
self.store.insert(key.clone(), value);
let _ = responder.send(true);
}
}
}
}
#[tokio::main]
async fn main() {
let (req_tx, req_rx) = mpsc::channel::<Request>(64);
let (event_tx, _) = broadcast::channel::<AppEvent>(32);
let (state_tx, state_rx) = watch::channel(AppState {
version: 1,
maintenance: false,
});
let mut service = Service {
request_rx: req_rx,
event_tx: event_tx.clone(),
state_rx,
store: HashMap::new(),
};
tokio::spawn(async move { service.run().await });
let (resp_tx, resp_rx) = oneshot::channel();
req_tx.send(Request::Update {
key: "server".into(),
value: "tokio-1.40".into(),
responder: resp_tx,
}).await.unwrap();
resp_rx.await.unwrap();
let (resp_tx, resp_rx) = oneshot::channel();
req_tx.send(Request::Query {
key: "server".into(),
responder: resp_tx,
}).await.unwrap();
let result = resp_rx.await.unwrap();
println!("query result: {:?}", result);
event_tx.send(AppEvent {
kind: "deploy".into(),
detail: "v2.0 released".into(),
}).unwrap();
state_tx.send(AppState {
version: 2,
maintenance: true,
}).unwrap();
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
重要ポイント:
select!は複数のチャネルを同時に監視、準備できたものから処理mpscでリクエストを処理、broadcastでイベントを受信、watchで状態変化を感知elseブランチで全チャネルが閉じた場合を処理、グレースフルシャットダウンを実現- 各チャネルタイプが役割を分担:リクエストは
mpsc、通知はbroadcast、状態はwatch
5つのよくある落とし穴
落とし穴1:cloneせずにSenderをドロップ
// ❌ 間違い:txがmoveされた後、使用できない
async fn bad_pattern() {
let (tx, mut rx) = mpsc::channel::<i32>(16);
tokio::spawn(async move {
tx.send(42).await.unwrap();
});
// txは既にmove済み、cloneや使用ができない
}
// ✅ 正しい:moveする前にclone
async fn good_pattern() {
let (tx, mut rx) = mpsc::channel::<i32>(16);
let tx_clone = tx.clone();
tokio::spawn(async move {
tx_clone.send(42).await.unwrap();
});
// 元のtxはまだ使用可能
tx.send(99).await.unwrap();
drop(tx);
while let Some(v) = rx.recv().await {
println!("got: {}", v);
}
}
落とし穴2:broadcastレシーバーでLaggedを処理しない
// ❌ 間違い:Laggedエラーを無視、メッセージが黙って失われる可能性
async fn bad_broadcast(mut rx: broadcast::Receiver<String>) {
loop {
let msg = rx.recv().await.unwrap();
println!("{}", msg);
}
}
// ✅ 正しい:Laggedを処理し、ログに出力して継続
async fn good_broadcast(mut rx: broadcast::Receiver<String>) {
loop {
match rx.recv().await {
Ok(msg) => println!("{}", msg),
Err(broadcast::error::RecvError::Lagged(n)) => {
eprintln!("warning: lagged {} messages, continuing", n);
continue;
}
Err(broadcast::error::RecvError::Closed) => break,
}
}
}
落とし穴3:watchチャネルをメッセージキューと勘違いする
// ❌ 間違い:watchは最新の値のみ保持、中間値は失われる
async fn bad_watch_usage() {
let (tx, mut rx) = watch::channel(0);
for i in 1..=100 {
tx.send(i).unwrap();
}
// 100しか読めない、1-99はすべて失われる
rx.changed().await.unwrap();
assert_eq!(*rx.borrow(), 100);
}
// ✅ 正しい:全メッセージを保持する必要がある場合はmpscを使用
async fn correct_channel_choice() {
let (tx, mut rx) = mpsc::channel::<i32>(256);
for i in 1..=100 {
tx.send(i).await.unwrap();
}
drop(tx);
let mut count = 0;
while let Some(v) = rx.recv().await {
count += 1;
}
assert_eq!(count, 100);
}
落とし穴4:非同期コードでMutexロックをawaitポイント越えで保持
// ❌ 間違い:MutexGuardがawaitをまたぐ、デッドロックの可能性
async fn bad_mutex() {
let data = Arc::new(tokio::sync::Mutex::new(vec![]));
let data_clone = data.clone();
tokio::spawn(async move {
let mut guard = data_clone.lock().await;
guard.push(1);
some_async_work().await; // ロックを保持したままawait!
guard.push(2);
});
}
// ✅ 正しい:チャネルで共有状態を置き換えるか、ロックの保持時間を短縮
async fn good_channel_approach() {
let (tx, mut rx) = mpsc::channel::<i32>(32);
tokio::spawn(async move {
while let Some(val) = rx.recv().await {
println!("processed: {}", val);
}
});
tx.send(1).await.unwrap();
some_async_work().await;
tx.send(2).await.unwrap();
}
async fn some_async_work() {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
落とし穴5:select!のブランチ順序によるスターベーション
// ❌ 間違い:高優先度チャネルが低優先度チャネルに餓死される可能性
async fn bad_select(mut rx1: mpsc::Receiver<i32>, mut rx2: mpsc::Receiver<i32>) {
loop {
select! {
Some(v) = rx1.recv() => println!("rx1: {}", v),
Some(v) = rx2.recv() => println!("rx2: {}", v),
else => break,
}
}
}
// ✅ 正しい:重要なチャネルにbiasedを使用し、優先処理を確保
async fn good_select(mut rx1: mpsc::Receiver<i32>, mut rx2: mpsc::Receiver<i32>) {
loop {
select! {
biased;
Some(v) = rx1.recv() => println!("rx1: {}", v),
Some(v) = rx2.recv() => println!("rx2: {}", v),
else => break,
}
}
}
エラートラブルシューティング早見表
| エラー現象 | 考えられる原因 | 解決策 |
|---|---|---|
send が Err(SendError) を返す |
レシーバーが既に閉じている | レシーバーが早期にドロップされていないか確認、シャットダウン通知機構を追加 |
recv が None を返す |
全センダーが閉じている | プロデューサーが正常に完了したか、予期せず終了していないか確認 |
RecvError::Lagged(n) |
コンシューマの処理が遅すぎる | チャネル容量を増やすか、コンシューマのパフォーマンスを最適化 |
コンパイルエラー use of moved value |
Senderがmove後に使用された | spawnの前に tx.clone() を呼び出す |
| デッドロック:タスクが永遠にブロック | select! のブランチが永遠にレディにならない |
タイムアウトブランチを追加するか、全チャネルにメッセージソースがあることを確認 |
| メモリが増加し続ける | 非バインドチャネルを使用している | バインドチャネル mpsc::channel(cap) に切り替え |
oneshot RecvError |
レスポンダーが送信せずに終了 | サーバーロジックを確認、全ブランチで responder.send() を呼び出すことを確認 |
watch changed() が発火しない |
値が実際に変更されていない | watch は値の変更時のみ通知、同じ値ではトリガーされない |
| broadcast メッセージが消失 | レシーバーが送信後にサブスクライブした | 送信前にサブスクライブするか、リプレイ機構を使用 |
Mutex がawait越えでデッドロック |
ロック保持中に .await を呼び出した |
ロックスコープを縮小するか、チャネルベースの通信に切り替え |
高度な最適化テクニック
1. チャネル容量プランニング
チャネル容量は「大きければよい」ではなく、生産・消費レート差に基づいて計画する必要があります:
use tokio::sync::mpsc;
fn calculate_channel_capacity(
produce_rate: u64,
consume_rate: u64,
burst_duration_ms: u64,
) -> usize {
if consume_rate >= produce_rate {
return 64;
}
let backlog = (produce_rate - consume_rate) * burst_duration_ms / 1000;
(backlog as usize).next_power_of_two().max(64)
}
#[tokio::main]
async fn main() {
let cap = calculate_channel_capacity(10000, 8000, 2000);
let (tx, rx) = mpsc::channel::<String>(cap);
println!("channel capacity: {}", cap);
}
経験則:
- 低頻度シナリオ:32-64
- 中頻度シナリオ:128-256
- 高頻度バースト:512-2048
- 常に2の累乗を使用
2. グレースフルシャットダウンフロー
プロダクションサービスには順序あるシャットダウンが必要:プロデューサーに停止を通知 → 残りのメッセージを処理 → コンシューマを終了
use tokio::sync::{mpsc, watch};
use tokio::time::{timeout, Duration};
struct ShutdownSignal;
async fn graceful_shutdown_example() {
let (tx, mut rx) = mpsc::channel::<String>(64);
let (shutdown_tx, shutdown_rx) = watch::channel(false);
for i in 0..3 {
let tx = tx.clone();
let mut shutdown = shutdown_rx.clone();
tokio::spawn(async move {
loop {
select! {
_ = shutdown.changed() => {
println!("producer-{} shutting down", i);
return;
}
_ = tokio::time::sleep(Duration::from_millis(100)) => {
if tx.send(format!("msg-from-{}", i)).await.is_err() {
return;
}
}
}
}
});
}
drop(tx);
let mut shutdown_consumer = shutdown_rx.clone();
tokio::spawn(async move {
tokio::signal::ctrl_c().await.ok();
shutdown_tx.send(true).ok();
});
loop {
select! {
Some(msg) = rx.recv() => {
println!("processing: {}", msg);
}
_ = shutdown_consumer.changed() => {
println!("draining remaining messages...");
while let Ok(Some(msg)) = timeout(Duration::from_secs(1), rx.recv()).await {
println!("drained: {}", msg);
}
break;
}
}
}
}
3. パフォーマンスモニタリングとメトリクス収集
use tokio::sync::mpsc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
struct ChannelMetrics {
sent: AtomicU64,
received: AtomicU64,
dropped: AtomicU64,
}
impl ChannelMetrics {
fn new() -> Self {
Self {
sent: AtomicU64::new(0),
received: AtomicU64::new(0),
dropped: AtomicU64::new(0),
}
}
fn record_send(&self) {
self.sent.fetch_add(1, Ordering::Relaxed);
}
fn record_recv(&self) {
self.received.fetch_add(1, Ordering::Relaxed);
}
fn record_drop(&self) {
self.dropped.fetch_add(1, Ordering::Relaxed);
}
fn report(&self) {
let sent = self.sent.load(Ordering::Relaxed);
let received = self.received.load(Ordering::Relaxed);
let dropped = self.dropped.load(Ordering::Relaxed);
println!(
"metrics - sent: {}, received: {}, dropped: {}",
sent, received, dropped
);
}
}
async fn monitored_producer(
tx: mpsc::Sender<String>,
metrics: Arc<ChannelMetrics>,
) {
for i in 0..1000 {
match tx.send(format!("msg-{}", i)).await {
Ok(()) => metrics.record_send(),
Err(_) => {
metrics.record_drop();
return;
}
}
}
}
async fn monitored_consumer(
mut rx: mpsc::Receiver<String>,
metrics: Arc<ChannelMetrics>,
) {
while let Some(_) = rx.recv().await {
metrics.record_recv();
}
}
チャネルタイプ比較
| 特徴 | mpsc | broadcast | watch | oneshot | crossbeam | flume |
|---|---|---|---|---|---|---|
| プロデューサー | 複数 | 複数 | 単一 | 単一 | 複数 | 複数 |
| コンシューマ | 単一 | 複数 | 複数 | 単一 | 複数 | 複数 |
| メッセージ保持 | FIFOキュー | コンシューマごとのキュー | 最新値のみ | 単一値 | FIFOキュー | FIFOキュー |
| バックプレッシャー | バインド/アンバインド | Laggedエラー | 古い値を上書き | なし | バインド/アンバインド | バインド/アンバインド |
| 非同期サポート | ネイティブ | ネイティブ | ネイティブ | ネイティブ | 同期のみ | 非同期+同期 |
| パフォーマンス | 高い | 中程度 | 非常に高い | 非常に高い | 非常に高い | 高い |
| 典型的な用途 | タスクキュー | イベントブロードキャスト | 状態同期 | RPCレスポンス | 同期通信 | 混合シナリオ |
| ランタイム依存 | tokio | tokio | tokio | tokio | なし | なし |
まとめ
Rust Tokio Channelの中核哲学:メモリを共有して通信するのではなく、通信してメモリを共有する。 正しいチャネルタイプを選ぶことは、間違ったチャネルタイプを最適化することより重要です——
mpscはタスクディスパッチに、oneshotはリクエスト・レスポンスに、broadcastはイベントファンアウトに、watchは状態同期に。プロダクション環境では、常にバインドチャネルでバックプレッシャーを実現し、select!でマルチチャネルを編成し、watchでシャットダウンシグナルを伝播すれば、非同期アーキテクチャは効率的かつ信頼性の高いものになります。
おすすめツール
- JSONフォーマッター — チャネルメッセージのJSONデータをフォーマット
- ハッシュ計算 — メッセージダイジェストを計算、メッセージの重複排除と検証に使用
- Base64エンコード/デコード — バイナリメッセージデータのエンコード/デコード
ブラウザローカルツールを無料で試す →