Redis Streamsイベント駆動アーキテクチャ実践:コンシューマーグループからイベントソーシングまで6つのプロダクションパターン

数据库

マイクロサービス通信のジレンマ:Kafkaは重すぎる、Redis Streamsがちょうどいい

注文サービスは在庫引き当てに通知し、決済サービスは取引結果をブロードキャストし、ユーザー登録はウェルカムメールをトリガーする——マイクロサービス間のイベント通信は至る所に存在する。Kafkaを導入する?クラスタ運用、ZooKeeper/KRaft調整、パーティションリバランス……小さなサービスには重すぎるインフラだ。RabbitMQを使う?Exchangeバインディング、ルーティングキー設定、メッセージ確認……複雑さも低くない。2026年、Redis Streamsは軽量イベント駆動ソリューションとして、中小規模マイクロサービス通信の選択肢になりつつある。

本記事では6つのプロダクションパターンを通じて、基本プロデューサー/コンシューマー→コンシューマーグループ→イベントソーシング→CQRS読み取りモデル→デッドレターキュー→プロダクション級イベント処理サービスの全チェーン実践を解説する。各ステップには完全な実行可能なPythonとGoコードを含む。


Redis Streamsコア概念

概念 説明
Stream Redis 5.0で導入されたログ型データ構造、Kafka Topicに類似、時系列でメッセージを格納
XADD Streamにメッセージを追加、タイムスタンプIDを自動生成(例:1718432000000-0
XREAD Streamからメッセージを読み取り、ブロッキング/ノンブロッキングモード対応
Consumer Group コンシューマーグループ、Kafka Consumer Groupに類似、各メッセージがグループ内の1コンシューマーのみで処理されることを保証
XREADGROUP コンシューマーグループとしてメッセージを読み取り、メッセージはPending状態に移行
XACK メッセージ処理完了を確認、Pendingリストから削除
Pending Entries List (PEL) 配信済みだが未確認のメッセージリスト、障害復旧とリトライに使用
XPENDING コンシューマーグループのPendingメッセージ統計を表示
XCLAIM Pendingメッセージを他のコンシューマーに移管、フェイルオーバーに使用
Event Sourcing イベントソーシングパターン、全状態変更をイベントシーケンスとして永続化、任意時点の状態を再構築可能
CQRS Command Query Responsibility Segregation、書き込みモデル(イベントストリーム)と読み取りモデル(プロジェクション)が独立して進化
Dead Letter Queue (DLQ) デッドレターキュー、リトライ閾値を超えた失敗メッセージを処理、無限リトライによる消費ブロックを防止

問題分析:イベント駆動アーキテクチャの5つの課題

  1. メッセージ損失と重複消費:コンシューマークラッシュ時、未ACKメッセージが損失または再配信される可能性、Exactly-OnceまたはAt-Least-Onceセマンティクス保証が必要
  2. コンシューマーグループ負荷不均衡:処理の遅いコンシューマーがメッセージ蓄積を引き起こし、他のコンシューマーがアイドル状態に、動的リバランスとXCLAIM移管が必要
  3. イベントソーシングのスナップショットとリプレイ:イベントストリームが無限に増大、フルリプレイのコストが高い、定期スナップショット+増分イベントの組み合わせ戦略が必要
  4. CQRS読み取りモデルの整合性:読み取りモデルプロジェクションは結果整合性、ユーザーが見るデータが遅延する可能性、バージョントラッキングと補償メカニズムが必要
  5. デッドレターとリトライストーム:メッセージ処理の繰り返し失敗がリトライストームを引き起こす、指数バックオフ+DLQ+手動介入の完全な戦略が必要

ステップバイステップ:6つのRedis Streamsイベント駆動パターン

パターン1:基本Streamプロデューサー/コンシューマー(XADD/XREAD)

最もシンプルなプロデューサー・コンシューマーモデル:1つのプロデューサーがStreamに書き込み、1つのコンシューマーが読み取る。

import redis
import json
import time
import threading

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

STREAM_NAME = "events:orders"

def producer():
    for i in range(1, 6):
        event = {
            "event_type": "order_created",
            "order_id": f"ORD-{i:04d}",
            "amount": 100 * i,
            "user_id": f"USR-{i:03d}",
            "timestamp": time.time()
        }
        msg_id = r.xadd(STREAM_NAME, event)
        print(f"[Producer] Added event: {msg_id} -> {event}")
        time.sleep(0.5)

def consumer():
    last_id = "0-0"
    while True:
        entries = r.xread({STREAM_NAME: last_id}, count=5, block=5000)
        if not entries:
            print("[Consumer] No new messages, waiting...")
            continue
        for stream_name, messages in entries:
            for msg_id, fields in messages:
                print(f"[Consumer] Processed: {msg_id} -> {fields}")
                last_id = msg_id

if __name__ == "__main__":
    threading.Thread(target=producer, daemon=True).start()
    consumer()

Go版:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

var ctx = context.Background()

func main() {
	rdb := redis.NewClient(&redis.Options{
		Addr: "localhost:6379",
	})
	streamName := "events:orders"

	go producer(rdb, streamName)
	consumer(rdb, streamName)
}

func producer(rdb *redis.Client, streamName string) {
	for i := 1; i <= 5; i++ {
		values := map[string]interface{}{
			"event_type": "order_created",
			"order_id":   fmt.Sprintf("ORD-%04d", i),
			"amount":     fmt.Sprintf("%d", 100*i),
			"user_id":    fmt.Sprintf("USR-%03d", i),
			"timestamp":  fmt.Sprintf("%d", time.Now().UnixMilli()),
		}
		msgID, err := rdb.XAdd(ctx, &redis.XAddArgs{
			Stream: streamName,
			Values: values,
		}).Result()
		if err != nil {
			fmt.Printf("[Producer] Error: %v\n", err)
			continue
		}
		fmt.Printf("[Producer] Added event: %s\n", msgID)
		time.Sleep(500 * time.Millisecond)
	}
}

func consumer(rdb *redis.Client, streamName string) {
	lastID := "0-0"
	for {
		results, err := rdb.XRead(ctx, &redis.XReadArgs{
			Streams: []string{streamName, lastID},
			Count:   5,
			Block:   5 * time.Second,
		}).Result()
		if err != nil {
			if err == redis.Nil {
				fmt.Println("[Consumer] No new messages, waiting...")
				continue
			}
			fmt.Printf("[Consumer] Error: %v\n", err)
			continue
		}
		for _, stream := range results {
			for _, msg := range stream.Messages {
				fmt.Printf("[Consumer] Processed: %s -> %v\n", msg.ID, msg.Values)
				lastID = msg.ID
			}
		}
	}
}

パターン2:コンシューマーグループ(XREADGROUP)

コンシューマーグループは各メッセージがグループ内の1コンシューマーのみで処理されることを保証する。KafkaのConsumer Groupに類似。

import redis
import time
import threading
import random

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

STREAM_NAME = "events:payments"
GROUP_NAME = "payment-processors"

try:
    r.xgroup_create(STREAM_NAME, GROUP_NAME, id="0", mkstream=True)
    print(f"[Init] Created consumer group: {GROUP_NAME}")
except redis.ResponseError as e:
    if "BUSYGROUP" in str(e):
        print(f"[Init] Group already exists: {GROUP_NAME}")
    else:
        raise

def group_consumer(consumer_name: str):
    while True:
        entries = r.xreadgroup(
            GROUP_NAME, consumer_name,
            {STREAM_NAME: ">"},
            count=3, block=3000
        )
        if not entries:
            print(f"[{consumer_name}] No new messages")
            continue
        for stream_name, messages in entries:
            for msg_id, fields in messages:
                process_time = random.uniform(0.1, 0.5)
                time.sleep(process_time)
                r.xack(STREAM_NAME, GROUP_NAME, msg_id)
                print(f"[{consumer_name}] ACKed: {msg_id} -> {fields}")

def check_pending():
    pending = r.xpending_range(STREAM_NAME, GROUP_NAME, min="-", max="+", count=10)
    if pending:
        print(f"[Monitor] Pending messages: {len(pending)}")
        for p in pending:
            print(f"  msg_id={p['message_id']}, consumer={p['consumer']}, "
                  f"delivered={p['times_delivered']} times")

if __name__ == "__main__":
    for i in range(1, 4):
        for j in range(1, 3):
            r.xadd(STREAM_NAME, {
                "event_type": "payment_completed",
                "payment_id": f"PAY-{i:02d}{j:02d}",
                "amount": str(random.randint(50, 500)),
            })

    threading.Thread(target=group_consumer, args=("consumer-1",), daemon=True).start()
    threading.Thread(target=group_consumer, args=("consumer-2",), daemon=True).start()

    time.sleep(1)
    check_pending()
    time.sleep(5)

Go版:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"

	"github.com/redis/go-redis/v9"
)

