Spring Boot 3 + MyBatis-Plus Multi-Data-Source Architecture: A Complete Guide

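Why Multiple Data Sources Are an Enterprise Necessity

A single-data-source architecture was enough in the early days of the internet, but as a business grows, one database can no longer carry all of the traffic and complexity. Multiple data sources are not a nice-to-have; they are something you are forced to build.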

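Five Core Scenarios

| Scenario | Typical need | Data source relationship | Example |
|---|---|---|---|
| Read/write splitting | Read-heavy, write-light load; spread the read pressure | One primary, several replicas | E-commerce product detail page |
| Business-domain databases | Physical isolation between business domains | Parallel and independent | Order DB vs. user DB |
| Database and table sharding | A single table holds too much data | Horizontal shards | Log table split by month |
| Multi-tenancy | Data isolation between tenants | One database or schema per tenant | SaaS platform |
| Heterogeneous data sources | Different storage engines working together | MySQL + PG + ES | Search mixed with transactions |

┌──────────────────────────────────────────────────────────┐
│          Single-data-source architecture (early)         │
│                                                          │
│    App ──────► MySQL (all reads and writes land here)    │
│                                                          │
│    Problems: R/W contention, table bloat, no scale-out   │
└──────────────────────────────────────────────────────────┘

┌──────────────────────────────────────────────────────────┐
│         Multi-data-source architecture (evolved)         │
│                                                          │
│    App ──┬──► MySQL-Master (writes)                      │
│          ├──► MySQL-Slave1 (reads)                       │
│          ├──► MySQL-Slave2 (reads)                       │
│          ├──► OrderDB (order database)                   │
│          ├──► UserDB (user database)                     │
│          └──► Elasticsearch (search)                     │
└──────────────────────────────────────────────────────────┘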

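Real-world case: a social platform with 5 million daily active users saw single-database QPS peak at 120,000. After read/write splitting plus business-domain database separation, QPS on the primary dropped to 30,000 and overall throughput rose fourfold.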


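Three Multi-Data-Source Architecture Patterns

Pattern Comparison at a Glance

| Dimension | In-application multi-data-source | JDBC-layer proxy | Standalone proxy layer |
|---|---|---|---|
| Representative solution | dynamic-datasource | ShardingSphere-JDBC | ShardingSphere-Proxy / MyCat |
| Invasiveness | Low (annotations/configuration) | Medium (replaces the DataSource) | None (separate process) |
| Operational complexity | | | |
| Performance overhead | Very low | | Medium (network hop) |
| Feature richness | Basic switching | Sharding + read/write splitting + encryption | Full feature set |
| Suitable scale | Small to medium projects | Medium to large projects | Large / cross-language projects |
| Language binding | Java | Java | Language-agnostic |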

Pattern 1: In-Application Multiple Data Sources

┌────────────────────────────────────────────┐
│          Spring Boot Application           │
│                                            │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  │
│  │DataSource│  │DataSource│  │DataSource│  │
│  │  master  │  │ slave_1  │  │ slave_2  │  │
│  └─────┬────┘  └─────┬────┘  └─────┬────┘  │
│        │             │             │       │
│  ┌─────▼─────────────▼─────────────▼────┐  │
│  │     DynamicRoutingDataSource         │  │
│  │(ThreadLocal + @DS annotation routing)│  │
│  └──────────────────────────────────────┘  │
└────────────────────────────────────────────┘
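The "ThreadLocal + @DS" routing in the diagram can be pictured as a per-thread stack of data source keys: entering an annotated method pushes a key, leaving it pops the key, and the routing DataSource reads the top of the stack whenever a connection is requested. The following is a minimal plain-Java sketch of that idea. `RoutingKeyStack` is a hypothetical name used only for illustration; it is not the starter's own class and leaves out everything except the stack behaviour.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for a routing-key holder; it is not the starter's own class.
final class RoutingKeyStack {
    private static final ThreadLocal<Deque<String>> KEYS =
            ThreadLocal.withInitial(ArrayDeque::new);

    // Makes `key` the active data source for the current thread.
    static void push(String key) {
        KEYS.get().push(key);
    }

    // Returns the active key, or `primary` when nothing has been pushed.
    static String current(String primary) {
        String top = KEYS.get().peek();
        return top != null ? top : primary;
    }

    // Restores the previously active key and drops the ThreadLocal once the stack is empty.
    static void poll() {
        Deque<String> stack = KEYS.get();
        stack.poll();
        if (stack.isEmpty()) {
            KEYS.remove();
        }
    }
}

class RoutingKeyStackDemo {
    public static void main(String[] args) {
        RoutingKeyStack.push("master");   // outer service method
        RoutingKeyStack.push("slave_1");  // nested read-only call
        System.out.println(RoutingKeyStack.current("master")); // slave_1
        RoutingKeyStack.poll();
        System.out.println(RoutingKeyStack.current("master")); // master
        RoutingKeyStack.poll();
    }
}
```

A stack rather than a single slot is what lets nested calls switch data sources and then fall back to the caller's choice; forgetting the matching pop leaves a stale key on a pooled thread.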

Pattern 2: JDBC-Layer Proxy

┌────────────────────────────────────────────┐
│          Spring Boot Application           │
│                                            │
│  ┌──────────────────────────────────────┐  │
│  │     ShardingSphere-JDBC (JAR)        │  │
│  │ ┌──────────────────────────────────┐ │  │
│  │ │ Parse → Route → Rewrite → Execute│ │  │
│  │ └──────────────────────────────────┘ │  │
│  └──────┬──────────┬──────────┬─────────┘  │
│         │          │          │            │
│    ds_0 │     ds_1 │     ds_2 │            │
└─────────┼──────────┼──────────┼────────────┘
          ▼          ▼          ▼
       MySQL-0    MySQL-1    MySQL-2

Pattern 3: Standalone Proxy Layer

┌────────────┐     ┌──────────────────────┐     ┌─────────┐
│ App (Java) │────►│                      │────►│ MySQL-0 │
├────────────┤     │  ShardingSphere      │     ├─────────┤
│ App (Go)   │────►│  -Proxy (standalone) │────►│ MySQL-1 │
├────────────┤     │                      │     ├─────────┤
│ App (Py)   │────►│  Transparent to apps │────►│ MySQL-2 │
└────────────┘     └──────────────────────┘     └─────────┘

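Hands-On with dynamic-datasource-spring-boot-starter

This is one of the most widely used in-application multi-data-source solutions. It is maintained by the baomidou team and integrates closely with MyBatis-Plus.

Maven Dependencies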

<dependencies>
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>dynamic-datasource-spring-boot3-starter</artifactId>
        <version>4.3.1</version>
    </dependency>
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-spring-boot3-starter</artifactId>
        <version>3.5.7</version>
    </dependency>
    <dependency>
        <groupId>com.mysql</groupId>
        <artifactId>mysql-connector-j</artifactId>
        <scope>runtime</scope>
    </dependency>
</dependencies>

Basic Configuration

spring:
  datasource:
    dynamic:
      primary: master
      strict: true
      datasource:
        master:
          url: jdbc:mysql://localhost:3306/db_master?useSSL=false&serverTimezone=Asia/Shanghai
          username: root
          password: master_pwd
          driver-class-name: com.mysql.cj.jdbc.Driver
        slave_1:
          url: jdbc:mysql://localhost:3307/db_slave?useSSL=false&serverTimezone=Asia/Shanghai
          username: root
          password: slave_pwd
          driver-class-name: com.mysql.cj.jdbc.Driver
        slave_2:
          url: jdbc:mysql://localhost:3308/db_slave?useSSL=false&serverTimezone=Asia/Shanghai
          username: root
          password: slave_pwd
          driver-class-name: com.mysql.cj.jdbc.Driver
      hikari:
        minimum-idle: 5
        maximum-pool-size: 20
        idle-timeout: 30000
        max-lifetime: 1800000
        connection-timeout: 30000
        pool-name: DynamicHikariCP

Switching with the @DS Annotation

@Service
public class UserServiceImpl implements UserService {

    @Autowired
    private UserMapper userMapper;

    @DS("master")
    @Override
    public void createUser(User user) {
        userMapper.insert(user);
    }

    @DS("slave_1")
    @Override
    public User getUserById(Long id) {
        return userMapper.selectById(id);
    }

    @DS("slave_2")
    @Override
    public List<User> listUsers(int pageNum, int pageSize) {
        Page<User> page = new Page<>(pageNum, pageSize);
        return userMapper.selectPage(page, null).getRecords();
    }
}

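Programmatic Switching

Annotations are not flexible enough for complex scenarios. Programmatic switching lets the code decide the data source at run time: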

@Service
public class OrderServiceImpl implements OrderService {

    @Autowired
    private OrderMapper orderMapper;

    @Override
    public Order getOrderWithDynamicDs(String tenantId, Long orderId) {
        // push(String) places the key on a thread-local stack; always poll() in finally
        DynamicDataSourceContextHolder.push("tenant_" + tenantId);
        try {
            // Look the order up by its own ID inside the tenant's database
            return orderMapper.selectById(orderId);
        } finally {
            DynamicDataSourceContextHolder.poll();
        }
    }

    @Override
    public List<Order> batchQuery(List<String> tenantIds) {
        List<Order> result = new ArrayList<>();
        for (String tenantId : tenantIds) {
            DynamicDataSourceContextHolder.push("tenant_" + tenantId);
            try {
                result.addAll(orderMapper.selectList(null));
            } finally {
                DynamicDataSourceContextHolder.poll();
            }
        }
        return result;
    }
}

Custom Dynamic Data Source Routing Strategy

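// Defining a DsProcessor bean replaces the starter's default chain; link the built-in
// processors with setNextProcessor(...) if the header/session/SpEL resolvers are still needed.
@Component
public class CustomDsProcessor extends DsProcessor {

    // Only keys starting with "#" are handed to the processor chain, e.g. @DS("#tenant")
    private static final String TENANT_PREFIX = "#tenant";

    @Autowired
    private TenantDataSourceManager tenantDsManager;

    @Override
    public boolean matches(String key) {
        return key.startsWith(TENANT_PREFIX);
    }

    @Override
    public String doDetermineDatasource(MethodInvocation invocation, String key) {
        String tenantId = extractTenantId(invocation);
        return tenantDsManager.resolveDsKey(tenantId);
    }

    // Takes the tenant ID from the first argument that implements TenantAware
    private String extractTenantId(MethodInvocation invocation) {
        Object[] args = invocation.getArguments();
        for (Object arg : args) {
            if (arg instanceof TenantAware tenantAware) {
                return tenantAware.getTenantId();
            }
        }
        throw new IllegalStateException("Unable to extract tenant ID");
    }
}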

Automatic Read/Write Routing with an AOP Aspect

@Aspect
@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
public class ReadWriteSplitAspect {

    private static final Set<String> READ_METHOD_PREFIXES =
        Set.of("get", "query", "find", "list", "count", "page", "search");

    @Around("execution(* com.example..service.impl.*ServiceImpl.*(..))")
    public Object around(ProceedingJoinPoint point) throws Throwable {
        String methodName = point.getSignature().getName();
        boolean isRead = READ_METHOD_PREFIXES.stream()
            .anyMatch(methodName::startsWith);

        if (isRead) {
            DynamicDataSourceContextHolder.push("slave_1");
        } else {
            DynamicDataSourceContextHolder.push("master");
        }

        try {
            return point.proceed();
        } finally {
            DynamicDataSourceContextHolder.poll();
        }
    }
}

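Multi-Data-Source Configuration in MyBatis-Plus

Cross-Data-Source Association Queries

A SQL statement runs on a single connection, so MyBatis-Plus cannot JOIN across data sources. The application layer has to assemble the result: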

@Service
public class OrderDetailServiceImpl implements OrderDetailService {

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private UserMapper userMapper;

    @Autowired
    private ProductMapper productMapper;

    @DS("order_db")
    public OrderDetailVO getOrderDetail(Long orderId) {
        Order order = orderMapper.selectById(orderId);
        if (order == null) {
            return null;
        }

        User user = withDs("user_db", () -> userMapper.selectById(order.getUserId()));
        Product product = withDs("product_db", () -> productMapper.selectById(order.getProductId()));

        return OrderDetailVO.builder()
                .order(order)
                .user(user)
                .product(product)
                .build();
    }

    // Pushes dsKey for the duration of the query and always pops it afterwards
    private <T> T withDs(String dsKey, Supplier<T> query) {
        DynamicDataSourceContextHolder.push(dsKey);
        try {
            return query.get();
        } finally {
            DynamicDataSourceContextHolder.poll();
        }
    }
}

Optimizing Cross-Data-Source Batch Queries

@Service
public class BatchCrossDsService {

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private UserMapper userMapper;

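    public List<OrderWithUserVO> batchQuery(List<Long> orderIds) {
        if (orderIds == null || orderIds.isEmpty()) {
            return Collections.emptyList(); // an empty IN (...) list would be invalid SQL
        }
        List<Order> orders = withDs("order_db", () -> orderMapper.selectBatchIds(orderIds));

        List<Long> userIds = orders.stream()
                .map(Order::getUserId)
                .distinct()
                .collect(Collectors.toList());

        // One batched lookup per data source instead of one query per order
        Map<Long, User> userMap = userIds.isEmpty()
                ? Collections.emptyMap()
                : withDs("user_db", () -> userMapper.selectBatchIds(userIds).stream()
                        .collect(Collectors.toMap(User::getId, Function.identity())));

        return orders.stream().map(order -> {
            OrderWithUserVO vo = new OrderWithUserVO();
            vo.setOrder(order);
            vo.setUser(userMap.get(order.getUserId()));
            return vo;
        }).collect(Collectors.toList());
    }

    // Pushes dsKey for the duration of the query and always pops it afterwards
    private <T> T withDs(String dsKey, Supplier<T> query) {
        DynamicDataSourceContextHolder.push(dsKey);
        try {
            return query.get();
        } finally {
            DynamicDataSourceContextHolder.poll();
        }
    }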
}

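Adding and Removing Data Sources at Run Time

In SaaS scenarios, tenants register dynamically, so data sources have to be added while the application is running: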

@Slf4j
@Component
public class TenantDataSourceManager {

    @Autowired
    private DynamicRoutingDataSource dynamicRoutingDataSource;

    // Creator bean provided by the starter; builds a pooled DataSource from a DataSourceProperty
    @Autowired
    private DefaultDataSourceCreator dataSourceCreator;

    private final Map<String, DataSourceProperty> tenantDsMap = new ConcurrentHashMap<>();

    public synchronized void addTenantDs(String tenantId, String url, String username, String password) {
        String dsKey = "tenant_" + tenantId;

        DataSourceProperty property = new DataSourceProperty();
        property.setUrl(url);
        property.setUsername(username);
        property.setPassword(password);
        property.setDriverClassName("com.mysql.cj.jdbc.Driver");
        property.setLazy(true);
        // Per-tenant pool limits (setter names assumed from the starter's Hikari config class)
        property.getHikari().setMaxPoolSize(10);
        property.getHikari().setMinIdle(2);

        DataSource dataSource = dataSourceCreator.createDataSource(property);
        dynamicRoutingDataSource.addDataSource(dsKey, dataSource);
        tenantDsMap.put(dsKey, property);

        log.info("Data source added for tenant [{}]: {}", tenantId, dsKey);
    }

    public synchronized void removeTenantDs(String tenantId) {
        String dsKey = "tenant_" + tenantId;
        dynamicRoutingDataSource.removeDataSource(dsKey);
        tenantDsMap.remove(dsKey);
        log.info("Data source removed for tenant [{}]: {}", tenantId, dsKey);
    }

    public String resolveDsKey(String tenantId) {
        String dsKey = "tenant_" + tenantId;
        if (!tenantDsMap.containsKey(dsKey)) {
            throw new IllegalStateException("Tenant data source does not exist: " + dsKey);
        }
        return dsKey;
    }

    public Set<String> getAllTenantDsKeys() {
        return Collections.unmodifiableSet(tenantDsMap.keySet());
    }
}

Registering Tenant Data Sources Automatically at Startup

@Component
@Slf4j
public class TenantDataSourceInitializer implements ApplicationRunner {

    @Autowired
    private TenantDataSourceManager tenantDsManager;

    @Autowired
    private TenantConfigRepository tenantConfigRepository;

    @Override
    public void run(ApplicationArguments args) {
        List<TenantConfig> tenants = tenantConfigRepository.findAllActive();
        for (TenantConfig tenant : tenants) {
            try {
                tenantDsManager.addTenantDs(
                    tenant.getTenantId(),
                    tenant.getDbUrl(),
                    tenant.getDbUsername(),
                    tenant.getDbPassword()
                );
            } catch (Exception e) {
                log.error("Failed to initialize data source for tenant [{}]", tenant.getTenantId(), e);
            }
        }
        log.info("Processed {} tenant data source configurations", tenants.size());
    }
}

Read/Write Splitting: From Configuration to Implementation

Complete Read/Write Splitting Architecture

                  ┌──────────────────┐
                  │   Application    │
                  │  @DS("master")   │── writes
                  │  @DS("slave")    │── reads
                  └─────────┬────────┘
                            │
              ┌─────────────┼─────────────┐
              │             │             │
        ┌─────▼─────┐ ┌─────▼─────┐ ┌─────▼─────┐
        │  Master   │ │  Slave-1  │ │  Slave-2  │
        │   (R/W)   │ │(read-only)│ │(read-only)│
        └─────┬─────┘ └───────────┘ └───────────┘
              │
        ┌─────▼─────┐
        │  Binlog   │
        │   sync    │
        └───────────┘

YAML Configuration for Read/Write Splitting

spring:
  datasource:
    dynamic:
      primary: master
      strict: true
      datasource:
        master:
          # characterEncoding takes a Java charset name; utf8mb4 is a MySQL charset, not a Java one
          url: jdbc:mysql://mysql-master:3306/app_db?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=UTF-8
          username: app_writer
          password: ${MASTER_DB_PWD}
        slave_1:
          url: jdbc:mysql://mysql-slave-1:3306/app_db?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=UTF-8
          username: app_reader
          password: ${SLAVE1_DB_PWD}
        slave_2:
          url: jdbc:mysql://mysql-slave-2:3306/app_db?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=UTF-8
          username: app_reader
          password: ${SLAVE2_DB_PWD}
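
The architecture diagram routes reads with @DS("slave") even though the configured keys are slave_1 and slave_2. As far as I understand the starter's naming convention, data sources that share the prefix before the first underscore are collected into a group, and a group key is resolved to one member by a load-balancing strategy (round-robin by default). The sketch below imitates that convention in plain Java under those assumptions; `GroupRouting` and its methods are hypothetical names, not the library's API.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of prefix-based grouping plus round-robin selection.
final class GroupRouting {
    private static final AtomicInteger COUNTER = new AtomicInteger();

    // "slave_1" and "slave_2" both map to group "slave"; "master" is its own group.
    static String groupOf(String dsName) {
        int idx = dsName.indexOf('_');
        return idx < 0 ? dsName : dsName.substring(0, idx);
    }

    // Picks the next group member in round-robin order; floorMod keeps the index
    // non-negative even after the counter overflows.
    static String nextMember(List<String> members) {
        int i = Math.floorMod(COUNTER.getAndIncrement(), members.size());
        return members.get(i);
    }
}

class GroupRoutingDemo {
    public static void main(String[] args) {
        List<String> slaves = List.of("slave_1", "slave_2");
        System.out.println(GroupRouting.groupOf("slave_1"));  // slave
        System.out.println(GroupRouting.nextMember(slaves));
        System.out.println(GroupRouting.nextMember(slaves));
    }
}
```

Under this convention, @DS("slave_1") pins a call to one replica while @DS("slave") spreads calls across the group.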

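Custom Load-Balancing Strategy

The default round-robin strategy ignores replica health. In production, reads should be routed according to each replica's replication lag:

// Application-level selector: call it to pick the key before DynamicDataSourceContextHolder.push(...),
// for example from the read/write aspect shown earlier.
@Slf4j
@Component
public class SlaveLoadBalanceStrategy {

    // Replicas lagging by this many milliseconds or more are skipped
    private static final long MAX_DELAY_MILLIS = 1000;

    @Autowired
    private MySQLReplicationMonitor replicationMonitor;

    public String determineDsKey(List<String> slaveDsKeys, String masterDsKey) {
        List<String> availableSlaves = slaveDsKeys.stream()
                .filter(key -> replicationMonitor.getReplicationDelay(key) < MAX_DELAY_MILLIS)
                .collect(Collectors.toList());

        if (availableSlaves.isEmpty()) {
            log.warn("All replicas are lagging too far behind; falling back to the primary for reads");
            return masterDsKey;
        }

        // Random choice spreads load evenly; the clock-modulo trick skews under bursts
        int index = ThreadLocalRandom.current().nextInt(availableSlaves.size());
        return availableSlaves.get(index);
    }
}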

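Replication Lag Monitoring

@Component
@Slf4j
public class MySQLReplicationMonitor {

    @Autowired
    private DynamicRoutingDataSource dynamicRoutingDataSource;

    private final Map<String, Long> replicationDelayMap = new ConcurrentHashMap<>();

    @Scheduled(fixedDelay = 5000)
    public void monitorReplicationDelay() {
        for (String slaveKey : List.of("slave_1", "slave_2")) {
            // Query each replica's own pool directly instead of switching the routing key
            DataSource slave = dynamicRoutingDataSource.getDataSource(slaveKey);
            // SHOW SLAVE STATUS is deprecated since MySQL 8.0.22 in favor of SHOW REPLICA STATUS
            try (Connection conn = slave.getConnection();
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SHOW SLAVE STATUS")) {
                if (rs.next()) {
                    long secondsBehindMaster = rs.getLong("Seconds_Behind_Master");
                    if (rs.wasNull()) {
                        // NULL means replication is not running; getLong() would report it as 0
                        replicationDelayMap.put(slaveKey, Long.MAX_VALUE);
                        log.warn("Replication is not running on replica [{}]", slaveKey);
                        continue;
                    }
                    replicationDelayMap.put(slaveKey, secondsBehindMaster * 1000);
                    if (secondsBehindMaster > 5) {
                        log.warn("Replica [{}] replication lag: {}s", slaveKey, secondsBehindMaster);
                    }
                }
            } catch (SQLException e) {
                replicationDelayMap.put(slaveKey, Long.MAX_VALUE);
                log.error("Status check failed for replica [{}]", slaveKey, e);
            }
        }
    }

    // Lag in milliseconds; a replica that has not been measured yet is treated as unusable
    public long getReplicationDelay(String slaveKey) {
        return replicationDelayMap.getOrDefault(slaveKey, Long.MAX_VALUE);
    }
}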

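Annotation to Force the Primary

Some scenarios, such as reading immediately after a write, must go to the primary because a replica may not have applied the change yet: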

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@DS("master")
public @interface MasterOnly {
}

@Service
public class PaymentServiceImpl implements PaymentService {

    @Autowired
    private PaymentMapper paymentMapper;

    @DS("master")
    @Override
    public Payment createPayment(PaymentDTO dto) {
        Payment payment = new Payment();
        BeanUtils.copyProperties(dto, payment);
        paymentMapper.insert(payment);
        return payment;
    }

    @MasterOnly
    @Override
    public Payment getPaymentAfterCreate(Long paymentId) {
        return paymentMapper.selectById(paymentId);
    }
}

Database and Table Sharding: Deep Integration with ShardingSphere-JDBC

Sharding Architecture

┌────────────────────────────────────────────────────────┐
│                   Application Layer                    │
│                                                        │
│  ┌──────────────────────────────────────────────────┐  │
│  │           ShardingSphere-JDBC                    │  │
│  │                                                  │  │
│  │  SQL: SELECT * FROM t_order                      │  │
│  │       WHERE user_id=1000 AND order_id=17         │  │
│  │         │                                        │  │
│  │    ┌────▼────┐                                   │  │
│  │    │SQL parse│                                   │  │
│  │    └────┬────┘                                   │  │
│  │    ┌────▼────┐                                   │  │
│  │    │ Router  │──► ds_0.t_order_1                 │  │
│  │    └────┬────┘                                   │  │
│  │    ┌────▼────┐                                   │  │
│  │    │ Rewrite │──► SELECT * FROM t_order_1 ...    │  │
│  │    └────┬────┘                                   │  │
│  │    ┌────▼────┐                                   │  │
│  │    │  Merge  │                                   │  │
│  │    └─────────┘                                   │  │
│  └──────────────────────────────────────────────────┘  │
└────────────────────────────────────────────────────────┘

Maven Dependencies

<dependencies>
    <!-- The Spring Boot starter was removed in ShardingSphere 5.3.0; from then on the
         JDBC driver is used directly (artifact name as published for the 5.5.x line) -->
    <dependency>
        <groupId>org.apache.shardingsphere</groupId>
        <artifactId>shardingsphere-jdbc</artifactId>
        <version>5.5.1</version>
    </dependency>
</dependencies>

Sharding Configuration

ShardingSphere 5.3 and later no longer read spring.shardingsphere.* properties. The rules live in a standalone YAML file (assumed here to be shardingsphere.yaml on the classpath), and Spring Boot reaches it through the ShardingSphere JDBC driver:

# application.yml
spring:
  datasource:
    driver-class-name: org.apache.shardingsphere.driver.ShardingSphereDriver
    url: jdbc:shardingsphere:classpath:shardingsphere.yaml

# shardingsphere.yaml
mode:
  type: Standalone
  repository:
    type: JDBC
dataSources:
  ds_0:
    dataSourceClassName: com.zaxxer.hikari.HikariDataSource
    driverClassName: com.mysql.cj.jdbc.Driver
    jdbcUrl: jdbc:mysql://localhost:3306/shard_db_0?useSSL=false&serverTimezone=Asia/Shanghai
    username: root
    password: ds0_pwd
  ds_1:
    dataSourceClassName: com.zaxxer.hikari.HikariDataSource
    driverClassName: com.mysql.cj.jdbc.Driver
    jdbcUrl: jdbc:mysql://localhost:3306/shard_db_1?useSSL=false&serverTimezone=Asia/Shanghai
    username: root
    password: ds1_pwd
rules:
- !SHARDING
  tables:
    t_order:
      actualDataNodes: ds_${0..1}.t_order_${0..15}
      tableStrategy:
        standard:
          shardingColumn: order_id
          shardingAlgorithmName: order-table-inline
      databaseStrategy:
        standard:
          shardingColumn: user_id
          shardingAlgorithmName: order-db-mod
      keyGenerateStrategy:
        column: order_id
        keyGeneratorName: snowflake
    t_order_item:
      actualDataNodes: ds_${0..1}.t_order_item_${0..15}
      tableStrategy:
        standard:
          shardingColumn: order_id
          shardingAlgorithmName: order-item-table-inline
      databaseStrategy:
        standard:
          shardingColumn: user_id
          shardingAlgorithmName: order-db-mod
  shardingAlgorithms:
    order-table-inline:
      type: INLINE
      props:
        algorithm-expression: t_order_${order_id % 16}
    order-item-table-inline:
      type: INLINE
      props:
        algorithm-expression: t_order_item_${order_id % 16}
    order-db-mod:
      type: MOD
      props:
        sharding-count: 2
  keyGenerators:
    snowflake:
      type: SNOWFLAKE
      props:
        worker-id: 1
props:
  sql-show: true
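
The rules above combine two independent decisions: the database comes from user_id modulo 2, and the table comes from order_id modulo 16. The following plain-Java sketch reproduces that arithmetic so a route can be checked by hand. `ShardRoute` is a hypothetical helper written for this illustration; it only mirrors the configured expressions and is not how ShardingSphere is invoked.

```java
// Hypothetical helper that mirrors the configured sharding expressions.
final class ShardRoute {
    // database-strategy: MOD with sharding-count 2 on user_id
    static String dataSource(long userId) {
        return "ds_" + Math.floorMod(userId, 2L);
    }

    // table-strategy: INLINE expression t_order_${order_id % 16}
    static String orderTable(long orderId) {
        return "t_order_" + Math.floorMod(orderId, 16L);
    }

    // Full data node for a query that carries both sharding columns.
    static String route(long userId, long orderId) {
        return dataSource(userId) + "." + orderTable(orderId);
    }
}

class ShardRouteDemo {
    public static void main(String[] args) {
        System.out.println(ShardRoute.route(1000L, 17L)); // ds_0.t_order_1
        System.out.println(ShardRoute.route(1001L, 32L)); // ds_1.t_order_0
    }
}
```

A query that supplies only user_id can still be pinned to one database, but the table cannot be derived, so all 16 tables in that database have to be visited; supplying both columns is what yields a single data node.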

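Custom Sharding Algorithm

With plain modulo sharding, changing the shard count remaps most keys and forces a large data migration. Consistent hashing reduces the amount of data that has to move: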

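// ShardingSphere discovers algorithms through Java SPI
// (META-INF/services/org.apache.shardingsphere.sharding.spi.ShardingAlgorithm), not as Spring beans.
// ConsistentHash, HashFunction and Murmur3HashFunction are application-side helper classes
// assumed to exist; they are not part of ShardingSphere.
public class ConsistentHashShardingAlgorithm implements StandardShardingAlgorithm<Long> {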

    private ConsistentHash<String> consistentHash;

    @Override
    public void init(Properties props) {
        int virtualNodes = Integer.parseInt(props.getProperty("virtual-nodes", "160"));
        List<String> actualDataNodes = Arrays.stream(props.getProperty("actual-data-nodes").split(","))
                .collect(Collectors.toList());

        HashFunction hashFunction = new Murmur3HashFunction();
        consistentHash = new ConsistentHash<>(hashFunction, virtualNodes, actualDataNodes);
    }

    @Override
    public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) {
        return consistentHash.get(shardingValue.getValue());
    }

    @Override
    public Collection<String> doSharding(Collection<String> availableTargetNames, RangeShardingValue<Long> shardingValue) {
        return availableTargetNames;
    }

    @Override
    public String getType() {
        return "CONSISTENT_HASH";
    }
}
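
To make the "less migration" claim concrete, here is a minimal, self-contained consistent hash ring in plain Java. It is a sketch under stated assumptions: `HashRing` is a hypothetical class, and CRC32 stands in for the Murmur3 hash mentioned above purely because it ships with the JDK. The property worth noticing is that when a node is added, a key either stays where it was or moves to the new node; it never moves between two old nodes.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.TreeMap;
import java.util.zip.CRC32;

// Hypothetical consistent hash ring with virtual nodes (CRC32 used for simplicity).
final class HashRing {
    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int virtualNodes;

    HashRing(List<String> nodes, int virtualNodes) {
        this.virtualNodes = virtualNodes;
        nodes.forEach(this::addNode);
    }

    // Places `virtualNodes` points for the node on the ring.
    void addNode(String node) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.put(hash(node + "#" + i), node);
        }
    }

    // Walks clockwise from the key's hash to the first ring point, wrapping at the end.
    String nodeFor(long shardingValue) {
        Long point = ring.ceilingKey(hash(Long.toString(shardingValue)));
        return ring.get(point != null ? point : ring.firstKey());
    }

    private static long hash(String s) {
        CRC32 crc = new CRC32();
        crc.update(s.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }
}

class HashRingDemo {
    public static void main(String[] args) {
        HashRing ring = new HashRing(List.of("ds_0", "ds_1", "ds_2"), 160);
        String[] before = new String[10_000];
        for (int id = 0; id < before.length; id++) {
            before[id] = ring.nodeFor(id);
        }
        ring.addNode("ds_3");
        int moved = 0;
        for (int id = 0; id < before.length; id++) {
            if (!ring.nodeFor(id).equals(before[id])) {
                moved++;
            }
        }
        System.out.println("keys moved after adding ds_3: " + moved + " of " + before.length);
    }
}
```

With modulo sharding, going from 3 to 4 shards changes the target of every key whose value differs modulo 3 and modulo 4, which is most of them; on the ring only the keys captured by the new node's points relocate.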

Notes on Integrating MyBatis-Plus with ShardingSphere

@Configuration
@MapperScan("com.example.mapper")
public class MybatisPlusConfig {

    @Bean
    public MybatisPlusInterceptor mybatisPlusInterceptor() {
        MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
        interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
        return interceptor;
    }

    @Bean
    public IdentifierGenerator identifierGenerator() {
        return new CustomIdGenerator();
    }
}

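public class CustomIdGenerator implements IdentifierGenerator {
    @Override
    public Number nextId(Object entity) {
        // ShardingSphereIdGenerator is an application-defined helper assumed here;
        // it is not a class shipped by ShardingSphere.
        return ShardingSphereIdGenerator.nextId();
    }
}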

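Multi-Tenant Data Isolation: Schema Level and Table Level

Comparison of the Two Isolation Models

| Dimension | Schema-level isolation | Table-level isolation |
|---|---|---|
| Isolation level | Higher | Lower |
| Resource overhead | Dedicated schema per tenant | Shared schema, distinguished by table name |
| Operational complexity | | |
| Data leak risk | | Medium (requires strict filtering) |
| Suitable scenarios | Large customers / finance | Small customers / SaaS |
| Scalability | Limited by the number of schemas | Limited by the number of tables |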

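Schema-Level Isolation

@Component
public class SchemaTenantInterceptor implements InnerInterceptor {

    @Override
    public void beforeQuery(Executor executor, MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) {
        routeToTenant();
    }

    @Override
    public void beforeUpdate(Executor executor, MappedStatement ms, Object parameter) {
        routeToTenant();
    }

    // InnerInterceptor has no "after" hook, so the pushed key has to be released elsewhere,
    // e.g. DynamicDataSourceContextHolder.clear() in the request filter's finally block
    private void routeToTenant() {
        Long tenantId = TenantContextHolder.getTenantId();
        if (tenantId != null) {
            DynamicDataSourceContextHolder.push("tenant_" + tenantId);
        }
    }
}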

Table-Level Isolation (MyBatis-Plus Tenant Plugin)

@Configuration
public class TenantConfig {

    @Bean
    public MybatisPlusInterceptor mybatisPlusInterceptor(TenantLineInnerInterceptor tenantInterceptor) {
        MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
        interceptor.addInnerInterceptor(tenantInterceptor);
        interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
        return interceptor;
    }

    @Bean
    public TenantLineInnerInterceptor tenantLineInnerInterceptor() {
        return new TenantLineInnerInterceptor(new TenantLineHandler() {
            @Override
            public Expression getTenantId() {
                Long tenantId = TenantContextHolder.getTenantId();
                if (tenantId == null) {
                    throw new IllegalStateException("Tenant ID must not be null");
                }
                return new LongValue(tenantId);
            }

            @Override
            public boolean ignoreTable(String tableName) {
                return Set.of("sys_config", "sys_dict", "sys_menu")
                        .contains(tableName);
            }

            @Override
            public String getTenantIdColumn() {
                return "tenant_id";
            }
        });
    }
}

Tenant Context Management

public class TenantContextHolder {

    private static final ThreadLocal<Long> TENANT_ID = new ThreadLocal<>();

    public static void setTenantId(Long tenantId) {
        TENANT_ID.set(tenantId);
    }

    public static Long getTenantId() {
        return TENANT_ID.get();
    }

    public static void clear() {
        TENANT_ID.remove();
    }
}
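
Because the tenant ID lives in a ThreadLocal, two things tend to go wrong in practice: the value is not cleared and leaks into the next request served by the same pooled thread, or work is handed to another thread that never sees it. The sketch below shows one way to scope and propagate the value in plain Java. `TenantScope` is a hypothetical helper introduced for illustration and keeps its own ThreadLocal so that it runs standalone.

```java
import java.util.function.Supplier;

// Hypothetical scoped wrapper around a tenant ThreadLocal.
final class TenantScope {
    private static final ThreadLocal<Long> TENANT_ID = new ThreadLocal<>();

    static Long current() {
        return TENANT_ID.get();
    }

    // Runs `body` with the tenant set, then restores whatever was there before.
    static <T> T callAs(Long tenantId, Supplier<T> body) {
        Long previous = TENANT_ID.get();
        TENANT_ID.set(tenantId);
        try {
            return body.get();
        } finally {
            if (previous == null) {
                TENANT_ID.remove();
            } else {
                TENANT_ID.set(previous);
            }
        }
    }

    // Captures the caller's tenant so the task sees it when run on another thread.
    static Runnable propagate(Runnable task) {
        Long captured = TENANT_ID.get();
        return () -> callAs(captured, () -> {
            task.run();
            return null;
        });
    }
}

class TenantScopeDemo {
    public static void main(String[] args) throws InterruptedException {
        Runnable wrapped = TenantScope.callAs(42L,
                () -> TenantScope.propagate(
                        () -> System.out.println("worker sees tenant " + TenantScope.current())));
        Thread worker = new Thread(wrapped);
        worker.start();
        worker.join();
        System.out.println("caller after scope: " + TenantScope.current()); // null
    }
}
```

Wrapping tasks this way before submitting them to an executor keeps tenant filtering intact for asynchronous work without leaving the value behind on the worker thread.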

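Injecting the Tenant ID at the Web Layer

// A Filter registered as a Spring bean is mapped to /* by default. Combining @Component with
// @WebFilter can register the filter twice when @ServletComponentScan is enabled.
@Component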
public class TenantFilter implements Filter {

    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
            throws IOException, ServletException {
        HttpServletRequest httpRequest = (HttpServletRequest) request;
        String tenantHeader = httpRequest.getHeader("X-Tenant-Id");

        if (tenantHeader != null) {
            try {
                TenantContextHolder.setTenantId(Long.parseLong(tenantHeader));
            } catch (NumberFormatException e) {
                throw new ServletException("Invalid tenant ID: " + tenantHeader);
            }
        }

        try {
            chain.doFilter(request, response);
        } finally {
            TenantContextHolder.clear();
        }
    }
}

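Transaction Management: Local Transactions, Seata Distributed Transactions, MQ Eventual Consistency

Comparison of Transaction Approaches

| Approach | Consistency | Performance | Complexity | Suitable scenarios |
|---|---|---|---|---|
| Local transaction (@DSTransactional) | Strong | | | Single data source, or brief inconsistency is tolerable |
| Seata AT mode | Strong | | | Cross-database operations that require strong consistency |
| Seata TCC mode | Strong | Medium-low | Very high | Core business such as funds and inventory |
| MQ eventual consistency | Eventual | | | Asynchronous scenarios / latency-tolerant |
| Saga mode | Eventual | | | Long-running process orchestration |

Local Transactions: @DSTransactional

dynamic-datasource provides @DSTransactional, which keeps a transaction open across data source switches inside the annotated method and commits or rolls back the connections it touched when the method ends. It is a local mechanism rather than a two-phase commit, so a failure during the commit phase can still leave the databases briefly out of step, which is why the table lists that tolerance as a precondition: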

@Service
public class OrderServiceImpl implements OrderService {

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private OrderItemMapper orderItemMapper;

    @DSTransactional
    @DS("master")
    @Override
    public void createOrder(OrderCreateDTO dto) {
        Order order = new Order();
        order.setOrderNo(generateOrderNo());
        order.setUserId(dto.getUserId());
        order.setStatus(OrderStatus.CREATED);
        orderMapper.insert(order);

        for (OrderItemDTO itemDTO : dto.getItems()) {
            OrderItem item = new OrderItem();
            item.setOrderId(order.getId());
            item.setProductId(itemDTO.getProductId());
            item.setQuantity(itemDTO.getQuantity());
            orderItemMapper.insert(item);
        }
    }
}

Seata AT Mode Integration

┌──────────────┐    ┌──────────────┐    ┌──────────────┐
│  Service A   │───►│  Service B   │───►│  Service C   │
│  (order DB)  │    │  (stock DB)  │    │ (account DB) │
└──────┬───────┘    └──────┬───────┘    └──────┬───────┘
       │                   │                   │
       └───────────────────┼───────────────────┘
                           │
                  ┌────────▼───────┐
                  │  Seata Server  │
                  │(TC coordinator)│
                  └────────────────┘

<!-- Seata releases from 2.1 onward are published under the org.apache.seata groupId -->
<dependency>
    <groupId>org.apache.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    <version>2023.0.3.2</version>
</dependency>

seata:
  enabled: true
  application-id: order-service
  tx-service-group: my_test_tx_group
  service:
    vgroup-mapping:
      my_test_tx_group: default
  registry:
    type: nacos
    nacos:
      server-addr: localhost:8848
      namespace: seata
      group: SEATA_GROUP
  config:
    type: nacos
    nacos:
      server-addr: localhost:8848
      namespace: seata
      group: SEATA_GROUP

@Service
public class BusinessServiceImpl implements BusinessService {

    @Autowired
    private OrderService orderService;

    @Autowired
    private StockService stockService;

    @Autowired
    private AccountService accountService;

    @GlobalTransactional(name = "create-order-tx", rollbackFor = Exception.class)
    @Override
    public void purchase(PurchaseDTO dto) {
        orderService.createOrder(dto);
        stockService.deductStock(dto.getProductId(), dto.getQuantity());
        accountService.deductBalance(dto.getUserId(), dto.getAmount());
    }
}

Eventual Consistency with MQ

@Service
@Slf4j
public class OrderMQService {

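    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private TransactionLogMapper transactionLogMapper;

    @Autowired
    private ApplicationEventPublisher eventPublisher;

    @Transactional
    public void createOrderWithMQ(OrderCreateDTO dto) {
        Order order = new Order();
        BeanUtils.copyProperties(dto, order);
        order.setStatus(OrderStatus.CREATED);
        orderMapper.insert(order);

        // Written in the same local transaction as the order, so both commit or neither does.
        // Named txLog so it does not shadow the @Slf4j logger field "log".
        TransactionLog txLog = new TransactionLog();
        txLog.setTransactionId(UUID.randomUUID().toString());
        txLog.setBizType("CREATE_ORDER");
        txLog.setBizId(order.getId());
        txLog.setPayload(JSON.toJSONString(dto));
        transactionLogMapper.insert(txLog);

        // Published inside the transaction so the AFTER_COMMIT listener below fires after commit.
        // The OrderCreatedEvent constructor used here is an assumption.
        eventPublisher.publishEvent(new OrderCreatedEvent(order.getOrderNo()));
    }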

    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void onOrderCreated(OrderCreatedEvent event) {
        rocketMQTemplate.asyncSend(
            "order-create-topic",
            MessageBuilder.withPayload(event)
                .setHeader("KEYS", event.getOrderNo())
                .build(),
            new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    log.info("Order message sent: {}", event.getOrderNo());
                }

                @Override
                public void onException(Throwable e) {
                    log.error("Failed to send order message: {}", event.getOrderNo(), e);
                }
            }
        );
    }
}

Scheduled Message Compensation Job

@Component
@Slf4j
public class TransactionLogCompensator {

    @Autowired
    private TransactionLogMapper transactionLogMapper;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Scheduled(fixedDelay = 30000)
    public void compensate() {
        List<TransactionLog> pendingLogs = transactionLogMapper.selectList(
            new LambdaQueryWrapper<TransactionLog>()
                .eq(TransactionLog::getStatus, 0)
                .lt(TransactionLog::getRetryCount, 5)
                .le(TransactionLog::getCreateTime, LocalDateTime.now().minusMinutes(1))
        );

        // The loop variable must not be called "log": that would shadow the @Slf4j logger
        for (TransactionLog txLog : pendingLogs) {
            try {
                rocketMQTemplate.syncSend("order-create-topic", txLog.getPayload());
                txLog.setStatus(1);
                transactionLogMapper.updateById(txLog);
            } catch (Exception e) {
                txLog.setRetryCount(txLog.getRetryCount() + 1);
                transactionLogMapper.updateById(txLog);
                log.warn("Compensation failed for transaction log [{}], retry count: {}", txLog.getTransactionId(), txLog.getRetryCount());
            }
        }
    }
}
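
The retry bookkeeping in the compensation job is easy to get subtly wrong, so it can help to isolate it from the database and the broker. The following in-memory sketch models only that bookkeeping: pending entries are retried until they succeed or reach the retry cap used above (5). `OutboxEntry`, `OutboxCompensator`, and the `sender` predicate are hypothetical stand-ins for the transaction log row, the scheduled job, and the MQ send call.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical in-memory stand-in for a transaction log row.
final class OutboxEntry {
    final String payload;
    int status;      // 0 = pending, 1 = sent
    int retryCount;

    OutboxEntry(String payload) {
        this.payload = payload;
    }
}

final class OutboxCompensator {
    static final int MAX_RETRIES = 5;

    // Tries to send every pending entry once; returns how many were delivered in this pass.
    // `sender` returns true on success; a thrown exception counts as a failure.
    static int compensate(List<OutboxEntry> entries, Predicate<String> sender) {
        int sent = 0;
        for (OutboxEntry entry : entries) {
            if (entry.status != 0 || entry.retryCount >= MAX_RETRIES) {
                continue; // already sent, or given up and left for manual handling
            }
            boolean ok;
            try {
                ok = sender.test(entry.payload);
            } catch (RuntimeException e) {
                ok = false;
            }
            if (ok) {
                entry.status = 1;
                sent++;
            } else {
                entry.retryCount++;
            }
        }
        return sent;
    }
}

class OutboxCompensatorDemo {
    public static void main(String[] args) {
        List<OutboxEntry> entries = List.of(new OutboxEntry("order-1"), new OutboxEntry("order-2"));
        // Simulated broker that only accepts order-1
        int delivered = OutboxCompensator.compensate(entries, payload -> payload.equals("order-1"));
        System.out.println("delivered in first pass: " + delivered); // 1
    }
}
```

Since a message may be sent again after a timeout even though the broker received it, consumers of the topic should be idempotent.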

Monitoring and Operations: Data Source Health Checks and Micrometer Metrics

Data Source Health Check

@Component
@Slf4j
public class DataSourceHealthChecker {

    @Autowired
    private DynamicRoutingDataSource dynamicRoutingDataSource;

    // Application-side alerting component, assumed to exist
    @Autowired
    private AlertService alertService;

    @Scheduled(fixedDelay = 10000)
    public void checkAllDataSources() {
        // getDataSources() is the 4.x accessor (3.x exposed getCurrentDataSources())
        Map<String, DataSource> dataSources = dynamicRoutingDataSource.getDataSources();

        for (Map.Entry<String, DataSource> entry : dataSources.entrySet()) {
            String dsKey = entry.getKey();
            try (Connection conn = entry.getValue().getConnection();
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT 1")) {

                if (rs.next() && rs.getInt(1) == 1) {
                    log.debug("Data source [{}] passed the health check", dsKey);
                }
            } catch (SQLException e) {
                log.error("Data source [{}] failed the health check", dsKey, e);
                alertService.sendAlert("Data source failure: " + dsKey, e.getMessage());
            }
        }
    }
}

Collecting Metrics with Micrometer

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DataSourceMetricsConfig {

    @Bean
    public MeterDataSourceAspect meterDataSourceAspect(MeterRegistry registry) {
        return new MeterDataSourceAspect(registry);
    }

    /** Metrics helper; call it from whatever aspect or interceptor performs the data source switch. */
    public static class MeterDataSourceAspect {

        private final MeterRegistry registry;

        public MeterDataSourceAspect(MeterRegistry registry) {
            this.registry = registry;
        }

        // One counter per data source. Every meter named datasource.switch.count carries the same
        // "ds" tag key; mixing tagged and untagged meters under one name breaks Prometheus-style
        // registries, and the original also counted each switch twice.
        public void recordSwitch(String dsKey) {
            Counter.builder("datasource.switch.count")
                    .description("Number of data source switches")
                    .tag("ds", dsKey)
                    .register(registry)
                    .increment();
        }

        public Timer.Sample startQuery() {
            return Timer.start(registry);
        }

        // register() returns the existing timer when one with the same name and tags already exists.
        public void endQuery(Timer.Sample sample, String dsKey) {
            sample.stop(Timer.builder("datasource.query.duration")
                    .description("Data source query duration")
                    .tag("type", "dynamic")
                    .tag("ds", dsKey)
                    .register(registry));
        }
    }
}

HikariCP Connection Pool Monitoring

spring:
  datasource:
    dynamic:
      hikari:
        register-mbeans: true
        metrics-tracker: true
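
import com.baomidou.dynamic.datasource.DynamicRoutingDataSource;
import com.zaxxer.hikari.HikariDataSource;
import com.zaxxer.hikari.HikariPoolMXBean;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import java.util.Map;
import javax.sql.DataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class HikariMetricsExporter {

    @Autowired
    private DynamicRoutingDataSource dynamicRoutingDataSource;

    @Autowired
    private MeterRegistry meterRegistry;

    // Runs periodically so that pools added at runtime (for example new tenants) are picked up.
    // Registering a gauge that already exists returns the existing meter, so repeats are harmless.
    @Scheduled(fixedDelay = 15000)
    public void exportHikariMetrics() {
        Map<String, DataSource> dataSources = dynamicRoutingDataSource.getCurrentDataSources();

        for (Map.Entry<String, DataSource> entry : dataSources.entrySet()) {
            // Note: some dynamic-datasource versions wrap each pool (for example in ItemDataSource);
            // if yours does, unwrap it before this check or it will never match.
            if (entry.getValue() instanceof HikariDataSource hikari) {
                HikariPoolMXBean pool = hikari.getHikariPoolMXBean();
                if (pool == null) {
                    continue; // pool not started yet
                }

                Gauge.builder("hikari.active.connections", pool, HikariPoolMXBean::getActiveConnections)
                        .tag("pool", entry.getKey())
                        .description("Active connections")
                        .register(meterRegistry);

                Gauge.builder("hikari.idle.connections", pool, HikariPoolMXBean::getIdleConnections)
                        .tag("pool", entry.getKey())
                        .description("Idle connections")
                        .register(meterRegistry);

                Gauge.builder("hikari.threads.awaiting", pool, HikariPoolMXBean::getThreadsAwaitingConnection)
                        .tag("pool", entry.getKey())
                        .description("Threads waiting for a connection")
                        .register(meterRegistry);
            }
        }
    }
}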

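Key Metrics for the Prometheus + Grafana Dashboard

Metric | Meaning | Alert threshold
hikari.active.connections | Connections currently in use | > maxPool * 0.8
hikari.threads.awaiting | Threads waiting for a connection | > 5
datasource.switch.count | Data source switch frequency | Abnormal spike
datasource.query.duration | Query duration | P99 > 1s
hikari.connection.creation | Connection creation rate | Sustained high creation rate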
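
To make the numeric thresholds in the table concrete, here is a minimal plain-Java sketch that evaluates three of them against a snapshot of pool values. In practice these rules would normally live in Prometheus alerting rules; `PoolAlertRules` and its parameter names are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Evaluates the table's numeric alert thresholds against one metrics snapshot (illustrative). */
final class PoolAlertRules {
    private PoolAlertRules() {}

    /** Returns the names of the metrics whose thresholds are exceeded, in table order. */
    static List<String> evaluate(int activeConnections, int maxPoolSize,
                                 int threadsAwaiting, long p99Millis) {
        List<String> alerts = new ArrayList<>();
        // active > maxPool * 0.8, written with integers to avoid floating-point rounding
        if (activeConnections * 10L > maxPoolSize * 8L) {
            alerts.add("hikari.active.connections");
        }
        if (threadsAwaiting > 5) {
            alerts.add("hikari.threads.awaiting");
        }
        if (p99Millis > 1000) { // P99 > 1s
            alerts.add("datasource.query.duration");
        }
        return alerts;
    }
}
```

All comparisons are strict, so a pool sitting exactly at 80% usage, 5 waiting threads, or a 1s P99 does not fire.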

Production-Grade Architecture and Best Practices

Full Production Architecture

┌─────────────────────────────────────────────────────────────┐
│                       Nginx / Gateway                        │
└──────────────────────────┬──────────────────────────────────┘
                           │
┌──────────────────────────▼──────────────────────────────────┐
│                    Spring Boot Application                    │
│                                                              │
│  ┌──────────────────────────────────────────────────────┐   │
│  │                 Controller Layer                      │   │
│  │        TenantFilter → @DS → @GlobalTransactional     │   │
│  └──────────────────────────┬───────────────────────────┘   │
│                             │                                │
│  ┌──────────────────────────▼───────────────────────────┐   │
│  │                 Service Layer                         │   │
│  │     @DSTransactional / @GlobalTransactional           │   │
│  └──────────────────────────┬───────────────────────────┘   │
│                             │                                │
│  ┌──────────────────────────▼───────────────────────────┐   │
│  │          DynamicRoutingDataSource                     │   │
│  │    ┌─────────┬──────────┬──────────┬──────────┐      │   │
│  │    │ master  │ slave_1  │ order_db │ tenant_X │      │   │
│  │    └────┬────┴────┬─────┴────┬─────┴────┬─────┘      │   │
│  └─────────┼─────────┼──────────┼──────────┼────────────┘   │
└────────────┼─────────┼──────────┼──────────┼────────────────┘
             │         │          │          │
        ┌────▼───┐ ┌──▼───┐ ┌───▼──┐ ┌────▼────┐
        │Master  │ │Slave │ │Order │ │Tenant   │
        │MySQL   │ │MySQL │ │MySQL │ │MySQL    │
        └────────┘ └──────┘ └──────┘ └─────────┘

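Common Pitfalls

Pitfall | Symptom | Fix
@DS annotation has no effect | The AOP proxy is bypassed, so the switch never happens | Put the annotation on public methods and avoid calls from within the same class
Switching data sources inside a transaction | After the switch, statements still run in the original data source's transaction | Use @DSTransactional instead of @Transactional
ThreadLocal leak | A stale data source key lingers on async threads | Clear DynamicDataSourceContextHolder in a finally block
Connection pool exhaustion | Connection waits time out under high concurrency | Size each pool according to that data source's QPS
Stale reads caused by replication lag | A read from a replica right after a write does not see the new data | Route read-after-write to the master, or use @MasterOnly
Full scan when the sharding key is missing | Queries without the sharding key are routed to every shard | Require the sharding key in queries, or serve them from an ES wide table
Tenant ID not injected | SQL lacks the tenant_id condition | Global tenant interceptor + table whitelist
Distributed transaction timeout | The Seata global transaction times out and rolls back | Set a sensible timeout and avoid long transactions
Dynamic data source not closed | The connection pool is not released after a tenant is deregistered | Close the connection pool in removeDataSource
Poor HikariCP configuration | minimumIdle = maximumPoolSize wastes resources | Configure baseline and peak sizes separately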
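
The ThreadLocal pitfall deserves a concrete illustration: pooled threads are reused, so a routing key that is set but never cleared silently affects the next task on the same thread. The sketch below uses a plain `ThreadLocal` as a stand-in for the library's context holder; `RoutingKeyHolder` and `callWith` are hypothetical names, and the real holder's API may differ.

```java
import java.util.function.Supplier;

/** Minimal stand-in for a data source context holder, showing cleanup in finally (illustrative). */
final class RoutingKeyHolder {
    private static final ThreadLocal<String> KEY = new ThreadLocal<>();

    private RoutingKeyHolder() {}

    /** Returns the routing key bound to the current thread, or null when none is set. */
    static String current() {
        return KEY.get();
    }

    /** Runs the task with dsKey bound, then restores whatever was bound before, even on failure. */
    static <T> T callWith(String dsKey, Supplier<T> task) {
        String previous = KEY.get();
        KEY.set(dsKey);
        try {
            return task.get();
        } finally {
            if (previous == null) {
                KEY.remove(); // nothing was bound before: leave the thread clean
            } else {
                KEY.set(previous); // nested call: restore the outer key
            }
        }
    }
}
```

Wrapping every async task this way means a worker thread never carries a key from one task into the next, and nested switches unwind in the right order.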

Connection Pool Configuration Reference

spring:
  datasource:
    dynamic:
      hikari:
        minimum-idle: 5
        maximum-pool-size: 20
        idle-timeout: 300000
        max-lifetime: 1800000
        connection-timeout: 30000
        connection-test-query: SELECT 1
        pool-name: DynamicHikariCP
      datasource:
        master:
          hikari:
            minimum-idle: 10
            maximum-pool-size: 50
        slave_1:
          hikari:
            minimum-idle: 10
            maximum-pool-size: 30
        slave_2:
          hikari:
            minimum-idle: 10
            maximum-pool-size: 30
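
When sizing each pool by QPS, a common starting point is Little's law: the average number of connections in use is roughly the request rate multiplied by the average time each request holds a connection. The sketch below applies that estimate with a safety factor. The `headroom` multiplier and the `PoolSizeEstimator` name are assumptions introduced here, and the result is only a first guess to validate under load testing.

```java
/** Rough pool-size estimate from Little's law (illustrative; validate with load tests). */
final class PoolSizeEstimator {
    private PoolSizeEstimator() {}

    /**
     * @param qps           requests per second that hit this data source
     * @param avgHoldMillis average time one request holds a connection, in milliseconds
     * @param headroom      safety multiplier for bursts, must be >= 1.0 (an assumption, tune it)
     * @return estimated maximum pool size, at least 1
     */
    static int estimate(double qps, double avgHoldMillis, double headroom) {
        if (qps < 0 || avgHoldMillis < 0 || headroom < 1.0) {
            throw new IllegalArgumentException("qps and avgHoldMillis must be >= 0, headroom >= 1.0");
        }
        double inFlight = qps * avgHoldMillis / 1000.0; // average connections in use
        return Math.max(1, (int) Math.ceil(inFlight * headroom));
    }
}
```

For example, 2000 QPS with a 10 ms average hold time keeps about 20 connections busy; with 1.5x headroom the estimate is 30, in line with the slave pools above.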

Configuration Encryption

spring:
  datasource:
    dynamic:
      datasource:
        master:
          url: jdbc:mysql://mysql-master:3306/app_db
          username: ENC(g2U3x8vKpQ==)
          password: ENC(aB3dE7fG9hJ==)
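
@Configuration
public class DataSourceEncryptConfig {

    // AesUtil and getSecretKey() are application-supplied helpers that are not shown here;
    // load the key from outside the configuration file (environment variable, secret store).
    @Bean
    public DataSourcePropertySourceProcessor dataSourcePropertySourceProcessor() {
        return new DataSourcePropertySourceProcessor() {
            @Override
            public String decrypt(String cipherText) {
                // Only values of the form ENC(...) are decrypted; everything else passes through.
                if (cipherText != null && cipherText.startsWith("ENC(") && cipherText.endsWith(")")) {
                    String encrypted = cipherText.substring(4, cipherText.length() - 1);
                    return AesUtil.decrypt(encrypted, getSecretKey());
                }
                return cipherText;
            }
        };
    }
}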
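
The configuration above relies on an `AesUtil` helper that the article does not show. As one possible shape for it, here is a self-contained sketch using only the JDK: AES-GCM with a random 12-byte IV prepended to the ciphertext, Base64-encoded and wrapped in `ENC(...)`. The class name `EncValueCodec`, the choice of AES-GCM, and the IV layout are all assumptions; the article's actual helper may work differently.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Base64;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

/** Wraps and unwraps ENC(...) values with AES-GCM (illustrative; layout is an assumption). */
final class EncValueCodec {
    private static final String PREFIX = "ENC(";
    private static final String SUFFIX = ")";
    private static final int IV_BYTES = 12;   // recommended IV size for GCM
    private static final int TAG_BITS = 128;  // authentication tag length

    private final SecretKeySpec key;

    /** @param rawKey AES key material: 16, 24 or 32 bytes */
    EncValueCodec(byte[] rawKey) {
        this.key = new SecretKeySpec(rawKey, "AES");
    }

    /** Encrypts plain text and returns it in ENC(base64(iv || ciphertext)) form. */
    String encrypt(String plain) {
        try {
            byte[] iv = new byte[IV_BYTES];
            new SecureRandom().nextBytes(iv); // fresh IV for every value
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
            byte[] ct = cipher.doFinal(plain.getBytes(StandardCharsets.UTF_8));
            byte[] out = ByteBuffer.allocate(iv.length + ct.length).put(iv).put(ct).array();
            return PREFIX + Base64.getEncoder().encodeToString(out) + SUFFIX;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("encryption failed", e);
        }
    }

    /** Decrypts ENC(...) values; any other value is returned unchanged. */
    String resolve(String value) {
        if (value == null || !value.startsWith(PREFIX) || !value.endsWith(SUFFIX)) {
            return value;
        }
        try {
            String body = value.substring(PREFIX.length(), value.length() - SUFFIX.length());
            byte[] in = Base64.getDecoder().decode(body);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, in, 0, IV_BYTES));
            byte[] plain = cipher.doFinal(in, IV_BYTES, in.length - IV_BYTES);
            return new String(plain, StandardCharsets.UTF_8);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("decryption failed", e);
        }
    }
}
```

Because GCM authenticates the ciphertext, a tampered or wrongly keyed value fails loudly at startup instead of producing a garbage password.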

Graceful Shutdown

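import com.baomidou.dynamic.datasource.DynamicRoutingDataSource;
import com.zaxxer.hikari.HikariDataSource;
import com.zaxxer.hikari.HikariPoolMXBean;
import jakarta.annotation.PreDestroy;
import java.util.Map;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j // the original used `log` without declaring a logger
public class GracefulShutdownConfig {

    @Autowired
    private DynamicRoutingDataSource dynamicRoutingDataSource;

    // Spring Boot 3 uses jakarta.annotation.PreDestroy, not javax.annotation.PreDestroy.
    @PreDestroy
    public void gracefulShutdown() {
        log.info("Starting graceful shutdown of data sources...");

        Map<String, DataSource> dataSources = dynamicRoutingDataSource.getCurrentDataSources();
        for (Map.Entry<String, DataSource> entry : dataSources.entrySet()) {
            if (entry.getValue() instanceof HikariDataSource hikari) {
                // The MXBean is null when the pool was never started.
                HikariPoolMXBean pool = hikari.getHikariPoolMXBean();
                int active = (pool != null) ? pool.getActiveConnections() : 0;
                log.info("Closing data source [{}], active connections: {}", entry.getKey(), active);
                hikari.close();
            }
        }

        log.info("All data sources closed");
    }
}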
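
The shutdown hook above closes each pool immediately, even if connections are still in use. A gentler variant waits a bounded time for active connections to drain first. The following plain-Java sketch shows only the waiting logic; `DrainWaiter`, the timeout, and the poll interval are hypothetical and not part of the original code.

```java
import java.util.function.IntSupplier;

/** Waits, up to a timeout, for an active-connection count to reach zero (illustrative). */
final class DrainWaiter {
    private DrainWaiter() {}

    /**
     * @param activeConnections supplies the current number of in-use connections
     * @param timeoutMillis     maximum time to wait
     * @param pollMillis        pause between checks
     * @return true if the count reached zero in time, false on timeout or interruption
     */
    static boolean awaitDrain(IntSupplier activeConnections, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (activeConnections.getAsInt() > 0) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // still busy when the timeout expired
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return true;
    }
}
```

In the shutdown hook, the supplier could be `() -> hikari.getHikariPoolMXBean().getActiveConnections()`, called before `hikari.close()`; the pool is closed afterward regardless of the result so shutdown never hangs.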

Multi-Environment Configuration Management

# application-dev.yml
spring:
  datasource:
    dynamic:
      strict: false
      datasource:
        master:
          url: jdbc:mysql://localhost:3306/dev_db

# application-prod.yml
spring:
  datasource:
    dynamic:
      strict: true
      hikari:
        minimum-idle: 10
        maximum-pool-size: 50
      datasource:
        master:
          url: jdbc:mysql://mysql-master.internal:3306/prod_db
          username: ${DB_MASTER_USER}
          password: ${DB_MASTER_PWD}

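Summary

Spring Boot 3 + MyBatis-Plus with multiple data sources is a standard setup for enterprise Java applications. Core selection advice:

  • Small and medium projects: dynamic-datasource-spring-boot-starter + annotation-based switching; simple and efficient
  • Medium and large projects: ShardingSphere-JDBC + sharding; full-featured
  • Cross-language projects: ShardingSphere-Proxy as a standalone proxy layer
  • Transactions: @DSTransactional for a single database, Seata AT across databases, MQ-based eventual consistency for asynchronous flows
  • Multi-tenancy: schema isolation for large customers; table isolation + the MyBatis-Plus tenant plugin for small customers

Multiple data sources are not a silver bullet. Before adopting them, check whether you really need them; if a single database solves the problem, do not over-engineer.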

