Distributed Transaction Patterns in Practice: 5 Production Patterns from Saga to TCC


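Distributed Transactions: The Hardest Engineering Problem in a Microservice Architecture

Placing an order has to deduct inventory and deduct the account balance: three services, three databases. The inventory is deducted but the balance is not, and this kind of inconsistency shows up in production every day. Local transactions cannot reach across services. A distributed lock can expire before the transaction commits. With Seata, the global lock can drag concurrency on contended rows down toward a single thread. With message-based eventual consistency, messages get lost and you end up writing a pile of compensation logic. In 2026, distributed transactions are still the part of a microservice architecture most likely to go wrong.

This article walks through five production patterns end to end: Saga orchestration → TCC → Seata AT → message-based eventual consistency → production-grade reliability safeguards. Each step comes with full Java/Spring Boot code and a list of pitfalls to avoid.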


Core Concepts of Distributed Transactions

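| Concept | Description |
| --- | --- |
| Local transaction | ACID transaction on a single data source; cannot guarantee consistency across services |
| 2PC (two-phase commit) | A coordinator drives prepare/commit; synchronous and blocking, with poor performance |
| 3PC (three-phase commit) | Adds a CanCommit phase; reduces blocking but is complex to implement |
| Saga | Splits a long-running transaction into several local transactions; runs compensations on failure |
| TCC | Three operations (Try, Confirm, Cancel); consistency is enforced at the business layer |
| Seata AT mode | Intercepts SQL and generates rollback logs automatically; non-intrusive to business code |
| Message-based eventual consistency | Asynchronous guarantee built on a message queue; state converges eventually rather than in real time |
| Transactional outbox | The business write and the outgoing message are stored in the same transaction, so the message cannot be lost |
| Global lock | Row lock held by a Seata global transaction to prevent dirty writes |
| Idempotency | Running the same operation several times yields the same result; a core safeguard for distributed transactions |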
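
To make the 2PC entry concrete, here is a minimal in-memory sketch of the prepare/commit flow. `Participant`, `TwoPhaseCoordinator`, and `RecordingParticipant` are hypothetical names introduced only for illustration. A real coordinator would also persist its decision and deal with participant timeouts, which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical participant contract: vote in phase 1, then commit or roll back in phase 2.
interface Participant {
    boolean prepare();
    void commit();
    void rollback();
}

class TwoPhaseCoordinator {
    // Returns true only if every participant voted yes and was then committed.
    static boolean run(List<Participant> participants) {
        List<Participant> prepared = new ArrayList<>();
        for (Participant p : participants) {
            if (!p.prepare()) {
                // A single "no" vote aborts everything that has already prepared.
                prepared.forEach(Participant::rollback);
                return false;
            }
            prepared.add(p);
        }
        // Phase 2: all votes were yes, so commit everywhere.
        prepared.forEach(Participant::commit);
        return true;
    }
}

// Test double that records the calls it receives.
class RecordingParticipant implements Participant {
    private final String name;
    private final boolean vote;
    private final List<String> log;

    RecordingParticipant(String name, boolean vote, List<String> log) {
        this.name = name;
        this.vote = vote;
        this.log = log;
    }

    public boolean prepare() { log.add(name + ":prepare"); return vote; }
    public void commit() { log.add(name + ":commit"); }
    public void rollback() { log.add(name + ":rollback"); }
}
```

The coordinator calls every participant one after another and waits for each answer, which is where the "synchronous and blocking" cost comes from.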

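Problem Analysis: The 5 Big Challenges of Distributed Transactions

  1. Cross-service data consistency: the order service creates the order, the inventory service deducts stock, and the account service deducts the balance; if any step fails, all of them must be undone
  2. Atomicity of compensations: a Saga compensation can itself fail, and something has to handle that case
  3. Global locks and concurrency conflicts: the Seata global lock degrades performance, and lock waits time out under high concurrency
  4. Message loss and duplicate consumption: the message is sent but the consumer crashes, or the message is consumed twice and the balance is deducted twice
  5. Timeouts and hanging: a TCC Try times out and Cancel runs first (an empty rollback); when the delayed Try finally arrives, it reserves resources that no Confirm or Cancel will ever release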

Step by Step: 5 Distributed Transaction Implementations

Pattern 1: Saga Orchestration (Central Coordinator)

package com.toolsku.saga;

import lombok.Data;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.Supplier;

/**
 * In-memory description of a Saga: an ordered list of steps, each pairing
 * a forward action with the compensation that undoes it.
 */
@Data
public class SagaDefinition {

    private String sagaId;
    private List<SagaStep> steps = new ArrayList<>();
    private int currentStep = 0;
    private SagaStatus status = SagaStatus.PENDING;

    public enum SagaStatus {
        PENDING, RUNNING, COMPENSATING, COMPLETED, FAILED
    }

    @Data
    public static class SagaStep {
        private String name;
        private Supplier<Boolean> action;
        private Supplier<Boolean> compensation;
        private StepStatus stepStatus = StepStatus.PENDING;

        public enum StepStatus {
            PENDING, EXECUTING, COMPLETED, COMPENSATING, COMPENSATED, FAILED
        }
    }

    public static SagaBuilder builder() {
        return new SagaBuilder();
    }

    public static class SagaBuilder {
        private final SagaDefinition saga = new SagaDefinition();

        public SagaBuilder step(String name, Supplier<Boolean> action, Supplier<Boolean> compensation) {
            SagaStep step = new SagaStep();
            step.setName(name);
            step.setAction(action);
            step.setCompensation(compensation);
            saga.getSteps().add(step);
            return this;
        }

        public SagaDefinition build() {
            saga.setSagaId(UUID.randomUUID().toString());
            return saga;
        }
    }
}
package com.toolsku.saga;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class SagaOrchestrator {

    public SagaDefinition execute(SagaDefinition saga) {
        saga.setStatus(SagaDefinition.SagaStatus.RUNNING);
        log.info("Saga [{}] started, total steps: {}", saga.getSagaId(), saga.getSteps().size());

        for (int i = 0; i < saga.getSteps().size(); i++) {
            saga.setCurrentStep(i);
            SagaDefinition.SagaStep step = saga.getSteps().get(i);
            step.setStepStatus(SagaDefinition.SagaStep.StepStatus.EXECUTING);

            try {
                Boolean result = step.getAction().get();
                if (result == null || !result) {
                    log.error("Saga [{}] step [{}] action failed", saga.getSagaId(), step.getName());
                    step.setStepStatus(SagaDefinition.SagaStep.StepStatus.FAILED);
                    return compensate(saga);
                }
                step.setStepStatus(SagaDefinition.SagaStep.StepStatus.COMPLETED);
                log.info("Saga [{}] step [{}] completed", saga.getSagaId(), step.getName());
            } catch (Exception e) {
                log.error("Saga [{}] step [{}] exception: {}", saga.getSagaId(), step.getName(), e.getMessage(), e);
                step.setStepStatus(SagaDefinition.SagaStep.StepStatus.FAILED);
                return compensate(saga);
            }
        }

        saga.setStatus(SagaDefinition.SagaStatus.COMPLETED);
        log.info("Saga [{}] completed successfully", saga.getSagaId());
        return saga;
    }

    private SagaDefinition compensate(SagaDefinition saga) {
        saga.setStatus(SagaDefinition.SagaStatus.COMPENSATING);
        log.info("Saga [{}] compensating from step [{}]", saga.getSagaId(), saga.getCurrentStep());

        for (int i = saga.getCurrentStep(); i >= 0; i--) {
            SagaDefinition.SagaStep step = saga.getSteps().get(i);
            if (step.getStepStatus() != SagaDefinition.SagaStep.StepStatus.COMPLETED) {
                continue;
            }

            step.setStepStatus(SagaDefinition.SagaStep.StepStatus.COMPENSATING);
            try {
                Boolean result = step.getCompensation().get();
                if (result != null && result) {
                    step.setStepStatus(SagaDefinition.SagaStep.StepStatus.COMPENSATED);
                    log.info("Saga [{}] step [{}] compensated", saga.getSagaId(), step.getName());
                } else {
                    step.setStepStatus(SagaDefinition.SagaStep.StepStatus.FAILED);
                    log.error("Saga [{}] step [{}] compensation failed", saga.getSagaId(), step.getName());
                }
            } catch (Exception e) {
                step.setStepStatus(SagaDefinition.SagaStep.StepStatus.FAILED);
                log.error("Saga [{}] step [{}] compensation exception: {}", saga.getSagaId(), step.getName(), e.getMessage(), e);
            }
        }

        saga.setStatus(SagaDefinition.SagaStatus.FAILED);
        return saga;
    }
}
package com.toolsku.saga;

import com.toolsku.service.OrderService;
import com.toolsku.service.InventoryService;
import com.toolsku.service.AccountService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;

@Slf4j
@Service
@RequiredArgsConstructor
public class OrderSagaService {

    private final SagaOrchestrator sagaOrchestrator;
    private final OrderService orderService;
    private final InventoryService inventoryService;
    private final AccountService accountService;

    public SagaDefinition createOrder(Long userId, Long productId, Integer quantity, BigDecimal amount) {
        SagaDefinition saga = SagaDefinition.builder()
                .step("createOrder",
                        () -> orderService.createOrder(userId, productId, quantity, amount),
                        () -> orderService.cancelOrder(userId, productId))
                .step("deductInventory",
                        () -> inventoryService.deductInventory(productId, quantity),
                        () -> inventoryService.restoreInventory(productId, quantity))
                .step("deductAccount",
                        () -> accountService.deductBalance(userId, amount),
                        () -> accountService.restoreBalance(userId, amount))
                .build();

        return sagaOrchestrator.execute(saga);
    }
}
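
The orchestrator above logs a failed compensation and moves on, which leaves open the earlier question of what to do when a compensation fails. The sketch below shows one possible answer under stated assumptions: retry each compensation a bounded number of times and collect the steps that still fail, so they can be handed to manual repair. `MiniSaga` and its `stuck` list are hypothetical names for illustration, not part of the article's classes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Supplier;

class MiniSaga {
    record Step(String name, Supplier<Boolean> action, Supplier<Boolean> compensation) {}

    private final List<Step> steps = new ArrayList<>();
    // Names of steps whose compensation still failed after all attempts.
    final List<String> stuck = new ArrayList<>();

    MiniSaga step(String name, Supplier<Boolean> action, Supplier<Boolean> compensation) {
        steps.add(new Step(name, action, compensation));
        return this;
    }

    // Runs steps in order; on the first failure, compensates completed steps in reverse order.
    boolean run(int maxCompensationAttempts) {
        Deque<Step> completed = new ArrayDeque<>();
        for (Step s : steps) {
            if (safeCall(s.action())) {
                completed.push(s);
                continue;
            }
            while (!completed.isEmpty()) {
                Step c = completed.pop();
                boolean ok = false;
                for (int attempt = 1; attempt <= maxCompensationAttempts && !ok; attempt++) {
                    ok = safeCall(c.compensation());
                }
                if (!ok) {
                    stuck.add(c.name());
                }
            }
            return false;
        }
        return true;
    }

    // Treats null, false, and runtime exceptions as failure.
    private static boolean safeCall(Supplier<Boolean> call) {
        try {
            return Boolean.TRUE.equals(call.get());
        } catch (RuntimeException e) {
            return false;
        }
    }
}
```

Retrying only works if every compensation is idempotent, because a compensation that timed out may in fact have succeeded on the remote side.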

Pattern 2: TCC (Try-Confirm-Cancel)

package com.toolsku.tcc;

import lombok.Data;
import java.math.BigDecimal;

@Data
public class AccountTccRequest {
    private String xid;
    private Long userId;
    private BigDecimal amount;
    private String branchId;
}
package com.toolsku.tcc;

import org.apache.ibatis.annotations.*;

import java.math.BigDecimal;

@Mapper
public interface AccountTccMapper {

    @Insert("INSERT INTO account_tcc_freeze (xid, user_id, amount, status, branch_id, created_at) " +
            "VALUES (#{xid}, #{userId}, #{amount}, 'TRYING', #{branchId}, NOW())")
    int insertFreezeRecord(@Param("xid") String xid,
                           @Param("userId") Long userId,
                           @Param("amount") BigDecimal amount,
                           @Param("branchId") String branchId);

    @Update("UPDATE account SET balance = balance - #{amount}, frozen = frozen + #{amount} " +
            "WHERE user_id = #{userId} AND balance >= #{amount}")
    int freezeBalance(@Param("userId") Long userId, @Param("amount") BigDecimal amount);

    @Update("UPDATE account SET frozen = frozen - #{amount} " +
            "WHERE user_id = #{userId} AND frozen >= #{amount}")
    int confirmDeduct(@Param("userId") Long userId, @Param("amount") BigDecimal amount);

    @Update("UPDATE account SET balance = balance + #{amount}, frozen = frozen - #{amount} " +
            "WHERE user_id = #{userId} AND frozen >= #{amount}")
    int cancelFreeze(@Param("userId") Long userId, @Param("amount") BigDecimal amount);

    @Select("SELECT COUNT(*) FROM account_tcc_freeze WHERE xid = #{xid} AND branch_id = #{branchId}")
    int countFreezeRecord(@Param("xid") String xid, @Param("branchId") String branchId);

    @Update("UPDATE account_tcc_freeze SET status = #{status} WHERE xid = #{xid} AND branch_id = #{branchId}")
    int updateFreezeStatus(@Param("xid") String xid,
                           @Param("branchId") String branchId,
                           @Param("status") String status);

    @Select("SELECT status FROM account_tcc_freeze WHERE xid = #{xid} AND branch_id = #{branchId}")
    String getFreezeStatus(@Param("xid") String xid, @Param("branchId") String branchId);
}
package com.toolsku.tcc;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.math.BigDecimal;

@Slf4j
@Service
@RequiredArgsConstructor
public class AccountTccService {

    private final AccountTccMapper accountTccMapper;

    @Transactional(rollbackFor = Exception.class)
    public boolean tryDeduct(String xid, Long userId, BigDecimal amount, String branchId) {
        if (accountTccMapper.countFreezeRecord(xid, branchId) > 0) {
            log.info("TCC try already executed: xid={}, branchId={}", xid, branchId);
            return true;
        }

        int rows = accountTccMapper.freezeBalance(userId, amount);
        if (rows == 0) {
            log.warn("TCC try failed: insufficient balance, userId={}, amount={}", userId, amount);
            return false;
        }

        accountTccMapper.insertFreezeRecord(xid, userId, amount, branchId);
        log.info("TCC try success: xid={}, userId={}, amount={}", xid, userId, amount);
        return true;
    }

    @Transactional(rollbackFor = Exception.class)
    public boolean confirmDeduct(String xid, Long userId, BigDecimal amount, String branchId) {
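        String status = accountTccMapper.getFreezeStatus(xid, branchId);
        if ("CONFIRMED".equals(status)) {
            log.info("TCC confirm already executed: xid={}, branchId={}", xid, branchId);
            return true;
        }
        // Only a branch whose Try succeeded (status TRYING) may be confirmed.
        if (!"TRYING".equals(status)) {
            log.error("TCC confirm rejected: xid={}, branchId={}, status={}", xid, branchId, status);
            return false;
        }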

        int rows = accountTccMapper.confirmDeduct(userId, amount);
        if (rows == 0) {
            log.error("TCC confirm failed: frozen amount mismatch, userId={}, amount={}", userId, amount);
            return false;
        }

        accountTccMapper.updateFreezeStatus(xid, branchId, "CONFIRMED");
        log.info("TCC confirm success: xid={}, userId={}, amount={}", xid, userId, amount);
        return true;
    }

    @Transactional(rollbackFor = Exception.class)
    public boolean cancelFreeze(String xid, Long userId, BigDecimal amount, String branchId) {
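        String status = accountTccMapper.getFreezeStatus(xid, branchId);
        if (status == null) {
            // Empty rollback: Try never froze anything for this branch, so there is nothing to release.
            // Persist a CANCELLED marker so that a late Try finds a record and does not freeze funds.
            accountTccMapper.insertFreezeRecord(xid, userId, amount, branchId);
            accountTccMapper.updateFreezeStatus(xid, branchId, "CANCELLED");
            log.info("TCC empty rollback recorded: xid={}, branchId={}", xid, branchId);
            return true;
        }
        if ("CANCELLED".equals(status)) {
            log.info("TCC cancel already executed: xid={}, branchId={}", xid, branchId);
            return true;
        }
        if ("CONFIRMED".equals(status)) {
            log.warn("TCC cancel skipped: already confirmed, xid={}, branchId={}", xid, branchId);
            return true;
        }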

        int rows = accountTccMapper.cancelFreeze(userId, amount);
        if (rows == 0) {
            log.error("TCC cancel failed: frozen amount mismatch, userId={}, amount={}", userId, amount);
            return false;
        }

        accountTccMapper.updateFreezeStatus(xid, branchId, "CANCELLED");
        log.info("TCC cancel success: xid={}, userId={}, amount={}", xid, userId, amount);
        return true;
    }
}
package com.toolsku.tcc;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;
import java.util.UUID;

@Slf4j
@Service
@RequiredArgsConstructor
public class TccTransactionManager {

    private final AccountTccService accountTccService;

    public boolean executeDeduct(String xid, Long userId, BigDecimal amount) {
        String branchId = UUID.randomUUID().toString();

        boolean tryResult = accountTccService.tryDeduct(xid, userId, amount, branchId);
        if (!tryResult) {
            accountTccService.cancelFreeze(xid, userId, amount, branchId);
            return false;
        }

        boolean confirmResult = accountTccService.confirmDeduct(xid, userId, amount, branchId);
        if (!confirmResult) {
            accountTccService.cancelFreeze(xid, userId, amount, branchId);
            return false;
        }

        return true;
    }
}
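
The freeze-record table is what makes idempotency, empty rollback, and hanging protection possible in TCC. The following in-memory sketch shows the state checks in one place. `TccAccount` and its string keys (standing in for the `xid` and `branchId` pair) are hypothetical and for illustration only. A real implementation would keep the record and the balance change in the same database transaction.

```java
import java.util.HashMap;
import java.util.Map;

class TccAccount {
    enum State { TRYING, CONFIRMED, CANCELLED }

    long balance;
    long frozen;
    // One record per transaction branch, mirroring the freeze-record table.
    private final Map<String, State> states = new HashMap<>();
    private final Map<String, Long> amounts = new HashMap<>();

    TccAccount(long balance) {
        this.balance = balance;
    }

    synchronized boolean tryFreeze(String key, long amount) {
        State s = states.get(key);
        if (s == State.CANCELLED) return false; // hanging guard: Cancel already ran for this branch
        if (s != null) return true;             // idempotent retry of Try
        if (balance < amount) return false;
        balance -= amount;
        frozen += amount;
        states.put(key, State.TRYING);
        amounts.put(key, amount);
        return true;
    }

    synchronized boolean confirm(String key) {
        State s = states.get(key);
        if (s == State.CONFIRMED) return true;  // idempotent retry of Confirm
        if (s != State.TRYING) return false;    // nothing was frozen for this branch
        frozen -= amounts.get(key);
        states.put(key, State.CONFIRMED);
        return true;
    }

    synchronized boolean cancel(String key) {
        State s = states.get(key);
        if (s == null) {
            // Empty rollback: Try never arrived. Record the cancel so a late Try is rejected.
            states.put(key, State.CANCELLED);
            return true;
        }
        if (s != State.TRYING) return true;     // already cancelled, or confirmed and skipped
        long amount = amounts.get(key);
        balance += amount;
        frozen -= amount;
        states.put(key, State.CANCELLED);
        return true;
    }
}
```

The important detail is that Cancel writes a record even when it has nothing to undo; without that marker a delayed Try cannot tell that its transaction is already over.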

Pattern 3: Seata AT Mode (Automatic Compensation)

package com.toolsku.seata;

import io.seata.spring.annotation.GlobalTransactional;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;

@Slf4j
@Service
@RequiredArgsConstructor
public class OrderSeataService {

    private final OrderClient orderClient;
    private final InventoryClient inventoryClient;
    private final AccountClient accountClient;

    @GlobalTransactional(name = "create-order", rollbackFor = Exception.class, timeoutMills = 60000)
    public String createOrder(Long userId, Long productId, Integer quantity, BigDecimal amount) {
        log.info("Seata global transaction started: userId={}, productId={}, quantity={}, amount={}",
                userId, productId, quantity, amount);

        String orderNo = orderClient.createOrder(userId, productId, quantity, amount);
        log.info("Step 1: order created, orderNo={}", orderNo);

        inventoryClient.deductInventory(productId, quantity);
        log.info("Step 2: inventory deducted, productId={}, quantity={}", productId, quantity);

        accountClient.deductBalance(userId, amount);
        log.info("Step 3: balance deducted, userId={}, amount={}", userId, amount);

        return orderNo;
    }
}
package com.toolsku.seata;

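import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.*;

import java.math.BigDecimal;

// The three clients are listed together for brevity; each public interface belongs in its own file.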

@FeignClient(name = "order-service", url = "${service.order.url}")
public interface OrderClient {

    @PostMapping("/api/orders")
    String createOrder(@RequestParam("userId") Long userId,
                       @RequestParam("productId") Long productId,
                       @RequestParam("quantity") Integer quantity,
                       @RequestParam("amount") BigDecimal amount);

    @DeleteMapping("/api/orders/{orderNo}")
    void cancelOrder(@PathVariable("orderNo") String orderNo);
}

@FeignClient(name = "inventory-service", url = "${service.inventory.url}")
public interface InventoryClient {

    @PostMapping("/api/inventory/deduct")
    void deductInventory(@RequestParam("productId") Long productId,
                         @RequestParam("quantity") Integer quantity);

    @PostMapping("/api/inventory/restore")
    void restoreInventory(@RequestParam("productId") Long productId,
                          @RequestParam("quantity") Integer quantity);
}

@FeignClient(name = "account-service", url = "${service.account.url}")
public interface AccountClient {

    @PostMapping("/api/accounts/deduct")
    void deductBalance(@RequestParam("userId") Long userId,
                       @RequestParam("amount") BigDecimal amount);

    @PostMapping("/api/accounts/restore")
    void restoreBalance(@RequestParam("userId") Long userId,
                        @RequestParam("amount") BigDecimal amount);
}
# application-seata.yml
seata:
  enabled: true
  application-id: order-service
  tx-service-group: toolsku-tx-group
  service:
    vgroup-mapping:
      toolsku-tx-group: default
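    # grouplist is only read when registry.type is "file"; with the Nacos registry below, the TC address comes from Nacos
    grouplist:
      default: 127.0.0.1:8091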
  config:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      namespace: seata
      group: SEATA_GROUP
  registry:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      namespace: seata
      group: SEATA_GROUP
  client:
    undo:
      data-validation: true
      log-serialization: jackson
      log-table: undo_log
    rm:
      lock:
        retry-interval: 10
        retry-times: 30
        retry-policy-branch-rollback-on-conflict: true
-- undo_log table for Seata AT mode (each service database needs one)
CREATE TABLE IF NOT EXISTS undo_log (
    id            BIGINT       AUTO_INCREMENT PRIMARY KEY,
    branch_id     BIGINT       NOT NULL,
    xid           VARCHAR(128) NOT NULL,
    context       VARCHAR(128) NOT NULL,
    rollback_info LONGBLOB     NOT NULL,
    log_status    INT          NOT NULL,
    log_created   DATETIME     NOT NULL,
    log_modified  DATETIME     NOT NULL,
    UNIQUE KEY ux_undo_log (xid, branch_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
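
The `undo_log` table stores a before image and an after image of each changed row. The sketch below is a simplified conceptual model of how such images can drive a rollback, written under that assumption and not taken from Seata's source. `UndoLogSketch` and `UndoEntry` are hypothetical names, and a single integer per row stands in for a full row image.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class UndoLogSketch {
    // Before/after images of a single row, kept until the global transaction ends.
    record UndoEntry(String rowKey, Integer before, Integer after) {}

    final Map<String, Integer> table = new HashMap<>();

    // Phase 1: apply the update locally and remember both images.
    UndoEntry update(String rowKey, int newValue) {
        Integer before = table.get(rowKey);
        table.put(rowKey, newValue);
        return new UndoEntry(rowKey, before, newValue);
    }

    // Phase 2 rollback: restore the before image, but only if the row still
    // matches the after image. A mismatch means someone else wrote the row.
    boolean rollback(UndoEntry entry) {
        if (!Objects.equals(table.get(entry.rowKey()), entry.after())) {
            return false; // dirty write detected; automatic rollback is not safe
        }
        if (entry.before() == null) {
            table.remove(entry.rowKey());
        } else {
            table.put(entry.rowKey(), entry.before());
        }
        return true;
    }
}
```

The failed-rollback branch is the situation the global lock is meant to prevent: if every writer goes through the global transaction, the row cannot change between phase 1 and phase 2.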

Pattern 4: Message-Based Eventual Consistency (Transactional Outbox + RocketMQ)

-- Transactional outbox table
CREATE TABLE IF NOT EXISTS transactional_outbox (
    id            BIGINT       AUTO_INCREMENT PRIMARY KEY,
    aggregate_id  VARCHAR(128) NOT NULL,
    aggregate_type VARCHAR(64) NOT NULL,
    event_type    VARCHAR(128) NOT NULL,
    payload       JSON         NOT NULL,
    status        VARCHAR(32)  NOT NULL DEFAULT 'PENDING',
    retry_count   INT          NOT NULL DEFAULT 0,
    max_retry     INT          NOT NULL DEFAULT 5,
    created_at    DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at    DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_status_created (status, created_at),
    UNIQUE KEY uk_aggregate_event (aggregate_id, aggregate_type, event_type)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
package com.toolsku.outbox;

import lombok.Data;
import java.time.LocalDateTime;

@Data
public class OutboxEvent {
    private Long id;
    private String aggregateId;
    private String aggregateType;
    private String eventType;
    private String payload;
    private String status;
    private Integer retryCount;
    private Integer maxRetry;
    private LocalDateTime createdAt;
    private LocalDateTime updatedAt;
}
package com.toolsku.outbox;

import org.apache.ibatis.annotations.*;

import java.util.List;

@Mapper
public interface OutboxMapper {

    @Insert("INSERT INTO transactional_outbox (aggregate_id, aggregate_type, event_type, payload, status) " +
            "VALUES (#{aggregateId}, #{aggregateType}, #{eventType}, #{payload}, 'PENDING')")
    @Options(useGeneratedKeys = true, keyProperty = "id")
    int insert(OutboxEvent event);

    @Select("SELECT * FROM transactional_outbox WHERE status = 'PENDING' AND retry_count < max_retry " +
            "ORDER BY created_at ASC LIMIT #{limit}")
    List<OutboxEvent> findPendingEvents(@Param("limit") int limit);

    @Update("UPDATE transactional_outbox SET status = #{status}, retry_count = retry_count + 1, " +
            "updated_at = NOW() WHERE id = #{id}")
    int updateStatus(@Param("id") Long id, @Param("status") String status);

    @Update("UPDATE transactional_outbox SET status = 'SENT', updated_at = NOW() WHERE id = #{id}")
    int markAsSent(@Param("id") Long id);

    @Update("UPDATE transactional_outbox SET status = 'FAILED', updated_at = NOW() " +
            "WHERE id = #{id} AND retry_count >= max_retry")
    int markAsFailed(@Param("id") Long id);
}
package com.toolsku.outbox;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.List;

@Slf4j
@Component
@RequiredArgsConstructor
public class OutboxRelayScheduler {

    private final OutboxMapper outboxMapper;
    private final RocketMQTemplate rocketMQTemplate;

    private static final int BATCH_SIZE = 50;

    // Polls the outbox and publishes pending events. Delivery is at-least-once:
    // if markAsSent fails after a successful send, the event is sent again on the next pass.
    @Scheduled(fixedDelay = 1000)
    public void relayPendingEvents() {
        List<OutboxEvent> events = outboxMapper.findPendingEvents(BATCH_SIZE);
        if (events.isEmpty()) {
            return;
        }

        for (OutboxEvent event : events) {
            try {
                String topic = String.format("toolsku-%s-topic", event.getAggregateType());

                // Carry the aggregate id as the RocketMQ message key.
                Message<String> message = MessageBuilder.withPayload(event.getPayload())
                        .setHeader(RocketMQHeaders.KEYS, event.getAggregateId())
                        .build();
                rocketMQTemplate.syncSend(topic, message);

                outboxMapper.markAsSent(event.getId());
                log.info("Outbox event sent: id={}, type={}, aggregateId={}",
                        event.getId(), event.getEventType(), event.getAggregateId());
            } catch (Exception e) {
                // Count the failed attempt, then mark the event FAILED once max_retry is reached.
                outboxMapper.updateStatus(event.getId(), "PENDING");
                outboxMapper.markAsFailed(event.getId());
                log.error("Outbox event send failed: id={}, error={}", event.getId(), e.getMessage(), e);
            }
        }
    }
}
package com.toolsku.outbox;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.math.BigDecimal;
import java.util.UUID;

@Slf4j
@Service
@RequiredArgsConstructor
public class OrderEventService {

    private final OutboxMapper outboxMapper;
    private final OrderMapper orderMapper;
    private final ObjectMapper objectMapper;

    @Transactional(rollbackFor = Exception.class)
    public String createOrderWithOutbox(Long userId, Long productId, Integer quantity, BigDecimal amount) {
        String orderNo = "ORD" + UUID.randomUUID().toString().replace("-", "").substring(0, 16).toUpperCase();

        orderMapper.insertOrder(orderNo, userId, productId, quantity, amount, "CREATED");

        try {
            String payload = objectMapper.writeValueAsString(
                    new OrderCreatedEvent(orderNo, userId, productId, quantity, amount));

            OutboxEvent event = new OutboxEvent();
            event.setAggregateId(orderNo);
            event.setAggregateType("order");
            event.setEventType("ORDER_CREATED");
            event.setPayload(payload);
            outboxMapper.insert(event);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize order event", e);
        }

        log.info("Order created with outbox event: orderNo={}", orderNo);
        return orderNo;
    }
}
package com.toolsku.consumer;

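import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.toolsku.outbox.OrderCreatedEvent;
import com.toolsku.reliability.IdempotentRecordService;
import com.toolsku.service.InventoryService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(topic = "toolsku-order-topic", consumerGroup = "inventory-consumer-group")
public class InventoryOrderConsumer implements RocketMQListener<String> {

    private final InventoryService inventoryService;
    private final IdempotentRecordService idempotentRecordService;
    private final ObjectMapper objectMapper;

    @Override
    public void onMessage(String message) {
        OrderCreatedEvent event = parseEvent(message);
        String idempotentKey = "INVENTORY_DEDUCT:" + event.getOrderNo();

        // The relay delivers at least once, so the same order can arrive more than once.
        if (idempotentRecordService.isProcessed(idempotentKey)) {
            log.info("Duplicate message skipped: orderNo={}", event.getOrderNo());
            return;
        }

        try {
            inventoryService.deductInventory(event.getProductId(), event.getQuantity());
            idempotentRecordService.markProcessed(idempotentKey);
            log.info("Inventory deducted for order: orderNo={}", event.getOrderNo());
        } catch (Exception e) {
            log.error("Inventory deduction failed: orderNo={}, error={}",
                    event.getOrderNo(), e.getMessage(), e);
            // Rethrow so that RocketMQ redelivers the message.
            throw new RuntimeException("Inventory deduction failed", e);
        }
    }

    // Assumes OrderCreatedEvent lives in com.toolsku.outbox and can be deserialized by Jackson.
    private OrderCreatedEvent parseEvent(String message) {
        try {
            return objectMapper.readValue(message, OrderCreatedEvent.class);
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException("Malformed ORDER_CREATED payload", e);
        }
    }
}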
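
The outbox relay and the consumer only work as a pair: the relay may deliver an event more than once, and the consumer has to absorb the duplicates. The in-memory sketch below walks through that interaction. `OutboxSketch` and `DedupingInventory` are hypothetical names for illustration, and a `Predicate` stands in for the broker call.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

class OutboxSketch {
    static class Event {
        final String orderNo;
        final int quantity;
        String status = "PENDING";
        int retryCount = 0;

        Event(String orderNo, int quantity) {
            this.orderNo = orderNo;
            this.quantity = quantity;
        }
    }

    final List<Event> outbox = new ArrayList<>();
    private final int maxRetry;

    OutboxSketch(int maxRetry) {
        this.maxRetry = maxRetry;
    }

    // One relay pass. The publisher returns false (or throws) when the send fails.
    int relayOnce(Predicate<Event> publisher) {
        int sent = 0;
        for (Event event : outbox) {
            if (!"PENDING".equals(event.status)) continue;
            boolean ok;
            try {
                ok = publisher.test(event);
            } catch (RuntimeException ex) {
                ok = false;
            }
            if (ok) {
                event.status = "SENT";
                sent++;
            } else if (++event.retryCount >= maxRetry) {
                event.status = "FAILED"; // give up and leave the event for manual handling
            }
        }
        return sent;
    }
}

class DedupingInventory {
    private final Set<String> processedOrders = new HashSet<>();
    int stock;

    DedupingInventory(int stock) {
        this.stock = stock;
    }

    // Returns true if stock was deducted, false if this order was already handled.
    boolean onOrderCreated(String orderNo, int quantity) {
        if (!processedOrders.add(orderNo)) return false;
        stock -= quantity;
        return true;
    }
}
```

A failed send leaves the event `PENDING` with a higher retry count, so a later pass picks it up again, and a duplicate delivery changes the stock only once.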

Pattern 5: Production-Grade Reliability Safeguards

package com.toolsku.reliability;

import lombok.Data;
import org.apache.ibatis.annotations.*;

import java.time.LocalDateTime;

@Data
public class IdempotentRecord {
    private Long id;
    private String idempotentKey;
    private String status;
    private LocalDateTime createdAt;
    private LocalDateTime expireAt;
}

@Mapper
public interface IdempotentRecordMapper {

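    // INSERT IGNORE reports 0 affected rows when the key already exists, so the caller can tell
    // whether it created the record (this assumes a unique index on idempotent_key).
    @Insert("INSERT IGNORE INTO idempotent_record (idempotent_key, status, created_at, expire_at) " +
            "VALUES (#{idempotentKey}, 'PROCESSING', NOW(), DATE_ADD(NOW(), INTERVAL 24 HOUR))")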
    int tryInsert(@Param("idempotentKey") String idempotentKey);

    @Select("SELECT status FROM idempotent_record WHERE idempotent_key = #{idempotentKey}")
    String getStatus(@Param("idempotentKey") String idempotentKey);

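    // Upsert, so a caller that never inserted a PROCESSING row still leaves a PROCESSED record.
    @Insert("INSERT INTO idempotent_record (idempotent_key, status, created_at, expire_at) " +
            "VALUES (#{idempotentKey}, 'PROCESSED', NOW(), DATE_ADD(NOW(), INTERVAL 24 HOUR)) " +
            "ON DUPLICATE KEY UPDATE status = 'PROCESSED'")
    int markProcessed(@Param("idempotentKey") String idempotentKey);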

    @Delete("DELETE FROM idempotent_record WHERE expire_at < NOW()")
    int cleanExpired();
}
package com.toolsku.reliability;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RequiredArgsConstructor
public class IdempotentRecordService {

    private final IdempotentRecordMapper idempotentRecordMapper;

    public boolean isProcessed(String idempotentKey) {
        String status = idempotentRecordMapper.getStatus(idempotentKey);
        return "PROCESSED".equals(status);
    }

    public boolean tryAcquire(String idempotentKey) {
        // Only the caller whose INSERT created the row owns the key. If the row already
        // exists, another request is processing it or has already finished, so do not run again.
        return idempotentRecordMapper.tryInsert(idempotentKey) > 0;
    }

    public void markProcessed(String idempotentKey) {
        idempotentRecordMapper.markProcessed(idempotentKey);
    }
}
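
The acquire, process, mark sequence can be hard to follow across the mapper, the service, and the aspect, so here is the same idea as an in-memory sketch. `IdempotencyGuard` is a hypothetical name, `ConcurrentHashMap.putIfAbsent` plays the role of the unique-key insert, and the `release` step on failure is an assumption added for illustration; it is not present in the service above.

```java
import java.util.concurrent.ConcurrentHashMap;

class IdempotencyGuard {
    enum Status { PROCESSING, PROCESSED }

    private final ConcurrentHashMap<String, Status> records = new ConcurrentHashMap<>();

    // Only the caller that creates the record acquires the key.
    boolean tryAcquire(String key) {
        return records.putIfAbsent(key, Status.PROCESSING) == null;
    }

    void markProcessed(String key) {
        records.put(key, Status.PROCESSED);
    }

    // Removes the record only if it is still PROCESSING, so a finished key is never released.
    void release(String key) {
        records.remove(key, Status.PROCESSING);
    }

    boolean isProcessed(String key) {
        return records.get(key) == Status.PROCESSED;
    }

    // Runs the action at most once per key. On failure the key is released so a retry can run.
    boolean runOnce(String key, Runnable action) {
        if (!tryAcquire(key)) {
            return false;
        }
        try {
            action.run();
            markProcessed(key);
            return true;
        } catch (RuntimeException e) {
            release(key);
            throw e;
        }
    }
}
```

Without a release step, a request that fails halfway leaves its key in `PROCESSING` until the record expires, and every retry in between is rejected as a concurrent request.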
package com.toolsku.reliability;

import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.*;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import jakarta.servlet.http.HttpServletRequest;
import java.lang.reflect.Method;

@Slf4j
@Component
@Aspect
public class IdempotentAspect {

    private final IdempotentRecordService idempotentRecordService;

    public IdempotentAspect(IdempotentRecordService idempotentRecordService) {
        this.idempotentRecordService = idempotentRecordService;
    }

    @Around("@annotation(com.toolsku.reliability.Idempotent)")
    public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();
        Idempotent annotation = method.getAnnotation(Idempotent.class);

        String idempotentKey = resolveKey(annotation);
        if (idempotentKey == null) {
            return joinPoint.proceed();
        }

        if (idempotentRecordService.isProcessed(idempotentKey)) {
            log.info("Idempotent check: already processed, key={}", idempotentKey);
            return null;
        }

        if (!idempotentRecordService.tryAcquire(idempotentKey)) {
            log.warn("Idempotent check: concurrent processing, key={}", idempotentKey);
            throw new RuntimeException("Concurrent request detected");
        }

        try {
            Object result = joinPoint.proceed();
            idempotentRecordService.markProcessed(idempotentKey);
            return result;
        } catch (Exception e) {
            log.error("Idempotent execution failed: key={}", idempotentKey, e);
            throw e;
        }
    }

    private String resolveKey(Idempotent annotation) {
        ServletRequestAttributes attributes =
                (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        if (attributes == null) {
            return null;
        }
        HttpServletRequest request = attributes.getRequest();
        String header = request.getHeader(annotation.headerKey());
        return header != null ? annotation.prefix() + header : null;
    }
}
package com.toolsku.reliability;

import java.lang.annotation.*;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Idempotent {
    String headerKey() default "X-Idempotent-Key";
    String prefix() default "IDEMPOTENT:";
}
package com.toolsku.reliability;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.concurrent.*;

@Slf4j
@Component
public class TransactionTimeoutGuard {

    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
    private final ConcurrentHashMap<String, TimeoutTask> runningTasks = new ConcurrentHashMap<>();

    public void register(String xid, long timeoutMillis, Runnable onTimeout) {
        ScheduledFuture<?> future = scheduler.schedule(() -> {
            log.warn("Transaction timeout triggered: xid={}", xid);
            runningTasks.remove(xid);
            try {
                onTimeout.run();
            } catch (Exception e) {
                log.error("Timeout callback failed: xid={}", xid, e);
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);

        runningTasks.put(xid, new TimeoutTask(xid, future, timeoutMillis));
    }

    public void cancel(String xid) {
        TimeoutTask task = runningTasks.remove(xid);
        if (task != null) {
            task.getFuture().cancel(false);
            log.info("Transaction timeout guard cancelled: xid={}", xid);
        }
    }

    public long getRemainingTime(String xid) {
        TimeoutTask task = runningTasks.get(xid);
        if (task == null) {
            return -1;
        }
        long elapsed = System.currentTimeMillis() - task.getStartTime();
        return Math.max(0, task.getTimeoutMillis() - elapsed);
    }

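    @lombok.Getter
    @lombok.RequiredArgsConstructor
    private static class TimeoutTask {
        private final String xid;
        private final ScheduledFuture<?> future;
        private final long timeoutMillis;
        // Initialized final fields are left out of the generated constructor,
        // so new TimeoutTask(xid, future, timeoutMillis) compiles and startTime is set here.
        private final long startTime = System.currentTimeMillis();
    }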
}
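
A timeout guard of this kind is used by registering before the transaction starts and cancelling once it finishes. The sketch below shows that lifecycle with plain `java.util.concurrent` types. `TimeoutGuardSketch` is a hypothetical, reduced variant written for illustration. It registers through `ConcurrentHashMap.compute`, one way to make sure the entry is in the map before the timeout task can try to remove it.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class TimeoutGuardSketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final ConcurrentHashMap<String, ScheduledFuture<?>> pending = new ConcurrentHashMap<>();

    void register(String xid, long timeoutMillis, Runnable onTimeout) {
        // compute() stores the future atomically for this key, so the timeout task's
        // remove() cannot run before the entry exists.
        pending.compute(xid, (key, previous) -> {
            if (previous != null) {
                previous.cancel(false); // replace an older guard for the same xid
            }
            return scheduler.schedule(() -> {
                pending.remove(xid);
                onTimeout.run();
            }, timeoutMillis, TimeUnit.MILLISECONDS);
        });
    }

    // Returns true if a guard was still pending and has now been cancelled.
    boolean cancel(String xid) {
        ScheduledFuture<?> future = pending.remove(xid);
        return future != null && future.cancel(false);
    }

    int pendingCount() {
        return pending.size();
    }

    // Stops the scheduler thread; call this when the application shuts down.
    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

A typical call site registers the guard with a rollback callback, runs the business logic in a `try` block, and calls `cancel(xid)` in `finally`.
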
package com.toolsku.reliability;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@Component
public class TransactionMetrics {

    private final Counter sagaSuccessCounter;
    private final Counter sagaFailureCounter;
    private final Counter tccConfirmCounter;
    private final Counter tccCancelCounter;
    private final Counter seataCommitCounter;
    private final Counter seataRollbackCounter;
    private final Timer sagaExecutionTimer;
    private final AtomicInteger activeTransactionGauge;

    public TransactionMetrics(MeterRegistry registry) {
        this.sagaSuccessCounter = Counter.builder("transaction.saga.success")
                .description("Saga transaction success count").register(registry);
        this.sagaFailureCounter = Counter.builder("transaction.saga.failure")
                .description("Saga transaction failure count").register(registry);
        this.tccConfirmCounter = Counter.builder("transaction.tcc.confirm")
                .description("TCC confirm count").register(registry);
        this.tccCancelCounter = Counter.builder("transaction.tcc.cancel")
                .description("TCC cancel count").register(registry);
        this.seataCommitCounter = Counter.builder("transaction.seata.commit")
                .description("Seata commit count").register(registry);
        this.seataRollbackCounter = Counter.builder("transaction.seata.rollback")
                .description("Seata rollback count").register(registry);
        this.sagaExecutionTimer = Timer.builder("transaction.saga.duration")
                .description("Saga execution duration").register(registry);
        this.activeTransactionGauge = registry.gauge("transaction.active.count",
                new AtomicInteger(0));
    }

    public void recordSagaSuccess(long durationMs) {
        sagaSuccessCounter.increment();
        sagaExecutionTimer.record(durationMs, TimeUnit.MILLISECONDS);
    }

    public void recordSagaFailure(long durationMs) {
        sagaFailureCounter.increment();
        sagaExecutionTimer.record(durationMs, TimeUnit.MILLISECONDS);
    }

    public void recordTccConfirm() { tccConfirmCounter.increment(); }
    public void recordTccCancel() { tccCancelCounter.increment(); }
    public void recordSeataCommit() { seataCommitCounter.increment(); }
    public void recordSeataRollback() { seataRollbackCounter.increment(); }

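    public void incrementActive() { activeTransactionGauge.incrementAndGet(); }
    public void decrementActive() { activeTransactionGauge.decrementAndGet(); }

    // Read by TransactionMonitor when it checks the number of in-flight transactions.
    public int getActiveTransactionCount() { return activeTransactionGauge.get(); }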
}

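Pitfall Guide

Pitfall 1: Saga compensation is not a rollback

// ❌ Wrong: the compensation tries to roll the data back to its previous state
public boolean compensateOrder(String orderNo) {
    return orderMapper.deleteById(orderNo) > 0; // physical delete; the audit trail is lost
}

// ✅ Correct: compensation is a semantic inverse operation
public boolean compensateOrder(String orderNo) {
    return orderMapper.updateStatus(orderNo, "CANCELLED") > 0; // status change; the record is kept
}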
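
The difference can be sketched without a database. The following is a minimal illustration, assuming a hypothetical in-memory `OrderStore` in place of the article's `orderMapper`; it shows compensation as a status transition that keeps the record and can be retried safely.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the order table; not the article's OrderMapper.
class OrderStore {
    enum Status { CREATED, PAID, CANCELLED }

    private final Map<String, Status> orders = new ConcurrentHashMap<>();

    void create(String orderNo) {
        orders.put(orderNo, Status.CREATED);
    }

    Status statusOf(String orderNo) {
        return orders.get(orderNo);
    }

    /**
     * Compensation as a semantic inverse: the entry is kept and only its status changes.
     * Returns true when the order ends up CANCELLED, including when it already was,
     * so a coordinator can retry the compensation. Returns false for an unknown order.
     */
    boolean compensate(String orderNo) {
        Status result = orders.computeIfPresent(orderNo, (no, current) -> Status.CANCELLED);
        return result != null;
    }
}
```

Calling `compensate` twice for the same order returns `true` both times and leaves the entry in place with status `CANCELLED`, which is what makes a retried compensation harmless.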

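Pitfall 2: TCC empty rollback and suspension

// ❌ Wrong: Cancel is invoked although Try never ran, and Cancel touches the data directly
public boolean cancelFreeze(String xid, Long userId, BigDecimal amount) {
    return accountMapper.cancelFreeze(userId, amount) > 0; // empty rollback: the freeze record does not exist
}

// ✅ Correct: check whether Try has run
public boolean cancelFreeze(String xid, Long userId, BigDecimal amount, String branchId) {
    String status = accountTccMapper.getFreezeStatus(xid, branchId);
    if (status == null) {
        // Empty rollback: Try never ran, so insert a record that a late Try can detect (prevents suspension)
        accountTccMapper.insertFreezeRecord(xid, userId, amount, branchId);
        accountTccMapper.updateFreezeStatus(xid, branchId, "CANCELLED");
        log.warn("TCC empty rollback: xid={}, branchId={}", xid, branchId);
        return true;
    }
    if ("CANCELLED".equals(status) || "CONFIRMED".equals(status)) {
        return true; // idempotent: already handled
    }
    boolean released = accountTccMapper.cancelFreeze(userId, amount) > 0;
    if (released) {
        // Mark the branch CANCELLED (unless cancelFreeze already does) so a retried Cancel
        // hits the idempotency check above instead of releasing the amount twice
        accountTccMapper.updateFreezeStatus(xid, branchId, "CANCELLED");
    }
    return released;
}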
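
The Cancel side above only prevents suspension if Try also consults the record. A minimal sketch of that Try-side check follows, assuming a hypothetical in-memory `FreezeRecords` class in which `putIfAbsent` stands in for an INSERT guarded by a unique key on (xid, branch_id); the state names are illustrative.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the freeze-record table, keyed by xid + branchId.
class FreezeRecords {
    enum State { TRIED, CONFIRMED, CANCELLED }

    private final Map<String, State> records = new ConcurrentHashMap<>();

    private static String key(String xid, String branchId) {
        return xid + ":" + branchId;
    }

    /**
     * Try: creates the record if none exists. A retried Try (record already TRIED) is
     * accepted; a Try that arrives after Cancel or Confirm is rejected.
     */
    boolean tryFreeze(String xid, String branchId) {
        State previous = records.putIfAbsent(key(xid, branchId), State.TRIED);
        return previous == null || previous == State.TRIED;
    }

    /** Cancel: records CANCELLED even when Try never ran (empty rollback); idempotent on repeats. */
    boolean cancel(String xid, String branchId) {
        records.merge(key(xid, branchId), State.CANCELLED,
                (current, cancelled) -> current == State.CONFIRMED ? current : cancelled);
        return true;
    }

    State stateOf(String xid, String branchId) {
        return records.get(key(xid, branchId));
    }
}
```

If `cancel("x1", "b1")` runs first, a later `tryFreeze("x1", "b1")` returns `false`, so the late Try cannot freeze resources that nobody will ever confirm or cancel.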

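Pitfall 3: Seata global locks causing deadlocks

// ❌ Wrong: querying the same row again inside the global transaction
@GlobalTransactional
public void processOrder(String orderNo) {
    orderMapper.updateStatus(orderNo, "PROCESSING"); // acquires the global lock
    Order order = orderMapper.selectByOrderNo(orderNo); // queries the same row again
    // If another global transaction also holds a lock on this row, the two wait on each other → deadlock
}

// ✅ Correct: shorten the time the global lock is held and avoid cross updates
@GlobalTransactional(timeoutMills = 30000)
public void processOrder(String orderNo) {
    Order order = orderMapper.selectByOrderNo(orderNo); // query first
    orderMapper.updateStatus(orderNo, "PROCESSING"); // then update
    // or split the work into several short transactions
}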
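
One generic way to avoid cross updates is to touch rows in a fixed order, so two transactions that need the same rows never hold them in opposite order. The sketch below illustrates that idea only: `OrderedRowLocker` is a hypothetical helper, and `ReentrantLock` stands in for a row lock. It does not call Seata; with Seata AT the equivalent is to issue the UPDATE statements in a consistent key order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper: ReentrantLock stands in for a row lock; this does not talk to Seata.
class OrderedRowLocker {
    private final Map<String, ReentrantLock> rowLocks = new ConcurrentHashMap<>();

    /** Returns the order in which the given rows will be locked: sorted and de-duplicated. */
    static List<String> lockOrder(List<String> rowKeys) {
        return new ArrayList<>(new TreeSet<>(rowKeys));
    }

    /** Locks all rows in sorted order, runs the work, then unlocks in reverse order. */
    void withRows(List<String> rowKeys, Runnable work) {
        List<ReentrantLock> held = new ArrayList<>();
        try {
            for (String key : lockOrder(rowKeys)) {
                ReentrantLock lock = rowLocks.computeIfAbsent(key, k -> new ReentrantLock());
                lock.lock();
                held.add(lock);
            }
            work.run();
        } finally {
            for (int i = held.size() - 1; i >= 0; i--) {
                held.get(i).unlock();
            }
        }
    }
}
```

A caller that passes `["order:2", "order:1"]` and one that passes `["order:1", "order:2"]` both lock `order:1` first, so neither can end up holding one row while waiting for the other.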

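Pitfall 4: The outbox message and the business write are not in the same transaction

// ❌ Wrong: send the message first, then write to the database; the message goes out but the write fails
public void createOrder(Order order) {
    rocketMQTemplate.convertAndSend("order-topic", orderEvent);
    orderMapper.insert(order); // if this fails, the message has already been sent and cannot be recalled
}

// ✅ Correct: outbox pattern; the business write and the event record share one transaction
@Transactional
public void createOrder(Order order) {
    orderMapper.insert(order);
    outboxMapper.insert(buildOutboxEvent(order)); // same transaction, so both succeed or both fail
}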
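
The outbox row only helps if something later publishes it. The following is a minimal sketch of one relay pass, assuming a hypothetical in-memory `OutboxRelay` and a publisher passed in as a `Predicate` that returns `true` on a successful send; none of these names come from the article's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical in-memory outbox; in a real setup this would be the outbox table.
class OutboxRelay {
    enum Status { PENDING, SENT }

    static final class Event {
        final String payload;
        Status status = Status.PENDING;
        int attempts = 0;

        Event(String payload) {
            this.payload = payload;
        }
    }

    private final List<Event> outbox = new ArrayList<>();

    void append(String payload) {
        outbox.add(new Event(payload));
    }

    /**
     * One relay pass: tries to publish every PENDING event and marks it SENT only after
     * the publisher reports success. Failed events stay PENDING for the next pass.
     * Returns the number of events sent in this pass.
     */
    int relayOnce(Predicate<String> publisher) {
        int sent = 0;
        for (Event event : outbox) {
            if (event.status != Status.PENDING) {
                continue;
            }
            event.attempts++;
            if (publisher.test(event.payload)) {
                event.status = Status.SENT;
                sent++;
            }
        }
        return sent;
    }

    long pendingCount() {
        return outbox.stream().filter(e -> e.status == Status.PENDING).count();
    }
}
```

Because an event is marked SENT only after publishing, a crash between the two steps causes the same event to be published again on the next pass. Delivery is therefore at-least-once, which is why the consumer side has to be idempotent.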

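Pitfall 5: The message consumer is not idempotent

// ❌ Wrong: deduct directly; a redelivered message deducts the balance twice
@Component
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "account-group")
public class AccountConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        OrderEvent event = parse(message);
        accountService.deductBalance(event.getUserId(), event.getAmount()); // runs again on every redelivery!
    }
}

// ✅ Correct: idempotency check on the consumer side
@Component
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "account-group")
public class AccountConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        OrderEvent event = parse(message);
        String key = "DEDUCT:" + event.getOrderNo();
        // Note: check-then-mark is only safe when the check, the deduction and the mark run in
        // one local transaction backed by a unique key; otherwise two concurrent deliveries
        // can both pass the check.
        if (idempotentRecordService.isProcessed(key)) {
            return; // already processed: skip
        }
        accountService.deductBalance(event.getUserId(), event.getAmount());
        idempotentRecordService.markProcessed(key);
    }
}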
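
To close the gap between the check and the mark, the key can be claimed atomically before the deduction. The sketch below assumes a hypothetical in-memory `IdempotentDeduction` class; its `Set.add` call stands in for inserting into a table with a unique key on the business key.

```java
import java.math.BigDecimal;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory account service; the Set stands in for a table with a unique key.
class IdempotentDeduction {
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();
    private final Map<Long, BigDecimal> balances = new ConcurrentHashMap<>();

    void deposit(long userId, BigDecimal amount) {
        balances.merge(userId, amount, BigDecimal::add);
    }

    BigDecimal balanceOf(long userId) {
        return balances.getOrDefault(userId, BigDecimal.ZERO);
    }

    /**
     * Deducts at most once per order. Set.add is atomic, so of two concurrent deliveries
     * of the same message only one gets 'true' back and performs the deduction.
     */
    boolean deductOnce(String orderNo, long userId, BigDecimal amount) {
        if (!processedKeys.add("DEDUCT:" + orderNo)) {
            return false; // duplicate delivery: skip
        }
        balances.merge(userId, amount.negate(), BigDecimal::add);
        return true;
    }
}
```

With a real database, the key insert and the balance update would share one local transaction, so a failed deduction also rolls back the claimed key and the message can be retried.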

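Troubleshooting

No. | Error message | Cause | Fix
1 | Saga compensation failed for step | The compensation threw an exception, possibly because the data was modified by hand | Add compensation retries; record a compensation-failure event for manual intervention
2 | TCC empty rollback detected | Cancel was invoked although Try never ran (network timeout) | Insert an empty-rollback record to prevent suspension; make Cancel idempotent
3 | Seata global lock conflict | Several global transactions compete for the same row lock | Narrow the transaction scope, lower the isolation level, increase lock retries
4 | Seata undo_log not found | The undo log cannot be found when a branch transaction rolls back | Check that the undo_log table exists in every database and that the data source proxy is set up correctly
5 | RocketMQ send timeout | Sending timed out; the Broker is unreachable | Check the Broker status, raise the send timeout, use the outbox pattern
6 | Message consumption duplicate | The message was consumed more than once | Make the consumer idempotent; deduplicate on a unique business key
7 | Outbox event stuck in PENDING | The outbox event was not picked up by the relay scheduler | Check that the scheduler is running and that the RocketMQ connection is healthy
8 | Global transaction timeout | The global transaction exceeded its timeout | Increase timeoutMills, optimize slow SQL, split long transactions
9 | TCC resource suspended | Cancel ran before Try, and Try arrived afterwards | Let the empty-rollback record block the late Try; check the timeout settings
10 | Compensation circular dependency | Compensations of several Sagas depend on each other and form a cycle | Design a one-way compensation chain; avoid circular compensation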
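
The fix for row 1 (retry, then hand over to a person) can be sketched as follows. This is a minimal illustration with a hypothetical `CompensationRetrier`; it retries immediately to stay short, whereas a production version would normally wait between attempts and persist the failure event.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper; the names and the retry policy are illustrative.
class CompensationRetrier {
    private final int maxAttempts;
    private final List<String> manualQueue = new ArrayList<>();

    CompensationRetrier(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /**
     * Runs the compensation up to maxAttempts times. A false result or a runtime
     * exception counts as a failed attempt. When every attempt fails, the step name
     * is recorded for manual intervention and false is returned.
     */
    boolean compensate(String stepName, Supplier<Boolean> compensation) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (Boolean.TRUE.equals(compensation.get())) {
                    return true;
                }
            } catch (RuntimeException e) {
                // treated as a failed attempt; continue with the next one
            }
        }
        manualQueue.add(stepName);
        return false;
    }

    List<String> pendingManualSteps() {
        return List.copyOf(manualQueue);
    }
}
```

Retrying is only safe because the compensation itself is idempotent, as described in Pitfall 1.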

Advanced Optimizations

1. Persisting the Saga state machine

package com.toolsku.saga;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.annotations.*;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Slf4j
@Service
@RequiredArgsConstructor
public class SagaStateService {

    private final SagaStateMapper sagaStateMapper;

    @Transactional(rollbackFor = Exception.class)
    public void persistSagaState(SagaDefinition saga) {
        String stateJson = serializeSagaState(saga);
        sagaStateMapper.upsert(saga.getSagaId(), stateJson, saga.getStatus().name());
    }

    public SagaDefinition recoverSaga(String sagaId) {
        String stateJson = sagaStateMapper.getState(sagaId);
        if (stateJson == null) {
            return null;
        }
        return deserializeSagaState(stateJson);
    }

    private String serializeSagaState(SagaDefinition saga) {
        // Serialize the saga state to JSON (hand-built for illustration; values are not escaped)
        StringBuilder sb = new StringBuilder();
        sb.append("{\"sagaId\":\"").append(saga.getSagaId()).append("\"");
        sb.append(",\"currentStep\":").append(saga.getCurrentStep());
        sb.append(",\"status\":\"").append(saga.getStatus().name()).append("\"");
        sb.append(",\"steps\":[");
        for (int i = 0; i < saga.getSteps().size(); i++) {
            if (i > 0) sb.append(",");
            SagaDefinition.SagaStep step = saga.getSteps().get(i);
            sb.append("{\"name\":\"").append(step.getName()).append("\"");
            sb.append(",\"status\":\"").append(step.getStepStatus().name()).append("\"}");
        }
        sb.append("]}");
        return sb.toString();
    }

    private SagaDefinition deserializeSagaState(String json) {
        // Deserialize and rebuild the SagaDefinition
        // Use Jackson/Gson in a real project
        throw new UnsupportedOperationException("Use Jackson for deserialization");
    }
}

@Mapper
interface SagaStateMapper {
    @Insert("INSERT INTO saga_state (saga_id, state_json, status, created_at, updated_at) " +
            "VALUES (#{sagaId}, #{stateJson}, #{status}, NOW(), NOW()) " +
            "ON DUPLICATE KEY UPDATE state_json = #{stateJson}, status = #{status}, updated_at = NOW()")
    int upsert(@Param("sagaId") String sagaId,
               @Param("stateJson") String stateJson,
               @Param("status") String status);

    @Select("SELECT state_json FROM saga_state WHERE saga_id = #{sagaId}")
    String getState(@Param("sagaId") String sagaId);
}
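
The persisted JSON contains step names and statuses, but not the `Supplier` actions, which cannot be stored. Recovery therefore has to bind the names back to code. The following is a minimal sketch of that idea with a hypothetical `SagaResumer`; it assumes that `currentStep` is the index of the first step that has not completed, which the article's `SagaDefinition` may define differently.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical recovery helper: persisted state holds step names and an index only,
// because Supplier lambdas cannot be written into the saga_state JSON.
class SagaResumer {
    private final Map<String, Supplier<Boolean>> actionsByName = new LinkedHashMap<>();

    void register(String stepName, Supplier<Boolean> action) {
        actionsByName.put(stepName, action);
    }

    /**
     * Resumes from currentStep (assumed: index of the first step not yet completed).
     * Returns the index of the step that failed, or stepNames.size() when all steps finished.
     */
    int resume(List<String> stepNames, int currentStep) {
        for (int i = currentStep; i < stepNames.size(); i++) {
            Supplier<Boolean> action = actionsByName.get(stepNames.get(i));
            if (action == null) {
                throw new IllegalStateException("No action registered for step " + stepNames.get(i));
            }
            if (!Boolean.TRUE.equals(action.get())) {
                return i;
            }
        }
        return stepNames.size();
    }
}
```

Each service registers its actions under stable step names at startup. After a restart, the recovered names and index are enough to continue where the saga stopped, provided the resumed step is idempotent.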

2. TCC branch transaction registry

package com.toolsku.tcc;

import lombok.Data;
import org.apache.ibatis.annotations.*;

import java.time.LocalDateTime;

@Data
public class TccBranchTransaction {
    private Long id;
    private String xid;
    private String branchId;
    private String serviceName;
    private String tryMethod;
    private String confirmMethod;
    private String cancelMethod;
    private String paramJson;
    private String status;
    private LocalDateTime createdAt;
}

@Mapper
interface TccBranchTransactionMapper {
    @Insert("INSERT INTO tcc_branch_transaction (xid, branch_id, service_name, try_method, " +
            "confirm_method, cancel_method, param_json, status, created_at) " +
            "VALUES (#{xid}, #{branchId}, #{serviceName}, #{tryMethod}, #{confirmMethod}, " +
            "#{cancelMethod}, #{paramJson}, #{status}, NOW())")
    int insert(TccBranchTransaction branch);

    @Select("SELECT * FROM tcc_branch_transaction WHERE xid = #{xid} ORDER BY created_at ASC")
    java.util.List<TccBranchTransaction> findByXid(@Param("xid") String xid);

    @Update("UPDATE tcc_branch_transaction SET status = #{status} WHERE xid = #{xid} AND branch_id = #{branchId}")
    int updateStatus(@Param("xid") String xid, @Param("branchId") String branchId, @Param("status") String status);
}
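
With the branches registered, a coordinator can drive the second phase from these records and repeat it after a crash. The following is a minimal sketch with a hypothetical `TccPhaseTwo` class. The status values `TRIED`, `CONFIRMED` and `CANCELLED` are assumptions, and the remote Confirm or Cancel call is passed in as a `Predicate` that returns `true` on success.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical phase-two driver over branch records like those in tcc_branch_transaction.
class TccPhaseTwo {
    static final class Branch {
        final String branchId;
        String status; // "TRIED", "CONFIRMED" or "CANCELLED" (illustrative values)

        Branch(String branchId, String status) {
            this.branchId = branchId;
            this.status = status;
        }
    }

    /**
     * Confirms (commit = true) or cancels (commit = false) every branch still in TRIED.
     * Branches already in a final state are skipped, so the pass can be re-run safely.
     * Returns the ids of branches whose call failed and that need another pass.
     */
    static List<String> run(List<Branch> branches, boolean commit, Predicate<Branch> remoteCall) {
        List<String> failed = new ArrayList<>();
        for (Branch branch : branches) {
            if (!"TRIED".equals(branch.status)) {
                continue;
            }
            if (remoteCall.test(branch)) {
                branch.status = commit ? "CONFIRMED" : "CANCELLED";
            } else {
                failed.add(branch.branchId);
            }
        }
        return failed;
    }
}
```

A recovery job would load the branches with `findByXid`, run the pass, write each new status back through `updateStatus`, and repeat until the returned list is empty.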

3. Distributed transaction monitoring dashboard

package com.toolsku.reliability;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.concurrent.atomic.AtomicLong;

@Slf4j
@Component
@RequiredArgsConstructor
public class TransactionMonitor {

    private final TransactionMetrics transactionMetrics;
    private final AlertService alertService;

    private final AtomicLong lastSagaFailureCount = new AtomicLong(0);
    private final AtomicLong lastSeataRollbackCount = new AtomicLong(0);

    @Scheduled(fixedRate = 60000)
    public void checkTransactionHealth() {
        double sagaFailureRate = calculateSagaFailureRate();
        if (sagaFailureRate > 0.1) {
            alertService.sendAlert("Saga failure rate too high: " + (sagaFailureRate * 100) + "%");
        }

        int activeCount = transactionMetrics.getActiveTransactionCount();
        if (activeCount > 500) {
            alertService.sendAlert("Too many active transactions: " + activeCount);
        }

        // SLF4J only understands "{}" placeholders, so format the percentage first
        log.info("Transaction health check: sagaFailureRate={}%, activeTransactions={}",
                String.format("%.2f", sagaFailureRate * 100), activeCount);
    }

    private double calculateSagaFailureRate() {
        // In practice, read the success/failure counts for the last hour from the metrics
        return 0.0;
    }
}
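
The stubbed `calculateSagaFailureRate` could be filled in by comparing cumulative counter totals between two checks. The following is a minimal sketch with a hypothetical `FailureRateWindow` class. It computes the rate for the interval between two calls (60 seconds with the schedule above) rather than for a full hour, and assumes the totals come from something like Micrometer's `Counter.count()`.

```java
// Hypothetical helper: turns two cumulative counters into a failure rate per check interval.
class FailureRateWindow {
    private double lastSuccess;
    private double lastFailure;

    /**
     * Takes the current cumulative success and failure totals and returns
     * failures / (successes + failures) since the previous call.
     * Returns 0.0 when nothing finished in the interval.
     */
    synchronized double update(double successTotal, double failureTotal) {
        double successDelta = successTotal - lastSuccess;
        double failureDelta = failureTotal - lastFailure;
        lastSuccess = successTotal;
        lastFailure = failureTotal;
        double finished = successDelta + failureDelta;
        return finished <= 0 ? 0.0 : failureDelta / finished;
    }
}
```

Working on deltas keeps an old burst of failures from raising the rate forever, which would happen if the cumulative totals were divided directly.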

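Comparison

Dimension | Saga orchestration | TCC | Seata AT | Message-based eventual consistency | Production-grade reliability
Consistency model | Eventual | Eventual | Strong for writes (global lock); reads are read-uncommitted by default | Eventual | Eventual
Intrusiveness | Low (compensation logic) | High (Try/Confirm/Cancel) | Low (automatic proxy) | Medium (outbox + idempotency) | High (full set of safeguards)
Performance | ⭐⭐⭐⭐ High | ⭐⭐⭐ Medium | ⭐⭐ Low (global lock) | ⭐⭐⭐⭐⭐ Highest | ⭐⭐⭐ Medium
Implementation complexity | ⭐⭐ Medium | ⭐⭐⭐⭐ High | ⭐ Low (handled by the framework) | ⭐⭐⭐ Medium | ⭐⭐⭐⭐⭐ Very high
Isolation | ❌ None | ✅ Isolated in the Try phase | ✅ Global-lock isolation | ❌ None | ✅ Optional
Compensation/rollback | ✅ Compensation | ✅ Cancel | ✅ Automatic rollback | ✅ Retry + manual handling | ✅ Multi-level safeguards
Typical use | Long-running process orchestration | Core funds/inventory | Traditional microservices | Asynchronous decoupling | Core transaction paths
Framework dependency | In-house / Seata Saga | In-house / Seata TCC | Seata | RocketMQ/Kafka | Full-stack combination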

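Summary: there is no silver bullet for distributed transactions. Saga suits long-running process orchestration: its compensation semantics are clear, but it offers no isolation. TCC suits core funds paths: isolation is strong, but the implementation cost is high. Seata AT suits quick adoption: it is minimally intrusive, but the global lock is a performance bottleneck. Message-based eventual consistency suits asynchronous decoupling: it performs best, but requires idempotency and an outbox. For core paths in production, combine TCC, message-based eventual consistency, idempotency, and monitoring; it is better to write twice as much code than to chase data inconsistencies in production.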

