分散トランザクションパターン実践:SagaからTCCまでの5つのプロダクションパターン

技术架构

分散トランザクション:マイクロサービスアーキテクチャで最も難しいエンジニアリング問題

注文して在庫を引き当て、残高を引き落とす——3つのサービス、3つのデータベース。在庫は引き当てられたが残高は引き落とされなかった——このデータ不整合は本番環境で毎日発生しています。ローカルトランザクションを使ってもサービスを跨げない。分散ロックを追加しても、ロックが期限切れになってトランザクションがまだコミットされていない。Seataを試すと、グローバルロックが同時実行性をシングルスレッドに戻してしまう。メッセージ最終整合性を使うと、メッセージ損失で補償ロジックが山のように増える。2026年、分散トランザクションはマイクロサービスアーキテクチャで最も事故を起こしやすいコンポーネントであり続けています。

本記事では5つのプロダクションパターンから出発し、Sagaオーケストレーション→TCCパターン→Seata AT→メッセージ最終整合性→プロダクション級信頼性保障のフルパイプライン実践を完全なJava/Spring Bootコードと落とし穴ガイドとともに解説します。


分散トランザクションコア概念

概念 説明
ローカルトランザクション 単一データソースのACIDトランザクション、サービス跨ぎの一貫性を保証できない
2PC(二相コミット) コーディネーターが一括prepare/commit、同期ブロッキング、パフォーマンスが低い
3PC(三相コミット) CanCommitフェーズを追加、ブロッキングを削減するが実装が複雑
Sagaパターン 長トランザクションを複数のローカルトランザクションに分割、失敗時に補償操作を実行
TCCパターン Try-Confirm-Cancelの3ステップ操作、ビジネスレベルで一貫性を保証
Seata ATモード SQLを自動傍受してロールバックログを生成、非侵入型分散トランザクション
メッセージ最終整合性 メッセージキューによる非同期保証、リアルタイムではなく最終的な状態一貫性
トランザクショナルアウトボックス ビジネス操作とメッセージ送信を同一トランザクションで、メッセージ損失を防止
グローバルロック Seataでグローバルトランザクションが保持する行ロック、ダーティライトを防止
冪等性 同一操作を複数回実行しても結果が同じ、分散トランザクションのコア保証

問題分析:分散トランザクションの5つの課題

  1. サービス跨ぎデータ一貫性:注文サービスが注文を作成、在庫サービスが在庫を引き当て、アカウントサービスが残高を引き落とす——いずれかのステップが失敗すると全ロールバックが必要
  2. 補償操作の原子性:Saga補償自体も失敗する可能性がある——補償が失敗したらどうするか
  3. グローバルロックと同時実行競合:Seataグローバルロックがパフォーマンス低下を引き起こす、高同時実行下でロック待ちタイムアウト
  4. メッセージ損失と重複消費:メッセージ送信成功してもコンシューマーがクラッシュ、または重複消費による二重引き落とし
  5. タイムアウトとサスペンション問題:TCCのTryがタイムアウトした後Cancelが既に実行済み、後続のConfirm到着がサスペンションになる

ステップバイステップ:5つの分散トランザクション実装

パターン1:Sagaオーケストレーション(中央コーディネーター)

package com.toolsku.saga;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.Supplier;

@Data
@Slf4j
public class SagaDefinition {

    private String sagaId;
    private List<SagaStep> steps = new ArrayList<>();
    private int currentStep = 0;
    private SagaStatus status = SagaStatus.PENDING;

    public enum SagaStatus {
        PENDING, RUNNING, COMPENSATING, COMPLETED, FAILED
    }

    @Data
    public static class SagaStep {
        private String name;
        private Supplier<Boolean> action;
        private Supplier<Boolean> compensation;
        private StepStatus stepStatus = StepStatus.PENDING;

        public enum StepStatus {
            PENDING, EXECUTING, COMPLETED, COMPENSATING, COMPENSATED, FAILED
        }
    }

    public static SagaBuilder builder() {
        return new SagaBuilder();
    }

    public static class SagaBuilder {
        private final SagaDefinition saga = new SagaDefinition();

        public SagaBuilder step(String name, Supplier<Boolean> action, Supplier<Boolean> compensation) {
            SagaStep step = new SagaStep();
            step.setName(name);
            step.setAction(action);
            step.setCompensation(compensation);
            saga.getSteps().add(step);
            return this;
        }

