Redis Streamsイベント駆動アーキテクチャ実践:コンシューマーグループからイベントソーシングまで6つのプロダクションパターン
マイクロサービス通信のジレンマ:Kafkaは重すぎる、Redis Streamsがちょうどいい
注文サービスは在庫引き当てに通知し、決済サービスは取引結果をブロードキャストし、ユーザー登録はウェルカムメールをトリガーする——マイクロサービス間のイベント通信は至る所に存在する。Kafkaを導入する?クラスタ運用、ZooKeeper/KRaft調整、パーティションリバランス……小さなサービスには重すぎるインフラだ。RabbitMQを使う?Exchangeバインディング、ルーティングキー設定、メッセージ確認……複雑さも低くない。2026年、Redis Streamsは軽量イベント駆動ソリューションとして、中小規模マイクロサービス通信の選択肢になりつつある。
本記事では6つのプロダクションパターンを通じて、基本プロデューサー/コンシューマー→コンシューマーグループ→イベントソーシング→CQRS読み取りモデル→デッドレターキュー→プロダクション級イベント処理サービスの全チェーン実践を解説する。各ステップには完全な実行可能なPythonとGoコードを含む。
Redis Streamsコア概念
| 概念 | 説明 |
|---|---|
| Stream | Redis 5.0で導入されたログ型データ構造、Kafka Topicに類似、時系列でメッセージを格納 |
| XADD | Streamにメッセージを追加、タイムスタンプIDを自動生成(例:1718432000000-0) |
| XREAD | Streamからメッセージを読み取り、ブロッキング/ノンブロッキングモード対応 |
| Consumer Group | コンシューマーグループ、Kafka Consumer Groupに類似、各メッセージがグループ内の1コンシューマーのみで処理されることを保証 |
| XREADGROUP | コンシューマーグループとしてメッセージを読み取り、メッセージはPending状態に移行 |
| XACK | メッセージ処理完了を確認、Pendingリストから削除 |
| Pending Entries List (PEL) | 配信済みだが未確認のメッセージリスト、障害復旧とリトライに使用 |
| XPENDING | コンシューマーグループのPendingメッセージ統計を表示 |
| XCLAIM | Pendingメッセージを他のコンシューマーに移管、フェイルオーバーに使用 |
| Event Sourcing | イベントソーシングパターン、全状態変更をイベントシーケンスとして永続化、任意時点の状態を再構築可能 |
| CQRS | Command Query Responsibility Segregation、書き込みモデル(イベントストリーム)と読み取りモデル(プロジェクション)が独立して進化 |
| Dead Letter Queue (DLQ) | デッドレターキュー、リトライ閾値を超えた失敗メッセージを処理、無限リトライによる消費ブロックを防止 |
問題分析:イベント駆動アーキテクチャの5つの課題
- メッセージ損失と重複消費:コンシューマークラッシュ時、未ACKメッセージが損失または再配信される可能性、Exactly-OnceまたはAt-Least-Onceセマンティクス保証が必要
- コンシューマーグループ負荷不均衡:処理の遅いコンシューマーがメッセージ蓄積を引き起こし、他のコンシューマーがアイドル状態に、動的リバランスとXCLAIM移管が必要
- イベントソーシングのスナップショットとリプレイ:イベントストリームが無限に増大、フルリプレイのコストが高い、定期スナップショット+増分イベントの組み合わせ戦略が必要
- CQRS読み取りモデルの整合性:読み取りモデルプロジェクションは結果整合性、ユーザーが見るデータが遅延する可能性、バージョントラッキングと補償メカニズムが必要
- デッドレターとリトライストーム:メッセージ処理の繰り返し失敗がリトライストームを引き起こす、指数バックオフ+DLQ+手動介入の完全な戦略が必要
ステップバイステップ:6つのRedis Streamsイベント駆動パターン
パターン1:基本Streamプロデューサー/コンシューマー(XADD/XREAD)
最もシンプルなプロデューサー・コンシューマーモデル:1つのプロデューサーがStreamに書き込み、1つのコンシューマーが読み取る。
import redis
import json
import time
import threading
r = redis.Redis(host="localhost", port=6379, decode_responses=True)
STREAM_NAME = "events:orders"
def producer():
for i in range(1, 6):
event = {
"event_type": "order_created",
"order_id": f"ORD-{i:04d}",
"amount": 100 * i,
"user_id": f"USR-{i:03d}",
"timestamp": time.time()
}
msg_id = r.xadd(STREAM_NAME, event)
print(f"[Producer] Added event: {msg_id} -> {event}")
time.sleep(0.5)
def consumer():
last_id = "0-0"
while True:
entries = r.xread({STREAM_NAME: last_id}, count=5, block=5000)
if not entries:
print("[Consumer] No new messages, waiting...")
continue
for stream_name, messages in entries:
for msg_id, fields in messages:
print(f"[Consumer] Processed: {msg_id} -> {fields}")
last_id = msg_id
if __name__ == "__main__":
threading.Thread(target=producer, daemon=True).start()
consumer()
Go版:
package main
import (
"context"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
var ctx = context.Background()
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
streamName := "events:orders"
go producer(rdb, streamName)
consumer(rdb, streamName)
}
func producer(rdb *redis.Client, streamName string) {
for i := 1; i <= 5; i++ {
values := map[string]interface{}{
"event_type": "order_created",
"order_id": fmt.Sprintf("ORD-%04d", i),
"amount": fmt.Sprintf("%d", 100*i),
"user_id": fmt.Sprintf("USR-%03d", i),
"timestamp": fmt.Sprintf("%d", time.Now().UnixMilli()),
}
msgID, err := rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
Values: values,
}).Result()
if err != nil {
fmt.Printf("[Producer] Error: %v\n", err)
continue
}
fmt.Printf("[Producer] Added event: %s\n", msgID)
time.Sleep(500 * time.Millisecond)
}
}
func consumer(rdb *redis.Client, streamName string) {
lastID := "0-0"
for {
results, err := rdb.XRead(ctx, &redis.XReadArgs{
Streams: []string{streamName, lastID},
Count: 5,
Block: 5 * time.Second,
}).Result()
if err != nil {
if err == redis.Nil {
fmt.Println("[Consumer] No new messages, waiting...")
continue
}
fmt.Printf("[Consumer] Error: %v\n", err)
continue
}
for _, stream := range results {
for _, msg := range stream.Messages {
fmt.Printf("[Consumer] Processed: %s -> %v\n", msg.ID, msg.Values)
lastID = msg.ID
}
}
}
}
パターン2:コンシューマーグループ(XREADGROUP)
コンシューマーグループは各メッセージがグループ内の1コンシューマーのみで処理されることを保証する。KafkaのConsumer Groupに類似。
import redis
import time
import threading
import random
r = redis.Redis(host="localhost", port=6379, decode_responses=True)
STREAM_NAME = "events:payments"
GROUP_NAME = "payment-processors"
try:
r.xgroup_create(STREAM_NAME, GROUP_NAME, id="0", mkstream=True)
print(f"[Init] Created consumer group: {GROUP_NAME}")
except redis.ResponseError as e:
if "BUSYGROUP" in str(e):
print(f"[Init] Group already exists: {GROUP_NAME}")
else:
raise
def group_consumer(consumer_name: str):
while True:
entries = r.xreadgroup(
GROUP_NAME, consumer_name,
{STREAM_NAME: ">"},
count=3, block=3000
)
if not entries:
print(f"[{consumer_name}] No new messages")
continue
for stream_name, messages in entries:
for msg_id, fields in messages:
process_time = random.uniform(0.1, 0.5)
time.sleep(process_time)
r.xack(STREAM_NAME, GROUP_NAME, msg_id)
print(f"[{consumer_name}] ACKed: {msg_id} -> {fields}")
def check_pending():
pending = r.xpending_range(STREAM_NAME, GROUP_NAME, min="-", max="+", count=10)
if pending:
print(f"[Monitor] Pending messages: {len(pending)}")
for p in pending:
print(f" msg_id={p['message_id']}, consumer={p['consumer']}, "
f"delivered={p['times_delivered']} times")
if __name__ == "__main__":
for i in range(1, 4):
for j in range(1, 3):
r.xadd(STREAM_NAME, {
"event_type": "payment_completed",
"payment_id": f"PAY-{i:02d}{j:02d}",
"amount": str(random.randint(50, 500)),
})
threading.Thread(target=group_consumer, args=("consumer-1",), daemon=True).start()
threading.Thread(target=group_consumer, args=("consumer-2",), daemon=True).start()
time.sleep(1)
check_pending()
time.sleep(5)
Go版:
package main
import (
"context"
"fmt"
"math/rand"
"time"
"github.com/redis/go-redis/v9"
)
var ctx = context.Background()
func main() {
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
streamName := "events:payments"
groupName := "payment-processors"
err := rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()
if err != nil {
fmt.Printf("[Init] Group create: %v\n", err)
} else {
fmt.Printf("[Init] Created consumer group: %s\n", groupName)
}
for i := 1; i <= 3; i++ {
for j := 1; j <= 2; j++ {
rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
Values: map[string]interface{}{
"event_type": "payment_completed",
"payment_id": fmt.Sprintf("PAY-%02d%02d", i, j),
"amount": fmt.Sprintf("%d", rand.Intn(450)+50),
},
})
}
}
go groupConsumer(rdb, streamName, groupName, "consumer-1")
go groupConsumer(rdb, streamName, groupName, "consumer-2")
time.Sleep(6 * time.Second)
}
func groupConsumer(rdb *redis.Client, stream, group, consumer string) {
for {
results, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: group,
Consumer: consumer,
Streams: []string{stream, ">"},
Count: 3,
Block: 3 * time.Second,
}).Result()
if err != nil {
if err == redis.Nil {
fmt.Printf("[%s] No new messages\n", consumer)
continue
}
fmt.Printf("[%s] Error: %v\n", consumer, err)
continue
}
for _, stream := range results {
for _, msg := range stream.Messages {
fmt.Printf("[%s] Processing: %s -> %v\n", consumer, msg.ID, msg.Values)
rdb.XAck(ctx, stream, group, msg.ID)
fmt.Printf("[%s] ACKed: %s\n", consumer, msg.ID)
}
}
}
}
パターン3:イベントソーシング(Event Sourcing)
全状態変更をイベントシーケンスとしてStreamに永続化し、任意時点の集約ルート状態を再構築可能にする。
import redis
import json
from dataclasses import dataclass, field
from typing import List, Dict
r = redis.Redis(host="localhost", port=6379, decode_responses=True)
@dataclass
class OrderState:
order_id: str = ""
status: str = "created"
items: List[Dict] = field(default_factory=list)
total: float = 0.0
version: int = 0
class EventStore:
def __init__(self, aggregate_type: str, aggregate_id: str):
self.stream_key = f"events:{aggregate_type}:{aggregate_id}"
self.aggregate_type = aggregate_type
self.aggregate_id = aggregate_id
def append(self, event_type: str, data: dict):
event = {
"event_type": event_type,
"aggregate_id": self.aggregate_id,
"data": json.dumps(data),
"version": str(self._next_version()),
}
msg_id = r.xadd(self.stream_key, event)
print(f"[EventStore] Appended: {msg_id} type={event_type} version={event['version']}")
return msg_id
def _next_version(self) -> int:
info = r.xinfo_stream(self.stream_key)
length = info.get("length", 0)
return length + 1
def load_events(self) -> List[dict]:
entries = r.xrange(self.stream_key, "-", "+", count=1000)
events = []
for msg_id, fields in entries:
events.append({
"id": msg_id,
"event_type": fields["event_type"],
"data": json.loads(fields["data"]),
"version": int(fields["version"]),
})
return events
class OrderAggregate:
def __init__(self, order_id: str):
self.order_id = order_id
self.store = EventStore("order", order_id)
self.state = OrderState(order_id=order_id)
def create(self, items: List[Dict]):
total = sum(item["price"] * item["qty"] for item in items)
self.store.append("OrderCreated", {"items": items, "total": total})
def add_item(self, item: Dict):
self.store.append("ItemAdded", item)
def pay(self):
self.store.append("OrderPaid", {"payment_method": "credit_card"})
def ship(self, tracking_number: str):
self.store.append("OrderShipped", {"tracking_number": tracking_number})
def rebuild(self) -> OrderState:
events = self.store.load_events()
state = OrderState(order_id=self.order_id)
for event in events:
self._apply(state, event)
self.state = state
return state
def _apply(self, state: OrderState, event: dict):
event_type = event["event_type"]
data = event["data"]
if event_type == "OrderCreated":
state.items = data.get("items", [])
state.total = data.get("total", 0)
state.status = "created"
elif event_type == "ItemAdded":
state.items.append(data)
state.total += data.get("price", 0) * data.get("qty", 1)
elif event_type == "OrderPaid":
state.status = "paid"
elif event_type == "OrderShipped":
state.status = "shipped"
state.version = event["version"]
if __name__ == "__main__":
order = OrderAggregate("ORD-0001")
order.create([{"sku": "WIDGET-01", "price": 29.9, "qty": 2}])
order.add_item({"sku": "GADGET-02", "price": 49.9, "qty": 1})
order.pay()
order.ship("SF1234567890")
rebuilt = order.rebuild()
print(f"\n[Rebuilt State] order_id={rebuilt.order_id} status={rebuilt.status} "
f"total={rebuilt.total} items={len(rebuilt.items)} version={rebuilt.version}")
Go版:
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/redis/go-redis/v9"
)
var ctx = context.Background()
type OrderState struct {
OrderID string `json:"order_id"`
Status string `json:"status"`
Items []map[string]interface{} `json:"items"`
Total float64 `json:"total"`
Version int `json:"version"`
}
type Event struct {
ID string `json:"id"`
EventType string `json:"event_type"`
Data map[string]interface{} `json:"data"`
Version int `json:"version"`
}
type EventStore struct {
rdb *redis.Client
StreamKey string
AggregateID string
}
func NewEventStore(rdb *redis.Client, aggregateType, aggregateID string) *EventStore {
return &EventStore{
rdb: rdb,
StreamKey: fmt.Sprintf("events:%s:%s", aggregateType, aggregateID),
AggregateID: aggregateID,
}
}
func (s *EventStore) Append(eventType string, data map[string]interface{}) (string, error) {
info, err := s.rdb.XInfoStream(ctx, s.StreamKey).Result()
version := 1
if err == nil {
version = int(info.Length) + 1
}
dataJSON, _ := json.Marshal(data)
msgID, err := s.rdb.XAdd(ctx, &redis.XAddArgs{
Stream: s.StreamKey,
Values: map[string]interface{}{
"event_type": eventType,
"aggregate_id": s.AggregateID,
"data": string(dataJSON),
"version": fmt.Sprintf("%d", version),
},
}).Result()
if err != nil {
return "", err
}
fmt.Printf("[EventStore] Appended: %s type=%s version=%d\n", msgID, eventType, version)
return msgID, nil
}
func (s *EventStore) LoadEvents() ([]Event, error) {
entries, err := s.rdb.XRange(ctx, s.StreamKey, "-", "+").Result()
if err != nil {
return nil, err
}
var events []Event
for _, msg := range entries {
var data map[string]interface{}
json.Unmarshal([]byte(msg.Values["data"].(string)), &data)
var version int
fmt.Sscanf(msg.Values["version"].(string), "%d", &version)
events = append(events, Event{
ID: msg.ID,
EventType: msg.Values["event_type"].(string),
Data: data,
Version: version,
})
}
return events, nil
}
type OrderAggregate struct {
OrderID string
Store *EventStore
State OrderState
}
func NewOrderAggregate(rdb *redis.Client, orderID string) *OrderAggregate {
return &OrderAggregate{
OrderID: orderID,
Store: NewEventStore(rdb, "order", orderID),
State: OrderState{OrderID: orderID, Status: "created"},
}
}
func (a *OrderAggregate) Create(items []map[string]interface{}) {
total := 0.0
for _, item := range items {
price, _ := item["price"].(float64)
qty, _ := item["qty"].(float64)
total += price * qty
}
a.Store.Append("OrderCreated", map[string]interface{}{
"items": items, "total": total,
})
}
func (a *OrderAggregate) Pay() {
a.Store.Append("OrderPaid", map[string]interface{}{
"payment_method": "credit_card",
})
}
func (a *OrderAggregate) Ship(trackingNumber string) {
a.Store.Append("OrderShipped", map[string]interface{}{
"tracking_number": trackingNumber,
})
}
func (a *OrderAggregate) Rebuild() (*OrderState, error) {
events, err := a.Store.LoadEvents()
if err != nil {
return nil, err
}
state := OrderState{OrderID: a.OrderID, Status: "created"}
for _, event := range events {
switch event.EventType {
case "OrderCreated":
if items, ok := event.Data["items"].([]interface{}); ok {
for _, item := range items {
if m, ok := item.(map[string]interface{}); ok {
state.Items = append(state.Items, m)
}
}
}
if total, ok := event.Data["total"].(float64); ok {
state.Total = total
}
state.Status = "created"
case "OrderPaid":
state.Status = "paid"
case "OrderShipped":
state.Status = "shipped"
}
state.Version = event.Version
}
a.State = state
return &state, nil
}
func main() {
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
order := NewOrderAggregate(rdb, "ORD-0001")
order.Create([]map[string]interface{}{
{"sku": "WIDGET-01", "price": 29.9, "qty": 2},
})
order.Pay()
order.Ship("SF1234567890")
state, _ := order.Rebuild()
fmt.Printf("\n[Rebuilt State] order_id=%s status=%s total=%.2f items=%d version=%d\n",
state.OrderID, state.Status, state.Total, len(state.Items), state.Version)
}
パターン4:CQRS読み取りモデルプロジェクション
書き込み側がイベントをStreamに公開し、読み取り側がStreamをリスニングしてプロジェクション(読み取りモデル)を更新、Command Query Responsibility Segregationを実現。
import redis
import json
import time
import threading
r = redis.Redis(host="localhost", port=6379, decode_responses=True)
EVENT_STREAM = "events:orders"
PROJECTION_KEY = "projection:order_summary"
GROUP_NAME = "cqrs-projectors"
try:
r.xgroup_create(EVENT_STREAM, GROUP_NAME, id="0", mkstream=True)
except redis.ResponseError:
pass
def command_side_publish_events():
events = [
{"event_type": "OrderCreated", "order_id": "ORD-001", "amount": "299", "user_id": "USR-001"},
{"event_type": "OrderCreated", "order_id": "ORD-002", "amount": "599", "user_id": "USR-002"},
{"event_type": "OrderPaid", "order_id": "ORD-001", "amount": "299"},
{"event_type": "OrderShipped", "order_id": "ORD-001"},
{"event_type": "OrderCreated", "order_id": "ORD-003", "amount": "199", "user_id": "USR-001"},
]
for event in events:
r.xadd(EVENT_STREAM, event)
print(f"[Command] Published: {event['event_type']} -> {event.get('order_id')}")
time.sleep(0.3)
def query_side_projector():
consumer_name = "projector-1"
while True:
entries = r.xreadgroup(
GROUP_NAME, consumer_name,
{EVENT_STREAM: ">"},
count=5, block=3000
)
if not entries:
continue
for stream_name, messages in entries:
for msg_id, fields in messages:
update_projection(fields)
r.xack(EVENT_STREAM, GROUP_NAME, msg_id)
def update_projection(event: dict):
event_type = event.get("event_type")
order_id = event.get("order_id", "")
if event_type == "OrderCreated":
r.hset(PROJECTION_KEY, order_id, json.dumps({
"status": "created",
"amount": event.get("amount", "0"),
"user_id": event.get("user_id", ""),
}))
elif event_type == "OrderPaid":
existing = r.hget(PROJECTION_KEY, order_id)
if existing:
data = json.loads(existing)
data["status"] = "paid"
r.hset(PROJECTION_KEY, order_id, json.dumps(data))
elif event_type == "OrderShipped":
existing = r.hget(PROJECTION_KEY, order_id)
if existing:
data = json.loads(existing)
data["status"] = "shipped"
r.hset(PROJECTION_KEY, order_id, json.dumps(data))
print(f"[Projection] Updated: {order_id} <- {event_type}")
def query_read_model():
time.sleep(3)
all_orders = r.hgetall(PROJECTION_KEY)
print("\n[Read Model] Order Summary:")
for order_id, data in all_orders.items():
print(f" {order_id}: {data}")
if __name__ == "__main__":
threading.Thread(target=command_side_publish_events, daemon=True).start()
threading.Thread(target=query_side_projector, daemon=True).start()
query_read_model()
Go版:
package main
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
var ctx = context.Background()
const (
eventStream = "events:orders"
projectionKey = "projection:order_summary"
groupName = "cqrs-projectors"
)
func main() {
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
rdb.XGroupCreateMkStream(ctx, eventStream, groupName, "0")
go commandSidePublishEvents(rdb)
go querySideProjector(rdb)
time.Sleep(4 * time.Second)
queryReadModel(rdb)
}
func commandSidePublishEvents(rdb *redis.Client) {
events := []map[string]interface{}{
{"event_type": "OrderCreated", "order_id": "ORD-001", "amount": "299", "user_id": "USR-001"},
{"event_type": "OrderCreated", "order_id": "ORD-002", "amount": "599", "user_id": "USR-002"},
{"event_type": "OrderPaid", "order_id": "ORD-001", "amount": "299"},
{"event_type": "OrderShipped", "order_id": "ORD-001"},
{"event_type": "OrderCreated", "order_id": "ORD-003", "amount": "199", "user_id": "USR-001"},
}
for _, event := range events {
rdb.XAdd(ctx, &redis.XAddArgs{Stream: eventStream, Values: event})
fmt.Printf("[Command] Published: %s -> %s\n", event["event_type"], event["order_id"])
time.Sleep(300 * time.Millisecond)
}
}
func querySideProjector(rdb *redis.Client) {
for {
results, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: "projector-1",
Streams: []string{eventStream, ">"},
Count: 5,
Block: 3 * time.Second,
}).Result()
if err != nil {
continue
}
for _, stream := range results {
for _, msg := range stream.Messages {
updateProjection(rdb, msg.Values)
rdb.XAck(ctx, eventStream, groupName, msg.ID)
}
}
}
}
func updateProjection(rdb *redis.Client, fields map[string]interface{}) {
eventType, _ := fields["event_type"].(string)
orderID, _ := fields["order_id"].(string)
switch eventType {
case "OrderCreated":
data, _ := json.Marshal(map[string]string{
"status": "created",
"amount": fmt.Sprintf("%v", fields["amount"]),
"user_id": fmt.Sprintf("%v", fields["user_id"]),
})
rdb.HSet(ctx, projectionKey, orderID, string(data))
case "OrderPaid":
existing, _ := rdb.HGet(ctx, projectionKey, orderID).Result()
if existing != "" {
var data map[string]string
json.Unmarshal([]byte(existing), &data)
data["status"] = "paid"
updated, _ := json.Marshal(data)
rdb.HSet(ctx, projectionKey, orderID, string(updated))
}
case "OrderShipped":
existing, _ := rdb.HGet(ctx, projectionKey, orderID).Result()
if existing != "" {
var data map[string]string
json.Unmarshal([]byte(existing), &data)
data["status"] = "shipped"
updated, _ := json.Marshal(data)
rdb.HSet(ctx, projectionKey, orderID, string(updated))
}
}
fmt.Printf("[Projection] Updated: %s <- %s\n", orderID, eventType)
}
func queryReadModel(rdb *redis.Client) {
orders, _ := rdb.HGetAll(ctx, projectionKey).Result()
fmt.Println("\n[Read Model] Order Summary:")
for orderID, data := range orders {
fmt.Printf(" %s: %s\n", orderID, data)
}
}
パターン5:デッドレターキューとリトライメカニズム
処理失敗がリトライ閾値を超えたメッセージをデッドレターキューに移行し、無限リトライによる消費ブロックを防止する。
import redis
import json
import time
import random
r = redis.Redis(host="localhost", port=6379, decode_responses=True)
MAIN_STREAM = "events:orders"
DLQ_STREAM = "events:orders:dlq"
GROUP_NAME = "order-processors"
MAX_RETRIES = 3
RETRY_DELAYS = [1, 5, 30]
try:
r.xgroup_create(MAIN_STREAM, GROUP_NAME, id="0", mkstream=True)
except redis.ResponseError:
pass
def process_message(fields: dict) -> bool:
order_id = fields.get("order_id", "")
if random.random() < 0.4:
print(f" [FAIL] Processing failed for {order_id}")
return False
print(f" [OK] Processed {order_id}")
return True
def consumer_with_retry(consumer_name: str):
while True:
entries = r.xreadgroup(
GROUP_NAME, consumer_name,
{MAIN_STREAM: ">"},
count=1, block=3000
)
if not entries:
continue
for stream_name, messages in entries:
for msg_id, fields in messages:
success = process_message(fields)
if success:
r.xack(MAIN_STREAM, GROUP_NAME, msg_id)
else:
pending_info = r.xpending_range(
MAIN_STREAM, GROUP_NAME,
min=msg_id, max=msg_id, count=1
)
retry_count = 0
if pending_info:
retry_count = pending_info[0].get("times_delivered", 1) - 1
if retry_count >= MAX_RETRIES:
r.xadd(DLQ_STREAM, {
**fields,
"original_id": msg_id,
"failure_reason": "max_retries_exceeded",
"retry_count": str(retry_count),
})
r.xack(MAIN_STREAM, GROUP_NAME, msg_id)
print(f" [DLQ] Moved to dead letter: {msg_id}")
else:
delay = RETRY_DELAYS[min(retry_count, len(RETRY_DELAYS) - 1)]
print(f" [RETRY] Will retry {msg_id} after {delay}s "
f"(attempt {retry_count + 1}/{MAX_RETRIES})")
r.xack(MAIN_STREAM, GROUP_NAME, msg_id)
time.sleep(delay)
r.xadd(MAIN_STREAM, fields)
def inspect_dlq():
dlq_entries = r.xrange(DLQ_STREAM, "-", "+", count=20)
print(f"\n[DLQ] {len(dlq_entries)} dead letters:")
for msg_id, fields in dlq_entries:
print(f" {msg_id}: order_id={fields.get('order_id')} "
f"retries={fields.get('retry_count')} reason={fields.get('failure_reason')}")
if __name__ == "__main__":
for i in range(1, 11):
r.xadd(MAIN_STREAM, {"order_id": f"ORD-{i:04d}", "amount": str(i * 100)})
random.seed(42)
consumer_with_retry("consumer-1")
Go版:
package main
import (
"context"
"fmt"
"math/rand"
"time"
"github.com/redis/go-redis/v9"
)
var ctx = context.Background()
const (
mainStream = "events:orders"
dlqStream = "events:orders:dlq"
groupName = "order-processors"
maxRetries = 3
)
var retryDelays = []time.Duration{1 * time.Second, 5 * time.Second, 30 * time.Second}
func main() {
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
rdb.XGroupCreateMkStream(ctx, mainStream, groupName, "0")
for i := 1; i <= 10; i++ {
rdb.XAdd(ctx, &redis.XAddArgs{
Stream: mainStream,
Values: map[string]interface{}{
"order_id": fmt.Sprintf("ORD-%04d", i),
"amount": fmt.Sprintf("%d", i*100),
},
})
}
consumerWithRetry(rdb, "consumer-1")
}
func processMessage(fields map[string]interface{}) bool {
orderID := fmt.Sprintf("%v", fields["order_id"])
if rand.Float64() < 0.4 {
fmt.Printf(" [FAIL] Processing failed for %s\n", orderID)
return false
}
fmt.Printf(" [OK] Processed %s\n", orderID)
return true
}
func consumerWithRetry(rdb *redis.Client, consumerName string) {
for {
results, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: consumerName,
Streams: []string{mainStream, ">"},
Count: 1,
Block: 3 * time.Second,
}).Result()
if err != nil {
continue
}
for _, stream := range results {
for _, msg := range stream.Messages {
if processMessage(msg.Values) {
rdb.XAck(ctx, mainStream, groupName, msg.ID)
} else {
pending, _ := rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: mainStream,
Group: groupName,
Start: msg.ID,
End: msg.ID,
Count: 1,
}).Result()
retryCount := 0
if len(pending) > 0 {
retryCount = int(pending[0].DeliveryCount) - 1
}
if retryCount >= maxRetries {
dlqValues := map[string]interface{}{
"original_id": msg.ID,
"failure_reason": "max_retries_exceeded",
"retry_count": fmt.Sprintf("%d", retryCount),
}
for k, v := range msg.Values {
dlqValues[k] = v
}
rdb.XAdd(ctx, &redis.XAddArgs{Stream: dlqStream, Values: dlqValues})
rdb.XAck(ctx, mainStream, groupName, msg.ID)
fmt.Printf(" [DLQ] Moved to dead letter: %s\n", msg.ID)
} else {
delay := retryDelays[min(retryCount, len(retryDelays)-1)]
fmt.Printf(" [RETRY] Will retry %s after %v (attempt %d/%d)\n",
msg.ID, delay, retryCount+1, maxRetries)
rdb.XAck(ctx, mainStream, groupName, msg.ID)
time.Sleep(delay)
rdb.XAdd(ctx, &redis.XAddArgs{Stream: mainStream, Values: msg.Values})
}
}
}
}
}
}
パターン6:プロダクション級イベント処理サービス(Go実装)
グレースフルシャットダウン、ヘルスチェック、メトリクス監視、XCLAIMフェイルオーバーを備えた完全なプロダクション級イベント処理サービス。
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/redis/go-redis/v9"
)
type EventProcessor struct {
rdb *redis.Client
stream string
group string
consumer string
dlqStream string
maxRetries int
processed atomic.Int64
failed atomic.Int64
retried atomic.Int64
mu sync.Mutex
running bool
ctx context.Context
cancel context.CancelFunc
}
func NewEventProcessor(rdb *redis.Client, stream, group, consumer string) *EventProcessor {
ctx, cancel := context.WithCancel(context.Background())
return &EventProcessor{
rdb: rdb,
stream: stream,
group: group,
consumer: consumer,
dlqStream: stream + ":dlq",
maxRetries: 3,
running: true,
ctx: ctx,
cancel: cancel,
}
}
func (p *EventProcessor) Start() {
rdb := p.rdb
rdb.XGroupCreateMkStream(p.ctx, p.stream, p.group, "0")
log.Printf("[Processor] Starting consumer %s for stream %s", p.consumer, p.stream)
go p.claimPendingMessages()
go p.consumeNewMessages()
go p.reportMetrics()
}
func (p *EventProcessor) consumeNewMessages() {
for p.running {
results, err := p.rdb.XReadGroup(p.ctx, &redis.XReadGroupArgs{
Group: p.group,
Consumer: p.consumer,
Streams: []string{p.stream, ">"},
Count: 10,
Block: 2 * time.Second,
}).Result()
if err != nil {
if err == redis.Nil || p.ctx.Err() != nil {
continue
}
log.Printf("[Consumer] Error: %v", err)
continue
}
for _, stream := range results {
for _, msg := range stream.Messages {
p.handleMessage(msg.ID, msg.Values)
}
}
}
}
func (p *EventProcessor) claimPendingMessages() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
pending, err := p.rdb.XPendingExt(p.ctx, &redis.XPendingExtArgs{
Stream: p.stream,
Group: p.group,
Start: "-",
End: "+",
Count: 10,
Idle: 60 * time.Second,
}).Result()
if err != nil {
continue
}
for _, msg := range pending {
claimed, err := p.rdb.XClaim(p.ctx, &redis.XClaimArgs{
Stream: p.stream,
Group: p.group,
Consumer: p.consumer,
MinIdle: 60 * time.Second,
Messages: []string{msg.ID},
}).Result()
if err == nil && len(claimed) > 0 {
log.Printf("[Claim] Claimed pending message: %s", claimed[0].ID)
p.retried.Add(1)
p.handleMessage(claimed[0].ID, claimed[0].Values)
}
}
case <-p.ctx.Done():
return
}
}
}
func (p *EventProcessor) handleMessage(msgID string, values map[string]interface{}) {
err := p.processEvent(values)
if err != nil {
p.failed.Add(1)
pending, _ := p.rdb.XPendingExt(p.ctx, &redis.XPendingExtArgs{
Stream: p.stream,
Group: p.group,
Start: msgID,
End: msgID,
Count: 1,
}).Result()
deliveryCount := 1
if len(pending) > 0 {
deliveryCount = int(pending[0].DeliveryCount)
}
if deliveryCount >= p.maxRetries {
dlqValues := map[string]interface{}{
"original_id": msgID,
"failure_reason": err.Error(),
"retry_count": fmt.Sprintf("%d", deliveryCount),
}
for k, v := range values {
dlqValues[k] = v
}
p.rdb.XAdd(p.ctx, &redis.XAddArgs{Stream: p.dlqStream, Values: dlqValues})
p.rdb.XAck(p.ctx, p.stream, p.group, msgID)
log.Printf("[DLQ] Moved %s to dead letter after %d attempts", msgID, deliveryCount)
} else {
log.Printf("[Retry] Message %s failed (attempt %d/%d): %v",
msgID, deliveryCount, p.maxRetries, err)
}
return
}
p.rdb.XAck(p.ctx, p.stream, p.group, msgID)
p.processed.Add(1)
}
func (p *EventProcessor) processEvent(values map[string]interface{}) error {
eventType, _ := values["event_type"].(string)
switch eventType {
case "order_created":
return p.handleOrderCreated(values)
case "payment_completed":
return p.handlePaymentCompleted(values)
default:
log.Printf("[Process] Unknown event type: %s", eventType)
return nil
}
}
func (p *EventProcessor) handleOrderCreated(values map[string]interface{}) error {
orderID := fmt.Sprintf("%v", values["order_id"])
log.Printf("[Process] Order created: %s", orderID)
return nil
}
func (p *EventProcessor) handlePaymentCompleted(values map[string]interface{}) error {
paymentID := fmt.Sprintf("%v", values["payment_id"])
log.Printf("[Process] Payment completed: %s", paymentID)
return nil
}
func (p *EventProcessor) reportMetrics() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
log.Printf("[Metrics] processed=%d failed=%d retried=%d",
p.processed.Load(), p.failed.Load(), p.retried.Load())
case <-p.ctx.Done():
return
}
}
}
func (p *EventProcessor) Stop() {
log.Println("[Processor] Shutting down gracefully...")
p.running = false
p.cancel()
time.Sleep(2 * time.Second)
log.Printf("[Processor] Final metrics: processed=%d failed=%d retried=%d",
p.processed.Load(), p.failed.Load(), p.retried.Load())
}
func main() {
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
processor := NewEventProcessor(rdb, "events:orders", "order-processors", "worker-1")
processor.Start()
for i := 1; i <= 20; i++ {
rdb.XAdd(context.Background(), &redis.XAddArgs{
Stream: "events:orders",
Values: map[string]interface{}{
"event_type": "order_created",
"order_id": fmt.Sprintf("ORD-%04d", i),
"amount": fmt.Sprintf("%d", i*100),
},
})
}
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
processor.Stop()
}
落とし穴ガイド:5つのよくある間違い
❌ 落とし穴1:XREADでコンシューマーグループを使わず、メッセージが重複消費される
# ❌ 誤り:複数コンシューマーが独立してXREAD、全メッセージが全コンシューマーで処理される
def bad_consumer():
entries = r.xread({STREAM_NAME: "0-0"}, count=10, block=5000)
for _, messages in entries:
for msg_id, fields in messages:
process(fields)
# ✅ 正しい:XREADGROUPを使用、コンシューマーグループが各メッセージの1コンシューマーのみ処理を保証
def good_consumer():
entries = r.xreadgroup(
GROUP_NAME, "consumer-1",
{STREAM_NAME: ">"},
count=10, block=5000
)
for _, messages in entries:
for msg_id, fields in messages:
process(fields)
r.xack(STREAM_NAME, GROUP_NAME, msg_id)
❌ 落とし穴2:XACKを忘れ、メッセージがPendingリストに永遠に残る
// ❌ 誤り:メッセージ処理後にACKせず、メッセージがPending状態のまま
func badConsumer(rdb *redis.Client) {
results, _ := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: "consumer-1",
Streams: []string{streamName, ">"},
Count: 10,
}).Result()
for _, stream := range results {
for _, msg := range stream.Messages {
processEvent(msg.Values)
// ACK忘れ!
}
}
}
// ✅ 正しい:処理成功後すぐにACK
func goodConsumer(rdb *redis.Client) {
results, _ := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: "consumer-1",
Streams: []string{streamName, ">"},
Count: 10,
}).Result()
for _, stream := range results {
for _, msg := range stream.Messages {
if err := processEvent(msg.Values); err == nil {
rdb.XAck(ctx, streamName, groupName, msg.ID)
}
}
}
}
❌ 落とし穴3:Streamが無限増大してメモリオーバーフロー
# ❌ 誤り:書き込みだけでトリムなし、Streamが無限増大
r.xadd("events:orders", {"data": "..."})
# XTRIMやXDELを呼び出さない
# ✅ 正しい:MAXLENでStream長を制限、または定期的にXTRIM
r.xadd("events:orders", {"data": "..."}, maxlen=10000, approximate=True)
# または手動トリム
r.xtrim("events:orders", maxlen=10000, approximate=True)
❌ 落とし穴4:XCLAIMでアイドル時間をチェックしない
// ❌ 誤り:全Pendingメッセージを無差別にXCLAIM、処理中のメッセージを奪う可能性
func badClaim(rdb *redis.Client) {
pending, _ := rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: streamName,
Group: groupName,
Start: "-",
End: "+",
Count: 100,
}).Result()
for _, msg := range pending {
rdb.XClaim(ctx, &redis.XClaimArgs{
Stream: streamName,
Group: groupName,
Consumer: "consumer-2",
Messages: []string{msg.ID},
})
}
}
// ✅ 正しい:アイドル時間が閾値を超えたPendingメッセージのみClaim
func goodClaim(rdb *redis.Client) {
pending, _ := rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: streamName,
Group: groupName,
Start: "-",
End: "+",
Count: 100,
Idle: 60 * time.Second, // 60秒以上アイドルのもののみClaim
}).Result()
for _, msg := range pending {
rdb.XClaim(ctx, &redis.XClaimArgs{
Stream: streamName,
Group: groupName,
Consumer: "consumer-2",
MinIdle: 60 * time.Second,
Messages: []string{msg.ID},
})
}
}
❌ 落とし穴5:コンシューマーグループ作成時のID設定ミス
# ❌ 誤り:$でコンシューマーグループを作成、新メッセージのみ消費、履歴メッセージが損失
r.xgroup_create(STREAM_NAME, GROUP_NAME, id="$")
# ✅ 正しい:0でコンシューマーグループを作成、全メッセージを最初から消費
r.xgroup_create(STREAM_NAME, GROUP_NAME, id="0")
# 新メッセージのみ消費したい場合は、意図を明示的にコメント
r.xgroup_create(STREAM_NAME, GROUP_NAME, id="$") # 新メッセージのみ消費
エラートラブルシューティング表
| エラー現象 | 可能な原因 | 診断コマンド | 解決策 |
|---|---|---|---|
NOGROUP No such key |
Streamまたはコンシューマーグループが存在しない | XINFO GROUPS events:orders |
先にXGROUP CREATEでグループ作成、MKSTREAMでStream自動作成 |
BUSYGROUP Consumer Group name already exists |
コンシューマーグループの重複作成 | XINFO GROUPS events:orders |
例外をキャッチして無視、または事前にグループ存在確認 |
| コンシューマーがメッセージを受信しない | XREADGROUPで0-0を使用(>ではない) |
コード内のStreamsパラメータを確認 | 新メッセージは>、未確認メッセージは0-0 |
| Pendingメッセージの蓄積 | コンシューマークラッシュで未ACK | XPENDING events:orders group-name |
XCLAIMでアクティブなコンシューマーに移管 |
| Streamメモリが増加し続ける | MAXLEN/XTRIM未設定 | XINFO STREAM events:ordersでlength確認 |
XADD時にMAXLEN ~ 10000を追加、または定期的にXTRIM |
| メッセージが重複消費される | 処理完了後ACKに失敗 | ACKが処理ロジックの後に実行されているか確認 | ACKがビジネスロジック成功後に実行されることを保証 |
| XCLAIM後も元コンシューマーが処理 | MinIdleの設定が短すぎる | XPENDINGでメッセージのアイドル時間を確認 |
合理的なMinIdle閾値を設定(例:60秒) |
| イベントソーシングのリプレイが遅い | イベント数が多すぎる、フルXRANGE | XLEN events:order:ORD-001 |
定期スナップショット+増分イベント、XRANGEでID範囲指定リプレイ |
| CQRS読み取りモデルが不整合 | プロジェクションコンシューマーの遅延またはクラッシュ | XINFO CONSUMERSでコンシューマー遅延を確認 |
コンシューマーlagを監視、アラート閾値を設定 |
XADDのID形式が不正 |
Redisバージョンが5.0未満 | redis-server --version |
Redis 5.0+にアップグレード |
高度な最適化
1. Streamシャーディングとパーティショニング戦略
単一Streamのスループットが不足する場合、ビジネスキーで複数Streamにシャーディング:
import hashlib
def get_shard_stream(base_stream: str, key: str, shard_count: int = 16) -> str:
shard_index = int(hashlib.md5(key.encode()).hexdigest(), 16) % shard_count
return f"{base_stream}:shard-{shard_index:03d}"
order_id = "ORD-0001"
stream = get_shard_stream("events:orders", order_id)
r.xadd(stream, {"order_id": order_id, "event_type": "order_created"})
2. コンシューマーLag監視とアラート
func monitorConsumerLag(rdb *redis.Client, stream, group string) {
ticker := time.NewTicker(10 * time.Second)
for range ticker.C {
info, err := rdb.XInfoStream(ctx, stream).Result()
if err != nil {
continue
}
groups, _ := rdb.XInfoGroups(ctx, stream).Result()
for _, g := range groups {
if g.Name == group {
pending := g.Pending
consumers, _ := rdb.XInfoConsumers(ctx, stream, group).Result()
for _, c := range consumers {
if c.Pending > 100 {
log.Printf("[ALERT] Consumer %s has %d pending messages!",
c.Name, c.Pending)
}
if c.Idle > 5*time.Minute {
log.Printf("[WARN] Consumer %s idle for %v", c.Name, c.Idle)
}
}
if pending > 1000 {
log.Printf("[ALERT] Group %s has %d pending messages!", group, pending)
}
_ = info
}
}
}
}
3. イベントスナップショットと増分リプレイ
import json
SNAPSHOT_INTERVAL = 100
def save_snapshot(aggregate_id: str, state: dict, version: int):
snapshot_key = f"snapshot:order:{aggregate_id}"
r.hset(snapshot_key, mapping={
"state": json.dumps(state),
"version": str(version),
"timestamp": str(time.time()),
})
def load_from_snapshot(aggregate_id: str):
snapshot_key = f"snapshot:order:{aggregate_id}"
data = r.hgetall(snapshot_key)
if not data:
return None, 0
state = json.loads(data["state"])
version = int(data["version"])
return state, version
def rebuild_with_snapshot(aggregate_id: str):
state, version = load_from_snapshot(aggregate_id)
stream_key = f"events:order:{aggregate_id}"
if state is None:
events = r.xrange(stream_key, "-", "+")
else:
min_id = f"{version + 1}-0"
events = r.xrange(stream_key, min_id, "+")
for msg_id, fields in events:
state = apply_event(state or {}, fields)
return state
比較:Redis Streams vs 他のメッセージングシステム
| 特徴 | Redis Streams | Kafka | RabbitMQ | NATS JetStream | Pulsar |
|---|---|---|---|---|---|
| 位置づけ | 軽量イベントストリーム | 分散イベントストリーミングプラットフォーム | メッセージブローカー | クラウドネイティブメッセージング | 分散メッセージングプラットフォーム |
| デプロイ複雑度 | ★☆☆☆☆ | ★★★★★ | ★★★☆☆ | ★★☆☆☆ | ★★★★☆ |
| スループット | 10-50万/s | 100万+/s | 5-10万/s | 50-100万/s | 100万+/s |
| メッセージ永続化 | AOF/RDB | ディスクログ | オプション永続化 | ディスクログ | BookKeeper |
| コンシューマーグループ | ✅ ネイティブ対応 | ✅ ネイティブ対応 | ❌ プラグイン必要 | ✅ ネイティブ対応 | ✅ ネイティブ対応 |
| メッセージリプレイ | ✅ XRANGE | ✅ Offsetリセット | ❌ 非対応 | ✅ 対応 | ✅ 対応 |
| Exactly-Once | ❌ At-Least-Once | ✅ トランザクション対応 | ❌ プラグイン必要 | ✅ 対応 | ✅ 対応 |
| デッドレターキュー | ❌ 手動実装必要 | ❌ 手動実装必要 | ✅ ネイティブ対応 | ✅ ネイティブ対応 | ✅ ネイティブ対応 |
| パーティショニング | ❌ 手動シャーディング | ✅ 自動パーティショニング | ✅ Exchangeルーティング | ✅ 自動パーティショニング | ✅ 自動パーティショニング |
| 運用コスト | 非常に低い | 高い(ZooKeeper/KRaft) | 中程度 | 低い | 高い(BookKeeper) |
| ユースケース | 中小規模マイクロサービスイベント通信 | 大規模データストリーム処理 | エンタープライズメッセージルーティング | クラウドネイティブ軽量メッセージング | マルチテナント大規模メッセージング |
まとめ
Redis Streamsは2026年も中小規模マイクロサービスイベント駆動アーキテクチャの最適な軽量選択肢であり続ける。Kafkaのような重いインフラもRabbitMQのような複雑なExchange設定も不要で、6つのコアパターンで基本プロデューサー/コンシューマーからプロダクション級イベント処理まで全シナリオをカバーする。覚えておくべき3つのポイント:必ずコンシューマーグループ(XREADGROUP)を使う、処理後は必ずXACKする、Streamには必ずMAXLENを設定してメモリオーバーフローを防ぐ。Exactly-Onceセマンティクスや100万級スループットが必要になったら、KafkaやPulsarへのアップグレードを検討しよう。
おすすめツール
- JSONフォーマッター — Redis Stream内のJSONイベントデータをフォーマット
- Base64エンコード/デコード — Stream内のバイナリイベントペイロードをエンコード/デコード
- ハッシュ計算 — イベントデータのハッシュ値を計算して冪等性チェックに使用
ブラウザローカルツールを無料で試す →