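Rust Actor Frameworks in Practice: 5 Production Patterns from Message Passing to Supervision Trees
When a Concurrency Model Meets Rust Ownership
Have you run into this? A system has to handle thousands of independent stateful entities at once, each with its own lifecycle and message queue. You tried managing shared state with Arc<Mutex<HashMap>>, and lock contention dragged throughput to the floor. You tried dispatching work to a thread pool, only to find that communication between threads cost more than the computation itself. In the end you conclude that the actor model is a better fit for this class of problem.
The core idea of the actor model is simple: each actor is an independent unit of computation with private state, and it interacts with the outside world only through message passing. With no shared state there is no lock contention, and with no locks there are no lock-based deadlocks (actors can still wait on each other in a cycle, which is covered under the pitfalls). Rust's ownership system fits the model naturally: ownership of a message moves to the receiver when it is sent, and the compiler enforces thread safety for you.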
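To make the idea concrete, here is a minimal sketch of an actor built only from the standard library (std::sync::mpsc and a thread) rather than Tokio or Actix. The names CounterMsg, spawn_counter and demo_counter are hypothetical and exist only for this illustration. The state lives inside the actor thread, and every interaction is a message whose ownership moves into the mailbox.

```rust
use std::sync::mpsc;
use std::thread;

// Messages the counter actor understands (hypothetical names for this sketch).
enum CounterMsg {
    Add(u64),
    // The reply channel travels inside the message so the actor can answer.
    Get(mpsc::Sender<u64>),
}

// Spawns the actor thread and returns its address (the sending half of the mailbox).
fn spawn_counter() -> mpsc::Sender<CounterMsg> {
    let (tx, rx) = mpsc::channel::<CounterMsg>();
    thread::spawn(move || {
        let mut total = 0u64; // private state: only this thread can touch it
        // The loop ends once every sender has been dropped.
        for msg in rx {
            match msg {
                CounterMsg::Add(n) => total += n,
                CounterMsg::Get(reply) => {
                    let _ = reply.send(total);
                }
            }
        }
    });
    tx
}

// Sends two increments, then asks the actor for the total.
fn demo_counter() -> u64 {
    let counter = spawn_counter();
    counter.send(CounterMsg::Add(100)).unwrap();
    counter.send(CounterMsg::Add(50)).unwrap();
    let (reply_tx, reply_rx) = mpsc::channel();
    counter.send(CounterMsg::Get(reply_tx)).unwrap();
    reply_rx.recv().unwrap()
}
```

Because send takes the message by value, the caller cannot touch it afterwards; the compiler rejects any later use, which is the ownership transfer described above.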
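Core Concepts at a Glance
| Concept | Description | Typical use |
|---|---|---|
| Actor | Independent unit of computation that encapsulates state and behavior | State machines, connection management |
| Message | The only way actors communicate | Commands, events, queries |
| Mailbox | An actor's message queue | Buffering, in-order processing |
| Supervision | A parent actor monitors its child actors | Fault recovery, restart strategies |
| Address | A handle that refers to an actor | Message addressing, routing |
| Context | An actor's runtime environment | Timers, child actor management |
| Backpressure | Flow-control mechanism | Preventing mailbox overflow |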
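Five Challenges of the Actor Model
1. The lock hell of shared state
Traditional concurrency relies on shared memory plus locks, and under high concurrency lock contention becomes the bottleneck. The actor model sidesteps the problem by not sharing: each actor owns its state exclusively.
2. Fault isolation and recovery
One crashing component should not take down the whole system. The actor model isolates faults with a supervision tree: when a child actor crashes, its parent decides what to do (restart it, stop it, or escalate to its own supervisor).
3. Message ordering guarantees
A single actor processes its mailbox in order, but how is order guaranteed across actors? Once several actors or several senders are involved there is no global order, so ordering has to be designed into the message protocol and the routing strategy.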
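One common way to design ordering into the protocol is to stamp each message with a sequence number and let the receiver reorder. The sketch below is an assumption about how that could look, not something taken from a particular framework; Resequencer and demo_resequencer are hypothetical names.

```rust
use std::collections::BTreeMap;

// Buffers out-of-order messages and releases them strictly by sequence number.
struct Resequencer {
    next_seq: u64,
    pending: BTreeMap<u64, String>,
}

impl Resequencer {
    fn new() -> Self {
        Self { next_seq: 0, pending: BTreeMap::new() }
    }

    // Accepts one message and returns every message that is now deliverable in order.
    fn accept(&mut self, seq: u64, payload: String) -> Vec<String> {
        // Sequence numbers below next_seq were already delivered; treat them as duplicates.
        if seq >= self.next_seq {
            self.pending.insert(seq, payload);
        }
        let mut ready = Vec::new();
        while let Some(payload) = self.pending.remove(&self.next_seq) {
            ready.push(payload);
            self.next_seq += 1;
        }
        ready
    }
}

// Feeds messages 1, 0, 2 and collects them in delivery order.
fn demo_resequencer() -> Vec<String> {
    let mut r = Resequencer::new();
    let mut out = Vec::new();
    out.extend(r.accept(1, "b".into())); // held back: 0 has not arrived yet
    out.extend(r.accept(0, "a".into())); // releases "a" and then "b"
    out.extend(r.accept(2, "c".into()));
    out
}
```

The buffer is unbounded here; a production version would likely cap it and decide what to do when a gap never fills.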
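4. Location transparency
Talking to a local actor and talking to a remote actor should look the same. How do you let actors communicate across nodes without changing the business logic?
5. The complexity of graceful shutdown
An actor system may contain hundreds or thousands of actors. How do you make sure that every message has been processed and every resource released by the time it shuts down?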
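For a single actor, the simplest shutdown protocol is: stop accepting new work by dropping every sender, then wait for the actor to finish its backlog. The sketch below shows that with standard-library channels and threads instead of Tokio; spawn_worker and demo_shutdown are hypothetical names used only here.

```rust
use std::sync::mpsc;
use std::thread;

// Starts a worker that counts the messages it handles and reports the count on exit.
fn spawn_worker() -> (mpsc::Sender<String>, thread::JoinHandle<usize>) {
    let (tx, rx) = mpsc::channel::<String>();
    let handle = thread::spawn(move || {
        let mut processed = 0;
        // Iteration yields every queued message and stops only after
        // all senders are dropped and the queue is empty.
        for _msg in rx {
            processed += 1;
        }
        processed
    });
    (tx, handle)
}

// Queues five jobs, closes the mailbox, and waits for the worker to drain it.
fn demo_shutdown() -> usize {
    let (tx, handle) = spawn_worker();
    for i in 0..5 {
        tx.send(format!("job-{}", i)).unwrap();
    }
    drop(tx); // step 1: no more work can be submitted
    handle.join().unwrap() // step 2: block until the backlog has been handled
}
```

With many actors the same two steps apply per actor, plus a deadline so that one stuck actor cannot hold up the whole system.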
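Five Production-Grade Actor Patterns
Pattern 1: Actix Actor System (the Classic Actor Pattern)
Actix is one of the most established actor frameworks in the Rust ecosystem. It is built around typed message handling and manages the full actor lifecycle.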
use actix::prelude::*;
use std::time::Duration;
struct GameSession {
id: String,
score: u64,
level: u32,
}
impl Actor for GameSession {
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {
println!("session {} started, score: {}", self.id, self.score);
}
fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
println!("session {} stopping", self.id);
// Running::Stop lets the actor finish stopping; Running::Continue would cancel the stop.
Running::Stop
}
fn stopped(&mut self, _ctx: &mut Self::Context) {
println!("session {} stopped, final score: {}", self.id, self.score);
}
}
#[derive(Message)]
#[rtype(result = "u64")]
struct GetScore;
#[derive(Message)]
#[rtype(result = "()")]
struct AddScore { amount: u64 }
#[derive(Message)]
#[rtype(result = "()")]
struct LevelUp;
impl Handler<GetScore> for GameSession {
type Result = u64;
fn handle(&mut self, _msg: GetScore, _ctx: &mut Context<Self>) -> u64 {
self.score
}
}
impl Handler<AddScore> for GameSession {
type Result = ();
fn handle(&mut self, msg: AddScore, _ctx: &mut Context<Self>) {
self.score += msg.amount;
println!("session {} score: {}", self.id, self.score);
}
}
impl Handler<LevelUp> for GameSession {
type Result = ();
fn handle(&mut self, _msg: LevelUp, ctx: &mut Context<Self>) {
self.level += 1;
println!("session {} leveled up to {}", self.id, self.level);
ctx.notify_later(LevelUp, Duration::from_secs(30));
}
}
#[actix::main]
async fn main() {
let addr = GameSession {
id: "player-001".into(),
score: 0,
level: 1,
}.start();
addr.send(AddScore { amount: 100 }).await.unwrap();
addr.send(AddScore { amount: 50 }).await.unwrap();
addr.send(LevelUp).await.unwrap();
let score = addr.send(GetScore).await.unwrap();
println!("final score: {}", score);
}
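Key points:
- impl Actor defines the actor lifecycle hooks: started, stopping, stopped
- #[derive(Message)] defines a typed message; rtype specifies the response type
- The Handler<T> trait implements the handling logic for each message type
- ctx.notify_later schedules a delayed message to the actor itself, which is how periodic tasks are built
- addr.send() sends a message asynchronously and awaits the response
Pattern 2: Tokio Channel-Based Actor (the Lightweight Actor Pattern)
Skip the actor framework and hand-write the actor on top of Tokio channels. This is lighter and more flexible, and it suits cases that demand tight control over performance and behavior.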
use tokio::sync::{mpsc, oneshot};
use std::collections::HashMap;
enum CacheCommand {
Get {
key: String,
responder: oneshot::Sender<Option<String>>,
},
Set {
key: String,
value: String,
ttl: Option<std::time::Duration>,
responder: oneshot::Sender<bool>,
},
Delete {
key: String,
responder: oneshot::Sender<bool>,
},
Stats {
responder: oneshot::Sender<CacheStats>,
},
}
#[derive(Debug)]
struct CacheStats {
entries: usize,
hits: u64,
misses: u64,
}
struct CacheEntry {
value: String,
expires_at: Option<std::time::Instant>,
}
struct CacheActor {
store: HashMap<String, CacheEntry>,
hits: u64,
misses: u64,
}
impl CacheActor {
fn new() -> Self {
Self {
store: HashMap::new(),
hits: 0,
misses: 0,
}
}
fn is_expired(&self, entry: &CacheEntry) -> bool {
match entry.expires_at {
Some(t) => std::time::Instant::now() > t,
None => false,
}
}
async fn run(&mut self, mut rx: mpsc::Receiver<CacheCommand>) {
while let Some(cmd) = rx.recv().await {
match cmd {
CacheCommand::Get { key, responder } => {
let result = match self.store.get(&key) {
Some(entry) if !self.is_expired(entry) => {
self.hits += 1;
Some(entry.value.clone())
}
Some(_) => {
self.store.remove(&key);
self.misses += 1;
None
}
None => {
self.misses += 1;
None
}
};
let _ = responder.send(result);
}
CacheCommand::Set { key, value, ttl, responder } => {
let expires_at = ttl.map(|d| std::time::Instant::now() + d);
self.store.insert(key, CacheEntry { value, expires_at });
let _ = responder.send(true);
}
CacheCommand::Delete { key, responder } => {
let existed = self.store.remove(&key).is_some();
let _ = responder.send(existed);
}
CacheCommand::Stats { responder } => {
let stats = CacheStats {
entries: self.store.len(),
hits: self.hits,
misses: self.misses,
};
let _ = responder.send(stats);
}
}
}
println!("cache actor shutting down, {} entries remaining", self.store.len());
}
}
#[tokio::main]
async fn main() {
let (tx, rx) = mpsc::channel::<CacheCommand>(256);
let mut actor = CacheActor::new();
tokio::spawn(async move { actor.run(rx).await });
let (resp_tx, resp_rx) = oneshot::channel();
tx.send(CacheCommand::Set {
key: "user:1001".into(),
value: r#"{"name":"alice"}"#.into(),
ttl: Some(std::time::Duration::from_secs(300)),
responder: resp_tx,
}).await.unwrap();
resp_rx.await.unwrap();
let (resp_tx, resp_rx) = oneshot::channel();
tx.send(CacheCommand::Get {
key: "user:1001".into(),
responder: resp_tx,
}).await.unwrap();
let result = resp_rx.await.unwrap();
println!("get result: {:?}", result);
let (resp_tx, resp_rx) = oneshot::channel();
tx.send(CacheCommand::Stats { responder: resp_tx }).await.unwrap();
let stats = resp_rx.await.unwrap();
println!("cache stats: {:?}", stats);
}
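Key points:
- An mpsc channel serves as the actor's mailbox; a oneshot channel carries each request's response
- An enum of commands defines the message protocol, which is type-safe and easy to extend
- The actor is the single owner of its state, so data races are ruled out by construction
- No external framework is required; Tokio alone is enough to build a complete actor system
- The ttl expiry logic shows how freely an actor can manage its internal state
Channel Actor vs Actix:
| Feature | Channel Actor | Actix |
|---|---|---|
| Dependencies | tokio only | actix + tokio |
| Message types | Hand-written enum | derive Message |
| Response mechanism | oneshot channel | Message rtype |
| Lifecycle | Managed by hand | Automatic callbacks |
| Supervision | Implemented by hand | Built in |
| Runtime overhead | Very low | Low |
| Learning curve | Gentle | Moderate |
Pattern 3: Supervision Tree (the Fault Recovery Pattern)
The supervision tree is one of the most powerful features of the actor model: a parent actor monitors its children, and when a child crashes the parent decides how to recover according to a policy.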
use tokio::sync::{mpsc, oneshot};
use tokio::time::{sleep, Duration};
use std::collections::HashMap;
#[derive(Debug, Clone)]
enum RestartStrategy {
Permanent,
Transient,
Temporary,
}
#[derive(Debug)]
enum WorkerMessage {
Process { data: String, responder: oneshot::Sender<String> },
Crash,
Status { responder: oneshot::Sender<WorkerStatus>,
},
}
#[derive(Debug, Clone)]
struct WorkerStatus {
id: String,
processed: u64,
alive: bool,
}
struct Worker {
id: String,
processed: u64,
}
impl Worker {
fn new(id: String) -> Self {
Self { id, processed: 0 }
}
async fn run(
&mut self,
mut rx: mpsc::Receiver<WorkerMessage>,
supervisor_tx: mpsc::Sender<SupervisorMessage>,
) {
println!("worker {} started", self.id);
while let Some(msg) = rx.recv().await {
match msg {
WorkerMessage::Process { data, responder } => {
self.processed += 1;
let result = format!("processed by {}: {}", self.id, data.to_uppercase());
let _ = responder.send(result);
}
WorkerMessage::Crash => {
println!("worker {} crashing!", self.id);
let _ = supervisor_tx.send(SupervisorMessage::ChildCrashed {
id: self.id.clone(),
}).await;
return;
}
WorkerMessage::Status { responder } => {
let _ = responder.send(WorkerStatus {
id: self.id.clone(),
processed: self.processed,
alive: true,
});
}
}
}
let _ = supervisor_tx.send(SupervisorMessage::ChildStopped {
id: self.id.clone(),
}).await;
}
}
enum SupervisorMessage {
    ChildCrashed { id: String },
    ChildStopped { id: String },
    SpawnWorker { id: String },
    // Looks up a child's mailbox so a caller can message the worker directly.
    GetChild { id: String, responder: oneshot::Sender<Option<mpsc::Sender<WorkerMessage>>> },
    GetStatus { responder: oneshot::Sender<Vec<WorkerStatus>> },
}

struct ChildHandle {
    tx: mpsc::Sender<WorkerMessage>,
    strategy: RestartStrategy,
}

struct Supervisor {
    children: HashMap<String, ChildHandle>,
    // A Receiver cannot create senders, so keep a Sender clone to hand to each child.
    supervisor_tx: mpsc::Sender<SupervisorMessage>,
    supervisor_rx: mpsc::Receiver<SupervisorMessage>,
    // Timestamps of recent restarts per child, pruned to restart_window.
    restart_log: HashMap<String, Vec<tokio::time::Instant>>,
    max_restarts: u32,
    restart_window: Duration,
}

impl Supervisor {
    fn new(
        tx: mpsc::Sender<SupervisorMessage>,
        rx: mpsc::Receiver<SupervisorMessage>,
        max_restarts: u32,
    ) -> Self {
        Self {
            children: HashMap::new(),
            supervisor_tx: tx,
            supervisor_rx: rx,
            restart_log: HashMap::new(),
            max_restarts,
            restart_window: Duration::from_secs(60),
        }
    }

    fn spawn_child(&mut self, id: String, strategy: RestartStrategy) {
        let (tx, rx) = mpsc::channel::<WorkerMessage>(64);
        let supervisor_tx = self.supervisor_tx.clone();
        let mut worker = Worker::new(id.clone());
        tokio::spawn(async move {
            worker.run(rx, supervisor_tx).await;
        });
        println!("supervisor spawned child {}", id);
        self.children.insert(id, ChildHandle { tx, strategy });
    }

    // Restarts a child unless it already used up max_restarts within restart_window.
    fn restart_child(&mut self, id: &str) -> bool {
        let now = tokio::time::Instant::now();
        let window = self.restart_window;
        let log = self.restart_log.entry(id.to_string()).or_default();
        log.retain(|t| now.duration_since(*t) < window);
        if log.len() >= self.max_restarts as usize {
            println!("supervisor: child {} exceeded max restarts ({})", id, self.max_restarts);
            self.children.remove(id);
            return false;
        }
        log.push(now);
        println!("supervisor: restarting child {} (attempt {})", id, log.len());
        match self.children.remove(id) {
            Some(old) => {
                self.spawn_child(id.to_string(), old.strategy);
                true
            }
            None => false,
        }
    }

    async fn run(&mut self) {
        while let Some(msg) = self.supervisor_rx.recv().await {
            match msg {
                SupervisorMessage::ChildCrashed { id } => {
                    let strategy = self.children.get(&id).map(|h| h.strategy.clone());
                    match strategy {
                        // Both strategies restart after an abnormal exit.
                        Some(RestartStrategy::Permanent) | Some(RestartStrategy::Transient) => {
                            self.restart_child(&id);
                        }
                        Some(RestartStrategy::Temporary) => {
                            println!("supervisor: child {} crashed, not restarting (temporary)", id);
                            self.children.remove(&id);
                        }
                        None => {}
                    }
                }
                SupervisorMessage::ChildStopped { id } => {
                    let strategy = self.children.get(&id).map(|h| h.strategy.clone());
                    match strategy {
                        // Only Permanent children come back after a normal exit.
                        Some(RestartStrategy::Permanent) => {
                            self.restart_child(&id);
                        }
                        Some(_) => {
                            self.children.remove(&id);
                        }
                        None => {}
                    }
                }
                SupervisorMessage::SpawnWorker { id } => {
                    self.spawn_child(id, RestartStrategy::Permanent);
                }
                SupervisorMessage::GetChild { id, responder } => {
                    let child = self.children.get(&id).map(|h| h.tx.clone());
                    let _ = responder.send(child);
                }
                SupervisorMessage::GetStatus { responder } => {
                    let mut statuses = Vec::new();
                    for handle in self.children.values() {
                        let (resp_tx, resp_rx) = oneshot::channel();
                        if handle.tx.send(WorkerMessage::Status { responder: resp_tx }).await.is_ok() {
                            if let Ok(status) = resp_rx.await {
                                statuses.push(status);
                            }
                        }
                    }
                    let _ = responder.send(statuses);
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (sup_tx, sup_rx) = mpsc::channel::<SupervisorMessage>(256);
    let mut supervisor = Supervisor::new(sup_tx.clone(), sup_rx, 3);
    tokio::spawn(async move { supervisor.run().await });

    sup_tx.send(SupervisorMessage::SpawnWorker { id: "worker-1".into() }).await.unwrap();
    sup_tx.send(SupervisorMessage::SpawnWorker { id: "worker-2".into() }).await.unwrap();

    // The supervisor handles its mailbox in order, so worker-1 exists by the time GetChild runs.
    if let Some(worker) = get_child("worker-1", &sup_tx).await {
        let (resp_tx, resp_rx) = oneshot::channel();
        worker.send(WorkerMessage::Process {
            data: "hello world".into(),
            responder: resp_tx,
        }).await.unwrap();
        let result = resp_rx.await.unwrap();
        println!("result: {}", result);
    }

    println!("--- triggering crash ---");
    if let Some(worker) = get_child("worker-1", &sup_tx).await {
        let _ = worker.send(WorkerMessage::Crash).await;
    }
    // Give the supervisor a moment to see the crash and restart the child.
    sleep(Duration::from_millis(100)).await;

    let (resp_tx, resp_rx) = oneshot::channel();
    sup_tx.send(SupervisorMessage::GetStatus { responder: resp_tx }).await.unwrap();
    let statuses = resp_rx.await.unwrap();
    for s in statuses {
        println!("status: {:?}", s);
    }
}

// Asks the supervisor for a clone of the named child's mailbox sender.
async fn get_child(
    id: &str,
    sup_tx: &mpsc::Sender<SupervisorMessage>,
) -> Option<mpsc::Sender<WorkerMessage>> {
    let (resp_tx, resp_rx) = oneshot::channel();
    sup_tx
        .send(SupervisorMessage::GetChild { id: id.to_string(), responder: resp_tx })
        .await
        .ok()?;
    resp_rx.await.ok().flatten()
}
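Key points:
- The supervisor holds each child's Sender and receives crash notifications over its own mpsc channel
- Three restart strategies: Permanent (always restart), Transient (restart only after an abnormal exit), Temporary (never restart)
- max_restarts plus restart_window guard against endless restart loops
- A crashing child notifies the supervisor itself, and the supervisor decides whether to restart it. Here the crash is simulated with a message; a real panic would have to be detected another way, for instance through the task's JoinHandle
- Supervision trees nest: a supervisor can itself be a supervised child
Restart strategies compared:
| Strategy | Normal exit | Abnormal exit | Typical use |
|---|---|---|---|
| Permanent | Restart | Restart | Long-running services, scheduled jobs |
| Transient | No restart | Restart | Request handlers, short-lived computation |
| Temporary | No restart | No restart | One-off tasks, debugging |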
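The table can be read as a pure decision function. The sketch below restates it in code; ExitKind and should_restart are hypothetical names introduced for this illustration, and the enum is redeclared so the block stands on its own.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum RestartStrategy {
    Permanent,
    Transient,
    Temporary,
}

// How the child ended (hypothetical type for this sketch).
#[derive(Debug, Clone, Copy, PartialEq)]
enum ExitKind {
    Normal,
    Abnormal,
}

// Encodes the table row by row: returns true when the supervisor should restart the child.
fn should_restart(strategy: RestartStrategy, exit: ExitKind) -> bool {
    match (strategy, exit) {
        (RestartStrategy::Permanent, _) => true,
        (RestartStrategy::Transient, ExitKind::Abnormal) => true,
        (RestartStrategy::Transient, ExitKind::Normal) => false,
        (RestartStrategy::Temporary, _) => false,
    }
}
```

Keeping the decision separate from the spawning code makes the policy easy to unit-test without starting any actors.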
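Pattern 4: Distributed Actors (Remote Messaging)
Once actors cross a process boundary, messages have to be serialized and sent over the network. This pattern shows how to build an actor system that can communicate remotely.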
use tokio::sync::{mpsc, oneshot};
use tokio::net::{TcpListener, TcpStream};
use serde::{Serialize, Deserialize};
use std::collections::HashMap;
use std::net::SocketAddr;
#[derive(Debug, Serialize, Deserialize)]
struct RemoteEnvelope {
target: String,
message_type: String,
payload: Vec<u8>,
reply_to: Option<u64>,
}
#[derive(Debug, Serialize, Deserialize)]
enum OrderCommand {
Create { order_id: String, item: String, quantity: u32 },
Cancel { order_id: String },
Query { order_id: String },
}
#[derive(Debug, Serialize, Deserialize)]
enum OrderResponse {
Created { order_id: String },
Cancelled { order_id: String },
Found { order_id: String, item: String, quantity: u32 },
NotFound { order_id: String },
Error { message: String },
}
#[derive(Debug, Serialize, Deserialize, Clone)]
struct Order {
id: String,
item: String,
quantity: u32,
status: String,
}
struct OrderActor {
orders: HashMap<String, Order>,
cmd_rx: mpsc::Receiver<(OrderCommand, oneshot::Sender<OrderResponse>)>,
}
impl OrderActor {
fn new(rx: mpsc::Receiver<(OrderCommand, oneshot::Sender<OrderResponse>)>) -> Self {
Self { orders: HashMap::new(), cmd_rx: rx }
}
async fn run(&mut self) {
while let Some((cmd, responder)) = self.cmd_rx.recv().await {
let response = match cmd {
OrderCommand::Create { order_id, item, quantity } => {
if self.orders.contains_key(&order_id) {
OrderResponse::Error { message: format!("order {} already exists", order_id) }
} else {
self.orders.insert(order_id.clone(), Order {
id: order_id.clone(),
item,
quantity,
status: "created".into(),
});
OrderResponse::Created { order_id }
}
}
OrderCommand::Cancel { order_id } => {
match self.orders.get_mut(&order_id) {
Some(order) => {
order.status = "cancelled".into();
OrderResponse::Cancelled { order_id }
}
None => OrderResponse::NotFound { order_id },
}
}
OrderCommand::Query { order_id } => {
match self.orders.get(&order_id) {
Some(order) => OrderResponse::Found {
order_id: order.id.clone(),
item: order.item.clone(),
quantity: order.quantity,
},
None => OrderResponse::NotFound { order_id },
}
}
};
let _ = responder.send(response);
}
}
}
struct RemoteGateway {
local_actors: HashMap<String, mpsc::Sender<(OrderCommand, oneshot::Sender<OrderResponse>)>>,
remote_nodes: HashMap<String, SocketAddr>,
pending_replies: HashMap<u64, oneshot::Sender<OrderResponse>>,
next_request_id: u64,
}
impl RemoteGateway {
fn new() -> Self {
Self {
local_actors: HashMap::new(),
remote_nodes: HashMap::new(),
pending_replies: HashMap::new(),
next_request_id: 0,
}
}
fn register_local(&mut self, name: String, tx: mpsc::Sender<(OrderCommand, oneshot::Sender<OrderResponse>)>) {
self.local_actors.insert(name, tx);
}
fn register_remote(&mut self, name: String, addr: SocketAddr) {
self.remote_nodes.insert(name, addr);
}
async fn send_command(&mut self, target: &str, cmd: OrderCommand) -> Option<OrderResponse> {
if let Some(tx) = self.local_actors.get(target) {
let (resp_tx, resp_rx) = oneshot::channel();
if tx.send((cmd, resp_tx)).await.is_ok() {
return resp_rx.await.ok();
}
return None;
}
if let Some(addr) = self.remote_nodes.get(target) {
match TcpStream::connect(addr).await {
Ok(mut stream) => {
let payload = bincode::serialize(&cmd).ok()?;
let request_id = self.next_request_id;
self.next_request_id += 1;
let envelope = RemoteEnvelope {
target: target.to_string(),
message_type: "OrderCommand".to_string(),
payload,
reply_to: Some(request_id),
};
let data = bincode::serialize(&envelope).ok()?;
let len = data.len() as u32;
use tokio::io::AsyncWriteExt;
if stream.write_all(&len.to_le_bytes()).await.is_err() { return None; }
if stream.write_all(&data).await.is_err() { return None; }
use tokio::io::AsyncReadExt;
let mut len_buf = [0u8; 4];
if stream.read_exact(&mut len_buf).await.is_err() { return None; }
let resp_len = u32::from_le_bytes(len_buf) as usize;
let mut resp_buf = vec![0u8; resp_len];
if stream.read_exact(&mut resp_buf).await.is_err() { return None; }
bincode::deserialize(&resp_buf).ok()
}
Err(e) => {
eprintln!("failed to connect to {}: {}", target, e);
None
}
}
} else {
None
}
}
}
async fn server_node(addr: SocketAddr) {
let (tx, rx) = mpsc::channel::<(OrderCommand, oneshot::Sender<OrderResponse>)>(256);
let mut actor = OrderActor::new(rx);
tokio::spawn(async move { actor.run().await });
let listener = TcpListener::bind(addr).await.unwrap();
println!("order actor listening on {}", addr);
loop {
if let Ok((mut stream, _)) = listener.accept().await {
let tx = tx.clone();
tokio::spawn(async move {
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
let mut len_buf = [0u8; 4];
if stream.read_exact(&mut len_buf).await.is_err() { return; }
let len = u32::from_le_bytes(len_buf) as usize;
let mut buf = vec![0u8; len];
if stream.read_exact(&mut buf).await.is_err() { return; }
let envelope: RemoteEnvelope = match bincode::deserialize(&buf) {
Ok(e) => e,
Err(_) => return,
};
let cmd: OrderCommand = match bincode::deserialize(&envelope.payload) {
Ok(c) => c,
Err(_) => return,
};
let (resp_tx, resp_rx) = oneshot::channel();
if tx.send((cmd, resp_tx)).await.is_err() { return; }
let response = match resp_rx.await {
Ok(r) => r,
Err(_) => return,
};
let resp_data = bincode::serialize(&response).unwrap();
let resp_len = resp_data.len() as u32;
let _ = stream.write_all(&resp_len.to_le_bytes()).await;
let _ = stream.write_all(&resp_data).await;
});
}
}
}
#[tokio::main]
async fn main() {
let server_addr: SocketAddr = "127.0.0.1:9001".parse().unwrap();
tokio::spawn(server_node(server_addr));
// Duration is not imported in this example, so use the full path.
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let mut gateway = RemoteGateway::new();
gateway.register_remote("order-service".into(), server_addr);
let response = gateway.send_command("order-service", OrderCommand::Create {
order_id: "ORD-001".into(),
item: "Rust Book".into(),
quantity: 2,
}).await;
println!("create response: {:?}", response);
let response = gateway.send_command("order-service", OrderCommand::Query {
order_id: "ORD-001".into(),
}).await;
println!("query response: {:?}", response);
}
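Key points:
- RemoteEnvelope wraps the message metadata: target actor, message type, payload, reply address
- Local actors talk directly over a channel; remote actors receive serialized messages over TCP
- RemoteGateway routes both local and remote targets, so business code does not need to know where an actor lives
- bincode (the 1.x serialize/deserialize API is used here) gives compact binary serialization, which is usually smaller and cheaper to parse than JSON for actor-to-actor traffic
- A length-prefixed frame (4-byte length followed by the data) marks message boundaries on the TCP byte stream, so messages that arrive merged or split can still be separated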
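The framing rule in the last point can be isolated from the networking code. The sketch below works on an in-memory buffer and uses only the standard library; encode_frame, decode_frame and the MAX_FRAME_LEN cap are assumptions added for illustration (the example above does not cap the length it reads from the wire, which a production server probably should).

```rust
// Assumed upper bound on a frame; rejects absurd lengths before allocating.
const MAX_FRAME_LEN: usize = 1024 * 1024;

// Prefixes the payload with its length as 4 little-endian bytes.
// Payloads longer than u32::MAX are not handled in this sketch.
fn encode_frame(payload: &[u8]) -> Vec<u8> {
    let mut frame = Vec::with_capacity(4 + payload.len());
    frame.extend_from_slice(&(payload.len() as u32).to_le_bytes());
    frame.extend_from_slice(payload);
    frame
}

// Tries to split one complete frame off the front of buf.
// Ok(None) means more bytes are needed; Err means the declared length is over the cap.
fn decode_frame(buf: &mut Vec<u8>) -> Result<Option<Vec<u8>>, String> {
    if buf.len() < 4 {
        return Ok(None);
    }
    let len = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
    if len > MAX_FRAME_LEN {
        return Err(format!("frame of {} bytes exceeds limit", len));
    }
    if buf.len() < 4 + len {
        return Ok(None);
    }
    let payload = buf[4..4 + len].to_vec();
    buf.drain(..4 + len); // consume the frame, keep any bytes that follow it
    Ok(Some(payload))
}
```

Two frames written back to back come out as two separate payloads, and a partially received frame simply yields Ok(None) until the rest arrives.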
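Pattern 5: Production-Grade Actor System (Graceful Shutdown and Backpressure)
Actor systems in production have to handle graceful shutdown, backpressure, message timeouts and similar concerns.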
use tokio::sync::{mpsc, oneshot, watch, broadcast};
use tokio::time::{timeout, Duration, Instant};
use tokio::select;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
struct ActorSystem {
shutdown_tx: watch::Sender<bool>,
shutdown_rx: watch::Receiver<bool>,
event_tx: broadcast::Sender<String>,
metrics: Arc<ActorMetrics>,
}
struct ActorMetrics {
messages_sent: AtomicU64,
messages_processed: AtomicU64,
messages_dropped: AtomicU64,
actors_alive: AtomicU64,
}
impl ActorMetrics {
fn new() -> Self {
Self {
messages_sent: AtomicU64::new(0),
messages_processed: AtomicU64::new(0),
messages_dropped: AtomicU64::new(0),
actors_alive: AtomicU64::new(0),
}
}
fn snapshot(&self) -> String {
format!(
"sent: {}, processed: {}, dropped: {}, alive: {}",
self.messages_sent.load(Ordering::Relaxed),
self.messages_processed.load(Ordering::Relaxed),
self.messages_dropped.load(Ordering::Relaxed),
self.actors_alive.load(Ordering::Relaxed),
)
}
}
enum WorkerCmd {
Execute { task: String, responder: oneshot::Sender<String> },
Ping { responder: oneshot::Sender<bool> },
}
struct ResilientWorker {
id: String,
cmd_rx: mpsc::Receiver<WorkerCmd>,
shutdown_rx: watch::Receiver<bool>,
event_tx: broadcast::Sender<String>,
metrics: Arc<ActorMetrics>,
mailbox_capacity: usize,
process_timeout: Duration,
}
impl ResilientWorker {
async fn run(&mut self) {
self.metrics.actors_alive.fetch_add(1, Ordering::Relaxed);
println!("worker {} started", self.id);
loop {
select! {
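changed = self.shutdown_rx.changed() => {
// Err means the shutdown sender was dropped; treat that as shutdown too,
// otherwise this branch would fire again on every loop iteration.
if changed.is_err() || *self.shutdown_rx.borrow() {
self.drain_mailbox().await;
break;
}
}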
cmd = self.cmd_rx.recv() => {
match cmd {
Some(cmd) => self.handle_command(cmd).await,
None => break,
}
}
}
}
self.metrics.actors_alive.fetch_sub(1, Ordering::Relaxed);
println!("worker {} stopped", self.id);
}
async fn handle_command(&mut self, cmd: WorkerCmd) {
match cmd {
WorkerCmd::Execute { task, responder } => {
self.metrics.messages_sent.fetch_add(1, Ordering::Relaxed);
let result = match timeout(self.process_timeout, self.process_task(&task)).await {
Ok(output) => {
self.metrics.messages_processed.fetch_add(1, Ordering::Relaxed);
output
}
Err(_) => {
self.metrics.messages_dropped.fetch_add(1, Ordering::Relaxed);
let _ = self.event_tx.send(format!("worker {} timeout on: {}", self.id, task));
"timeout".to_string()
}
};
let _ = responder.send(result);
}
WorkerCmd::Ping { responder } => {
let _ = responder.send(true);
}
}
}
async fn process_task(&self, task: &str) -> String {
tokio::time::sleep(Duration::from_millis(10)).await;
format!("worker {} processed: {}", self.id, task.to_uppercase())
}
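async fn drain_mailbox(&mut self) {
println!("worker {} draining mailbox...", self.id);
// Stop accepting new messages; recv() returns None once the backlog is empty.
self.cmd_rx.close();
let drain_timeout = Duration::from_secs(5);
let deadline = Instant::now() + drain_timeout;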
loop {
if Instant::now() > deadline {
println!("worker {} drain timeout, {} messages remaining", self.id, self.cmd_rx.len());
break;
}
match timeout(Duration::from_millis(100), self.cmd_rx.recv()).await {
Ok(Some(WorkerCmd::Execute { task, responder })) => {
let result = self.process_task(&task).await;
self.metrics.messages_processed.fetch_add(1, Ordering::Relaxed);
let _ = responder.send(result);
}
Ok(Some(WorkerCmd::Ping { responder })) => {
let _ = responder.send(false);
}
Ok(None) => break,
Err(_) => continue,
}
}
println!("worker {} mailbox drained", self.id);
}
}
impl ActorSystem {
fn new() -> Self {
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let (event_tx, _) = broadcast::channel::<String>(256);
let metrics = Arc::new(ActorMetrics::new());
Self { shutdown_tx, shutdown_rx, event_tx, metrics }
}
async fn spawn_worker(&self, id: String, mailbox_capacity: usize) -> mpsc::Sender<WorkerCmd> {
let (tx, rx) = mpsc::channel::<WorkerCmd>(mailbox_capacity);
let mut worker = ResilientWorker {
id,
cmd_rx: rx,
shutdown_rx: self.shutdown_rx.clone(),
event_tx: self.event_tx.clone(),
metrics: self.metrics.clone(),
mailbox_capacity,
process_timeout: Duration::from_secs(2),
};
tokio::spawn(async move { worker.run().await });
tx
}
async fn graceful_shutdown(&self) {
println!("initiating graceful shutdown...");
self.shutdown_tx.send(true).ok();
tokio::time::sleep(Duration::from_secs(6)).await;
println!("final metrics: {}", self.metrics.snapshot());
}
}
async fn backpressure_sender(
tx: mpsc::Sender<WorkerCmd>,
worker_id: String,
total: u64,
) -> u64 {
let mut sent = 0u64;
let mut dropped = 0u64;
for i in 0..total {
let (resp_tx, resp_rx) = oneshot::channel();
let task = format!("task-{}", i);
match timeout(Duration::from_millis(50), tx.send(WorkerCmd::Execute {
task,
responder: resp_tx,
})).await {
Ok(Ok(())) => {
match timeout(Duration::from_secs(3), resp_rx).await {
Ok(Ok(result)) => {
sent += 1;
if i % 100 == 0 {
println!("{} result: {}", worker_id, result);
}
}
_ => dropped += 1,
}
}
_ => dropped += 1,
}
}
println!("{} sent: {}, dropped: {}", worker_id, sent, dropped);
dropped
}
#[tokio::main]
async fn main() {
let system = ActorSystem::new();
let worker1 = system.spawn_worker("worker-1".into(), 64).await;
let worker2 = system.spawn_worker("worker-2".into(), 64).await;
let h1 = tokio::spawn(backpressure_sender(worker1, "sender-1".into(), 200));
let h2 = tokio::spawn(backpressure_sender(worker2, "sender-2".into(), 200));
h1.await.unwrap();
h2.await.unwrap();
println!("metrics: {}", system.metrics.snapshot());
system.graceful_shutdown().await;
}
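Key points:
- watch::channel(false) broadcasts the shutdown signal, so every actor is notified at once
- drain_mailbox() processes the remaining messages before shutdown, with a deadline so it cannot wait forever
- timeout wraps both send and recv, so backpressure cannot block a caller indefinitely
- ActorMetrics collects counters in AtomicU64 fields, which is lock-free and thread-safe
- A bounded mailbox (capacity 64) provides backpressure by construction: producers wait when it is full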
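The last point is easy to observe in isolation. The sketch below uses the standard library's bounded sync_channel instead of Tokio so that it runs without a runtime; Tokio's bounded mpsc sender has an analogous try_send that also reports a full mailbox. fill_mailbox is a hypothetical helper for this illustration.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// Fills a mailbox of the given capacity without blocking and
// returns how many messages were accepted and how many were rejected.
fn fill_mailbox(capacity: usize, attempts: usize) -> (usize, usize) {
    // _rx keeps the receiver alive, but nothing ever reads from it.
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    let mut rejected = 0;
    for i in 0..attempts {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // The mailbox is full: the producer must slow down, retry later, or shed load.
            Err(TrySendError::Full(_)) => rejected += 1,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    (accepted, rejected)
}
```

Whether a producer waits (send) or gets an immediate error (try_send) is a policy choice: waiting propagates backpressure upstream, while failing fast lets the caller drop or reroute the message.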
Five Common Pitfalls
Pitfall 1: Actors Waiting on Each Other in a Cycle
// ❌ Wrong: actor A waits for B to speak first and B waits for A, so neither ever sends
async fn deadlock_pattern() {
    let (tx_a, mut rx_a) = mpsc::channel::<String>(16);
    let (tx_b, mut rx_b) = mpsc::channel::<String>(16);
    tokio::spawn(async move {
        let _request = rx_a.recv().await.unwrap(); // never completes
        tx_b.send("reply-from-a".into()).await.unwrap();
    });
    tokio::spawn(async move {
        let _request = rx_b.recv().await.unwrap(); // never completes
        tx_a.send("reply-from-b".into()).await.unwrap();
    });
}
// ✅ Right: one side sends first, and every wait is bounded by a timeout
async fn safe_pattern() {
    let (tx_a, mut rx_a) = mpsc::channel::<String>(16);
    let (tx_b, mut rx_b) = mpsc::channel::<String>(16);
    tokio::spawn(async move {
        tx_b.send("request-from-a".into()).await.unwrap();
        match timeout(Duration::from_secs(5), rx_a.recv()).await {
            Ok(Some(response)) => println!("got: {}", response),
            _ => println!("timeout or channel closed"),
        }
    });
    tokio::spawn(async move {
        if let Some(request) = rx_b.recv().await {
            let _ = tx_a.send(format!("reply to {}", request)).await;
        }
    });
}
Pitfall 2: Forgetting to Handle a Full Mailbox
// ❌ Wrong: an unbounded channel lets memory grow without limit
async fn unbounded_risk() {
let (tx, mut rx) = mpsc::unbounded_channel::<String>();
for i in 0..1000000 {
tx.send(format!("msg-{}", i)).unwrap();
}
}
// ✅ Right: bounded channel plus backpressure
async fn bounded_safe() {
let (tx, mut rx) = mpsc::channel::<String>(256);
for i in 0..1000 {
match tx.send(format!("msg-{}", i)).await {
Ok(()) => {}
Err(_) => {
println!("receiver dropped at {}", i);
return;
}
}
}
}
Pitfall 3: A Supervisor with No Restart Limit
// ❌ Wrong: unlimited restarts can turn into a crash-restart loop
async fn infinite_restart_loop() {
loop {
let (tx, rx) = mpsc::channel::<String>(16);
let handle = tokio::spawn(async move {
panic!("always crash!");
});
let _ = handle.await;
println!("restarting...");
}
}
// ✅ Right: cap both the number and the rate of restarts
struct RestartPolicy {
max_restarts: u32,
window: Duration,
restarts: Vec<Instant>,
}
impl RestartPolicy {
fn new(max_restarts: u32, window: Duration) -> Self {
Self { max_restarts, window, restarts: Vec::new() }
}
fn should_restart(&mut self) -> bool {
let now = Instant::now();
self.restarts.retain(|t| now - *t < self.window);
if self.restarts.len() < self.max_restarts as usize {
self.restarts.push(now);
true
} else {
false
}
}
}
Pitfall 4: Remote Messages Without a Timeout
// ❌ Wrong: the remote call may block forever
async fn remote_without_timeout() {
let response = gateway.send_command("remote-actor", cmd).await;
}
// ✅ Right: every remote call gets a timeout
async fn remote_with_timeout() {
match timeout(Duration::from_secs(5), gateway.send_command("remote-actor", cmd)).await {
Ok(Some(response)) => println!("got: {:?}", response),
Ok(None) => println!("actor not found"),
Err(_) => println!("remote call timeout"),
}
}
Pitfall 5: Dropping Unprocessed Messages on Shutdown
// ❌ Wrong: dropping the receiver outright loses everything still queued
async fn bad_shutdown(mut rx: mpsc::Receiver<String>) {
drop(rx);
}
// ✅ Right: close the mailbox, then drain what is already queued
async fn good_shutdown(mut rx: mpsc::Receiver<String>) {
    println!("draining remaining messages...");
    // close() rejects new sends; recv() still yields buffered messages, then None.
    rx.close();
    while let Some(msg) = rx.recv().await {
        println!("drained: {}", msg);
    }
    println!("all messages processed");
}
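Troubleshooting Cheat Sheet
| Symptom | Likely cause | Fix |
|---|---|---|
| Actor stops responding to messages | Mailbox is full, or actors are waiting on each other | Check mailbox capacity; add timeouts |
| SendError shows up frequently | The receiving end has closed | Check whether the actor exited unexpectedly; add supervision |
| Memory keeps growing | Unbounded channel or message backlog | Switch to a bounded channel; add backpressure |
| Restart loop | Actor fails during initialization | Check the startup logic; cap the restart count |
| Remote calls time out | Network latency or an unresponsive peer | Add timeouts and retries |
| Messages arrive out of order | Several producers sending concurrently | Use a single producer where order matters, or add sequence numbers |
| Graceful shutdown hangs | Some actor ignores the shutdown signal | Put a timeout on the drain |
| oneshot RecvError | The responder was dropped without sending | Make sure every code path sends a response |
Advanced Optimization Techniques
1. Actor pooling for load balancing
When a single actor becomes the bottleneck, spread messages across a pool of actors: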
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::mpsc;

// WorkerCmd is the command enum defined in pattern 5.
struct ActorPool {
    workers: Vec<mpsc::Sender<WorkerCmd>>,
    next: AtomicUsize,
}

impl ActorPool {
    fn new(workers: Vec<mpsc::Sender<WorkerCmd>>) -> Self {
        // An empty pool would make the modulo in next_worker divide by zero.
        assert!(!workers.is_empty(), "pool needs at least one worker");
        Self { workers, next: AtomicUsize::new(0) }
    }

    // Round-robin: each call picks the next worker in turn.
    fn next_worker(&self) -> &mpsc::Sender<WorkerCmd> {
        let idx = self.next.fetch_add(1, Ordering::Relaxed) % self.workers.len();
        &self.workers[idx]
    }

    async fn send(&self, cmd: WorkerCmd) -> bool {
        self.next_worker().send(cmd).await.is_ok()
    }
}
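Round-robin spreads load evenly, but two messages about the same entity can land on different workers and be processed out of order. If per-entity ordering matters, one option (an assumption of this sketch, not part of the pool above) is to route by a hash of the entity key so the same key always reaches the same worker. worker_for_key is a hypothetical helper.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Maps a key to a worker index so that equal keys always pick the same worker.
fn worker_for_key(key: &str, worker_count: usize) -> usize {
    assert!(worker_count > 0, "pool must not be empty");
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % worker_count as u64) as usize
}
```

The trade-off is load skew: a few very hot keys can overload one worker, which round-robin would not do. Also note that changing the worker count remaps most keys.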
2. Message priorities
use tokio::sync::mpsc;
use tokio::select;
enum Priority {
High,
Normal,
Low,
}
struct PriorityMailbox {
high_rx: mpsc::Receiver<String>,
normal_rx: mpsc::Receiver<String>,
low_rx: mpsc::Receiver<String>,
}
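impl PriorityMailbox {
    async fn recv(&mut self) -> Option<String> {
        select! {
            // biased polls the branches top to bottom, so higher priority wins.
            biased;
            // A closed channel yields None, which fails the Some(..) pattern and
            // disables only that branch instead of ending the whole mailbox.
            Some(msg) = self.high_rx.recv() => Some(msg),
            Some(msg) = self.normal_rx.recv() => Some(msg),
            Some(msg) = self.low_rx.recv() => Some(msg),
            // Reached only when every channel is closed and drained.
            else => None,
        }
    }
}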
3. Message tracing and trace IDs
use std::sync::atomic::{AtomicU64, Ordering};
static TRACE_ID: AtomicU64 = AtomicU64::new(1);
fn new_trace_id() -> u64 {
TRACE_ID.fetch_add(1, Ordering::Relaxed)
}
#[derive(Debug, Clone)]
struct TracedMessage {
trace_id: u64,
span_id: u64,
parent_span: Option<u64>,
payload: String,
}
impl TracedMessage {
fn new(payload: String) -> Self {
Self {
trace_id: new_trace_id(),
span_id: new_trace_id(),
parent_span: None,
payload,
}
}
fn child(&self, payload: String) -> Self {
Self {
trace_id: self.trace_id,
span_id: new_trace_id(),
parent_span: Some(self.span_id),
payload,
}
}
}
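Actor Framework Comparison
| Feature | Actix | Tokio Channel | Riker | Coerce |
|---|---|---|---|---|
| Message passing | Typed Message | Hand-written enum | Typed | Typed |
| Supervision tree | Built in | Implemented by hand | Built in | Built in |
| Remote actors | actix-remote | Implemented by hand | Supported | Built in |
| Performance | High | Very high | Medium | High |
| Dependencies | actix | tokio only | riker | coerce |
| Learning curve | Moderate | Gentle | Moderate | Moderate |
| Ecosystem maturity | High | High | Low | Low |
| Best suited for | Web services | General concurrency | Traditional actors | Distributed systems |
Summary
The core philosophy of the actor model in Rust is: do not communicate by sharing memory; share memory by communicating. Actix offers a mature typed actor system, while channel-based actors on Tokio are lighter and more flexible. Supervision trees are the foundation of a production actor system; running actors without supervision is like driving without a seat belt. In distributed setups, the key is a single interface for local and remote communication. Use bounded mailboxes for backpressure, put a timeout on every remote call, and drain mailboxes before shutting down, and the system will be both more robust and more efficient.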