Distributed Transaction Patterns in Practice: 5 Production Patterns from Saga to TCC

Distributed Transactions: The Hardest Engineering Problem in Microservice Architectures

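Place an order, deduct inventory, deduct the balance: three services, three databases. The inventory is deducted but the balance is not, and this kind of inconsistency is a routine occurrence in production. Local transactions cannot span services. Add a distributed lock, and the lock can time out before the transaction commits. Adopt Seata, and the global lock can push concurrency back toward single-threaded throughput. Switch to message-based eventual consistency, and a lost message leaves you writing yet more compensation logic. In 2026, distributed transactions remain the part of a microservice architecture most likely to go wrong.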

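This article walks through five production patterns end to end: Saga orchestration → TCC → Seata AT → message-based eventual consistency → production-grade reliability safeguards. Each step comes with Java/Spring Boot code and a guide to common pitfalls.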


Core Concepts of Distributed Transactions

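| Concept | Description |
| --- | --- |
| Local transaction | ACID transaction on a single data source; cannot guarantee consistency across services |
| 2PC (two-phase commit) | A coordinator drives prepare/commit for all participants; synchronous and blocking, with poor performance |
| 3PC (three-phase commit) | Adds a CanCommit phase; reduces blocking but is complex to implement |
| Saga | Splits a long transaction into multiple local transactions; runs compensating operations on failure |
| TCC | Three-step Try-Confirm-Cancel; consistency is guaranteed at the business layer |
| Seata AT mode | Intercepts SQL automatically and generates rollback (undo) logs; a non-intrusive distributed transaction |
| Message-based eventual consistency | Asynchronous guarantee built on a message queue; state converges eventually rather than immediately |
| Transactional outbox | Writes the business change and the outgoing message in the same transaction to avoid losing messages |
| Global lock | Row lock held by a Seata global transaction to prevent dirty writes |
| Idempotency | Running the same operation multiple times yields the same result; a core safeguard for distributed transactions |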

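Problem Analysis: 5 Major Challenges of Distributed Transactions

  1. Cross-service data consistency: the order service creates the order, the inventory service deducts stock, and the account service deducts the balance; if any step fails, all of them must be undone
  2. Atomicity of compensation: a Saga compensation can itself fail, so what happens then?
  3. Global locks and concurrency conflicts: Seata's global lock degrades performance, and lock waits time out under high concurrency
  4. Message loss and duplicate consumption: a message is sent successfully but the consumer crashes, or a message is consumed twice and the balance is deducted twice
  5. Timeouts and suspension: a TCC Try times out and Cancel runs first; when the delayed Try finally arrives, it reserves resources that nothing will ever confirm or cancel (suspension)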
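
For challenge 2 in the list above, one common approach is to make every compensation idempotent and retry it a bounded number of times before handing the step over for manual handling. The following is a minimal sketch under that assumption; `CompensationRetrier` and its parameters are hypothetical names introduced for illustration and are not part of the code in this article.

```java
import java.util.function.Supplier;

// Hypothetical helper (not from the article): retries an idempotent compensation
// a bounded number of times, doubling the wait between attempts.
final class CompensationRetrier {

    private final int maxAttempts;
    private final long initialBackoffMillis;

    CompensationRetrier(int maxAttempts, long initialBackoffMillis) {
        this.maxAttempts = maxAttempts;
        this.initialBackoffMillis = initialBackoffMillis;
    }

    /** Returns true once the compensation reports success, false if every attempt failed. */
    boolean run(Supplier<Boolean> compensation) {
        long backoff = initialBackoffMillis;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (Boolean.TRUE.equals(compensation.get())) {
                    return true;
                }
            } catch (RuntimeException e) {
                // An exception counts as a failed attempt; fall through to the retry.
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoff);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return false;
                }
                backoff *= 2;
            }
        }
        // Every attempt failed: the caller should persist the step for manual intervention.
        return false;
    }
}
```

Retrying is only safe when the compensation is idempotent, because an attempt that appeared to fail may in fact have been applied.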

Step by Step: Implementing 5 Distributed Transaction Patterns

Pattern 1: Saga Orchestration (Central Coordinator)

package com.toolsku.saga;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.Supplier;

@Data
@Slf4j
public class SagaDefinition {

    private String sagaId;
    private List<SagaStep> steps = new ArrayList<>();
    private int currentStep = 0;
    private SagaStatus status = SagaStatus.PENDING;

    public enum SagaStatus {
        PENDING, RUNNING, COMPENSATING, COMPLETED, FAILED
    }

    @Data
    public static class SagaStep {
        private String name;
        private Supplier<Boolean> action;
        private Supplier<Boolean> compensation;
        private StepStatus stepStatus = StepStatus.PENDING;

        public enum StepStatus {
            PENDING, EXECUTING, COMPLETED, COMPENSATING, COMPENSATED, FAILED
        }
    }

    public static SagaBuilder builder() {
        return new SagaBuilder();
    }

    public static class SagaBuilder {
        private final SagaDefinition saga = new SagaDefinition();

        public SagaBuilder step(String name, Supplier<Boolean> action, Supplier<Boolean> compensation) {
            SagaStep step = new SagaStep();
            step.setName(name);
            step.setAction(action);
            step.setCompensation(compensation);
            saga.getSteps().add(step);
            return this;
        }

        public SagaDefinition build() {
            saga.setSagaId(UUID.randomUUID().toString());
            return saga;
        }
    }
}
package com.toolsku.saga;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class SagaOrchestrator {

    public SagaDefinition execute(SagaDefinition saga) {
        saga.setStatus(SagaDefinition.SagaStatus.RUNNING);
        log.info("Saga [{}] started, total steps: {}", saga.getSagaId(), saga.getSteps().size());

        for (int i = 0; i < saga.getSteps().size(); i++) {
            saga.setCurrentStep(i);
            SagaDefinition.SagaStep step = saga.getSteps().get(i);
            step.setStepStatus(SagaDefinition.SagaStep.StepStatus.EXECUTING);

            try {
                Boolean result = step.getAction().get();
                if (result == null || !result) {
                    log.error("Saga [{}] step [{}] action failed", saga.getSagaId(), step.getName());
                    step.setStepStatus(SagaDefinition.SagaStep.StepStatus.FAILED);
                    return compensate(saga);
                }
                step.setStepStatus(SagaDefinition.SagaStep.StepStatus.COMPLETED);
                log.info("Saga [{}] step [{}] completed", saga.getSagaId(), step.getName());
            } catch (Exception e) {
                log.error("Saga [{}] step [{}] exception: {}", saga.getSagaId(), step.getName(), e.getMessage(), e);
                step.setStepStatus(SagaDefinition.SagaStep.StepStatus.FAILED);
                return compensate(saga);
            }
        }

        saga.setStatus(SagaDefinition.SagaStatus.COMPLETED);
        log.info("Saga [{}] completed successfully", saga.getSagaId());
        return saga;
    }

    private SagaDefinition compensate(SagaDefinition saga) {
        saga.setStatus(SagaDefinition.SagaStatus.COMPENSATING);
        log.info("Saga [{}] compensating from step [{}]", saga.getSagaId(), saga.getCurrentStep());

        for (int i = saga.getCurrentStep(); i >= 0; i--) {
            SagaDefinition.SagaStep step = saga.getSteps().get(i);
            if (step.getStepStatus() != SagaDefinition.SagaStep.StepStatus.COMPLETED) {
                continue;
            }

            step.setStepStatus(SagaDefinition.SagaStep.StepStatus.COMPENSATING);
            try {
                Boolean result = step.getCompensation().get();
                if (result != null && result) {
                    step.setStepStatus(SagaDefinition.SagaStep.StepStatus.COMPENSATED);
                    log.info("Saga [{}] step [{}] compensated", saga.getSagaId(), step.getName());
                } else {
                    step.setStepStatus(SagaDefinition.SagaStep.StepStatus.FAILED);
                    log.error("Saga [{}] step [{}] compensation failed", saga.getSagaId(), step.getName());
                }
            } catch (Exception e) {
                step.setStepStatus(SagaDefinition.SagaStep.StepStatus.FAILED);
                log.error("Saga [{}] step [{}] compensation exception: {}", saga.getSagaId(), step.getName(), e.getMessage(), e);
            }
        }

        saga.setStatus(SagaDefinition.SagaStatus.FAILED);
        return saga;
    }
}
package com.toolsku.saga;

import com.toolsku.service.OrderService;
import com.toolsku.service.InventoryService;
import com.toolsku.service.AccountService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;

@Slf4j
@Service
@RequiredArgsConstructor
public class OrderSagaService {

    private final SagaOrchestrator sagaOrchestrator;
    private final OrderService orderService;
    private final InventoryService inventoryService;
    private final AccountService accountService;

    public SagaDefinition createOrder(Long userId, Long productId, Integer quantity, BigDecimal amount) {
        SagaDefinition saga = SagaDefinition.builder()
                .step("createOrder",
                        () -> orderService.createOrder(userId, productId, quantity, amount),
                        () -> orderService.cancelOrder(userId, productId))
                .step("deductInventory",
                        () -> inventoryService.deductInventory(productId, quantity),
                        () -> inventoryService.restoreInventory(productId, quantity))
                .step("deductAccount",
                        () -> accountService.deductBalance(userId, amount),
                        () -> accountService.restoreBalance(userId, amount))
                .build();

        return sagaOrchestrator.execute(saga);
    }
}
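
To make the execution order concrete, here is a dependency-free sketch that traces what happens when the third step fails. `MiniSaga` is a hypothetical, simplified stand-in written for this illustration; it follows the same idea as the orchestrator above (run steps in order, compensate completed steps in reverse) but is not the same code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, dependency-free illustration of saga orchestration:
// run steps in order and, on the first failure, compensate completed steps in reverse.
final class MiniSaga {

    private static final class Step {
        final String name;
        final Supplier<Boolean> action;
        final Runnable compensation;

        Step(String name, Supplier<Boolean> action, Runnable compensation) {
            this.name = name;
            this.action = action;
            this.compensation = compensation;
        }
    }

    private final List<Step> steps = new ArrayList<>();
    private final List<String> trace = new ArrayList<>();

    MiniSaga step(String name, Supplier<Boolean> action, Runnable compensation) {
        steps.add(new Step(name, action, compensation));
        return this;
    }

    /** Returns true if every step succeeded; otherwise compensates and returns false. */
    boolean execute() {
        List<Step> completed = new ArrayList<>();
        for (Step step : steps) {
            boolean ok;
            try {
                ok = Boolean.TRUE.equals(step.action.get());
            } catch (RuntimeException e) {
                ok = false; // an exception is treated like a failed step
            }
            if (!ok) {
                trace.add("failed:" + step.name);
                // Undo only the steps that actually completed, newest first.
                for (int i = completed.size() - 1; i >= 0; i--) {
                    Step done = completed.get(i);
                    done.compensation.run();
                    trace.add("compensated:" + done.name);
                }
                return false;
            }
            completed.add(step);
            trace.add("done:" + step.name);
        }
        return true;
    }

    List<String> trace() {
        return trace;
    }
}
```

With three steps named `createOrder`, `deductInventory` and `deductAccount`, where only the last action returns `false`, the trace is `done:createOrder`, `done:deductInventory`, `failed:deductAccount`, `compensated:deductInventory`, `compensated:createOrder`. The failed step itself is not compensated, since it never took effect.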

Pattern 2: TCC (Try-Confirm-Cancel)

package com.toolsku.tcc;

import lombok.Data;
import java.math.BigDecimal;

@Data
public class AccountTccRequest {
    private String xid;
    private Long userId;
    private BigDecimal amount;
    private String branchId;
}
package com.toolsku.tcc;

import org.apache.ibatis.annotations.*;

import java.math.BigDecimal;

@Mapper
public interface AccountTccMapper {

    @Insert("INSERT INTO account_tcc_freeze (xid, user_id, amount, status, branch_id, created_at) " +
            "VALUES (#{xid}, #{userId}, #{amount}, 'TRYING', #{branchId}, NOW())")
    int insertFreezeRecord(@Param("xid") String xid,
                           @Param("userId") Long userId,
                           @Param("amount") BigDecimal amount,
                           @Param("branchId") String branchId);

    @Update("UPDATE account SET balance = balance - #{amount}, frozen = frozen + #{amount} " +
            "WHERE user_id = #{userId} AND balance >= #{amount}")
    int freezeBalance(@Param("userId") Long userId, @Param("amount") BigDecimal amount);

    @Update("UPDATE account SET frozen = frozen - #{amount} " +
            "WHERE user_id = #{userId} AND frozen >= #{amount}")
    int confirmDeduct(@Param("userId") Long userId, @Param("amount") BigDecimal amount);

    @Update("UPDATE account SET balance = balance + #{amount}, frozen = frozen - #{amount} " +
            "WHERE user_id = #{userId} AND frozen >= #{amount}")
    int cancelFreeze(@Param("userId") Long userId, @Param("amount") BigDecimal amount);

    @Select("SELECT COUNT(*) FROM account_tcc_freeze WHERE xid = #{xid} AND branch_id = #{branchId}")
    int countFreezeRecord(@Param("xid") String xid, @Param("branchId") String branchId);

    @Update("UPDATE account_tcc_freeze SET status = #{status} WHERE xid = #{xid} AND branch_id = #{branchId}")
    int updateFreezeStatus(@Param("xid") String xid,
                           @Param("branchId") String branchId,
                           @Param("status") String status);

    @Select("SELECT status FROM account_tcc_freeze WHERE xid = #{xid} AND branch_id = #{branchId}")
    String getFreezeStatus(@Param("xid") String xid, @Param("branchId") String branchId);
}
package com.toolsku.tcc;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.math.BigDecimal;

@Slf4j
@Service
@RequiredArgsConstructor
public class AccountTccService {

    private final AccountTccMapper accountTccMapper;

    @Transactional(rollbackFor = Exception.class)
    public boolean tryDeduct(String xid, Long userId, BigDecimal amount, String branchId) {
        if (accountTccMapper.countFreezeRecord(xid, branchId) > 0) {
            log.info("TCC try already executed: xid={}, branchId={}", xid, branchId);
            return true;
        }

        int rows = accountTccMapper.freezeBalance(userId, amount);
        if (rows == 0) {
            log.warn("TCC try failed: insufficient balance, userId={}, amount={}", userId, amount);
            return false;
        }

        accountTccMapper.insertFreezeRecord(xid, userId, amount, branchId);
        log.info("TCC try success: xid={}, userId={}, amount={}", xid, userId, amount);
        return true;
    }

    @Transactional(rollbackFor = Exception.class)
    public boolean confirmDeduct(String xid, Long userId, BigDecimal amount, String branchId) {
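        String status = accountTccMapper.getFreezeStatus(xid, branchId);
        if ("CONFIRMED".equals(status)) {
            log.info("TCC confirm already executed: xid={}, branchId={}", xid, branchId);
            return true;
        }
        // Only a branch whose Try succeeded (status TRYING) holds frozen funds to confirm.
        if (!"TRYING".equals(status)) {
            log.error("TCC confirm rejected: no reservation in TRYING state, xid={}, branchId={}, status={}",
                    xid, branchId, status);
            return false;
        }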

        int rows = accountTccMapper.confirmDeduct(userId, amount);
        if (rows == 0) {
            log.error("TCC confirm failed: frozen amount mismatch, userId={}, amount={}", userId, amount);
            return false;
        }

        accountTccMapper.updateFreezeStatus(xid, branchId, "CONFIRMED");
        log.info("TCC confirm success: xid={}, userId={}, amount={}", xid, userId, amount);
        return true;
    }

    @Transactional(rollbackFor = Exception.class)
    public boolean cancelFreeze(String xid, Long userId, BigDecimal amount, String branchId) {
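        String status = accountTccMapper.getFreezeStatus(xid, branchId);
        if (status == null) {
            // Empty rollback: Try never wrote a freeze record for this branch, so no funds are
            // frozen on its behalf and the account must not be touched. Full suspension
            // protection would additionally record the cancellation so a late Try is rejected.
            log.warn("TCC cancel skipped: no freeze record (empty rollback), xid={}, branchId={}", xid, branchId);
            return true;
        }
        if ("CANCELLED".equals(status)) {
            log.info("TCC cancel already executed: xid={}, branchId={}", xid, branchId);
            return true;
        }
        if ("CONFIRMED".equals(status)) {
            log.warn("TCC cancel skipped: already confirmed, xid={}, branchId={}", xid, branchId);
            return true;
        }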

        int rows = accountTccMapper.cancelFreeze(userId, amount);
        if (rows == 0) {
            log.error("TCC cancel failed: frozen amount mismatch, userId={}, amount={}", userId, amount);
            return false;
        }

        accountTccMapper.updateFreezeStatus(xid, branchId, "CANCELLED");
        log.info("TCC cancel success: xid={}, userId={}, amount={}", xid, userId, amount);
        return true;
    }
}
package com.toolsku.tcc;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;
import java.util.UUID;

@Slf4j
@Service
@RequiredArgsConstructor
public class TccTransactionManager {

    private final AccountTccService accountTccService;

    public boolean executeDeduct(String xid, Long userId, BigDecimal amount) {
        String branchId = UUID.randomUUID().toString();

        boolean tryResult = accountTccService.tryDeduct(xid, userId, amount, branchId);
        if (!tryResult) {
            accountTccService.cancelFreeze(xid, userId, amount, branchId);
            return false;
        }

        boolean confirmResult = accountTccService.confirmDeduct(xid, userId, amount, branchId);
        if (!confirmResult) {
            accountTccService.cancelFreeze(xid, userId, amount, branchId);
            return false;
        }

        return true;
    }
}
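
The Try/Confirm/Cancel state handling is easier to reason about without a database in the way. The sketch below models one account in memory, with a per-branch state record that makes all three operations idempotent and guards against empty rollback (Cancel without Try) and suspension (Try arriving after Cancel). `TccAccountSketch` and its methods are hypothetical names for illustration; amounts are plain `long` values to keep the example short.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of a TCC participant: a per-branch state record makes
// Try/Confirm/Cancel idempotent and guards against empty rollback and suspension.
final class TccAccountSketch {

    enum State { TRYING, CONFIRMED, CANCELLED }

    private long balance;
    private long frozen;
    private final Map<String, State> branches = new HashMap<>();
    private final Map<String, Long> amounts = new HashMap<>();

    TccAccountSketch(long balance) {
        this.balance = balance;
    }

    synchronized boolean tryDeduct(String branchKey, long amount) {
        State state = branches.get(branchKey);
        if (state == State.CANCELLED) {
            return false; // anti-suspension: Cancel arrived first, a late Try must not freeze funds
        }
        if (state != null) {
            return true; // idempotent: Try already ran for this branch
        }
        if (balance < amount) {
            return false;
        }
        balance -= amount;
        frozen += amount;
        branches.put(branchKey, State.TRYING);
        amounts.put(branchKey, amount);
        return true;
    }

    synchronized boolean confirm(String branchKey) {
        State state = branches.get(branchKey);
        if (state == State.CONFIRMED) {
            return true; // idempotent
        }
        if (state != State.TRYING) {
            return false; // nothing reserved, or already cancelled
        }
        frozen -= amounts.get(branchKey);
        branches.put(branchKey, State.CONFIRMED);
        return true;
    }

    synchronized boolean cancel(String branchKey) {
        State state = branches.get(branchKey);
        if (state == null) {
            branches.put(branchKey, State.CANCELLED); // empty rollback: record it, move no money
            return true;
        }
        if (state == State.CANCELLED) {
            return true; // idempotent
        }
        if (state == State.CONFIRMED) {
            return false; // too late to cancel
        }
        long amount = amounts.get(branchKey);
        frozen -= amount;
        balance += amount;
        branches.put(branchKey, State.CANCELLED);
        return true;
    }

    synchronized long balance() { return balance; }

    synchronized long frozen() { return frozen; }
}
```

The key design choice is that Cancel writes a `CANCELLED` record even when there is nothing to release, so that a delayed Try for the same branch can be recognised and rejected.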

Pattern 3: Seata AT Mode (Automatic Compensation)

package com.toolsku.seata;

import io.seata.spring.annotation.GlobalTransactional;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;

@Slf4j
@Service
@RequiredArgsConstructor
public class OrderSeataService {

    private final OrderClient orderClient;
    private final InventoryClient inventoryClient;
    private final AccountClient accountClient;

    @GlobalTransactional(name = "create-order", rollbackFor = Exception.class, timeoutMills = 60000)
    public String createOrder(Long userId, Long productId, Integer quantity, BigDecimal amount) {
        log.info("Seata global transaction started: userId={}, productId={}, quantity={}, amount={}",
                userId, productId, quantity, amount);

        String orderNo = orderClient.createOrder(userId, productId, quantity, amount);
        log.info("Step 1: order created, orderNo={}", orderNo);

        inventoryClient.deductInventory(productId, quantity);
        log.info("Step 2: inventory deducted, productId={}, quantity={}", productId, quantity);

        accountClient.deductBalance(userId, amount);
        log.info("Step 3: balance deducted, userId={}, amount={}", userId, amount);

        return orderNo;
    }
}
package com.toolsku.seata;

import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.*;

import java.math.BigDecimal;

@FeignClient(name = "order-service", url = "${service.order.url}")
public interface OrderClient {

    @PostMapping("/api/orders")
    String createOrder(@RequestParam("userId") Long userId,
                       @RequestParam("productId") Long productId,
                       @RequestParam("quantity") Integer quantity,
                       @RequestParam("amount") BigDecimal amount);

    @DeleteMapping("/api/orders/{orderNo}")
    void cancelOrder(@PathVariable("orderNo") String orderNo);
}

@FeignClient(name = "inventory-service", url = "${service.inventory.url}")
public interface InventoryClient {

    @PostMapping("/api/inventory/deduct")
    void deductInventory(@RequestParam("productId") Long productId,
                         @RequestParam("quantity") Integer quantity);

    @PostMapping("/api/inventory/restore")
    void restoreInventory(@RequestParam("productId") Long productId,
                          @RequestParam("quantity") Integer quantity);
}

@FeignClient(name = "account-service", url = "${service.account.url}")
public interface AccountClient {

    @PostMapping("/api/accounts/deduct")
    void deductBalance(@RequestParam("userId") Long userId,
                       @RequestParam("amount") BigDecimal amount);

    @PostMapping("/api/accounts/restore")
    void restoreBalance(@RequestParam("userId") Long userId,
                        @RequestParam("amount") BigDecimal amount);
}
# application-seata.yml
seata:
  enabled: true
  application-id: order-service
  tx-service-group: toolsku-tx-group
  service:
    vgroup-mapping:
      toolsku-tx-group: default
    grouplist:
      default: 127.0.0.1:8091
  config:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      namespace: seata
      group: SEATA_GROUP
  registry:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      namespace: seata
      group: SEATA_GROUP
  client:
    undo:
      data-validation: true
      log-serialization: jackson
      log-table: undo_log
    rm:
      # Lock retry settings belong under client.rm.lock in the Seata Spring Boot starter
      lock:
        retry-interval: 10
        retry-times: 30
        retry-policy-branch-rollback-on-conflict: true
-- undo_log table used by Seata AT mode (every service database needs its own copy)
CREATE TABLE IF NOT EXISTS undo_log (
    id            BIGINT       AUTO_INCREMENT PRIMARY KEY,
    branch_id     BIGINT       NOT NULL,
    xid           VARCHAR(128) NOT NULL,
    context       VARCHAR(128) NOT NULL,
    rollback_info LONGBLOB     NOT NULL,
    log_status    INT          NOT NULL,
    log_created   DATETIME     NOT NULL,
    log_modified  DATETIME     NOT NULL,
    UNIQUE KEY ux_undo_log (xid, branch_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
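
The idea behind the undo log can be shown with a small in-memory model: each transactional write stores a before image and an after image, and a rollback restores the before image only if the row still matches the after image. This is a conceptual sketch only. `UndoLogSketch` is a hypothetical class, not Seata's implementation, and the link between this comparison and the `data-validation` setting above is an assumption made for the illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Conceptual model only (hypothetical class, not Seata's code): every transactional write
// keeps a before image and an after image so the change can be undone later.
final class UndoLogSketch {

    private static final class UndoRecord {
        final String key;
        final String beforeImage; // null means the row did not exist
        final String afterImage;

        UndoRecord(String key, String beforeImage, String afterImage) {
            this.key = key;
            this.beforeImage = beforeImage;
            this.afterImage = afterImage;
        }
    }

    private final Map<String, String> table = new HashMap<>();
    private final Map<String, Deque<UndoRecord>> undoByXid = new HashMap<>();

    /** Plain write with no undo record, e.g. a writer that bypasses the global transaction. */
    void put(String key, String value) {
        table.put(key, value);
    }

    String get(String key) {
        return table.get(key);
    }

    /** Transactional write: record both images first, then apply the change. */
    void update(String xid, String key, String newValue) {
        undoByXid.computeIfAbsent(xid, k -> new ArrayDeque<>())
                .push(new UndoRecord(key, table.get(key), newValue));
        table.put(key, newValue);
    }

    /** Commit simply discards the undo records. */
    void commit(String xid) {
        undoByXid.remove(xid);
    }

    /** Restores before images, newest first; returns false if a dirty write is detected. */
    boolean rollback(String xid) {
        Deque<UndoRecord> records = undoByXid.remove(xid);
        if (records == null) {
            return true; // nothing to undo
        }
        while (!records.isEmpty()) {
            UndoRecord record = records.pop();
            if (!Objects.equals(table.get(record.key), record.afterImage)) {
                return false; // someone else changed the row; restoring would clobber their write
            }
            if (record.beforeImage == null) {
                table.remove(record.key);
            } else {
                table.put(record.key, record.beforeImage);
            }
        }
        return true;
    }
}
```

The failed rollback case is the reason the global lock exists: without it, a concurrent writer could change the row between the branch commit and the rollback, and the before image could no longer be applied safely.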

Pattern 4: Message-Based Eventual Consistency (Transactional Outbox + RocketMQ)

-- Transactional outbox table
CREATE TABLE IF NOT EXISTS transactional_outbox (
    id            BIGINT       AUTO_INCREMENT PRIMARY KEY,
    aggregate_id  VARCHAR(128) NOT NULL,
    aggregate_type VARCHAR(64) NOT NULL,
    event_type    VARCHAR(128) NOT NULL,
    payload       JSON         NOT NULL,
    status        VARCHAR(32)  NOT NULL DEFAULT 'PENDING',
    retry_count   INT          NOT NULL DEFAULT 0,
    max_retry     INT          NOT NULL DEFAULT 5,
    created_at    DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at    DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_status_created (status, created_at),
    UNIQUE KEY uk_aggregate_event (aggregate_id, aggregate_type, event_type)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
package com.toolsku.outbox;

import lombok.Data;
import java.time.LocalDateTime;

@Data
public class OutboxEvent {
    private Long id;
    private String aggregateId;
    private String aggregateType;
    private String eventType;
    private String payload;
    private String status;
    private Integer retryCount;
    private Integer maxRetry;
    private LocalDateTime createdAt;
    private LocalDateTime updatedAt;
}
package com.toolsku.outbox;

import org.apache.ibatis.annotations.*;

import java.util.List;

@Mapper
public interface OutboxMapper {

    @Insert("INSERT INTO transactional_outbox (aggregate_id, aggregate_type, event_type, payload, status) " +
            "VALUES (#{aggregateId}, #{aggregateType}, #{eventType}, #{payload}, 'PENDING')")
    @Options(useGeneratedKeys = true, keyProperty = "id")
    int insert(OutboxEvent event);

    @Select("SELECT * FROM transactional_outbox WHERE status = 'PENDING' AND retry_count < max_retry " +
            "ORDER BY created_at ASC LIMIT #{limit}")
    List<OutboxEvent> findPendingEvents(@Param("limit") int limit);

    @Update("UPDATE transactional_outbox SET status = #{status}, retry_count = retry_count + 1, " +
            "updated_at = NOW() WHERE id = #{id}")
    int updateStatus(@Param("id") Long id, @Param("status") String status);

    @Update("UPDATE transactional_outbox SET status = 'SENT', updated_at = NOW() WHERE id = #{id}")
    int markAsSent(@Param("id") Long id);

    @Update("UPDATE transactional_outbox SET status = 'FAILED', updated_at = NOW() " +
            "WHERE id = #{id} AND retry_count >= max_retry")
    int markAsFailed(@Param("id") Long id);
}
package com.toolsku.outbox;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.List;

@Slf4j
@Component
@RequiredArgsConstructor
public class OutboxRelayScheduler {

    private final OutboxMapper outboxMapper;
    private final RocketMQTemplate rocketMQTemplate;

    private static final int BATCH_SIZE = 50;

    @Scheduled(fixedDelay = 1000)
    public void relayPendingEvents() {
        List<OutboxEvent> events = outboxMapper.findPendingEvents(BATCH_SIZE);
        if (events.isEmpty()) {
            return;
        }

        for (OutboxEvent event : events) {
            try {
                String topic = String.format("toolsku-%s-topic", event.getAggregateType());
                String key = event.getAggregateId();

                // Carry the aggregate id as the RocketMQ message key so the message can be traced.
                Message<String> message = MessageBuilder.withPayload(event.getPayload())
                        .setHeader(RocketMQHeaders.KEYS, key)
                        .build();
                rocketMQTemplate.syncSend(topic, message);

                outboxMapper.markAsSent(event.getId());
                log.info("Outbox event sent: id={}, type={}, aggregateId={}",
                        event.getId(), event.getEventType(), event.getAggregateId());
            } catch (Exception e) {
                outboxMapper.updateStatus(event.getId(), "PENDING");
                outboxMapper.markAsFailed(event.getId());
                log.error("Outbox event send failed: id={}, error={}", event.getId(), e.getMessage(), e);
            }
        }
    }
}
package com.toolsku.outbox;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.math.BigDecimal;
import java.util.UUID;

@Slf4j
@Service
@RequiredArgsConstructor
public class OrderEventService {

    private final OutboxMapper outboxMapper;
    private final OrderMapper orderMapper;
    private final ObjectMapper objectMapper;

    @Transactional(rollbackFor = Exception.class)
    public String createOrderWithOutbox(Long userId, Long productId, Integer quantity, BigDecimal amount) {
        String orderNo = "ORD" + UUID.randomUUID().toString().replace("-", "").substring(0, 16).toUpperCase();

        orderMapper.insertOrder(orderNo, userId, productId, quantity, amount, "CREATED");

        try {
            String payload = objectMapper.writeValueAsString(
                    new OrderCreatedEvent(orderNo, userId, productId, quantity, amount));

            OutboxEvent event = new OutboxEvent();
            event.setAggregateId(orderNo);
            event.setAggregateType("order");
            event.setEventType("ORDER_CREATED");
            event.setPayload(payload);
            outboxMapper.insert(event);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize order event", e);
        }

        log.info("Order created with outbox event: orderNo={}", orderNo);
        return orderNo;
    }
}
package com.toolsku.consumer;

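import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.toolsku.outbox.OrderCreatedEvent;
import com.toolsku.reliability.IdempotentRecordService;
import com.toolsku.service.InventoryService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(topic = "toolsku-order-topic", consumerGroup = "inventory-consumer-group")
public class InventoryOrderConsumer implements RocketMQListener<String> {

    private final InventoryService inventoryService;
    private final IdempotentRecordService idempotentRecordService;
    private final ObjectMapper objectMapper;

    // Deserializes the JSON payload written by OrderEventService.
    // Assumes OrderCreatedEvent can be deserialized by Jackson (for example, it has a no-arg constructor).
    private OrderCreatedEvent parseEvent(String message) {
        try {
            return objectMapper.readValue(message, OrderCreatedEvent.class);
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException("Malformed ORDER_CREATED payload", e);
        }
    }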

    @Override
    public void onMessage(String message) {
        OrderCreatedEvent event = parseEvent(message);
        String idempotentKey = "INVENTORY_DEDUCT:" + event.getOrderNo();

        if (idempotentRecordService.isProcessed(idempotentKey)) {
            log.info("Duplicate message skipped: orderNo={}", event.getOrderNo());
            return;
        }

        try {
            inventoryService.deductInventory(event.getProductId(), event.getQuantity());
            idempotentRecordService.markProcessed(idempotentKey);
            log.info("Inventory deducted for order: orderNo={}", event.getOrderNo());
        } catch (Exception e) {
            log.error("Inventory deduction failed: orderNo={}, error={}",
                    event.getOrderNo(), e.getMessage(), e);
            throw new RuntimeException("Inventory deduction failed", e);
        }
    }
}
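
The interplay between the outbox relay and the consumer can be summarised in a small in-memory model: the relay delivers each pending event at least once, and the consumer deduplicates by order number so a redelivery has no further effect. `OutboxSketch` and `DedupingConsumer` are hypothetical classes written for this illustration; the broker is represented by a simple predicate that returns whether the send was accepted.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical in-memory model of the transactional outbox: events stay pending until
// the broker accepts them, so delivery is at-least-once.
final class OutboxSketch {

    private static final class Event {
        final String orderNo;
        boolean sent;

        Event(String orderNo) {
            this.orderNo = orderNo;
        }
    }

    private final List<String> orders = new ArrayList<>();
    private final List<Event> outbox = new ArrayList<>();

    /** Stands in for one local transaction: order row and outbox row are written together. */
    synchronized void createOrder(String orderNo) {
        orders.add(orderNo);
        outbox.add(new Event(orderNo));
    }

    /** Sends every pending event; an event is marked sent only after the broker accepts it. */
    synchronized int relay(Predicate<String> broker) {
        int delivered = 0;
        for (Event event : outbox) {
            if (!event.sent && broker.test(event.orderNo)) {
                event.sent = true;
                delivered++;
            }
        }
        return delivered;
    }
}

// Hypothetical consumer that applies each order at most once, however often it is delivered.
final class DedupingConsumer {

    private final Set<String> processed = new HashSet<>();
    private int deductions;

    synchronized void onMessage(String orderNo) {
        if (!processed.add(orderNo)) {
            return; // duplicate delivery, already applied
        }
        deductions++;
    }

    synchronized int deductions() {
        return deductions;
    }
}
```

Because the relay can crash after sending but before marking the row as sent, duplicates are expected rather than exceptional, which is why the consumer side has to be idempotent.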

Pattern 5: Production-Grade Reliability Safeguards

package com.toolsku.reliability;

import lombok.Data;
import org.apache.ibatis.annotations.*;

import java.time.LocalDateTime;

@Data
public class IdempotentRecord {
    private Long id;
    private String idempotentKey;
    private String status;
    private LocalDateTime createdAt;
    private LocalDateTime expireAt;
}

@Mapper
public interface IdempotentRecordMapper {

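    // INSERT IGNORE reports 0 affected rows when the key already exists. The previous
    // "ON DUPLICATE KEY UPDATE key = key" form can report 1 for an existing row when the
    // driver connects with CLIENT_FOUND_ROWS (the MySQL Connector/J default), which would
    // make every caller believe it had acquired the key.
    @Insert("INSERT IGNORE INTO idempotent_record (idempotent_key, status, created_at, expire_at) " +
            "VALUES (#{idempotentKey}, 'PROCESSING', NOW(), DATE_ADD(NOW(), INTERVAL 24 HOUR))")
    int tryInsert(@Param("idempotentKey") String idempotentKey);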

    @Select("SELECT status FROM idempotent_record WHERE idempotent_key = #{idempotentKey}")
    String getStatus(@Param("idempotentKey") String idempotentKey);

    @Update("UPDATE idempotent_record SET status = 'PROCESSED' WHERE idempotent_key = #{idempotentKey}")
    int markProcessed(@Param("idempotentKey") String idempotentKey);

    @Delete("DELETE FROM idempotent_record WHERE expire_at < NOW()")
    int cleanExpired();
}
package com.toolsku.reliability;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
public class IdempotentRecordService {

    private final IdempotentRecordMapper idempotentRecordMapper;

    public boolean isProcessed(String idempotentKey) {
        String status = idempotentRecordMapper.getStatus(idempotentKey);
        return "PROCESSED".equals(status);
    }

    public boolean tryAcquire(String idempotentKey) {
        // Only the caller whose insert created the row owns the key. An existing row means the
        // request is in flight or already processed, so it must not be executed again.
        return idempotentRecordMapper.tryInsert(idempotentKey) > 0;
    }

    public void markProcessed(String idempotentKey) {
        idempotentRecordMapper.markProcessed(idempotentKey);
    }
}
package com.toolsku.reliability;

import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.*;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import jakarta.servlet.http.HttpServletRequest;
import java.lang.reflect.Method;

@Slf4j
@Component
@Aspect
public class IdempotentAspect {

    private final IdempotentRecordService idempotentRecordService;

    public IdempotentAspect(IdempotentRecordService idempotentRecordService) {
        this.idempotentRecordService = idempotentRecordService;
    }

    @Around("@annotation(com.toolsku.reliability.Idempotent)")
    public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();
        Idempotent annotation = method.getAnnotation(Idempotent.class);

        String idempotentKey = resolveKey(annotation);
        if (idempotentKey == null) {
            return joinPoint.proceed();
        }

        if (idempotentRecordService.isProcessed(idempotentKey)) {
            log.info("Idempotent check: already processed, key={}", idempotentKey);
            return null;
        }

        if (!idempotentRecordService.tryAcquire(idempotentKey)) {
            log.warn("Idempotent check: concurrent processing, key={}", idempotentKey);
            throw new RuntimeException("Concurrent request detected");
        }

        try {
            Object result = joinPoint.proceed();
            idempotentRecordService.markProcessed(idempotentKey);
            return result;
        } catch (Exception e) {
            log.error("Idempotent execution failed: key={}", idempotentKey, e);
            throw e;
        }
    }

    private String resolveKey(Idempotent annotation) {
        ServletRequestAttributes attributes =
                (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        if (attributes == null) {
            return null;
        }
        HttpServletRequest request = attributes.getRequest();
        String header = request.getHeader(annotation.headerKey());
        return header != null ? annotation.prefix() + header : null;
    }
}
package com.toolsku.reliability;

import java.lang.annotation.*;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Idempotent {
    String headerKey() default "X-Idempotent-Key";
    String prefix() default "IDEMPOTENT:";
}
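
One property worth checking in any PROCESSING/PROCESSED scheme is what happens when the guarded call fails: if the key stays in PROCESSING, every retry is rejected until the record expires. The sketch below shows one way to handle that in memory by releasing the key on failure. `IdempotencyGuardSketch` is a hypothetical class for illustration and does not use the database-backed mapper above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory idempotency guard: a key is claimed atomically, marked PROCESSED
// on success, and released on failure so that a retry can run.
final class IdempotencyGuardSketch {

    private enum Status { PROCESSING, PROCESSED }

    private final ConcurrentMap<String, Status> records = new ConcurrentHashMap<>();

    /** Runs the action at most once per key; returns false if the call was skipped. */
    boolean runOnce(String key, Runnable action) {
        // putIfAbsent is atomic, so exactly one concurrent caller claims the key.
        if (records.putIfAbsent(key, Status.PROCESSING) != null) {
            return false; // in flight or already completed
        }
        try {
            action.run();
            records.put(key, Status.PROCESSED);
            return true;
        } catch (RuntimeException e) {
            // Release the claim; without this the key would block retries indefinitely.
            records.remove(key, Status.PROCESSING);
            throw e;
        }
    }
}
```

Releasing the key is only appropriate when a failed action leaves no partial side effects, for example because it runs inside a local transaction that was rolled back.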
package com.toolsku.reliability;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.concurrent.*;

@Slf4j
@Component
public class TransactionTimeoutGuard {

    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
    private final ConcurrentHashMap<String, TimeoutTask> runningTasks = new ConcurrentHashMap<>();

    public void register(String xid, long timeoutMillis, Runnable onTimeout) {
        ScheduledFuture<?> future = scheduler.schedule(() -> {
            log.warn("Transaction timeout triggered: xid={}", xid);
            runningTasks.remove(xid);
            try {
                onTimeout.run();
            } catch (Exception e) {
                log.error("Timeout callback failed: xid={}", xid, e);
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);

        runningTasks.put(xid, new TimeoutTask(xid, future, timeoutMillis));
    }

    public void cancel(String xid) {
        TimeoutTask task = runningTasks.remove(xid);
        if (task != null) {
            task.getFuture().cancel(false);
            log.info("Transaction timeout guard cancelled: xid={}", xid);
        }
    }

    public long getRemainingTime(String xid) {
        TimeoutTask task = runningTasks.get(xid);
        if (task == null) {
            return -1;
        }
        long elapsed = System.currentTimeMillis() - task.getStartTime();
        return Math.max(0, task.getTimeoutMillis() - elapsed);
    }

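    @lombok.Getter
    private static class TimeoutTask {
        private final String xid;
        private final ScheduledFuture<?> future;
        private final long timeoutMillis;
        // Captured at registration so getRemainingTime() can compute the elapsed time.
        private final long startTime = System.currentTimeMillis();

        // Explicit three-argument constructor matching the call in register().
        TimeoutTask(String xid, ScheduledFuture<?> future, long timeoutMillis) {
            this.xid = xid;
            this.future = future;
            this.timeoutMillis = timeoutMillis;
        }
    }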
}
package com.toolsku.reliability;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@Component
public class TransactionMetrics {

    private final Counter sagaSuccessCounter;
    private final Counter sagaFailureCounter;
    private final Counter tccConfirmCounter;
    private final Counter tccCancelCounter;
    private final Counter seataCommitCounter;
    private final Counter seataRollbackCounter;
    private final Timer sagaExecutionTimer;
    private final AtomicInteger activeTransactionGauge;

    public TransactionMetrics(MeterRegistry registry) {
        this.sagaSuccessCounter = Counter.builder("transaction.saga.success")
                .description("Saga transaction success count").register(registry);
        this.sagaFailureCounter = Counter.builder("transaction.saga.failure")
                .description("Saga transaction failure count").register(registry);
        this.tccConfirmCounter = Counter.builder("transaction.tcc.confirm")
                .description("TCC confirm count").register(registry);
        this.tccCancelCounter = Counter.builder("transaction.tcc.cancel")
                .description("TCC cancel count").register(registry);
        this.seataCommitCounter = Counter.builder("transaction.seata.commit")
                .description("Seata commit count").register(registry);
        this.seataRollbackCounter = Counter.builder("transaction.seata.rollback")
                .description("Seata rollback count").register(registry);
        this.sagaExecutionTimer = Timer.builder("transaction.saga.duration")
                .description("Saga execution duration").register(registry);
        this.activeTransactionGauge = registry.gauge("transaction.active.count",
                new AtomicInteger(0));
    }

    public void recordSagaSuccess(long durationMs) {
        sagaSuccessCounter.increment();
        sagaExecutionTimer.record(durationMs, TimeUnit.MILLISECONDS);
    }

    public void recordSagaFailure(long durationMs) {
        sagaFailureCounter.increment();
        sagaExecutionTimer.record(durationMs, TimeUnit.MILLISECONDS);
    }

    public void recordTccConfirm() { tccConfirmCounter.increment(); }
    public void recordTccCancel() { tccCancelCounter.increment(); }
    public void recordSeataCommit() { seataCommitCounter.increment(); }
    public void recordSeataRollback() { seataRollbackCounter.increment(); }

    public void incrementActive() { activeTransactionGauge.incrementAndGet(); }
    public void decrementActive() { activeTransactionGauge.decrementAndGet(); }
    public int getActiveTransactionCount() { return activeTransactionGauge.get(); }
}

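Pitfall Guide

Pitfall 1: Saga compensation is not a rollback

// ❌ Wrong: the compensation physically deletes the data
public boolean compensateOrder(String orderNo) {
    return orderMapper.deleteById(orderNo) > 0; // Physical delete: the audit trail is lost
}

// ✅ Right: the compensation is a semantic reverse operation
public boolean compensateOrder(String orderNo) {
    return orderMapper.updateStatus(orderNo, "CANCELLED") > 0; // Status change: the record is kept
}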

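Pitfall 2: TCC empty rollback and suspension

// ❌ Wrong: Cancel is called although Try never ran, and Cancel touches the data directly
public boolean cancelFreeze(String xid, Long userId, BigDecimal amount) {
    return accountMapper.cancelFreeze(userId, amount) > 0; // Empty rollback: no freeze record exists
}

// ✅ Right: check whether Try has run
public boolean cancelFreeze(String xid, Long userId, BigDecimal amount, String branchId) {
    String status = accountTccMapper.getFreezeStatus(xid, branchId);
    if (status == null) {
        // Empty rollback: Try never ran, so insert a record to prevent suspension
        accountTccMapper.insertFreezeRecord(xid, userId, amount, branchId);
        accountTccMapper.updateFreezeStatus(xid, branchId, "CANCELLED");
        log.warn("TCC empty rollback: xid={}, branchId={}", xid, branchId);
        return true;
    }
    if ("CANCELLED".equals(status) || "CONFIRMED".equals(status)) {
        return true; // Idempotent: already handled
    }
    // Release the frozen amount, then mark the branch so that a repeated Cancel is a no-op
    boolean released = accountTccMapper.cancelFreeze(userId, amount) > 0;
    if (released) {
        accountTccMapper.updateFreezeStatus(xid, branchId, "CANCELLED");
    }
    return released;
}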
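
The CANCELLED marker written by the corrected Cancel only prevents suspension if Try also checks it before freezing anything. The following is a minimal in-memory sketch of that pairing, not the article's actual mapper code: `FreezeLedger`, its `State` enum, and the map standing in for the freeze-record table are all hypothetical names introduced for illustration. In a real service the record would live in a table with a unique key on (xid, branchId), and each method would run inside one local transaction.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the freeze-record table, keyed by xid + branchId.
class FreezeLedger {
    enum State { TRIED, CANCELLED }

    private final Map<String, State> records = new HashMap<>();
    private BigDecimal frozen = BigDecimal.ZERO;

    private static String key(String xid, String branchId) {
        return xid + ":" + branchId;
    }

    // Try: freezes the amount unless a record for this branch already exists.
    synchronized boolean tryFreeze(String xid, String branchId, BigDecimal amount) {
        State existing = records.get(key(xid, branchId));
        if (existing == State.CANCELLED) {
            return false; // Late Try after Cancel: reject, otherwise the freeze would never be released
        }
        if (existing == State.TRIED) {
            return true; // Duplicate Try: the amount is already frozen
        }
        records.put(key(xid, branchId), State.TRIED);
        frozen = frozen.add(amount);
        return true;
    }

    // Cancel: tolerates an empty rollback and repeated calls.
    synchronized boolean cancelFreeze(String xid, String branchId, BigDecimal amount) {
        State existing = records.put(key(xid, branchId), State.CANCELLED);
        if (existing == State.TRIED) {
            frozen = frozen.subtract(amount); // Only a real freeze is released
        }
        return true; // null = empty rollback, CANCELLED = repeated Cancel; both are no-ops
    }

    synchronized BigDecimal frozenAmount() {
        return frozen;
    }
}
```

Calling `cancelFreeze` first and `tryFreeze` afterwards for the same xid and branch leaves `frozenAmount()` at zero, which is the behavior the empty-rollback record is meant to guarantee.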

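Pitfall 3: Seata global lock causes deadlock

// ❌ Wrong: a nested query on the same row inside the global transaction
@GlobalTransactional
public void processOrder(String orderNo) {
    orderMapper.updateStatus(orderNo, "PROCESSING"); // Acquires the global lock
    Order order = orderMapper.selectByOrderNo(orderNo); // Queries the same row again
    // If another global transaction also holds a lock on this row, the two wait on each other → deadlock
}

// ✅ Right: hold the global lock for less time and avoid cross updates
@GlobalTransactional(timeoutMills = 30000)
public void processOrder(String orderNo) {
    Order order = orderMapper.selectByOrderNo(orderNo); // Query first
    orderMapper.updateStatus(orderNo, "PROCESSING"); // Then update
    // Or split the work into several short transactions
}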

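Pitfall 4: Outbox write and business write not in the same transaction

// ❌ Wrong: send the message first, then write the database; the message is out even if the write fails
public void createOrder(Order order) {
    rocketMQTemplate.convertAndSend("order-topic", orderEvent);
    orderMapper.insert(order); // If this fails, the message has already been sent and cannot be recalled
}

// ✅ Right: outbox pattern, the business write and the event record share one transaction
@Transactional
public void createOrder(Order order) {
    orderMapper.insert(order);
    outboxMapper.insert(buildOutboxEvent(order)); // The shared transaction guarantees atomicity
}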
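
The outbox row written by `createOrder` still has to reach the broker, which is the job of the relay scheduler mentioned in the troubleshooting entries. Below is a rough sketch of one polling pass under stated assumptions: `OutboxRelay` and `OutboxEvent` are hypothetical names, the list stands in for the outbox table, and the `Predicate` stands in for the actual message send. The point it illustrates is that an event is marked SENT only after the publish succeeds, so a failed send simply stays PENDING for the next pass.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical outbox row: PENDING until the relay has published it.
class OutboxEvent {
    final String eventId;
    final String payload;
    String status = "PENDING";

    OutboxEvent(String eventId, String payload) {
        this.eventId = eventId;
        this.payload = payload;
    }
}

// Hypothetical relay; the list stands in for the outbox table.
class OutboxRelay {
    private final List<OutboxEvent> table = new ArrayList<>();
    private final Predicate<OutboxEvent> publisher; // Stand-in for the broker send; true means acknowledged

    OutboxRelay(Predicate<OutboxEvent> publisher) {
        this.publisher = publisher;
    }

    void save(OutboxEvent event) {
        table.add(event);
    }

    // One polling pass: publish every PENDING event and return how many were sent.
    int relayOnce() {
        int sent = 0;
        for (OutboxEvent event : table) {
            if (!"PENDING".equals(event.status)) {
                continue;
            }
            boolean acknowledged;
            try {
                acknowledged = publisher.test(event);
            } catch (RuntimeException e) {
                acknowledged = false; // Treat a send error like a missing acknowledgement
            }
            if (acknowledged) {
                event.status = "SENT"; // Mark only after the broker has accepted the message
                sent++;
            }
        }
        return sent;
    }

    long pendingCount() {
        return table.stream().filter(e -> "PENDING".equals(e.status)).count();
    }
}
```

Because a crash between the publish and the status update would resend the event on the next pass, this gives at-least-once delivery, and the consumer side still has to be idempotent.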

Pitfall 5: The message consumer is not idempotent

// ❌ Wrong: deducts directly, so a redelivered message deducts twice
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "account-group")
public class AccountConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        OrderEvent event = parse(message);
        accountService.deductBalance(event.getUserId(), event.getAmount()); // Runs again on every redelivery!
    }
}

// ✅ Right: idempotency check on the consumer side
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "account-group")
public class AccountConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        OrderEvent event = parse(message);
        String key = "DEDUCT:" + event.getOrderNo();
        if (idempotentRecordService.isProcessed(key)) {
            return; // Idempotent skip
        }
        accountService.deductBalance(event.getUserId(), event.getAmount());
        idempotentRecordService.markProcessed(key);
    }
}
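
One thing to watch in the corrected consumer: `isProcessed` and `markProcessed` are two separate steps, so two concurrent deliveries of the same message can both pass the check. A common way to close that gap is to claim the key atomically before doing the work. The sketch below shows the idea with an in-memory set; `IdempotentDeductor` is a hypothetical class, and in a database the claim would typically be an insert into a table with a unique index on the key, made in the same local transaction as the deduction.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical consumer core: claims the idempotency key atomically before deducting.
class IdempotentDeductor {
    private final Set<String> claimedKeys = ConcurrentHashMap.newKeySet();
    private long balance;

    IdempotentDeductor(long initialBalance) {
        this.balance = initialBalance;
    }

    // Returns true if the deduction ran, false if this order was already handled.
    boolean onMessage(String orderNo, long amount) {
        String key = "DEDUCT:" + orderNo;
        if (!claimedKeys.add(key)) {
            return false; // add() is atomic: only one delivery per key gets past this point
        }
        try {
            deduct(amount);
            return true;
        } catch (RuntimeException e) {
            claimedKeys.remove(key); // Release the claim so that a redelivery can retry
            throw e;
        }
    }

    private synchronized void deduct(long amount) {
        if (balance < amount) {
            throw new IllegalStateException("insufficient balance");
        }
        balance -= amount;
    }

    synchronized long balance() {
        return balance;
    }
}
```

With a starting balance of 100, delivering order `o1` for 30 twice leaves the balance at 70, and a deduction that fails releases its claim so the retry is not mistaken for a duplicate.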

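Troubleshooting

| No. | Error message | Cause | Fix |
| --- | --- | --- | --- |
| 1 | Saga compensation failed for step | The compensation threw an exception, possibly because the data was modified manually | Add compensation retries; record compensation-failure events for manual intervention |
| 2 | TCC empty rollback detected | Cancel was called although Try never ran (network timeout) | Insert an empty-rollback record to prevent suspension; make Cancel idempotent |
| 3 | Seata global lock conflict | Multiple global transactions compete for the same row lock | Narrow the transaction scope, lower the isolation level, add lock retries |
| 4 | Seata undo_log not found | The undo log cannot be found when a branch transaction rolls back | Check that the undo_log table exists in every database and that the data source proxy is configured correctly |
| 5 | RocketMQ send timeout | The message send timed out; the Broker is unreachable | Check the Broker status, increase the send timeout, use the outbox pattern |
| 6 | Message consumption duplicate | A message was consumed more than once | Make the consumer idempotent; deduplicate on a unique business key |
| 7 | Outbox event stuck in PENDING | The outbox event was not picked up by the relay scheduler | Check that the scheduler is running and that the RocketMQ connection is healthy |
| 8 | Global transaction timeout | The global transaction ran past its timeout | Increase timeoutMills, optimize slow SQL, split long transactions |
| 9 | TCC resource suspended | Cancel ran before Try, and Try arrived afterwards | Let the empty-rollback record block the late Try; check the timeout configuration |
| 10 | Compensation circular dependency | Compensations of several Sagas depend on each other and form a cycle | Design a one-way compensation chain; avoid circular compensation |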
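
For entry 1, "add compensation retries and hand off to manual intervention" can be sketched roughly as follows. This is an illustrative helper rather than part of the article's Saga code: `CompensationRetrier`, its constructor parameters, and the in-memory review queue are hypothetical. It assumes the compensation is a `Supplier<Boolean>` and is idempotent, since it may run several times.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper: retries a compensation with exponential backoff, then escalates.
class CompensationRetrier {
    private final int maxAttempts;
    private final long baseDelayMillis;
    private final List<String> manualReviewQueue = new ArrayList<>(); // Stand-in for a failure-event table

    CompensationRetrier(int maxAttempts, long baseDelayMillis) {
        this.maxAttempts = maxAttempts;
        this.baseDelayMillis = baseDelayMillis;
    }

    // Returns true once the compensation succeeds; otherwise records the step for manual handling.
    boolean compensate(String stepName, Supplier<Boolean> compensation) {
        long delay = baseDelayMillis;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (Boolean.TRUE.equals(compensation.get())) {
                    return true;
                }
            } catch (RuntimeException e) {
                // An exception counts as a failed attempt and falls through to the retry
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // Preserve the interrupt and stop retrying
                    break;
                }
                delay *= 2; // Exponential backoff between attempts
            }
        }
        manualReviewQueue.add(stepName);
        return false;
    }

    List<String> manualReviewQueue() {
        return List.copyOf(manualReviewQueue);
    }
}
```

The retry count and base delay are tuning knobs; the important part is that a compensation that keeps failing ends up in a queue someone looks at instead of being silently dropped.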

Advanced Optimizations

1. Persisting the Saga state machine

package com.toolsku.saga;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.annotations.*;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Slf4j
@Service
@RequiredArgsConstructor
public class SagaStateService {

    private final SagaStateMapper sagaStateMapper;

    @Transactional(rollbackFor = Exception.class)
    public void persistSagaState(SagaDefinition saga) {
        String stateJson = serializeSagaState(saga);
        sagaStateMapper.upsert(saga.getSagaId(), stateJson, saga.getStatus().name());
    }

    public SagaDefinition recoverSaga(String sagaId) {
        String stateJson = sagaStateMapper.getState(sagaId);
        if (stateJson == null) {
            return null;
        }
        return deserializeSagaState(stateJson);
    }

    private String serializeSagaState(SagaDefinition saga) {
        StringBuilder sb = new StringBuilder();
        sb.append("{\"sagaId\":\"").append(saga.getSagaId()).append("\"");
        sb.append(",\"currentStep\":").append(saga.getCurrentStep());
        sb.append(",\"status\":\"").append(saga.getStatus().name()).append("\"");
        sb.append(",\"steps\":[");
        for (int i = 0; i < saga.getSteps().size(); i++) {
            if (i > 0) sb.append(",");
            SagaDefinition.SagaStep step = saga.getSteps().get(i);
            sb.append("{\"name\":\"").append(step.getName()).append("\"");
            sb.append(",\"status\":\"").append(step.getStepStatus().name()).append("\"}");
        }
        sb.append("]}");
        return sb.toString();
    }

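    // Placeholder: recoverSaga() throws here until this is implemented (for example with Jackson).
    // Only step names and statuses are persisted, so each step's action and compensation
    // must be re-bound after loading.
    private SagaDefinition deserializeSagaState(String json) {
        throw new UnsupportedOperationException("Use Jackson for deserialization");
    }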
}

@Mapper
interface SagaStateMapper {
    @Insert("INSERT INTO saga_state (saga_id, state_json, status, created_at, updated_at) " +
            "VALUES (#{sagaId}, #{stateJson}, #{status}, NOW(), NOW()) " +
            "ON DUPLICATE KEY UPDATE state_json = #{stateJson}, status = #{status}, updated_at = NOW()")
    int upsert(@Param("sagaId") String sagaId,
               @Param("stateJson") String stateJson,
               @Param("status") String status);

    @Select("SELECT state_json FROM saga_state WHERE saga_id = #{sagaId}")
    String getState(@Param("sagaId") String sagaId);
}
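
Since the persisted state holds only step names and statuses, recovery after a restart has to decide from those alone which compensations to run. The sketch below shows one possible rule, not necessarily the one the article's coordinator uses: `SagaRecoveryPlanner` is a hypothetical helper, the status strings mirror the `StepStatus` enum, and treating EXECUTING as "needs compensation" is an assumption that only holds if compensations tolerate an action that never took effect.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: derives the compensation order from persisted step statuses.
class SagaRecoveryPlanner {

    // stepStatuses must iterate in execution order (for example a LinkedHashMap).
    static List<String> compensationOrder(Map<String, String> stepStatuses) {
        List<String> toCompensate = new ArrayList<>();
        for (Map.Entry<String, String> entry : stepStatuses.entrySet()) {
            switch (entry.getValue()) {
                case "COMPLETED":     // Took effect and must be undone
                case "COMPENSATING":  // Compensation was interrupted, run it again
                case "EXECUTING":     // Outcome unknown (assumption: compensation is safe either way)
                    toCompensate.add(0, entry.getKey()); // Prepend so the result is in reverse order
                    break;
                default:              // PENDING, COMPENSATED, FAILED: nothing to undo
                    break;
            }
        }
        return toCompensate;
    }

    public static void main(String[] args) {
        Map<String, String> steps = new LinkedHashMap<>();
        steps.put("createOrder", "COMPLETED");
        steps.put("deductStock", "COMPLETED");
        steps.put("deductBalance", "FAILED");
        System.out.println(compensationOrder(steps)); // prints [deductStock, createOrder]
    }
}
```

The planner returns names only; the recovering service would still need to look up the matching compensation for each name, which is why the step names have to stay stable across deployments.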

2. TCC branch transaction registry

package com.toolsku.tcc;

import lombok.Data;
import org.apache.ibatis.annotations.*;

import java.time.LocalDateTime;

@Data
public class TccBranchTransaction {
    private Long id;
    private String xid;
    private String branchId;
    private String serviceName;
    private String tryMethod;
    private String confirmMethod;
    private String cancelMethod;
    private String paramJson;
    private String status;
    private LocalDateTime createdAt;
}

@Mapper
interface TccBranchTransactionMapper {
    @Insert("INSERT INTO tcc_branch_transaction (xid, branch_id, service_name, try_method, " +
            "confirm_method, cancel_method, param_json, status, created_at) " +
            "VALUES (#{xid}, #{branchId}, #{serviceName}, #{tryMethod}, #{confirmMethod}, " +
            "#{cancelMethod}, #{paramJson}, #{status}, NOW())")
    int insert(TccBranchTransaction branch);

    @Select("SELECT * FROM tcc_branch_transaction WHERE xid = #{xid} ORDER BY created_at ASC")
    java.util.List<TccBranchTransaction> findByXid(@Param("xid") String xid);

    @Update("UPDATE tcc_branch_transaction SET status = #{status} WHERE xid = #{xid} AND branch_id = #{branchId}")
    int updateStatus(@Param("xid") String xid, @Param("branchId") String branchId, @Param("status") String status);
}

3. Distributed transaction monitoring dashboard

package com.toolsku.reliability;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.concurrent.atomic.AtomicLong;

@Slf4j
@Component
@RequiredArgsConstructor
public class TransactionMonitor {

    private final TransactionMetrics transactionMetrics;
    private final AlertService alertService;

    private final AtomicLong lastSagaFailureCount = new AtomicLong(0);
    private final AtomicLong lastSeataRollbackCount = new AtomicLong(0);

    @Scheduled(fixedRate = 60000)
    public void checkTransactionHealth() {
        double sagaFailureRate = calculateSagaFailureRate();
        if (sagaFailureRate > 0.1) {
            alertService.sendAlert("Saga failure rate too high: " + (sagaFailureRate * 100) + "%");
        }

        int activeCount = transactionMetrics.getActiveTransactionCount();
        if (activeCount > 500) {
            alertService.sendAlert("Too many active transactions: " + activeCount);
        }

        // SLF4J only understands {} placeholders, so format the percentage explicitly
        log.info("Transaction health check: sagaFailureRate={}%, activeTransactions={}",
                String.format("%.2f", sagaFailureRate * 100), activeCount);
    }

    private double calculateSagaFailureRate() {
        return 0.0;
    }
}
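
As written, `calculateSagaFailureRate` is a stub that returns 0.0, so the failure-rate alert can never fire. One way to fill it in is to compute a per-window rate from the change in the cumulative success and failure totals between two checks, which is presumably what `lastSagaFailureCount` was reserved for. The sketch below is self-contained and hypothetical: `FailureRateWindow` is not part of the article's code, and feeding it would require `TransactionMetrics` to expose the counter totals, which it currently does not.

```java
// Hypothetical helper: turns cumulative counter totals into a per-window failure rate.
class FailureRateWindow {
    private long lastSuccessTotal;
    private long lastFailureTotal;

    // Returns failures / (successes + failures) since the previous call, or 0.0 if nothing finished.
    synchronized double sample(long successTotal, long failureTotal) {
        long successes = successTotal - lastSuccessTotal;
        long failures = failureTotal - lastFailureTotal;
        lastSuccessTotal = successTotal;
        lastFailureTotal = failureTotal;
        long finished = successes + failures;
        if (finished <= 0) {
            return 0.0; // No traffic in this window (or the counters were reset)
        }
        return (double) failures / finished;
    }
}
```

Using a window instead of lifetime totals keeps the alert responsive: a burst of failures after a long healthy period still pushes the rate over the 0.1 threshold checked in `checkTransactionHealth`.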

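Comparison

| Dimension | Saga orchestration | TCC | Seata AT | Message-based eventual consistency | Production-grade reliability |
| --- | --- | --- | --- | --- | --- |
| Consistency model | Eventual | Eventual | Eventual (global lock prevents dirty writes) | Eventual | Eventual |
| Intrusiveness | Low (compensation logic) | High (Try/Confirm/Cancel) | Low (automatic proxy) | Medium (outbox + idempotency) | High (full set of safeguards) |
| Performance | ⭐⭐⭐⭐ High | ⭐⭐⭐ Medium | ⭐⭐ Low (global lock) | ⭐⭐⭐⭐⭐ Highest | ⭐⭐⭐ Medium |
| Implementation complexity | ⭐⭐ Medium | ⭐⭐⭐⭐ High | ⭐ Low (handled by the framework) | ⭐⭐⭐ Medium | ⭐⭐⭐⭐⭐ Very high |
| Isolation | ❌ None | ✅ Isolated in the Try phase | ✅ Global-lock isolation | ❌ None | ✅ Optional |
| Compensation / rollback | ✅ Compensation operations | ✅ Cancel operation | ✅ Automatic rollback | ✅ Retry + manual handling | ✅ Multi-level safeguards |
| Typical use | Long-running process orchestration | Core funds and inventory paths | Traditional microservices | Asynchronous decoupling | Core transaction paths |
| Framework dependency | In-house / Seata Saga | In-house / Seata TCC | Seata | RocketMQ/Kafka | Full-stack combination |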

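Summary: there is no silver bullet for distributed transactions. Saga suits long-running process orchestration: its compensation semantics are clear, but it offers no isolation. TCC suits core funds paths: isolation is strong, but the implementation cost is high. Seata AT suits quick adoption: it is low-intrusion, but the global lock is a performance bottleneck. Message-based eventual consistency suits asynchronous decoupling: it performs best, but it needs idempotency and an outbox. For core paths in production, combine TCC, message-based eventual consistency, idempotency, and monitoring; it is better to write twice as much code than to debug data inconsistency in production.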

