Rust Actor Model Framework: 5 Production Patterns from Message Passing to Supervision
When Concurrency Meets Rust's Ownership
Ever run into this? Your system needs to handle thousands of independent stateful entities simultaneously, each with its own lifecycle and message queue. You try `Arc<Mutex<HashMap>>` for shared state, only to watch lock contention tank your throughput. You try thread pools for task dispatch, but find that inter-thread communication overhead exceeds the computation itself. Eventually you realize the Actor model is a better fit.
The Actor model's core idea is simple: each Actor is an independent computation unit with private state, and it communicates only through message passing. Because no state is shared, there are no locks to contend for and no lock-ordering deadlocks (although, as Pitfall 1 shows, two Actors can still deadlock by waiting on each other). Rust's ownership system is a natural fit: sending a message moves ownership of its data to the receiver, and the compiler's `Send` checks rule out data races at compile time.
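To make the idea concrete, here is a minimal sketch that uses only the standard library. A thread stands in for an async task, and `std::sync::mpsc` stands in for the mailbox. The `CounterMsg` and `spawn_counter` names are hypothetical and chosen for illustration. The Actor thread exclusively owns `count`. Callers never touch that state directly, and replies travel back over a per-request channel that is moved into the message.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

// Messages the counter Actor understands (hypothetical protocol for this sketch).
enum CounterMsg {
    Add(u64),
    // The reply channel moves into the message, so the reply path is owned by the Actor.
    Get(Sender<u64>),
}

// Spawns the Actor: the thread exclusively owns `count`, so no lock is needed.
fn spawn_counter() -> (Sender<CounterMsg>, thread::JoinHandle<u64>) {
    let (tx, rx): (Sender<CounterMsg>, Receiver<CounterMsg>) = mpsc::channel();
    let handle = thread::spawn(move || {
        let mut count = 0u64;
        // Messages are processed one at a time, in arrival order.
        for msg in rx {
            match msg {
                CounterMsg::Add(n) => count += n,
                CounterMsg::Get(reply) => {
                    let _ = reply.send(count); // the caller may have gone away; ignore
                }
            }
        }
        count // mailbox closed: return the final state
    });
    (tx, handle)
}

fn main() {
    let (tx, handle) = spawn_counter();
    tx.send(CounterMsg::Add(100)).unwrap();
    tx.send(CounterMsg::Add(50)).unwrap();
    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(CounterMsg::Get(reply_tx)).unwrap();
    println!("current: {}", reply_rx.recv().unwrap()); // current: 150
    drop(tx); // closing the mailbox stops the Actor
    println!("final: {}", handle.join().unwrap()); // final: 150
}
```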
Core Concepts at a Glance
| Concept | Description | Typical Use Case |
|---|---|---|
| Actor | Independent computation unit encapsulating state and behavior | State machines, connection management |
| Message | The sole communication mechanism between Actors | Commands, events, queries |
| Mailbox | Actor's message queue | Message buffering, ordered processing |
| Supervision | Parent Actor monitors child Actors | Fault recovery, restart strategies |
| Address | Actor's reference handle | Message addressing, routing |
| Context | Actor's runtime environment | Timers, child Actor management |
| Backpressure | Flow control mechanism | Preventing Mailbox overflow |
Five Challenges of the Actor Model
1. The Lock Hell of Shared State
Traditional concurrency models rely on shared memory + locks, but under high concurrency, lock contention becomes the bottleneck. The Actor model solves this by "not sharing" — each Actor exclusively owns its state.
2. Fault Isolation and Recovery
One component crashing shouldn't bring down the entire system. The Actor model achieves fault isolation through supervision trees — when a child Actor crashes, the parent decides how to handle it (restart, stop, escalate, etc.).
3. Message Ordering Guarantees
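Messages that one sender sends to one Actor arrive in the order they were sent, because the mailbox is FIFO. There is no ordering guarantee between messages from different senders, or along a chain of Actors. Cross-Actor ordering therefore has to be designed into the message protocol and routing strategy. Two common approaches are routing every message for a given key through the same Actor, or attaching sequence numbers.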
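Here is a sketch of the sequence-number approach. The sender stamps each message with a monotonically increasing number, and the receiving Actor buffers out-of-order arrivals until the gap is filled. `Resequencer` is a hypothetical name. The sketch assumes one sequence space per logical stream and no lost messages; a real system would also need a timeout for gaps that never fill.

```rust
use std::collections::BTreeMap;

// Reorders messages by sequence number before they reach the Actor's handler.
struct Resequencer<T> {
    next: u64,                 // next sequence number allowed to be delivered
    pending: BTreeMap<u64, T>, // early arrivals, keyed by sequence number
}

impl<T> Resequencer<T> {
    fn new() -> Self {
        Self { next: 0, pending: BTreeMap::new() }
    }

    // Accepts one message and returns every message that is now deliverable, in order.
    fn push(&mut self, seq: u64, msg: T) -> Vec<T> {
        if seq < self.next {
            return Vec::new(); // duplicate of an already-delivered message: drop it
        }
        self.pending.insert(seq, msg);
        let mut ready = Vec::new();
        while let Some(msg) = self.pending.remove(&self.next) {
            ready.push(msg);
            self.next += 1;
        }
        ready
    }
}

fn main() {
    let mut r = Resequencer::new();
    println!("{:?}", r.push(1, "b")); // []
    println!("{:?}", r.push(2, "c")); // []
    println!("{:?}", r.push(0, "a")); // ["a", "b", "c"]
}
```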
4. Distribution Transparency
Local and remote Actors should communicate the same way. How do you enable cross-node Actor communication without changing business logic?
5. The Complexity of Graceful Shutdown
An Actor system may have hundreds or thousands of Actors. How do you ensure all messages are processed and all resources released during shutdown?
Five Production Actor Patterns
Pattern 1: Actix Actor System — Classic Actor Pattern
Actix is one of the most mature Actor frameworks in the Rust ecosystem, providing typed message handling and complete Actor lifecycle management.
use actix::prelude::*;
use std::time::Duration;
struct GameSession {
id: String,
score: u64,
level: u32,
}
impl Actor for GameSession {
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {
println!("session {} started, score: {}", self.id, self.score);
}
fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
println!("session {} stopping", self.id);
// Running::Stop lets the shutdown proceed; Running::Continue would keep the actor alive
Running::Stop
}
fn stopped(&mut self, _ctx: &mut Self::Context) {
println!("session {} stopped, final score: {}", self.id, self.score);
}
}
#[derive(Message)]
#[rtype(result = "u64")]
struct GetScore;
#[derive(Message)]
#[rtype(result = "()")]
struct AddScore { amount: u64 }
#[derive(Message)]
#[rtype(result = "()")]
struct LevelUp;
impl Handler<GetScore> for GameSession {
type Result = u64;
fn handle(&mut self, _msg: GetScore, _ctx: &mut Context<Self>) -> u64 {
self.score
}
}
impl Handler<AddScore> for GameSession {
type Result = ();
fn handle(&mut self, msg: AddScore, _ctx: &mut Context<Self>) {
self.score += msg.amount;
println!("session {} score: {}", self.id, self.score);
}
}
impl Handler<LevelUp> for GameSession {
type Result = ();
fn handle(&mut self, _msg: LevelUp, ctx: &mut Context<Self>) {
self.level += 1;
println!("session {} leveled up to {}", self.id, self.level);
ctx.notify_later(LevelUp, Duration::from_secs(30));
}
}
#[actix::main]
async fn main() {
let addr = GameSession {
id: "player-001".into(),
score: 0,
level: 1,
}.start();
addr.send(AddScore { amount: 100 }).await.unwrap();
addr.send(AddScore { amount: 50 }).await.unwrap();
addr.send(LevelUp).await.unwrap();
let score = addr.send(GetScore).await.unwrap();
println!("final score: {}", score);
}
Key takeaways:
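- `impl Actor` defines the Actor lifecycle: `started`, `stopping`, `stopped`. Returning `Running::Stop` from `stopping` lets the Actor shut down; `Running::Continue` cancels the stop.
- `#[derive(Message)]` defines typed messages; `rtype` specifies the return type
- The `Handler<T>` trait implements the handling logic for each message type
- `ctx.notify_later` schedules a message to the Actor itself, which is useful for periodic tasks (here `LevelUp` re-arms itself every 30 seconds)
- `addr.send()` sends a message asynchronously and returns a future that resolves to the response (wrapped in a `Result` with `MailboxError`)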
Pattern 2: Tokio Channel-Based Actor — Lightweight Actor Pattern
Build Actors from scratch using Tokio channels — lighter and more flexible, ideal for scenarios demanding maximum performance and control.
use tokio::sync::{mpsc, oneshot};
use std::collections::HashMap;
enum CacheCommand {
Get {
key: String,
responder: oneshot::Sender<Option<String>>,
},
Set {
key: String,
value: String,
ttl: Option<std::time::Duration>,
responder: oneshot::Sender<bool>,
},
Delete {
key: String,
responder: oneshot::Sender<bool>,
},
Stats {
responder: oneshot::Sender<CacheStats>,
},
}
#[derive(Debug)]
struct CacheStats {
entries: usize,
hits: u64,
misses: u64,
}
struct CacheEntry {
value: String,
expires_at: Option<std::time::Instant>,
}
struct CacheActor {
store: HashMap<String, CacheEntry>,
hits: u64,
misses: u64,
}
impl CacheActor {
fn new() -> Self {
Self {
store: HashMap::new(),
hits: 0,
misses: 0,
}
}
fn is_expired(&self, entry: &CacheEntry) -> bool {
match entry.expires_at {
Some(t) => std::time::Instant::now() > t,
None => false,
}
}
async fn run(&mut self, mut rx: mpsc::Receiver<CacheCommand>) {
while let Some(cmd) = rx.recv().await {
match cmd {
CacheCommand::Get { key, responder } => {
let result = match self.store.get(&key) {
Some(entry) if !self.is_expired(entry) => {
self.hits += 1;
Some(entry.value.clone())
}
Some(_) => {
self.store.remove(&key);
self.misses += 1;
None
}
None => {
self.misses += 1;
None
}
};
let _ = responder.send(result);
}
CacheCommand::Set { key, value, ttl, responder } => {
let expires_at = ttl.map(|d| std::time::Instant::now() + d);
self.store.insert(key, CacheEntry { value, expires_at });
let _ = responder.send(true);
}
CacheCommand::Delete { key, responder } => {
let existed = self.store.remove(&key).is_some();
let _ = responder.send(existed);
}
CacheCommand::Stats { responder } => {
let stats = CacheStats {
entries: self.store.len(),
hits: self.hits,
misses: self.misses,
};
let _ = responder.send(stats);
}
}
}
println!("cache actor shutting down, {} entries remaining", self.store.len());
}
}
#[tokio::main]
async fn main() {
let (tx, rx) = mpsc::channel::<CacheCommand>(256);
let mut actor = CacheActor::new();
tokio::spawn(async move { actor.run(rx).await });
let (resp_tx, resp_rx) = oneshot::channel();
tx.send(CacheCommand::Set {
key: "user:1001".into(),
value: r#"{"name":"alice"}"#.into(),
ttl: Some(std::time::Duration::from_secs(300)),
responder: resp_tx,
}).await.unwrap();
resp_rx.await.unwrap();
let (resp_tx, resp_rx) = oneshot::channel();
tx.send(CacheCommand::Get {
key: "user:1001".into(),
responder: resp_tx,
}).await.unwrap();
let result = resp_rx.await.unwrap();
println!("get result: {:?}", result);
let (resp_tx, resp_rx) = oneshot::channel();
tx.send(CacheCommand::Stats { responder: resp_tx }).await.unwrap();
let stats = resp_rx.await.unwrap();
println!("cache stats: {:?}", stats);
}
Key takeaways:
- An `mpsc` channel serves as the Actor's mailbox; a `oneshot` channel implements request-response
- `enum CacheCommand` defines the message protocol, which is type-safe and extensible
- The Actor is the single owner of its state, so it is naturally free of data races
- No external framework dependency: Tokio alone is enough to build a working Actor
- `ttl` expiration demonstrates the flexibility of Actor-internal state management. Expired entries are removed lazily, on the next `Get` for that key.
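A common refinement is to hide the raw channel behind a cloneable handle. Callers can then write `cache.get(key)` instead of building a `oneshot` pair for every request, as `main` does above. The sketch below shows the shape using only `std` threads and channels, so it runs without Tokio. `CacheHandle`, `Cmd`, and their methods are hypothetical names. With Tokio, the same structure would wrap `mpsc::Sender` and make the methods `async`.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Sender};
use std::thread;

enum Cmd {
    Get { key: String, reply: Sender<Option<String>> },
    Set { key: String, value: String },
}

// Cloneable handle: the only way the rest of the program talks to the cache Actor.
#[derive(Clone)]
struct CacheHandle {
    tx: Sender<Cmd>,
}

impl CacheHandle {
    // Spawns the Actor thread, which exclusively owns the HashMap.
    fn spawn() -> Self {
        let (tx, rx) = mpsc::channel::<Cmd>();
        thread::spawn(move || {
            let mut store: HashMap<String, String> = HashMap::new();
            for cmd in rx {
                match cmd {
                    Cmd::Get { key, reply } => {
                        let _ = reply.send(store.get(&key).cloned());
                    }
                    Cmd::Set { key, value } => {
                        store.insert(key, value);
                    }
                }
            }
            // The loop ends once every CacheHandle clone has been dropped.
        });
        Self { tx }
    }

    // Fire-and-forget write; returns false if the Actor has stopped.
    fn set(&self, key: &str, value: &str) -> bool {
        self.tx.send(Cmd::Set { key: key.into(), value: value.into() }).is_ok()
    }

    // Request-response read: a reply channel is created per call.
    fn get(&self, key: &str) -> Option<String> {
        let (reply, rx) = mpsc::channel();
        self.tx.send(Cmd::Get { key: key.into(), reply }).ok()?;
        rx.recv().ok()?
    }
}

fn main() {
    let cache = CacheHandle::spawn();
    let writer = cache.clone();
    writer.set("user:1001", "alice");
    println!("{:?}", cache.get("user:1001")); // Some("alice")
    println!("{:?}", cache.get("missing")); // None
}
```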
Channel Actor vs Actix Comparison:
| Feature | Channel Actor | Actix |
|---|---|---|
| Dependencies | tokio only | actix + tokio |
| Message types | Manual enum | derive Message |
| Response mechanism | oneshot channel | Message rtype |
| Lifecycle | Manual management | Automatic callbacks |
| Supervision | Manual implementation | Built-in support |
| Performance overhead | Minimal | Low |
| Learning curve | Gentle | Moderate |
Pattern 3: Supervision Trees — Fault Recovery Pattern
Supervision trees are one of the Actor model's most powerful features: parent Actors monitor child Actors and decide how to recover when children crash.
use tokio::sync::{mpsc, oneshot};
use tokio::time::{sleep, Duration};
use std::collections::HashMap;
#[derive(Debug, Clone)]
enum RestartStrategy {
Permanent,
Transient,
Temporary,
}
#[derive(Debug)]
enum WorkerMessage {
Process { data: String, responder: oneshot::Sender<String> },
Crash,
Status { responder: oneshot::Sender<WorkerStatus> },
}
#[derive(Debug, Clone)]
struct WorkerStatus {
id: String,
processed: u64,
alive: bool,
}
struct Worker {
id: String,
processed: u64,
}
impl Worker {
fn new(id: String) -> Self {
Self { id, processed: 0 }
}
async fn run(
&mut self,
mut rx: mpsc::Receiver<WorkerMessage>,
supervisor_tx: mpsc::Sender<SupervisorMessage>,
) {
println!("worker {} started", self.id);
while let Some(msg) = rx.recv().await {
match msg {
WorkerMessage::Process { data, responder } => {
self.processed += 1;
let result = format!("processed by {}: {}", self.id, data.to_uppercase());
let _ = responder.send(result);
}
WorkerMessage::Crash => {
println!("worker {} crashing!", self.id);
let _ = supervisor_tx.send(SupervisorMessage::ChildCrashed {
id: self.id.clone(),
}).await;
return;
}
WorkerMessage::Status { responder } => {
let _ = responder.send(WorkerStatus {
id: self.id.clone(),
processed: self.processed,
alive: true,
});
}
}
}
let _ = supervisor_tx.send(SupervisorMessage::ChildStopped {
id: self.id.clone(),
}).await;
}
}
enum SupervisorMessage {
ChildCrashed { id: String },
ChildStopped { id: String },
SpawnWorker { id: String },
GetStatus { responder: oneshot::Sender<Vec<WorkerStatus>> },
}
struct ChildHandle {
tx: mpsc::Sender<WorkerMessage>,
strategy: RestartStrategy,
}
struct Supervisor {
children: HashMap<String, ChildHandle>,
supervisor_rx: mpsc::Receiver<SupervisorMessage>,
supervisor_tx: mpsc::Sender<SupervisorMessage>,
restart_count: HashMap<String, u32>,
max_restarts: u32,
}
impl Supervisor {
fn new(rx: mpsc::Receiver<SupervisorMessage>, tx: mpsc::Sender<SupervisorMessage>, max_restarts: u32) -> Self {
Self {
children: HashMap::new(),
supervisor_rx: rx,
supervisor_tx: tx,
restart_count: HashMap::new(),
max_restarts,
}
}
async fn spawn_child(&mut self, id: String, strategy: RestartStrategy) {
let (tx, rx) = mpsc::channel::<WorkerMessage>(64);
let supervisor_tx = self.supervisor_tx.clone();
let worker_id = id.clone();
let mut worker = Worker::new(id.clone());
tokio::spawn(async move {
worker.run(rx, supervisor_tx).await;
});
self.children.insert(id, ChildHandle { tx, strategy });
println!("supervisor spawned child {}", worker_id);
}
async fn restart_child(&mut self, id: &str) -> bool {
let restarts = self.restart_count.entry(id.to_string()).or_insert(0);
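if *restarts >= self.max_restarts {
println!("supervisor: child {} exceeded max restarts ({}), giving up", id, self.max_restarts);
// Drop the dead child's handle so status queries stop targeting it
self.children.remove(id);
return false;
}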
*restarts += 1;
println!("supervisor: restarting child {} (attempt {})", id, *restarts);
let strategy = self.children.get(id).map(|h| h.strategy.clone());
if let Some(strategy) = strategy {
self.children.remove(id);
self.spawn_child(id.to_string(), strategy).await;
true
} else {
false
}
}
async fn run(&mut self) {
while let Some(msg) = self.supervisor_rx.recv().await {
match msg {
SupervisorMessage::ChildCrashed { id } => {
let strategy = self.children.get(&id).map(|h| h.strategy.clone());
match strategy {
Some(RestartStrategy::Permanent) => {
self.restart_child(&id).await;
}
Some(RestartStrategy::Transient) => {
self.restart_child(&id).await;
}
Some(RestartStrategy::Temporary) => {
println!("supervisor: child {} crashed, not restarting (temporary)", id);
self.children.remove(&id);
}
None => {}
}
}
SupervisorMessage::ChildStopped { id } => {
let strategy = self.children.get(&id).map(|h| h.strategy.clone());
match strategy {
Some(RestartStrategy::Permanent) => {
self.restart_child(&id).await;
}
Some(RestartStrategy::Transient) | Some(RestartStrategy::Temporary) => {
self.children.remove(&id);
}
None => {}
}
}
SupervisorMessage::SpawnWorker { id } => {
self.spawn_child(id, RestartStrategy::Permanent).await;
}
SupervisorMessage::GetStatus { responder } => {
let mut statuses = Vec::new();
for (id, handle) in &self.children {
let (resp_tx, resp_rx) = oneshot::channel();
if handle.tx.send(WorkerMessage::Status { responder: resp_tx }).await.is_ok() {
if let Ok(status) = resp_rx.await {
statuses.push(status);
}
}
}
let _ = responder.send(statuses);
}
}
}
}
}
#[tokio::main]
async fn main() {
let (sup_tx, sup_rx) = mpsc::channel::<SupervisorMessage>(256);
let mut supervisor = Supervisor::new(sup_rx, sup_tx.clone(), 3);
tokio::spawn(async move { supervisor.run().await });
sup_tx.send(SupervisorMessage::SpawnWorker { id: "worker-1".into() }).await.unwrap();
sup_tx.send(SupervisorMessage::SpawnWorker { id: "worker-2".into() }).await.unwrap();
sleep(Duration::from_millis(50)).await;
let (resp_tx, resp_rx) = oneshot::channel();
sup_tx.send(SupervisorMessage::GetStatus { responder: resp_tx }).await.unwrap();
let statuses = resp_rx.await.unwrap();
for s in statuses {
println!("status: {:?}", s);
}
}
Key takeaways:
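- The Supervisor holds each child Actor's `Sender` handle and receives crash notifications over its own `mpsc` channel
- Three restart strategies: `Permanent` (always restart), `Transient` (restart only after an abnormal exit), `Temporary` (never restart)
- `max_restarts` caps restarts per child to prevent infinite restart loops
- In this sketch, child Actors report their own crash (the simulated `Crash` message) and the supervisor decides whether to restart. A genuine panic unwinds the task before it can send `ChildCrashed`, so real supervisors also watch each task's `JoinHandle`.
- Supervision trees can be nested: a supervisor can itself be a supervised child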
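In the Pattern 3 code, a "crash" is simulated by the `Crash` message, so the worker gets the chance to report its own failure. A real panic unwinds the task first. For that reason, supervisors usually watch the task's join handle. In Tokio, awaiting a `JoinHandle` yields `Err(JoinError)`, and `JoinError::is_panic()` tells you the task panicked.

The sketch below shows the same idea with `std::thread`, whose `join()` returns `Err` when the thread panicked. `supervise` is a hypothetical helper, and its restart budget mirrors `max_restarts`. It assumes the default unwinding panic strategy.

```rust
use std::thread;

// Runs `work` on a fresh thread, restarting it after each panic until the budget is spent.
// Returns Ok(output) from the first run that exits normally, or Err(restarts) when giving up.
// Assumes the default `panic = "unwind"` strategy; with `abort` the whole process exits.
fn supervise<F>(max_restarts: u32, work: F) -> Result<u32, u32>
where
    F: Fn(u32) -> u32 + Clone + Send + 'static,
{
    let mut restarts = 0;
    loop {
        let attempt = restarts;
        let child = work.clone();
        // join() returns Err(payload) if the child thread panicked.
        match thread::spawn(move || child(attempt)).join() {
            Ok(output) => return Ok(output),
            Err(_) if restarts < max_restarts => {
                restarts += 1;
                eprintln!("supervisor: child panicked, restart #{}", restarts);
            }
            Err(_) => return Err(restarts),
        }
    }
}

fn main() {
    // Simulated flaky worker: panics on its first two attempts, then succeeds.
    let flaky = |attempt: u32| {
        if attempt < 2 {
            panic!("simulated crash on attempt {}", attempt);
        }
        attempt * 10
    };
    println!("{:?}", supervise(3, flaky)); // Ok(20)

    // A worker that always panics exhausts the budget instead of looping forever.
    let broken = |_attempt: u32| -> u32 { panic!("always crash") };
    println!("{:?}", supervise(2, broken)); // Err(2)
}
```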
Restart Strategy Comparison:
| Strategy | Normal Exit | Abnormal Exit | Typical Use Case |
|---|---|---|---|
| Permanent | Restart | Restart | Always-on services, scheduled tasks |
| Transient | No restart | Restart | Request handlers, temporary computation |
| Temporary | No restart | No restart | One-off tasks, debugging |
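The table reduces to a single decision function. The sketch below uses the hypothetical names `ExitKind` and `should_restart`. It could replace the duplicated strategy matches in the Pattern 3 `run` loop. The restart budget would still be checked separately.

```rust
// How a child Actor ended (hypothetical type for this sketch).
#[derive(Debug, Clone, Copy, PartialEq)]
enum ExitKind {
    Normal,   // mailbox closed or work finished
    Abnormal, // panic or reported crash
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum RestartStrategy {
    Permanent,
    Transient,
    Temporary,
}

// Encodes the restart-strategy table row by row.
fn should_restart(strategy: RestartStrategy, exit: ExitKind) -> bool {
    match (strategy, exit) {
        (RestartStrategy::Permanent, _) => true,
        (RestartStrategy::Transient, ExitKind::Abnormal) => true,
        (RestartStrategy::Transient, ExitKind::Normal) => false,
        (RestartStrategy::Temporary, _) => false,
    }
}

fn main() {
    for s in [RestartStrategy::Permanent, RestartStrategy::Transient, RestartStrategy::Temporary] {
        println!(
            "{:?}: normal -> {}, abnormal -> {}",
            s,
            should_restart(s, ExitKind::Normal),
            should_restart(s, ExitKind::Abnormal)
        );
    }
}
```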
Pattern 4: Distributed Actors — Remote Messaging Pattern
When Actors cross process boundaries, messages must be serialized and transmitted over the network. This pattern shows how to build a remotely communicable Actor system.
use tokio::sync::{mpsc, oneshot};
use tokio::net::{TcpListener, TcpStream};
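// Assumes serde (with the "derive" feature) and bincode 1.x, whose API is bincode::serialize / bincode::deserialize
use serde::{Serialize, Deserialize};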
use std::collections::HashMap;
use std::net::SocketAddr;
#[derive(Debug, Serialize, Deserialize)]
struct RemoteEnvelope {
target: String,
message_type: String,
payload: Vec<u8>,
reply_to: Option<u64>,
}
#[derive(Debug, Serialize, Deserialize)]
enum OrderCommand {
Create { order_id: String, item: String, quantity: u32 },
Cancel { order_id: String },
Query { order_id: String },
}
#[derive(Debug, Serialize, Deserialize)]
enum OrderResponse {
Created { order_id: String },
Cancelled { order_id: String },
Found { order_id: String, item: String, quantity: u32 },
NotFound { order_id: String },
Error { message: String },
}
#[derive(Debug, Serialize, Deserialize, Clone)]
struct Order {
id: String,
item: String,
quantity: u32,
status: String,
}
struct OrderActor {
orders: HashMap<String, Order>,
cmd_rx: mpsc::Receiver<(OrderCommand, oneshot::Sender<OrderResponse>)>,
}
impl OrderActor {
fn new(rx: mpsc::Receiver<(OrderCommand, oneshot::Sender<OrderResponse>)>) -> Self {
Self { orders: HashMap::new(), cmd_rx: rx }
}
async fn run(&mut self) {
while let Some((cmd, responder)) = self.cmd_rx.recv().await {
let response = match cmd {
OrderCommand::Create { order_id, item, quantity } => {
if self.orders.contains_key(&order_id) {
OrderResponse::Error { message: format!("order {} already exists", order_id) }
} else {
self.orders.insert(order_id.clone(), Order {
id: order_id.clone(),
item,
quantity,
status: "created".into(),
});
OrderResponse::Created { order_id }
}
}
OrderCommand::Cancel { order_id } => {
match self.orders.get_mut(&order_id) {
Some(order) => {
order.status = "cancelled".into();
OrderResponse::Cancelled { order_id }
}
None => OrderResponse::NotFound { order_id },
}
}
OrderCommand::Query { order_id } => {
match self.orders.get(&order_id) {
Some(order) => OrderResponse::Found {
order_id: order.id.clone(),
item: order.item.clone(),
quantity: order.quantity,
},
None => OrderResponse::NotFound { order_id },
}
}
};
let _ = responder.send(response);
}
}
}
struct RemoteGateway {
local_actors: HashMap<String, mpsc::Sender<(OrderCommand, oneshot::Sender<OrderResponse>)>>,
remote_nodes: HashMap<String, SocketAddr>,
next_request_id: u64,
}
impl RemoteGateway {
fn new() -> Self {
Self {
local_actors: HashMap::new(),
remote_nodes: HashMap::new(),
next_request_id: 0,
}
}
fn register_local(&mut self, name: String, tx: mpsc::Sender<(OrderCommand, oneshot::Sender<OrderResponse>)>) {
self.local_actors.insert(name, tx);
}
fn register_remote(&mut self, name: String, addr: SocketAddr) {
self.remote_nodes.insert(name, addr);
}
async fn send_command(&mut self, target: &str, cmd: OrderCommand) -> Option<OrderResponse> {
if let Some(tx) = self.local_actors.get(target) {
let (resp_tx, resp_rx) = oneshot::channel();
if tx.send((cmd, resp_tx)).await.is_ok() {
return resp_rx.await.ok();
}
return None;
}
if let Some(addr) = self.remote_nodes.get(target) {
match TcpStream::connect(addr).await {
Ok(mut stream) => {
let payload = bincode::serialize(&cmd).ok()?;
let request_id = self.next_request_id;
self.next_request_id += 1;
let envelope = RemoteEnvelope {
target: target.to_string(),
message_type: "OrderCommand".to_string(),
payload,
reply_to: Some(request_id),
};
let data = bincode::serialize(&envelope).ok()?;
let len = data.len() as u32;
use tokio::io::AsyncWriteExt;
if stream.write_all(&len.to_le_bytes()).await.is_err() { return None; }
if stream.write_all(&data).await.is_err() { return None; }
use tokio::io::AsyncReadExt;
let mut len_buf = [0u8; 4];
if stream.read_exact(&mut len_buf).await.is_err() { return None; }
let resp_len = u32::from_le_bytes(len_buf) as usize;
let mut resp_buf = vec![0u8; resp_len];
if stream.read_exact(&mut resp_buf).await.is_err() { return None; }
bincode::deserialize(&resp_buf).ok()
}
Err(e) => {
eprintln!("failed to connect to {}: {}", target, e);
None
}
}
} else {
None
}
}
}
async fn server_node(addr: SocketAddr) {
let (tx, rx) = mpsc::channel::<(OrderCommand, oneshot::Sender<OrderResponse>)>(256);
let mut actor = OrderActor::new(rx);
tokio::spawn(async move { actor.run().await });
let listener = TcpListener::bind(addr).await.unwrap();
println!("order actor listening on {}", addr);
loop {
if let Ok((mut stream, _)) = listener.accept().await {
let tx = tx.clone();
tokio::spawn(async move {
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
let mut len_buf = [0u8; 4];
if stream.read_exact(&mut len_buf).await.is_err() { return; }
let len = u32::from_le_bytes(len_buf) as usize;
let mut buf = vec![0u8; len];
if stream.read_exact(&mut buf).await.is_err() { return; }
let envelope: RemoteEnvelope = match bincode::deserialize(&buf) {
Ok(e) => e,
Err(_) => return,
};
let cmd: OrderCommand = match bincode::deserialize(&envelope.payload) {
Ok(c) => c,
Err(_) => return,
};
let (resp_tx, resp_rx) = oneshot::channel();
if tx.send((cmd, resp_tx)).await.is_err() { return; }
let response = match resp_rx.await {
Ok(r) => r,
Err(_) => return,
};
let resp_data = bincode::serialize(&response).unwrap();
let resp_len = resp_data.len() as u32;
let _ = stream.write_all(&resp_len.to_le_bytes()).await;
let _ = stream.write_all(&resp_data).await;
});
}
}
}
#[tokio::main]
async fn main() {
let server_addr: SocketAddr = "127.0.0.1:9001".parse().unwrap();
tokio::spawn(server_node(server_addr));
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let mut gateway = RemoteGateway::new();
gateway.register_remote("order-service".into(), server_addr);
let response = gateway.send_command("order-service", OrderCommand::Create {
order_id: "ORD-001".into(),
item: "Rust Book".into(),
quantity: 2,
}).await;
println!("create response: {:?}", response);
let response = gateway.send_command("order-service", OrderCommand::Query {
order_id: "ORD-001".into(),
}).await;
println!("query response: {:?}", response);
}
Key takeaways:
- `RemoteEnvelope` wraps message metadata: target Actor, message type, payload, and a reply correlation ID
- Local Actors communicate directly via channels; remote Actors receive serialized messages over TCP
- `RemoteGateway` unifies local and remote routing, so business code is location-transparent
- `bincode` provides binary serialization that is typically more compact and faster to encode and decode than JSON, at the cost of human readability
- Length-prefixed framing (4-byte length + data) recovers message boundaries from TCP's byte stream
- Each remote call in this sketch opens a fresh TCP connection; a production gateway would reuse connections
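The server above has one gap worth closing. It trusts the 4-byte length prefix and allocates `vec![0u8; len]` before reading, so a corrupt or malicious peer can request a buffer of up to 4 GiB.

Below is a sketch of the same little-endian framing with a size cap. It is written against `std::io::Read` and `Write` so it can be tested on an in-memory buffer. `write_frame`, `read_frame`, and the 1 MiB `MAX_FRAME` limit are assumptions, not part of the original code. Tokio's `AsyncReadExt::read_exact` and `AsyncWriteExt::write_all` follow the same shape.

```rust
use std::io::{self, Cursor, Read, Write};

// Hypothetical upper bound on a single message; tune it for your payloads.
const MAX_FRAME: usize = 1024 * 1024;

// Writes a 4-byte little-endian length prefix followed by the payload.
fn write_frame<W: Write>(w: &mut W, payload: &[u8]) -> io::Result<()> {
    if payload.len() > u32::MAX as usize {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "frame too large"));
    }
    let len = payload.len() as u32;
    w.write_all(&len.to_le_bytes())?;
    w.write_all(payload)
}

// Reads one frame, rejecting lengths above `max_len` before allocating.
fn read_frame<R: Read>(r: &mut R, max_len: usize) -> io::Result<Vec<u8>> {
    let mut len_buf = [0u8; 4];
    r.read_exact(&mut len_buf)?;
    let len = u32::from_le_bytes(len_buf) as usize;
    if len > max_len {
        let msg = format!("frame of {} bytes exceeds limit {}", len, max_len);
        return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
    }
    let mut buf = vec![0u8; len];
    r.read_exact(&mut buf)?;
    Ok(buf)
}

fn main() -> io::Result<()> {
    let mut wire = Vec::new();
    write_frame(&mut wire, b"create ORD-001")?;
    write_frame(&mut wire, b"query ORD-001")?;

    let mut reader = Cursor::new(wire);
    let first = read_frame(&mut reader, MAX_FRAME)?;
    let second = read_frame(&mut reader, MAX_FRAME)?;
    println!("{}", String::from_utf8_lossy(&first)); // create ORD-001
    println!("{}", String::from_utf8_lossy(&second)); // query ORD-001

    // A forged header claiming ~4 GiB is rejected without allocating.
    let mut forged = Cursor::new(u32::MAX.to_le_bytes().to_vec());
    assert!(read_frame(&mut forged, MAX_FRAME).is_err());
    Ok(())
}
```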
Pattern 5: Production Actor System — Graceful Shutdown and Backpressure
Production Actor systems must handle graceful shutdown, backpressure control, message timeouts, and other complex scenarios.
use tokio::sync::{mpsc, oneshot, watch, broadcast};
use tokio::time::{timeout, Duration, Instant};
use tokio::select;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
struct ActorSystem {
shutdown_tx: watch::Sender<bool>,
shutdown_rx: watch::Receiver<bool>,
event_tx: broadcast::Sender<String>,
metrics: Arc<ActorMetrics>,
}
struct ActorMetrics {
messages_sent: AtomicU64,
messages_processed: AtomicU64,
messages_dropped: AtomicU64,
actors_alive: AtomicU64,
}
impl ActorMetrics {
fn new() -> Self {
Self {
messages_sent: AtomicU64::new(0),
messages_processed: AtomicU64::new(0),
messages_dropped: AtomicU64::new(0),
actors_alive: AtomicU64::new(0),
}
}
fn snapshot(&self) -> String {
format!(
"sent: {}, processed: {}, dropped: {}, alive: {}",
self.messages_sent.load(Ordering::Relaxed),
self.messages_processed.load(Ordering::Relaxed),
self.messages_dropped.load(Ordering::Relaxed),
self.actors_alive.load(Ordering::Relaxed),
)
}
}
enum WorkerCmd {
Execute { task: String, responder: oneshot::Sender<String> },
Ping { responder: oneshot::Sender<bool> },
}
struct ResilientWorker {
id: String,
cmd_rx: mpsc::Receiver<WorkerCmd>,
shutdown_rx: watch::Receiver<bool>,
event_tx: broadcast::Sender<String>,
metrics: Arc<ActorMetrics>,
mailbox_capacity: usize,
process_timeout: Duration,
}
impl ResilientWorker {
async fn run(&mut self) {
self.metrics.actors_alive.fetch_add(1, Ordering::Relaxed);
println!("worker {} started", self.id);
loop {
select! {
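res = self.shutdown_rx.changed() => {
// Err means the watch sender (the ActorSystem) was dropped: treat it as shutdown too,
// otherwise changed() would return immediately on every loop iteration
if res.is_err() || *self.shutdown_rx.borrow() {
self.drain_mailbox().await;
break;
}
}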
cmd = self.cmd_rx.recv() => {
match cmd {
Some(cmd) => self.handle_command(cmd).await,
None => break,
}
}
}
}
self.metrics.actors_alive.fetch_sub(1, Ordering::Relaxed);
println!("worker {} stopped", self.id);
}
async fn handle_command(&mut self, cmd: WorkerCmd) {
match cmd {
WorkerCmd::Execute { task, responder } => {
self.metrics.messages_sent.fetch_add(1, Ordering::Relaxed);
let result = match timeout(self.process_timeout, self.process_task(&task)).await {
Ok(output) => {
self.metrics.messages_processed.fetch_add(1, Ordering::Relaxed);
output
}
Err(_) => {
self.metrics.messages_dropped.fetch_add(1, Ordering::Relaxed);
let _ = self.event_tx.send(format!("worker {} timeout on: {}", self.id, task));
"timeout".to_string()
}
};
let _ = responder.send(result);
}
WorkerCmd::Ping { responder } => {
let _ = responder.send(true);
}
}
}
async fn process_task(&self, task: &str) -> String {
tokio::time::sleep(Duration::from_millis(10)).await;
format!("worker {} processed: {}", self.id, task.to_uppercase())
}
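async fn drain_mailbox(&mut self) {
// Stop accepting new messages; already-buffered messages can still be received
self.cmd_rx.close();
println!("worker {} draining mailbox...", self.id);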
let drain_timeout = Duration::from_secs(5);
let deadline = Instant::now() + drain_timeout;
loop {
if Instant::now() > deadline {
println!("worker {} drain timeout, {} messages remaining", self.id, self.cmd_rx.len());
break;
}
match timeout(Duration::from_millis(100), self.cmd_rx.recv()).await {
Ok(Some(WorkerCmd::Execute { task, responder })) => {
let result = self.process_task(&task).await;
self.metrics.messages_processed.fetch_add(1, Ordering::Relaxed);
let _ = responder.send(result);
}
Ok(Some(WorkerCmd::Ping { responder })) => {
let _ = responder.send(false);
}
Ok(None) => break,
Err(_) => continue,
}
}
println!("worker {} mailbox drained", self.id);
}
}
impl ActorSystem {
fn new() -> Self {
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let (event_tx, _) = broadcast::channel::<String>(256);
let metrics = Arc::new(ActorMetrics::new());
Self { shutdown_tx, shutdown_rx, event_tx, metrics }
}
async fn spawn_worker(&self, id: String, mailbox_capacity: usize) -> mpsc::Sender<WorkerCmd> {
let (tx, rx) = mpsc::channel::<WorkerCmd>(mailbox_capacity);
let mut worker = ResilientWorker {
id,
cmd_rx: rx,
shutdown_rx: self.shutdown_rx.clone(),
event_tx: self.event_tx.clone(),
metrics: self.metrics.clone(),
mailbox_capacity,
process_timeout: Duration::from_secs(2),
};
tokio::spawn(async move { worker.run().await });
tx
}
async fn graceful_shutdown(&self) {
println!("initiating graceful shutdown...");
self.shutdown_tx.send(true).ok();
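// Crude wait, chosen to exceed the 5 s drain timeout; awaiting each worker's JoinHandle would be more precise
tokio::time::sleep(Duration::from_secs(6)).await;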
println!("final metrics: {}", self.metrics.snapshot());
}
}
async fn backpressure_sender(
tx: mpsc::Sender<WorkerCmd>,
worker_id: String,
total: u64,
) -> u64 {
let mut sent = 0u64;
let mut dropped = 0u64;
for i in 0..total {
let (resp_tx, resp_rx) = oneshot::channel();
let task = format!("task-{}", i);
match timeout(Duration::from_millis(50), tx.send(WorkerCmd::Execute {
task,
responder: resp_tx,
})).await {
Ok(Ok(())) => {
match timeout(Duration::from_secs(3), resp_rx).await {
Ok(Ok(result)) => {
sent += 1;
if i % 100 == 0 {
println!("{} result: {}", worker_id, result);
}
}
_ => dropped += 1,
}
}
_ => dropped += 1,
}
}
println!("{} sent: {}, dropped: {}", worker_id, sent, dropped);
dropped
}
#[tokio::main]
async fn main() {
let system = ActorSystem::new();
let worker1 = system.spawn_worker("worker-1".into(), 64).await;
let worker2 = system.spawn_worker("worker-2".into(), 64).await;
let h1 = tokio::spawn(backpressure_sender(worker1, "sender-1".into(), 200));
let h2 = tokio::spawn(backpressure_sender(worker2, "sender-2".into(), 200));
h1.await.unwrap();
h2.await.unwrap();
println!("metrics: {}", system.metrics.snapshot());
system.graceful_shutdown().await;
}
Key takeaways:
- `watch::channel(false)` broadcasts the shutdown signal; every cloned receiver observes the change
- `drain_mailbox()` processes remaining messages before shutdown, with a timeout to prevent waiting forever
- `timeout` wraps `send` and `recv` so that backpressure cannot block a caller indefinitely
- `ActorMetrics` uses `AtomicU64` for lock-free, thread-safe metrics collection
- A bounded mailbox (capacity 64) provides natural backpressure: producers wait when it is full
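`graceful_shutdown` above waits a fixed six seconds. That is either too long, if the workers have already finished, or too short, if a drain is still running. A sturdier approach is to keep each worker's join handle, close every mailbox, and then wait for the handles.

The sketch below uses std threads and `sync_channel` so it runs without Tokio. `spawn_worker` and `run_and_shutdown` here are hypothetical helpers. In Tokio, the equivalent is to collect the `JoinHandle`s returned by `tokio::spawn` and await them, optionally inside `tokio::time::timeout` to bound the wait.

```rust
use std::sync::mpsc::{self, SyncSender};
use std::thread::{self, JoinHandle};

// Spawns a worker Actor with a bounded mailbox; the thread returns how many tasks it processed.
fn spawn_worker(id: usize, capacity: usize) -> (SyncSender<String>, JoinHandle<usize>) {
    let (tx, rx) = mpsc::sync_channel::<String>(capacity);
    let handle = thread::spawn(move || {
        let mut processed = 0;
        // After every sender is dropped, recv() still yields buffered messages, then returns Err.
        while let Ok(task) = rx.recv() {
            let _ = (id, task); // stand-in for real work
            processed += 1;
        }
        processed
    });
    (tx, handle)
}

// Sends `tasks` messages to each of `workers` Actors, then shuts down by closing all mailboxes.
fn run_and_shutdown(workers: usize, tasks: usize) -> Vec<usize> {
    let (senders, handles): (Vec<_>, Vec<_>) =
        (0..workers).map(|id| spawn_worker(id, 64)).unzip();
    for tx in &senders {
        for i in 0..tasks {
            // Blocks while the mailbox is full (backpressure).
            tx.send(format!("task-{}", i)).expect("worker stopped early");
        }
    }
    drop(senders); // shutdown signal: no more messages can be sent
    // Wait for every worker to finish draining instead of sleeping for a fixed time.
    handles.into_iter().map(|h| h.join().expect("worker panicked")).collect()
}

fn main() {
    let processed = run_and_shutdown(2, 200);
    println!("processed per worker: {:?}", processed); // processed per worker: [200, 200]
}
```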
Five Common Pitfalls
Pitfall 1: Circular Wait Between Actors
use tokio::sync::mpsc;
use tokio::time::{timeout, Duration};
// ❌ Wrong: each Actor waits for a message from the other before sending anything
async fn deadlock_pattern() {
let (tx_a, mut rx_a) = mpsc::channel::<String>(16);
let (tx_b, mut rx_b) = mpsc::channel::<String>(16);
tokio::spawn(async move {
// Actor A: blocks until B speaks first
let request = rx_a.recv().await.unwrap();
tx_b.send(format!("reply to {}", request)).await.unwrap();
});
tokio::spawn(async move {
// Actor B: blocks until A speaks first -> circular wait, both hang forever
let request = rx_b.recv().await.unwrap();
tx_a.send(format!("reply to {}", request)).await.unwrap();
});
}
// ✅ Correct: one side sends first (one-way message), and every wait has a timeout
async fn safe_pattern() {
let (tx_a, mut rx_a) = mpsc::channel::<String>(16);
let (tx_b, mut rx_b) = mpsc::channel::<String>(16);
tokio::spawn(async move {
tx_b.send("request-from-a".into()).await.unwrap();
match timeout(Duration::from_secs(5), rx_a.recv()).await {
Ok(Some(response)) => println!("got: {}", response),
_ => println!("timeout or channel closed"),
}
});
tokio::spawn(async move {
// Actor B: replies as soon as a request arrives and never waits on A
if let Some(request) = rx_b.recv().await {
let _ = tx_a.send(format!("reply to {}", request)).await;
}
});
}
Pitfall 2: Forgetting to Handle Full Mailboxes
// ❌ Wrong: Unbounded channel, memory may explode
async fn unbounded_risk() {
let (tx, mut rx) = mpsc::unbounded_channel::<String>();
for i in 0..1000000 {
tx.send(format!("msg-{}", i)).unwrap();
}
}
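// ✅ Correct: Bounded channel + backpressure
async fn bounded_safe() {
let (tx, mut rx) = mpsc::channel::<String>(256);
// A consumer Actor must be running, or send() would wait forever once 256 messages are queued
tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
let _ = msg; // process the message
}
});
for i in 0..1000 {
// send().await waits while the mailbox is full: that wait is the backpressure
if tx.send(format!("msg-{}", i)).await.is_err() {
println!("receiver dropped at {}", i);
return;
}
}
}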
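`send().await` applies backpressure by making the producer wait. When waiting is not acceptable, for example on a request path with its own deadline, the alternative is to shed load. Tokio's `mpsc::Sender::try_send` returns `TrySendError::Full` instead of waiting.

The sketch below shows the pattern with the standard library's `SyncSender::try_send`, which has the same `Full`/`Disconnected` split. `offer_all` is a hypothetical helper. The mailbox is deliberately never drained, so the numbers are deterministic.

```rust
use std::sync::mpsc::{self, TrySendError};

// Offers `total` messages to a bounded mailbox without ever blocking.
// Returns (accepted, shed). The receiver stays alive but is never drained, simulating a stalled Actor.
fn offer_all(capacity: usize, total: usize) -> (usize, usize) {
    let (tx, rx) = mpsc::sync_channel::<usize>(capacity);
    let (mut accepted, mut shed) = (0, 0);
    for i in 0..total {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) => shed += 1, // mailbox full: drop, count, or reroute
            Err(TrySendError::Disconnected(_)) => break, // the Actor is gone
        }
    }
    drop(rx);
    (accepted, shed)
}

fn main() {
    let (accepted, shed) = offer_all(4, 10);
    println!("accepted: {}, shed: {}", accepted, shed); // accepted: 4, shed: 6
}
```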
Pitfall 3: Supervisor Without Restart Limits
// ❌ Wrong: Infinite restarts, may form crash-restart loops
async fn infinite_restart_loop() {
loop {
let (tx, rx) = mpsc::channel::<String>(16);
let handle = tokio::spawn(async move {
panic!("always crash!");
});
let _ = handle.await;
println!("restarting...");
}
}
// ✅ Correct: Limit restart frequency and count
struct RestartPolicy {
max_restarts: u32,
window: Duration,
restarts: Vec<Instant>,
}
impl RestartPolicy {
fn new(max_restarts: u32, window: Duration) -> Self {
Self { max_restarts, window, restarts: Vec::new() }
}
fn should_restart(&mut self) -> bool {
let now = Instant::now();
self.restarts.retain(|t| now - *t < self.window);
if self.restarts.len() < self.max_restarts as usize {
self.restarts.push(now);
true
} else {
false
}
}
}
Pitfall 4: Remote Messages Without Timeouts
// ❌ Wrong: Remote call may block forever
async fn remote_without_timeout(gateway: &mut RemoteGateway, cmd: OrderCommand) {
let response = gateway.send_command("remote-actor", cmd).await;
println!("got: {:?}", response);
}
// ✅ Correct: All remote calls must have timeouts
async fn remote_with_timeout(gateway: &mut RemoteGateway, cmd: OrderCommand) {
match timeout(Duration::from_secs(5), gateway.send_command("remote-actor", cmd)).await {
Ok(Some(response)) => println!("got: {:?}", response),
Ok(None) => println!("actor not found or request failed"),
Err(_) => println!("remote call timeout"),
}
}
Pitfall 5: Dropping Unprocessed Messages During Shutdown
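// ❌ Wrong: Drop receiver directly, messages lost
async fn bad_shutdown(rx: mpsc::Receiver<String>) {
drop(rx);
}
// ✅ Correct: Close the mailbox to new messages, then drain what is buffered
async fn good_shutdown(mut rx: mpsc::Receiver<String>) {
// Reject new messages but keep the buffered ones readable
rx.close();
println!("draining remaining messages...");
// recv() returns None once the buffer is empty; the timeout guards against senders holding reserved permits
while let Ok(Some(msg)) = timeout(Duration::from_millis(100), rx.recv()).await {
println!("drained: {}", msg);
}
println!("all messages processed");
}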
Error Troubleshooting Quick Reference
| Symptom | Possible Cause | Solution |
|---|---|---|
| Actor not responding | Mailbox full or deadlock | Check mailbox capacity, add timeout mechanisms |
| Frequent `SendError` | Receiver already closed | Check if Actor exited unexpectedly, add supervision |
| Continuous memory growth | Unbounded channel or message backlog | Switch to bounded channels, add backpressure |
| Restart loop | Actor initialization failure | Check startup logic, limit restart count |
| Remote call timeout | Network latency or unresponsive peer | Add timeout and retry mechanisms |
| Message ordering issues | Multiple producers sending concurrently | Use single producer for ordering, or add sequence numbers |
| Graceful shutdown stuck | An Actor not responding to shutdown signal | Set timeout for drain operations |
| `oneshot` `RecvError` | Responder dropped without sending | Check that every code path sends a response |
Advanced Optimization Techniques
1. Actor Pooling — Load Balancing
When a single Actor becomes a bottleneck, use an Actor pool to distribute messages:
use tokio::sync::mpsc;
use std::sync::atomic::Ordering;
// WorkerCmd is the command enum from Pattern 5
struct ActorPool {
workers: Vec<mpsc::Sender<WorkerCmd>>,
next: std::sync::atomic::AtomicUsize,
}
impl ActorPool {
fn new(workers: Vec<mpsc::Sender<WorkerCmd>>) -> Self {
Self {
workers,
next: std::sync::atomic::AtomicUsize::new(0),
}
}
fn next_worker(&self) -> &mpsc::Sender<WorkerCmd> {
let idx = self.next.fetch_add(1, Ordering::Relaxed) % self.workers.len();
&self.workers[idx]
}
async fn send(&self, cmd: WorkerCmd) -> bool {
let worker = self.next_worker();
worker.send(cmd).await.is_ok()
}
}
2. Message Priority
use tokio::sync::mpsc;
use tokio::select;
enum Priority {
High,
Normal,
Low,
}
struct PriorityMailbox {
high_rx: mpsc::Receiver<String>,
normal_rx: mpsc::Receiver<String>,
low_rx: mpsc::Receiver<String>,
}
impl PriorityMailbox {
async fn recv(&mut self) -> Option<String> {
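select! {
// biased: poll branches in declaration order (high, then normal, then low)
biased;
// A branch whose pattern fails (channel closed) is disabled instead of ending recv()
Some(msg) = self.high_rx.recv() => Some(msg),
Some(msg) = self.normal_rx.recv() => Some(msg),
Some(msg) = self.low_rx.recv() => Some(msg),
// All three channels are closed and empty
else => None,
}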
}
}
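Note that `biased;` only decides which channel is checked first on each poll. A steady stream of high-priority messages can therefore starve the low-priority channel. If you instead want one queue that orders strictly by priority and stays FIFO within a level, a `BinaryHeap` keyed on (priority, sequence number) is a common alternative.

This is a different technique from the channel-per-priority design above, and it is sketched here single-threaded. `Entry` and `HeapMailbox` are hypothetical names. This `Priority` declares its variants from lowest to highest so that the derived ordering ranks `High` greatest.

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;

// Declared lowest to highest so the derived Ord ranks High greatest.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Priority {
    Low,
    Normal,
    High,
}

struct Entry {
    priority: Priority,
    seq: u64, // arrival order, keeps FIFO within one priority level
    msg: String,
}

impl Ord for Entry {
    fn cmp(&self, other: &Self) -> Ordering {
        // BinaryHeap pops the greatest element: higher priority first,
        // then the smaller (older) sequence number.
        self.priority
            .cmp(&other.priority)
            .then_with(|| other.seq.cmp(&self.seq))
    }
}

impl PartialOrd for Entry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for Entry {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

impl Eq for Entry {}

#[derive(Default)]
struct HeapMailbox {
    heap: BinaryHeap<Entry>,
    next_seq: u64,
}

impl HeapMailbox {
    fn push(&mut self, priority: Priority, msg: impl Into<String>) {
        self.heap.push(Entry { priority, seq: self.next_seq, msg: msg.into() });
        self.next_seq += 1;
    }

    fn pop(&mut self) -> Option<String> {
        self.heap.pop().map(|entry| entry.msg)
    }
}

fn main() {
    let mut mailbox = HeapMailbox::default();
    mailbox.push(Priority::Low, "cleanup");
    mailbox.push(Priority::High, "cancel-order");
    mailbox.push(Priority::Normal, "query");
    mailbox.push(Priority::High, "health-check");
    // Prints cancel-order, health-check, query, cleanup (one per line)
    while let Some(msg) = mailbox.pop() {
        println!("{}", msg);
    }
}
```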
3. Message Tracing with Trace IDs
use std::sync::atomic::{AtomicU64, Ordering};
static TRACE_ID: AtomicU64 = AtomicU64::new(1);
fn new_trace_id() -> u64 {
TRACE_ID.fetch_add(1, Ordering::Relaxed)
}
#[derive(Debug, Clone)]
struct TracedMessage {
trace_id: u64,
span_id: u64,
parent_span: Option<u64>,
payload: String,
}
impl TracedMessage {
fn new(payload: String) -> Self {
Self {
trace_id: new_trace_id(),
span_id: new_trace_id(),
parent_span: None,
payload,
}
}
fn child(&self, payload: String) -> Self {
Self {
trace_id: self.trace_id,
span_id: new_trace_id(),
parent_span: Some(self.span_id),
payload,
}
}
}
Actor Framework Comparison
| Feature | Actix | Tokio Channel | Riker | Coerce |
|---|---|---|---|---|
| Message passing | Typed Message | Manual enum | Typed | Typed |
| Supervision tree | Built-in | Manual implementation | Built-in | Built-in |
| Remote Actors | actix-remote | Manual implementation | Supported | Built-in |
| Performance | High | Very high | Medium | High |
| Dependencies | actix | tokio only | riker | coerce |
| Learning curve | Moderate | Gentle | Moderate | Moderate |
| Ecosystem maturity | High | High | Low | Low |
| Use case | Web services | General concurrency | Traditional Actor | Distributed |
Summary
The core philosophy of the Rust Actor model: Don't communicate by sharing memory; share memory by communicating. Actix provides a mature typed Actor system, while Tokio Channel Actors are lighter and more flexible. Supervision trees are the cornerstone of production Actor systems; an Actor system without supervision is like a car without seatbelts. In distributed scenarios, unifying the local and remote communication interfaces is key. Always use bounded mailboxes for backpressure, set timeouts on every remote call, and drain mailboxes during graceful shutdown. Together, these practices keep an Actor system robust under load and under failure.