Rust Actor Frameworks in Practice: 5 Production Patterns from Message Passing to Supervision Trees


When Concurrency Models Meet Rust Ownership

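Have you ever been in this situation? Your system has to handle thousands of independent stateful entities at once, each with its own lifecycle and message queue. You tried managing the shared state with Arc<Mutex<HashMap>>, and lock contention dragged throughput to the floor. You tried dispatching work to a thread pool, and found that communication between threads cost more than the computation itself. Eventually you realize that the Actor model is the right fit for this kind of problem.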

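The core idea of the Actor model is simple: each actor is an independent unit of computation that owns private state and interacts with the outside world only by passing messages. With no shared state there is no lock contention, and with no locks there are no lock-ordering deadlocks (actors that wait on each other's replies can still deadlock, as Pitfall 1 shows). Rust's ownership system fits the Actor model naturally: sending a message moves its ownership into the channel, and the compiler's Send checks guarantee that whatever crosses a thread boundary is safe to do so.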
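To make the ownership point concrete, here is a minimal sketch that uses only the standard library (std::thread and std::sync::mpsc) rather than any actor framework. The names CounterMsg, spawn_counter, and get_total are hypothetical. The counter's state lives inside its thread, callers can reach it only by sending messages, and every message (including the reply channel inside it) is moved into the mailbox on send.

```rust
use std::sync::mpsc;
use std::thread;

// Messages the counter actor understands. Each one is moved into the
// channel on send, so the sender cannot touch it afterwards.
enum CounterMsg {
    Add(u64),
    // The reply channel travels inside the message (a std stand-in for oneshot)
    Get(mpsc::Sender<u64>),
}

/// Spawns the actor and returns its address: the sending half of its mailbox.
fn spawn_counter() -> mpsc::Sender<CounterMsg> {
    let (tx, rx) = mpsc::channel::<CounterMsg>();
    thread::spawn(move || {
        // Private state: owned by this thread only, so no Mutex is needed
        let mut total: u64 = 0;
        // The loop ends when every Sender has been dropped
        for msg in rx {
            match msg {
                CounterMsg::Add(n) => total += n,
                CounterMsg::Get(reply) => {
                    // Ignore the error if the caller stopped waiting
                    let _ = reply.send(total);
                }
            }
        }
    });
    tx
}

/// Request-reply helper: sends Get and blocks until the actor answers.
fn get_total(addr: &mpsc::Sender<CounterMsg>) -> Option<u64> {
    let (reply_tx, reply_rx) = mpsc::channel();
    addr.send(CounterMsg::Get(reply_tx)).ok()?;
    reply_rx.recv().ok()
}
```

Because messages from one Sender arrive in the order they were sent, sending Add(5) and Add(7) and then calling get_total on the same address yields Some(12).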


Core Concepts at a Glance

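| Concept | Description | Typical use |
| --- | --- | --- |
| Actor | Independent unit of computation that encapsulates state and behavior | State machines, connection management |
| Message | The only way actors communicate | Commands, events, queries |
| Mailbox | An actor's message queue | Buffering, ordered processing |
| Supervision | A parent actor monitors its children | Failure recovery, restart strategies |
| Address | A handle that refers to an actor | Message addressing, routing |
| Context | An actor's runtime environment | Timers, child-actor management |
| Backpressure | Flow-control mechanism | Preventing mailbox overflow |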

Five Challenges of the Actor Model

1. Lock hell with shared state

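Traditional concurrency relies on shared memory plus locks, and under high concurrency lock contention becomes the bottleneck. The Actor model avoids the problem by not sharing at all: each actor owns its state exclusively.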

2. Fault isolation and recovery

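A crash in one component should not bring down the whole system. The Actor model isolates faults with supervision trees: when a child actor crashes, its parent decides how to handle it (restart it, stop it, escalate to its own supervisor, and so on).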

3. Message ordering guarantees

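A single actor processes its mailbox one message at a time, so its own handling is naturally ordered. Ordering across actors is another matter: messages that travel through different actors or different senders can interleave, so any cross-actor ordering has to be designed into the message protocol and routing strategy.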
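One common protocol-level answer is to tag each message with a per-producer sequence number and let the receiving actor buffer out-of-order arrivals. The sketch below is a hypothetical Reorderer for a single producer. It assumes sequence numbers start at 0 and are never skipped; a lost message would stall delivery, so a real system would pair this with retransmission or a timeout.

```rust
use std::collections::BTreeMap;

/// Hypothetical helper: releases messages in sequence-number order even if
/// they arrive out of order (for example, via two different relay actors).
struct Reorderer<T> {
    next_seq: u64,
    pending: BTreeMap<u64, T>,
}

impl<T> Reorderer<T> {
    fn new() -> Self {
        Self { next_seq: 0, pending: BTreeMap::new() }
    }

    /// Accepts one message and returns every message that is now deliverable in order.
    fn accept(&mut self, seq: u64, msg: T) -> Vec<T> {
        if seq < self.next_seq {
            // Duplicate or already-delivered message: drop it
            return Vec::new();
        }
        self.pending.insert(seq, msg);
        let mut ready = Vec::new();
        // Release the contiguous run that starts at next_seq
        while let Some(msg) = self.pending.remove(&self.next_seq) {
            ready.push(msg);
            self.next_seq += 1;
        }
        ready
    }
}
```

Message 1 arriving before message 0 is held back; once 0 arrives, both are released together as ["a", "b"].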

4. Distribution transparency

Local and remote actors should be addressed the same way. How do you let actors communicate across nodes without changing the business logic?

5. The complexity of graceful shutdown

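An actor system may contain hundreds or thousands of actors. On shutdown, how do you make sure every pending message has been processed and every resource released?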
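For a single actor, one simple answer is to make "all senders dropped" the shutdown signal. The actor keeps draining its buffered mailbox and exits only once it is empty, and the caller joins the thread to know cleanup has finished. This sketch uses std threads; spawn_job_worker and shutdown_and_join are hypothetical names. Pattern 5 below extends the idea to many actors with a broadcast signal and a deadline.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical worker: handles jobs until its mailbox is closed, then
/// returns how many it processed.
fn spawn_job_worker() -> (mpsc::Sender<String>, thread::JoinHandle<usize>) {
    let (tx, rx) = mpsc::channel::<String>();
    let handle = thread::spawn(move || {
        let mut processed = 0;
        // recv() still returns messages that were buffered before the senders
        // were dropped; it fails only once the queue is empty AND disconnected
        while let Ok(_job) = rx.recv() {
            processed += 1; // real work would go here
        }
        // Reaching this point means the mailbox is fully drained: release resources here
        processed
    });
    (tx, handle)
}

/// Shutdown: close the mailbox by dropping the (only) sender, then wait for the drain.
fn shutdown_and_join(tx: mpsc::Sender<String>, handle: thread::JoinHandle<usize>) -> usize {
    drop(tx);
    handle.join().expect("worker panicked")
}
```

With 100 jobs queued before shutdown_and_join is called, the worker still processes all 100 before exiting.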


Five Production-Grade Actor Patterns

Pattern 1: The Actix Actor System (Classic Actor Pattern)

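Actix is one of the most mature Actor frameworks in the Rust ecosystem. It is built around typed message handling and provides complete actor lifecycle management.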

use actix::prelude::*;
use std::time::Duration;

struct GameSession {
    id: String,
    score: u64,
    level: u32,
}

impl Actor for GameSession {
    type Context = Context<Self>;

    fn started(&mut self, _ctx: &mut Self::Context) {
        println!("session {} started, score: {}", self.id, self.score);
    }

    fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
        println!("session {} stopping", self.id);
        // Running::Stop lets the shutdown proceed; Running::Continue would keep the actor alive
        Running::Stop
    }

    fn stopped(&mut self, _ctx: &mut Self::Context) {
        println!("session {} stopped, final score: {}", self.id, self.score);
    }
}

#[derive(Message)]
#[rtype(result = "u64")]
struct GetScore;

#[derive(Message)]
#[rtype(result = "()")]
struct AddScore { amount: u64 }

#[derive(Message)]
#[rtype(result = "()")]
struct LevelUp;

impl Handler<GetScore> for GameSession {
    type Result = u64;

    fn handle(&mut self, _msg: GetScore, _ctx: &mut Context<Self>) -> u64 {
        self.score
    }
}

impl Handler<AddScore> for GameSession {
    type Result = ();

    fn handle(&mut self, msg: AddScore, _ctx: &mut Context<Self>) {
        self.score += msg.amount;
        println!("session {} score: {}", self.id, self.score);
    }
}

impl Handler<LevelUp> for GameSession {
    type Result = ();

    fn handle(&mut self, _msg: LevelUp, ctx: &mut Context<Self>) {
        self.level += 1;
        println!("session {} leveled up to {}", self.id, self.level);

        ctx.notify_later(LevelUp, Duration::from_secs(30));
    }
}

#[actix::main]
async fn main() {
    let addr = GameSession {
        id: "player-001".into(),
        score: 0,
        level: 1,
    }.start();

    addr.send(AddScore { amount: 100 }).await.unwrap();
    addr.send(AddScore { amount: 50 }).await.unwrap();
    addr.send(LevelUp).await.unwrap();

    let score = addr.send(GetScore).await.unwrap();
    println!("final score: {}", score);
}

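Key takeaways

  • impl Actor defines the actor lifecycle hooks: started → stopping → stopped
  • #[derive(Message)] defines a typed message; rtype specifies its response type
  • The Handler<T> trait implements the handling logic for each message type
  • ctx.notify_later schedules a delayed message to the actor itself, which is how periodic tasks are built (here every LevelUp schedules the next one 30 seconds later)
  • addr.send() sends a message asynchronously and waits for the response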

Pattern 2: Tokio Channel-Based Actors (Lightweight Actor Pattern)

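Instead of depending on an Actor framework, you hand-write the actor on top of Tokio channels. This is lighter and more flexible, and suits cases that demand maximum performance and control.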

use tokio::sync::{mpsc, oneshot};
use std::collections::HashMap;

enum CacheCommand {
    Get {
        key: String,
        responder: oneshot::Sender<Option<String>>,
    },
    Set {
        key: String,
        value: String,
        ttl: Option<std::time::Duration>,
        responder: oneshot::Sender<bool>,
    },
    Delete {
        key: String,
        responder: oneshot::Sender<bool>,
    },
    Stats {
        responder: oneshot::Sender<CacheStats>,
    },
}

#[derive(Debug)]
struct CacheStats {
    entries: usize,
    hits: u64,
    misses: u64,
}

struct CacheEntry {
    value: String,
    expires_at: Option<std::time::Instant>,
}

struct CacheActor {
    store: HashMap<String, CacheEntry>,
    hits: u64,
    misses: u64,
}

impl CacheActor {
    fn new() -> Self {
        Self {
            store: HashMap::new(),
            hits: 0,
            misses: 0,
        }
    }

    fn is_expired(&self, entry: &CacheEntry) -> bool {
        match entry.expires_at {
            Some(t) => std::time::Instant::now() > t,
            None => false,
        }
    }

    async fn run(&mut self, mut rx: mpsc::Receiver<CacheCommand>) {
        while let Some(cmd) = rx.recv().await {
            match cmd {
                CacheCommand::Get { key, responder } => {
                    let result = match self.store.get(&key) {
                        Some(entry) if !self.is_expired(entry) => {
                            self.hits += 1;
                            Some(entry.value.clone())
                        }
                        Some(_) => {
                            self.store.remove(&key);
                            self.misses += 1;
                            None
                        }
                        None => {
                            self.misses += 1;
                            None
                        }
                    };
                    let _ = responder.send(result);
                }
                CacheCommand::Set { key, value, ttl, responder } => {
                    let expires_at = ttl.map(|d| std::time::Instant::now() + d);
                    self.store.insert(key, CacheEntry { value, expires_at });
                    let _ = responder.send(true);
                }
                CacheCommand::Delete { key, responder } => {
                    let existed = self.store.remove(&key).is_some();
                    let _ = responder.send(existed);
                }
                CacheCommand::Stats { responder } => {
                    let stats = CacheStats {
                        entries: self.store.len(),
                        hits: self.hits,
                        misses: self.misses,
                    };
                    let _ = responder.send(stats);
                }
            }
        }
        println!("cache actor shutting down, {} entries remaining", self.store.len());
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<CacheCommand>(256);
    let mut actor = CacheActor::new();
    tokio::spawn(async move { actor.run(rx).await });

    let (resp_tx, resp_rx) = oneshot::channel();
    tx.send(CacheCommand::Set {
        key: "user:1001".into(),
        value: r#"{"name":"alice"}"#.into(),
        ttl: Some(std::time::Duration::from_secs(300)),
        responder: resp_tx,
    }).await.unwrap();
    resp_rx.await.unwrap();

    let (resp_tx, resp_rx) = oneshot::channel();
    tx.send(CacheCommand::Get {
        key: "user:1001".into(),
        responder: resp_tx,
    }).await.unwrap();
    let result = resp_rx.await.unwrap();
    println!("get result: {:?}", result);

    let (resp_tx, resp_rx) = oneshot::channel();
    tx.send(CacheCommand::Stats { responder: resp_tx }).await.unwrap();
    let stats = resp_rx.await.unwrap();
    println!("cache stats: {:?}", stats);
}

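Key takeaways

  • An mpsc channel serves as the actor's mailbox; a oneshot channel implements request-response
  • The CacheCommand enum defines the message protocol: type-safe and easy to extend
  • The actor task is the single owner of its state, so data races are impossible by construction
  • No framework dependency: Tokio alone is enough to build a complete actor
  • The ttl expiry shows how flexibly an actor can manage internal state (expired entries are evicted lazily, on the next Get)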
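The main function above repeats the create-oneshot, send, await sequence for every call. A common refinement is to wrap the Sender in a cloneable handle type with ordinary methods. The sketch below shows that shape with std threads and std::sync::mpsc, standing in for tokio's mpsc and oneshot so it runs without dependencies. KvHandle and its methods are hypothetical names, and TTLs are left out.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

enum Cmd {
    Get { key: String, reply: mpsc::Sender<Option<String>> },
    Set { key: String, value: String },
}

/// Cloneable handle: callers see plain methods instead of channel plumbing.
#[derive(Clone)]
struct KvHandle {
    tx: mpsc::Sender<Cmd>,
}

impl KvHandle {
    fn spawn() -> Self {
        let (tx, rx) = mpsc::channel::<Cmd>();
        thread::spawn(move || {
            // The store is owned by the actor thread alone
            let mut store: HashMap<String, String> = HashMap::new();
            for cmd in rx {
                match cmd {
                    Cmd::Get { key, reply } => {
                        let _ = reply.send(store.get(&key).cloned());
                    }
                    Cmd::Set { key, value } => {
                        store.insert(key, value);
                    }
                }
            }
        });
        Self { tx }
    }

    /// Fire-and-forget write; returns false if the actor has stopped.
    fn set(&self, key: &str, value: &str) -> bool {
        self.tx
            .send(Cmd::Set { key: key.to_string(), value: value.to_string() })
            .is_ok()
    }

    /// Request-reply read; None if the key is missing or the actor has stopped.
    fn get(&self, key: &str) -> Option<String> {
        let (reply, rx) = mpsc::channel();
        self.tx.send(Cmd::Get { key: key.to_string(), reply }).ok()?;
        rx.recv().ok().flatten()
    }
}
```

Unlike the Set command above, this sketch's set does not wait for an acknowledgement. Ordering still holds because a set followed by a get on the same handle travels through the same Sender.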

Channel Actor vs. Actix

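| Feature | Channel Actor | Actix |
| --- | --- | --- |
| Dependencies | tokio only | actix + tokio |
| Message type | Hand-written enum | derive Message |
| Response mechanism | oneshot channel | Message rtype |
| Lifecycle | Managed manually | Automatic callbacks |
| Supervision | Implemented by hand | Built-in support |
| Performance overhead | Very low | |
| Learning curve | Gentle | Moderate |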

Pattern 3: Supervision Trees (Fault-Recovery Pattern)

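The supervision tree is one of the Actor model's most powerful features: a parent actor monitors its children, and when a child crashes, it decides how to recover according to a restart strategy.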

use tokio::sync::{mpsc, oneshot};
use tokio::time::{sleep, Duration};
use std::collections::HashMap;

#[derive(Debug, Clone)]
enum RestartStrategy {
    Permanent,
    Transient,
    Temporary,
}

#[derive(Debug)]
enum WorkerMessage {
    Process { data: String, responder: oneshot::Sender<String> },
    // Simulates a crash: the worker notifies the supervisor, then exits
    Crash,
    Status { responder: oneshot::Sender<WorkerStatus> },
}

#[derive(Debug, Clone)]
struct WorkerStatus {
    id: String,
    processed: u64,
    alive: bool,
}

struct Worker {
    id: String,
    processed: u64,
}

impl Worker {
    fn new(id: String) -> Self {
        Self { id, processed: 0 }
    }

    async fn run(
        &mut self,
        mut rx: mpsc::Receiver<WorkerMessage>,
        supervisor_tx: mpsc::Sender<SupervisorMessage>,
    ) {
        println!("worker {} started", self.id);
        while let Some(msg) = rx.recv().await {
            match msg {
                WorkerMessage::Process { data, responder } => {
                    self.processed += 1;
                    let result = format!("processed by {}: {}", self.id, data.to_uppercase());
                    let _ = responder.send(result);
                }
                WorkerMessage::Crash => {
                    println!("worker {} crashing!", self.id);
                    let _ = supervisor_tx.send(SupervisorMessage::ChildCrashed {
                        id: self.id.clone(),
                    }).await;
                    return;
                }
                WorkerMessage::Status { responder } => {
                    let _ = responder.send(WorkerStatus {
                        id: self.id.clone(),
                        processed: self.processed,
                        alive: true,
                    });
                }
            }
        }
        let _ = supervisor_tx.send(SupervisorMessage::ChildStopped {
            id: self.id.clone(),
        }).await;
    }
}

enum SupervisorMessage {
    ChildCrashed { id: String },
    ChildStopped { id: String },
    SpawnWorker { id: String },
    // Look up a child's mailbox so callers can talk to it directly
    GetChild { id: String, responder: oneshot::Sender<Option<mpsc::Sender<WorkerMessage>>> },
    GetStatus { responder: oneshot::Sender<Vec<WorkerStatus>> },
}

struct ChildHandle {
    tx: mpsc::Sender<WorkerMessage>,
    strategy: RestartStrategy,
}

struct Supervisor {
    children: HashMap<String, ChildHandle>,
    // A Receiver cannot produce Senders, so the supervisor keeps its own
    // Sender to hand to every child it spawns
    supervisor_tx: mpsc::Sender<SupervisorMessage>,
    supervisor_rx: mpsc::Receiver<SupervisorMessage>,
    // Restart timestamps per child, used for the sliding restart window
    restart_history: HashMap<String, Vec<tokio::time::Instant>>,
    max_restarts: u32,
    restart_window: Duration,
}

impl Supervisor {
    fn new(
        tx: mpsc::Sender<SupervisorMessage>,
        rx: mpsc::Receiver<SupervisorMessage>,
        max_restarts: u32,
    ) -> Self {
        Self {
            children: HashMap::new(),
            supervisor_tx: tx,
            supervisor_rx: rx,
            restart_history: HashMap::new(),
            max_restarts,
            restart_window: Duration::from_secs(60),
        }
    }

    fn spawn_child(&mut self, id: String, strategy: RestartStrategy) {
        let (tx, rx) = mpsc::channel::<WorkerMessage>(64);
        let supervisor_tx = self.supervisor_tx.clone();
        let mut worker = Worker::new(id.clone());

        tokio::spawn(async move {
            worker.run(rx, supervisor_tx).await;
        });

        println!("supervisor spawned child {}", id);
        self.children.insert(id, ChildHandle { tx, strategy });
    }

    fn restart_child(&mut self, id: &str) -> bool {
        let now = tokio::time::Instant::now();
        let window = self.restart_window;
        let history = self.restart_history.entry(id.to_string()).or_default();
        // Forget restarts that fall outside the sliding window
        history.retain(|t| now.duration_since(*t) < window);
        if history.len() >= self.max_restarts as usize {
            println!("supervisor: child {} exceeded max restarts ({})", id, self.max_restarts);
            self.children.remove(id);
            return false;
        }
        history.push(now);
        println!("supervisor: restarting child {} (attempt {})", id, history.len());

        match self.children.remove(id) {
            Some(handle) => {
                self.spawn_child(id.to_string(), handle.strategy);
                true
            }
            None => false,
        }
    }

    async fn run(&mut self) {
        while let Some(msg) = self.supervisor_rx.recv().await {
            match msg {
                SupervisorMessage::ChildCrashed { id } => {
                    let strategy = self.children.get(&id).map(|h| h.strategy.clone());
                    match strategy {
                        // Abnormal exit: Permanent and Transient both restart
                        Some(RestartStrategy::Permanent) | Some(RestartStrategy::Transient) => {
                            self.restart_child(&id);
                        }
                        Some(RestartStrategy::Temporary) => {
                            println!("supervisor: child {} crashed, not restarting (temporary)", id);
                            self.children.remove(&id);
                        }
                        None => {}
                    }
                }
                SupervisorMessage::ChildStopped { id } => {
                    let strategy = self.children.get(&id).map(|h| h.strategy.clone());
                    match strategy {
                        // Normal exit: only Permanent restarts
                        Some(RestartStrategy::Permanent) => {
                            self.restart_child(&id);
                        }
                        Some(_) => {
                            self.children.remove(&id);
                        }
                        None => {}
                    }
                }
                SupervisorMessage::SpawnWorker { id } => {
                    self.spawn_child(id, RestartStrategy::Permanent);
                }
                SupervisorMessage::GetChild { id, responder } => {
                    let tx = self.children.get(&id).map(|h| h.tx.clone());
                    let _ = responder.send(tx);
                }
                SupervisorMessage::GetStatus { responder } => {
                    // Children are queried one at a time; fine for a demo, but a slow
                    // child stalls the supervisor while it waits
                    let mut statuses = Vec::new();
                    for handle in self.children.values() {
                        let (resp_tx, resp_rx) = oneshot::channel();
                        if handle.tx.send(WorkerMessage::Status { responder: resp_tx }).await.is_ok() {
                            if let Ok(status) = resp_rx.await {
                                statuses.push(status);
                            }
                        }
                    }
                    let _ = responder.send(statuses);
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (sup_tx, sup_rx) = mpsc::channel::<SupervisorMessage>(256);
    let mut supervisor = Supervisor::new(sup_tx.clone(), sup_rx, 3);
    tokio::spawn(async move { supervisor.run().await });

    // Messages from one sender are handled in order, so GetChild below
    // always runs after these SpawnWorker messages
    sup_tx.send(SupervisorMessage::SpawnWorker { id: "worker-1".into() }).await.unwrap();
    sup_tx.send(SupervisorMessage::SpawnWorker { id: "worker-2".into() }).await.unwrap();

    if let Some(tx) = get_child("worker-1", &sup_tx).await {
        let (resp_tx, resp_rx) = oneshot::channel();
        tx.send(WorkerMessage::Process {
            data: "hello world".into(),
            responder: resp_tx,
        }).await.unwrap();
        println!("result: {}", resp_rx.await.unwrap());
    }

    println!("--- triggering crash ---");
    if let Some(tx) = get_child("worker-1", &sup_tx).await {
        let _ = tx.send(WorkerMessage::Crash).await;
    }

    // Give the supervisor time to receive ChildCrashed and restart worker-1
    sleep(Duration::from_millis(100)).await;

    let (resp_tx, resp_rx) = oneshot::channel();
    sup_tx.send(SupervisorMessage::GetStatus { responder: resp_tx }).await.unwrap();
    for s in resp_rx.await.unwrap() {
        println!("status: {:?}", s);
    }
}

/// Asks the supervisor for a child's mailbox (None if no such child exists).
async fn get_child(
    id: &str,
    sup_tx: &mpsc::Sender<SupervisorMessage>,
) -> Option<mpsc::Sender<WorkerMessage>> {
    let (resp_tx, resp_rx) = oneshot::channel();
    sup_tx
        .send(SupervisorMessage::GetChild { id: id.to_string(), responder: resp_tx })
        .await
        .ok()?;
    resp_rx.await.ok().flatten()
}

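Key takeaways

  • The Supervisor holds each child's Sender and receives crash notifications through its own mpsc mailbox
  • Three restart strategies: Permanent (always restart), Transient (restart only after an abnormal exit), Temporary (never restart)
  • max_restarts within a sliding restart_window prevents an endless crash-restart loop
  • In this example the child reports its own crash before exiting; a real panic unwinds before it can send anything, so production code should also watch the task's JoinHandle
  • Supervision trees nest: a supervisor can itself be a supervised child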
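In the Tokio example, a "crash" is simulated by the worker sending ChildCrashed before it returns. A genuine panic unwinds the task before it can send anything, so a supervisor also needs to observe how the task ended. The std-only sketch below is a hypothetical supervise helper that uses threads instead of Tokio tasks. It detects panics through JoinHandle::join and caps the number of restarts. With Tokio, the analogous signal is awaiting the task's JoinHandle and getting a JoinError whose is_panic() returns true.

```rust
use std::thread;

/// Hypothetical one-for-one supervisor loop: runs `work` on a fresh thread and
/// restarts it whenever the thread panics, up to `max_restarts` times.
/// Returns Ok(restarts_used) on a normal exit, Err(restarts_used) when it gives up.
fn supervise<F>(work: F, max_restarts: u32) -> Result<u32, u32>
where
    F: Fn(u32) + Send + Clone + 'static,
{
    let mut restarts = 0;
    loop {
        let job = work.clone();
        let attempt = restarts;
        // join() returns Err if the child panicked, so the crash is detected
        // even though the child never sent a "crashed" message
        match thread::spawn(move || job(attempt)).join() {
            Ok(()) => return Ok(restarts),
            Err(_) if restarts < max_restarts => restarts += 1,
            Err(_) => return Err(restarts),
        }
    }
}
```

A worker that panics on its first two attempts and then succeeds returns Ok(2). One that always panics returns Err(3) when max_restarts is 3. The panic messages still reach stderr through the default panic hook.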

Restart strategy comparison

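| Strategy | Normal exit | Abnormal exit | Typical use |
| --- | --- | --- | --- |
| Permanent | Restart | Restart | Long-running services, scheduled jobs |
| Transient | No restart | Restart | Request handlers, short-lived computations |
| Temporary | No restart | No restart | One-off tasks, debugging |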
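The table reduces to a single decision function, which keeps the supervisor's match arms from drifting apart. The Exit enum is a hypothetical addition. The Tokio example above signals the same two cases with ChildStopped and ChildCrashed.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum RestartStrategy {
    Permanent,
    Transient,
    Temporary,
}

/// How a child ended (hypothetical; mirrors ChildStopped vs. ChildCrashed).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Exit {
    Normal,
    Abnormal,
}

/// Encodes the restart table: does this child come back after this kind of exit?
fn should_restart(strategy: RestartStrategy, exit: Exit) -> bool {
    match (strategy, exit) {
        (RestartStrategy::Permanent, _) => true,
        (RestartStrategy::Transient, Exit::Abnormal) => true,
        (RestartStrategy::Transient, Exit::Normal) => false,
        (RestartStrategy::Temporary, _) => false,
    }
}
```

Because the match is exhaustive, adding a fourth strategy later forces a compile error until its row of the table is filled in.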

Pattern 4: Distributed Actors (Remote Messaging Pattern)

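Once actors cross process boundaries, messages have to be serialized and sent over the network. This pattern shows how to build an actor system that can communicate remotely.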

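use tokio::sync::{mpsc, oneshot};
use tokio::net::{TcpListener, TcpStream};
// Assumes serde with the "derive" feature and bincode 1.x (bincode::serialize / bincode::deserialize)
use serde::{Serialize, Deserialize};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::time::Duration;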

#[derive(Debug, Serialize, Deserialize)]
struct RemoteEnvelope {
    target: String,
    message_type: String,
    payload: Vec<u8>,
    reply_to: Option<u64>,
}

#[derive(Debug, Serialize, Deserialize)]
enum OrderCommand {
    Create { order_id: String, item: String, quantity: u32 },
    Cancel { order_id: String },
    Query { order_id: String },
}

#[derive(Debug, Serialize, Deserialize)]
enum OrderResponse {
    Created { order_id: String },
    Cancelled { order_id: String },
    Found { order_id: String, item: String, quantity: u32 },
    NotFound { order_id: String },
    Error { message: String },
}

#[derive(Debug, Serialize, Deserialize, Clone)]
struct Order {
    id: String,
    item: String,
    quantity: u32,
    status: String,
}

struct OrderActor {
    orders: HashMap<String, Order>,
    cmd_rx: mpsc::Receiver<(OrderCommand, oneshot::Sender<OrderResponse>)>,
}

impl OrderActor {
    fn new(rx: mpsc::Receiver<(OrderCommand, oneshot::Sender<OrderResponse>)>) -> Self {
        Self { orders: HashMap::new(), cmd_rx: rx }
    }

    async fn run(&mut self) {
        while let Some((cmd, responder)) = self.cmd_rx.recv().await {
            let response = match cmd {
                OrderCommand::Create { order_id, item, quantity } => {
                    if self.orders.contains_key(&order_id) {
                        OrderResponse::Error { message: format!("order {} already exists", order_id) }
                    } else {
                        self.orders.insert(order_id.clone(), Order {
                            id: order_id.clone(),
                            item,
                            quantity,
                            status: "created".into(),
                        });
                        OrderResponse::Created { order_id }
                    }
                }
                OrderCommand::Cancel { order_id } => {
                    match self.orders.get_mut(&order_id) {
                        Some(order) => {
                            order.status = "cancelled".into();
                            OrderResponse::Cancelled { order_id }
                        }
                        None => OrderResponse::NotFound { order_id },
                    }
                }
                OrderCommand::Query { order_id } => {
                    match self.orders.get(&order_id) {
                        Some(order) => OrderResponse::Found {
                            order_id: order.id.clone(),
                            item: order.item.clone(),
                            quantity: order.quantity,
                        },
                        None => OrderResponse::NotFound { order_id },
                    }
                }
            };
            let _ = responder.send(response);
        }
    }
}

struct RemoteGateway {
    local_actors: HashMap<String, mpsc::Sender<(OrderCommand, oneshot::Sender<OrderResponse>)>>,
    remote_nodes: HashMap<String, SocketAddr>,
    pending_replies: HashMap<u64, oneshot::Sender<OrderResponse>>,
    next_request_id: u64,
}

impl RemoteGateway {
    fn new() -> Self {
        Self {
            local_actors: HashMap::new(),
            remote_nodes: HashMap::new(),
            pending_replies: HashMap::new(),
            next_request_id: 0,
        }
    }

    fn register_local(&mut self, name: String, tx: mpsc::Sender<(OrderCommand, oneshot::Sender<OrderResponse>)>) {
        self.local_actors.insert(name, tx);
    }

    fn register_remote(&mut self, name: String, addr: SocketAddr) {
        self.remote_nodes.insert(name, addr);
    }

    async fn send_command(&mut self, target: &str, cmd: OrderCommand) -> Option<OrderResponse> {
        if let Some(tx) = self.local_actors.get(target) {
            let (resp_tx, resp_rx) = oneshot::channel();
            if tx.send((cmd, resp_tx)).await.is_ok() {
                return resp_rx.await.ok();
            }
            return None;
        }

        if let Some(addr) = self.remote_nodes.get(target) {
            match TcpStream::connect(addr).await {
                Ok(mut stream) => {
                    let payload = bincode::serialize(&cmd).ok()?;
                    let request_id = self.next_request_id;
                    self.next_request_id += 1;

                    let envelope = RemoteEnvelope {
                        target: target.to_string(),
                        message_type: "OrderCommand".to_string(),
                        payload,
                        reply_to: Some(request_id),
                    };

                    let data = bincode::serialize(&envelope).ok()?;
                    let len = data.len() as u32;
                    use tokio::io::AsyncWriteExt;
                    if stream.write_all(&len.to_le_bytes()).await.is_err() { return None; }
                    if stream.write_all(&data).await.is_err() { return None; }

                    use tokio::io::AsyncReadExt;
                    let mut len_buf = [0u8; 4];
                    if stream.read_exact(&mut len_buf).await.is_err() { return None; }
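                    let resp_len = u32::from_le_bytes(len_buf) as usize;
                    // Reject absurd lengths before allocating (16 MiB is an assumed limit)
                    if resp_len > 16 * 1024 * 1024 { return None; }
                    let mut resp_buf = vec![0u8; resp_len];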
                    if stream.read_exact(&mut resp_buf).await.is_err() { return None; }
                    bincode::deserialize(&resp_buf).ok()
                }
                Err(e) => {
                    eprintln!("failed to connect to {}: {}", target, e);
                    None
                }
            }
        } else {
            None
        }
    }
}

async fn server_node(addr: SocketAddr) {
    let (tx, rx) = mpsc::channel::<(OrderCommand, oneshot::Sender<OrderResponse>)>(256);
    let mut actor = OrderActor::new(rx);
    tokio::spawn(async move { actor.run().await });

    let listener = TcpListener::bind(addr).await.unwrap();
    println!("order actor listening on {}", addr);

    loop {
        if let Ok((mut stream, _)) = listener.accept().await {
            let tx = tx.clone();
            tokio::spawn(async move {
                use tokio::io::AsyncReadExt;
                use tokio::io::AsyncWriteExt;

                let mut len_buf = [0u8; 4];
                if stream.read_exact(&mut len_buf).await.is_err() { return; }
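                let len = u32::from_le_bytes(len_buf) as usize;
                // Reject absurd lengths before allocating (16 MiB is an assumed limit)
                if len > 16 * 1024 * 1024 { return; }
                let mut buf = vec![0u8; len];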
                if stream.read_exact(&mut buf).await.is_err() { return; }

                let envelope: RemoteEnvelope = match bincode::deserialize(&buf) {
                    Ok(e) => e,
                    Err(_) => return,
                };

                let cmd: OrderCommand = match bincode::deserialize(&envelope.payload) {
                    Ok(c) => c,
                    Err(_) => return,
                };

                let (resp_tx, resp_rx) = oneshot::channel();
                if tx.send((cmd, resp_tx)).await.is_err() { return; }
                let response = match resp_rx.await {
                    Ok(r) => r,
                    Err(_) => return,
                };

                let resp_data = bincode::serialize(&response).unwrap();
                let resp_len = resp_data.len() as u32;
                let _ = stream.write_all(&resp_len.to_le_bytes()).await;
                let _ = stream.write_all(&resp_data).await;
            });
        }
    }
}

#[tokio::main]
async fn main() {
    let server_addr: SocketAddr = "127.0.0.1:9001".parse().unwrap();
    tokio::spawn(server_node(server_addr));

    tokio::time::sleep(Duration::from_millis(100)).await;

    let mut gateway = RemoteGateway::new();
    gateway.register_remote("order-service".into(), server_addr);

    let response = gateway.send_command("order-service", OrderCommand::Create {
        order_id: "ORD-001".into(),
        item: "Rust Book".into(),
        quantity: 2,
    }).await;
    println!("create response: {:?}", response);

    let response = gateway.send_command("order-service", OrderCommand::Query {
        order_id: "ORD-001".into(),
    }).await;
    println!("query response: {:?}", response);
}

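Key takeaways

  • RemoteEnvelope wraps the message metadata: target actor, message type, payload, and a reply id (in this one-request-per-connection example the reply comes back on the same socket, so reply_to and pending_replies are not used yet)
  • Local actors communicate directly over channels; remote actors receive serialized messages over TCP
  • RemoteGateway unifies local and remote routing, so business code does not need to know where an actor lives
  • bincode provides compact binary serialization, better suited to inter-actor traffic than JSON
  • A length-prefixed framing protocol (4-byte length + payload) recovers message boundaries from the TCP byte stream; the receiver should cap the length before allocating a buffer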
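The framing logic is easiest to check in isolation. The sketch below uses the blocking std::io traits so it can run against an in-memory buffer. The Tokio code above has the same shape with AsyncReadExt and AsyncWriteExt. MAX_FRAME_LEN is an assumed limit: without some cap, a peer that sends a length of 0xFFFFFFFF makes the receiver try to allocate 4 GiB.

```rust
use std::io::{self, Read, Write};

/// Assumed upper bound on one frame's payload.
const MAX_FRAME_LEN: usize = 16 * 1024 * 1024;

/// Writes one frame: a 4-byte little-endian length followed by the payload.
fn write_frame<W: Write>(w: &mut W, payload: &[u8]) -> io::Result<()> {
    if payload.len() > MAX_FRAME_LEN {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "frame too large"));
    }
    // Safe cast: MAX_FRAME_LEN fits in a u32
    let len = payload.len() as u32;
    w.write_all(&len.to_le_bytes())?;
    w.write_all(payload)
}

/// Reads one frame, rejecting oversized lengths before allocating the buffer.
fn read_frame<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
    let mut len_buf = [0u8; 4];
    r.read_exact(&mut len_buf)?;
    let len = u32::from_le_bytes(len_buf) as usize;
    if len > MAX_FRAME_LEN {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "frame length exceeds limit"));
    }
    let mut buf = vec![0u8; len];
    r.read_exact(&mut buf)?;
    Ok(buf)
}
```

Several frames written back to back into one Vec<u8> read back as the same payloads in order, which is exactly the boundary information a raw TCP stream does not preserve.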

Pattern 5: A Production-Grade Actor System (Graceful Shutdown and Backpressure)

In production, an actor system also has to handle graceful shutdown, backpressure, message timeouts, and similar operational concerns.

use tokio::sync::{mpsc, oneshot, watch, broadcast};
use tokio::time::{timeout, Duration, Instant};
use tokio::select;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

struct ActorSystem {
    shutdown_tx: watch::Sender<bool>,
    shutdown_rx: watch::Receiver<bool>,
    event_tx: broadcast::Sender<String>,
    metrics: Arc<ActorMetrics>,
}

struct ActorMetrics {
    messages_sent: AtomicU64,
    messages_processed: AtomicU64,
    messages_dropped: AtomicU64,
    actors_alive: AtomicU64,
}

impl ActorMetrics {
    fn new() -> Self {
        Self {
            messages_sent: AtomicU64::new(0),
            messages_processed: AtomicU64::new(0),
            messages_dropped: AtomicU64::new(0),
            actors_alive: AtomicU64::new(0),
        }
    }

    fn snapshot(&self) -> String {
        format!(
            "sent: {}, processed: {}, dropped: {}, alive: {}",
            self.messages_sent.load(Ordering::Relaxed),
            self.messages_processed.load(Ordering::Relaxed),
            self.messages_dropped.load(Ordering::Relaxed),
            self.actors_alive.load(Ordering::Relaxed),
        )
    }
}

enum WorkerCmd {
    Execute { task: String, responder: oneshot::Sender<String> },
    Ping { responder: oneshot::Sender<bool> },
}

struct ResilientWorker {
    id: String,
    cmd_rx: mpsc::Receiver<WorkerCmd>,
    shutdown_rx: watch::Receiver<bool>,
    event_tx: broadcast::Sender<String>,
    metrics: Arc<ActorMetrics>,
    mailbox_capacity: usize,
    process_timeout: Duration,
}

impl ResilientWorker {
    async fn run(&mut self) {
        self.metrics.actors_alive.fetch_add(1, Ordering::Relaxed);
        println!("worker {} started", self.id);

        loop {
            select! {
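                res = self.shutdown_rx.changed() => {
                    // Err means the watch Sender (the ActorSystem) was dropped: treat it
                    // as shutdown too, instead of spinning on an error that returns at once
                    if res.is_err() || *self.shutdown_rx.borrow() {
                        self.drain_mailbox().await;
                        break;
                    }
                }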
                cmd = self.cmd_rx.recv() => {
                    match cmd {
                        Some(cmd) => self.handle_command(cmd).await,
                        None => break,
                    }
                }
            }
        }

        self.metrics.actors_alive.fetch_sub(1, Ordering::Relaxed);
        println!("worker {} stopped", self.id);
    }

    async fn handle_command(&mut self, cmd: WorkerCmd) {
        match cmd {
            WorkerCmd::Execute { task, responder } => {
                self.metrics.messages_sent.fetch_add(1, Ordering::Relaxed);

                let result = match timeout(self.process_timeout, self.process_task(&task)).await {
                    Ok(output) => {
                        self.metrics.messages_processed.fetch_add(1, Ordering::Relaxed);
                        output
                    }
                    Err(_) => {
                        self.metrics.messages_dropped.fetch_add(1, Ordering::Relaxed);
                        let _ = self.event_tx.send(format!("worker {} timeout on: {}", self.id, task));
                        "timeout".to_string()
                    }
                };
                let _ = responder.send(result);
            }
            WorkerCmd::Ping { responder } => {
                let _ = responder.send(true);
            }
        }
    }

    async fn process_task(&self, task: &str) -> String {
        tokio::time::sleep(Duration::from_millis(10)).await;
        format!("worker {} processed: {}", self.id, task.to_uppercase())
    }

    async fn drain_mailbox(&mut self) {
        println!("worker {} draining mailbox...", self.id);
        // Refuse new messages but keep the ones already buffered
        self.cmd_rx.close();
        let deadline = Instant::now() + Duration::from_secs(5);

        loop {
            match tokio::time::timeout_at(deadline, self.cmd_rx.recv()).await {
                Ok(Some(WorkerCmd::Execute { task, responder })) => {
                    let result = self.process_task(&task).await;
                    self.metrics.messages_processed.fetch_add(1, Ordering::Relaxed);
                    let _ = responder.send(result);
                }
                Ok(Some(WorkerCmd::Ping { responder })) => {
                    // Report "not healthy" while shutting down
                    let _ = responder.send(false);
                }
                // Closed and empty: everything buffered has been handled
                Ok(None) => break,
                Err(_) => {
                    println!("worker {} drain timeout, {} messages remaining", self.id, self.cmd_rx.len());
                    break;
                }
            }
        }
        println!("worker {} mailbox drained", self.id);
    }
}

impl ActorSystem {
    fn new() -> Self {
        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        let (event_tx, _) = broadcast::channel::<String>(256);
        let metrics = Arc::new(ActorMetrics::new());

        Self { shutdown_tx, shutdown_rx, event_tx, metrics }
    }

    async fn spawn_worker(&self, id: String, mailbox_capacity: usize) -> mpsc::Sender<WorkerCmd> {
        let (tx, rx) = mpsc::channel::<WorkerCmd>(mailbox_capacity);
        let mut worker = ResilientWorker {
            id,
            cmd_rx: rx,
            shutdown_rx: self.shutdown_rx.clone(),
            event_tx: self.event_tx.clone(),
            metrics: self.metrics.clone(),
            mailbox_capacity,
            process_timeout: Duration::from_secs(2),
        };
        tokio::spawn(async move { worker.run().await });
        tx
    }

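    async fn graceful_shutdown(&self) {
        println!("initiating graceful shutdown...");
        let _ = self.shutdown_tx.send(true);
        // Wait until every worker has exited instead of sleeping a fixed time,
        // with an upper bound just above the 5 s drain deadline
        let deadline = Instant::now() + Duration::from_secs(6);
        while self.metrics.actors_alive.load(Ordering::Relaxed) > 0 && Instant::now() < deadline {
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
        println!("final metrics: {}", self.metrics.snapshot());
    }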
}

async fn backpressure_sender(
    tx: mpsc::Sender<WorkerCmd>,
    worker_id: String,
    total: u64,
) -> u64 {
    let mut sent = 0u64;
    let mut dropped = 0u64;

    for i in 0..total {
        let (resp_tx, resp_rx) = oneshot::channel();
        let task = format!("task-{}", i);

        match timeout(Duration::from_millis(50), tx.send(WorkerCmd::Execute {
            task,
            responder: resp_tx,
        })).await {
            Ok(Ok(())) => {
                match timeout(Duration::from_secs(3), resp_rx).await {
                    Ok(Ok(result)) => {
                        sent += 1;
                        if i % 100 == 0 {
                            println!("{} result: {}", worker_id, result);
                        }
                    }
                    _ => dropped += 1,
                }
            }
            _ => dropped += 1,
        }
    }

    println!("{} sent: {}, dropped: {}", worker_id, sent, dropped);
    dropped
}

#[tokio::main]
async fn main() {
    let system = ActorSystem::new();

    let worker1 = system.spawn_worker("worker-1".into(), 64).await;
    let worker2 = system.spawn_worker("worker-2".into(), 64).await;

    let h1 = tokio::spawn(backpressure_sender(worker1, "sender-1".into(), 200));
    let h2 = tokio::spawn(backpressure_sender(worker2, "sender-2".into(), 200));

    h1.await.unwrap();
    h2.await.unwrap();

    println!("metrics: {}", system.metrics.snapshot());
    system.graceful_shutdown().await;
}

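Key takeaways

  • watch::channel(false) broadcasts the shutdown signal; every actor holding a receiver is notified at the same time
  • drain_mailbox() processes the remaining messages before shutdown, with a deadline so it cannot wait forever
  • timeout wraps both send and recv so that backpressure cannot block a caller indefinitely
  • ActorMetrics collects metrics with AtomicU64: lock-free and thread-safe
  • A bounded mailbox (capacity 64) provides backpressure naturally: when it is full, producers wait in send().await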
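Waiting in send().await, with a timeout around it, is one backpressure policy. The other common one is load shedding: try once and, if the mailbox is full, drop or reroute the message immediately. tokio's Sender::try_send supports this by returning TrySendError::Full. The sketch below shows the same decision with std's bounded sync_channel so it runs without dependencies. Offer, offer, and bounded_mailbox are hypothetical names.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Outcome of a non-blocking send into a bounded mailbox.
#[derive(Debug, PartialEq)]
enum Offer {
    Accepted,
    Shed,   // mailbox full: the producer drops (or reroutes) instead of waiting
    Closed, // the actor is gone
}

fn offer(tx: &SyncSender<u32>, msg: u32) -> Offer {
    match tx.try_send(msg) {
        Ok(()) => Offer::Accepted,
        Err(TrySendError::Full(_)) => Offer::Shed,
        Err(TrySendError::Disconnected(_)) => Offer::Closed,
    }
}

/// Bounded mailbox holding up to `capacity` messages (std analogue of mpsc::channel(capacity)).
fn bounded_mailbox(capacity: usize) -> (SyncSender<u32>, Receiver<u32>) {
    sync_channel(capacity)
}
```

With capacity 2, the third offer is shed; once the actor receives one message, there is room again; after the receiver is dropped, offers report Closed.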

Five Common Pitfalls

Pitfall 1: Circular waits between actors

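// ❌ Wrong: Actor A waits for B before answering, B waits for A before answering, so neither ever proceeds
async fn deadlock_pattern() {
    let (tx_a, mut rx_a) = mpsc::channel::<String>(16); // A's mailbox
    let (tx_b, mut rx_b) = mpsc::channel::<String>(16); // B's mailbox

    // Actor A: will only message B after it has heard from B
    tokio::spawn(async move {
        let request = rx_a.recv().await; // waits forever: B never sends first
        let _ = tx_b.send(format!("reply to {:?}", request)).await;
    });

    // Actor B: will only message A after it has heard from A
    tokio::spawn(async move {
        let request = rx_b.recv().await; // waits forever: A never sends first
        let _ = tx_a.send(format!("reply to {:?}", request)).await;
    });
}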

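// ✅ Correct: break the cycle with a one-way send plus a timeout on every wait
async fn safe_pattern() {
    let (tx_a, mut rx_a) = mpsc::channel::<String>(16); // A's mailbox
    let (tx_b, mut rx_b) = mpsc::channel::<String>(16); // B's mailbox

    // Actor A: send first (one-way), then wait for the reply with a deadline
    tokio::spawn(async move {
        tx_b.send("request-from-a".into()).await.unwrap();
        match timeout(Duration::from_secs(5), rx_a.recv()).await {
            Ok(Some(response)) => println!("got: {}", response),
            _ => println!("timeout or channel closed"),
        }
    });

    // Actor B: answers requests without ever waiting on A
    tokio::spawn(async move {
        while let Some(req) = rx_b.recv().await {
            let _ = tx_a.send(format!("response to {}", req)).await;
        }
    });
}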
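Another common way to break the cycle is to put the reply address inside the request, so the callee never needs a channel of its own back to the caller. The sketch below uses `std::sync::mpsc` and threads instead of Tokio to stay dependency-free; `Request`, `reply_to`, `spawn_actor_b` and `ask` are illustrative names, and `recv_timeout` plays the role of `tokio::time::timeout`.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical request type that carries its own reply channel.
struct Request {
    body: String,
    reply_to: mpsc::Sender<String>,
}

/// Actor B: answers each request through that request's `reply_to`.
/// It never waits on A, so no waiting cycle can form.
fn spawn_actor_b() -> mpsc::Sender<Request> {
    let (tx, rx) = mpsc::channel::<Request>();
    thread::spawn(move || {
        for req in rx {
            // Ignore the error if the caller has already given up waiting
            let _ = req.reply_to.send(format!("response to {}", req.body));
        }
    });
    tx
}

/// Actor A's side of the call: one-way send, then a bounded wait for the reply.
fn ask(b: &mpsc::Sender<Request>, body: &str, wait: Duration) -> Option<String> {
    let (reply_tx, reply_rx) = mpsc::channel();
    b.send(Request { body: body.to_string(), reply_to: reply_tx }).ok()?;
    reply_rx.recv_timeout(wait).ok()
}

fn main() {
    let b = spawn_actor_b();
    match ask(&b, "ping", Duration::from_secs(1)) {
        Some(reply) => println!("got: {}", reply),
        None => println!("timeout or actor stopped"),
    }
}
```

The same shape is what `WorkerCmd::Execute { responder, .. }` does with a Tokio `oneshot` sender in the example above.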

Pitfall 2: Ignoring what happens when the mailbox is full

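// ❌ Wrong: an unbounded channel has no backpressure, so memory can blow up
async fn unbounded_risk() {
    // The receiver stays alive but nothing reads it, so all 1,000,000 messages pile up in memory
    let (tx, _rx) = mpsc::unbounded_channel::<String>();
    for i in 0..1_000_000 {
        tx.send(format!("msg-{}", i)).unwrap(); // never waits, never fails while _rx lives
    }
}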

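// ✅ Correct: bounded channel + backpressure (send().await waits while the mailbox is full)
async fn bounded_safe() {
    let (tx, mut rx) = mpsc::channel::<String>(256);

    // Consumer actor; without it, the 257th send would wait forever
    let consumer = tokio::spawn(async move {
        while let Some(_msg) = rx.recv().await {
            // handle the message here
        }
    });

    for i in 0..1000 {
        match tx.send(format!("msg-{}", i)).await {
            Ok(()) => {}
            Err(_) => {
                // Err means the receiver was dropped, not that the mailbox is full
                println!("receiver dropped at {}", i);
                return;
            }
        }
    }

    drop(tx); // close the mailbox so the consumer loop ends
    let _ = consumer.await;
}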
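When waiting is not acceptable, a full mailbox can instead be treated as a signal to shed load. The sketch below uses the standard library's bounded `sync_channel` and `try_send`, which returns `TrySendError::Full` instead of blocking (Tokio's `mpsc::Sender::try_send` behaves the same way). The drop-on-full policy and the `offer_all` helper are illustrative choices, not the article's code.

```rust
use std::sync::mpsc::{sync_channel, Receiver, TrySendError};

/// Offers `total` messages to a mailbox with `capacity` slots without ever blocking.
/// Returns (accepted, dropped) plus the receiver, so the caller can read what was queued.
fn offer_all(capacity: usize, total: usize) -> (usize, usize, Receiver<String>) {
    let (tx, rx) = sync_channel::<String>(capacity);
    let (mut accepted, mut dropped) = (0, 0);
    for i in 0..total {
        match tx.try_send(format!("msg-{}", i)) {
            Ok(()) => accepted += 1,
            // Mailbox full: shed this message instead of waiting
            Err(TrySendError::Full(_)) => dropped += 1,
            // Receiver gone: nothing left to deliver to
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    (accepted, dropped, rx)
}

fn main() {
    // Nobody consumes in this demo, so only `capacity` messages fit
    let (accepted, dropped, rx) = offer_all(4, 10);
    println!("accepted: {}, dropped: {}", accepted, dropped);
    let queued: Vec<String> = rx.try_iter().collect();
    println!("queued: {:?}", queued);
}
```

Shedding keeps latency bounded at the cost of losing messages, so it suits telemetry or best-effort events better than commands that must not be lost.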

Pitfall 3: Supervisors that never cap restarts

// ❌ Wrong: unlimited restarts can turn into a crash-restart loop
async fn infinite_restart_loop() {
    loop {
        let handle = tokio::spawn(async move {
            panic!("always crash!");
        });
        let _ = handle.await; // Err(JoinError) every time, yet we restart anyway
        println!("restarting...");
    }
}

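// ✅ Correct: cap both the number and the rate of restarts
use std::time::{Duration, Instant};

struct RestartPolicy {
    max_restarts: u32,      // restarts allowed inside one window
    window: Duration,       // length of the sliding time window
    restarts: Vec<Instant>, // timestamps of recent restarts
}

impl RestartPolicy {
    fn new(max_restarts: u32, window: Duration) -> Self {
        Self { max_restarts, window, restarts: Vec::new() }
    }

    /// Returns true (and records the restart) if another restart is allowed right now.
    fn should_restart(&mut self) -> bool {
        let now = Instant::now();
        let window = self.window;
        // Forget restarts that have slid out of the window
        self.restarts.retain(|t| now.duration_since(*t) < window);
        if self.restarts.len() < self.max_restarts as usize {
            self.restarts.push(now);
            true
        } else {
            // Too many crashes in a short time: stop restarting and escalate
            false
        }
    }
}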
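A minimal supervisor loop that consults the policy might look like the sketch below. It is an assumption-based illustration: the child runs on a std thread instead of a Tokio task, a panic is detected through `JoinHandle::join` returning `Err`, and `supervise` is a hypothetical helper. The policy is repeated so the snippet compiles on its own.

```rust
use std::thread;
use std::time::{Duration, Instant};

// Same sliding-window policy as above, repeated so this sketch is self-contained
struct RestartPolicy { max_restarts: u32, window: Duration, restarts: Vec<Instant> }

impl RestartPolicy {
    fn new(max_restarts: u32, window: Duration) -> Self {
        Self { max_restarts, window, restarts: Vec::new() }
    }
    fn should_restart(&mut self) -> bool {
        let now = Instant::now();
        let window = self.window;
        self.restarts.retain(|t| now.duration_since(*t) < window);
        if self.restarts.len() < self.max_restarts as usize {
            self.restarts.push(now);
            true
        } else {
            false
        }
    }
}

/// Runs `child` on its own thread and restarts it after each panic until the policy refuses.
/// Returns how many times the child was started.
fn supervise<F>(child: F, mut policy: RestartPolicy) -> u32
where
    F: Fn() + Send + Clone + 'static,
{
    let mut starts = 0;
    loop {
        starts += 1;
        let c = child.clone();
        // join() returns Err if the child thread panicked: treat that as a crash
        let crashed = thread::spawn(move || c()).join().is_err();
        if !crashed {
            break; // child finished normally
        }
        if !policy.should_restart() {
            eprintln!("restart limit reached after {} starts; escalating", starts);
            break;
        }
        eprintln!("child crashed; restarting");
    }
    starts
}

fn main() {
    let policy = RestartPolicy::new(3, Duration::from_secs(60));
    let starts = supervise(|| panic!("always crash!"), policy);
    println!("child started {} times", starts); // 1 initial start + 3 restarts
}
```

With `max_restarts = 3` the always-crashing child is started four times and then the supervisor gives up, which is where a real system would escalate to its own parent.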

Pitfall 4: Remote messages without a timeout

// `gateway` and `cmd` are assumed to be the remote gateway and command from the distributed example earlier

// ❌ Wrong: a remote call can block forever
async fn remote_without_timeout() {
    let _response = gateway.send_command("remote-actor", cmd).await;
}

// ✅ Correct: every remote call gets a timeout
async fn remote_with_timeout() {
    match timeout(Duration::from_secs(5), gateway.send_command("remote-actor", cmd)).await {
        Ok(Some(response)) => println!("got: {:?}", response),
        Ok(None) => println!("actor not found"),
        Err(_) => println!("remote call timeout"),
    }
}
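The same guard can be seen without a network: the sketch runs a hypothetical "remote" actor on a thread that replies after a configurable latency, and the caller waits with `recv_timeout`. `slow_remote` and `call_with_timeout` are illustrative names, and the sleep merely simulates network delay; a real gateway would be async.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Hypothetical remote actor: replies after `latency`, simulating a slow network hop.
fn slow_remote(cmd: String, latency: Duration) -> mpsc::Receiver<String> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        thread::sleep(latency);
        // If the caller already timed out, its receiver is gone and this send just fails
        let _ = tx.send(format!("ack: {}", cmd));
    });
    rx
}

/// Calls the remote side with a deadline instead of waiting indefinitely.
fn call_with_timeout(cmd: &str, latency: Duration, deadline: Duration) -> Result<String, &'static str> {
    match slow_remote(cmd.to_string(), latency).recv_timeout(deadline) {
        Ok(reply) => Ok(reply),
        Err(RecvTimeoutError::Timeout) => Err("remote call timeout"),
        Err(RecvTimeoutError::Disconnected) => Err("remote actor stopped"),
    }
}

fn main() {
    // Fast peer: answers well inside the deadline
    println!("{:?}", call_with_timeout("status", Duration::from_millis(10), Duration::from_secs(1)));
    // Slow peer: the caller gives up after 50 ms instead of hanging for 2 s
    println!("{:?}", call_with_timeout("status", Duration::from_secs(2), Duration::from_millis(50)));
}
```

A timeout alone does not tell you whether the remote side executed the command, so any retry on top of it should only resend idempotent commands.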

Pitfall 5: Discarding unprocessed messages on shutdown

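// ❌ Wrong: dropping the receiver outright loses every buffered message
async fn bad_shutdown(rx: mpsc::Receiver<String>) {
    drop(rx);
}

// ✅ Correct: stop accepting new messages, drain what is buffered, then finish
async fn good_shutdown(mut rx: mpsc::Receiver<String>) {
    // close() rejects further sends but keeps already-buffered messages readable
    rx.close();
    println!("draining remaining messages...");
    // The timeout guards against a sender that still holds a reserved permit
    while let Ok(Some(msg)) = timeout(Duration::from_millis(100), rx.recv()).await {
        println!("drained: {}", msg);
    }
    println!("all messages processed");
}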
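The drain-then-finish idea can also be sketched with `std::sync::mpsc` so it runs without Tokio. Once every sender is dropped, `recv_timeout` keeps returning the buffered messages and reports `Disconnected` only when the queue is empty; the timeout still bounds the wait if some sender is left alive. `drain` is an illustrative helper, not part of any library.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::time::Duration;

/// Processes whatever is still queued and returns how many messages were drained.
fn drain(rx: &Receiver<String>, idle: Duration) -> usize {
    let mut drained = 0;
    loop {
        match rx.recv_timeout(idle) {
            Ok(msg) => {
                println!("drained: {}", msg);
                drained += 1;
            }
            // All senders dropped and the buffer is empty: shutdown can finish
            Err(RecvTimeoutError::Disconnected) => break,
            // A sender is still alive but idle: stop waiting rather than hang
            Err(RecvTimeoutError::Timeout) => break,
        }
    }
    drained
}

fn main() {
    let (tx, rx) = mpsc::channel::<String>();
    for i in 0..5 {
        tx.send(format!("msg-{}", i)).unwrap();
    }
    drop(tx); // shutdown: no new messages can arrive
    println!("drained {} messages", drain(&rx, Duration::from_millis(100)));
}
```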

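Troubleshooting cheat sheet

Symptom | Likely cause | Fix
Actor stops responding to messages | Full mailbox or a deadlock | Check mailbox capacity; add timeouts
Frequent SendError | The receiving side has closed | Check whether the actor exited unexpectedly; add supervision
Memory keeps growing | Unbounded channel or message backlog | Switch to bounded channels; add backpressure
Restart loop | Actor fails during initialization | Check the startup logic; cap the number of restarts
Remote calls time out | Network latency or an unresponsive peer | Add timeouts and retries
Messages arrive out of order | Several producers sending concurrently | Use a single producer to preserve order, or add sequence numbers
Graceful shutdown hangs | Some actor ignores the shutdown signal | Put a timeout on the drain
oneshot RecvError | The responder was dropped without sending | Make sure every code path sends a response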
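For the out-of-order row, "add sequence numbers" could look like the sketch below: each message carries a per-stream sequence number, and the receiver holds early arrivals in a `BTreeMap` until the gap is filled. `Reorderer` is a hypothetical helper, and it assumes sequence numbers start at 0 and that no number is permanently lost (a real system would need a timeout or gap report for that case).

```rust
use std::collections::BTreeMap;

/// Releases messages strictly in sequence-number order.
struct Reorderer {
    next_seq: u64,                  // the sequence number we are waiting for
    pending: BTreeMap<u64, String>, // early arrivals, keyed by sequence number
}

impl Reorderer {
    fn new() -> Self {
        Self { next_seq: 0, pending: BTreeMap::new() }
    }

    /// Accepts one message and returns every message that is now deliverable, in order.
    fn push(&mut self, seq: u64, payload: String) -> Vec<String> {
        if seq < self.next_seq {
            return Vec::new(); // duplicate of something already delivered
        }
        self.pending.insert(seq, payload);
        let mut ready = Vec::new();
        // Release the contiguous run starting at next_seq
        while let Some(msg) = self.pending.remove(&self.next_seq) {
            ready.push(msg);
            self.next_seq += 1;
        }
        ready
    }
}

fn main() {
    let mut r = Reorderer::new();
    // Messages arrive as 2, 0, 1 but are delivered as 0, 1, 2
    for (seq, msg) in [(2, "c"), (0, "a"), (1, "b")] {
        println!("{:?}", r.push(seq, msg.to_string()));
    }
}
```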

Advanced optimization techniques

1. Actor pools for load balancing

When a single actor becomes the bottleneck, spread its messages across a pool of identical actors:

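use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::mpsc;

/// Round-robin pool: spreads messages evenly across several identical workers.
struct ActorPool {
    workers: Vec<mpsc::Sender<WorkerCmd>>,
    next: AtomicUsize, // lock-free round-robin cursor
}

impl ActorPool {
    fn new(workers: Vec<mpsc::Sender<WorkerCmd>>) -> Self {
        // An empty pool would make `% self.workers.len()` divide by zero
        assert!(!workers.is_empty(), "ActorPool needs at least one worker");
        Self {
            workers,
            next: AtomicUsize::new(0),
        }
    }

    fn next_worker(&self) -> &mpsc::Sender<WorkerCmd> {
        // Relaxed is enough: we only need distinct counter values, not ordering with other memory
        let idx = self.next.fetch_add(1, Ordering::Relaxed) % self.workers.len();
        &self.workers[idx]
    }

    /// Returns false if the chosen worker has stopped.
    async fn send(&self, cmd: WorkerCmd) -> bool {
        let worker = self.next_worker();
        worker.send(cmd).await.is_ok()
    }
}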
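To check the round-robin distribution without a runtime, the sketch below swaps Tokio senders for `std::sync::mpsc` senders and threads. `StdPool` and `run_pool` are hypothetical names; each worker is a tiny actor that owns its receiver and reports how many jobs it handled.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::thread;

/// Round-robin pool over std channels (a dependency-free stand-in for ActorPool).
struct StdPool {
    workers: Vec<mpsc::Sender<String>>,
    next: AtomicUsize,
}

impl StdPool {
    fn send(&self, job: String) -> bool {
        let idx = self.next.fetch_add(1, Ordering::Relaxed) % self.workers.len();
        self.workers[idx].send(job).is_ok()
    }
}

/// Starts `n` workers, sends `jobs` messages through the pool, and returns per-worker counts.
fn run_pool(n: usize, jobs: usize) -> Vec<usize> {
    let mut senders = Vec::new();
    let mut handles = Vec::new();
    for _ in 0..n {
        let (tx, rx) = mpsc::channel::<String>();
        senders.push(tx);
        // The worker counts messages until its mailbox is closed
        handles.push(thread::spawn(move || rx.iter().count()));
    }
    let pool = StdPool { workers: senders, next: AtomicUsize::new(0) };
    for i in 0..jobs {
        pool.send(format!("job-{}", i));
    }
    drop(pool); // dropping the senders closes every mailbox, so the workers finish
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    println!("{:?}", run_pool(3, 9)); // each of the 3 workers gets 3 jobs
}
```

Round-robin assumes jobs cost roughly the same; if they vary a lot, routing to the worker with the shortest queue balances better.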

2. Message priority

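use tokio::select;
use tokio::sync::mpsc;

enum Priority {
    High,
    Normal,
    Low,
}

struct PriorityMailbox {
    high_rx: mpsc::Receiver<String>,
    normal_rx: mpsc::Receiver<String>,
    low_rx: mpsc::Receiver<String>,
}

impl PriorityMailbox {
    /// Returns the highest-priority message available; None only once all three queues are closed and empty.
    async fn recv(&mut self) -> Option<String> {
        select! {
            // `biased` polls the branches top to bottom instead of in random order
            biased;
            // The `Some(..)` patterns disable a branch whose channel has closed,
            // so one closed queue does not end the whole mailbox
            Some(msg) = self.high_rx.recv() => Some(msg),
            Some(msg) = self.normal_rx.recv() => Some(msg),
            Some(msg) = self.low_rx.recv() => Some(msg),
            else => None,
        }
    }
}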
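Without Tokio's `select!`, the same "always check the high queue first" rule can be written as a non-blocking poll over std channels. This is a simplified sketch under stated assumptions: `StdPriorityMailbox`, `try_next` and `priority_mailbox` are hypothetical names, and `try_next` returns `None` whenever all queues are momentarily empty, so a real actor would need to park or sleep between polls. Strict priority like this, and `biased;` above, can starve the low queue under sustained high-priority load.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Three queues checked in a fixed order: high, then normal, then low.
struct StdPriorityMailbox {
    high: Receiver<String>,
    normal: Receiver<String>,
    low: Receiver<String>,
}

impl StdPriorityMailbox {
    /// Returns the highest-priority message currently queued, if any. Never blocks.
    fn try_next(&self) -> Option<String> {
        self.high
            .try_recv()
            .or_else(|_| self.normal.try_recv())
            .or_else(|_| self.low.try_recv())
            .ok()
    }
}

/// Creates the mailbox together with its (high, normal, low) senders.
fn priority_mailbox() -> (Sender<String>, Sender<String>, Sender<String>, StdPriorityMailbox) {
    let (high_tx, high) = channel();
    let (normal_tx, normal) = channel();
    let (low_tx, low) = channel();
    (high_tx, normal_tx, low_tx, StdPriorityMailbox { high, normal, low })
}

fn main() {
    let (high, normal, low, mailbox) = priority_mailbox();
    // Enqueue in "wrong" order: low first, high last
    low.send("low-1".to_string()).unwrap();
    normal.send("normal-1".to_string()).unwrap();
    high.send("high-1".to_string()).unwrap();
    while let Some(msg) = mailbox.try_next() {
        println!("{}", msg); // high-1, then normal-1, then low-1
    }
}
```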

3. Message tracing and trace IDs

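use std::sync::atomic::{AtomicU64, Ordering};

// Process-wide ID generator; trace and span IDs share one counter, so every ID is unique
static TRACE_ID: AtomicU64 = AtomicU64::new(1);

fn new_trace_id() -> u64 {
    TRACE_ID.fetch_add(1, Ordering::Relaxed)
}

#[derive(Debug, Clone)]
struct TracedMessage {
    trace_id: u64,            // shared by every message in one request chain
    span_id: u64,             // unique to this hop
    parent_span: Option<u64>, // span that caused this message; None at the root
    payload: String,
}

impl TracedMessage {
    /// Starts a new trace, e.g. where a request enters the system.
    fn new(payload: String) -> Self {
        Self {
            trace_id: new_trace_id(),
            span_id: new_trace_id(),
            parent_span: None,
            payload,
        }
    }

    /// Derives the message for the next actor hop: same trace, new span, linked to this span.
    fn child(&self, payload: String) -> Self {
        Self {
            trace_id: self.trace_id,
            span_id: new_trace_id(),
            parent_span: Some(self.span_id),
            payload,
        }
    }
}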
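A short usage sketch: an entry actor creates the root message and each downstream hop calls `child`, so all hops share one `trace_id` while `parent_span` links them into a chain. The type is repeated from above so the snippet compiles alone; `hop_chain` and the gateway/order/payment hops are hypothetical.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

static TRACE_ID: AtomicU64 = AtomicU64::new(1);

fn new_trace_id() -> u64 {
    TRACE_ID.fetch_add(1, Ordering::Relaxed)
}

#[derive(Debug, Clone)]
struct TracedMessage {
    trace_id: u64,
    span_id: u64,
    parent_span: Option<u64>,
    payload: String,
}

impl TracedMessage {
    fn new(payload: String) -> Self {
        Self { trace_id: new_trace_id(), span_id: new_trace_id(), parent_span: None, payload }
    }

    fn child(&self, payload: String) -> Self {
        Self { trace_id: self.trace_id, span_id: new_trace_id(), parent_span: Some(self.span_id), payload }
    }
}

/// Simulates one request passing through hypothetical gateway -> order -> payment actors.
fn hop_chain() -> Vec<TracedMessage> {
    let root = TracedMessage::new("gateway: place order".to_string());
    let order = root.child("order: reserve stock".to_string());
    let payment = order.child("payment: charge card".to_string());
    vec![root, order, payment]
}

fn main() {
    for m in hop_chain() {
        println!("trace={} span={} parent={:?} {}", m.trace_id, m.span_id, m.parent_span, m.payload);
    }
}
```

Logging `trace_id` and `span_id` on every hop is what lets you later filter one request's path through many actors.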

Actor framework comparison

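Feature | Actix | Tokio Channel | Riker | Coerce
Message passing | Typed Message | Hand-written enum | Typed | Typed
Supervision tree | Built in | Implement by hand | Built in | Built in
Remote actors | actix-remote | Implement by hand | Supported | Built in
Performance | very high
Dependencies | actix | tokio only | riker | coerce
Learning curve | Moderate | Gentle | Moderate | Moderate
Ecosystem maturity
Best fit | Web services | General-purpose concurrency | Classic actor systems | Distributed systems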

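Summary

The core philosophy of actors in Rust is: do not communicate by sharing memory; share memory by communicating. Actix offers a mature, typed actor system, while actors built on Tokio channels are lighter and more flexible. Supervision trees are the foundation of any production actor system: an actor system without supervision is like a car without seatbelts. In distributed settings, a single interface for local and remote communication is key. Always use bounded mailboxes for backpressure, put a timeout on every remote call, and drain mailboxes for a graceful shutdown, and your actor system will be both robust and efficient.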



#Rust Actor #Actix #Tokio #Concurrency models #Message passing #2026 #Programming languages