Rust Actor Model Framework: 5 Production Patterns from Message Passing to Supervision

When Concurrency Meets Rust's Ownership

Ever run into this? Your system needs to handle thousands of independent stateful entities at once, each with its own lifecycle and message queue. You tried Arc<Mutex<HashMap>> for shared state, only to watch lock contention tank your throughput. You tried thread pools for task dispatch, only to find that inter-thread communication cost more than the computation itself. Eventually you realize the Actor model is the answer.

The Actor model's core idea is simple: each Actor is an independent computation unit with private state, and it communicates only through message passing. With no shared state, there is no lock contention and no lock-ordering deadlock. Actors can still deadlock, though, if two of them block waiting for each other's replies. Rust's ownership system is a natural fit: sending a message moves its ownership to the receiver, and the Send/Sync trait bounds let the compiler reject data races at compile time.
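To make this concrete without any framework, here is a minimal sketch that uses only the standard library. std::thread and std::sync::mpsc stand in for an async runtime, and CounterMsg, spawn_counter and get are hypothetical names. The counter lives inside the spawned thread, so no lock is involved. The commented-out line shows the compiler rejecting any use of a message after it has been sent.

```rust
use std::sync::mpsc;
use std::thread;

// The actor's message protocol. `Get` carries its own reply channel.
enum CounterMsg {
    Add(u64),
    AddAll(Vec<u64>),
    Get(mpsc::Sender<u64>),
}

// Spawns an actor thread that exclusively owns `count` and returns its address.
fn spawn_counter() -> mpsc::Sender<CounterMsg> {
    let (tx, rx) = mpsc::channel::<CounterMsg>();
    thread::spawn(move || {
        let mut count: u64 = 0; // private state: no Arc, no Mutex
        // Messages are handled one at a time; the loop ends when all Senders are dropped.
        for msg in rx {
            match msg {
                CounterMsg::Add(n) => count += n,
                CounterMsg::AddAll(batch) => count += batch.iter().sum::<u64>(),
                CounterMsg::Get(reply) => {
                    let _ = reply.send(count);
                }
            }
        }
    });
    tx
}

// Request-response: send a Get and block until the actor replies.
fn get(addr: &mpsc::Sender<CounterMsg>) -> u64 {
    let (reply_tx, reply_rx) = mpsc::channel();
    addr.send(CounterMsg::Get(reply_tx)).expect("actor stopped");
    reply_rx.recv().expect("actor dropped the reply channel")
}

fn main() {
    let addr = spawn_counter();
    addr.send(CounterMsg::Add(2)).unwrap();

    let batch = vec![1, 2, 3];
    addr.send(CounterMsg::AddAll(batch)).unwrap(); // ownership of `batch` moves here
    // println!("{:?}", batch); // would not compile: error[E0382], `batch` was moved

    println!("count = {}", get(&addr)); // prints "count = 8"
}
```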


Core Concepts at a Glance

| Concept | Description | Typical Use Case |
|---|---|---|
| Actor | Independent computation unit encapsulating state and behavior | State machines, connection management |
| Message | The sole communication mechanism between Actors | Commands, events, queries |
| Mailbox | The Actor's message queue | Message buffering, ordered processing |
| Supervision | A parent Actor monitors its child Actors | Fault recovery, restart strategies |
| Address | A reference handle to an Actor | Message addressing, routing |
| Context | The Actor's runtime environment | Timers, child Actor management |
| Backpressure | Flow-control mechanism | Preventing Mailbox overflow |

Five Challenges of the Actor Model

1. The Lock Hell of Shared State

Traditional concurrency models rely on shared memory plus locks, and under high concurrency, lock contention becomes the bottleneck. The Actor model sidesteps this by not sharing at all: each Actor exclusively owns its state.

2. Fault Isolation and Recovery

One component crashing shouldn't bring down the entire system. The Actor model achieves fault isolation through supervision trees — when a child Actor crashes, the parent decides how to handle it (restart, stop, escalate, etc.).

3. Message Ordering Guarantees

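A single Actor processes its mailbox one message at a time, and messages from any one sender arrive in the order they were sent. No such guarantee covers messages from different senders, or a chain of messages that passes through several Actors. Ordering across Actors has to be designed into message protocols and routing strategies.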
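The per-sender guarantee is easy to check. In this sketch, two producer threads share one std::sync::mpsc mailbox. std threads stand in for Actors, and interleaved_delivery and per_sender_fifo are hypothetical helpers. How the two streams interleave varies from run to run, but each producer's own sequence arrives in order. Tokio's mpsc channel makes the same per-sender promise.

```rust
use std::sync::mpsc;
use std::thread;

// Runs two producers that share one mailbox and returns the order in which
// the receiver saw (sender_id, sequence_number) pairs.
fn interleaved_delivery(per_sender: u32) -> Vec<(u8, u32)> {
    let (tx, rx) = mpsc::channel::<(u8, u32)>();
    let mut producers = Vec::new();
    for id in 0..2u8 {
        let tx = tx.clone();
        producers.push(thread::spawn(move || {
            for seq in 0..per_sender {
                tx.send((id, seq)).unwrap();
            }
        }));
    }
    drop(tx); // only the producers' clones remain, so `rx` ends when they finish
    for p in producers {
        p.join().unwrap();
    }
    rx.iter().collect()
}

// True if each sender's messages arrived in the order that sender sent them.
fn per_sender_fifo(log: &[(u8, u32)]) -> bool {
    let mut next = [0u32; 2];
    for &(id, seq) in log {
        if seq != next[id as usize] {
            return false;
        }
        next[id as usize] += 1;
    }
    true
}

fn main() {
    let log = interleaved_delivery(1000);
    // The global interleaving of sender 0 and sender 1 differs between runs,
    // but the per-sender order does not.
    println!("per-sender FIFO holds: {}", per_sender_fifo(&log));
}
```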

4. Distribution Transparency

Local and remote Actors should communicate the same way. How do you enable cross-node Actor communication without changing business logic?

5. The Complexity of Graceful Shutdown

An Actor system may have hundreds or thousands of Actors. How do you ensure all messages are processed and all resources released during shutdown?
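One way to answer this, sketched with std threads as a stand-in for async tasks, is to shut down in two steps. First, close the mailbox so no new work arrives; here that happens by dropping the last Sender. Then wait on the Actor's JoinHandle until it has processed everything already buffered. spawn_summer is a hypothetical name.

```rust
use std::sync::mpsc;
use std::thread;

// Spawns a worker that sums jobs until its mailbox is closed. The mailbox
// closes when the last Sender is dropped, but messages already buffered are
// still delivered, so nothing sent before shutdown is lost.
fn spawn_summer() -> (mpsc::Sender<u64>, thread::JoinHandle<u64>) {
    let (tx, rx) = mpsc::channel::<u64>();
    let handle = thread::spawn(move || {
        let mut total = 0;
        for job in rx {
            total += job; // stand-in for real work
        }
        total // final state, handed to whoever joins the worker
    });
    (tx, handle)
}

fn main() {
    let (tx, worker) = spawn_summer();
    for job in 1..=100 {
        tx.send(job).unwrap();
    }
    // Step 1: stop accepting work by dropping the only Sender.
    drop(tx);
    // Step 2: wait until the worker has drained its mailbox and exited.
    let total = worker.join().expect("worker panicked");
    println!("all jobs processed, total = {}", total); // total = 5050
}
```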


Five Production Actor Patterns

Pattern 1: Actix Actor System — Classic Actor Pattern

Actix is one of the most mature Actor frameworks in the Rust ecosystem, providing typed message handling and complete Actor lifecycle management.

use actix::prelude::*;
use std::time::Duration;

struct GameSession {
    id: String,
    score: u64,
    level: u32,
}

impl Actor for GameSession {
    type Context = Context<Self>;

    fn started(&mut self, _ctx: &mut Self::Context) {
        println!("session {} started, score: {}", self.id, self.score);
    }

    fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
        println!("session {} stopping", self.id);
        // Running::Stop lets shutdown proceed; Running::Continue would veto it
        // and keep the actor alive.
        Running::Stop
    }

    fn stopped(&mut self, _ctx: &mut Self::Context) {
        println!("session {} stopped, final score: {}", self.id, self.score);
    }
}

#[derive(Message)]
#[rtype(result = "u64")]
struct GetScore;

#[derive(Message)]
#[rtype(result = "()")]
struct AddScore { amount: u64 }

#[derive(Message)]
#[rtype(result = "()")]
struct LevelUp;

impl Handler<GetScore> for GameSession {
    type Result = u64;

    fn handle(&mut self, _msg: GetScore, _ctx: &mut Context<Self>) -> u64 {
        self.score
    }
}

impl Handler<AddScore> for GameSession {
    type Result = ();

    fn handle(&mut self, msg: AddScore, _ctx: &mut Context<Self>) {
        self.score += msg.amount;
        println!("session {} score: {}", self.id, self.score);
    }
}

impl Handler<LevelUp> for GameSession {
    type Result = ();

    fn handle(&mut self, _msg: LevelUp, ctx: &mut Context<Self>) {
        self.level += 1;
        println!("session {} leveled up to {}", self.id, self.level);

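        // Schedule another LevelUp 30 s from now. Every LevelUp re-arms the timer,
        // so leveling repeats for as long as the actor is alive.
        ctx.notify_later(LevelUp, Duration::from_secs(30));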
    }
}

#[actix::main]
async fn main() {
    let addr = GameSession {
        id: "player-001".into(),
        score: 0,
        level: 1,
    }.start();

    addr.send(AddScore { amount: 100 }).await.unwrap();
    addr.send(AddScore { amount: 50 }).await.unwrap();
    addr.send(LevelUp).await.unwrap();

    let score = addr.send(GetScore).await.unwrap();
    println!("final score: {}", score);
}

Key takeaways:

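  • impl Actor defines the lifecycle hooks started, stopping, and stopped; stopping returns Running::Stop to let shutdown proceed or Running::Continue to keep the actor alive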
  • #[derive(Message)] defines typed messages; rtype specifies the return type
  • Handler<T> trait implements handling logic for each message type
  • ctx.notify_later enables scheduled self-messages for periodic tasks
  • addr.send() sends messages asynchronously and awaits responses
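The typing idea behind #[rtype] and Handler<T> can be shown without actix. Each message type names its response type, so the compiler checks what a caller gets back. The traits below are simplified, hypothetical stand-ins: actix's real Handler also receives a Context and returns a MessageResponse-compatible type. Here, send calls the handler synchronously instead of going through a mailbox.

```rust
// Every message type declares the type of its reply.
trait Message {
    type Result;
}

// One handler implementation per message type the actor accepts.
trait Handler<M: Message> {
    fn handle(&mut self, msg: M) -> M::Result;
}

struct GameSession {
    score: u64,
    level: u32,
}

struct GetScore;
impl Message for GetScore {
    type Result = u64; // plays the role of #[rtype(result = "u64")]
}

struct AddScore {
    amount: u64,
}
impl Message for AddScore {
    type Result = ();
}

struct LevelUp;
impl Message for LevelUp {
    type Result = u32; // the new level
}

impl Handler<GetScore> for GameSession {
    fn handle(&mut self, _msg: GetScore) -> u64 {
        self.score
    }
}

impl Handler<AddScore> for GameSession {
    fn handle(&mut self, msg: AddScore) {
        self.score += msg.amount;
    }
}

impl Handler<LevelUp> for GameSession {
    fn handle(&mut self, _msg: LevelUp) -> u32 {
        self.level += 1;
        self.level
    }
}

// Synchronous "send": the message type selects the handler, and the return
// type is checked against M::Result at compile time.
fn send<A: Handler<M>, M: Message>(actor: &mut A, msg: M) -> M::Result {
    actor.handle(msg)
}

fn main() {
    let mut session = GameSession { score: 0, level: 1 };
    send(&mut session, AddScore { amount: 100 });
    send(&mut session, AddScore { amount: 50 });
    let level = send(&mut session, LevelUp);
    let score: u64 = send(&mut session, GetScore);
    println!("level {}, score {}", level, score);
}
```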

Pattern 2: Tokio Channel-Based Actor — Lightweight Actor Pattern

Build Actors from scratch using Tokio channels — lighter and more flexible, ideal for scenarios demanding maximum performance and control.

use tokio::sync::{mpsc, oneshot};
use std::collections::HashMap;

enum CacheCommand {
    Get {
        key: String,
        responder: oneshot::Sender<Option<String>>,
    },
    Set {
        key: String,
        value: String,
        ttl: Option<std::time::Duration>,
        responder: oneshot::Sender<bool>,
    },
    Delete {
        key: String,
        responder: oneshot::Sender<bool>,
    },
    Stats {
        responder: oneshot::Sender<CacheStats>,
    },
}

#[derive(Debug)]
struct CacheStats {
    entries: usize,
    hits: u64,
    misses: u64,
}

struct CacheEntry {
    value: String,
    expires_at: Option<std::time::Instant>,
}

struct CacheActor {
    store: HashMap<String, CacheEntry>,
    hits: u64,
    misses: u64,
}

impl CacheActor {
    fn new() -> Self {
        Self {
            store: HashMap::new(),
            hits: 0,
            misses: 0,
        }
    }

    fn is_expired(&self, entry: &CacheEntry) -> bool {
        match entry.expires_at {
            Some(t) => std::time::Instant::now() > t,
            None => false,
        }
    }

    async fn run(&mut self, mut rx: mpsc::Receiver<CacheCommand>) {
        while let Some(cmd) = rx.recv().await {
            match cmd {
                CacheCommand::Get { key, responder } => {
                    let result = match self.store.get(&key) {
                        Some(entry) if !self.is_expired(entry) => {
                            self.hits += 1;
                            Some(entry.value.clone())
                        }
                        Some(_) => {
                            self.store.remove(&key);
                            self.misses += 1;
                            None
                        }
                        None => {
                            self.misses += 1;
                            None
                        }
                    };
                    let _ = responder.send(result);
                }
                CacheCommand::Set { key, value, ttl, responder } => {
                    let expires_at = ttl.map(|d| std::time::Instant::now() + d);
                    self.store.insert(key, CacheEntry { value, expires_at });
                    let _ = responder.send(true);
                }
                CacheCommand::Delete { key, responder } => {
                    let existed = self.store.remove(&key).is_some();
                    let _ = responder.send(existed);
                }
                CacheCommand::Stats { responder } => {
                    let stats = CacheStats {
                        entries: self.store.len(),
                        hits: self.hits,
                        misses: self.misses,
                    };
                    let _ = responder.send(stats);
                }
            }
        }
        println!("cache actor shutting down, {} entries remaining", self.store.len());
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<CacheCommand>(256);
    let mut actor = CacheActor::new();
    tokio::spawn(async move { actor.run(rx).await });

    let (resp_tx, resp_rx) = oneshot::channel();
    tx.send(CacheCommand::Set {
        key: "user:1001".into(),
        value: r#"{"name":"alice"}"#.into(),
        ttl: Some(std::time::Duration::from_secs(300)),
        responder: resp_tx,
    }).await.unwrap();
    resp_rx.await.unwrap();

    let (resp_tx, resp_rx) = oneshot::channel();
    tx.send(CacheCommand::Get {
        key: "user:1001".into(),
        responder: resp_tx,
    }).await.unwrap();
    let result = resp_rx.await.unwrap();
    println!("get result: {:?}", result);

    let (resp_tx, resp_rx) = oneshot::channel();
    tx.send(CacheCommand::Stats { responder: resp_tx }).await.unwrap();
    let stats = resp_rx.await.unwrap();
    println!("cache stats: {:?}", stats);
}

Key takeaways:

  • mpsc channel serves as the Actor's mailbox; oneshot channel implements request-response
  • enum Command defines the message protocol — type-safe and extensible
  • The Actor is a single owner, naturally free of data races
  • No external framework dependency — Tokio alone builds a complete Actor system
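  • TTL expiration is lazy: an expired entry is removed only when a Get touches it, so entries that are never read again stay in memory (and in the Stats count) until a periodic sweep, which this example omits, removes them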
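Each call in main repeats the same steps: create a oneshot channel, send, then await the reply. A common refinement is to hide the Sender behind a cloneable handle with ordinary methods. The sketch below shows that shape with std threads and std::sync::mpsc in place of tokio, so its methods block rather than being async. CacheHandle and Command are hypothetical names, and TTL handling is left out.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

enum Command {
    Get { key: String, reply: mpsc::Sender<Option<String>> },
    Set { key: String, value: String },
}

// Cloneable handle: callers use plain methods and never touch reply channels.
#[derive(Clone)]
struct CacheHandle {
    tx: mpsc::Sender<Command>,
}

impl CacheHandle {
    // Spawns the actor thread, which owns the map, and returns a handle to its mailbox.
    fn spawn() -> Self {
        let (tx, rx) = mpsc::channel::<Command>();
        thread::spawn(move || {
            let mut store: HashMap<String, String> = HashMap::new();
            for cmd in rx {
                match cmd {
                    Command::Get { key, reply } => {
                        let _ = reply.send(store.get(&key).cloned());
                    }
                    Command::Set { key, value } => {
                        store.insert(key, value);
                    }
                }
            }
        });
        CacheHandle { tx }
    }

    fn set(&self, key: &str, value: &str) {
        let _ = self.tx.send(Command::Set { key: key.to_string(), value: value.to_string() });
    }

    // Returns None if the key is missing or the actor has stopped.
    fn get(&self, key: &str) -> Option<String> {
        let (reply, rx) = mpsc::channel();
        self.tx.send(Command::Get { key: key.to_string(), reply }).ok()?;
        rx.recv().ok().flatten()
    }
}

fn main() {
    let cache = CacheHandle::spawn();
    cache.set("user:1001", r#"{"name":"alice"}"#);
    println!("{:?}", cache.get("user:1001"));

    let other = cache.clone(); // handles are cheap to clone and share across tasks
    println!("{:?}", other.get("user:9999"));
}
```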

Channel Actor vs Actix Comparison:

| Feature | Channel Actor | Actix |
|---|---|---|
| Dependencies | tokio only | actix + tokio |
| Message types | Manual enum | #[derive(Message)] |
| Response mechanism | oneshot channel | rtype on the message |
| Lifecycle | Manual management | Automatic callbacks |
| Supervision | Manual implementation | Built-in support |
| Performance overhead | Minimal | Low |
| Learning curve | Gentle | Moderate |

Pattern 3: Supervision Trees — Fault Recovery Pattern

Supervision trees are one of the Actor model's most powerful features: parent Actors monitor child Actors and decide how to recover when children crash.

use tokio::sync::{mpsc, oneshot};
use tokio::time::{sleep, Duration};
use std::collections::HashMap;

#[derive(Debug, Clone)]
enum RestartStrategy {
    Permanent,
    Transient,
    Temporary,
}

#[derive(Debug)]
enum WorkerMessage {
    Process { data: String, responder: oneshot::Sender<String> },
    Crash,
    Status { responder: oneshot::Sender<WorkerStatus> },
}

#[derive(Debug, Clone)]
struct WorkerStatus {
    id: String,
    processed: u64,
    alive: bool,
}

struct Worker {
    id: String,
    processed: u64,
}

impl Worker {
    fn new(id: String) -> Self {
        Self { id, processed: 0 }
    }

    async fn run(
        &mut self,
        mut rx: mpsc::Receiver<WorkerMessage>,
        supervisor_tx: mpsc::Sender<SupervisorMessage>,
    ) {
        println!("worker {} started", self.id);
        while let Some(msg) = rx.recv().await {
            match msg {
                WorkerMessage::Process { data, responder } => {
                    self.processed += 1;
                    let result = format!("processed by {}: {}", self.id, data.to_uppercase());
                    let _ = responder.send(result);
                }
                WorkerMessage::Crash => {
                    println!("worker {} crashing!", self.id);
                    let _ = supervisor_tx.send(SupervisorMessage::ChildCrashed {
                        id: self.id.clone(),
                    }).await;
                    return;
                }
                WorkerMessage::Status { responder } => {
                    let _ = responder.send(WorkerStatus {
                        id: self.id.clone(),
                        processed: self.processed,
                        alive: true,
                    });
                }
            }
        }
        let _ = supervisor_tx.send(SupervisorMessage::ChildStopped {
            id: self.id.clone(),
        }).await;
    }
}

enum SupervisorMessage {
    ChildCrashed { id: String },
    ChildStopped { id: String },
    SpawnWorker { id: String },
    GetStatus { responder: oneshot::Sender<Vec<WorkerStatus>> },
}

struct ChildHandle {
    tx: mpsc::Sender<WorkerMessage>,
    strategy: RestartStrategy,
}

struct Supervisor {
    children: HashMap<String, ChildHandle>,
    supervisor_rx: mpsc::Receiver<SupervisorMessage>,
    supervisor_tx: mpsc::Sender<SupervisorMessage>,
    restart_count: HashMap<String, u32>,
    max_restarts: u32,
}

impl Supervisor {
    fn new(rx: mpsc::Receiver<SupervisorMessage>, tx: mpsc::Sender<SupervisorMessage>, max_restarts: u32) -> Self {
        Self {
            children: HashMap::new(),
            supervisor_rx: rx,
            supervisor_tx: tx,
            restart_count: HashMap::new(),
            max_restarts,
        }
    }

    async fn spawn_child(&mut self, id: String, strategy: RestartStrategy) {
        let (tx, rx) = mpsc::channel::<WorkerMessage>(64);
        let supervisor_tx = self.supervisor_tx.clone();
        let worker_id = id.clone();
        let mut worker = Worker::new(id.clone());

        tokio::spawn(async move {
            worker.run(rx, supervisor_tx).await;
        });

        self.children.insert(id, ChildHandle { tx, strategy });
        println!("supervisor spawned child {}", worker_id);
    }

    async fn restart_child(&mut self, id: &str) -> bool {
        let restarts = self.restart_count.entry(id.to_string()).or_insert(0);
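        if *restarts >= self.max_restarts {
            println!("supervisor: child {} exceeded max restarts ({}), giving up", id, self.max_restarts);
            // Drop the dead child's handle so GetStatus no longer tries to reach it.
            self.children.remove(id);
            return false;
        }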
        *restarts += 1;
        println!("supervisor: restarting child {} (attempt {})", id, *restarts);

        let strategy = self.children.get(id).map(|h| h.strategy.clone());
        if let Some(strategy) = strategy {
            self.children.remove(id);
            self.spawn_child(id.to_string(), strategy).await;
            true
        } else {
            false
        }
    }

    async fn run(&mut self) {
        while let Some(msg) = self.supervisor_rx.recv().await {
            match msg {
                SupervisorMessage::ChildCrashed { id } => {
                    let strategy = self.children.get(&id).map(|h| h.strategy.clone());
                    match strategy {
                        Some(RestartStrategy::Permanent) => {
                            self.restart_child(&id).await;
                        }
                        Some(RestartStrategy::Transient) => {
                            self.restart_child(&id).await;
                        }
                        Some(RestartStrategy::Temporary) => {
                            println!("supervisor: child {} crashed, not restarting (temporary)", id);
                            self.children.remove(&id);
                        }
                        None => {}
                    }
                }
                SupervisorMessage::ChildStopped { id } => {
                    let strategy = self.children.get(&id).map(|h| h.strategy.clone());
                    match strategy {
                        Some(RestartStrategy::Permanent) => {
                            self.restart_child(&id).await;
                        }
                        Some(RestartStrategy::Transient) | Some(RestartStrategy::Temporary) => {
                            self.children.remove(&id);
                        }
                        None => {}
                    }
                }
                SupervisorMessage::SpawnWorker { id } => {
                    self.spawn_child(id, RestartStrategy::Permanent).await;
                }
                SupervisorMessage::GetStatus { responder } => {
                    let mut statuses = Vec::new();
                    for (id, handle) in &self.children {
                        let (resp_tx, resp_rx) = oneshot::channel();
                        if handle.tx.send(WorkerMessage::Status { responder: resp_tx }).await.is_ok() {
                            if let Ok(status) = resp_rx.await {
                                statuses.push(status);
                            }
                        }
                    }
                    let _ = responder.send(statuses);
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (sup_tx, sup_rx) = mpsc::channel::<SupervisorMessage>(256);
    let mut supervisor = Supervisor::new(sup_rx, sup_tx.clone(), 3);
    tokio::spawn(async move { supervisor.run().await });

    sup_tx.send(SupervisorMessage::SpawnWorker { id: "worker-1".into() }).await.unwrap();
    sup_tx.send(SupervisorMessage::SpawnWorker { id: "worker-2".into() }).await.unwrap();

    sleep(Duration::from_millis(50)).await;

    let (resp_tx, resp_rx) = oneshot::channel();
    sup_tx.send(SupervisorMessage::GetStatus { responder: resp_tx }).await.unwrap();
    let statuses = resp_rx.await.unwrap();
    for s in statuses {
        println!("status: {:?}", s);
    }
}

Key takeaways:

  • The Supervisor holds child Actor Sender handles and receives crash notifications via mpsc
  • Three restart strategies: Permanent (always restart), Transient (restart on error), Temporary (never restart)
  • max_restarts prevents infinite restart loops
  • Here a "crash" is simulated by the Crash message: the child reports to the supervisor before exiting, and the supervisor decides whether to restart it
  • Supervision trees can be nested: a supervisor itself can be a supervised child
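A panicking task cannot send ChildCrashed, so a supervisor also needs a signal that does not depend on the child's cooperation. With tokio, awaiting the JoinHandle returned by tokio::spawn yields a JoinError whose is_panic() returns true in that case. The sketch below shows the same idea with std::thread, where JoinHandle::join returns Err after a panic. supervise is a hypothetical helper that restarts the work up to max_restarts times.

```rust
use std::thread;

// Runs `work` on a fresh thread and restarts it after each panic, until it
// returns normally or the restart budget is spent. Returns (runs, succeeded).
// The supervisor learns about a panic from join(), not from the child.
fn supervise<F>(work: F, max_restarts: u32) -> (u32, bool)
where
    F: Fn(u32) + Send + Clone + 'static,
{
    let mut runs = 0;
    loop {
        let attempt = runs;
        runs += 1;
        let job = work.clone();
        match thread::spawn(move || job(attempt)).join() {
            Ok(()) => return (runs, true),                     // normal exit
            Err(_) if runs <= max_restarts => continue,        // panic: restart
            Err(_) => return (runs, false),                    // budget exhausted
        }
    }
}

fn main() {
    // Silence the default panic message for this demo.
    std::panic::set_hook(Box::new(|_| {}));

    let flaky = |attempt: u32| {
        if attempt < 2 {
            panic!("simulated crash on attempt {}", attempt);
        }
    };
    println!("{:?}", supervise(flaky, 3)); // (3, true): two crashes, then success
    println!("{:?}", supervise(|_: u32| panic!("always fails"), 3)); // (4, false)
}
```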

Restart Strategy Comparison:

| Strategy | Normal Exit | Abnormal Exit | Typical Use Case |
|---|---|---|---|
| Permanent | Restart | Restart | Always-on services, scheduled tasks |
| Transient | No restart | Restart | Request handlers, temporary computation |
| Temporary | No restart | No restart | One-off tasks, debugging |
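The table maps directly onto a decision function. The sketch also swaps the example's lifetime max_restarts counter for a sliding time window, modeled on the restart intensity and period of Erlang/OTP supervisors. It allows at most max restarts within any period, and older restarts age out. Exit, should_restart and RestartWindow are hypothetical names.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy)]
enum RestartStrategy {
    Permanent,
    Transient,
    Temporary,
}

#[derive(Debug, Clone, Copy)]
enum Exit {
    Normal,
    Abnormal,
}

// Direct encoding of the strategy table.
fn should_restart(strategy: RestartStrategy, exit: Exit) -> bool {
    match (strategy, exit) {
        (RestartStrategy::Permanent, _) => true,
        (RestartStrategy::Transient, Exit::Abnormal) => true,
        (RestartStrategy::Transient, Exit::Normal) => false,
        (RestartStrategy::Temporary, _) => false,
    }
}

// At most `max` restarts within any `period`; a child that crashes only
// occasionally is never given up on.
struct RestartWindow {
    max: usize,
    period: Duration,
    history: VecDeque<Instant>,
}

impl RestartWindow {
    fn new(max: usize, period: Duration) -> Self {
        Self { max, period, history: VecDeque::new() }
    }

    // Returns true and records the restart if fewer than `max` restarts
    // happened in the `period` before `now`.
    fn try_restart(&mut self, now: Instant) -> bool {
        while let Some(&oldest) = self.history.front() {
            if now.duration_since(oldest) >= self.period {
                self.history.pop_front(); // aged out of the window
            } else {
                break;
            }
        }
        if self.history.len() < self.max {
            self.history.push_back(now);
            true
        } else {
            false
        }
    }
}

fn main() {
    for strategy in [RestartStrategy::Permanent, RestartStrategy::Transient, RestartStrategy::Temporary] {
        println!(
            "{:?}: normal exit -> {}, abnormal exit -> {}",
            strategy,
            should_restart(strategy, Exit::Normal),
            should_restart(strategy, Exit::Abnormal)
        );
    }
    let mut window = RestartWindow::new(3, Duration::from_secs(5));
    let t0 = Instant::now();
    for s in [0u64, 1, 2, 3, 6] {
        let allowed = window.try_restart(t0 + Duration::from_secs(s));
        println!("restart at t+{}s allowed: {}", s, allowed);
    }
}
```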

Pattern 4: Distributed Actors — Remote Messaging Pattern

When Actors cross process boundaries, messages must be serialized and transmitted over the network. This pattern shows how to build a remotely communicable Actor system.

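// Assumed dependencies: tokio (feature "full"), serde (feature "derive"), and bincode 1.x;
// bincode::serialize / bincode::deserialize below are the bincode 1.x API.
use tokio::sync::{mpsc, oneshot};
use tokio::net::{TcpListener, TcpStream};
use serde::{Serialize, Deserialize};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::time::Duration;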

#[derive(Debug, Serialize, Deserialize)]
struct RemoteEnvelope {
    target: String,
    message_type: String,
    payload: Vec<u8>,
    reply_to: Option<u64>,
}

#[derive(Debug, Serialize, Deserialize)]
enum OrderCommand {
    Create { order_id: String, item: String, quantity: u32 },
    Cancel { order_id: String },
    Query { order_id: String },
}

#[derive(Debug, Serialize, Deserialize)]
enum OrderResponse {
    Created { order_id: String },
    Cancelled { order_id: String },
    Found { order_id: String, item: String, quantity: u32 },
    NotFound { order_id: String },
    Error { message: String },
}

#[derive(Debug, Serialize, Deserialize, Clone)]
struct Order {
    id: String,
    item: String,
    quantity: u32,
    status: String,
}

struct OrderActor {
    orders: HashMap<String, Order>,
    cmd_rx: mpsc::Receiver<(OrderCommand, oneshot::Sender<OrderResponse>)>,
}

impl OrderActor {
    fn new(rx: mpsc::Receiver<(OrderCommand, oneshot::Sender<OrderResponse>)>) -> Self {
        Self { orders: HashMap::new(), cmd_rx: rx }
    }

    async fn run(&mut self) {
        while let Some((cmd, responder)) = self.cmd_rx.recv().await {
            let response = match cmd {
                OrderCommand::Create { order_id, item, quantity } => {
                    if self.orders.contains_key(&order_id) {
                        OrderResponse::Error { message: format!("order {} already exists", order_id) }
                    } else {
                        self.orders.insert(order_id.clone(), Order {
                            id: order_id.clone(),
                            item,
                            quantity,
                            status: "created".into(),
                        });
                        OrderResponse::Created { order_id }
                    }
                }
                OrderCommand::Cancel { order_id } => {
                    match self.orders.get_mut(&order_id) {
                        Some(order) => {
                            order.status = "cancelled".into();
                            OrderResponse::Cancelled { order_id }
                        }
                        None => OrderResponse::NotFound { order_id },
                    }
                }
                OrderCommand::Query { order_id } => {
                    match self.orders.get(&order_id) {
                        Some(order) => OrderResponse::Found {
                            order_id: order.id.clone(),
                            item: order.item.clone(),
                            quantity: order.quantity,
                        },
                        None => OrderResponse::NotFound { order_id },
                    }
                }
            };
            let _ = responder.send(response);
        }
    }
}

struct RemoteGateway {
    local_actors: HashMap<String, mpsc::Sender<(OrderCommand, oneshot::Sender<OrderResponse>)>>,
    remote_nodes: HashMap<String, SocketAddr>,
    next_request_id: u64,
}

impl RemoteGateway {
    fn new() -> Self {
        Self {
            local_actors: HashMap::new(),
            remote_nodes: HashMap::new(),
            next_request_id: 0,
        }
    }

    fn register_local(&mut self, name: String, tx: mpsc::Sender<(OrderCommand, oneshot::Sender<OrderResponse>)>) {
        self.local_actors.insert(name, tx);
    }

    fn register_remote(&mut self, name: String, addr: SocketAddr) {
        self.remote_nodes.insert(name, addr);
    }

    async fn send_command(&mut self, target: &str, cmd: OrderCommand) -> Option<OrderResponse> {
        if let Some(tx) = self.local_actors.get(target) {
            let (resp_tx, resp_rx) = oneshot::channel();
            if tx.send((cmd, resp_tx)).await.is_ok() {
                return resp_rx.await.ok();
            }
            return None;
        }

        if let Some(addr) = self.remote_nodes.get(target) {
            match TcpStream::connect(addr).await {
                Ok(mut stream) => {
                    let payload = bincode::serialize(&cmd).ok()?;
                    let request_id = self.next_request_id;
                    self.next_request_id += 1;

                    let envelope = RemoteEnvelope {
                        target: target.to_string(),
                        message_type: "OrderCommand".to_string(),
                        payload,
                        reply_to: Some(request_id),
                    };

                    let data = bincode::serialize(&envelope).ok()?;
                    let len = data.len() as u32;
                    use tokio::io::AsyncWriteExt;
                    if stream.write_all(&len.to_le_bytes()).await.is_err() { return None; }
                    if stream.write_all(&data).await.is_err() { return None; }

                    use tokio::io::AsyncReadExt;
                    let mut len_buf = [0u8; 4];
                    if stream.read_exact(&mut len_buf).await.is_err() { return None; }
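                    let resp_len = u32::from_le_bytes(len_buf) as usize;
                    // Refuse absurd lengths before allocating (the 16 MiB cap is an arbitrary choice).
                    if resp_len > 16 * 1024 * 1024 { return None; }
                    let mut resp_buf = vec![0u8; resp_len];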
                    if stream.read_exact(&mut resp_buf).await.is_err() { return None; }
                    bincode::deserialize(&resp_buf).ok()
                }
                Err(e) => {
                    eprintln!("failed to connect to {}: {}", target, e);
                    None
                }
            }
        } else {
            None
        }
    }
}

async fn server_node(addr: SocketAddr) {
    let (tx, rx) = mpsc::channel::<(OrderCommand, oneshot::Sender<OrderResponse>)>(256);
    let mut actor = OrderActor::new(rx);
    tokio::spawn(async move { actor.run().await });

    let listener = TcpListener::bind(addr).await.unwrap();
    println!("order actor listening on {}", addr);

    loop {
        if let Ok((mut stream, _)) = listener.accept().await {
            let tx = tx.clone();
            tokio::spawn(async move {
                use tokio::io::AsyncReadExt;
                use tokio::io::AsyncWriteExt;

                let mut len_buf = [0u8; 4];
                if stream.read_exact(&mut len_buf).await.is_err() { return; }
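                let len = u32::from_le_bytes(len_buf) as usize;
                // Refuse absurd lengths from untrusted peers before allocating
                // (the 16 MiB cap is an arbitrary choice).
                if len > 16 * 1024 * 1024 { return; }
                let mut buf = vec![0u8; len];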
                if stream.read_exact(&mut buf).await.is_err() { return; }

                let envelope: RemoteEnvelope = match bincode::deserialize(&buf) {
                    Ok(e) => e,
                    Err(_) => return,
                };

                let cmd: OrderCommand = match bincode::deserialize(&envelope.payload) {
                    Ok(c) => c,
                    Err(_) => return,
                };

                let (resp_tx, resp_rx) = oneshot::channel();
                if tx.send((cmd, resp_tx)).await.is_err() { return; }
                let response = match resp_rx.await {
                    Ok(r) => r,
                    Err(_) => return,
                };

                let resp_data = bincode::serialize(&response).unwrap();
                let resp_len = resp_data.len() as u32;
                let _ = stream.write_all(&resp_len.to_le_bytes()).await;
                let _ = stream.write_all(&resp_data).await;
            });
        }
    }
}

#[tokio::main]
async fn main() {
    let server_addr: SocketAddr = "127.0.0.1:9001".parse().unwrap();
    tokio::spawn(server_node(server_addr));

    tokio::time::sleep(Duration::from_millis(100)).await;

    let mut gateway = RemoteGateway::new();
    gateway.register_remote("order-service".into(), server_addr);

    let response = gateway.send_command("order-service", OrderCommand::Create {
        order_id: "ORD-001".into(),
        item: "Rust Book".into(),
        quantity: 2,
    }).await;
    println!("create response: {:?}", response);

    let response = gateway.send_command("order-service", OrderCommand::Query {
        order_id: "ORD-001".into(),
    }).await;
    println!("query response: {:?}", response);
}

Key takeaways:

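  • RemoteEnvelope wraps message metadata: target Actor, message type, payload, and a request id (reply_to) for correlating replies
  • Local Actors communicate directly via channels; remote Actors use serialized messages over TCP
  • RemoteGateway unifies local and remote routing, so business code stays location-transparent; this sketch opens one TCP connection per request, whereas a production gateway would reuse connections and match replies by reply_to
  • bincode produces compact binary output and is typically faster than JSON, but it is not self-describing, so both sides must agree on the exact message types
  • TCP is a byte stream with no message boundaries, so each message is framed with a 4-byte little-endian length prefix followed by the data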
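The framing logic does not depend on tokio or on the serialization format, so it can be factored out and tested on its own. The sketch below works over std::io::Read and Write; tokio's AsyncReadExt::read_exact and AsyncWriteExt::write_all follow the same shape. It rejects oversized frames before allocating. write_frame, read_frame and the 16 MiB MAX_FRAME_LEN are assumptions for illustration.

```rust
use std::io::{self, Read, Write};

// Upper bound on one frame, so a corrupt or hostile length prefix cannot make
// the reader allocate up to 4 GiB. The value is an arbitrary choice.
const MAX_FRAME_LEN: usize = 16 * 1024 * 1024;

// Writes one frame: 4-byte little-endian length, then the payload bytes.
fn write_frame<W: Write>(w: &mut W, payload: &[u8]) -> io::Result<()> {
    if payload.len() > MAX_FRAME_LEN {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "frame too large"));
    }
    w.write_all(&(payload.len() as u32).to_le_bytes())?;
    w.write_all(payload)
}

// Reads exactly one frame, however the bytes were split into TCP segments.
fn read_frame<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
    let mut len_buf = [0u8; 4];
    r.read_exact(&mut len_buf)?;
    let len = u32::from_le_bytes(len_buf) as usize;
    if len > MAX_FRAME_LEN {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "frame too large"));
    }
    let mut buf = vec![0u8; len];
    r.read_exact(&mut buf)?;
    Ok(buf)
}

fn main() -> io::Result<()> {
    // Two frames written back to back into one byte stream...
    let mut stream: Vec<u8> = Vec::new();
    write_frame(&mut stream, b"create ORD-001")?;
    write_frame(&mut stream, b"query ORD-001")?;
    // ...come back out as two separate messages.
    let mut reader = io::Cursor::new(stream);
    println!("{}", String::from_utf8_lossy(&read_frame(&mut reader)?));
    println!("{}", String::from_utf8_lossy(&read_frame(&mut reader)?));
    Ok(())
}
```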

Pattern 5: Production Actor System — Graceful Shutdown and Backpressure

Production Actor systems must handle graceful shutdown, backpressure control, message timeouts, and other complex scenarios.
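Before the full system, here is the core of backpressure in isolation: a bounded mailbox that refuses work once it is full. The sketch uses std's sync_channel. Tokio's bounded mpsc offers the same two choices: send().await waits for a free slot, and try_send fails at once with TrySendError::Full. offer is a hypothetical helper.

```rust
use std::sync::mpsc::{self, TrySendError};

// Offers `n` messages to a bounded mailbox with `capacity` slots while nothing
// consumes it, and returns (accepted, rejected).
fn offer(capacity: usize, n: u32) -> (u32, u32) {
    let (tx, rx) = mpsc::sync_channel::<u32>(capacity);
    let (mut accepted, mut rejected) = (0, 0);
    for i in 0..n {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // Mailbox full: the backpressure signal. Here the message is shed;
            // `tx.send(i)` would instead block until a slot frees up.
            Err(TrySendError::Full(_)) => rejected += 1,
            // The receiving actor is gone; nothing more can be delivered.
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    drop(rx); // keep the mailbox open until every offer has been made
    (accepted, rejected)
}

fn main() {
    let (accepted, rejected) = offer(64, 200);
    println!("accepted {}, rejected {}", accepted, rejected); // accepted 64, rejected 136
}
```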

use tokio::sync::{mpsc, oneshot, watch, broadcast};
use tokio::time::{timeout, Duration, Instant};
use tokio::select;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

struct ActorSystem {
    shutdown_tx: watch::Sender<bool>,
    shutdown_rx: watch::Receiver<bool>,
    event_tx: broadcast::Sender<String>,
    metrics: Arc<ActorMetrics>,
}

struct ActorMetrics {
    messages_sent: AtomicU64,
    messages_processed: AtomicU64,
    messages_dropped: AtomicU64,
    actors_alive: AtomicU64,
}

impl ActorMetrics {
    fn new() -> Self {
        Self {
            messages_sent: AtomicU64::new(0),
            messages_processed: AtomicU64::new(0),
            messages_dropped: AtomicU64::new(0),
            actors_alive: AtomicU64::new(0),
        }
    }

    fn snapshot(&self) -> String {
        format!(
            "sent: {}, processed: {}, dropped: {}, alive: {}",
            self.messages_sent.load(Ordering::Relaxed),
            self.messages_processed.load(Ordering::Relaxed),
            self.messages_dropped.load(Ordering::Relaxed),
            self.actors_alive.load(Ordering::Relaxed),
        )
    }
}

enum WorkerCmd {
    Execute { task: String, responder: oneshot::Sender<String> },
    Ping { responder: oneshot::Sender<bool> },
}

struct ResilientWorker {
    id: String,
    cmd_rx: mpsc::Receiver<WorkerCmd>,
    shutdown_rx: watch::Receiver<bool>,
    event_tx: broadcast::Sender<String>,
    metrics: Arc<ActorMetrics>,
    mailbox_capacity: usize,
    process_timeout: Duration,
}

impl ResilientWorker {
    async fn run(&mut self) {
        self.metrics.actors_alive.fetch_add(1, Ordering::Relaxed);
        println!("worker {} started", self.id);

        loop {
            select! {
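                res = self.shutdown_rx.changed() => {
                    // Err means the ActorSystem (the watch sender) was dropped.
                    // Treat that as shutdown too, instead of spinning on the error.
                    if res.is_err() || *self.shutdown_rx.borrow() {
                        self.drain_mailbox().await;
                        break;
                    }
                }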
                cmd = self.cmd_rx.recv() => {
                    match cmd {
                        Some(cmd) => self.handle_command(cmd).await,
                        None => break,
                    }
                }
            }
        }

        self.metrics.actors_alive.fetch_sub(1, Ordering::Relaxed);
        println!("worker {} stopped", self.id);
    }

    async fn handle_command(&mut self, cmd: WorkerCmd) {
        match cmd {
            WorkerCmd::Execute { task, responder } => {
                self.metrics.messages_sent.fetch_add(1, Ordering::Relaxed);

                let result = match timeout(self.process_timeout, self.process_task(&task)).await {
                    Ok(output) => {
                        self.metrics.messages_processed.fetch_add(1, Ordering::Relaxed);
                        output
                    }
                    Err(_) => {
                        self.metrics.messages_dropped.fetch_add(1, Ordering::Relaxed);
                        let _ = self.event_tx.send(format!("worker {} timeout on: {}", self.id, task));
                        "timeout".to_string()
                    }
                };
                let _ = responder.send(result);
            }
            WorkerCmd::Ping { responder } => {
                let _ = responder.send(true);
            }
        }
    }

    async fn process_task(&self, task: &str) -> String {
        tokio::time::sleep(Duration::from_millis(10)).await;
        format!("worker {} processed: {}", self.id, task.to_uppercase())
    }

    async fn drain_mailbox(&mut self) {
        println!("worker {} draining mailbox...", self.id);
        // Close the mailbox: new sends fail immediately, but messages already
        // buffered can still be received, and recv() returns None once they are gone.
        self.cmd_rx.close();
        let deadline = Instant::now() + Duration::from_secs(5);

        loop {
            match tokio::time::timeout_at(deadline, self.cmd_rx.recv()).await {
                Ok(Some(WorkerCmd::Execute { task, responder })) => {
                    let result = self.process_task(&task).await;
                    self.metrics.messages_processed.fetch_add(1, Ordering::Relaxed);
                    let _ = responder.send(result);
                }
                Ok(Some(WorkerCmd::Ping { responder })) => {
                    // Report "not healthy" while shutting down.
                    let _ = responder.send(false);
                }
                // Buffer empty and mailbox closed: nothing left to drain.
                Ok(None) => break,
                Err(_) => {
                    println!("worker {} drain timeout, {} messages remaining", self.id, self.cmd_rx.len());
                    break;
                }
            }
        }
        println!("worker {} mailbox drained", self.id);
    }
}

impl ActorSystem {
    fn new() -> Self {
        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        let (event_tx, _) = broadcast::channel::<String>(256);
        let metrics = Arc::new(ActorMetrics::new());

        Self { shutdown_tx, shutdown_rx, event_tx, metrics }
    }

    async fn spawn_worker(&self, id: String, mailbox_capacity: usize) -> mpsc::Sender<WorkerCmd> {
        let (tx, rx) = mpsc::channel::<WorkerCmd>(mailbox_capacity);
        let mut worker = ResilientWorker {
            id,
            cmd_rx: rx,
            shutdown_rx: self.shutdown_rx.clone(),
            event_tx: self.event_tx.clone(),
            metrics: self.metrics.clone(),
            mailbox_capacity,
            process_timeout: Duration::from_secs(2),
        };
        tokio::spawn(async move { worker.run().await });
        tx
    }

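    async fn graceful_shutdown(&self) {
        println!("initiating graceful shutdown...");
        self.shutdown_tx.send(true).ok();
        // Crude wait: 6 s is simply longer than the 5 s drain deadline in drain_mailbox().
        // The workers' JoinHandles are not kept, so completion is assumed, not checked.
        tokio::time::sleep(Duration::from_secs(6)).await;
        println!("final metrics: {}", self.metrics.snapshot());
    }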
}

async fn backpressure_sender(
    tx: mpsc::Sender<WorkerCmd>,
    worker_id: String,
    total: u64,
) -> u64 {
    let mut sent = 0u64;
    let mut dropped = 0u64;

    for i in 0..total {
        let (resp_tx, resp_rx) = oneshot::channel();
        let task = format!("task-{}", i);

        match timeout(Duration::from_millis(50), tx.send(WorkerCmd::Execute {
            task,
            responder: resp_tx,
        })).await {
            Ok(Ok(())) => {
                match timeout(Duration::from_secs(3), resp_rx).await {
                    Ok(Ok(result)) => {
                        sent += 1;
                        if i % 100 == 0 {
                            println!("{} result: {}", worker_id, result);
                        }
                    }
                    _ => dropped += 1,
                }
            }
            _ => dropped += 1,
        }
    }

    println!("{} sent: {}, dropped: {}", worker_id, sent, dropped);
    dropped
}

#[tokio::main]
async fn main() {
    let system = ActorSystem::new();

    let worker1 = system.spawn_worker("worker-1".into(), 64).await;
    let worker2 = system.spawn_worker("worker-2".into(), 64).await;

    let h1 = tokio::spawn(backpressure_sender(worker1, "sender-1".into(), 200));
    let h2 = tokio::spawn(backpressure_sender(worker2, "sender-2".into(), 200));

    h1.await.unwrap();
    h2.await.unwrap();

    println!("metrics: {}", system.metrics.snapshot());
    system.graceful_shutdown().await;
}

Key takeaways:

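  • watch::channel(false) broadcasts the shutdown signal: a single send(true) is observed by every Actor holding a clone of the receiver
  • drain_mailbox() processes the messages still queued when the signal arrives, bounded by a 5-second deadline so shutdown cannot wait forever
  • timeout wraps both the send (50 ms) and the wait for the oneshot reply (3 s), so a full mailbox or a stalled worker is counted as dropped instead of blocking the sender indefinitely
  • ActorMetrics uses AtomicU64 for lock-free, thread-safe metrics collection
  • The bounded mailbox (capacity 64) provides natural backpressure: send().await suspends the producer while the mailbox is full. Because backpressure_sender awaits each reply before sending the next task, this demo keeps at most one message queued per worker and never actually reaches that limit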

Five Common Pitfalls

Pitfall 1: Circular Wait Between Actors

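use tokio::sync::{mpsc, oneshot};
use tokio::time::{timeout, Duration};

struct Request {
    body: String,
    reply: oneshot::Sender<String>, // reply channel, like WorkerCmd's responder
}

// Mailbox loop: answer every incoming request
async fn serve(name: &'static str, mut inbox: mpsc::Receiver<Request>) {
    while let Some(req) = inbox.recv().await {
        let _ = req.reply.send(format!("{} handled {}", name, req.body));
    }
}

// ❌ Wrong: each Actor awaits its peer's reply before serving its own mailbox.
// A waits for B, B waits for A, and neither ever reaches serve()
async fn deadlock_pattern() {
    let (tx_a, rx_a) = mpsc::channel::<Request>(16);
    let (tx_b, rx_b) = mpsc::channel::<Request>(16);

    tokio::spawn(async move {
        let (reply_tx, reply_rx) = oneshot::channel();
        tx_b.send(Request { body: "request-from-a".into(), reply: reply_tx }).await.unwrap();
        let _response = reply_rx.await; // hangs forever
        serve("a", rx_a).await;
    });

    tokio::spawn(async move {
        let (reply_tx, reply_rx) = oneshot::channel();
        tx_a.send(Request { body: "request-from-b".into(), reply: reply_tx }).await.unwrap();
        let _response = reply_rx.await; // hangs forever
        serve("b", rx_b).await;
    });
}

// ✅ Correct: bound the reply wait with a timeout (or use one-way messages),
// so a stuck peer cannot freeze this Actor's own mailbox loop
async fn safe_actor(name: &'static str, peer: mpsc::Sender<Request>, inbox: mpsc::Receiver<Request>) {
    let (reply_tx, reply_rx) = oneshot::channel();
    let req = Request { body: format!("request-from-{}", name), reply: reply_tx };
    if peer.send(req).await.is_ok() {
        match timeout(Duration::from_secs(5), reply_rx).await {
            Ok(Ok(response)) => println!("{} got: {}", name, response),
            _ => println!("{}: timeout or peer gone, back to own mailbox", name),
        }
    }
    serve(name, inbox).await;
}

async fn safe_pattern() {
    let (tx_a, rx_a) = mpsc::channel::<Request>(16);
    let (tx_b, rx_b) = mpsc::channel::<Request>(16);
    tokio::spawn(safe_actor("a", tx_b, rx_a));
    tokio::spawn(safe_actor("b", tx_a, rx_b));
}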

Pitfall 2: Forgetting to Handle Full Mailboxes

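// ❌ Wrong: Unbounded channel with a slow (here: absent) consumer, memory grows without limit
async fn unbounded_risk() {
    let (tx, _rx) = mpsc::unbounded_channel::<String>(); // receiver kept alive but never read
    for i in 0..1000000 {
        // send() never waits, so all 1M messages pile up in memory
        tx.send(format!("msg-{}", i)).unwrap();
    }
}

// ✅ Correct: Bounded channel + backpressure
async fn bounded_safe() {
    let (tx, mut rx) = mpsc::channel::<String>(256);
    // A consumer is required: with nobody receiving, send().await would wait forever once 256 messages are queued
    let consumer = tokio::spawn(async move {
        while let Some(_msg) = rx.recv().await {
            // process the message
        }
    });
    for i in 0..1000 {
        // Suspends the producer whenever the mailbox is full
        match tx.send(format!("msg-{}", i)).await {
            Ok(()) => {}
            Err(_) => {
                println!("receiver dropped at {}", i);
                return;
            }
        }
    }
    drop(tx); // close the channel so the consumer loop ends
    let _ = consumer.await;
}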

Pitfall 3: Supervisor Without Restart Limits

// ❌ Wrong: Infinite restarts, may form crash-restart loops
async fn infinite_restart_loop() {
    loop {
        let (tx, rx) = mpsc::channel::<String>(16);
        let handle = tokio::spawn(async move {
            panic!("always crash!");
        });
        let _ = handle.await;
        println!("restarting...");
    }
}

// ✅ Correct: Limit restart frequency and count
struct RestartPolicy {
    max_restarts: u32,
    window: Duration,
    restarts: Vec<Instant>,
}

impl RestartPolicy {
    fn new(max_restarts: u32, window: Duration) -> Self {
        Self { max_restarts, window, restarts: Vec::new() }
    }

    fn should_restart(&mut self) -> bool {
        let now = Instant::now();
        self.restarts.retain(|t| now - *t < self.window);
        if self.restarts.len() < self.max_restarts as usize {
            self.restarts.push(now);
            true
        } else {
            false
        }
    }
}

Pitfall 4: Remote Messages Without Timeouts

// ❌ Wrong: Remote call may block forever
async fn remote_without_timeout() {
    let response = gateway.send_command("remote-actor", cmd).await;
}

// ✅ Correct: All remote calls must have timeouts
async fn remote_with_timeout() {
    match timeout(Duration::from_secs(5), gateway.send_command("remote-actor", cmd)).await {
        Ok(Some(response)) => println!("got: {:?}", response),
        Ok(None) => println!("actor not found"),
        Err(_) => println!("remote call timeout"),
    }
}

Pitfall 5: Dropping Unprocessed Messages During Shutdown

// ❌ Wrong: Drop receiver directly, messages lost
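async fn bad_shutdown(rx: mpsc::Receiver<String>) {
    drop(rx);
}

// ✅ Correct: Drain before closing
async fn good_shutdown(mut rx: mpsc::Receiver<String>) {
    println!("draining remaining messages...");
    // Refuse new sends first, so the drain has a fixed end instead of racing producers
    rx.close();
    // After close(), recv() returns the buffered messages and then None;
    // the timeout still guards against a sender that reserved a slot but never sends
    while let Ok(Some(msg)) = timeout(Duration::from_millis(100), rx.recv()).await {
        println!("drained: {}", msg);
    }
    println!("all messages processed");
}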

Error Troubleshooting Quick Reference

| Symptom | Possible Cause | Solution |
|---|---|---|
| Actor not responding | Mailbox full or deadlock | Check mailbox capacity, add timeout mechanisms |
| Frequent SendError | Receiver already closed | Check whether the Actor exited unexpectedly, add supervision |
| Continuous memory growth | Unbounded channel or message backlog | Switch to bounded channels, add backpressure |
| Restart loop | Actor initialization failure | Check startup logic, limit restart count |
| Remote call timeout | Network latency or unresponsive peer | Add timeout and retry mechanisms |
| Message ordering issues | Multiple producers sending concurrently | Use a single producer, or add sequence numbers |
| Graceful shutdown stuck | An Actor not responding to the shutdown signal | Set a timeout on drain operations |
| oneshot RecvError | Responder dropped without sending | Make sure every code path sends a response |
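
For the "Message ordering issues" row: messages sent one after another through a single Sender arrive in that order, but the interleaving between independent producers is arbitrary. If a global order matters, producers can stamp messages with sequence numbers at the point where that order is decided, and the consumer can put them back in order. A sketch, assuming gap-free numbering starting at 0; Reorder is a hypothetical type.

```rust
use std::collections::BTreeMap;

/// Hypothetical consumer-side reorder buffer. Assumes every message carries a
/// gap-free sequence number starting at 0; early arrivals are parked until
/// every lower number has been delivered.
struct Reorder<T> {
    next: u64,
    pending: BTreeMap<u64, T>,
}

impl<T> Reorder<T> {
    fn new() -> Self {
        Self { next: 0, pending: BTreeMap::new() }
    }

    /// Accept one message; return every message that is now deliverable, in order.
    fn push(&mut self, seq: u64, msg: T) -> Vec<T> {
        if seq < self.next {
            return Vec::new(); // duplicate of something already delivered
        }
        self.pending.insert(seq, msg);
        let mut ready = Vec::new();
        while let Some(ready_msg) = self.pending.remove(&self.next) {
            ready.push(ready_msg);
            self.next += 1;
        }
        ready
    }
}
```

A missing sequence number stalls delivery indefinitely, so a production version would also need a timeout or gap-handling policy.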

Advanced Optimization Techniques

1. Actor Pooling — Load Balancing

When a single Actor becomes a bottleneck, use an Actor pool to distribute messages:

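use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::mpsc;

// WorkerCmd is the command enum from the resilient-worker example above
struct ActorPool {
    workers: Vec<mpsc::Sender<WorkerCmd>>,
    next: AtomicUsize,
}

impl ActorPool {
    fn new(workers: Vec<mpsc::Sender<WorkerCmd>>) -> Self {
        // An empty pool would make next_worker() take a remainder by zero and panic
        assert!(!workers.is_empty(), "ActorPool needs at least one worker");
        Self {
            workers,
            next: AtomicUsize::new(0),
        }
    }

    // Round-robin: a shared atomic counter spreads messages evenly across workers
    fn next_worker(&self) -> &mpsc::Sender<WorkerCmd> {
        let idx = self.next.fetch_add(1, Ordering::Relaxed) % self.workers.len();
        &self.workers[idx]
    }

    // Returns false if the chosen worker's mailbox is closed
    async fn send(&self, cmd: WorkerCmd) -> bool {
        let worker = self.next_worker();
        worker.send(cmd).await.is_ok()
    }
}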

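2. Message Priority

Give each priority level its own channel and poll them with a biased select!, so a waiting high-priority message is always taken before normal or low ones: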

use tokio::sync::mpsc;
use tokio::select;

enum Priority {
    High,
    Normal,
    Low,
}

struct PriorityMailbox {
    high_rx: mpsc::Receiver<String>,
    normal_rx: mpsc::Receiver<String>,
    low_rx: mpsc::Receiver<String>,
}

impl PriorityMailbox {
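    async fn recv(&mut self) -> Option<String> {
        select! {
            biased; // check high, then normal, then low
            // Matching Some(..) disables a closed level instead of returning None for the whole mailbox
            Some(msg) = self.high_rx.recv() => Some(msg),
            Some(msg) = self.normal_rx.recv() => Some(msg),
            Some(msg) = self.low_rx.recv() => Some(msg),
            else => None, // all three levels closed and empty
        }
    }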
}

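3. Message Tracing with Trace IDs

Stamp every message with a trace ID and give each hop its own span ID pointing at its parent, so a single request can be followed as it moves between Actors: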

use std::sync::atomic::{AtomicU64, Ordering};

static TRACE_ID: AtomicU64 = AtomicU64::new(1);

fn new_trace_id() -> u64 {
    TRACE_ID.fetch_add(1, Ordering::Relaxed)
}

#[derive(Debug, Clone)]
struct TracedMessage {
    trace_id: u64,
    span_id: u64,
    parent_span: Option<u64>,
    payload: String,
}

impl TracedMessage {
    fn new(payload: String) -> Self {
        Self {
            trace_id: new_trace_id(),
            span_id: new_trace_id(),
            parent_span: None,
            payload,
        }
    }

    fn child(&self, payload: String) -> Self {
        Self {
            trace_id: self.trace_id,
            span_id: new_trace_id(),
            parent_span: Some(self.span_id),
            payload,
        }
    }
}

Actor Framework Comparison

| Feature | Actix | Tokio Channel | Riker | Coerce |
|---|---|---|---|---|
| Message passing | Typed Message | Manual enum | Typed | Typed |
| Supervision tree | Built-in | Manual implementation | Built-in | Built-in |
| Remote Actors | actix-remote | Manual implementation | Supported | Built-in |
| Performance | High | Very high | Medium | High |
| Dependencies | actix | tokio only | riker | coerce |
| Learning curve | Moderate | Gentle | Moderate | Moderate |
| Ecosystem maturity | High | High | Low | Low |
| Use case | Web services | General concurrency | Traditional Actor | Distributed |

Summary

The core philosophy of the Rust Actor model: Don't communicate by sharing memory; share memory by communicating. Actix provides a mature typed Actor system, while Tokio Channel Actors are lighter and more flexible. Supervision trees are the cornerstone of production Actor systems: an Actor system without supervision is like a car without seatbelts. In distributed scenarios, unifying the local and remote communication interfaces is key. Always use bounded mailboxes for backpressure, set timeouts on every remote call, and drain mailboxes during graceful shutdown; together these habits go a long way toward an Actor system that is both robust and efficient.


#Rust Actor#Actix#Tokio#Concurrency Model#Message Passing#2026#Programming Languages