var ctx = context.Background()

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	streamName := "events:payments"
	groupName := "payment-processors"

	err := rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()
	if err != nil {
		fmt.Printf("[Init] Group create: %v\n", err)
	} else {
		fmt.Printf("[Init] Created consumer group: %s\n", groupName)
	}

	for i := 1; i <= 3; i++ {
		for j := 1; j <= 2; j++ {
			rdb.XAdd(ctx, &redis.XAddArgs{
				Stream: streamName,
				Values: map[string]interface{}{
					"event_type": "payment_completed",
					"payment_id": fmt.Sprintf("PAY-%02d%02d", i, j),
					"amount":     fmt.Sprintf("%d", rand.Intn(450)+50),
				},
			})
		}
	}

	go groupConsumer(rdb, streamName, groupName, "consumer-1")
	go groupConsumer(rdb, streamName, groupName, "consumer-2")
	time.Sleep(6 * time.Second)
}

func groupConsumer(rdb *redis.Client, stream, group, consumer string) {
	for {
		results, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    group,
			Consumer: consumer,
			Streams:  []string{stream, ">"},
			Count:    3,
			Block:    3 * time.Second,
		}).Result()
		if err != nil {
			if err == redis.Nil {
				fmt.Printf("[%s] No new messages\n", consumer)
				continue
			}
			fmt.Printf("[%s] Error: %v\n", consumer, err)
			continue
		}
		for _, stream := range results {
			for _, msg := range stream.Messages {
				fmt.Printf("[%s] Processing: %s -> %v\n", consumer, msg.ID, msg.Values)
				rdb.XAck(ctx, stream, group, msg.ID)
				fmt.Printf("[%s] ACKed: %s\n", consumer, msg.ID)
			}
		}
	}
}

パターン3:イベントソーシング(Event Sourcing)

全状態変更をイベントシーケンスとしてStreamに永続化し、任意時点の集約ルート状態を再構築可能にする。

import redis
import json
from dataclasses import dataclass, field
from typing import List, Dict

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

@dataclass
class OrderState:
    order_id: str = ""
    status: str = "created"
    items: List[Dict] = field(default_factory=list)
    total: float = 0.0
    version: int = 0

class EventStore:
    def __init__(self, aggregate_type: str, aggregate_id: str):
        self.stream_key = f"events:{aggregate_type}:{aggregate_id}"
        self.aggregate_type = aggregate_type
        self.aggregate_id = aggregate_id

    def append(self, event_type: str, data: dict):
        event = {
            "event_type": event_type,
            "aggregate_id": self.aggregate_id,
            "data": json.dumps(data),
            "version": str(self._next_version()),
        }
        msg_id = r.xadd(self.stream_key, event)
        print(f"[EventStore] Appended: {msg_id} type={event_type} version={event['version']}")
        return msg_id

    def _next_version(self) -> int:
        info = r.xinfo_stream(self.stream_key)
        length = info.get("length", 0)
        return length + 1

    def load_events(self) -> List[dict]:
        entries = r.xrange(self.stream_key, "-", "+", count=1000)
        events = []
        for msg_id, fields in entries:
            events.append({
                "id": msg_id,
                "event_type": fields["event_type"],
                "data": json.loads(fields["data"]),
                "version": int(fields["version"]),
            })
        return events

