Redis Streams Event-Driven Architecture in Practice: 6 Production Patterns, from Consumer Groups to Event Sourcing


The Microservice Messaging Dilemma: Kafka Is Too Heavy, Redis Streams Is Just Right

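The order service has to tell inventory to deduct stock, the payment service has to broadcast transaction results, and user sign-up has to trigger a welcome email: event communication between microservices is everywhere. Reach for Kafka, and you take on cluster operations, ZooKeeper/KRaft coordination, and partition rebalancing, which is more infrastructure than a small service can justify. Reach for RabbitMQ, and you deal with exchange bindings, routing keys, and acknowledgement settings, which is not simple either. As of 2026, Redis Streams has become a popular lightweight option for event-driven communication in small and medium-sized microservice deployments, especially where Redis is already part of the stack.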

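This article works through 6 production patterns end to end: basic produce/consume → consumer groups → event sourcing → CQRS read models → dead-letter queues → a production-grade event processing service. Patterns 1 to 5 come with runnable Python and Go code; pattern 6 is a Go service.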


Redis Streams Core Concepts

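Concept | Description
Stream | A log-like data structure introduced in Redis 5.0, similar to a Kafka topic; entries are stored in ID (time) order
XADD | Appends an entry to a stream; with ID *, Redis generates an ID of the form <millisecondsTime>-<sequenceNumber> (e.g. 1718432000000-0)
XREAD | Reads entries from one or more streams, in blocking or non-blocking mode
Consumer Group | Similar to a Kafka consumer group: each entry is delivered to only one consumer within the group, while every group receives all entries
XREADGROUP | Reads as a member of a consumer group; entries delivered with ID > are added to the Pending Entries List
XACK | Acknowledges that an entry has been processed and removes it from the PEL
Pending Entries List (PEL) | Entries delivered to a consumer but not yet acknowledged; used for failure recovery and retries
XPENDING | Inspects a group's pending entries: a summary, or per entry the owner, idle time, and delivery count
XCLAIM | Transfers pending entries to another consumer for failover (Redis 6.2+ also offers XAUTOCLAIM)
Event Sourcing | Persists every state change as a sequence of events, so the state at any point in time can be rebuilt
CQRS | Command Query Responsibility Segregation: the write model (event stream) and the read model (projections) evolve independently
Dead Letter Queue (DLQ) | Holds messages that failed more often than the retry threshold, so endless retries do not block consumption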
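Stream IDs are ordered numerically, milliseconds first and then sequence number, never as plain strings. The minimal Python sketch below shows that ordering; parse_stream_id and is_after are hypothetical helper names, not part of redis-py.

```python
from typing import List, Tuple


def parse_stream_id(stream_id: str) -> Tuple[int, int]:
    """Split a Redis Stream ID '<ms>-<seq>' into a comparable (ms, seq) tuple.

    An ID without a sequence part (e.g. '5') is treated as '5-0'.
    """
    ms, _, seq = stream_id.partition("-")
    return int(ms), int(seq or 0)


def is_after(a: str, b: str) -> bool:
    """True if stream ID a sorts strictly after stream ID b."""
    return parse_stream_id(a) > parse_stream_id(b)


ids: List[str] = ["1718432000000-1", "999-0", "1718432000000-0"]
print(sorted(ids))                       # ['1718432000000-0', '1718432000000-1', '999-0'] (string order, wrong)
print(sorted(ids, key=parse_stream_id))  # ['999-0', '1718432000000-0', '1718432000000-1'] (numeric order)
```

This is why a consumer that tracks its position (the last_id in pattern 1) should treat IDs as opaque values returned by Redis, or compare them as (ms, seq) pairs, rather than comparing strings.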

Problem Analysis: 5 Challenges of Event-Driven Architecture

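  1. Message loss and duplicate consumption: if a consumer crashes before sending XACK, its entries are not lost; they stay in the PEL and are delivered again, so consumer groups provide at-least-once delivery. Exactly-once processing is not built in, so handlers must be idempotent (durability against a Redis failure itself depends on AOF and replication settings)
  2. Uneven load within a consumer group: when a consumer slows down or stalls, the entries already delivered to it pile up in its PEL while other consumers sit idle. Redis does not rebalance these automatically; they must be transferred with XCLAIM (or XAUTOCLAIM on Redis 6.2+)
  3. Snapshots and replay in event sourcing: the event stream grows without bound and full replay gets expensive, so combine periodic snapshots with incremental replay of the newer events
  4. CQRS read-model consistency: projections are eventually consistent, so users may see stale data; this calls for version tracking and compensation
  5. Dead letters and retry storms: messages that fail repeatedly can trigger retry storms; a complete strategy combines exponential backoff, a dead-letter queue, and manual intervention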
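Because at-least-once delivery means a handler can see the same event twice, the handler itself must tolerate duplicates. The sketch below assumes producers attach a unique event_id field (not present in the article's events) and keeps the seen-set in memory purely for illustration; IdempotentHandler is a hypothetical name, and a real service would store the seen-set durably, ideally updated atomically with the side effect.

```python
from typing import Callable, Dict, Set


class IdempotentHandler:
    """Wraps a side-effecting handler so redelivered events are applied once.

    Hypothetical sketch: the in-memory set would be durable storage in production.
    """

    def __init__(self, apply: Callable[[Dict[str, str]], None]):
        self._apply = apply
        self._seen: Set[str] = set()

    def handle(self, fields: Dict[str, str]) -> bool:
        key = fields["event_id"]  # assumption: producers attach a unique event_id
        if key in self._seen:
            return False  # duplicate delivery: skip the side effect, but still XACK it
        self._apply(fields)  # if this raises, the key is not recorded and a retry re-applies
        self._seen.add(key)
        return True


balances: Dict[str, int] = {}


def credit(fields: Dict[str, str]) -> None:
    user = fields["user_id"]
    balances[user] = balances.get(user, 0) + int(fields["amount"])


handler = IdempotentHandler(credit)
event = {"event_id": "evt-1", "user_id": "USR-001", "amount": "100"}
handler.handle(event)
handler.handle(event)  # redelivered, e.g. after a crash before XACK
print(balances)  # {'USR-001': 100}
```

Deduplicating on a producer-assigned key rather than the stream ID also covers the case where a producer retries XADD after a timeout, which creates a second entry with a different stream ID.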

Step by Step: 6 Event-Driven Patterns with Redis Streams

Pattern 1: Basic Stream Producer/Consumer (XADD/XREAD)

The simplest produce/consume model: one producer appends to a stream and one consumer reads from it.

import redis
import time
import threading

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

STREAM_NAME = "events:orders"

def producer():
    for i in range(1, 6):
        event = {
            "event_type": "order_created",
            "order_id": f"ORD-{i:04d}",
            "amount": 100 * i,
            "user_id": f"USR-{i:03d}",
            "timestamp": time.time()
        }
        msg_id = r.xadd(STREAM_NAME, event)
        print(f"[Producer] Added event: {msg_id} -> {event}")
        time.sleep(0.5)

def consumer():
    last_id = "0-0"
    while True:
        entries = r.xread({STREAM_NAME: last_id}, count=5, block=5000)
        if not entries:
            print("[Consumer] No new messages, waiting...")
            continue
        for stream_name, messages in entries:
            for msg_id, fields in messages:
                print(f"[Consumer] Processed: {msg_id} -> {fields}")
                last_id = msg_id

if __name__ == "__main__":
    threading.Thread(target=producer, daemon=True).start()
    consumer()

Go version:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

var ctx = context.Background()

func main() {
	rdb := redis.NewClient(&redis.Options{
		Addr: "localhost:6379",
	})
	streamName := "events:orders"

	go producer(rdb, streamName)
	consumer(rdb, streamName)
}

func producer(rdb *redis.Client, streamName string) {
	for i := 1; i <= 5; i++ {
		values := map[string]interface{}{
			"event_type": "order_created",
			"order_id":   fmt.Sprintf("ORD-%04d", i),
			"amount":     fmt.Sprintf("%d", 100*i),
			"user_id":    fmt.Sprintf("USR-%03d", i),
			"timestamp":  fmt.Sprintf("%d", time.Now().UnixMilli()),
		}
		msgID, err := rdb.XAdd(ctx, &redis.XAddArgs{
			Stream: streamName,
			Values: values,
		}).Result()
		if err != nil {
			fmt.Printf("[Producer] Error: %v\n", err)
			continue
		}
		fmt.Printf("[Producer] Added event: %s\n", msgID)
		time.Sleep(500 * time.Millisecond)
	}
}

func consumer(rdb *redis.Client, streamName string) {
	lastID := "0-0"
	for {
		results, err := rdb.XRead(ctx, &redis.XReadArgs{
			Streams: []string{streamName, lastID},
			Count:   5,
			Block:   5 * time.Second,
		}).Result()
		if err != nil {
			if err == redis.Nil {
				fmt.Println("[Consumer] No new messages, waiting...")
				continue
			}
			fmt.Printf("[Consumer] Error: %v\n", err)
			continue
		}
		for _, stream := range results {
			for _, msg := range stream.Messages {
				fmt.Printf("[Consumer] Processed: %s -> %v\n", msg.ID, msg.Values)
				lastID = msg.ID
			}
		}
	}
}

Pattern 2: Consumer Groups (XREADGROUP)

A consumer group ensures each entry is delivered to only one consumer within the group, similar to a Kafka consumer group.

import redis
import time
import threading
import random

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

STREAM_NAME = "events:payments"
GROUP_NAME = "payment-processors"

try:
    r.xgroup_create(STREAM_NAME, GROUP_NAME, id="0", mkstream=True)
    print(f"[Init] Created consumer group: {GROUP_NAME}")
except redis.ResponseError as e:
    if "BUSYGROUP" in str(e):
        print(f"[Init] Group already exists: {GROUP_NAME}")
    else:
        raise

def group_consumer(consumer_name: str):
    while True:
        entries = r.xreadgroup(
            GROUP_NAME, consumer_name,
            {STREAM_NAME: ">"},
            count=3, block=3000
        )
        if not entries:
            print(f"[{consumer_name}] No new messages")
            continue
        for stream_name, messages in entries:
            for msg_id, fields in messages:
                process_time = random.uniform(0.1, 0.5)
                time.sleep(process_time)
                r.xack(STREAM_NAME, GROUP_NAME, msg_id)
                print(f"[{consumer_name}] ACKed: {msg_id} -> {fields}")

def check_pending():
    pending = r.xpending_range(STREAM_NAME, GROUP_NAME, min="-", max="+", count=10)
    if pending:
        print(f"[Monitor] Pending messages: {len(pending)}")
        for p in pending:
            print(f"  msg_id={p['message_id']}, consumer={p['consumer']}, "
                  f"delivered={p['times_delivered']} times")

if __name__ == "__main__":
    for i in range(1, 4):
        for j in range(1, 3):
            r.xadd(STREAM_NAME, {
                "event_type": "payment_completed",
                "payment_id": f"PAY-{i:02d}{j:02d}",
                "amount": str(random.randint(50, 500)),
            })

    threading.Thread(target=group_consumer, args=("consumer-1",), daemon=True).start()
    threading.Thread(target=group_consumer, args=("consumer-2",), daemon=True).start()

    time.sleep(1)
    check_pending()
    time.sleep(5)

Go version:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"

	"github.com/redis/go-redis/v9"
)

var ctx = context.Background()

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	streamName := "events:payments"
	groupName := "payment-processors"

	err := rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()
	if err != nil {
		fmt.Printf("[Init] Group create: %v\n", err)
	} else {
		fmt.Printf("[Init] Created consumer group: %s\n", groupName)
	}

	for i := 1; i <= 3; i++ {
		for j := 1; j <= 2; j++ {
			rdb.XAdd(ctx, &redis.XAddArgs{
				Stream: streamName,
				Values: map[string]interface{}{
					"event_type": "payment_completed",
					"payment_id": fmt.Sprintf("PAY-%02d%02d", i, j),
					"amount":     fmt.Sprintf("%d", rand.Intn(450)+50),
				},
			})
		}
	}

	go groupConsumer(rdb, streamName, groupName, "consumer-1")
	go groupConsumer(rdb, streamName, groupName, "consumer-2")
	time.Sleep(6 * time.Second)
}

func groupConsumer(rdb *redis.Client, stream, group, consumer string) {
	for {
		results, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    group,
			Consumer: consumer,
			Streams:  []string{stream, ">"},
			Count:    3,
			Block:    3 * time.Second,
		}).Result()
		if err != nil {
			if err == redis.Nil {
				fmt.Printf("[%s] No new messages\n", consumer)
				continue
			}
			fmt.Printf("[%s] Error: %v\n", consumer, err)
			continue
		}
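		for _, res := range results {
			for _, msg := range res.Messages {
				fmt.Printf("[%s] Processing: %s -> %v\n", consumer, msg.ID, msg.Values)
				// ACK against the stream key parameter; a loop variable named
				// "stream" would shadow it with a redis.XStream and not compile.
				rdb.XAck(ctx, stream, group, msg.ID)
				fmt.Printf("[%s] ACKed: %s\n", consumer, msg.ID)
			}
		}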
	}
}
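To make the XREADGROUP/XACK/XCLAIM bookkeeping concrete, here is a toy in-memory model of a single consumer group. ToyConsumerGroup is hypothetical and heavily simplified (no blocking, no idle times, no persistence): reading with > hands out only never-delivered entries, delivery puts them in the PEL, XACK removes them, and XCLAIM changes the owner and increments the delivery count.

```python
from typing import Dict, List, Tuple

Entry = Tuple[str, Dict[str, str]]


class ToyConsumerGroup:
    """Toy model of one consumer group's bookkeeping (hypothetical simplification)."""

    def __init__(self, stream: List[Entry]):
        self.stream = stream
        self.next_index = 0  # plays the role of the group's last-delivered ID
        # PEL: message ID -> [owning consumer, delivery count]
        self.pel: Dict[str, List] = {}

    def read_new(self, consumer: str, count: int) -> List[Entry]:
        """Like XREADGROUP ... STREAMS key '>': only never-delivered entries."""
        batch = self.stream[self.next_index:self.next_index + count]
        self.next_index += len(batch)
        for msg_id, _ in batch:
            self.pel[msg_id] = [consumer, 1]
        return batch

    def ack(self, msg_id: str) -> int:
        """Like XACK: drop the entry from the PEL; returns how many were removed."""
        return 1 if self.pel.pop(msg_id, None) is not None else 0

    def claim(self, msg_id: str, new_owner: str) -> bool:
        """Like XCLAIM without JUSTID: change the owner and bump the delivery count."""
        if msg_id not in self.pel:
            return False
        self.pel[msg_id][0] = new_owner
        self.pel[msg_id][1] += 1
        return True


stream = [(f"{i}-0", {"payment_id": f"PAY-{i}"}) for i in range(1, 6)]
group = ToyConsumerGroup(stream)
a = group.read_new("consumer-1", 2)  # gets 1-0, 2-0
b = group.read_new("consumer-2", 2)  # gets 3-0, 4-0: no overlap with consumer-1
group.ack("1-0")
group.ack("3-0")
group.claim("2-0", "consumer-2")  # consumer-1 "crashed" while holding 2-0
print(group.pel)  # {'2-0': ['consumer-2', 2], '4-0': ['consumer-2', 1]}
```

The delivery count that XCLAIM increments is exactly what patterns 5 and 6 read back from XPENDING to decide when a message should go to the dead-letter queue.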

Pattern 3: Event Sourcing

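Every state change is persisted to a stream as a sequence of events, so the state of an aggregate root at any point in time can be rebuilt.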

import redis
import json
from dataclasses import dataclass, field
from typing import List, Dict

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

@dataclass
class OrderState:
    order_id: str = ""
    status: str = "created"
    items: List[Dict] = field(default_factory=list)
    total: float = 0.0
    version: int = 0

class EventStore:
    def __init__(self, aggregate_type: str, aggregate_id: str):
        self.stream_key = f"events:{aggregate_type}:{aggregate_id}"
        self.aggregate_type = aggregate_type
        self.aggregate_id = aggregate_id

    def append(self, event_type: str, data: dict):
        event = {
            "event_type": event_type,
            "aggregate_id": self.aggregate_id,
            "data": json.dumps(data),
            "version": str(self._next_version()),
        }
        msg_id = r.xadd(self.stream_key, event)
        print(f"[EventStore] Appended: {msg_id} type={event_type} version={event['version']}")
        return msg_id

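    def _next_version(self) -> int:
        # XLEN returns 0 for a stream that does not exist yet, so the first
        # event gets version 1 (XINFO STREAM would raise "no such key" here).
        # Reading the length and then calling XADD is not atomic: concurrent
        # writers to one aggregate need optimistic concurrency control.
        return r.xlen(self.stream_key) + 1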

    def load_events(self) -> List[dict]:
        # No COUNT: a capped read would silently drop later events from the replay.
        entries = r.xrange(self.stream_key, "-", "+")
        events = []
        for msg_id, fields in entries:
            events.append({
                "id": msg_id,
                "event_type": fields["event_type"],
                "data": json.loads(fields["data"]),
                "version": int(fields["version"]),
            })
        return events

class OrderAggregate:
    def __init__(self, order_id: str):
        self.order_id = order_id
        self.store = EventStore("order", order_id)
        self.state = OrderState(order_id=order_id)

    def create(self, items: List[Dict]):
        total = sum(item["price"] * item["qty"] for item in items)
        self.store.append("OrderCreated", {"items": items, "total": total})

    def add_item(self, item: Dict):
        self.store.append("ItemAdded", item)

    def pay(self):
        self.store.append("OrderPaid", {"payment_method": "credit_card"})

    def ship(self, tracking_number: str):
        self.store.append("OrderShipped", {"tracking_number": tracking_number})

    def rebuild(self) -> OrderState:
        events = self.store.load_events()
        state = OrderState(order_id=self.order_id)
        for event in events:
            self._apply(state, event)
        self.state = state
        return state

    def _apply(self, state: OrderState, event: dict):
        event_type = event["event_type"]
        data = event["data"]
        if event_type == "OrderCreated":
            state.items = data.get("items", [])
            state.total = data.get("total", 0)
            state.status = "created"
        elif event_type == "ItemAdded":
            state.items.append(data)
            state.total += data.get("price", 0) * data.get("qty", 1)
        elif event_type == "OrderPaid":
            state.status = "paid"
        elif event_type == "OrderShipped":
            state.status = "shipped"
        state.version = event["version"]

if __name__ == "__main__":
    order = OrderAggregate("ORD-0001")
    r.delete(order.store.stream_key)  # start clean so reruns don't append duplicate events
    order.create([{"sku": "WIDGET-01", "price": 29.9, "qty": 2}])
    order.add_item({"sku": "GADGET-02", "price": 49.9, "qty": 1})
    order.pay()
    order.ship("SF1234567890")

    rebuilt = order.rebuild()
    print(f"\n[Rebuilt State] order_id={rebuilt.order_id} status={rebuilt.status} "
          f"total={rebuilt.total} items={len(rebuilt.items)} version={rebuilt.version}")

Go version:

package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/redis/go-redis/v9"
)

var ctx = context.Background()

type OrderState struct {
	OrderID string                   `json:"order_id"`
	Status  string                   `json:"status"`
	Items   []map[string]interface{} `json:"items"`
	Total   float64                  `json:"total"`
	Version int                      `json:"version"`
}

type Event struct {
	ID         string                 `json:"id"`
	EventType  string                 `json:"event_type"`
	Data       map[string]interface{} `json:"data"`
	Version    int                    `json:"version"`
}

type EventStore struct {
	rdb         *redis.Client
	StreamKey   string
	AggregateID string
}

func NewEventStore(rdb *redis.Client, aggregateType, aggregateID string) *EventStore {
	return &EventStore{
		rdb:         rdb,
		StreamKey:   fmt.Sprintf("events:%s:%s", aggregateType, aggregateID),
		AggregateID: aggregateID,
	}
}

func (s *EventStore) Append(eventType string, data map[string]interface{}) (string, error) {
	info, err := s.rdb.XInfoStream(ctx, s.StreamKey).Result()
	version := 1
	if err == nil {
		version = int(info.Length) + 1
	}
	dataJSON, _ := json.Marshal(data)
	msgID, err := s.rdb.XAdd(ctx, &redis.XAddArgs{
		Stream: s.StreamKey,
		Values: map[string]interface{}{
			"event_type":   eventType,
			"aggregate_id": s.AggregateID,
			"data":         string(dataJSON),
			"version":      fmt.Sprintf("%d", version),
		},
	}).Result()
	if err != nil {
		return "", err
	}
	fmt.Printf("[EventStore] Appended: %s type=%s version=%d\n", msgID, eventType, version)
	return msgID, nil
}

func (s *EventStore) LoadEvents() ([]Event, error) {
	entries, err := s.rdb.XRange(ctx, s.StreamKey, "-", "+").Result()
	if err != nil {
		return nil, err
	}
	var events []Event
	for _, msg := range entries {
		var data map[string]interface{}
		json.Unmarshal([]byte(msg.Values["data"].(string)), &data)
		var version int
		fmt.Sscanf(msg.Values["version"].(string), "%d", &version)
		events = append(events, Event{
			ID:        msg.ID,
			EventType: msg.Values["event_type"].(string),
			Data:      data,
			Version:   version,
		})
	}
	return events, nil
}

type OrderAggregate struct {
	OrderID string
	Store   *EventStore
	State   OrderState
}

func NewOrderAggregate(rdb *redis.Client, orderID string) *OrderAggregate {
	return &OrderAggregate{
		OrderID: orderID,
		Store:   NewEventStore(rdb, "order", orderID),
		State:   OrderState{OrderID: orderID, Status: "created"},
	}
}

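func (a *OrderAggregate) Create(items []map[string]interface{}) {
	total := 0.0
	for _, item := range items {
		total += toFloat(item["price"]) * toFloat(item["qty"])
	}
	a.Store.Append("OrderCreated", map[string]interface{}{
		"items": items, "total": total,
	})
}

// toFloat accepts both float64 and int: a literal such as "qty": 2 is stored
// as an int, so a plain .(float64) assertion would silently yield 0.
func toFloat(v interface{}) float64 {
	switch n := v.(type) {
	case float64:
		return n
	case int:
		return float64(n)
	default:
		return 0
	}
}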

func (a *OrderAggregate) Pay() {
	a.Store.Append("OrderPaid", map[string]interface{}{
		"payment_method": "credit_card",
	})
}

func (a *OrderAggregate) Ship(trackingNumber string) {
	a.Store.Append("OrderShipped", map[string]interface{}{
		"tracking_number": trackingNumber,
	})
}

func (a *OrderAggregate) Rebuild() (*OrderState, error) {
	events, err := a.Store.LoadEvents()
	if err != nil {
		return nil, err
	}
	state := OrderState{OrderID: a.OrderID, Status: "created"}
	for _, event := range events {
		switch event.EventType {
		case "OrderCreated":
			if items, ok := event.Data["items"].([]interface{}); ok {
				for _, item := range items {
					if m, ok := item.(map[string]interface{}); ok {
						state.Items = append(state.Items, m)
					}
				}
			}
			if total, ok := event.Data["total"].(float64); ok {
				state.Total = total
			}
			state.Status = "created"
		case "OrderPaid":
			state.Status = "paid"
		case "OrderShipped":
			state.Status = "shipped"
		}
		state.Version = event.Version
	}
	a.State = state
	return &state, nil
}

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	order := NewOrderAggregate(rdb, "ORD-0001")
	rdb.Del(ctx, order.Store.StreamKey) // start clean so reruns don't append duplicate events
	order.Create([]map[string]interface{}{
		{"sku": "WIDGET-01", "price": 29.9, "qty": 2},
	})
	order.Pay()
	order.Ship("SF1234567890")

	state, err := order.Rebuild()
	if err != nil {
		fmt.Printf("[Rebuild] Error: %v\n", err)
		return
	}
	fmt.Printf("\n[Rebuilt State] order_id=%s status=%s total=%.2f items=%d version=%d\n",
		state.OrderID, state.Status, state.Total, len(state.Items), state.Version)
}
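Both versions above replay the full stream on every rebuild. Challenge 3 calls for snapshots plus incremental replay; the pure-Python sketch below illustrates that idea, assuming a snapshot records the version it was taken at. The names, the cents-based totals, and the snapshot interval are illustrative, not from the article. In Redis, the snapshot could be stored in a hash together with the stream ID of its last event, and the newer events fetched with XRANGE using an exclusive start ID prefixed with "(" (supported since Redis 6.2).

```python
import copy
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

Event = Tuple[int, str, Dict[str, int]]  # (version, event_type, data)


@dataclass
class OrderState:
    status: str = "created"
    total_cents: int = 0  # integer cents avoid float rounding in totals
    item_count: int = 0
    version: int = 0


def apply_order_event(state: OrderState, event: Event) -> None:
    version, event_type, data = event
    if event_type == "OrderCreated":
        state.total_cents = data["total_cents"]
        state.item_count = data["item_count"]
    elif event_type == "ItemAdded":
        state.total_cents += data["price_cents"] * data["qty"]
        state.item_count += 1
    elif event_type == "OrderPaid":
        state.status = "paid"
    elif event_type == "OrderShipped":
        state.status = "shipped"
    state.version = version


def rebuild(events: List[Event], snapshot: Optional[OrderState] = None) -> OrderState:
    """Start from a snapshot (if any) and replay only events newer than it."""
    state = copy.deepcopy(snapshot) if snapshot is not None else OrderState()
    for event in events:
        if event[0] > state.version:  # skip events already folded into the snapshot
            apply_order_event(state, event)
    return state


events: List[Event] = [
    (1, "OrderCreated", {"total_cents": 5980, "item_count": 1}),
    (2, "ItemAdded", {"price_cents": 4990, "qty": 1}),
    (3, "OrderPaid", {}),
    (4, "OrderShipped", {}),
]

snapshot = rebuild(events[:2])               # e.g. taken every 2 events and persisted
incremental = rebuild(events[2:], snapshot)  # snapshot + only the newer events
full = rebuild(events)                       # full replay, for comparison
print(incremental == full, incremental)
```

The deepcopy keeps the stored snapshot unchanged, so the same snapshot can seed several rebuilds; the version check makes replaying an overlapping range harmless.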

Pattern 4: CQRS Read-Model Projection

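The command side writes events to a stream; the query side consumes that stream and updates a projection (the read model), separating command and query responsibilities.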

import redis
import json
import time
import threading

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

EVENT_STREAM = "events:orders"
PROJECTION_KEY = "projection:order_summary"
GROUP_NAME = "cqrs-projectors"

try:
    r.xgroup_create(EVENT_STREAM, GROUP_NAME, id="0", mkstream=True)
except redis.ResponseError as e:
    if "BUSYGROUP" not in str(e):  # only "group already exists" is safe to ignore
        raise

def command_side_publish_events():
    events = [
        {"event_type": "OrderCreated", "order_id": "ORD-001", "amount": "299", "user_id": "USR-001"},
        {"event_type": "OrderCreated", "order_id": "ORD-002", "amount": "599", "user_id": "USR-002"},
        {"event_type": "OrderPaid", "order_id": "ORD-001", "amount": "299"},
        {"event_type": "OrderShipped", "order_id": "ORD-001"},
        {"event_type": "OrderCreated", "order_id": "ORD-003", "amount": "199", "user_id": "USR-001"},
    ]
    for event in events:
        r.xadd(EVENT_STREAM, event)
        print(f"[Command] Published: {event['event_type']} -> {event.get('order_id')}")
        time.sleep(0.3)

def query_side_projector():
    consumer_name = "projector-1"
    while True:
        entries = r.xreadgroup(
            GROUP_NAME, consumer_name,
            {EVENT_STREAM: ">"},
            count=5, block=3000
        )
        if not entries:
            continue
        for stream_name, messages in entries:
            for msg_id, fields in messages:
                update_projection(fields)
                r.xack(EVENT_STREAM, GROUP_NAME, msg_id)

def update_projection(event: dict):
    event_type = event.get("event_type")
    order_id = event.get("order_id", "")
    if event_type == "OrderCreated":
        r.hset(PROJECTION_KEY, order_id, json.dumps({
            "status": "created",
            "amount": event.get("amount", "0"),
            "user_id": event.get("user_id", ""),
        }))
    elif event_type == "OrderPaid":
        existing = r.hget(PROJECTION_KEY, order_id)
        if existing:
            data = json.loads(existing)
            data["status"] = "paid"
            r.hset(PROJECTION_KEY, order_id, json.dumps(data))
    elif event_type == "OrderShipped":
        existing = r.hget(PROJECTION_KEY, order_id)
        if existing:
            data = json.loads(existing)
            data["status"] = "shipped"
            r.hset(PROJECTION_KEY, order_id, json.dumps(data))
    print(f"[Projection] Updated: {order_id} <- {event_type}")

def query_read_model():
    time.sleep(3)
    all_orders = r.hgetall(PROJECTION_KEY)
    print("\n[Read Model] Order Summary:")
    for order_id, data in all_orders.items():
        print(f"  {order_id}: {data}")

if __name__ == "__main__":
    threading.Thread(target=command_side_publish_events, daemon=True).start()
    threading.Thread(target=query_side_projector, daemon=True).start()
    query_read_model()

Go version:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

var ctx = context.Background()

const (
	eventStream   = "events:orders"
	projectionKey = "projection:order_summary"
	groupName     = "cqrs-projectors"
)

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	rdb.XGroupCreateMkStream(ctx, eventStream, groupName, "0")

	go commandSidePublishEvents(rdb)
	go querySideProjector(rdb)
	time.Sleep(4 * time.Second)
	queryReadModel(rdb)
}

func commandSidePublishEvents(rdb *redis.Client) {
	events := []map[string]interface{}{
		{"event_type": "OrderCreated", "order_id": "ORD-001", "amount": "299", "user_id": "USR-001"},
		{"event_type": "OrderCreated", "order_id": "ORD-002", "amount": "599", "user_id": "USR-002"},
		{"event_type": "OrderPaid", "order_id": "ORD-001", "amount": "299"},
		{"event_type": "OrderShipped", "order_id": "ORD-001"},
		{"event_type": "OrderCreated", "order_id": "ORD-003", "amount": "199", "user_id": "USR-001"},
	}
	for _, event := range events {
		rdb.XAdd(ctx, &redis.XAddArgs{Stream: eventStream, Values: event})
		fmt.Printf("[Command] Published: %s -> %s\n", event["event_type"], event["order_id"])
		time.Sleep(300 * time.Millisecond)
	}
}

func querySideProjector(rdb *redis.Client) {
	for {
		results, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    groupName,
			Consumer: "projector-1",
			Streams:  []string{eventStream, ">"},
			Count:    5,
			Block:    3 * time.Second,
		}).Result()
		if err != nil {
			continue
		}
		for _, stream := range results {
			for _, msg := range stream.Messages {
				updateProjection(rdb, msg.Values)
				rdb.XAck(ctx, eventStream, groupName, msg.ID)
			}
		}
	}
}

func updateProjection(rdb *redis.Client, fields map[string]interface{}) {
	eventType, _ := fields["event_type"].(string)
	orderID, _ := fields["order_id"].(string)
	switch eventType {
	case "OrderCreated":
		data, _ := json.Marshal(map[string]string{
			"status":  "created",
			"amount":  fmt.Sprintf("%v", fields["amount"]),
			"user_id": fmt.Sprintf("%v", fields["user_id"]),
		})
		rdb.HSet(ctx, projectionKey, orderID, string(data))
	case "OrderPaid":
		existing, _ := rdb.HGet(ctx, projectionKey, orderID).Result()
		if existing != "" {
			var data map[string]string
			json.Unmarshal([]byte(existing), &data)
			data["status"] = "paid"
			updated, _ := json.Marshal(data)
			rdb.HSet(ctx, projectionKey, orderID, string(updated))
		}
	case "OrderShipped":
		existing, _ := rdb.HGet(ctx, projectionKey, orderID).Result()
		if existing != "" {
			var data map[string]string
			json.Unmarshal([]byte(existing), &data)
			data["status"] = "shipped"
			updated, _ := json.Marshal(data)
			rdb.HSet(ctx, projectionKey, orderID, string(updated))
		}
	}
	fmt.Printf("[Projection] Updated: %s <- %s\n", orderID, eventType)
}

func queryReadModel(rdb *redis.Client) {
	orders, _ := rdb.HGetAll(ctx, projectionKey).Result()
	fmt.Println("\n[Read Model] Order Summary:")
	for orderID, data := range orders {
		fmt.Printf("  %s: %s\n", orderID, data)
	}
}
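The projector above applies every delivery as-is, so a redelivered or out-of-order event can roll a row back. One way to add the version tracking mentioned in challenge 4 is to store the last applied stream ID in each row and skip anything at or before it. The sketch below uses a plain dict in place of the projection hash; apply_event is a hypothetical name, it assumes all events for an order come from one stream (so their IDs are totally ordered), and against real Redis the read-check-write would need to be atomic, for example via a Lua script or WATCH/MULTI.

```python
import json
from typing import Dict, Tuple


def parse_stream_id(stream_id: str) -> Tuple[int, int]:
    ms, _, seq = stream_id.partition("-")
    return int(ms), int(seq or 0)


STATUS_BY_EVENT = {"OrderCreated": "created", "OrderPaid": "paid", "OrderShipped": "shipped"}


def apply_event(read_model: Dict[str, str], msg_id: str, fields: Dict[str, str]) -> bool:
    """Apply one event to an order row unless the row already reflects it.

    read_model stands in for the projection hash (order_id -> JSON string).
    Returns False for duplicate or out-of-date deliveries.
    """
    order_id = fields["order_id"]
    row = json.loads(read_model.get(order_id, "{}"))
    last_id = row.get("last_event_id")
    if last_id is not None and parse_stream_id(msg_id) <= parse_stream_id(last_id):
        return False
    if fields["event_type"] == "OrderCreated":
        row["amount"] = fields.get("amount", "0")
        row["user_id"] = fields.get("user_id", "")
    row["status"] = STATUS_BY_EVENT.get(fields["event_type"], row.get("status", "unknown"))
    row["last_event_id"] = msg_id
    read_model[order_id] = json.dumps(row)
    return True


read_model: Dict[str, str] = {}
apply_event(read_model, "1-0", {"event_type": "OrderCreated", "order_id": "ORD-001",
                                "amount": "299", "user_id": "USR-001"})
apply_event(read_model, "3-0", {"event_type": "OrderPaid", "order_id": "ORD-001"})
applied = apply_event(read_model, "3-0", {"event_type": "OrderPaid", "order_id": "ORD-001"})
print(applied, json.loads(read_model["ORD-001"]))
```

Exposing last_event_id to readers also lets a client that just issued a command check whether the read model has caught up with the ID returned by its XADD.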

Pattern 5: Dead-Letter Queue and Retry Mechanism

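Once a message has failed more often than the retry threshold, it is moved to a dead-letter queue so that endless retries do not block consumption.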

import redis
import json
import time
import random

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

MAIN_STREAM = "events:orders"
DLQ_STREAM = "events:orders:dlq"
GROUP_NAME = "order-processors"
MAX_RETRIES = 3
RETRY_DELAYS = [1, 5, 30]

try:
    r.xgroup_create(MAIN_STREAM, GROUP_NAME, id="0", mkstream=True)
except redis.ResponseError as e:
    if "BUSYGROUP" not in str(e):  # only "group already exists" is safe to ignore
        raise

def process_message(fields: dict) -> bool:
    order_id = fields.get("order_id", "")
    if random.random() < 0.4:
        print(f"  [FAIL] Processing failed for {order_id}")
        return False
    print(f"  [OK] Processed {order_id}")
    return True

def consumer_with_retry(consumer_name: str):
    while True:
        entries = r.xreadgroup(
            GROUP_NAME, consumer_name,
            {MAIN_STREAM: ">"},
            count=1, block=3000
        )
        if not entries:
            continue
        for stream_name, messages in entries:
            for msg_id, fields in messages:
                success = process_message(fields)
                if success:
                    r.xack(MAIN_STREAM, GROUP_NAME, msg_id)
                    continue
                # Each retry is re-published under a new ID, so the PEL delivery
                # count of the new entry always starts at 1. The retry count
                # therefore travels in the message body instead.
                retry_count = int(fields.get("retry_count", "0"))
                if retry_count >= MAX_RETRIES:
                    r.xadd(DLQ_STREAM, {
                        **fields,
                        "original_id": msg_id,
                        "failure_reason": "max_retries_exceeded",
                        "retry_count": str(retry_count),
                    })
                    r.xack(MAIN_STREAM, GROUP_NAME, msg_id)
                    print(f"  [DLQ] Moved to dead letter: {msg_id}")
                else:
                    delay = RETRY_DELAYS[min(retry_count, len(RETRY_DELAYS) - 1)]
                    print(f"  [RETRY] Will retry {msg_id} after {delay}s "
                          f"(attempt {retry_count + 1}/{MAX_RETRIES})")
                    time.sleep(delay)  # simplification: blocks this consumer for the whole delay
                    # Re-publish before ACKing: a crash in between leaves the original
                    # in the PEL (a possible duplicate) instead of losing the message.
                    r.xadd(MAIN_STREAM, {**fields, "retry_count": str(retry_count + 1)})
                    r.xack(MAIN_STREAM, GROUP_NAME, msg_id)

def inspect_dlq():
    dlq_entries = r.xrange(DLQ_STREAM, "-", "+", count=20)
    print(f"\n[DLQ] {len(dlq_entries)} dead letters:")
    for msg_id, fields in dlq_entries:
        print(f"  {msg_id}: order_id={fields.get('order_id')} "
              f"retries={fields.get('retry_count')} reason={fields.get('failure_reason')}")

if __name__ == "__main__":
    for i in range(1, 11):
        r.xadd(MAIN_STREAM, {"order_id": f"ORD-{i:04d}", "amount": str(i * 100)})

    random.seed(42)
    consumer_with_retry("consumer-1")

Go version:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"

	"github.com/redis/go-redis/v9"
)

var ctx = context.Background()

const (
	mainStream  = "events:orders"
	dlqStream   = "events:orders:dlq"
	groupName   = "order-processors"
	maxRetries  = 3
)

var retryDelays = []time.Duration{1 * time.Second, 5 * time.Second, 30 * time.Second}

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	rdb.XGroupCreateMkStream(ctx, mainStream, groupName, "0")

	for i := 1; i <= 10; i++ {
		rdb.XAdd(ctx, &redis.XAddArgs{
			Stream: mainStream,
			Values: map[string]interface{}{
				"order_id": fmt.Sprintf("ORD-%04d", i),
				"amount":   fmt.Sprintf("%d", i*100),
			},
		})
	}

	consumerWithRetry(rdb, "consumer-1")
}

func processMessage(fields map[string]interface{}) bool {
	orderID := fmt.Sprintf("%v", fields["order_id"])
	if rand.Float64() < 0.4 {
		fmt.Printf("  [FAIL] Processing failed for %s\n", orderID)
		return false
	}
	fmt.Printf("  [OK] Processed %s\n", orderID)
	return true
}

func consumerWithRetry(rdb *redis.Client, consumerName string) {
	for {
		results, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    groupName,
			Consumer: consumerName,
			Streams:  []string{mainStream, ">"},
			Count:    1,
			Block:    3 * time.Second,
		}).Result()
		if err != nil {
			continue
		}
		for _, stream := range results {
			for _, msg := range stream.Messages {
				if processMessage(msg.Values) {
					rdb.XAck(ctx, mainStream, groupName, msg.ID)
				} else {
					// Each retry is re-published under a new ID, so the PEL delivery
					// count of the new entry always starts at 1. The retry count
					// therefore travels in the message body instead.
					retryCount := 0
					if s, ok := msg.Values["retry_count"].(string); ok {
						fmt.Sscanf(s, "%d", &retryCount)
					}
					if retryCount >= maxRetries {
						dlqValues := map[string]interface{}{}
						for k, v := range msg.Values {
							dlqValues[k] = v
						}
						dlqValues["original_id"] = msg.ID
						dlqValues["failure_reason"] = "max_retries_exceeded"
						dlqValues["retry_count"] = fmt.Sprintf("%d", retryCount)
						rdb.XAdd(ctx, &redis.XAddArgs{Stream: dlqStream, Values: dlqValues})
						rdb.XAck(ctx, mainStream, groupName, msg.ID)
						fmt.Printf("  [DLQ] Moved to dead letter: %s\n", msg.ID)
					} else {
						delay := retryDelays[min(retryCount, len(retryDelays)-1)] // min builtin: Go 1.21+
						fmt.Printf("  [RETRY] Will retry %s after %v (attempt %d/%d)\n",
							msg.ID, delay, retryCount+1, maxRetries)
						time.Sleep(delay) // simplification: blocks this consumer for the whole delay
						retryValues := map[string]interface{}{}
						for k, v := range msg.Values {
							retryValues[k] = v
						}
						retryValues["retry_count"] = fmt.Sprintf("%d", retryCount+1)
						// Re-publish before ACKing so a crash in between cannot lose the message.
						rdb.XAdd(ctx, &redis.XAddArgs{Stream: mainStream, Values: retryValues})
						rdb.XAck(ctx, mainStream, groupName, msg.ID)
					}
				}
			}
		}
	}
}
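Both versions above use a fixed delay table and block the consumer in a sleep. A common alternative, sketched below under stated assumptions, computes exponential backoff with full jitter and parks pending retries in a structure ordered by due time. DelayedRetryQueue is a hypothetical in-memory stand-in for what could be a Redis sorted set (ZADD with the due timestamp as score, polled with ZRANGEBYSCORE); the base and cap values are illustrative.

```python
import heapq
import itertools
import random
from typing import Dict, List, Optional, Tuple


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0,
                  rng: Optional[random.Random] = None) -> float:
    """Exponential backoff with full jitter: uniform in [0, min(cap, base * 2**attempt)]."""
    rng = rng or random.Random()
    return rng.uniform(0.0, min(cap, base * (2 ** attempt)))


class DelayedRetryQueue:
    """Hypothetical in-memory stand-in for a sorted set scored by due time."""

    def __init__(self) -> None:
        self._heap: List[Tuple[float, int, Dict[str, str]]] = []
        self._seq = itertools.count()  # tie-breaker, so dicts are never compared

    def schedule(self, due_at: float, fields: Dict[str, str]) -> None:
        heapq.heappush(self._heap, (due_at, next(self._seq), fields))

    def pop_due(self, now: float) -> List[Dict[str, str]]:
        """Remove and return every message due at or before now, earliest first."""
        due = []
        while self._heap and self._heap[0][0] <= now:
            due.append(heapq.heappop(self._heap)[2])
        return due


rng = random.Random(42)
queue = DelayedRetryQueue()
now = 0.0
for retry_count, order_id in enumerate(["ORD-0001", "ORD-0002", "ORD-0003"]):
    delay = backoff_delay(retry_count, rng=rng)
    queue.schedule(now + delay, {"order_id": order_id, "retry_count": str(retry_count + 1)})
    print(f"{order_id}: retry in {delay:.2f}s")

# A poller would periodically move due messages back onto the main stream.
print(queue.pop_due(now=60.0))
```

The random jitter spreads the retries of many failing messages over time instead of firing them in lockstep, which is what prevents the retry storm described in challenge 5; the consumer itself never sleeps.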

Pattern 6: A Production-Grade Event Processing Service (Go)

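A complete event processing service with graceful shutdown, periodic metrics logging, and XCLAIM-based failover for entries left pending by stalled or crashed consumers.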

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	"github.com/redis/go-redis/v9"
)

type EventProcessor struct {
	rdb          *redis.Client
	stream       string
	group        string
	consumer     string
	dlqStream    string
	maxRetries   int
	processed    atomic.Int64
	failed       atomic.Int64
	retried      atomic.Int64
	mu           sync.Mutex
	running      bool
	ctx          context.Context
	cancel       context.CancelFunc
}

func NewEventProcessor(rdb *redis.Client, stream, group, consumer string) *EventProcessor {
	ctx, cancel := context.WithCancel(context.Background())
	return &EventProcessor{
		rdb:        rdb,
		stream:     stream,
		group:      group,
		consumer:   consumer,
		dlqStream:  stream + ":dlq",
		maxRetries: 3,
		running:    true,
		ctx:        ctx,
		cancel:     cancel,
	}
}

func (p *EventProcessor) Start() {
	rdb := p.rdb
	rdb.XGroupCreateMkStream(p.ctx, p.stream, p.group, "0")

	log.Printf("[Processor] Starting consumer %s for stream %s", p.consumer, p.stream)

	go p.claimPendingMessages()
	go p.consumeNewMessages()
	go p.reportMetrics()
}

func (p *EventProcessor) consumeNewMessages() {
	for p.running {
		results, err := p.rdb.XReadGroup(p.ctx, &redis.XReadGroupArgs{
			Group:    p.group,
			Consumer: p.consumer,
			Streams:  []string{p.stream, ">"},
			Count:    10,
			Block:    2 * time.Second,
		}).Result()
		if err != nil {
			if err == redis.Nil || p.ctx.Err() != nil {
				continue
			}
			log.Printf("[Consumer] Error: %v", err)
			continue
		}
		for _, stream := range results {
			for _, msg := range stream.Messages {
				p.handleMessage(msg.ID, msg.Values)
			}
		}
	}
}

func (p *EventProcessor) claimPendingMessages() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			pending, err := p.rdb.XPendingExt(p.ctx, &redis.XPendingExtArgs{
				Stream: p.stream,
				Group:  p.group,
				Start:  "-",
				End:    "+",
				Count:  10,
				Idle:   60 * time.Second,
			}).Result()
			if err != nil {
				continue
			}
			for _, msg := range pending {
				claimed, err := p.rdb.XClaim(p.ctx, &redis.XClaimArgs{
					Stream:   p.stream,
					Group:    p.group,
					Consumer: p.consumer,
					MinIdle:  60 * time.Second,
					Messages: []string{msg.ID},
				}).Result()
				if err == nil && len(claimed) > 0 {
					log.Printf("[Claim] Claimed pending message: %s", claimed[0].ID)
					p.retried.Add(1)
					p.handleMessage(claimed[0].ID, claimed[0].Values)
				}
			}
		case <-p.ctx.Done():
			return
		}
	}
}

func (p *EventProcessor) handleMessage(msgID string, values map[string]interface{}) {
	err := p.processEvent(values)
	if err != nil {
		p.failed.Add(1)
		pending, _ := p.rdb.XPendingExt(p.ctx, &redis.XPendingExtArgs{
			Stream: p.stream,
			Group:  p.group,
			Start:  msgID,
			End:    msgID,
			Count:  1,
		}).Result()
		deliveryCount := 1
		if len(pending) > 0 {
			deliveryCount = int(pending[0].DeliveryCount)
		}
		if deliveryCount >= p.maxRetries {
			dlqValues := map[string]interface{}{
				"original_id":    msgID,
				"failure_reason": err.Error(),
				"retry_count":    fmt.Sprintf("%d", deliveryCount),
			}
			for k, v := range values {
				dlqValues[k] = v
			}
			p.rdb.XAdd(p.ctx, &redis.XAddArgs{Stream: p.dlqStream, Values: dlqValues})
			p.rdb.XAck(p.ctx, p.stream, p.group, msgID)
			log.Printf("[DLQ] Moved %s to dead letter after %d attempts", msgID, deliveryCount)
		} else {
			log.Printf("[Retry] Message %s failed (attempt %d/%d): %v",
				msgID, deliveryCount, p.maxRetries, err)
		}
		return
	}
	p.rdb.XAck(p.ctx, p.stream, p.group, msgID)
	p.processed.Add(1)
}

func (p *EventProcessor) processEvent(values map[string]interface{}) error {
	eventType, _ := values["event_type"].(string)
	switch eventType {
	case "order_created":
		return p.handleOrderCreated(values)
	case "payment_completed":
		return p.handlePaymentCompleted(values)
	default:
		log.Printf("[Process] Unknown event type: %s", eventType)
		return nil
	}
}

func (p *EventProcessor) handleOrderCreated(values map[string]interface{}) error {
	orderID := fmt.Sprintf("%v", values["order_id"])
	log.Printf("[Process] Order created: %s", orderID)
	return nil
}

func (p *EventProcessor) handlePaymentCompleted(values map[string]interface{}) error {
	paymentID := fmt.Sprintf("%v", values["payment_id"])
	log.Printf("[Process] Payment completed: %s", paymentID)
	return nil
}

func (p *EventProcessor) reportMetrics() {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			log.Printf("[Metrics] processed=%d failed=%d retried=%d",
				p.processed.Load(), p.failed.Load(), p.retried.Load())
		case <-p.ctx.Done():
			return
		}
	}
}

func (p *EventProcessor) Stop() {
	log.Println("[Processor] Shutting down gracefully...")
	p.running = false
	p.cancel()
	// Fixed grace period for in-flight handlers; a sync.WaitGroup would make the drain deterministic
	time.Sleep(2 * time.Second)
	log.Printf("[Processor] Final metrics: processed=%d failed=%d retried=%d",
		p.processed.Load(), p.failed.Load(), p.retried.Load())
}

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	processor := NewEventProcessor(rdb, "events:orders", "order-processors", "worker-1")
	processor.Start()

	for i := 1; i <= 20; i++ {
		rdb.XAdd(context.Background(), &redis.XAddArgs{
			Stream: "events:orders",
			Values: map[string]interface{}{
				"event_type": "order_created",
				"order_id":   fmt.Sprintf("ORD-%04d", i),
				"amount":     fmt.Sprintf("%d", i*100),
			},
		})
	}

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	<-sigCh
	processor.Stop()
}
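The failure branch of the Go processor above comes down to one decision per failed message: leave it pending for another attempt, or copy it to the dead-letter stream and ACK the original. A minimal Python sketch of that decision, kept free of Redis calls so it can be unit-tested, could look like this. `route_failure`, `backoff_seconds` and their default thresholds are hypothetical; the backoff helper is not part of the Go code and only illustrates the exponential backoff strategy mentioned earlier.

```python
from typing import Dict, Tuple


def route_failure(msg_id: str, values: Dict[str, str], delivery_count: int,
                  error: str, max_retries: int = 3) -> Tuple[str, Dict[str, str]]:
    """Decide what to do with a message whose handler just failed.

    delivery_count is the PEL counter (times delivered so far). Once it reaches
    max_retries the message goes to the DLQ; otherwise it stays pending.
    """
    if delivery_count >= max_retries:
        # Copy the payload first so the metadata fields below cannot be
        # overwritten by a payload field with the same name
        dlq_fields = dict(values)
        dlq_fields.update({
            "original_id": msg_id,
            "failure_reason": error,
            "retry_count": str(delivery_count),
        })
        return "dead_letter", dlq_fields
    return "retry", {}


def backoff_seconds(delivery_count: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Exponential backoff before the next attempt: base * 2^(n-1), capped at cap."""
    return min(cap, base * 2 ** (delivery_count - 1))
```

With redis-py, a `"dead_letter"` result would translate to `r.xadd(dlq_stream, fields)` followed by `r.xack(stream, group, msg_id)`, in that order, so a failed DLQ write never drops the message.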

Pitfalls: 5 Common Mistakes

❌ Pitfall 1: Using XREAD without a consumer group, so messages are consumed more than once

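# ❌ Wrong: with independent XREAD calls, every consumer processes every message (and starting at 0-0 re-reads the whole history each time)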
def bad_consumer():
    entries = r.xread({STREAM_NAME: "0-0"}, count=10, block=5000)
    for _, messages in entries:
        for msg_id, fields in messages:
            process(fields)

# ✅ Correct: with XREADGROUP, the consumer group delivers each message to only one consumer in the group
def good_consumer():
    entries = r.xreadgroup(
        GROUP_NAME, "consumer-1",
        {STREAM_NAME: ">"},
        count=10, block=5000
    )
    for _, messages in entries:
        for msg_id, fields in messages:
            process(fields)
            r.xack(STREAM_NAME, GROUP_NAME, msg_id)
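The difference between the two consumers above is where the read position lives. With plain XREAD each client tracks its own last ID, so every client sees every entry; with a consumer group, Redis keeps one last-delivered ID per group and hands each new entry to a single consumer. The toy in-memory model below (`ToyStream` is a hypothetical illustration with integer IDs, not a Redis client) reproduces that behavior so it can be checked without a server.

```python
from typing import Dict, List, Tuple

Entry = Tuple[int, Dict[str, str]]


class ToyStream:
    """In-memory stand-in for a stream with one consumer group (illustration only)."""

    def __init__(self) -> None:
        self.entries: List[Entry] = []
        self.group_last_id = 0          # plays the role of the group's last-delivered-id
        self.pel: Dict[int, str] = {}   # entry id -> consumer that currently owns it

    def xadd(self, fields: Dict[str, str]) -> int:
        entry_id = len(self.entries) + 1
        self.entries.append((entry_id, fields))
        return entry_id

    def xread(self, after_id: int, count: int = 10) -> List[Entry]:
        # No shared state: each caller supplies its own position, so all callers see all entries
        return [e for e in self.entries if e[0] > after_id][:count]

    def xreadgroup(self, consumer: str, count: int = 10) -> List[Entry]:
        # ">" semantics: only entries never delivered to this group
        batch = [e for e in self.entries if e[0] > self.group_last_id][:count]
        for entry_id, _ in batch:
            self.pel[entry_id] = consumer
        if batch:
            self.group_last_id = batch[-1][0]
        return batch
```

Two consumers calling `xreadgroup` receive disjoint batches, while two `xread(0)` callers both receive everything, which is exactly the duplicate processing this pitfall describes.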

❌ Pitfall 2: Forgetting XACK, so messages stay in the Pending list forever

// ❌ Wrong: messages are processed but never ACKed, so they stay Pending forever
func badConsumer(rdb *redis.Client) {
    results, _ := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
        Group:    groupName,
        Consumer: "consumer-1",
        Streams:  []string{streamName, ">"},
        Count:    10,
    }).Result()
    for _, stream := range results {
        for _, msg := range stream.Messages {
            processEvent(msg.Values)
            // Missing XACK!
        }
    }
}
// ✅ Correct: ACK each message as soon as it has been processed successfully
func goodConsumer(rdb *redis.Client) {
    results, _ := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
        Group:    groupName,
        Consumer: "consumer-1",
        Streams:  []string{streamName, ">"},
        Count:    10,
    }).Result()
    for _, stream := range results {
        for _, msg := range stream.Messages {
            if err := processEvent(msg.Values); err == nil {
                rdb.XAck(ctx, streamName, groupName, msg.ID)
            }
        }
    }
}
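Why a forgotten XACK matters: every entry delivered through XREADGROUP is recorded in the group's Pending Entries List (PEL) until it is acknowledged, and XPENDING reports that list. The hypothetical `ToyPEL` class below models only that bookkeeping (owner and delivery count per entry) to show how the pending count grows when ACKs are skipped; it does not talk to Redis.

```python
from collections import Counter
from typing import Dict, Tuple


class ToyPEL:
    """Minimal model of a consumer group's Pending Entries List (illustration only)."""

    def __init__(self) -> None:
        # entry id -> (owning consumer, delivery count)
        self.pending: Dict[str, Tuple[str, int]] = {}

    def deliver(self, entry_id: str, consumer: str) -> None:
        # A first delivery, re-delivery or claim bumps the delivery counter
        _, count = self.pending.get(entry_id, (consumer, 0))
        self.pending[entry_id] = (consumer, count + 1)

    def ack(self, entry_id: str) -> bool:
        # XACK removes the entry; acknowledging twice is a no-op
        return self.pending.pop(entry_id, None) is not None

    def summary(self) -> Tuple[int, Dict[str, int]]:
        # Roughly what the summary form of XPENDING reports: total and per-consumer counts
        per_consumer = Counter(owner for owner, _ in self.pending.values())
        return len(self.pending), dict(per_consumer)
```

In `badConsumer` only `deliver` is ever called, so `summary()` grows without bound; `goodConsumer` pairs each successful delivery with an `ack`.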

❌ Pitfall 3: Unbounded stream growth exhausts memory

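# ❌ Wrong: entries are only ever added, so the stream grows without bound
r.xadd("events:orders", {"data": "..."})
# XTRIM or XDEL is never called
# ✅ Correct: cap the length with MAXLEN on XADD, or run XTRIM periodically
# (approximate=True sends MAXLEN ~, which lets Redis trim whole nodes cheaply and keep slightly more entries)
r.xadd("events:orders", {"data": "..."}, maxlen=10000, approximate=True)
# Or trim explicitly. Note: by default trimming ignores consumer groups,
# so entries that are still pending in a PEL are removed as well
r.xtrim("events:orders", maxlen=10000, approximate=True)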
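MAXLEN caps the stream by entry count. When retention is better expressed in time, Redis 6.2+ also accepts MINID, and because auto-generated stream IDs start with a millisecond timestamp, the cutoff ID can be computed directly. `retention_minid` below is a hypothetical helper; redis-py's `xtrim` and `xadd` accept its result through their `minid` argument.

```python
import time
from typing import Optional


def retention_minid(retention_seconds: float, now: Optional[float] = None) -> str:
    """Smallest stream ID to keep when retaining `retention_seconds` of history.

    Auto-generated IDs are "<unix-ms>-<seq>", so every entry older than the
    cutoff has an ID lower than "<cutoff-ms>-0".
    """
    if now is None:
        now = time.time()
    cutoff_ms = int((now - retention_seconds) * 1000)
    return f"{max(cutoff_ms, 0)}-0"


# Usage with redis-py (needs a running server, so shown as a comment):
# r.xtrim("events:orders", minid=retention_minid(7 * 24 * 3600), approximate=True)
```

This only works for IDs generated by Redis (`*`); streams written with custom IDs that do not encode time would need a different cutoff.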

❌ Pitfall 4: Claiming messages with XCLAIM without checking their idle time

// ❌ Wrong: blindly XCLAIMing every pending message can steal messages another consumer is still processing
func badClaim(rdb *redis.Client) {
    pending, _ := rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
        Stream: streamName,
        Group:  groupName,
        Start:  "-",
        End:    "+",
        Count:  100,
    }).Result()
    for _, msg := range pending {
        rdb.XClaim(ctx, &redis.XClaimArgs{
            Stream:   streamName,
            Group:    groupName,
            Consumer: "consumer-2",
            Messages: []string{msg.ID},
        })
    }
}
// ✅ Correct: only claim pending messages that have been idle longer than a threshold
func goodClaim(rdb *redis.Client) {
    pending, _ := rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
        Stream: streamName,
        Group:  groupName,
        Start:  "-",
        End:    "+",
        Count:  100,
        Idle:   60 * time.Second, // only entries idle for more than 60s (XPENDING IDLE option, Redis 6.2+)
    }).Result()
    for _, msg := range pending {
        rdb.XClaim(ctx, &redis.XClaimArgs{
            Stream:   streamName,
            Group:    groupName,
            Consumer: "consumer-2",
            MinIdle:  60 * time.Second, // re-checked by the server at claim time, so an entry picked up meanwhile is not stolen
            Messages: []string{msg.ID},
        })
    }
}
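The same rule can be written as a pure filter over XPENDING detail rows, which makes the threshold easy to test. The sketch below assumes rows shaped like redis-py's `xpending_range` output (dicts with `message_id`, `consumer`, `time_since_delivered` in milliseconds, and `times_delivered`); `select_claimable` and its `max_deliveries` budget are hypothetical. On Redis 6.2+, XAUTOCLAIM performs the scan and the claim in a single command.

```python
from typing import Dict, List, Union

Row = Dict[str, Union[str, int]]


def select_claimable(rows: List[Row], min_idle_ms: int, claimer: str,
                     max_deliveries: int = 5) -> List[str]:
    """Return IDs worth claiming: idle long enough, owned by another consumer,
    and still within the retry budget (exhausted ones belong in the DLQ)."""
    claimable = []
    for row in rows:
        if row["consumer"] == claimer:
            continue  # already owned by the claiming consumer
        if int(row["time_since_delivered"]) < min_idle_ms:
            continue  # probably still being processed
        if int(row["times_delivered"]) >= max_deliveries:
            continue  # route to the dead-letter stream instead of retrying
        claimable.append(str(row["message_id"]))
    return claimable
```

The selected IDs should still be passed to `xclaim` together with `min_idle_time`, so the server re-checks idleness atomically, just as `MinIdle` does in the Go version.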

❌ Pitfall 5: Wrong start ID when creating a consumer group

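# ❌ Wrong (when history matters): a group created at $ only receives entries added after creation; existing entries are skipped
r.xgroup_create(STREAM_NAME, GROUP_NAME, id="$")
# ✅ Correct: create the group at 0 to consume every entry from the beginning
r.xgroup_create(STREAM_NAME, GROUP_NAME, id="0")
# If you really only want new entries, keep $ but state the intent explicitly
# (these lines are alternatives: a group can only be created once)
r.xgroup_create(STREAM_NAME, GROUP_NAME, id="$")  # intentionally consume new entries only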
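Because creating an existing group fails with BUSYGROUP (see the troubleshooting table below), service startup code usually creates the group idempotently. A minimal sketch, assuming a redis-py style client whose `xgroup_create` raises an exception whose message starts with "BUSYGROUP" (in redis-py this is `redis.exceptions.ResponseError`); `ensure_group` is a hypothetical helper name.

```python
def ensure_group(client, stream: str, group: str, start_id: str = "0") -> bool:
    """Create `group` on `stream` if it is missing; return True if it was created.

    start_id="0" replays the existing history, "$" only delivers new entries.
    mkstream=True also creates the stream, avoiding NOGROUP / no-such-key errors.
    """
    try:
        client.xgroup_create(stream, group, id=start_id, mkstream=True)
        return True
    except Exception as exc:  # redis-py raises redis.exceptions.ResponseError here
        if str(exc).startswith("BUSYGROUP"):
            return False  # the group already exists: safe to ignore
        raise
```

Note that the start ID only matters on first creation; if the group already exists, its position is left untouched.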

Troubleshooting Table

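| Symptom | Likely cause | Diagnostic command | Fix |
| --- | --- | --- | --- |
| NOGROUP No such key | Stream or consumer group does not exist | XINFO GROUPS events:orders | Create the group with XGROUP CREATE; add MKSTREAM to create the stream automatically |
| BUSYGROUP Consumer Group name already exists | Consumer group created twice | XINFO GROUPS events:orders | Catch and ignore the error, or check whether the group exists first |
| Consumer receives no messages | XREADGROUP uses 0-0 instead of > | Check the Streams argument in the code | Use > for new messages; 0-0 re-reads this consumer's own unacknowledged messages |
| Pending messages pile up | Consumer crashed before ACKing | XPENDING events:orders group-name | Transfer them to a live consumer with XCLAIM |
| Stream memory keeps growing | No MAXLEN/XTRIM configured | XINFO STREAM events:orders (check length) | Add MAXLEN ~ 10000 to XADD, or run XTRIM periodically |
| Messages processed more than once | Processing succeeded but the ACK failed | Check that the ACK comes after the business logic | ACK only after the business logic succeeds, and make handlers idempotent (delivery is at-least-once) |
| Message still processed by the original consumer after XCLAIM | MinIdle set too short | XPENDING to inspect idle times | Use a reasonable MinIdle threshold (e.g. 60 s) |
| Slow event-sourcing replay | Too many events, full XRANGE | XLEN events:order:ORD-001 | Periodic snapshots plus incremental events, replaying with XRANGE over an ID range |
| CQRS read model out of sync | Projection consumer lagging or crashed | XINFO CONSUMERS to check consumer delay | Monitor consumer lag and set alert thresholds |
| ERR unknown command 'XADD' | Redis older than 5.0 (no Streams support) | redis-server --version | Upgrade Redis to 5.0+ |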

Advanced Optimizations

1. Stream sharding and partitioning

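When a single stream can no longer keep up, shard by a business key across multiple streams. Hashing the key sends all events for the same order to the same stream, so their relative order is preserved: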

import hashlib

def get_shard_stream(base_stream: str, key: str, shard_count: int = 16) -> str:
    shard_index = int(hashlib.md5(key.encode()).hexdigest(), 16) % shard_count
    return f"{base_stream}:shard-{shard_index:03d}"

order_id = "ORD-0001"
stream = get_shard_stream("events:orders", order_id)
r.xadd(stream, {"order_id": order_id, "event_type": "order_created"})
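Producers only need `get_shard_stream`, but each consumer then has to read every shard. XREADGROUP accepts several streams in one call, so a consumer can pass one `">"` entry per shard, provided the group exists on each shard stream. The helpers below are a hypothetical sketch; the shard count must match the one the producer uses.

```python
import hashlib
from typing import Dict, List


def get_shard_stream(base_stream: str, key: str, shard_count: int = 16) -> str:
    # Same routing as the producer: one key always maps to one shard,
    # so all events of one order stay ordered within a single stream
    shard_index = int(hashlib.md5(key.encode()).hexdigest(), 16) % shard_count
    return f"{base_stream}:shard-{shard_index:03d}"


def all_shard_streams(base_stream: str, shard_count: int = 16) -> List[str]:
    return [f"{base_stream}:shard-{i:03d}" for i in range(shard_count)]


def readgroup_streams(base_stream: str, shard_count: int = 16) -> Dict[str, str]:
    # ">" asks for entries never delivered to this group, on every shard
    return {name: ">" for name in all_shard_streams(base_stream, shard_count)}


# Usage with redis-py (needs a server and the group created on every shard):
# r.xreadgroup(GROUP_NAME, "consumer-1", readgroup_streams("events:orders"), count=10, block=5000)
```

On Redis Cluster, a multi-key XREADGROUP only works when all keys hash to the same slot, so there each shard would be read with its own call.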

2. Consumer lag monitoring and alerting

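// Periodically checks one consumer group and logs backlog/idle alerts.
// Uses the package-level ctx, like the snippets above.
func monitorConsumerLag(rdb *redis.Client, stream, group string) {
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()
    for range ticker.C {
        groups, err := rdb.XInfoGroups(ctx, stream).Result()
        if err != nil {
            log.Printf("[WARN] XINFO GROUPS %s failed: %v", stream, err)
            continue
        }
        for _, g := range groups {
            if g.Name != group {
                continue
            }
            // Pending = delivered but not yet ACKed. Entries not yet delivered at all
            // are reported separately as "lag" by XINFO GROUPS on Redis 7.0+.
            if g.Pending > 1000 {
                log.Printf("[ALERT] Group %s has %d pending messages!", group, g.Pending)
            }
            consumers, err := rdb.XInfoConsumers(ctx, stream, group).Result()
            if err != nil {
                log.Printf("[WARN] XINFO CONSUMERS %s %s failed: %v", stream, group, err)
                continue
            }
            for _, c := range consumers {
                if c.Pending > 100 {
                    log.Printf("[ALERT] Consumer %s has %d pending messages!", c.Name, c.Pending)
                }
                if c.Idle > 5*time.Minute {
                    log.Printf("[WARN] Consumer %s idle for %v", c.Name, c.Idle)
                }
            }
        }
    }
}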
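The alert rules are easier to unit-test once they are separated from the polling loop. The Python sketch below applies the same thresholds as the Go monitor (1000 pending per group, 100 per consumer, 5 minutes idle) to dicts shaped like redis-py's `xinfo_groups` / `xinfo_consumers` output, where `pending` is a count and `idle` is in milliseconds. Those key names are an assumption about the client, and `evaluate_alerts` is a hypothetical helper.

```python
from typing import Dict, List, Union

Info = Dict[str, Union[str, int]]

GROUP_PENDING_LIMIT = 1000
CONSUMER_PENDING_LIMIT = 100
CONSUMER_IDLE_LIMIT_MS = 5 * 60 * 1000


def evaluate_alerts(group: str, groups: List[Info], consumers: List[Info]) -> List[str]:
    """Return alert messages for one consumer group (same thresholds as the Go code)."""
    alerts = []
    for g in groups:
        if g["name"] != group:
            continue
        if int(g["pending"]) > GROUP_PENDING_LIMIT:
            alerts.append(f"[ALERT] Group {group} has {g['pending']} pending messages!")
    for c in consumers:
        if int(c["pending"]) > CONSUMER_PENDING_LIMIT:
            alerts.append(f"[ALERT] Consumer {c['name']} has {c['pending']} pending messages!")
        if int(c["idle"]) > CONSUMER_IDLE_LIMIT_MS:
            alerts.append(f"[WARN] Consumer {c['name']} idle for {int(c['idle']) // 1000}s")
    return alerts
```

The polling loop then only fetches the two lists and forwards whatever `evaluate_alerts` returns to the logging or paging system.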

3. Event snapshots and incremental replay

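import json
import time

SNAPSHOT_INTERVAL = 100  # write a fresh snapshot once this many events were replayed

def save_snapshot(aggregate_id: str, state: dict, version: int, last_event_id: str):
    r.hset(f"snapshot:order:{aggregate_id}", mapping={
        "state": json.dumps(state),
        "version": str(version),          # number of events folded into state
        "last_event_id": last_event_id,   # stream ID of the last applied event
        "timestamp": str(time.time()),
    })

def load_from_snapshot(aggregate_id: str):
    data = r.hgetall(f"snapshot:order:{aggregate_id}")
    if not data or "last_event_id" not in data:
        return None, 0, None  # no usable snapshot: replay from the first event
    return json.loads(data["state"]), int(data["version"]), data["last_event_id"]

def rebuild_with_snapshot(aggregate_id: str):
    state, version, last_id = load_from_snapshot(aggregate_id)
    stream_key = f"events:order:{aggregate_id}"
    # Stream IDs are "<ms>-<seq>" timestamps, not version numbers, so resume from
    # the stored stream ID; "(" makes the start exclusive (Redis 6.2+)
    start = "-" if last_id is None else f"({last_id}"
    events = r.xrange(stream_key, start, "+")
    for msg_id, fields in events:
        state = apply_event(state or {}, fields)
        version += 1
        last_id = msg_id
    if len(events) >= SNAPSHOT_INTERVAL:
        save_snapshot(aggregate_id, state, version, last_id)
    return state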
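The essential point of incremental replay is that the resume position must be a stream ID, compared as a (milliseconds, sequence) pair, not the aggregate's version number. The in-memory sketch below checks that snapshot-plus-tail replay reproduces a full replay; `apply_event`, the event types and the IDs are hypothetical stand-ins for the order aggregate, and no Redis server is involved.

```python
from typing import Dict, List, Optional, Tuple

Event = Tuple[str, Dict[str, str]]


def parse_id(stream_id: str) -> Tuple[int, int]:
    # "1718432000000-3" -> (1718432000000, 3); compare numerically, not as text
    ms, seq = stream_id.split("-")
    return int(ms), int(seq)


def apply_event(state: Dict[str, str], fields: Dict[str, str]) -> Dict[str, str]:
    # Hypothetical reducer for an order aggregate
    new_state = dict(state)
    if fields["event_type"] == "order_created":
        new_state.update(status="created", amount=fields["amount"])
    elif fields["event_type"] == "payment_completed":
        new_state["status"] = "paid"
    elif fields["event_type"] == "order_shipped":
        new_state["status"] = "shipped"
    return new_state


def replay(events: List[Event], state: Optional[Dict[str, str]] = None,
           after_id: Optional[str] = None) -> Tuple[Dict[str, str], Optional[str]]:
    """Fold events newer than after_id (exclusive, like XRANGE "(id") into state."""
    state = dict(state or {})
    last_id = after_id
    for stream_id, fields in events:
        if after_id is not None and parse_id(stream_id) <= parse_id(after_id):
            continue  # already contained in the snapshot
        state = apply_event(state, fields)
        last_id = stream_id
    return state, last_id
```

Taking a snapshot after the first two events and replaying only the tail yields the same state and last ID as replaying all three events from scratch.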

Comparison: Redis Streams vs. Other Messaging Systems

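| Feature | Redis Streams | Kafka | RabbitMQ | NATS JetStream | Pulsar |
| --- | --- | --- | --- | --- | --- |
| Positioning | Lightweight event stream | Distributed event streaming platform | Message broker | Cloud-native messaging system | Distributed messaging platform |
| Deployment complexity | ★☆☆☆☆ | ★★★★★ | ★★★☆☆ | ★★☆☆☆ | ★★★★☆ |
| Throughput (rough, workload-dependent) | 100k-500k msg/s | 1M+ msg/s | 50k-100k msg/s | 500k-1M msg/s | 1M+ msg/s |
| Persistence | AOF/RDB | On-disk log | Optional persistence | On-disk log | BookKeeper |
| Consumer groups | ✅ Native | ✅ Native | ⚠️ Competing consumers on a queue (no Kafka-style groups) | ✅ Native | ✅ Native |
| Message replay | ✅ XRANGE | ✅ Offset reset | ❌ Not with classic queues (✅ with RabbitMQ Streams, 3.9+) | ✅ Supported | ✅ Supported |
| Exactly-once | ❌ At-least-once | ✅ Transactions | ❌ At-least-once | ✅ Supported | ✅ Supported |
| Dead-letter queue | ❌ Manual implementation | ❌ Manual implementation | ✅ Native (dead-letter exchanges) | ⚠️ Built from MaxDeliver advisories | ✅ Native |
| Partitioning | ❌ Manual sharding | ✅ Automatic partitions | ✅ Exchange routing | ✅ Automatic partitions | ✅ Automatic partitions |
| Operational cost | Very low | High (ZooKeeper/KRaft) | Medium |  | High (BookKeeper) |
| Typical use case | Small/medium microservice event communication | Large-scale data stream processing | Enterprise message routing | Cloud-native lightweight messaging | Multi-tenant large-scale messaging |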
適用場景 中小規模微服務事件通信 大規模資料流處理 企業級訊息路由 雲原生輕量級訊息 多租戶大規模訊息

Summary

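In 2026, Redis Streams is still the best lightweight choice for event-driven architecture in small and medium-sized microservice systems. It needs neither Kafka's heavyweight infrastructure nor RabbitMQ's exchange configuration, and the six core patterns cover everything from basic produce/consume to a production-grade event processor. Remember three rules: always consume through a consumer group (XREADGROUP), always XACK after processing, and always cap streams with MAXLEN to avoid running out of memory. Move to Kafka or Pulsar once you need exactly-once semantics or throughput in the millions of messages per second.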

