Redis分散ロック実践:Redlockからプロダクション級ロックサービスまで5つの実装パターン

数据库

分散ロック:書いたコードより踏んだ罠の方が多い

在庫のオーバーセル、定期タスクの重複実行、冪等APIの同時実行突破——これらのプロダクション事故の根本原因はすべて分散同時制御の失敗です。SET NX EXでロックを取得しても、ロックが期限切れになってビジネスがまだ終わっていない。Redissonに切り替えると、ウォッチドッグの更新がGC一時停止中に失敗する。Redlockを試すと、Martin Kleppmannの論文に怖気づく。2026年、Redis分散ロックは分散システムで最もバグを生みやすいコンポーネントの一つです。

本記事では5つの実装パターンから出発し、基本ロック→再入可能ロック→Redlock→ロック更新→プロダクション級ロックサービスのフルパイプラインを実践します。


Redis分散ロックコア概念

概念 説明
SET NX EX Redisネイティブコマンド、NXは存在しない場合のみ設定、EXは秒単位の有効期限
再入可能ロック 同一スレッド/コルーチンが同じロックを複数回取得可能、カウンターが必要
ウォッチドッグ バックグラウンド定期更新スレッド、ビジネス完了前のロック期限切れを防止
Redlockアルゴリズム マルチノード分散ロック、N/2+1ノードで取得成功が必要
Luaスクリプト 原子操作の保証、ロック取得/解放のcheck-and-setは原子でなければならない
フェアロック リクエスト順にロックを取得、飢餓問題を回避
読み書きロック 読みロック共有・書きロック排他、読み多いシナリオの並行度を向上
セマフォ N個のホルダーが同時取得可能、レート制限/リソースプールに使用

問題分析:分散ロックの5つの課題

  1. ロックタイムアウトとビジネス時間の不一致:ロック10秒期限切れ、ビジネス15秒実行で早期解放・同時突破
  2. GC一時停止によるウォッチドッグ失敗:JVM/GoランタイムのSTW一時停止で更新スレッドが時間通りに実行できない
  3. Redlockのクロックドリフト問題:マルチノードの時計非同期がロック安全性を損なう可能性
  4. 他人のロックの誤削除:Aのロック期限切れ後Bが取得、Aの解放がBのロックを削除
  5. ネットワーク分断下のスプリットブレイン:クライアントとRedisノードのネットワーク切断でロック状態不一致

ステップバイステップ:5つのRedis分散ロック実装

パターン1:基本SET NX EXロック

import redis
import uuid
import time

