Redis Streams Event-Driven Architecture in Practice: 6 Production Patterns from Consumer Groups to Event Sourcing


The Microservice Communication Dilemma: Kafka Is Too Heavy, Redis Streams Is Just Right

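An order service has to tell inventory to deduct stock, the payment service has to broadcast transaction results, a new user registration has to trigger a welcome email: event communication between microservices is everywhere. Reach for Kafka? Cluster operations, ZooKeeper/KRaft coordination, partition rebalancing... a small service simply cannot carry that much infrastructure. Use RabbitMQ? Exchange bindings, routing keys, acknowledgment modes... the complexity is not low either. In 2026, Redis Streams, as a lightweight option for event-driven designs, is increasingly becoming a first choice for small and medium-scale microservice communication.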

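This article is organized around 6 production patterns and walks through the full chain, basic produce/consume → consumer groups → event sourcing → CQRS read models → dead letter queues → a production-grade event processing service, with runnable code at every step: Python and Go for Patterns 1 to 5, and Go for Pattern 6.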


Redis Streams Core Concepts

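| Concept | Description |
|---|---|
| Stream | Append-only log data structure introduced in Redis 5.0, similar to a Kafka topic; entries are stored in ID order, and IDs are time-based |
| XADD | Appends an entry to a stream; by default Redis generates an ID of the form <milliseconds>-<sequence>, e.g. 1718432000000-0 |
| XREAD | Reads entries after a given ID; supports blocking and non-blocking modes |
| Consumer Group | Similar to a Kafka consumer group: each entry is delivered to only one consumer within the group |
| XREADGROUP | Reads as a member of a consumer group; delivered entries enter the Pending state |
| XACK | Acknowledges that an entry has been processed and removes it from the Pending list |
| Pending Entries List (PEL) | Entries that were delivered but not yet acknowledged; the basis for failure recovery and retries |
| XPENDING | Shows pending-entry statistics (or per-entry details) for a consumer group |
| XCLAIM | Transfers pending entries to another consumer, used for failover (Redis 6.2+ also offers XAUTOCLAIM) |
| Event Sourcing | Every state change is persisted as a sequence of events, so the state at any point in time can be rebuilt |
| CQRS | Command Query Responsibility Segregation: the write model (event stream) and the read model (projections) evolve independently |
| Dead Letter Queue (DLQ) | Holds messages that failed more times than the retry threshold, so endless retries do not block consumption |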
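Because so much of what follows (XREAD cursors, XPENDING ranges, claims) is driven by entry IDs, it helps to see how they order. The plain-Python sketch below (no Redis connection) splits an ID into its two numeric parts; the helper names parse_stream_id and next_auto_id are hypothetical, and next_auto_id is only an approximation of how the server picks the next auto-generated ID, including when the clock has not advanced.

```python
from typing import Tuple

def parse_stream_id(stream_id: str) -> Tuple[int, int]:
    """Split '<ms>-<seq>' into integers so IDs compare numerically, not as text."""
    ms, _, seq = stream_id.partition("-")
    return int(ms), int(seq or 0)

def next_auto_id(last_id: str, now_ms: int) -> str:
    """Approximate how XADD '*' picks an ID strictly greater than last_id."""
    last_ms, last_seq = parse_stream_id(last_id)
    if now_ms > last_ms:
        return f"{now_ms}-0"
    # Same millisecond, or the clock went backwards: keep last_ms, bump the sequence
    return f"{last_ms}-{last_seq + 1}"

ids = ["1718432000000-10", "1718432000000-9", "1718431999999-0"]
print(sorted(ids))                       # text order puts "-10" before "-9"
print(sorted(ids, key=parse_stream_id))  # numeric order matches Redis
print(next_auto_id("1718432000000-0", 1718432000000))
print(next_auto_id("1718432000000-0", 1718432000005))
```

The practical takeaway is that IDs must never be compared as plain strings in application code.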

Problem Analysis: 5 Challenges of Event-Driven Architecture

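  1. Message loss and duplicate consumption: if a consumer crashes before ACKing, its messages stay in the PEL and may be redelivered (with plain XREAD and no group, the read position can simply be lost). Consumer groups provide at-least-once delivery, not exactly-once, so handlers must be idempotent to get effectively-once behavior
  2. Uneven load within a consumer group: slow consumers accumulate pending messages while others sit idle. Redis does not rebalance automatically, so stuck messages have to be moved with XCLAIM (or XAUTOCLAIM)
  3. Snapshots and replay in event sourcing: the event stream grows without bound and full replay gets expensive, so a combination of periodic snapshots and incremental events is needed
  4. Read-model consistency in CQRS: projections are eventually consistent, so users may see stale data; version tracking and compensation mechanisms are needed
  5. Dead letters and retry storms: messages that fail repeatedly can trigger retry storms; a complete strategy combines exponential backoff, a dead letter queue, and manual intervention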
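For challenge 1, the usual companion to at-least-once delivery is an idempotent handler. A minimal in-memory sketch, assuming each event carries a unique event_id field (hypothetical: none of the examples in this article set one) and using a Python set as a stand-in for a durable dedup store, which in Redis would typically be a key written with SET and the NX and EX options. The class name IdempotentHandler is also hypothetical.

```python
from typing import Callable, Dict, List, Set

class IdempotentHandler:
    """Wraps a side-effecting handler so a redelivered event is applied only once."""

    def __init__(self, handler: Callable[[Dict[str, str]], None]) -> None:
        self.handler = handler
        self.seen: Set[str] = set()  # in-memory stand-in for a durable dedup store

    def handle(self, fields: Dict[str, str]) -> bool:
        event_id = fields["event_id"]  # hypothetical unique ID set by the producer
        if event_id in self.seen:
            return False  # duplicate delivery: skip the side effect, but still ACK it
        self.handler(fields)
        self.seen.add(event_id)  # record only after the side effect succeeded
        return True

charged: List[str] = []
h = IdempotentHandler(lambda f: charged.append(f["event_id"]))
# The first event is delivered twice, as after a consumer crash before XACK
deliveries = [
    {"event_id": "EVT-1", "order_id": "ORD-0001"},
    {"event_id": "EVT-1", "order_id": "ORD-0001"},
    {"event_id": "EVT-2", "order_id": "ORD-0001"},
]
results = [h.handle(d) for d in deliveries]
print(results, charged)
```

Recording the ID only after the side effect leaves a small window in which a crash repeats the effect; closing it fully requires making the side effect and the dedup record atomic.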

Step by Step: 6 Redis Streams Event-Driven Patterns

Pattern 1: Basic Stream Producer/Consumer (XADD/XREAD)

The simplest produce/consume model: one producer writes to a stream and one consumer reads from it.

import redis
import json
import time
import threading

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

STREAM_NAME = "events:orders"

def producer():
    for i in range(1, 6):
        event = {
            "event_type": "order_created",
            "order_id": f"ORD-{i:04d}",
            "amount": 100 * i,
            "user_id": f"USR-{i:03d}",
            "timestamp": time.time()
        }
        msg_id = r.xadd(STREAM_NAME, event)
        print(f"[Producer] Added event: {msg_id} -> {event}")
        time.sleep(0.5)

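def consumer():
    # "0-0" starts from the beginning of the stream; "$" would return only
    # entries added after the first call. last_id then advances as we read.
    last_id = "0-0"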
    while True:
        entries = r.xread({STREAM_NAME: last_id}, count=5, block=5000)
        if not entries:
            print("[Consumer] No new messages, waiting...")
            continue
        for stream_name, messages in entries:
            for msg_id, fields in messages:
                print(f"[Consumer] Processed: {msg_id} -> {fields}")
                last_id = msg_id

if __name__ == "__main__":
    threading.Thread(target=producer, daemon=True).start()
    consumer()

Go version:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

var ctx = context.Background()

func main() {
	rdb := redis.NewClient(&redis.Options{
		Addr: "localhost:6379",
	})
	streamName := "events:orders"

	go producer(rdb, streamName)
	consumer(rdb, streamName)
}

func producer(rdb *redis.Client, streamName string) {
	for i := 1; i <= 5; i++ {
		values := map[string]interface{}{
			"event_type": "order_created",
			"order_id":   fmt.Sprintf("ORD-%04d", i),
			"amount":     fmt.Sprintf("%d", 100*i),
			"user_id":    fmt.Sprintf("USR-%03d", i),
			"timestamp":  fmt.Sprintf("%d", time.Now().UnixMilli()),
		}
		msgID, err := rdb.XAdd(ctx, &redis.XAddArgs{
			Stream: streamName,
			Values: values,
		}).Result()
		if err != nil {
			fmt.Printf("[Producer] Error: %v\n", err)
			continue
		}
		fmt.Printf("[Producer] Added event: %s\n", msgID)
		time.Sleep(500 * time.Millisecond)
	}
}

func consumer(rdb *redis.Client, streamName string) {
	lastID := "0-0"
	for {
		results, err := rdb.XRead(ctx, &redis.XReadArgs{
			Streams: []string{streamName, lastID},
			Count:   5,
			Block:   5 * time.Second,
		}).Result()
		if err != nil {
			if err == redis.Nil {
				fmt.Println("[Consumer] No new messages, waiting...")
				continue
			}
			fmt.Printf("[Consumer] Error: %v\n", err)
			continue
		}
		for _, stream := range results {
			for _, msg := range stream.Messages {
				fmt.Printf("[Consumer] Processed: %s -> %v\n", msg.ID, msg.Values)
				lastID = msg.ID
			}
		}
	}
}
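Both consumers above work because they remember last_id and pass it back on every call: XREAD returns only entries whose ID is strictly greater than the one given, "0-0" means "from the beginning", and "$" means "only entries added after this call". The toy model below (class MiniStream, hypothetical, no Redis involved, simplified IDs) mimics just those non-blocking cursor rules, which also shows why forgetting to advance the cursor would return the same first batch forever.

```python
from typing import Dict, List, Tuple

Entry = Tuple[str, Dict[str, str]]

class MiniStream:
    """Toy append-only log mimicking XADD / non-blocking XREAD cursor rules (not Redis)."""

    def __init__(self) -> None:
        self.entries: List[Entry] = []

    def xadd(self, fields: Dict[str, str]) -> str:
        entry_id = f"0-{len(self.entries) + 1}"  # simplified; real IDs are <ms>-<seq>
        self.entries.append((entry_id, fields))
        return entry_id

    def xread(self, after_id: str, count: int = 10) -> List[Entry]:
        if after_id == "$":
            return []  # "$" means "after the current last entry": nothing yet
        after = int(after_id.split("-")[1])
        newer = [e for e in self.entries if int(e[0].split("-")[1]) > after]
        return newer[:count]  # strictly greater than after_id, oldest first

s = MiniStream()
for i in range(1, 6):
    s.xadd({"order_id": f"ORD-{i:04d}"})

cursor = "0-0"
seen: List[str] = []
while True:
    batch = s.xread(cursor, count=2)
    if not batch:
        break
    seen.extend(fields["order_id"] for _, fields in batch)
    cursor = batch[-1][0]  # advance the cursor, as Pattern 1 does with last_id
print(seen, cursor)
```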

Pattern 2: Consumer Groups (XREADGROUP)

A consumer group ensures that each message is delivered to only one consumer in the group, similar to a Kafka consumer group.

import redis
import time
import threading
import random

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

STREAM_NAME = "events:payments"
GROUP_NAME = "payment-processors"

try:
    r.xgroup_create(STREAM_NAME, GROUP_NAME, id="0", mkstream=True)
    print(f"[Init] Created consumer group: {GROUP_NAME}")
except redis.ResponseError as e:
    if "BUSYGROUP" in str(e):
        print(f"[Init] Group already exists: {GROUP_NAME}")
    else:
        raise

def group_consumer(consumer_name: str):
    while True:
        entries = r.xreadgroup(
            GROUP_NAME, consumer_name,
            {STREAM_NAME: ">"},
            count=3, block=3000
        )
        if not entries:
            print(f"[{consumer_name}] No new messages")
            continue
        for stream_name, messages in entries:
            for msg_id, fields in messages:
                process_time = random.uniform(0.1, 0.5)
                time.sleep(process_time)
                r.xack(STREAM_NAME, GROUP_NAME, msg_id)
                print(f"[{consumer_name}] ACKed: {msg_id} -> {fields}")

def check_pending():
    pending = r.xpending_range(STREAM_NAME, GROUP_NAME, min="-", max="+", count=10)
    if pending:
        print(f"[Monitor] Pending messages: {len(pending)}")
        for p in pending:
            print(f"  msg_id={p['message_id']}, consumer={p['consumer']}, "
                  f"delivered={p['times_delivered']} times")

if __name__ == "__main__":
    for i in range(1, 4):
        for j in range(1, 3):
            r.xadd(STREAM_NAME, {
                "event_type": "payment_completed",
                "payment_id": f"PAY-{i:02d}{j:02d}",
                "amount": str(random.randint(50, 500)),
            })

    threading.Thread(target=group_consumer, args=("consumer-1",), daemon=True).start()
    threading.Thread(target=group_consumer, args=("consumer-2",), daemon=True).start()

    time.sleep(1)
    check_pending()
    time.sleep(5)

Go version:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"

	"github.com/redis/go-redis/v9"
)

var ctx = context.Background()

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	streamName := "events:payments"
	groupName := "payment-processors"

	err := rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()
	if err != nil {
		fmt.Printf("[Init] Group create: %v\n", err)
	} else {
		fmt.Printf("[Init] Created consumer group: %s\n", groupName)
	}

	for i := 1; i <= 3; i++ {
		for j := 1; j <= 2; j++ {
			rdb.XAdd(ctx, &redis.XAddArgs{
				Stream: streamName,
				Values: map[string]interface{}{
					"event_type": "payment_completed",
					"payment_id": fmt.Sprintf("PAY-%02d%02d", i, j),
					"amount":     fmt.Sprintf("%d", rand.Intn(450)+50),
				},
			})
		}
	}

	go groupConsumer(rdb, streamName, groupName, "consumer-1")
	go groupConsumer(rdb, streamName, groupName, "consumer-2")
	time.Sleep(6 * time.Second)
}

func groupConsumer(rdb *redis.Client, stream, group, consumer string) {
	for {
		results, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    group,
			Consumer: consumer,
			Streams:  []string{stream, ">"},
			Count:    3,
			Block:    3 * time.Second,
		}).Result()
		if err != nil {
			if err == redis.Nil {
				fmt.Printf("[%s] No new messages\n", consumer)
				continue
			}
			fmt.Printf("[%s] Error: %v\n", consumer, err)
			continue
		}
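		for _, s := range results {
			for _, msg := range s.Messages {
				fmt.Printf("[%s] Processing: %s -> %v\n", consumer, msg.ID, msg.Values)
				// ACK with the stream-name parameter; naming the loop variable
				// "stream" shadowed it with a redis.XStream and did not compile
				rdb.XAck(ctx, stream, group, msg.ID)
				fmt.Printf("[%s] ACKed: %s\n", consumer, msg.ID)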
			}
		}
	}
}
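What XREADGROUP adds on top of XREAD is per-group bookkeeping: a last-delivered ID shared by the whole group, and a Pending Entries List recording each delivered but unACKed entry with its owner and delivery count. The toy model below (MiniGroup, hypothetical, pure Python) mirrors only those rules: ">" hands out never-delivered entries, XACK removes an entry from the PEL, and a claim moves ownership and increments the delivery count, as XCLAIM does by default.

```python
from dataclasses import dataclass, field
from typing import Dict, List

@dataclass
class PendingEntry:
    consumer: str
    delivery_count: int = 1

@dataclass
class MiniGroup:
    """Toy model of one consumer group over a list of entry IDs (not Redis)."""
    stream: List[str]
    last_delivered: int = 0  # index of the next never-delivered entry
    pel: Dict[str, PendingEntry] = field(default_factory=dict)

    def read_new(self, consumer: str, count: int) -> List[str]:
        """Like XREADGROUP ... '>': each new entry goes to exactly one consumer."""
        batch = self.stream[self.last_delivered:self.last_delivered + count]
        self.last_delivered += len(batch)
        for entry_id in batch:
            self.pel[entry_id] = PendingEntry(consumer)
        return batch

    def ack(self, entry_id: str) -> int:
        """Like XACK: returns 1 if the entry was pending, else 0."""
        return 1 if self.pel.pop(entry_id, None) else 0

    def claim(self, entry_id: str, new_owner: str) -> bool:
        """Like XCLAIM: move ownership and bump the delivery counter."""
        pending = self.pel.get(entry_id)
        if pending is None:
            return False
        pending.consumer = new_owner
        pending.delivery_count += 1
        return True

g = MiniGroup(stream=[f"0-{i}" for i in range(1, 7)])
a = g.read_new("consumer-1", 3)
b = g.read_new("consumer-2", 3)
for entry_id in a:
    g.ack(entry_id)          # consumer-1 finishes its batch
g.ack(b[0])                  # consumer-2 crashes after one ACK
g.claim(b[1], "consumer-1")  # failover: consumer-1 takes over a stuck entry
print({k: (v.consumer, v.delivery_count) for k, v in g.pel.items()})
```

The delivery count that grows on each claim is exactly what Patterns 5 and 6 later consult when deciding whether to dead-letter a message.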

Pattern 3: Event Sourcing

Every state change is persisted to a stream as a sequence of events, so the aggregate root's state at any point in time can be rebuilt.

import redis
import json
from dataclasses import dataclass, field
from typing import List, Dict

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

@dataclass
class OrderState:
    order_id: str = ""
    status: str = "created"
    items: List[Dict] = field(default_factory=list)
    total: float = 0.0
    version: int = 0

class EventStore:
    def __init__(self, aggregate_type: str, aggregate_id: str):
        self.stream_key = f"events:{aggregate_type}:{aggregate_id}"
        self.aggregate_type = aggregate_type
        self.aggregate_id = aggregate_id

    def append(self, event_type: str, data: dict):
        event = {
            "event_type": event_type,
            "aggregate_id": self.aggregate_id,
            "data": json.dumps(data),
            "version": str(self._next_version()),
        }
        msg_id = r.xadd(self.stream_key, event)
        print(f"[EventStore] Appended: {msg_id} type={event_type} version={event['version']}")
        return msg_id

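    def _next_version(self) -> int:
        # XLEN returns 0 for a missing key, whereas XINFO STREAM raises an error
        # on the very first append. Note: read-then-XADD is not atomic, so
        # concurrent writers to the same aggregate could assign duplicate versions.
        return r.xlen(self.stream_key) + 1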

    def load_events(self) -> List[dict]:
        entries = r.xrange(self.stream_key, "-", "+", count=1000)
        events = []
        for msg_id, fields in entries:
            events.append({
                "id": msg_id,
                "event_type": fields["event_type"],
                "data": json.loads(fields["data"]),
                "version": int(fields["version"]),
            })
        return events

class OrderAggregate:
    def __init__(self, order_id: str):
        self.order_id = order_id
        self.store = EventStore("order", order_id)
        self.state = OrderState(order_id=order_id)

    def create(self, items: List[Dict]):
        total = sum(item["price"] * item["qty"] for item in items)
        self.store.append("OrderCreated", {"items": items, "total": total})

    def add_item(self, item: Dict):
        self.store.append("ItemAdded", item)

    def pay(self):
        self.store.append("OrderPaid", {"payment_method": "credit_card"})

    def ship(self, tracking_number: str):
        self.store.append("OrderShipped", {"tracking_number": tracking_number})

    def rebuild(self) -> OrderState:
        events = self.store.load_events()
        state = OrderState(order_id=self.order_id)
        for event in events:
            self._apply(state, event)
        self.state = state
        return state

    def _apply(self, state: OrderState, event: dict):
        event_type = event["event_type"]
        data = event["data"]
        if event_type == "OrderCreated":
            state.items = data.get("items", [])
            state.total = data.get("total", 0)
            state.status = "created"
        elif event_type == "ItemAdded":
            state.items.append(data)
            state.total += data.get("price", 0) * data.get("qty", 1)
        elif event_type == "OrderPaid":
            state.status = "paid"
        elif event_type == "OrderShipped":
            state.status = "shipped"
        state.version = event["version"]

if __name__ == "__main__":
    order = OrderAggregate("ORD-0001")
    order.create([{"sku": "WIDGET-01", "price": 29.9, "qty": 2}])
    order.add_item({"sku": "GADGET-02", "price": 49.9, "qty": 1})
    order.pay()
    order.ship("SF1234567890")

    rebuilt = order.rebuild()
    print(f"\n[Rebuilt State] order_id={rebuilt.order_id} status={rebuilt.status} "
          f"total={rebuilt.total} items={len(rebuilt.items)} version={rebuilt.version}")

Go version:

package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/redis/go-redis/v9"
)

var ctx = context.Background()

type OrderState struct {
	OrderID string                   `json:"order_id"`
	Status  string                   `json:"status"`
	Items   []map[string]interface{} `json:"items"`
	Total   float64                  `json:"total"`
	Version int                      `json:"version"`
}

type Event struct {
	ID         string                 `json:"id"`
	EventType  string                 `json:"event_type"`
	Data       map[string]interface{} `json:"data"`
	Version    int                    `json:"version"`
}

type EventStore struct {
	rdb           *redis.Client
	StreamKey     string
	AggregateID   string
}

func NewEventStore(rdb *redis.Client, aggregateType, aggregateID string) *EventStore {
	return &EventStore{
		rdb:         rdb,
		StreamKey:   fmt.Sprintf("events:%s:%s", aggregateType, aggregateID),
		AggregateID: aggregateID,
	}
}

func (s *EventStore) Append(eventType string, data map[string]interface{}) (string, error) {
	info, err := s.rdb.XInfoStream(ctx, s.StreamKey).Result()
	version := 1
	if err == nil {
		version = int(info.Length) + 1
	}
	dataJSON, _ := json.Marshal(data)
	msgID, err := s.rdb.XAdd(ctx, &redis.XAddArgs{
		Stream: s.StreamKey,
		Values: map[string]interface{}{
			"event_type":   eventType,
			"aggregate_id": s.AggregateID,
			"data":         string(dataJSON),
			"version":      fmt.Sprintf("%d", version),
		},
	}).Result()
	if err != nil {
		return "", err
	}
	fmt.Printf("[EventStore] Appended: %s type=%s version=%d\n", msgID, eventType, version)
	return msgID, nil
}

func (s *EventStore) LoadEvents() ([]Event, error) {
	entries, err := s.rdb.XRange(ctx, s.StreamKey, "-", "+").Result()
	if err != nil {
		return nil, err
	}
	var events []Event
	for _, msg := range entries {
		var data map[string]interface{}
		json.Unmarshal([]byte(msg.Values["data"].(string)), &data)
		var version int
		fmt.Sscanf(msg.Values["version"].(string), "%d", &version)
		events = append(events, Event{
			ID:        msg.ID,
			EventType: msg.Values["event_type"].(string),
			Data:      data,
			Version:   version,
		})
	}
	return events, nil
}

type OrderAggregate struct {
	OrderID string
	Store   *EventStore
	State   OrderState
}

func NewOrderAggregate(rdb *redis.Client, orderID string) *OrderAggregate {
	return &OrderAggregate{
		OrderID: orderID,
		Store:   NewEventStore(rdb, "order", orderID),
		State:   OrderState{OrderID: orderID, Status: "created"},
	}
}

func (a *OrderAggregate) Create(items []map[string]interface{}) {
	total := 0.0
	for _, item := range items {
		price, _ := item["price"].(float64)
		qty, _ := item["qty"].(float64)
		total += price * qty
	}
	a.Store.Append("OrderCreated", map[string]interface{}{
		"items": items, "total": total,
	})
}

func (a *OrderAggregate) Pay() {
	a.Store.Append("OrderPaid", map[string]interface{}{
		"payment_method": "credit_card",
	})
}

func (a *OrderAggregate) Ship(trackingNumber string) {
	a.Store.Append("OrderShipped", map[string]interface{}{
		"tracking_number": trackingNumber,
	})
}

func (a *OrderAggregate) Rebuild() (*OrderState, error) {
	events, err := a.Store.LoadEvents()
	if err != nil {
		return nil, err
	}
	state := OrderState{OrderID: a.OrderID, Status: "created"}
	for _, event := range events {
		switch event.EventType {
		case "OrderCreated":
			if items, ok := event.Data["items"].([]interface{}); ok {
				for _, item := range items {
					if m, ok := item.(map[string]interface{}); ok {
						state.Items = append(state.Items, m)
					}
				}
			}
			if total, ok := event.Data["total"].(float64); ok {
				state.Total = total
			}
			state.Status = "created"
		case "OrderPaid":
			state.Status = "paid"
		case "OrderShipped":
			state.Status = "shipped"
		}
		state.Version = event.Version
	}
	a.State = state
	return &state, nil
}

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	order := NewOrderAggregate(rdb, "ORD-0001")
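	order.Create([]map[string]interface{}{
		// Float literals: Create asserts price and qty as float64, and an
		// untyped 2 would be stored as int, making the computed total 0
		{"sku": "WIDGET-01", "price": 29.9, "qty": 2.0},
	})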
	order.Pay()
	order.Ship("SF1234567890")

	state, _ := order.Rebuild()
	fmt.Printf("\n[Rebuilt State] order_id=%s status=%s total=%.2f items=%d version=%d\n",
		state.OrderID, state.Status, state.Total, len(state.Items), state.Version)
}
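Challenge 3 (ever-growing replay) is commonly handled by saving a snapshot together with the position of the last event folded into it, and on load replaying only the newer events; with Redis that tail could be fetched with an exclusive XRANGE start such as "(<snapshot_id>", available since Redis 6.2. The pure-Python sketch below uses simplified integer sequence numbers instead of stream IDs; the snapshot format and the names apply, take_snapshot and load are assumptions, and the fold is a simplified copy of OrderAggregate._apply from the Python version above.

```python
import copy
from typing import Dict, List, Tuple

Event = Tuple[int, str, Dict]  # (sequence, event_type, data)

def apply(state: Dict, event: Event) -> Dict:
    seq, event_type, data = event
    if event_type == "OrderCreated":
        state.update(items=list(data["items"]), total=data["total"], status="created")
    elif event_type == "ItemAdded":
        state["items"].append(data)
        state["total"] += data["price"] * data["qty"]
    elif event_type == "OrderPaid":
        state["status"] = "paid"
    elif event_type == "OrderShipped":
        state["status"] = "shipped"
    state["version"] = seq
    return state

def take_snapshot(state: Dict) -> Dict:
    return copy.deepcopy(state)  # the snapshot carries its own 'version'

def load(snapshot: Dict, events: List[Event]) -> Tuple[Dict, int]:
    """Start from the snapshot and fold only events newer than it."""
    state = copy.deepcopy(snapshot)
    tail = [e for e in events if e[0] > snapshot["version"]]
    for e in tail:
        apply(state, e)
    return state, len(tail)

log: List[Event] = [
    (1, "OrderCreated", {"items": [{"sku": "WIDGET-01", "price": 29.9, "qty": 2}], "total": 59.8}),
    (2, "ItemAdded", {"sku": "GADGET-02", "price": 49.9, "qty": 1}),
    (3, "OrderPaid", {}),
    (4, "OrderShipped", {"tracking_number": "SF1234567890"}),
]
empty = {"items": [], "total": 0.0, "status": "created", "version": 0}

snap = take_snapshot(apply(apply(copy.deepcopy(empty), log[0]), log[1]))  # after event 2
from_snapshot, replayed = load(snap, log)
full, _ = load(empty, log)
print(from_snapshot == full, replayed)
```

The check that matters is that snapshot-plus-tail yields the same state as a full replay while folding fewer events.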

Pattern 4: CQRS Read-Model Projection

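The write side appends events to a stream; the read side listens to the stream and updates a projection (the read model), separating command and query responsibilities.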

import redis
import json
import time
import threading

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

EVENT_STREAM = "events:orders"
PROJECTION_KEY = "projection:order_summary"
GROUP_NAME = "cqrs-projectors"

try:
    r.xgroup_create(EVENT_STREAM, GROUP_NAME, id="0", mkstream=True)
except redis.ResponseError:
    pass

def command_side_publish_events():
    events = [
        {"event_type": "OrderCreated", "order_id": "ORD-001", "amount": "299", "user_id": "USR-001"},
        {"event_type": "OrderCreated", "order_id": "ORD-002", "amount": "599", "user_id": "USR-002"},
        {"event_type": "OrderPaid", "order_id": "ORD-001", "amount": "299"},
        {"event_type": "OrderShipped", "order_id": "ORD-001"},
        {"event_type": "OrderCreated", "order_id": "ORD-003", "amount": "199", "user_id": "USR-001"},
    ]
    for event in events:
        r.xadd(EVENT_STREAM, event)
        print(f"[Command] Published: {event['event_type']} -> {event.get('order_id')}")
        time.sleep(0.3)

def query_side_projector():
    consumer_name = "projector-1"
    while True:
        entries = r.xreadgroup(
            GROUP_NAME, consumer_name,
            {EVENT_STREAM: ">"},
            count=5, block=3000
        )
        if not entries:
            continue
        for stream_name, messages in entries:
            for msg_id, fields in messages:
                update_projection(fields)
                r.xack(EVENT_STREAM, GROUP_NAME, msg_id)

def update_projection(event: dict):
    event_type = event.get("event_type")
    order_id = event.get("order_id", "")
    if event_type == "OrderCreated":
        r.hset(PROJECTION_KEY, order_id, json.dumps({
            "status": "created",
            "amount": event.get("amount", "0"),
            "user_id": event.get("user_id", ""),
        }))
    elif event_type == "OrderPaid":
        existing = r.hget(PROJECTION_KEY, order_id)
        if existing:
            data = json.loads(existing)
            data["status"] = "paid"
            r.hset(PROJECTION_KEY, order_id, json.dumps(data))
    elif event_type == "OrderShipped":
        existing = r.hget(PROJECTION_KEY, order_id)
        if existing:
            data = json.loads(existing)
            data["status"] = "shipped"
            r.hset(PROJECTION_KEY, order_id, json.dumps(data))
    print(f"[Projection] Updated: {order_id} <- {event_type}")

def query_read_model():
    time.sleep(3)
    all_orders = r.hgetall(PROJECTION_KEY)
    print("\n[Read Model] Order Summary:")
    for order_id, data in all_orders.items():
        print(f"  {order_id}: {data}")

if __name__ == "__main__":
    threading.Thread(target=command_side_publish_events, daemon=True).start()
    threading.Thread(target=query_side_projector, daemon=True).start()
    query_read_model()

Go version:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

var ctx = context.Background()

const (
	eventStream   = "events:orders"
	projectionKey = "projection:order_summary"
	groupName     = "cqrs-projectors"
)

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	rdb.XGroupCreateMkStream(ctx, eventStream, groupName, "0")

	go commandSidePublishEvents(rdb)
	go querySideProjector(rdb)
	time.Sleep(4 * time.Second)
	queryReadModel(rdb)
}

func commandSidePublishEvents(rdb *redis.Client) {
	events := []map[string]interface{}{
		{"event_type": "OrderCreated", "order_id": "ORD-001", "amount": "299", "user_id": "USR-001"},
		{"event_type": "OrderCreated", "order_id": "ORD-002", "amount": "599", "user_id": "USR-002"},
		{"event_type": "OrderPaid", "order_id": "ORD-001", "amount": "299"},
		{"event_type": "OrderShipped", "order_id": "ORD-001"},
		{"event_type": "OrderCreated", "order_id": "ORD-003", "amount": "199", "user_id": "USR-001"},
	}
	for _, event := range events {
		rdb.XAdd(ctx, &redis.XAddArgs{Stream: eventStream, Values: event})
		fmt.Printf("[Command] Published: %s -> %s\n", event["event_type"], event["order_id"])
		time.Sleep(300 * time.Millisecond)
	}
}

func querySideProjector(rdb *redis.Client) {
	for {
		results, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    groupName,
			Consumer: "projector-1",
			Streams:  []string{eventStream, ">"},
			Count:    5,
			Block:    3 * time.Second,
		}).Result()
		if err != nil {
			continue
		}
		for _, stream := range results {
			for _, msg := range stream.Messages {
				updateProjection(rdb, msg.Values)
				rdb.XAck(ctx, eventStream, groupName, msg.ID)
			}
		}
	}
}

func updateProjection(rdb *redis.Client, fields map[string]interface{}) {
	eventType, _ := fields["event_type"].(string)
	orderID, _ := fields["order_id"].(string)
	switch eventType {
	case "OrderCreated":
		data, _ := json.Marshal(map[string]string{
			"status":  "created",
			"amount":  fmt.Sprintf("%v", fields["amount"]),
			"user_id": fmt.Sprintf("%v", fields["user_id"]),
		})
		rdb.HSet(ctx, projectionKey, orderID, string(data))
	case "OrderPaid":
		existing, _ := rdb.HGet(ctx, projectionKey, orderID).Result()
		if existing != "" {
			var data map[string]string
			json.Unmarshal([]byte(existing), &data)
			data["status"] = "paid"
			updated, _ := json.Marshal(data)
			rdb.HSet(ctx, projectionKey, orderID, string(updated))
		}
	case "OrderShipped":
		existing, _ := rdb.HGet(ctx, projectionKey, orderID).Result()
		if existing != "" {
			var data map[string]string
			json.Unmarshal([]byte(existing), &data)
			data["status"] = "shipped"
			updated, _ := json.Marshal(data)
			rdb.HSet(ctx, projectionKey, orderID, string(updated))
		}
	}
	fmt.Printf("[Projection] Updated: %s <- %s\n", orderID, eventType)
}

func queryReadModel(rdb *redis.Client) {
	orders, _ := rdb.HGetAll(ctx, projectionKey).Result()
	fmt.Println("\n[Read Model] Order Summary:")
	for orderID, data := range orders {
		fmt.Printf("  %s: %s\n", orderID, data)
	}
}
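Challenge 4 mentioned version tracking for read models. One way to make a projector like the one above tolerate redelivery and replays is to store, next to each projected row, the stream ID of the last event applied to it, and skip any event whose ID is not newer. In the sketch below a dict stands in for the Redis hash, IDs are compared numerically as (ms, seq) pairs, and the names Projector and last_event_id are hypothetical.

```python
import json
from typing import Dict, Tuple

def id_key(stream_id: str) -> Tuple[int, int]:
    ms, seq = stream_id.split("-")
    return int(ms), int(seq)

class Projector:
    """Order-summary projection that ignores events it has already applied."""

    STATUS = {"OrderCreated": "created", "OrderPaid": "paid", "OrderShipped": "shipped"}

    def __init__(self) -> None:
        self.rows: Dict[str, str] = {}  # order_id -> JSON, like the Redis hash

    def apply(self, msg_id: str, event: Dict[str, str]) -> bool:
        order_id = event["order_id"]
        row = json.loads(self.rows.get(order_id, "{}"))
        last = row.get("last_event_id")
        if last is not None and id_key(msg_id) <= id_key(last):
            return False  # duplicate or already-applied event: row unchanged
        if event["event_type"] == "OrderCreated":
            row.update(amount=event.get("amount", "0"), user_id=event.get("user_id", ""))
        row["status"] = self.STATUS.get(event["event_type"], row.get("status", "unknown"))
        row["last_event_id"] = msg_id
        self.rows[order_id] = json.dumps(row)
        return True

p = Projector()
p.apply("1-0", {"event_type": "OrderCreated", "order_id": "ORD-001", "amount": "299", "user_id": "USR-001"})
p.apply("2-0", {"event_type": "OrderPaid", "order_id": "ORD-001"})
redelivered = p.apply("1-0", {"event_type": "OrderCreated", "order_id": "ORD-001", "amount": "299"})
print(redelivered, json.loads(p.rows["ORD-001"]))
```

The stored last_event_id also gives readers a cheap way to tell how far behind the write side a row might be.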

Pattern 5: Dead Letter Queue and Retry

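A message that still fails after exceeding the retry threshold is moved to a dead letter queue, so endless retries do not block consumption.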
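The retry policy reduces to a small pure decision: given how many times a message has already been retried, either dead-letter it or wait before the next attempt. The code that follows uses a fixed delay table, RETRY_DELAYS = [1, 5, 30]; the sketch below shows a common alternative, capped exponential backoff with "full jitter" (a random delay between 0 and the capped exponential value), which spreads retries out and helps against the retry storms from challenge 5. The parameters base and cap and the function name next_retry_delay are assumptions.

```python
import random
from typing import Optional

MAX_RETRIES = 3

def next_retry_delay(retry_count: int, base: float = 1.0, cap: float = 30.0,
                     rng: Optional[random.Random] = None) -> Optional[float]:
    """Return seconds to wait before the next attempt, or None to dead-letter."""
    if retry_count >= MAX_RETRIES:
        return None
    rng = rng or random.Random()
    ceiling = min(cap, base * (2 ** retry_count))  # 1s, 2s, 4s, ... capped
    return rng.uniform(0, ceiling)  # full jitter de-synchronizes failing consumers

rng = random.Random(42)
for attempt in range(5):
    delay = next_retry_delay(attempt, rng=rng)
    print(attempt, "dead-letter" if delay is None else f"wait {delay:.2f}s")
```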

import redis
import json
import time
import random

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

MAIN_STREAM = "events:orders"
DLQ_STREAM = "events:orders:dlq"
GROUP_NAME = "order-processors"
MAX_RETRIES = 3
RETRY_DELAYS = [1, 5, 30]

try:
    r.xgroup_create(MAIN_STREAM, GROUP_NAME, id="0", mkstream=True)
except redis.ResponseError:
    pass

def process_message(fields: dict) -> bool:
    order_id = fields.get("order_id", "")
    if random.random() < 0.4:
        print(f"  [FAIL] Processing failed for {order_id}")
        return False
    print(f"  [OK] Processed {order_id}")
    return True

def consumer_with_retry(consumer_name: str):
    while True:
        entries = r.xreadgroup(
            GROUP_NAME, consumer_name,
            {MAIN_STREAM: ">"},
            count=1, block=3000
        )
        if not entries:
            continue
        for stream_name, messages in entries:
            for msg_id, fields in messages:
                if process_message(fields):
                    r.xack(MAIN_STREAM, GROUP_NAME, msg_id)
                    continue
                # Each retry is re-added under a new ID, so XPENDING's delivery
                # count would always restart at 1 and never reach MAX_RETRIES;
                # carry the retry count in the message itself instead
                retry_count = int(fields.get("retry_count", "0"))
                if retry_count >= MAX_RETRIES:
                    r.xadd(DLQ_STREAM, {
                        **fields,
                        "original_id": msg_id,
                        "failure_reason": "max_retries_exceeded",
                        "retry_count": str(retry_count),
                    })
                    r.xack(MAIN_STREAM, GROUP_NAME, msg_id)
                    print(f"  [DLQ] Moved to dead letter: {msg_id}")
                else:
                    delay = RETRY_DELAYS[min(retry_count, len(RETRY_DELAYS) - 1)]
                    print(f"  [RETRY] Will retry {msg_id} after {delay}s "
                          f"(attempt {retry_count + 1}/{MAX_RETRIES})")
                    # Sleeping blocks this consumer; a production system would
                    # schedule the retry instead (e.g. via a delay queue)
                    time.sleep(delay)
                    # Re-add before ACKing: a crash in between causes a duplicate,
                    # not a lost message
                    r.xadd(MAIN_STREAM, {**fields, "retry_count": str(retry_count + 1)})
                    r.xack(MAIN_STREAM, GROUP_NAME, msg_id)

def inspect_dlq():
    dlq_entries = r.xrange(DLQ_STREAM, "-", "+", count=20)
    print(f"\n[DLQ] {len(dlq_entries)} dead letters:")
    for msg_id, fields in dlq_entries:
        print(f"  {msg_id}: order_id={fields.get('order_id')} "
              f"retries={fields.get('retry_count')} reason={fields.get('failure_reason')}")

if __name__ == "__main__":
    for i in range(1, 11):
        r.xadd(MAIN_STREAM, {"order_id": f"ORD-{i:04d}", "amount": str(i * 100)})

    random.seed(42)
    consumer_with_retry("consumer-1")

Go version:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"

	"github.com/redis/go-redis/v9"
)

var ctx = context.Background()

const (
	mainStream  = "events:orders"
	dlqStream   = "events:orders:dlq"
	groupName   = "order-processors"
	maxRetries  = 3
)

var retryDelays = []time.Duration{1 * time.Second, 5 * time.Second, 30 * time.Second}

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	rdb.XGroupCreateMkStream(ctx, mainStream, groupName, "0")

	for i := 1; i <= 10; i++ {
		rdb.XAdd(ctx, &redis.XAddArgs{
			Stream: mainStream,
			Values: map[string]interface{}{
				"order_id": fmt.Sprintf("ORD-%04d", i),
				"amount":   fmt.Sprintf("%d", i*100),
			},
		})
	}

	consumerWithRetry(rdb, "consumer-1")
}

func processMessage(fields map[string]interface{}) bool {
	orderID := fmt.Sprintf("%v", fields["order_id"])
	if rand.Float64() < 0.4 {
		fmt.Printf("  [FAIL] Processing failed for %s\n", orderID)
		return false
	}
	fmt.Printf("  [OK] Processed %s\n", orderID)
	return true
}

func consumerWithRetry(rdb *redis.Client, consumerName string) {
	for {
		results, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    groupName,
			Consumer: consumerName,
			Streams:  []string{mainStream, ">"},
			Count:    1,
			Block:    3 * time.Second,
		}).Result()
		if err != nil {
			continue
		}
		for _, stream := range results {
			for _, msg := range stream.Messages {
				if processMessage(msg.Values) {
					rdb.XAck(ctx, mainStream, groupName, msg.ID)
					continue
				}
				// Each retry is re-added under a new ID, so the XPENDING delivery
				// count would always restart at 1 and never reach maxRetries;
				// carry the retry count in the message itself instead
				retryCount := 0
				if v, ok := msg.Values["retry_count"].(string); ok {
					fmt.Sscanf(v, "%d", &retryCount)
				}
				if retryCount >= maxRetries {
					dlqValues := map[string]interface{}{}
					for k, v := range msg.Values {
						dlqValues[k] = v
					}
					dlqValues["original_id"] = msg.ID
					dlqValues["failure_reason"] = "max_retries_exceeded"
					dlqValues["retry_count"] = fmt.Sprintf("%d", retryCount)
					rdb.XAdd(ctx, &redis.XAddArgs{Stream: dlqStream, Values: dlqValues})
					rdb.XAck(ctx, mainStream, groupName, msg.ID)
					fmt.Printf("  [DLQ] Moved to dead letter: %s\n", msg.ID)
				} else {
					// The built-in min requires Go 1.21+
					delay := retryDelays[min(retryCount, len(retryDelays)-1)]
					fmt.Printf("  [RETRY] Will retry %s after %v (attempt %d/%d)\n",
						msg.ID, delay, retryCount+1, maxRetries)
					time.Sleep(delay) // blocks this consumer for the whole delay
					retryValues := map[string]interface{}{}
					for k, v := range msg.Values {
						retryValues[k] = v
					}
					retryValues["retry_count"] = fmt.Sprintf("%d", retryCount+1)
					// Re-add before ACKing so a crash in between duplicates rather than loses
					rdb.XAdd(ctx, &redis.XAddArgs{Stream: mainStream, Values: retryValues})
					rdb.XAck(ctx, mainStream, groupName, msg.ID)
				}
			}
		}
	}
}
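Challenge 5 ends with manual intervention. Once the root cause is fixed, dead letters are usually "redriven": copied back to the main stream with the bookkeeping fields stripped (which also resets the retry counter), then removed from the DLQ. The sketch models both streams as Python lists so it runs without Redis; against a real server the same steps would map onto XRANGE on the DLQ, XADD to the main stream, and XDEL on the DLQ. The field names match the ones written above (original_id, failure_reason, retry_count); the redrive function and its selection callback are hypothetical.

```python
from typing import Callable, Dict, List, Tuple

Entry = Tuple[str, Dict[str, str]]
BOOKKEEPING = ("original_id", "failure_reason", "retry_count")

def redrive(dlq: List[Entry], main: List[Entry],
            should_retry: Callable[[Dict[str, str]], bool]) -> int:
    """Move selected dead letters back to the main stream; return how many moved."""
    moved = 0
    for entry in list(dlq):  # iterate over a copy because we delete as we go
        entry_id, fields = entry
        if not should_retry(fields):
            continue
        clean = {k: v for k, v in fields.items() if k not in BOOKKEEPING}
        main.append((f"redrive-{entry_id}", clean))  # XADD: the entry gets a new ID
        dlq.remove(entry)                            # XDEL from the DLQ afterwards
        moved += 1
    return moved

dlq = [
    ("9-0", {"order_id": "ORD-0003", "original_id": "3-0",
             "failure_reason": "max_retries_exceeded", "retry_count": "3"}),
    ("9-1", {"order_id": "ORD-0007", "original_id": "7-0",
             "failure_reason": "invalid_payload", "retry_count": "3"}),
]
main: List[Entry] = []
n = redrive(dlq, main, lambda f: f["failure_reason"] == "max_retries_exceeded")
print(n, main, [e[0] for e in dlq])
```

Adding to the main stream before deleting from the DLQ keeps the failure mode on the side of duplicates rather than loss, in line with the at-least-once approach used throughout.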

Pattern 6: Production-Grade Event Processing Service (Go)

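A complete production-style event processing service with graceful shutdown, periodic metrics logging, and XCLAIM-based failover of stuck messages (a health-check endpoint is not included in this sample).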

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	"github.com/redis/go-redis/v9"
)

type EventProcessor struct {
	rdb          *redis.Client
	stream       string
	group        string
	consumer     string
	dlqStream    string
	maxRetries   int
	processed    atomic.Int64
	failed       atomic.Int64
	retried      atomic.Int64
	mu           sync.Mutex
	running      bool
	ctx          context.Context
	cancel       context.CancelFunc
}

func NewEventProcessor(rdb *redis.Client, stream, group, consumer string) *EventProcessor {
	ctx, cancel := context.WithCancel(context.Background())
	return &EventProcessor{
		rdb:        rdb,
		stream:     stream,
		group:      group,
		consumer:   consumer,
		dlqStream:  stream + ":dlq",
		maxRetries: 3,
		running:    true,
		ctx:        ctx,
		cancel:     cancel,
	}
}

func (p *EventProcessor) Start() {
	rdb := p.rdb
	rdb.XGroupCreateMkStream(p.ctx, p.stream, p.group, "0")

	log.Printf("[Processor] Starting consumer %s for stream %s", p.consumer, p.stream)

	go p.claimPendingMessages()
	go p.consumeNewMessages()
	go p.reportMetrics()
}

func (p *EventProcessor) consumeNewMessages() {
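	// Check the context rather than the plain running bool, which Stop writes
	// from another goroutine without synchronization (a data race)
	for p.ctx.Err() == nil {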
		results, err := p.rdb.XReadGroup(p.ctx, &redis.XReadGroupArgs{
			Group:    p.group,
			Consumer: p.consumer,
			Streams:  []string{p.stream, ">"},
			Count:    10,
			Block:    2 * time.Second,
		}).Result()
		if err != nil {
			if err == redis.Nil || p.ctx.Err() != nil {
				continue
			}
			log.Printf("[Consumer] Error: %v", err)
			continue
		}
		for _, stream := range results {
			for _, msg := range stream.Messages {
				p.handleMessage(msg.ID, msg.Values)
			}
		}
	}
}

func (p *EventProcessor) claimPendingMessages() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			pending, err := p.rdb.XPendingExt(p.ctx, &redis.XPendingExtArgs{
				Stream: p.stream,
				Group:  p.group,
				Start:  "-",
				End:    "+",
				Count:  10,
				Idle:   60 * time.Second,
			}).Result()
			if err != nil {
				continue
			}
			for _, msg := range pending {
				claimed, err := p.rdb.XClaim(p.ctx, &redis.XClaimArgs{
					Stream:   p.stream,
					Group:    p.group,
					Consumer: p.consumer,
					MinIdle:  60 * time.Second,
					Messages: []string{msg.ID},
				}).Result()
				if err == nil && len(claimed) > 0 {
					log.Printf("[Claim] Claimed pending message: %s", claimed[0].ID)
					p.retried.Add(1)
					p.handleMessage(claimed[0].ID, claimed[0].Values)
				}
			}
		case <-p.ctx.Done():
			return
		}
	}
}

func (p *EventProcessor) handleMessage(msgID string, values map[string]interface{}) {
	// Redis calls here use a context that Stop does not cancel, so a message
	// already being handled at shutdown can still be ACKed or dead-lettered
	// instead of failing on a cancelled context and staying pending
	opCtx := context.Background()
	err := p.processEvent(values)
	if err != nil {
		p.failed.Add(1)
		pending, _ := p.rdb.XPendingExt(opCtx, &redis.XPendingExtArgs{
			Stream: p.stream,
			Group:  p.group,
			Start:  msgID,
			End:    msgID,
			Count:  1,
		}).Result()
		deliveryCount := 1
		if len(pending) > 0 {
			deliveryCount = int(pending[0].DeliveryCount)
		}
		if deliveryCount >= p.maxRetries {
			dlqValues := map[string]interface{}{
				"original_id":    msgID,
				"failure_reason": err.Error(),
				"retry_count":    fmt.Sprintf("%d", deliveryCount),
			}
			for k, v := range values {
				dlqValues[k] = v
			}
			p.rdb.XAdd(opCtx, &redis.XAddArgs{Stream: p.dlqStream, Values: dlqValues})
			p.rdb.XAck(opCtx, p.stream, p.group, msgID)
			log.Printf("[DLQ] Moved %s to dead letter after %d attempts", msgID, deliveryCount)
		} else {
			// Leave the entry pending: claimPendingMessages picks it up again once it
			// has been idle for 60s, and XCLAIM increments its delivery count
			log.Printf("[Retry] Message %s failed (attempt %d/%d): %v",
				msgID, deliveryCount, p.maxRetries, err)
		}
		return
	}
	p.rdb.XAck(opCtx, p.stream, p.group, msgID)
	p.processed.Add(1)
}

func (p *EventProcessor) processEvent(values map[string]interface{}) error {
	eventType, _ := values["event_type"].(string)
	switch eventType {
	case "order_created":
		return p.handleOrderCreated(values)
	case "payment_completed":
		return p.handlePaymentCompleted(values)
	default:
		log.Printf("[Process] Unknown event type: %s", eventType)
		return nil
	}
}

func (p *EventProcessor) handleOrderCreated(values map[string]interface{}) error {
	orderID := fmt.Sprintf("%v", values["order_id"])
	log.Printf("[Process] Order created: %s", orderID)
	return nil
}

func (p *EventProcessor) handlePaymentCompleted(values map[string]interface{}) error {
	paymentID := fmt.Sprintf("%v", values["payment_id"])
	log.Printf("[Process] Payment completed: %s", paymentID)
	return nil
}

func (p *EventProcessor) reportMetrics() {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			log.Printf("[Metrics] processed=%d failed=%d retried=%d",
				p.processed.Load(), p.failed.Load(), p.retried.Load())
		case <-p.ctx.Done():
			return
		}
	}
}

func (p *EventProcessor) Stop() {
	log.Println("[Processor] Shutting down gracefully...")
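	p.running = false
	p.cancel()
	// Crude grace period for in-flight handlers; waiting on a sync.WaitGroup that tracks the worker goroutines would be more reliable
	time.Sleep(2 * time.Second)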
	log.Printf("[Processor] Final metrics: processed=%d failed=%d retried=%d",
		p.processed.Load(), p.failed.Load(), p.retried.Load())
}

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	processor := NewEventProcessor(rdb, "events:orders", "order-processors", "worker-1")
	processor.Start()

	for i := 1; i <= 20; i++ {
		rdb.XAdd(context.Background(), &redis.XAddArgs{
			Stream: "events:orders",
			Values: map[string]interface{}{
				"event_type": "order_created",
				"order_id":   fmt.Sprintf("ORD-%04d", i),
				"amount":     fmt.Sprintf("%d", i*100),
			},
		})
	}

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	<-sigCh
	processor.Stop()
}
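The DLQ branch in the processor above decides only by delivery count. A failed message simply stays in the PEL until it is delivered again. Below is a minimal Python sketch of that decision, extended with a capped exponential backoff (the "exponential backoff + dead letter queue" strategy mentioned earlier). The function name `retry_decision` and its `base_delay`/`max_delay` parameters are hypothetical additions, not part of the Go processor.

```python
from typing import Optional, Tuple


def retry_decision(delivery_count: int, max_retries: int,
                   base_delay: float = 1.0,
                   max_delay: float = 60.0) -> Tuple[str, Optional[float]]:
    """Return ("dlq", None) once the message has been delivered max_retries
    times, otherwise ("retry", delay_seconds) using a capped exponential backoff.

    Hypothetical helper: the threshold mirrors deliveryCount >= maxRetries in
    the Go processor; the backoff delay is an added assumption."""
    if delivery_count >= max_retries:
        return "dlq", None
    # 1st failure -> base_delay, 2nd -> 2x, 3rd -> 4x ... capped at max_delay
    exponent = max(delivery_count - 1, 0)
    return "retry", min(max_delay, base_delay * (2 ** exponent))


if __name__ == "__main__":
    for attempt in range(1, 6):
        print(attempt, retry_decision(attempt, max_retries=5))
    # 1 ('retry', 1.0) / 2 ('retry', 2.0) / 3 ('retry', 4.0) / 4 ('retry', 8.0) / 5 ('dlq', None)
```

XCLAIM's MinIdle is measured from a message's last delivery. The returned delay could therefore serve as the idle threshold a reclaimer waits for before redelivering that message.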

Pitfall Guide: 5 Common Mistakes

❌ Pitfall 1: Using XREAD without a consumer group, so every consumer processes every message

# ❌ Wrong: several consumers each call XREAD on their own, so every message is processed by all of them
def bad_consumer():
    entries = r.xread({STREAM_NAME: "0-0"}, count=10, block=5000)
    for _, messages in entries:
        for msg_id, fields in messages:
            process(fields)

# ✅ Right: use XREADGROUP; the consumer group delivers each message to only one consumer in the group
def good_consumer():
    entries = r.xreadgroup(
        GROUP_NAME, "consumer-1",
        {STREAM_NAME: ">"},
        count=10, block=5000
    )
    for _, messages in entries:
        for msg_id, fields in messages:
            process(fields)
            # ACK only after process() returns; an exception leaves the message pending for a retry
            r.xack(STREAM_NAME, GROUP_NAME, msg_id)

❌ Pitfall 2: Forgetting XACK, so messages stay in the Pending list forever

// ❌ Wrong: messages are never ACKed after processing, so they stay in the Pending state
func badConsumer(rdb *redis.Client) {
	results, _ := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
		Group:    groupName,
		Consumer: "consumer-1",
		Streams:  []string{streamName, ">"},
		Count:    10,
	}).Result()
	for _, stream := range results {
		for _, msg := range stream.Messages {
			processEvent(msg.Values)
			// Forgot to ACK!
		}
	}
}

// ✅ Right: ACK as soon as processing succeeds; failed messages stay pending for a retry or XCLAIM
func goodConsumer(rdb *redis.Client) {
	results, _ := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
		Group:    groupName,
		Consumer: "consumer-1",
		Streams:  []string{streamName, ">"},
		Count:    10,
	}).Result()
	for _, stream := range results {
		for _, msg := range stream.Messages {
			if err := processEvent(msg.Values); err == nil {
				rdb.XAck(ctx, streamName, groupName, msg.ID)
			}
		}
	}
}

❌ Pitfall 3: Unbounded Stream growth exhausts memory

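# ❌ Wrong: write-only, so the Stream grows forever
r.xadd("events:orders", {"data": "..."})
# XTRIM or XDEL is never called

# ✅ Right: cap the Stream length with MAXLEN on every XADD, or trim periodically with XTRIM
# approximate=True sends "MAXLEN ~ 10000": Redis trims efficiently and the length may slightly exceed 10000
r.xadd("events:orders", {"data": "..."}, maxlen=10000, approximate=True)
# Or trim explicitly
r.xtrim("events:orders", maxlen=10000, approximate=True)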
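Besides MAXLEN, Redis 6.2+ can trim by age with the MINID option of XADD/XTRIM. MINID evicts every entry whose ID is lower than a threshold. Auto-generated IDs start with a millisecond Unix timestamp, so the threshold for a retention window can be computed locally.

The sketch below is minimal and assumes auto-generated IDs. The `minid_for_retention` helper is hypothetical. The commented redis-py call assumes a redis-py version whose `xtrim` accepts `minid`.

```python
import time
from typing import Optional


def minid_for_retention(retention_seconds: float, now_ms: Optional[int] = None) -> str:
    """Stream ID below which entries are older than the retention window.

    Auto-generated IDs look like "<unix-ms>-<seq>", so "<cutoff-ms>-0" is the
    smallest ID that could have been generated at the cutoff time."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    cutoff_ms = max(0, now_ms - int(retention_seconds * 1000))
    return f"{cutoff_ms}-0"


# Keep roughly the last 24 hours of events (MINID needs Redis 6.2+):
# r.xtrim("events:orders", minid=minid_for_retention(24 * 3600), approximate=True)

if __name__ == "__main__":
    print(minid_for_retention(60, now_ms=1718432060000))  # 1718432000000-0
```

By default, trimming (with either MAXLEN or MINID) removes entries whether or not every consumer group has processed them. The retention window should therefore comfortably exceed the worst-case consumer lag.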

❌ Pitfall 4: Using XCLAIM without checking idle time

// ❌ Wrong: blindly claiming every pending message can steal messages that are still being processed
func badClaim(rdb *redis.Client) {
	pending, _ := rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
		Stream: streamName,
		Group:  groupName,
		Start:  "-",
		End:    "+",
		Count:  100,
	}).Result()
	for _, msg := range pending {
		rdb.XClaim(ctx, &redis.XClaimArgs{
			Stream:   streamName,
			Group:    groupName,
			Consumer: "consumer-2",
			Messages: []string{msg.ID},
		})
	}
}

// ✅ Right: only claim pending messages whose idle time exceeds a threshold
func goodClaim(rdb *redis.Client) {
	pending, _ := rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
		Stream: streamName,
		Group:  groupName,
		Start:  "-",
		End:    "+",
		Count:  100,
		Idle:   60 * time.Second, // only list entries idle for more than 60s (IDLE option, Redis 6.2+)
	}).Result()
	for _, msg := range pending {
		// MinIdle makes XCLAIM re-check the idle time on the server, so a message
		// another consumer claimed in the meantime is not stolen again
		rdb.XClaim(ctx, &redis.XClaimArgs{
			Stream:   streamName,
			Group:    groupName,
			Consumer: "consumer-2",
			MinIdle:  60 * time.Second,
			Messages: []string{msg.ID},
		})
	}
}
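The toy in-memory model below makes the MinIdle rule concrete. It is not Redis code; the classes `ToyPEL` and `PendingEntry` are hypothetical. It mimics only three documented behaviors of a consumer group's Pending Entries List:

- An unacknowledged delivery stays pending.
- XCLAIM with a min-idle time skips entries that were delivered too recently.
- A successful claim changes the owner, resets the idle time and increments the delivery count.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class PendingEntry:
    owner: str
    last_delivery_ms: int
    delivery_count: int = 1


class ToyPEL:
    """Hypothetical in-memory stand-in for one consumer group's PEL."""

    def __init__(self) -> None:
        self.entries: Dict[str, PendingEntry] = {}

    def deliver(self, msg_id: str, consumer: str, now_ms: int) -> None:
        # Like XREADGROUP with ">": the message becomes pending for this consumer
        self.entries[msg_id] = PendingEntry(consumer, now_ms)

    def ack(self, msg_id: str) -> bool:
        # Like XACK: without it the entry stays pending forever
        return self.entries.pop(msg_id, None) is not None

    def claim(self, msg_id: str, new_owner: str, min_idle_ms: int, now_ms: int) -> bool:
        entry = self.entries.get(msg_id)
        if entry is None:
            return False  # already ACKed (or never delivered)
        if now_ms - entry.last_delivery_ms < min_idle_ms:
            return False  # delivered too recently; the owner may still be working on it
        entry.owner = new_owner
        entry.last_delivery_ms = now_ms  # idle time resets
        entry.delivery_count += 1        # counts as another delivery attempt
        return True


if __name__ == "__main__":
    pel = ToyPEL()
    pel.deliver("1-0", "consumer-1", now_ms=0)
    print(pel.claim("1-0", "consumer-2", min_idle_ms=60_000, now_ms=5_000))   # False
    print(pel.claim("1-0", "consumer-2", min_idle_ms=60_000, now_ms=61_000))  # True
    print(pel.entries["1-0"])
    # PendingEntry(owner='consumer-2', last_delivery_ms=61000, delivery_count=2)
```

With a min-idle of 0 (as in badClaim above, which sets no MinIdle), the idle check always passes. That is exactly how a message still being processed gets taken away from its owner.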

❌ Pitfall 5: Choosing the wrong start ID when creating a consumer group

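# ❌ Wrong (when history matters): "$" starts the group after the current last entry, so messages already in the Stream are never delivered to this group
r.xgroup_create(STREAM_NAME, GROUP_NAME, id="$")
# ✅ Right: "0" starts the group from the beginning, so it receives every existing message
r.xgroup_create(STREAM_NAME, GROUP_NAME, id="0")
# Alternative: if you really want only new messages, keep "$" and state the intent explicitly
r.xgroup_create(STREAM_NAME, GROUP_NAME, id="$")  # intentionally consume only new messages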
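The start ID passed to XGROUP CREATE becomes the group's last-delivered ID. XREADGROUP with ">" then hands out only entries whose ID is greater than it. Stream IDs must be compared numerically as (milliseconds, sequence) pairs, not as strings. The small sketch below shows which existing entries a new group would receive for "0" versus "$"; both helper names are hypothetical.

```python
from typing import List, Tuple


def parse_stream_id(stream_id: str) -> Tuple[int, int]:
    """Split "<ms>-<seq>" into integers; a bare "<ms>" is treated as "<ms>-0"."""
    ms, _, seq = stream_id.partition("-")
    return int(ms), int(seq or 0)


def visible_to_new_group(entry_ids: List[str], start_id: str) -> List[str]:
    """Existing entries a group created with start_id would receive via XREADGROUP ">"."""
    if start_id == "$":
        # "$" means "the ID of the last entry currently in the Stream"
        start_id = max(entry_ids, key=parse_stream_id) if entry_ids else "0-0"
    start = parse_stream_id(start_id)
    return [i for i in entry_ids if parse_stream_id(i) > start]


if __name__ == "__main__":
    history = ["9-0", "10-0", "10-1"]
    print(visible_to_new_group(history, "0"))  # ['9-0', '10-0', '10-1']
    print(visible_to_new_group(history, "$"))  # []
    # String comparison gets the order wrong; numeric comparison does not
    print("9-0" > "10-0", parse_stream_id("9-0") > parse_stream_id("10-0"))  # True False
```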

Troubleshooting Table

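| Symptom | Likely cause | How to check | Fix |
|---|---|---|---|
| NOGROUP No such key | The Stream or the consumer group does not exist | XINFO GROUPS events:orders | Create the group with XGROUP CREATE; add MKSTREAM to create the Stream at the same time |
| BUSYGROUP Consumer Group name already exists | The consumer group is created twice | XINFO GROUPS events:orders | Catch and ignore the error, or check whether the group exists first |
| Consumer receives no new messages | XREADGROUP is called with 0-0 instead of > | Check the ID passed in the Streams argument | Use > for new messages and 0-0 to re-read this consumer's own unacknowledged messages |
| Pending messages pile up | A consumer crashed without ACKing | XPENDING events:orders group-name | Transfer them to a live consumer with XCLAIM |
| Stream memory keeps growing | No MAXLEN or XTRIM | XINFO STREAM events:orders (check length) | Add MAXLEN ~ 10000 to XADD, or run XTRIM periodically |
| Messages are processed twice | The handler finished but the ACK failed, so the message is redelivered (At-Least-Once) | Check that the ACK runs after the business logic | ACK only after the business logic succeeds, and make handlers idempotent (deduplicate by message ID or business key) |
| After XCLAIM, the original consumer is still processing the message | MinIdle is set too short | Check each message's idle time with XPENDING | Set a sensible MinIdle threshold (e.g. 60 seconds) |
| Event-sourcing replay is slow | Too many events, replayed with a full XRANGE | XLEN events:order:ORD-001 | Take periodic snapshots and replay only later events with an XRANGE ID range |
| CQRS read model is stale | The projection consumer is lagging or down | XINFO CONSUMERS (idle time); on Redis 7.0+, the lag field of XINFO GROUPS | Monitor consumer lag and set alert thresholds |
| XADD fails with ERR unknown command | Redis is older than 5.0 | redis-server --version | Upgrade to Redis 5.0+ |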
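A consumer group gives At-Least-Once delivery. A message whose handler succeeded but whose XACK failed (or whose consumer crashed before ACKing) will be delivered again, so handlers should be idempotent.

The in-process sketch below is minimal. `IdempotentHandler` and its in-memory `seen` set are hypothetical. Across processes, the seen-set would have to live in shared storage instead (for example a Redis key per dedupe key); that part is an assumption, not shown here.

```python
from typing import Callable, Dict, Set


class IdempotentHandler:
    """Hypothetical wrapper: runs `handler` at most once per dedupe key in this process."""

    def __init__(self, handler: Callable[[Dict[str, str]], None]) -> None:
        self.handler = handler
        self.seen: Set[str] = set()

    @staticmethod
    def dedupe_key(msg_id: str, fields: Dict[str, str]) -> str:
        # Prefer a business key: a producer retrying XADD creates a new message ID for
        # the same event, while XCLAIM redelivery keeps the original ID.
        # Assumes each (event_type, order_id) pair occurs once, as with order_created.
        if "event_type" in fields and "order_id" in fields:
            return f"{fields['event_type']}:{fields['order_id']}"
        return msg_id

    def handle(self, msg_id: str, fields: Dict[str, str]) -> bool:
        """Return True if the handler ran, False if the event was a duplicate."""
        key = self.dedupe_key(msg_id, fields)
        if key in self.seen:
            return False
        self.handler(fields)
        # Mark only after success, so a failed attempt can still be retried
        self.seen.add(key)
        return True


if __name__ == "__main__":
    processed = []
    h = IdempotentHandler(lambda f: processed.append(f["order_id"]))
    event = {"event_type": "order_created", "order_id": "ORD-0001"}
    print(h.handle("1718432000000-0", event))  # True
    print(h.handle("1718432000000-0", event))  # False: redelivered after a lost XACK
    print(h.handle("1718432000500-0", event))  # False: same event re-published by a retrying producer
    print(processed)                            # ['ORD-0001']
```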

Advanced Optimizations

1. Stream Sharding and Partitioning

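When a single Stream can no longer keep up, shard events by business key across several Streams. Hashing the key sends all events for one key (e.g. one order) to the same shard, so their relative order is preserved; in Redis Cluster, the shard keys can also be spread across nodes: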

import hashlib

def get_shard_stream(base_stream: str, key: str, shard_count: int = 16) -> str:
    shard_index = int(hashlib.md5(key.encode()).hexdigest(), 16) % shard_count
    return f"{base_stream}:shard-{shard_index:03d}"

order_id = "ORD-0001"
stream = get_shard_stream("events:orders", order_id)
r.xadd(stream, {"order_id": order_id, "event_type": "order_created"})
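On the consumer side, keep two facts in mind:

- Consumer groups belong to a single Stream key, so every shard needs its own XGROUP CREATE.
- In Redis Cluster, one XREADGROUP call over several keys only works if they share a hash slot.

The minimal sketch below therefore assigns whole shards to workers. The `assign_shards` helper and its static modulo assignment (no automatic rebalancing) are hypothetical.

```python
import hashlib
from typing import List


def get_shard_stream(base_stream: str, key: str, shard_count: int = 16) -> str:
    # Same md5-modulo routing as the producer snippet
    shard_index = int(hashlib.md5(key.encode()).hexdigest(), 16) % shard_count
    return f"{base_stream}:shard-{shard_index:03d}"


def assign_shards(base_stream: str, shard_count: int,
                  worker_index: int, worker_count: int) -> List[str]:
    """Hypothetical static assignment: worker i owns shards whose index % worker_count == i."""
    return [
        f"{base_stream}:shard-{i:03d}"
        for i in range(shard_count)
        if i % worker_count == worker_index
    ]


if __name__ == "__main__":
    print(assign_shards("events:orders", 16, worker_index=0, worker_count=4))
    # ['events:orders:shard-000', 'events:orders:shard-004', 'events:orders:shard-008', 'events:orders:shard-012']
    # Each key's shard is owned by exactly one worker, so per-key ordering is kept
    target = get_shard_stream("events:orders", "ORD-0001")
    owners = [w for w in range(4) if target in assign_shards("events:orders", 16, w, 4)]
    print(len(owners))  # 1
```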

2. Consumer Lag Monitoring and Alerting

func monitorConsumerLag(rdb *redis.Client, stream, group string) {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		groups, err := rdb.XInfoGroups(ctx, stream).Result()
		if err != nil {
			log.Printf("[WARN] XINFO GROUPS %s failed: %v", stream, err)
			continue
		}
		for _, g := range groups {
			if g.Name != group {
				continue
			}
			// Pending = delivered but not yet ACKed. This is not the same as lag
			// (entries not yet delivered), which Redis 7.0+ reports in XINFO GROUPS.
			if g.Pending > 1000 {
				log.Printf("[ALERT] Group %s has %d pending messages!", group, g.Pending)
			}
			consumers, err := rdb.XInfoConsumers(ctx, stream, group).Result()
			if err != nil {
				log.Printf("[WARN] XINFO CONSUMERS %s failed: %v", group, err)
				continue
			}
			for _, c := range consumers {
				if c.Pending > 100 {
					log.Printf("[ALERT] Consumer %s has %d pending messages!", c.Name, c.Pending)
				}
				if c.Idle > 5*time.Minute {
					log.Printf("[WARN] Consumer %s idle for %v", c.Name, c.Idle)
				}
			}
		}
	}
}
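Pending counts tell you how many messages are stuck, not how long they have been stuck. Auto-generated Stream IDs begin with a millisecond timestamp, so the smallest pending ID from the summary form of XPENDING gives the age of the oldest unacknowledged message. In redis-py, that ID is assumed to be the "min" field of `xpending()`.

This minimal sketch assumes auto-generated IDs. `oldest_pending_age_seconds` and the 300-second threshold are hypothetical. The result measures time since the message was produced, not since it was delivered.

```python
import time
from typing import Optional

ALERT_AFTER_SECONDS = 300  # hypothetical alert threshold


def oldest_pending_age_seconds(smallest_pending_id: Optional[str],
                               now_ms: Optional[int] = None) -> float:
    """Age of the oldest unacked message, derived from its auto-generated ID."""
    if not smallest_pending_id:
        return 0.0  # nothing pending
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    produced_ms = int(smallest_pending_id.split("-", 1)[0])
    return max(0.0, (now_ms - produced_ms) / 1000)


if __name__ == "__main__":
    # e.g. summary = r.xpending("events:orders", "order-processors"); summary["min"]
    age = oldest_pending_age_seconds("1718432000000-0", now_ms=1718432360000)
    print(age)                        # 360.0
    print(age > ALERT_AFTER_SECONDS)  # True
```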

3. Event Snapshots and Incremental Replay

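import json
import time

SNAPSHOT_INTERVAL = 100  # save a new snapshot every 100 applied events

def save_snapshot(aggregate_id: str, state: dict, version: int, last_event_id: str):
    snapshot_key = f"snapshot:order:{aggregate_id}"
    r.hset(snapshot_key, mapping={
        "state": json.dumps(state),
        "version": str(version),
        # The Stream ID of the last event folded into the snapshot. The version is
        # an event count, not a Stream ID ("<ms>-<seq>"), so it cannot be used as an XRANGE bound
        "last_event_id": last_event_id,
        "timestamp": str(time.time()),
    })

def load_from_snapshot(aggregate_id: str):
    snapshot_key = f"snapshot:order:{aggregate_id}"
    data = r.hgetall(snapshot_key)
    if not data:
        return None, 0, None
    return json.loads(data["state"]), int(data["version"]), data["last_event_id"]

def rebuild_with_snapshot(aggregate_id: str):
    state, version, last_id = load_from_snapshot(aggregate_id)
    stream_key = f"events:order:{aggregate_id}"
    # "(" makes the start ID exclusive (Redis 6.2+): replay only events after the snapshot
    start = "-" if last_id is None else f"({last_id}"
    events = r.xrange(stream_key, start, "+")
    state = state or {}
    for msg_id, fields in events:
        state = apply_event(state, fields)  # apply_event: the event reducer, assumed defined elsewhere
        version += 1
        if version % SNAPSHOT_INTERVAL == 0:
            save_snapshot(aggregate_id, state, version, msg_id)
    return state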
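Replaying "only the events after the snapshot" needs an exclusive start ID, which also means the snapshot must remember a Stream ID rather than just an event count. Redis 6.2+ accepts a "(" prefix on XRANGE IDs for this. On older servers, the same range can be expressed by moving to the next possible ID after the last replayed one.

The sketch below is minimal, and `next_stream_id` is a hypothetical helper. Both ID parts are unsigned 64-bit integers.

```python
from typing import Tuple

MAX_SEQ = 2**64 - 1  # largest sequence number a Stream ID can carry


def parse_stream_id(stream_id: str) -> Tuple[int, int]:
    ms, _, seq = stream_id.partition("-")
    return int(ms), int(seq or 0)


def next_stream_id(stream_id: str) -> str:
    """Smallest Stream ID strictly greater than stream_id (usable as an inclusive XRANGE start)."""
    ms, seq = parse_stream_id(stream_id)
    if seq == MAX_SEQ:
        return f"{ms + 1}-0"
    return f"{ms}-{seq + 1}"


if __name__ == "__main__":
    last_replayed = "1718432000000-5"
    print(next_stream_id(last_replayed))  # 1718432000000-6
    # e.g. on Redis < 6.2: r.xrange(stream_key, next_stream_id(last_replayed), "+")
```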

Comparison: Redis Streams vs Other Messaging Systems

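| Feature | Redis Streams | Kafka | RabbitMQ | NATS JetStream | Pulsar |
|---|---|---|---|---|---|
| Positioning | Lightweight event streaming | Distributed event-streaming platform | Message broker | Cloud-native messaging system | Distributed messaging platform |
| Deployment complexity | ★☆☆☆☆ | ★★★★★ | ★★★☆☆ | ★★☆☆☆ | ★★★★☆ |
| Throughput (rough order of magnitude, hardware-dependent) | 100K-500K msg/s | 1M+ msg/s | 50K-100K msg/s | 500K-1M msg/s | 1M+ msg/s |
| Persistence | AOF/RDB | On-disk log | Optional persistence | On-disk log | BookKeeper |
| Consumer groups | ✅ Native | ✅ Native | ✅ Competing consumers on a shared queue | ✅ Native | ✅ Native |
| Message replay | ✅ XRANGE | ✅ Offset reset | ❌ Not with classic queues (RabbitMQ Streams support offsets) | ✅ Supported | ✅ Supported |
| Exactly-Once | ❌ At-Least-Once only | ✅ Transactions | ❌ Not supported | ✅ Supported | ✅ Supported |
| Dead letter queue | ❌ Implement yourself | ❌ Implement yourself | ✅ Native | ✅ Native | ✅ Native |
| Partitioning | ❌ Manual sharding | ✅ Automatic partitioning | ✅ Exchange routing | ✅ Automatic partitioning | ✅ Automatic partitioning |
| Operational cost | Very low | High (ZooKeeper/KRaft) | Medium | Low | High (BookKeeper) |
| Typical use cases | Event communication between small-to-medium microservices | Large-scale data-stream processing | Enterprise message routing | Cloud-native lightweight messaging | Multi-tenant, large-scale messaging |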

Summary

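In 2026, Redis Streams remains one of the best lightweight choices for event-driven architecture in small-to-medium microservice systems. It needs neither Kafka's heavyweight infrastructure nor RabbitMQ's exchange configuration, and six core patterns cover everything from basic produce/consume to a production-grade event processor. Remember three rules: always consume through a consumer group (XREADGROUP), always XACK after successful processing, and always cap Streams with MAXLEN (or XTRIM) to keep memory bounded. When you need Exactly-Once semantics or throughput in the millions of messages per second, consider moving to Kafka or Pulsar.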


