Redis分散ロック実践:Redlockからプロダクション級ロックサービスまで5つの実装パターン
数据库
分散ロック:書いたコードより踏んだ罠の方が多い
在庫のオーバーセル、定期タスクの重複実行、冪等APIの同時実行突破——これらのプロダクション事故の根本原因はすべて分散同時制御の失敗です。SET NX EXでロックを取得しても、ロックが期限切れになってビジネスがまだ終わっていない。Redissonに切り替えると、ウォッチドッグの更新がGC一時停止中に失敗する。Redlockを試すと、Martin Kleppmannの論文に怖気づく。2026年、Redis分散ロックは分散システムで最もバグを生みやすいコンポーネントの一つです。
本記事では5つの実装パターンから出発し、基本ロック→再入可能ロック→Redlock→ロック更新→プロダクション級ロックサービスのフルパイプラインを実践します。
Redis分散ロックコア概念
| 概念 | 説明 |
|---|---|
| SET NX EX | Redisネイティブコマンド、NXは存在しない場合のみ設定、EXは秒単位の有効期限 |
| 再入可能ロック | 同一スレッド/コルーチンが同じロックを複数回取得可能、カウンターが必要 |
| ウォッチドッグ | バックグラウンド定期更新スレッド、ビジネス完了前のロック期限切れを防止 |
| Redlockアルゴリズム | マルチノード分散ロック、N/2+1ノードで取得成功が必要 |
| Luaスクリプト | 原子操作の保証、ロック取得/解放のcheck-and-setは原子でなければならない |
| フェアロック | リクエスト順にロックを取得、飢餓問題を回避 |
| 読み書きロック | 読みロック共有・書きロック排他、読み多いシナリオの並行度を向上 |
| セマフォ | N個のホルダーが同時取得可能、レート制限/リソースプールに使用 |
問題分析:分散ロックの5つの課題
- ロックタイムアウトとビジネス時間の不一致:ロック10秒期限切れ、ビジネス15秒実行で早期解放・同時突破
- GC一時停止によるウォッチドッグ失敗:JVM/GoランタイムのSTW一時停止で更新スレッドが時間通りに実行できない
- Redlockのクロックドリフト問題:マルチノードの時計非同期がロック安全性を損なう可能性
- 他人のロックの誤削除:Aのロック期限切れ後Bが取得、Aの解放がBのロックを削除
- ネットワーク分断下のスプリットブレイン:クライアントとRedisノードのネットワーク切断でロック状態不一致
ステップバイステップ:5つのRedis分散ロック実装
パターン1:基本SET NX EXロック
import redis
import uuid
import time
class RedisBasicLock:
def __init__(self, redis_client: redis.Redis, lock_name: str, timeout: int = 10):
self.redis = redis_client
self.lock_name = f"lock:{lock_name}"
self.timeout = timeout
self.identifier = str(uuid.uuid4())
def acquire(self) -> bool:
result = self.redis.set(
self.lock_name,
self.identifier,
nx=True,
ex=self.timeout
)
return result is not None
def release(self) -> bool:
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
result = self.redis.eval(lua_script, 1, self.lock_name, self.identifier)
return result == 1
def __enter__(self):
if not self.acquire():
raise RuntimeError(f"Failed to acquire lock: {self.lock_name}")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
パターン2:再入可能ロック(Reentrant Lock)
import redis
import uuid
import threading
class RedisReentrantLock:
def __init__(self, redis_client: redis.Redis, lock_name: str, timeout: int = 30):
self.redis = redis_client
self.lock_name = f"reentrant_lock:{lock_name}"
self.timeout = timeout
self.identifier = str(uuid.uuid4())
self._local = threading.local()
def acquire(self) -> bool:
count = getattr(self._local, 'count', 0)
if count > 0:
self._local.count = count + 1
return True
lua_acquire = """
if redis.call("exists", KEYS[1]) == 0 then
redis.call("hset", KEYS[1], ARGV[1], 1)
redis.call("expire", KEYS[1], ARGV[2])
return 1
elseif redis.call("hexists", KEYS[1], ARGV[1]) == 1 then
redis.call("hincrby", KEYS[1], ARGV[1], 1)
redis.call("expire", KEYS[1], ARGV[2])
return 1
else
return 0
end
"""
result = self.redis.eval(lua_acquire, 1, self.lock_name, self.identifier, str(self.timeout))
if result == 1:
self._local.count = 1
return True
return False
def release(self) -> bool:
count = getattr(self._local, 'count', 0)
if count == 0:
return False
if count > 1:
self._local.count = count - 1
lua_decr = """
if redis.call("hexists", KEYS[1], ARGV[1]) == 1 then
redis.call("hincrby", KEYS[1], ARGV[1], -1)
return 1
else
return 0
end
"""
self.redis.eval(lua_decr, 1, self.lock_name, self.identifier)
return True
lua_release = """
if redis.call("hexists", KEYS[1], ARGV[1]) == 0 then
return 0
elseif redis.call("hincrby", KEYS[1], ARGV[1], -1) > 0 then
redis.call("expire", KEYS[1], ARGV[2])
return 1
else
return redis.call("del", KEYS[1])
end
"""
result = self.redis.eval(lua_release, 1, self.lock_name, self.identifier, str(self.timeout))
self._local.count = 0
return result in (1,)
def __enter__(self):
if not self.acquire():
raise RuntimeError(f"Failed to acquire reentrant lock: {self.lock_name}")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
パターン3:ウォッチドッグ自動更新ロック
import redis
import uuid
import threading
import time
import logging
logger = logging.getLogger(__name__)
class RedisWatchdogLock:
def __init__(self, redis_client: redis.Redis, lock_name: str, timeout: int = 30, renewal_interval: int = 10):
self.redis = redis_client
self.lock_name = f"watchdog_lock:{lock_name}"
self.timeout = timeout
self.renewal_interval = renewal_interval
self.identifier = str(uuid.uuid4())
self._watchdog_thread = None
self._stop_event = threading.Event()
def acquire(self, blocking: bool = True, wait_timeout: float = 30.0) -> bool:
deadline = time.time() + wait_timeout
while True:
result = self.redis.set(self.lock_name, self.identifier, nx=True, ex=self.timeout)
if result is not None:
self._start_watchdog()
return True
if not blocking:
return False
if time.time() >= deadline:
return False
time.sleep(0.1)
def _start_watchdog(self):
self._stop_event.clear()
self._watchdog_thread = threading.Thread(target=self._watchdog_loop, daemon=True)
self._watchdog_thread.start()
def _watchdog_loop(self):
while not self._stop_event.is_set():
self._stop_event.wait(self.renewal_interval)
if self._stop_event.is_set():
break
try:
lua_renew = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("expire", KEYS[1], ARGV[2])
else
return 0
end
"""
result = self.redis.eval(lua_renew, 1, self.lock_name, self.identifier, str(self.timeout))
if result != 1:
logger.warning("Watchdog renewal failed for lock %s", self.lock_name)
break
except Exception as e:
logger.error("Watchdog error: %s", e)
break
def release(self) -> bool:
self._stop_event.set()
if self._watchdog_thread and self._watchdog_thread.is_alive():
self._watchdog_thread.join(timeout=2.0)
lua_release = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
result = self.redis.eval(lua_release, 1, self.lock_name, self.identifier)
return result == 1
def __enter__(self):
if not self.acquire():
raise RuntimeError(f"Failed to acquire watchdog lock: {self.lock_name}")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
パターン4:Redlockマルチノードロック
import redis
import uuid
import time
import logging
from typing import List
logger = logging.getLogger(__name__)
class Redlock:
def __init__(self, redis_clients: List[redis.Redis], lock_name: str, timeout: int = 10, retry_count: int = 3, retry_delay: float = 0.2):
self.redis_clients = redis_clients
self.quorum = len(redis_clients) // 2 + 1
self.lock_name = f"redlock:{lock_name}"
self.timeout = timeout
self.retry_count = retry_count
self.retry_delay = retry_delay
self.identifier = str(uuid.uuid4())
def acquire(self) -> bool:
for attempt in range(self.retry_count):
acquired_count = 0
start_time = time.monotonic()
for client in self.redis_clients:
try:
result = client.set(self.lock_name, self.identifier, nx=True, ex=self.timeout)
if result is not None:
acquired_count += 1
except Exception as e:
logger.warning("Redlock acquire error on node: %s", e)
elapsed = time.monotonic() - start_time
validity_time = self.timeout - elapsed
if acquired_count >= self.quorum and validity_time > 0:
return True
self._release_all_nodes()
if attempt < self.retry_count - 1:
jitter = (attempt * 0.01)
time.sleep(self.retry_delay + jitter)
return False
def _release_all_nodes(self):
lua_release = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
for client in self.redis_clients:
try:
client.eval(lua_release, 1, self.lock_name, self.identifier)
except Exception as e:
logger.warning("Redlock release error on node: %s", e)
def release(self) -> bool:
self._release_all_nodes()
return True
def __enter__(self):
if not self.acquire():
raise RuntimeError(f"Failed to acquire Redlock: {self.lock_name}")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
パターン5:プロダクション級ロックサービス(Go実装)
package distlock
import (
"context"
"crypto/rand"
"encoding/hex"
"fmt"
"log"
"sync"
"time"
"github.com/redis/go-redis/v9"
)
type LockService struct {
client *redis.Client
watchdogCancel map[string]context.CancelFunc
mu sync.Mutex
}
func NewLockService(client *redis.Client) *LockService {
return &LockService{
client: client,
watchdogCancel: make(map[string]context.CancelFunc),
}
}
type LockOptions struct {
Timeout time.Duration
RenewalInterval time.Duration
RetryCount int
RetryDelay time.Duration
}
func DefaultLockOptions() LockOptions {
return LockOptions{
Timeout: 30 * time.Second,
RenewalInterval: 10 * time.Second,
RetryCount: 3,
RetryDelay: 200 * time.Millisecond,
}
}
func generateIdentifier() string {
b := make([]byte, 16)
rand.Read(b)
return hex.EncodeToString(b)
}
var acquireScript = redis.NewScript(`
if redis.call("exists", KEYS[1]) == 0 then
redis.call("hset", KEYS[1], "identifier", ARGV[1], "count", 1)
redis.call("expire", KEYS[1], ARGV[2])
return 1
elseif redis.call("hget", KEYS[1], "identifier") == ARGV[1] then
redis.call("hincrby", KEYS[1], "count", 1)
redis.call("expire", KEYS[1], ARGV[2])
return 1
else
return 0
end
`)
var releaseScript = redis.NewScript(`
if redis.call("hget", KEYS[1], "identifier") ~= ARGV[1] then
return 0
end
local count = redis.call("hincrby", KEYS[1], "count", -1)
if count > 0 then
redis.call("expire", KEYS[1], ARGV[2])
return 1
end
return redis.call("del", KEYS[1])
`)
var renewScript = redis.NewScript(`
if redis.call("hget", KEYS[1], "identifier") == ARGV[1] then
return redis.call("expire", KEYS[1], ARGV[2])
else
return 0
end
`)
func (ls *LockService) Acquire(ctx context.Context, lockName string, opts LockOptions) (string, error) {
identifier := generateIdentifier()
key := fmt.Sprintf("lock_service:%s", lockName)
for i := 0; i < opts.RetryCount; i++ {
result, err := acquireScript.Run(ctx, ls.client, []string{key}, identifier, int(opts.Timeout.Seconds())).Int()
if err != nil {
return "", fmt.Errorf("acquire script error: %w", err)
}
if result == 1 {
ls.startWatchdog(ctx, key, identifier, opts)
return identifier, nil
}
select {
case <-ctx.Done():
return "", ctx.Err()
case <-time.After(opts.RetryDelay):
}
}
return "", fmt.Errorf("failed to acquire lock after %d retries", opts.RetryCount)
}
func (ls *LockService) startWatchdog(ctx context.Context, key, identifier string, opts LockOptions) {
wdCtx, cancel := context.WithCancel(context.Background())
ls.mu.Lock()
ls.watchdogCancel[key+":"+identifier] = cancel
ls.mu.Unlock()
go func() {
defer cancel()
ticker := time.NewTicker(opts.RenewalInterval)
defer ticker.Stop()
for {
select {
case <-wdCtx.Done():
return
case <-ticker.C:
result, err := renewScript.Run(wdCtx, ls.client, []string{key}, identifier, int(opts.Timeout.Seconds())).Int()
if err != nil || result != 1 {
log.Printf("Watchdog renewal failed for key=%s identifier=%s: result=%d err=%v", key, identifier, result, err)
return
}
}
}
}()
}
func (ls *LockService) Release(ctx context.Context, lockName, identifier string, opts LockOptions) error {
key := fmt.Sprintf("lock_service:%s", lockName)
ls.mu.Lock()
if cancel, ok := ls.watchdogCancel[key+":"+identifier]; ok {
cancel()
delete(ls.watchdogCancel, key+":"+identifier)
}
ls.mu.Unlock()
result, err := releaseScript.Run(ctx, ls.client, []string{key}, identifier, int(opts.Timeout.Seconds())).Int()
if err != nil {
return fmt.Errorf("release script error: %w", err)
}
if result == 0 {
return fmt.Errorf("lock not owned by identifier %s", identifier)
}
return nil
}
落とし穴ガイド
落とし穴1:オーナー検証なしでDEL
# ❌ 誤り:直接削除、他人のロックを削除する可能性
redis_client.delete("lock:order:123")
# ✅ 正しい:Luaスクリプトで原子check-and-delete
lua = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
redis_client.eval(lua, 1, "lock:order:123", my_identifier)
落とし穴2:ロックタイムアウトが短すぎる
# ❌ 誤り:3秒タイムアウト、遅いクエリで期限切れ
redis_client.set("lock:order", identifier, nx=True, ex=3)
# ✅ 正しい:ウォッチドッグ更新 + 適切な初期タイムアウト
lock = RedisWatchdogLock(redis_client, "order", timeout=30, renewal_interval=10)
with lock:
process_order()
落とし穴3:再入可能ロックでカウントなし
# ❌ 誤り:毎回SET NX、ネスト呼び出しでロック取得不可
def outer():
with basic_lock:
inner()
def inner():
with basic_lock: # デッドロック!自身のロックを再取得できない
pass
# ✅ 正しい:再入可能ロックを使用
def outer():
with reentrant_lock:
inner()
def inner():
with reentrant_lock: # カウント+1、正常に取得可能
pass
落とし穴4:Redlockでクロックドリフトを無視
# ❌ 誤り:ロックの残り有効時間を検証しない
acquired_count = 0
for client in redis_clients:
result = client.set(lock_name, identifier, nx=True, ex=timeout)
if result:
acquired_count += 1
if acquired_count >= quorum:
return True # ロックがもうすぐ期限切れかも!
# ✅ 正しい:有効時間を検証
start = time.monotonic()
# ... 取得ロジック ...
elapsed = time.monotonic() - start
validity = timeout - elapsed
if acquired_count >= quorum and validity > 0:
return True
落とし穴5:ウォッチドッグ更新間隔=ロックタイムアウト
# ❌ 誤り:更新間隔30秒 = ロックタイムアウト30秒、GC一時停止で更新不可
lock = RedisWatchdogLock(client, "order", timeout=30, renewal_interval=30)
# ✅ 正しい:更新間隔 = ロックタイムアウト / 3
lock = RedisWatchdogLock(client, "order", timeout=30, renewal_interval=10)
エラートラブルシューティング
| # | エラーメッセージ | 原因 | 解決方法 |
|---|---|---|---|
| 1 | UNLOCK_FAILED: lock not owned |
解放時のidentifier不一致 | ロック取得/解放で同じidentifierを使用 |
| 2 | LOCK_TIMEOUT: acquire failed after retries |
ロック長時間保持または競合激しい | リトライ回数増加、デッドロック確認 |
| 3 | WATCHDOG_RENEWAL_FAILED |
ウォッチドッグ更新失敗、ロック削除/期限切れ | ネットワーク確認、タイムアウト設定検証 |
| 4 | RedisConnectionError |
Redis接続断 | 接続プールリトライ設定、Sentinel/Cluster使用 |
| 5 | LuaScriptError: wrong number of arguments |
Luaスクリプト引数不一致 | KEYSとARGVの数と順序を確認 |
| 6 | Redlock quorum not reached |
過半数ノードで取得失敗 | ノード状態確認、retry_count増加 |
| 7 | CONCURRENT_MODIFICATION: data inconsistency |
ロック早期解放による同時変更 | ウォッチドッグ更新使用、タイムアウト増加 |
| 8 | OOM: Redis out of memory |
期限なしロックキーの蓄積 | SET NX EXのEXパラメータが有効であることを確認 |
| 9 | DEADLOCK: circular wait detected |
マルチロックの循環待機 | ロック順序統一、グローバルタイムアウト設定 |
| 10 | CLOCK_DRIFT: lock validity expired |
Redlockノードの時計偏差過大 | NTP同期設定、validity time検証 |
高度な最適化
1. フェアロック実装
class RedisFairLock:
def __init__(self, redis_client: redis.Redis, lock_name: str, timeout: int = 30):
self.redis = redis_client
self.lock_name = f"fair_lock:{lock_name}"
self.queue_name = f"fair_lock_queue:{lock_name}"
self.timeout = timeout
self.identifier = str(uuid.uuid4())
def acquire(self, wait_timeout: float = 30.0) -> bool:
timestamp = time.time()
self.redis.zadd(self.queue_name, {self.identifier: timestamp})
self.redis.expire(self.queue_name, wait_timeout * 2)
deadline = time.time() + wait_timeout
while time.time() < deadline:
lua = """
local first = redis.call("zrange", KEYS[2], 0, 0)
if first[1] == ARGV[1] then
local result = redis.call("set", KEYS[1], ARGV[1], "nx", "ex", ARGV[2])
if result then
redis.call("zrem", KEYS[2], ARGV[1])
return 1
end
end
return 0
"""
result = self.redis.eval(lua, 2, self.lock_name, self.queue_name, self.identifier, str(self.timeout))
if result == 1:
return True
time.sleep(0.05)
self.redis.zrem(self.queue_name, self.identifier)
return False
def release(self) -> bool:
lua = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
result = self.redis.eval(lua, 1, self.lock_name, self.identifier)
return result == 1
2. 読み書きロック実装
class RedisReadWriteLock:
def __init__(self, redis_client: redis.Redis, lock_name: str, timeout: int = 30):
self.redis = redis_client
self.read_lock_name = f"rw_lock:{lock_name}:read"
self.write_lock_name = f"rw_lock:{lock_name}:write"
self.timeout = timeout
self.identifier = str(uuid.uuid4())
def acquire_read(self) -> bool:
lua = """
if redis.call("exists", KEYS[2]) == 1 then
return 0
end
redis.call("hincrby", KEYS[1], "readers", 1)
redis.call("expire", KEYS[1], ARGV[2])
return 1
"""
result = self.redis.eval(lua, 2, self.read_lock_name, self.write_lock_name, self.identifier, str(self.timeout))
return result == 1
def release_read(self) -> bool:
lua = """
local count = redis.call("hincrby", KEYS[1], "readers", -1)
if count <= 0 then
redis.call("del", KEYS[1])
end
return 1
"""
self.redis.eval(lua, 1, self.read_lock_name, self.identifier)
return True
def acquire_write(self) -> bool:
result = self.redis.set(self.write_lock_name, self.identifier, nx=True, ex=self.timeout)
if result is None:
return False
lua = """
if redis.call("exists", KEYS[1]) == 1 and redis.call("hget", KEYS[1], "readers") ~= "0" then
redis.call("del", KEYS[2])
return 0
end
return 1
"""
check = self.redis.eval(lua, 2, self.read_lock_name, self.write_lock_name, self.identifier)
return check == 1
def release_write(self) -> bool:
lua = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
result = self.redis.eval(lua, 1, self.write_lock_name, self.identifier)
return result == 1
3. ロック監視メトリクス収集
package distlock
import (
"context"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
type LockMetrics struct {
LockName string
CurrentHolder string
RemainTTL time.Duration
AcquireCount int64
WaitQueueLen int64
}
func CollectLockMetrics(ctx context.Context, client *redis.Client, lockName string) (*LockMetrics, error) {
key := fmt.Sprintf("lock_service:%s", lockName)
ttl, err := client.TTL(ctx, key).Result()
if err != nil {
return nil, err
}
identifier, _ := client.HGet(ctx, key, "identifier").Result()
count, _ := client.HGet(ctx, key, "count").Int64()
queueKey := fmt.Sprintf("fair_lock_queue:%s", lockName)
queueLen, _ := client.ZCard(ctx, queueKey).Result()
return &LockMetrics{
LockName: lockName,
CurrentHolder: identifier,
RemainTTL: ttl,
AcquireCount: count,
WaitQueueLen: queueLen,
}, nil
}
比較分析
| 次元 | SET NX EX | 再入可能ロック | ウォッチドッグロック | Redlock | プロダクション級ロックサービス |
|---|---|---|---|---|---|
| 実装複雑度 | ⭐低 | ⭐⭐中 | ⭐⭐⭐高 | ⭐⭐⭐高 | ⭐⭐⭐⭐極高 |
| 原子性 | ⚠️Lua必要 | ✅Lua保証 | ✅Lua保証 | ✅マルチノード | ✅Lua保証 |
| 再入可能 | ❌ | ✅ | ✅ | ❌ | ✅ |
| 自動更新 | ❌ | ❌ | ✅ | ❌ | ✅ |
| マルチノード耐障害性 | ❌ | ❌ | ❌ | ✅ | オプション |
| 誤削除防止 | ⚠️Lua必要 | ✅ | ✅ | ✅ | ✅ |
| GC耐性 | ⭐高 | ⭐高 | ⚠️中 | ⭐高 | ⚠️中 |
| プロダクション推奨 | プロトタイプ検証 | 標準ビジネス | 長トランザクション | 高可用性 | クリティカルパス |
まとめ:Redis分散ロックは「1つのコマンド」ではなく「1つの体系」のエンジニアリングです。SET NX EXからプロダクション級ロックサービスまで、コア原則は3つだけ:原子操作にはLua、ロックタイムアウトは更新、解放前にオーナー検証。Redlockは多くのシナリオで過剰設計です——単一ノード+Sentinel HAで99%のビジネスケースに対応できます。Redlockマルチノード方案に投資すべきは、「ロック失敗が深刻なデータ不整合を引き起こす」クリティカルパスのみです。
オンラインツール推奨
- Hash計算:/ja/encode/hash
- JSONフォーマッター:/ja/json/format
- Base64エンコード/デコード:/ja/encode/base64
- JWTデコード:/ja/encode/jwt-decode
ブラウザローカルツールを無料で試す →
#Redis#分布式锁#Redlock#并发控制#分布式系统#2026#Redisson