class RedisBasicLock:
    def __init__(self, redis_client: redis.Redis, lock_name: str, timeout: int = 10):
        self.redis = redis_client
        self.lock_name = f"lock:{lock_name}"
        self.timeout = timeout
        self.identifier = str(uuid.uuid4())

    def acquire(self) -> bool:
        result = self.redis.set(
            self.lock_name,
            self.identifier,
            nx=True,
            ex=self.timeout
        )
        return result is not None

    def release(self) -> bool:
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        result = self.redis.eval(lua_script, 1, self.lock_name, self.identifier)
        return result == 1

    def __enter__(self):
        if not self.acquire():
            raise RuntimeError(f"Failed to acquire lock: {self.lock_name}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()

パターン2:再入可能ロック(Reentrant Lock)

import redis
import uuid
import threading

class RedisReentrantLock:
    def __init__(self, redis_client: redis.Redis, lock_name: str, timeout: int = 30):
        self.redis = redis_client
        self.lock_name = f"reentrant_lock:{lock_name}"
        self.timeout = timeout
        self.identifier = str(uuid.uuid4())
        self._local = threading.local()

    def acquire(self) -> bool:
        count = getattr(self._local, 'count', 0)
        if count > 0:
            self._local.count = count + 1
            return True

        lua_acquire = """
        if redis.call("exists", KEYS[1]) == 0 then
            redis.call("hset", KEYS[1], ARGV[1], 1)
            redis.call("expire", KEYS[1], ARGV[2])
            return 1
        elseif redis.call("hexists", KEYS[1], ARGV[1]) == 1 then
            redis.call("hincrby", KEYS[1], ARGV[1], 1)
            redis.call("expire", KEYS[1], ARGV[2])
            return 1
        else
            return 0
        end
        """
        result = self.redis.eval(lua_acquire, 1, self.lock_name, self.identifier, str(self.timeout))
        if result == 1:
            self._local.count = 1
            return True
        return False

    def release(self) -> bool:
        count = getattr(self._local, 'count', 0)
        if count == 0:
            return False

        if count > 1:
            self._local.count = count - 1
            lua_decr = """
            if redis.call("hexists", KEYS[1], ARGV[1]) == 1 then
                redis.call("hincrby", KEYS[1], ARGV[1], -1)
                return 1
            else
                return 0
            end
            """
            self.redis.eval(lua_decr, 1, self.lock_name, self.identifier)
            return True

        lua_release = """
        if redis.call("hexists", KEYS[1], ARGV[1]) == 0 then
            return 0
        elseif redis.call("hincrby", KEYS[1], ARGV[1], -1) > 0 then
            redis.call("expire", KEYS[1], ARGV[2])
            return 1
        else
            return redis.call("del", KEYS[1])
        end
        """
        result = self.redis.eval(lua_release, 1, self.lock_name, self.identifier, str(self.timeout))
        self._local.count = 0
        return result in (1,)

    def __enter__(self):
        if not self.acquire():
            raise RuntimeError(f"Failed to acquire reentrant lock: {self.lock_name}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()

パターン3:ウォッチドッグ自動更新ロック

import redis
import uuid
import threading
import time
import logging

logger = logging.getLogger(__name__)

class RedisWatchdogLock:
    def __init__(self, redis_client: redis.Redis, lock_name: str, timeout: int = 30, renewal_interval: int = 10):
        self.redis = redis_client
        self.lock_name = f"watchdog_lock:{lock_name}"
        self.timeout = timeout
        self.renewal_interval = renewal_interval
        self.identifier = str(uuid.uuid4())
        self._watchdog_thread = None
        self._stop_event = threading.Event()

    def acquire(self, blocking: bool = True, wait_timeout: float = 30.0) -> bool:
        deadline = time.time() + wait_timeout
        while True:
            result = self.redis.set(self.lock_name, self.identifier, nx=True, ex=self.timeout)
            if result is not None:
                self._start_watchdog()
                return True
            if not blocking:
                return False
            if time.time() >= deadline:
                return False
            time.sleep(0.1)

    def _start_watchdog(self):
        self._stop_event.clear()
        self._watchdog_thread = threading.Thread(target=self._watchdog_loop, daemon=True)
        self._watchdog_thread.start()

    def _watchdog_loop(self):
        while not self._stop_event.is_set():
            self._stop_event.wait(self.renewal_interval)
            if self._stop_event.is_set():
                break
            try:
                lua_renew = """
                if redis.call("get", KEYS[1]) == ARGV[1] then
                    return redis.call("expire", KEYS[1], ARGV[2])
                else
                    return 0
                end
                """
                result = self.redis.eval(lua_renew, 1, self.lock_name, self.identifier, str(self.timeout))
                if result != 1:
                    logger.warning("Watchdog renewal failed for lock %s", self.lock_name)
                    break
            except Exception as e:
                logger.error("Watchdog error: %s", e)
                break

    def release(self) -> bool:
        self._stop_event.set()
        if self._watchdog_thread and self._watchdog_thread.is_alive():
            self._watchdog_thread.join(timeout=2.0)

        lua_release = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        result = self.redis.eval(lua_release, 1, self.lock_name, self.identifier)
        return result == 1

    def __enter__(self):
        if not self.acquire():
            raise RuntimeError(f"Failed to acquire watchdog lock: {self.lock_name}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()

パターン4:Redlockマルチノードロック

import redis
import uuid
import time
import logging
from typing import List

logger = logging.getLogger(__name__)

class Redlock:
    def __init__(self, redis_clients: List[redis.Redis], lock_name: str, timeout: int = 10, retry_count: int = 3, retry_delay: float = 0.2):
        self.redis_clients = redis_clients
        self.quorum = len(redis_clients) // 2 + 1
        self.lock_name = f"redlock:{lock_name}"
        self.timeout = timeout
        self.retry_count = retry_count
        self.retry_delay = retry_delay
        self.identifier = str(uuid.uuid4())

    def acquire(self) -> bool:
        for attempt in range(self.retry_count):
            acquired_count = 0
            start_time = time.monotonic()

            for client in self.redis_clients:
                try:
                    result = client.set(self.lock_name, self.identifier, nx=True, ex=self.timeout)
                    if result is not None:
                        acquired_count += 1
                except Exception as e:
                    logger.warning("Redlock acquire error on node: %s", e)

            elapsed = time.monotonic() - start_time
            validity_time = self.timeout - elapsed

            if acquired_count >= self.quorum and validity_time > 0:
                return True

            self._release_all_nodes()

            if attempt < self.retry_count - 1:
                jitter = (attempt * 0.01)
                time.sleep(self.retry_delay + jitter)

        return False

    def _release_all_nodes(self):
        lua_release = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        for client in self.redis_clients:
            try:
                client.eval(lua_release, 1, self.lock_name, self.identifier)
            except Exception as e:
                logger.warning("Redlock release error on node: %s", e)

    def release(self) -> bool:
        self._release_all_nodes()
        return True

    def __enter__(self):
        if not self.acquire():
            raise RuntimeError(f"Failed to acquire Redlock: {self.lock_name}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()

パターン5:プロダクション級ロックサービス(Go実装)

package distlock

import (
	"context"
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/redis/go-redis/v9"
)

type LockService struct {
	client         *redis.Client
	watchdogCancel map[string]context.CancelFunc
	mu             sync.Mutex
}

func NewLockService(client *redis.Client) *LockService {
	return &LockService{
		client:         client,
		watchdogCancel: make(map[string]context.CancelFunc),
	}
}

type LockOptions struct {
	Timeout         time.Duration
	RenewalInterval time.Duration
	RetryCount      int
	RetryDelay      time.Duration
}

func DefaultLockOptions() LockOptions {
	return LockOptions{
		Timeout:         30 * time.Second,
		RenewalInterval: 10 * time.Second,
		RetryCount:      3,
		RetryDelay:      200 * time.Millisecond,
	}
}

func generateIdentifier() string {
	b := make([]byte, 16)
	rand.Read(b)
	return hex.EncodeToString(b)
}

var acquireScript = redis.NewScript(`
if redis.call("exists", KEYS[1]) == 0 then
    redis.call("hset", KEYS[1], "identifier", ARGV[1], "count", 1)
    redis.call("expire", KEYS[1], ARGV[2])
    return 1
elseif redis.call("hget", KEYS[1], "identifier") == ARGV[1] then
    redis.call("hincrby", KEYS[1], "count", 1)
    redis.call("expire", KEYS[1], ARGV[2])
    return 1
else
    return 0
end
`)

var releaseScript = redis.NewScript(`
if redis.call("hget", KEYS[1], "identifier") ~= ARGV[1] then
    return 0
end
local count = redis.call("hincrby", KEYS[1], "count", -1)
if count > 0 then
    redis.call("expire", KEYS[1], ARGV[2])
    return 1
end
return redis.call("del", KEYS[1])
`)

var renewScript = redis.NewScript(`
if redis.call("hget", KEYS[1], "identifier") == ARGV[1] then
    return redis.call("expire", KEYS[1], ARGV[2])
else
    return 0
end
`)

func (ls *LockService) Acquire(ctx context.Context, lockName string, opts LockOptions) (string, error) {
	identifier := generateIdentifier()
	key := fmt.Sprintf("lock_service:%s", lockName)

	for i := 0; i < opts.RetryCount; i++ {
		result, err := acquireScript.Run(ctx, ls.client, []string{key}, identifier, int(opts.Timeout.Seconds())).Int()
		if err != nil {
			return "", fmt.Errorf("acquire script error: %w", err)
		}
		if result == 1 {
			ls.startWatchdog(ctx, key, identifier, opts)
			return identifier, nil
		}

		select {
		case <-ctx.Done():
			return "", ctx.Err()
		case <-time.After(opts.RetryDelay):
		}
	}

	return "", fmt.Errorf("failed to acquire lock after %d retries", opts.RetryCount)
}

func (ls *LockService) startWatchdog(ctx context.Context, key, identifier string, opts LockOptions) {
	wdCtx, cancel := context.WithCancel(context.Background())

	ls.mu.Lock()
	ls.watchdogCancel[key+":"+identifier] = cancel
	ls.mu.Unlock()

	go func() {
		defer cancel()
		ticker := time.NewTicker(opts.RenewalInterval)
		defer ticker.Stop()

		for {
			select {
			case <-wdCtx.Done():
				return
			case <-ticker.C:
				result, err := renewScript.Run(wdCtx, ls.client, []string{key}, identifier, int(opts.Timeout.Seconds())).Int()
				if err != nil || result != 1 {
					log.Printf("Watchdog renewal failed for key=%s identifier=%s: result=%d err=%v", key, identifier, result, err)
					return
				}
			}
		}
	}()
}

func (ls *LockService) Release(ctx context.Context, lockName, identifier string, opts LockOptions) error {
	key := fmt.Sprintf("lock_service:%s", lockName)

	ls.mu.Lock()
	if cancel, ok := ls.watchdogCancel[key+":"+identifier]; ok {
		cancel()
		delete(ls.watchdogCancel, key+":"+identifier)
	}
	ls.mu.Unlock()

	result, err := releaseScript.Run(ctx, ls.client, []string{key}, identifier, int(opts.Timeout.Seconds())).Int()
	if err != nil {
		return fmt.Errorf("release script error: %w", err)
	}
	if result == 0 {
		return fmt.Errorf("lock not owned by identifier %s", identifier)
	}
	return nil
}

落とし穴ガイド

落とし穴1:オーナー検証なしでDEL

# ❌ 誤り:直接削除、他人のロックを削除する可能性
redis_client.delete("lock:order:123")

# ✅ 正しい:Luaスクリプトで原子check-and-delete
lua = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
else
    return 0
end
"""
redis_client.eval(lua, 1, "lock:order:123", my_identifier)

落とし穴2:ロックタイムアウトが短すぎる

# ❌ 誤り:3秒タイムアウト、遅いクエリで期限切れ
redis_client.set("lock:order", identifier, nx=True, ex=3)

# ✅ 正しい:ウォッチドッグ更新 + 適切な初期タイムアウト
lock = RedisWatchdogLock(redis_client, "order", timeout=30, renewal_interval=10)
with lock:
    process_order()

落とし穴3:再入可能ロックでカウントなし

# ❌ 誤り:毎回SET NX、ネスト呼び出しでロック取得不可
def outer():
    with basic_lock:
        inner()

def inner():
    with basic_lock:  # デッドロック!自身のロックを再取得できない
        pass

# ✅ 正しい:再入可能ロックを使用
def outer():
    with reentrant_lock:
        inner()

def inner():
    with reentrant_lock:  # カウント+1、正常に取得可能
        pass

落とし穴4:Redlockでクロックドリフトを無視

# ❌ 誤り:ロックの残り有効時間を検証しない
acquired_count = 0
for client in redis_clients:
    result = client.set(lock_name, identifier, nx=True, ex=timeout)
    if result:
        acquired_count += 1
if acquired_count >= quorum:
    return True  # ロックがもうすぐ期限切れかも!

# ✅ 正しい:有効時間を検証
start = time.monotonic()
# ... 取得ロジック ...
elapsed = time.monotonic() - start
validity = timeout - elapsed
if acquired_count >= quorum and validity > 0:
    return True

落とし穴5:ウォッチドッグ更新間隔=ロックタイムアウト

# ❌ 誤り:更新間隔30秒 = ロックタイムアウト30秒、GC一時停止で更新不可
lock = RedisWatchdogLock(client, "order", timeout=30, renewal_interval=30)

# ✅ 正しい:更新間隔 = ロックタイムアウト / 3
lock = RedisWatchdogLock(client, "order", timeout=30, renewal_interval=10)

エラートラブルシューティング

# エラーメッセージ 原因 解決方法
1 UNLOCK_FAILED: lock not owned 解放時のidentifier不一致 ロック取得/解放で同じidentifierを使用
2 LOCK_TIMEOUT: acquire failed after retries ロック長時間保持または競合激しい リトライ回数増加、デッドロック確認
3 WATCHDOG_RENEWAL_FAILED ウォッチドッグ更新失敗、ロック削除/期限切れ ネットワーク確認、タイムアウト設定検証
4 RedisConnectionError Redis接続断 接続プールリトライ設定、Sentinel/Cluster使用
5 LuaScriptError: wrong number of arguments Luaスクリプト引数不一致 KEYSとARGVの数と順序を確認
6 Redlock quorum not reached 過半数ノードで取得失敗 ノード状態確認、retry_count増加
7 CONCURRENT_MODIFICATION: data inconsistency ロック早期解放による同時変更 ウォッチドッグ更新使用、タイムアウト増加
8 OOM: Redis out of memory 期限なしロックキーの蓄積 SET NX EXのEXパラメータが有効であることを確認
9 DEADLOCK: circular wait detected マルチロックの循環待機 ロック順序統一、グローバルタイムアウト設定
10 CLOCK_DRIFT: lock validity expired Redlockノードの時計偏差過大 NTP同期設定、validity time検証

高度な最適化

1. フェアロック実装

class RedisFairLock:
    def __init__(self, redis_client: redis.Redis, lock_name: str, timeout: int = 30):
        self.redis = redis_client
        self.lock_name = f"fair_lock:{lock_name}"
        self.queue_name = f"fair_lock_queue:{lock_name}"
        self.timeout = timeout
        self.identifier = str(uuid.uuid4())

    def acquire(self, wait_timeout: float = 30.0) -> bool:
        timestamp = time.time()
        self.redis.zadd(self.queue_name, {self.identifier: timestamp})
        self.redis.expire(self.queue_name, wait_timeout * 2)

        deadline = time.time() + wait_timeout
        while time.time() < deadline:
            lua = """
            local first = redis.call("zrange", KEYS[2], 0, 0)
            if first[1] == ARGV[1] then
                local result = redis.call("set", KEYS[1], ARGV[1], "nx", "ex", ARGV[2])
                if result then
                    redis.call("zrem", KEYS[2], ARGV[1])
                    return 1
                end
            end
            return 0
            """
            result = self.redis.eval(lua, 2, self.lock_name, self.queue_name, self.identifier, str(self.timeout))
            if result == 1:
                return True
            time.sleep(0.05)

        self.redis.zrem(self.queue_name, self.identifier)
        return False

    def release(self) -> bool:
        lua = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        result = self.redis.eval(lua, 1, self.lock_name, self.identifier)
        return result == 1

2. 読み書きロック実装

class RedisReadWriteLock:
    def __init__(self, redis_client: redis.Redis, lock_name: str, timeout: int = 30):
        self.redis = redis_client
        self.read_lock_name = f"rw_lock:{lock_name}:read"
        self.write_lock_name = f"rw_lock:{lock_name}:write"
        self.timeout = timeout
        self.identifier = str(uuid.uuid4())

    def acquire_read(self) -> bool:
        lua = """
        if redis.call("exists", KEYS[2]) == 1 then
            return 0
        end
        redis.call("hincrby", KEYS[1], "readers", 1)
        redis.call("expire", KEYS[1], ARGV[2])
        return 1
        """
        result = self.redis.eval(lua, 2, self.read_lock_name, self.write_lock_name, self.identifier, str(self.timeout))
        return result == 1

    def release_read(self) -> bool:
        lua = """
        local count = redis.call("hincrby", KEYS[1], "readers", -1)
        if count <= 0 then
            redis.call("del", KEYS[1])
        end
        return 1
        """
        self.redis.eval(lua, 1, self.read_lock_name, self.identifier)
        return True

    def acquire_write(self) -> bool:
        result = self.redis.set(self.write_lock_name, self.identifier, nx=True, ex=self.timeout)
        if result is None:
            return False
        lua = """
        if redis.call("exists", KEYS[1]) == 1 and redis.call("hget", KEYS[1], "readers") ~= "0" then
            redis.call("del", KEYS[2])
            return 0
        end
        return 1
        """
        check = self.redis.eval(lua, 2, self.read_lock_name, self.write_lock_name, self.identifier)
        return check == 1

    def release_write(self) -> bool:
        lua = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        result = self.redis.eval(lua, 1, self.write_lock_name, self.identifier)
        return result == 1

3. ロック監視メトリクス収集

package distlock

import (
	"context"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

type LockMetrics struct {
	LockName      string
	CurrentHolder string
	RemainTTL     time.Duration
	AcquireCount  int64
	WaitQueueLen  int64
}

func CollectLockMetrics(ctx context.Context, client *redis.Client, lockName string) (*LockMetrics, error) {
	key := fmt.Sprintf("lock_service:%s", lockName)

	ttl, err := client.TTL(ctx, key).Result()
	if err != nil {
		return nil, err
	}

	identifier, _ := client.HGet(ctx, key, "identifier").Result()
	count, _ := client.HGet(ctx, key, "count").Int64()

	queueKey := fmt.Sprintf("fair_lock_queue:%s", lockName)
	queueLen, _ := client.ZCard(ctx, queueKey).Result()

	return &LockMetrics{
		LockName:      lockName,
		CurrentHolder: identifier,
		RemainTTL:     ttl,
		AcquireCount:  count,
		WaitQueueLen:  queueLen,
	}, nil
}

比較分析

次元 SET NX EX 再入可能ロック ウォッチドッグロック Redlock プロダクション級ロックサービス
実装複雑度 ⭐低 ⭐⭐中 ⭐⭐⭐高 ⭐⭐⭐高 ⭐⭐⭐⭐極高
原子性 ⚠️Lua必要 ✅Lua保証 ✅Lua保証 ✅マルチノード ✅Lua保証
再入可能
自動更新
マルチノード耐障害性 オプション
誤削除防止 ⚠️Lua必要
GC耐性 ⭐高 ⭐高 ⚠️中 ⭐高 ⚠️中
プロダクション推奨 プロトタイプ検証 標準ビジネス 長トランザクション 高可用性 クリティカルパス

まとめ:Redis分散ロックは「1つのコマンド」ではなく「1つの体系」のエンジニアリングです。SET NX EXからプロダクション級ロックサービスまで、コア原則は3つだけ:原子操作にはLua、ロックタイムアウトは更新、解放前にオーナー検証。Redlockは多くのシナリオで過剰設計です——単一ノード+Sentinel HAで99%のビジネスケースに対応できます。Redlockマルチノード方案に投資すべきは、「ロック失敗が深刻なデータ不整合を引き起こす」クリティカルパスのみです。


オンラインツール推奨

ブラウザローカルツールを無料で試す →

#Redis#分布式锁#Redlock#并发控制#分布式系统#2026#Redisson