        public SagaDefinition build() {
            saga.setSagaId(UUID.randomUUID().toString());
            return saga;
        }
    }
}
package com.toolsku.saga;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class SagaOrchestrator {

    public SagaDefinition execute(SagaDefinition saga) {
        saga.setStatus(SagaDefinition.SagaStatus.RUNNING);
        log.info("Saga [{}] started, total steps: {}", saga.getSagaId(), saga.getSteps().size());

        for (int i = 0; i < saga.getSteps().size(); i++) {
            saga.setCurrentStep(i);
            SagaDefinition.SagaStep step = saga.getSteps().get(i);
            step.setStepStatus(SagaDefinition.SagaStep.StepStatus.EXECUTING);

            try {
                Boolean result = step.getAction().get();
                if (result == null || !result) {
                    log.error("Saga [{}] step [{}] action failed", saga.getSagaId(), step.getName());
                    step.setStepStatus(SagaDefinition.SagaStep.StepStatus.FAILED);
                    return compensate(saga);
                }
                step.setStepStatus(SagaDefinition.SagaStep.StepStatus.COMPLETED);
                log.info("Saga [{}] step [{}] completed", saga.getSagaId(), step.getName());
            } catch (Exception e) {
                log.error("Saga [{}] step [{}] exception: {}", saga.getSagaId(), step.getName(), e.getMessage(), e);
                step.setStepStatus(SagaDefinition.SagaStep.StepStatus.FAILED);
                return compensate(saga);
            }
        }

        saga.setStatus(SagaDefinition.SagaStatus.COMPLETED);
        log.info("Saga [{}] completed successfully", saga.getSagaId());
        return saga;
    }

    private SagaDefinition compensate(SagaDefinition saga) {
        saga.setStatus(SagaDefinition.SagaStatus.COMPENSATING);
        log.info("Saga [{}] compensating from step [{}]", saga.getSagaId(), saga.getCurrentStep());

        for (int i = saga.getCurrentStep(); i >= 0; i--) {
            SagaDefinition.SagaStep step = saga.getSteps().get(i);
            if (step.getStepStatus() != SagaDefinition.SagaStep.StepStatus.COMPLETED) {
                continue;
            }

            step.setStepStatus(SagaDefinition.SagaStep.StepStatus.COMPENSATING);
            try {
                Boolean result = step.getCompensation().get();
                if (result != null && result) {
                    step.setStepStatus(SagaDefinition.SagaStep.StepStatus.COMPENSATED);
                    log.info("Saga [{}] step [{}] compensated", saga.getSagaId(), step.getName());
                } else {
                    step.setStepStatus(SagaDefinition.SagaStep.StepStatus.FAILED);
                    log.error("Saga [{}] step [{}] compensation failed", saga.getSagaId(), step.getName());
                }
            } catch (Exception e) {
                step.setStepStatus(SagaDefinition.SagaStep.StepStatus.FAILED);
                log.error("Saga [{}] step [{}] compensation exception: {}", saga.getSagaId(), step.getName(), e.getMessage(), e);
            }
        }

        saga.setStatus(SagaDefinition.SagaStatus.FAILED);
        return saga;
    }
}
package com.toolsku.saga;

import com.toolsku.service.OrderService;
import com.toolsku.service.InventoryService;
import com.toolsku.service.AccountService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
public class OrderSagaService {

    private final SagaOrchestrator sagaOrchestrator;
    private final OrderService orderService;
    private final InventoryService inventoryService;
    private final AccountService accountService;

    public SagaDefinition createOrder(Long userId, Long productId, Integer quantity, BigDecimal amount) {
        SagaDefinition saga = SagaDefinition.builder()
                .step("createOrder",
                        () -> orderService.createOrder(userId, productId, quantity, amount),
                        () -> orderService.cancelOrder(userId, productId))
                .step("deductInventory",
                        () -> inventoryService.deductInventory(productId, quantity),
                        () -> inventoryService.restoreInventory(productId, quantity))
                .step("deductAccount",
                        () -> accountService.deductBalance(userId, amount),
                        () -> accountService.restoreBalance(userId, amount))
                .build();

        return sagaOrchestrator.execute(saga);
    }
}

パターン2:TCCパターン(Try-Confirm-Cancel)

package com.toolsku.tcc;

import lombok.Data;
import java.math.BigDecimal;

@Data
public class AccountTccRequest {
    private String xid;
    private Long userId;
    private BigDecimal amount;
    private String branchId;
}
package com.toolsku.tcc;

import org.apache.ibatis.annotations.*;

@Mapper
public interface AccountTccMapper {

    @Insert("INSERT INTO account_tcc_freeze (xid, user_id, amount, status, branch_id, created_at) " +
            "VALUES (#{xid}, #{userId}, #{amount}, 'TRYING', #{branchId}, NOW())")
    int insertFreezeRecord(@Param("xid") String xid,
                           @Param("userId") Long userId,
                           @Param("amount") BigDecimal amount,
                           @Param("branchId") String branchId);

    @Update("UPDATE account SET balance = balance - #{amount}, frozen = frozen + #{amount} " +
            "WHERE user_id = #{userId} AND balance >= #{amount}")
    int freezeBalance(@Param("userId") Long userId, @Param("amount") BigDecimal amount);

    @Update("UPDATE account SET frozen = frozen - #{amount} " +
            "WHERE user_id = #{userId} AND frozen >= #{amount}")
    int confirmDeduct(@Param("userId") Long userId, @Param("amount") BigDecimal amount);

    @Update("UPDATE account SET balance = balance + #{amount}, frozen = frozen - #{amount} " +
            "WHERE user_id = #{userId} AND frozen >= #{amount}")
    int cancelFreeze(@Param("userId") Long userId, @Param("amount") BigDecimal amount);

    @Select("SELECT COUNT(*) FROM account_tcc_freeze WHERE xid = #{xid} AND branch_id = #{branchId}")
    int countFreezeRecord(@Param("xid") String xid, @Param("branchId") String branchId);

    @Update("UPDATE account_tcc_freeze SET status = #{status} WHERE xid = #{xid} AND branch_id = #{branchId}")
    int updateFreezeStatus(@Param("xid") String xid,
                           @Param("branchId") String branchId,
                           @Param("status") String status);

    @Select("SELECT status FROM account_tcc_freeze WHERE xid = #{xid} AND branch_id = #{branchId}")
    String getFreezeStatus(@Param("xid") String xid, @Param("branchId") String branchId);
}
package com.toolsku.tcc;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.math.BigDecimal;

@Slf4j
@Service
@RequiredArgsConstructor
public class AccountTccService {

    private final AccountTccMapper accountTccMapper;