class OrderAggregate:
    def __init__(self, order_id: str):
        self.order_id = order_id
        self.store = EventStore("order", order_id)
        self.state = OrderState(order_id=order_id)

    def create(self, items: List[Dict]):
        total = sum(item["price"] * item["qty"] for item in items)
        self.store.append("OrderCreated", {"items": items, "total": total})

    def add_item(self, item: Dict):
        self.store.append("ItemAdded", item)

    def pay(self):
        self.store.append("OrderPaid", {"payment_method": "credit_card"})

    def ship(self, tracking_number: str):
        self.store.append("OrderShipped", {"tracking_number": tracking_number})

    def rebuild(self) -> OrderState:
        events = self.store.load_events()
        state = OrderState(order_id=self.order_id)
        for event in events:
            self._apply(state, event)
        self.state = state
        return state

    def _apply(self, state: OrderState, event: dict):
        event_type = event["event_type"]
        data = event["data"]
        if event_type == "OrderCreated":
            state.items = data.get("items", [])
            state.total = data.get("total", 0)
            state.status = "created"
        elif event_type == "ItemAdded":
            state.items.append(data)
            state.total += data.get("price", 0) * data.get("qty", 1)
        elif event_type == "OrderPaid":
            state.status = "paid"
        elif event_type == "OrderShipped":
            state.status = "shipped"
        state.version = event["version"]

if __name__ == "__main__":
    order = OrderAggregate("ORD-0001")
    order.create([{"sku": "WIDGET-01", "price": 29.9, "qty": 2}])
    order.add_item({"sku": "GADGET-02", "price": 49.9, "qty": 1})
    order.pay()
    order.ship("SF1234567890")

    rebuilt = order.rebuild()
    print(f"\n[Rebuilt State] order_id={rebuilt.order_id} status={rebuilt.status} "
          f"total={rebuilt.total} items={len(rebuilt.items)} version={rebuilt.version}")

Go版:

package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/redis/go-redis/v9"
)

var ctx = context.Background()

type OrderState struct {
	OrderID string                   `json:"order_id"`
	Status  string                   `json:"status"`
	Items   []map[string]interface{} `json:"items"`
	Total   float64                  `json:"total"`
	Version int                      `json:"version"`
}

type Event struct {
	ID         string                 `json:"id"`
	EventType  string                 `json:"event_type"`
	Data       map[string]interface{} `json:"data"`
	Version    int                    `json:"version"`
}

type EventStore struct {
	rdb         *redis.Client
	StreamKey   string
	AggregateID string
}

func NewEventStore(rdb *redis.Client, aggregateType, aggregateID string) *EventStore {
	return &EventStore{
		rdb:         rdb,
		StreamKey:   fmt.Sprintf("events:%s:%s", aggregateType, aggregateID),
		AggregateID: aggregateID,
	}
}

func (s *EventStore) Append(eventType string, data map[string]interface{}) (string, error) {
	info, err := s.rdb.XInfoStream(ctx, s.StreamKey).Result()
	version := 1
	if err == nil {
		version = int(info.Length) + 1
	}
	dataJSON, _ := json.Marshal(data)
	msgID, err := s.rdb.XAdd(ctx, &redis.XAddArgs{
		Stream: s.StreamKey,
		Values: map[string]interface{}{
			"event_type":   eventType,
			"aggregate_id": s.AggregateID,
			"data":         string(dataJSON),
			"version":      fmt.Sprintf("%d", version),
		},
	}).Result()
	if err != nil {
		return "", err
	}
	fmt.Printf("[EventStore] Appended: %s type=%s version=%d\n", msgID, eventType, version)
	return msgID, nil
}

func (s *EventStore) LoadEvents() ([]Event, error) {
	entries, err := s.rdb.XRange(ctx, s.StreamKey, "-", "+").Result()
	if err != nil {
		return nil, err
	}
	var events []Event
	for _, msg := range entries {
		var data map[string]interface{}
		json.Unmarshal([]byte(msg.Values["data"].(string)), &data)
		var version int
		fmt.Sscanf(msg.Values["version"].(string), "%d", &version)
		events = append(events, Event{
			ID:        msg.ID,
			EventType: msg.Values["event_type"].(string),
			Data:      data,
			Version:   version,
		})
	}
	return events, nil
}

type OrderAggregate struct {
	OrderID string
	Store   *EventStore
	State   OrderState
}

func NewOrderAggregate(rdb *redis.Client, orderID string) *OrderAggregate {
	return &OrderAggregate{
		OrderID: orderID,
		Store:   NewEventStore(rdb, "order", orderID),
		State:   OrderState{OrderID: orderID, Status: "created"},
	}
}

func (a *OrderAggregate) Create(items []map[string]interface{}) {
	total := 0.0
	for _, item := range items {
		price, _ := item["price"].(float64)
		qty, _ := item["qty"].(float64)
		total += price * qty
	}
	a.Store.Append("OrderCreated", map[string]interface{}{
		"items": items, "total": total,
	})
}

func (a *OrderAggregate) Pay() {
	a.Store.Append("OrderPaid", map[string]interface{}{
		"payment_method": "credit_card",
	})
}

func (a *OrderAggregate) Ship(trackingNumber string) {
	a.Store.Append("OrderShipped", map[string]interface{}{
		"tracking_number": trackingNumber,
	})
}

func (a *OrderAggregate) Rebuild() (*OrderState, error) {
	events, err := a.Store.LoadEvents()
	if err != nil {
		return nil, err
	}
	state := OrderState{OrderID: a.OrderID, Status: "created"}
	for _, event := range events {
		switch event.EventType {
		case "OrderCreated":
			if items, ok := event.Data["items"].([]interface{}); ok {
				for _, item := range items {
					if m, ok := item.(map[string]interface{}); ok {
						state.Items = append(state.Items, m)
					}
				}
			}
			if total, ok := event.Data["total"].(float64); ok {
				state.Total = total
			}
			state.Status = "created"
		case "OrderPaid":
			state.Status = "paid"
		case "OrderShipped":
			state.Status = "shipped"
		}
		state.Version = event.Version
	}
	a.State = state
	return &state, nil
}

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	order := NewOrderAggregate(rdb, "ORD-0001")
	order.Create([]map[string]interface{}{
		{"sku": "WIDGET-01", "price": 29.9, "qty": 2},
	})
	order.Pay()
	order.Ship("SF1234567890")

	state, _ := order.Rebuild()
	fmt.Printf("\n[Rebuilt State] order_id=%s status=%s total=%.2f items=%d version=%d\n",
		state.OrderID, state.Status, state.Total, len(state.Items), state.Version)
}

