Distributed Transaction Patterns in Practice: 5 Production Patterns from Saga to TCC
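Distributed Transactions: The Hardest Engineering Problem in Microservice Architectures
Placing an order deducts inventory and deducts the account balance: three services, three databases, and the inventory is deducted while the balance is not. This kind of data inconsistency happens in production every day. Local transactions cannot reach across services. A distributed lock can expire before the transaction commits. With Seata, the global lock can drag concurrency back toward single-threaded throughput. With message-based eventual consistency, messages get lost and you end up writing piles of compensation logic. In 2026, distributed transactions are still the part of a microservice architecture most likely to go wrong.
This article starts from five production patterns and walks the full path, Saga orchestration → TCC → Seata AT → message-based eventual consistency → production-grade reliability safeguards, with Java/Spring Boot code and pitfall notes at each step.
Core Concepts of Distributed Transactions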
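| Concept | Description |
|---|---|
| Local transaction | ACID transaction on a single data source; cannot guarantee consistency across services |
| 2PC (two-phase commit) | A coordinator drives prepare/commit for all participants; synchronous and blocking, so performance is poor |
| 3PC (three-phase commit) | Adds a CanCommit phase; reduces blocking but is complex to implement |
| Saga | A long transaction is split into several local transactions; compensating operations run on failure |
| TCC | Three operations, Try-Confirm-Cancel; consistency is guaranteed at the business layer |
| Seata AT mode | Intercepts SQL automatically and generates rollback logs; distributed transactions without changes to business code |
| Message-based eventual consistency | Asynchronous guarantee built on a message queue; state converges eventually rather than in real time |
| Transactional outbox | The business write and the outgoing message record are committed in the same local transaction, so messages are not lost |
| Global lock | Row lock held by a global transaction in Seata; prevents dirty writes |
| Idempotency | Executing the same operation several times yields the same result; a core safeguard for distributed transactions |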
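To make the 2PC row in the table concrete, here is a minimal in-memory sketch of a coordinator. It is only an illustration under simplifying assumptions: there is no network, no timeout handling, and no crash recovery, and the names `TwoPhaseCommitSketch`, `Participant`, and `RecordingParticipant` are hypothetical, not part of any library mentioned in this article.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: an in-memory coordinator with no network, timeouts, or crash recovery.
class TwoPhaseCommitSketch {

    interface Participant {
        boolean prepare();   // phase 1: vote yes (true) or no (false)
        void commit();       // phase 2 when every vote was yes
        void rollback();     // phase 2 when any vote was no
    }

    // Hypothetical participant that records its state so the outcome can be inspected.
    static final class RecordingParticipant implements Participant {
        private final boolean vote;
        String state = "INIT";

        RecordingParticipant(boolean vote) { this.vote = vote; }

        @Override public boolean prepare() { state = vote ? "PREPARED" : "ABORTED"; return vote; }
        @Override public void commit() { state = "COMMITTED"; }
        @Override public void rollback() { state = "ROLLED_BACK"; }
    }

    // Commits only if every participant prepared; otherwise rolls back those already prepared.
    static boolean execute(List<? extends Participant> participants) {
        List<Participant> prepared = new ArrayList<>();
        for (Participant p : participants) {
            if (!p.prepare()) {
                prepared.forEach(Participant::rollback);
                return false;
            }
            prepared.add(p);
        }
        prepared.forEach(Participant::commit);
        return true;
    }

    public static void main(String[] args) {
        RecordingParticipant order = new RecordingParticipant(true);
        RecordingParticipant account = new RecordingParticipant(false);
        boolean committed = execute(List.of(order, account));
        System.out.println(committed + " " + order.state + " " + account.state);
    }
}
```

The coordinator waits for every vote before anyone commits, which is where the blocking cost noted in the table comes from.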
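Problem Analysis: 5 Major Challenges of Distributed Transactions
- Cross-service data consistency: the order service creates the order, the inventory service deducts stock, and the account service deducts the balance; if any step fails, all of them must be undone
- Atomicity of compensation: a Saga compensation can itself fail, and something has to handle that failure
- Global locks and concurrency conflicts: Seata's global lock degrades performance, and under high concurrency lock waits time out
- Message loss and duplicate consumption: a message is sent successfully but the consumer crashes, or a message is consumed twice and the balance is deducted twice
- Timeouts and hanging: a TCC Try times out, Cancel runs first, and the delayed Try then arrives and reserves resources that nothing will ever release (a hanging branch)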
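The timeout-and-hanging challenge in the list above comes down to a small state check per branch. The following sketch is a hypothetical in-memory version (the class `TccBranchGuard` and its enums are illustrative names, and a real system would keep this state in a database table alongside the business data). It records an empty rollback when Cancel arrives first and refuses any Try that arrives after a record exists.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory branch-state table illustrating empty rollback and anti-hanging.
class TccBranchGuard {
    enum State { TRIED, CONFIRMED, CANCELLED }
    enum CancelResult { ROLLED_BACK, EMPTY_ROLLBACK, ALREADY_DONE }

    private final Map<String, State> branches = new HashMap<>();

    // Try is accepted only when no record exists yet. A record left by an earlier
    // Cancel (or an earlier Try) makes this call a no-op, which prevents hanging.
    synchronized boolean tryBranch(String branchKey) {
        if (branches.containsKey(branchKey)) {
            return false;
        }
        branches.put(branchKey, State.TRIED);
        return true;
    }

    // Confirm only moves a TRIED branch forward.
    synchronized boolean confirm(String branchKey) {
        if (branches.get(branchKey) != State.TRIED) {
            return false;
        }
        branches.put(branchKey, State.CONFIRMED);
        return true;
    }

    // Cancel without a prior Try is an empty rollback: nothing to undo, but the
    // CANCELLED marker is stored so that a late Try is refused.
    synchronized CancelResult cancel(String branchKey) {
        State current = branches.get(branchKey);
        if (current == null) {
            branches.put(branchKey, State.CANCELLED);
            return CancelResult.EMPTY_ROLLBACK;
        }
        if (current == State.TRIED) {
            branches.put(branchKey, State.CANCELLED);
            return CancelResult.ROLLED_BACK;
        }
        return CancelResult.ALREADY_DONE; // already CONFIRMED or CANCELLED
    }

    public static void main(String[] args) {
        TccBranchGuard guard = new TccBranchGuard();
        System.out.println(guard.cancel("xid-1"));     // Cancel arrives before Try
        System.out.println(guard.tryBranch("xid-1"));  // the late Try is refused
    }
}
```

In this simplified form a repeated Try is refused in the same way as a late one; a fuller version would distinguish an idempotent retry from a hanging Try.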
Step-by-Step: 5 Distributed Transaction Implementations
Pattern 1: Saga Orchestration (Central Coordinator)
package com.toolsku.saga;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.Supplier;
@Data
@Slf4j
public class SagaDefinition {
private String sagaId;
private List<SagaStep> steps = new ArrayList<>();
private int currentStep = 0;
private SagaStatus status = SagaStatus.PENDING;
public enum SagaStatus {
PENDING, RUNNING, COMPENSATING, COMPLETED, FAILED
}
@Data
public static class SagaStep {
private String name;
private Supplier<Boolean> action;
private Supplier<Boolean> compensation;
private StepStatus stepStatus = StepStatus.PENDING;
public enum StepStatus {
PENDING, EXECUTING, COMPLETED, COMPENSATING, COMPENSATED, FAILED
}
}
public static SagaBuilder builder() {
return new SagaBuilder();
}
public static class SagaBuilder {
private final SagaDefinition saga = new SagaDefinition();
public SagaBuilder step(String name, Supplier<Boolean> action, Supplier<Boolean> compensation) {
SagaStep step = new SagaStep();
step.setName(name);
step.setAction(action);
step.setCompensation(compensation);
saga.getSteps().add(step);
return this;
}
public SagaDefinition build() {
saga.setSagaId(UUID.randomUUID().toString());
return saga;
}
}
}
package com.toolsku.saga;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class SagaOrchestrator {
public SagaDefinition execute(SagaDefinition saga) {
saga.setStatus(SagaDefinition.SagaStatus.RUNNING);
log.info("Saga [{}] started, total steps: {}", saga.getSagaId(), saga.getSteps().size());
for (int i = 0; i < saga.getSteps().size(); i++) {
saga.setCurrentStep(i);
SagaDefinition.SagaStep step = saga.getSteps().get(i);
step.setStepStatus(SagaDefinition.SagaStep.StepStatus.EXECUTING);
try {
Boolean result = step.getAction().get();
if (result == null || !result) {
log.error("Saga [{}] step [{}] action failed", saga.getSagaId(), step.getName());
step.setStepStatus(SagaDefinition.SagaStep.StepStatus.FAILED);
return compensate(saga);
}
step.setStepStatus(SagaDefinition.SagaStep.StepStatus.COMPLETED);
log.info("Saga [{}] step [{}] completed", saga.getSagaId(), step.getName());
} catch (Exception e) {
log.error("Saga [{}] step [{}] exception: {}", saga.getSagaId(), step.getName(), e.getMessage(), e);
step.setStepStatus(SagaDefinition.SagaStep.StepStatus.FAILED);
return compensate(saga);
}
}
saga.setStatus(SagaDefinition.SagaStatus.COMPLETED);
log.info("Saga [{}] completed successfully", saga.getSagaId());
return saga;
}
private SagaDefinition compensate(SagaDefinition saga) {
saga.setStatus(SagaDefinition.SagaStatus.COMPENSATING);
log.info("Saga [{}] compensating from step [{}]", saga.getSagaId(), saga.getCurrentStep());
for (int i = saga.getCurrentStep(); i >= 0; i--) {
SagaDefinition.SagaStep step = saga.getSteps().get(i);
if (step.getStepStatus() != SagaDefinition.SagaStep.StepStatus.COMPLETED) {
continue;
}
step.setStepStatus(SagaDefinition.SagaStep.StepStatus.COMPENSATING);
try {
Boolean result = step.getCompensation().get();
if (result != null && result) {
step.setStepStatus(SagaDefinition.SagaStep.StepStatus.COMPENSATED);
log.info("Saga [{}] step [{}] compensated", saga.getSagaId(), step.getName());
} else {
step.setStepStatus(SagaDefinition.SagaStep.StepStatus.FAILED);
log.error("Saga [{}] step [{}] compensation failed", saga.getSagaId(), step.getName());
}
} catch (Exception e) {
step.setStepStatus(SagaDefinition.SagaStep.StepStatus.FAILED);
log.error("Saga [{}] step [{}] compensation exception: {}", saga.getSagaId(), step.getName(), e.getMessage(), e);
}
}
saga.setStatus(SagaDefinition.SagaStatus.FAILED);
return saga;
}
}
package com.toolsku.saga;
import java.math.BigDecimal;
import com.toolsku.service.OrderService;
import com.toolsku.service.InventoryService;
import com.toolsku.service.AccountService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderSagaService {
private final SagaOrchestrator sagaOrchestrator;
private final OrderService orderService;
private final InventoryService inventoryService;
private final AccountService accountService;
public SagaDefinition createOrder(Long userId, Long productId, Integer quantity, BigDecimal amount) {
SagaDefinition saga = SagaDefinition.builder()
.step("createOrder",
() -> orderService.createOrder(userId, productId, quantity, amount),
() -> orderService.cancelOrder(userId, productId))
.step("deductInventory",
() -> inventoryService.deductInventory(productId, quantity),
() -> inventoryService.restoreInventory(productId, quantity))
.step("deductAccount",
() -> accountService.deductBalance(userId, amount),
() -> accountService.restoreBalance(userId, amount))
.build();
return sagaOrchestrator.execute(saga);
}
}
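The orchestrator above depends on Spring, Lombok, and three remote services, so its control flow is hard to try out in isolation. As a rough, self-contained sketch of the same idea (run steps in order, then compensate completed steps in reverse when one fails), the following uses only the JDK. `SagaSketch`, `Step`, and the trace strings are illustrative names, not part of the article's code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical in-memory illustration of orchestrated Saga compensation order.
class SagaSketch {
    record Step(String name, BooleanSupplier action, Runnable compensation) {}

    // Runs steps in order; on the first failure, compensates completed steps in reverse.
    static boolean run(List<Step> steps, List<String> trace) {
        Deque<Step> completed = new ArrayDeque<>();
        for (Step step : steps) {
            if (step.action().getAsBoolean()) {
                trace.add("do:" + step.name());
                completed.push(step);
            } else {
                trace.add("fail:" + step.name());
                while (!completed.isEmpty()) {
                    Step done = completed.pop();   // most recently completed step first
                    done.compensation().run();
                    trace.add("undo:" + done.name());
                }
                return false;
            }
        }
        return true;
    }

    // Three steps where the last one fails, mirroring the order/inventory/account flow.
    static List<String> demo() {
        List<String> trace = new ArrayList<>();
        List<Step> steps = List.of(
                new Step("createOrder", () -> true, () -> {}),
                new Step("deductInventory", () -> true, () -> {}),
                new Step("deductAccount", () -> false, () -> {}));
        run(steps, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The failed step itself is not compensated, only the steps that completed before it, which matches the `COMPLETED` check in `SagaOrchestrator.compensate`.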
Pattern 2: TCC (Try-Confirm-Cancel)
package com.toolsku.tcc;
import lombok.Data;
import java.math.BigDecimal;
@Data
public class AccountTccRequest {
private String xid;
private Long userId;
private BigDecimal amount;
private String branchId;
}
package com.toolsku.tcc;
import org.apache.ibatis.annotations.*;
import java.math.BigDecimal;
@Mapper
public interface AccountTccMapper {
@Insert("INSERT INTO account_tcc_freeze (xid, user_id, amount, status, branch_id, created_at) " +
"VALUES (#{xid}, #{userId}, #{amount}, 'TRYING', #{branchId}, NOW())")
int insertFreezeRecord(@Param("xid") String xid,
@Param("userId") Long userId,
@Param("amount") BigDecimal amount,
@Param("branchId") String branchId);
@Update("UPDATE account SET balance = balance - #{amount}, frozen = frozen + #{amount} " +
"WHERE user_id = #{userId} AND balance >= #{amount}")
int freezeBalance(@Param("userId") Long userId, @Param("amount") BigDecimal amount);
@Update("UPDATE account SET frozen = frozen - #{amount} " +
"WHERE user_id = #{userId} AND frozen >= #{amount}")
int confirmDeduct(@Param("userId") Long userId, @Param("amount") BigDecimal amount);
@Update("UPDATE account SET balance = balance + #{amount}, frozen = frozen - #{amount} " +
"WHERE user_id = #{userId} AND frozen >= #{amount}")
int cancelFreeze(@Param("userId") Long userId, @Param("amount") BigDecimal amount);
@Select("SELECT COUNT(*) FROM account_tcc_freeze WHERE xid = #{xid} AND branch_id = #{branchId}")
int countFreezeRecord(@Param("xid") String xid, @Param("branchId") String branchId);
@Update("UPDATE account_tcc_freeze SET status = #{status} WHERE xid = #{xid} AND branch_id = #{branchId}")
int updateFreezeStatus(@Param("xid") String xid,
@Param("branchId") String branchId,
@Param("status") String status);
@Select("SELECT status FROM account_tcc_freeze WHERE xid = #{xid} AND branch_id = #{branchId}")
String getFreezeStatus(@Param("xid") String xid, @Param("branchId") String branchId);
}
package com.toolsku.tcc;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.math.BigDecimal;
@Slf4j
@Service
@RequiredArgsConstructor
public class AccountTccService {
private final AccountTccMapper accountTccMapper;
@Transactional(rollbackFor = Exception.class)
public boolean tryDeduct(String xid, Long userId, BigDecimal amount, String branchId) {
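String status = accountTccMapper.getFreezeStatus(xid, branchId);
if ("CANCELLED".equals(status)) {
// Anti-hanging: Cancel already ran for this branch, so a late Try must not freeze funds.
log.warn("TCC try rejected: branch already cancelled, xid={}, branchId={}", xid, branchId);
return false;
}
if (status != null) {
log.info("TCC try already executed: xid={}, branchId={}", xid, branchId);
return true;
}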
int rows = accountTccMapper.freezeBalance(userId, amount);
if (rows == 0) {
log.warn("TCC try failed: insufficient balance, userId={}, amount={}", userId, amount);
return false;
}
accountTccMapper.insertFreezeRecord(xid, userId, amount, branchId);
log.info("TCC try success: xid={}, userId={}, amount={}", xid, userId, amount);
return true;
}
@Transactional(rollbackFor = Exception.class)
public boolean confirmDeduct(String xid, Long userId, BigDecimal amount, String branchId) {
String status = accountTccMapper.getFreezeStatus(xid, branchId);
if ("CONFIRMED".equals(status)) {
log.info("TCC confirm already executed: xid={}, branchId={}", xid, branchId);
return true;
}
int rows = accountTccMapper.confirmDeduct(userId, amount);
if (rows == 0) {
log.error("TCC confirm failed: frozen amount mismatch, userId={}, amount={}", userId, amount);
return false;
}
accountTccMapper.updateFreezeStatus(xid, branchId, "CONFIRMED");
log.info("TCC confirm success: xid={}, userId={}, amount={}", xid, userId, amount);
return true;
}
@Transactional(rollbackFor = Exception.class)
public boolean cancelFreeze(String xid, Long userId, BigDecimal amount, String branchId) {
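String status = accountTccMapper.getFreezeStatus(xid, branchId);
if (status == null) {
// Empty rollback: Try never ran, so there is nothing to unfreeze. Store a CANCELLED
// marker so that a late Try finds the record and does not freeze funds.
accountTccMapper.insertFreezeRecord(xid, userId, amount, branchId);
accountTccMapper.updateFreezeStatus(xid, branchId, "CANCELLED");
log.warn("TCC empty rollback: xid={}, branchId={}", xid, branchId);
return true;
}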
if ("CANCELLED".equals(status)) {
log.info("TCC cancel already executed: xid={}, branchId={}", xid, branchId);
return true;
}
if ("CONFIRMED".equals(status)) {
log.warn("TCC cancel skipped: already confirmed, xid={}, branchId={}", xid, branchId);
return true;
}
int rows = accountTccMapper.cancelFreeze(userId, amount);
if (rows == 0) {
log.error("TCC cancel failed: frozen amount mismatch, userId={}, amount={}", userId, amount);
return false;
}
accountTccMapper.updateFreezeStatus(xid, branchId, "CANCELLED");
log.info("TCC cancel success: xid={}, userId={}, amount={}", xid, userId, amount);
return true;
}
}
package com.toolsku.tcc;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
import java.util.UUID;
@Slf4j
@Service
@RequiredArgsConstructor
public class TccTransactionManager {
private final AccountTccService accountTccService;
public boolean executeDeduct(String xid, Long userId, BigDecimal amount) {
String branchId = UUID.randomUUID().toString();
boolean tryResult = accountTccService.tryDeduct(xid, userId, amount, branchId);
if (!tryResult) {
accountTccService.cancelFreeze(xid, userId, amount, branchId);
return false;
}
boolean confirmResult = accountTccService.confirmDeduct(xid, userId, amount, branchId);
if (!confirmResult) {
accountTccService.cancelFreeze(xid, userId, amount, branchId);
return false;
}
return true;
}
}
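The three SQL statements in `AccountTccMapper` move money between two columns, `balance` and `frozen`. The following in-memory sketch mirrors that arithmetic so the invariant is easy to see: Try moves funds from balance to frozen, Confirm removes them from frozen, and Cancel moves them back. `FrozenAccount` is a hypothetical class for illustration and does no persistence or idempotency tracking.

```java
import java.math.BigDecimal;

// Hypothetical in-memory mirror of the balance/frozen arithmetic used by the TCC mapper.
class FrozenAccount {
    private BigDecimal balance;
    private BigDecimal frozen = BigDecimal.ZERO;

    FrozenAccount(String initialBalance) {
        this.balance = new BigDecimal(initialBalance);
    }

    // Try: reserve funds by moving them from balance to frozen.
    synchronized boolean tryFreeze(BigDecimal amount) {
        if (balance.compareTo(amount) < 0) {
            return false; // insufficient balance, nothing is reserved
        }
        balance = balance.subtract(amount);
        frozen = frozen.add(amount);
        return true;
    }

    // Confirm: the reserved funds leave the account for good.
    synchronized boolean confirm(BigDecimal amount) {
        if (frozen.compareTo(amount) < 0) {
            return false;
        }
        frozen = frozen.subtract(amount);
        return true;
    }

    // Cancel: the reserved funds return to the spendable balance.
    synchronized boolean cancel(BigDecimal amount) {
        if (frozen.compareTo(amount) < 0) {
            return false;
        }
        frozen = frozen.subtract(amount);
        balance = balance.add(amount);
        return true;
    }

    synchronized BigDecimal balance() { return balance; }
    synchronized BigDecimal frozen() { return frozen; }

    public static void main(String[] args) {
        FrozenAccount account = new FrozenAccount("100");
        account.tryFreeze(new BigDecimal("30"));
        account.confirm(new BigDecimal("30"));
        System.out.println(account.balance() + " / " + account.frozen());
    }
}
```

Because Cancel only checks `frozen >= amount`, calling it for a branch whose Try never ran would release funds frozen by some other branch, which is why the service layer has to consult the freeze record first.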
Pattern 3: Seata AT Mode (Automatic Compensation)
package com.toolsku.seata;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderSeataService {
private final OrderClient orderClient;
private final InventoryClient inventoryClient;
private final AccountClient accountClient;
@GlobalTransactional(name = "create-order", rollbackFor = Exception.class, timeoutMills = 60000)
public String createOrder(Long userId, Long productId, Integer quantity, BigDecimal amount) {
log.info("Seata global transaction started: userId={}, productId={}, quantity={}, amount={}",
userId, productId, quantity, amount);
String orderNo = orderClient.createOrder(userId, productId, quantity, amount);
log.info("Step 1: order created, orderNo={}", orderNo);
inventoryClient.deductInventory(productId, quantity);
log.info("Step 2: inventory deducted, productId={}, quantity={}", productId, quantity);
accountClient.deductBalance(userId, amount);
log.info("Step 3: balance deducted, userId={}, amount={}", userId, amount);
return orderNo;
}
}
package com.toolsku.seata;
import java.math.BigDecimal;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.*;
@FeignClient(name = "order-service", url = "${service.order.url}")
public interface OrderClient {
@PostMapping("/api/orders")
String createOrder(@RequestParam("userId") Long userId,
@RequestParam("productId") Long productId,
@RequestParam("quantity") Integer quantity,
@RequestParam("amount") BigDecimal amount);
@DeleteMapping("/api/orders/{orderNo}")
void cancelOrder(@PathVariable("orderNo") String orderNo);
}
@FeignClient(name = "inventory-service", url = "${service.inventory.url}")
public interface InventoryClient {
@PostMapping("/api/inventory/deduct")
void deductInventory(@RequestParam("productId") Long productId,
@RequestParam("quantity") Integer quantity);
@PostMapping("/api/inventory/restore")
void restoreInventory(@RequestParam("productId") Long productId,
@RequestParam("quantity") Integer quantity);
}
@FeignClient(name = "account-service", url = "${service.account.url}")
public interface AccountClient {
@PostMapping("/api/accounts/deduct")
void deductBalance(@RequestParam("userId") Long userId,
@RequestParam("amount") BigDecimal amount);
@PostMapping("/api/accounts/restore")
void restoreBalance(@RequestParam("userId") Long userId,
@RequestParam("amount") BigDecimal amount);
}
# application-seata.yml
seata:
  enabled: true
  application-id: order-service
  tx-service-group: toolsku-tx-group
  service:
    vgroup-mapping:
      toolsku-tx-group: default
    grouplist:
      default: 127.0.0.1:8091
  config:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      namespace: seata
      group: SEATA_GROUP
  registry:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      namespace: seata
      group: SEATA_GROUP
  client:
    undo:
      data-validation: true
      log-serialization: jackson
      log-table: undo_log
    rm:
      lock:
        retry-interval: 10
        retry-times: 30
        retry-policy-branch-rollback-on-conflict: true
-- undo_log table for Seata AT mode (each service database needs one)
CREATE TABLE IF NOT EXISTS undo_log (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
branch_id BIGINT NOT NULL,
xid VARCHAR(128) NOT NULL,
context VARCHAR(128) NOT NULL,
rollback_info LONGBLOB NOT NULL,
log_status INT NOT NULL,
log_created DATETIME NOT NULL,
log_modified DATETIME NOT NULL,
UNIQUE KEY ux_undo_log (xid, branch_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
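The `undo_log` table stores, for each branch, what is needed to reverse its writes. As a conceptual illustration only (this is not Seata's implementation, and `UndoLogSketch` and `UndoEntry` are hypothetical names), the sketch below keeps a before image and an after image for each update and refuses to roll back when the current value no longer matches the after image, which is the kind of check the `data-validation` setting refers to.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Conceptual sketch of before/after-image rollback; not how Seata is actually implemented.
class UndoLogSketch {
    record UndoEntry(String key, Integer beforeImage, Integer afterImage) {}

    private final Map<String, Integer> table = new HashMap<>();

    // Applies the write and returns the undo entry that would be stored with it.
    UndoEntry update(String key, int newValue) {
        Integer before = table.get(key);
        table.put(key, newValue);
        return new UndoEntry(key, before, newValue);
    }

    // Restores the before image only if the row still equals the after image.
    // A mismatch means someone else wrote the row in between (a dirty write).
    boolean rollback(UndoEntry entry) {
        if (!Objects.equals(table.get(entry.key()), entry.afterImage())) {
            return false;
        }
        if (entry.beforeImage() == null) {
            table.remove(entry.key());
        } else {
            table.put(entry.key(), entry.beforeImage());
        }
        return true;
    }

    Integer read(String key) {
        return table.get(key);
    }

    public static void main(String[] args) {
        UndoLogSketch db = new UndoLogSketch();
        db.update("stock", 10);
        UndoEntry entry = db.update("stock", 8);
        System.out.println(db.rollback(entry) + " " + db.read("stock"));
    }
}
```

The dirty-write case is what the global lock exists to prevent: if every writer goes through a global transaction, no other write can land between the update and its rollback.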
Pattern 4: Message-Based Eventual Consistency (Transactional Outbox + RocketMQ)
-- Transactional outbox table
CREATE TABLE IF NOT EXISTS transactional_outbox (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
aggregate_id VARCHAR(128) NOT NULL,
aggregate_type VARCHAR(64) NOT NULL,
event_type VARCHAR(128) NOT NULL,
payload JSON NOT NULL,
status VARCHAR(32) NOT NULL DEFAULT 'PENDING',
retry_count INT NOT NULL DEFAULT 0,
max_retry INT NOT NULL DEFAULT 5,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_status_created (status, created_at),
UNIQUE KEY uk_aggregate_event (aggregate_id, aggregate_type, event_type)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
package com.toolsku.outbox;
import lombok.Data;
import java.time.LocalDateTime;
@Data
public class OutboxEvent {
private Long id;
private String aggregateId;
private String aggregateType;
private String eventType;
private String payload;
private String status;
private Integer retryCount;
private Integer maxRetry;
private LocalDateTime createdAt;
private LocalDateTime updatedAt;
}
package com.toolsku.outbox;
import org.apache.ibatis.annotations.*;
import java.util.List;
@Mapper
public interface OutboxMapper {
@Insert("INSERT INTO transactional_outbox (aggregate_id, aggregate_type, event_type, payload, status) " +
"VALUES (#{aggregateId}, #{aggregateType}, #{eventType}, #{payload}, 'PENDING')")
@Options(useGeneratedKeys = true, keyProperty = "id")
int insert(OutboxEvent event);
@Select("SELECT * FROM transactional_outbox WHERE status = 'PENDING' AND retry_count < max_retry " +
"ORDER BY created_at ASC LIMIT #{limit}")
List<OutboxEvent> findPendingEvents(@Param("limit") int limit);
@Update("UPDATE transactional_outbox SET status = #{status}, retry_count = retry_count + 1, " +
"updated_at = NOW() WHERE id = #{id}")
int updateStatus(@Param("id") Long id, @Param("status") String status);
@Update("UPDATE transactional_outbox SET status = 'SENT', updated_at = NOW() WHERE id = #{id}")
int markAsSent(@Param("id") Long id);
@Update("UPDATE transactional_outbox SET status = 'FAILED', updated_at = NOW() " +
"WHERE id = #{id} AND retry_count >= max_retry")
int markAsFailed(@Param("id") Long id);
}
package com.toolsku.outbox;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
@Slf4j
@Component
@RequiredArgsConstructor
public class OutboxRelayScheduler {
private final OutboxMapper outboxMapper;
private final RocketMQTemplate rocketMQTemplate;
private static final int BATCH_SIZE = 50;
@Scheduled(fixedDelay = 1000)
public void relayPendingEvents() {
List<OutboxEvent> events = outboxMapper.findPendingEvents(BATCH_SIZE);
if (events.isEmpty()) {
return;
}
for (OutboxEvent event : events) {
try {
String topic = String.format("toolsku-%s-topic", event.getAggregateType());
String key = event.getAggregateId();
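// MessageConverter.toMessage takes (payload, headers) only, so build the message explicitly
// and carry the aggregate ID as the RocketMQ message key.
rocketMQTemplate.syncSend(topic,
org.springframework.messaging.support.MessageBuilder.withPayload(event.getPayload())
.setHeader(org.apache.rocketmq.spring.support.RocketMQHeaders.KEYS, key).build());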
outboxMapper.markAsSent(event.getId());
log.info("Outbox event sent: id={}, type={}, aggregateId={}",
event.getId(), event.getEventType(), event.getAggregateId());
} catch (Exception e) {
outboxMapper.updateStatus(event.getId(), "PENDING");
outboxMapper.markAsFailed(event.getId());
log.error("Outbox event send failed: id={}, error={}", event.getId(), e.getMessage(), e);
}
}
}
}
package com.toolsku.outbox;
import java.math.BigDecimal;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.UUID;
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderEventService {
private final OutboxMapper outboxMapper;
private final OrderMapper orderMapper;
private final ObjectMapper objectMapper;
@Transactional(rollbackFor = Exception.class)
public String createOrderWithOutbox(Long userId, Long productId, Integer quantity, BigDecimal amount) {
String orderNo = "ORD" + UUID.randomUUID().toString().replace("-", "").substring(0, 16).toUpperCase();
orderMapper.insertOrder(orderNo, userId, productId, quantity, amount, "CREATED");
try {
String payload = objectMapper.writeValueAsString(
new OrderCreatedEvent(orderNo, userId, productId, quantity, amount));
OutboxEvent event = new OutboxEvent();
event.setAggregateId(orderNo);
event.setAggregateType("order");
event.setEventType("ORDER_CREATED");
event.setPayload(payload);
outboxMapper.insert(event);
} catch (Exception e) {
throw new RuntimeException("Failed to serialize order event", e);
}
log.info("Order created with outbox event: orderNo={}", orderNo);
return orderNo;
}
}
package com.toolsku.consumer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(topic = "toolsku-order-topic", consumerGroup = "inventory-consumer-group")
public class InventoryOrderConsumer implements RocketMQListener<String> {
private final InventoryService inventoryService;
private final IdempotentRecordService idempotentRecordService;
@Override
public void onMessage(String message) {
OrderCreatedEvent event = parseEvent(message);
String idempotentKey = "INVENTORY_DEDUCT:" + event.getOrderNo();
if (idempotentRecordService.isProcessed(idempotentKey)) {
log.info("Duplicate message skipped: orderNo={}", event.getOrderNo());
return;
}
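try {
// Create the idempotency row first; markProcessed below is an UPDATE and needs an existing row.
idempotentRecordService.tryAcquire(idempotentKey);
inventoryService.deductInventory(event.getProductId(), event.getQuantity());
idempotentRecordService.markProcessed(idempotentKey);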
log.info("Inventory deducted for order: orderNo={}", event.getOrderNo());
} catch (Exception e) {
log.error("Inventory deduction failed: orderNo={}, error={}",
event.getOrderNo(), e.getMessage(), e);
throw new RuntimeException("Inventory deduction failed", e);
}
}
}
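The outbox relay and the idempotent consumer work as a pair: the relay may deliver a message more than once, and the consumer makes the extra deliveries harmless. The following JDK-only sketch illustrates that interplay under simplifying assumptions. `OutboxSketch`, its `Event` and `Consumer` classes, and the `Predicate`-based broker are hypothetical stand-ins for the table, the RocketMQ listener, and the broker.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical in-memory model of an outbox relay plus an idempotent consumer.
class OutboxSketch {
    static final int MAX_RETRY = 3;

    static final class Event {
        final String key;
        String status = "PENDING";
        int retryCount = 0;

        Event(String key) { this.key = key; }
    }

    // One relay pass: publish every PENDING event; the broker returns false on failure.
    static void relay(List<Event> outbox, Predicate<Event> broker) {
        for (Event event : outbox) {
            if (!event.status.equals("PENDING")) {
                continue;
            }
            if (broker.test(event)) {
                event.status = "SENT";
            } else if (++event.retryCount >= MAX_RETRY) {
                event.status = "FAILED"; // give up and leave it for manual handling
            }
        }
    }

    // Consumer that applies each business key at most once.
    static final class Consumer {
        private final Set<String> seen = new HashSet<>();
        int applied = 0;

        void onMessage(String key) {
            if (seen.add(key)) {   // add returns false for a key that was already handled
                applied++;
            }
        }
    }

    public static void main(String[] args) {
        List<Event> outbox = new ArrayList<>();
        outbox.add(new Event("ORD1"));
        Consumer consumer = new Consumer();
        // Broker that delivers every message twice to simulate at-least-once delivery.
        relay(outbox, e -> { consumer.onMessage(e.key); consumer.onMessage(e.key); return true; });
        System.out.println(outbox.get(0).status + " applied=" + consumer.applied);
    }
}
```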
Pattern 5: Production-Grade Reliability Safeguards
package com.toolsku.reliability;
import lombok.Data;
import org.apache.ibatis.annotations.*;
import java.time.LocalDateTime;
@Data
public class IdempotentRecord {
private Long id;
private String idempotentKey;
private String status;
private LocalDateTime createdAt;
private LocalDateTime expireAt;
}
@Mapper
public interface IdempotentRecordMapper {
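// INSERT IGNORE reports 0 affected rows for an existing key. With ON DUPLICATE KEY UPDATE the
// count for a no-op update depends on the driver's found-rows setting (useAffectedRows in
// MySQL Connector/J), so it cannot reliably tell a new row from a duplicate.
@Insert("INSERT IGNORE INTO idempotent_record (idempotent_key, status, created_at, expire_at) " +
"VALUES (#{idempotentKey}, 'PROCESSING', NOW(), DATE_ADD(NOW(), INTERVAL 24 HOUR))")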
int tryInsert(@Param("idempotentKey") String idempotentKey);
@Select("SELECT status FROM idempotent_record WHERE idempotent_key = #{idempotentKey}")
String getStatus(@Param("idempotentKey") String idempotentKey);
@Update("UPDATE idempotent_record SET status = 'PROCESSED' WHERE idempotent_key = #{idempotentKey}")
int markProcessed(@Param("idempotentKey") String idempotentKey);
@Delete("DELETE FROM idempotent_record WHERE expire_at < NOW()")
int cleanExpired();
}
package com.toolsku.reliability;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RequiredArgsConstructor
public class IdempotentRecordService {
private final IdempotentRecordMapper idempotentRecordMapper;
public boolean isProcessed(String idempotentKey) {
String status = idempotentRecordMapper.getStatus(idempotentKey);
return "PROCESSED".equals(status);
}
public boolean tryAcquire(String idempotentKey) {
// Only the caller whose insert created the row owns the key. An existing row means the
// request is in flight or already done, so it must not run again.
return idempotentRecordMapper.tryInsert(idempotentKey) > 0;
}
public void markProcessed(String idempotentKey) {
idempotentRecordMapper.markProcessed(idempotentKey);
}
}
package com.toolsku.reliability;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.*;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import jakarta.servlet.http.HttpServletRequest;
import java.lang.reflect.Method;
@Slf4j
@Component
@Aspect
public class IdempotentAspect {
private final IdempotentRecordService idempotentRecordService;
public IdempotentAspect(IdempotentRecordService idempotentRecordService) {
this.idempotentRecordService = idempotentRecordService;
}
@Around("@annotation(com.toolsku.reliability.Idempotent)")
public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
Idempotent annotation = method.getAnnotation(Idempotent.class);
String idempotentKey = resolveKey(annotation);
if (idempotentKey == null) {
return joinPoint.proceed();
}
if (idempotentRecordService.isProcessed(idempotentKey)) {
log.info("Idempotent check: already processed, key={}", idempotentKey);
return null;
}
if (!idempotentRecordService.tryAcquire(idempotentKey)) {
log.warn("Idempotent check: concurrent processing, key={}", idempotentKey);
throw new RuntimeException("Concurrent request detected");
}
try {
Object result = joinPoint.proceed();
idempotentRecordService.markProcessed(idempotentKey);
return result;
} catch (Exception e) {
log.error("Idempotent execution failed: key={}", idempotentKey, e);
throw e;
}
}
private String resolveKey(Idempotent annotation) {
ServletRequestAttributes attributes =
(ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
if (attributes == null) {
return null;
}
HttpServletRequest request = attributes.getRequest();
String header = request.getHeader(annotation.headerKey());
return header != null ? annotation.prefix() + header : null;
}
}
package com.toolsku.reliability;
import java.lang.annotation.*;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Idempotent {
String headerKey() default "X-Idempotent-Key";
String prefix() default "IDEMPOTENT:";
}
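The record table, the service, and the aspect above together implement one idea: claim a key atomically, run the action, then mark the key as done. A minimal in-memory sketch of that flow, assuming a single JVM and using `ConcurrentHashMap.putIfAbsent` in place of the unique-key insert, looks like this. `IdempotencySketch` and `runOnce` are hypothetical names. Unlike the aspect above, this sketch releases the key when the action fails so that a retry can run.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical single-JVM model of the claim / execute / mark-processed flow.
class IdempotencySketch {
    private final ConcurrentHashMap<String, String> records = new ConcurrentHashMap<>();

    // Runs the action at most once per key; returns false when the key is already claimed.
    boolean runOnce(String key, Runnable action) {
        // putIfAbsent is atomic, so only one caller can claim a given key.
        if (records.putIfAbsent(key, "PROCESSING") != null) {
            return false;
        }
        try {
            action.run();
            records.put(key, "PROCESSED");
            return true;
        } catch (RuntimeException e) {
            records.remove(key); // release the claim so that a retry can run
            throw e;
        }
    }

    String status(String key) {
        return records.get(key);
    }

    public static void main(String[] args) {
        IdempotencySketch guard = new IdempotencySketch();
        int[] executions = {0};
        guard.runOnce("DEDUCT:ORD1", () -> executions[0]++);
        guard.runOnce("DEDUCT:ORD1", () -> executions[0]++);
        System.out.println("executions=" + executions[0] + " status=" + guard.status("DEDUCT:ORD1"));
    }
}
```

Whether to release the key on failure is a design choice: releasing allows retries, while keeping it blocks them until the record expires.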
package com.toolsku.reliability;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.*;
@Slf4j
@Component
public class TransactionTimeoutGuard {
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
private final ConcurrentHashMap<String, TimeoutTask> runningTasks = new ConcurrentHashMap<>();
public void register(String xid, long timeoutMillis, Runnable onTimeout) {
ScheduledFuture<?> future = scheduler.schedule(() -> {
log.warn("Transaction timeout triggered: xid={}", xid);
runningTasks.remove(xid);
try {
onTimeout.run();
} catch (Exception e) {
log.error("Timeout callback failed: xid={}", xid, e);
}
}, timeoutMillis, TimeUnit.MILLISECONDS);
runningTasks.put(xid, new TimeoutTask(xid, future, timeoutMillis));
}
public void cancel(String xid) {
TimeoutTask task = runningTasks.remove(xid);
if (task != null) {
task.getFuture().cancel(false);
log.info("Transaction timeout guard cancelled: xid={}", xid);
}
}
public long getRemainingTime(String xid) {
TimeoutTask task = runningTasks.get(xid);
if (task == null) {
return -1;
}
long elapsed = System.currentTimeMillis() - task.getStartTime();
return Math.max(0, task.getTimeoutMillis() - elapsed);
}
@lombok.Data
@lombok.AllArgsConstructor
private static class TimeoutTask {
private String xid;
private ScheduledFuture<?> future;
private long timeoutMillis;
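// final + initializer: Lombok's @AllArgsConstructor skips initialized final fields,
// so the three-argument constructor call in register() compiles.
private final long startTime = System.currentTimeMillis();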
}
}
package com.toolsku.reliability;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@Component
public class TransactionMetrics {
private final Counter sagaSuccessCounter;
private final Counter sagaFailureCounter;
private final Counter tccConfirmCounter;
private final Counter tccCancelCounter;
private final Counter seataCommitCounter;
private final Counter seataRollbackCounter;
private final Timer sagaExecutionTimer;
private final AtomicInteger activeTransactionGauge;
public TransactionMetrics(MeterRegistry registry) {
this.sagaSuccessCounter = Counter.builder("transaction.saga.success")
.description("Saga transaction success count").register(registry);
this.sagaFailureCounter = Counter.builder("transaction.saga.failure")
.description("Saga transaction failure count").register(registry);
this.tccConfirmCounter = Counter.builder("transaction.tcc.confirm")
.description("TCC confirm count").register(registry);
this.tccCancelCounter = Counter.builder("transaction.tcc.cancel")
.description("TCC cancel count").register(registry);
this.seataCommitCounter = Counter.builder("transaction.seata.commit")
.description("Seata commit count").register(registry);
this.seataRollbackCounter = Counter.builder("transaction.seata.rollback")
.description("Seata rollback count").register(registry);
this.sagaExecutionTimer = Timer.builder("transaction.saga.duration")
.description("Saga execution duration").register(registry);
this.activeTransactionGauge = registry.gauge("transaction.active.count",
new AtomicInteger(0));
}
public void recordSagaSuccess(long durationMs) {
sagaSuccessCounter.increment();
sagaExecutionTimer.record(durationMs, TimeUnit.MILLISECONDS);
}
public void recordSagaFailure(long durationMs) {
sagaFailureCounter.increment();
sagaExecutionTimer.record(durationMs, TimeUnit.MILLISECONDS);
}
public void recordTccConfirm() { tccConfirmCounter.increment(); }
public void recordTccCancel() { tccCancelCounter.increment(); }
public void recordSeataCommit() { seataCommitCounter.increment(); }
public void recordSeataRollback() { seataRollbackCounter.increment(); }
public void incrementActive() { activeTransactionGauge.incrementAndGet(); }
public void decrementActive() { activeTransactionGauge.decrementAndGet(); }
}
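Pitfall Guide
Pitfall 1: Saga compensation is not a rollback
// ❌ Wrong: the compensation tries to roll the data back to its previous state
public boolean compensateOrder(String orderNo) {
return orderMapper.deleteById(orderNo) > 0; // physical delete, audit data is lost
}
// ✅ Correct: the compensation is a semantic reverse operation
public boolean compensateOrder(String orderNo) {
return orderMapper.updateStatus(orderNo, "CANCELLED") > 0; // status change, the record is kept
}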
Pitfall 2: TCC empty rollback and hanging
// ❌ Wrong: Cancel is called although Try never ran, and Cancel modifies the data directly
public boolean cancelFreeze(String xid, Long userId, BigDecimal amount) {
return accountMapper.cancelFreeze(userId, amount) > 0; // empty rollback: no freeze record exists
}
// ✅ Correct: check whether Try has run
public boolean cancelFreeze(String xid, Long userId, BigDecimal amount, String branchId) {
String status = accountTccMapper.getFreezeStatus(xid, branchId);
if (status == null) {
// Empty rollback: Try never ran, so insert a record to prevent hanging
accountTccMapper.insertFreezeRecord(xid, userId, amount, branchId);
accountTccMapper.updateFreezeStatus(xid, branchId, "CANCELLED");
log.warn("TCC empty rollback: xid={}, branchId={}", xid, branchId);
return true;
}
if ("CANCELLED".equals(status) || "CONFIRMED".equals(status)) {
return true; // idempotent: already handled
}
return accountTccMapper.cancelFreeze(userId, amount) > 0;
}
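Pitfall 3: Deadlocks caused by Seata global locks
// ❌ Wrong: the update comes first, so the global lock is held while the rest of the method runs
@GlobalTransactional
public void processOrder(String orderNo) {
orderMapper.updateStatus(orderNo, "PROCESSING"); // acquires the global lock on this row
Order order = orderMapper.selectByOrderNo(orderNo); // reads the same row again while the lock is held
// Any other global transaction that updates this row now has to wait; if the two
// transactions hold locks the other one needs, they wait on each other until lock retries time out
}
// ✅ Correct: shorten the time the global lock is held and avoid cross updates
@GlobalTransactional(timeoutMills = 30000)
public void processOrder(String orderNo) {
Order order = orderMapper.selectByOrderNo(orderNo); // query first
orderMapper.updateStatus(orderNo, "PROCESSING"); // then update
// or split the work into several short transactions
}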
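Pitfall 4: Outbox write and business write not in the same transaction
// ❌ Wrong: the message is sent before the database write, so it goes out even if the write fails
public void createOrder(Order order) {
rocketMQTemplate.convertAndSend("order-topic", orderEvent);
orderMapper.insert(order); // if this fails, the message is already out and cannot be recalled
}
// ✅ Correct: outbox pattern, the business write and the event record share one transaction
@Transactional
public void createOrder(Order order) {
orderMapper.insert(order);
outboxMapper.insert(buildOutboxEvent(order)); // same transaction guarantees atomicity
}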
Pitfall 5: Non-idempotent message consumers
// ❌ Wrong: deducts directly, so a redelivered message deducts the balance twice
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "account-group")
public class AccountConsumer implements RocketMQListener<String> {
public void onMessage(String message) {
OrderEvent event = parse(message);
accountService.deductBalance(event.getUserId(), event.getAmount()); // runs again on every redelivery!
}
}
// ✅ Correct: idempotency check on the consumer side
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "account-group")
public class AccountConsumer implements RocketMQListener<String> {
public void onMessage(String message) {
OrderEvent event = parse(message);
String key = "DEDUCT:" + event.getOrderNo();
if (idempotentRecordService.isProcessed(key)) {
return; // idempotent skip
}
idempotentRecordService.tryAcquire(key); // creates the record that markProcessed updates
accountService.deductBalance(event.getUserId(), event.getAmount());
idempotentRecordService.markProcessed(key);
}
}
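Troubleshooting errors
| No. | Error message | Cause | Fix |
|---|---|---|---|
| 1 | Saga compensation failed for step | The compensation threw an exception, possibly because the data was modified manually | Retry the compensation; record a compensation-failure event for manual intervention |
| 2 | TCC empty rollback detected | Cancel was called although Try never ran (network timeout) | Insert an empty-rollback record to prevent suspension; make Cancel idempotent |
| 3 | Seata global lock conflict | Several global transactions compete for the same row lock | Narrow the transaction scope, lower the isolation level, increase lock retries |
| 4 | Seata undo_log not found | The undo log cannot be found when a branch transaction rolls back | Check that the undo_log table exists in every database and that the data source proxy is configured correctly |
| 5 | RocketMQ send timeout | Sending the message timed out; the Broker is unreachable | Check the Broker status, increase the send timeout, use the outbox pattern |
| 6 | Message consumption duplicate | A message was consumed more than once | Make the consumer idempotent; deduplicate on a unique business key |
| 7 | Outbox event stuck in PENDING | The outbox event was not picked up by the relay scheduler | Check that the scheduler is running and that the RocketMQ connection is healthy |
| 8 | Global transaction timeout | The global transaction exceeded its timeout | Increase timeoutMills, optimize slow SQL, split long transactions |
| 9 | TCC resource suspended | Cancel ran before Try, and Try arrived afterwards | Use the empty-rollback record to block the late Try; check the timeout configuration |
| 10 | Compensation circular dependency | Compensations of several Sagas depend on each other and form a cycle | Design a one-way compensation chain; avoid circular compensation |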
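Row 1 of the table recommends retrying a failed compensation and handing it to a human once retries are exhausted. The steps can be sketched as follows, under stated assumptions: `CompensationRetrier`, its exponential backoff policy, and the in-memory manual-review list are hypothetical and not part of the article's code. The only thing taken from the article is the `Supplier<Boolean>` shape that `SagaStep` uses for compensations.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical helper: retries a compensation with exponential backoff, then escalates. */
class CompensationRetrier {
    private final int maxAttempts;
    private final long baseDelayMillis;
    private final List<String> manualReview = new ArrayList<>();

    CompensationRetrier(int maxAttempts, long baseDelayMillis) {
        this.maxAttempts = maxAttempts;
        this.baseDelayMillis = baseDelayMillis;
    }

    /** Returns true once the compensation succeeds; false after all attempts have failed. */
    boolean compensate(String stepName, Supplier<Boolean> compensation) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (Boolean.TRUE.equals(compensation.get())) {
                    return true;
                }
            } catch (RuntimeException e) {
                // An exception counts as a failed attempt; fall through to the backoff.
            }
            // Wait baseDelay, 2x baseDelay, 4x baseDelay, ... between attempts.
            if (attempt < maxAttempts && !sleepQuietly(baseDelayMillis << (attempt - 1))) {
                break; // interrupted: stop retrying and escalate
            }
        }
        manualReview.add(stepName); // stand-in for persisting a compensation-failure event
        return false;
    }

    /** Steps whose compensation could not be completed automatically. */
    List<String> pendingManualReview() {
        return List.copyOf(manualReview);
    }

    private static boolean sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Retrying is only safe when the compensation itself is idempotent, which is why the article treats idempotency as a prerequisite for every pattern.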
Advanced optimizations
1. Persisting the Saga state machine
package com.toolsku.saga;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.annotations.*;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Slf4j
@Service
@RequiredArgsConstructor
public class SagaStateService {
private final SagaStateMapper sagaStateMapper;
@Transactional(rollbackFor = Exception.class)
public void persistSagaState(SagaDefinition saga) {
String stateJson = serializeSagaState(saga);
sagaStateMapper.upsert(saga.getSagaId(), stateJson, saga.getStatus().name());
}
public SagaDefinition recoverSaga(String sagaId) {
String stateJson = sagaStateMapper.getState(sagaId);
if (stateJson == null) {
return null;
}
return deserializeSagaState(stateJson);
}
private String serializeSagaState(SagaDefinition saga) {
// Serialize the saga state to JSON (hand-built for illustration; values are not escaped)
StringBuilder sb = new StringBuilder();
sb.append("{\"sagaId\":\"").append(saga.getSagaId()).append("\"");
sb.append(",\"currentStep\":").append(saga.getCurrentStep());
sb.append(",\"status\":\"").append(saga.getStatus().name()).append("\"");
sb.append(",\"steps\":[");
for (int i = 0; i < saga.getSteps().size(); i++) {
if (i > 0) sb.append(",");
SagaDefinition.SagaStep step = saga.getSteps().get(i);
sb.append("{\"name\":\"").append(step.getName()).append("\"");
sb.append(",\"status\":\"").append(step.getStepStatus().name()).append("\"}");
}
sb.append("]}");
return sb.toString();
}
private SagaDefinition deserializeSagaState(String json) {
// Deserialize the JSON and rebuild the SagaDefinition
// Use Jackson/Gson in a real project
throw new UnsupportedOperationException("Use Jackson for deserialization");
}
}
@Mapper
interface SagaStateMapper {
@Insert("INSERT INTO saga_state (saga_id, state_json, status, created_at, updated_at) " +
"VALUES (#{sagaId}, #{stateJson}, #{status}, NOW(), NOW()) " +
"ON DUPLICATE KEY UPDATE state_json = #{stateJson}, status = #{status}, updated_at = NOW()")
int upsert(@Param("sagaId") String sagaId,
@Param("stateJson") String stateJson,
@Param("status") String status);
@Select("SELECT state_json FROM saga_state WHERE saga_id = #{sagaId}")
String getState(@Param("sagaId") String sagaId);
}
2. TCC branch transaction registry
package com.toolsku.tcc;
import lombok.Data;
import org.apache.ibatis.annotations.*;
import java.math.BigDecimal;
import java.time.LocalDateTime;
@Data
public class TccBranchTransaction {
private Long id;
private String xid;
private String branchId;
private String serviceName;
private String tryMethod;
private String confirmMethod;
private String cancelMethod;
private String paramJson;
private String status;
private LocalDateTime createdAt;
}
@Mapper
interface TccBranchTransactionMapper {
@Insert("INSERT INTO tcc_branch_transaction (xid, branch_id, service_name, try_method, " +
"confirm_method, cancel_method, param_json, status, created_at) " +
"VALUES (#{xid}, #{branchId}, #{serviceName}, #{tryMethod}, #{confirmMethod}, " +
"#{cancelMethod}, #{paramJson}, #{status}, NOW())")
int insert(TccBranchTransaction branch);
@Select("SELECT * FROM tcc_branch_transaction WHERE xid = #{xid} ORDER BY created_at ASC")
java.util.List<TccBranchTransaction> findByXid(@Param("xid") String xid);
@Update("UPDATE tcc_branch_transaction SET status = #{status} WHERE xid = #{xid} AND branch_id = #{branchId}")
int updateStatus(@Param("xid") String xid, @Param("branchId") String branchId, @Param("status") String status);
}
3. Distributed transaction monitoring dashboard
package com.toolsku.reliability;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.concurrent.atomic.AtomicLong;
@Slf4j
@Component
@RequiredArgsConstructor
public class TransactionMonitor {
private final TransactionMetrics transactionMetrics;
private final AlertService alertService;
private final AtomicLong lastSagaFailureCount = new AtomicLong(0);
private final AtomicLong lastSeataRollbackCount = new AtomicLong(0);
@Scheduled(fixedRate = 60000)
public void checkTransactionHealth() {
double sagaFailureRate = calculateSagaFailureRate();
if (sagaFailureRate > 0.1) {
alertService.sendAlert("Saga failure rate too high: " + (sagaFailureRate * 100) + "%");
}
int activeCount = transactionMetrics.getActiveTransactionCount();
if (activeCount > 500) {
alertService.sendAlert("Too many active transactions: " + activeCount);
}
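// SLF4J placeholders do not accept format specifiers such as {:.2f}, so format the rate first
log.info("Transaction health check: sagaFailureRate={}%, activeTransactions={}",
String.format("%.2f", sagaFailureRate * 100), activeCount);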
}
private double calculateSagaFailureRate() {
// In a real system, read the success/failure counts for the last hour from the metrics
return 0.0;
}
}
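`calculateSagaFailureRate()` is left as a stub above. One possible way to fill it in, assuming the metrics expose cumulative success and failure counters, is to keep the previous totals and compute the rate over the difference. The `FailureRateWindow` class below is a hypothetical sketch of that calculation. Note that it measures the rate between two consecutive samples (one scheduler interval), not the one-hour window the stub's comment mentions.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper: turns cumulative counters into a per-interval failure rate. */
class FailureRateWindow {
    private final AtomicLong lastSuccess = new AtomicLong();
    private final AtomicLong lastFailure = new AtomicLong();

    /**
     * Takes the current cumulative totals and returns the failure rate
     * for the transactions that finished since the previous call.
     */
    double sample(long totalSuccess, long totalFailure) {
        long successDelta = totalSuccess - lastSuccess.getAndSet(totalSuccess);
        long failureDelta = totalFailure - lastFailure.getAndSet(totalFailure);
        long finished = successDelta + failureDelta;
        // No finished transactions in this interval (or counters were reset): report 0.
        return finished <= 0 ? 0.0 : (double) failureDelta / finished;
    }
}
```

Called once per scheduled run, `sample` would replace the constant `0.0`; the 10% alert threshold in `checkTransactionHealth()` then applies to the most recent interval rather than to the lifetime average, which reacts faster to a sudden spike.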
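Comparison
| Dimension | Saga orchestration | TCC | Seata AT | Message-based eventual consistency | Production-grade reliability |
|---|---|---|---|---|---|
| Consistency model | Eventual | Eventual | Eventual (global lock provides write isolation) | Eventual | Eventual |
| Intrusiveness | Low (compensation logic) | High (Try/Confirm/Cancel) | Low (automatic proxy) | Medium (outbox + idempotency) | High (full set of safeguards) |
| Performance | ⭐⭐⭐⭐ High | ⭐⭐⭐ Medium | ⭐⭐ Low (global lock) | ⭐⭐⭐⭐⭐ Highest | ⭐⭐⭐ Medium |
| Implementation complexity | ⭐⭐ Medium | ⭐⭐⭐⭐ High | ⭐ Low (handled by the framework) | ⭐⭐⭐ Medium | ⭐⭐⭐⭐⭐ Very high |
| Isolation | ❌ None | ✅ Isolation in the Try phase | ✅ Global lock isolation | ❌ None | ✅ Optional |
| Compensation/rollback | ✅ Compensating actions | ✅ Cancel operation | ✅ Automatic rollback | ✅ Retry + manual handling | ✅ Multi-level safeguards |
| Typical use | Long-running workflows | Core funds/inventory paths | Traditional microservices | Asynchronous, decoupled flows | Core transaction paths |
| Framework dependency | In-house/Seata Saga | In-house/Seata TCC | Seata | RocketMQ/Kafka | Full-stack combination |
Summary: There is no silver bullet for distributed transactions. Saga suits long-running workflows: its compensation semantics are clear, but it offers no isolation. TCC suits core funds paths: isolation is strong, but the implementation cost is high. Seata AT suits quick adoption: it is minimally intrusive, but the global lock is the performance bottleneck. Message-based eventual consistency suits asynchronous, decoupled flows: it has the highest performance but requires idempotent consumers and an outbox. For core production paths, combine TCC, message-based eventual consistency, idempotency, and monitoring. Writing twice as much code is better than chasing inconsistent data in production.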