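Spring Boot 3 + MyBatis-Plus: A Complete Guide to Multi-Data-Source Architecture

Backend Development

Why multiple data sources are an enterprise necessity

A single-data-source architecture was enough in the early days of the internet, but as a business grows, one database can no longer carry all the traffic and complexity. Multiple data sources are not a nice-to-have; they are a necessity.

Five core scenarios

Scenario | Typical requirement | Data source relationship | Example
Read/write splitting | Read-heavy workload, read load spread out | One primary, many replicas | E-commerce product detail page
Business-domain databases | Physical isolation between business domains | Independent peers | Order database vs user database
Sharding (databases and tables) | A single table grows too large | Horizontal shards | Log table split by month
Multi-tenancy | Data isolation between tenants | One database or one schema per tenant | SaaS platform
Heterogeneous data sources | Different storage types working together | MySQL + PG + ES | Search mixed with transactions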
┌──────────────────────────────────────────────────────────┐
│              Single data source (early stage)            │
│                                                          │
│    App ──────► MySQL (all reads and writes)              │
│                                                          │
│    Problems: read/write contention, oversized tables,    │
│              no horizontal scaling                       │
└──────────────────────────────────────────────────────────┘

┌──────────────────────────────────────────────────────────┐
│              Multiple data sources (evolved)             │
│                                                          │
│    App ──┬──► MySQL-Master (writes)                      │
│          ├──► MySQL-Slave1 (reads)                       │
│          ├──► MySQL-Slave2 (reads)                       │
│          ├──► OrderDB (order database)                   │
│          ├──► UserDB (user database)                     │
│          └──► Elasticsearch (search)                     │
└──────────────────────────────────────────────────────────┘

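Real-world case: a social platform with 5 million daily active users saw single-database QPS peak at 120,000. After read/write splitting plus per-domain databases, QPS on the primary fell to 30,000 and overall throughput rose fourfold.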


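Three modes of multi-data-source architecture

Comparison at a glance

Dimension | In-application multi-data-source | JDBC-layer proxy | Standalone proxy deployment
Representative solution | dynamic-datasource | ShardingSphere-JDBC | ShardingSphere-Proxy / MyCat
Intrusiveness | Low (annotations/config) | Medium (replaces the DataSource) | None (separate process)
Operational complexity | | |
Performance overhead | Very low | | Medium (network hop)
Feature set | Basic switching | Sharding + read/write splitting + encryption | Full feature set
Suitable scale | Small to medium projects | Medium to large projects | Large or cross-language projects
Language binding | Java | Java | Language-independent

Mode 1: in-application multi-data-source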

┌─────────────────────────────────────────────┐
│              Spring Boot Application          │
│                                              │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  │
│  │ DataSource│  │ DataSource│  │ DataSource│  │
│  │  master   │  │  slave_1  │  │  slave_2  │  │
│  └─────┬────┘  └─────┬────┘  └─────┬────┘  │
│        │             │             │        │
│  ┌─────▼─────────────▼─────────────▼────┐   │
│  │     DynamicRoutingDataSource          │   │
│  │  (ThreadLocal + @DS routing)          │   │
│  └──────────────────────────────────────┘   │
└─────────────────────────────────────────────┘
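
The routing idea in the diagram can be illustrated without the library. The following is a minimal sketch, assuming a hypothetical RoutingKeyHolder class (it is not the library's DynamicDataSourceContextHolder): a per-thread stack holds the active data source key, the innermost key wins, and popping restores the previous one.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

/** Hypothetical stand-in for a thread-bound data source key stack; not the library's class. */
final class RoutingKeyHolder {
    private static final ThreadLocal<Deque<String>> KEYS = ThreadLocal.withInitial(ArrayDeque::new);

    private RoutingKeyHolder() {}

    /** Pushes a key; the innermost push wins until it is popped. */
    static void push(String key) {
        KEYS.get().push(key);
    }

    /** Returns the current key, or the fallback when nothing was pushed. */
    static String current(String fallback) {
        String key = KEYS.get().peek();
        return key != null ? key : fallback;
    }

    /** Pops the innermost key and drops the ThreadLocal once the stack is empty. */
    static void pop() {
        Deque<String> stack = KEYS.get();
        stack.poll();
        if (stack.isEmpty()) {
            KEYS.remove();
        }
    }

    /** Runs the action with the key active and always restores the previous key. */
    static <T> T call(String key, Supplier<T> action) {
        push(key);
        try {
            return action.get();
        } finally {
            pop();
        }
    }
}
```

A routing DataSource would consult current("master") each time a connection is requested. The stack, rather than a single value, is what lets a nested call switch to another data source and then fall back to the caller's choice.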

Mode 2: JDBC-layer proxy

┌─────────────────────────────────────────────┐
│              Spring Boot Application          │
│                                              │
│  ┌──────────────────────────────────────┐   │
│  │     ShardingSphere-JDBC (JAR)         │   │
│  │  ┌────────────────────────────────┐  │   │
│  │  │ Parse → Route → Rewrite → Exec │  │   │
│  │  └────────────────────────────────┘  │   │
│  └──────┬──────────┬──────────┬─────────┘   │
│         │          │          │              │
│    ds_0 │     ds_1 │     ds_2 │              │
└─────────┼──────────┼──────────┼──────────────┘
          ▼          ▼          ▼
      MySQL-0    MySQL-1    MySQL-2

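Mode 3: standalone proxy deployment

┌────────────┐     ┌──────────────────┐     ┌─────────┐
│ App (Java) │────►│                  │     │ MySQL-0 │
├────────────┤     │  ShardingSphere  │────►├─────────┤
│  App (Go)  │────►│  -Proxy          │────►│ MySQL-1 │
├────────────┤     │  (standalone,    │     ├─────────┤
│  App (Py)  │────►│  app-transparent)│────►│ MySQL-2 │
└────────────┘     └──────────────────┘     └─────────┘

dynamic-datasource-spring-boot-starter in practice

This is a widely used in-application multi-data-source solution, maintained by the baomidou team and tightly integrated with MyBatis-Plus. On Spring Boot 3 the artifact is dynamic-datasource-spring-boot3-starter, as in the dependency list that follows.

Maven dependencies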

<dependencies>
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>dynamic-datasource-spring-boot3-starter</artifactId>
        <version>4.3.1</version>
    </dependency>
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-spring-boot3-starter</artifactId>
        <version>3.5.7</version>
    </dependency>
    <dependency>
        <groupId>com.mysql</groupId>
        <artifactId>mysql-connector-j</artifactId>
        <scope>runtime</scope>
    </dependency>
</dependencies>

Basic configuration

spring:
  datasource:
    dynamic:
      primary: master
      strict: true
      datasource:
        master:
          url: jdbc:mysql://localhost:3306/db_master?useSSL=false&serverTimezone=Asia/Shanghai
          username: root
          password: master_pwd
          driver-class-name: com.mysql.cj.jdbc.Driver
        slave_1:
          url: jdbc:mysql://localhost:3307/db_slave?useSSL=false&serverTimezone=Asia/Shanghai
          username: root
          password: slave_pwd
          driver-class-name: com.mysql.cj.jdbc.Driver
        slave_2:
          url: jdbc:mysql://localhost:3308/db_slave?useSSL=false&serverTimezone=Asia/Shanghai
          username: root
          password: slave_pwd
          driver-class-name: com.mysql.cj.jdbc.Driver
      hikari:
        minimum-idle: 5
        maximum-pool-size: 20
        idle-timeout: 30000
        max-lifetime: 1800000
        connection-timeout: 30000
        pool-name: DynamicHikariCP

Switching with the @DS annotation

@Service
public class UserServiceImpl implements UserService {

    @Autowired
    private UserMapper userMapper;

    @DS("master")
    @Override
    public void createUser(User user) {
        userMapper.insert(user);
    }

    @DS("slave_1")
    @Override
    public User getUserById(Long id) {
        return userMapper.selectById(id);
    }

    @DS("slave_2")
    @Override
    public List<User> listUsers(int pageNum, int pageSize) {
        Page<User> page = new Page<>(pageNum, pageSize);
        return userMapper.selectPage(page, null).getRecords();
    }
}

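Programmatic switching

Annotations are not flexible enough for complex cases; programmatic switching lets the code choose a data source at runtime. DynamicDataSourceContextHolder.push(key) puts a key on a per-thread stack and poll() removes it again, so every push is paired with a poll in a finally block:

@Service
public class OrderServiceImpl implements OrderService {

    @Autowired
    private OrderMapper orderMapper;

    @Override
    public Order getOrderWithDynamicDs(String tenantId, Long orderId) {
        // Route this call to the tenant's own data source.
        DynamicDataSourceContextHolder.push("tenant_" + tenantId);
        try {
            // Look the order up by its own ID, not by the tenant ID.
            return orderMapper.selectById(orderId);
        } finally {
            // Always pop, otherwise the key leaks to the next task on this thread.
            DynamicDataSourceContextHolder.poll();
        }
    }

    @Override
    public List<Order> batchQuery(List<String> tenantIds) {
        List<Order> result = new ArrayList<>();
        for (String tenantId : tenantIds) {
            DynamicDataSourceContextHolder.push("tenant_" + tenantId);
            try {
                result.addAll(orderMapper.selectList(null));
            } finally {
                DynamicDataSourceContextHolder.poll();
            }
        }
        return result;
    }
}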

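Custom dynamic data source routing

@Component
public class CustomDsProcessor extends DsProcessor {

    private static final String TENANT_PREFIX = "tenant#";

    @Autowired
    private TenantDataSourceManager tenantDsManager;

    // Only @DS values starting with "tenant#" are handled here;
    // any other value is passed to the next processor in the chain.
    @Override
    public boolean matches(String key) {
        return key.startsWith(TENANT_PREFIX);
    }

    // Note: declaring a DsProcessor bean is expected to replace the starter's default
    // chain, so link the built-in processors with setNextProcessor if they are still needed.
    @Override
    public String doDetermineDatasource(MethodInvocation invocation, String key) {
        String tenantId = extractTenantId(invocation);
        return tenantDsManager.resolveDsKey(tenantId);
    }

    // Takes the tenant ID from the first method argument that implements TenantAware.
    private String extractTenantId(MethodInvocation invocation) {
        Object[] args = invocation.getArguments();
        for (Object arg : args) {
            if (arg instanceof TenantAware tenantAware) {
                return tenantAware.getTenantId();
            }
        }
        throw new IllegalStateException("Unable to extract tenant ID");
    }
}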

Automatic read/write routing with an AOP aspect

@Aspect
@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
public class ReadWriteSplitAspect {

    // Methods whose names start with one of these prefixes are treated as reads.
    private static final Set<String> READ_METHOD_PREFIXES =
        Set.of("get", "query", "find", "list", "count", "page", "search");

    @Around("execution(* com.example..service.impl.*ServiceImpl.*(..))")
    public Object around(ProceedingJoinPoint point) throws Throwable {
        String methodName = point.getSignature().getName();
        boolean isRead = READ_METHOD_PREFIXES.stream()
            .anyMatch(methodName::startsWith);

        if (isRead) {
            // "slave" is the group formed by slave_1 and slave_2 (names sharing the prefix
            // before the underscore), so reads are balanced across both replicas.
            DynamicDataSourceContextHolder.push("slave");
        } else {
            DynamicDataSourceContextHolder.push("master");
        }

        try {
            return point.proceed();
        } finally {
            DynamicDataSourceContextHolder.poll();
        }
    }
}

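MyBatis-Plus multi-data-source configuration

Cross-data-source association queries

A SQL JOIN cannot span data sources, so MyBatis-Plus has no cross-data-source JOIN; the application has to assemble the result itself: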

@Service
public class OrderDetailServiceImpl implements OrderDetailService {

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private UserMapper userMapper;

    @Autowired
    private ProductMapper productMapper;

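    @DS("order_db")
    public OrderDetailVO getOrderDetail(Long orderId) {
        Order order = orderMapper.selectById(orderId);
        if (order == null) {
            return null;
        }

        // Switch to user_db for one query, then fall back to order_db.
        User user;
        DynamicDataSourceContextHolder.push("user_db");
        try {
            user = userMapper.selectById(order.getUserId());
        } finally {
            DynamicDataSourceContextHolder.poll();
        }

        // Same pattern for product_db.
        Product product;
        DynamicDataSourceContextHolder.push("product_db");
        try {
            product = productMapper.selectById(order.getProductId());
        } finally {
            DynamicDataSourceContextHolder.poll();
        }

        return OrderDetailVO.builder()
                .order(order)
                .user(user)
                .product(product)
                .build();
    }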
}

Optimizing batch queries across data sources

@Service
public class BatchCrossDsService {

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private UserMapper userMapper;

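    public List<OrderWithUserVO> batchQuery(List<Long> orderIds) {
        // One query against order_db for all orders.
        List<Order> orders;
        DynamicDataSourceContextHolder.push("order_db");
        try {
            orders = orderMapper.selectBatchIds(orderIds);
        } finally {
            DynamicDataSourceContextHolder.poll();
        }

        List<Long> userIds = orders.stream()
                .map(Order::getUserId)
                .distinct()
                .collect(Collectors.toList());

        // One query against user_db for all distinct users; skipped when there is nothing
        // to look up, because an empty IN (...) list is not valid SQL.
        Map<Long, User> userMap = new HashMap<>();
        if (!userIds.isEmpty()) {
            DynamicDataSourceContextHolder.push("user_db");
            try {
                userMapper.selectBatchIds(userIds)
                        .forEach(user -> userMap.put(user.getId(), user));
            } finally {
                DynamicDataSourceContextHolder.poll();
            }
        }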

        return orders.stream().map(order -> {
            OrderWithUserVO vo = new OrderWithUserVO();
            vo.setOrder(order);
            vo.setUser(userMap.get(order.getUserId()));
            return vo;
        }).collect(Collectors.toList());
    }
}

Adding and removing data sources at runtime

In SaaS scenarios, tenants register dynamically, so data sources have to be added while the application is running:

@Slf4j
@Component
public class TenantDataSourceManager {

    @Autowired
    private DynamicRoutingDataSource dynamicRoutingDataSource;

    // The starter's creator bean; it builds a pooled DataSource from a DataSourceProperty.
    @Autowired
    private DefaultDataSourceCreator dataSourceCreator;

    private final Map<String, DataSourceProperty> tenantDsMap = new ConcurrentHashMap<>();

    public synchronized void addTenantDs(String tenantId, String url, String username, String password) {
        String dsKey = "tenant_" + tenantId;

        DataSourceProperty property = new DataSourceProperty();
        property.setUrl(url);
        property.setUsername(username);
        property.setPassword(password);
        property.setDriverClassName("com.mysql.cj.jdbc.Driver");
        property.setLazy(true); // defer pool start-up until first use
        // Keep per-tenant pools small: there is one pool per tenant.
        property.getHikari().setMaxPoolSize(10);
        property.getHikari().setMinIdle(2);

        DataSource dataSource = dataSourceCreator.createDataSource(property);

        dynamicRoutingDataSource.addDataSource(dsKey, dataSource);
        tenantDsMap.put(dsKey, property);

        log.info("Data source added for tenant [{}]: {}", tenantId, dsKey);
    }

    public synchronized void removeTenantDs(String tenantId) {
        String dsKey = "tenant_" + tenantId;
        dynamicRoutingDataSource.removeDataSource(dsKey);
        tenantDsMap.remove(dsKey);
        log.info("Data source removed for tenant [{}]: {}", tenantId, dsKey);
    }

    public String resolveDsKey(String tenantId) {
        String dsKey = "tenant_" + tenantId;
        if (!tenantDsMap.containsKey(dsKey)) {
            throw new IllegalStateException("Tenant data source does not exist: " + dsKey);
        }
        return dsKey;
    }

    public Set<String> getAllTenantDsKeys() {
        return Collections.unmodifiableSet(tenantDsMap.keySet());
    }
}

Registering tenant data sources automatically at startup

@Component
@Slf4j
public class TenantDataSourceInitializer implements ApplicationRunner {

    @Autowired
    private TenantDataSourceManager tenantDsManager;

    @Autowired
    private TenantConfigRepository tenantConfigRepository;

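    @Override
    public void run(ApplicationArguments args) {
        List<TenantConfig> tenants = tenantConfigRepository.findAllActive();
        int initialized = 0;
        for (TenantConfig tenant : tenants) {
            try {
                tenantDsManager.addTenantDs(
                    tenant.getTenantId(),
                    tenant.getDbUrl(),
                    tenant.getDbUsername(),
                    tenant.getDbPassword()
                );
                initialized++;
            } catch (Exception e) {
                // One broken tenant must not block the others.
                log.error("Failed to initialize data source for tenant [{}]", tenant.getTenantId(), e);
            }
        }
        // Report successes only, not the number of tenants attempted.
        log.info("Initialized {} of {} tenant data sources", initialized, tenants.size());
    }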
}

Read/write splitting: from configuration to implementation

Complete read/write splitting architecture

                    ┌─────────────────┐
                    │   Application   │
                    │  @DS("master")  │── write
                    │  @DS("slave")   │── read
                    └────────┬────────┘
                             │
              ┌──────────────┼──────────────┐
              │              │              │
        ┌─────▼─────┐  ┌─────▼─────┐  ┌─────▼─────┐
        │  Master   │  │  Slave-1  │  │  Slave-2  │
        │ read/write│  │ read-only │  │ read-only │
        └─────┬─────┘  └───────────┘  └───────────┘
              │
        ┌─────▼─────┐
        │  Binlog   │
        │   sync    │
        └───────────┘

Read/write splitting YAML configuration

spring:
  datasource:
    dynamic:
      primary: master
      strict: true
      datasource:
        master:
          # characterEncoding takes a Java charset name; UTF-8 maps to utf8mb4 on the server
          url: jdbc:mysql://mysql-master:3306/app_db?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=UTF-8
          username: app_writer
          password: ${MASTER_DB_PWD}
        slave_1:
          url: jdbc:mysql://mysql-slave-1:3306/app_db?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=UTF-8
          username: app_reader
          password: ${SLAVE1_DB_PWD}
        slave_2:
          url: jdbc:mysql://mysql-slave-2:3306/app_db?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=UTF-8
          username: app_reader
          password: ${SLAVE2_DB_PWD}

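Custom load-balancing strategy

The default round-robin strategy knows nothing about replication lag; in production, reads should skip replicas that have fallen too far behind:

@Slf4j
@Component
public class SlaveLoadBalanceStrategy implements LoadBalanceStrategy {

    // LoadBalanceStrategy is treated here as an application-defined interface.
    // The starter's own extension point is DynamicDataSourceStrategy, selected
    // through spring.datasource.dynamic.strategy.
    private static final long MAX_DELAY_MILLIS = 1000;

    // Round-robin cursor; a wall-clock modulo would send a burst of requests to one replica.
    private final AtomicInteger counter = new AtomicInteger();

    @Autowired
    private MySQLReplicationMonitor replicationMonitor;

    @Override
    public String determineDsKey(List<String> slaveDsKeys, String masterDsKey) {
        // Keep only replicas whose last measured lag is under the threshold.
        List<String> availableSlaves = slaveDsKeys.stream()
                .filter(key -> replicationMonitor.getReplicationDelay(key) < MAX_DELAY_MILLIS)
                .collect(Collectors.toList());

        if (availableSlaves.isEmpty()) {
            log.warn("All replicas lag too far behind, falling back to the primary for reads");
            return masterDsKey;
        }

        int index = Math.floorMod(counter.getAndIncrement(), availableSlaves.size());
        return availableSlaves.get(index);
    }
}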
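
The selection rule described above (filter by lag, rotate over what remains, fall back to the primary) can be tried out in isolation. This is a minimal sketch under stated assumptions: LagAwareSelector and its select method are hypothetical names, lag values are passed in as a plain map, and a replica with no measurement is treated as unhealthy, which is stricter than a default of 0.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical selector: round-robin over replicas whose lag is under a threshold. */
final class LagAwareSelector {
    private final AtomicInteger counter = new AtomicInteger();
    private final long maxLagMillis;

    LagAwareSelector(long maxLagMillis) {
        this.maxLagMillis = maxLagMillis;
    }

    /**
     * Picks a replica key, or the master key when no replica is healthy.
     * A replica without an entry in lagMillis counts as unhealthy.
     */
    String select(List<String> replicaKeys, String masterKey, Map<String, Long> lagMillis) {
        List<String> healthy = new ArrayList<>();
        for (String key : replicaKeys) {
            Long lag = lagMillis.get(key);
            if (lag != null && lag < maxLagMillis) {
                healthy.add(key);
            }
        }
        if (healthy.isEmpty()) {
            return masterKey;
        }
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), healthy.size());
        return healthy.get(index);
    }
}
```

Treating an unmeasured replica as unhealthy means a freshly started application reads from the primary until the first monitoring pass completes, trading a short burst of primary load for never reading from a replica of unknown state.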

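Replication lag monitoring

@Component
@Slf4j
public class MySQLReplicationMonitor {

    @Autowired
    private DynamicRoutingDataSource dynamicRoutingDataSource;

    // Last measured lag per replica key, in milliseconds.
    private final Map<String, Long> replicationDelayMap = new ConcurrentHashMap<>();

    @Scheduled(fixedDelay = 5000)
    public void monitorReplicationDelay() {
        List<String> slaveKeys = List.of("slave_1", "slave_2");
        for (String slaveKey : slaveKeys) {
            // Fetch the replica's pool directly; no context switch is needed here.
            DataSource slave = dynamicRoutingDataSource.getDataSource(slaveKey);
            // MySQL 8.0.22 and later name this SHOW REPLICA STATUS / Seconds_Behind_Source.
            try (Connection conn = slave.getConnection();
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SHOW SLAVE STATUS")) {
                if (rs.next()) {
                    long secondsBehindMaster = rs.getLong("Seconds_Behind_Master");
                    if (rs.wasNull()) {
                        // NULL means replication is not running: treat the replica as unusable.
                        replicationDelayMap.put(slaveKey, Long.MAX_VALUE);
                        log.warn("Replication is not running on replica [{}]", slaveKey);
                        continue;
                    }
                    replicationDelayMap.put(slaveKey, secondsBehindMaster * 1000);
                    if (secondsBehindMaster > 5) {
                        log.warn("Replica [{}] replication lag: {}s", slaveKey, secondsBehindMaster);
                    }
                }
            } catch (SQLException e) {
                replicationDelayMap.put(slaveKey, Long.MAX_VALUE);
                log.error("Status check failed for replica [{}]", slaveKey, e);
            }
        }
    }

    // Returns 0 for a replica that has not been measured yet.
    public long getReplicationDelay(String slaveKey) {
        return replicationDelayMap.getOrDefault(slaveKey, 0L);
    }
}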

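Annotation for forcing reads to the primary

Some cases, such as reading a row immediately after writing it, must go to the primary:

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@DS("master")
public @interface MasterOnly {
}

@Service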
public class PaymentServiceImpl implements PaymentService {

    @Autowired
    private PaymentMapper paymentMapper;

    @DS("master")
    @Override
    public Payment createPayment(PaymentDTO dto) {
        Payment payment = new Payment();
        BeanUtils.copyProperties(dto, payment);
        paymentMapper.insert(payment);
        return payment;
    }

    @MasterOnly
    @Override
    public Payment getPaymentAfterCreate(Long paymentId) {
        return paymentMapper.selectById(paymentId);
    }
}

Sharding: deep integration with ShardingSphere-JDBC

Sharding architecture

┌──────────────────────────────────────────────────────┐
│                  Application Layer                   │
│                                                      │
│  ┌─────────────────────────────────────────────────┐ │
│  │           ShardingSphere-JDBC                   │ │
│  │                                                 │ │
│  │  SQL: SELECT * FROM t_order WHERE user_id=1001  │ │
│  │         │                                       │ │
│  │    ┌────▼────┐                                  │ │
│  │    │ Parser  │                                  │ │
│  │    └────┬────┘                                  │ │
│  │    ┌────▼────┐                                  │ │
│  │    │ Router  │──► ds_0.t_order_1                │ │
│  │    └────┬────┘                                  │ │
│  │    ┌────▼────┐                                  │ │
│  │    │ Rewrite │──► SELECT * FROM t_order_1 ...   │ │
│  │    └────┬────┘                                  │ │
│  │    ┌────▼────┐                                  │ │
│  │    │ Merger  │                                  │ │
│  │    └─────────┘                                  │ │
│  └─────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────┘

Maven dependency

The Spring Boot starter for ShardingSphere-JDBC was discontinued after the 5.2.x line, so a 5.5.x setup on Spring Boot 3 depends on the JDBC artifact alone and connects through ShardingSphereDriver:

<dependencies>
    <dependency>
        <groupId>org.apache.shardingsphere</groupId>
        <artifactId>shardingsphere-jdbc</artifactId>
        <version>5.5.1</version>
    </dependency>
</dependencies>

Sharding configuration

application.yml only points Spring at the driver; the rules live in a separate YAML file on the classpath (the file name sharding.yaml is chosen here for illustration):

spring:
  datasource:
    driver-class-name: org.apache.shardingsphere.driver.ShardingSphereDriver
    url: jdbc:shardingsphere:classpath:sharding.yaml

sharding.yaml:

mode:
  type: Standalone
  repository:
    type: JDBC
dataSources:
  ds_0:
    dataSourceClassName: com.zaxxer.hikari.HikariDataSource
    driverClassName: com.mysql.cj.jdbc.Driver
    jdbcUrl: jdbc:mysql://localhost:3306/shard_db_0?useSSL=false&serverTimezone=Asia/Shanghai
    username: root
    password: ds0_pwd
  ds_1:
    dataSourceClassName: com.zaxxer.hikari.HikariDataSource
    driverClassName: com.mysql.cj.jdbc.Driver
    jdbcUrl: jdbc:mysql://localhost:3306/shard_db_1?useSSL=false&serverTimezone=Asia/Shanghai
    username: root
    password: ds1_pwd
rules:
  - !SHARDING
    tables:
      t_order:
        actualDataNodes: ds_${0..1}.t_order_${0..15}
        tableStrategy:
          standard:
            shardingColumn: order_id
            shardingAlgorithmName: order-table-inline
        databaseStrategy:
          standard:
            shardingColumn: user_id
            shardingAlgorithmName: order-db-mod
        keyGenerateStrategy:
          column: order_id
          keyGeneratorName: snowflake
      t_order_item:
        actualDataNodes: ds_${0..1}.t_order_item_${0..15}
        tableStrategy:
          standard:
            shardingColumn: order_id
            shardingAlgorithmName: order-item-table-inline
        databaseStrategy:
          standard:
            shardingColumn: user_id
            shardingAlgorithmName: order-db-mod
    shardingAlgorithms:
      order-table-inline:
        type: INLINE
        props:
          algorithm-expression: t_order_${order_id % 16}
      order-item-table-inline:
        type: INLINE
        props:
          algorithm-expression: t_order_item_${order_id % 16}
      order-db-mod:
        type: MOD
        props:
          sharding-count: 2
    keyGenerators:
      snowflake:
        type: SNOWFLAKE
        props:
          worker-id: 1
props:
  sql-show: true
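Custom sharding algorithm

Plain modulo sharding forces most rows to move when shards are added; consistent hashing reduces the amount of data that has to migrate:

// ShardingSphere instantiates algorithms through SPI, so this class is not a Spring bean.
// ConsistentHash, HashFunction and Murmur3HashFunction are helper classes not shown in this article.
public class ConsistentHashShardingAlgorithm implements StandardShardingAlgorithm<Long> {

    private ConsistentHash<String> consistentHash;

    @Override
    public void init(Properties props) {
        // Number of virtual nodes per physical node; more nodes give a smoother distribution.
        int virtualNodes = Integer.parseInt(props.getProperty("virtual-nodes", "160"));
        // Comma-separated target names, supplied through the algorithm's props.
        List<String> actualDataNodes = Arrays.stream(props.getProperty("actual-data-nodes").split(","))
                .collect(Collectors.toList());

        HashFunction hashFunction = new Murmur3HashFunction();
        consistentHash = new ConsistentHash<>(hashFunction, virtualNodes, actualDataNodes);
    }

    // Exact-match conditions (=, IN): map the value to one node on the ring.
    @Override
    public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) {
        return consistentHash.get(shardingValue.getValue());
    }

    // Range conditions (BETWEEN, <, >): hashing does not preserve order, so query every target.
    @Override
    public Collection<String> doSharding(Collection<String> availableTargetNames, RangeShardingValue<Long> shardingValue) {
        return availableTargetNames;
    }

    @Override
    public String getType() {
        return "CONSISTENT_HASH";
    }
}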
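
The ConsistentHash helper used by the algorithm is not shown in the article. As an illustration of the technique, here is a minimal hash ring with virtual nodes. It is a sketch under stated assumptions: HashRing is a hypothetical class, and it hashes with the first 8 bytes of an MD5 digest (not Murmur3) only because MD5 ships with the JDK.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical hash ring with virtual nodes; a stand-in for the ConsistentHash helper. */
final class HashRing {
    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int virtualNodes;

    HashRing(Collection<String> nodes, int virtualNodes) {
        this.virtualNodes = virtualNodes;
        for (String node : nodes) {
            add(node);
        }
    }

    /** Places virtualNodes points for the node on the ring. */
    void add(String node) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.put(hash(node + "#" + i), node);
        }
    }

    /** Removes all points of the node; keys owned by other nodes are unaffected. */
    void remove(String node) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.remove(hash(node + "#" + i));
        }
    }

    /** Returns the first node clockwise from the key's position, wrapping at the end. */
    String get(Object key) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("ring is empty");
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(String.valueOf(key)));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    /** First 8 bytes of an MD5 digest, used here only because it is available in the JDK. */
    private static long hash(String value) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(value.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (digest[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The property that matters for resharding is that removing a node moves only the keys that node owned; every other key keeps its owner. With modulo sharding, changing the shard count remaps most keys.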

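Registering the sharding algorithm

ShardingSphere discovers sharding algorithms through the Java SPI mechanism (ServiceLoader), not through the Spring context, so a @Bean or @Component declaration is not picked up. List the implementation class in the file src/main/resources/META-INF/services/org.apache.shardingsphere.sharding.spi.ShardingAlgorithm (the package name is a placeholder):

com.example.sharding.ConsistentHashShardingAlgorithm

The sharding rules then reference the algorithm by the value returned from getType(), here CONSISTENT_HASH.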

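Notes on integrating MyBatis-Plus with ShardingSphere

@Configuration
@MapperScan("com.example.mapper")
public class MybatisPlusConfig {

    @Bean
    public MybatisPlusInterceptor mybatisPlusInterceptor() {
        MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
        // Pagination runs on the logical SQL; ShardingSphere rewrites it per shard afterwards.
        interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
        return interceptor;
    }

    // Replaces the default MyBatis-Plus ID generator, so that IDs filled in before the
    // INSERT reaches ShardingSphere come from a single source.
    @Bean
    public IdentifierGenerator identifierGenerator() {
        return new CustomIdGenerator();
    }
}

public class CustomIdGenerator implements IdentifierGenerator {
    @Override
    public Number nextId(Object entity) {
        // ShardingSphereIdGenerator is an application-level helper not shown in this article.
        return ShardingSphereIdGenerator.nextId();
    }
}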

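Multi-tenant data isolation: schema level and table level

Comparison of the two isolation modes

Dimension | Schema-level isolation | Table-level isolation
Isolation level | Higher | Lower
Resource overhead | Separate schema per tenant | Shared schema, tenants told apart by table name
Operational complexity | |
Data-leak risk | | Medium (needs strict filtering)
Use cases | Large customers / finance | Small customers / SaaS
Scalability | Limited by schema count | Limited by table count

Schema-level isolation

@Component
public class SchemaTenantInterceptor implements InnerInterceptor {

    @Override
    public void beforeQuery(Executor executor, MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) {
        // TenantContextHolder stores the tenant ID as a Long.
        Long tenantId = TenantContextHolder.getTenantId();
        if (tenantId == null) {
            return;
        }
        String dsKey = "tenant_" + tenantId;
        // Push once per request rather than once per query, so the stack does not grow.
        // Nothing pops the key here: clear the holder when the request ends, for example
        // with DynamicDataSourceContextHolder.clear() in a filter's finally block.
        // Writes pass through beforeUpdate, which needs the same routing.
        if (!dsKey.equals(DynamicDataSourceContextHolder.peek())) {
            DynamicDataSourceContextHolder.push(dsKey);
        }
    }
}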

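Table-level isolation (MyBatis-Plus tenant plugin)

The tenant plugin keeps all tenants in shared tables and appends a tenant_id condition to every statement.

// Named TenantPluginConfig to avoid clashing with the TenantConfig entity used earlier.
@Configuration
public class TenantPluginConfig {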

    @Bean
    public MybatisPlusInterceptor mybatisPlusInterceptor(TenantLineInnerInterceptor tenantInterceptor) {
        MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
        interceptor.addInnerInterceptor(tenantInterceptor);
        interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
        return interceptor;
    }

    @Bean
    public TenantLineInnerInterceptor tenantLineInnerInterceptor() {
        return new TenantLineInnerInterceptor(new TenantLineHandler() {
            @Override
            public Expression getTenantId() {
                Long tenantId = TenantContextHolder.getTenantId();
                if (tenantId == null) {
                    throw new IllegalStateException("Tenant ID must not be null");
                }
                return new LongValue(tenantId);
            }

            @Override
            public boolean ignoreTable(String tableName) {
                return Set.of("sys_config", "sys_dict", "sys_menu")
                        .contains(tableName);
            }

            @Override
            public String getTenantIdColumn() {
                return "tenant_id";
            }
        });
    }
}

Tenant context management

public class TenantContextHolder {

    private static final ThreadLocal<Long> TENANT_ID = new ThreadLocal<>();

    public static void setTenantId(Long tenantId) {
        TENANT_ID.set(tenantId);
    }

    public static Long getTenantId() {
        return TENANT_ID.get();
    }

    public static void clear() {
        TENANT_ID.remove();
    }
}
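
A ThreadLocal holder like this is only safe if clear() runs on every code path, including exceptions, because pooled threads are reused. One way to make that hard to forget is a try-with-resources scope. The following is a minimal sketch, assuming a hypothetical TenantScope class that keeps its own ThreadLocal so the example stays self-contained.

```java
/** Hypothetical try-with-resources wrapper around a ThreadLocal tenant ID. */
final class TenantScope implements AutoCloseable {
    private static final ThreadLocal<Long> TENANT_ID = new ThreadLocal<>();

    // Value that was active before this scope opened; null when there was none.
    private final Long previous;

    private TenantScope(Long tenantId) {
        this.previous = TENANT_ID.get();
        TENANT_ID.set(tenantId);
    }

    /** Activates the tenant for the current thread until close() is called. */
    static TenantScope open(long tenantId) {
        return new TenantScope(tenantId);
    }

    /** Returns the active tenant ID, or null outside any scope. */
    static Long currentTenantId() {
        return TENANT_ID.get();
    }

    /** Restores the previous tenant, or removes the ThreadLocal entry entirely. */
    @Override
    public void close() {
        if (previous == null) {
            TENANT_ID.remove();
        } else {
            TENANT_ID.set(previous);
        }
    }
}
```

Usage would look like try (TenantScope scope = TenantScope.open(7L)) { ... }. Restoring the previous value instead of always clearing lets scopes nest, for example when a background job iterates over tenants inside a request.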

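Injecting the tenant ID at the web layer

// A Filter declared as a @Component is registered by Spring Boot for all URLs;
// @WebFilter is only honored with @ServletComponentScan and would register it twice.
@Component
public class TenantFilter implements Filter {

    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
            throws IOException, ServletException {
        HttpServletRequest httpRequest = (HttpServletRequest) request;
        String tenantHeader = httpRequest.getHeader("X-Tenant-Id");

        if (tenantHeader != null) {
            try {
                TenantContextHolder.setTenantId(Long.parseLong(tenantHeader));
            } catch (NumberFormatException e) {
                // A malformed header is a client error: answer 400 instead of failing with 500.
                ((HttpServletResponse) response).sendError(
                        HttpServletResponse.SC_BAD_REQUEST, "Invalid tenant ID");
                return;
            }
        }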

        try {
            chain.doFilter(request, response);
        } finally {
            TenantContextHolder.clear();
        }
    }
}

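Transaction management: local transactions, Seata distributed transactions, MQ eventual consistency

Comparison of transaction approaches

Approach | Consistency | Performance | Complexity | Use cases
Local transaction (@DSTransactional) | Strong | | | Single data source, or brief inconsistency is tolerable
Seata AT mode | Strong | | | Strong consistency across databases
Seata TCC mode | Strong | Medium-low | Very high | Core flows such as funds and inventory
MQ eventual consistency | Eventual | | | Asynchronous flows that tolerate delay
Saga mode | Eventual | | | Orchestration of long-running processes

Local transactions: @DSTransactional

dynamic-datasource provides @DSTransactional, which keeps a local transaction open across data source switches. It commits or rolls back each connection in turn, so it is not a distributed (XA) transaction: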

@Service
public class OrderServiceImpl implements OrderService {

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private OrderItemMapper orderItemMapper;

    @DSTransactional
    @DS("master")
    @Override
    public void createOrder(OrderCreateDTO dto) {
        Order order = new Order();
        order.setOrderNo(generateOrderNo());
        order.setUserId(dto.getUserId());
        order.setStatus(OrderStatus.CREATED);
        orderMapper.insert(order);

        for (OrderItemDTO itemDTO : dto.getItems()) {
            OrderItem item = new OrderItem();
            item.setOrderId(order.getId());
            item.setProductId(itemDTO.getProductId());
            item.setQuantity(itemDTO.getQuantity());
            orderItemMapper.insert(item);
        }
    }
}

Seata AT mode integration

┌──────────┐    ┌──────────┐    ┌──────────┐
│ Service A│───►│ Service B│───►│ Service C│
│ (orders) │    │ (stock)  │    │ (account)│
└─────┬────┘    └─────┬────┘    └─────┬────┘
      │               │               │
      └───────────────┼───────────────┘
                      │
              ┌───────▼───────┐
              │ Seata Server  │
              │ TC coordinator│
              └───────────────┘

Seata releases from 2.1.0 onward are published under the org.apache.seata group ID (the io.seata coordinates end at 2.0.0), and the annotation packages move to org.apache.seata accordingly:

<dependency>
    <groupId>org.apache.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    <version>2023.0.3.2</version>
</dependency>

seata:
  enabled: true
  application-id: order-service
  tx-service-group: my_test_tx_group
  service:
    vgroup-mapping:
      my_test_tx_group: default
  registry:
    type: nacos
    nacos:
      server-addr: localhost:8848
      namespace: seata
      group: SEATA_GROUP
  config:
    type: nacos
    nacos:
      server-addr: localhost:8848
      namespace: seata
      group: SEATA_GROUP

@Service
public class BusinessServiceImpl implements BusinessService {

    @Autowired
    private OrderService orderService;

    @Autowired
    private StockService stockService;

    @Autowired
    private AccountService accountService;

    @GlobalTransactional(name = "create-order-tx", rollbackFor = Exception.class)
    @Override
    public void purchase(PurchaseDTO dto) {
        orderService.createOrder(dto);
        stockService.deductStock(dto.getProductId(), dto.getQuantity());
        accountService.deductBalance(dto.getUserId(), dto.getAmount());
    }
}

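MQ eventual consistency

@Service
@Slf4j
public class OrderMQService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private TransactionLogMapper transactionLogMapper;

    @Autowired
    private ApplicationEventPublisher eventPublisher;

    @Transactional
    public void createOrderWithMQ(OrderCreateDTO dto) {
        Order order = new Order();
        BeanUtils.copyProperties(dto, order);
        order.setStatus(OrderStatus.CREATED);
        orderMapper.insert(order);

        // Local message table: written in the same transaction as the order.
        // Named txLog so it does not shadow the Slf4j logger field "log".
        TransactionLog txLog = new TransactionLog();
        txLog.setTransactionId(UUID.randomUUID().toString());
        txLog.setBizType("CREATE_ORDER");
        txLog.setBizId(order.getId());
        txLog.setPayload(JSON.toJSONString(dto));
        txLog.setStatus(0); // 0 = pending, 1 = sent
        transactionLogMapper.insert(txLog);

        // Without this publish the listener below never fires. The OrderCreatedEvent
        // constructor and getTransactionId() are assumptions; the class is not shown here.
        eventPublisher.publishEvent(new OrderCreatedEvent(order.getOrderNo(), txLog.getTransactionId()));
    }

    // Runs only after the transaction above has committed.
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void onOrderCreated(OrderCreatedEvent event) {
        rocketMQTemplate.asyncSend(
            "order-create-topic",
            MessageBuilder.withPayload(event)
                .setHeader("KEYS", event.getOrderNo())
                .build(),
            new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    // Mark the log as sent so the compensation job does not resend it.
                    transactionLogMapper.update(null, new LambdaUpdateWrapper<TransactionLog>()
                        .eq(TransactionLog::getTransactionId, event.getTransactionId())
                        .set(TransactionLog::getStatus, 1));
                    log.info("Order message sent: {}", event.getOrderNo());
                }

                @Override
                public void onException(Throwable e) {
                    // The log stays pending; the compensation job retries it later.
                    log.error("Failed to send order message: {}", event.getOrderNo(), e);
                }
            }
        );
    }
}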

Scheduled message compensation job

@Component
@Slf4j
public class TransactionLogCompensator {

    @Autowired
    private TransactionLogMapper transactionLogMapper;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Scheduled(fixedDelay = 30000)
    public void compensate() {
        // Pending logs older than one minute that still have retries left.
        List<TransactionLog> pendingLogs = transactionLogMapper.selectList(
            new LambdaQueryWrapper<TransactionLog>()
                .eq(TransactionLog::getStatus, 0)
                .lt(TransactionLog::getRetryCount, 5)
                .le(TransactionLog::getCreateTime, LocalDateTime.now().minusMinutes(1))
        );

        // The loop variable is named txLog: calling it "log" would shadow the Slf4j
        // logger and the log.warn call below would not compile.
        for (TransactionLog txLog : pendingLogs) {
            try {
                rocketMQTemplate.syncSend("order-create-topic", txLog.getPayload());
                txLog.setStatus(1);
                transactionLogMapper.updateById(txLog);
            } catch (Exception e) {
                txLog.setRetryCount(txLog.getRetryCount() + 1);
                transactionLogMapper.updateById(txLog);
                log.warn("Compensation failed for transaction log [{}], retry count: {}",
                        txLog.getTransactionId(), txLog.getRetryCount());
            }
        }
    }
}
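
The retry bookkeeping of the compensation job can be exercised without a database or broker. This is an in-memory sketch under stated assumptions: OutboxRetryExample and its Entry class are hypothetical, the sender is any predicate that returns true on successful delivery, and the limit of 5 retries mirrors the query condition used by the job.

```java
import java.util.List;
import java.util.function.Predicate;

/** In-memory sketch of one compensation pass; no database or broker is involved. */
final class OutboxRetryExample {
    static final int MAX_RETRIES = 5;

    /** Minimal stand-in for a transaction log row. */
    static final class Entry {
        final String payload;
        int status;      // 0 = pending, 1 = sent
        int retryCount;

        Entry(String payload) {
            this.payload = payload;
        }
    }

    private OutboxRetryExample() {}

    /** Tries each eligible entry once and returns how many were delivered in this pass. */
    static int compensate(List<Entry> entries, Predicate<String> sender) {
        int delivered = 0;
        for (Entry entry : entries) {
            // Skip entries already sent or out of retries.
            if (entry.status != 0 || entry.retryCount >= MAX_RETRIES) {
                continue;
            }
            boolean ok;
            try {
                ok = sender.test(entry.payload);
            } catch (RuntimeException e) {
                ok = false; // a thrown exception counts as a failed attempt
            }
            if (ok) {
                entry.status = 1;
                delivered++;
            } else {
                entry.retryCount++;
            }
        }
        return delivered;
    }
}
```

An entry that keeps failing stops being retried after five attempts and stays pending, so a real system needs an alert or a manual path for such rows. Because a message can be sent more than once (for instance when the send succeeds but the status update fails), consumers should be idempotent.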

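Monitoring and operations: data source health checks and Micrometer metrics

Data source health check

@Component
@Slf4j
public class DataSourceHealthChecker {

    @Autowired
    private DynamicRoutingDataSource dynamicRoutingDataSource;

    // AlertService is an application-level notification component not shown in this article.
    @Autowired
    private AlertService alertService;

    @Scheduled(fixedDelay = 10000)
    public void checkAllDataSources() {
        // In the 4.x starter the accessor for registered data sources is getDataSources().
        Map<String, DataSource> dataSources = dynamicRoutingDataSource.getDataSources();

        for (Map.Entry<String, DataSource> entry : dataSources.entrySet()) {
            String dsKey = entry.getKey();
            // Borrow a connection and run a trivial query against each data source.
            try (Connection conn = entry.getValue().getConnection();
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT 1")) {

                if (rs.next() && rs.getInt(1) == 1) {
                    log.debug("Health check passed for data source [{}]", dsKey);
                }
            } catch (SQLException e) {
                log.error("Health check failed for data source [{}]", dsKey, e);
                alertService.sendAlert("Data source failure: " + dsKey, e.getMessage());
            }
        }
    }
}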

Micrometer metrics collection

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DataSourceMetricsConfig {

    @Bean
    public MeterDataSourceAspect meterDataSourceAspect(MeterRegistry registry) {
        return new MeterDataSourceAspect(registry);
    }

    public static class MeterDataSourceAspect {

        private final MeterRegistry registry;

        public MeterDataSourceAspect(MeterRegistry registry) {
            this.registry = registry;
        }

        // One counter per data source. Every meter with this name carries the same
        // tag key ("ds"), so each switch is counted once and the label set stays consistent.
        public void recordSwitch(String dsKey) {
            Counter.builder("datasource.switch.count")
                    .description("Number of data source switches")
                    .tag("ds", dsKey)
                    .register(registry)
                    .increment();
        }

        public Timer.Sample startQuery() {
            return Timer.start(registry);
        }

        // register() returns the existing timer when the same name and tags were used before.
        public void endQuery(Timer.Sample sample, String dsKey) {
            sample.stop(Timer.builder("datasource.query.duration")
                    .description("Query duration per data source")
                    .tag("ds", dsKey)
                    .publishPercentiles(0.99) // exposes the P99 used for alerting
                    .register(registry));
        }
    }
}

HikariCP connection pool monitoring

spring:
  datasource:
    dynamic:
      hikari:
        register-mbeans: true
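        metrics-tracker: true

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.sql.DataSource;

import com.baomidou.dynamic.datasource.DynamicRoutingDataSource;
import com.zaxxer.hikari.HikariDataSource;
import com.zaxxer.hikari.HikariPoolMXBean;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class HikariMetricsExporter {

    @Autowired
    private DynamicRoutingDataSource dynamicRoutingDataSource;

    @Autowired
    private MeterRegistry meterRegistry;

    // A gauge samples its value on every scrape, so each pool is registered only once.
    private final Set<String> registeredPools = ConcurrentHashMap.newKeySet();

    // Re-scan periodically so data sources added at runtime are picked up.
    @Scheduled(fixedDelay = 15000)
    public void exportHikariMetrics() {
        Map<String, DataSource> dataSources = dynamicRoutingDataSource.getCurrentDataSources();

        for (Map.Entry<String, DataSource> entry : dataSources.entrySet()) {
            // Depending on the dynamic-datasource version, entries may be wrapped
            // (for example in ItemDataSource) and need unwrapping before this check.
            if (entry.getValue() instanceof HikariDataSource hikari) {
                HikariPoolMXBean pool = hikari.getHikariPoolMXBean();
                // The MXBean is null until the pool has been started.
                if (pool == null || !registeredPools.add(entry.getKey())) {
                    continue;
                }

                Gauge.builder("hikari.active.connections", pool, HikariPoolMXBean::getActiveConnections)
                        .tag("pool", entry.getKey())
                        .description("Active connections")
                        .register(meterRegistry);

                Gauge.builder("hikari.idle.connections", pool, HikariPoolMXBean::getIdleConnections)
                        .tag("pool", entry.getKey())
                        .description("Idle connections")
                        .register(meterRegistry);

                Gauge.builder("hikari.threads.awaiting", pool, HikariPoolMXBean::getThreadsAwaitingConnection)
                        .tag("pool", entry.getKey())
                        .description("Threads waiting for a connection")
                        .register(meterRegistry);
            }
        }
    }
}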

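Key metrics for a Prometheus + Grafana dashboard

Metric | Meaning | Alert threshold
hikari.active.connections | Connections currently in use | > maxPool * 0.8
hikari.threads.awaiting | Threads waiting for a connection | > 5
datasource.switch.count | Data source switch frequency | Abnormal spike
datasource.query.duration | Query duration | P99 > 1s
hikari.connection.creation | Connection creation rate | Sustained high creation rate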
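
The two pool-level thresholds in the table can be expressed as a small rule check. This is only a sketch: in practice such rules would normally live in Prometheus alerting rules, and the class name, method signature and message texts below are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Evaluates the pool-level alert thresholds against one snapshot of gauge values. */
final class PoolAlertRules {

    static final int AWAITING_THRESHOLD = 5;

    static List<String> evaluate(String pool, int active, int awaiting, int maxPoolSize) {
        List<String> alerts = new ArrayList<>();
        // active > maxPool * 0.8, written with integers to avoid floating-point edge cases
        if (active * 10 > maxPoolSize * 8) {
            alerts.add(pool + ": active connections above 80% of the pool size");
        }
        if (awaiting > AWAITING_THRESHOLD) {
            alerts.add(pool + ": more than " + AWAITING_THRESHOLD + " threads waiting for a connection");
        }
        return alerts;
    }

    public static void main(String[] args) {
        System.out.println(evaluate("master", 41, 0, 50));   // above 80% of 50
        System.out.println(evaluate("slave_1", 10, 6, 30));  // too many waiting threads
        System.out.println(evaluate("slave_2", 24, 5, 30));  // exactly at both limits, no alert
    }
}
```

Both comparisons are strict, matching the ">" in the table, so a pool sitting exactly at 80% utilization or at 5 waiting threads does not alert.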

Production-grade architecture and best practices

Complete production architecture

┌─────────────────────────────────────────────────────────────┐
│                       Nginx / Gateway                        │
└──────────────────────────┬──────────────────────────────────┘
                           │
┌──────────────────────────▼──────────────────────────────────┐
│                    Spring Boot Application                    │
│                                                              │
│  ┌──────────────────────────────────────────────────────┐   │
│  │                 Controller Layer                      │   │
│  │        TenantFilter → @DS → @GlobalTransactional     │   │
│  └──────────────────────────┬───────────────────────────┘   │
│                             │                                │
│  ┌──────────────────────────▼───────────────────────────┐   │
│  │                 Service Layer                         │   │
│  │     @DSTransactional / @GlobalTransactional           │   │
│  └──────────────────────────┬───────────────────────────┘   │
│                             │                                │
│  ┌──────────────────────────▼───────────────────────────┐   │
│  │          DynamicRoutingDataSource                     │   │
│  │    ┌─────────┬──────────┬──────────┬──────────┐      │   │
│  │    │ master  │ slave_1  │ order_db │ tenant_X │      │   │
│  │    └────┬────┴────┬─────┴────┬─────┴────┬─────┘      │   │
│  └─────────┼─────────┼──────────┼──────────┼────────────┘   │
└────────────┼─────────┼──────────┼──────────┼────────────────┘
             │         │          │          │
        ┌────▼───┐ ┌──▼───┐ ┌───▼──┐ ┌────▼────┐
        │Master  │ │Slave │ │Order │ │Tenant   │
        │MySQL   │ │MySQL │ │MySQL │ │MySQL    │
        └────────┘ └──────┘ └──────┘ └─────────┘

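Summary of common pitfalls

Pitfall | Symptom | Fix
@DS annotation ignored | The AOP proxy is bypassed, so the switch never takes effect | Put the annotation on public methods and avoid calls within the same class
Switching data sources inside a transaction | After the switch, statements still run in the original data source's transaction | Use @DSTransactional instead of @Transactional
ThreadLocal leak | A stale data source key lingers on async threads | Clear DynamicDataSourceContextHolder in a finally block
Connection pool exhaustion | Connection waits time out under high concurrency | Size each pool according to its data source's QPS
Stale reads from replication lag | A read from a replica right after a write does not see the new data | Route read-after-write to the master, or use @MasterOnly
Full scan when the sharding key is missing | Queries without the sharding key are routed to every shard | Require the sharding key in queries, or serve them from a denormalized (wide-table) ES index
Tenant ID not injected | SQL is missing the tenant_id condition | Global tenant interceptor + whitelist of exempt tables
Distributed transaction timeout | The Seata global transaction times out and rolls back | Set a sensible timeout and avoid long transactions
Dynamic data source not closed | The connection pool is not released after a tenant is deregistered | Close the pool when calling removeDataSource
Poor HikariCP configuration | minimumIdle=maximumPoolSize wastes resources | Configure baseline and peak sizes separately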
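
The ThreadLocal pitfall is easiest to see on a pooled thread that gets reused. The sketch below uses a hand-written stand-in for `DynamicDataSourceContextHolder` (a per-thread stack with `push`, `peek` and `poll`), so it runs without the library. The class and the `runWith` helper are hypothetical and only illustrate the try/finally pattern.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

final class DsContextSketch {

    // Stand-in for the context holder: a per-thread stack of data source keys.
    private static final ThreadLocal<Deque<String>> KEYS = ThreadLocal.withInitial(ArrayDeque::new);

    static void push(String key) {
        KEYS.get().push(key);
    }

    static String peek() {
        return KEYS.get().peek();
    }

    static void poll() {
        Deque<String> stack = KEYS.get();
        stack.poll();
        if (stack.isEmpty()) {
            KEYS.remove(); // drop the entry so a reused pool thread starts clean
        }
    }

    /** Runs the task with the given key and always restores the previous state. */
    static <T> T runWith(String key, Supplier<T> task) {
        push(key);
        try {
            return task.get();
        } finally {
            poll();
        }
    }

    /** Returns the key seen inside the routed task and the key seen by the next task on the same thread. */
    static String[] demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor(); // one worker, reused by both tasks
        try {
            Callable<String> routed = () -> runWith("slave_1", DsContextSketch::peek);
            Callable<String> plain = DsContextSketch::peek;
            String during = pool.submit(routed).get();
            String after = pool.submit(plain).get();
            return new String[] {during, after};
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] result = demo();
        System.out.println("during=" + result[0] + ", after=" + result[1]);
    }
}
```

Without the `finally` block, the second task would still see `slave_1`, because the single worker thread keeps its ThreadLocal state between tasks.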

Connection pool configuration reference

spring:
  datasource:
    dynamic:
      hikari:
        minimum-idle: 5
        maximum-pool-size: 20
        idle-timeout: 300000
        max-lifetime: 1800000
        connection-timeout: 30000
        connection-test-query: SELECT 1
        pool-name: DynamicHikariCP
      datasource:
        master:
          hikari:
            minimum-idle: 10
            maximum-pool-size: 50
        slave_1:
          hikari:
            minimum-idle: 10
            maximum-pool-size: 30
        slave_2:
          hikari:
            minimum-idle: 10
            maximum-pool-size: 30
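
As a rough starting point for values such as `maximum-pool-size`, Little's law gives the average number of connections in use as request rate times the average time a connection is held. The sketch below applies that with a headroom factor. The class, the parameter names and the sample numbers are illustrative assumptions, not measurements from this article, and any result should be checked against the pool metrics in production.

```java
/** Back-of-the-envelope pool sizing based on Little's law (L = arrival rate x hold time). */
final class PoolSizeEstimate {

    private static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }

    /**
     * @param peakQps         peak queries per second hitting this data source
     * @param avgHoldMillis   average time a connection is held per query, in milliseconds
     * @param headroomPercent safety margin, e.g. 150 means 1.5x the computed average
     */
    static long estimate(long peakQps, long avgHoldMillis, int headroomPercent) {
        long inUse = ceilDiv(peakQps * avgHoldMillis, 1000); // average connections busy at peak
        return ceilDiv(inUse * headroomPercent, 100);
    }

    public static void main(String[] args) {
        // 2000 QPS x 10 ms = 20 busy connections on average; 1.5x headroom gives 30.
        System.out.println(estimate(2000, 10, 150));
    }
}
```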

Configuration encryption

spring:
  datasource:
    dynamic:
      datasource:
        master:
          url: jdbc:mysql://mysql-master:3306/app_db
          username: ENC(g2U3x8vKpQ==)
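          password: ENC(aB3dE7fG9hJ==)

@Configuration
public class DataSourceEncryptConfig {

    // DataSourcePropertySourceProcessor, AesUtil and getSecretKey() are assumed to be
    // provided by the project; load the secret key from outside the config file
    // (for example an environment variable), never from the same YAML.
    @Bean
    public DataSourcePropertySourceProcessor dataSourcePropertySourceProcessor() {
        return new DataSourcePropertySourceProcessor() {
            @Override
            public String decrypt(String cipherText) {
                // Only values wrapped as ENC(...) are decrypted; everything else passes through.
                if (cipherText != null && cipherText.startsWith("ENC(") && cipherText.endsWith(")")) {
                    String encrypted = cipherText.substring(4, cipherText.length() - 1);
                    return AesUtil.decrypt(encrypted, getSecretKey());
                }
                return cipherText;
            }
        };
    }
}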
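
The `AesUtil` helper is not shown, so here is one possible shape for the ENC(...) round trip using only the JDK. This sketch assumes AES-GCM with a random 12-byte IV prepended to the ciphertext. That scheme is chosen for illustration and is not necessarily what the original helper does, and the hard-coded key exists only to make the demo run.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Base64;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

final class EncValueSketch {

    private static final String PREFIX = "ENC(";
    private static final String SUFFIX = ")";
    private static final int IV_BYTES = 12;
    private static final int TAG_BITS = 128;

    /** Encrypts a value and wraps it as ENC(base64(iv + ciphertext)). */
    static String encrypt(String plainText, byte[] key) {
        try {
            byte[] iv = new byte[IV_BYTES];
            new SecureRandom().nextBytes(iv); // fresh IV per value
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"), new GCMParameterSpec(TAG_BITS, iv));
            byte[] cipherText = cipher.doFinal(plainText.getBytes(StandardCharsets.UTF_8));
            byte[] out = ByteBuffer.allocate(iv.length + cipherText.length).put(iv).put(cipherText).array();
            return PREFIX + Base64.getEncoder().encodeToString(out) + SUFFIX;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Encryption failed", e);
        }
    }

    /** Decrypts ENC(...) values; anything else is returned unchanged. */
    static String decryptIfWrapped(String value, byte[] key) {
        if (value == null || !value.startsWith(PREFIX) || !value.endsWith(SUFFIX)) {
            return value;
        }
        try {
            String body = value.substring(PREFIX.length(), value.length() - SUFFIX.length());
            byte[] in = Base64.getDecoder().decode(body);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(key, "AES"),
                    new GCMParameterSpec(TAG_BITS, in, 0, IV_BYTES));
            byte[] plain = cipher.doFinal(in, IV_BYTES, in.length - IV_BYTES);
            return new String(plain, StandardCharsets.UTF_8);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("Decryption failed", e);
        }
    }

    public static void main(String[] args) {
        // Demo key only (16 bytes = AES-128); a real key must come from a secret store.
        byte[] key = "0123456789abcdef".getBytes(StandardCharsets.UTF_8);
        String wrapped = encrypt("db-password", key);
        System.out.println(wrapped);
        System.out.println(decryptIfWrapped(wrapped, key));
    }
}
```

Because the IV is random, encrypting the same password twice produces different ENC(...) strings. Both decrypt to the same value.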

Graceful shutdown

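import java.util.Map;
import javax.sql.DataSource;

import com.baomidou.dynamic.datasource.DynamicRoutingDataSource;
import com.zaxxer.hikari.HikariDataSource;
import com.zaxxer.hikari.HikariPoolMXBean;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class GracefulShutdownConfig {

    @Autowired
    private DynamicRoutingDataSource dynamicRoutingDataSource;

    @PreDestroy
    public void gracefulShutdown() {
        log.info("Starting graceful shutdown of data sources...");

        Map<String, DataSource> dataSources = dynamicRoutingDataSource.getCurrentDataSources();
        for (Map.Entry<String, DataSource> entry : dataSources.entrySet()) {
            // Depending on the dynamic-datasource version, entries may be wrapped
            // and need unwrapping before this check.
            if (entry.getValue() instanceof HikariDataSource hikari) {
                // The MXBean is null if the pool was never started.
                HikariPoolMXBean pool = hikari.getHikariPoolMXBean();
                log.info("Closing data source [{}], active connections: {}",
                        entry.getKey(), pool != null ? pool.getActiveConnections() : 0);
                hikari.close();
            }
        }

        log.info("All data sources closed");
    }
}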

Multi-environment configuration management

# application-dev.yml
spring:
  datasource:
    dynamic:
      strict: false
      datasource:
        master:
          url: jdbc:mysql://localhost:3306/dev_db

# application-prod.yml
spring:
  datasource:
    dynamic:
      strict: true
      hikari:
        minimum-idle: 10
        maximum-pool-size: 50
      datasource:
        master:
          url: jdbc:mysql://mysql-master.internal:3306/prod_db
          username: ${DB_MASTER_USER}
          password: ${DB_MASTER_PWD}

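Summary

A multi-data-source architecture built on Spring Boot 3 + MyBatis-Plus is a standard setup for enterprise Java applications. Core selection advice:

  • Small and mid-sized projects: dynamic-datasource-spring-boot-starter + annotation-based switching, simple and efficient
  • Mid-sized and large projects: ShardingSphere-JDBC + database and table sharding, feature-complete
  • Cross-language projects: ShardingSphere-Proxy as a standalone proxy layer
  • Transaction requirements: @DSTransactional for local transactions, Seata AT across databases, MQ-based eventual consistency for asynchronous flows
  • Multi-tenancy: schema isolation for large customers, table-level isolation + the MyBatis-Plus tenant plugin for small customers

Multiple data sources are not a silver bullet. Before introducing them, assess whether you really need them; if a single database solves the problem, do not over-engineer.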

