Rustアクターモデルフレームワーク実践:メッセージパッシングからスーパービジョンまでの5つのプロダクションパターン
並行性モデルがRustの所有権と出会うとき
こんな経験はありませんか——システムが同時に数千の独立したステートフルエンティティを処理する必要があり、それぞれが独自のライフサイクルとメッセージキューを持っている。Arc<Mutex<HashMap>> で共有状態を管理しようとしたら、ロック競合でスループットが急低下。スレッドプールでタスクをディスパッチしようとしたら、スレッド間通信のオーバーヘッドが計算自体を上回る。そして気づくのです——アクターモデルこそが答えだと。
アクターモデルの核心はシンプル:各アクターは独立した計算ユニットで、プライベートな状態を持ち、メッセージパッシングでのみ外部と通信します。共有状態がなければロック競合もなく、ロック競合がなければデッドロックもありません。Rustの所有権システムはアクターモデルと自然に適合——メッセージの所有権は送信時に移転し、コンパイラがスレッドセーフを保証します。
コア概念一覧
| 概念 | 説明 | 典型的なユースケース |
|---|---|---|
| Actor | 状態と振る舞いをカプセル化する独立した計算ユニット | ステートマシン、接続管理 |
| Message | アクター間通信の唯一の手段 | コマンド、イベント、クエリ |
| Mailbox | アクターのメッセージキュー | メッセージバッファリング、順序付き処理 |
| Supervision | 親アクターが子アクターを監視 | 障害回復、再起動戦略 |
| Address | アクターの参照ハンドル | メッセージアドレッシング、ルーティング |
| Context | アクターのランタイム環境 | タイマー、子アクター管理 |
| Backpressure | フロー制御メカニズム | Mailboxオーバーフロー防止 |
アクターモデルの5つの課題
1. 共有状態のロック地獄
従来の並行性モデルは共有メモリ+ロックに依存しますが、高並行シナリオではロック競合がボトルネックになります。アクターモデルは「共有しない」ことでこの問題を解決——各アクターが自分の状態を独占します。
2. 障害隔離と回復
あるコンポーネントのクラッシュがシステム全体をダウンさせるべきではありません。アクターモデルはスーパービジョンツリーで障害隔離を実現——子アクターがクラッシュした際、親が処理方法(再起動、停止、エスカレーションなど)を決定します。
3. メッセージ順序の保証
単一アクター内のメッセージは自然に順序付けられますが、アクター間のメッセージ順序をどう保証するか?メッセージプロトコルとルーティング戦略の慎重な設計が必要です。
4. 分散透過性
ローカルアクターとリモートアクターの通信方法は同じであるべきです。ビジネスロジックを変更せずにアクターのクロスノード通信を可能にするには?
5. グレースフルシャットダウンの複雑性
アクターシステムには数百〜数千のアクターが存在する可能性があります。シャットダウン時にすべてのメッセージが処理され、すべてのリソースが解放されることをどう保証するか?
5つのプロダクション級アクターパターン
パターン1:Actixアクターシステム——クラシックアクターパターン
ActixはRustエコシステムで最も成熟したアクターフレームワークで、型付きメッセージハンドリングと完全なアクターライフサイクル管理を提供します。
use actix::prelude::*;
use std::time::Duration;
struct GameSession {
id: String,
score: u64,
level: u32,
}
impl Actor for GameSession {
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {
println!("session {} started, score: {}", self.id, self.score);
}
fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
println!("session {} stopping", self.id);
Running::Continue
}
fn stopped(&mut self, _ctx: &mut Self::Context) {
println!("session {} stopped, final score: {}", self.id, self.score);
}
}
#[derive(Message)]
#[rtype(result = "u64")]
struct GetScore;
#[derive(Message)]
#[rtype(result = "()")]
struct AddScore { amount: u64 }
#[derive(Message)]
#[rtype(result = "()")]
struct LevelUp;
impl Handler<GetScore> for GameSession {
type Result = u64;
fn handle(&mut self, _msg: GetScore, _ctx: &mut Context<Self>) -> u64 {
self.score
}
}
impl Handler<AddScore> for GameSession {
type Result = ();
fn handle(&mut self, msg: AddScore, _ctx: &mut Context<Self>) {
self.score += msg.amount;
println!("session {} score: {}", self.id, self.score);
}
}
impl Handler<LevelUp> for GameSession {
type Result = ();
fn handle(&mut self, _msg: LevelUp, ctx: &mut Context<Self>) {
self.level += 1;
println!("session {} leveled up to {}", self.id, self.level);
ctx.notify_later(LevelUp, Duration::from_secs(30));
}
}
#[actix::main]
async fn main() {
let addr = GameSession {
id: "player-001".into(),
score: 0,
level: 1,
}.start();
addr.send(AddScore { amount: 100 }).await.unwrap();
addr.send(AddScore { amount: 50 }).await.unwrap();
addr.send(LevelUp).await.unwrap();
let score = addr.send(GetScore).await.unwrap();
println!("final score: {}", score);
}
重要ポイント:
impl Actorはアクターライフサイクルを定義:started、stopping、stopped#[derive(Message)]は型付きメッセージを定義、rtypeは戻り値の型を指定Handler<T>トレイトは各メッセージタイプの処理ロジックを実装ctx.notify_laterはスケジュール済み自己メッセージで周期タスクを構築addr.send()はメッセージを非同期送信し、レスポンスを待機
パターン2:Tokioチャネルベースアクター——軽量アクターパターン
Tokioチャネルでスクラッチからアクターを構築——より軽量で柔軟、パフォーマンスと制御性を極限まで求めるシナリオに最適。
use tokio::sync::{mpsc, oneshot};
use std::collections::HashMap;
enum CacheCommand {
Get {
key: String,
responder: oneshot::Sender<Option<String>>,
},
Set {
key: String,
value: String,
ttl: Option<std::time::Duration>,
responder: oneshot::Sender<bool>,
},
Delete {
key: String,
responder: oneshot::Sender<bool>,
},
Stats {
responder: oneshot::Sender<CacheStats>,
},
}
#[derive(Debug)]
struct CacheStats {
entries: usize,
hits: u64,
misses: u64,
}
struct CacheEntry {
value: String,
expires_at: Option<std::time::Instant>,
}
struct CacheActor {
store: HashMap<String, CacheEntry>,
hits: u64,
misses: u64,
}
impl CacheActor {
fn new() -> Self {
Self {
store: HashMap::new(),
hits: 0,
misses: 0,
}
}
fn is_expired(&self, entry: &CacheEntry) -> bool {
match entry.expires_at {
Some(t) => std::time::Instant::now() > t,
None => false,
}
}
async fn run(&mut self, mut rx: mpsc::Receiver<CacheCommand>) {
while let Some(cmd) = rx.recv().await {
match cmd {
CacheCommand::Get { key, responder } => {
let result = match self.store.get(&key) {
Some(entry) if !self.is_expired(entry) => {
self.hits += 1;
Some(entry.value.clone())
}
Some(_) => {
self.store.remove(&key);
self.misses += 1;
None
}
None => {
self.misses += 1;
None
}
};
let _ = responder.send(result);
}
CacheCommand::Set { key, value, ttl, responder } => {
let expires_at = ttl.map(|d| std::time::Instant::now() + d);
self.store.insert(key, CacheEntry { value, expires_at });
let _ = responder.send(true);
}
CacheCommand::Delete { key, responder } => {
let existed = self.store.remove(&key).is_some();
let _ = responder.send(existed);
}
CacheCommand::Stats { responder } => {
let stats = CacheStats {
entries: self.store.len(),
hits: self.hits,
misses: self.misses,
};
let _ = responder.send(stats);
}
}
}
println!("cache actor shutting down, {} entries remaining", self.store.len());
}
}
#[tokio::main]
async fn main() {
let (tx, rx) = mpsc::channel::<CacheCommand>(256);
let mut actor = CacheActor::new();
tokio::spawn(async move { actor.run(rx).await });
let (resp_tx, resp_rx) = oneshot::channel();
tx.send(CacheCommand::Set {
key: "user:1001".into(),
value: r#"{"name":"alice"}"#.into(),
ttl: Some(std::time::Duration::from_secs(300)),
responder: resp_tx,
}).await.unwrap();
resp_rx.await.unwrap();
let (resp_tx, resp_rx) = oneshot::channel();
tx.send(CacheCommand::Get {
key: "user:1001".into(),
responder: resp_tx,
}).await.unwrap();
let result = resp_rx.await.unwrap();
println!("get result: {:?}", result);
let (resp_tx, resp_rx) = oneshot::channel();
tx.send(CacheCommand::Stats { responder: resp_tx }).await.unwrap();
let stats = resp_rx.await.unwrap();
println!("cache stats: {:?}", stats);
}
重要ポイント:
mpscチャネルがアクターのMailboxとして機能、oneshotチャネルでリクエスト・レスポンスを実装enum Commandはメッセージプロトコルを定義——型安全で拡張可能- アクターは単一オーナーで、データ競合が自然に発生しない
- 外部フレームワーク依存なし——Tokioだけで完全なアクターシステムを構築
ttl期限切れはアクター内部状態管理の柔軟性を示す
チャネルアクター vs Actix 比較:
| 特徴 | チャネルアクター | Actix |
|---|---|---|
| 依存関係 | tokioのみ | actix + tokio |
| メッセージ型 | 手動enum | derive Message |
| レスポンス機構 | oneshotチャネル | Message rtype |
| ライフサイクル | 手動管理 | 自動コールバック |
| スーパービジョン | 手動実装 | 組み込みサポート |
| パフォーマンスオーバーヘッド | 極小 | 小 |
| 学習曲線 | 緩やか | 中程度 |
パターン3:スーパービジョンツリー——障害回復パターン
スーパービジョンツリーはアクターモデルの最も強力な機能の一つ:親アクターが子アクターを監視し、クラッシュ時に回復方法を決定します。
use tokio::sync::{mpsc, oneshot};
use tokio::time::{sleep, Duration};
use std::collections::HashMap;
#[derive(Debug, Clone)]
enum RestartStrategy {
Permanent,
Transient,
Temporary,
}
#[derive(Debug)]
enum WorkerMessage {
Process { data: String, responder: oneshot::Sender<String> },
Crash,
Status { responder: oneshot::Sender<WorkerStatus> },
}
#[derive(Debug, Clone)]
struct WorkerStatus {
id: String,
processed: u64,
alive: bool,
}
struct Worker {
id: String,
processed: u64,
}
impl Worker {
fn new(id: String) -> Self {
Self { id, processed: 0 }
}
async fn run(
&mut self,
mut rx: mpsc::Receiver<WorkerMessage>,
supervisor_tx: mpsc::Sender<SupervisorMessage>,
) {
println!("worker {} started", self.id);
while let Some(msg) = rx.recv().await {
match msg {
WorkerMessage::Process { data, responder } => {
self.processed += 1;
let result = format!("processed by {}: {}", self.id, data.to_uppercase());
let _ = responder.send(result);
}
WorkerMessage::Crash => {
println!("worker {} crashing!", self.id);
let _ = supervisor_tx.send(SupervisorMessage::ChildCrashed {
id: self.id.clone(),
}).await;
return;
}
WorkerMessage::Status { responder } => {
let _ = responder.send(WorkerStatus {
id: self.id.clone(),
processed: self.processed,
alive: true,
});
}
}
}
let _ = supervisor_tx.send(SupervisorMessage::ChildStopped {
id: self.id.clone(),
}).await;
}
}
enum SupervisorMessage {
ChildCrashed { id: String },
ChildStopped { id: String },
SpawnWorker { id: String },
GetStatus { responder: oneshot::Sender<Vec<WorkerStatus>> },
}
struct ChildHandle {
tx: mpsc::Sender<WorkerMessage>,
strategy: RestartStrategy,
}
struct Supervisor {
children: HashMap<String, ChildHandle>,
supervisor_rx: mpsc::Receiver<SupervisorMessage>,
supervisor_tx: mpsc::Sender<SupervisorMessage>,
restart_count: HashMap<String, u32>,
max_restarts: u32,
}
impl Supervisor {
fn new(rx: mpsc::Receiver<SupervisorMessage>, tx: mpsc::Sender<SupervisorMessage>, max_restarts: u32) -> Self {
Self {
children: HashMap::new(),
supervisor_rx: rx,
supervisor_tx: tx,
restart_count: HashMap::new(),
max_restarts,
}
}
async fn spawn_child(&mut self, id: String, strategy: RestartStrategy) {
let (tx, rx) = mpsc::channel::<WorkerMessage>(64);
let supervisor_tx = self.supervisor_tx.clone();
let worker_id = id.clone();
let mut worker = Worker::new(id.clone());
tokio::spawn(async move {
worker.run(rx, supervisor_tx).await;
});
self.children.insert(id, ChildHandle { tx, strategy });
println!("supervisor spawned child {}", worker_id);
}
async fn restart_child(&mut self, id: &str) -> bool {
let restarts = self.restart_count.entry(id.to_string()).or_insert(0);
if *restarts >= self.max_restarts {
println!("supervisor: child {} exceeded max restarts ({})", id, self.max_restarts);
return false;
}
*restarts += 1;
println!("supervisor: restarting child {} (attempt {})", id, *restarts);
let strategy = self.children.get(id).map(|h| h.strategy.clone());
if let Some(strategy) = strategy {
self.children.remove(id);
self.spawn_child(id.to_string(), strategy).await;
true
} else {
false
}
}
async fn run(&mut self) {
while let Some(msg) = self.supervisor_rx.recv().await {
match msg {
SupervisorMessage::ChildCrashed { id } => {
let strategy = self.children.get(&id).map(|h| h.strategy.clone());
match strategy {
Some(RestartStrategy::Permanent) => {
self.restart_child(&id).await;
}
Some(RestartStrategy::Transient) => {
self.restart_child(&id).await;
}
Some(RestartStrategy::Temporary) => {
println!("supervisor: child {} crashed, not restarting (temporary)", id);
self.children.remove(&id);
}
None => {}
}
}
SupervisorMessage::ChildStopped { id } => {
let strategy = self.children.get(&id).map(|h| h.strategy.clone());
match strategy {
Some(RestartStrategy::Permanent) => {
self.restart_child(&id).await;
}
Some(RestartStrategy::Transient) | Some(RestartStrategy::Temporary) => {
self.children.remove(&id);
}
None => {}
}
}
SupervisorMessage::SpawnWorker { id } => {
self.spawn_child(id, RestartStrategy::Permanent).await;
}
SupervisorMessage::GetStatus { responder } => {
let mut statuses = Vec::new();
for (id, handle) in &self.children {
let (resp_tx, resp_rx) = oneshot::channel();
if handle.tx.send(WorkerMessage::Status { responder: resp_tx }).await.is_ok() {
if let Ok(status) = resp_rx.await {
statuses.push(status);
}
}
}
let _ = responder.send(statuses);
}
}
}
}
}
#[tokio::main]
async fn main() {
let (sup_tx, sup_rx) = mpsc::channel::<SupervisorMessage>(256);
let mut supervisor = Supervisor::new(sup_rx, sup_tx.clone(), 3);
tokio::spawn(async move { supervisor.run().await });
sup_tx.send(SupervisorMessage::SpawnWorker { id: "worker-1".into() }).await.unwrap();
sup_tx.send(SupervisorMessage::SpawnWorker { id: "worker-2".into() }).await.unwrap();
sleep(Duration::from_millis(50)).await;
let (resp_tx, resp_rx) = oneshot::channel();
sup_tx.send(SupervisorMessage::GetStatus { responder: resp_tx }).await.unwrap();
let statuses = resp_rx.await.unwrap();
for s in statuses {
println!("status: {:?}", s);
}
}
重要ポイント:
- スーパーバイザーは子アクターの
Senderハンドルを保持し、mpscでクラッシュ通知を受信 - 3つの再起動戦略:
Permanent(常に再起動)、Transient(エラー時再起動)、Temporary(再起動しない) max_restartsで無限再起動ループを防止- 子アクターはクラッシュ時にスーパーバイザーに能動的に通知、スーパーバイザーが再起動の可否を決定
- スーパービジョンツリーはネスト可能:スーパーバイザー自体も監視対象の子アクターになり得る
再起動戦略比較:
| 戦略 | 正常終了 | 異常終了 | 典型的なユースケース |
|---|---|---|---|
| Permanent | 再起動 | 再起動 | 常駐サービス、定期タスク |
| Transient | 再起動しない | 再起動 | リクエストハンドラー、一時計算 |
| Temporary | 再起動しない | 再起動しない | 一回限りタスク、デバッグ |
パターン4:分散アクター——リモートメッセージングパターン
アクターがプロセス境界を越える場合、メッセージをシリアライズしてネットワーク経由で転送する必要があります。このパターンはリモート通信可能なアクターシステムの構築方法を示します。
use tokio::sync::{mpsc, oneshot};
use tokio::net::{TcpListener, TcpStream};
use serde::{Serialize, Deserialize};
use std::collections::HashMap;
use std::net::SocketAddr;
#[derive(Debug, Serialize, Deserialize)]
struct RemoteEnvelope {
target: String,
message_type: String,
payload: Vec<u8>,
reply_to: Option<u64>,
}
#[derive(Debug, Serialize, Deserialize)]
enum OrderCommand {
Create { order_id: String, item: String, quantity: u32 },
Cancel { order_id: String },
Query { order_id: String },
}
#[derive(Debug, Serialize, Deserialize)]
enum OrderResponse {
Created { order_id: String },
Cancelled { order_id: String },
Found { order_id: String, item: String, quantity: u32 },
NotFound { order_id: String },
Error { message: String },
}
#[derive(Debug, Serialize, Deserialize, Clone)]
struct Order {
id: String,
item: String,
quantity: u32,
status: String,
}
struct OrderActor {
orders: HashMap<String, Order>,
cmd_rx: mpsc::Receiver<(OrderCommand, oneshot::Sender<OrderResponse>)>,
}
impl OrderActor {
fn new(rx: mpsc::Receiver<(OrderCommand, oneshot::Sender<OrderResponse>)>) -> Self {
Self { orders: HashMap::new(), cmd_rx: rx }
}
async fn run(&mut self) {
while let Some((cmd, responder)) = self.cmd_rx.recv().await {
let response = match cmd {
OrderCommand::Create { order_id, item, quantity } => {
if self.orders.contains_key(&order_id) {
OrderResponse::Error { message: format!("order {} already exists", order_id) }
} else {
self.orders.insert(order_id.clone(), Order {
id: order_id.clone(),
item,
quantity,
status: "created".into(),
});
OrderResponse::Created { order_id }
}
}
OrderCommand::Cancel { order_id } => {
match self.orders.get_mut(&order_id) {
Some(order) => {
order.status = "cancelled".into();
OrderResponse::Cancelled { order_id }
}
None => OrderResponse::NotFound { order_id },
}
}
OrderCommand::Query { order_id } => {
match self.orders.get(&order_id) {
Some(order) => OrderResponse::Found {
order_id: order.id.clone(),
item: order.item.clone(),
quantity: order.quantity,
},
None => OrderResponse::NotFound { order_id },
}
}
};
let _ = responder.send(response);
}
}
}
struct RemoteGateway {
local_actors: HashMap<String, mpsc::Sender<(OrderCommand, oneshot::Sender<OrderResponse>)>>,
remote_nodes: HashMap<String, SocketAddr>,
next_request_id: u64,
}
impl RemoteGateway {
fn new() -> Self {
Self {
local_actors: HashMap::new(),
remote_nodes: HashMap::new(),
next_request_id: 0,
}
}
fn register_local(&mut self, name: String, tx: mpsc::Sender<(OrderCommand, oneshot::Sender<OrderResponse>)>) {
self.local_actors.insert(name, tx);
}
fn register_remote(&mut self, name: String, addr: SocketAddr) {
self.remote_nodes.insert(name, addr);
}
async fn send_command(&mut self, target: &str, cmd: OrderCommand) -> Option<OrderResponse> {
if let Some(tx) = self.local_actors.get(target) {
let (resp_tx, resp_rx) = oneshot::channel();
if tx.send((cmd, resp_tx)).await.is_ok() {
return resp_rx.await.ok();
}
return None;
}
if let Some(addr) = self.remote_nodes.get(target) {
match TcpStream::connect(addr).await {
Ok(mut stream) => {
let payload = bincode::serialize(&cmd).ok()?;
let request_id = self.next_request_id;
self.next_request_id += 1;
let envelope = RemoteEnvelope {
target: target.to_string(),
message_type: "OrderCommand".to_string(),
payload,
reply_to: Some(request_id),
};
let data = bincode::serialize(&envelope).ok()?;
let len = data.len() as u32;
use tokio::io::AsyncWriteExt;
if stream.write_all(&len.to_le_bytes()).await.is_err() { return None; }
if stream.write_all(&data).await.is_err() { return None; }
use tokio::io::AsyncReadExt;
let mut len_buf = [0u8; 4];
if stream.read_exact(&mut len_buf).await.is_err() { return None; }
let resp_len = u32::from_le_bytes(len_buf) as usize;
let mut resp_buf = vec![0u8; resp_len];
if stream.read_exact(&mut resp_buf).await.is_err() { return None; }
bincode::deserialize(&resp_buf).ok()
}
Err(e) => {
eprintln!("failed to connect to {}: {}", target, e);
None
}
}
} else {
None
}
}
}
async fn server_node(addr: SocketAddr) {
let (tx, rx) = mpsc::channel::<(OrderCommand, oneshot::Sender<OrderResponse>)>(256);
let mut actor = OrderActor::new(rx);
tokio::spawn(async move { actor.run().await });
let listener = TcpListener::bind(addr).await.unwrap();
println!("order actor listening on {}", addr);
loop {
if let Ok((mut stream, _)) = listener.accept().await {
let tx = tx.clone();
tokio::spawn(async move {
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
let mut len_buf = [0u8; 4];
if stream.read_exact(&mut len_buf).await.is_err() { return; }
let len = u32::from_le_bytes(len_buf) as usize;
let mut buf = vec![0u8; len];
if stream.read_exact(&mut buf).await.is_err() { return; }
let envelope: RemoteEnvelope = match bincode::deserialize(&buf) {
Ok(e) => e,
Err(_) => return,
};
let cmd: OrderCommand = match bincode::deserialize(&envelope.payload) {
Ok(c) => c,
Err(_) => return,
};
let (resp_tx, resp_rx) = oneshot::channel();
if tx.send((cmd, resp_tx)).await.is_err() { return; }
let response = match resp_rx.await {
Ok(r) => r,
Err(_) => return,
};
let resp_data = bincode::serialize(&response).unwrap();
let resp_len = resp_data.len() as u32;
let _ = stream.write_all(&resp_len.to_le_bytes()).await;
let _ = stream.write_all(&resp_data).await;
});
}
}
}
#[tokio::main]
async fn main() {
let server_addr: SocketAddr = "127.0.0.1:9001".parse().unwrap();
tokio::spawn(server_node(server_addr));
tokio::time::sleep(Duration::from_millis(100)).await;
let mut gateway = RemoteGateway::new();
gateway.register_remote("order-service".into(), server_addr);
let response = gateway.send_command("order-service", OrderCommand::Create {
order_id: "ORD-001".into(),
item: "Rust Book".into(),
quantity: 2,
}).await;
println!("create response: {:?}", response);
let response = gateway.send_command("order-service", OrderCommand::Query {
order_id: "ORD-001".into(),
}).await;
println!("query response: {:?}", response);
}
重要ポイント:
RemoteEnvelopeはメッセージメタデータをラップ:ターゲットアクター、メッセージタイプ、ペイロード、リプライアドレス- ローカルアクターはチャネルで直接通信、リモートアクターはシリアライズされたメッセージをTCPで転送
RemoteGatewayはローカル/リモートルーティングを統一——ビジネスコードは位置透過bincodeは効率的なバイナリシリアライゼーションを提供、JSONよりアクター間通信に適している- 長さプレフィックスフレーミング(4バイト長+データ)でTCPのストリーム指向の問題を解決
パターン5:プロダクション級アクターシステム——グレースフルシャットダウンとバックプレッシャー
プロダクション環境のアクターシステムは、グレースフルシャットダウン、バックプレッシャー制御、メッセージタイムアウトなどの複雑なシナリオを処理する必要があります。
use tokio::sync::{mpsc, oneshot, watch, broadcast};
use tokio::time::{timeout, Duration, Instant};
use tokio::select;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
struct ActorSystem {
shutdown_tx: watch::Sender<bool>,
shutdown_rx: watch::Receiver<bool>,
event_tx: broadcast::Sender<String>,
metrics: Arc<ActorMetrics>,
}
struct ActorMetrics {
messages_sent: AtomicU64,
messages_processed: AtomicU64,
messages_dropped: AtomicU64,
actors_alive: AtomicU64,
}
impl ActorMetrics {
fn new() -> Self {
Self {
messages_sent: AtomicU64::new(0),
messages_processed: AtomicU64::new(0),
messages_dropped: AtomicU64::new(0),
actors_alive: AtomicU64::new(0),
}
}
fn snapshot(&self) -> String {
format!(
"sent: {}, processed: {}, dropped: {}, alive: {}",
self.messages_sent.load(Ordering::Relaxed),
self.messages_processed.load(Ordering::Relaxed),
self.messages_dropped.load(Ordering::Relaxed),
self.actors_alive.load(Ordering::Relaxed),
)
}
}
enum WorkerCmd {
Execute { task: String, responder: oneshot::Sender<String> },
Ping { responder: oneshot::Sender<bool> },
}
struct ResilientWorker {
id: String,
cmd_rx: mpsc::Receiver<WorkerCmd>,
shutdown_rx: watch::Receiver<bool>,
event_tx: broadcast::Sender<String>,
metrics: Arc<ActorMetrics>,
mailbox_capacity: usize,
process_timeout: Duration,
}
impl ResilientWorker {
async fn run(&mut self) {
self.metrics.actors_alive.fetch_add(1, Ordering::Relaxed);
println!("worker {} started", self.id);
loop {
select! {
_ = self.shutdown_rx.changed() => {
if *self.shutdown_rx.borrow() {
self.drain_mailbox().await;
break;
}
}
cmd = self.cmd_rx.recv() => {
match cmd {
Some(cmd) => self.handle_command(cmd).await,
None => break,
}
}
}
}
self.metrics.actors_alive.fetch_sub(1, Ordering::Relaxed);
println!("worker {} stopped", self.id);
}
async fn handle_command(&mut self, cmd: WorkerCmd) {
match cmd {
WorkerCmd::Execute { task, responder } => {
self.metrics.messages_sent.fetch_add(1, Ordering::Relaxed);
let result = match timeout(self.process_timeout, self.process_task(&task)).await {
Ok(output) => {
self.metrics.messages_processed.fetch_add(1, Ordering::Relaxed);
output
}
Err(_) => {
self.metrics.messages_dropped.fetch_add(1, Ordering::Relaxed);
let _ = self.event_tx.send(format!("worker {} timeout on: {}", self.id, task));
"timeout".to_string()
}
};
let _ = responder.send(result);
}
WorkerCmd::Ping { responder } => {
let _ = responder.send(true);
}
}
}
async fn process_task(&self, task: &str) -> String {
tokio::time::sleep(Duration::from_millis(10)).await;
format!("worker {} processed: {}", self.id, task.to_uppercase())
}
async fn drain_mailbox(&mut self) {
println!("worker {} draining mailbox...", self.id);
let drain_timeout = Duration::from_secs(5);
let deadline = Instant::now() + drain_timeout;
loop {
if Instant::now() > deadline {
println!("worker {} drain timeout, {} messages remaining", self.id, self.cmd_rx.len());
break;
}
match timeout(Duration::from_millis(100), self.cmd_rx.recv()).await {
Ok(Some(WorkerCmd::Execute { task, responder })) => {
let result = self.process_task(&task).await;
self.metrics.messages_processed.fetch_add(1, Ordering::Relaxed);
let _ = responder.send(result);
}
Ok(Some(WorkerCmd::Ping { responder })) => {
let _ = responder.send(false);
}
Ok(None) => break,
Err(_) => continue,
}
}
println!("worker {} mailbox drained", self.id);
}
}
impl ActorSystem {
fn new() -> Self {
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let (event_tx, _) = broadcast::channel::<String>(256);
let metrics = Arc::new(ActorMetrics::new());
Self { shutdown_tx, shutdown_rx, event_tx, metrics }
}
async fn spawn_worker(&self, id: String, mailbox_capacity: usize) -> mpsc::Sender<WorkerCmd> {
let (tx, rx) = mpsc::channel::<WorkerCmd>(mailbox_capacity);
let mut worker = ResilientWorker {
id,
cmd_rx: rx,
shutdown_rx: self.shutdown_rx.clone(),
event_tx: self.event_tx.clone(),
metrics: self.metrics.clone(),
mailbox_capacity,
process_timeout: Duration::from_secs(2),
};
tokio::spawn(async move { worker.run().await });
tx
}
async fn graceful_shutdown(&self) {
println!("initiating graceful shutdown...");
self.shutdown_tx.send(true).ok();
tokio::time::sleep(Duration::from_secs(6)).await;
println!("final metrics: {}", self.metrics.snapshot());
}
}
async fn backpressure_sender(
tx: mpsc::Sender<WorkerCmd>,
worker_id: String,
total: u64,
) -> u64 {
let mut sent = 0u64;
let mut dropped = 0u64;
for i in 0..total {
let (resp_tx, resp_rx) = oneshot::channel();
let task = format!("task-{}", i);
match timeout(Duration::from_millis(50), tx.send(WorkerCmd::Execute {
task,
responder: resp_tx,
})).await {
Ok(Ok(())) => {
match timeout(Duration::from_secs(3), resp_rx).await {
Ok(Ok(result)) => {
sent += 1;
if i % 100 == 0 {
println!("{} result: {}", worker_id, result);
}
}
_ => dropped += 1,
}
}
_ => dropped += 1,
}
}
println!("{} sent: {}, dropped: {}", worker_id, sent, dropped);
dropped
}
#[tokio::main]
async fn main() {
let system = ActorSystem::new();
let worker1 = system.spawn_worker("worker-1".into(), 64).await;
let worker2 = system.spawn_worker("worker-2".into(), 64).await;
let h1 = tokio::spawn(backpressure_sender(worker1, "sender-1".into(), 200));
let h2 = tokio::spawn(backpressure_sender(worker2, "sender-2".into(), 200));
h1.await.unwrap();
h2.await.unwrap();
println!("metrics: {}", system.metrics.snapshot());
system.graceful_shutdown().await;
}
重要ポイント:
watch::channel(false)はシャットダウンシグナルをブロードキャスト、全アクターが同時に受信drain_mailbox()はシャットダウン前に残存メッセージを処理、無限待機を防ぐタイムアウト付きtimeoutでsendとrecvをラップ、バックプレッシャーによる無限ブロッキングを防止ActorMetricsはAtomicU64でロックフリー、スレッドセーフなメトリクス収集- バウンド付きMailbox(容量64)が自然なバックプレッシャーを提供——フル時にプロデューサーは待機
5つのよくある落とし穴
落とし穴1:アクター間の循環待機
// ❌ 間違い:アクターAがアクターBのレスポンスを待ち、アクターBがアクターAのレスポンスを待つ
async fn deadlock_pattern() {
let (tx_a, mut rx_a) = mpsc::channel::<String>(16);
let (tx_b, mut rx_b) = mpsc::channel::<String>(16);
tokio::spawn(async move {
tx_b.send("request-from-a".into()).await.unwrap();
let response = rx_a.recv().await.unwrap();
});
tokio::spawn(async move {
tx_a.send("request-from-b".into()).await.unwrap();
let response = rx_b.recv().await.unwrap();
});
}
// ✅ 正しい:タイムアウトまたは一方向メッセージでサイクルを打破
async fn safe_pattern() {
let (tx_a, mut rx_a) = mpsc::channel::<String>(16);
let (tx_b, mut rx_b) = mpsc::channel::<String>(16);
tokio::spawn(async move {
tx_b.send("request-from-a".into()).await.unwrap();
match timeout(Duration::from_secs(5), rx_a.recv()).await {
Ok(Some(response)) => println!("got: {}", response),
_ => println!("timeout or channel closed"),
}
});
}
落とし穴2:Mailbox満了の処理忘れ
// ❌ 間違い:アンバインドチャネル、メモリが爆発する可能性
async fn unbounded_risk() {
let (tx, mut rx) = mpsc::unbounded_channel::<String>();
for i in 0..1000000 {
tx.send(format!("msg-{}", i)).unwrap();
}
}
// ✅ 正しい:バウンド付きチャネル+バックプレッシャー
async fn bounded_safe() {
let (tx, mut rx) = mpsc::channel::<String>(256);
for i in 0..1000 {
match tx.send(format!("msg-{}", i)).await {
Ok(()) => {}
Err(_) => {
println!("receiver dropped at {}", i);
return;
}
}
}
}
落とし穴3:スーパーバイザーの再起動制限なし
// ❌ 間違い:無限再起動、クラッシュ・再起動ループの可能性
async fn infinite_restart_loop() {
loop {
let (tx, rx) = mpsc::channel::<String>(16);
let handle = tokio::spawn(async move {
panic!("always crash!");
});
let _ = handle.await;
println!("restarting...");
}
}
// ✅ 正しい:再起動頻度と回数を制限
struct RestartPolicy {
max_restarts: u32,
window: Duration,
restarts: Vec<Instant>,
}
impl RestartPolicy {
fn new(max_restarts: u32, window: Duration) -> Self {
Self { max_restarts, window, restarts: Vec::new() }
}
fn should_restart(&mut self) -> bool {
let now = Instant::now();
self.restarts.retain(|t| now - *t < self.window);
if self.restarts.len() < self.max_restarts as usize {
self.restarts.push(now);
true
} else {
false
}
}
}
落とし穴4:リモートメッセージにタイムアウトなし
// ❌ 間違い:リモートコールが永遠にブロックする可能性
async fn remote_without_timeout() {
let response = gateway.send_command("remote-actor", cmd).await;
}
// ✅ 正しい:すべてのリモートコールにタイムアウトを設定
async fn remote_with_timeout() {
match timeout(Duration::from_secs(5), gateway.send_command("remote-actor", cmd)).await {
Ok(Some(response)) => println!("got: {:?}", response),
Ok(None) => println!("actor not found"),
Err(_) => println!("remote call timeout"),
}
}
落とし穴5:シャットダウン時の未処理メッセージ破棄
// ❌ 間違い:レシーバーを直接ドロップ、メッセージが失われる
async fn bad_shutdown(mut rx: mpsc::Receiver<String>) {
drop(rx);
}
// ✅ 正しい:クローズ前にドレイン
async fn good_shutdown(mut rx: mpsc::Receiver<String>) {
println!("draining remaining messages...");
while let Ok(Some(msg)) = timeout(Duration::from_millis(100), rx.recv()).await {
println!("drained: {}", msg);
}
println!("all messages processed");
}
エラートラブルシューティングクイックリファレンス
| 症状 | 考えられる原因 | 解決策 |
|---|---|---|
| アクターがメッセージに応答しない | Mailbox満了またはデッドロック | Mailbox容量を確認、タイムアウト機構を追加 |
頻繁な SendError |
レシーバーが既にクローズ | アクターが予期せず終了していないか確認、スーパービジョンを追加 |
| メモリが継続的に増加 | アンバインドチャネルまたはメッセージバックログ | バウンド付きチャネルに切り替え、バックプレッシャーを追加 |
| 再起動ループ | アクターの初期化失敗 | 起動ロジックを確認、再起動回数を制限 |
| リモートコールタイムアウト | ネットワーク遅延またはピア無応答 | タイムアウトとリトライ機構を追加 |
| メッセージ順序の問題 | 複数プロデューサーの並行送信 | 順序保証にシングルプロデューサーを使用、またはシーケンス番号を追加 |
| グレースフルシャットダウンがスタック | アクターがシャットダウンシグナルに応答しない | ドレイン操作にタイムアウトを設定 |
oneshot RecvError |
レスポンダーが送信せずに終了 | すべてのコードパスでレスポンスを送信しているか確認 |
高度な最適化テクニック
1. アクタープーリング——負荷分散
単一アクターがボトルネックになった場合、アクタープールでメッセージを分散:
use tokio::sync::mpsc;
struct ActorPool {
workers: Vec<mpsc::Sender<WorkerCmd>>,
next: std::sync::atomic::AtomicUsize,
}
impl ActorPool {
fn new(workers: Vec<mpsc::Sender<WorkerCmd>>) -> Self {
Self {
workers,
next: std::sync::atomic::AtomicUsize::new(0),
}
}
fn next_worker(&self) -> &mpsc::Sender<WorkerCmd> {
let idx = self.next.fetch_add(1, Ordering::Relaxed) % self.workers.len();
&self.workers[idx]
}
async fn send(&self, cmd: WorkerCmd) -> bool {
let worker = self.next_worker();
worker.send(cmd).await.is_ok()
}
}
2. メッセージ優先度
use tokio::sync::mpsc;
use tokio::select;
enum Priority {
High,
Normal,
Low,
}
struct PriorityMailbox {
high_rx: mpsc::Receiver<String>,
normal_rx: mpsc::Receiver<String>,
low_rx: mpsc::Receiver<String>,
}
impl PriorityMailbox {
async fn recv(&mut self) -> Option<String> {
select! {
biased;
msg = self.high_rx.recv() => msg,
msg = self.normal_rx.recv() => msg,
msg = self.low_rx.recv() => msg,
}
}
}
3. トレースIDによるメッセージ追跡
use std::sync::atomic::{AtomicU64, Ordering};
static TRACE_ID: AtomicU64 = AtomicU64::new(1);
fn new_trace_id() -> u64 {
TRACE_ID.fetch_add(1, Ordering::Relaxed)
}
#[derive(Debug, Clone)]
struct TracedMessage {
trace_id: u64,
span_id: u64,
parent_span: Option<u64>,
payload: String,
}
impl TracedMessage {
fn new(payload: String) -> Self {
Self {
trace_id: new_trace_id(),
span_id: new_trace_id(),
parent_span: None,
payload,
}
}
fn child(&self, payload: String) -> Self {
Self {
trace_id: self.trace_id,
span_id: new_trace_id(),
parent_span: Some(self.span_id),
payload,
}
}
}
アクターフレームワーク比較
| 特徴 | Actix | Tokio Channel | Riker | Coerce |
|---|---|---|---|---|
| メッセージパッシング | 型付きMessage | 手動enum | 型付き | 型付き |
| スーパービジョンツリー | 組み込み | 手動実装 | 組み込み | 組み込み |
| リモートアクター | actix-remote | 手動実装 | サポート | 組み込み |
| パフォーマンス | 高 | 非常に高い | 中 | 高 |
| 依存関係 | actix | tokioのみ | riker | coerce |
| 学習曲線 | 中程度 | 緩やか | 中程度 | 中程度 |
| エコシステム成熟度 | 高 | 高 | 低 | 低 |
| ユースケース | Webサービス | 汎用並行性 | 伝統的アクター | 分散 |
まとめ
Rustアクターモデルの核心哲学:メモリを共有して通信するのではなく、通信してメモリを共有する。Actixは成熟した型付きアクターシステムを提供し、Tokioチャネルアクターはより軽量で柔軟です。スーパービジョンツリーはプロダクション級アクターシステムの礎石——スーパービジョンのないアクターシステムは、シートベルトのない車のようなものです。分散シナリオでは、ローカル/リモート通信インターフェースの統一が鍵です。常にバウンド付きMailboxでバックプレッシャーを実現し、すべてのリモートコールにタイムアウトを設定し、ドレイン機構でグレースフルシャットダウンを確保すれば、あなたのアクターシステムは堅牢かつ効率的になります。
推奨ツール
- JSONフォーマッター — アクターメッセージのJSONデータをフォーマット
- ハッシュ計算 — メッセージダイジェストを計算、重複排除と検証に使用
- Curl→コード変換 — リモートアクターのHTTP呼び出しをコードに変換
ブラウザローカルツールを無料で試す →