Rust Actor Model Frameworks in Practice: 5 Production Patterns from Message Passing to Supervision Trees


When Concurrency Models Meet Rust Ownership

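Have you ever run into this? Your system has to handle thousands of independent stateful entities at once, each with its own lifecycle and message queue. You tried managing shared state with Arc<Mutex<HashMap>>, and lock contention drove throughput into the ground. You tried dispatching work to a thread pool, only to find that cross-thread communication cost more than the computation itself. Eventually you realize that the Actor model is the right fit for this class of problem.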

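The core idea of the Actor model is simple: each Actor is an independent unit of computation that owns private state and interacts with the outside world only by passing messages. With no shared state there are no locks to contend for, and therefore no lock-ordering deadlocks (Actors can still deadlock by waiting on each other's replies; see Pitfall 1 below). Rust's ownership system fits the Actor model naturally: sending a message moves ownership of its payload to the receiver, and the compiler's Send checks ensure that only thread-safe values cross task boundaries.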
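To make the idea concrete, here is a minimal sketch that uses only the standard library (std::sync::mpsc and a thread rather than Tokio, so it runs without extra crates). The counter actor, its Msg enum, and the spawn_counter and demo helpers are hypothetical names for illustration: the running total lives inside the actor's thread, and the only way to read or change it is to send a message, which moves the value into the channel.

```rust
use std::sync::mpsc;
use std::thread;

// Messages understood by a hypothetical counter actor
enum Msg {
    Add(u64),
    // Request-response: the caller passes the channel the reply should go to
    Get(mpsc::Sender<u64>),
}

// Spawns the actor and returns its address (a Sender) plus its thread handle
fn spawn_counter() -> (mpsc::Sender<Msg>, thread::JoinHandle<()>) {
    let (tx, rx) = mpsc::channel::<Msg>();
    let handle = thread::spawn(move || {
        // Private state: only this thread can touch it, so no lock is needed
        let mut total: u64 = 0;
        // Messages are handled one at a time, in the order they were queued
        for msg in rx {
            match msg {
                Msg::Add(n) => total += n,
                Msg::Get(reply) => {
                    let _ = reply.send(total);
                }
            }
        }
        // The loop ends once every Sender has been dropped
    });
    (tx, handle)
}

// Sends two increments, then asks for the total
fn demo() -> u64 {
    let (counter, handle) = spawn_counter();
    counter.send(Msg::Add(40)).unwrap(); // the message is moved into the channel
    counter.send(Msg::Add(2)).unwrap();
    let (reply_tx, reply_rx) = mpsc::channel();
    counter.send(Msg::Get(reply_tx)).unwrap();
    let total = reply_rx.recv().unwrap();
    drop(counter); // closing the mailbox lets the actor thread finish
    handle.join().unwrap();
    total
}
```

Because a single sender's messages arrive in order, the Get is handled after both Add messages, so demo returns 42.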


Core Concepts at a Glance

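Concept | Description | Typical use
Actor | Independent unit of computation that encapsulates state and behavior | State machines, connection management
Message | The only way Actors communicate | Commands, events, queries
Mailbox | An Actor's message queue | Message buffering, ordered processing
Supervision | A parent Actor monitors its child Actors | Failure recovery, restart strategies
Address | A handle that refers to an Actor | Message addressing, routing
Context | An Actor's runtime environment | Timers, child Actor management
Backpressure | Flow-control mechanism | Preventing Mailbox overflow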

Five Challenges of the Actor Model

1. Lock Hell from Shared State

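Traditional concurrency relies on shared memory plus locks, and under high concurrency lock contention becomes the bottleneck. The Actor model sidesteps the problem by not sharing at all: each Actor has exclusive ownership of its own state.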

2. Fault Isolation and Recovery

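A crash in one component should not take down the whole system. The Actor model isolates faults with supervision trees: when a child Actor crashes, its parent decides what happens next (restart it, stop it, escalate the failure, and so on).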

3. Message Ordering Guarantees

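A single Actor processes its messages one at a time in mailbox order, and messages from one sender arrive in the order they were sent. Across senders and across Actors, however, there is no global ordering, so any ordering you need has to be designed into the message protocol and routing strategy.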
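A std-only sketch of what "ordered per sender, unordered across senders" means in practice. Two hypothetical producer threads share one mailbox; the hypothetical checker per_sender_in_order confirms that each producer's sequence numbers arrive in increasing order, while the interleaving between the two producers is left to the scheduler.

```rust
use std::sync::mpsc;
use std::thread;

// Two producers each send (producer_id, sequence_number) into one shared mailbox
fn collect_interleaved(per_sender: u32) -> Vec<(usize, u32)> {
    let (tx, rx) = mpsc::channel::<(usize, u32)>();
    let mut producers = Vec::new();
    for id in 0..2usize {
        let tx = tx.clone();
        producers.push(thread::spawn(move || {
            for seq in 0..per_sender {
                tx.send((id, seq)).unwrap();
            }
        }));
    }
    drop(tx); // keep only the producers' clones alive
    for p in producers {
        p.join().unwrap();
    }
    // All senders are gone, so iteration stops after the last buffered message
    rx.iter().collect()
}

// True if every producer's messages appear in the order that producer sent them
fn per_sender_in_order(msgs: &[(usize, u32)]) -> bool {
    let mut last: [Option<u32>; 2] = [None, None];
    for &(id, seq) in msgs {
        if let Some(prev) = last[id] {
            if seq <= prev {
                return false;
            }
        }
        last[id] = Some(seq);
    }
    true
}
```

Nothing here constrains how producer 0's and producer 1's messages interleave; a protocol that needs cross-sender ordering has to add it explicitly (for example with sequence numbers or a single routing Actor).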

4. Distribution Transparency

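Talking to a local Actor and talking to a remote Actor should look the same. How do you let Actors communicate across nodes without changing the business logic?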

5. The Complexity of Graceful Shutdown

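An Actor system may contain hundreds or thousands of Actors. How do you make sure that, at shutdown, every message has been processed and every resource has been released?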
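One common answer, sketched here with the standard library under simplifying assumptions (a single worker thread, a bounded std::sync::mpsc::sync_channel as its mailbox, and a hypothetical run_and_shutdown helper): stop accepting work by dropping the last sender, let the worker drain whatever is still buffered, and then wait for it with join. In Tokio the corresponding pieces would be Receiver::close, draining with recv, and awaiting the task's JoinHandle.

```rust
use std::sync::mpsc;
use std::thread;

// Sends `n` tasks, then shuts down and reports how many the worker processed
fn run_and_shutdown(n: u32) -> u32 {
    // Bounded mailbox: send() blocks while 8 messages are already queued
    let (tx, rx) = mpsc::sync_channel::<u32>(8);

    let worker = thread::spawn(move || {
        let mut processed = 0;
        // Yields every buffered message, then ends once all senders are dropped
        for _task in rx {
            processed += 1;
        }
        processed
    });

    for task in 0..n {
        tx.send(task).unwrap();
    }
    drop(tx); // step 1: stop accepting new work
    worker.join().unwrap() // step 2: wait until the mailbox is drained and the worker exits
}
```

The ordering matters: closing the input first guarantees the drain terminates, and joining afterwards guarantees nothing is still in flight when shutdown returns.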


Five Production-Grade Actor Patterns

Pattern 1: The Actix Actor System (Classic Actors)

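Actix is one of the most established Actor frameworks in the Rust ecosystem. It is built on typed message handling and provides full Actor lifecycle management.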

use actix::prelude::*;
use std::time::Duration;

struct GameSession {
    id: String,
    score: u64,
    level: u32,
}

impl Actor for GameSession {
    type Context = Context<Self>;

    fn started(&mut self, _ctx: &mut Self::Context) {
        println!("session {} started, score: {}", self.id, self.score);
    }

    fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
        println!("session {} stopping", self.id);
        // Running::Stop lets the shutdown proceed; Running::Continue would keep the actor alive
        Running::Stop
    }

    fn stopped(&mut self, _ctx: &mut Self::Context) {
        println!("session {} stopped, final score: {}", self.id, self.score);
    }
}

#[derive(Message)]
#[rtype(result = "u64")]
struct GetScore;

#[derive(Message)]
#[rtype(result = "()")]
struct AddScore { amount: u64 }

#[derive(Message)]
#[rtype(result = "()")]
struct LevelUp;

impl Handler<GetScore> for GameSession {
    type Result = u64;

    fn handle(&mut self, _msg: GetScore, _ctx: &mut Context<Self>) -> u64 {
        self.score
    }
}

impl Handler<AddScore> for GameSession {
    type Result = ();

    fn handle(&mut self, msg: AddScore, _ctx: &mut Context<Self>) {
        self.score += msg.amount;
        println!("session {} score: {}", self.id, self.score);
    }
}

impl Handler<LevelUp> for GameSession {
    type Result = ();

    fn handle(&mut self, _msg: LevelUp, ctx: &mut Context<Self>) {
        self.level += 1;
        println!("session {} leveled up to {}", self.id, self.level);

        // Schedule the next LevelUp 30 s from now; this repeats for as long as the actor lives
        ctx.notify_later(LevelUp, Duration::from_secs(30));
    }
}

#[actix::main]
async fn main() {
    let addr = GameSession {
        id: "player-001".into(),
        score: 0,
        level: 1,
    }.start();

    addr.send(AddScore { amount: 100 }).await.unwrap();
    addr.send(AddScore { amount: 50 }).await.unwrap();
    addr.send(LevelUp).await.unwrap();

    let score = addr.send(GetScore).await.unwrap();
    println!("final score: {}", score);
}

Key Takeaways

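  • impl Actor defines the Actor's lifecycle hooks: started, stopping, stopped
  • #[derive(Message)] defines a typed message; rtype specifies its response type
  • The Handler<T> trait implements the handling logic for each message type
  • ctx.notify_later sends the Actor a message to itself after a delay, which can be chained into periodic tasks
  • addr.send() sends a message asynchronously and waits for the response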

Pattern 2: Tokio Channel-Based Actors (Lightweight Actors)

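Instead of depending on an Actor framework, you can hand-write Actors on top of Tokio channels. This is lighter and more flexible, and suits scenarios with strict requirements on performance and control.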

use tokio::sync::{mpsc, oneshot};
use std::collections::HashMap;

enum CacheCommand {
    Get {
        key: String,
        responder: oneshot::Sender<Option<String>>,
    },
    Set {
        key: String,
        value: String,
        ttl: Option<std::time::Duration>,
        responder: oneshot::Sender<bool>,
    },
    Delete {
        key: String,
        responder: oneshot::Sender<bool>,
    },
    Stats {
        responder: oneshot::Sender<CacheStats>,
    },
}

#[derive(Debug)]
struct CacheStats {
    entries: usize,
    hits: u64,
    misses: u64,
}

struct CacheEntry {
    value: String,
    expires_at: Option<std::time::Instant>,
}

struct CacheActor {
    store: HashMap<String, CacheEntry>,
    hits: u64,
    misses: u64,
}

impl CacheActor {
    fn new() -> Self {
        Self {
            store: HashMap::new(),
            hits: 0,
            misses: 0,
        }
    }

    fn is_expired(&self, entry: &CacheEntry) -> bool {
        match entry.expires_at {
            Some(t) => std::time::Instant::now() > t,
            None => false,
        }
    }

    async fn run(&mut self, mut rx: mpsc::Receiver<CacheCommand>) {
        while let Some(cmd) = rx.recv().await {
            match cmd {
                CacheCommand::Get { key, responder } => {
                    let result = match self.store.get(&key) {
                        Some(entry) if !self.is_expired(entry) => {
                            self.hits += 1;
                            Some(entry.value.clone())
                        }
                        Some(_) => {
                            self.store.remove(&key);
                            self.misses += 1;
                            None
                        }
                        None => {
                            self.misses += 1;
                            None
                        }
                    };
                    let _ = responder.send(result);
                }
                CacheCommand::Set { key, value, ttl, responder } => {
                    let expires_at = ttl.map(|d| std::time::Instant::now() + d);
                    self.store.insert(key, CacheEntry { value, expires_at });
                    let _ = responder.send(true);
                }
                CacheCommand::Delete { key, responder } => {
                    let existed = self.store.remove(&key).is_some();
                    let _ = responder.send(existed);
                }
                CacheCommand::Stats { responder } => {
                    let stats = CacheStats {
                        entries: self.store.len(),
                        hits: self.hits,
                        misses: self.misses,
                    };
                    let _ = responder.send(stats);
                }
            }
        }
        println!("cache actor shutting down, {} entries remaining", self.store.len());
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<CacheCommand>(256);
    let mut actor = CacheActor::new();
    tokio::spawn(async move { actor.run(rx).await });

    let (resp_tx, resp_rx) = oneshot::channel();
    tx.send(CacheCommand::Set {
        key: "user:1001".into(),
        value: r#"{"name":"alice"}"#.into(),
        ttl: Some(std::time::Duration::from_secs(300)),
        responder: resp_tx,
    }).await.unwrap();
    resp_rx.await.unwrap();

    let (resp_tx, resp_rx) = oneshot::channel();
    tx.send(CacheCommand::Get {
        key: "user:1001".into(),
        responder: resp_tx,
    }).await.unwrap();
    let result = resp_rx.await.unwrap();
    println!("get result: {:?}", result);

    let (resp_tx, resp_rx) = oneshot::channel();
    tx.send(CacheCommand::Stats { responder: resp_tx }).await.unwrap();
    let stats = resp_rx.await.unwrap();
    println!("cache stats: {:?}", stats);
}

Key Takeaways

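  • An mpsc channel serves as the Actor's mailbox; a oneshot channel carries the reply for request-response
  • The CacheCommand enum defines the message protocol: type-safe and easy to extend
  • The Actor is the sole owner of its state, so data races are ruled out by construction
  • No external framework is needed: Tokio alone is enough to build a complete Actor system
  • The ttl expiry logic shows how flexibly an Actor can manage its internal state (expired entries are evicted lazily, on the next Get for that key)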
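A common refinement of this pattern (a design choice, not something the example above requires) is to hide the oneshot plumbing behind a cloneable handle type, so callers write cache.get(key) instead of building channels by hand. The sketch below assumes a simplified cache with no TTL or stats and uses std threads and std::sync::mpsc so it runs without Tokio; CacheHandle and Command are hypothetical names. With Tokio the shape is the same: the handle wraps an mpsc::Sender and its async methods await a oneshot reply.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

// Message protocol, private to the module
enum Command {
    Get { key: String, reply: mpsc::Sender<Option<String>> },
    Set { key: String, value: String },
}

// Cloneable address of the cache actor; every clone talks to the same actor
#[derive(Clone)]
struct CacheHandle {
    tx: mpsc::Sender<Command>,
}

impl CacheHandle {
    // Starts the actor thread; it stops once the last handle is dropped
    fn spawn() -> Self {
        let (tx, rx) = mpsc::channel::<Command>();
        thread::spawn(move || {
            let mut store: HashMap<String, String> = HashMap::new();
            for cmd in rx {
                match cmd {
                    Command::Get { key, reply } => {
                        let _ = reply.send(store.get(&key).cloned());
                    }
                    Command::Set { key, value } => {
                        store.insert(key, value);
                    }
                }
            }
        });
        CacheHandle { tx }
    }

    // Fire-and-forget write
    fn set(&self, key: &str, value: &str) {
        let _ = self.tx.send(Command::Set { key: key.to_string(), value: value.to_string() });
    }

    // Request-response read; None if the key is missing or the actor is gone
    fn get(&self, key: &str) -> Option<String> {
        let (reply, reply_rx) = mpsc::channel();
        self.tx.send(Command::Get { key: key.to_string(), reply }).ok()?;
        reply_rx.recv().ok().flatten()
    }
}
```

Callers never see the message enum, so the protocol can change without touching call sites.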

Channel Actor vs. Actix

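Feature | Channel Actor | Actix
Dependencies | tokio only | actix + tokio
Message types | Hand-written enum | derive Message
Reply mechanism | oneshot channel | rtype on the Message derive
Lifecycle | Managed manually | Automatic callbacks
Supervision | Implemented manually | Built-in support
Performance overhead | Very low |
Learning curve | Gentle | Moderate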

Pattern 3: Supervision Trees (Fault Recovery)

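Supervision trees are one of the most powerful features of the Actor model: a parent Actor monitors its children, and when a child crashes, a restart strategy decides how to recover.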

use tokio::sync::{mpsc, oneshot};
use tokio::time::{sleep, Duration};
use std::collections::HashMap;

#[derive(Debug, Clone)]
enum RestartStrategy {
    Permanent,
    Transient,
    Temporary,
}

#[derive(Debug)]
enum WorkerMessage {
    Process { data: String, responder: oneshot::Sender<String> },
    Crash,
    Status { responder: oneshot::Sender<WorkerStatus> },
}

#[derive(Debug, Clone)]
struct WorkerStatus {
    id: String,
    processed: u64,
    alive: bool,
}

struct Worker {
    id: String,
    processed: u64,
}

impl Worker {
    fn new(id: String) -> Self {
        Self { id, processed: 0 }
    }

    async fn run(
        &mut self,
        mut rx: mpsc::Receiver<WorkerMessage>,
        supervisor_tx: mpsc::Sender<SupervisorMessage>,
    ) {
        println!("worker {} started", self.id);
        while let Some(msg) = rx.recv().await {
            match msg {
                WorkerMessage::Process { data, responder } => {
                    self.processed += 1;
                    let result = format!("processed by {}: {}", self.id, data.to_uppercase());
                    let _ = responder.send(result);
                }
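                WorkerMessage::Crash => {
                    // Simulated crash: the worker reports its own failure. A real panic would
                    // skip this send, so production supervisors should also watch the JoinHandle
                    println!("worker {} crashing!", self.id);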
                    let _ = supervisor_tx.send(SupervisorMessage::ChildCrashed {
                        id: self.id.clone(),
                    }).await;
                    return;
                }
                WorkerMessage::Status { responder } => {
                    let _ = responder.send(WorkerStatus {
                        id: self.id.clone(),
                        processed: self.processed,
                        alive: true,
                    });
                }
            }
        }
        let _ = supervisor_tx.send(SupervisorMessage::ChildStopped {
            id: self.id.clone(),
        }).await;
    }
}

enum SupervisorMessage {
    ChildCrashed { id: String },
    ChildStopped { id: String },
    SpawnWorker { id: String },
    GetStatus { responder: oneshot::Sender<Vec<WorkerStatus>> },
}

struct ChildHandle {
    tx: mpsc::Sender<WorkerMessage>,
    strategy: RestartStrategy,
}

struct Supervisor {
    children: HashMap<String, ChildHandle>,
    supervisor_rx: mpsc::Receiver<SupervisorMessage>,
    supervisor_tx: mpsc::Sender<SupervisorMessage>,
    restart_count: HashMap<String, u32>,
    max_restarts: u32,
}

impl Supervisor {
    fn new(rx: mpsc::Receiver<SupervisorMessage>, tx: mpsc::Sender<SupervisorMessage>, max_restarts: u32) -> Self {
        Self {
            children: HashMap::new(),
            supervisor_rx: rx,
            supervisor_tx: tx,
            restart_count: HashMap::new(),
            max_restarts,
        }
    }

    async fn spawn_child(&mut self, id: String, strategy: RestartStrategy) {
        let (tx, rx) = mpsc::channel::<WorkerMessage>(64);
        let supervisor_tx = self.supervisor_tx.clone();
        let worker_id = id.clone();
        let mut worker = Worker::new(id.clone());

        tokio::spawn(async move {
            worker.run(rx, supervisor_tx).await;
        });

        self.children.insert(id, ChildHandle { tx, strategy });
        println!("supervisor spawned child {}", worker_id);
    }

    async fn restart_child(&mut self, id: &str) -> bool {
        let restarts = self.restart_count.entry(id.to_string()).or_insert(0);
        if *restarts >= self.max_restarts {
            println!("supervisor: child {} exceeded max restarts ({})", id, self.max_restarts);
            // Give up on this child: drop its stale handle so GetStatus no longer queries it
            self.children.remove(id);
            return false;
        }
        *restarts += 1;
        println!("supervisor: restarting child {} (attempt {})", id, *restarts);

        let strategy = self.children.get(id).map(|h| h.strategy.clone());
        if let Some(strategy) = strategy {
            self.children.remove(id);
            self.spawn_child(id.to_string(), strategy).await;
            true
        } else {
            false
        }
    }

    async fn run(&mut self) {
        while let Some(msg) = self.supervisor_rx.recv().await {
            match msg {
                SupervisorMessage::ChildCrashed { id } => {
                    let strategy = self.children.get(&id).map(|h| h.strategy.clone());
                    match strategy {
                        Some(RestartStrategy::Permanent) => {
                            self.restart_child(&id).await;
                        }
                        Some(RestartStrategy::Transient) => {
                            self.restart_child(&id).await;
                        }
                        Some(RestartStrategy::Temporary) => {
                            println!("supervisor: child {} crashed, not restarting (temporary)", id);
                            self.children.remove(&id);
                        }
                        None => {}
                    }
                }
                SupervisorMessage::ChildStopped { id } => {
                    let strategy = self.children.get(&id).map(|h| h.strategy.clone());
                    match strategy {
                        Some(RestartStrategy::Permanent) => {
                            self.restart_child(&id).await;
                        }
                        Some(RestartStrategy::Transient) | Some(RestartStrategy::Temporary) => {
                            self.children.remove(&id);
                        }
                        None => {}
                    }
                }
                SupervisorMessage::SpawnWorker { id } => {
                    self.spawn_child(id, RestartStrategy::Permanent).await;
                }
                SupervisorMessage::GetStatus { responder } => {
                    // Children are queried one by one; a slow child stalls the supervisor loop
                    let mut statuses = Vec::new();
                    for (id, handle) in &self.children {
                        let (resp_tx, resp_rx) = oneshot::channel();
                        if handle.tx.send(WorkerMessage::Status { responder: resp_tx }).await.is_ok() {
                            if let Ok(status) = resp_rx.await {
                                statuses.push(status);
                            }
                        }
                    }
                    let _ = responder.send(statuses);
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (sup_tx, sup_rx) = mpsc::channel::<SupervisorMessage>(256);
    let mut supervisor = Supervisor::new(sup_rx, sup_tx.clone(), 3);
    tokio::spawn(async move { supervisor.run().await });

    sup_tx.send(SupervisorMessage::SpawnWorker { id: "worker-1".into() }).await.unwrap();
    sup_tx.send(SupervisorMessage::SpawnWorker { id: "worker-2".into() }).await.unwrap();

    sleep(Duration::from_millis(50)).await;

    let (resp_tx, resp_rx) = oneshot::channel();
    sup_tx.send(SupervisorMessage::GetStatus { responder: resp_tx }).await.unwrap();
    let statuses = resp_rx.await.unwrap();
    for s in statuses {
        println!("status: {:?}", s);
    }
}

Key Takeaways

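  • The Supervisor holds each child Actor's Sender and receives crash notifications over an mpsc channel
  • Three restart strategies: Permanent (always restart), Transient (restart only after an abnormal exit), Temporary (never restart)
  • max_restarts prevents infinite restart loops
  • A crashing child Actor notifies its Supervisor, which decides whether to restart it. This notification is cooperative: the example only detects crashes the worker reports itself, not panics
  • Supervision trees can be nested: a Supervisor can itself be a supervised child Actor (the example above has a single level)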
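Because the crash notification in Worker::run is sent by the worker itself, a genuine panic would bypass it. A more robust supervisor watches the child's join handle instead: in Tokio, awaiting a JoinHandle yields a JoinError whose is_panic() reports a panic. The sketch below shows the same idea with std threads, where JoinHandle::join returns Err if the thread panicked. The supervise function and Outcome type are hypothetical; the sketch implements Transient semantics only (restart after a panic, stop after a normal exit), with max_restarts counting restarts after the first attempt.

```rust
use std::thread;

#[derive(Debug, PartialEq)]
enum Outcome {
    Completed { attempts: u32 },
    GaveUp { attempts: u32 },
}

// Runs the child produced by `make_child`, restarting it after a panic
// (Transient strategy) until it exits normally or max_restarts is used up
fn supervise<F, C>(max_restarts: u32, mut make_child: F) -> Outcome
where
    F: FnMut(u32) -> C,
    C: FnOnce() + Send + 'static,
{
    let mut attempt = 0;
    loop {
        attempt += 1;
        let child = make_child(attempt);
        match thread::spawn(child).join() {
            // Normal exit: a Transient child is not restarted
            Ok(()) => return Outcome::Completed { attempts: attempt },
            // Panicked and restarts remain: start a fresh child
            Err(_) if attempt <= max_restarts => continue,
            // Panicked with no restarts left: give up (a real supervisor might escalate)
            Err(_) => return Outcome::GaveUp { attempts: attempt },
        }
    }
}
```

Each restart gets a freshly constructed child, which mirrors the "restart with clean state" idea behind supervision.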

Restart Strategy Comparison

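Strategy | Normal exit | Abnormal exit | Typical use
Permanent | Restart | Restart | Long-running services, scheduled tasks
Transient | No restart | Restart | Request handlers, temporary computations
Temporary | No restart | No restart | One-off tasks, debugging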
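The table reduces to a small pure function. This sketch is a hypothetical helper: it reuses the RestartStrategy variant names from the code above but adds an Exit enum that the original does not have. Keeping the decision separate from the supervisor loop makes it easy to unit-test.

```rust
#[derive(Clone, Copy, Debug)]
enum RestartStrategy {
    Permanent,
    Transient,
    Temporary,
}

// How the child stopped
#[derive(Clone, Copy, Debug)]
enum Exit {
    Normal,
    Abnormal,
}

// Encodes the restart-strategy table row by row
fn should_restart(strategy: RestartStrategy, exit: Exit) -> bool {
    match (strategy, exit) {
        (RestartStrategy::Permanent, _) => true,
        (RestartStrategy::Transient, Exit::Abnormal) => true,
        (RestartStrategy::Transient, Exit::Normal) => false,
        (RestartStrategy::Temporary, _) => false,
    }
}
```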

Pattern 4: Distributed Actors (Remote Messaging)

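When Actors cross process boundaries, messages have to be serialized and sent over the network. This pattern shows how to build an Actor system that supports remote communication.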

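use tokio::sync::{mpsc, oneshot};
use tokio::net::{TcpListener, TcpStream};
use serde::{Serialize, Deserialize};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::time::Duration; // used by the sleep in main
// Assumes bincode 1.x (bincode::serialize / bincode::deserialize) and serde with the "derive" feature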

#[derive(Debug, Serialize, Deserialize)]
struct RemoteEnvelope {
    target: String,
    message_type: String,
    payload: Vec<u8>,
    reply_to: Option<u64>,
}

#[derive(Debug, Serialize, Deserialize)]
enum OrderCommand {
    Create { order_id: String, item: String, quantity: u32 },
    Cancel { order_id: String },
    Query { order_id: String },
}

#[derive(Debug, Serialize, Deserialize)]
enum OrderResponse {
    Created { order_id: String },
    Cancelled { order_id: String },
    Found { order_id: String, item: String, quantity: u32 },
    NotFound { order_id: String },
    Error { message: String },
}

#[derive(Debug, Serialize, Deserialize, Clone)]
struct Order {
    id: String,
    item: String,
    quantity: u32,
    status: String,
}

struct OrderActor {
    orders: HashMap<String, Order>,
    cmd_rx: mpsc::Receiver<(OrderCommand, oneshot::Sender<OrderResponse>)>,
}

impl OrderActor {
    fn new(rx: mpsc::Receiver<(OrderCommand, oneshot::Sender<OrderResponse>)>) -> Self {
        Self { orders: HashMap::new(), cmd_rx: rx }
    }

    async fn run(&mut self) {
        while let Some((cmd, responder)) = self.cmd_rx.recv().await {
            let response = match cmd {
                OrderCommand::Create { order_id, item, quantity } => {
                    if self.orders.contains_key(&order_id) {
                        OrderResponse::Error { message: format!("order {} already exists", order_id) }
                    } else {
                        self.orders.insert(order_id.clone(), Order {
                            id: order_id.clone(),
                            item,
                            quantity,
                            status: "created".into(),
                        });
                        OrderResponse::Created { order_id }
                    }
                }
                OrderCommand::Cancel { order_id } => {
                    match self.orders.get_mut(&order_id) {
                        Some(order) => {
                            order.status = "cancelled".into();
                            OrderResponse::Cancelled { order_id }
                        }
                        None => OrderResponse::NotFound { order_id },
                    }
                }
                OrderCommand::Query { order_id } => {
                    match self.orders.get(&order_id) {
                        Some(order) => OrderResponse::Found {
                            order_id: order.id.clone(),
                            item: order.item.clone(),
                            quantity: order.quantity,
                        },
                        None => OrderResponse::NotFound { order_id },
                    }
                }
            };
            let _ = responder.send(response);
        }
    }
}

struct RemoteGateway {
    local_actors: HashMap<String, mpsc::Sender<(OrderCommand, oneshot::Sender<OrderResponse>)>>,
    remote_nodes: HashMap<String, SocketAddr>,
    next_request_id: u64,
}

impl RemoteGateway {
    fn new() -> Self {
        Self {
            local_actors: HashMap::new(),
            remote_nodes: HashMap::new(),
            next_request_id: 0,
        }
    }

    fn register_local(&mut self, name: String, tx: mpsc::Sender<(OrderCommand, oneshot::Sender<OrderResponse>)>) {
        self.local_actors.insert(name, tx);
    }

    fn register_remote(&mut self, name: String, addr: SocketAddr) {
        self.remote_nodes.insert(name, addr);
    }

    async fn send_command(&mut self, target: &str, cmd: OrderCommand) -> Option<OrderResponse> {
        if let Some(tx) = self.local_actors.get(target) {
            let (resp_tx, resp_rx) = oneshot::channel();
            if tx.send((cmd, resp_tx)).await.is_ok() {
                return resp_rx.await.ok();
            }
            return None;
        }

        if let Some(addr) = self.remote_nodes.get(target) {
            match TcpStream::connect(addr).await {
                Ok(mut stream) => {
                    let payload = bincode::serialize(&cmd).ok()?;
                    let request_id = self.next_request_id;
                    self.next_request_id += 1;

                    let envelope = RemoteEnvelope {
                        target: target.to_string(),
                        message_type: "OrderCommand".to_string(),
                        payload,
                        reply_to: Some(request_id),
                    };

                    let data = bincode::serialize(&envelope).ok()?;
                    let len = data.len() as u32;
                    use tokio::io::AsyncWriteExt;
                    if stream.write_all(&len.to_le_bytes()).await.is_err() { return None; }
                    if stream.write_all(&data).await.is_err() { return None; }

                    use tokio::io::AsyncReadExt;
                    let mut len_buf = [0u8; 4];
                    if stream.read_exact(&mut len_buf).await.is_err() { return None; }
                    let resp_len = u32::from_le_bytes(len_buf) as usize;
                    let mut resp_buf = vec![0u8; resp_len];
                    if stream.read_exact(&mut resp_buf).await.is_err() { return None; }
                    bincode::deserialize(&resp_buf).ok()
                }
                Err(e) => {
                    eprintln!("failed to connect to {}: {}", target, e);
                    None
                }
            }
        } else {
            None
        }
    }
}

async fn server_node(addr: SocketAddr) {
    let (tx, rx) = mpsc::channel::<(OrderCommand, oneshot::Sender<OrderResponse>)>(256);
    let mut actor = OrderActor::new(rx);
    tokio::spawn(async move { actor.run().await });

    let listener = TcpListener::bind(addr).await.unwrap();
    println!("order actor listening on {}", addr);

    loop {
        if let Ok((mut stream, _)) = listener.accept().await {
            let tx = tx.clone();
            tokio::spawn(async move {
                use tokio::io::AsyncReadExt;
                use tokio::io::AsyncWriteExt;

                let mut len_buf = [0u8; 4];
                if stream.read_exact(&mut len_buf).await.is_err() { return; }
                let len = u32::from_le_bytes(len_buf) as usize;
                // The length comes from the peer: production code should cap it before allocating
                let mut buf = vec![0u8; len];
                if stream.read_exact(&mut buf).await.is_err() { return; }

                let envelope: RemoteEnvelope = match bincode::deserialize(&buf) {
                    Ok(e) => e,
                    Err(_) => return,
                };

                let cmd: OrderCommand = match bincode::deserialize(&envelope.payload) {
                    Ok(c) => c,
                    Err(_) => return,
                };

                let (resp_tx, resp_rx) = oneshot::channel();
                if tx.send((cmd, resp_tx)).await.is_err() { return; }
                let response = match resp_rx.await {
                    Ok(r) => r,
                    Err(_) => return,
                };

                let resp_data = bincode::serialize(&response).unwrap();
                let resp_len = resp_data.len() as u32;
                let _ = stream.write_all(&resp_len.to_le_bytes()).await;
                let _ = stream.write_all(&resp_data).await;
            });
        }
    }
}

#[tokio::main]
async fn main() {
    let server_addr: SocketAddr = "127.0.0.1:9001".parse().unwrap();
    tokio::spawn(server_node(server_addr));

    tokio::time::sleep(Duration::from_millis(100)).await;

    let mut gateway = RemoteGateway::new();
    gateway.register_remote("order-service".into(), server_addr);

    let response = gateway.send_command("order-service", OrderCommand::Create {
        order_id: "ORD-001".into(),
        item: "Rust Book".into(),
        quantity: 2,
    }).await;
    println!("create response: {:?}", response);

    let response = gateway.send_command("order-service", OrderCommand::Query {
        order_id: "ORD-001".into(),
    }).await;
    println!("query response: {:?}", response);
}

Key Takeaways

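  • RemoteEnvelope wraps message metadata: the target Actor, the message type, the payload, and a reply correlation ID (reply_to)
  • Local Actors are reached directly through channels; remote Actors receive serialized messages over TCP (the example opens a new connection per command)
  • RemoteGateway unifies local and remote routing, so business code does not need to know where an Actor lives
  • bincode provides compact binary serialization, which is typically smaller and faster to encode than JSON and well suited to Actor-to-Actor traffic
  • A length-prefixed framing protocol (a 4-byte little-endian length followed by the data) restores message boundaries, which TCP, being a byte stream, does not preserve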
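One gap in the framing code above: the server trusts the 4-byte length prefix and allocates that many bytes, so a corrupt or malicious peer can request a buffer of up to 4 GiB. The sketch below factors the framing into two hypothetical helpers over std::io::Read and std::io::Write and rejects oversized frames. MAX_FRAME_LEN (1 MiB) is an assumed limit, and the same checks carry over unchanged to Tokio's AsyncReadExt::read_exact and AsyncWriteExt::write_all.

```rust
use std::io::{self, Read, Write};

// Assumed upper bound on a single frame (1 MiB); tune for your payloads
const MAX_FRAME_LEN: u32 = 1 << 20;

// Writes a 4-byte little-endian length followed by the payload
fn write_frame<W: Write>(w: &mut W, payload: &[u8]) -> io::Result<()> {
    let len = u32::try_from(payload.len())
        .ok()
        .filter(|&n| n <= MAX_FRAME_LEN)
        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "frame too large"))?;
    w.write_all(&len.to_le_bytes())?;
    w.write_all(payload)
}

// Reads one frame, refusing lengths above MAX_FRAME_LEN before allocating
fn read_frame<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
    let mut len_buf = [0u8; 4];
    r.read_exact(&mut len_buf)?;
    let len = u32::from_le_bytes(len_buf);
    if len > MAX_FRAME_LEN {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "frame length exceeds limit"));
    }
    let mut buf = vec![0u8; len as usize];
    r.read_exact(&mut buf)?;
    Ok(buf)
}
```

Two frames written back to back are read back as two separate messages, which is exactly the boundary information TCP itself does not carry.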

Pattern 5: A Production-Grade Actor System (Graceful Shutdown and Backpressure)

An Actor system in production has to deal with graceful shutdown, backpressure control, message timeouts, and other complex scenarios.

use tokio::sync::{mpsc, oneshot, watch, broadcast};
use tokio::time::{timeout, Duration, Instant};
use tokio::select;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

struct ActorSystem {
    shutdown_tx: watch::Sender<bool>,
    shutdown_rx: watch::Receiver<bool>,
    event_tx: broadcast::Sender<String>,
    metrics: Arc<ActorMetrics>,
}

struct ActorMetrics {
    messages_sent: AtomicU64,
    messages_processed: AtomicU64,
    messages_dropped: AtomicU64,
    actors_alive: AtomicU64,
}

impl ActorMetrics {
    fn new() -> Self {
        Self {
            messages_sent: AtomicU64::new(0),
            messages_processed: AtomicU64::new(0),
            messages_dropped: AtomicU64::new(0),
            actors_alive: AtomicU64::new(0),
        }
    }

    fn snapshot(&self) -> String {
        format!(
            "sent: {}, processed: {}, dropped: {}, alive: {}",
            self.messages_sent.load(Ordering::Relaxed),
            self.messages_processed.load(Ordering::Relaxed),
            self.messages_dropped.load(Ordering::Relaxed),
            self.actors_alive.load(Ordering::Relaxed),
        )
    }
}

enum WorkerCmd {
    Execute { task: String, responder: oneshot::Sender<String> },
    Ping { responder: oneshot::Sender<bool> },
}

struct ResilientWorker {
    id: String,
    cmd_rx: mpsc::Receiver<WorkerCmd>,
    shutdown_rx: watch::Receiver<bool>,
    event_tx: broadcast::Sender<String>,
    metrics: Arc<ActorMetrics>,
    mailbox_capacity: usize,
    process_timeout: Duration,
}

impl ResilientWorker {
    async fn run(&mut self) {
        self.metrics.actors_alive.fetch_add(1, Ordering::Relaxed);
        println!("worker {} started", self.id);

        loop {
            select! {
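                res = self.shutdown_rx.changed() => {
                    // Err means the watch sender was dropped; treat that as shutdown too,
                    // otherwise changed() would return immediately on every iteration
                    if res.is_err() || *self.shutdown_rx.borrow() {
                        self.drain_mailbox().await;
                        break;
                    }
                }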
                cmd = self.cmd_rx.recv() => {
                    match cmd {
                        Some(cmd) => self.handle_command(cmd).await,
                        None => break,
                    }
                }
            }
        }

        self.metrics.actors_alive.fetch_sub(1, Ordering::Relaxed);
        println!("worker {} stopped", self.id);
    }

    async fn handle_command(&mut self, cmd: WorkerCmd) {
        match cmd {
            WorkerCmd::Execute { task, responder } => {
                self.metrics.messages_sent.fetch_add(1, Ordering::Relaxed);

                let result = match timeout(self.process_timeout, self.process_task(&task)).await {
                    Ok(output) => {
                        self.metrics.messages_processed.fetch_add(1, Ordering::Relaxed);
                        output
                    }
                    Err(_) => {
                        self.metrics.messages_dropped.fetch_add(1, Ordering::Relaxed);
                        let _ = self.event_tx.send(format!("worker {} timeout on: {}", self.id, task));
                        "timeout".to_string()
                    }
                };
                let _ = responder.send(result);
            }
            WorkerCmd::Ping { responder } => {
                let _ = responder.send(true);
            }
        }
    }

    async fn process_task(&self, task: &str) -> String {
        tokio::time::sleep(Duration::from_millis(10)).await;
        format!("worker {} processed: {}", self.id, task.to_uppercase())
    }

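    async fn drain_mailbox(&mut self) {
        println!("worker {} draining mailbox...", self.id);
        // Reject new messages but keep the ones already buffered; once the buffer
        // is empty, recv() returns None and the loop below ends
        self.cmd_rx.close();
        let drain_timeout = Duration::from_secs(5);
        let deadline = Instant::now() + drain_timeout;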

        loop {
            if Instant::now() > deadline {
                println!("worker {} drain timeout, {} messages remaining", self.id, self.cmd_rx.len());
                break;
            }

            match timeout(Duration::from_millis(100), self.cmd_rx.recv()).await {
                Ok(Some(WorkerCmd::Execute { task, responder })) => {
                    let result = self.process_task(&task).await;
                    self.metrics.messages_processed.fetch_add(1, Ordering::Relaxed);
                    let _ = responder.send(result);
                }
                Ok(Some(WorkerCmd::Ping { responder })) => {
                    let _ = responder.send(false);
                }
                Ok(None) => break,
                Err(_) => continue,
            }
        }
        println!("worker {} mailbox drained", self.id);
    }
}

impl ActorSystem {
    fn new() -> Self {
        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        let (event_tx, _) = broadcast::channel::<String>(256);
        let metrics = Arc::new(ActorMetrics::new());

        Self { shutdown_tx, shutdown_rx, event_tx, metrics }
    }

    async fn spawn_worker(&self, id: String, mailbox_capacity: usize) -> mpsc::Sender<WorkerCmd> {
        let (tx, rx) = mpsc::channel::<WorkerCmd>(mailbox_capacity);
        let mut worker = ResilientWorker {
            id,
            cmd_rx: rx,
            shutdown_rx: self.shutdown_rx.clone(),
            event_tx: self.event_tx.clone(),
            metrics: self.metrics.clone(),
            mailbox_capacity,
            process_timeout: Duration::from_secs(2),
        };
        tokio::spawn(async move { worker.run().await });
        tx
    }

    async fn graceful_shutdown(&self) {
        println!("initiating graceful shutdown...");
        self.shutdown_tx.send(true).ok();
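        // Crude wait, just longer than the 5 s drain timeout; keeping each worker's
        // JoinHandle and awaiting it would finish as soon as draining is done
        tokio::time::sleep(Duration::from_secs(6)).await;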
        println!("final metrics: {}", self.metrics.snapshot());
    }
}

async fn backpressure_sender(
    tx: mpsc::Sender<WorkerCmd>,
    worker_id: String,
    total: u64,
) -> u64 {
    let mut sent = 0u64;
    let mut dropped = 0u64;

    for i in 0..total {
        let (resp_tx, resp_rx) = oneshot::channel();
        let task = format!("task-{}", i);

        match timeout(Duration::from_millis(50), tx.send(WorkerCmd::Execute {
            task,
            responder: resp_tx,
        })).await {
            Ok(Ok(())) => {
                match timeout(Duration::from_secs(3), resp_rx).await {
                    Ok(Ok(result)) => {
                        sent += 1;
                        if i % 100 == 0 {
                            println!("{} result: {}", worker_id, result);
                        }
                    }
                    _ => dropped += 1,
                }
            }
            _ => dropped += 1,
        }
    }

    println!("{} sent: {}, dropped: {}", worker_id, sent, dropped);
    dropped
}

#[tokio::main]
async fn main() {
    let system = ActorSystem::new();

    let worker1 = system.spawn_worker("worker-1".into(), 64).await;
    let worker2 = system.spawn_worker("worker-2".into(), 64).await;

    let h1 = tokio::spawn(backpressure_sender(worker1, "sender-1".into(), 200));
    let h2 = tokio::spawn(backpressure_sender(worker2, "sender-2".into(), 200));

    h1.await.unwrap();
    h2.await.unwrap();

    println!("metrics: {}", system.metrics.snapshot());
    system.graceful_shutdown().await;
}

Key Takeaways

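  • watch::channel(false) broadcasts the shutdown signal, so every Actor holding a Receiver is notified at once
  • drain_mailbox() processes the remaining messages before shutting down, with a deadline so it cannot wait forever
  • timeout wraps both send and recv, so backpressure cannot block the caller indefinitely
  • ActorMetrics collects metrics with AtomicU64: lock-free and thread-safe
  • A bounded mailbox (capacity 64) provides backpressure naturally: when it is full, producers wait in send(). Note that backpressure_sender awaits each reply before sending the next task, so in this demo each mailbox holds at most one message; the mailbox only fills when several producers share it or producers send without waiting for replies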
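When waiting is not acceptable (for example, a request handler that should fail fast), the alternative to blocking in send is load shedding with a non-blocking try_send. Tokio's mpsc::Sender::try_send returns TrySendError::Full or TrySendError::Closed; the std-only sketch below uses the analogous std::sync::mpsc::SyncSender::try_send, whose error variants are Full and Disconnected. The offer helper and Admit enum are hypothetical.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

// Result of offering a message to a bounded mailbox without waiting
#[derive(Debug, PartialEq)]
enum Admit {
    Queued,
    Shed,   // mailbox full: reject the message instead of blocking (count it as dropped)
    Closed, // the actor is gone
}

fn offer<T>(tx: &SyncSender<T>, msg: T) -> Admit {
    match tx.try_send(msg) {
        Ok(()) => Admit::Queued,
        Err(TrySendError::Full(_)) => Admit::Shed,
        Err(TrySendError::Disconnected(_)) => Admit::Closed,
    }
}
```

The choice between waiting and shedding is a policy decision: waiting slows producers down to the consumer's pace, while shedding keeps latency bounded at the cost of dropped work.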

Five Common Pitfalls

Pitfall 1: Circular Waits Between Actors

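use tokio::sync::{mpsc, oneshot};
use tokio::time::{timeout, Duration};

// A request carries a oneshot Sender for the reply
type Request = (String, oneshot::Sender<String>);

// ❌ Wrong: Actor A waits for Actor B's reply while B waits for A's reply
async fn deadlock_pattern() {
    let (tx_a, mut rx_a) = mpsc::channel::<Request>(16);
    let (tx_b, mut rx_b) = mpsc::channel::<Request>(16);

    // Actor A asks B, then blocks on the reply before ever serving its own mailbox
    tokio::spawn(async move {
        let (reply_tx, reply_rx) = oneshot::channel();
        tx_b.send(("request-from-a".into(), reply_tx)).await.unwrap();
        let _response = reply_rx.await; // never resolves: B is stuck the same way
        while let Some((_req, reply)) = rx_a.recv().await {
            let _ = reply.send("reply-from-a".into());
        }
    });

    // Actor B is the mirror image, so neither side reaches its receive loop
    tokio::spawn(async move {
        let (reply_tx, reply_rx) = oneshot::channel();
        tx_a.send(("request-from-b".into(), reply_tx)).await.unwrap();
        let _response = reply_rx.await;
        while let Some((_req, reply)) = rx_b.recv().await {
            let _ = reply.send("reply-from-b".into());
        }
    });
}

// ✅ Right: bound the wait with a timeout (or use one-way messages) to break the cycle
async fn safe_pattern() {
    let (tx_a, rx_a) = mpsc::channel::<Request>(16);
    let (tx_b, rx_b) = mpsc::channel::<Request>(16);

    for (name, peer, mut inbox) in [("a", tx_b, rx_a), ("b", tx_a, rx_b)] {
        tokio::spawn(async move {
            let (reply_tx, reply_rx) = oneshot::channel();
            let _ = peer.send((format!("request-from-{}", name), reply_tx)).await;
            match timeout(Duration::from_secs(5), reply_rx).await {
                Ok(Ok(response)) => println!("{} got: {}", name, response),
                _ => println!("{}: timeout or reply channel closed", name),
            }
            // After giving up, the Actor goes back to serving its own mailbox
            while let Some((_req, reply)) = inbox.recv().await {
                let _ = reply.send(format!("reply-from-{}", name));
            }
        });
    }
}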

Pitfall 2: Forgetting to Handle a Full Mailbox

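use tokio::sync::mpsc;

// ❌ Wrong: an unbounded channel lets memory grow without limit when the consumer falls behind
async fn unbounded_risk() {
    let (tx, _rx) = mpsc::unbounded_channel::<String>();
    for i in 0..1_000_000 {
        // Never waits, so nothing slows the producer down
        tx.send(format!("msg-{}", i)).unwrap();
    }
}

// ✅ Right: a bounded channel plus backpressure
async fn bounded_safe() {
    let (tx, mut rx) = mpsc::channel::<String>(256);

    // Someone must drain the mailbox: with no consumer, send() would wait forever after 256 messages
    let consumer = tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            let _ = msg; // process the message here
        }
    });

    for i in 0..1000 {
        // send().await suspends while the mailbox is full: this is the backpressure
        match tx.send(format!("msg-{}", i)).await {
            Ok(()) => {}
            Err(_) => {
                println!("receiver dropped at {}", i);
                break;
            }
        }
    }
    drop(tx); // close the channel so the consumer loop ends
    let _ = consumer.await;
}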

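Pitfall 3: Supervisors that never cap restarts

// ❌ Wrong: unlimited restarts can turn into a crash-restart loop
async fn infinite_restart_loop() {
    loop {
        let (_tx, _rx) = mpsc::channel::<String>(16); // fresh mailbox for each incarnation
        let handle = tokio::spawn(async move {
            panic!("always crash!");
        });
        let _ = handle.await; // Err(JoinError) because the task panicked
        println!("restarting...");
    }
}

// ✅ Correct: limit both how many times and how often a child is restarted
use std::time::{Duration, Instant};

/// Allows at most `max_restarts` restarts within any sliding `window`.
struct RestartPolicy {
    max_restarts: u32,
    window: Duration,
    restarts: Vec<Instant>,
}

impl RestartPolicy {
    fn new(max_restarts: u32, window: Duration) -> Self {
        Self { max_restarts, window, restarts: Vec::new() }
    }

    fn should_restart(&mut self) -> bool {
        let now = Instant::now();
        // Forget restarts that have fallen out of the window
        self.restarts.retain(|t| now - *t < self.window);
        if self.restarts.len() < self.max_restarts as usize {
            self.restarts.push(now);
            true
        } else {
            // Budget exhausted: stop restarting and escalate instead
            false
        }
    }
}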
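One way to wire a policy like `RestartPolicy` into a supervisor is sketched below, under several assumptions: each child runs on a std thread (not a Tokio task), `JoinHandle::join` returning `Err` is treated as a crash, and the restart decision is passed in as a closure so that `|| policy.should_restart()` plugs in directly. `supervise` and `Exit` are hypothetical names.

```rust
use std::thread;

/// How supervising one child ended.
#[derive(Debug, PartialEq)]
enum Exit {
    /// The child returned normally.
    Completed { restarts: u32 },
    /// The restart budget ran out; escalate to our own supervisor.
    GaveUp { restarts: u32 },
}

/// Runs `child` on a fresh thread. If it panics, asks `should_restart`
/// (for example `|| policy.should_restart()`) whether to start a new incarnation.
fn supervise<F, P>(child: F, mut should_restart: P) -> Exit
where
    F: Fn() + Send + Clone + 'static,
    P: FnMut() -> bool,
{
    let mut restarts = 0;
    loop {
        let incarnation = child.clone();
        // join() returns Err when the child thread panicked
        match thread::spawn(move || incarnation()).join() {
            Ok(()) => return Exit::Completed { restarts },
            Err(_) if should_restart() => restarts += 1,
            Err(_) => return Exit::GaveUp { restarts },
        }
    }
}
```

With `let mut policy = RestartPolicy::new(3, Duration::from_secs(60));`, calling `supervise(child, || policy.should_restart())` on a child that always panics restarts it three times and returns `Exit::GaveUp` on the fourth crash within the minute, leaving escalation to the caller.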

Pitfall 4: Remote messages without a timeout

// ❌ Wrong: a remote call may block forever
// (`gateway` and `cmd` are assumed to be in scope)
async fn remote_without_timeout() {
    let _response = gateway.send_command("remote-actor", cmd).await;
}

// ✅ Correct: every remote call must have a timeout
async fn remote_with_timeout() {
    match timeout(Duration::from_secs(5), gateway.send_command("remote-actor", cmd)).await {
        Ok(Some(response)) => println!("got: {:?}", response),
        Ok(None) => println!("actor not found"),
        Err(_) => println!("remote call timed out"),
    }
}
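Timeouts pair naturally with retries. A minimal synchronous sketch of retry with exponential backoff follows; `retry_with_backoff` is a hypothetical helper, and each attempt is assumed to enforce its own timeout, the way `remote_with_timeout` does. Retrying is only safe when the remote command is idempotent or carries an ID the receiver can deduplicate.

```rust
use std::thread;
use std::time::Duration;

/// Calls `call(attempt)` until it succeeds or `max_attempts` is reached,
/// doubling the pause between attempts (exponential backoff).
/// Each attempt is assumed to enforce its own timeout.
fn retry_with_backoff<T, E, F>(max_attempts: u32, base_delay: Duration, mut call: F) -> Result<T, E>
where
    F: FnMut(u32) -> Result<T, E>,
{
    let mut delay = base_delay;
    let mut attempt = 1;
    loop {
        match call(attempt) {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the last error to the caller
            Err(e) if attempt >= max_attempts => return Err(e),
            Err(_) => {
                thread::sleep(delay);
                delay *= 2;
                attempt += 1;
            }
        }
    }
}
```

A call that times out twice and then answers succeeds on the third attempt; a call that never answers returns its last error after `max_attempts` tries.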

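Pitfall 5: Discarding unprocessed messages on shutdown

// ❌ Wrong: dropping the receiver outright loses every buffered message
async fn bad_shutdown(rx: mpsc::Receiver<String>) {
    drop(rx);
}

// ✅ Correct: drain first, then shut down
async fn good_shutdown(mut rx: mpsc::Receiver<String>) {
    // Reject new sends while keeping already-buffered messages readable
    rx.close();
    println!("draining remaining messages...");
    // The per-message timeout guards against a sender that holds a reserved permit but never sends
    while let Ok(Some(msg)) = timeout(Duration::from_millis(100), rx.recv()).await {
        println!("drained: {}", msg);
    }
    println!("all messages processed");
}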

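Troubleshooting cheat sheet

| Symptom | Likely cause | Fix |
|---|---|---|
| Actor does not respond to messages | Mailbox full, or a deadlock | Check the mailbox capacity; add timeouts |
| Frequent SendError | The receiving side has closed | Check whether the actor exited unexpectedly; put it under supervision |
| Memory keeps growing | Unbounded channel or message backlog | Switch to a bounded channel; add backpressure |
| Restart loop | Actor fails during initialization | Inspect the startup logic; cap the number of restarts |
| Remote calls time out | Network latency or an unresponsive peer | Add timeouts and retries |
| Messages arrive out of order | Several producers sending concurrently | Use a single producer to preserve order, or add sequence numbers |
| Graceful shutdown hangs | An actor ignores the shutdown signal | Put a timeout on the drain |
| oneshot RecvError | The responder exited without sending a reply | Make sure every code path sends a reply |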
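For the "add sequence numbers" fix, the receiving actor has to buffer early arrivals until the gap closes. The sketch below is one hypothetical way to do that with a `BTreeMap`; `Resequencer` is not from any library, and it assumes each sender numbers its messages from 0 without gaps.

```rust
use std::collections::BTreeMap;

/// Releases messages strictly in sequence order, even if they arrive shuffled
/// (for example after passing through a pool or being retried).
struct Resequencer<T> {
    next: u64,
    pending: BTreeMap<u64, T>,
}

impl<T> Resequencer<T> {
    fn new() -> Self {
        Self { next: 0, pending: BTreeMap::new() }
    }

    /// Accepts one message and returns every message that is now deliverable in order.
    fn push(&mut self, seq: u64, msg: T) -> Vec<T> {
        if seq < self.next {
            return Vec::new(); // duplicate of something already delivered
        }
        self.pending.insert(seq, msg);
        let mut ready = Vec::new();
        // Release the contiguous run starting at `next`
        while let Some(msg) = self.pending.remove(&self.next) {
            ready.push(msg);
            self.next += 1;
        }
        ready
    }
}
```

Pushing sequence numbers 2, 0, 1 releases nothing, then `["a"]`, then `["b", "c"]`; a late duplicate of an already-delivered number is ignored.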

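Advanced optimization techniques

1. Actor pooling: load balancing

When a single actor becomes the bottleneck, spread its messages across a pool of actors: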

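use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::mpsc;

// `WorkerCmd` is assumed to be the worker command type defined elsewhere
struct ActorPool {
    workers: Vec<mpsc::Sender<WorkerCmd>>,
    // Round-robin cursor; Relaxed is enough because it only spreads load
    next: AtomicUsize,
}

impl ActorPool {
    fn new(workers: Vec<mpsc::Sender<WorkerCmd>>) -> Self {
        // An empty pool would make the modulo below divide by zero
        assert!(!workers.is_empty(), "ActorPool needs at least one worker");
        Self {
            workers,
            next: AtomicUsize::new(0),
        }
    }

    fn next_worker(&self) -> &mpsc::Sender<WorkerCmd> {
        let idx = self.next.fetch_add(1, Ordering::Relaxed) % self.workers.len();
        &self.workers[idx]
    }

    // Returns false if the chosen worker's mailbox is closed
    async fn send(&self, cmd: WorkerCmd) -> bool {
        let worker = self.next_worker();
        worker.send(cmd).await.is_ok()
    }
}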
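To see the round-robin dispatch end to end, here is a self-contained variant of `ActorPool` built on std threads and `sync_channel` rather than Tokio. `Pool`, the `u64` job type and the summing workers are hypothetical stand-ins for `WorkerCmd` and real work.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread::{self, JoinHandle};

/// Round-robin pool over bounded worker mailboxes.
struct Pool {
    workers: Vec<SyncSender<u64>>,
    next: AtomicUsize,
}

impl Pool {
    /// Spawns `n` workers; each sums the jobs it receives and returns
    /// that sum from its JoinHandle once its mailbox is closed.
    fn spawn(n: usize, capacity: usize) -> (Self, Vec<JoinHandle<u64>>) {
        assert!(n > 0, "a pool needs at least one worker");
        let mut workers = Vec::with_capacity(n);
        let mut handles = Vec::with_capacity(n);
        for _ in 0..n {
            let (tx, rx) = sync_channel::<u64>(capacity);
            workers.push(tx);
            handles.push(thread::spawn(move || rx.iter().sum::<u64>()));
        }
        (Self { workers, next: AtomicUsize::new(0) }, handles)
    }

    /// Sends the job to the next worker in round-robin order.
    /// Blocks while that worker's mailbox is full (backpressure).
    fn send(&self, job: u64) -> bool {
        let idx = self.next.fetch_add(1, Ordering::Relaxed) % self.workers.len();
        self.workers[idx].send(job).is_ok()
    }
}
```

Sending jobs 1 to 6 to a three-worker pool gives worker 0 jobs 1 and 4, worker 1 jobs 2 and 5, and worker 2 jobs 3 and 6, so the per-worker sums are 5, 7 and 9. Dropping the pool closes every mailbox, which ends each worker loop.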

2. Message priority

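use tokio::select;
use tokio::sync::mpsc;

// Priority levels, one channel per level in PriorityMailbox
enum Priority {
    High,
    Normal,
    Low,
}

struct PriorityMailbox {
    high_rx: mpsc::Receiver<String>,
    normal_rx: mpsc::Receiver<String>,
    low_rx: mpsc::Receiver<String>,
}

impl PriorityMailbox {
    // `biased` polls the branches top to bottom, so higher priorities win.
    // Matching `Some(msg)` disables a branch whose channel has closed, instead of
    // returning None while the other channels still hold messages.
    async fn recv(&mut self) -> Option<String> {
        select! {
            biased;
            Some(msg) = self.high_rx.recv() => Some(msg),
            Some(msg) = self.normal_rx.recv() => Some(msg),
            Some(msg) = self.low_rx.recv() => Some(msg),
            else => None,
        }
    }
}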
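Three channels plus `biased` is one design. Another, sketched here as an alternative rather than the approach above, keeps a single mailbox in a `std::collections::BinaryHeap` ordered by priority and then by arrival order. `HeapMailbox` is hypothetical, and `Priority` is redeclared from lowest to highest because a derived `Ord` follows declaration order.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Declared low to high so the derived Ord gives High > Normal > Low.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Priority {
    Low,
    Normal,
    High,
}

/// Max-heap keyed by (priority, Reverse(seq)): the highest priority pops first,
/// and within one priority the smallest sequence number (oldest message) pops first.
struct HeapMailbox {
    heap: BinaryHeap<(Priority, Reverse<u64>, String)>,
    seq: u64,
}

impl HeapMailbox {
    fn new() -> Self {
        Self { heap: BinaryHeap::new(), seq: 0 }
    }

    fn push(&mut self, priority: Priority, msg: String) {
        self.heap.push((priority, Reverse(self.seq), msg));
        self.seq += 1;
    }

    fn pop(&mut self) -> Option<String> {
        self.heap.pop().map(|(_, _, msg)| msg)
    }
}
```

Strict priority in either design can starve `Low` messages while `High` traffic never lets up; aging or per-level quotas are common mitigations.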

3. Message tracing with trace IDs

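use std::sync::atomic::{AtomicU64, Ordering};

// Process-wide counter that hands out unique IDs for traces and spans
static TRACE_ID: AtomicU64 = AtomicU64::new(1);

fn new_trace_id() -> u64 {
    TRACE_ID.fetch_add(1, Ordering::Relaxed)
}

#[derive(Debug, Clone)]
struct TracedMessage {
    trace_id: u64,            // shared by every message in one causal chain
    span_id: u64,             // unique to this hop
    parent_span: Option<u64>, // span that caused this message (None for the root)
    payload: String,
}

impl TracedMessage {
    // Start a new trace: fresh trace ID and a root span
    fn new(payload: String) -> Self {
        Self {
            trace_id: new_trace_id(),
            span_id: new_trace_id(),
            parent_span: None,
            payload,
        }
    }

    // Message caused by this one: same trace, new span, parent = this span
    fn child(&self, payload: String) -> Self {
        Self {
            trace_id: self.trace_id,
            span_id: new_trace_id(),
            parent_span: Some(self.span_id),
            payload,
        }
    }
}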

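Actor framework comparison

| Feature | Actix | Tokio Channel | Riker | Coerce |
|---|---|---|---|---|
| Message passing | Typed Message | Hand-written enum | Typed | Typed |
| Supervision tree | Built in | Implement yourself | Built in | Built in |
| Remote actors | actix-remote | Implement yourself | Supported | Built in |
| Performance | | | | |
| Dependencies | actix | tokio only | riker | coerce |
| Learning curve | Moderate | Gentle | Moderate | Moderate |
| Ecosystem maturity | | | | |
| Best suited for | Web services | General-purpose concurrency | Classic actor systems | Distributed systems |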

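Summary

The core philosophy of the actor model in Rust is: do not communicate by sharing memory; share memory by communicating. Actix offers a mature, typed actor system, while hand-rolled Tokio channel actors are lighter and more flexible. Supervision trees are the foundation of a production-grade actor system; an actor system without supervision is like a car without seat belts. In distributed settings, the key is a single communication interface for local and remote actors. Always use bounded mailboxes for backpressure, put a timeout on every remote call, and drain mailboxes for a graceful shutdown, and your actor system will be both robust and efficient.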