パターン4:CQRS読み取りモデルプロジェクション

書き込み側がイベントをStreamに公開し、読み取り側がStreamをリスニングしてプロジェクション(読み取りモデル)を更新、Command Query Responsibility Segregationを実現。

import redis
import json
import time
import threading

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

EVENT_STREAM = "events:orders"
PROJECTION_KEY = "projection:order_summary"
GROUP_NAME = "cqrs-projectors"

try:
    r.xgroup_create(EVENT_STREAM, GROUP_NAME, id="0", mkstream=True)
except redis.ResponseError:
    pass

def command_side_publish_events():
    events = [
        {"event_type": "OrderCreated", "order_id": "ORD-001", "amount": "299", "user_id": "USR-001"},
        {"event_type": "OrderCreated", "order_id": "ORD-002", "amount": "599", "user_id": "USR-002"},
        {"event_type": "OrderPaid", "order_id": "ORD-001", "amount": "299"},
        {"event_type": "OrderShipped", "order_id": "ORD-001"},
        {"event_type": "OrderCreated", "order_id": "ORD-003", "amount": "199", "user_id": "USR-001"},
    ]
    for event in events:
        r.xadd(EVENT_STREAM, event)
        print(f"[Command] Published: {event['event_type']} -> {event.get('order_id')}")
        time.sleep(0.3)

def query_side_projector():
    consumer_name = "projector-1"
    while True:
        entries = r.xreadgroup(
            GROUP_NAME, consumer_name,
            {EVENT_STREAM: ">"},
            count=5, block=3000
        )
        if not entries:
            continue
        for stream_name, messages in entries:
            for msg_id, fields in messages:
                update_projection(fields)
                r.xack(EVENT_STREAM, GROUP_NAME, msg_id)

def update_projection(event: dict):
    event_type = event.get("event_type")
    order_id = event.get("order_id", "")
    if event_type == "OrderCreated":
        r.hset(PROJECTION_KEY, order_id, json.dumps({
            "status": "created",
            "amount": event.get("amount", "0"),
            "user_id": event.get("user_id", ""),
        }))
    elif event_type == "OrderPaid":
        existing = r.hget(PROJECTION_KEY, order_id)
        if existing:
            data = json.loads(existing)
            data["status"] = "paid"
            r.hset(PROJECTION_KEY, order_id, json.dumps(data))
    elif event_type == "OrderShipped":
        existing = r.hget(PROJECTION_KEY, order_id)
        if existing:
            data = json.loads(existing)
            data["status"] = "shipped"
            r.hset(PROJECTION_KEY, order_id, json.dumps(data))
    print(f"[Projection] Updated: {order_id} <- {event_type}")

def query_read_model():
    time.sleep(3)
    all_orders = r.hgetall(PROJECTION_KEY)
    print("\n[Read Model] Order Summary:")
    for order_id, data in all_orders.items():
        print(f"  {order_id}: {data}")

if __name__ == "__main__":
    threading.Thread(target=command_side_publish_events, daemon=True).start()
    threading.Thread(target=query_side_projector, daemon=True).start()
    query_read_model()

Go版:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

var ctx = context.Background()

const (
	eventStream   = "events:orders"
	projectionKey = "projection:order_summary"
	groupName     = "cqrs-projectors"
)

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	rdb.XGroupCreateMkStream(ctx, eventStream, groupName, "0")

	go commandSidePublishEvents(rdb)
	go querySideProjector(rdb)
	time.Sleep(4 * time.Second)
	queryReadModel(rdb)
}

func commandSidePublishEvents(rdb *redis.Client) {
	events := []map[string]interface{}{
		{"event_type": "OrderCreated", "order_id": "ORD-001", "amount": "299", "user_id": "USR-001"},
		{"event_type": "OrderCreated", "order_id": "ORD-002", "amount": "599", "user_id": "USR-002"},
		{"event_type": "OrderPaid", "order_id": "ORD-001", "amount": "299"},
		{"event_type": "OrderShipped", "order_id": "ORD-001"},
		{"event_type": "OrderCreated", "order_id": "ORD-003", "amount": "199", "user_id": "USR-001"},
	}
	for _, event := range events {
		rdb.XAdd(ctx, &redis.XAddArgs{Stream: eventStream, Values: event})
		fmt.Printf("[Command] Published: %s -> %s\n", event["event_type"], event["order_id"])
		time.Sleep(300 * time.Millisecond)
	}
}

func querySideProjector(rdb *redis.Client) {
	for {
		results, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    groupName,
			Consumer: "projector-1",
			Streams:  []string{eventStream, ">"},
			Count:    5,
			Block:    3 * time.Second,
		}).Result()
		if err != nil {
			continue
		}
		for _, stream := range results {
			for _, msg := range stream.Messages {
				updateProjection(rdb, msg.Values)
				rdb.XAck(ctx, eventStream, groupName, msg.ID)
			}
		}
	}
}

func updateProjection(rdb *redis.Client, fields map[string]interface{}) {
	eventType, _ := fields["event_type"].(string)
	orderID, _ := fields["order_id"].(string)
	switch eventType {
	case "OrderCreated":
		data, _ := json.Marshal(map[string]string{
			"status":  "created",
			"amount":  fmt.Sprintf("%v", fields["amount"]),
			"user_id": fmt.Sprintf("%v", fields["user_id"]),
		})
		rdb.HSet(ctx, projectionKey, orderID, string(data))
	case "OrderPaid":
		existing, _ := rdb.HGet(ctx, projectionKey, orderID).Result()
		if existing != "" {
			var data map[string]string
			json.Unmarshal([]byte(existing), &data)
			data["status"] = "paid"
			updated, _ := json.Marshal(data)
			rdb.HSet(ctx, projectionKey, orderID, string(updated))
		}
	case "OrderShipped":
		existing, _ := rdb.HGet(ctx, projectionKey, orderID).Result()
		if existing != "" {
			var data map[string]string
			json.Unmarshal([]byte(existing), &data)
			data["status"] = "shipped"
			updated, _ := json.Marshal(data)
			rdb.HSet(ctx, projectionKey, orderID, string(updated))
		}
	}
	fmt.Printf("[Projection] Updated: %s <- %s\n", orderID, eventType)
}

