分散式交易模式實戰:從Saga到TCC的5種生產模式
技术架构
分散式交易:微服務架構下最難的工程問題
下單扣庫存扣餘額,三個服務三個庫,庫存扣了餘額沒扣——這種資料不一致在生產環境每天都在發生。你用本地交易,跨服務根本管不了;你加分散式鎖,鎖超時了交易還沒提交;你上Seata,發現全域鎖把併發打回單執行緒;你用訊息最終一致性,訊息丟了補償邏輯又寫了一堆。2026年,分散式交易依然是微服務架構中最容易翻車的環節。
本文將從5種生產模式出發,帶你完成Saga編排→TCC模式→Seata AT→訊息最終一致性→生產級可靠性保障的全鏈路實戰,每一步都有完整Java/Spring Boot程式碼和避坑指南。
分散式交易核心概念
| 概念 | 說明 |
|---|---|
| 本地交易 | 單資料來源ACID交易,無法跨服務保證一致性 |
| 2PC(兩階段提交) | 協調者統一prepare/commit,同步阻塞,效能差 |
| 3PC(三階段提交) | 增加CanCommit階段,降低阻塞但實作複雜 |
| Saga模式 | 長交易拆分為多個本地交易,失敗時執行補償操作 |
| TCC模式 | Try-Confirm-Cancel三步操作,業務層面保證一致性 |
| Seata AT模式 | 自動攔截SQL生成回滾日誌,無侵入式分散式交易 |
| 訊息最終一致性 | 基於訊息佇列的非同步保證,最終狀態一致而非即時 |
| 交易性發件箱 | 業務操作與訊息傳送在同一交易中,避免訊息遺失 |
| 全域鎖 | Seata中全域交易持有的行鎖,防止髒寫 |
| 冪等性 | 同一操作執行多次結果一致,分散式交易的核心保障 |
問題分析:分散式交易的5大挑戰
- 跨服務資料一致性:訂單服務建立訂單、庫存服務扣減庫存、帳戶服務扣減餘額,任一步失敗需全部回滾
- 補償操作的原子性:Saga補償本身也可能失敗,補償失敗怎麼辦
- 全域鎖與併發衝突:Seata全域鎖導致效能退化,高併發場景下鎖等待超時
- 訊息遺失與重複消費:訊息傳送成功但消費端當機,或訊息重複消費導致重複扣款
- 超時與懸掛問題:TCC的Try超時後Cancel已執行,後續Confirm到達變成懸掛
分步實操:5種分散式交易實作
模式1:Saga編排模式(中央協調器)
package com.toolsku.saga;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.Supplier;
@Data
@Slf4j
public class SagaDefinition {
private String sagaId;
private List<SagaStep> steps = new ArrayList<>();
private int currentStep = 0;
private SagaStatus status = SagaStatus.PENDING;
public enum SagaStatus {
PENDING, RUNNING, COMPENSATING, COMPLETED, FAILED
}
@Data
public static class SagaStep {
private String name;
private Supplier<Boolean> action;
private Supplier<Boolean> compensation;
private StepStatus stepStatus = StepStatus.PENDING;
public enum StepStatus {
PENDING, EXECUTING, COMPLETED, COMPENSATING, COMPENSATED, FAILED
}
}
public static SagaBuilder builder() {
return new SagaBuilder();
}
public static class SagaBuilder {
private final SagaDefinition saga = new SagaDefinition();
public SagaBuilder step(String name, Supplier<Boolean> action, Supplier<Boolean> compensation) {
SagaStep step = new SagaStep();
step.setName(name);
step.setAction(action);
step.setCompensation(compensation);
saga.getSteps().add(step);
return this;
}
public SagaDefinition build() {
saga.setSagaId(UUID.randomUUID().toString());
return saga;
}
}
}
package com.toolsku.saga;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class SagaOrchestrator {
public SagaDefinition execute(SagaDefinition saga) {
saga.setStatus(SagaDefinition.SagaStatus.RUNNING);
log.info("Saga [{}] started, total steps: {}", saga.getSagaId(), saga.getSteps().size());
for (int i = 0; i < saga.getSteps().size(); i++) {
saga.setCurrentStep(i);
SagaDefinition.SagaStep step = saga.getSteps().get(i);
step.setStepStatus(SagaDefinition.SagaStep.StepStatus.EXECUTING);
try {
Boolean result = step.getAction().get();
if (result == null || !result) {
log.error("Saga [{}] step [{}] action failed", saga.getSagaId(), step.getName());
step.setStepStatus(SagaDefinition.SagaStep.StepStatus.FAILED);
return compensate(saga);
}
step.setStepStatus(SagaDefinition.SagaStep.StepStatus.COMPLETED);
log.info("Saga [{}] step [{}] completed", saga.getSagaId(), step.getName());
} catch (Exception e) {
log.error("Saga [{}] step [{}] exception: {}", saga.getSagaId(), step.getName(), e.getMessage(), e);
step.setStepStatus(SagaDefinition.SagaStep.StepStatus.FAILED);
return compensate(saga);
}
}
saga.setStatus(SagaDefinition.SagaStatus.COMPLETED);
log.info("Saga [{}] completed successfully", saga.getSagaId());
return saga;
}
private SagaDefinition compensate(SagaDefinition saga) {
saga.setStatus(SagaDefinition.SagaStatus.COMPENSATING);
log.info("Saga [{}] compensating from step [{}]", saga.getSagaId(), saga.getCurrentStep());
for (int i = saga.getCurrentStep(); i >= 0; i--) {
SagaDefinition.SagaStep step = saga.getSteps().get(i);
if (step.getStepStatus() != SagaDefinition.SagaStep.StepStatus.COMPLETED) {
continue;
}
step.setStepStatus(SagaDefinition.SagaStep.StepStatus.COMPENSATING);
try {
Boolean result = step.getCompensation().get();
if (result != null && result) {
step.setStepStatus(SagaDefinition.SagaStep.StepStatus.COMPENSATED);
log.info("Saga [{}] step [{}] compensated", saga.getSagaId(), step.getName());
} else {
step.setStepStatus(SagaDefinition.SagaStep.StepStatus.FAILED);
log.error("Saga [{}] step [{}] compensation failed", saga.getSagaId(), step.getName());
}
} catch (Exception e) {
step.setStepStatus(SagaDefinition.SagaStep.StepStatus.FAILED);
log.error("Saga [{}] step [{}] compensation exception: {}", saga.getSagaId(), step.getName(), e.getMessage(), e);
}
}
saga.setStatus(SagaDefinition.SagaStatus.FAILED);
return saga;
}
}
package com.toolsku.saga;
import com.toolsku.service.OrderService;
import com.toolsku.service.InventoryService;
import com.toolsku.service.AccountService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderSagaService {
private final SagaOrchestrator sagaOrchestrator;
private final OrderService orderService;
private final InventoryService inventoryService;
private final AccountService accountService;
public SagaDefinition createOrder(Long userId, Long productId, Integer quantity, BigDecimal amount) {
SagaDefinition saga = SagaDefinition.builder()
.step("createOrder",
() -> orderService.createOrder(userId, productId, quantity, amount),
() -> orderService.cancelOrder(userId, productId))
.step("deductInventory",
() -> inventoryService.deductInventory(productId, quantity),
() -> inventoryService.restoreInventory(productId, quantity))
.step("deductAccount",
() -> accountService.deductBalance(userId, amount),
() -> accountService.restoreBalance(userId, amount))
.build();
return sagaOrchestrator.execute(saga);
}
}
模式2:TCC模式(Try-Confirm-Cancel)
package com.toolsku.tcc;
import lombok.Data;
import java.math.BigDecimal;
@Data
public class AccountTccRequest {
private String xid;
private Long userId;
private BigDecimal amount;
private String branchId;
}
package com.toolsku.tcc;
import org.apache.ibatis.annotations.*;
@Mapper
public interface AccountTccMapper {
@Insert("INSERT INTO account_tcc_freeze (xid, user_id, amount, status, branch_id, created_at) " +
"VALUES (#{xid}, #{userId}, #{amount}, 'TRYING', #{branchId}, NOW())")
int insertFreezeRecord(@Param("xid") String xid,
@Param("userId") Long userId,
@Param("amount") BigDecimal amount,
@Param("branchId") String branchId);
@Update("UPDATE account SET balance = balance - #{amount}, frozen = frozen + #{amount} " +
"WHERE user_id = #{userId} AND balance >= #{amount}")
int freezeBalance(@Param("userId") Long userId, @Param("amount") BigDecimal amount);
@Update("UPDATE account SET frozen = frozen - #{amount} " +
"WHERE user_id = #{userId} AND frozen >= #{amount}")
int confirmDeduct(@Param("userId") Long userId, @Param("amount") BigDecimal amount);
@Update("UPDATE account SET balance = balance + #{amount}, frozen = frozen - #{amount} " +
"WHERE user_id = #{userId} AND frozen >= #{amount}")
int cancelFreeze(@Param("userId") Long userId, @Param("amount") BigDecimal amount);
@Select("SELECT COUNT(*) FROM account_tcc_freeze WHERE xid = #{xid} AND branch_id = #{branchId}")
int countFreezeRecord(@Param("xid") String xid, @Param("branchId") String branchId);
@Update("UPDATE account_tcc_freeze SET status = #{status} WHERE xid = #{xid} AND branch_id = #{branchId}")
int updateFreezeStatus(@Param("xid") String xid,
@Param("branchId") String branchId,
@Param("status") String status);
@Select("SELECT status FROM account_tcc_freeze WHERE xid = #{xid} AND branch_id = #{branchId}")
String getFreezeStatus(@Param("xid") String xid, @Param("branchId") String branchId);
}
package com.toolsku.tcc;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.math.BigDecimal;
@Slf4j
@Service
@RequiredArgsConstructor
public class AccountTccService {
private final AccountTccMapper accountTccMapper;
@Transactional(rollbackFor = Exception.class)
public boolean tryDeduct(String xid, Long userId, BigDecimal amount, String branchId) {
if (accountTccMapper.countFreezeRecord(xid, branchId) > 0) {
log.info("TCC try already executed: xid={}, branchId={}", xid, branchId);
return true;
}
int rows = accountTccMapper.freezeBalance(userId, amount);
if (rows == 0) {
log.warn("TCC try failed: insufficient balance, userId={}, amount={}", userId, amount);
return false;
}
accountTccMapper.insertFreezeRecord(xid, userId, amount, branchId);
log.info("TCC try success: xid={}, userId={}, amount={}", xid, userId, amount);
return true;
}
@Transactional(rollbackFor = Exception.class)
public boolean confirmDeduct(String xid, Long userId, BigDecimal amount, String branchId) {
String status = accountTccMapper.getFreezeStatus(xid, branchId);
if ("CONFIRMED".equals(status)) {
log.info("TCC confirm already executed: xid={}, branchId={}", xid, branchId);
return true;
}
int rows = accountTccMapper.confirmDeduct(userId, amount);
if (rows == 0) {
log.error("TCC confirm failed: frozen amount mismatch, userId={}, amount={}", userId, amount);
return false;
}
accountTccMapper.updateFreezeStatus(xid, branchId, "CONFIRMED");
log.info("TCC confirm success: xid={}, userId={}, amount={}", xid, userId, amount);
return true;
}
@Transactional(rollbackFor = Exception.class)
public boolean cancelFreeze(String xid, Long userId, BigDecimal amount, String branchId) {
String status = accountTccMapper.getFreezeStatus(xid, branchId);
if ("CANCELLED".equals(status)) {
log.info("TCC cancel already executed: xid={}, branchId={}", xid, branchId);
return true;
}
if ("CONFIRMED".equals(status)) {
log.warn("TCC cancel skipped: already confirmed, xid={}, branchId={}", xid, branchId);
return true;
}
int rows = accountTccMapper.cancelFreeze(userId, amount);
if (rows == 0) {
log.error("TCC cancel failed: frozen amount mismatch, userId={}, amount={}", userId, amount);
return false;
}
accountTccMapper.updateFreezeStatus(xid, branchId, "CANCELLED");
log.info("TCC cancel success: xid={}, userId={}, amount={}", xid, userId, amount);
return true;
}
}
package com.toolsku.tcc;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
import java.util.UUID;
@Slf4j
@Service
@RequiredArgsConstructor
public class TccTransactionManager {
private final AccountTccService accountTccService;
public boolean executeDeduct(String xid, Long userId, BigDecimal amount) {
String branchId = UUID.randomUUID().toString();
boolean tryResult = accountTccService.tryDeduct(xid, userId, amount, branchId);
if (!tryResult) {
accountTccService.cancelFreeze(xid, userId, amount, branchId);
return false;
}
boolean confirmResult = accountTccService.confirmDeduct(xid, userId, amount, branchId);
if (!confirmResult) {
accountTccService.cancelFreeze(xid, userId, amount, branchId);
return false;
}
return true;
}
}
模式3:Seata AT模式(自動補償)
package com.toolsku.seata;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderSeataService {
private final OrderClient orderClient;
private final InventoryClient inventoryClient;
private final AccountClient accountClient;
@GlobalTransactional(name = "create-order", rollbackFor = Exception.class, timeoutMills = 60000)
public String createOrder(Long userId, Long productId, Integer quantity, BigDecimal amount) {
log.info("Seata global transaction started: userId={}, productId={}, quantity={}, amount={}",
userId, productId, quantity, amount);
String orderNo = orderClient.createOrder(userId, productId, quantity, amount);
log.info("Step 1: order created, orderNo={}", orderNo);
inventoryClient.deductInventory(productId, quantity);
log.info("Step 2: inventory deducted, productId={}, quantity={}", productId, quantity);
accountClient.deductBalance(userId, amount);
log.info("Step 3: balance deducted, userId={}, amount={}", userId, amount);
return orderNo;
}
}
package com.toolsku.seata;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.*;
@FeignClient(name = "order-service", url = "${service.order.url}")
public interface OrderClient {
@PostMapping("/api/orders")
String createOrder(@RequestParam("userId") Long userId,
@RequestParam("productId") Long productId,
@RequestParam("quantity") Integer quantity,
@RequestParam("amount") BigDecimal amount);
@DeleteMapping("/api/orders/{orderNo}")
void cancelOrder(@PathVariable("orderNo") String orderNo);
}
@FeignClient(name = "inventory-service", url = "${service.inventory.url}")
public interface InventoryClient {
@PostMapping("/api/inventory/deduct")
void deductInventory(@RequestParam("productId") Long productId,
@RequestParam("quantity") Integer quantity);
@PostMapping("/api/inventory/restore")
void restoreInventory(@RequestParam("productId") Long productId,
@RequestParam("quantity") Integer quantity);
}
@FeignClient(name = "account-service", url = "${service.account.url}")
public interface AccountClient {
@PostMapping("/api/accounts/deduct")
void deductBalance(@RequestParam("userId") Long userId,
@RequestParam("amount") BigDecimal amount);
@PostMapping("/api/accounts/restore")
void restoreBalance(@RequestParam("userId") Long userId,
@RequestParam("amount") BigDecimal amount);
}
# application-seata.yml
seata:
enabled: true
application-id: order-service
tx-service-group: toolsku-tx-group
service:
vgroup-mapping:
toolsku-tx-group: default
grouplist:
default: 127.0.0.1:8091
config:
type: nacos
nacos:
server-addr: 127.0.0.1:8848
namespace: seata
group: SEATA_GROUP
registry:
type: nacos
nacos:
server-addr: 127.0.0.1:8848
namespace: seata
group: SEATA_GROUP
client:
undo:
data-validation: true
log-serialization: jackson
log-table: undo_log
lock:
retry-interval: 10
retry-times: 30
retry-policy-branch-rollback-on-conflict: true
-- Seata AT模式用undo_log表(每個服務資料庫都需要一個)
CREATE TABLE IF NOT EXISTS undo_log (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
branch_id BIGINT NOT NULL,
xid VARCHAR(128) NOT NULL,
context VARCHAR(128) NOT NULL,
rollback_info LONGBLOB NOT NULL,
log_status INT NOT NULL,
log_created DATETIME NOT NULL,
log_modified DATETIME NOT NULL,
UNIQUE KEY ux_undo_log (xid, branch_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
模式4:訊息最終一致性(交易性發件箱 + RocketMQ)
-- 交易性發件箱表
CREATE TABLE IF NOT EXISTS transactional_outbox (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
aggregate_id VARCHAR(128) NOT NULL,
aggregate_type VARCHAR(64) NOT NULL,
event_type VARCHAR(128) NOT NULL,
payload JSON NOT NULL,
status VARCHAR(32) NOT NULL DEFAULT 'PENDING',
retry_count INT NOT NULL DEFAULT 0,
max_retry INT NOT NULL DEFAULT 5,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_status_created (status, created_at),
UNIQUE KEY uk_aggregate_event (aggregate_id, aggregate_type, event_type)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
package com.toolsku.outbox;
import lombok.Data;
import java.time.LocalDateTime;
@Data
public class OutboxEvent {
private Long id;
private String aggregateId;
private String aggregateType;
private String eventType;
private String payload;
private String status;
private Integer retryCount;
private Integer maxRetry;
private LocalDateTime createdAt;
private LocalDateTime updatedAt;
}
package com.toolsku.outbox;
import org.apache.ibatis.annotations.*;
import java.util.List;
@Mapper
public interface OutboxMapper {
@Insert("INSERT INTO transactional_outbox (aggregate_id, aggregate_type, event_type, payload, status) " +
"VALUES (#{aggregateId}, #{aggregateType}, #{eventType}, #{payload}, 'PENDING')")
@Options(useGeneratedKeys = true, keyProperty = "id")
int insert(OutboxEvent event);
@Select("SELECT * FROM transactional_outbox WHERE status = 'PENDING' AND retry_count < max_retry " +
"ORDER BY created_at ASC LIMIT #{limit}")
List<OutboxEvent> findPendingEvents(@Param("limit") int limit);
@Update("UPDATE transactional_outbox SET status = #{status}, retry_count = retry_count + 1, " +
"updated_at = NOW() WHERE id = #{id}")
int updateStatus(@Param("id") Long id, @Param("status") String status);
@Update("UPDATE transactional_outbox SET status = 'SENT', updated_at = NOW() WHERE id = #{id}")
int markAsSent(@Param("id") Long id);
@Update("UPDATE transactional_outbox SET status = 'FAILED', updated_at = NOW() " +
"WHERE id = #{id} AND retry_count >= max_retry")
int markAsFailed(@Param("id") Long id);
}
package com.toolsku.outbox;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
@Slf4j
@Component
@RequiredArgsConstructor
public class OutboxRelayScheduler {
private final OutboxMapper outboxMapper;
private final RocketMQTemplate rocketMQTemplate;
private static final int BATCH_SIZE = 50;
@Scheduled(fixedDelay = 1000)
public void relayPendingEvents() {
List<OutboxEvent> events = outboxMapper.findPendingEvents(BATCH_SIZE);
if (events.isEmpty()) {
return;
}
for (OutboxEvent event : events) {
try {
String topic = String.format("toolsku-%s-topic", event.getAggregateType());
String key = event.getAggregateId();
rocketMQTemplate.syncSend(topic,
rocketMQTemplate.getMessageConverter().toMessage(event.getPayload(), null, key));
outboxMapper.markAsSent(event.getId());
log.info("Outbox event sent: id={}, type={}, aggregateId={}",
event.getId(), event.getEventType(), event.getAggregateId());
} catch (Exception e) {
outboxMapper.updateStatus(event.getId(), "PENDING");
outboxMapper.markAsFailed(event.getId());
log.error("Outbox event send failed: id={}, error={}", event.getId(), e.getMessage(), e);
}
}
}
}
package com.toolsku.outbox;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.UUID;
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderEventService {
private final OutboxMapper outboxMapper;
private final OrderMapper orderMapper;
private final ObjectMapper objectMapper;
@Transactional(rollbackFor = Exception.class)
public String createOrderWithOutbox(Long userId, Long productId, Integer quantity, BigDecimal amount) {
String orderNo = "ORD" + UUID.randomUUID().toString().replace("-", "").substring(0, 16).toUpperCase();
orderMapper.insertOrder(orderNo, userId, productId, quantity, amount, "CREATED");
try {
String payload = objectMapper.writeValueAsString(
new OrderCreatedEvent(orderNo, userId, productId, quantity, amount));
OutboxEvent event = new OutboxEvent();
event.setAggregateId(orderNo);
event.setAggregateType("order");
event.setEventType("ORDER_CREATED");
event.setPayload(payload);
outboxMapper.insert(event);
} catch (Exception e) {
throw new RuntimeException("Failed to serialize order event", e);
}
log.info("Order created with outbox event: orderNo={}", orderNo);
return orderNo;
}
}
package com.toolsku.consumer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(topic = "toolsku-order-topic", consumerGroup = "inventory-consumer-group")
public class InventoryOrderConsumer implements RocketMQListener<String> {
private final InventoryService inventoryService;
private final IdempotentRecordService idempotentRecordService;
@Override
public void onMessage(String message) {
OrderCreatedEvent event = parseEvent(message);
String idempotentKey = "INVENTORY_DEDUCT:" + event.getOrderNo();
if (idempotentRecordService.isProcessed(idempotentKey)) {
log.info("Duplicate message skipped: orderNo={}", event.getOrderNo());
return;
}
try {
inventoryService.deductInventory(event.getProductId(), event.getQuantity());
idempotentRecordService.markProcessed(idempotentKey);
log.info("Inventory deducted for order: orderNo={}", event.getOrderNo());
} catch (Exception e) {
log.error("Inventory deduction failed: orderNo={}, error={}",
event.getOrderNo(), e.getMessage(), e);
throw new RuntimeException("Inventory deduction failed", e);
}
}
}
模式5:生產級可靠性保障
package com.toolsku.reliability;
import lombok.Data;
import org.apache.ibatis.annotations.*;
import java.time.LocalDateTime;
@Data
public class IdempotentRecord {
private Long id;
private String idempotentKey;
private String status;
private LocalDateTime createdAt;
private LocalDateTime expireAt;
}
@Mapper
public interface IdempotentRecordMapper {
@Insert("INSERT INTO idempotent_record (idempotent_key, status, created_at, expire_at) " +
"VALUES (#{idempotentKey}, 'PROCESSING', NOW(), DATE_ADD(NOW(), INTERVAL 24 HOUR)) " +
"ON DUPLICATE KEY UPDATE idempotent_key = idempotent_key")
int tryInsert(@Param("idempotentKey") String idempotentKey);
@Select("SELECT status FROM idempotent_record WHERE idempotent_key = #{idempotentKey}")
String getStatus(@Param("idempotentKey") String idempotentKey);
@Update("UPDATE idempotent_record SET status = 'PROCESSED' WHERE idempotent_key = #{idempotentKey}")
int markProcessed(@Param("idempotentKey") String idempotentKey);
@Delete("DELETE FROM idempotent_record WHERE expire_at < NOW()")
int cleanExpired();
}
package com.toolsku.reliability;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RequiredArgsConstructor
public class IdempotentRecordService {
private final IdempotentRecordMapper idempotentRecordMapper;
public boolean isProcessed(String idempotentKey) {
String status = idempotentRecordMapper.getStatus(idempotentKey);
return "PROCESSED".equals(status);
}
public boolean tryAcquire(String idempotentKey) {
int rows = idempotentRecordMapper.tryInsert(idempotentKey);
if (rows == 0) {
String status = idempotentRecordMapper.getStatus(idempotentKey);
return "PROCESSED".equals(status);
}
return true;
}
public void markProcessed(String idempotentKey) {
idempotentRecordMapper.markProcessed(idempotentKey);
}
}
package com.toolsku.reliability;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.*;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import jakarta.servlet.http.HttpServletRequest;
import java.lang.reflect.Method;
@Slf4j
@Component
@Aspect
public class IdempotentAspect {
private final IdempotentRecordService idempotentRecordService;
public IdempotentAspect(IdempotentRecordService idempotentRecordService) {
this.idempotentRecordService = idempotentRecordService;
}
@Around("@annotation(com.toolsku.reliability.Idempotent)")
public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
Idempotent annotation = method.getAnnotation(Idempotent.class);
String idempotentKey = resolveKey(annotation);
if (idempotentKey == null) {
return joinPoint.proceed();
}
if (idempotentRecordService.isProcessed(idempotentKey)) {
log.info("Idempotent check: already processed, key={}", idempotentKey);
return null;
}
if (!idempotentRecordService.tryAcquire(idempotentKey)) {
log.warn("Idempotent check: concurrent processing, key={}", idempotentKey);
throw new RuntimeException("Concurrent request detected");
}
try {
Object result = joinPoint.proceed();
idempotentRecordService.markProcessed(idempotentKey);
return result;
} catch (Exception e) {
log.error("Idempotent execution failed: key={}", idempotentKey, e);
throw e;
}
}
private String resolveKey(Idempotent annotation) {
ServletRequestAttributes attributes =
(ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
if (attributes == null) {
return null;
}
HttpServletRequest request = attributes.getRequest();
String header = request.getHeader(annotation.headerKey());
return header != null ? annotation.prefix() + header : null;
}
}
package com.toolsku.reliability;
import java.lang.annotation.*;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Idempotent {
String headerKey() default "X-Idempotent-Key";
String prefix() default "IDEMPOTENT:";
}
package com.toolsku.reliability;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.*;
@Slf4j
@Component
public class TransactionTimeoutGuard {
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
private final ConcurrentHashMap<String, TimeoutTask> runningTasks = new ConcurrentHashMap<>();
public void register(String xid, long timeoutMillis, Runnable onTimeout) {
ScheduledFuture<?> future = scheduler.schedule(() -> {
log.warn("Transaction timeout triggered: xid={}", xid);
runningTasks.remove(xid);
try {
onTimeout.run();
} catch (Exception e) {
log.error("Timeout callback failed: xid={}", xid, e);
}
}, timeoutMillis, TimeUnit.MILLISECONDS);
runningTasks.put(xid, new TimeoutTask(xid, future, timeoutMillis));
}
public void cancel(String xid) {
TimeoutTask task = runningTasks.remove(xid);
if (task != null) {
task.getFuture().cancel(false);
log.info("Transaction timeout guard cancelled: xid={}", xid);
}
}
public long getRemainingTime(String xid) {
TimeoutTask task = runningTasks.get(xid);
if (task == null) {
return -1;
}
long elapsed = System.currentTimeMillis() - task.getStartTime();
return Math.max(0, task.getTimeoutMillis() - elapsed);
}
@lombok.Data
@lombok.AllArgsConstructor
private static class TimeoutTask {
private String xid;
private ScheduledFuture<?> future;
private long timeoutMillis;
private long startTime = System.currentTimeMillis();
}
}
package com.toolsku.reliability;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@Component
public class TransactionMetrics {
private final Counter sagaSuccessCounter;
private final Counter sagaFailureCounter;
private final Counter tccConfirmCounter;
private final Counter tccCancelCounter;
private final Counter seataCommitCounter;
private final Counter seataRollbackCounter;
private final Timer sagaExecutionTimer;
private final AtomicInteger activeTransactionGauge;
public TransactionMetrics(MeterRegistry registry) {
this.sagaSuccessCounter = Counter.builder("transaction.saga.success")
.description("Saga transaction success count").register(registry);
this.sagaFailureCounter = Counter.builder("transaction.saga.failure")
.description("Saga transaction failure count").register(registry);
this.tccConfirmCounter = Counter.builder("transaction.tcc.confirm")
.description("TCC confirm count").register(registry);
this.tccCancelCounter = Counter.builder("transaction.tcc.cancel")
.description("TCC cancel count").register(registry);
this.seataCommitCounter = Counter.builder("transaction.seata.commit")
.description("Seata commit count").register(registry);
this.seataRollbackCounter = Counter.builder("transaction.seata.rollback")
.description("Seata rollback count").register(registry);
this.sagaExecutionTimer = Timer.builder("transaction.saga.duration")
.description("Saga execution duration").register(registry);
this.activeTransactionGauge = registry.gauge("transaction.active.count",
new AtomicInteger(0));
}
public void recordSagaSuccess(long durationMs) {
sagaSuccessCounter.increment();
sagaExecutionTimer.record(durationMs, TimeUnit.MILLISECONDS);
}
public void recordSagaFailure(long durationMs) {
sagaFailureCounter.increment();
sagaExecutionTimer.record(durationMs, TimeUnit.MILLISECONDS);
}
public void recordTccConfirm() { tccConfirmCounter.increment(); }
public void recordTccCancel() { tccCancelCounter.increment(); }
public void recordSeataCommit() { seataCommitCounter.increment(); }
public void recordSeataRollback() { seataRollbackCounter.increment(); }
public void incrementActive() { activeTransactionGauge.incrementAndGet(); }
public void decrementActive() { activeTransactionGauge.decrementAndGet(); }
public int getActiveTransactionCount() { return activeTransactionGauge.get(); }
}
避坑指南
坑1:Saga補償操作不是回滾
// ❌ 錯誤:補償操作試圖物理刪除資料
public boolean compensateOrder(String orderNo) {
return orderMapper.deleteById(orderNo) > 0; // 物理刪除,稽核資料遺失
}
// ✅ 正確:補償操作是語義上的反向操作
public boolean compensateOrder(String orderNo) {
return orderMapper.updateStatus(orderNo, "CANCELLED") > 0; // 狀態變更,保留記錄
}
坑2:TCC的空回滾與懸掛
// ❌ 錯誤:Try未執行但Cancel被呼叫,Cancel直接操作資料
public boolean cancelFreeze(String xid, Long userId, BigDecimal amount) {
return accountMapper.cancelFreeze(userId, amount) > 0; // 空回滾:凍結記錄不存在
}
// ✅ 正確:檢查Try是否執行過
public boolean cancelFreeze(String xid, Long userId, BigDecimal amount, String branchId) {
String status = accountTccMapper.getFreezeStatus(xid, branchId);
if (status == null) {
// 空回滾:Try未執行,插入一條記錄防止懸掛
accountTccMapper.insertFreezeRecord(xid, userId, amount, branchId);
accountTccMapper.updateFreezeStatus(xid, branchId, "CANCELLED");
log.warn("TCC empty rollback: xid={}, branchId={}", xid, branchId);
return true;
}
if ("CANCELLED".equals(status) || "CONFIRMED".equals(status)) {
return true; // 冪等:已處理
}
return accountTccMapper.cancelFreeze(userId, amount) > 0;
}
坑3:Seata全域鎖導致死鎖
// ❌ 錯誤:全域交易中巢狀查詢同一行資料
@GlobalTransactional
public void processOrder(String orderNo) {
orderMapper.updateStatus(orderNo, "PROCESSING"); // 取得全域鎖
Order order = orderMapper.selectByOrderNo(orderNo); // 再次查詢同一行
// 如果另一個全域交易也持有該行鎖,互相等待→死鎖
}
// ✅ 正確:減少全域鎖持有時間,避免交叉更新
@GlobalTransactional(timeoutMills = 30000)
public void processOrder(String orderNo) {
Order order = orderMapper.selectByOrderNo(orderNo); // 先查詢
orderMapper.updateStatus(orderNo, "PROCESSING"); // 再更新
// 或拆分為多個短交易
}
坑4:發件箱與業務不在同一交易
// ❌ 錯誤:先發訊息再寫資料庫,訊息發了但資料庫寫失敗
public void createOrder(Order order) {
rocketMQTemplate.convertAndSend("order-topic", orderEvent);
orderMapper.insert(order); // 如果這裡失敗,訊息已發出無法撤回
}
// ✅ 正確:發件箱模式,業務操作和事件記錄在同一交易
@Transactional
public void createOrder(Order order) {
orderMapper.insert(order);
outboxMapper.insert(buildOutboxEvent(order)); // 同一交易保證原子性
}
坑5:訊息消費端不冪等
// ❌ 錯誤:直接扣減,重複消費導致重複扣款
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "account-group")
public class AccountConsumer implements RocketMQListener<String> {
public void onMessage(String message) {
OrderEvent event = parse(message);
accountService.deductBalance(event.getUserId(), event.getAmount()); // 重複消費!
}
}
// ✅ 正確:消費端冪等校驗
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "account-group")
public class AccountConsumer implements RocketMQListener<String> {
public void onMessage(String message) {
OrderEvent event = parse(message);
String key = "DEDUCT:" + event.getOrderNo();
if (idempotentRecordService.isProcessed(key)) {
return; // 冪等跳過
}
accountService.deductBalance(event.getUserId(), event.getAmount());
idempotentRecordService.markProcessed(key);
}
}
報錯排查
| 序號 | 報錯資訊 | 原因 | 解決方法 |
|---|---|---|---|
| 1 | Saga compensation failed for step |
補償操作拋異常,可能資料已被手動修改 | 增加補償重試,記錄補償失敗事件人工介入 |
| 2 | TCC empty rollback detected |
Try未執行但Cancel被呼叫(網路超時) | 插入空回滾記錄防止懸掛,Cancel冪等 |
| 3 | Seata global lock conflict |
多個全域交易競爭同一行鎖 | 縮小交易範圍,降低隔離級別,增加鎖重試 |
| 4 | Seata undo_log not found |
分支交易回滾時找不到undo日誌 | 檢查undo_log表是否在每個庫建立,資料來源代理是否正確 |
| 5 | RocketMQ send timeout |
訊息傳送超時,Broker不可達 | 檢查Broker狀態,增加傳送超時時間,使用發件箱模式 |
| 6 | Message consumption duplicate |
訊息重複消費 | 消費端實作冪等,使用唯一業務鍵去重 |
| 7 | Outbox event stuck in PENDING |
發件箱事件未被relay排程器消費 | 檢查排程器執行狀態,確認RocketMQ連線正常 |
| 8 | Global transaction timeout |
全域交易執行超時 | 增大timeoutMills,最佳化慢SQL,拆分長交易 |
| 9 | TCC resource suspended |
Cancel先於Try執行,Try後續到達 | 空回滾記錄攔截Try執行,檢查超時配置 |
| 10 | Compensation circular dependency |
多個Saga補償互相依賴形成環 | 設計單向補償鏈,避免循環補償 |
進階最佳化
1. Saga狀態機持久化
package com.toolsku.saga;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.annotations.*;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Slf4j
@Service
@RequiredArgsConstructor
public class SagaStateService {
private final SagaStateMapper sagaStateMapper;
@Transactional(rollbackFor = Exception.class)
public void persistSagaState(SagaDefinition saga) {
String stateJson = serializeSagaState(saga);
sagaStateMapper.upsert(saga.getSagaId(), stateJson, saga.getStatus().name());
}
public SagaDefinition recoverSaga(String sagaId) {
String stateJson = sagaStateMapper.getState(sagaId);
if (stateJson == null) {
return null;
}
return deserializeSagaState(stateJson);
}
private String serializeSagaState(SagaDefinition saga) {
StringBuilder sb = new StringBuilder();
sb.append("{\"sagaId\":\"").append(saga.getSagaId()).append("\"");
sb.append(",\"currentStep\":").append(saga.getCurrentStep());
sb.append(",\"status\":\"").append(saga.getStatus().name()).append("\"");
sb.append(",\"steps\":[");
for (int i = 0; i < saga.getSteps().size(); i++) {
if (i > 0) sb.append(",");
SagaDefinition.SagaStep step = saga.getSteps().get(i);
sb.append("{\"name\":\"").append(step.getName()).append("\"");
sb.append(",\"status\":\"").append(step.getStepStatus().name()).append("\"}");
}
sb.append("]}");
return sb.toString();
}
private SagaDefinition deserializeSagaState(String json) {
throw new UnsupportedOperationException("Use Jackson for deserialization");
}
}
@Mapper
interface SagaStateMapper {
@Insert("INSERT INTO saga_state (saga_id, state_json, status, created_at, updated_at) " +
"VALUES (#{sagaId}, #{stateJson}, #{status}, NOW(), NOW()) " +
"ON DUPLICATE KEY UPDATE state_json = #{stateJson}, status = #{status}, updated_at = NOW()")
int upsert(@Param("sagaId") String sagaId,
@Param("stateJson") String stateJson,
@Param("status") String status);
@Select("SELECT state_json FROM saga_state WHERE saga_id = #{sagaId}")
String getState(@Param("sagaId") String sagaId);
}
2. TCC分支交易註冊中心
package com.toolsku.tcc;
import lombok.Data;
import org.apache.ibatis.annotations.*;
import java.time.LocalDateTime;
@Data
public class TccBranchTransaction {
private Long id;
private String xid;
private String branchId;
private String serviceName;
private String tryMethod;
private String confirmMethod;
private String cancelMethod;
private String paramJson;
private String status;
private LocalDateTime createdAt;
}
@Mapper
interface TccBranchTransactionMapper {
@Insert("INSERT INTO tcc_branch_transaction (xid, branch_id, service_name, try_method, " +
"confirm_method, cancel_method, param_json, status, created_at) " +
"VALUES (#{xid}, #{branchId}, #{serviceName}, #{tryMethod}, #{confirmMethod}, " +
"#{cancelMethod}, #{paramJson}, #{status}, NOW())")
int insert(TccBranchTransaction branch);
@Select("SELECT * FROM tcc_branch_transaction WHERE xid = #{xid} ORDER BY created_at ASC")
java.util.List<TccBranchTransaction> findByXid(@Param("xid") String xid);
@Update("UPDATE tcc_branch_transaction SET status = #{status} WHERE xid = #{xid} AND branch_id = #{branchId}")
int updateStatus(@Param("xid") String xid, @Param("branchId") String branchId, @Param("status") String status);
}
3. 分散式交易監控大盤
package com.toolsku.reliability;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.concurrent.atomic.AtomicLong;
@Slf4j
@Component
@RequiredArgsConstructor
public class TransactionMonitor {
private final TransactionMetrics transactionMetrics;
private final AlertService alertService;
private final AtomicLong lastSagaFailureCount = new AtomicLong(0);
private final AtomicLong lastSeataRollbackCount = new AtomicLong(0);
@Scheduled(fixedRate = 60000)
public void checkTransactionHealth() {
double sagaFailureRate = calculateSagaFailureRate();
if (sagaFailureRate > 0.1) {
alertService.sendAlert("Saga failure rate too high: " + (sagaFailureRate * 100) + "%");
}
int activeCount = transactionMetrics.getActiveTransactionCount();
if (activeCount > 500) {
alertService.sendAlert("Too many active transactions: " + activeCount);
}
log.info("Transaction health check: sagaFailureRate={:.2f}%, activeTransactions={}",
sagaFailureRate * 100, activeCount);
}
private double calculateSagaFailureRate() {
return 0.0;
}
}
對比分析
| 維度 | Saga編排 | TCC模式 | Seata AT | 訊息最終一致性 | 生產級可靠性 |
|---|---|---|---|---|---|
| 一致性模型 | 最終一致 | 最終一致 | 強一致(全域鎖) | 最終一致 | 最終一致 |
| 侵入性 | 低(補償邏輯) | 高(Try/Confirm/Cancel) | 低(自動代理) | 中(發件箱+冪等) | 高(全套保障) |
| 效能 | ⭐⭐⭐⭐高 | ⭐⭐⭐中 | ⭐⭐低(全域鎖) | ⭐⭐⭐⭐⭐最高 | ⭐⭐⭐中 |
| 實作複雜度 | ⭐⭐中 | ⭐⭐⭐⭐高 | ⭐低(框架承擔) | ⭐⭐⭐中 | ⭐⭐⭐⭐⭐極高 |
| 隔離性 | ❌無隔離 | ✅Try階段隔離 | ✅全域鎖隔離 | ❌無隔離 | ✅可選隔離 |
| 補償/回滾 | ✅補償操作 | ✅Cancel操作 | ✅自動回滾 | ✅重試+人工 | ✅多級保障 |
| 適用場景 | 長流程編排 | 資金/庫存核心 | 傳統微服務 | 非同步解耦場景 | 核心交易鏈路 |
| 框架依賴 | 自研/Seata Saga | 自研/Seata TCC | Seata | RocketMQ/Kafka | 全棧組合 |
總結:分散式交易沒有銀彈。Saga適合長流程編排,補償語義清晰但無隔離;TCC適合資金核心鏈路,隔離性強但實作成本高;Seata AT適合快速接入,低侵入但全域鎖是效能瓶頸;訊息最終一致性適合非同步解耦,效能最高但需要冪等和發件箱。生產環境的核心鏈路,建議TCC + 訊息最終一致性 + 冪等 + 監控組合使用,寧可多寫一倍程式碼,也不要在線上排查資料不一致。
線上工具推薦
- JSON格式化:/zh-TW/json/format
- Hash計算:/zh-TW/encode/hash
- cURL轉程式碼:/zh-TW/dev/curl-to-code
本站提供瀏覽器本地工具,免註冊即可試用 →
#分布式事务#Saga模式#TCC#Seata#消息最终一致性#2026#技术架构