Spring Boot 3 + MyBatis-Plus Multi-Datasource Architecture: A Complete Guide
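Why enterprise systems need multiple data sources
A single data source is enough early in a product's life, but as the business grows, one database can no longer absorb all of the traffic and complexity. At that point multiple data sources stop being a nice-to-have and become a necessity.
Five core scenarios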
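| Scenario | Typical need | Data source topology | Example |
|---|---|---|---|
| Read/write splitting | Read-heavy workload; spread the read load | One master, many slaves | E-commerce product detail page |
| Split by business domain | Physical isolation between business domains | Independent peers | Order DB vs. user DB |
| Sharding | A single table holds too much data | Horizontal shards | Log table split by month |
| Multi-tenancy | Data isolation between tenants | One database or schema per tenant | SaaS platform |
| Heterogeneous stores | Different storage engines working together | MySQL + PG + ES | Search combined with transactions |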
┌────────────────────────────────────────────────────────────┐
│ Single-datasource architecture (early stage)               │
│                                                            │
│   App ──────► MySQL (all reads and writes)                 │
│                                                            │
│ Problems: read/write contention, table bloat, no scale-out │
└────────────────────────────────────────────────────────────┘
┌────────────────────────────────────────────────────────────┐
│ Multi-datasource architecture (evolved)                    │
│                                                            │
│   App ──┬──► MySQL-Master (writes)                         │
│         ├──► MySQL-Slave1 (reads)                          │
│         ├──► MySQL-Slave2 (reads)                          │
│         ├──► OrderDB (order database)                      │
│         ├──► UserDB (user database)                        │
│         └──► Elasticsearch (search)                        │
└────────────────────────────────────────────────────────────┘
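Real-world case: a social platform with 5 million daily active users peaked at 120,000 QPS on a single database. After read/write splitting and splitting databases by business domain, QPS on the master dropped to 30,000 and overall throughput rose fourfold.
Three multi-datasource architecture patterns
Pattern comparison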
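| Dimension | In-application multi-datasource | JDBC-layer proxy | Standalone proxy |
|---|---|---|---|
| Representative | dynamic-datasource | ShardingSphere-JDBC | ShardingSphere-Proxy / MyCat |
| Intrusiveness | Low (annotations/config) | Medium (replaces the DataSource) | None (separate process) |
| Operational complexity | Low | Medium | High |
| Performance overhead | Very low | Low | Medium (extra network hop) |
| Feature set | Basic switching | Sharding + read/write splitting + encryption | Full feature set |
| Suitable scale | Small to mid-sized projects | Mid-sized to large projects | Large or polyglot projects |
| Language binding | Java | Java | Language-agnostic |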
Pattern 1: in-application multi-datasource
┌──────────────────────────────────────────────┐
│           Spring Boot Application            │
│                                              │
│  ┌──────────┐   ┌──────────┐   ┌──────────┐  │
│  │DataSource│   │DataSource│   │DataSource│  │
│  │  master  │   │ slave_1  │   │ slave_2  │  │
│  └────┬─────┘   └────┬─────┘   └────┬─────┘  │
│       │              │              │        │
│  ┌────▼──────────────▼──────────────▼─────┐  │
│  │        DynamicRoutingDataSource        │  │
│  │ (ThreadLocal + @DS annotation routing) │  │
│  └────────────────────────────────────────┘  │
└──────────────────────────────────────────────┘
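The routing in the diagram depends on a per-thread stack of data source keys: entering an annotated method pushes a key, leaving it pops the key, and the routing DataSource reads whatever is on top. The sketch below illustrates that idea in plain Java. `RoutingKeyHolder` and its methods are hypothetical stand-ins written for this example, not the library's own class.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Illustrative stand-in for a routing key holder; NOT the dynamic-datasource class.
final class RoutingKeyHolder {
    // One stack per thread, so a nested switch restores the outer key when it ends.
    private static final ThreadLocal<Deque<String>> KEYS =
            ThreadLocal.withInitial(ArrayDeque::new);

    private RoutingKeyHolder() {
    }

    static void push(String key) {
        KEYS.get().push(key);
    }

    // Key that routing would use right now, or the primary key when nothing was pushed.
    static String current(String primaryKey) {
        String key = KEYS.get().peek();
        return key != null ? key : primaryKey;
    }

    static void poll() {
        Deque<String> stack = KEYS.get();
        stack.poll();
        if (stack.isEmpty()) {
            KEYS.remove(); // do not leave state behind on pooled threads
        }
    }

    // Runs the action with the given key and always restores the previous key.
    static <T> T callWith(String key, Supplier<T> action) {
        push(key);
        try {
            return action.get();
        } finally {
            poll();
        }
    }
}
```

Because the keys form a stack, a call that switches to `order_db` inside a method already running on `slave_1` returns to `slave_1` afterwards instead of falling back to the primary.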
Pattern 2: JDBC-layer proxy
┌────────────────────────────────────────────────────┐
│              Spring Boot Application               │
│                                                    │
│  ┌──────────────────────────────────────────────┐  │
│  │          ShardingSphere-JDBC (JAR)           │  │
│  │  ┌────────────────────────────────────────┐  │  │
│  │  │ SQL parse → route → rewrite → execute  │  │  │
│  │  └────────────────────────────────────────┘  │  │
│  └────────┬─────────────┬─────────────┬─────────┘  │
│      ds_0 │        ds_1 │        ds_2 │            │
└───────────┼─────────────┼─────────────┼────────────┘
            ▼             ▼             ▼
         MySQL-0       MySQL-1       MySQL-2
Pattern 3: standalone proxy
┌────────────┐     ┌──────────────────────┐     ┌─────────┐
│ App (Java) │────►│                      │     │ MySQL-0 │
├────────────┤     │ ShardingSphere-Proxy │────►├─────────┤
│ App (Go)   │────►│  (separate process)  │────►│ MySQL-1 │
├────────────┤     │                      │     ├─────────┤
│ App (Py)   │────►│ transparent to apps  │────►│ MySQL-2 │
└────────────┘     └──────────────────────┘     └─────────┘
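dynamic-datasource-spring-boot-starter in practice
This is one of the most widely used in-application multi-datasource libraries. It is maintained by the baomidou team and integrates closely with MyBatis-Plus.
Maven dependencies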
<dependencies>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot3-starter</artifactId>
<version>4.3.1</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-spring-boot3-starter</artifactId>
<version>3.5.7</version>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
Basic configuration
spring:
  datasource:
    dynamic:
      primary: master
      strict: true
      datasource:
        master:
          url: jdbc:mysql://localhost:3306/db_master?useSSL=false&serverTimezone=Asia/Shanghai
          username: root
          password: master_pwd
          driver-class-name: com.mysql.cj.jdbc.Driver
        slave_1:
          url: jdbc:mysql://localhost:3307/db_slave?useSSL=false&serverTimezone=Asia/Shanghai
          username: root
          password: slave_pwd
          driver-class-name: com.mysql.cj.jdbc.Driver
        slave_2:
          url: jdbc:mysql://localhost:3308/db_slave?useSSL=false&serverTimezone=Asia/Shanghai
          username: root
          password: slave_pwd
          driver-class-name: com.mysql.cj.jdbc.Driver
      hikari:
        minimum-idle: 5
        maximum-pool-size: 20
        idle-timeout: 30000
        max-lifetime: 1800000
        connection-timeout: 30000
        pool-name: DynamicHikariCP
Switching with the @DS annotation
@Service
public class UserServiceImpl implements UserService {
@Autowired
private UserMapper userMapper;
@DS("master")
@Override
public void createUser(User user) {
userMapper.insert(user);
}
@DS("slave_1")
@Override
public User getUserById(Long id) {
return userMapper.selectById(id);
}
@DS("slave_2")
@Override
public List<User> listUsers(int pageNum, int pageSize) {
Page<User> page = new Page<>(pageNum, pageSize);
return userMapper.selectPage(page, null).getRecords();
}
}
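Programmatic switching
Annotations are not flexible enough for complex cases. Programmatic switching picks the data source at runtime by pushing a key before the query and polling it afterwards:
@Service
public class OrderServiceImpl implements OrderService {
    @Autowired
    private OrderMapper orderMapper;
    @Override
    public Order getOrderWithDynamicDs(String tenantId, Long orderId) {
        // push() selects the data source for this thread; poll() in finally restores the previous one
        DynamicDataSourceContextHolder.push("tenant_" + tenantId);
        try {
            // Look the order up by its own ID, not by the tenant ID
            return orderMapper.selectById(orderId);
        } finally {
            DynamicDataSourceContextHolder.poll();
        }
    }
    @Override
    public List<Order> batchQuery(List<String> tenantIds) {
        List<Order> result = new ArrayList<>();
        for (String tenantId : tenantIds) {
            DynamicDataSourceContextHolder.push("tenant_" + tenantId);
            try {
                result.addAll(orderMapper.selectList(null));
            } finally {
                DynamicDataSourceContextHolder.poll();
            }
        }
        return result;
    }
}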
Custom dynamic routing strategy
@Component
public class CustomDsProcessor extends DsProcessor {
    // Processors are only consulted for @DS values that start with "#", for example @DS("#tenant")
    private static final String TENANT_KEY = "#tenant";
    @Autowired
    private TenantDataSourceManager tenantDsManager;
    @Override
    public boolean matches(String key) {
        return key.startsWith(TENANT_KEY);
    }
    // The processor still has to be linked into the DsProcessor chain bean to take effect
    @Override
    public String doDetermineDatasource(MethodInvocation invocation, String key) {
        return tenantDsManager.resolveDsKey(extractTenantId(invocation));
    }
    private String extractTenantId(MethodInvocation invocation) {
        Object[] args = invocation.getArguments();
        for (Object arg : args) {
            if (arg instanceof TenantAware tenantAware) {
                return tenantAware.getTenantId();
            }
        }
        throw new IllegalStateException("Unable to extract tenant ID");
    }
}
Automatic read/write routing with an AOP aspect
@Aspect
@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
public class ReadWriteSplitAspect {
private static final Set<String> READ_METHOD_PREFIXES =
Set.of("get", "query", "find", "list", "count", "page", "search");
@Around("execution(* com.example..service.impl.*ServiceImpl.*(..))")
public Object around(ProceedingJoinPoint point) throws Throwable {
String methodName = point.getSignature().getName();
boolean isRead = READ_METHOD_PREFIXES.stream()
.anyMatch(methodName::startsWith);
if (isRead) {
DynamicDataSourceContextHolder.push("slave_1");
} else {
DynamicDataSourceContextHolder.push("master");
}
try {
return point.proceed();
} finally {
DynamicDataSourceContextHolder.poll();
}
}
}
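The aspect above decides purely by method-name prefix, which is easy to reason about in isolation. The following sketch extracts that decision into a plain function so the edge cases become visible. `ReadWriteClassifier` is a hypothetical helper for illustration; the prefixes and data source keys are the ones used in the aspect.

```java
import java.util.Set;

// Hypothetical helper that mirrors the aspect's prefix rule.
final class ReadWriteClassifier {
    private static final Set<String> READ_PREFIXES =
            Set.of("get", "query", "find", "list", "count", "page", "search");

    private ReadWriteClassifier() {
    }

    // Returns the data source key the prefix rule would choose for a method name.
    static String dataSourceFor(String methodName) {
        boolean isRead = READ_PREFIXES.stream().anyMatch(methodName::startsWith);
        return isRead ? "slave_1" : "master";
    }
}
```

A method such as `getAndLockStock` matches the `get` prefix and is therefore sent to a slave even though it writes, so a naming convention like this one needs to be enforced in code review or overridden with an explicit annotation on such methods.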
Multi-datasource configuration with MyBatis-Plus
Cross-datasource queries
MyBatis-Plus cannot JOIN across data sources, so the application has to query each source and assemble the result itself:
@Service
public class OrderDetailServiceImpl implements OrderDetailService {
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private UserMapper userMapper;
    @Autowired
    private ProductMapper productMapper;
    @DS("order_db")
    public OrderDetailVO getOrderDetail(Long orderId) {
        Order order = orderMapper.selectById(orderId);
        if (order == null) {
            return null;
        }
        User user = queryOn("user_db", () -> userMapper.selectById(order.getUserId()));
        Product product = queryOn("product_db", () -> productMapper.selectById(order.getProductId()));
        return OrderDetailVO.builder()
                .order(order)
                .user(user)
                .product(product)
                .build();
    }
    // Runs one query on the given data source, then restores the previous one
    private <T> T queryOn(String dsKey, Supplier<T> query) {
        DynamicDataSourceContextHolder.push(dsKey);
        try {
            return query.get();
        } finally {
            DynamicDataSourceContextHolder.poll();
        }
    }
}
Optimizing cross-datasource batch queries
@Service
public class BatchCrossDsService {
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private UserMapper userMapper;
    public List<OrderWithUserVO> batchQuery(List<Long> orderIds) {
        if (orderIds.isEmpty()) {
            return List.of(); // an empty IN (...) list would produce invalid SQL
        }
        List<Order> orders = queryOn("order_db", () -> orderMapper.selectBatchIds(orderIds));
        List<Long> userIds = orders.stream()
                .map(Order::getUserId)
                .distinct()
                .collect(Collectors.toList());
        // One batched lookup per data source instead of one user query per order
        Map<Long, User> userMap = userIds.isEmpty() ? Map.of() : queryOn("user_db", () ->
                userMapper.selectBatchIds(userIds).stream()
                        .collect(Collectors.toMap(User::getId, Function.identity())));
        return orders.stream().map(order -> {
            OrderWithUserVO vo = new OrderWithUserVO();
            vo.setOrder(order);
            vo.setUser(userMap.get(order.getUserId()));
            return vo;
        }).collect(Collectors.toList());
    }
    // Runs one query on the given data source, then restores the previous one
    private <T> T queryOn(String dsKey, Supplier<T> query) {
        DynamicDataSourceContextHolder.push(dsKey);
        try {
            return query.get();
        } finally {
            DynamicDataSourceContextHolder.poll();
        }
    }
}
Adding and removing data sources at runtime
In a SaaS setting, tenants register dynamically, so their data sources have to be added while the application is running:
@Component
@Slf4j
public class TenantDataSourceManager {
    @Autowired
    private DynamicRoutingDataSource dynamicRoutingDataSource;
    private final Set<String> tenantDsKeys = ConcurrentHashMap.newKeySet();
    public synchronized void addTenantDs(String tenantId, String url, String username, String password) {
        String dsKey = "tenant_" + tenantId;
        // Build the pool with HikariCP directly; the driver class is inferred from the JDBC URL
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl(url);
        config.setUsername(username);
        config.setPassword(password);
        config.setPoolName(dsKey);
        config.setMaximumPoolSize(10);
        config.setMinimumIdle(2);
        // Do not fail registration when the tenant database is temporarily unreachable
        config.setInitializationFailTimeout(-1);
        HikariDataSource dataSource = new HikariDataSource(config);
        dynamicRoutingDataSource.addDataSource(dsKey, dataSource);
        tenantDsKeys.add(dsKey);
        log.info("Data source added for tenant [{}]: {}", tenantId, dsKey);
    }
    public synchronized void removeTenantDs(String tenantId) {
        String dsKey = "tenant_" + tenantId;
        dynamicRoutingDataSource.removeDataSource(dsKey);
        tenantDsKeys.remove(dsKey);
        log.info("Data source removed for tenant [{}]: {}", tenantId, dsKey);
    }
    public String resolveDsKey(String tenantId) {
        String dsKey = "tenant_" + tenantId;
        if (!tenantDsKeys.contains(dsKey)) {
            throw new IllegalStateException("Tenant data source does not exist: " + dsKey);
        }
        return dsKey;
    }
    public Set<String> getAllTenantDsKeys() {
        return Collections.unmodifiableSet(tenantDsKeys);
    }
}
Registering tenant data sources at startup
@Component
@Slf4j
public class TenantDataSourceInitializer implements ApplicationRunner {
@Autowired
private TenantDataSourceManager tenantDsManager;
@Autowired
private TenantConfigRepository tenantConfigRepository;
@Override
public void run(ApplicationArguments args) {
List<TenantConfig> tenants = tenantConfigRepository.findAllActive();
for (TenantConfig tenant : tenants) {
try {
tenantDsManager.addTenantDs(
tenant.getTenantId(),
tenant.getDbUrl(),
tenant.getDbUsername(),
tenant.getDbPassword()
);
} catch (Exception e) {
log.error("Failed to initialize data source for tenant [{}]", tenant.getTenantId(), e);
}
}
log.info("Processed {} tenant data source configurations", tenants.size());
}
}
Read/write splitting: from configuration to implementation
Complete read/write splitting architecture
               ┌─────────────────┐
               │   Application   │
               │  @DS("master")  │── writes
               │  @DS("slave")   │── reads
               └────────┬────────┘
                        │
       ┌────────────────┼────────────────┐
       │                │                │
┌──────▼──────┐  ┌──────▼──────┐  ┌──────▼──────┐
│   Master    │  │   Slave-1   │  │   Slave-2   │
│ (read/write)│  │ (read-only) │  │ (read-only) │
└──────┬──────┘  └─────────────┘  └─────────────┘
       │
┌──────▼──────┐
│   Binlog    │
│ replication │
└─────────────┘
YAML configuration for read/write splitting
spring:
  datasource:
    dynamic:
      primary: master
      strict: true
      datasource:
        master:
          url: jdbc:mysql://mysql-master:3306/app_db?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=UTF-8
          username: app_writer
          password: ${MASTER_DB_PWD}
        slave_1:
          url: jdbc:mysql://mysql-slave-1:3306/app_db?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=UTF-8
          username: app_reader
          password: ${SLAVE1_DB_PWD}
        slave_2:
          url: jdbc:mysql://mysql-slave-2:3306/app_db?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=UTF-8
          username: app_reader
          password: ${SLAVE2_DB_PWD}
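Custom load-balancing strategy
The default round-robin strategy ignores replication lag. In production, reads should be routed according to how far each slave is behind:
// LoadBalanceStrategy is treated here as an application-defined interface.
// In dynamic-datasource 4.x the built-in extension point for group selection is
// DynamicDataSourceStrategy#determineKey(List<String>).
@Component
@Slf4j
public class SlaveLoadBalanceStrategy implements LoadBalanceStrategy {
    private final AtomicInteger counter = new AtomicInteger();
    @Autowired
    private MySQLReplicationMonitor replicationMonitor;
    @Override
    public String determineDsKey(List<String> slaveDsKeys, String masterDsKey) {
        // Keep only slaves whose replication delay is below one second
        List<String> availableSlaves = slaveDsKeys.stream()
                .filter(key -> replicationMonitor.getReplicationDelay(key) < 1000)
                .collect(Collectors.toList());
        if (availableSlaves.isEmpty()) {
            log.warn("All slaves are lagging too far behind; falling back to the master for reads");
            return masterDsKey;
        }
        // Round-robin with a counter spreads load more evenly than the wall clock does
        int index = Math.floorMod(counter.getAndIncrement(), availableSlaves.size());
        return availableSlaves.get(index);
    }
}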
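The selection rule described above (drop lagging slaves, rotate over the rest, fall back to the master) can be checked without a database. The sketch below is a self-contained version of that rule. `DelayAwareSelector` and its parameters are hypothetical names chosen for this example, and the delays are passed in as a plain map rather than read from a monitor.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

// Hypothetical, library-free version of the delay-aware selection rule.
final class DelayAwareSelector {
    private final long maxDelayMillis;
    private final AtomicInteger counter = new AtomicInteger();

    DelayAwareSelector(long maxDelayMillis) {
        this.maxDelayMillis = maxDelayMillis;
    }

    String select(List<String> slaveKeys, Map<String, Long> delayMillis, String masterKey) {
        // A slave with no recorded delay is treated as unavailable.
        List<String> healthy = slaveKeys.stream()
                .filter(key -> delayMillis.getOrDefault(key, Long.MAX_VALUE) < maxDelayMillis)
                .collect(Collectors.toList());
        if (healthy.isEmpty()) {
            return masterKey; // degrade to the master rather than serve stale reads
        }
        // floorMod keeps the index valid even after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), healthy.size());
        return healthy.get(index);
    }
}
```

Treating an unknown slave as unavailable is a deliberate choice in this sketch: right after startup, before any lag measurement exists, reads go to the master instead of to a slave whose state has not been checked.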
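Monitoring replication delay
@Component
@Slf4j
public class MySQLReplicationMonitor {
    private static final List<String> SLAVE_KEYS = List.of("slave_1", "slave_2");
    private final Map<String, Long> replicationDelayMap = new ConcurrentHashMap<>();
    // The routing DataSource bean; getConnection() follows the key pushed below
    @Autowired
    private DataSource dataSource;
    @Scheduled(fixedDelay = 5000)
    public void monitorReplicationDelay() {
        for (String slaveKey : SLAVE_KEYS) {
            DynamicDataSourceContextHolder.push(slaveKey);
            // MySQL 8.0.22+ prefers SHOW REPLICA STATUS and the Seconds_Behind_Source column
            try (Connection conn = dataSource.getConnection();
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SHOW SLAVE STATUS")) {
                if (!rs.next()) {
                    replicationDelayMap.put(slaveKey, Long.MAX_VALUE);
                    continue;
                }
                long secondsBehindMaster = rs.getLong("Seconds_Behind_Master");
                if (rs.wasNull()) {
                    // NULL means replication is not running, so the slave must not serve reads
                    replicationDelayMap.put(slaveKey, Long.MAX_VALUE);
                    log.warn("Replication is not running on slave [{}]", slaveKey);
                    continue;
                }
                replicationDelayMap.put(slaveKey, secondsBehindMaster * 1000);
                if (secondsBehindMaster > 5) {
                    log.warn("Slave [{}] replication delay: {} s", slaveKey, secondsBehindMaster);
                }
            } catch (SQLException e) {
                replicationDelayMap.put(slaveKey, Long.MAX_VALUE);
                log.error("Status check failed for slave [{}]", slaveKey, e);
            } finally {
                DynamicDataSourceContextHolder.poll();
            }
        }
    }
    public long getReplicationDelay(String slaveKey) {
        // A slave that has never been checked counts as unavailable
        return replicationDelayMap.getOrDefault(slaveKey, Long.MAX_VALUE);
    }
}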
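An annotation that forces reads to the master
Some scenarios, such as reading a row immediately after writing it, must go to the master: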
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@DS("master")
public @interface MasterOnly {
}
@Service
public class PaymentServiceImpl implements PaymentService {
@Autowired
private PaymentMapper paymentMapper;
@DS("master")
@Override
public Payment createPayment(PaymentDTO dto) {
Payment payment = new Payment();
BeanUtils.copyProperties(dto, payment);
paymentMapper.insert(payment);
return payment;
}
@MasterOnly
@Override
public Payment getPaymentAfterCreate(Long paymentId) {
return paymentMapper.selectById(paymentId);
}
}
Sharding: deep integration with ShardingSphere-JDBC
Sharding architecture
┌────────────────────────────────────────────────────────┐
│                   Application Layer                    │
│                                                        │
│  ┌──────────────────────────────────────────────────┐  │
│  │               ShardingSphere-JDBC                │  │
│  │                                                  │  │
│  │  SQL: SELECT * FROM t_order WHERE user_id=1001   │  │
│  │         │                                        │  │
│  │   ┌─────▼─────┐                                  │  │
│  │   │ SQL parse │                                  │  │
│  │   └─────┬─────┘                                  │  │
│  │   ┌─────▼─────┐                                  │  │
│  │   │  Router   │──► ds_0.t_order_1                │  │
│  │   └─────┬─────┘                                  │  │
│  │   ┌─────▼─────┐                                  │  │
│  │   │  Rewrite  │──► SELECT * FROM t_order_1 ...   │  │
│  │   └─────┬─────┘                                  │  │
│  │   ┌─────▼─────┐                                  │  │
│  │   │   Merge   │                                  │  │
│  │   └───────────┘                                  │  │
│  └──────────────────────────────────────────────────┘  │
└────────────────────────────────────────────────────────┘
Maven dependency
<dependencies>
    <dependency>
        <groupId>org.apache.shardingsphere</groupId>
        <artifactId>shardingsphere-jdbc</artifactId>
        <version>5.5.1</version>
    </dependency>
</dependencies>
Sharding configuration
ShardingSphere dropped its Spring Boot starter in 5.3.0. With 5.5.x the rules live in a standalone YAML file that is loaded through the ShardingSphere JDBC driver, not through spring.shardingsphere.* properties.
# application.yml
spring:
  datasource:
    driver-class-name: org.apache.shardingsphere.driver.ShardingSphereDriver
    url: jdbc:shardingsphere:classpath:sharding.yaml
# sharding.yaml
mode:
  type: Standalone
  repository:
    type: JDBC
dataSources:
  ds_0:
    dataSourceClassName: com.zaxxer.hikari.HikariDataSource
    driverClassName: com.mysql.cj.jdbc.Driver
    jdbcUrl: jdbc:mysql://localhost:3306/shard_db_0?useSSL=false&serverTimezone=Asia/Shanghai
    username: root
    password: ds0_pwd
  ds_1:
    dataSourceClassName: com.zaxxer.hikari.HikariDataSource
    driverClassName: com.mysql.cj.jdbc.Driver
    jdbcUrl: jdbc:mysql://localhost:3306/shard_db_1?useSSL=false&serverTimezone=Asia/Shanghai
    username: root
    password: ds1_pwd
rules:
  - !SHARDING
    tables:
      t_order:
        actualDataNodes: ds_${0..1}.t_order_${0..15}
        tableStrategy:
          standard:
            shardingColumn: order_id
            shardingAlgorithmName: order-table-inline
        databaseStrategy:
          standard:
            shardingColumn: user_id
            shardingAlgorithmName: order-db-mod
        keyGenerateStrategy:
          column: order_id
          keyGeneratorName: snowflake
      t_order_item:
        actualDataNodes: ds_${0..1}.t_order_item_${0..15}
        tableStrategy:
          standard:
            shardingColumn: order_id
            shardingAlgorithmName: order-item-table-inline
        databaseStrategy:
          standard:
            shardingColumn: user_id
            shardingAlgorithmName: order-db-mod
    shardingAlgorithms:
      order-table-inline:
        type: INLINE
        props:
          algorithm-expression: t_order_${order_id % 16}
      order-item-table-inline:
        type: INLINE
        props:
          algorithm-expression: t_order_item_${order_id % 16}
      order-db-mod:
        type: MOD
        props:
          sharding-count: 2
    keyGenerators:
      snowflake:
        type: SNOWFLAKE
        props:
          worker-id: 1
props:
  sql-show: true
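To make the routing in this configuration concrete: the database is chosen by `user_id` modulo 2 and the table by `order_id` modulo 16. The sketch below reproduces that arithmetic in plain Java. `OrderRouting` is a hypothetical helper for illustration only; it mirrors the configured expressions for non-negative IDs and is not how ShardingSphere is implemented internally.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors the configured sharding expressions.
final class OrderRouting {
    private static final int DATABASES = 2;
    private static final int TABLES = 16;

    private OrderRouting() {
    }

    // Both sharding columns known: exactly one physical table is touched.
    static String dataNode(long userId, long orderId) {
        return "ds_" + Math.floorMod(userId, DATABASES)
                + ".t_order_" + Math.floorMod(orderId, TABLES);
    }

    // Only user_id known: the database is fixed, but every table in it is a candidate.
    static List<String> dataNodesForUser(long userId) {
        List<String> nodes = new ArrayList<>();
        for (int table = 0; table < TABLES; table++) {
            nodes.add("ds_" + Math.floorMod(userId, DATABASES) + ".t_order_" + table);
        }
        return nodes;
    }
}
```

For example, `dataNode(1001, 35)` yields `ds_1.t_order_3`, because 1001 % 2 = 1 and 35 % 16 = 3. A query that filters only on `user_id = 1001` narrows the search to `ds_1` but still has to visit all 16 order tables there, which is why queries that carry both sharding columns are much cheaper.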
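Custom sharding algorithm
Plain modulo sharding forces data migration whenever the shard count changes; consistent hashing reduces how much data has to move: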
@Component
public class ConsistentHashShardingAlgorithm implements StandardShardingAlgorithm<Long> {
private ConsistentHash<String> consistentHash;
@Override
public void init(Properties props) {
int virtualNodes = Integer.parseInt(props.getProperty("virtual-nodes", "160"));
List<String> actualDataNodes = Arrays.stream(props.getProperty("actual-data-nodes").split(","))
.collect(Collectors.toList());
HashFunction hashFunction = new Murmur3HashFunction();
consistentHash = new ConsistentHash<>(hashFunction, virtualNodes, actualDataNodes);
}
@Override
public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) {
return consistentHash.get(shardingValue.getValue());
}
@Override
public Collection<String> doSharding(Collection<String> availableTargetNames, RangeShardingValue<Long> shardingValue) {
return availableTargetNames;
}
@Override
public String getType() {
return "CONSISTENT_HASH";
}
}
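The algorithm above delegates to a `ConsistentHash` helper whose implementation is not shown. A minimal sketch of such a ring, assuming a sorted map of virtual nodes and CRC32 as a stand-in hash (the article's code names Murmur3), might look like the following. `HashRing` and its method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.CRC32;

// Hypothetical consistent-hash ring; CRC32 is used only to keep the sketch dependency-free.
final class HashRing {
    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int virtualNodes;

    HashRing(Collection<String> nodes, int virtualNodes) {
        this.virtualNodes = virtualNodes;
        nodes.forEach(this::addNode);
    }

    // Each physical node is placed on the ring at several pseudo-random points.
    void addNode(String node) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.put(hash(node + "#" + i), node);
        }
    }

    // Walk clockwise from the value's hash to the next virtual node, wrapping at the end.
    String nodeFor(long shardingValue) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("ring has no nodes");
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(Long.toString(shardingValue)));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    private static long hash(String value) {
        CRC32 crc = new CRC32();
        crc.update(value.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }
}
```

The property that matters for resharding is that adding a node only moves keys onto the new node; keys never shuffle between the existing nodes. With modulo sharding, changing the divisor remaps most keys.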
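Registering the sharding algorithm
ShardingSphere discovers algorithms through the Java SPI mechanism, not through Spring beans, so the class is listed in a service file on the classpath and then referenced by its type name (CONSISTENT_HASH) in the sharding rules. The package name below is assumed:
# src/main/resources/META-INF/services/org.apache.shardingsphere.sharding.spi.ShardingAlgorithm
com.example.sharding.ConsistentHashShardingAlgorithm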
Notes on integrating MyBatis-Plus with ShardingSphere
@Configuration
@MapperScan("com.example.mapper")
public class MybatisPlusConfig {
@Bean
public MybatisPlusInterceptor mybatisPlusInterceptor() {
MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
return interceptor;
}
@Bean
public IdentifierGenerator identifierGenerator() {
return new CustomIdGenerator();
}
}
public class CustomIdGenerator implements IdentifierGenerator {
@Override
public Number nextId(Object entity) {
return ShardingSphereIdGenerator.nextId();
}
}
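Multi-tenant data isolation: schema level vs. table level
Comparing the two isolation models
| Dimension | Schema-level isolation | Table-level isolation |
|---|---|---|
| Isolation level | Higher | Lower |
| Resource cost | Separate schema per tenant | Shared schema and tables; rows distinguished by tenant_id |
| Operational complexity | Medium | Low |
| Data leak risk | Low | Medium (needs strict filtering) |
| Best for | Large customers / finance | Small customers / SaaS |
| Scalability | Limited by the number of schemas | Limited by table size |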
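Implementing schema-level isolation
@Component
public class SchemaTenantInterceptor implements InnerInterceptor {
    @Override
    public void beforeQuery(Executor executor, MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) {
        routeToTenant();
    }
    @Override
    public void beforeUpdate(Executor executor, MappedStatement ms, Object parameter) {
        routeToTenant();
    }
    private void routeToTenant() {
        // TenantContextHolder stores the tenant ID as a Long
        Long tenantId = TenantContextHolder.getTenantId();
        if (tenantId == null) {
            return;
        }
        String dsKey = "tenant_" + tenantId;
        // Push only when the key changes; there is no matching "after" hook to poll it,
        // so the request boundary must call DynamicDataSourceContextHolder.clear()
        if (!dsKey.equals(DynamicDataSourceContextHolder.peek())) {
            DynamicDataSourceContextHolder.push(dsKey);
        }
    }
}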
Implementing table-level isolation (MyBatis-Plus tenant plugin)
@Configuration
public class TenantConfig {
@Bean
public MybatisPlusInterceptor mybatisPlusInterceptor(TenantLineInnerInterceptor tenantInterceptor) {
MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
interceptor.addInnerInterceptor(tenantInterceptor);
interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
return interceptor;
}
@Bean
public TenantLineInnerInterceptor tenantLineInnerInterceptor() {
return new TenantLineInnerInterceptor(new TenantLineHandler() {
@Override
public Expression getTenantId() {
Long tenantId = TenantContextHolder.getTenantId();
if (tenantId == null) {
throw new IllegalStateException("Tenant ID must not be null");
}
return new LongValue(tenantId);
}
@Override
public boolean ignoreTable(String tableName) {
return Set.of("sys_config", "sys_dict", "sys_menu")
.contains(tableName);
}
@Override
public String getTenantIdColumn() {
return "tenant_id";
}
});
}
}
Managing the tenant context
public class TenantContextHolder {
private static final ThreadLocal<Long> TENANT_ID = new ThreadLocal<>();
public static void setTenantId(Long tenantId) {
TENANT_ID.set(tenantId);
}
public static Long getTenantId() {
return TENANT_ID.get();
}
public static void clear() {
TENANT_ID.remove();
}
}
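A `ThreadLocal` holder like the one above is invisible to work handed to another thread, and a value left behind on a pooled thread leaks into the next task. One common way to handle both is to capture the tenant when the task is created and install it only for the duration of the task. The sketch below shows that pattern with a hypothetical `TenantScope` class; it duplicates the holder so the example stays self-contained.

```java
// Hypothetical, self-contained variant of the tenant holder with task propagation.
final class TenantScope {
    private static final ThreadLocal<Long> TENANT_ID = new ThreadLocal<>();

    private TenantScope() {
    }

    static void set(Long tenantId) {
        TENANT_ID.set(tenantId);
    }

    static Long get() {
        return TENANT_ID.get();
    }

    static void clear() {
        TENANT_ID.remove();
    }

    // Captures the caller's tenant now and applies it around the task wherever it runs.
    static Runnable propagate(Runnable task) {
        Long captured = TENANT_ID.get();
        return () -> {
            Long previous = TENANT_ID.get();
            TENANT_ID.set(captured);
            try {
                task.run();
            } finally {
                // Restore whatever the executing thread had before, so nothing leaks.
                if (previous == null) {
                    TENANT_ID.remove();
                } else {
                    TENANT_ID.set(previous);
                }
            }
        };
    }
}
```

A caller would wrap the task at submission time, for example `executor.submit(TenantScope.propagate(task))`, so that the worker sees the submitting request's tenant and is clean again once the task finishes.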
Injecting the tenant ID in the web layer
// Registered through @Component. @WebFilter is only processed when @ServletComponentScan is enabled,
// and combining the two can register the filter twice, so only @Component is kept.
@Component
public class TenantFilter implements Filter {
    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
            throws IOException, ServletException {
        HttpServletRequest httpRequest = (HttpServletRequest) request;
        String tenantHeader = httpRequest.getHeader("X-Tenant-Id");
        if (tenantHeader != null) {
            try {
                TenantContextHolder.setTenantId(Long.parseLong(tenantHeader));
            } catch (NumberFormatException e) {
                throw new ServletException("Invalid tenant ID: " + tenantHeader);
            }
        }
        try {
            chain.doFilter(request, response);
        } finally {
            TenantContextHolder.clear();
            // Also drop any data source keys that were pushed during the request
            DynamicDataSourceContextHolder.clear();
        }
    }
}
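Transaction management: local transactions, Seata distributed transactions, and MQ-based eventual consistency
Comparing the options
| Approach | Consistency | Performance | Complexity | When to use |
|---|---|---|---|---|
| Local transaction (@DSTransactional) | Strong within each data source; best effort across them | High | Low | Single data source, or brief inconsistency is tolerable |
| Seata AT mode | Strong | Medium | High | Cross-database work that needs strong consistency |
| Seata TCC mode | Strong | Medium-low | Very high | Core flows such as funds and inventory |
| MQ-based eventual consistency | Eventual | High | Medium | Asynchronous flows that tolerate delay |
| Saga | Eventual | High | High | Long-running orchestrated flows |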
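Local transactions: @DSTransactional
dynamic-datasource provides @DSTransactional, which keeps a local transaction open on every data source the method touches and commits or rolls them back together. It is not a two-phase commit, so a failure partway through the commit phase can still leave the data sources inconsistent.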
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private OrderItemMapper orderItemMapper;
@DSTransactional
@DS("master")
@Override
public void createOrder(OrderCreateDTO dto) {
Order order = new Order();
order.setOrderNo(generateOrderNo());
order.setUserId(dto.getUserId());
order.setStatus(OrderStatus.CREATED);
orderMapper.insert(order);
for (OrderItemDTO itemDTO : dto.getItems()) {
OrderItem item = new OrderItem();
item.setOrderId(order.getId());
item.setProductId(itemDTO.getProductId());
item.setQuantity(itemDTO.getQuantity());
orderItemMapper.insert(item);
}
}
}
Integrating Seata AT mode
┌────────────┐    ┌────────────┐    ┌────────────┐
│ Service A  │───►│ Service B  │───►│ Service C  │
│ (order DB) │    │ (stock DB) │    │(account DB)│
└─────┬──────┘    └─────┬──────┘    └─────┬──────┘
      │                 │                 │
      └─────────────────┼─────────────────┘
                        │
               ┌────────▼────────┐
               │  Seata Server   │
               │ (TC coordinator)│
               └─────────────────┘
<!-- From Seata 2.1.0 on, artifacts are published under the org.apache.seata groupId -->
<dependency>
    <groupId>org.apache.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    <version>2023.0.3.2</version>
</dependency>
seata:
  enabled: true
  application-id: order-service
  tx-service-group: my_test_tx_group
  service:
    vgroup-mapping:
      my_test_tx_group: default
  registry:
    type: nacos
    nacos:
      server-addr: localhost:8848
      namespace: seata
      group: SEATA_GROUP
  config:
    type: nacos
    nacos:
      server-addr: localhost:8848
      namespace: seata
      group: SEATA_GROUP
@Service
public class BusinessServiceImpl implements BusinessService {
@Autowired
private OrderService orderService;
@Autowired
private StockService stockService;
@Autowired
private AccountService accountService;
@GlobalTransactional(name = "create-order-tx", rollbackFor = Exception.class)
@Override
public void purchase(PurchaseDTO dto) {
orderService.createOrder(dto);
stockService.deductStock(dto.getProductId(), dto.getQuantity());
accountService.deductBalance(dto.getUserId(), dto.getAmount());
}
}
MQ-based eventual consistency
@Service
@Slf4j
public class OrderMQService {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private TransactionLogMapper transactionLogMapper;
    @Autowired
    private ApplicationEventPublisher eventPublisher;
    @Transactional
    public void createOrderWithMQ(OrderCreateDTO dto) {
        Order order = new Order();
        BeanUtils.copyProperties(dto, order);
        order.setStatus(OrderStatus.CREATED);
        orderMapper.insert(order);
        // The order row and its outbox record commit or roll back together
        TransactionLog txLog = new TransactionLog();
        txLog.setTransactionId(UUID.randomUUID().toString());
        txLog.setBizType("CREATE_ORDER");
        txLog.setBizId(order.getId());
        txLog.setPayload(JSON.toJSONString(dto));
        transactionLogMapper.insert(txLog);
        // Without this publish the AFTER_COMMIT listener below never runs;
        // the OrderCreatedEvent constructor used here is assumed
        eventPublisher.publishEvent(new OrderCreatedEvent(order));
    }
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void onOrderCreated(OrderCreatedEvent event) {
        rocketMQTemplate.asyncSend(
                "order-create-topic",
                MessageBuilder.withPayload(event)
                        .setHeader("KEYS", event.getOrderNo())
                        .build(),
                new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        // The matching TransactionLog row should be marked as sent here,
                        // otherwise the compensation job will send it again
                        log.info("Order message sent: {}", event.getOrderNo());
                    }
                    @Override
                    public void onException(Throwable e) {
                        log.error("Failed to send order message: {}", event.getOrderNo(), e);
                    }
                }
        );
    }
}
Scheduled message compensation job
@Component
@Slf4j
public class TransactionLogCompensator {
    @Autowired
    private TransactionLogMapper transactionLogMapper;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @Scheduled(fixedDelay = 30000)
    public void compensate() {
        // Unsent records (status 0) with fewer than 5 retries that are at least one minute old
        List<TransactionLog> pendingLogs = transactionLogMapper.selectList(
                new LambdaQueryWrapper<TransactionLog>()
                        .eq(TransactionLog::getStatus, 0)
                        .lt(TransactionLog::getRetryCount, 5)
                        .le(TransactionLog::getCreateTime, LocalDateTime.now().minusMinutes(1))
        );
        // The loop variable must not be named "log", or it would shadow the Slf4j logger
        for (TransactionLog txLog : pendingLogs) {
            try {
                rocketMQTemplate.syncSend("order-create-topic", txLog.getPayload());
                txLog.setStatus(1);
                transactionLogMapper.updateById(txLog);
            } catch (Exception e) {
                txLog.setRetryCount(txLog.getRetryCount() + 1);
                transactionLogMapper.updateById(txLog);
                log.warn("Compensation failed for transaction log [{}], retry count: {}", txLog.getTransactionId(), txLog.getRetryCount(), e);
            }
        }
    }
}
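The compensation rule (resend unsent records that are old enough, count failures, and give up after a fixed number of retries) is easy to get subtly wrong, so it helps to test it apart from the database and the broker. The sketch below models it in memory. `OutboxEntry`, `OutboxCompensator`, and the injected sender are hypothetical names; the thresholds of five retries and one minute are the ones used above.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a transaction log row.
final class OutboxEntry {
    final String payload;
    final Instant createdAt;
    boolean sent;
    int retryCount;

    OutboxEntry(String payload, Instant createdAt) {
        this.payload = payload;
        this.createdAt = createdAt;
    }
}

final class OutboxCompensator {
    static final int MAX_RETRIES = 5;
    static final Duration MIN_AGE = Duration.ofMinutes(1);

    private OutboxCompensator() {
    }

    // Returns how many entries were delivered in this pass.
    static int compensate(List<OutboxEntry> entries, Consumer<String> sender, Instant now) {
        int delivered = 0;
        for (OutboxEntry entry : entries) {
            boolean due = !entry.sent
                    && entry.retryCount < MAX_RETRIES
                    && !entry.createdAt.isAfter(now.minus(MIN_AGE));
            if (!due) {
                continue;
            }
            try {
                sender.accept(entry.payload);
                entry.sent = true;
                delivered++;
            } catch (RuntimeException e) {
                entry.retryCount++; // stays pending until MAX_RETRIES is reached
            }
        }
        return delivered;
    }
}
```

Because a record can be sent successfully and then fail to be marked as sent, this pattern delivers at least once, so consumers on the other side of the queue need to be idempotent.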
Monitoring and operations: data source health checks and Micrometer metrics
Data source health checks
@Component
@Slf4j
public class DataSourceHealthChecker {
    @Autowired
    private DynamicRoutingDataSource dynamicRoutingDataSource;
    // AlertService is an application-defined notification component
    @Autowired
    private AlertService alertService;
    @Scheduled(fixedDelay = 10000)
    public void checkAllDataSources() {
        Map<String, DataSource> dataSources = dynamicRoutingDataSource.getCurrentDataSources();
        for (Map.Entry<String, DataSource> entry : dataSources.entrySet()) {
            String dsKey = entry.getKey();
            try (Connection conn = entry.getValue().getConnection();
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT 1")) {
                if (rs.next() && rs.getInt(1) == 1) {
                    log.debug("Data source [{}] passed the health check", dsKey);
                }
            } catch (SQLException e) {
                log.error("Data source [{}] failed the health check", dsKey, e);
                alertService.sendAlert("Data source failure: " + dsKey, e.getMessage());
            }
        }
    }
}
Collecting metrics with Micrometer
@Configuration
public class DataSourceMetricsConfig {
    @Bean
    public MeterDataSourceAspect meterDataSourceAspect(MeterRegistry registry) {
        return new MeterDataSourceAspect(registry);
    }
    public static class MeterDataSourceAspect {
        private final MeterRegistry registry;
        public MeterDataSourceAspect(MeterRegistry registry) {
            this.registry = registry;
        }
        public void recordSwitch(String dsKey) {
            // One counter per data source. Every meter with the same name uses the same tag keys,
            // which registries such as Prometheus require.
            Counter.builder("datasource.switch.count")
                    .description("Number of data source switches")
                    .tag("ds", dsKey)
                    .register(registry)
                    .increment();
        }
        public Timer.Sample startQuery() {
            return Timer.start(registry);
        }
        public void endQuery(Timer.Sample sample, String dsKey) {
            sample.stop(Timer.builder("datasource.query.duration")
                    .description("Query duration per data source")
                    .tag("ds", dsKey)
                    .register(registry));
        }
    }
}
Monitoring the HikariCP connection pools
spring:
  datasource:
    dynamic:
      hikari:
        register-mbeans: true
        metrics-tracker: true
@Component
public class HikariMetricsExporter {
    @Autowired
    private DynamicRoutingDataSource dynamicRoutingDataSource;
    @Autowired
    private MeterRegistry meterRegistry;
    // Runs periodically so that pools added at runtime are picked up;
    // registering a gauge that already exists is a no-op
    @Scheduled(fixedDelay = 15000)
    public void exportHikariMetrics() {
        Map<String, DataSource> dataSources = dynamicRoutingDataSource.getCurrentDataSources();
        for (Map.Entry<String, DataSource> entry : dataSources.entrySet()) {
            if (entry.getValue() instanceof HikariDataSource hikari) {
                HikariPoolMXBean pool = hikari.getHikariPoolMXBean();
                if (pool == null) {
                    continue; // the pool has not been started yet
                }
                Gauge.builder("hikari.active.connections", pool, HikariPoolMXBean::getActiveConnections)
                        .tag("pool", entry.getKey())
                        .description("Active connections")
                        .register(meterRegistry);
                Gauge.builder("hikari.idle.connections", pool, HikariPoolMXBean::getIdleConnections)
                        .tag("pool", entry.getKey())
                        .description("Idle connections")
                        .register(meterRegistry);
                Gauge.builder("hikari.threads.awaiting", pool, HikariPoolMXBean::getThreadsAwaitingConnection)
                        .tag("pool", entry.getKey())
                        .description("Threads waiting for a connection")
                        .register(meterRegistry);
            }
        }
    }
}
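Key metrics for a Prometheus + Grafana dashboard
| Metric | Meaning | Alert threshold |
|---|---|---|
| hikari.active.connections | Currently active connections | > 80% of the maximum pool size |
| hikari.threads.awaiting | Threads waiting for a connection | > 5 |
| datasource.switch.count | Data source switch rate | Abnormal spike |
| datasource.query.duration | Query duration | P99 > 1 s |
| hikari.connection.creation | Connection creation rate | Sustained high rate |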
Production-grade architecture and best practices
Complete production architecture
┌──────────────────────────────────────────────────────────────┐
│                       Nginx / Gateway                        │
└──────────────────────────────┬───────────────────────────────┘
                               │
┌──────────────────────────────▼───────────────────────────────┐
│                   Spring Boot Application                    │
│                                                              │
│  ┌────────────────────────────────────────────────────────┐  │
│  │                    Controller Layer                    │  │
│  │       TenantFilter → @DS → @GlobalTransactional        │  │
│  └───────────────────────────┬────────────────────────────┘  │
│                              │                               │
│  ┌───────────────────────────▼────────────────────────────┐  │
│  │                     Service Layer                      │  │
│  │        @DSTransactional / @GlobalTransactional         │  │
│  └───────────────────────────┬────────────────────────────┘  │
│                              │                               │
│  ┌───────────────────────────▼────────────────────────────┐  │
│  │                DynamicRoutingDataSource                │  │
│  │   ┌──────────┬──────────┬──────────┬──────────┐        │  │
│  │   │  master  │ slave_1  │ order_db │ tenant_X │        │  │
│  │   └────┬─────┴────┬─────┴────┬─────┴────┬─────┘        │  │
│  └────────┼──────────┼──────────┼──────────┼──────────────┘  │
└───────────┼──────────┼──────────┼──────────┼─────────────────┘
            │          │          │          │
        ┌───▼───┐  ┌───▼───┐  ┌───▼───┐  ┌───▼───┐
        │Master │  │ Slave │  │ Order │  │Tenant │
        │ MySQL │  │ MySQL │  │ MySQL │  │ MySQL │
        └───────┘  └───────┘  └───────┘  └───────┘
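Common pitfalls
| Pitfall | Symptom | Fix |
|---|---|---|
| @DS has no effect | The AOP proxy is bypassed, so the switch never happens | Put the annotation on public methods and avoid calls within the same class |
| Switching data sources inside a transaction | Queries still run in the transaction of the original data source | Use @DSTransactional instead of @Transactional |
| ThreadLocal leak | A data source key lingers on an async or pooled thread | Clear DynamicDataSourceContextHolder in a finally block |
| Connection pool exhaustion | Connection waits time out under high concurrency | Size each pool according to that data source's QPS |
| Stale reads caused by replication lag | A read from a slave right after a write misses the new data | Read from the master after writes, or use @MasterOnly |
| Full scan when the sharding key is missing | Queries without the sharding key are routed to every shard | Require the sharding key in queries, or serve them from a denormalized ES index |
| Tenant ID not injected | SQL lacks the tenant_id condition | Global tenant interceptor plus a whitelist of exempt tables |
| Distributed transaction timeout | The Seata global transaction times out and rolls back | Set a reasonable timeout and avoid long transactions |
| Dynamic data source not closed | The connection pool is not released after a tenant is removed | Close the pool when calling removeDataSource |
| Poorly tuned HikariCP | minimumIdle equal to maximumPoolSize keeps idle connections open on rarely used data sources | Lower minimumIdle for low-traffic pools such as per-tenant ones; keep a fixed-size pool for busy ones |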
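The first pitfall in the table, an annotation that silently does nothing on a call within the same class, follows directly from how proxies work. The sketch below reproduces the effect with a JDK dynamic proxy and no framework at all. `ReportService`, `ReportServiceImpl`, and `ProxyDemo` are hypothetical names; the recording handler stands in for the interceptor that would switch data sources.

```java
import java.lang.reflect.Proxy;
import java.util.List;

interface ReportService {
    String summary();

    String detail();
}

final class ReportServiceImpl implements ReportService {
    @Override
    public String summary() {
        // Plain call on "this": it never passes through the proxy.
        return "summary+" + detail();
    }

    @Override
    public String detail() {
        return "detail";
    }
}

final class ProxyDemo {
    private ProxyDemo() {
    }

    // Wraps the target in a proxy that records every method it intercepts.
    static ReportService proxied(ReportService target, List<String> intercepted) {
        return (ReportService) Proxy.newProxyInstance(
                ReportService.class.getClassLoader(),
                new Class<?>[] {ReportService.class},
                (proxy, method, args) -> {
                    intercepted.add(method.getName());
                    return method.invoke(target, args);
                });
    }
}
```

Calling `summary()` through the proxy records only `summary`, even though `detail()` also runs. If `detail()` carried the data source annotation, the switch would never take place. Moving the annotated method to another bean, or calling it through the injected proxy, restores interception.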
Reference connection pool configuration
spring:
  datasource:
    dynamic:
      hikari:
        minimum-idle: 5
        maximum-pool-size: 20
        idle-timeout: 300000
        max-lifetime: 1800000
        connection-timeout: 30000
        connection-test-query: SELECT 1
        pool-name: DynamicHikariCP
      datasource:
        master:
          hikari:
            minimum-idle: 10
            maximum-pool-size: 50
        slave_1:
          hikari:
            minimum-idle: 10
            maximum-pool-size: 30
        slave_2:
          hikari:
            minimum-idle: 10
            maximum-pool-size: 30
Encrypting configuration values
spring:
  datasource:
    dynamic:
      datasource:
        master:
          url: jdbc:mysql://mysql-master:3306/app_db
          username: ENC(g2U3x8vKpQ==)
          password: ENC(aB3dE7fG9hJ==)
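@Configuration
public class DataSourceEncryptConfig {
    // Assumes dynamic-datasource 4.x, where a DataSourceInitEvent bean replaces the default ENC(...) handling
    @Bean
    public DataSourceInitEvent dataSourceInitEvent() {
        return new DataSourceInitEvent() {
            @Override
            public void beforeCreate(DataSourceProperty property) {
                property.setUsername(decrypt(property.getUsername()));
                property.setPassword(decrypt(property.getPassword()));
            }
            @Override
            public void afterCreate(DataSource dataSource) {
                // nothing to do once the pool exists
            }
        };
    }
    private String decrypt(String value) {
        if (value != null && value.startsWith("ENC(") && value.endsWith(")")) {
            String encrypted = value.substring(4, value.length() - 1);
            // AesUtil and getSecretKey() are application-defined helpers
            return AesUtil.decrypt(encrypted, getSecretKey());
        }
        return value;
    }
}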
Graceful shutdown
@Configuration
@Slf4j
public class GracefulShutdownConfig {
    @Autowired
    private DynamicRoutingDataSource dynamicRoutingDataSource;
    @PreDestroy
    public void gracefulShutdown() {
        log.info("Closing data sources gracefully...");
        Map<String, DataSource> dataSources = dynamicRoutingDataSource.getCurrentDataSources();
        for (Map.Entry<String, DataSource> entry : dataSources.entrySet()) {
            if (entry.getValue() instanceof HikariDataSource hikari) {
                HikariPoolMXBean pool = hikari.getHikariPoolMXBean();
                // The MXBean is null when the pool was never started
                int active = pool != null ? pool.getActiveConnections() : 0;
                log.info("Closing data source [{}], active connections: {}", entry.getKey(), active);
                hikari.close();
            }
        }
        log.info("All data sources closed");
    }
}
Managing configuration across environments
# application-dev.yml
spring:
  datasource:
    dynamic:
      strict: false
      datasource:
        master:
          url: jdbc:mysql://localhost:3306/dev_db
# application-prod.yml
spring:
  datasource:
    dynamic:
      strict: true
      hikari:
        minimum-idle: 10
        maximum-pool-size: 50
      datasource:
        master:
          url: jdbc:mysql://mysql-master.internal:3306/prod_db
          username: ${DB_MASTER_USER}
          password: ${DB_MASTER_PWD}
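Summary
Spring Boot 3 with MyBatis-Plus and multiple data sources is a common setup for enterprise Java applications. Recommendations by scenario:
- Small to mid-sized projects: dynamic-datasource-spring-boot-starter with annotation-based switching; simple and efficient
- Mid-sized to large projects: ShardingSphere-JDBC with sharding; feature-rich
- Polyglot projects: ShardingSphere-Proxy as a standalone proxy layer
- Transactions: @DSTransactional for a single database, Seata AT across databases, MQ-based eventual consistency for asynchronous flows
- Multi-tenancy: schema isolation for large customers; table isolation with the MyBatis-Plus tenant plugin for small customers
Multiple data sources are not a silver bullet. Before adopting them, check whether they are really needed; if a single database solves the problem, do not over-engineer.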