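Redis Streams Event-Driven Architecture in Practice: 6 Production Patterns from Consumer Groups to Event Sourcing
The Microservice Communication Dilemma: Kafka Is Too Heavy, Redis Streams Is Just Right
The order service has to tell inventory to deduct stock, the payment service has to broadcast transaction results, and user registration has to trigger a welcome email: event communication between microservices is everywhere. Reach for Kafka? Cluster operations, ZooKeeper/KRaft coordination, partition rebalancing... a small service can hardly carry that much infrastructure. Use RabbitMQ? Exchange bindings, routing keys, acknowledgment settings... the complexity is not low either. In 2026, Redis Streams, as a lightweight event-driven option, is becoming a popular choice for small and medium-scale microservice communication.
Starting from 6 production patterns, this article walks the full path: basic produce/consume → consumer groups → event sourcing → CQRS read models → dead letter queues → a production-grade event processing service. Patterns 1 to 5 come with Python and Go code; pattern 6 is implemented in Go.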
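Redis Streams Core Concepts
| Concept | Description |
|---|---|
| Stream | Log-style data structure introduced in Redis 5.0, similar to a Kafka topic; stores messages in time order |
| XADD | Appends a message to a Stream and auto-generates a timestamp-based ID (e.g. 1718432000000-0) |
| XREAD | Reads messages from a Stream; supports blocking and non-blocking modes |
| Consumer Group | Similar to a Kafka consumer group; each message is delivered to only one consumer in the group |
| XREADGROUP | Reads as a member of a consumer group; delivered messages enter the Pending state |
| XACK | Acknowledges that a message has been processed and removes it from the Pending list |
| Pending Entries List (PEL) | List of messages that were delivered but not yet acknowledged; used for failure recovery and retries |
| XPENDING | Shows Pending message statistics for a consumer group |
| XCLAIM | Transfers a Pending message to another consumer; used for failover |
| Event Sourcing | Every state change is persisted as a sequence of events, so the state at any point in time can be rebuilt |
| CQRS | Command Query Responsibility Segregation: the write model (event stream) and the read model (projection) evolve independently |
| Dead Letter Queue (DLQ) | Holds messages whose processing failed more often than the retry threshold, so endless retries do not block consumption |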
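Stream IDs have the form `<milliseconds>-<sequence>`, and they are ordered by comparing those two integers rather than the raw strings. The following is a minimal sketch of parsing and comparing IDs on the client side, for example when checkpointing a consumer position. The helper names `parse_stream_id` and `is_newer` are illustrative and are not part of redis-py.

```python
from typing import Tuple


def parse_stream_id(stream_id: str) -> Tuple[int, int]:
    """Split a Redis Stream ID such as '1718432000000-0' into (ms, seq)."""
    ms_part, _, seq_part = stream_id.partition("-")
    # A bare millisecond value without a sequence part is treated as seq 0.
    return int(ms_part), int(seq_part or 0)


def is_newer(candidate: str, reference: str) -> bool:
    """Return True if `candidate` sorts strictly after `reference`."""
    # Tuples compare numerically, so '...-10' correctly sorts after '...-9',
    # which a plain string comparison would get wrong.
    return parse_stream_id(candidate) > parse_stream_id(reference)
```

Comparing `"1718432000000-10"` with `"1718432000000-9"` through `is_newer` returns `True`, whereas a lexicographic string comparison would put them in the opposite order.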
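Problem Analysis: 5 Challenges of Event-Driven Architecture
- Message loss and duplicate consumption: when a consumer crashes, un-ACKed messages may be lost or delivered again, so you need an explicit Exactly-Once or At-Least-Once guarantee
- Unbalanced load within a consumer group: slow consumers let messages pile up while others sit idle, so you need dynamic rebalancing and XCLAIM transfers
- Snapshots and replay in event sourcing: the event stream grows without bound and full replay is expensive, so you need periodic snapshots combined with incremental events
- CQRS read-model consistency: projections are eventually consistent and users may see stale data, so you need version tracking and compensation
- Dead letters and retry storms: messages that keep failing cause retry storms, so you need exponential backoff, a dead letter queue, and manual intervention as one strategy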
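The retry-storm item above mentions exponential backoff. A minimal sketch of one common variant, capped exponential backoff with full jitter, is shown here. The function name and the defaults (`base=1.0`, `cap=30.0`) are assumptions chosen for illustration, not values prescribed by Redis.

```python
import random


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0,
                  jitter: bool = True) -> float:
    """Delay in seconds before retry number `attempt` (0-based)."""
    # Exponential growth, capped so a long outage cannot produce huge sleeps.
    delay = min(cap, base * (2 ** attempt))
    if jitter:
        # Full jitter: pick uniformly in [0, delay] so that many consumers
        # failing at the same time do not all retry at the same instant.
        return random.uniform(0, delay)
    return delay
```

Without jitter the delays for attempts 0, 1, 2, 3 are 1, 2, 4 and 8 seconds, and every later attempt is held at the 30-second cap.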
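Step by Step: 6 Redis Streams Event-Driven Patterns
Pattern 1: Basic Stream Producer/Consumer (XADD/XREAD)
The simplest produce/consume model: one producer writes to a Stream and one consumer reads from it.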
import redis
import json
import time
import threading

r = redis.Redis(host="localhost", port=6379, decode_responses=True)
STREAM_NAME = "events:orders"


def producer():
    for i in range(1, 6):
        event = {
            "event_type": "order_created",
            "order_id": f"ORD-{i:04d}",
            "amount": 100 * i,
            "user_id": f"USR-{i:03d}",
            "timestamp": time.time()
        }
        msg_id = r.xadd(STREAM_NAME, event)
        print(f"[Producer] Added event: {msg_id} -> {event}")
        time.sleep(0.5)


def consumer():
    # "0-0" reads the stream from the very beginning
    last_id = "0-0"
    while True:
        entries = r.xread({STREAM_NAME: last_id}, count=5, block=5000)
        if not entries:
            print("[Consumer] No new messages, waiting...")
            continue
        for stream_name, messages in entries:
            for msg_id, fields in messages:
                print(f"[Consumer] Processed: {msg_id} -> {fields}")
                # Remember the position so the next XREAD continues after it
                last_id = msg_id


if __name__ == "__main__":
    threading.Thread(target=producer, daemon=True).start()
    consumer()
Go version:
package main
import (
"context"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
var ctx = context.Background()
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
streamName := "events:orders"
go producer(rdb, streamName)
consumer(rdb, streamName)
}
func producer(rdb *redis.Client, streamName string) {
for i := 1; i <= 5; i++ {
values := map[string]interface{}{
"event_type": "order_created",
"order_id": fmt.Sprintf("ORD-%04d", i),
"amount": fmt.Sprintf("%d", 100*i),
"user_id": fmt.Sprintf("USR-%03d", i),
"timestamp": fmt.Sprintf("%d", time.Now().UnixMilli()),
}
msgID, err := rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
Values: values,
}).Result()
if err != nil {
fmt.Printf("[Producer] Error: %v\n", err)
continue
}
fmt.Printf("[Producer] Added event: %s\n", msgID)
time.Sleep(500 * time.Millisecond)
}
}
func consumer(rdb *redis.Client, streamName string) {
lastID := "0-0"
for {
results, err := rdb.XRead(ctx, &redis.XReadArgs{
Streams: []string{streamName, lastID},
Count: 5,
Block: 5 * time.Second,
}).Result()
if err != nil {
if err == redis.Nil {
fmt.Println("[Consumer] No new messages, waiting...")
continue
}
fmt.Printf("[Consumer] Error: %v\n", err)
continue
}
for _, stream := range results {
for _, msg := range stream.Messages {
fmt.Printf("[Consumer] Processed: %s -> %v\n", msg.ID, msg.Values)
lastID = msg.ID
}
}
}
}
Pattern 2: Consumer Groups (XREADGROUP)
A consumer group delivers each message to only one consumer in the group, similar to a Kafka consumer group.
import redis
import time
import threading
import random

r = redis.Redis(host="localhost", port=6379, decode_responses=True)
STREAM_NAME = "events:payments"
GROUP_NAME = "payment-processors"

# Create the group once; BUSYGROUP means it already exists
try:
    r.xgroup_create(STREAM_NAME, GROUP_NAME, id="0", mkstream=True)
    print(f"[Init] Created consumer group: {GROUP_NAME}")
except redis.ResponseError as e:
    if "BUSYGROUP" in str(e):
        print(f"[Init] Group already exists: {GROUP_NAME}")
    else:
        raise


def group_consumer(consumer_name: str):
    while True:
        # ">" asks for messages never delivered to any consumer in the group
        entries = r.xreadgroup(
            GROUP_NAME, consumer_name,
            {STREAM_NAME: ">"},
            count=3, block=3000
        )
        if not entries:
            print(f"[{consumer_name}] No new messages")
            continue
        for stream_name, messages in entries:
            for msg_id, fields in messages:
                process_time = random.uniform(0.1, 0.5)
                time.sleep(process_time)
                r.xack(STREAM_NAME, GROUP_NAME, msg_id)
                print(f"[{consumer_name}] ACKed: {msg_id} -> {fields}")


def check_pending():
    pending = r.xpending_range(STREAM_NAME, GROUP_NAME, min="-", max="+", count=10)
    if pending:
        print(f"[Monitor] Pending messages: {len(pending)}")
        for p in pending:
            print(f"  msg_id={p['message_id']}, consumer={p['consumer']}, "
                  f"delivered={p['times_delivered']} times")


if __name__ == "__main__":
    for i in range(1, 4):
        for j in range(1, 3):
            r.xadd(STREAM_NAME, {
                "event_type": "payment_completed",
                "payment_id": f"PAY-{i:02d}{j:02d}",
                "amount": str(random.randint(50, 500)),
            })
    threading.Thread(target=group_consumer, args=("consumer-1",), daemon=True).start()
    threading.Thread(target=group_consumer, args=("consumer-2",), daemon=True).start()
    time.sleep(1)
    check_pending()
    time.sleep(5)
Go version:
package main
import (
"context"
"fmt"
"math/rand"
"time"
"github.com/redis/go-redis/v9"
)
var ctx = context.Background()
func main() {
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
streamName := "events:payments"
groupName := "payment-processors"
err := rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()
if err != nil {
fmt.Printf("[Init] Group create: %v\n", err)
} else {
fmt.Printf("[Init] Created consumer group: %s\n", groupName)
}
for i := 1; i <= 3; i++ {
for j := 1; j <= 2; j++ {
rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
Values: map[string]interface{}{
"event_type": "payment_completed",
"payment_id": fmt.Sprintf("PAY-%02d%02d", i, j),
"amount": fmt.Sprintf("%d", rand.Intn(450)+50),
},
})
}
}
go groupConsumer(rdb, streamName, groupName, "consumer-1")
go groupConsumer(rdb, streamName, groupName, "consumer-2")
time.Sleep(6 * time.Second)
}
func groupConsumer(rdb *redis.Client, stream, group, consumer string) {
for {
results, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: group,
Consumer: consumer,
Streams: []string{stream, ">"},
Count: 3,
Block: 3 * time.Second,
}).Result()
if err != nil {
if err == redis.Nil {
fmt.Printf("[%s] No new messages\n", consumer)
continue
}
fmt.Printf("[%s] Error: %v\n", consumer, err)
continue
}
// The loop variable is named res so it does not shadow the stream parameter passed to XAck
for _, res := range results {
for _, msg := range res.Messages {
fmt.Printf("[%s] Processing: %s -> %v\n", consumer, msg.ID, msg.Values)
rdb.XAck(ctx, stream, group, msg.ID)
fmt.Printf("[%s] ACKed: %s\n", consumer, msg.ID)
}
}
}
}
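Consumer groups give at-least-once delivery: if a consumer crashes after processing a message but before its XACK, the message can be delivered again. A minimal sketch of an idempotency guard keyed by message ID follows. The class name `IdempotentHandler` is hypothetical, and the in-memory set is an assumption made to keep the example self-contained; several consumers would need a shared, size-bounded store instead.

```python
from typing import Callable, Dict, Set


class IdempotentHandler:
    """Wrap a handler so that each message ID is processed at most once."""

    def __init__(self, handler: Callable[[Dict[str, str]], None]):
        self._handler = handler
        # Assumption: an in-memory set. A real deployment would need a store
        # shared by all consumers and bounded in size (for example with a TTL).
        self._seen: Set[str] = set()

    def handle(self, msg_id: str, fields: Dict[str, str]) -> bool:
        """Return True if the handler ran, False if msg_id was a duplicate."""
        if msg_id in self._seen:
            return False
        self._handler(fields)
        # Record the ID only after the handler succeeded, so a failure
        # leaves the message eligible for a retry.
        self._seen.add(msg_id)
        return True
```

A consumer loop would call `handle(msg_id, fields)` and send XACK in both cases, since a duplicate has already been processed once.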
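Pattern 3: Event Sourcing
Every state change is persisted to a Stream as a sequence of events, so the state of the aggregate root at any point in time can be rebuilt.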
import redis
import json
from dataclasses import dataclass, field
from typing import List, Dict

r = redis.Redis(host="localhost", port=6379, decode_responses=True)


@dataclass
class OrderState:
    order_id: str = ""
    status: str = "created"
    items: List[Dict] = field(default_factory=list)
    total: float = 0.0
    version: int = 0


class EventStore:
    def __init__(self, aggregate_type: str, aggregate_id: str):
        self.stream_key = f"events:{aggregate_type}:{aggregate_id}"
        self.aggregate_type = aggregate_type
        self.aggregate_id = aggregate_id

    def append(self, event_type: str, data: dict):
        event = {
            "event_type": event_type,
            "aggregate_id": self.aggregate_id,
            "data": json.dumps(data),
            "version": str(self._next_version()),
        }
        msg_id = r.xadd(self.stream_key, event)
        print(f"[EventStore] Appended: {msg_id} type={event_type} version={event['version']}")
        return msg_id

    def _next_version(self) -> int:
        # XLEN returns 0 for a missing key, whereas XINFO STREAM raises an
        # error on the first append because the stream does not exist yet.
        # Note: reading the length and appending are two separate commands,
        # so concurrent writers can still compute the same version.
        return r.xlen(self.stream_key) + 1

    def load_events(self) -> List[dict]:
        entries = r.xrange(self.stream_key, "-", "+", count=1000)
        events = []
        for msg_id, fields in entries:
            events.append({
                "id": msg_id,
                "event_type": fields["event_type"],
                "data": json.loads(fields["data"]),
                "version": int(fields["version"]),
            })
        return events


class OrderAggregate:
    def __init__(self, order_id: str):
        self.order_id = order_id
        self.store = EventStore("order", order_id)
        self.state = OrderState(order_id=order_id)

    def create(self, items: List[Dict]):
        total = sum(item["price"] * item["qty"] for item in items)
        self.store.append("OrderCreated", {"items": items, "total": total})

    def add_item(self, item: Dict):
        self.store.append("ItemAdded", item)

    def pay(self):
        self.store.append("OrderPaid", {"payment_method": "credit_card"})

    def ship(self, tracking_number: str):
        self.store.append("OrderShipped", {"tracking_number": tracking_number})

    def rebuild(self) -> OrderState:
        # Replay every stored event, in order, onto an empty state
        events = self.store.load_events()
        state = OrderState(order_id=self.order_id)
        for event in events:
            self._apply(state, event)
        self.state = state
        return state

    def _apply(self, state: OrderState, event: dict):
        event_type = event["event_type"]
        data = event["data"]
        if event_type == "OrderCreated":
            state.items = data.get("items", [])
            state.total = data.get("total", 0)
            state.status = "created"
        elif event_type == "ItemAdded":
            state.items.append(data)
            state.total += data.get("price", 0) * data.get("qty", 1)
        elif event_type == "OrderPaid":
            state.status = "paid"
        elif event_type == "OrderShipped":
            state.status = "shipped"
        state.version = event["version"]


if __name__ == "__main__":
    order = OrderAggregate("ORD-0001")
    order.create([{"sku": "WIDGET-01", "price": 29.9, "qty": 2}])
    order.add_item({"sku": "GADGET-02", "price": 49.9, "qty": 1})
    order.pay()
    order.ship("SF1234567890")
    rebuilt = order.rebuild()
    print(f"\n[Rebuilt State] order_id={rebuilt.order_id} status={rebuilt.status} "
          f"total={rebuilt.total} items={len(rebuilt.items)} version={rebuilt.version}")
Go version:
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/redis/go-redis/v9"
)
var ctx = context.Background()
type OrderState struct {
OrderID string `json:"order_id"`
Status string `json:"status"`
Items []map[string]interface{} `json:"items"`
Total float64 `json:"total"`
Version int `json:"version"`
}
type Event struct {
ID string `json:"id"`
EventType string `json:"event_type"`
Data map[string]interface{} `json:"data"`
Version int `json:"version"`
}
type EventStore struct {
rdb *redis.Client
StreamKey string
AggregateID string
}
func NewEventStore(rdb *redis.Client, aggregateType, aggregateID string) *EventStore {
return &EventStore{
rdb: rdb,
StreamKey: fmt.Sprintf("events:%s:%s", aggregateType, aggregateID),
AggregateID: aggregateID,
}
}
func (s *EventStore) Append(eventType string, data map[string]interface{}) (string, error) {
info, err := s.rdb.XInfoStream(ctx, s.StreamKey).Result()
version := 1
if err == nil {
version = int(info.Length) + 1
}
dataJSON, _ := json.Marshal(data)
msgID, err := s.rdb.XAdd(ctx, &redis.XAddArgs{
Stream: s.StreamKey,
Values: map[string]interface{}{
"event_type": eventType,
"aggregate_id": s.AggregateID,
"data": string(dataJSON),
"version": fmt.Sprintf("%d", version),
},
}).Result()
if err != nil {
return "", err
}
fmt.Printf("[EventStore] Appended: %s type=%s version=%d\n", msgID, eventType, version)
return msgID, nil
}
func (s *EventStore) LoadEvents() ([]Event, error) {
entries, err := s.rdb.XRange(ctx, s.StreamKey, "-", "+").Result()
if err != nil {
return nil, err
}
var events []Event
for _, msg := range entries {
var data map[string]interface{}
json.Unmarshal([]byte(msg.Values["data"].(string)), &data)
var version int
fmt.Sscanf(msg.Values["version"].(string), "%d", &version)
events = append(events, Event{
ID: msg.ID,
EventType: msg.Values["event_type"].(string),
Data: data,
Version: version,
})
}
return events, nil
}
type OrderAggregate struct {
OrderID string
Store *EventStore
State OrderState
}
func NewOrderAggregate(rdb *redis.Client, orderID string) *OrderAggregate {
return &OrderAggregate{
OrderID: orderID,
Store: NewEventStore(rdb, "order", orderID),
State: OrderState{OrderID: orderID, Status: "created"},
}
}
func (a *OrderAggregate) Create(items []map[string]interface{}) {
total := 0.0
for _, item := range items {
price, _ := item["price"].(float64)
qty, _ := item["qty"].(float64)
total += price * qty
}
a.Store.Append("OrderCreated", map[string]interface{}{
"items": items, "total": total,
})
}
func (a *OrderAggregate) Pay() {
a.Store.Append("OrderPaid", map[string]interface{}{
"payment_method": "credit_card",
})
}
func (a *OrderAggregate) Ship(trackingNumber string) {
a.Store.Append("OrderShipped", map[string]interface{}{
"tracking_number": trackingNumber,
})
}
func (a *OrderAggregate) Rebuild() (*OrderState, error) {
events, err := a.Store.LoadEvents()
if err != nil {
return nil, err
}
state := OrderState{OrderID: a.OrderID, Status: "created"}
for _, event := range events {
switch event.EventType {
case "OrderCreated":
if items, ok := event.Data["items"].([]interface{}); ok {
for _, item := range items {
if m, ok := item.(map[string]interface{}); ok {
state.Items = append(state.Items, m)
}
}
}
if total, ok := event.Data["total"].(float64); ok {
state.Total = total
}
state.Status = "created"
case "OrderPaid":
state.Status = "paid"
case "OrderShipped":
state.Status = "shipped"
}
state.Version = event.Version
}
a.State = state
return &state, nil
}
func main() {
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
order := NewOrderAggregate(rdb, "ORD-0001")
order.Create([]map[string]interface{}{
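// qty is written as 2.0 because Create asserts item["qty"].(float64); an int literal would fail the assertion and yield total=0
{"sku": "WIDGET-01", "price": 29.9, "qty": 2.0},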
})
order.Pay()
order.Ship("SF1234567890")
state, _ := order.Rebuild()
fmt.Printf("\n[Rebuilt State] order_id=%s status=%s total=%.2f items=%d version=%d\n",
state.OrderID, state.Status, state.Total, len(state.Items), state.Version)
}
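In the event store above, the next version is derived from the stream length in a separate round trip, so two concurrent writers could compute the same version. One common remedy is an expected-version check on append. Here is a minimal in-memory sketch of that idea. `InMemoryEventStore` and `ConcurrencyError` are hypothetical names, and with Redis the check and the append would additionally have to run atomically (for example inside a Lua script or a WATCH/MULTI transaction), which is not shown here.

```python
from typing import Dict, List


class ConcurrencyError(Exception):
    """Raised when the stream has moved past the version the writer saw."""


class InMemoryEventStore:
    """Hypothetical stand-in for a per-aggregate event stream."""

    def __init__(self) -> None:
        self._events: List[Dict] = []

    @property
    def version(self) -> int:
        # The current version is simply the number of stored events.
        return len(self._events)

    def append(self, event_type: str, data: Dict, expected_version: int) -> int:
        # Reject the write if another writer appended since we last read.
        if expected_version != self.version:
            raise ConcurrencyError(
                f"expected version {expected_version}, found {self.version}"
            )
        new_version = self.version + 1
        self._events.append(
            {"event_type": event_type, "data": data, "version": new_version}
        )
        return new_version
```

A writer that loses the race catches `ConcurrencyError`, reloads the aggregate, re-validates its command against the new state, and tries again.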
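Pattern 4: CQRS Read-Model Projection
The write side appends events to a Stream; the read side listens to the Stream and updates a projection (the read model), which separates commands from queries.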
import redis
import json
import time
import threading

r = redis.Redis(host="localhost", port=6379, decode_responses=True)
EVENT_STREAM = "events:orders"
PROJECTION_KEY = "projection:order_summary"
GROUP_NAME = "cqrs-projectors"

try:
    r.xgroup_create(EVENT_STREAM, GROUP_NAME, id="0", mkstream=True)
except redis.ResponseError:
    pass


def command_side_publish_events():
    events = [
        {"event_type": "OrderCreated", "order_id": "ORD-001", "amount": "299", "user_id": "USR-001"},
        {"event_type": "OrderCreated", "order_id": "ORD-002", "amount": "599", "user_id": "USR-002"},
        {"event_type": "OrderPaid", "order_id": "ORD-001", "amount": "299"},
        {"event_type": "OrderShipped", "order_id": "ORD-001"},
        {"event_type": "OrderCreated", "order_id": "ORD-003", "amount": "199", "user_id": "USR-001"},
    ]
    for event in events:
        r.xadd(EVENT_STREAM, event)
        print(f"[Command] Published: {event['event_type']} -> {event.get('order_id')}")
        time.sleep(0.3)


def query_side_projector():
    consumer_name = "projector-1"
    while True:
        entries = r.xreadgroup(
            GROUP_NAME, consumer_name,
            {EVENT_STREAM: ">"},
            count=5, block=3000
        )
        if not entries:
            continue
        for stream_name, messages in entries:
            for msg_id, fields in messages:
                # Update the read model first, then acknowledge the event
                update_projection(fields)
                r.xack(EVENT_STREAM, GROUP_NAME, msg_id)


def update_projection(event: dict):
    event_type = event.get("event_type")
    order_id = event.get("order_id", "")
    if event_type == "OrderCreated":
        r.hset(PROJECTION_KEY, order_id, json.dumps({
            "status": "created",
            "amount": event.get("amount", "0"),
            "user_id": event.get("user_id", ""),
        }))
    elif event_type == "OrderPaid":
        existing = r.hget(PROJECTION_KEY, order_id)
        if existing:
            data = json.loads(existing)
            data["status"] = "paid"
            r.hset(PROJECTION_KEY, order_id, json.dumps(data))
    elif event_type == "OrderShipped":
        existing = r.hget(PROJECTION_KEY, order_id)
        if existing:
            data = json.loads(existing)
            data["status"] = "shipped"
            r.hset(PROJECTION_KEY, order_id, json.dumps(data))
    print(f"[Projection] Updated: {order_id} <- {event_type}")


def query_read_model():
    # Give the projector time to catch up before reading the projection
    time.sleep(3)
    all_orders = r.hgetall(PROJECTION_KEY)
    print("\n[Read Model] Order Summary:")
    for order_id, data in all_orders.items():
        print(f"  {order_id}: {data}")


if __name__ == "__main__":
    threading.Thread(target=command_side_publish_events, daemon=True).start()
    threading.Thread(target=query_side_projector, daemon=True).start()
    query_read_model()
Go version:
package main
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
var ctx = context.Background()
const (
eventStream = "events:orders"
projectionKey = "projection:order_summary"
groupName = "cqrs-projectors"
)
func main() {
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
rdb.XGroupCreateMkStream(ctx, eventStream, groupName, "0")
go commandSidePublishEvents(rdb)
go querySideProjector(rdb)
time.Sleep(4 * time.Second)
queryReadModel(rdb)
}
func commandSidePublishEvents(rdb *redis.Client) {
events := []map[string]interface{}{
{"event_type": "OrderCreated", "order_id": "ORD-001", "amount": "299", "user_id": "USR-001"},
{"event_type": "OrderCreated", "order_id": "ORD-002", "amount": "599", "user_id": "USR-002"},
{"event_type": "OrderPaid", "order_id": "ORD-001", "amount": "299"},
{"event_type": "OrderShipped", "order_id": "ORD-001"},
{"event_type": "OrderCreated", "order_id": "ORD-003", "amount": "199", "user_id": "USR-001"},
}
for _, event := range events {
rdb.XAdd(ctx, &redis.XAddArgs{Stream: eventStream, Values: event})
fmt.Printf("[Command] Published: %s -> %s\n", event["event_type"], event["order_id"])
time.Sleep(300 * time.Millisecond)
}
}
func querySideProjector(rdb *redis.Client) {
for {
results, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: "projector-1",
Streams: []string{eventStream, ">"},
Count: 5,
Block: 3 * time.Second,
}).Result()
if err != nil {
continue
}
for _, stream := range results {
for _, msg := range stream.Messages {
updateProjection(rdb, msg.Values)
rdb.XAck(ctx, eventStream, groupName, msg.ID)
}
}
}
}
func updateProjection(rdb *redis.Client, fields map[string]interface{}) {
eventType, _ := fields["event_type"].(string)
orderID, _ := fields["order_id"].(string)
switch eventType {
case "OrderCreated":
data, _ := json.Marshal(map[string]string{
"status": "created",
"amount": fmt.Sprintf("%v", fields["amount"]),
"user_id": fmt.Sprintf("%v", fields["user_id"]),
})
rdb.HSet(ctx, projectionKey, orderID, string(data))
case "OrderPaid":
existing, _ := rdb.HGet(ctx, projectionKey, orderID).Result()
if existing != "" {
var data map[string]string
json.Unmarshal([]byte(existing), &data)
data["status"] = "paid"
updated, _ := json.Marshal(data)
rdb.HSet(ctx, projectionKey, orderID, string(updated))
}
case "OrderShipped":
existing, _ := rdb.HGet(ctx, projectionKey, orderID).Result()
if existing != "" {
var data map[string]string
json.Unmarshal([]byte(existing), &data)
data["status"] = "shipped"
updated, _ := json.Marshal(data)
rdb.HSet(ctx, projectionKey, orderID, string(updated))
}
}
fmt.Printf("[Projection] Updated: %s <- %s\n", orderID, eventType)
}
func queryReadModel(rdb *redis.Client) {
orders, _ := rdb.HGetAll(ctx, projectionKey).Result()
fmt.Println("\n[Read Model] Order Summary:")
for orderID, data := range orders {
fmt.Printf(" %s: %s\n", orderID, data)
}
}
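Because a projector can receive the same event twice, or an older event after a newer one has already been applied, a read model benefits from version tracking. The sketch below is a pure function over a dictionary-based read model. It assumes every event carries an integer `version` per order, which the events in this pattern do not include, so treat that field as a hypothetical addition.

```python
from typing import Dict


def apply_to_projection(projection: Dict[str, Dict], event: Dict) -> bool:
    """Apply `event` to the read model; return False if it was skipped."""
    order_id = event["order_id"]
    current = projection.get(order_id, {"version": 0})
    # Skip duplicates and out-of-date events so redelivery is harmless.
    if event["version"] <= current["version"]:
        return False
    status_by_type = {
        "OrderCreated": "created",
        "OrderPaid": "paid",
        "OrderShipped": "shipped",
    }
    updated = dict(current)
    # Unknown event types keep the previous status but still advance the version.
    updated["status"] = status_by_type.get(event["event_type"], current.get("status"))
    updated["version"] = event["version"]
    projection[order_id] = updated
    return True
```

Storing the applied version next to the projected data also lets a query report how fresh each record is.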
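Pattern 5: Dead Letter Queue and Retries
Once a message has failed more often than the retry threshold, it is moved to a dead letter queue so that endless retries do not block consumption.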
import redis
import json
import time
import random

r = redis.Redis(host="localhost", port=6379, decode_responses=True)
MAIN_STREAM = "events:orders"
DLQ_STREAM = "events:orders:dlq"
GROUP_NAME = "order-processors"
MAX_RETRIES = 3
RETRY_DELAYS = [1, 5, 30]

try:
    r.xgroup_create(MAIN_STREAM, GROUP_NAME, id="0", mkstream=True)
except redis.ResponseError:
    pass


def process_message(fields: dict) -> bool:
    order_id = fields.get("order_id", "")
    # Simulate a 40% failure rate
    if random.random() < 0.4:
        print(f"  [FAIL] Processing failed for {order_id}")
        return False
    print(f"  [OK] Processed {order_id}")
    return True


def consumer_with_retry(consumer_name: str):
    while True:
        entries = r.xreadgroup(
            GROUP_NAME, consumer_name,
            {MAIN_STREAM: ">"},
            count=1, block=3000
        )
        if not entries:
            continue
        for stream_name, messages in entries:
            for msg_id, fields in messages:
                if process_message(fields):
                    r.xack(MAIN_STREAM, GROUP_NAME, msg_id)
                    continue
                # A re-queued message gets a new ID, so its delivery count in
                # the PEL restarts at 1. The retry counter therefore travels
                # in the message fields instead of being read from XPENDING.
                retry_count = int(fields.get("retry_count", 0))
                if retry_count >= MAX_RETRIES:
                    r.xadd(DLQ_STREAM, {
                        **fields,
                        "original_id": msg_id,
                        "failure_reason": "max_retries_exceeded",
                        "retry_count": str(retry_count),
                    })
                    r.xack(MAIN_STREAM, GROUP_NAME, msg_id)
                    print(f"  [DLQ] Moved to dead letter: {msg_id}")
                else:
                    delay = RETRY_DELAYS[min(retry_count, len(RETRY_DELAYS) - 1)]
                    print(f"  [RETRY] Will retry {msg_id} after {delay}s "
                          f"(attempt {retry_count + 1}/{MAX_RETRIES})")
                    time.sleep(delay)
                    # Re-queue before ACK so a crash in between cannot lose the message
                    r.xadd(MAIN_STREAM, {**fields, "retry_count": str(retry_count + 1)})
                    r.xack(MAIN_STREAM, GROUP_NAME, msg_id)


def inspect_dlq():
    dlq_entries = r.xrange(DLQ_STREAM, "-", "+", count=20)
    print(f"\n[DLQ] {len(dlq_entries)} dead letters:")
    for msg_id, fields in dlq_entries:
        print(f"  {msg_id}: order_id={fields.get('order_id')} "
              f"retries={fields.get('retry_count')} reason={fields.get('failure_reason')}")


if __name__ == "__main__":
    for i in range(1, 11):
        r.xadd(MAIN_STREAM, {"order_id": f"ORD-{i:04d}", "amount": str(i * 100)})
    random.seed(42)
    consumer_with_retry("consumer-1")
Go version:
package main
import (
"context"
"fmt"
"math/rand"
"time"
"github.com/redis/go-redis/v9"
)
var ctx = context.Background()
const (
mainStream = "events:orders"
dlqStream = "events:orders:dlq"
groupName = "order-processors"
maxRetries = 3
)
var retryDelays = []time.Duration{1 * time.Second, 5 * time.Second, 30 * time.Second}
func main() {
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
rdb.XGroupCreateMkStream(ctx, mainStream, groupName, "0")
for i := 1; i <= 10; i++ {
rdb.XAdd(ctx, &redis.XAddArgs{
Stream: mainStream,
Values: map[string]interface{}{
"order_id": fmt.Sprintf("ORD-%04d", i),
"amount": fmt.Sprintf("%d", i*100),
},
})
}
consumerWithRetry(rdb, "consumer-1")
}
func processMessage(fields map[string]interface{}) bool {
orderID := fmt.Sprintf("%v", fields["order_id"])
if rand.Float64() < 0.4 {
fmt.Printf(" [FAIL] Processing failed for %s\n", orderID)
return false
}
fmt.Printf(" [OK] Processed %s\n", orderID)
return true
}
func consumerWithRetry(rdb *redis.Client, consumerName string) {
for {
results, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: consumerName,
Streams: []string{mainStream, ">"},
Count: 1,
Block: 3 * time.Second,
}).Result()
if err != nil {
continue
}
for _, stream := range results {
for _, msg := range stream.Messages {
if processMessage(msg.Values) {
rdb.XAck(ctx, mainStream, groupName, msg.ID)
} else {
// A re-queued message gets a new ID, so its delivery count in the PEL restarts at 1.
// The retry counter therefore travels in the message fields.
retryCount := 0
if v, ok := msg.Values["retry_count"].(string); ok {
fmt.Sscanf(v, "%d", &retryCount)
}
if retryCount >= maxRetries {
dlqValues := map[string]interface{}{}
for k, v := range msg.Values {
dlqValues[k] = v
}
dlqValues["original_id"] = msg.ID
dlqValues["failure_reason"] = "max_retries_exceeded"
dlqValues["retry_count"] = fmt.Sprintf("%d", retryCount)
rdb.XAdd(ctx, &redis.XAddArgs{Stream: dlqStream, Values: dlqValues})
rdb.XAck(ctx, mainStream, groupName, msg.ID)
fmt.Printf(" [DLQ] Moved to dead letter: %s\n", msg.ID)
} else {
delay := retryDelays[min(retryCount, len(retryDelays)-1)]
fmt.Printf(" [RETRY] Will retry %s after %v (attempt %d/%d)\n",
msg.ID, delay, retryCount+1, maxRetries)
time.Sleep(delay)
retryValues := map[string]interface{}{}
for k, v := range msg.Values {
retryValues[k] = v
}
retryValues["retry_count"] = fmt.Sprintf("%d", retryCount+1)
// Re-queue before ACK so a crash in between cannot lose the message
rdb.XAdd(ctx, &redis.XAddArgs{Stream: mainStream, Values: retryValues})
rdb.XAck(ctx, mainStream, groupName, msg.ID)
}
}
}
}
}
}
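Messages in the dead letter stream usually need a path back once the underlying fault is fixed. The following sketch shows the pure part of such a replay step: stripping the metadata fields that the dead-letter step adds (`original_id`, `failure_reason`, `retry_count`) so the original payload can be published again. The function name is hypothetical.

```python
from typing import Dict

# Metadata fields that the dead-letter step adds on top of the original payload.
DLQ_METADATA_FIELDS = ("original_id", "failure_reason", "retry_count")


def rebuild_original_payload(dlq_fields: Dict[str, str]) -> Dict[str, str]:
    """Strip dead-letter metadata so the entry can be re-published."""
    return {k: v for k, v in dlq_fields.items() if k not in DLQ_METADATA_FIELDS}
```

An operator script could iterate over the dead letter stream with XRANGE, XADD each rebuilt payload to the main stream, and XDEL the dead letter entry afterwards. Dropping `retry_count` means the replayed message starts with a fresh retry budget.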
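Pattern 6: Production-Grade Event Processing Service (Go Implementation)
A fuller event processing service with graceful shutdown, periodic metrics reporting, dead-lettering, and XCLAIM-based failover.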
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/redis/go-redis/v9"
)
type EventProcessor struct {
rdb *redis.Client
stream string
group string
consumer string
dlqStream string
maxRetries int
processed atomic.Int64
failed atomic.Int64
retried atomic.Int64
mu sync.Mutex
running bool
ctx context.Context
cancel context.CancelFunc
}
func NewEventProcessor(rdb *redis.Client, stream, group, consumer string) *EventProcessor {
ctx, cancel := context.WithCancel(context.Background())
return &EventProcessor{
rdb: rdb,
stream: stream,
group: group,
consumer: consumer,
dlqStream: stream + ":dlq",
maxRetries: 3,
running: true,
ctx: ctx,
cancel: cancel,
}
}
func (p *EventProcessor) Start() {
rdb := p.rdb
rdb.XGroupCreateMkStream(p.ctx, p.stream, p.group, "0")
log.Printf("[Processor] Starting consumer %s for stream %s", p.consumer, p.stream)
go p.claimPendingMessages()
go p.consumeNewMessages()
go p.reportMetrics()
}
func (p *EventProcessor) consumeNewMessages() {
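// Loop until the context is cancelled; reading the plain bool p.running from several goroutines would be a data race
for p.ctx.Err() == nil {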
results, err := p.rdb.XReadGroup(p.ctx, &redis.XReadGroupArgs{
Group: p.group,
Consumer: p.consumer,
Streams: []string{p.stream, ">"},
Count: 10,
Block: 2 * time.Second,
}).Result()
if err != nil {
if err == redis.Nil || p.ctx.Err() != nil {
continue
}
log.Printf("[Consumer] Error: %v", err)
continue
}
for _, stream := range results {
for _, msg := range stream.Messages {
p.handleMessage(msg.ID, msg.Values)
}
}
}
}
func (p *EventProcessor) claimPendingMessages() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
pending, err := p.rdb.XPendingExt(p.ctx, &redis.XPendingExtArgs{
Stream: p.stream,
Group: p.group,
Start: "-",
End: "+",
Count: 10,
Idle: 60 * time.Second,
}).Result()
if err != nil {
continue
}
for _, msg := range pending {
claimed, err := p.rdb.XClaim(p.ctx, &redis.XClaimArgs{
Stream: p.stream,
Group: p.group,
Consumer: p.consumer,
MinIdle: 60 * time.Second,
Messages: []string{msg.ID},
}).Result()
if err == nil && len(claimed) > 0 {
log.Printf("[Claim] Claimed pending message: %s", claimed[0].ID)
p.retried.Add(1)
p.handleMessage(claimed[0].ID, claimed[0].Values)
}
}
case <-p.ctx.Done():
return
}
}
}
func (p *EventProcessor) handleMessage(msgID string, values map[string]interface{}) {
err := p.processEvent(values)
if err != nil {
p.failed.Add(1)
pending, _ := p.rdb.XPendingExt(p.ctx, &redis.XPendingExtArgs{
Stream: p.stream,
Group: p.group,
Start: msgID,
End: msgID,
Count: 1,
}).Result()
deliveryCount := 1
if len(pending) > 0 {
deliveryCount = int(pending[0].DeliveryCount)
}
if deliveryCount >= p.maxRetries {
dlqValues := map[string]interface{}{
"original_id": msgID,
"failure_reason": err.Error(),
"retry_count": fmt.Sprintf("%d", deliveryCount),
}
for k, v := range values {
dlqValues[k] = v
}
p.rdb.XAdd(p.ctx, &redis.XAddArgs{Stream: p.dlqStream, Values: dlqValues})
p.rdb.XAck(p.ctx, p.stream, p.group, msgID)
log.Printf("[DLQ] Moved %s to dead letter after %d attempts", msgID, deliveryCount)
} else {
log.Printf("[Retry] Message %s failed (attempt %d/%d): %v",
msgID, deliveryCount, p.maxRetries, err)
}
return
}
p.rdb.XAck(p.ctx, p.stream, p.group, msgID)
p.processed.Add(1)
}
func (p *EventProcessor) processEvent(values map[string]interface{}) error {
eventType, _ := values["event_type"].(string)
switch eventType {
case "order_created":
return p.handleOrderCreated(values)
case "payment_completed":
return p.handlePaymentCompleted(values)
default:
log.Printf("[Process] Unknown event type: %s", eventType)
return nil
}
}
func (p *EventProcessor) handleOrderCreated(values map[string]interface{}) error {
orderID := fmt.Sprintf("%v", values["order_id"])
log.Printf("[Process] Order created: %s", orderID)
return nil
}
func (p *EventProcessor) handlePaymentCompleted(values map[string]interface{}) error {
paymentID := fmt.Sprintf("%v", values["payment_id"])
log.Printf("[Process] Payment completed: %s", paymentID)
return nil
}
func (p *EventProcessor) reportMetrics() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
log.Printf("[Metrics] processed=%d failed=%d retried=%d",
p.processed.Load(), p.failed.Load(), p.retried.Load())
case <-p.ctx.Done():
return
}
}
}
func (p *EventProcessor) Stop() {
log.Println("[Processor] Shutting down gracefully...")
p.running = false
p.cancel()
time.Sleep(2 * time.Second)
log.Printf("[Processor] Final metrics: processed=%d failed=%d retried=%d",
p.processed.Load(), p.failed.Load(), p.retried.Load())
}
func main() {
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
processor := NewEventProcessor(rdb, "events:orders", "order-processors", "worker-1")
processor.Start()
for i := 1; i <= 20; i++ {
rdb.XAdd(context.Background(), &redis.XAddArgs{
Stream: "events:orders",
Values: map[string]interface{}{
"event_type": "order_created",
"order_id": fmt.Sprintf("ORD-%04d", i),
"amount": fmt.Sprintf("%d", i*100),
},
})
}
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
processor.Stop()
}
Pitfalls: 5 Common Mistakes
❌ Pitfall 1: Plain XREAD without a consumer group, so messages are consumed more than once
# ❌ Wrong: several consumers each call XREAD independently, so every message is processed by all of them
def bad_consumer():
    entries = r.xread({STREAM_NAME: "0-0"}, count=10, block=5000)
    for _, messages in entries:
        for msg_id, fields in messages:
            process(fields)

# ✅ Right: use XREADGROUP; the group delivers each message to only one consumer in the group
def good_consumer():
    entries = r.xreadgroup(
        GROUP_NAME, "consumer-1",
        {STREAM_NAME: ">"},
        count=10, block=5000
    )
    for _, messages in entries:
        for msg_id, fields in messages:
            process(fields)
            r.xack(STREAM_NAME, GROUP_NAME, msg_id)
❌ Pitfall 2: Forgetting XACK, so messages stay in the Pending list forever
// ❌ Wrong: the message is processed but never ACKed, so it remains Pending
func badConsumer(rdb *redis.Client) {
results, _ := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: "consumer-1",
Streams: []string{streamName, ">"},
Count: 10,
}).Result()
for _, stream := range results {
for _, msg := range stream.Messages {
processEvent(msg.Values)
// The ACK is missing here!
}
}
}
// ✅ Right: ACK as soon as processing has succeeded
func goodConsumer(rdb *redis.Client) {
results, _ := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: "consumer-1",
Streams: []string{streamName, ">"},
Count: 10,
}).Result()
for _, stream := range results {
for _, msg := range stream.Messages {
if err := processEvent(msg.Values); err == nil {
rdb.XAck(ctx, streamName, groupName, msg.ID)
}
}
}
}
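❌ Pitfall 3: Unbounded Stream growth that exhausts memory
# ❌ Wrong: only appending and never deleting, so the Stream grows without limit
r.xadd("events:orders", {"data": "..."})
# XTRIM or XDEL is never called
# ✅ Right: cap the Stream length with MAXLEN, or run XTRIM periodically
r.xadd("events:orders", {"data": "..."}, maxlen=10000, approximate=True)
# Or trim manually
r.xtrim("events:orders", maxlen=10000, approximate=True)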
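Length-based trimming keeps a fixed number of entries. When the requirement is phrased as a retention period instead, trimming by minimum ID is an alternative on Redis 6.2 and later. The sketch below only computes the cutoff ID. It assumes the stream uses auto-generated, timestamp-based IDs, and `retention_min_id` is an illustrative helper name.

```python
import time
from typing import Optional


def retention_min_id(retention_seconds: float, now: Optional[float] = None) -> str:
    """Smallest stream ID to keep for a time-based retention window."""
    now = time.time() if now is None else now
    cutoff_ms = int((now - retention_seconds) * 1000)
    # Stream IDs start with a millisecond timestamp, so '<cutoff_ms>-0'
    # is a lower bound for every entry newer than the cutoff.
    return f"{max(cutoff_ms, 0)}-0"
```

With a recent redis-py the result could be passed as `r.xtrim("events:orders", minid=retention_min_id(7 * 24 * 3600))` to keep roughly the last seven days. Check that your client version supports the `minid` argument before relying on it.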
❌ Pitfall 4: Using XCLAIM without checking idle time
// ❌ Wrong: blindly claiming every Pending message, which can steal messages that are still being processed
func badClaim(rdb *redis.Client) {
pending, _ := rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: streamName,
Group: groupName,
Start: "-",
End: "+",
Count: 100,
}).Result()
for _, msg := range pending {
rdb.XClaim(ctx, &redis.XClaimArgs{
Stream: streamName,
Group: groupName,
Consumer: "consumer-2",
Messages: []string{msg.ID},
})
}
}
// ✅ Right: claim only Pending messages whose idle time exceeds a threshold
func goodClaim(rdb *redis.Client) {
pending, _ := rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: streamName,
Group: groupName,
Start: "-",
End: "+",
Count: 100,
Idle: 60 * time.Second, // list only entries that have been idle for more than 60 seconds
}).Result()
for _, msg := range pending {
rdb.XClaim(ctx, &redis.XClaimArgs{
Stream: streamName,
Group: groupName,
Consumer: "consumer-2",
MinIdle: 60 * time.Second,
Messages: []string{msg.ID},
})
}
}
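❌ Pitfall 5: Wrong start ID when creating the consumer group
# ❌ Wrong: a group created with $ only receives messages added after creation; existing history is skipped
r.xgroup_create(STREAM_NAME, GROUP_NAME, id="$")
# ✅ Right: create the group with 0 to consume the Stream from the beginning
r.xgroup_create(STREAM_NAME, GROUP_NAME, id="0")
# If you really want new messages only, state that intent in a comment
r.xgroup_create(STREAM_NAME, GROUP_NAME, id="$")  # new messages only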
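Troubleshooting Table
| Symptom | Likely cause | How to check | Fix |
|---|---|---|---|
| `NOGROUP No such key` | The Stream or the consumer group does not exist | `XINFO GROUPS events:orders` | Create the group first with XGROUP CREATE; add MKSTREAM to create the Stream automatically |
| `BUSYGROUP Consumer Group name already exists` | The consumer group was created twice | `XINFO GROUPS events:orders` | Catch and ignore the error, or check whether the group exists first |
| Consumer receives no messages | XREADGROUP uses `0-0` instead of `>` | Check the Streams argument in the code | Use `>` for new messages and `0-0` for the consumer's own unacknowledged messages |
| Pending messages pile up | A consumer crashed before sending ACK | `XPENDING events:orders group-name` | Use XCLAIM to transfer the messages to a live consumer |
| Stream memory keeps growing | No MAXLEN/XTRIM configured | `XINFO STREAM events:orders` and check `length` | Add `MAXLEN ~ 10000` to XADD, or run XTRIM periodically |
| Messages are consumed repeatedly | Processing finished but the ACK failed | Check that the ACK comes after the processing logic | Make sure the ACK runs after the business logic has succeeded |
| Message is still processed by the original consumer after XCLAIM | MinIdle is set too short | Use XPENDING to inspect message idle time | Choose a sensible MinIdle threshold (e.g. 60 seconds) |
| Event sourcing replay is slow | Too many events, full XRANGE scan | `XLEN events:order:ORD-001` | Periodic snapshots plus incremental events; replay with XRANGE over an ID range |
| CQRS read model is inconsistent | The projection consumer is lagging or down | `XINFO GROUPS` (the `lag` field, Redis 7.0+) and `XINFO CONSUMERS` (idle time) | Monitor consumer lag and set alert thresholds |
| XADD returns an error (for example, unknown command) | Redis version is older than 5.0 | `redis-server --version` | Upgrade Redis to 5.0+ |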
Advanced Optimization
1. Stream Sharding and Partitioning Strategy
When a single Stream cannot deliver enough throughput, shard by business key across several Streams:
import hashlib


def get_shard_stream(base_stream: str, key: str, shard_count: int = 16) -> str:
    # MD5 is used only to spread keys evenly here, not for security
    shard_index = int(hashlib.md5(key.encode()).hexdigest(), 16) % shard_count
    return f"{base_stream}:shard-{shard_index:03d}"


order_id = "ORD-0001"
stream = get_shard_stream("events:orders", order_id)
r.xadd(stream, {"order_id": order_id, "event_type": "order_created"})
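Two properties matter for hash-based sharding: the same key must always map to the same shard (so per-key ordering is preserved within one Stream), and keys should spread reasonably evenly. The sketch below repeats the sharding function so that it is self-contained and adds a hypothetical `shard_histogram` helper for inspecting the distribution offline, without touching Redis.

```python
import hashlib
from collections import Counter
from typing import Iterable


def get_shard_stream(base_stream: str, key: str, shard_count: int = 16) -> str:
    """Map a business key to one of `shard_count` stream names."""
    shard_index = int(hashlib.md5(key.encode()).hexdigest(), 16) % shard_count
    return f"{base_stream}:shard-{shard_index:03d}"


def shard_histogram(keys: Iterable[str], base_stream: str = "events:orders",
                    shard_count: int = 16) -> Counter:
    """Count how many keys land on each shard stream."""
    return Counter(get_shard_stream(base_stream, k, shard_count) for k in keys)
```

Running `shard_histogram(f"ORD-{i:04d}" for i in range(1000))` on a sample of real keys before going live is a cheap way to spot a skewed key space. Note that changing `shard_count` later remaps most keys, so it is worth choosing generously up front.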
2. Consumer Lag Monitoring and Alerting
func monitorConsumerLag(rdb *redis.Client, stream, group string) {
ticker := time.NewTicker(10 * time.Second)
for range ticker.C {
info, err := rdb.XInfoStream(ctx, stream).Result()
if err != nil {
continue
}
groups, _ := rdb.XInfoGroups(ctx, stream).Result()
for _, g := range groups {
if g.Name == group {
pending := g.Pending
consumers, _ := rdb.XInfoConsumers(ctx, stream, group).Result()
for _, c := range consumers {
if c.Pending > 100 {
log.Printf("[ALERT] Consumer %s has %d pending messages!",
c.Name, c.Pending)
}
if c.Idle > 5*time.Minute {
log.Printf("[WARN] Consumer %s idle for %v", c.Name, c.Idle)
}
}
if pending > 1000 {
log.Printf("[ALERT] Group %s has %d pending messages!", group, pending)
}
_ = info
}
}
}
}
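The threshold logic of such a monitor can be kept separate from the Redis calls, which makes it easy to unit test. A minimal Python sketch follows, using the same thresholds as above (100 pending messages, 5 minutes idle). The input shape, a list of dictionaries with `name`, `pending` and `idle` in milliseconds, is an assumption modelled on what stream-info commands typically report. Adapt it to whatever your client returns.

```python
from typing import Dict, List


def evaluate_consumer_alerts(consumers: List[Dict], max_pending: int = 100,
                             max_idle_ms: int = 5 * 60 * 1000) -> List[str]:
    """Return human-readable alerts for consumers over the given thresholds."""
    alerts = []
    for c in consumers:
        # Too many unacknowledged messages suggests a slow or stuck consumer.
        if c["pending"] > max_pending:
            alerts.append(f"ALERT {c['name']}: {c['pending']} pending messages")
        # A long idle time suggests the consumer has stopped reading.
        if c["idle"] > max_idle_ms:
            alerts.append(f"WARN {c['name']}: idle for {c['idle'] // 1000}s")
    return alerts
```

The returned strings can be forwarded to whatever alerting channel is already in place.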
3. Event Snapshots and Incremental Replay
import json
import time

SNAPSHOT_INTERVAL = 100


def save_snapshot(aggregate_id: str, state: dict, version: int, last_event_id: str):
    snapshot_key = f"snapshot:order:{aggregate_id}"
    r.hset(snapshot_key, mapping={
        "state": json.dumps(state),
        "version": str(version),
        # ID of the last stream entry folded into this snapshot
        "last_event_id": last_event_id,
        "timestamp": str(time.time()),
    })


def load_from_snapshot(aggregate_id: str):
    snapshot_key = f"snapshot:order:{aggregate_id}"
    data = r.hgetall(snapshot_key)
    if not data:
        return None, None
    state = json.loads(data["state"])
    return state, data["last_event_id"]


def rebuild_with_snapshot(aggregate_id: str):
    state, last_event_id = load_from_snapshot(aggregate_id)
    stream_key = f"events:order:{aggregate_id}"
    if state is None:
        events = r.xrange(stream_key, "-", "+")
    else:
        # Stream IDs are timestamps, not version numbers, so resume from the
        # ID of the last event in the snapshot. The "(" prefix makes the
        # range exclusive (Redis 6.2+).
        events = r.xrange(stream_key, f"({last_event_id}", "+")
    for msg_id, fields in events:
        # apply_event is the aggregate's reducer, defined elsewhere
        state = apply_event(state or {}, fields)
    return state
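The property that makes snapshots safe is that a snapshot plus the later events yields the same state as a full replay. A minimal in-memory sketch of that invariant is shown below. The reducer `apply_event` and the event shape (an integer `version` on every event) are hypothetical stand-ins chosen to keep the example runnable without Redis.

```python
from typing import Dict, List, Optional, Tuple


def apply_event(state: Dict, event: Dict) -> Dict:
    """Hypothetical reducer: fold one event into an order state."""
    new_state = dict(state)
    if event["event_type"] == "OrderCreated":
        new_state.update(status="created", total=event["total"])
    elif event["event_type"] == "OrderPaid":
        new_state["status"] = "paid"
    elif event["event_type"] == "OrderShipped":
        new_state["status"] = "shipped"
    return new_state


def rebuild(events: List[Dict],
            snapshot: Optional[Tuple[Dict, int]] = None) -> Tuple[Dict, int]:
    """Rebuild state from an optional (state, version) snapshot plus later events."""
    state, version = snapshot if snapshot else ({}, 0)
    # Only events newer than the snapshot version are replayed.
    for event in events:
        if event["version"] > version:
            state = apply_event(state, event)
            version = event["version"]
    return state, version
```

A test that compares `rebuild(events)` with `rebuild(events, snapshot=rebuild(events[:n]))` for several values of `n` is a simple way to guard a reducer against changes that would break snapshot compatibility.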
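Comparison: Redis Streams vs. Other Messaging Systems
| Feature | Redis Streams | Kafka | RabbitMQ | NATS JetStream | Pulsar |
|---|---|---|---|---|---|
| Positioning | Lightweight event stream | Distributed event streaming platform | Message broker | Cloud-native messaging system | Distributed messaging platform |
| Deployment complexity | ★☆☆☆☆ | ★★★★★ | ★★★☆☆ | ★★☆☆☆ | ★★★★☆ |
| Throughput (rough, workload-dependent) | 100k-500k/s | 1M+/s | 50k-100k/s | 500k-1M/s | 1M+/s |
| Persistence | AOF/RDB | Disk log | Optional persistence | Disk log | BookKeeper |
| Consumer groups | ✅ Native | ✅ Native | ❌ Requires plugin | ✅ Native | ✅ Native |
| Message replay | ✅ XRANGE | ✅ Offset reset | ❌ Not supported | ✅ Supported | ✅ Supported |
| Exactly-Once | ❌ At-Least-Once | ✅ Transactions | ❌ Requires plugin | ✅ Supported | ✅ Supported |
| Dead letter queue | ❌ Manual implementation | ❌ Manual implementation | ✅ Native | ✅ Native | ✅ Native |
| Partitioning | ❌ Manual sharding | ✅ Automatic partitioning | ✅ Exchange routing | ✅ Automatic partitioning | ✅ Automatic partitioning |
| Operational cost | Very low | High (ZooKeeper/KRaft) | Medium | Low | High (BookKeeper) |
| Best fit | Event communication for small and medium-scale microservices | Large-scale data stream processing | Enterprise message routing | Lightweight cloud-native messaging | Multi-tenant, large-scale messaging |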
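Summary
In 2026, Redis Streams remains a strong lightweight choice for event-driven architecture in small and medium-scale microservices. It needs neither the heavy infrastructure of Kafka nor the exchange configuration of RabbitMQ, and the 6 core patterns cover the range from basic produce/consume to a production-grade event processing service. Remember three things: always use consumer groups (XREADGROUP), always XACK after processing, and always set MAXLEN on a Stream to keep memory bounded. When you need Exactly-Once semantics or throughput in the millions of messages per second, consider moving up to Kafka or Pulsar.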