func queryReadModel(rdb *redis.Client) {
	orders, _ := rdb.HGetAll(ctx, projectionKey).Result()
	fmt.Println("\n[Read Model] Order Summary:")
	for orderID, data := range orders {
		fmt.Printf("  %s: %s\n", orderID, data)
	}
}

パターン5:デッドレターキューとリトライメカニズム

処理失敗がリトライ閾値を超えたメッセージをデッドレターキューに移行し、無限リトライによる消費ブロックを防止する。

import redis
import json
import time
import random

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

MAIN_STREAM = "events:orders"
DLQ_STREAM = "events:orders:dlq"
GROUP_NAME = "order-processors"
MAX_RETRIES = 3
RETRY_DELAYS = [1, 5, 30]

try:
    r.xgroup_create(MAIN_STREAM, GROUP_NAME, id="0", mkstream=True)
except redis.ResponseError:
    pass

def process_message(fields: dict) -> bool:
    order_id = fields.get("order_id", "")
    if random.random() < 0.4:
        print(f"  [FAIL] Processing failed for {order_id}")
        return False
    print(f"  [OK] Processed {order_id}")
    return True

def consumer_with_retry(consumer_name: str):
    while True:
        entries = r.xreadgroup(
            GROUP_NAME, consumer_name,
            {MAIN_STREAM: ">"},
            count=1, block=3000
        )
        if not entries:
            continue
        for stream_name, messages in entries:
            for msg_id, fields in messages:
                success = process_message(fields)
                if success:
                    r.xack(MAIN_STREAM, GROUP_NAME, msg_id)
                else:
                    pending_info = r.xpending_range(
                        MAIN_STREAM, GROUP_NAME,
                        min=msg_id, max=msg_id, count=1
                    )
                    retry_count = 0
                    if pending_info:
                        retry_count = pending_info[0].get("times_delivered", 1) - 1
                    if retry_count >= MAX_RETRIES:
                        r.xadd(DLQ_STREAM, {
                            **fields,
                            "original_id": msg_id,
                            "failure_reason": "max_retries_exceeded",
                            "retry_count": str(retry_count),
                        })
                        r.xack(MAIN_STREAM, GROUP_NAME, msg_id)
                        print(f"  [DLQ] Moved to dead letter: {msg_id}")
                    else:
                        delay = RETRY_DELAYS[min(retry_count, len(RETRY_DELAYS) - 1)]
                        print(f"  [RETRY] Will retry {msg_id} after {delay}s "
                              f"(attempt {retry_count + 1}/{MAX_RETRIES})")
                        r.xack(MAIN_STREAM, GROUP_NAME, msg_id)
                        time.sleep(delay)
                        r.xadd(MAIN_STREAM, fields)

def inspect_dlq():
    dlq_entries = r.xrange(DLQ_STREAM, "-", "+", count=20)
    print(f"\n[DLQ] {len(dlq_entries)} dead letters:")
    for msg_id, fields in dlq_entries:
        print(f"  {msg_id}: order_id={fields.get('order_id')} "
              f"retries={fields.get('retry_count')} reason={fields.get('failure_reason')}")

if __name__ == "__main__":
    for i in range(1, 11):
        r.xadd(MAIN_STREAM, {"order_id": f"ORD-{i:04d}", "amount": str(i * 100)})

    random.seed(42)
    consumer_with_retry("consumer-1")

Go版:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"

	"github.com/redis/go-redis/v9"
)

var ctx = context.Background()

const (
	mainStream  = "events:orders"
	dlqStream   = "events:orders:dlq"
	groupName   = "order-processors"
	maxRetries  = 3
)

var retryDelays = []time.Duration{1 * time.Second, 5 * time.Second, 30 * time.Second}

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	rdb.XGroupCreateMkStream(ctx, mainStream, groupName, "0")

	for i := 1; i <= 10; i++ {
		rdb.XAdd(ctx, &redis.XAddArgs{
			Stream: mainStream,
			Values: map[string]interface{}{
				"order_id": fmt.Sprintf("ORD-%04d", i),
				"amount":   fmt.Sprintf("%d", i*100),
			},
		})
	}

	consumerWithRetry(rdb, "consumer-1")
}

func processMessage(fields map[string]interface{}) bool {
	orderID := fmt.Sprintf("%v", fields["order_id"])
	if rand.Float64() < 0.4 {
		fmt.Printf("  [FAIL] Processing failed for %s\n", orderID)
		return false
	}
	fmt.Printf("  [OK] Processed %s\n", orderID)
	return true
}

func consumerWithRetry(rdb *redis.Client, consumerName string) {
	for {
		results, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    groupName,
			Consumer: consumerName,
			Streams:  []string{mainStream, ">"},
			Count:    1,
			Block:    3 * time.Second,
		}).Result()
		if err != nil {
			continue
		}
		for _, stream := range results {
			for _, msg := range stream.Messages {
				if processMessage(msg.Values) {
					rdb.XAck(ctx, mainStream, groupName, msg.ID)
				} else {
					pending, _ := rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
						Stream: mainStream,
						Group:  groupName,
						Start:  msg.ID,
						End:    msg.ID,
						Count:  1,
					}).Result()
					retryCount := 0
					if len(pending) > 0 {
						retryCount = int(pending[0].DeliveryCount) - 1
					}
					if retryCount >= maxRetries {
						dlqValues := map[string]interface{}{
							"original_id":    msg.ID,
							"failure_reason": "max_retries_exceeded",
							"retry_count":    fmt.Sprintf("%d", retryCount),
						}
						for k, v := range msg.Values {
							dlqValues[k] = v
						}
						rdb.XAdd(ctx, &redis.XAddArgs{Stream: dlqStream, Values: dlqValues})
						rdb.XAck(ctx, mainStream, groupName, msg.ID)
						fmt.Printf("  [DLQ] Moved to dead letter: %s\n", msg.ID)
					} else {
						delay := retryDelays[min(retryCount, len(retryDelays)-1)]
						fmt.Printf("  [RETRY] Will retry %s after %v (attempt %d/%d)\n",
							msg.ID, delay, retryCount+1, maxRetries)
						rdb.XAck(ctx, mainStream, groupName, msg.ID)
						time.Sleep(delay)
						rdb.XAdd(ctx, &redis.XAddArgs{Stream: mainStream, Values: msg.Values})
					}
				}
			}
		}
	}
}

