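Rust Actor Model Frameworks in Practice: 5 Production Patterns from Message Passing to Supervision Trees
When the Concurrency Model Meets Rust's Ownership
Have you run into this? A system must handle thousands of independent stateful entities at once, each with its own lifecycle and message queue. You tried managing shared state with Arc<Mutex<HashMap>>, and lock contention dragged throughput to the floor. You tried dispatching tasks through a thread pool, only to find that inter-thread communication cost more than the computation itself. Eventually you realize that the Actor model is a better fit for this class of problem.
The core idea of the Actor model is simple: each actor is an independent unit of computation that owns private state and interacts with the outside world only through message passing. No shared state means no lock contention, and without locks the classic lock-ordering deadlock disappears (actors can still deadlock by waiting on each other's replies, as the pitfalls section shows). Rust's ownership system fits the Actor model naturally: ownership of a message moves to the receiver when it is sent, and the compiler enforces thread safety for you.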
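To make the ownership point concrete, here is a minimal sketch using only the standard library's std::sync::mpsc and a plain thread in place of an actor runtime. The Job type and the process_on_worker function are hypothetical names chosen for illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical message type for illustration.
struct Job {
    payload: Vec<u8>,
}

/// Sends `job` to a worker thread and returns the number of bytes the worker saw.
fn process_on_worker(job: Job) -> usize {
    let (tx, rx) = mpsc::channel::<Job>();
    let worker = thread::spawn(move || {
        // The worker now owns the Job, including its heap buffer.
        let received = rx.recv().expect("sender dropped before sending");
        received.payload.len()
    });
    tx.send(job).expect("worker exited early");
    // `job` was moved into the channel; using it here would not compile:
    // println!("{}", job.payload.len()); // error[E0382]: borrow of moved value
    worker.join().expect("worker panicked")
}
```

Because send takes the message by value, the sender cannot keep a reference to data the receiver is now mutating. That is the compile-time guarantee the paragraph above refers to.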
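Core Concepts at a Glance
| Concept | Description | Typical use |
|---|---|---|
| Actor | Independent unit of computation that encapsulates state and behavior | State machines, connection management |
| Message | The only way actors communicate | Commands, events, queries |
| Mailbox | The actor's message queue | Buffering, in-order processing |
| Supervision | A parent actor monitors its child actors | Fault recovery, restart strategies |
| Address | A handle that refers to an actor | Message addressing, routing |
| Context | The actor's runtime environment | Timers, child actor management |
| Backpressure | Flow-control mechanism | Preventing mailbox overflow |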
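Five Challenges of the Actor Model
1. The lock hell of shared state
Traditional concurrency relies on shared memory plus locks, and under high concurrency lock contention becomes the bottleneck. The Actor model sidesteps the problem by not sharing: each actor has exclusive ownership of its state.
2. Fault isolation and recovery
One crashing component should not take down the whole system. The Actor model isolates faults through a supervision tree: when a child actor crashes, its parent decides what to do (restart it, stop it, escalate to its own supervisor, and so on).
3. Message ordering guarantees
A single actor processes its messages in order by construction, but what about ordering across actors? That takes a carefully designed message protocol and routing strategy.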
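One common protocol-level answer is to tag each message with a sequence number and let the receiver reorder. The sketch below is a hypothetical Resequencer, not part of any framework; it assumes a single logical sender whose sequence numbers start at 0 and increase by 1.

```rust
use std::collections::BTreeMap;

/// Buffers out-of-order messages and releases them in sequence order.
struct Resequencer<T> {
    next_seq: u64,
    pending: BTreeMap<u64, T>,
}

impl<T> Resequencer<T> {
    fn new() -> Self {
        Self { next_seq: 0, pending: BTreeMap::new() }
    }

    /// Accepts a message tagged with `seq` and returns every message that is
    /// now deliverable in order. Stale and duplicate sequence numbers are dropped.
    fn accept(&mut self, seq: u64, msg: T) -> Vec<T> {
        if seq < self.next_seq {
            return Vec::new(); // already delivered
        }
        // Keep the first copy if the same sequence number arrives twice.
        self.pending.entry(seq).or_insert(msg);
        let mut ready = Vec::new();
        while let Some(msg) = self.pending.remove(&self.next_seq) {
            ready.push(msg);
            self.next_seq += 1;
        }
        ready
    }
}
```

An actor that receives from several routes would keep one Resequencer per sender. The buffer is unbounded here; a production version would presumably cap it and decide what to do when a gap never fills.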
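4. Location transparency
Talking to a local actor and talking to a remote actor should look the same. How do you let actors communicate across nodes without changing business logic?
5. The complexity of graceful shutdown
An actor system may contain hundreds or thousands of actors. How do you make sure every message has been processed and every resource released at shutdown?
Five Production-Grade Actor Patterns
Pattern 1: The Actix Actor System (the classic Actor pattern)
Actix is one of the most mature actor frameworks in the Rust ecosystem. It is built on typed message handling and provides full actor lifecycle management.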
use actix::prelude::*;
use std::time::Duration;
struct GameSession {
id: String,
score: u64,
level: u32,
}
impl Actor for GameSession {
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {
println!("session {} started, score: {}", self.id, self.score);
}
fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
println!("session {} stopping", self.id);
// Running::Stop lets the shutdown proceed; Running::Continue would cancel it.
Running::Stop
}
fn stopped(&mut self, _ctx: &mut Self::Context) {
println!("session {} stopped, final score: {}", self.id, self.score);
}
}
#[derive(Message)]
#[rtype(result = "u64")]
struct GetScore;
#[derive(Message)]
#[rtype(result = "()")]
struct AddScore { amount: u64 }
#[derive(Message)]
#[rtype(result = "()")]
struct LevelUp;
impl Handler<GetScore> for GameSession {
type Result = u64;
fn handle(&mut self, _msg: GetScore, _ctx: &mut Context<Self>) -> u64 {
self.score
}
}
impl Handler<AddScore> for GameSession {
type Result = ();
fn handle(&mut self, msg: AddScore, _ctx: &mut Context<Self>) {
self.score += msg.amount;
println!("session {} score: {}", self.id, self.score);
}
}
impl Handler<LevelUp> for GameSession {
type Result = ();
fn handle(&mut self, _msg: LevelUp, ctx: &mut Context<Self>) {
self.level += 1;
println!("session {} leveled up to {}", self.id, self.level);
ctx.notify_later(LevelUp, Duration::from_secs(30));
}
}
#[actix::main]
async fn main() {
let addr = GameSession {
id: "player-001".into(),
score: 0,
level: 1,
}.start();
addr.send(AddScore { amount: 100 }).await.unwrap();
addr.send(AddScore { amount: 50 }).await.unwrap();
addr.send(LevelUp).await.unwrap();
let score = addr.send(GetScore).await.unwrap();
println!("final score: {}", score);
}
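Key points:
- impl Actor defines the actor lifecycle hooks: started, stopping, stopped
- #[derive(Message)] defines a typed message; rtype specifies the reply type
- The Handler<T> trait implements the handling logic for each message type
- ctx.notify_later schedules a delayed message to the actor itself, which is how periodic tasks are built
- addr.send() sends a message asynchronously and awaits the reply
Pattern 2: Tokio Channel-Based Actor (the lightweight Actor pattern)
Skip the actor framework and hand-write the actor with Tokio channels. This is lighter and more flexible, and suits cases that demand tight control over performance and behavior.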
use tokio::sync::{mpsc, oneshot};
use std::collections::HashMap;
enum CacheCommand {
Get {
key: String,
responder: oneshot::Sender<Option<String>>,
},
Set {
key: String,
value: String,
ttl: Option<std::time::Duration>,
responder: oneshot::Sender<bool>,
},
Delete {
key: String,
responder: oneshot::Sender<bool>,
},
Stats {
responder: oneshot::Sender<CacheStats>,
},
}
#[derive(Debug)]
struct CacheStats {
entries: usize,
hits: u64,
misses: u64,
}
struct CacheEntry {
value: String,
expires_at: Option<std::time::Instant>,
}
struct CacheActor {
store: HashMap<String, CacheEntry>,
hits: u64,
misses: u64,
}
impl CacheActor {
fn new() -> Self {
Self {
store: HashMap::new(),
hits: 0,
misses: 0,
}
}
fn is_expired(&self, entry: &CacheEntry) -> bool {
match entry.expires_at {
Some(t) => std::time::Instant::now() > t,
None => false,
}
}
async fn run(&mut self, mut rx: mpsc::Receiver<CacheCommand>) {
while let Some(cmd) = rx.recv().await {
match cmd {
CacheCommand::Get { key, responder } => {
let result = match self.store.get(&key) {
Some(entry) if !self.is_expired(entry) => {
self.hits += 1;
Some(entry.value.clone())
}
Some(_) => {
self.store.remove(&key);
self.misses += 1;
None
}
None => {
self.misses += 1;
None
}
};
let _ = responder.send(result);
}
CacheCommand::Set { key, value, ttl, responder } => {
let expires_at = ttl.map(|d| std::time::Instant::now() + d);
self.store.insert(key, CacheEntry { value, expires_at });
let _ = responder.send(true);
}
CacheCommand::Delete { key, responder } => {
let existed = self.store.remove(&key).is_some();
let _ = responder.send(existed);
}
CacheCommand::Stats { responder } => {
let stats = CacheStats {
entries: self.store.len(),
hits: self.hits,
misses: self.misses,
};
let _ = responder.send(stats);
}
}
}
println!("cache actor shutting down, {} entries remaining", self.store.len());
}
}
#[tokio::main]
async fn main() {
let (tx, rx) = mpsc::channel::<CacheCommand>(256);
let mut actor = CacheActor::new();
tokio::spawn(async move { actor.run(rx).await });
let (resp_tx, resp_rx) = oneshot::channel();
tx.send(CacheCommand::Set {
key: "user:1001".into(),
value: r#"{"name":"alice"}"#.into(),
ttl: Some(std::time::Duration::from_secs(300)),
responder: resp_tx,
}).await.unwrap();
resp_rx.await.unwrap();
let (resp_tx, resp_rx) = oneshot::channel();
tx.send(CacheCommand::Get {
key: "user:1001".into(),
responder: resp_tx,
}).await.unwrap();
let result = resp_rx.await.unwrap();
println!("get result: {:?}", result);
let (resp_tx, resp_rx) = oneshot::channel();
tx.send(CacheCommand::Stats { responder: resp_tx }).await.unwrap();
let stats = resp_rx.await.unwrap();
println!("cache stats: {:?}", stats);
}
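Key points:
- An mpsc channel serves as the actor's mailbox; a oneshot channel implements request-response
- The Command enum defines the message protocol; it is type-safe and extensible
- The actor is the sole owner of its state, so there are no data races by construction
- No external framework dependency: Tokio alone is enough to build a complete actor system
- The ttl expiry logic shows how flexibly an actor can manage its internal state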
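Callers of the cache actor above have to create a reply channel for every request. A common refinement is to hide that plumbing behind a cloneable handle type. The sketch below shows the idea with a hypothetical CounterHandle, using std::sync::mpsc and a thread so that it runs without any dependency. With Tokio the shape would be the same, with mpsc::Sender and oneshot in place of the std channels.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;

enum CounterCommand {
    Add { amount: u64 },
    Get { responder: Sender<u64> },
}

/// Cloneable address of the actor; callers never see the channels.
#[derive(Clone)]
struct CounterHandle {
    tx: Sender<CounterCommand>,
}

impl CounterHandle {
    fn spawn() -> Self {
        let (tx, rx) = mpsc::channel::<CounterCommand>();
        thread::spawn(move || {
            let mut total = 0u64;
            // The loop ends once every handle (sender) has been dropped.
            for cmd in rx {
                match cmd {
                    CounterCommand::Add { amount } => total += amount,
                    CounterCommand::Get { responder } => {
                        let _ = responder.send(total);
                    }
                }
            }
        });
        Self { tx }
    }

    /// Fire-and-forget; returns false if the actor is gone.
    fn add(&self, amount: u64) -> bool {
        self.tx.send(CounterCommand::Add { amount }).is_ok()
    }

    /// Request-response; returns None if the actor is gone.
    fn get(&self) -> Option<u64> {
        let (resp_tx, resp_rx) = mpsc::channel();
        self.tx.send(CounterCommand::Get { responder: resp_tx }).ok()?;
        resp_rx.recv().ok()
    }
}
```

The handle plays the role of the Address from the concepts table: business code calls counter.add(100) and counter.get() and never constructs a command by hand.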
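Channel Actor vs. Actix:
| Feature | Channel Actor | Actix |
|---|---|---|
| Dependencies | tokio only | actix + tokio |
| Message types | Hand-written enum | derive Message |
| Reply mechanism | oneshot channel | Message rtype |
| Lifecycle | Managed manually | Automatic callbacks |
| Supervision | Implemented manually | Built in |
| Runtime overhead | Very low | Low |
| Learning curve | Gentle | Moderate |
Pattern 3: Supervision Trees (the fault-recovery pattern)
Supervision trees are one of the Actor model's most powerful features: a parent actor monitors its children, and when a child crashes the parent decides how to recover according to a strategy.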
use tokio::sync::{mpsc, oneshot};
use tokio::time::{sleep, Duration};
use std::collections::HashMap;
#[derive(Debug, Clone)]
enum RestartStrategy {
Permanent,
Transient,
Temporary,
}
#[derive(Debug)]
enum WorkerMessage {
Process { data: String, responder: oneshot::Sender<String> },
Crash,
Status { responder: oneshot::Sender<WorkerStatus> },
}
#[derive(Debug, Clone)]
struct WorkerStatus {
id: String,
processed: u64,
alive: bool,
}
struct Worker {
id: String,
processed: u64,
}
impl Worker {
fn new(id: String) -> Self {
Self { id, processed: 0 }
}
async fn run(
&mut self,
mut rx: mpsc::Receiver<WorkerMessage>,
supervisor_tx: mpsc::Sender<SupervisorMessage>,
) {
println!("worker {} started", self.id);
while let Some(msg) = rx.recv().await {
match msg {
WorkerMessage::Process { data, responder } => {
self.processed += 1;
let result = format!("processed by {}: {}", self.id, data.to_uppercase());
let _ = responder.send(result);
}
WorkerMessage::Crash => {
println!("worker {} crashing!", self.id);
let _ = supervisor_tx.send(SupervisorMessage::ChildCrashed {
id: self.id.clone(),
}).await;
return;
}
WorkerMessage::Status { responder } => {
let _ = responder.send(WorkerStatus {
id: self.id.clone(),
processed: self.processed,
alive: true,
});
}
}
}
let _ = supervisor_tx.send(SupervisorMessage::ChildStopped {
id: self.id.clone(),
}).await;
}
}
enum SupervisorMessage {
ChildCrashed { id: String },
ChildStopped { id: String },
SpawnWorker { id: String },
GetStatus { responder: oneshot::Sender<Vec<WorkerStatus>> },
}
struct ChildHandle {
tx: mpsc::Sender<WorkerMessage>,
strategy: RestartStrategy,
}
struct Supervisor {
children: HashMap<String, ChildHandle>,
supervisor_rx: mpsc::Receiver<SupervisorMessage>,
supervisor_tx: mpsc::Sender<SupervisorMessage>,
restart_count: HashMap<String, u32>,
max_restarts: u32,
}
impl Supervisor {
fn new(rx: mpsc::Receiver<SupervisorMessage>, tx: mpsc::Sender<SupervisorMessage>, max_restarts: u32) -> Self {
Self {
children: HashMap::new(),
supervisor_rx: rx,
supervisor_tx: tx,
restart_count: HashMap::new(),
max_restarts,
}
}
async fn spawn_child(&mut self, id: String, strategy: RestartStrategy) {
let (tx, rx) = mpsc::channel::<WorkerMessage>(64);
let supervisor_tx = self.supervisor_tx.clone();
let worker_id = id.clone();
let mut worker = Worker::new(id.clone());
tokio::spawn(async move {
worker.run(rx, supervisor_tx).await;
});
self.children.insert(id, ChildHandle { tx, strategy });
println!("supervisor spawned child {}", worker_id);
}
async fn restart_child(&mut self, id: &str) -> bool {
let restarts = self.restart_count.entry(id.to_string()).or_insert(0);
if *restarts >= self.max_restarts {
println!("supervisor: child {} exceeded max restarts ({})", id, self.max_restarts);
return false;
}
*restarts += 1;
println!("supervisor: restarting child {} (attempt {})", id, *restarts);
let strategy = self.children.get(id).map(|h| h.strategy.clone());
if let Some(strategy) = strategy {
self.children.remove(id);
self.spawn_child(id.to_string(), strategy).await;
true
} else {
false
}
}
async fn run(&mut self) {
while let Some(msg) = self.supervisor_rx.recv().await {
match msg {
SupervisorMessage::ChildCrashed { id } => {
let strategy = self.children.get(&id).map(|h| h.strategy.clone());
match strategy {
Some(RestartStrategy::Permanent) => {
self.restart_child(&id).await;
}
Some(RestartStrategy::Transient) => {
self.restart_child(&id).await;
}
Some(RestartStrategy::Temporary) => {
println!("supervisor: child {} crashed, not restarting (temporary)", id);
self.children.remove(&id);
}
None => {}
}
}
SupervisorMessage::ChildStopped { id } => {
let strategy = self.children.get(&id).map(|h| h.strategy.clone());
match strategy {
Some(RestartStrategy::Permanent) => {
self.restart_child(&id).await;
}
Some(RestartStrategy::Transient) | Some(RestartStrategy::Temporary) => {
self.children.remove(&id);
}
None => {}
}
}
SupervisorMessage::SpawnWorker { id } => {
self.spawn_child(id, RestartStrategy::Permanent).await;
}
SupervisorMessage::GetStatus { responder } => {
let mut statuses = Vec::new();
for (id, handle) in &self.children {
let (resp_tx, resp_rx) = oneshot::channel();
if handle.tx.send(WorkerMessage::Status { responder: resp_tx }).await.is_ok() {
if let Ok(status) = resp_rx.await {
statuses.push(status);
}
}
}
let _ = responder.send(statuses);
}
}
}
}
}
#[tokio::main]
async fn main() {
let (sup_tx, sup_rx) = mpsc::channel::<SupervisorMessage>(256);
let mut supervisor = Supervisor::new(sup_rx, sup_tx.clone(), 3);
tokio::spawn(async move { supervisor.run().await });
sup_tx.send(SupervisorMessage::SpawnWorker { id: "worker-1".into() }).await.unwrap();
sup_tx.send(SupervisorMessage::SpawnWorker { id: "worker-2".into() }).await.unwrap();
sleep(Duration::from_millis(50)).await;
let (resp_tx, resp_rx) = oneshot::channel();
sup_tx.send(SupervisorMessage::GetStatus { responder: resp_tx }).await.unwrap();
let statuses = resp_rx.await.unwrap();
for s in statuses {
println!("status: {:?}", s);
}
}
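Key points:
- The supervisor holds each child actor's Sender and receives crash notifications over mpsc
- Three restart strategies: Permanent (always restart), Transient (restart only on abnormal exit), Temporary (never restart)
- max_restarts prevents an infinite restart loop
- A crashing child notifies the supervisor itself, and the supervisor decides whether to restart it. The crash here is simulated with a Crash message; a real panic would skip the notification, so production code would typically also watch the task's JoinHandle
- Supervision trees can nest: a supervisor can itself be a supervised child
Restart strategies compared:
| Strategy | Normal exit | Abnormal exit | Typical use |
|---|---|---|---|
| Permanent | Restart | Restart | Long-running services, scheduled jobs |
| Transient | No restart | Restart | Request handlers, temporary computations |
| Temporary | No restart | No restart | One-off tasks, debugging |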
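The table can be written down as a single pure function, which keeps the policy easy to unit test separately from the supervisor's message loop. ExitReason and should_restart are hypothetical names for this sketch; the supervisor above encodes the same decisions inline in its ChildCrashed and ChildStopped arms.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum RestartStrategy {
    Permanent,
    Transient,
    Temporary,
}

/// How a child ended: a clean return or a crash.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ExitReason {
    Normal,
    Abnormal,
}

/// Encodes the restart table: one row per strategy, one column per exit reason.
fn should_restart(strategy: RestartStrategy, reason: ExitReason) -> bool {
    match (strategy, reason) {
        (RestartStrategy::Permanent, _) => true,
        (RestartStrategy::Transient, ExitReason::Abnormal) => true,
        (RestartStrategy::Transient, ExitReason::Normal) => false,
        (RestartStrategy::Temporary, _) => false,
    }
}
```

The restart-count limit is a separate concern and would still be checked after this function returns true.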
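Pattern 4: Distributed Actors (the remote-messaging pattern)
When actors cross process boundaries, messages must be serialized and sent over the network. This pattern shows how to build an actor system that can communicate remotely.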
use tokio::sync::{mpsc, oneshot};
use tokio::net::{TcpListener, TcpStream};
use serde::{Serialize, Deserialize};
use std::collections::HashMap;
use std::net::SocketAddr;
#[derive(Debug, Serialize, Deserialize)]
struct RemoteEnvelope {
target: String,
message_type: String,
payload: Vec<u8>,
reply_to: Option<u64>,
}
#[derive(Debug, Serialize, Deserialize)]
enum OrderCommand {
Create { order_id: String, item: String, quantity: u32 },
Cancel { order_id: String },
Query { order_id: String },
}
#[derive(Debug, Serialize, Deserialize)]
enum OrderResponse {
Created { order_id: String },
Cancelled { order_id: String },
Found { order_id: String, item: String, quantity: u32 },
NotFound { order_id: String },
Error { message: String },
}
#[derive(Debug, Serialize, Deserialize, Clone)]
struct Order {
id: String,
item: String,
quantity: u32,
status: String,
}
struct OrderActor {
orders: HashMap<String, Order>,
cmd_rx: mpsc::Receiver<(OrderCommand, oneshot::Sender<OrderResponse>)>,
}
impl OrderActor {
fn new(rx: mpsc::Receiver<(OrderCommand, oneshot::Sender<OrderResponse>)>) -> Self {
Self { orders: HashMap::new(), cmd_rx: rx }
}
async fn run(&mut self) {
while let Some((cmd, responder)) = self.cmd_rx.recv().await {
let response = match cmd {
OrderCommand::Create { order_id, item, quantity } => {
if self.orders.contains_key(&order_id) {
OrderResponse::Error { message: format!("order {} already exists", order_id) }
} else {
self.orders.insert(order_id.clone(), Order {
id: order_id.clone(),
item,
quantity,
status: "created".into(),
});
OrderResponse::Created { order_id }
}
}
OrderCommand::Cancel { order_id } => {
match self.orders.get_mut(&order_id) {
Some(order) => {
order.status = "cancelled".into();
OrderResponse::Cancelled { order_id }
}
None => OrderResponse::NotFound { order_id },
}
}
OrderCommand::Query { order_id } => {
match self.orders.get(&order_id) {
Some(order) => OrderResponse::Found {
order_id: order.id.clone(),
item: order.item.clone(),
quantity: order.quantity,
},
None => OrderResponse::NotFound { order_id },
}
}
};
let _ = responder.send(response);
}
}
}
struct RemoteGateway {
local_actors: HashMap<String, mpsc::Sender<(OrderCommand, oneshot::Sender<OrderResponse>)>>,
remote_nodes: HashMap<String, SocketAddr>,
next_request_id: u64,
}
impl RemoteGateway {
fn new() -> Self {
Self {
local_actors: HashMap::new(),
remote_nodes: HashMap::new(),
next_request_id: 0,
}
}
fn register_local(&mut self, name: String, tx: mpsc::Sender<(OrderCommand, oneshot::Sender<OrderResponse>)>) {
self.local_actors.insert(name, tx);
}
fn register_remote(&mut self, name: String, addr: SocketAddr) {
self.remote_nodes.insert(name, addr);
}
async fn send_command(&mut self, target: &str, cmd: OrderCommand) -> Option<OrderResponse> {
if let Some(tx) = self.local_actors.get(target) {
let (resp_tx, resp_rx) = oneshot::channel();
if tx.send((cmd, resp_tx)).await.is_ok() {
return resp_rx.await.ok();
}
return None;
}
if let Some(addr) = self.remote_nodes.get(target) {
match TcpStream::connect(addr).await {
Ok(mut stream) => {
let payload = bincode::serialize(&cmd).ok()?;
let request_id = self.next_request_id;
self.next_request_id += 1;
let envelope = RemoteEnvelope {
target: target.to_string(),
message_type: "OrderCommand".to_string(),
payload,
reply_to: Some(request_id),
};
let data = bincode::serialize(&envelope).ok()?;
let len = data.len() as u32;
use tokio::io::AsyncWriteExt;
if stream.write_all(&len.to_le_bytes()).await.is_err() { return None; }
if stream.write_all(&data).await.is_err() { return None; }
use tokio::io::AsyncReadExt;
let mut len_buf = [0u8; 4];
if stream.read_exact(&mut len_buf).await.is_err() { return None; }
let resp_len = u32::from_le_bytes(len_buf) as usize;
let mut resp_buf = vec![0u8; resp_len];
if stream.read_exact(&mut resp_buf).await.is_err() { return None; }
bincode::deserialize(&resp_buf).ok()
}
Err(e) => {
eprintln!("failed to connect to {}: {}", target, e);
None
}
}
} else {
None
}
}
}
async fn server_node(addr: SocketAddr) {
let (tx, rx) = mpsc::channel::<(OrderCommand, oneshot::Sender<OrderResponse>)>(256);
let mut actor = OrderActor::new(rx);
tokio::spawn(async move { actor.run().await });
let listener = TcpListener::bind(addr).await.unwrap();
println!("order actor listening on {}", addr);
loop {
if let Ok((mut stream, _)) = listener.accept().await {
let tx = tx.clone();
tokio::spawn(async move {
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
let mut len_buf = [0u8; 4];
if stream.read_exact(&mut len_buf).await.is_err() { return; }
let len = u32::from_le_bytes(len_buf) as usize;
let mut buf = vec![0u8; len];
if stream.read_exact(&mut buf).await.is_err() { return; }
let envelope: RemoteEnvelope = match bincode::deserialize(&buf) {
Ok(e) => e,
Err(_) => return,
};
let cmd: OrderCommand = match bincode::deserialize(&envelope.payload) {
Ok(c) => c,
Err(_) => return,
};
let (resp_tx, resp_rx) = oneshot::channel();
if tx.send((cmd, resp_tx)).await.is_err() { return; }
let response = match resp_rx.await {
Ok(r) => r,
Err(_) => return,
};
let resp_data = bincode::serialize(&response).unwrap();
let resp_len = resp_data.len() as u32;
let _ = stream.write_all(&resp_len.to_le_bytes()).await;
let _ = stream.write_all(&resp_data).await;
});
}
}
}
#[tokio::main]
async fn main() {
let server_addr: SocketAddr = "127.0.0.1:9001".parse().unwrap();
tokio::spawn(server_node(server_addr));
// Duration is not imported in this example, so use the fully qualified path.
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let mut gateway = RemoteGateway::new();
gateway.register_remote("order-service".into(), server_addr);
let response = gateway.send_command("order-service", OrderCommand::Create {
order_id: "ORD-001".into(),
item: "Rust Book".into(),
quantity: 2,
}).await;
println!("create response: {:?}", response);
let response = gateway.send_command("order-service", OrderCommand::Query {
order_id: "ORD-001".into(),
}).await;
println!("query response: {:?}", response);
}
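Key points:
- RemoteEnvelope wraps message metadata: target actor, message type, payload, and reply address
- Local actors talk directly over channels; remote actors exchange serialized messages over TCP
- RemoteGateway unifies local and remote routing, so business code does not need to know where an actor lives
- bincode provides compact binary serialization, which is usually a better fit than JSON for actor-to-actor traffic (the code uses the bincode 1.x serialize and deserialize functions)
- The length-prefixed framing protocol (4-byte length + data) marks message boundaries on a TCP byte stream, which solves what is often called the "sticky packet" problem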
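The framing rule is independent of Tokio, so it can be isolated and tested against an in-memory buffer. The sketch below uses the blocking std::io traits and the same 4-byte little-endian prefix as the example. MAX_FRAME_LEN is an assumed limit that the original code does not have; without one, a peer can send a large length prefix and force a correspondingly large allocation.

```rust
use std::io::{self, Read, Write};

/// Assumed upper bound on one frame; choose a value that suits your messages.
const MAX_FRAME_LEN: u32 = 16 * 1024 * 1024;

/// Writes a 4-byte little-endian length followed by the payload.
fn write_frame<W: Write>(writer: &mut W, payload: &[u8]) -> io::Result<()> {
    if payload.len() > MAX_FRAME_LEN as usize {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "payload too large"));
    }
    let len = payload.len() as u32;
    writer.write_all(&len.to_le_bytes())?;
    writer.write_all(payload)
}

/// Reads exactly one frame, rejecting oversized length prefixes before allocating.
fn read_frame<R: Read>(reader: &mut R) -> io::Result<Vec<u8>> {
    let mut len_buf = [0u8; 4];
    reader.read_exact(&mut len_buf)?;
    let len = u32::from_le_bytes(len_buf);
    if len > MAX_FRAME_LEN {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "frame exceeds MAX_FRAME_LEN"));
    }
    let mut payload = vec![0u8; len as usize];
    reader.read_exact(&mut payload)?;
    Ok(payload)
}
```

Two frames written back to back into one buffer come out as two separate payloads, which is exactly the boundary problem the prefix solves.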
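Pattern 5: A Production-Grade Actor System (graceful shutdown and backpressure)
An actor system in production has to handle graceful shutdown, backpressure control, message timeouts, and similar concerns.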
use tokio::sync::{mpsc, oneshot, watch, broadcast};
use tokio::time::{timeout, Duration, Instant};
use tokio::select;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
struct ActorSystem {
shutdown_tx: watch::Sender<bool>,
shutdown_rx: watch::Receiver<bool>,
event_tx: broadcast::Sender<String>,
metrics: Arc<ActorMetrics>,
}
struct ActorMetrics {
messages_sent: AtomicU64,
messages_processed: AtomicU64,
messages_dropped: AtomicU64,
actors_alive: AtomicU64,
}
impl ActorMetrics {
fn new() -> Self {
Self {
messages_sent: AtomicU64::new(0),
messages_processed: AtomicU64::new(0),
messages_dropped: AtomicU64::new(0),
actors_alive: AtomicU64::new(0),
}
}
fn snapshot(&self) -> String {
format!(
"sent: {}, processed: {}, dropped: {}, alive: {}",
self.messages_sent.load(Ordering::Relaxed),
self.messages_processed.load(Ordering::Relaxed),
self.messages_dropped.load(Ordering::Relaxed),
self.actors_alive.load(Ordering::Relaxed),
)
}
}
enum WorkerCmd {
Execute { task: String, responder: oneshot::Sender<String> },
Ping { responder: oneshot::Sender<bool> },
}
struct ResilientWorker {
id: String,
cmd_rx: mpsc::Receiver<WorkerCmd>,
shutdown_rx: watch::Receiver<bool>,
event_tx: broadcast::Sender<String>,
metrics: Arc<ActorMetrics>,
mailbox_capacity: usize,
process_timeout: Duration,
}
impl ResilientWorker {
async fn run(&mut self) {
self.metrics.actors_alive.fetch_add(1, Ordering::Relaxed);
println!("worker {} started", self.id);
loop {
select! {
_ = self.shutdown_rx.changed() => {
if *self.shutdown_rx.borrow() {
self.drain_mailbox().await;
break;
}
}
cmd = self.cmd_rx.recv() => {
match cmd {
Some(cmd) => self.handle_command(cmd).await,
None => break,
}
}
}
}
self.metrics.actors_alive.fetch_sub(1, Ordering::Relaxed);
println!("worker {} stopped", self.id);
}
async fn handle_command(&mut self, cmd: WorkerCmd) {
match cmd {
WorkerCmd::Execute { task, responder } => {
self.metrics.messages_sent.fetch_add(1, Ordering::Relaxed);
let result = match timeout(self.process_timeout, self.process_task(&task)).await {
Ok(output) => {
self.metrics.messages_processed.fetch_add(1, Ordering::Relaxed);
output
}
Err(_) => {
self.metrics.messages_dropped.fetch_add(1, Ordering::Relaxed);
let _ = self.event_tx.send(format!("worker {} timeout on: {}", self.id, task));
"timeout".to_string()
}
};
let _ = responder.send(result);
}
WorkerCmd::Ping { responder } => {
let _ = responder.send(true);
}
}
}
async fn process_task(&self, task: &str) -> String {
tokio::time::sleep(Duration::from_millis(10)).await;
format!("worker {} processed: {}", self.id, task.to_uppercase())
}
async fn drain_mailbox(&mut self) {
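println!("worker {} draining mailbox...", self.id);
// Stop accepting new messages so recv() returns None once the queue is empty,
// instead of polling until the 5-second deadline while senders are still alive.
self.cmd_rx.close();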
let drain_timeout = Duration::from_secs(5);
let deadline = Instant::now() + drain_timeout;
loop {
if Instant::now() > deadline {
println!("worker {} drain timeout, {} messages remaining", self.id, self.cmd_rx.len());
break;
}
match timeout(Duration::from_millis(100), self.cmd_rx.recv()).await {
Ok(Some(WorkerCmd::Execute { task, responder })) => {
let result = self.process_task(&task).await;
self.metrics.messages_processed.fetch_add(1, Ordering::Relaxed);
let _ = responder.send(result);
}
Ok(Some(WorkerCmd::Ping { responder })) => {
let _ = responder.send(false);
}
Ok(None) => break,
Err(_) => continue,
}
}
println!("worker {} mailbox drained", self.id);
}
}
impl ActorSystem {
fn new() -> Self {
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let (event_tx, _) = broadcast::channel::<String>(256);
let metrics = Arc::new(ActorMetrics::new());
Self { shutdown_tx, shutdown_rx, event_tx, metrics }
}
async fn spawn_worker(&self, id: String, mailbox_capacity: usize) -> mpsc::Sender<WorkerCmd> {
let (tx, rx) = mpsc::channel::<WorkerCmd>(mailbox_capacity);
let mut worker = ResilientWorker {
id,
cmd_rx: rx,
shutdown_rx: self.shutdown_rx.clone(),
event_tx: self.event_tx.clone(),
metrics: self.metrics.clone(),
mailbox_capacity,
process_timeout: Duration::from_secs(2),
};
tokio::spawn(async move { worker.run().await });
tx
}
async fn graceful_shutdown(&self) {
println!("initiating graceful shutdown...");
self.shutdown_tx.send(true).ok();
tokio::time::sleep(Duration::from_secs(6)).await;
println!("final metrics: {}", self.metrics.snapshot());
}
}
async fn backpressure_sender(
tx: mpsc::Sender<WorkerCmd>,
worker_id: String,
total: u64,
) -> u64 {
let mut sent = 0u64;
let mut dropped = 0u64;
for i in 0..total {
let (resp_tx, resp_rx) = oneshot::channel();
let task = format!("task-{}", i);
match timeout(Duration::from_millis(50), tx.send(WorkerCmd::Execute {
task,
responder: resp_tx,
})).await {
Ok(Ok(())) => {
match timeout(Duration::from_secs(3), resp_rx).await {
Ok(Ok(result)) => {
sent += 1;
if i % 100 == 0 {
println!("{} result: {}", worker_id, result);
}
}
_ => dropped += 1,
}
}
_ => dropped += 1,
}
}
println!("{} sent: {}, dropped: {}", worker_id, sent, dropped);
dropped
}
#[tokio::main]
async fn main() {
let system = ActorSystem::new();
let worker1 = system.spawn_worker("worker-1".into(), 64).await;
let worker2 = system.spawn_worker("worker-2".into(), 64).await;
let h1 = tokio::spawn(backpressure_sender(worker1, "sender-1".into(), 200));
let h2 = tokio::spawn(backpressure_sender(worker2, "sender-2".into(), 200));
h1.await.unwrap();
h2.await.unwrap();
println!("metrics: {}", system.metrics.snapshot());
system.graceful_shutdown().await;
}
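Key points:
- watch::channel(false) broadcasts the shutdown signal, so every actor is notified
- drain_mailbox() processes the remaining messages before shutdown, with a timeout to avoid waiting forever
- timeout wraps send and recv so that backpressure cannot block indefinitely
- ActorMetrics collects metrics with AtomicU64, which is lock-free and thread-safe
- A bounded mailbox (capacity 64) gives backpressure for free: producers wait when it is full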
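Waiting is one response to a full mailbox; the other is to reject the message immediately and let the caller decide. The sketch below shows the reject variant with the standard library's bounded sync_channel and try_send, so it runs without Tokio. The fill_mailbox function is a hypothetical name for this demonstration. Tokio's mpsc::Sender also offers a try_send method for the same purpose.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Tries to enqueue `total` messages into a mailbox with `capacity` slots that
/// nobody is draining, and returns (accepted, rejected).
fn fill_mailbox(capacity: usize, total: usize) -> (usize, usize) {
    // `_rx` keeps the receiver alive so the channel stays connected.
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    let mut rejected = 0;
    for i in 0..total {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // Mailbox full: shed the message instead of blocking the producer.
            Err(TrySendError::Full(_)) => rejected += 1,
            // Receiver gone: no point in sending anything else.
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    (accepted, rejected)
}
```

Shedding suits work that can be retried or sampled, such as metrics and notifications. Waiting with a timeout, as backpressure_sender does, suits work that must not be lost silently.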
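Five Common Pitfalls
Pitfall 1: Circular waits between actors
use tokio::sync::mpsc;
use tokio::time::{timeout, Duration};
// ❌ Wrong: Actor A waits for Actor B's message while Actor B waits for Actor A's
async fn deadlock_pattern() {
    let (tx_a, mut rx_a) = mpsc::channel::<String>(16);
    let (tx_b, mut rx_b) = mpsc::channel::<String>(16);
    // Actor A: waits on its own mailbox before it sends anything to B
    tokio::spawn(async move {
        let _request = rx_a.recv().await.unwrap();
        tx_b.send("reply-from-a".into()).await.unwrap();
    });
    // Actor B: does the same, so neither task ever makes progress
    tokio::spawn(async move {
        let _request = rx_b.recv().await.unwrap();
        tx_a.send("reply-from-b".into()).await.unwrap();
    });
}
// ✅ Right: break the cycle with a timeout or a one-way message
async fn safe_pattern() {
    let (tx_a, mut rx_a) = mpsc::channel::<String>(16);
    let (tx_b, mut rx_b) = mpsc::channel::<String>(16);
    // Actor A: sends first, then waits for the reply with a deadline
    tokio::spawn(async move {
        tx_b.send("request-from-a".into()).await.unwrap();
        match timeout(Duration::from_secs(5), rx_a.recv()).await {
            Ok(Some(response)) => println!("got: {}", response),
            _ => println!("timeout or channel closed"),
        }
    });
    // Actor B: replies to the request it receives
    tokio::spawn(async move {
        if let Some(request) = rx_b.recv().await {
            let _ = tx_a.send(format!("reply to {}", request)).await;
        }
    });
}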
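Pitfall 2: Forgetting to handle a full mailbox
// ❌ Wrong: an unbounded channel never pushes back, so memory can grow without limit
async fn unbounded_risk() {
    let (tx, _rx) = mpsc::unbounded_channel::<String>();
    for i in 0..1000000 {
        tx.send(format!("msg-{}", i)).unwrap();
    }
}
// ✅ Right: a bounded channel plus backpressure
async fn bounded_safe() {
    let (tx, mut rx) = mpsc::channel::<String>(256);
    // Something must drain the mailbox; otherwise send() waits forever
    // once 256 messages are queued.
    tokio::spawn(async move {
        while let Some(_msg) = rx.recv().await {}
    });
    for i in 0..1000 {
        match tx.send(format!("msg-{}", i)).await {
            Ok(()) => {}
            Err(_) => {
                println!("receiver dropped at {}", i);
                return;
            }
        }
    }
}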
Pitfall 3: A supervisor that never limits restarts
// ❌ Wrong: unlimited restarts can turn into a crash-restart loop
async fn infinite_restart_loop() {
loop {
let (tx, rx) = mpsc::channel::<String>(16);
let handle = tokio::spawn(async move {
panic!("always crash!");
});
let _ = handle.await;
println!("restarting...");
}
}
// ✅ Right: limit both the number and the frequency of restarts
struct RestartPolicy {
max_restarts: u32,
window: Duration,
restarts: Vec<Instant>,
}
impl RestartPolicy {
fn new(max_restarts: u32, window: Duration) -> Self {
Self { max_restarts, window, restarts: Vec::new() }
}
fn should_restart(&mut self) -> bool {
let now = Instant::now();
self.restarts.retain(|t| now - *t < self.window);
if self.restarts.len() < self.max_restarts as usize {
self.restarts.push(now);
true
} else {
false
}
}
}
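RestartPolicy caps how many restarts happen inside a window. A complementary way to limit frequency is to space the attempts out. The sketch below computes an exponential backoff delay; backoff_delay and its parameters are hypothetical and are not part of the policy above.

```rust
use std::time::Duration;

/// Delay before restart attempt `attempt` (0-based): base * 2^attempt, capped at `max`.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // Saturate instead of overflowing for very large attempt numbers.
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).map_or(max, |delay| delay.min(max))
}
```

A supervisor could sleep for backoff_delay(restart_count, base, max) before respawning a child and still consult should_restart() to give up entirely after too many failures.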
Pitfall 4: Remote messages without a timeout
// ❌ Wrong: a remote call can block forever
async fn remote_without_timeout() {
let response = gateway.send_command("remote-actor", cmd).await;
}
// ✅ Right: every remote call must have a timeout
async fn remote_with_timeout() {
match timeout(Duration::from_secs(5), gateway.send_command("remote-actor", cmd)).await {
Ok(Some(response)) => println!("got: {:?}", response),
Ok(None) => println!("actor not found or transport error"),
Err(_) => println!("remote call timeout"),
}
}
Pitfall 5: Dropping unprocessed messages on shutdown
// ❌ Wrong: dropping the receiver directly loses queued messages
async fn bad_shutdown(mut rx: mpsc::Receiver<String>) {
drop(rx);
}
// ✅ Right: drain first, then shut down
async fn good_shutdown(mut rx: mpsc::Receiver<String>) {
println!("draining remaining messages...");
while let Ok(Some(msg)) = timeout(Duration::from_millis(100), rx.recv()).await {
println!("drained: {}", msg);
}
println!("all messages processed");
}
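Troubleshooting Cheat Sheet
| Symptom | Likely cause | Fix |
|---|---|---|
| Actor does not respond to messages | Mailbox full or deadlock | Check mailbox capacity; add timeouts |
| Frequent SendError | Receiver has been closed | Check whether the actor exited unexpectedly; add supervision |
| Memory keeps growing | Unbounded channel or message backlog | Switch to a bounded channel; add backpressure |
| Restart loop | Actor fails during initialization | Check the startup logic; limit restarts |
| Remote calls time out | Network latency or unresponsive peer | Add timeouts and retries |
| Messages arrive out of order | Multiple producers sending concurrently | Use a single producer per ordered stream, or add sequence numbers |
| Graceful shutdown hangs | An actor ignores the shutdown signal | Put a timeout on the drain |
| oneshot RecvError | Responder exited without sending | Check that every code path sends a reply |
Advanced Optimization Techniques
1. Actor pooling for load balancing
When a single actor becomes the bottleneck, distribute messages across a pool of actors: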
use tokio::sync::mpsc;
use std::sync::atomic::Ordering;
struct ActorPool {
workers: Vec<mpsc::Sender<WorkerCmd>>,
next: std::sync::atomic::AtomicUsize,
}
impl ActorPool {
fn new(workers: Vec<mpsc::Sender<WorkerCmd>>) -> Self {
Self {
workers,
next: std::sync::atomic::AtomicUsize::new(0),
}
}
fn next_worker(&self) -> &mpsc::Sender<WorkerCmd> {
let idx = self.next.fetch_add(1, Ordering::Relaxed) % self.workers.len();
&self.workers[idx]
}
async fn send(&self, cmd: WorkerCmd) -> bool {
let worker = self.next_worker();
worker.send(cmd).await.is_ok()
}
}
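Round-robin spreads load evenly, but it sends consecutive messages for the same entity to different workers, which gives up the per-actor ordering discussed earlier. When ordering per key matters, a pool can route by hashing the key instead. The sketch below is a hypothetical worker_index helper using the standard library's DefaultHasher; it is an alternative to next_worker, not something the pool above already does.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Maps a routing key to a worker slot, so every message for the same key
/// lands on the same worker. Returns None for an empty pool.
fn worker_index<K: Hash>(key: &K, worker_count: usize) -> Option<usize> {
    if worker_count == 0 {
        return None;
    }
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    Some((hasher.finish() % worker_count as u64) as usize)
}
```

The trade-off is skew: a hot key pins its traffic to one worker. Resizing the pool also remaps most keys, so this fits pools whose size is fixed at startup.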
2. Message priorities
use tokio::sync::mpsc;
use tokio::select;
enum Priority {
High,
Normal,
Low,
}
struct PriorityMailbox {
high_rx: mpsc::Receiver<String>,
normal_rx: mpsc::Receiver<String>,
low_rx: mpsc::Receiver<String>,
}
impl PriorityMailbox {
async fn recv(&mut self) -> Option<String> {
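select! {
// biased polls the branches top to bottom, so higher priorities win when several are ready.
biased;
// A closed lane yields None, which disables its branch instead of ending the mailbox.
Some(msg) = self.high_rx.recv() => Some(msg),
Some(msg) = self.normal_rx.recv() => Some(msg),
Some(msg) = self.low_rx.recv() => Some(msg),
// Reached only when all three lanes are closed and empty.
else => None,
}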
}
}
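The same lane-per-priority idea can be tried without an async runtime. The sketch below is a hypothetical PriorityInbox built on std::sync::mpsc that checks the lanes from highest to lowest with non-blocking try_recv. It shows the ordering rule only; it does not wait for new messages.

```rust
use std::sync::mpsc::Receiver;

/// Three lanes, one per priority level.
struct PriorityInbox<T> {
    high: Receiver<T>,
    normal: Receiver<T>,
    low: Receiver<T>,
}

impl<T> PriorityInbox<T> {
    /// Returns the next queued message, preferring higher-priority lanes.
    /// Returns None when every lane is currently empty or closed.
    fn try_next(&self) -> Option<T> {
        self.high
            .try_recv()
            .ok()
            .or_else(|| self.normal.try_recv().ok())
            .or_else(|| self.low.try_recv().ok())
    }
}
```

With strict priority, a steady stream of high-priority messages can starve the lower lanes. If that matters, a common remedy is to take a lower-priority message after every N high-priority ones.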
3. Message tracing and trace IDs
use std::sync::atomic::{AtomicU64, Ordering};
static TRACE_ID: AtomicU64 = AtomicU64::new(1);
fn new_trace_id() -> u64 {
TRACE_ID.fetch_add(1, Ordering::Relaxed)
}
#[derive(Debug, Clone)]
struct TracedMessage {
trace_id: u64,
span_id: u64,
parent_span: Option<u64>,
payload: String,
}
impl TracedMessage {
fn new(payload: String) -> Self {
Self {
trace_id: new_trace_id(),
span_id: new_trace_id(),
parent_span: None,
payload,
}
}
fn child(&self, payload: String) -> Self {
Self {
trace_id: self.trace_id,
span_id: new_trace_id(),
parent_span: Some(self.span_id),
payload,
}
}
}
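Actor Framework Comparison
| Feature | Actix | Tokio Channel | Riker | Coerce |
|---|---|---|---|---|
| Message passing | Typed Message | Hand-written enum | Typed | Typed |
| Supervision tree | Built in | Manual | Built in | Built in |
| Remote actors | actix-remote | Manual | Supported | Built in |
| Performance | High | Very high | Medium | High |
| Dependencies | actix | tokio only | riker | coerce |
| Learning curve | Moderate | Gentle | Moderate | Moderate |
| Ecosystem maturity | High | High | Low | Low |
| Best suited for | Web services | General concurrency | Traditional actors | Distributed systems |
Summary
The core philosophy of the Actor model in Rust is: do not communicate by sharing memory; share memory by communicating. Actix provides a mature, typed actor system, while a Tokio channel actor is lighter and more flexible. Supervision trees are the foundation of a production-grade actor system: an actor system without supervision is like a car without seat belts. In distributed settings, the key is a single interface for local and remote communication. Always use bounded mailboxes for backpressure, put a timeout on every remote call, and drain mailboxes for a graceful shutdown, and your actor system will be both robust and efficient.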