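Redis Streams Event-Driven Architecture in Practice: 6 Production Patterns from Consumer Groups to Event Sourcing
The Microservice Messaging Dilemma: Kafka Is Too Heavy, Redis Streams Is Just Right
Event communication between microservices is everywhere:

- the order service must tell inventory to decrement stock;
- the payment service broadcasts transaction results;
- a user sign-up triggers a welcome email.

Reach for Kafka? Cluster operations, ZooKeeper/KRaft coordination and partition rebalancing are more infrastructure than a small service can justify. Reach for RabbitMQ? Exchange bindings, routing keys and acknowledgment settings are not simple either. In 2026, Redis Streams is a lightweight event-driven option and an increasingly common first choice for small- and medium-scale microservice communication, especially where Redis is already deployed.
This article walks through 6 production patterns end to end: basic produce/consume → consumer groups → event sourcing → CQRS read models → dead-letter queues → a production-grade event processing service. Each step comes with complete, runnable Python and Go code (Pattern 6 is Go only).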
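Redis Streams Core Concepts
| Concept | Description |
|---|---|
| Stream | Append-only log data structure introduced in Redis 5.0, similar to a Kafka topic; entries are stored in ID (time) order |
| XADD | Appends an entry to a stream; with the `*` ID Redis auto-generates a timestamp-based ID such as `1718432000000-0` |
| XREAD | Reads entries from one or more streams, in blocking or non-blocking mode |
| Consumer Group | Similar to a Kafka consumer group; each entry is delivered to only one consumer within the group |
| XREADGROUP | Reads as a member of a consumer group; delivered entries enter the pending state |
| XACK | Acknowledges an entry as processed and removes it from the pending list |
| Pending Entries List (PEL) | Entries delivered but not yet acknowledged; used for failure recovery and retries |
| XPENDING | Shows pending-entry statistics (or per-entry details) for a consumer group |
| XCLAIM | Transfers ownership of pending entries to another consumer, used for failover |
| Event Sourcing | Every state change is persisted as a sequence of events, so the state at any point in time can be rebuilt |
| CQRS | Command Query Responsibility Segregation: the write model (event stream) and the read model (projections) evolve independently |
| Dead Letter Queue (DLQ) | Holds entries that failed more times than the retry threshold, so endless retries do not block consumption |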
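Stream IDs have the form `<milliseconds>-<sequence>`, and Redis orders them numerically by both parts. Application code that sorts or compares IDs as plain strings gets the order wrong. The minimal sketch below parses IDs into tuples. The helper names are illustrative. Treating an ID without a `-seq` part as sequence 0 is an assumption made for this sketch.

```python
from typing import Tuple


def parse_stream_id(stream_id: str) -> Tuple[int, int]:
    """Split a Redis stream ID '<ms>-<seq>' into an (ms, seq) tuple."""
    ms, _, seq = stream_id.partition("-")
    # Assumption for this sketch: an ID without '-seq' is treated as sequence 0.
    return int(ms), int(seq or 0)


def id_less_than(a: str, b: str) -> bool:
    """Compare two stream IDs numerically (plain string comparison is wrong)."""
    return parse_stream_id(a) < parse_stream_id(b)


ids = ["1718432000000-10", "1718432000000-2", "999-0"]
print(sorted(ids))                       # lexicographic order: wrong
print(sorted(ids, key=parse_stream_id))  # numeric order: matches Redis
```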
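Problem Analysis: 5 Key Challenges of Event-Driven Architecture
- Message loss and duplicate consumption: if a consumer crashes before ACKing, its messages stay in the PEL. They are never processed unless another consumer claims them, and once claimed they may be processed twice. Redis Streams provides at-least-once delivery, so handlers must be idempotent to get effectively-once results.
- Uneven load within a consumer group: slow consumers let messages pile up while others sit idle. Redis does not rebalance automatically, so stuck entries must be moved with XCLAIM (or XAUTOCLAIM on Redis 6.2+).
- Snapshots and replay in event sourcing: event streams grow without bound and full replay becomes expensive, so combine periodic snapshots with incremental events.
- CQRS read-model consistency: projections are eventually consistent and users may see stale data, so version tracking and compensation mechanisms are needed.
- Dead letters and retry storms: repeatedly failing messages can trigger retry storms. A complete strategy combines exponential backoff, a dead-letter queue and manual intervention.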
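Because delivery is at-least-once, a redelivered message must not repeat its side effects. A minimal in-memory sketch of an idempotent handler follows. It assumes a hypothetical producer-assigned `event_id` field. Deduplicating on a business ID rather than the stream ID matters when retries are re-added with XADD, since every copy then gets a new stream ID.

The ID is recorded only after the handler succeeds, so a failed handler can be retried. A crash between the side effect and the record can still cause one duplicate. A real service would persist the seen IDs (for example in Redis with a TTL) instead of process memory.

```python
from collections import OrderedDict
from typing import Callable, Dict


class IdempotentHandler:
    """Skips events whose (hypothetical) event_id was already handled."""

    def __init__(self, handler: Callable[[Dict[str, str]], None], capacity: int = 10_000):
        self.handler = handler
        self.capacity = capacity
        self.seen: "OrderedDict[str, None]" = OrderedDict()  # bounded, oldest first

    def handle(self, event: Dict[str, str]) -> bool:
        event_id = event["event_id"]
        if event_id in self.seen:
            return False  # duplicate delivery: ACK without re-running side effects
        self.handler(event)  # raises on failure, so the ID is not recorded
        self.seen[event_id] = None
        if len(self.seen) > self.capacity:
            self.seen.popitem(last=False)  # evict the oldest remembered ID
        return True


charged = []
h = IdempotentHandler(lambda e: charged.append(e["order_id"]))
h.handle({"event_id": "evt-1", "order_id": "ORD-0001"})
h.handle({"event_id": "evt-1", "order_id": "ORD-0001"})  # redelivered after a crash
print(charged)
```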
Hands-On: 6 Redis Streams Event-Driven Patterns
Pattern 1: Basic Stream Producer/Consumer (XADD/XREAD)
This is the simplest produce/consume model: one producer writes to a stream, and one consumer reads from it.
```python
import threading
import time

import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)
STREAM_NAME = "events:orders"


def producer():
    for i in range(1, 6):
        event = {
            "event_type": "order_created",
            "order_id": f"ORD-{i:04d}",
            "amount": 100 * i,
            "user_id": f"USR-{i:03d}",
            "timestamp": time.time(),
        }
        msg_id = r.xadd(STREAM_NAME, event)  # default "*" ID: Redis generates <ms>-<seq>
        print(f"[Producer] Added event: {msg_id} -> {event}")
        time.sleep(0.5)


def consumer():
    last_id = "0-0"  # start from the beginning of the stream
    while True:
        # Block for up to 5 s waiting for entries newer than last_id
        entries = r.xread({STREAM_NAME: last_id}, count=5, block=5000)
        if not entries:
            print("[Consumer] No new messages, waiting...")
            continue
        for stream_name, messages in entries:
            for msg_id, fields in messages:
                print(f"[Consumer] Processed: {msg_id} -> {fields}")
                last_id = msg_id  # advance the cursor so entries are not read again


if __name__ == "__main__":
    threading.Thread(target=producer, daemon=True).start()
    consumer()
```
Go version:
```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

var ctx = context.Background()

func main() {
	rdb := redis.NewClient(&redis.Options{
		Addr: "localhost:6379",
	})
	streamName := "events:orders"
	go producer(rdb, streamName)
	consumer(rdb, streamName)
}

func producer(rdb *redis.Client, streamName string) {
	for i := 1; i <= 5; i++ {
		values := map[string]interface{}{
			"event_type": "order_created",
			"order_id":   fmt.Sprintf("ORD-%04d", i),
			"amount":     fmt.Sprintf("%d", 100*i),
			"user_id":    fmt.Sprintf("USR-%03d", i),
			"timestamp":  fmt.Sprintf("%d", time.Now().UnixMilli()),
		}
		msgID, err := rdb.XAdd(ctx, &redis.XAddArgs{
			Stream: streamName,
			Values: values,
		}).Result()
		if err != nil {
			fmt.Printf("[Producer] Error: %v\n", err)
			continue
		}
		fmt.Printf("[Producer] Added event: %s\n", msgID)
		time.Sleep(500 * time.Millisecond)
	}
}

func consumer(rdb *redis.Client, streamName string) {
	lastID := "0-0" // start from the beginning of the stream
	for {
		results, err := rdb.XRead(ctx, &redis.XReadArgs{
			Streams: []string{streamName, lastID},
			Count:   5,
			Block:   5 * time.Second,
		}).Result()
		if err != nil {
			if err == redis.Nil { // BLOCK timed out without new entries
				fmt.Println("[Consumer] No new messages, waiting...")
				continue
			}
			fmt.Printf("[Consumer] Error: %v\n", err)
			time.Sleep(time.Second) // avoid a busy loop on connection errors
			continue
		}
		for _, stream := range results {
			for _, msg := range stream.Messages {
				fmt.Printf("[Consumer] Processed: %s -> %v\n", msg.ID, msg.Values)
				lastID = msg.ID // advance the cursor
			}
		}
	}
}
```
Pattern 2: Consumer Groups (XREADGROUP)
A consumer group delivers each message to only one consumer within the group, similar to a Kafka consumer group. Delivered messages stay in the group's PEL until they are XACKed.
```python
import random
import threading
import time

import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)
STREAM_NAME = "events:payments"
GROUP_NAME = "payment-processors"

# Start the group at ID "0" (all existing entries); mkstream creates the stream if missing
try:
    r.xgroup_create(STREAM_NAME, GROUP_NAME, id="0", mkstream=True)
    print(f"[Init] Created consumer group: {GROUP_NAME}")
except redis.ResponseError as e:
    if "BUSYGROUP" in str(e):
        print(f"[Init] Group already exists: {GROUP_NAME}")
    else:
        raise


def group_consumer(consumer_name: str):
    while True:
        # ">" = only entries never delivered to any consumer of this group
        entries = r.xreadgroup(
            GROUP_NAME, consumer_name,
            {STREAM_NAME: ">"},
            count=3, block=3000,
        )
        if not entries:
            print(f"[{consumer_name}] No new messages")
            continue
        for stream_name, messages in entries:
            for msg_id, fields in messages:
                time.sleep(random.uniform(0.1, 0.5))  # simulate processing time
                r.xack(STREAM_NAME, GROUP_NAME, msg_id)  # remove the entry from the PEL
                print(f"[{consumer_name}] ACKed: {msg_id} -> {fields}")


def check_pending():
    # Per-entry PEL details: owning consumer and delivery count
    pending = r.xpending_range(STREAM_NAME, GROUP_NAME, min="-", max="+", count=10)
    if pending:
        print(f"[Monitor] Pending messages: {len(pending)}")
        for p in pending:
            print(f"  msg_id={p['message_id']}, consumer={p['consumer']}, "
                  f"delivered={p['times_delivered']} times")


if __name__ == "__main__":
    for i in range(1, 4):
        for j in range(1, 3):
            r.xadd(STREAM_NAME, {
                "event_type": "payment_completed",
                "payment_id": f"PAY-{i:02d}{j:02d}",
                "amount": str(random.randint(50, 500)),
            })
    threading.Thread(target=group_consumer, args=("consumer-1",), daemon=True).start()
    threading.Thread(target=group_consumer, args=("consumer-2",), daemon=True).start()
    time.sleep(1)
    check_pending()  # entries still being processed show up here
    time.sleep(5)
```
Go version:
```go
package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"

	"github.com/redis/go-redis/v9"
)

var ctx = context.Background()

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	streamName := "events:payments"
	groupName := "payment-processors"
	// A BUSYGROUP error means the group already exists and is safe to ignore here
	err := rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()
	if err != nil {
		fmt.Printf("[Init] Group create: %v\n", err)
	} else {
		fmt.Printf("[Init] Created consumer group: %s\n", groupName)
	}
	for i := 1; i <= 3; i++ {
		for j := 1; j <= 2; j++ {
			rdb.XAdd(ctx, &redis.XAddArgs{
				Stream: streamName,
				Values: map[string]interface{}{
					"event_type": "payment_completed",
					"payment_id": fmt.Sprintf("PAY-%02d%02d", i, j),
					"amount":     fmt.Sprintf("%d", rand.Intn(450)+50),
				},
			})
		}
	}
	go groupConsumer(rdb, streamName, groupName, "consumer-1")
	go groupConsumer(rdb, streamName, groupName, "consumer-2")
	time.Sleep(6 * time.Second)
}

func groupConsumer(rdb *redis.Client, stream, group, consumer string) {
	for {
		results, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    group,
			Consumer: consumer,
			Streams:  []string{stream, ">"},
			Count:    3,
			Block:    3 * time.Second,
		}).Result()
		if err != nil {
			if err == redis.Nil {
				fmt.Printf("[%s] No new messages\n", consumer)
				continue
			}
			fmt.Printf("[%s] Error: %v\n", consumer, err)
			continue
		}
		// The loop variable is named res so it does not shadow the stream-name
		// parameter: XAck needs the string, not a redis.XStream value.
		for _, res := range results {
			for _, msg := range res.Messages {
				fmt.Printf("[%s] Processing: %s -> %v\n", consumer, msg.ID, msg.Values)
				rdb.XAck(ctx, stream, group, msg.ID)
				fmt.Printf("[%s] ACKed: %s\n", consumer, msg.ID)
			}
		}
	}
}
```
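To make the PEL semantics concrete, here is a toy in-memory model of a consumer group. It covers `>` reads, XACK, and idle-based XCLAIM. It is a simplification for illustration, not Redis itself: the class and method names are hypothetical and time is passed in explicitly. As with real XCLAIM (without JUSTID), a claim resets the idle time and increments the delivery count.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class PendingEntry:
    consumer: str
    delivered_at: float  # caller-supplied clock, in seconds
    delivery_count: int


class ToyConsumerGroup:
    """Toy model of XREADGROUP '>' / XACK / XCLAIM semantics (not Redis itself)."""

    def __init__(self, entries: List[str]):
        self.entries = entries   # stream IDs in order
        self.last_delivered = -1  # index of the last entry handed out via '>'
        self.pel: Dict[str, PendingEntry] = {}

    def read_new(self, consumer: str, count: int, now: float) -> List[str]:
        start = self.last_delivered + 1
        batch = self.entries[start:start + count]
        for msg_id in batch:
            self.pel[msg_id] = PendingEntry(consumer, now, 1)  # delivered, not yet ACKed
        self.last_delivered += len(batch)
        return batch

    def ack(self, msg_id: str) -> int:
        return 1 if self.pel.pop(msg_id, None) else 0

    def claim(self, consumer: str, min_idle: float, now: float) -> List[str]:
        claimed = []
        for msg_id, entry in self.pel.items():
            if now - entry.delivered_at >= min_idle:  # only entries idle long enough
                entry.consumer = consumer
                entry.delivered_at = now
                entry.delivery_count += 1
                claimed.append(msg_id)
        return claimed


g = ToyConsumerGroup(["1-0", "2-0", "3-0"])
a = g.read_new("consumer-1", 2, now=0.0)  # consumer-1 gets 1-0 and 2-0
b = g.read_new("consumer-2", 2, now=0.0)  # consumer-2 gets only 3-0
g.ack("1-0")
g.ack("3-0")  # 2-0 stays pending: consumer-1 "crashed"
claimed = g.claim("consumer-2", min_idle=60.0, now=61.0)
print(claimed, g.pel)
```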
Pattern 3: Event Sourcing
Every state change is persisted as an event in a per-aggregate stream. The aggregate's state at any point can be rebuilt by replaying those events in order.
```python
import json
from dataclasses import dataclass, field
from typing import Dict, List

import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)


@dataclass
class OrderState:
    order_id: str = ""
    status: str = "created"
    items: List[Dict] = field(default_factory=list)
    total: float = 0.0
    version: int = 0


class EventStore:
    def __init__(self, aggregate_type: str, aggregate_id: str):
        self.stream_key = f"events:{aggregate_type}:{aggregate_id}"  # one stream per aggregate
        self.aggregate_type = aggregate_type
        self.aggregate_id = aggregate_id

    def append(self, event_type: str, data: dict):
        event = {
            "event_type": event_type,
            "aggregate_id": self.aggregate_id,
            "data": json.dumps(data),
            "version": str(self._next_version()),
        }
        msg_id = r.xadd(self.stream_key, event)
        print(f"[EventStore] Appended: {msg_id} type={event_type} version={event['version']}")
        return msg_id

    def _next_version(self) -> int:
        # XLEN returns 0 for a missing key; XINFO STREAM would fail on the very first append.
        # Note: read-then-XADD is not atomic; concurrent writers need an optimistic check.
        return r.xlen(self.stream_key) + 1

    def load_events(self) -> List[dict]:
        entries = r.xrange(self.stream_key, "-", "+")  # every event, oldest first
        events = []
        for msg_id, fields in entries:
            events.append({
                "id": msg_id,
                "event_type": fields["event_type"],
                "data": json.loads(fields["data"]),
                "version": int(fields["version"]),
            })
        return events


class OrderAggregate:
    def __init__(self, order_id: str):
        self.order_id = order_id
        self.store = EventStore("order", order_id)
        self.state = OrderState(order_id=order_id)

    # Commands only append events; state changes happen in _apply during replay
    def create(self, items: List[Dict]):
        total = sum(item["price"] * item["qty"] for item in items)
        self.store.append("OrderCreated", {"items": items, "total": total})

    def add_item(self, item: Dict):
        self.store.append("ItemAdded", item)

    def pay(self):
        self.store.append("OrderPaid", {"payment_method": "credit_card"})

    def ship(self, tracking_number: str):
        self.store.append("OrderShipped", {"tracking_number": tracking_number})

    def rebuild(self) -> OrderState:
        events = self.store.load_events()
        state = OrderState(order_id=self.order_id)
        for event in events:
            self._apply(state, event)
        self.state = state
        return state

    def _apply(self, state: OrderState, event: dict):
        event_type = event["event_type"]
        data = event["data"]
        if event_type == "OrderCreated":
            state.items = data.get("items", [])
            state.total = data.get("total", 0)
            state.status = "created"
        elif event_type == "ItemAdded":
            state.items.append(data)
            state.total += data.get("price", 0) * data.get("qty", 1)
        elif event_type == "OrderPaid":
            state.status = "paid"
        elif event_type == "OrderShipped":
            state.status = "shipped"
        state.version = event["version"]


if __name__ == "__main__":
    order = OrderAggregate("ORD-0001")
    order.create([{"sku": "WIDGET-01", "price": 29.9, "qty": 2}])
    order.add_item({"sku": "GADGET-02", "price": 49.9, "qty": 1})
    order.pay()
    order.ship("SF1234567890")
    rebuilt = order.rebuild()
    print(f"\n[Rebuilt State] order_id={rebuilt.order_id} status={rebuilt.status} "
          f"total={rebuilt.total} items={len(rebuilt.items)} version={rebuilt.version}")
```
Go version:
```go
package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/redis/go-redis/v9"
)

var ctx = context.Background()

type OrderState struct {
	OrderID string                   `json:"order_id"`
	Status  string                   `json:"status"`
	Items   []map[string]interface{} `json:"items"`
	Total   float64                  `json:"total"`
	Version int                      `json:"version"`
}

type Event struct {
	ID        string                 `json:"id"`
	EventType string                 `json:"event_type"`
	Data      map[string]interface{} `json:"data"`
	Version   int                    `json:"version"`
}

type EventStore struct {
	rdb         *redis.Client
	StreamKey   string
	AggregateID string
}

func NewEventStore(rdb *redis.Client, aggregateType, aggregateID string) *EventStore {
	return &EventStore{
		rdb:         rdb,
		StreamKey:   fmt.Sprintf("events:%s:%s", aggregateType, aggregateID),
		AggregateID: aggregateID,
	}
}

func (s *EventStore) Append(eventType string, data map[string]interface{}) (string, error) {
	// XLEN returns 0 for a stream that does not exist yet. As in the Python version,
	// read-then-XADD is not atomic, so concurrent writers need an optimistic check.
	n, err := s.rdb.XLen(ctx, s.StreamKey).Result()
	if err != nil {
		return "", err
	}
	version := int(n) + 1
	dataJSON, err := json.Marshal(data)
	if err != nil {
		return "", err
	}
	msgID, err := s.rdb.XAdd(ctx, &redis.XAddArgs{
		Stream: s.StreamKey,
		Values: map[string]interface{}{
			"event_type":   eventType,
			"aggregate_id": s.AggregateID,
			"data":         string(dataJSON),
			"version":      fmt.Sprintf("%d", version),
		},
	}).Result()
	if err != nil {
		return "", err
	}
	fmt.Printf("[EventStore] Appended: %s type=%s version=%d\n", msgID, eventType, version)
	return msgID, nil
}

func (s *EventStore) LoadEvents() ([]Event, error) {
	entries, err := s.rdb.XRange(ctx, s.StreamKey, "-", "+").Result()
	if err != nil {
		return nil, err
	}
	var events []Event
	for _, msg := range entries {
		var data map[string]interface{}
		if err := json.Unmarshal([]byte(msg.Values["data"].(string)), &data); err != nil {
			return nil, err
		}
		var version int
		fmt.Sscanf(msg.Values["version"].(string), "%d", &version)
		events = append(events, Event{
			ID:        msg.ID,
			EventType: msg.Values["event_type"].(string),
			Data:      data,
			Version:   version,
		})
	}
	return events, nil
}

type OrderAggregate struct {
	OrderID string
	Store   *EventStore
	State   OrderState
}

func NewOrderAggregate(rdb *redis.Client, orderID string) *OrderAggregate {
	return &OrderAggregate{
		OrderID: orderID,
		Store:   NewEventStore(rdb, "order", orderID),
		State:   OrderState{OrderID: orderID, Status: "created"},
	}
}

// toFloat converts the numeric types that can appear in map[string]interface{}:
// an integer literal such as 2 is stored as int, JSON-decoded numbers as float64.
func toFloat(v interface{}) float64 {
	switch n := v.(type) {
	case float64:
		return n
	case int:
		return float64(n)
	default:
		return 0
	}
}

func (a *OrderAggregate) Create(items []map[string]interface{}) {
	total := 0.0
	for _, item := range items {
		// A bare item["qty"].(float64) assertion fails for the int literal 2 and yields 0
		total += toFloat(item["price"]) * toFloat(item["qty"])
	}
	a.Store.Append("OrderCreated", map[string]interface{}{
		"items": items, "total": total,
	})
}

func (a *OrderAggregate) Pay() {
	a.Store.Append("OrderPaid", map[string]interface{}{
		"payment_method": "credit_card",
	})
}

func (a *OrderAggregate) Ship(trackingNumber string) {
	a.Store.Append("OrderShipped", map[string]interface{}{
		"tracking_number": trackingNumber,
	})
}

func (a *OrderAggregate) Rebuild() (*OrderState, error) {
	events, err := a.Store.LoadEvents()
	if err != nil {
		return nil, err
	}
	state := OrderState{OrderID: a.OrderID, Status: "created"}
	for _, event := range events {
		switch event.EventType {
		case "OrderCreated":
			if items, ok := event.Data["items"].([]interface{}); ok {
				for _, item := range items {
					if m, ok := item.(map[string]interface{}); ok {
						state.Items = append(state.Items, m)
					}
				}
			}
			if total, ok := event.Data["total"].(float64); ok {
				state.Total = total
			}
			state.Status = "created"
		case "OrderPaid":
			state.Status = "paid"
		case "OrderShipped":
			state.Status = "shipped"
		}
		state.Version = event.Version
	}
	a.State = state
	return &state, nil
}

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	order := NewOrderAggregate(rdb, "ORD-0001")
	order.Create([]map[string]interface{}{
		{"sku": "WIDGET-01", "price": 29.9, "qty": 2},
	})
	order.Pay()
	order.Ship("SF1234567890")
	state, err := order.Rebuild()
	if err != nil {
		fmt.Printf("rebuild failed: %v\n", err)
		return
	}
	fmt.Printf("\n[Rebuilt State] order_id=%s status=%s total=%.2f items=%d version=%d\n",
		state.OrderID, state.Status, state.Total, len(state.Items), state.Version)
}
```
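Computing the next version by counting events and then calling XADD is a race. Two writers that loaded the aggregate at the same version can both append, so the stream ends up with two "version 2" events. A common remedy is an expected-version (optimistic concurrency) check.

The in-memory sketch below illustrates that check. Its lock stands in for the atomicity Redis would have to provide, for example a Lua script that compares XLEN before calling XADD (an assumption, not shown here). The class names and the `OrderCancelled` event are hypothetical.

```python
import threading
from typing import Dict, List


class ConcurrencyError(Exception):
    pass


class InMemoryEventStore:
    """Per-aggregate append-only event lists with an expected-version check."""

    def __init__(self) -> None:
        self._streams: Dict[str, List[dict]] = {}
        self._lock = threading.Lock()  # plays the role of an atomic check-and-append

    def append(self, stream_key: str, expected_version: int, event: dict) -> int:
        with self._lock:
            events = self._streams.setdefault(stream_key, [])
            if len(events) != expected_version:
                raise ConcurrencyError(
                    f"{stream_key}: expected version {expected_version}, found {len(events)}")
            events.append({**event, "version": expected_version + 1})
            return expected_version + 1

    def load(self, stream_key: str) -> List[dict]:
        with self._lock:
            return list(self._streams.get(stream_key, []))


store = InMemoryEventStore()
store.append("events:order:ORD-0001", 0, {"event_type": "OrderCreated"})
# Two writers both loaded the order at version 1; only the first append wins.
store.append("events:order:ORD-0001", 1, {"event_type": "OrderPaid"})
try:
    store.append("events:order:ORD-0001", 1, {"event_type": "OrderCancelled"})
except ConcurrencyError as exc:
    print("rejected:", exc)  # the losing writer reloads and retries its command
```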
Pattern 4: CQRS Read-Model Projection
The write side appends events to a stream. The read side consumes that stream and updates a projection (the read model), separating commands from queries. The projection is eventually consistent with the event stream.
```python
import json
import threading
import time

import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)
EVENT_STREAM = "events:orders"
PROJECTION_KEY = "projection:order_summary"  # read model: hash of order_id -> JSON summary
GROUP_NAME = "cqrs-projectors"

try:
    r.xgroup_create(EVENT_STREAM, GROUP_NAME, id="0", mkstream=True)
except redis.ResponseError:
    pass  # BUSYGROUP: the group already exists


def command_side_publish_events():
    """Write side: append domain events to the stream."""
    events = [
        {"event_type": "OrderCreated", "order_id": "ORD-001", "amount": "299", "user_id": "USR-001"},
        {"event_type": "OrderCreated", "order_id": "ORD-002", "amount": "599", "user_id": "USR-002"},
        {"event_type": "OrderPaid", "order_id": "ORD-001", "amount": "299"},
        {"event_type": "OrderShipped", "order_id": "ORD-001"},
        {"event_type": "OrderCreated", "order_id": "ORD-003", "amount": "199", "user_id": "USR-001"},
    ]
    for event in events:
        r.xadd(EVENT_STREAM, event)
        print(f"[Command] Published: {event['event_type']} -> {event.get('order_id')}")
        time.sleep(0.3)


def query_side_projector():
    """Read side: consume events and keep the projection up to date."""
    consumer_name = "projector-1"
    while True:
        entries = r.xreadgroup(
            GROUP_NAME, consumer_name,
            {EVENT_STREAM: ">"},
            count=5, block=3000,
        )
        if not entries:
            continue
        for stream_name, messages in entries:
            for msg_id, fields in messages:
                update_projection(fields)
                r.xack(EVENT_STREAM, GROUP_NAME, msg_id)  # ACK only after the projection is updated


def update_projection(event: dict):
    event_type = event.get("event_type")
    order_id = event.get("order_id", "")
    if event_type == "OrderCreated":
        r.hset(PROJECTION_KEY, order_id, json.dumps({
            "status": "created",
            "amount": event.get("amount", "0"),
            "user_id": event.get("user_id", ""),
        }))
    elif event_type == "OrderPaid":
        existing = r.hget(PROJECTION_KEY, order_id)
        if existing:
            data = json.loads(existing)
            data["status"] = "paid"
            r.hset(PROJECTION_KEY, order_id, json.dumps(data))
    elif event_type == "OrderShipped":
        existing = r.hget(PROJECTION_KEY, order_id)
        if existing:
            data = json.loads(existing)
            data["status"] = "shipped"
            r.hset(PROJECTION_KEY, order_id, json.dumps(data))
    print(f"[Projection] Updated: {order_id} <- {event_type}")


def query_read_model():
    time.sleep(3)  # eventual consistency: give the projector time to catch up
    all_orders = r.hgetall(PROJECTION_KEY)
    print("\n[Read Model] Order Summary:")
    for order_id, data in all_orders.items():
        print(f"  {order_id}: {data}")


if __name__ == "__main__":
    threading.Thread(target=command_side_publish_events, daemon=True).start()
    threading.Thread(target=query_side_projector, daemon=True).start()
    query_read_model()
```
Go version:
```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

var ctx = context.Background()

const (
	eventStream   = "events:orders"
	projectionKey = "projection:order_summary"
	groupName     = "cqrs-projectors"
)

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	rdb.XGroupCreateMkStream(ctx, eventStream, groupName, "0") // BUSYGROUP ignored if it exists
	go commandSidePublishEvents(rdb)
	go querySideProjector(rdb)
	time.Sleep(4 * time.Second) // let the eventually consistent projection catch up
	queryReadModel(rdb)
}

func commandSidePublishEvents(rdb *redis.Client) {
	events := []map[string]interface{}{
		{"event_type": "OrderCreated", "order_id": "ORD-001", "amount": "299", "user_id": "USR-001"},
		{"event_type": "OrderCreated", "order_id": "ORD-002", "amount": "599", "user_id": "USR-002"},
		{"event_type": "OrderPaid", "order_id": "ORD-001", "amount": "299"},
		{"event_type": "OrderShipped", "order_id": "ORD-001"},
		{"event_type": "OrderCreated", "order_id": "ORD-003", "amount": "199", "user_id": "USR-001"},
	}
	for _, event := range events {
		rdb.XAdd(ctx, &redis.XAddArgs{Stream: eventStream, Values: event})
		fmt.Printf("[Command] Published: %s -> %s\n", event["event_type"], event["order_id"])
		time.Sleep(300 * time.Millisecond)
	}
}

func querySideProjector(rdb *redis.Client) {
	for {
		results, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    groupName,
			Consumer: "projector-1",
			Streams:  []string{eventStream, ">"},
			Count:    5,
			Block:    3 * time.Second,
		}).Result()
		if err != nil {
			continue // redis.Nil on BLOCK timeout
		}
		for _, stream := range results {
			for _, msg := range stream.Messages {
				updateProjection(rdb, msg.Values)
				rdb.XAck(ctx, eventStream, groupName, msg.ID) // ACK after the projection update
			}
		}
	}
}

func updateProjection(rdb *redis.Client, fields map[string]interface{}) {
	eventType, _ := fields["event_type"].(string)
	orderID, _ := fields["order_id"].(string)
	switch eventType {
	case "OrderCreated":
		data, _ := json.Marshal(map[string]string{
			"status":  "created",
			"amount":  fmt.Sprintf("%v", fields["amount"]),
			"user_id": fmt.Sprintf("%v", fields["user_id"]),
		})
		rdb.HSet(ctx, projectionKey, orderID, string(data))
	case "OrderPaid":
		existing, _ := rdb.HGet(ctx, projectionKey, orderID).Result()
		if existing != "" {
			var data map[string]string
			json.Unmarshal([]byte(existing), &data)
			data["status"] = "paid"
			updated, _ := json.Marshal(data)
			rdb.HSet(ctx, projectionKey, orderID, string(updated))
		}
	case "OrderShipped":
		existing, _ := rdb.HGet(ctx, projectionKey, orderID).Result()
		if existing != "" {
			var data map[string]string
			json.Unmarshal([]byte(existing), &data)
			data["status"] = "shipped"
			updated, _ := json.Marshal(data)
			rdb.HSet(ctx, projectionKey, orderID, string(updated))
		}
	}
	fmt.Printf("[Projection] Updated: %s <- %s\n", orderID, eventType)
}

func queryReadModel(rdb *redis.Client) {
	orders, _ := rdb.HGetAll(ctx, projectionKey).Result()
	fmt.Println("\n[Read Model] Order Summary:")
	for orderID, data := range orders {
		fmt.Printf("  %s: %s\n", orderID, data)
	}
}
```
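Because the projector may see redeliveries after a crash, a projection update should be safe to apply twice. One way to get the "version tracking" mentioned in the challenges is to store a per-order version in the read model and skip events at or below it.

The sketch below assumes each event carries a per-order `version` field. The Pattern 4 events do not have one; it is added here as a hypothetical field. A plain dict stands in for the `projection:order_summary` hash.

```python
import json
from typing import Dict

STATUS_BY_EVENT = {"OrderCreated": "created", "OrderPaid": "paid", "OrderShipped": "shipped"}


def apply_to_projection(projection: Dict[str, str], event: Dict[str, str]) -> bool:
    """Apply one event to {order_id: json}; returns False for duplicate/stale events."""
    order_id = event["order_id"]
    version = int(event["version"])  # hypothetical per-order version (1, 2, 3, ...)
    current = json.loads(projection[order_id]) if order_id in projection else {"version": 0}
    if version <= current["version"]:
        return False  # already reflected in the read model: safe to ACK and skip
    if version != current["version"] + 1:
        raise ValueError(f"{order_id}: expected version {current['version'] + 1}, got {version}")
    status = STATUS_BY_EVENT.get(event["event_type"])
    if status:
        current["status"] = status
    if "amount" in event:
        current["amount"] = event["amount"]
    current["version"] = version
    projection[order_id] = json.dumps(current)
    return True


read_model: Dict[str, str] = {}  # stands in for the projection hash
events = [
    {"event_type": "OrderCreated", "order_id": "ORD-001", "amount": "299", "version": "1"},
    {"event_type": "OrderPaid", "order_id": "ORD-001", "version": "2"},
    {"event_type": "OrderPaid", "order_id": "ORD-001", "version": "2"},  # redelivered
]
results = [apply_to_projection(read_model, e) for e in events]
print(results, read_model["ORD-001"])
```

Raising on a version gap (rather than silently applying) is a design choice: it surfaces a missed event so the projection can be rebuilt from the stream.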
Pattern 5: Dead-Letter Queue and Retries
When processing fails more times than a retry threshold, the message is moved to a dead-letter queue, so endless retries do not block consumption.
```python
import random
import time

import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)
MAIN_STREAM = "events:orders"
DLQ_STREAM = "events:orders:dlq"
GROUP_NAME = "order-processors"
MAX_RETRIES = 3
RETRY_DELAYS = [1, 5, 30]  # seconds before the 1st, 2nd and 3rd retry

try:
    r.xgroup_create(MAIN_STREAM, GROUP_NAME, id="0", mkstream=True)
except redis.ResponseError:
    pass  # BUSYGROUP: the group already exists


def process_message(fields: dict) -> bool:
    order_id = fields.get("order_id", "")
    if random.random() < 0.4:  # simulate a 40% failure rate
        print(f"  [FAIL] Processing failed for {order_id}")
        return False
    print(f"  [OK] Processed {order_id}")
    return True


def consumer_with_retry(consumer_name: str):
    while True:
        entries = r.xreadgroup(
            GROUP_NAME, consumer_name,
            {MAIN_STREAM: ">"},
            count=1, block=3000,
        )
        if not entries:
            continue
        for stream_name, messages in entries:
            for msg_id, fields in messages:
                if process_message(fields):
                    r.xack(MAIN_STREAM, GROUP_NAME, msg_id)
                    continue
                # Retries are re-added as NEW entries, so their PEL delivery count restarts
                # at 1; the retry count must travel inside the message itself.
                retry_count = int(fields.get("retry_count", "0"))
                if retry_count >= MAX_RETRIES:
                    r.xadd(DLQ_STREAM, {
                        **fields,
                        "original_id": msg_id,
                        "failure_reason": "max_retries_exceeded",
                        "retry_count": str(retry_count),
                    })
                    r.xack(MAIN_STREAM, GROUP_NAME, msg_id)
                    print(f"  [DLQ] Moved to dead letter: {msg_id}")
                else:
                    delay = RETRY_DELAYS[min(retry_count, len(RETRY_DELAYS) - 1)]
                    print(f"  [RETRY] Will retry {msg_id} after {delay}s "
                          f"(retry {retry_count + 1}/{MAX_RETRIES})")
                    time.sleep(delay)  # demo only: blocks this consumer for the whole delay
                    # Re-enqueue before ACKing: a crash in between yields a duplicate, not a loss
                    r.xadd(MAIN_STREAM, {**fields, "retry_count": str(retry_count + 1)})
                    r.xack(MAIN_STREAM, GROUP_NAME, msg_id)


def inspect_dlq():
    dlq_entries = r.xrange(DLQ_STREAM, "-", "+", count=20)
    print(f"\n[DLQ] {len(dlq_entries)} dead letters:")
    for msg_id, fields in dlq_entries:
        print(f"  {msg_id}: order_id={fields.get('order_id')} "
              f"retries={fields.get('retry_count')} reason={fields.get('failure_reason')}")


if __name__ == "__main__":
    for i in range(1, 11):
        r.xadd(MAIN_STREAM, {"order_id": f"ORD-{i:04d}", "amount": str(i * 100)})
    random.seed(42)
    try:
        consumer_with_retry("consumer-1")
    except KeyboardInterrupt:
        inspect_dlq()  # Ctrl+C: show what ended up in the DLQ
```
Go version:
```go
package main

import (
	"context"
	"fmt"
	"math/rand"
	"strconv"
	"time"

	"github.com/redis/go-redis/v9"
)

var ctx = context.Background()

const (
	mainStream = "events:orders"
	dlqStream  = "events:orders:dlq"
	groupName  = "order-processors"
	maxRetries = 3
)

var retryDelays = []time.Duration{1 * time.Second, 5 * time.Second, 30 * time.Second}

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	rdb.XGroupCreateMkStream(ctx, mainStream, groupName, "0") // BUSYGROUP ignored if it exists
	for i := 1; i <= 10; i++ {
		rdb.XAdd(ctx, &redis.XAddArgs{
			Stream: mainStream,
			Values: map[string]interface{}{
				"order_id": fmt.Sprintf("ORD-%04d", i),
				"amount":   fmt.Sprintf("%d", i*100),
			},
		})
	}
	consumerWithRetry(rdb, "consumer-1")
}

func processMessage(fields map[string]interface{}) bool {
	orderID := fmt.Sprintf("%v", fields["order_id"])
	if rand.Float64() < 0.4 { // simulate a 40% failure rate
		fmt.Printf("  [FAIL] Processing failed for %s\n", orderID)
		return false
	}
	fmt.Printf("  [OK] Processed %s\n", orderID)
	return true
}

func consumerWithRetry(rdb *redis.Client, consumerName string) {
	for {
		results, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    groupName,
			Consumer: consumerName,
			Streams:  []string{mainStream, ">"},
			Count:    1,
			Block:    3 * time.Second,
		}).Result()
		if err != nil {
			continue
		}
		for _, stream := range results {
			for _, msg := range stream.Messages {
				if processMessage(msg.Values) {
					rdb.XAck(ctx, mainStream, groupName, msg.ID)
					continue
				}
				// Retries are re-added as new entries, so the PEL delivery count starts
				// over; carry the retry count in the message instead.
				retryCount := 0
				if v, ok := msg.Values["retry_count"].(string); ok {
					retryCount, _ = strconv.Atoi(v)
				}
				if retryCount >= maxRetries {
					// Copy the fields first, then add DLQ metadata so it is not overwritten
					dlqValues := make(map[string]interface{}, len(msg.Values)+3)
					for k, v := range msg.Values {
						dlqValues[k] = v
					}
					dlqValues["original_id"] = msg.ID
					dlqValues["failure_reason"] = "max_retries_exceeded"
					dlqValues["retry_count"] = strconv.Itoa(retryCount)
					rdb.XAdd(ctx, &redis.XAddArgs{Stream: dlqStream, Values: dlqValues})
					rdb.XAck(ctx, mainStream, groupName, msg.ID)
					fmt.Printf("  [DLQ] Moved to dead letter: %s\n", msg.ID)
					continue
				}
				delay := retryDelays[min(retryCount, len(retryDelays)-1)] // built-in min: Go 1.21+
				fmt.Printf("  [RETRY] Will retry %s after %v (retry %d/%d)\n",
					msg.ID, delay, retryCount+1, maxRetries)
				time.Sleep(delay) // demo only: blocks this consumer
				retryValues := make(map[string]interface{}, len(msg.Values)+1)
				for k, v := range msg.Values {
					retryValues[k] = v
				}
				retryValues["retry_count"] = strconv.Itoa(retryCount + 1)
				// Re-enqueue before ACKing: a crash in between yields a duplicate, not a loss
				rdb.XAdd(ctx, &redis.XAddArgs{Stream: mainStream, Values: retryValues})
				rdb.XAck(ctx, mainStream, groupName, msg.ID)
			}
		}
	}
}
```
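`RETRY_DELAYS` above is a fixed schedule. The challenges section calls for exponential backoff to avoid retry storms. A widely used variant is "full jitter": pick a random delay between 0 and an exponentially growing, capped ceiling, so many failing consumers do not retry in lockstep.

A minimal sketch follows. The `base` and `cap` defaults are illustrative, and a seeded `random.Random` is passed in only to make the output reproducible.

```python
import random
from typing import Optional


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0,
                  rng: Optional[random.Random] = None) -> float:
    """Full-jitter exponential backoff: uniform in [0, min(cap, base * 2**attempt)]."""
    rng = rng or random.Random()
    ceiling = min(cap, base * (2 ** attempt))
    return rng.uniform(0, ceiling)


rng = random.Random(42)
for attempt in range(6):
    print(attempt, round(backoff_delay(attempt, rng=rng), 2))
```

In the retry branch, `backoff_delay(retry_count)` would replace the `RETRY_DELAYS` lookup. Sleeping inside the consumer still blocks it, so a production setup usually defers the retry (for example by leaving the entry pending and reclaiming it later) instead.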
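Pattern 6: Production-Grade Event Processing Service (Go)
A complete production-style event processing service with graceful shutdown, periodic metrics logging, and XCLAIM-based failover. It does not include a health-check endpoint; in practice that is usually a small HTTP handler that PINGs Redis.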
```go
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	"github.com/redis/go-redis/v9"
)

type EventProcessor struct {
	rdb        *redis.Client
	stream     string
	group      string
	consumer   string
	dlqStream  string
	maxRetries int

	// atomic.Int64 requires Go 1.19+
	processed atomic.Int64
	failed    atomic.Int64
	retried   atomic.Int64

	// Cancelling ctx tells every goroutine to stop and wg waits for them to finish.
	// (A plain bool "running" flag shared between goroutines would be a data race.)
	wg     sync.WaitGroup
	ctx    context.Context
	cancel context.CancelFunc
}

func NewEventProcessor(rdb *redis.Client, stream, group, consumer string) *EventProcessor {
	ctx, cancel := context.WithCancel(context.Background())
	return &EventProcessor{
		rdb:        rdb,
		stream:     stream,
		group:      group,
		consumer:   consumer,
		dlqStream:  stream + ":dlq",
		maxRetries: 3,
		ctx:        ctx,
		cancel:     cancel,
	}
}

func (p *EventProcessor) Start() {
	// A BUSYGROUP error only means the group already exists
	if err := p.rdb.XGroupCreateMkStream(p.ctx, p.stream, p.group, "0").Err(); err != nil {
		log.Printf("[Processor] Group create: %v", err)
	}
	log.Printf("[Processor] Starting consumer %s for stream %s", p.consumer, p.stream)
	for _, loop := range []func(){p.claimPendingMessages, p.consumeNewMessages, p.reportMetrics} {
		p.wg.Add(1)
		go func(run func()) {
			defer p.wg.Done()
			run()
		}(loop)
	}
}

func (p *EventProcessor) consumeNewMessages() {
	for p.ctx.Err() == nil {
		results, err := p.rdb.XReadGroup(p.ctx, &redis.XReadGroupArgs{
			Group:    p.group,
			Consumer: p.consumer,
			Streams:  []string{p.stream, ">"},
			Count:    10,
			Block:    2 * time.Second,
		}).Result()
		if err != nil {
			if err == redis.Nil || p.ctx.Err() != nil {
				continue // BLOCK timeout, or shutting down (the loop condition exits)
			}
			log.Printf("[Consumer] Error: %v", err)
			time.Sleep(time.Second) // back off on connection errors instead of spinning
			continue
		}
		for _, stream := range results {
			for _, msg := range stream.Messages {
				p.handleMessage(msg.ID, msg.Values)
			}
		}
	}
}

// claimPendingMessages periodically takes over entries that have been idle for
// more than 60s (e.g. owned by a crashed consumer, or failed here earlier).
func (p *EventProcessor) claimPendingMessages() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			pending, err := p.rdb.XPendingExt(p.ctx, &redis.XPendingExtArgs{
				Stream: p.stream,
				Group:  p.group,
				Start:  "-",
				End:    "+",
				Count:  10,
				Idle:   60 * time.Second,
			}).Result()
			if err != nil {
				continue
			}
			for _, msg := range pending {
				// MinIdle makes XCLAIM re-check the idle time atomically on the server
				claimed, err := p.rdb.XClaim(p.ctx, &redis.XClaimArgs{
					Stream:   p.stream,
					Group:    p.group,
					Consumer: p.consumer,
					MinIdle:  60 * time.Second,
					Messages: []string{msg.ID},
				}).Result()
				if err == nil && len(claimed) > 0 {
					log.Printf("[Claim] Claimed pending message: %s", claimed[0].ID)
					p.retried.Add(1)
					p.handleMessage(claimed[0].ID, claimed[0].Values)
				}
			}
		case <-p.ctx.Done():
			return
		}
	}
}

func (p *EventProcessor) handleMessage(msgID string, values map[string]interface{}) {
	err := p.processEvent(values)
	if err != nil {
		p.failed.Add(1)
		// The PEL delivery count is reliable here because failed entries stay pending
		// (they are reclaimed, not re-added), so XCLAIM increments the same counter.
		pending, _ := p.rdb.XPendingExt(p.ctx, &redis.XPendingExtArgs{
			Stream: p.stream,
			Group:  p.group,
			Start:  msgID,
			End:    msgID,
			Count:  1,
		}).Result()
		deliveryCount := 1
		if len(pending) > 0 {
			deliveryCount = int(pending[0].DeliveryCount)
		}
		if deliveryCount >= p.maxRetries {
			dlqValues := make(map[string]interface{}, len(values)+3)
			for k, v := range values {
				dlqValues[k] = v
			}
			dlqValues["original_id"] = msgID
			dlqValues["failure_reason"] = err.Error()
			dlqValues["retry_count"] = fmt.Sprintf("%d", deliveryCount)
			p.rdb.XAdd(p.ctx, &redis.XAddArgs{Stream: p.dlqStream, Values: dlqValues})
			p.rdb.XAck(p.ctx, p.stream, p.group, msgID)
			log.Printf("[DLQ] Moved %s to dead letter after %d attempts", msgID, deliveryCount)
		} else {
			// No ACK: the entry stays pending and claimPendingMessages retries it later
			log.Printf("[Retry] Message %s failed (attempt %d/%d): %v",
				msgID, deliveryCount, p.maxRetries, err)
		}
		return
	}
	// If shutdown has already cancelled p.ctx this ACK fails and the entry is
	// redelivered later: at-least-once, so handlers should be idempotent.
	p.rdb.XAck(p.ctx, p.stream, p.group, msgID)
	p.processed.Add(1)
}

func (p *EventProcessor) processEvent(values map[string]interface{}) error {
	eventType, _ := values["event_type"].(string)
	switch eventType {
	case "order_created":
		return p.handleOrderCreated(values)
	case "payment_completed":
		return p.handlePaymentCompleted(values)
	default:
		log.Printf("[Process] Unknown event type: %s", eventType)
		return nil
	}
}

func (p *EventProcessor) handleOrderCreated(values map[string]interface{}) error {
	orderID := fmt.Sprintf("%v", values["order_id"])
	log.Printf("[Process] Order created: %s", orderID)
	return nil
}

func (p *EventProcessor) handlePaymentCompleted(values map[string]interface{}) error {
	paymentID := fmt.Sprintf("%v", values["payment_id"])
	log.Printf("[Process] Payment completed: %s", paymentID)
	return nil
}

func (p *EventProcessor) reportMetrics() {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			log.Printf("[Metrics] processed=%d failed=%d retried=%d",
				p.processed.Load(), p.failed.Load(), p.retried.Load())
		case <-p.ctx.Done():
			return
		}
	}
}

func (p *EventProcessor) Stop() {
	log.Println("[Processor] Shutting down gracefully...")
	p.cancel() // signal every goroutine to stop
	// Wait for the loops to exit instead of sleeping a fixed time; the consumer loop
	// notices the cancellation once its current blocking XREADGROUP returns.
	p.wg.Wait()
	log.Printf("[Processor] Final metrics: processed=%d failed=%d retried=%d",
		p.processed.Load(), p.failed.Load(), p.retried.Load())
}

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	processor := NewEventProcessor(rdb, "events:orders", "order-processors", "worker-1")
	processor.Start()
	for i := 1; i <= 20; i++ {
		rdb.XAdd(context.Background(), &redis.XAddArgs{
			Stream: "events:orders",
			Values: map[string]interface{}{
				"event_type": "order_created",
				"order_id":   fmt.Sprintf("ORD-%04d", i),
				"amount":     fmt.Sprintf("%d", i*100),
			},
		})
	}
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	<-sigCh // block until Ctrl+C or SIGTERM
	processor.Stop()
}
```
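The shutdown logic of the service boils down to two steps: signal every worker loop to stop, then wait for them to exit rather than sleeping for a fixed time. Here is the same idea sketched in Python:

- `threading.Event` stands in for context cancellation;
- `Thread.join` stands in for waiting on the goroutines;
- a `queue.Queue` with a short timeout plays the role of the blocking XREADGROUP call.

The names are illustrative.

```python
import queue
import threading
from typing import List


def worker(jobs: "queue.Queue[str]", stop: threading.Event, done: List[str]) -> None:
    """Process jobs until `stop` is set; the get() timeout acts like XREADGROUP's BLOCK."""
    while not stop.is_set():
        try:
            job = jobs.get(timeout=0.1)
        except queue.Empty:
            continue  # like redis.Nil on BLOCK timeout: re-check the stop flag
        done.append(job)  # finish the in-flight job before checking `stop` again
        jobs.task_done()


jobs: "queue.Queue[str]" = queue.Queue()
stop = threading.Event()
done: List[str] = []
t = threading.Thread(target=worker, args=(jobs, stop, done))
t.start()
for i in range(5):
    jobs.put(f"ORD-{i:04d}")
jobs.join()        # wait until every queued job has been processed
stop.set()         # signal shutdown (Go: cancel the context)
t.join(timeout=2)  # wait for the worker to exit (Go: wait on the goroutines)
print(done, t.is_alive())
```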
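Pitfall Guide: 5 Common Mistakes
❌ Pitfall 1: Using plain XREAD without a consumer group, so every consumer processes every message
```python
# process() stands for your business logic; r, STREAM_NAME and GROUP_NAME come from the examples above

# ❌ Wrong: several consumers each run XREAD independently, so every message is processed by all of them
def bad_consumer():
    entries = r.xread({STREAM_NAME: "0-0"}, count=10, block=5000)
    for _, messages in entries or []:
        for msg_id, fields in messages:
            process(fields)

# ✅ Correct: with XREADGROUP the group delivers each message to only one consumer in the group
def good_consumer():
    entries = r.xreadgroup(
        GROUP_NAME, "consumer-1",
        {STREAM_NAME: ">"},
        count=10, block=5000,
    )
    for _, messages in entries or []:
        for msg_id, fields in messages:
            process(fields)
            r.xack(STREAM_NAME, GROUP_NAME, msg_id)
```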
❌ Pitfall 2: Forgetting XACK, so messages stay in the pending list forever
```go
// Fragment: assumes ctx, streamName, groupName and processEvent(values) error are defined elsewhere

// ❌ Wrong: messages are processed but never ACKed, so they stay pending forever
func badConsumer(rdb *redis.Client) {
	results, _ := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
		Group:    groupName,
		Consumer: "consumer-1",
		Streams:  []string{streamName, ">"},
		Count:    10,
	}).Result()
	for _, stream := range results {
		for _, msg := range stream.Messages {
			processEvent(msg.Values)
			// Forgot to ACK!
		}
	}
}

// ✅ Correct: ACK as soon as processing has succeeded
func goodConsumer(rdb *redis.Client) {
	results, _ := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
		Group:    groupName,
		Consumer: "consumer-1",
		Streams:  []string{streamName, ">"},
		Count:    10,
	}).Result()
	for _, stream := range results {
		for _, msg := range stream.Messages {
			if err := processEvent(msg.Values); err == nil {
				rdb.XAck(ctx, streamName, groupName, msg.ID)
			}
		}
	}
}
```
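❌ Pitfall 3: The stream grows without bound and exhausts memory
```python
# ❌ Wrong: only ever append, never trim, so the stream grows forever
r.xadd("events:orders", {"data": "..."})
# XTRIM or XDEL is never called

# ✅ Correct: cap the length with MAXLEN on XADD, or trim periodically with XTRIM
r.xadd("events:orders", {"data": "..."}, maxlen=10000, approximate=True)  # MAXLEN ~ 10000
# or trim manually
r.xtrim("events:orders", maxlen=10000, approximate=True)
# Caveat: do not trim event-sourcing streams (Pattern 3); their events are the source of truth
```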
❌ Pitfall 4: Running XCLAIM without checking idle time
```go
// Fragment: assumes ctx, streamName and groupName are defined elsewhere

// ❌ Wrong: blindly XCLAIM every pending message, possibly stealing ones still being processed
func badClaim(rdb *redis.Client) {
	pending, _ := rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
		Stream: streamName,
		Group:  groupName,
		Start:  "-",
		End:    "+",
		Count:  100,
	}).Result()
	for _, msg := range pending {
		rdb.XClaim(ctx, &redis.XClaimArgs{
			Stream:   streamName,
			Group:    groupName,
			Consumer: "consumer-2",
			Messages: []string{msg.ID},
		})
	}
}

// ✅ Correct: only claim pending messages whose idle time exceeds a threshold
func goodClaim(rdb *redis.Client) {
	pending, _ := rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
		Stream: streamName,
		Group:  groupName,
		Start:  "-",
		End:    "+",
		Count:  100,
		Idle:   60 * time.Second, // only list entries idle for more than 60s
	}).Result()
	for _, msg := range pending {
		rdb.XClaim(ctx, &redis.XClaimArgs{
			Stream:   streamName,
			Group:    groupName,
			Consumer: "consumer-2",
			MinIdle:  60 * time.Second, // re-checked atomically by XCLAIM itself
			Messages: []string{msg.ID},
		})
	}
}
```
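❌ Pitfall 5: Wrong start ID when creating a consumer group
```python
# Three alternatives (running them in sequence would raise BUSYGROUP)

# ❌ Wrong (if you need history): "$" starts the group at the current end of the stream,
#    so entries added before the group was created are never delivered to it
r.xgroup_create(STREAM_NAME, GROUP_NAME, id="$")
# ✅ Correct: "0" starts the group at the beginning, so every existing entry is delivered
r.xgroup_create(STREAM_NAME, GROUP_NAME, id="0")
# If you really want new messages only, state the intent explicitly in a comment
r.xgroup_create(STREAM_NAME, GROUP_NAME, id="$")  # consume new messages only
```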
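Troubleshooting Table
| Symptom | Likely cause | Diagnostic command | Fix |
|---|---|---|---|
| `NOGROUP No such key` | The stream or consumer group does not exist | `XINFO GROUPS events:orders` | Create the group with `XGROUP CREATE` first; add `MKSTREAM` to create the stream automatically |
| `BUSYGROUP Consumer Group name already exists` | The consumer group was created twice | `XINFO GROUPS events:orders` | Catch and ignore the error, or check whether the group exists first |
| Consumer receives no new messages | XREADGROUP is called with `0-0` instead of `>` | Check the Streams argument in the code | Use `>` for new messages; `0-0` only re-reads this consumer's own pending (unacknowledged) entries |
| Pending messages pile up | A consumer crashed before ACKing | `XPENDING events:orders group-name` | Transfer them to a live consumer with XCLAIM |
| Stream memory keeps growing | No MAXLEN / XTRIM configured | `XINFO STREAM events:orders` (check `length`) | Add `MAXLEN ~ 10000` to XADD, or run XTRIM periodically |
| Messages are processed more than once | Processing succeeded but the ACK failed or never ran | Check that XACK runs after the business logic | ACK only after success, and make handlers idempotent: at-least-once delivery can still produce duplicates |
| After XCLAIM, the original consumer still processes the message | MinIdle is set too short | Check idle times with XPENDING | Use a reasonable MinIdle threshold (e.g. 60 s) |
| Event-sourcing replay is slow | Too many events; every rebuild runs a full XRANGE | `XLEN events:order:ORD-001` | Take periodic snapshots and replay only the events after the snapshot with XRANGE |
| CQRS read model is out of date | The projection consumer is lagging or down | `XINFO CONSUMERS` (plus the `lag` field of `XINFO GROUPS` on Redis 7.0+) | Monitor consumer lag and alert on a threshold |
| `ERR unknown command 'XADD'` | Redis is older than 5.0 and has no Streams support | `redis-server --version` | Upgrade Redis to 5.0+ |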
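Advanced Optimizations
1. Stream Sharding and Partitioning
A single stream lives on one Redis node, so when its throughput is not enough, shard by business key across several streams. All events for the same key land in the same shard, which preserves per-key ordering:
```python
import hashlib


def get_shard_stream(base_stream: str, key: str, shard_count: int = 16) -> str:
    # Hash the business key so every event for the same key goes to the same shard
    shard_index = int(hashlib.md5(key.encode()).hexdigest(), 16) % shard_count
    return f"{base_stream}:shard-{shard_index:03d}"


order_id = "ORD-0001"
stream = get_shard_stream("events:orders", order_id)
r.xadd(stream, {"order_id": order_id, "event_type": "order_created"})  # r: client from earlier examples
```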
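One consequence of hash-modulo sharding is worth checking before choosing `shard_count`: changing the shard count remaps most keys. During a migration, events for the same order can then sit in two different streams, which breaks per-key ordering. The self-contained sketch below reuses the same md5-modulo mapping as `get_shard_stream` and measures how many keys move when going from 16 to 17 shards. The key set is illustrative.

```python
import hashlib


def shard_of(key: str, shard_count: int) -> int:
    # Same mapping as get_shard_stream: md5 of the key, modulo the shard count
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % shard_count


keys = [f"ORD-{i:04d}" for i in range(1000)]
moved = sum(shard_of(k, 16) != shard_of(k, 17) for k in keys)
print(f"{moved / len(keys):.0%} of keys change shard when going from 16 to 17 shards")
```

With uniform hashing, roughly 16 out of 17 keys (about 94%) move. That is why the shard count is usually fixed up front with headroom, or consistent hashing is used instead.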
2. Consumer Lag Monitoring and Alerting
```go
// monitorConsumerLag periodically checks pending counts and consumer idle times.
// Fragment: assumes the package-level ctx from earlier examples and imports of log, time, go-redis v9.
func monitorConsumerLag(rdb *redis.Client, stream, group string) {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		groups, err := rdb.XInfoGroups(ctx, stream).Result()
		if err != nil {
			continue
		}
		for _, g := range groups {
			if g.Name != group {
				continue
			}
			consumers, _ := rdb.XInfoConsumers(ctx, stream, group).Result()
			for _, c := range consumers {
				if c.Pending > 100 {
					log.Printf("[ALERT] Consumer %s has %d pending messages!", c.Name, c.Pending)
				}
				if c.Idle > 5*time.Minute {
					log.Printf("[WARN] Consumer %s idle for %v", c.Name, c.Idle)
				}
			}
			if g.Pending > 1000 {
				log.Printf("[ALERT] Group %s has %d pending messages!", group, g.Pending)
			}
		}
	}
}
```
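The monitor above watches *pending* entries, meaning those delivered but not yet ACKed. *Lag* is a separate signal: entries the group has not been handed at all yet. Redis 7.0+ reports `lag` directly in XINFO GROUPS. On older versions it can be approximated by counting the entries whose ID is greater than the group's `last-delivered-id`.

The in-memory sketch below computes that count over a list of stream IDs. The helper names and sample IDs are illustrative, and it ignores deleted entries.

```python
from typing import List, Tuple


def parse_id(stream_id: str) -> Tuple[int, int]:
    """'<ms>-<seq>' -> (ms, seq), so IDs compare numerically, not as strings."""
    ms, seq = stream_id.split("-")
    return int(ms), int(seq)


def group_lag(stream_ids: List[str], last_delivered_id: str) -> int:
    """Entries not yet delivered to the group: IDs strictly greater than last-delivered-id."""
    cursor = parse_id(last_delivered_id)
    return sum(1 for sid in stream_ids if parse_id(sid) > cursor)


stream_ids = ["1000-0", "1000-1", "1001-0", "1002-0"]
lag = group_lag(stream_ids, last_delivered_id="1000-1")
pending = 1  # e.g. 1000-1 was delivered but is not ACKed yet
print(f"lag={lag} (not yet delivered), pending={pending} (delivered, not ACKed)")
```

A projector can show zero pending entries and still fall far behind. Alerting on both values catches slow consumers as well as stuck ones.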
3. Event Snapshots and Incremental Replay
```python
import json
import time

SNAPSHOT_INTERVAL = 100  # e.g. save a snapshot every 100 events


def save_snapshot(aggregate_id: str, state: dict, version: int, last_stream_id: str):
    snapshot_key = f"snapshot:order:{aggregate_id}"
    r.hset(snapshot_key, mapping={
        "state": json.dumps(state),
        "version": str(version),
        "last_stream_id": last_stream_id,  # stream ID of the last event included in the snapshot
        "timestamp": str(time.time()),
    })


def load_from_snapshot(aggregate_id: str):
    data = r.hgetall(f"snapshot:order:{aggregate_id}")
    if not data:
        return None, None
    return json.loads(data["state"]), data["last_stream_id"]


def rebuild_with_snapshot(aggregate_id: str):
    state, last_stream_id = load_from_snapshot(aggregate_id)
    stream_key = f"events:order:{aggregate_id}"
    if state is None:
        events = r.xrange(stream_key, "-", "+")
    else:
        # Stream IDs are <ms>-<seq> timestamps, not event versions, so resume from the
        # snapshot's stream ID; "(" makes the start exclusive (Redis 6.2+)
        events = r.xrange(stream_key, f"({last_stream_id}", "+")
    for msg_id, fields in events:
        state = apply_event(state or {}, fields)  # apply_event: your aggregate's reducer (not shown)
    return state
```
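The key property of snapshotting is that "snapshot state + events after the snapshot" must equal "full replay from the beginning". That only holds if the resume point is the snapshot's last *stream ID*, not its event count.

The in-memory sketch below checks this property. Its `apply_event` is a hypothetical reducer written for the illustration; it counts events and tracks the latest status. `replay` skips IDs up to and including `after_id`, mimicking an exclusive XRANGE start.

```python
from typing import Dict, List, Optional, Tuple

Event = Tuple[str, Dict[str, str]]  # (stream_id, fields), the shape XRANGE returns


def id_key(stream_id: str) -> Tuple[int, int]:
    ms, seq = stream_id.split("-")
    return int(ms), int(seq)


def apply_event(state: Dict, fields: Dict[str, str]) -> Dict:
    # Hypothetical reducer for this sketch: count events, keep the latest status
    new_state = dict(state)
    new_state["events"] = new_state.get("events", 0) + 1
    new_state["status"] = fields.get("status", new_state.get("status"))
    return new_state


def replay(events: List[Event], state: Optional[Dict] = None,
           after_id: Optional[str] = None) -> Dict:
    """Fold events into state, skipping IDs <= after_id (like an exclusive XRANGE start)."""
    state = dict(state or {})
    for stream_id, fields in events:
        if after_id is not None and id_key(stream_id) <= id_key(after_id):
            continue
        state = apply_event(state, fields)
    return state


events = [
    ("1718432000000-0", {"status": "created"}),
    ("1718432000500-0", {"status": "paid"}),
    ("1718432001000-0", {"status": "shipped"}),
]
snapshot_state, snapshot_last_id = replay(events[:2]), events[1][0]
full = replay(events)
incremental = replay(events, state=snapshot_state, after_id=snapshot_last_id)
print(full == incremental, full)
```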
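Comparison: Redis Streams vs. Other Messaging Systems
| Feature | Redis Streams | Kafka | RabbitMQ | NATS JetStream | Pulsar |
|---|---|---|---|---|---|
| Positioning | Lightweight event streams | Distributed event streaming platform | Message broker | Cloud-native messaging system | Distributed messaging platform |
| Deployment complexity | ★☆☆☆☆ | ★★★★★ | ★★★☆☆ | ★★☆☆☆ | ★★★★☆ |
| Throughput (rough) | 100k-500k msg/s | 1M+ msg/s | 50k-100k msg/s | 500k-1M msg/s | 1M+ msg/s |
| Persistence | AOF/RDB | Disk log | Optional persistence | Disk log | BookKeeper |
| Consumer groups | ✅ Native | ✅ Native | ⚠️ Competing consumers on a queue (no group abstraction) | ✅ Native | ✅ Native |
| Message replay | ✅ XRANGE | ✅ Offset reset | ❌ Not for classic queues (RabbitMQ Streams, 3.9+, support it) | ✅ Supported | ✅ Supported |
| Exactly-once | ❌ At-least-once | ✅ Transactions | ❌ At-least-once | ✅ Supported | ✅ Supported |
| Dead-letter queue | ❌ Manual | ❌ Manual | ✅ Native | ✅ Native | ✅ Native |
| Partitioning | ❌ Manual sharding | ✅ Automatic partitioning | ✅ Exchange routing | ✅ Automatic partitioning | ✅ Automatic partitioning |
| Operational cost | Very low | High (ZooKeeper/KRaft) | Moderate | Low | High (BookKeeper) |
| Best fit | Small/medium microservice event messaging | Large-scale data stream processing | Enterprise message routing | Cloud-native lightweight messaging | Multi-tenant large-scale messaging |

Throughput figures are rough orders of magnitude. Real numbers depend heavily on hardware, message size, batching and durability settings (for Redis, notably the AOF fsync policy).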
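Summary
In 2026, Redis Streams remains one of the best lightweight choices for event-driven architecture in small- and medium-scale microservices. It needs neither Kafka's heavy infrastructure nor RabbitMQ's exchange configuration, and the 6 core patterns cover everything from basic produce/consume to a production-grade event processing service. Remember three things:

- always use consumer groups (XREADGROUP);
- always XACK after successful processing;
- cap queue-style streams with MAXLEN to prevent unbounded memory growth.

Event-sourcing streams are the exception to the last rule: snapshot them rather than trim them. When you need exactly-once semantics or throughput in the millions of messages per second, consider moving to Kafka or Pulsar.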