パターン6:プロダクション級イベント処理サービス(Go実装)

グレースフルシャットダウン、ヘルスチェック、メトリクス監視、XCLAIMフェイルオーバーを備えた完全なプロダクション級イベント処理サービス。

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	"github.com/redis/go-redis/v9"
)

type EventProcessor struct {
	rdb          *redis.Client
	stream       string
	group        string
	consumer     string
	dlqStream    string
	maxRetries   int
	processed    atomic.Int64
	failed       atomic.Int64
	retried      atomic.Int64
	mu           sync.Mutex
	running      bool
	ctx          context.Context
	cancel       context.CancelFunc
}

func NewEventProcessor(rdb *redis.Client, stream, group, consumer string) *EventProcessor {
	ctx, cancel := context.WithCancel(context.Background())
	return &EventProcessor{
		rdb:        rdb,
		stream:     stream,
		group:      group,
		consumer:   consumer,
		dlqStream:  stream + ":dlq",
		maxRetries: 3,
		running:    true,
		ctx:        ctx,
		cancel:     cancel,
	}
}

func (p *EventProcessor) Start() {
	rdb := p.rdb
	rdb.XGroupCreateMkStream(p.ctx, p.stream, p.group, "0")

	log.Printf("[Processor] Starting consumer %s for stream %s", p.consumer, p.stream)

	go p.claimPendingMessages()
	go p.consumeNewMessages()
	go p.reportMetrics()
}

func (p *EventProcessor) consumeNewMessages() {
	for p.running {
		results, err := p.rdb.XReadGroup(p.ctx, &redis.XReadGroupArgs{
			Group:    p.group,
			Consumer: p.consumer,
			Streams:  []string{p.stream, ">"},
			Count:    10,
			Block:    2 * time.Second,
		}).Result()
		if err != nil {
			if err == redis.Nil || p.ctx.Err() != nil {
				continue
			}
			log.Printf("[Consumer] Error: %v", err)
			continue
		}
		for _, stream := range results {
			for _, msg := range stream.Messages {
				p.handleMessage(msg.ID, msg.Values)
			}
		}
	}
}

func (p *EventProcessor) claimPendingMessages() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			pending, err := p.rdb.XPendingExt(p.ctx, &redis.XPendingExtArgs{
				Stream: p.stream,
				Group:  p.group,
				Start:  "-",
				End:    "+",
				Count:  10,
				Idle:   60 * time.Second,
			}).Result()
			if err != nil {
				continue
			}
			for _, msg := range pending {
				claimed, err := p.rdb.XClaim(p.ctx, &redis.XClaimArgs{
					Stream:   p.stream,
					Group:    p.group,
					Consumer: p.consumer,
					MinIdle:  60 * time.Second,
					Messages: []string{msg.ID},
				}).Result()
				if err == nil && len(claimed) > 0 {
					log.Printf("[Claim] Claimed pending message: %s", claimed[0].ID)
					p.retried.Add(1)
					p.handleMessage(claimed[0].ID, claimed[0].Values)
				}
			}
		case <-p.ctx.Done():
			return
		}
	}
}

func (p *EventProcessor) handleMessage(msgID string, values map[string]interface{}) {
	err := p.processEvent(values)
	if err != nil {
		p.failed.Add(1)
		pending, _ := p.rdb.XPendingExt(p.ctx, &redis.XPendingExtArgs{
			Stream: p.stream,
			Group:  p.group,
			Start:  msgID,
			End:    msgID,
			Count:  1,
		}).Result()
		deliveryCount := 1
		if len(pending) > 0 {
			deliveryCount = int(pending[0].DeliveryCount)
		}
		if deliveryCount >= p.maxRetries {
			dlqValues := map[string]interface{}{
				"original_id":    msgID,
				"failure_reason": err.Error(),
				"retry_count":    fmt.Sprintf("%d", deliveryCount),
			}
			for k, v := range values {
				dlqValues[k] = v
			}
			p.rdb.XAdd(p.ctx, &redis.XAddArgs{Stream: p.dlqStream, Values: dlqValues})
			p.rdb.XAck(p.ctx, p.stream, p.group, msgID)
			log.Printf("[DLQ] Moved %s to dead letter after %d attempts", msgID, deliveryCount)
		} else {
			log.Printf("[Retry] Message %s failed (attempt %d/%d): %v",
				msgID, deliveryCount, p.maxRetries, err)
		}
		return
	}
	p.rdb.XAck(p.ctx, p.stream, p.group, msgID)
	p.processed.Add(1)
}

func (p *EventProcessor) processEvent(values map[string]interface{}) error {
	eventType, _ := values["event_type"].(string)
	switch eventType {
	case "order_created":
		return p.handleOrderCreated(values)
	case "payment_completed":
		return p.handlePaymentCompleted(values)
	default:
		log.Printf("[Process] Unknown event type: %s", eventType)
		return nil
	}
}

func (p *EventProcessor) handleOrderCreated(values map[string]interface{}) error {
	orderID := fmt.Sprintf("%v", values["order_id"])
	log.Printf("[Process] Order created: %s", orderID)
	return nil
}

func (p *EventProcessor) handlePaymentCompleted(values map[string]interface{}) error {
	paymentID := fmt.Sprintf("%v", values["payment_id"])
	log.Printf("[Process] Payment completed: %s", paymentID)
	return nil
}

func (p *EventProcessor) reportMetrics() {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			log.Printf("[Metrics] processed=%d failed=%d retried=%d",
				p.processed.Load(), p.failed.Load(), p.retried.Load())
		case <-p.ctx.Done():
			return
		}
	}
}