    @Transactional(rollbackFor = Exception.class)
    public boolean tryDeduct(String xid, Long userId, BigDecimal amount, String branchId) {
        if (accountTccMapper.countFreezeRecord(xid, branchId) > 0) {
            log.info("TCC try already executed: xid={}, branchId={}", xid, branchId);
            return true;
        }

        int rows = accountTccMapper.freezeBalance(userId, amount);
        if (rows == 0) {
            log.warn("TCC try failed: insufficient balance, userId={}, amount={}", userId, amount);
            return false;
        }

        accountTccMapper.insertFreezeRecord(xid, userId, amount, branchId);
        log.info("TCC try success: xid={}, userId={}, amount={}", xid, userId, amount);
        return true;
    }

    @Transactional(rollbackFor = Exception.class)
    public boolean confirmDeduct(String xid, Long userId, BigDecimal amount, String branchId) {
        String status = accountTccMapper.getFreezeStatus(xid, branchId);
        if ("CONFIRMED".equals(status)) {
            log.info("TCC confirm already executed: xid={}, branchId={}", xid, branchId);
            return true;
        }

        int rows = accountTccMapper.confirmDeduct(userId, amount);
        if (rows == 0) {
            log.error("TCC confirm failed: frozen amount mismatch, userId={}, amount={}", userId, amount);
            return false;
        }

        accountTccMapper.updateFreezeStatus(xid, branchId, "CONFIRMED");
        log.info("TCC confirm success: xid={}, userId={}, amount={}", xid, userId, amount);
        return true;
    }

    @Transactional(rollbackFor = Exception.class)
    public boolean cancelFreeze(String xid, Long userId, BigDecimal amount, String branchId) {
        String status = accountTccMapper.getFreezeStatus(xid, branchId);
        if ("CANCELLED".equals(status)) {
            log.info("TCC cancel already executed: xid={}, branchId={}", xid, branchId);
            return true;
        }
        if ("CONFIRMED".equals(status)) {
            log.warn("TCC cancel skipped: already confirmed, xid={}, branchId={}", xid, branchId);
            return true;
        }

        int rows = accountTccMapper.cancelFreeze(userId, amount);
        if (rows == 0) {
            log.error("TCC cancel failed: frozen amount mismatch, userId={}, amount={}", userId, amount);
            return false;
        }

        accountTccMapper.updateFreezeStatus(xid, branchId, "CANCELLED");
        log.info("TCC cancel success: xid={}, userId={}, amount={}", xid, userId, amount);
        return true;
    }
}
package com.toolsku.tcc;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;
import java.util.UUID;

@Slf4j
@Service
@RequiredArgsConstructor
public class TccTransactionManager {

    private final AccountTccService accountTccService;

    public boolean executeDeduct(String xid, Long userId, BigDecimal amount) {
        String branchId = UUID.randomUUID().toString();

        boolean tryResult = accountTccService.tryDeduct(xid, userId, amount, branchId);
        if (!tryResult) {
            accountTccService.cancelFreeze(xid, userId, amount, branchId);
            return false;
        }

        boolean confirmResult = accountTccService.confirmDeduct(xid, userId, amount, branchId);
        if (!confirmResult) {
            accountTccService.cancelFreeze(xid, userId, amount, branchId);
            return false;
        }

        return true;
    }
}

パターン3:Seata ATモード(自動補償)

package com.toolsku.seata;

import io.seata.spring.annotation.GlobalTransactional;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;

@Slf4j
@Service
@RequiredArgsConstructor
public class OrderSeataService {

    private final OrderClient orderClient;
    private final InventoryClient inventoryClient;
    private final AccountClient accountClient;

    @GlobalTransactional(name = "create-order", rollbackFor = Exception.class, timeoutMills = 60000)
    public String createOrder(Long userId, Long productId, Integer quantity, BigDecimal amount) {
        log.info("Seata global transaction started: userId={}, productId={}, quantity={}, amount={}",
                userId, productId, quantity, amount);

        String orderNo = orderClient.createOrder(userId, productId, quantity, amount);
        log.info("Step 1: order created, orderNo={}", orderNo);

        inventoryClient.deductInventory(productId, quantity);
        log.info("Step 2: inventory deducted, productId={}, quantity={}", productId, quantity);

        accountClient.deductBalance(userId, amount);
        log.info("Step 3: balance deducted, userId={}, amount={}", userId, amount);

        return orderNo;
    }
}
package com.toolsku.seata;

import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.*;

@FeignClient(name = "order-service", url = "${service.order.url}")
public interface OrderClient {

    @PostMapping("/api/orders")
    String createOrder(@RequestParam("userId") Long userId,
                       @RequestParam("productId") Long productId,
                       @RequestParam("quantity") Integer quantity,
                       @RequestParam("amount") BigDecimal amount);

    @DeleteMapping("/api/orders/{orderNo}")
    void cancelOrder(@PathVariable("orderNo") String orderNo);
}

@FeignClient(name = "inventory-service", url = "${service.inventory.url}")
public interface InventoryClient {

    @PostMapping("/api/inventory/deduct")
    void deductInventory(@RequestParam("productId") Long productId,
                         @RequestParam("quantity") Integer quantity);

    @PostMapping("/api/inventory/restore")
    void restoreInventory(@RequestParam("productId") Long productId,
                          @RequestParam("quantity") Integer quantity);
}

@FeignClient(name = "account-service", url = "${service.account.url}")
public interface AccountClient {

    @PostMapping("/api/accounts/deduct")
    void deductBalance(@RequestParam("userId") Long userId,
                       @RequestParam("amount") BigDecimal amount);

