Redis Streams Event-Driven Architecture: 6 Production Patterns from Consumer Groups to Event Sourcing


Microservice Communication Dilemma: Kafka Is Overkill, Redis Streams Just Right

An order service has to trigger inventory deduction, a payment service broadcasts transaction results, and user registration triggers a welcome email: event-driven communication between microservices is everywhere. Kafka brings cluster operations, ZooKeeper or KRaft coordination, and partition rebalancing, which is more infrastructure than a small service can justify. RabbitMQ brings exchange bindings, routing-key configuration, and its own acknowledgment model, which is also a lot to operate. As of 2026, Redis Streams is an increasingly popular lightweight option for small-to-medium microservice communication, especially when Redis is already part of the stack.

This article walks through 6 production patterns: basic producer/consumer → consumer groups → event sourcing → CQRS read model → dead letter queue → a production-grade event processing service. Patterns 1 to 5 come with Python and Go code; Pattern 6 is implemented in Go.


Redis Streams Core Concepts

| Concept | Description |
| --- | --- |
| Stream | Append-only log data structure introduced in Redis 5.0, similar to a Kafka topic; stores messages in chronological order |
| XADD | Appends a message to a Stream and auto-generates a timestamp-based ID (e.g., 1718432000000-0) |
| XREAD | Reads messages from a Stream; supports blocking and non-blocking modes |
| Consumer Group | Similar to a Kafka consumer group; each message is delivered to only one consumer in the group |
| XREADGROUP | Reads messages as a consumer group member; delivered messages enter the Pending state until acknowledged |
| XACK | Acknowledges that a message was processed and removes it from the Pending list |
| Pending Entries List (PEL) | List of delivered but unacknowledged messages, used for failure recovery and retry |
| XPENDING | Shows statistics and details for a consumer group's Pending messages |
| XCLAIM | Transfers Pending messages to another consumer, used for failover |
| Event Sourcing | All state changes are persisted as an event sequence, so state can be rebuilt for any point in time |
| CQRS | Command Query Responsibility Segregation; the write model (event stream) and read model (projection) evolve independently |
| Dead Letter Queue (DLQ) | Holds messages that exceeded the retry threshold, so they do not block processing with endless retries |
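
The auto-generated IDs mentioned for XADD have the form `<milliseconds>-<sequence>`. The sketch below is a minimal illustration of working with that format in plain Python, without a Redis connection; the helper names `parse_stream_id` and `id_to_datetime` are hypothetical and not part of any Redis client.

```python
from datetime import datetime, timezone
from typing import NamedTuple


class StreamID(NamedTuple):
    ms: int   # milliseconds since the Unix epoch
    seq: int  # sequence number within the same millisecond


def parse_stream_id(raw: str) -> StreamID:
    """Split an ID such as '1718432000000-0' into its two integer parts."""
    ms, seq = raw.split("-")
    return StreamID(int(ms), int(seq))


def id_to_datetime(raw: str) -> datetime:
    """Convert the millisecond part of a stream ID to a UTC datetime."""
    return datetime.fromtimestamp(parse_stream_id(raw).ms / 1000, tz=timezone.utc)


if __name__ == "__main__":
    ids = ["1718432000000-10", "1718432000000-2", "1718431999999-0"]
    # Compare IDs numerically; plain string sorting would put "-10" before "-2".
    print(sorted(ids, key=parse_stream_id))
    print(id_to_datetime("1718432000000-0").isoformat())
```

Comparing the parsed tuples rather than the raw strings matters whenever application code orders or deduplicates IDs itself.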

Problem Analysis: 5 Major Event-Driven Architecture Challenges

  1. Message loss and duplicate consumption: When a consumer crashes, its unACKed messages stay in the PEL and must be claimed and redelivered, which gives At-Least-Once delivery with possible duplicates; Exactly-Once effects require idempotent handlers
  2. Consumer group load imbalance: Slow consumers cause message accumulation while others are idle, requiring dynamic rebalancing and XCLAIM transfer
  3. Event sourcing snapshot and replay: Event streams grow without bound and full replay is expensive, requiring periodic snapshots + incremental events
  4. CQRS read model consistency: Read model projections are eventually consistent, users may see stale data, requiring version tracking and compensation
  5. Dead letter and retry storms: Repeated processing failures cause retry storms, requiring exponential backoff + DLQ + manual intervention
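
Challenge 1 is usually handled by making the handler idempotent, so that a redelivered message has no second effect. The following is a minimal sketch under stated assumptions: the `IdempotentHandler` class is hypothetical, and the in-memory set stands in for a durable store (for example a Redis set or a unique database key) that a real service would need.

```python
from typing import Callable, Dict, Set


class IdempotentHandler:
    """Wraps a handler so that a redelivered message ID is applied at most once."""

    def __init__(self, handler: Callable[[Dict[str, str]], None]):
        self._handler = handler
        self._seen: Set[str] = set()  # assumption: replace with a durable store in production

    def handle(self, msg_id: str, fields: Dict[str, str]) -> bool:
        """Return True if the handler ran, False if the message was a duplicate."""
        if msg_id in self._seen:
            return False
        self._handler(fields)
        # Record the ID only after success, so a failure mid-handler
        # leads to a retry instead of a silent skip.
        self._seen.add(msg_id)
        return True


if __name__ == "__main__":
    totals = {"charged": 0}

    def charge(fields: Dict[str, str]) -> None:
        totals["charged"] += int(fields["amount"])

    handler = IdempotentHandler(charge)
    handler.handle("1718432000000-0", {"amount": "100"})
    handler.handle("1718432000000-0", {"amount": "100"})  # simulated redelivery
    print(totals["charged"])
```

With a durable store, the "record the ID" step and the business side effect should ideally commit together; otherwise a crash between them can still produce one duplicate.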

Step-by-Step: 6 Redis Streams Event-Driven Patterns

Pattern 1: Basic Stream Producer/Consumer (XADD/XREAD)

The simplest producer-consumer model: one producer writes to Stream, one consumer reads.

import redis
import json
import time
import threading

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

STREAM_NAME = "events:orders"

def producer():
    for i in range(1, 6):
        event = {
            "event_type": "order_created",
            "order_id": f"ORD-{i:04d}",
            "amount": 100 * i,
            "user_id": f"USR-{i:03d}",
            "timestamp": time.time()
        }
        msg_id = r.xadd(STREAM_NAME, event)
        print(f"[Producer] Added event: {msg_id} -> {event}")
        time.sleep(0.5)

def consumer():
    last_id = "0-0"
    while True:
        entries = r.xread({STREAM_NAME: last_id}, count=5, block=5000)
        if not entries:
            print("[Consumer] No new messages, waiting...")
            continue
        for stream_name, messages in entries:
            for msg_id, fields in messages:
                print(f"[Consumer] Processed: {msg_id} -> {fields}")
                last_id = msg_id

if __name__ == "__main__":
    threading.Thread(target=producer, daemon=True).start()
    consumer()

Go version:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

var ctx = context.Background()

func main() {
	rdb := redis.NewClient(&redis.Options{
		Addr: "localhost:6379",
	})
	streamName := "events:orders"

	go producer(rdb, streamName)
	consumer(rdb, streamName)
}

func producer(rdb *redis.Client, streamName string) {
	for i := 1; i <= 5; i++ {
		values := map[string]interface{}{
			"event_type": "order_created",
			"order_id":   fmt.Sprintf("ORD-%04d", i),
			"amount":     fmt.Sprintf("%d", 100*i),
			"user_id":    fmt.Sprintf("USR-%03d", i),
			"timestamp":  fmt.Sprintf("%d", time.Now().UnixMilli()),
		}
		msgID, err := rdb.XAdd(ctx, &redis.XAddArgs{
			Stream: streamName,
			Values: values,
		}).Result()
		if err != nil {
			fmt.Printf("[Producer] Error: %v\n", err)
			continue
		}
		fmt.Printf("[Producer] Added event: %s\n", msgID)
		time.Sleep(500 * time.Millisecond)
	}
}

func consumer(rdb *redis.Client, streamName string) {
	lastID := "0-0"
	for {
		results, err := rdb.XRead(ctx, &redis.XReadArgs{
			Streams: []string{streamName, lastID},
			Count:   5,
			Block:   5 * time.Second,
		}).Result()
		if err != nil {
			if err == redis.Nil {
				fmt.Println("[Consumer] No new messages, waiting...")
				continue
			}
			fmt.Printf("[Consumer] Error: %v\n", err)
			continue
		}
		for _, stream := range results {
			for _, msg := range stream.Messages {
				fmt.Printf("[Consumer] Processed: %s -> %v\n", msg.ID, msg.Values)
				lastID = msg.ID
			}
		}
	}
}

Pattern 2: Consumer Group (XREADGROUP)

Consumer groups deliver each message to only one consumer in the group, similar to Kafka's Consumer Group. The message stays in the Pending state until that consumer acknowledges it with XACK.

import redis
import time
import threading
import random

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

STREAM_NAME = "events:payments"
GROUP_NAME = "payment-processors"

try:
    r.xgroup_create(STREAM_NAME, GROUP_NAME, id="0", mkstream=True)
    print(f"[Init] Created consumer group: {GROUP_NAME}")
except redis.ResponseError as e:
    if "BUSYGROUP" in str(e):
        print(f"[Init] Group already exists: {GROUP_NAME}")
    else:
        raise

def group_consumer(consumer_name: str):
    while True:
        entries = r.xreadgroup(
            GROUP_NAME, consumer_name,
            {STREAM_NAME: ">"},
            count=3, block=3000
        )
        if not entries:
            print(f"[{consumer_name}] No new messages")
            continue
        for stream_name, messages in entries:
            for msg_id, fields in messages:
                process_time = random.uniform(0.1, 0.5)
                time.sleep(process_time)
                r.xack(STREAM_NAME, GROUP_NAME, msg_id)
                print(f"[{consumer_name}] ACKed: {msg_id} -> {fields}")

def check_pending():
    pending = r.xpending_range(STREAM_NAME, GROUP_NAME, min="-", max="+", count=10)
    if pending:
        print(f"[Monitor] Pending messages: {len(pending)}")
        for p in pending:
            print(f"  msg_id={p['message_id']}, consumer={p['consumer']}, "
                  f"delivered={p['times_delivered']} times")

if __name__ == "__main__":
    for i in range(1, 4):
        for j in range(1, 3):
            r.xadd(STREAM_NAME, {
                "event_type": "payment_completed",
                "payment_id": f"PAY-{i:02d}{j:02d}",
                "amount": str(random.randint(50, 500)),
            })

    threading.Thread(target=group_consumer, args=("consumer-1",), daemon=True).start()
    threading.Thread(target=group_consumer, args=("consumer-2",), daemon=True).start()

    time.sleep(1)
    check_pending()
    time.sleep(5)

Go version:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"

	"github.com/redis/go-redis/v9"
)

var ctx = context.Background()

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	streamName := "events:payments"
	groupName := "payment-processors"

	err := rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()
	if err != nil {
		fmt.Printf("[Init] Group create: %v\n", err)
	} else {
		fmt.Printf("[Init] Created consumer group: %s\n", groupName)
	}

	for i := 1; i <= 3; i++ {
		for j := 1; j <= 2; j++ {
			rdb.XAdd(ctx, &redis.XAddArgs{
				Stream: streamName,
				Values: map[string]interface{}{
					"event_type": "payment_completed",
					"payment_id": fmt.Sprintf("PAY-%02d%02d", i, j),
					"amount":     fmt.Sprintf("%d", rand.Intn(450)+50),
				},
			})
		}
	}

	go groupConsumer(rdb, streamName, groupName, "consumer-1")
	go groupConsumer(rdb, streamName, groupName, "consumer-2")
	time.Sleep(6 * time.Second)
}

func groupConsumer(rdb *redis.Client, stream, group, consumer string) {
	for {
		results, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    group,
			Consumer: consumer,
			Streams:  []string{stream, ">"},
			Count:    3,
			Block:    3 * time.Second,
		}).Result()
		if err != nil {
			if err == redis.Nil {
				fmt.Printf("[%s] No new messages\n", consumer)
				continue
			}
			fmt.Printf("[%s] Error: %v\n", consumer, err)
			continue
		}
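		// Use a distinct loop variable: shadowing the `stream` string parameter
		// would pass an XStream struct to XAck and fail to compile.
		for _, s := range results {
			for _, msg := range s.Messages {
				fmt.Printf("[%s] Processing: %s -> %v\n", consumer, msg.ID, msg.Values)
				rdb.XAck(ctx, stream, group, msg.ID)
				fmt.Printf("[%s] ACKed: %s\n", consumer, msg.ID)
			}
		}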
	}
}
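
When a consumer in the group crashes, its delivered messages stay pending until another consumer takes them over with XCLAIM. The sketch below shows one way to decide which pending entries to claim and which to dead-letter. It is pure Python and assumes entries shaped like the dictionaries that redis-py's `xpending_range` returns (`message_id`, `consumer`, `time_since_delivered` in milliseconds, `times_delivered`); the function name and thresholds are hypothetical.

```python
from typing import Dict, List, Tuple


def split_stale_pending(
    pending: List[Dict],
    min_idle_ms: int,
    max_deliveries: int,
) -> Tuple[List[str], List[str]]:
    """Return (ids_to_claim, ids_to_dead_letter) for entries idle long enough."""
    to_claim: List[str] = []
    to_dead_letter: List[str] = []
    for entry in pending:
        if entry["time_since_delivered"] < min_idle_ms:
            continue  # the current owner may still be working on it
        if entry["times_delivered"] >= max_deliveries:
            to_dead_letter.append(entry["message_id"])
        else:
            to_claim.append(entry["message_id"])
    return to_claim, to_dead_letter


if __name__ == "__main__":
    sample = [
        {"message_id": "1-0", "consumer": "consumer-1",
         "time_since_delivered": 90_000, "times_delivered": 1},
        {"message_id": "2-0", "consumer": "consumer-1",
         "time_since_delivered": 5_000, "times_delivered": 1},
        {"message_id": "3-0", "consumer": "consumer-2",
         "time_since_delivered": 120_000, "times_delivered": 4},
    ]
    print(split_stale_pending(sample, min_idle_ms=60_000, max_deliveries=3))
```

The IDs in the first list would then be passed to XCLAIM with the same minimum idle time, so that two recovering consumers do not both take the same message.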

Pattern 3: Event Sourcing with Redis Streams

All state changes are persisted as event sequences in a Stream, enabling aggregate state reconstruction at any point in time.

import redis
import json
from dataclasses import dataclass, field
from typing import List, Dict

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

@dataclass
class OrderState:
    order_id: str = ""
    status: str = "created"
    items: List[Dict] = field(default_factory=list)
    total: float = 0.0
    version: int = 0

class EventStore:
    def __init__(self, aggregate_type: str, aggregate_id: str):
        self.stream_key = f"events:{aggregate_type}:{aggregate_id}"
        self.aggregate_type = aggregate_type
        self.aggregate_id = aggregate_id

    def append(self, event_type: str, data: dict):
        event = {
            "event_type": event_type,
            "aggregate_id": self.aggregate_id,
            "data": json.dumps(data),
            "version": str(self._next_version()),
        }
        msg_id = r.xadd(self.stream_key, event)
        print(f"[EventStore] Appended: {msg_id} type={event_type} version={event['version']}")
        return msg_id

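    def _next_version(self) -> int:
        # XLEN returns 0 for a missing key, whereas XINFO STREAM raises an
        # error before the first event exists. Read-then-append is not atomic,
        # so concurrent writers could still produce duplicate versions.
        return r.xlen(self.stream_key) + 1

    def load_events(self) -> List[dict]:
        # No COUNT cap: a truncated read would rebuild an incomplete state.
        entries = r.xrange(self.stream_key, "-", "+")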
        events = []
        for msg_id, fields in entries:
            events.append({
                "id": msg_id,
                "event_type": fields["event_type"],
                "data": json.loads(fields["data"]),
                "version": int(fields["version"]),
            })
        return events

class OrderAggregate:
    def __init__(self, order_id: str):
        self.order_id = order_id
        self.store = EventStore("order", order_id)
        self.state = OrderState(order_id=order_id)

    def create(self, items: List[Dict]):
        total = sum(item["price"] * item["qty"] for item in items)
        self.store.append("OrderCreated", {"items": items, "total": total})

    def add_item(self, item: Dict):
        self.store.append("ItemAdded", item)

    def pay(self):
        self.store.append("OrderPaid", {"payment_method": "credit_card"})

    def ship(self, tracking_number: str):
        self.store.append("OrderShipped", {"tracking_number": tracking_number})

    def rebuild(self) -> OrderState:
        events = self.store.load_events()
        state = OrderState(order_id=self.order_id)
        for event in events:
            self._apply(state, event)
        self.state = state
        return state

    def _apply(self, state: OrderState, event: dict):
        event_type = event["event_type"]
        data = event["data"]
        if event_type == "OrderCreated":
            state.items = data.get("items", [])
            state.total = data.get("total", 0)
            state.status = "created"
        elif event_type == "ItemAdded":
            state.items.append(data)
            state.total += data.get("price", 0) * data.get("qty", 1)
        elif event_type == "OrderPaid":
            state.status = "paid"
        elif event_type == "OrderShipped":
            state.status = "shipped"
        state.version = event["version"]

if __name__ == "__main__":
    order = OrderAggregate("ORD-0001")
    order.create([{"sku": "WIDGET-01", "price": 29.9, "qty": 2}])
    order.add_item({"sku": "GADGET-02", "price": 49.9, "qty": 1})
    order.pay()
    order.ship("SF1234567890")

    rebuilt = order.rebuild()
    print(f"\n[Rebuilt State] order_id={rebuilt.order_id} status={rebuilt.status} "
          f"total={rebuilt.total} items={len(rebuilt.items)} version={rebuilt.version}")

Go version:

package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/redis/go-redis/v9"
)

var ctx = context.Background()

type OrderState struct {
	OrderID string                   `json:"order_id"`
	Status  string                   `json:"status"`
	Items   []map[string]interface{} `json:"items"`
	Total   float64                  `json:"total"`
	Version int                      `json:"version"`
}

type Event struct {
	ID         string                 `json:"id"`
	EventType  string                 `json:"event_type"`
	Data       map[string]interface{} `json:"data"`
	Version    int                    `json:"version"`
}

type EventStore struct {
	rdb         *redis.Client
	StreamKey   string
	AggregateID string
}

func NewEventStore(rdb *redis.Client, aggregateType, aggregateID string) *EventStore {
	return &EventStore{
		rdb:         rdb,
		StreamKey:   fmt.Sprintf("events:%s:%s", aggregateType, aggregateID),
		AggregateID: aggregateID,
	}
}

func (s *EventStore) Append(eventType string, data map[string]interface{}) (string, error) {
	info, err := s.rdb.XInfoStream(ctx, s.StreamKey).Result()
	version := 1
	if err == nil {
		version = int(info.Length) + 1
	}
	dataJSON, _ := json.Marshal(data)
	msgID, err := s.rdb.XAdd(ctx, &redis.XAddArgs{
		Stream: s.StreamKey,
		Values: map[string]interface{}{
			"event_type":   eventType,
			"aggregate_id": s.AggregateID,
			"data":         string(dataJSON),
			"version":      fmt.Sprintf("%d", version),
		},
	}).Result()
	if err != nil {
		return "", err
	}
	fmt.Printf("[EventStore] Appended: %s type=%s version=%d\n", msgID, eventType, version)
	return msgID, nil
}

func (s *EventStore) LoadEvents() ([]Event, error) {
	entries, err := s.rdb.XRange(ctx, s.StreamKey, "-", "+").Result()
	if err != nil {
		return nil, err
	}
	var events []Event
	for _, msg := range entries {
		var data map[string]interface{}
		json.Unmarshal([]byte(msg.Values["data"].(string)), &data)
		var version int
		fmt.Sscanf(msg.Values["version"].(string), "%d", &version)
		events = append(events, Event{
			ID:        msg.ID,
			EventType: msg.Values["event_type"].(string),
			Data:      data,
			Version:   version,
		})
	}
	return events, nil
}

type OrderAggregate struct {
	OrderID string
	Store   *EventStore
	State   OrderState
}

func NewOrderAggregate(rdb *redis.Client, orderID string) *OrderAggregate {
	return &OrderAggregate{
		OrderID: orderID,
		Store:   NewEventStore(rdb, "order", orderID),
		State:   OrderState{OrderID: orderID, Status: "created"},
	}
}

func (a *OrderAggregate) Create(items []map[string]interface{}) {
	total := 0.0
	for _, item := range items {
		price, _ := item["price"].(float64)
		qty, _ := item["qty"].(float64)
		total += price * qty
	}
	a.Store.Append("OrderCreated", map[string]interface{}{
		"items": items, "total": total,
	})
}

func (a *OrderAggregate) Pay() {
	a.Store.Append("OrderPaid", map[string]interface{}{
		"payment_method": "credit_card",
	})
}

func (a *OrderAggregate) Ship(trackingNumber string) {
	a.Store.Append("OrderShipped", map[string]interface{}{
		"tracking_number": trackingNumber,
	})
}

func (a *OrderAggregate) Rebuild() (*OrderState, error) {
	events, err := a.Store.LoadEvents()
	if err != nil {
		return nil, err
	}
	state := OrderState{OrderID: a.OrderID, Status: "created"}
	for _, event := range events {
		switch event.EventType {
		case "OrderCreated":
			if items, ok := event.Data["items"].([]interface{}); ok {
				for _, item := range items {
					if m, ok := item.(map[string]interface{}); ok {
						state.Items = append(state.Items, m)
					}
				}
			}
			if total, ok := event.Data["total"].(float64); ok {
				state.Total = total
			}
			state.Status = "created"
		case "OrderPaid":
			state.Status = "paid"
		case "OrderShipped":
			state.Status = "shipped"
		}
		state.Version = event.Version
	}
	a.State = state
	return &state, nil
}

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	order := NewOrderAggregate(rdb, "ORD-0001")
	order.Create([]map[string]interface{}{
		{"sku": "WIDGET-01", "price": 29.9, "qty": 2.0}, // float64, so the type assertion on qty in Create succeeds
	})
	order.Pay()
	order.Ship("SF1234567890")

	state, _ := order.Rebuild()
	fmt.Printf("\n[Rebuilt State] order_id=%s status=%s total=%.2f items=%d version=%d\n",
		state.OrderID, state.Status, state.Total, len(state.Items), state.Version)
}
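
Full replay gets expensive as the stream grows, which is why event-sourced systems usually combine periodic snapshots with incremental replay. The following is a minimal in-memory sketch of that idea, not the article's EventStore: the event dictionaries mirror the `event_type`, `data`, and `version` fields used above, while `rebuild`, `take_snapshot`, and the simplified `OrderState` are hypothetical names.

```python
from dataclasses import asdict, dataclass
from typing import Dict, List, Optional


@dataclass
class OrderState:
    status: str = "new"
    total: float = 0.0
    version: int = 0


def apply_event(state: OrderState, event: Dict) -> None:
    """Mutate state according to a single event."""
    kind, data = event["event_type"], event["data"]
    if kind == "OrderCreated":
        state.status, state.total = "created", data["total"]
    elif kind == "ItemAdded":
        state.total += data["price"] * data["qty"]
    elif kind == "OrderPaid":
        state.status = "paid"
    elif kind == "OrderShipped":
        state.status = "shipped"
    state.version = event["version"]


def take_snapshot(state: OrderState) -> Dict:
    """Serialize the state, including the version it reflects."""
    return asdict(state)


def rebuild(events: List[Dict], snapshot: Optional[Dict] = None) -> OrderState:
    """Start from the snapshot (if any) and apply only newer events."""
    state = OrderState(**snapshot) if snapshot else OrderState()
    for event in events:
        if event["version"] > state.version:
            apply_event(state, event)
    return state


if __name__ == "__main__":
    events = [
        {"event_type": "OrderCreated", "data": {"total": 100.0}, "version": 1},
        {"event_type": "ItemAdded", "data": {"price": 25.0, "qty": 2}, "version": 2},
        {"event_type": "OrderPaid", "data": {}, "version": 3},
        {"event_type": "OrderShipped", "data": {}, "version": 4},
    ]
    snapshot = take_snapshot(rebuild(events[:2]))
    print(rebuild(events, snapshot) == rebuild(events))
```

In a Redis-backed store the snapshot would typically also record the stream ID of the last applied event, so that the incremental read can start from that ID instead of scanning the whole stream.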

Pattern 4: CQRS Read Model Projection

The write side publishes events to a Stream; the read side listens and updates projections (read models), achieving Command Query Responsibility Segregation.

import redis
import json
import time
import threading

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

EVENT_STREAM = "events:orders"
PROJECTION_KEY = "projection:order_summary"
GROUP_NAME = "cqrs-projectors"

try:
    r.xgroup_create(EVENT_STREAM, GROUP_NAME, id="0", mkstream=True)
except redis.ResponseError as e:
    # Ignore only "group already exists"; surface any other error.
    if "BUSYGROUP" not in str(e):
        raise

def command_side_publish_events():
    events = [
        {"event_type": "OrderCreated", "order_id": "ORD-001", "amount": "299", "user_id": "USR-001"},
        {"event_type": "OrderCreated", "order_id": "ORD-002", "amount": "599", "user_id": "USR-002"},
        {"event_type": "OrderPaid", "order_id": "ORD-001", "amount": "299"},
        {"event_type": "OrderShipped", "order_id": "ORD-001"},
        {"event_type": "OrderCreated", "order_id": "ORD-003", "amount": "199", "user_id": "USR-001"},
    ]
    for event in events:
        r.xadd(EVENT_STREAM, event)
        print(f"[Command] Published: {event['event_type']} -> {event.get('order_id')}")
        time.sleep(0.3)

def query_side_projector():
    consumer_name = "projector-1"
    while True:
        entries = r.xreadgroup(
            GROUP_NAME, consumer_name,
            {EVENT_STREAM: ">"},
            count=5, block=3000
        )
        if not entries:
            continue
        for stream_name, messages in entries:
            for msg_id, fields in messages:
                update_projection(fields)
                r.xack(EVENT_STREAM, GROUP_NAME, msg_id)

def update_projection(event: dict):
    event_type = event.get("event_type")
    order_id = event.get("order_id", "")
    if event_type == "OrderCreated":
        r.hset(PROJECTION_KEY, order_id, json.dumps({
            "status": "created",
            "amount": event.get("amount", "0"),
            "user_id": event.get("user_id", ""),
        }))
    elif event_type == "OrderPaid":
        existing = r.hget(PROJECTION_KEY, order_id)
        if existing:
            data = json.loads(existing)
            data["status"] = "paid"
            r.hset(PROJECTION_KEY, order_id, json.dumps(data))
    elif event_type == "OrderShipped":
        existing = r.hget(PROJECTION_KEY, order_id)
        if existing:
            data = json.loads(existing)
            data["status"] = "shipped"
            r.hset(PROJECTION_KEY, order_id, json.dumps(data))
    print(f"[Projection] Updated: {order_id} <- {event_type}")

def query_read_model():
    time.sleep(3)
    all_orders = r.hgetall(PROJECTION_KEY)
    print("\n[Read Model] Order Summary:")
    for order_id, data in all_orders.items():
        print(f"  {order_id}: {data}")

if __name__ == "__main__":
    threading.Thread(target=command_side_publish_events, daemon=True).start()
    threading.Thread(target=query_side_projector, daemon=True).start()
    query_read_model()

Go version:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

var ctx = context.Background()

const (
	eventStream   = "events:orders"
	projectionKey = "projection:order_summary"
	groupName     = "cqrs-projectors"
)

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	rdb.XGroupCreateMkStream(ctx, eventStream, groupName, "0")

	go commandSidePublishEvents(rdb)
	go querySideProjector(rdb)
	time.Sleep(4 * time.Second)
	queryReadModel(rdb)
}

func commandSidePublishEvents(rdb *redis.Client) {
	events := []map[string]interface{}{
		{"event_type": "OrderCreated", "order_id": "ORD-001", "amount": "299", "user_id": "USR-001"},
		{"event_type": "OrderCreated", "order_id": "ORD-002", "amount": "599", "user_id": "USR-002"},
		{"event_type": "OrderPaid", "order_id": "ORD-001", "amount": "299"},
		{"event_type": "OrderShipped", "order_id": "ORD-001"},
		{"event_type": "OrderCreated", "order_id": "ORD-003", "amount": "199", "user_id": "USR-001"},
	}
	for _, event := range events {
		rdb.XAdd(ctx, &redis.XAddArgs{Stream: eventStream, Values: event})
		fmt.Printf("[Command] Published: %s -> %s\n", event["event_type"], event["order_id"])
		time.Sleep(300 * time.Millisecond)
	}
}

func querySideProjector(rdb *redis.Client) {
	for {
		results, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    groupName,
			Consumer: "projector-1",
			Streams:  []string{eventStream, ">"},
			Count:    5,
			Block:    3 * time.Second,
		}).Result()
		if err != nil {
			continue
		}
		for _, stream := range results {
			for _, msg := range stream.Messages {
				updateProjection(rdb, msg.Values)
				rdb.XAck(ctx, eventStream, groupName, msg.ID)
			}
		}
	}
}

func updateProjection(rdb *redis.Client, fields map[string]interface{}) {
	eventType, _ := fields["event_type"].(string)
	orderID, _ := fields["order_id"].(string)
	switch eventType {
	case "OrderCreated":
		data, _ := json.Marshal(map[string]string{
			"status":  "created",
			"amount":  fmt.Sprintf("%v", fields["amount"]),
			"user_id": fmt.Sprintf("%v", fields["user_id"]),
		})
		rdb.HSet(ctx, projectionKey, orderID, string(data))
	case "OrderPaid":
		existing, _ := rdb.HGet(ctx, projectionKey, orderID).Result()
		if existing != "" {
			var data map[string]string
			json.Unmarshal([]byte(existing), &data)
			data["status"] = "paid"
			updated, _ := json.Marshal(data)
			rdb.HSet(ctx, projectionKey, orderID, string(updated))
		}
	case "OrderShipped":
		existing, _ := rdb.HGet(ctx, projectionKey, orderID).Result()
		if existing != "" {
			var data map[string]string
			json.Unmarshal([]byte(existing), &data)
			data["status"] = "shipped"
			updated, _ := json.Marshal(data)
			rdb.HSet(ctx, projectionKey, orderID, string(updated))
		}
	}
	fmt.Printf("[Projection] Updated: %s <- %s\n", orderID, eventType)
}

func queryReadModel(rdb *redis.Client) {
	orders, _ := rdb.HGetAll(ctx, projectionKey).Result()
	fmt.Println("\n[Read Model] Order Summary:")
	for orderID, data := range orders {
		fmt.Printf("  %s: %s\n", orderID, data)
	}
}
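
Because projections are updated with at-least-once delivery, a projector may see the same event twice. One common safeguard is version tracking on each read-model row. The sketch below is a hedged illustration using a plain dictionary as the read model; it assumes each event carries a per-order `version` field (the events in this pattern do not include one, so that field is hypothetical) and that events for one order arrive in order.

```python
from typing import Dict

STATUS_BY_EVENT = {
    "OrderCreated": "created",
    "OrderPaid": "paid",
    "OrderShipped": "shipped",
}


def project(read_model: Dict[str, Dict], event: Dict) -> bool:
    """Apply an event to the read model; return False if it was stale or a duplicate."""
    order_id = event["order_id"]
    row = read_model.get(order_id, {"version": 0})
    if event["version"] <= row["version"]:
        return False  # already applied: skip so redelivery has no effect
    updated = dict(row)
    status = STATUS_BY_EVENT.get(event["event_type"])
    if status is not None:
        updated["status"] = status
    if "amount" in event:
        updated["amount"] = event["amount"]
    updated["version"] = event["version"]
    read_model[order_id] = updated
    return True


if __name__ == "__main__":
    model: Dict[str, Dict] = {}
    created = {"event_type": "OrderCreated", "order_id": "ORD-001",
               "amount": "299", "version": 1}
    paid = {"event_type": "OrderPaid", "order_id": "ORD-001",
            "amount": "299", "version": 2}
    for event in (created, paid, paid):  # the second "paid" simulates a redelivery
        print(event["event_type"], project(model, event))
    print(model)
```

Exposing the stored version to API clients also lets them detect that a read model has not yet caught up with a write they just made.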

Pattern 5: Dead Letter Queue and Retry Mechanism

Messages that fail processing beyond the retry threshold are moved to a dead letter queue, preventing infinite retry blocking.

import redis
import json
import time
import random

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

MAIN_STREAM = "events:orders"
DLQ_STREAM = "events:orders:dlq"
GROUP_NAME = "order-processors"
MAX_RETRIES = 3
RETRY_DELAYS = [1, 5, 30]

try:
    r.xgroup_create(MAIN_STREAM, GROUP_NAME, id="0", mkstream=True)
except redis.ResponseError as e:
    # Ignore only "group already exists"; surface any other error.
    if "BUSYGROUP" not in str(e):
        raise

def process_message(fields: dict) -> bool:
    order_id = fields.get("order_id", "")
    if random.random() < 0.4:
        print(f"  [FAIL] Processing failed for {order_id}")
        return False
    print(f"  [OK] Processed {order_id}")
    return True

def consumer_with_retry(consumer_name: str):
    while True:
        entries = r.xreadgroup(
            GROUP_NAME, consumer_name,
            {MAIN_STREAM: ">"},
            count=1, block=3000
        )
        if not entries:
            continue
        for stream_name, messages in entries:
            for msg_id, fields in messages:
                success = process_message(fields)
                if success:
                    r.xack(MAIN_STREAM, GROUP_NAME, msg_id)
                else:
                    # A re-queued copy gets a new ID and a fresh PEL entry, so
                    # times_delivered would restart at 1 and the DLQ branch
                    # would never run. Track retries in the message itself.
                    retry_count = int(fields.get("retry_count", 0))
                    if retry_count >= MAX_RETRIES:
                        r.xadd(DLQ_STREAM, {
                            **fields,
                            "original_id": msg_id,
                            "failure_reason": "max_retries_exceeded",
                            "retry_count": str(retry_count),
                        })
                        r.xack(MAIN_STREAM, GROUP_NAME, msg_id)
                        print(f"  [DLQ] Moved to dead letter: {msg_id}")
                    else:
                        delay = RETRY_DELAYS[min(retry_count, len(RETRY_DELAYS) - 1)]
                        print(f"  [RETRY] Will retry {msg_id} after {delay}s "
                              f"(attempt {retry_count + 1}/{MAX_RETRIES})")
                        # Sleeping inline blocks this consumer; acceptable for a demo only.
                        time.sleep(delay)
                        # Re-queue first, then ACK, so a crash in between
                        # duplicates the message rather than losing it.
                        r.xadd(MAIN_STREAM, {**fields, "retry_count": str(retry_count + 1)})
                        r.xack(MAIN_STREAM, GROUP_NAME, msg_id)

def inspect_dlq():
    dlq_entries = r.xrange(DLQ_STREAM, "-", "+", count=20)
    print(f"\n[DLQ] {len(dlq_entries)} dead letters:")
    for msg_id, fields in dlq_entries:
        print(f"  {msg_id}: order_id={fields.get('order_id')} "
              f"retries={fields.get('retry_count')} reason={fields.get('failure_reason')}")

if __name__ == "__main__":
    for i in range(1, 11):
        r.xadd(MAIN_STREAM, {"order_id": f"ORD-{i:04d}", "amount": str(i * 100)})

    random.seed(42)
    consumer_with_retry("consumer-1")

Go version:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"

	"github.com/redis/go-redis/v9"
)

var ctx = context.Background()

const (
	mainStream  = "events:orders"
	dlqStream   = "events:orders:dlq"
	groupName   = "order-processors"
	maxRetries  = 3
)

var retryDelays = []time.Duration{1 * time.Second, 5 * time.Second, 30 * time.Second}

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	rdb.XGroupCreateMkStream(ctx, mainStream, groupName, "0")

	for i := 1; i <= 10; i++ {
		rdb.XAdd(ctx, &redis.XAddArgs{
			Stream: mainStream,
			Values: map[string]interface{}{
				"order_id": fmt.Sprintf("ORD-%04d", i),
				"amount":   fmt.Sprintf("%d", i*100),
			},
		})
	}

	consumerWithRetry(rdb, "consumer-1")
}

func processMessage(fields map[string]interface{}) bool {
	orderID := fmt.Sprintf("%v", fields["order_id"])
	if rand.Float64() < 0.4 {
		fmt.Printf("  [FAIL] Processing failed for %s\n", orderID)
		return false
	}
	fmt.Printf("  [OK] Processed %s\n", orderID)
	return true
}

func consumerWithRetry(rdb *redis.Client, consumerName string) {
	for {
		results, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    groupName,
			Consumer: consumerName,
			Streams:  []string{mainStream, ">"},
			Count:    1,
			Block:    3 * time.Second,
		}).Result()
		if err != nil {
			continue
		}
		for _, stream := range results {
			for _, msg := range stream.Messages {
				if processMessage(msg.Values) {
					rdb.XAck(ctx, mainStream, groupName, msg.ID)
				} else {
					// A re-queued copy gets a new ID, so the PEL delivery count
					// restarts at 1 and the DLQ branch would never run.
					// Track retries in the message itself instead.
					retryCount := 0
					if s, ok := msg.Values["retry_count"].(string); ok {
						fmt.Sscanf(s, "%d", &retryCount)
					}
					if retryCount >= maxRetries {
						dlqValues := make(map[string]interface{}, len(msg.Values)+3)
						for k, v := range msg.Values {
							dlqValues[k] = v
						}
						dlqValues["original_id"] = msg.ID
						dlqValues["failure_reason"] = "max_retries_exceeded"
						dlqValues["retry_count"] = fmt.Sprintf("%d", retryCount)
						rdb.XAdd(ctx, &redis.XAddArgs{Stream: dlqStream, Values: dlqValues})
						rdb.XAck(ctx, mainStream, groupName, msg.ID)
						fmt.Printf("  [DLQ] Moved to dead letter: %s\n", msg.ID)
					} else {
						// The built-in min requires Go 1.21 or newer.
						delay := retryDelays[min(retryCount, len(retryDelays)-1)]
						fmt.Printf("  [RETRY] Will retry %s after %v (attempt %d/%d)\n",
							msg.ID, delay, retryCount+1, maxRetries)
						time.Sleep(delay) // blocks this consumer; acceptable for a demo only
						retryValues := make(map[string]interface{}, len(msg.Values)+1)
						for k, v := range msg.Values {
							retryValues[k] = v
						}
						retryValues["retry_count"] = fmt.Sprintf("%d", retryCount+1)
						// Re-queue first, then ACK, so a crash in between
						// duplicates the message rather than losing it.
						rdb.XAdd(ctx, &redis.XAddArgs{Stream: mainStream, Values: retryValues})
						rdb.XAck(ctx, mainStream, groupName, msg.ID)
					}
				}
			}
		}
	}
}
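
The fixed delay table above works for a demo, but when many messages fail at once, identical delays make them all retry at the same moment. A common mitigation for such retry storms is capped exponential backoff with jitter. The sketch below is one possible formulation ("full jitter"); the function name, the base of 1 second, and the 30 second cap are assumptions chosen to resemble the delay table, not values prescribed by Redis.

```python
import random
from typing import Callable


def backoff_delay(
    attempt: int,
    base: float = 1.0,
    cap: float = 30.0,
    rng: Callable[[], float] = random.random,
) -> float:
    """Return a delay in seconds for the given zero-based retry attempt.

    The ceiling doubles with each attempt up to `cap`; the actual delay is a
    random fraction of that ceiling so that retries spread out over time.
    """
    ceiling = min(cap, base * (2 ** attempt))
    return rng() * ceiling


if __name__ == "__main__":
    for attempt in range(6):
        print(f"attempt {attempt}: sleep up to "
              f"{min(30.0, 2 ** attempt):.0f}s, chosen {backoff_delay(attempt):.2f}s")
```

Passing `rng` as a parameter keeps the function deterministic in tests while production code uses the default random source.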

Pattern 6: Production Event Processing Service (Go Implementation)

A complete production-grade event processing service with graceful shutdown, health checks, metrics monitoring, and XCLAIM failover.

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	"github.com/redis/go-redis/v9"
)

type EventProcessor struct {
	rdb          *redis.Client
	stream       string
	group        string
	consumer     string
	dlqStream    string
	maxRetries   int
	processed    atomic.Int64
	failed       atomic.Int64
	retried      atomic.Int64
	mu           sync.Mutex
	running      bool
	ctx          context.Context
	cancel       context.CancelFunc
}

func NewEventProcessor(rdb *redis.Client, stream, group, consumer string) *EventProcessor {
	ctx, cancel := context.WithCancel(context.Background())
	return &EventProcessor{
		rdb:        rdb,
		stream:     stream,
		group:      group,
		consumer:   consumer,
		dlqStream:  stream + ":dlq",
		maxRetries: 3,
		running:    true,
		ctx:        ctx,
		cancel:     cancel,
	}
}

func (p *EventProcessor) Start() {
	rdb := p.rdb
	rdb.XGroupCreateMkStream(p.ctx, p.stream, p.group, "0")

	log.Printf("[Processor] Starting consumer %s for stream %s", p.consumer, p.stream)

	go p.claimPendingMessages()
	go p.consumeNewMessages()
	go p.reportMetrics()
}

func (p *EventProcessor) consumeNewMessages() {
	for p.running {
		results, err := p.rdb.XReadGroup(p.ctx, &redis.XReadGroupArgs{
			Group:    p.group,
			Consumer: p.consumer,
			Streams:  []string{p.stream, ">"},
			Count:    10,
			Block:    2 * time.Second,
		}).Result()
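		if err != nil {
			if p.ctx.Err() != nil {
				return // context cancelled: stop instead of spinning on errors
			}
			if err == redis.Nil {
				continue // block timed out with no new messages
			}
			log.Printf("[Consumer] Error: %v", err)
			time.Sleep(time.Second) // brief pause avoids a hot loop while Redis is unreachable
			continue
		}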
		for _, stream := range results {
			for _, msg := range stream.Messages {
				p.handleMessage(msg.ID, msg.Values)
			}
		}
	}
}

func (p *EventProcessor) claimPendingMessages() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			pending, err := p.rdb.XPendingExt(p.ctx, &redis.XPendingExtArgs{
				Stream: p.stream,
				Group:  p.group,
				Start:  "-",
				End:    "+",
				Count:  10,
				Idle:   60 * time.Second,
			}).Result()
			if err != nil {
				continue
			}
			for _, msg := range pending {
				claimed, err := p.rdb.XClaim(p.ctx, &redis.XClaimArgs{
					Stream:   p.stream,
					Group:    p.group,
					Consumer: p.consumer,
					MinIdle:  60 * time.Second,
					Messages: []string{msg.ID},
				}).Result()
				if err == nil && len(claimed) > 0 {
					log.Printf("[Claim] Claimed pending message: %s", claimed[0].ID)
					p.retried.Add(1)
					p.handleMessage(claimed[0].ID, claimed[0].Values)
				}
			}
		case <-p.ctx.Done():
			return
		}
	}
}

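func (p *EventProcessor) handleMessage(msgID string, values map[string]interface{}) {
	err := p.processEvent(values)
	if err == nil {
		// ACK only after successful processing, and count it only if the ACK lands.
		if ackErr := p.rdb.XAck(p.ctx, p.stream, p.group, msgID).Err(); ackErr != nil {
			log.Printf("[Ack] Failed to ACK %s: %v", msgID, ackErr)
			return
		}
		p.processed.Add(1)
		return
	}
	p.failed.Add(1)

	// Look up how many times Redis has delivered this entry (tracked in the PEL).
	deliveryCount := 1
	pending, pErr := p.rdb.XPendingExt(p.ctx, &redis.XPendingExtArgs{
		Stream: p.stream,
		Group:  p.group,
		Start:  msgID,
		End:    msgID,
		Count:  1,
	}).Result()
	if pErr != nil {
		log.Printf("[Retry] XPENDING lookup for %s failed: %v", msgID, pErr)
	} else if len(pending) > 0 {
		// go-redis exposes the delivery counter as RetryCount.
		deliveryCount = int(pending[0].RetryCount)
	}

	if deliveryCount < p.maxRetries {
		// Leave the entry un-ACKed; claimPendingMessages retries it once it has been idle long enough.
		log.Printf("[Retry] Message %s failed (attempt %d/%d): %v",
			msgID, deliveryCount, p.maxRetries, err)
		return
	}

	dlqValues := map[string]interface{}{
		"original_id":    msgID,
		"failure_reason": err.Error(),
		"retry_count":    fmt.Sprintf("%d", deliveryCount),
	}
	for k, v := range values {
		dlqValues[k] = v
	}
	// Write to the DLQ first and ACK only if that succeeds, so a failed write never drops the entry.
	if dlqErr := p.rdb.XAdd(p.ctx, &redis.XAddArgs{Stream: p.dlqStream, Values: dlqValues}).Err(); dlqErr != nil {
		log.Printf("[DLQ] Failed to write %s to %s: %v", msgID, p.dlqStream, dlqErr)
		return
	}
	p.rdb.XAck(p.ctx, p.stream, p.group, msgID)
	log.Printf("[DLQ] Moved %s to dead letter after %d attempts", msgID, deliveryCount)
}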

func (p *EventProcessor) processEvent(values map[string]interface{}) error {
	eventType, _ := values["event_type"].(string)
	switch eventType {
	case "order_created":
		return p.handleOrderCreated(values)
	case "payment_completed":
		return p.handlePaymentCompleted(values)
	default:
		log.Printf("[Process] Unknown event type: %s", eventType)
		return nil
	}
}

func (p *EventProcessor) handleOrderCreated(values map[string]interface{}) error {
	orderID := fmt.Sprintf("%v", values["order_id"])
	log.Printf("[Process] Order created: %s", orderID)
	return nil
}

func (p *EventProcessor) handlePaymentCompleted(values map[string]interface{}) error {
	paymentID := fmt.Sprintf("%v", values["payment_id"])
	log.Printf("[Process] Payment completed: %s", paymentID)
	return nil
}

func (p *EventProcessor) reportMetrics() {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			log.Printf("[Metrics] processed=%d failed=%d retried=%d",
				p.processed.Load(), p.failed.Load(), p.retried.Load())
		case <-p.ctx.Done():
			return
		}
	}
}

func (p *EventProcessor) Stop() {
	log.Println("[Processor] Shutting down gracefully...")
	p.running = false
	p.cancel()
	time.Sleep(2 * time.Second)
	log.Printf("[Processor] Final metrics: processed=%d failed=%d retried=%d",
		p.processed.Load(), p.failed.Load(), p.retried.Load())
}

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	processor := NewEventProcessor(rdb, "events:orders", "order-processors", "worker-1")
	processor.Start()

	for i := 1; i <= 20; i++ {
		rdb.XAdd(context.Background(), &redis.XAddArgs{
			Stream: "events:orders",
			Values: map[string]interface{}{
				"event_type": "order_created",
				"order_id":   fmt.Sprintf("ORD-%04d", i),
				"amount":     fmt.Sprintf("%d", i*100),
			},
		})
	}

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	<-sigCh
	processor.Stop()
}
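
The retry-or-dead-letter branch in handleMessage is easier to reason about, and to unit-test, when it is pulled out as a pure function. Here is a minimal Python sketch, assuming the same rule as the Go code above (dead-letter once the delivery count reaches maxRetries). The name `next_action` and its return labels are hypothetical, not part of any Redis client.

```python
def next_action(delivery_count: int, max_retries: int = 3) -> str:
    """Decide what to do with an entry whose handler just failed.

    delivery_count is the number of times Redis has delivered the entry
    (the counter XPENDING reports); it is 1 after the first delivery.
    """
    if delivery_count >= max_retries:
        return "dead_letter"  # copy to the DLQ stream, then XACK the original
    return "retry"            # leave it un-ACKed so it can be claimed again


if __name__ == "__main__":
    for count in range(1, 5):
        print(count, next_action(count))
```

Keeping the decision separate from the Redis calls means the threshold can be tested without a running server.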

Pitfall Guide: 5 Common Mistakes

❌ Pitfall 1: Using XREAD without consumer groups causes duplicate consumption

# ❌ Wrong: Multiple consumers independently XREAD, every message processed by all consumers
def bad_consumer():
    entries = r.xread({STREAM_NAME: "0-0"}, count=10, block=5000)
    for _, messages in entries:
        for msg_id, fields in messages:
            process(fields)
# ✅ Correct: Use XREADGROUP, consumer group ensures each message processed by only one consumer
def good_consumer():
    entries = r.xreadgroup(
        GROUP_NAME, "consumer-1",
        {STREAM_NAME: ">"},
        count=10, block=5000
    )
    for _, messages in entries:
        for msg_id, fields in messages:
            process(fields)
            r.xack(STREAM_NAME, GROUP_NAME, msg_id)
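
The difference between the two functions above can be seen in a toy in-memory model that needs no Redis server. This is only an illustration of the delivery semantics: `fan_out` and `consumer_group` are hypothetical helpers, and the round-robin assignment is an assumption made for the sketch (Redis hands an entry to whichever group member asks first).

```python
from collections import defaultdict

entries = [f"msg-{i}" for i in range(6)]


def fan_out(entries, readers):
    """Independent XREAD-style readers: every reader sees every entry."""
    return {reader: list(entries) for reader in readers}


def consumer_group(entries, consumers):
    """Group-style delivery: each entry goes to exactly one consumer."""
    delivered = defaultdict(list)
    for i, entry in enumerate(entries):
        delivered[consumers[i % len(consumers)]].append(entry)
    return dict(delivered)


if __name__ == "__main__":
    workers = ["consumer-1", "consumer-2"]
    print(fan_out(entries, workers))
    print(consumer_group(entries, workers))
```

With two workers, the fan-out model handles 12 deliveries for 6 entries, while the group model handles exactly 6.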

❌ Pitfall 2: Forgetting XACK leaves messages in Pending list forever

// ❌ Wrong: Process message without ACK, message stays Pending indefinitely
func badConsumer(rdb *redis.Client) {
    results, _ := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
        Group:    groupName,
        Consumer: "consumer-1",
        Streams:  []string{streamName, ">"},
        Count:    10,
    }).Result()
    for _, stream := range results {
        for _, msg := range stream.Messages {
            processEvent(msg.Values)
            // Forgot ACK!
        }
    }
}
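// ✅ Correct: ACK immediately after successful processing; failed messages stay Pending for retry
func goodConsumer(rdb *redis.Client) {
    results, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
        Group:    groupName,
        Consumer: "consumer-1",
        Streams:  []string{streamName, ">"},
        Count:    10,
    }).Result()
    if err != nil {
        if err != redis.Nil { // redis.Nil only means there were no new messages
            log.Printf("XREADGROUP failed: %v", err)
        }
        return
    }
    for _, stream := range results {
        for _, msg := range stream.Messages {
            if err := processEvent(msg.Values); err != nil {
                continue // Leave un-ACKed so it can be retried or claimed
            }
            if err := rdb.XAck(ctx, streamName, groupName, msg.ID).Err(); err != nil {
                log.Printf("XACK %s failed: %v", msg.ID, err)
            }
        }
    }
}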
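
The same process-then-ACK rule can be written as a small Python helper that takes the handler and the ACK function as arguments. This is a sketch under assumptions: `process_batch` is a hypothetical name, and `messages` is assumed to be a list of `(msg_id, fields)` pairs like the inner lists redis-py's `xreadgroup` returns.

```python
def process_batch(messages, handler, ack):
    """Run handler on each message and ACK only the ones that succeed.

    Failed messages are not ACKed, so they stay in the Pending Entries
    List and can be retried or claimed later.
    """
    acked, failed = [], []
    for msg_id, fields in messages:
        try:
            handler(fields)
        except Exception:
            failed.append(msg_id)  # leave it pending
            continue
        ack(msg_id)
        acked.append(msg_id)
    return acked, failed


# Hypothetical wiring with redis-py:
#   process_batch(messages, process,
#                 lambda msg_id: r.xack(STREAM_NAME, GROUP_NAME, msg_id))
```

Returning both lists makes it straightforward to log or count failures without touching the ACK path.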

❌ Pitfall 3: Stream grows infinitely causing memory overflow

# ❌ Wrong: Only write, never trim, Stream grows infinitely
r.xadd("events:orders", {"data": "..."})
# Never call XTRIM or XDEL
# ✅ Correct: Use MAXLEN to limit Stream length, or periodically XTRIM
r.xadd("events:orders", {"data": "..."}, maxlen=10000, approximate=True)
# Or manually trim
r.xtrim("events:orders", maxlen=10000, approximate=True)
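
MAXLEN caps the Stream by entry count. When retention is better expressed as a time window, XTRIM also accepts a MINID threshold (Redis 6.2+). The sketch below computes that threshold; it assumes entries use Redis's auto-generated IDs, whose first part is a millisecond Unix timestamp, and `minid_for_retention` is a hypothetical helper name.

```python
import time
from typing import Optional


def minid_for_retention(retention_ms: int, now_ms: Optional[int] = None) -> str:
    """Return the smallest Stream ID to keep for a time-based retention window."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return f"{max(now_ms - retention_ms, 0)}-0"


# Hypothetical usage with redis-py (needs Redis 6.2+ for MINID):
#   r.xtrim("events:orders", minid=minid_for_retention(24 * 60 * 60 * 1000))
one_hour_ms = 60 * 60 * 1000
print(minid_for_retention(one_hour_ms, now_ms=1718432000000))  # 1718428400000-0
```

Trimming removes entries even if a consumer group has not processed them yet, so the window should be comfortably longer than the worst expected consumer delay.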

❌ Pitfall 4: XCLAIM without checking idle time

// ❌ Wrong: Blindly XCLAIM all Pending messages, may steal messages being processed
func badClaim(rdb *redis.Client) {
    pending, _ := rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
        Stream: streamName,
        Group:  groupName,
        Start:  "-",
        End:    "+",
        Count:  100,
    }).Result()
    for _, msg := range pending {
        rdb.XClaim(ctx, &redis.XClaimArgs{
            Stream:   streamName,
            Group:    groupName,
            Consumer: "consumer-2",
            Messages: []string{msg.ID},
        })
    }
}
// ✅ Correct: Only claim Pending messages idle beyond threshold
func goodClaim(rdb *redis.Client) {
    pending, _ := rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
        Stream: streamName,
        Group:  groupName,
        Start:  "-",
        End:    "+",
        Count:  100,
        Idle:   60 * time.Second, // Only claim idle > 60s
    }).Result()
    for _, msg := range pending {
        rdb.XClaim(ctx, &redis.XClaimArgs{
            Stream:   streamName,
            Group:    groupName,
            Consumer: "consumer-2",
            MinIdle:  60 * time.Second,
            Messages: []string{msg.ID},
        })
    }
}
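
The IDLE filter on XPENDING is only available from Redis 6.2. On older servers the same selection can be done client-side before calling XCLAIM. A minimal Python sketch, assuming each row is a dict with `message_id` and `time_since_delivered` (milliseconds) keys, which is the shape redis-py's `xpending_range` returns as far as I know; `select_claimable` is a hypothetical helper.

```python
def select_claimable(pending_rows, min_idle_ms):
    """Return IDs of pending entries that have been idle for at least min_idle_ms."""
    return [
        row["message_id"]
        for row in pending_rows
        if row["time_since_delivered"] >= min_idle_ms
    ]


rows = [
    {"message_id": "1718432000000-0", "consumer": "consumer-1",
     "time_since_delivered": 90_000, "times_delivered": 1},
    {"message_id": "1718432000001-0", "consumer": "consumer-1",
     "time_since_delivered": 5_000, "times_delivered": 1},
]
print(select_claimable(rows, min_idle_ms=60_000))  # ['1718432000000-0']
```

The client-side filter only reduces wasted calls; passing a minimum idle time to XCLAIM itself is still the real guard, because the idle time can change between the two commands.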

❌ Pitfall 5: Wrong consumer group creation ID

# ❌ Wrong: "$" starts the group at the Stream's current end, so entries already in the Stream are never delivered to it
r.xgroup_create(STREAM_NAME, GROUP_NAME, id="$")
# ✅ Correct: "0" starts the group at the beginning, so it also receives the existing backlog
r.xgroup_create(STREAM_NAME, GROUP_NAME, id="0")
# If you truly only want new messages, state that intent in a comment
r.xgroup_create(STREAM_NAME, GROUP_NAME, id="$")  # Deliberately skip the backlog
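
Group creation usually runs at every service start, so the second call fails with BUSYGROUP. The sketch below makes creation idempotent. It is written against any client exposing redis-py's `xgroup_create(name, groupname, id=..., mkstream=...)`; `ensure_group` is a hypothetical helper, and the broad `except Exception` is an assumption for the sketch (with redis-py you would narrow it to `redis.exceptions.ResponseError`).

```python
def ensure_group(client, stream, group, start_id="0"):
    """Create the consumer group if missing.

    Returns True if the group was created, False if it already existed.
    """
    try:
        client.xgroup_create(stream, group, id=start_id, mkstream=True)
        return True
    except Exception as exc:  # narrow to redis.exceptions.ResponseError in real code
        if "BUSYGROUP" in str(exc):
            return False  # group already exists, nothing to do
        raise
```

Passing `start_id` explicitly keeps the choice between "0" and "$" visible at the call site.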

Error Troubleshooting Table

| Error | Possible Cause | Diagnostic Command | Solution |
|---|---|---|---|
| NOGROUP No such key | Stream or consumer group doesn't exist | XINFO GROUPS events:orders | Create the group first with XGROUP CREATE; add MKSTREAM to auto-create the Stream |
| BUSYGROUP Consumer Group name already exists | Duplicate consumer group creation | XINFO GROUPS events:orders | Catch and ignore the exception, or check group existence first |
| Consumer receives no messages | XREADGROUP using 0-0 instead of > | Check the Streams parameter in code | Use > for new messages; 0-0 only re-reads this consumer's own unacknowledged messages |
| Pending messages accumulating | Consumer crashed without ACK | XPENDING events:orders group-name | Use XCLAIM to transfer them to active consumers |
| Stream memory keeps growing | No MAXLEN/XTRIM set | XINFO STREAM events:orders (check length) | Add MAXLEN ~ 10000 to XADD or run XTRIM periodically |
| Messages consumed multiple times | Consumer processed but ACK failed | Check that ACK executes after processing | Ensure ACK runs after successful business logic, and make handlers idempotent because delivery is at-least-once |
| XCLAIM messages still processed by original consumer | MinIdle set too short | XPENDING to check message idle time | Set a reasonable MinIdle threshold (e.g., 60 seconds) |
| Event sourcing replay slow | Too many events, full XRANGE | XLEN events:order:ORD-001 | Periodic snapshots + incremental events; use XRANGE by ID range |
| CQRS read model inconsistent | Projection consumer delayed or crashed | XINFO GROUPS (lag field, Redis 7.0+) and XINFO CONSUMERS (pending, idle) | Monitor consumer lag, set alert thresholds |
| XADD rejected as an unknown command | Redis version below 5.0 (Streams not available) | redis-server --version | Upgrade Redis to 5.0+ |

Advanced Optimization

1. Stream Sharding and Partitioning Strategy

When a single Stream can't handle throughput, shard by business key across multiple Streams:

import hashlib

def get_shard_stream(base_stream: str, key: str, shard_count: int = 16) -> str:
    shard_index = int(hashlib.md5(key.encode()).hexdigest(), 16) % shard_count
    return f"{base_stream}:shard-{shard_index:03d}"

order_id = "ORD-0001"
stream = get_shard_stream("events:orders", order_id)
r.xadd(stream, {"order_id": order_id, "event_type": "order_created"})
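
Sharding on the producer side means consumers must read every shard. The sketch below repeats `get_shard_stream` so it runs on its own, then adds two hypothetical helpers: `all_shard_streams` enumerates the shard names, and `xreadgroup_streams_arg` builds the `{stream: ">"}` dict that redis-py's `xreadgroup` accepts. It assumes the consumer group has been created on each shard.

```python
import hashlib


def get_shard_stream(base_stream: str, key: str, shard_count: int = 16) -> str:
    shard_index = int(hashlib.md5(key.encode()).hexdigest(), 16) % shard_count
    return f"{base_stream}:shard-{shard_index:03d}"


def all_shard_streams(base_stream: str, shard_count: int = 16) -> list:
    """Every shard name a producer could pick for this base stream."""
    return [f"{base_stream}:shard-{i:03d}" for i in range(shard_count)]


def xreadgroup_streams_arg(base_stream: str, shard_count: int = 16) -> dict:
    """Streams argument that reads new entries from all shards at once."""
    return {name: ">" for name in all_shard_streams(base_stream, shard_count)}


# Hypothetical usage with redis-py:
#   r.xreadgroup(GROUP_NAME, "consumer-1",
#                xreadgroup_streams_arg("events:orders"), count=10, block=5000)
shard = get_shard_stream("events:orders", "ORD-0001")
print(shard in all_shard_streams("events:orders"))  # True
```

Because the shard index is `hash % shard_count`, changing `shard_count` later remaps most keys to different shards, so per-key ordering holds only while the count stays fixed.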

2. Consumer Lag Monitoring and Alerting

// Tracks pending (delivered but un-ACKed) counts and consumer idle time.
func monitorConsumerLag(rdb *redis.Client, stream, group string) {
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()
    for range ticker.C {
        groups, err := rdb.XInfoGroups(ctx, stream).Result()
        if err != nil {
            log.Printf("[Monitor] XINFO GROUPS failed: %v", err)
            continue
        }
        for _, g := range groups {
            if g.Name != group {
                continue
            }
            if g.Pending > 1000 {
                log.Printf("[ALERT] Group %s has %d pending messages!", group, g.Pending)
            }
            consumers, err := rdb.XInfoConsumers(ctx, stream, group).Result()
            if err != nil {
                log.Printf("[Monitor] XINFO CONSUMERS failed: %v", err)
                continue
            }
            for _, c := range consumers {
                if c.Pending > 100 {
                    log.Printf("[ALERT] Consumer %s has %d pending messages!",
                        c.Name, c.Pending)
                }
                if c.Idle > 5*time.Minute {
                    log.Printf("[WARN] Consumer %s idle for %v", c.Name, c.Idle)
                }
            }
        }
    }
}
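
Pending counts show work that was delivered but not acknowledged. A complementary signal is how far behind the group's read position is in time. The Python sketch below estimates that from two IDs: the Stream's `last-generated-id` (from XINFO STREAM) and the group's `last-delivered-id` (from XINFO GROUPS). It assumes auto-generated IDs of the form `<milliseconds>-<sequence>`; `id_to_ms` and `delivery_delay_ms` are hypothetical helper names.

```python
def id_to_ms(stream_id: str) -> int:
    """Extract the millisecond timestamp from an auto-generated Stream ID."""
    return int(stream_id.split("-", 1)[0])


def delivery_delay_ms(last_generated_id: str, last_delivered_id: str) -> int:
    """Milliseconds between the newest entry and the newest delivered entry."""
    return max(id_to_ms(last_generated_id) - id_to_ms(last_delivered_id), 0)


print(delivery_delay_ms("1718432005000-0", "1718432000000-3"))  # 5000
```

A group that was just created at ID 0 reports a very large delay until its first read, so alerts based on this value should ignore a `last-delivered-id` of `0-0`.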

3. Event Snapshots and Incremental Replay

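import json
import time

SNAPSHOT_INTERVAL = 100  # Intended cadence: take a snapshot every 100 events

def save_snapshot(aggregate_id: str, state: dict, version: int, last_event_id: str):
    # last_event_id is the Stream ID of the last event folded into `state`.
    # `version` is only an event counter; with auto-generated timestamp IDs it
    # cannot serve as an XRANGE bound, so the real ID is stored alongside it.
    snapshot_key = f"snapshot:order:{aggregate_id}"
    r.hset(snapshot_key, mapping={
        "state": json.dumps(state),
        "version": str(version),
        "last_event_id": last_event_id,
        "timestamp": str(time.time()),
    })

def load_from_snapshot(aggregate_id: str):
    # Assumes the client was created with decode_responses=True (str keys and values)
    snapshot_key = f"snapshot:order:{aggregate_id}"
    data = r.hgetall(snapshot_key)
    if not data:
        return None, 0, None
    state = json.loads(data["state"])
    version = int(data["version"])
    return state, version, data["last_event_id"]

def rebuild_with_snapshot(aggregate_id: str):
    state, version, last_event_id = load_from_snapshot(aggregate_id)
    stream_key = f"events:order:{aggregate_id}"
    # "(" makes the lower bound exclusive (Redis 6.2+), so replay starts
    # right after the last event already contained in the snapshot.
    min_id = f"({last_event_id}" if last_event_id else "-"
    events = r.xrange(stream_key, min_id, "+")
    for msg_id, fields in events:
        state = apply_event(state or {}, fields)
    return state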
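
The property a snapshot must preserve is that "snapshot + later events" yields the same state as a full replay. The in-memory sketch below checks that property without Redis. Everything in it is illustrative: the `apply_event` rules, the event fields, and the helper names `id_key` and `replay` are assumptions made for the example, not the article's earlier implementation.

```python
def apply_event(state, fields):
    """Toy reducer: fold one event into the order state."""
    new_state = dict(state)
    if fields["event_type"] == "order_created":
        new_state["status"] = "created"
        new_state["amount"] = int(fields["amount"])
    elif fields["event_type"] == "payment_completed":
        new_state["status"] = "paid"
    elif fields["event_type"] == "order_shipped":
        new_state["status"] = "shipped"
    return new_state


def id_key(stream_id):
    """Turn '<ms>-<seq>' into a tuple so IDs compare numerically."""
    ms, seq = stream_id.split("-")
    return int(ms), int(seq)


def replay(events, state=None, after_id=None):
    """Apply events in order, skipping those at or before after_id."""
    state = dict(state or {})
    for event_id, fields in events:
        if after_id is not None and id_key(event_id) <= id_key(after_id):
            continue
        state = apply_event(state, fields)
    return state


events = [
    ("1718432000000-0", {"event_type": "order_created", "amount": "100"}),
    ("1718432000500-0", {"event_type": "payment_completed"}),
    ("1718432001000-0", {"event_type": "order_shipped"}),
]

full = replay(events)
snapshot = replay(events[:2])              # state after two events
snapshot_id = events[1][0]                 # ID of the last event in the snapshot
incremental = replay(events, state=snapshot, after_id=snapshot_id)
print(full == incremental)  # True
```

Comparing IDs as integer tuples matters: plain string comparison would order `"9-0"` after `"10-0"`.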

Comparison: Redis Streams vs Other Messaging Systems

| Feature | Redis Streams | Kafka | RabbitMQ | NATS JetStream | Pulsar |
|---|---|---|---|---|---|
| Positioning | Lightweight event stream | Distributed event streaming platform | Message broker | Cloud-native messaging | Distributed messaging platform |
| Deployment complexity | ★☆☆☆☆ | ★★★★★ | ★★★☆☆ | ★★☆☆☆ | ★★★★☆ |
| Throughput | 100-500K/s | 1M+/s | 50-100K/s | 500K-1M/s | 1M+/s |
| Message persistence | AOF/RDB | Disk log | Optional persistence | Disk log | BookKeeper |
| Consumer groups | ✅ Native | ✅ Native | ❌ Plugin needed | ✅ Native | ✅ Native |
| Message replay | ✅ XRANGE | ✅ Offset reset | ❌ Not supported | ✅ Supported | ✅ Supported |
| Exactly-Once | ❌ At-Least-Once | ✅ Transaction support | ❌ Plugin needed | ✅ Supported | ✅ Supported |
| Dead letter queue | ❌ Manual implementation | ❌ Manual implementation | ✅ Native | ✅ Native | ✅ Native |
| Partitioning | ❌ Manual sharding | ✅ Auto partitioning | ✅ Exchange routing | ✅ Auto partitioning | ✅ Auto partitioning |
| Ops cost | Very low | High (ZooKeeper/KRaft) | Medium | Low | High (BookKeeper) |
| Use case | Small-to-medium microservice event communication | Large-scale data stream processing | Enterprise message routing | Cloud-native lightweight messaging | Multi-tenant large-scale messaging |

Summary

Redis Streams in 2026 remains a strong lightweight choice for event-driven communication between small-to-medium microservices. It needs neither Kafka's heavy infrastructure nor RabbitMQ's exchange configuration, and the 6 patterns here run from a basic producer/consumer up to a production-grade event processing service. Remember three rules: always consume through consumer groups (XREADGROUP), always XACK after successful processing, and always cap Stream length with MAXLEN (or periodic XTRIM) so memory stays bounded. Once you need Exactly-Once semantics or throughput in the millions of messages per second, consider moving to Kafka or Pulsar.