func (p *EventProcessor) Stop() {
	log.Println("[Processor] Shutting down gracefully...")
	p.running = false
	p.cancel()
	time.Sleep(2 * time.Second)
	log.Printf("[Processor] Final metrics: processed=%d failed=%d retried=%d",
		p.processed.Load(), p.failed.Load(), p.retried.Load())
}

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	processor := NewEventProcessor(rdb, "events:orders", "order-processors", "worker-1")
	processor.Start()

	for i := 1; i <= 20; i++ {
		rdb.XAdd(context.Background(), &redis.XAddArgs{
			Stream: "events:orders",
			Values: map[string]interface{}{
				"event_type": "order_created",
				"order_id":   fmt.Sprintf("ORD-%04d", i),
				"amount":     fmt.Sprintf("%d", i*100),
			},
		})
	}

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	<-sigCh
	processor.Stop()
}

落とし穴ガイド:5つのよくある間違い

❌ 落とし穴1:XREADでコンシューマーグループを使わず、メッセージが重複消費される

# ❌ 誤り:複数コンシューマーが独立してXREAD、全メッセージが全コンシューマーで処理される
def bad_consumer():
    entries = r.xread({STREAM_NAME: "0-0"}, count=10, block=5000)
    for _, messages in entries:
        for msg_id, fields in messages:
            process(fields)
# ✅ 正しい:XREADGROUPを使用、コンシューマーグループが各メッセージの1コンシューマーのみ処理を保証
def good_consumer():
    entries = r.xreadgroup(
        GROUP_NAME, "consumer-1",
        {STREAM_NAME: ">"},
        count=10, block=5000
    )
    for _, messages in entries:
        for msg_id, fields in messages:
            process(fields)
            r.xack(STREAM_NAME, GROUP_NAME, msg_id)

❌ 落とし穴2:XACKを忘れ、メッセージがPendingリストに永遠に残る

// ❌ 誤り:メッセージ処理後にACKせず、メッセージがPending状態のまま
func badConsumer(rdb *redis.Client) {
    results, _ := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
        Group:    groupName,
        Consumer: "consumer-1",
        Streams:  []string{streamName, ">"},
        Count:    10,
    }).Result()
    for _, stream := range results {
        for _, msg := range stream.Messages {
            processEvent(msg.Values)
            // ACK忘れ!
        }
    }
}
// ✅ 正しい:処理成功後すぐにACK
func goodConsumer(rdb *redis.Client) {
    results, _ := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
        Group:    groupName,
        Consumer: "consumer-1",
        Streams:  []string{streamName, ">"},
        Count:    10,
    }).Result()
    for _, stream := range results {
        for _, msg := range stream.Messages {
            if err := processEvent(msg.Values); err == nil {
                rdb.XAck(ctx, streamName, groupName, msg.ID)
            }
        }
    }
}

❌ 落とし穴3:Streamが無限増大してメモリオーバーフロー

# ❌ 誤り:書き込みだけでトリムなし、Streamが無限増大
r.xadd("events:orders", {"data": "..."})
# XTRIMやXDELを呼び出さない
# ✅ 正しい:MAXLENでStream長を制限、または定期的にXTRIM
r.xadd("events:orders", {"data": "..."}, maxlen=10000, approximate=True)
# または手動トリム
r.xtrim("events:orders", maxlen=10000, approximate=True)

❌ 落とし穴4:XCLAIMでアイドル時間をチェックしない

// ❌ 誤り:全Pendingメッセージを無差別にXCLAIM、処理中のメッセージを奪う可能性
func badClaim(rdb *redis.Client) {
    pending, _ := rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
        Stream: streamName,
        Group:  groupName,
        Start:  "-",
        End:    "+",
        Count:  100,
    }).Result()
    for _, msg := range pending {
        rdb.XClaim(ctx, &redis.XClaimArgs{
            Stream:   streamName,
            Group:    groupName,
            Consumer: "consumer-2",
            Messages: []string{msg.ID},
        })
    }
}
// ✅ 正しい:アイドル時間が閾値を超えたPendingメッセージのみClaim
func goodClaim(rdb *redis.Client) {
    pending, _ := rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
        Stream: streamName,
        Group:  groupName,
        Start:  "-",
        End:    "+",
        Count:  100,
        Idle:   60 * time.Second, // 60秒以上アイドルのもののみClaim
    }).Result()
    for _, msg := range pending {
        rdb.XClaim(ctx, &redis.XClaimArgs{
            Stream:   streamName,
            Group:    groupName,
            Consumer: "consumer-2",
            MinIdle:  60 * time.Second,
            Messages: []string{msg.ID},
        })
    }
}

❌ 落とし穴5:コンシューマーグループ作成時のID設定ミス

# ❌ 誤り:$でコンシューマーグループを作成、新メッセージのみ消費、履歴メッセージが損失
r.xgroup_create(STREAM_NAME, GROUP_NAME, id="$")
# ✅ 正しい:0でコンシューマーグループを作成、全メッセージを最初から消費
r.xgroup_create(STREAM_NAME, GROUP_NAME, id="0")
# 新メッセージのみ消費したい場合は、意図を明示的にコメント
r.xgroup_create(STREAM_NAME, GROUP_NAME, id="$")  # 新メッセージのみ消費

エラートラブルシューティング表

エラー現象 可能な原因 診断コマンド 解決策
NOGROUP No such key Streamまたはコンシューマーグループが存在しない XINFO GROUPS events:orders 先にXGROUP CREATEでグループ作成、MKSTREAMでStream自動作成
BUSYGROUP Consumer Group name already exists コンシューマーグループの重複作成 XINFO GROUPS events:orders 例外をキャッチして無視、または事前にグループ存在確認
コンシューマーがメッセージを受信しない XREADGROUPで0-0を使用(>ではない) コード内のStreamsパラメータを確認 新メッセージは>、未確認メッセージは0-0
Pendingメッセージの蓄積 コンシューマークラッシュで未ACK XPENDING events:orders group-name XCLAIMでアクティブなコンシューマーに移管
Streamメモリが増加し続ける MAXLEN/XTRIM未設定 XINFO STREAM events:orderslength確認 XADD時にMAXLEN ~ 10000を追加、または定期的にXTRIM
メッセージが重複消費される 処理完了後ACKに失敗 ACKが処理ロジックの後に実行されているか確認 ACKがビジネスロジック成功後に実行されることを保証
XCLAIM後も元コンシューマーが処理 MinIdleの設定が短すぎる XPENDINGでメッセージのアイドル時間を確認 合理的なMinIdle閾値を設定(例:60秒)
イベントソーシングのリプレイが遅い イベント数が多すぎる、フルXRANGE XLEN events:order:ORD-001 定期スナップショット+増分イベント、XRANGEでID範囲指定リプレイ
CQRS読み取りモデルが不整合 プロジェクションコンシューマーの遅延またはクラッシュ XINFO CONSUMERSでコンシューマー遅延を確認 コンシューマーlagを監視、アラート閾値を設定
XADDのID形式が不正 Redisバージョンが5.0未満 redis-server --version Redis 5.0+にアップグレード