    @PostMapping("/api/accounts/restore")
    void restoreBalance(@RequestParam("userId") Long userId,
                        @RequestParam("amount") BigDecimal amount);
}
# application-seata.yml
seata:
  enabled: true
  application-id: order-service
  tx-service-group: toolsku-tx-group
  service:
    vgroup-mapping:
      toolsku-tx-group: default
    grouplist:
      default: 127.0.0.1:8091
  config:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      namespace: seata
      group: SEATA_GROUP
  registry:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      namespace: seata
      group: SEATA_GROUP
  client:
    undo:
      data-validation: true
      log-serialization: jackson
      log-table: undo_log
    lock:
      retry-interval: 10
      retry-times: 30
      retry-policy-branch-rollback-on-conflict: true
-- Seata ATモード用undo_logテーブル(各サービスデータベースに必要)
CREATE TABLE IF NOT EXISTS undo_log (
    id            BIGINT       AUTO_INCREMENT PRIMARY KEY,
    branch_id     BIGINT       NOT NULL,
    xid           VARCHAR(128) NOT NULL,
    context       VARCHAR(128) NOT NULL,
    rollback_info LONGBLOB     NOT NULL,
    log_status    INT          NOT NULL,
    log_created   DATETIME     NOT NULL,
    log_modified  DATETIME     NOT NULL,
    UNIQUE KEY ux_undo_log (xid, branch_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

パターン4:メッセージ最終整合性(トランザクショナルアウトボックス + RocketMQ)

-- トランザクショナルアウトボックステーブル
CREATE TABLE IF NOT EXISTS transactional_outbox (
    id            BIGINT       AUTO_INCREMENT PRIMARY KEY,
    aggregate_id  VARCHAR(128) NOT NULL,
    aggregate_type VARCHAR(64) NOT NULL,
    event_type    VARCHAR(128) NOT NULL,
    payload       JSON         NOT NULL,
    status        VARCHAR(32)  NOT NULL DEFAULT 'PENDING',
    retry_count   INT          NOT NULL DEFAULT 0,
    max_retry     INT          NOT NULL DEFAULT 5,
    created_at    DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at    DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_status_created (status, created_at),
    UNIQUE KEY uk_aggregate_event (aggregate_id, aggregate_type, event_type)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
package com.toolsku.outbox;

import lombok.Data;
import java.time.LocalDateTime;

@Data
public class OutboxEvent {
    private Long id;
    private String aggregateId;
    private String aggregateType;
    private String eventType;
    private String payload;
    private String status;
    private Integer retryCount;
    private Integer maxRetry;
    private LocalDateTime createdAt;
    private LocalDateTime updatedAt;
}
package com.toolsku.outbox;

import org.apache.ibatis.annotations.*;

import java.util.List;

@Mapper
public interface OutboxMapper {

    @Insert("INSERT INTO transactional_outbox (aggregate_id, aggregate_type, event_type, payload, status) " +
            "VALUES (#{aggregateId}, #{aggregateType}, #{eventType}, #{payload}, 'PENDING')")
    @Options(useGeneratedKeys = true, keyProperty = "id")
    int insert(OutboxEvent event);

    @Select("SELECT * FROM transactional_outbox WHERE status = 'PENDING' AND retry_count < max_retry " +
            "ORDER BY created_at ASC LIMIT #{limit}")
    List<OutboxEvent> findPendingEvents(@Param("limit") int limit);

    @Update("UPDATE transactional_outbox SET status = #{status}, retry_count = retry_count + 1, " +
            "updated_at = NOW() WHERE id = #{id}")
    int updateStatus(@Param("id") Long id, @Param("status") String status);

    @Update("UPDATE transactional_outbox SET status = 'SENT', updated_at = NOW() WHERE id = #{id}")
    int markAsSent(@Param("id") Long id);

    @Update("UPDATE transactional_outbox SET status = 'FAILED', updated_at = NOW() " +
            "WHERE id = #{id} AND retry_count >= max_retry")
    int markAsFailed(@Param("id") Long id);
}
package com.toolsku.outbox;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;

@Slf4j
@Component
@RequiredArgsConstructor
public class OutboxRelayScheduler {

    private final OutboxMapper outboxMapper;
    private final RocketMQTemplate rocketMQTemplate;

    private static final int BATCH_SIZE = 50;

    @Scheduled(fixedDelay = 1000)
    public void relayPendingEvents() {
        List<OutboxEvent> events = outboxMapper.findPendingEvents(BATCH_SIZE);
        if (events.isEmpty()) {
            return;
        }

        for (OutboxEvent event : events) {
            try {
                String topic = String.format("toolsku-%s-topic", event.getAggregateType());
                String key = event.getAggregateId();

                rocketMQTemplate.syncSend(topic,
                        rocketMQTemplate.getMessageConverter().toMessage(event.getPayload(), null, key));

                outboxMapper.markAsSent(event.getId());
                log.info("Outbox event sent: id={}, type={}, aggregateId={}",
                        event.getId(), event.getEventType(), event.getAggregateId());
            } catch (Exception e) {
                outboxMapper.updateStatus(event.getId(), "PENDING");
                outboxMapper.markAsFailed(event.getId());
                log.error("Outbox event send failed: id={}, error={}", event.getId(), e.getMessage(), e);
            }
        }
    }
}
package com.toolsku.outbox;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.UUID;

@Slf4j
@Service
@RequiredArgsConstructor
public class OrderEventService {

    private final OutboxMapper outboxMapper;
    private final OrderMapper orderMapper;
    private final ObjectMapper objectMapper;

    @Transactional(rollbackFor = Exception.class)
    public String createOrderWithOutbox(Long userId, Long productId, Integer quantity, BigDecimal amount) {
        String orderNo = "ORD" + UUID.randomUUID().toString().replace("-", "").substring(0, 16).toUpperCase();

        orderMapper.insertOrder(orderNo, userId, productId, quantity, amount, "CREATED");

        try {
            String payload = objectMapper.writeValueAsString(
                    new OrderCreatedEvent(orderNo, userId, productId, quantity, amount));

            OutboxEvent event = new OutboxEvent();
            event.setAggregateId(orderNo);
            event.setAggregateType("order");
            event.setEventType("ORDER_CREATED");
            event.setPayload(payload);
            outboxMapper.insert(event);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize order event", e);
        }

        log.info("Order created with outbox event: orderNo={}", orderNo);
        return orderNo;
    }
}
package com.toolsku.consumer;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(topic = "toolsku-order-topic", consumerGroup = "inventory-consumer-group")
public class InventoryOrderConsumer implements RocketMQListener<String> {

    private final InventoryService inventoryService;
    private final IdempotentRecordService idempotentRecordService;

    @Override
    public void onMessage(String message) {
        OrderCreatedEvent event = parseEvent(message);
        String idempotentKey = "INVENTORY_DEDUCT:" + event.getOrderNo();

        if (idempotentRecordService.isProcessed(idempotentKey)) {
            log.info("Duplicate message skipped: orderNo={}", event.getOrderNo());
            return;
        }

        try {
            inventoryService.deductInventory(event.getProductId(), event.getQuantity());
            idempotentRecordService.markProcessed(idempotentKey);
            log.info("Inventory deducted for order: orderNo={}", event.getOrderNo());
        } catch (Exception e) {
            log.error("Inventory deduction failed: orderNo={}, error={}",
                    event.getOrderNo(), e.getMessage(), e);
            throw new RuntimeException("Inventory deduction failed", e);
        }
    }
}

パターン5:プロダクション級信頼性保障

package com.toolsku.reliability;

import lombok.Data;
import org.apache.ibatis.annotations.*;

import java.time.LocalDateTime;

@Data
public class IdempotentRecord {
    private Long id;
    private String idempotentKey;
    private String status;
    private LocalDateTime createdAt;
    private LocalDateTime expireAt;
}

@Mapper
public interface IdempotentRecordMapper {

    @Insert("INSERT INTO idempotent_record (idempotent_key, status, created_at, expire_at) " +
            "VALUES (#{idempotentKey}, 'PROCESSING', NOW(), DATE_ADD(NOW(), INTERVAL 24 HOUR)) " +
            "ON DUPLICATE KEY UPDATE idempotent_key = idempotent_key")
    int tryInsert(@Param("idempotentKey") String idempotentKey);

    @Select("SELECT status FROM idempotent_record WHERE idempotent_key = #{idempotentKey}")
    String getStatus(@Param("idempotentKey") String idempotentKey);

    @Update("UPDATE idempotent_record SET status = 'PROCESSED' WHERE idempotent_key = #{idempotentKey}")
    int markProcessed(@Param("idempotentKey") String idempotentKey);

    @Delete("DELETE FROM idempotent_record WHERE expire_at < NOW()")
    int cleanExpired();
}
package com.toolsku.reliability;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
public class IdempotentRecordService {

    private final IdempotentRecordMapper idempotentRecordMapper;

    public boolean isProcessed(String idempotentKey) {
        String status = idempotentRecordMapper.getStatus(idempotentKey);
        return "PROCESSED".equals(status);
    }

    public boolean tryAcquire(String idempotentKey) {
        int rows = idempotentRecordMapper.tryInsert(idempotentKey);
        if (rows == 0) {
            String status = idempotentRecordMapper.getStatus(idempotentKey);
            return "PROCESSED".equals(status);
        }
        return true;
    }

    public void markProcessed(String idempotentKey) {
        idempotentRecordMapper.markProcessed(idempotentKey);
    }
}
package com.toolsku.reliability;

import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.*;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import jakarta.servlet.http.HttpServletRequest;
import java.lang.reflect.Method;

@Slf4j
@Component
@Aspect
public class IdempotentAspect {

    private final IdempotentRecordService idempotentRecordService;

    public IdempotentAspect(IdempotentRecordService idempotentRecordService) {
        this.idempotentRecordService = idempotentRecordService;
    }

    @Around("@annotation(com.toolsku.reliability.Idempotent)")
    public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();
        Idempotent annotation = method.getAnnotation(Idempotent.class);

        String idempotentKey = resolveKey(annotation);
        if (idempotentKey == null) {
            return joinPoint.proceed();
        }

        if (idempotentRecordService.isProcessed(idempotentKey)) {
            log.info("Idempotent check: already processed, key={}", idempotentKey);
            return null;
        }

        if (!idempotentRecordService.tryAcquire(idempotentKey)) {
            log.warn("Idempotent check: concurrent processing, key={}", idempotentKey);
            throw new RuntimeException("Concurrent request detected");
        }

        try {
            Object result = joinPoint.proceed();
            idempotentRecordService.markProcessed(idempotentKey);
            return result;
        } catch (Exception e) {
            log.error("Idempotent execution failed: key={}", idempotentKey, e);
            throw e;
        }
    }

    private String resolveKey(Idempotent annotation) {
        ServletRequestAttributes attributes =
                (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        if (attributes == null) {
            return null;
        }
        HttpServletRequest request = attributes.getRequest();
        String header = request.getHeader(annotation.headerKey());
        return header != null ? annotation.prefix() + header : null;
    }
}
package com.toolsku.reliability;

import java.lang.annotation.*;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Idempotent {
    String headerKey() default "X-Idempotent-Key";
    String prefix() default "IDEMPOTENT:";
}
package com.toolsku.reliability;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.concurrent.*;

@Slf4j
@Component
public class TransactionTimeoutGuard {

    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
    private final ConcurrentHashMap<String, TimeoutTask> runningTasks = new ConcurrentHashMap<>();

    public void register(String xid, long timeoutMillis, Runnable onTimeout) {
        ScheduledFuture<?> future = scheduler.schedule(() -> {
            log.warn("Transaction timeout triggered: xid={}", xid);
            runningTasks.remove(xid);
            try {
                onTimeout.run();
            } catch (Exception e) {
                log.error("Timeout callback failed: xid={}", xid, e);
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);

        runningTasks.put(xid, new TimeoutTask(xid, future, timeoutMillis));
    }

    public void cancel(String xid) {
        TimeoutTask task = runningTasks.remove(xid);
        if (task != null) {
            task.getFuture().cancel(false);
            log.info("Transaction timeout guard cancelled: xid={}", xid);
        }
    }

    public long getRemainingTime(String xid) {
        TimeoutTask task = runningTasks.get(xid);
        if (task == null) {
            return -1;
        }
        long elapsed = System.currentTimeMillis() - task.getStartTime();
        return Math.max(0, task.getTimeoutMillis() - elapsed);
    }

    @lombok.Data
    @lombok.AllArgsConstructor
    private static class TimeoutTask {
        private String xid;
        private ScheduledFuture<?> future;
        private long timeoutMillis;
        private long startTime = System.currentTimeMillis();
    }
}
package com.toolsku.reliability;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@Component
public class TransactionMetrics {

    private final Counter sagaSuccessCounter;
    private final Counter sagaFailureCounter;
    private final Counter tccConfirmCounter;
    private final Counter tccCancelCounter;
    private final Counter seataCommitCounter;
    private final Counter seataRollbackCounter;
    private final Timer sagaExecutionTimer;
    private final AtomicInteger activeTransactionGauge;

    public TransactionMetrics(MeterRegistry registry) {
        this.sagaSuccessCounter = Counter.builder("transaction.saga.success")
                .description("Saga transaction success count").register(registry);
        this.sagaFailureCounter = Counter.builder("transaction.saga.failure")
                .description("Saga transaction failure count").register(registry);
        this.tccConfirmCounter = Counter.builder("transaction.tcc.confirm")
                .description("TCC confirm count").register(registry);
        this.tccCancelCounter = Counter.builder("transaction.tcc.cancel")
                .description("TCC cancel count").register(registry);
        this.seataCommitCounter = Counter.builder("transaction.seata.commit")
                .description("Seata commit count").register(registry);
        this.seataRollbackCounter = Counter.builder("transaction.seata.rollback")
                .description("Seata rollback count").register(registry);
        this.sagaExecutionTimer = Timer.builder("transaction.saga.duration")
                .description("Saga execution duration").register(registry);
        this.activeTransactionGauge = registry.gauge("transaction.active.count",
                new AtomicInteger(0));
    }

    public void recordSagaSuccess(long durationMs) {
        sagaSuccessCounter.increment();
        sagaExecutionTimer.record(durationMs, TimeUnit.MILLISECONDS);
    }

    public void recordSagaFailure(long durationMs) {
        sagaFailureCounter.increment();
        sagaExecutionTimer.record(durationMs, TimeUnit.MILLISECONDS);
    }

    public void recordTccConfirm() { tccConfirmCounter.increment(); }
    public void recordTccCancel() { tccCancelCounter.increment(); }
    public void recordSeataCommit() { seataCommitCounter.increment(); }
    public void recordSeataRollback() { seataRollbackCounter.increment(); }

    public void incrementActive() { activeTransactionGauge.incrementAndGet(); }
    public void decrementActive() { activeTransactionGauge.decrementAndGet(); }
    public int getActiveTransactionCount() { return activeTransactionGauge.get(); }
}

落とし穴ガイド

落とし穴1:Saga補償操作はロールバックではない

// ❌ 間違い:補償操作がデータを物理削除しようとする
public boolean compensateOrder(String orderNo) {
    return orderMapper.deleteById(orderNo) > 0; // 物理削除、監査データが消失
}

// ✅ 正しい:補償操作は意味的な逆操作
public boolean compensateOrder(String orderNo) {
    return orderMapper.updateStatus(orderNo, "CANCELLED") > 0; // ステータス変更、記録を保持
}

落とし穴2:TCCの空ロールバックとサスペンション

// ❌ 間違い:Tryが実行されたか確認せずにCancelを実行
public boolean cancelFreeze(String xid, Long userId, BigDecimal amount) {
    return accountMapper.cancelFreeze(userId, amount) > 0; // 空ロールバック:凍結レコードが存在しない
}

// ✅ 正しい:Tryが実行されたかチェック
public boolean cancelFreeze(String xid, Long userId, BigDecimal amount, String branchId) {
    String status = accountTccMapper.getFreezeStatus(xid, branchId);
    if (status == null) {
        // 空ロールバック:Try未実行、サスペンション防止のレコードを挿入
        accountTccMapper.insertFreezeRecord(xid, userId, amount, branchId);
        accountTccMapper.updateFreezeStatus(xid, branchId, "CANCELLED");
        log.warn("TCC empty rollback: xid={}, branchId={}", xid, branchId);
        return true;
    }
    if ("CANCELLED".equals(status) || "CONFIRMED".equals(status)) {
        return true; // 冪等:既に処理済み
    }
    return accountTccMapper.cancelFreeze(userId, amount) > 0;
}

落とし穴3:Seataグローバルロックによるデッドロック

// ❌ 間違い:グローバルトランザクション内で同じ行をネストクエリ
@GlobalTransactional
public void processOrder(String orderNo) {
    orderMapper.updateStatus(orderNo, "PROCESSING"); // グローバルロックを取得
    Order order = orderMapper.selectByOrderNo(orderNo); // 同じ行を再度クエリ
    // 別のグローバルトランザクションが同じ行のロックを保持している場合、相互待機→デッドロック
}

// ✅ 正しい:グローバルロック保持時間を短縮、クロス更新を回避
@GlobalTransactional(timeoutMills = 30000)
public void processOrder(String orderNo) {
    Order order = orderMapper.selectByOrderNo(orderNo); // 先にクエリ
    orderMapper.updateStatus(orderNo, "PROCESSING"); // その後更新
    // または複数の短いトランザクションに分割
}

落とし穴4:アウトボックスとビジネスが同一トランザクションにない

// ❌ 間違い:先にメッセージを送信してからデータベースに書き込み
public void createOrder(Order order) {
    rocketMQTemplate.convertAndSend("order-topic", orderEvent);
    orderMapper.insert(order); // ここが失敗した場合、メッセージは既に送信済みで取り消せない
}

// ✅ 正しい:アウトボックスパターン、ビジネス操作とイベントレコードを同一トランザクションで
@Transactional
public void createOrder(Order order) {
    orderMapper.insert(order);
    outboxMapper.insert(buildOutboxEvent(order)); // 同一トランザクションで原子性を保証
}

落とし穴5:コンシューマーが冪等でない

// ❌ 間違い:直接引き落とし、重複消費で二重引き落とし
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "account-group")
public class AccountConsumer implements RocketMQListener<String> {
    public void onMessage(String message) {
        OrderEvent event = parse(message);
        accountService.deductBalance(event.getUserId(), event.getAmount()); // 重複消費!
    }
}

// ✅ 正しい:コンシューマー側の冪等チェック
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "account-group")
public class AccountConsumer implements RocketMQListener<String> {
    public void onMessage(String message) {
        OrderEvent event = parse(message);
        String key = "DEDUCT:" + event.getOrderNo();
        if (idempotentRecordService.isProcessed(key)) {
            return; // 冪等スキップ
        }
        accountService.deductBalance(event.getUserId(), event.getAmount());
        idempotentRecordService.markProcessed(key);
    }
}

エラートラブルシューティング

# エラーメッセージ 原因 解決方法
1 Saga compensation failed for step 補償操作が例外をスロー、データが手動変更された可能性 補償リトライを追加、失敗イベントを記録して手動介入
2 TCC empty rollback detected Try未実行でCancelが呼び出された(ネットワークタイムアウト) 空ロールバックレコードを挿入してサスペンション防止、Cancelを冪等に
3 Seata global lock conflict 複数のグローバルトランザクションが同じ行ロックを競合 トランザクションスコープを縮小、分離レベルを下げる、ロックリトライを増加
4 Seata undo_log not found ブランチトランザクションのロールバックでundoログが見つからない 各データベースにundo_logテーブルが存在するか確認、データソースプロキシを検証
5 RocketMQ send timeout メッセージ送信タイムアウト、Brokerに到達できない Brokerステータスを確認、送信タイムアウトを増加、アウトボックスパターンを使用
6 Message consumption duplicate メッセージの重複消費 コンシューマー冪等を実装、ユニークビジネスキーで重複排除
7 Outbox event stuck in PENDING アウトボックスイベントがリレースケジューラに消費されていない スケジューラの実行状態を確認、RocketMQ接続を確認
8 Global transaction timeout グローバルトランザクション実行タイムアウト timeoutMillsを増加、遅いSQLを最適化、長いトランザクションを分割
9 TCC resource suspended CancelがTryより先に実行、Tryが後から到着 空ロールバックレコードがTry実行をブロック、タイムアウト設定を確認
10 Compensation circular dependency 複数のSaga補償が相互依存して循環を形成 単方向補償チェーンを設計、循環補償を回避

高度な最適化

1. Sagaステートマシン永続化

package com.toolsku.saga;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.annotations.*;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Slf4j
@Service
@RequiredArgsConstructor
public class SagaStateService {

    private final SagaStateMapper sagaStateMapper;

    @Transactional(rollbackFor = Exception.class)
    public void persistSagaState(SagaDefinition saga) {
        String stateJson = serializeSagaState(saga);
        sagaStateMapper.upsert(saga.getSagaId(), stateJson, saga.getStatus().name());
    }

    public SagaDefinition recoverSaga(String sagaId) {
        String stateJson = sagaStateMapper.getState(sagaId);
        if (stateJson == null) {
            return null;
        }
        return deserializeSagaState(stateJson);
    }

    private String serializeSagaState(SagaDefinition saga) {
        StringBuilder sb = new StringBuilder();
        sb.append("{\"sagaId\":\"").append(saga.getSagaId()).append("\"");
        sb.append(",\"currentStep\":").append(saga.getCurrentStep());
        sb.append(",\"status\":\"").append(saga.getStatus().name()).append("\"");
        sb.append(",\"steps\":[");
        for (int i = 0; i < saga.getSteps().size(); i++) {
            if (i > 0) sb.append(",");
            SagaDefinition.SagaStep step = saga.getSteps().get(i);
            sb.append("{\"name\":\"").append(step.getName()).append("\"");
            sb.append(",\"status\":\"").append(step.getStepStatus().name()).append("\"}");
        }
        sb.append("]}");
        return sb.toString();
    }

    private SagaDefinition deserializeSagaState(String json) {
        throw new UnsupportedOperationException("Use Jackson for deserialization");
    }
}

@Mapper
interface SagaStateMapper {
    @Insert("INSERT INTO saga_state (saga_id, state_json, status, created_at, updated_at) " +
            "VALUES (#{sagaId}, #{stateJson}, #{status}, NOW(), NOW()) " +
            "ON DUPLICATE KEY UPDATE state_json = #{stateJson}, status = #{status}, updated_at = NOW()")
    int upsert(@Param("sagaId") String sagaId,
               @Param("stateJson") String stateJson,
               @Param("status") String status);

    @Select("SELECT state_json FROM saga_state WHERE saga_id = #{sagaId}")
    String getState(@Param("sagaId") String sagaId);
}

2. TCCブランチトランザクションレジストリ

package com.toolsku.tcc;

import lombok.Data;
import org.apache.ibatis.annotations.*;

import java.time.LocalDateTime;

@Data
public class TccBranchTransaction {
    private Long id;
    private String xid;
    private String branchId;
    private String serviceName;
    private String tryMethod;
    private String confirmMethod;
    private String cancelMethod;
    private String paramJson;
    private String status;
    private LocalDateTime createdAt;
}

@Mapper
interface TccBranchTransactionMapper {
    @Insert("INSERT INTO tcc_branch_transaction (xid, branch_id, service_name, try_method, " +
            "confirm_method, cancel_method, param_json, status, created_at) " +
            "VALUES (#{xid}, #{branchId}, #{serviceName}, #{tryMethod}, #{confirmMethod}, " +
            "#{cancelMethod}, #{paramJson}, #{status}, NOW())")
    int insert(TccBranchTransaction branch);

    @Select("SELECT * FROM tcc_branch_transaction WHERE xid = #{xid} ORDER BY created_at ASC")
    java.util.List<TccBranchTransaction> findByXid(@Param("xid") String xid);

    @Update("UPDATE tcc_branch_transaction SET status = #{status} WHERE xid = #{xid} AND branch_id = #{branchId}")
    int updateStatus(@Param("xid") String xid, @Param("branchId") String branchId, @Param("status") String status);
}

3. 分散トランザクションモニタリングダッシュボード

package com.toolsku.reliability;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.concurrent.atomic.AtomicLong;

@Slf4j
@Component
@RequiredArgsConstructor
public class TransactionMonitor {

    private final TransactionMetrics transactionMetrics;
    private final AlertService alertService;

    private final AtomicLong lastSagaFailureCount = new AtomicLong(0);
    private final AtomicLong lastSeataRollbackCount = new AtomicLong(0);

    @Scheduled(fixedRate = 60000)
    public void checkTransactionHealth() {
        double sagaFailureRate = calculateSagaFailureRate();
        if (sagaFailureRate > 0.1) {
            alertService.sendAlert("Saga failure rate too high: " + (sagaFailureRate * 100) + "%");
        }

        int activeCount = transactionMetrics.getActiveTransactionCount();
        if (activeCount > 500) {
            alertService.sendAlert("Too many active transactions: " + activeCount);
        }

        log.info("Transaction health check: sagaFailureRate={:.2f}%, activeTransactions={}",
                sagaFailureRate * 100, activeCount);
    }

    private double calculateSagaFailureRate() {
        return 0.0;
    }
}

比較分析

次元 Sagaオーケストレーション TCCパターン Seata AT メッセージ最終整合性 プロダクション信頼性
一貫性モデル 最終整合 最終整合 強整合(グローバルロック) 最終整合 最終整合
侵入性 低い(補償ロジック) 高い(Try/Confirm/Cancel) 低い(自動プロキシ) 中(アウトボックス+冪等) 高い(フルスタック)
パフォーマンス ⭐⭐⭐⭐高い ⭐⭐⭐中 ⭐⭐低い(グローバルロック) ⭐⭐⭐⭐⭐最高 ⭐⭐⭐中
実装複雑度 ⭐⭐中 ⭐⭐⭐⭐高い ⭐低い(フレームワーク担当) ⭐⭐⭐中 ⭐⭐⭐⭐⭐極めて高い
分離性 ❌なし ✅Tryフェーズ分離 ✅グローバルロック分離 ❌なし ✅オプション
補償/ロールバック ✅補償操作 ✅Cancel操作 ✅自動ロールバック ✅リトライ+手動 ✅マルチレベル
適用シナリオ 長いワークフロー編成 資金/在庫コア 従来型マイクロサービス 非同期疎結合 コアトランザクションパイプライン
フレームワーク依存 自作/Seata Saga 自作/Seata TCC Seata RocketMQ/Kafka フルスタック組み合わせ

まとめ:分散トランザクションに銀の弾丸はありません。Sagaは長いワークフロー編成に適し、補償セマンティクスは明確だが分離性はない。TCCは資金コアパイプラインに適し、分離性は強いが実装コストが高い。Seata ATは迅速な統合に適し、低侵入だがグローバルロックがパフォーマンスのボトルネック。メッセージ最終整合性は非同期疎結合に適し、最高のパフォーマンスだが冪等とアウトボックスが必要。プロダクションのコアパイプラインでは、TCC + メッセージ最終整合性 + 冪等 + モニタリングの組み合わせを推奨——本番でデータ不整合をデバッグするより、コードを2倍書く方がマシです。


オンラインツール推奨

ブラウザローカルツールを無料で試す →

#分布式事务#Saga模式#TCC#Seata#消息最终一致性#2026#技术架构