高度な最適化

1. Streamシャーディングとパーティショニング戦略

単一Streamのスループットが不足する場合、ビジネスキーで複数Streamにシャーディング:

import hashlib

def get_shard_stream(base_stream: str, key: str, shard_count: int = 16) -> str:
    shard_index = int(hashlib.md5(key.encode()).hexdigest(), 16) % shard_count
    return f"{base_stream}:shard-{shard_index:03d}"

order_id = "ORD-0001"
stream = get_shard_stream("events:orders", order_id)
r.xadd(stream, {"order_id": order_id, "event_type": "order_created"})

2. コンシューマーLag監視とアラート

func monitorConsumerLag(rdb *redis.Client, stream, group string) {
    ticker := time.NewTicker(10 * time.Second)
    for range ticker.C {
        info, err := rdb.XInfoStream(ctx, stream).Result()
        if err != nil {
            continue
        }
        groups, _ := rdb.XInfoGroups(ctx, stream).Result()
        for _, g := range groups {
            if g.Name == group {
                pending := g.Pending
                consumers, _ := rdb.XInfoConsumers(ctx, stream, group).Result()
                for _, c := range consumers {
                    if c.Pending > 100 {
                        log.Printf("[ALERT] Consumer %s has %d pending messages!",
                            c.Name, c.Pending)
                    }
                    if c.Idle > 5*time.Minute {
                        log.Printf("[WARN] Consumer %s idle for %v", c.Name, c.Idle)
                    }
                }
                if pending > 1000 {
                    log.Printf("[ALERT] Group %s has %d pending messages!", group, pending)
                }
                _ = info
            }
        }
    }
}

3. イベントスナップショットと増分リプレイ

import json

SNAPSHOT_INTERVAL = 100

def save_snapshot(aggregate_id: str, state: dict, version: int):
    snapshot_key = f"snapshot:order:{aggregate_id}"
    r.hset(snapshot_key, mapping={
        "state": json.dumps(state),
        "version": str(version),
        "timestamp": str(time.time()),
    })

def load_from_snapshot(aggregate_id: str):
    snapshot_key = f"snapshot:order:{aggregate_id}"
    data = r.hgetall(snapshot_key)
    if not data:
        return None, 0
    state = json.loads(data["state"])
    version = int(data["version"])
    return state, version

def rebuild_with_snapshot(aggregate_id: str):
    state, version = load_from_snapshot(aggregate_id)
    stream_key = f"events:order:{aggregate_id}"
    if state is None:
        events = r.xrange(stream_key, "-", "+")
    else:
        min_id = f"{version + 1}-0"
        events = r.xrange(stream_key, min_id, "+")
    for msg_id, fields in events:
        state = apply_event(state or {}, fields)
    return state

比較:Redis Streams vs 他のメッセージングシステム

特徴 Redis Streams Kafka RabbitMQ NATS JetStream Pulsar
位置づけ 軽量イベントストリーム 分散イベントストリーミングプラットフォーム メッセージブローカー クラウドネイティブメッセージング 分散メッセージングプラットフォーム
デプロイ複雑度 ★☆☆☆☆ ★★★★★ ★★★☆☆ ★★☆☆☆ ★★★★☆
スループット 10-50万/s 100万+/s 5-10万/s 50-100万/s 100万+/s
メッセージ永続化 AOF/RDB ディスクログ オプション永続化 ディスクログ BookKeeper
コンシューマーグループ ✅ ネイティブ対応 ✅ ネイティブ対応 ❌ プラグイン必要 ✅ ネイティブ対応 ✅ ネイティブ対応
メッセージリプレイ ✅ XRANGE ✅ Offsetリセット ❌ 非対応 ✅ 対応 ✅ 対応
Exactly-Once ❌ At-Least-Once ✅ トランザクション対応 ❌ プラグイン必要 ✅ 対応 ✅ 対応
デッドレターキュー ❌ 手動実装必要 ❌ 手動実装必要 ✅ ネイティブ対応 ✅ ネイティブ対応 ✅ ネイティブ対応
パーティショニング ❌ 手動シャーディング ✅ 自動パーティショニング ✅ Exchangeルーティング ✅ 自動パーティショニング ✅ 自動パーティショニング
運用コスト 非常に低い 高い(ZooKeeper/KRaft) 中程度 低い 高い(BookKeeper)
ユースケース 中小規模マイクロサービスイベント通信 大規模データストリーム処理 エンタープライズメッセージルーティング クラウドネイティブ軽量メッセージング マルチテナント大規模メッセージング

まとめ

Redis Streamsは2026年も中小規模マイクロサービスイベント駆動アーキテクチャの最適な軽量選択肢であり続ける。Kafkaのような重いインフラもRabbitMQのような複雑なExchange設定も不要で、6つのコアパターンで基本プロデューサー/コンシューマーからプロダクション級イベント処理まで全シナリオをカバーする。覚えておくべき3つのポイント:必ずコンシューマーグループ(XREADGROUP)を使う処理後は必ずXACKするStreamには必ずMAXLENを設定してメモリオーバーフローを防ぐ。Exactly-Onceセマンティクスや100万級スループットが必要になったら、KafkaやPulsarへのアップグレードを検討しよう。


おすすめツール

ブラウザローカルツールを無料で試す →

#Redis#Streams#事件驱动#消息队列#Consumer Group#2026#微服务