Spring Boot 3 + MyBatis-Plus Multi-Data-Source Architecture: A Complete Guide
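Why Multiple Data Sources Are an Enterprise Necessity
A single-data-source architecture was sufficient in the early days of the internet, but as a business grows, one database can no longer absorb all the traffic and complexity. At that point multiple data sources stop being a nice-to-have and become a requirement.
Five Core Scenarios
| Scenario | Typical requirement | Data source relationship | Example |
|---|---|---|---|
| Read/write splitting | Read-heavy workload; spread the read load | One primary, many replicas | E-commerce product detail page |
| Per-domain databases | Physical isolation between business domains | Parallel and independent | Order DB vs. user DB |
| Sharding (databases and tables) | A single table grows too large | Horizontal shards | Log table split by month |
| Multi-tenancy | Data isolation between tenants | One database or schema per tenant | SaaS platform |
| Heterogeneous stores | Different storage engines working together | MySQL + PostgreSQL + Elasticsearch | Search combined with transactional workloads |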
┌──────────────────────────────────────────────────────────┐
│ Single-data-source architecture (early stage)            │
│                                                          │
│ App ──────► MySQL (all reads and writes)                 │
│                                                          │
│ Problems: read/write contention, table bloat,            │
│ no horizontal scaling                                    │
└──────────────────────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────┐
│ Multi-data-source architecture (evolved)                 │
│                                                          │
│ App ──┬──► MySQL-Master (writes)                         │
│       ├──► MySQL-Slave1 (reads)                          │
│       ├──► MySQL-Slave2 (reads)                          │
│       ├──► OrderDB (order database)                      │
│       ├──► UserDB (user database)                        │
│       └──► Elasticsearch (search)                        │
└──────────────────────────────────────────────────────────┘
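Real-world case: a social platform with 5 million daily active users saw a peak of 120,000 QPS on its single database. After adopting read/write splitting and per-domain databases, primary QPS fell to 30,000 and overall throughput rose roughly fourfold.
Three Multi-Data-Source Architecture Patterns
Pattern Comparison at a Glance
| Dimension | In-application multi-data-source | JDBC-layer proxy | Standalone proxy tier |
|---|---|---|---|
| Representative solution | dynamic-datasource | ShardingSphere-JDBC | ShardingSphere-Proxy / MyCat |
| Intrusiveness | Low (annotations/configuration) | Medium (replaces the DataSource) | None (separate process) |
| Operational complexity | Low | Medium | High |
| Performance overhead | Very low | Low | Medium (extra network hop) |
| Feature richness | Basic switching | Sharding + read/write splitting + encryption | Full feature set |
| Suitable scale | Small to medium projects | Medium to large projects | Large / polyglot projects |
| Language binding | Java | Java | Language-agnostic |
Pattern 1: In-Application Multi-Data-Source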
┌─────────────────────────────────────────────┐
│ Spring Boot Application │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ DataSource│ │ DataSource│ │ DataSource│ │
│ │ master │ │ slave_1 │ │ slave_2 │ │
│ └─────┬────┘ └─────┬────┘ └─────┬────┘ │
│ │ │ │ │
│ ┌─────▼─────────────▼─────────────▼────┐ │
│ │ DynamicRoutingDataSource │ │
│ │ (ThreadLocal + @DS annotation routing) │ │
│ └──────────────────────────────────────┘ │
└─────────────────────────────────────────────┘
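The "ThreadLocal + @DS" routing in the diagram can be pictured as a per-thread stack of data source keys: entering an annotated method pushes a key, the routing data source reads the top of the stack, and leaving the method pops it. The following is a minimal plain-JDK sketch of that idea, not the library's actual source; `RoutingKeyStack` and its method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical per-thread stack of data source keys (illustration only). */
final class RoutingKeyStack {
    private static final ThreadLocal<Deque<String>> KEYS =
            ThreadLocal.withInitial(ArrayDeque::new);

    private RoutingKeyStack() {}

    /** Entering an annotated method pushes its key. */
    static void push(String key) {
        KEYS.get().push(key);
    }

    /** The routing data source would use the key on top of the stack, or the primary if none. */
    static String current(String primary) {
        String top = KEYS.get().peek();
        return top != null ? top : primary;
    }

    /** Leaving the method pops the key; an empty stack is removed so pooled threads stay clean. */
    static void pop() {
        Deque<String> stack = KEYS.get();
        stack.poll();
        if (stack.isEmpty()) {
            KEYS.remove();
        }
    }

    public static void main(String[] args) {
        push("master");
        push("slave_1");                       // a nested call switches to a replica
        System.out.println(current("master")); // slave_1
        pop();
        System.out.println(current("master")); // master
        pop();
    }
}
```

A stack rather than a single value is what lets nested calls switch to another data source and then fall back to the caller's key on return.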
Pattern 2: JDBC-Layer Proxy
┌─────────────────────────────────────────────┐
│ Spring Boot Application │
│ │
│ ┌──────────────────────────────────────┐ │
│ │ ShardingSphere-JDBC (JAR) │ │
│ │ ┌────────────────────────────────┐ │ │
│ │ │ SQL parse → route → rewrite → execute │ │ │
│ │ └────────────────────────────────┘ │ │
│ └──────┬──────────┬──────────┬─────────┘ │
│ │ │ │ │
│ ds_0 │ ds_1 │ ds_2 │ │
└─────────┼──────────┼──────────┼──────────────┘
▼ ▼ ▼
MySQL-0 MySQL-1 MySQL-2
Pattern 3: Standalone Proxy Tier
┌────────────┐     ┌──────────────────────┐     ┌─────────┐
│ App (Java) │────►│                      │     │ MySQL-0 │
├────────────┤     │ ShardingSphere-Proxy │────►├─────────┤
│ App (Go)   │────►│ (standalone)         │────►│ MySQL-1 │
├────────────┤     │                      │     ├─────────┤
│ App (Py)   │────►│ transparent to apps  │────►│ MySQL-2 │
└────────────┘     └──────────────────────┘     └─────────┘
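dynamic-datasource-spring-boot-starter in Practice
This is currently the most popular in-application multi-data-source solution. It is maintained by the baomidou team and integrates closely with MyBatis-Plus.
Maven Dependencies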
<dependencies>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot3-starter</artifactId>
<version>4.3.1</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-spring-boot3-starter</artifactId>
<version>3.5.7</version>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
Basic Configuration
spring:
  datasource:
    dynamic:
      primary: master
      strict: true
      datasource:
        master:
          url: jdbc:mysql://localhost:3306/db_master?useSSL=false&serverTimezone=Asia/Shanghai
          username: root
          password: master_pwd
          driver-class-name: com.mysql.cj.jdbc.Driver
        slave_1:
          url: jdbc:mysql://localhost:3307/db_slave?useSSL=false&serverTimezone=Asia/Shanghai
          username: root
          password: slave_pwd
          driver-class-name: com.mysql.cj.jdbc.Driver
        slave_2:
          url: jdbc:mysql://localhost:3308/db_slave?useSSL=false&serverTimezone=Asia/Shanghai
          username: root
          password: slave_pwd
          driver-class-name: com.mysql.cj.jdbc.Driver
      hikari:
        minimum-idle: 5
        maximum-pool-size: 20
        idle-timeout: 30000
        max-lifetime: 1800000
        connection-timeout: 30000
        pool-name: DynamicHikariCP
Switching with the @DS Annotation
@Service
public class UserServiceImpl implements UserService {
@Autowired
private UserMapper userMapper;
@DS("master")
@Override
public void createUser(User user) {
userMapper.insert(user);
}
@DS("slave_1")
@Override
public User getUserById(Long id) {
return userMapper.selectById(id);
}
@DS("slave_2")
@Override
public List<User> listUsers(int pageNum, int pageSize) {
Page<User> page = new Page<>(pageNum, pageSize);
return userMapper.selectPage(page, null).getRecords();
}
}
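Programmatic Switching
Annotations are not flexible enough for complex scenarios. Programmatic switching lets the code choose the data source at runtime:
@Service
public class OrderServiceImpl implements OrderService {
    @Autowired
    private OrderMapper orderMapper;
    @Override
    public Order getOrderWithDynamicDs(String tenantId, Long orderId) {
        // push(String) only records the key on the current thread; always pair it with poll()
        DynamicDataSourceContextHolder.push("tenant_" + tenantId);
        try {
            // Look the order up by its own ID, not by the tenant ID
            return orderMapper.selectById(orderId);
        } finally {
            DynamicDataSourceContextHolder.poll();
        }
    }
    @Override
    public List<Order> batchQuery(List<String> tenantIds) {
        List<Order> result = new ArrayList<>();
        for (String tenantId : tenantIds) {
            DynamicDataSourceContextHolder.push("tenant_" + tenantId);
            try {
                result.addAll(orderMapper.selectList(null));
            } finally {
                DynamicDataSourceContextHolder.poll();
            }
        }
        return result;
    }
}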
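Custom Dynamic Data Source Routing Strategy
@Component
public class CustomDsProcessor extends DsProcessor {
    // The @DS interceptor only consults processors for keys that start with "#",
    // so methods opt in with @DS("#tenant").
    private static final String TENANT_KEY = "#tenant";
    @Autowired
    private TenantDataSourceManager tenantDsManager;
    // DsProcessor is a chain: each processor declares which keys it handles.
    // Link this one ahead of the defaults through a DsProcessor bean (setNextProcessor).
    @Override
    public boolean matches(String key) {
        return key.startsWith(TENANT_KEY);
    }
    @Override
    public String doDetermineDatasource(MethodInvocation invocation, String key) {
        return tenantDsManager.resolveDsKey(extractTenantId(invocation));
    }
    private String extractTenantId(MethodInvocation invocation) {
        Object[] args = invocation.getArguments();
        for (Object arg : args) {
            if (arg instanceof TenantAware tenantAware) {
                return tenantAware.getTenantId();
            }
        }
        throw new IllegalStateException("Unable to extract the tenant ID");
    }
}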
Automatic Read/Write Routing with an AOP Aspect
@Aspect
@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
public class ReadWriteSplitAspect {
private static final Set<String> READ_METHOD_PREFIXES =
Set.of("get", "query", "find", "list", "count", "page", "search");
@Around("execution(* com.example..service.impl.*ServiceImpl.*(..))")
public Object around(ProceedingJoinPoint point) throws Throwable {
String methodName = point.getSignature().getName();
boolean isRead = READ_METHOD_PREFIXES.stream()
.anyMatch(methodName::startsWith);
if (isRead) {
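// "slave" is the group formed by slave_1 and slave_2 (names sharing the prefix before "_"), so reads are balanced across both
DynamicDataSourceContextHolder.push("slave");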
} else {
DynamicDataSourceContextHolder.push("master");
}
try {
return point.proceed();
} finally {
DynamicDataSourceContextHolder.poll();
}
}
}
MyBatis-Plus Multi-Data-Source Configuration
Cross-Data-Source Association Queries
MyBatis-Plus cannot JOIN across data sources out of the box, so the results have to be assembled in the application layer:
@Service
public class OrderDetailServiceImpl implements OrderDetailService {
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private UserMapper userMapper;
    @Autowired
    private ProductMapper productMapper;
    @DS("order_db")
    public OrderDetailVO getOrderDetail(Long orderId) {
        Order order = orderMapper.selectById(orderId);
        // Each nested switch is a push/poll pair; poll() restores order_db for the caller
        User user;
        DynamicDataSourceContextHolder.push("user_db");
        try {
            user = userMapper.selectById(order.getUserId());
        } finally {
            DynamicDataSourceContextHolder.poll();
        }
        Product product;
        DynamicDataSourceContextHolder.push("product_db");
        try {
            product = productMapper.selectById(order.getProductId());
        } finally {
            DynamicDataSourceContextHolder.poll();
        }
        return OrderDetailVO.builder()
            .order(order)
            .user(user)
            .product(product)
            .build();
    }
}
Optimizing Cross-Data-Source Batch Queries
@Service
public class BatchCrossDsService {
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private UserMapper userMapper;
    public List<OrderWithUserVO> batchQuery(List<Long> orderIds) {
        // One query per data source instead of one query per order
        List<Order> orders;
        DynamicDataSourceContextHolder.push("order_db");
        try {
            orders = orderMapper.selectBatchIds(orderIds);
        } finally {
            DynamicDataSourceContextHolder.poll();
        }
        if (orders.isEmpty()) {
            return Collections.emptyList(); // avoids an empty IN () clause below
        }
        List<Long> userIds = orders.stream()
            .map(Order::getUserId)
            .distinct()
            .collect(Collectors.toList());
        Map<Long, User> userMap;
        DynamicDataSourceContextHolder.push("user_db");
        try {
            userMap = userMapper.selectBatchIds(userIds).stream()
                .collect(Collectors.toMap(User::getId, Function.identity()));
        } finally {
            DynamicDataSourceContextHolder.poll();
        }
        // Join in memory by user ID
        return orders.stream().map(order -> {
            OrderWithUserVO vo = new OrderWithUserVO();
            vo.setOrder(order);
            vo.setUser(userMap.get(order.getUserId()));
            return vo;
        }).collect(Collectors.toList());
    }
}
Adding and Removing Data Sources at Runtime
In SaaS scenarios tenants register dynamically, so data sources have to be added at runtime:
@Component
@Slf4j
public class TenantDataSourceManager {
    @Autowired
    private DynamicRoutingDataSource dynamicRoutingDataSource;
    // Creator bean provided by dynamic-datasource; builds a pool from a DataSourceProperty
    @Autowired
    private DefaultDataSourceCreator dataSourceCreator;
    private final Map<String, DataSourceProperty> tenantDsMap = new ConcurrentHashMap<>();
    public synchronized void addTenantDs(String tenantId, String url, String username, String password) {
        String dsKey = "tenant_" + tenantId;
        DataSourceProperty property = new DataSourceProperty();
        property.setPoolName(dsKey);
        property.setUrl(url);
        property.setUsername(username);
        property.setPassword(password);
        property.setDriverClassName("com.mysql.cj.jdbc.Driver");
        property.setLazy(true);
        // Per-tenant pool limits. Setter names are assumed to mirror the hikari.* YAML keys;
        // check HikariCpConfig in the library version you use.
        property.getHikari().setMaximumPoolSize(10);
        property.getHikari().setMinimumIdle(2);
        DataSource dataSource = dataSourceCreator.createDataSource(property);
        dynamicRoutingDataSource.addDataSource(dsKey, dataSource);
        tenantDsMap.put(dsKey, property);
        log.info("Data source added for tenant [{}]: {}", tenantId, dsKey);
    }
    public synchronized void removeTenantDs(String tenantId) {
        String dsKey = "tenant_" + tenantId;
        // removeDataSource also closes the underlying pool
        dynamicRoutingDataSource.removeDataSource(dsKey);
        tenantDsMap.remove(dsKey);
        log.info("Data source removed for tenant [{}]: {}", tenantId, dsKey);
    }
    public String resolveDsKey(String tenantId) {
        String dsKey = "tenant_" + tenantId;
        if (!tenantDsMap.containsKey(dsKey)) {
            throw new IllegalStateException("Tenant data source does not exist: " + dsKey);
        }
        return dsKey;
    }
    public Set<String> getAllTenantDsKeys() {
        return Collections.unmodifiableSet(tenantDsMap.keySet());
    }
}
Automatic Tenant Data Source Registration at Startup
@Component
@Slf4j
public class TenantDataSourceInitializer implements ApplicationRunner {
@Autowired
private TenantDataSourceManager tenantDsManager;
@Autowired
private TenantConfigRepository tenantConfigRepository;
@Override
public void run(ApplicationArguments args) {
List<TenantConfig> tenants = tenantConfigRepository.findAllActive();
for (TenantConfig tenant : tenants) {
try {
tenantDsManager.addTenantDs(
tenant.getTenantId(),
tenant.getDbUrl(),
tenant.getDbUsername(),
tenant.getDbPassword()
);
} catch (Exception e) {
log.error("Failed to initialize the data source for tenant [{}]", tenant.getTenantId(), e);
}
}
log.info("Processed {} tenant data source configurations", tenants.size());
}
}
Read/Write Splitting: From Configuration to Implementation
Complete Read/Write Splitting Architecture
                ┌─────────────────┐
                │   Application   │
                │  @DS("master")  │── writes
                │  @DS("slave")   │── reads
                └────────┬────────┘
                         │
          ┌──────────────┼──────────────┐
          │              │              │
    ┌─────▼─────┐  ┌─────▼─────┐  ┌─────▼─────┐
    │  Master   │  │  Slave-1  │  │  Slave-2  │
    │   (R/W)   │  │(read-only)│  │(read-only)│
    └─────┬─────┘  └───────────┘  └───────────┘
          │
    ┌─────▼─────┐
    │  Binlog   │
    │replication│
    └───────────┘
Read/Write Splitting YAML Configuration
# characterEncoding expects a Java charset name (UTF-8); utf8mb4 is a MySQL charset name
spring:
  datasource:
    dynamic:
      primary: master
      strict: true
      datasource:
        master:
          url: jdbc:mysql://mysql-master:3306/app_db?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=UTF-8
          username: app_writer
          password: ${MASTER_DB_PWD}
        slave_1:
          url: jdbc:mysql://mysql-slave-1:3306/app_db?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=UTF-8
          username: app_reader
          password: ${SLAVE1_DB_PWD}
        slave_2:
          url: jdbc:mysql://mysql-slave-2:3306/app_db?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=UTF-8
          username: app_reader
          password: ${SLAVE2_DB_PWD}
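Custom Load-Balancing Strategy
The default round-robin strategy ignores replica health. In production, routing should take replication lag into account:
// LoadBalanceStrategy is assumed here to be an application-defined interface that the routing
// aspect calls. The library's own extension point is DynamicDataSourceStrategy, which picks a
// key from the members of a single group.
@Component
@Slf4j
public class SlaveLoadBalanceStrategy implements LoadBalanceStrategy {
    @Autowired
    private MySQLReplicationMonitor replicationMonitor;
    @Override
    public String determineDsKey(List<String> slaveDsKeys, String masterDsKey) {
        // Keep only replicas whose measured lag is below one second
        List<String> availableSlaves = slaveDsKeys.stream()
            .filter(key -> replicationMonitor.getReplicationDelay(key) < 1000)
            .collect(Collectors.toList());
        if (availableSlaves.isEmpty()) {
            log.warn("All replicas are lagging too far behind; falling back to the primary for reads");
            return masterDsKey;
        }
        // A random pick spreads load evenly; currentTimeMillis() % size sends bursts to one replica
        int index = ThreadLocalRandom.current().nextInt(availableSlaves.size());
        return availableSlaves.get(index);
    }
}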
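The selection rule in this strategy (drop lagging replicas, fall back to the primary when none qualify) can be exercised without Spring or a database. The sketch below is a self-contained restatement under that assumption; `ReplicaPicker` and its parameters are hypothetical names, and replicas with no recorded lag are treated as unusable.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

final class ReplicaPicker {
    private ReplicaPicker() {}

    /** Replicas whose recorded lag is strictly below maxLagMs; unknown replicas are excluded. */
    static List<String> healthy(List<String> replicas, Map<String, Long> lagMs, long maxLagMs) {
        return replicas.stream()
                .filter(key -> lagMs.getOrDefault(key, Long.MAX_VALUE) < maxLagMs)
                .toList();
    }

    /** Picks a random healthy replica, or the primary key when no replica qualifies. */
    static String pick(List<String> replicas, Map<String, Long> lagMs, long maxLagMs, String primaryKey) {
        List<String> candidates = healthy(replicas, lagMs, maxLagMs);
        if (candidates.isEmpty()) {
            return primaryKey;
        }
        return candidates.get(ThreadLocalRandom.current().nextInt(candidates.size()));
    }

    public static void main(String[] args) {
        Map<String, Long> lag = Map.of("slave_1", 200L, "slave_2", 5000L);
        List<String> replicas = List.of("slave_1", "slave_2", "slave_3");
        System.out.println(pick(replicas, lag, 1000, "master")); // slave_1
        System.out.println(pick(replicas, lag, 100, "master"));  // master
    }
}
```

Treating a missing measurement as "infinitely lagging" is a deliberate choice: a replica that has never been checked should not receive traffic by default.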
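Replication Lag Monitoring
@Component
@Slf4j
public class MySQLReplicationMonitor {
    @Autowired
    private DynamicRoutingDataSource dynamicRoutingDataSource;
    private final Map<String, Long> replicationDelayMap = new ConcurrentHashMap<>();
    @Scheduled(fixedDelay = 5000)
    public void monitorReplicationDelay() {
        List<String> slaveKeys = List.of("slave_1", "slave_2");
        for (String slaveKey : slaveKeys) {
            // Query each replica's own pool directly instead of switching the thread's routing key
            DataSource dataSource = dynamicRoutingDataSource.getDataSource(slaveKey);
            // MySQL 8.0.22+ prefers SHOW REPLICA STATUS with the Seconds_Behind_Source column
            try (Connection conn = dataSource.getConnection();
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SHOW SLAVE STATUS")) {
                if (rs.next()) {
                    long secondsBehindMaster = rs.getLong("Seconds_Behind_Master");
                    if (rs.wasNull()) {
                        // NULL means replication is not running: mark the replica unusable
                        replicationDelayMap.put(slaveKey, Long.MAX_VALUE);
                        log.warn("Replication is not running on replica [{}]", slaveKey);
                        continue;
                    }
                    replicationDelayMap.put(slaveKey, secondsBehindMaster * 1000);
                    if (secondsBehindMaster > 5) {
                        log.warn("Replica [{}] replication lag: {}s", slaveKey, secondsBehindMaster);
                    }
                }
            } catch (SQLException e) {
                replicationDelayMap.put(slaveKey, Long.MAX_VALUE);
                log.error("Status check failed for replica [{}]", slaveKey, e);
            }
        }
    }
    public long getReplicationDelay(String slaveKey) {
        // A replica that has never been measured counts as unusable, not as perfectly in sync
        return replicationDelayMap.getOrDefault(slaveKey, Long.MAX_VALUE);
    }
}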
Forcing the Primary with an Annotation
Some scenarios, such as reading immediately after a write, must go to the primary:
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@DS("master")
public @interface MasterOnly {
}
@Service
public class PaymentServiceImpl implements PaymentService {
@Autowired
private PaymentMapper paymentMapper;
@DS("master")
@Override
public Payment createPayment(PaymentDTO dto) {
Payment payment = new Payment();
BeanUtils.copyProperties(dto, payment);
paymentMapper.insert(payment);
return payment;
}
@MasterOnly
@Override
public Payment getPaymentAfterCreate(Long paymentId) {
return paymentMapper.selectById(paymentId);
}
}
Sharding: Deep Integration with ShardingSphere-JDBC
Sharding Architecture
┌──────────────────────────────────────────────────────┐
│ Application Layer │
│ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ ShardingSphere-JDBC │ │
│ │ │ │
│ │ SQL: SELECT * FROM t_order WHERE user_id=1001 │ │
│ │ │ │ │
│ │ ┌────▼────┐ │ │
│ │ │ Parse   │ │ │
│ │ └────┬────┘ │ │
│ │ ┌────▼────┐ │ │
│ │ │ Route   │──► ds_0.t_order_1 │ │
│ │ └────┬────┘ │ │
│ │ ┌────▼────┐ │ │
│ │ │ Rewrite │──► SELECT * FROM t_order_1 ... │ │
│ │ └────┬────┘ │ │
│ │ ┌────▼────┐ │ │
│ │ │ Merge   │ │ │
│ │ └─────────┘ │ │
│ └─────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────┘
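Maven Dependencies
<dependencies>
    <!-- The Spring Boot starter (shardingsphere-jdbc-core-spring-boot-starter) was removed in
         ShardingSphere 5.3.0, so it has no 5.5.1 release. From 5.3.0 on, rules live in a
         standalone YAML file loaded through org.apache.shardingsphere.driver.ShardingSphereDriver.
         Verify the artifactId against your release; the 5.5.x documentation lists shardingsphere-jdbc. -->
    <dependency>
        <groupId>org.apache.shardingsphere</groupId>
        <artifactId>shardingsphere-jdbc</artifactId>
        <version>5.5.1</version>
    </dependency>
</dependencies>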
Sharding Configuration
# This spring.shardingsphere.* layout is the Spring Boot starter format (ShardingSphere 5.2.x and earlier).
# On 5.3.0+ the same rules go into a standalone YAML file referenced by a
# jdbc:shardingsphere:classpath:... URL.
# Inside Spring configuration, inline expressions use $->{...} so that Spring does not
# treat ${...} as a property placeholder.
spring:
  shardingsphere:
    mode:
      type: Standalone
      repository:
        type: JDBC
    datasource:
      names: ds_0,ds_1
      ds_0:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://localhost:3306/shard_db_0?useSSL=false&serverTimezone=Asia/Shanghai
        username: root
        password: ds0_pwd
      ds_1:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://localhost:3306/shard_db_1?useSSL=false&serverTimezone=Asia/Shanghai
        username: root
        password: ds1_pwd
    rules:
      sharding:
        tables:
          t_order:
            actual-data-nodes: ds_$->{0..1}.t_order_$->{0..15}
            table-strategy:
              standard:
                sharding-column: order_id
                sharding-algorithm-name: order-table-inline
            database-strategy:
              standard:
                sharding-column: user_id
                sharding-algorithm-name: order-db-mod
            key-generate-strategy:
              column: order_id
              key-generator-name: snowflake
          t_order_item:
            actual-data-nodes: ds_$->{0..1}.t_order_item_$->{0..15}
            table-strategy:
              standard:
                sharding-column: order_id
                sharding-algorithm-name: order-item-table-inline
            database-strategy:
              standard:
                sharding-column: user_id
                sharding-algorithm-name: order-db-mod
        sharding-algorithms:
          order-table-inline:
            type: INLINE
            props:
              algorithm-expression: t_order_$->{order_id % 16}
          order-item-table-inline:
            type: INLINE
            props:
              algorithm-expression: t_order_item_$->{order_id % 16}
          order-db-mod:
            type: MOD
            props:
              sharding-count: 2
        key-generators:
          snowflake:
            type: SNOWFLAKE
            props:
              worker-id: 1
    props:
      sql-show: true
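To make the routing rules in this configuration concrete, the sketch below computes by hand which physical table a row lands in: the database comes from user_id modulo 2 and the table from order_id modulo 16. It only mirrors the arithmetic for non-negative IDs and is not ShardingSphere code; `ShardRouter` is a hypothetical name.

```java
final class ShardRouter {
    static final int DATABASES = 2;  // matches sharding-count: 2
    static final int TABLES = 16;    // matches the order_id % 16 expression

    private ShardRouter() {}

    /** Returns the physical node, for example "ds_1.t_order_9". */
    static String route(long userId, long orderId) {
        int db = Math.floorMod(userId, DATABASES);
        int table = Math.floorMod(orderId, TABLES);
        return "ds_" + db + ".t_order_" + table;
    }

    public static void main(String[] args) {
        System.out.println(route(1001L, 25L)); // ds_1.t_order_9
        System.out.println(route(1002L, 32L)); // ds_0.t_order_0
    }
}
```

A query that supplies only user_id can be narrowed to one database but still has to visit all 16 tables in it, which is why both sharding columns matter for point lookups.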
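Custom Sharding Algorithm
The standard modulo algorithm forces most rows to move when shards are added; consistent hashing reduces the amount of data that has to migrate:
// ShardingSphere discovers algorithms through Java SPI rather than Spring: list this class in
// META-INF/services/org.apache.shardingsphere.sharding.spi.ShardingAlgorithm.
// ConsistentHash, HashFunction and Murmur3HashFunction are application-provided helper classes.
public class ConsistentHashShardingAlgorithm implements StandardShardingAlgorithm<Long> {
    private ConsistentHash<String> consistentHash;
    @Override
    public void init(Properties props) {
        int virtualNodes = Integer.parseInt(props.getProperty("virtual-nodes", "160"));
        String nodes = props.getProperty("actual-data-nodes");
        if (nodes == null || nodes.isBlank()) {
            throw new IllegalArgumentException("actual-data-nodes must be configured");
        }
        List<String> actualDataNodes = Arrays.stream(nodes.split(","))
            .map(String::trim)
            .collect(Collectors.toList());
        HashFunction hashFunction = new Murmur3HashFunction();
        consistentHash = new ConsistentHash<>(hashFunction, virtualNodes, actualDataNodes);
    }
    @Override
    public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) {
        return consistentHash.get(shardingValue.getValue());
    }
    @Override
    public Collection<String> doSharding(Collection<String> availableTargetNames, RangeShardingValue<Long> shardingValue) {
        // Hashing destroys ordering, so a range query has to visit every shard
        return availableTargetNames;
    }
    @Override
    public String getType() {
        return "CONSISTENT_HASH";
    }
}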
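The claim that consistent hashing limits migration can be checked with a small ring. The sketch below is one possible shape for the ring helper, assuming MD5 as the hash and a `TreeMap` as the ring; `HashRing` is a hypothetical class, not the `ConsistentHash` type used above. When a node is added, a key either keeps its owner or moves to the new node, and never moves between two existing nodes.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.TreeMap;

final class HashRing {
    private final TreeMap<Long, String> ring = new TreeMap<>();
    private final int virtualNodes;

    HashRing(List<String> nodes, int virtualNodes) {
        this.virtualNodes = virtualNodes;
        nodes.forEach(this::addNode);
    }

    /** Places virtualNodes points for the node on the ring. */
    void addNode(String node) {
        for (int i = 0; i < virtualNodes; i++) {
            ring.put(hash(node + "#" + i), node);
        }
    }

    /** Owner is the first ring point at or after the key's hash, wrapping around at the end. */
    String nodeFor(long shardingValue) {
        Long point = ring.ceilingKey(hash(Long.toString(shardingValue)));
        return ring.get(point != null ? point : ring.firstKey());
    }

    /** First 8 bytes of the MD5 digest as a long. */
    private static long hash(String value) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(value.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (digest[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        HashRing ring = new HashRing(List.of("ds_0", "ds_1", "ds_2"), 160);
        String[] before = new String[10_000];
        for (int i = 0; i < before.length; i++) {
            before[i] = ring.nodeFor(i);
        }
        ring.addNode("ds_3");
        int moved = 0;
        for (int i = 0; i < before.length; i++) {
            if (!ring.nodeFor(i).equals(before[i])) {
                moved++;
            }
        }
        System.out.println("keys moved after adding ds_3: " + moved + " of " + before.length);
    }
}
```

With plain modulo, going from 3 to 4 shards changes the target of roughly three keys in four; on the ring, only the keys claimed by the new node move.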
Notes on Integrating MyBatis-Plus with ShardingSphere
@Configuration
@MapperScan("com.example.mapper")
public class MybatisPlusConfig {
@Bean
public MybatisPlusInterceptor mybatisPlusInterceptor() {
MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
return interceptor;
}
@Bean
public IdentifierGenerator identifierGenerator() {
return new CustomIdGenerator();
}
}
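public class CustomIdGenerator implements IdentifierGenerator {
    @Override
    public Number nextId(Object entity) {
        // ShardingSphereIdGenerator is an application-provided helper, not a ShardingSphere class.
        // Generate IDs in exactly one place: if MyBatis-Plus fills order_id here, ShardingSphere's
        // key-generate-strategy is skipped because the column is already present in the INSERT.
        return ShardingSphereIdGenerator.nextId();
    }
}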
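Multi-Tenant Data Isolation: Schema Level and Table Level
Comparison of the Two Isolation Models
| Dimension | Schema-level isolation | Table-level isolation |
|---|---|---|
| Isolation level | Higher | Lower |
| Resource overhead | A dedicated schema per tenant | Shared schema; tenants separated at the table level |
| Operational complexity | Medium | Low |
| Data leakage risk | Low | Medium (requires strict filtering) |
| Suitable scenarios | Large customers / finance | Small customers / SaaS |
| Scalability | Limited by the number of schemas | Limited by the number of tables |
Schema-Level Isolation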
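@Component
public class SchemaTenantInterceptor implements InnerInterceptor {
    @Override
    public void beforeQuery(Executor executor, MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) {
        // TenantContextHolder stores the ID as a Long
        Long tenantId = TenantContextHolder.getTenantId();
        if (tenantId != null) {
            // InnerInterceptor has no "after" hook, so nothing here pops this key. Clear the
            // holder at the end of the request (for example in TenantFilter's finally block),
            // otherwise pooled threads accumulate stale keys.
            DynamicDataSourceContextHolder.push("tenant_" + tenantId);
        }
    }
}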
Table-Level Isolation (MyBatis-Plus Tenant Plugin)
@Configuration
public class TenantConfig {
@Bean
public MybatisPlusInterceptor mybatisPlusInterceptor(TenantLineInnerInterceptor tenantInterceptor) {
MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
interceptor.addInnerInterceptor(tenantInterceptor);
interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
return interceptor;
}
@Bean
public TenantLineInnerInterceptor tenantLineInnerInterceptor() {
return new TenantLineInnerInterceptor(new TenantLineHandler() {
@Override
public Expression getTenantId() {
Long tenantId = TenantContextHolder.getTenantId();
if (tenantId == null) {
throw new IllegalStateException("Tenant ID must not be null");
}
return new LongValue(tenantId);
}
@Override
public boolean ignoreTable(String tableName) {
return Set.of("sys_config", "sys_dict", "sys_menu")
.contains(tableName);
}
@Override
public String getTenantIdColumn() {
return "tenant_id";
}
});
}
}
Tenant Context Management
public class TenantContextHolder {
private static final ThreadLocal<Long> TENANT_ID = new ThreadLocal<>();
public static void setTenantId(Long tenantId) {
TENANT_ID.set(tenantId);
}
public static Long getTenantId() {
return TENANT_ID.get();
}
public static void clear() {
TENANT_ID.remove();
}
}
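A ThreadLocal-based holder like this one does not follow work that is handed to a thread pool, and anything left behind on a pooled thread is visible to the next task. The sketch below shows one way to carry the tenant ID across that boundary and restore the worker thread afterwards. It uses a stand-in `TENANT_ID` field so that it compiles on its own; `TenantPropagation` and `withTenant` are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class TenantPropagation {
    // Stand-in for TenantContextHolder so the sketch is self-contained.
    static final ThreadLocal<Long> TENANT_ID = new ThreadLocal<>();

    private TenantPropagation() {}

    /** Captures the submitter's tenant ID and applies it on the worker thread for one task. */
    static <T> Callable<T> withTenant(Callable<T> task) {
        Long captured = TENANT_ID.get(); // read on the submitting thread
        return () -> {
            Long previous = TENANT_ID.get();
            TENANT_ID.set(captured);
            try {
                return task.call();
            } finally {
                // Put the pooled thread back the way it was (normally: nothing set)
                if (previous == null) {
                    TENANT_ID.remove();
                } else {
                    TENANT_ID.set(previous);
                }
            }
        };
    }

    /** Returns {tenant seen inside a wrapped task, tenant seen by the next unwrapped task}. */
    static Long[] demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            TENANT_ID.set(42L);
            Callable<Long> readTenant = TENANT_ID::get;
            Long seen = pool.submit(withTenant(readTenant)).get();
            Long afterwards = pool.submit(readTenant).get();
            return new Long[] {seen, afterwards};
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            TENANT_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Long[] result = demo();
        System.out.println(result[0] + " " + result[1]); // 42 null
    }
}
```

The second task sees no tenant because the wrapper cleaned up after itself; without the finally block the value 42 would leak to whatever runs next on that thread.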
Injecting the Tenant ID at the Web Layer
@Component
@WebFilter(urlPatterns = "/*")
public class TenantFilter implements Filter {
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
HttpServletRequest httpRequest = (HttpServletRequest) request;
String tenantHeader = httpRequest.getHeader("X-Tenant-Id");
if (tenantHeader != null) {
try {
TenantContextHolder.setTenantId(Long.parseLong(tenantHeader));
} catch (NumberFormatException e) {
throw new ServletException("Invalid tenant ID: " + tenantHeader);
}
}
try {
chain.doFilter(request, response);
} finally {
TenantContextHolder.clear();
}
}
}
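Transaction Management: Local Transactions, Seata Distributed Transactions, and MQ Eventual Consistency
Transaction Options Compared
| Approach | Consistency | Performance | Complexity | Suitable scenarios |
|---|---|---|---|---|
| Local transaction (@DSTransactional) | Strong within one data source; best effort across several | High | Low | A single data source, or cases that tolerate brief inconsistency |
| Seata AT mode | Strong | Medium | High | Strong consistency across databases |
| Seata TCC mode | Strong | Medium-low | Very high | Core business such as funds and inventory |
| MQ eventual consistency | Eventual | High | Medium | Asynchronous flows that tolerate delay |
| Saga pattern | Eventual | High | High | Long-running process orchestration |
Local Transactions: @DSTransactional
dynamic-datasource provides @DSTransactional, which keeps a local transaction open on every data source touched inside the method and commits or rolls them back together when the method ends. It is not a distributed transaction: if one commit fails after another has succeeded, the data sources can diverge.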
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private OrderItemMapper orderItemMapper;
@DSTransactional
@DS("master")
@Override
public void createOrder(OrderCreateDTO dto) {
Order order = new Order();
order.setOrderNo(generateOrderNo());
order.setUserId(dto.getUserId());
order.setStatus(OrderStatus.CREATED);
orderMapper.insert(order);
for (OrderItemDTO itemDTO : dto.getItems()) {
OrderItem item = new OrderItem();
item.setOrderId(order.getId());
item.setProductId(itemDTO.getProductId());
item.setQuantity(itemDTO.getQuantity());
orderItemMapper.insert(item);
}
}
}
Seata AT Mode Integration
┌───────────┐    ┌───────────┐    ┌───────────┐
│ Service A │───►│ Service B │───►│ Service C │
│ (order)   │    │ (stock)   │    │ (account) │
└─────┬─────┘    └─────┬─────┘    └─────┬─────┘
      │                │                │
      └────────────────┼────────────────┘
                       │
               ┌───────▼───────┐
               │ Seata Server  │
               │ (coordinator) │
               └───────────────┘
<!-- Seata 2.1.0 and later are published under the Apache groupId org.apache.seata;
     the io.seata coordinates stop at 2.0.0. -->
<dependency>
    <groupId>org.apache.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    <version>2023.0.3.2</version>
</dependency>
seata:
enabled: true
application-id: order-service
tx-service-group: my_test_tx_group
service:
vgroup-mapping:
my_test_tx_group: default
registry:
type: nacos
nacos:
server-addr: localhost:8848
namespace: seata
group: SEATA_GROUP
config:
type: nacos
nacos:
server-addr: localhost:8848
namespace: seata
group: SEATA_GROUP
@Service
public class BusinessServiceImpl implements BusinessService {
@Autowired
private OrderService orderService;
@Autowired
private StockService stockService;
@Autowired
private AccountService accountService;
@GlobalTransactional(name = "create-order-tx", rollbackFor = Exception.class)
@Override
public void purchase(PurchaseDTO dto) {
orderService.createOrder(dto);
stockService.deductStock(dto.getProductId(), dto.getQuantity());
accountService.deductBalance(dto.getUserId(), dto.getAmount());
}
}
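MQ-Based Eventual Consistency
@Service
@Slf4j
public class OrderMQService {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private TransactionLogMapper transactionLogMapper;
    @Autowired
    private ApplicationEventPublisher eventPublisher;
    @Transactional
    public void createOrderWithMQ(OrderCreateDTO dto) {
        Order order = new Order();
        BeanUtils.copyProperties(dto, order);
        order.setStatus(OrderStatus.CREATED);
        orderMapper.insert(order);
        // Named txLog so it does not shadow the @Slf4j logger field "log"
        TransactionLog txLog = new TransactionLog();
        txLog.setTransactionId(UUID.randomUUID().toString());
        txLog.setBizType("CREATE_ORDER");
        txLog.setBizId(order.getId());
        txLog.setPayload(JSON.toJSONString(dto));
        transactionLogMapper.insert(txLog);
        // Publish inside the transaction: the AFTER_COMMIT listener below only fires if an
        // event was published. The OrderCreatedEvent constructor used here is assumed.
        eventPublisher.publishEvent(new OrderCreatedEvent(order));
    }
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void onOrderCreated(OrderCreatedEvent event) {
        rocketMQTemplate.asyncSend(
            "order-create-topic",
            MessageBuilder.withPayload(event)
                .setHeader("KEYS", event.getOrderNo())
                .build(),
            new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    // Mark the matching transaction log as sent here so the compensation job skips it
                    log.info("Order message sent: {}", event.getOrderNo());
                }
                @Override
                public void onException(Throwable e) {
                    // The pending transaction log lets the compensation job retry later
                    log.error("Failed to send order message: {}", event.getOrderNo(), e);
                }
            }
        );
    }
}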
Scheduled Message Compensation Job
@Component
@Slf4j
public class TransactionLogCompensator {
    @Autowired
    private TransactionLogMapper transactionLogMapper;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @Scheduled(fixedDelay = 30000)
    public void compensate() {
        // Pending logs (status 0) older than one minute with fewer than five retries
        List<TransactionLog> pendingLogs = transactionLogMapper.selectList(
            new LambdaQueryWrapper<TransactionLog>()
                .eq(TransactionLog::getStatus, 0)
                .lt(TransactionLog::getRetryCount, 5)
                .le(TransactionLog::getCreateTime, LocalDateTime.now().minusMinutes(1))
        );
        // The loop variable is txLog: naming it "log" would shadow the @Slf4j logger and break log.warn
        for (TransactionLog txLog : pendingLogs) {
            try {
                rocketMQTemplate.syncSend("order-create-topic", txLog.getPayload());
                txLog.setStatus(1);
                transactionLogMapper.updateById(txLog);
            } catch (Exception e) {
                txLog.setRetryCount(txLog.getRetryCount() + 1);
                transactionLogMapper.updateById(txLog);
                log.warn("Compensation failed for transaction log [{}], retry count: {}", txLog.getTransactionId(), txLog.getRetryCount());
            }
        }
    }
}
Monitoring and Operations: Data Source Health Checks and Micrometer Metrics
Data Source Health Checks
@Component
@Slf4j
public class DataSourceHealthChecker {
    @Autowired
    private DynamicRoutingDataSource dynamicRoutingDataSource;
    // AlertService is an application-provided notification bean (assumed)
    @Autowired
    private AlertService alertService;
    @Scheduled(fixedDelay = 10000)
    public void checkAllDataSources() {
        // getDataSources() is the 4.x accessor; 3.x named it getCurrentDataSources()
        Map<String, DataSource> dataSources = dynamicRoutingDataSource.getDataSources();
        for (Map.Entry<String, DataSource> entry : dataSources.entrySet()) {
            String dsKey = entry.getKey();
            try (Connection conn = entry.getValue().getConnection();
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT 1")) {
                if (rs.next() && rs.getInt(1) == 1) {
                    log.debug("Data source [{}] passed the health check", dsKey);
                }
            } catch (SQLException e) {
                log.error("Data source [{}] failed the health check", dsKey, e);
                alertService.sendAlert("Data source failure: " + dsKey, e.getMessage());
            }
        }
    }
}
Micrometer Metrics Collection
@Configuration
public class DataSourceMetricsConfig {
    @Bean
    public MeterDataSourceAspect meterDataSourceAspect(MeterRegistry registry) {
        return new MeterDataSourceAspect(registry);
    }
    public static class MeterDataSourceAspect {
        private final MeterRegistry registry;
        public MeterDataSourceAspect(MeterRegistry registry) {
            this.registry = registry;
        }
        public void recordSwitch(String dsKey) {
            // One meter name, one tag set: mixing tagged and untagged series under the same
            // name double-counts and is rejected by the Prometheus registry
            Counter.builder("datasource.switch.count")
                .description("Number of data source switches")
                .tag("ds", dsKey)
                .register(registry)
                .increment();
        }
        public Timer.Sample startQuery() {
            return Timer.start(registry);
        }
        public void endQuery(Timer.Sample sample, String dsKey) {
            sample.stop(Timer.builder("datasource.query.duration")
                .description("Data source query duration")
                .tag("type", "dynamic")
                .tag("ds", dsKey)
                .register(registry));
        }
    }
}
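HikariCP Connection Pool Monitoring
spring:
  datasource:
    dynamic:
      hikari:
        register-mbeans: true
        metrics-tracker: true
@Component
public class HikariMetricsExporter {
    @Autowired
    private DynamicRoutingDataSource dynamicRoutingDataSource;
    @Autowired
    private MeterRegistry meterRegistry;
    // Gauges are registered once per pool; later runs only pick up newly added data sources
    private final Set<String> registered = ConcurrentHashMap.newKeySet();
    @Scheduled(fixedDelay = 15000)
    public void exportHikariMetrics() {
        // getDataSources() is the 4.x accessor; 3.x named it getCurrentDataSources()
        Map<String, DataSource> dataSources = dynamicRoutingDataSource.getDataSources();
        for (Map.Entry<String, DataSource> entry : dataSources.entrySet()) {
            // Pools are wrapped in ItemDataSource, so unwrap before testing for HikariDataSource
            DataSource real = entry.getValue() instanceof ItemDataSource item
                ? item.getRealDataSource() : entry.getValue();
            if (real instanceof HikariDataSource hikari
                    && hikari.getHikariPoolMXBean() != null
                    && registered.add(entry.getKey())) {
                HikariPoolMXBean pool = hikari.getHikariPoolMXBean();
                Gauge.builder("hikari.active.connections", pool, HikariPoolMXBean::getActiveConnections)
                    .tag("pool", entry.getKey())
                    .description("Active connections")
                    .register(meterRegistry);
                Gauge.builder("hikari.idle.connections", pool, HikariPoolMXBean::getIdleConnections)
                    .tag("pool", entry.getKey())
                    .description("Idle connections")
                    .register(meterRegistry);
                Gauge.builder("hikari.threads.awaiting", pool, HikariPoolMXBean::getThreadsAwaitingConnection)
                    .tag("pool", entry.getKey())
                    .description("Threads waiting for a connection")
                    .register(meterRegistry);
            }
        }
    }
}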
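Key Metrics for the Prometheus + Grafana Dashboard
| Metric | Meaning | Alert threshold |
|---|---|---|
| hikari.active.connections | Currently active connections | > maxPool * 0.8 |
| hikari.threads.awaiting | Threads waiting for a connection | > 5 |
| datasource.switch.count | Data source switch frequency | Abnormal spike |
| datasource.query.duration | Query latency | P99 > 1s |
| hikari.connection.creation | Connection creation rate | Sustained high rate |
Production-Grade Architecture and Best Practices
Complete Production Architecture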
┌─────────────────────────────────────────────────────────────┐
│ Nginx / Gateway │
└──────────────────────────┬──────────────────────────────────┘
│
┌──────────────────────────▼──────────────────────────────────┐
│ Spring Boot Application │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Controller Layer │ │
│ │ TenantFilter → @DS → @GlobalTransactional │ │
│ └──────────────────────────┬───────────────────────────┘ │
│ │ │
│ ┌──────────────────────────▼───────────────────────────┐ │
│ │ Service Layer │ │
│ │ @DSTransactional / @GlobalTransactional │ │
│ └──────────────────────────┬───────────────────────────┘ │
│ │ │
│ ┌──────────────────────────▼───────────────────────────┐ │
│ │ DynamicRoutingDataSource │ │
│ │ ┌─────────┬──────────┬──────────┬──────────┐ │ │
│ │ │ master │ slave_1 │ order_db │ tenant_X │ │ │
│ │ └────┬────┴────┬─────┴────┬─────┴────┬─────┘ │ │
│ └─────────┼─────────┼──────────┼──────────┼────────────┘ │
└────────────┼─────────┼──────────┼──────────┼────────────────┘
│ │ │ │
┌────▼───┐ ┌──▼───┐ ┌───▼──┐ ┌────▼────┐
│Master │ │Slave │ │Order │ │Tenant │
│MySQL │ │MySQL │ │MySQL │ │MySQL │
└────────┘ └──────┘ └──────┘ └─────────┘
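Common Pitfalls
| Pitfall | Symptom | Solution |
|---|---|---|
| @DS annotation ignored | The AOP proxy is bypassed, so the switch never happens | Put the annotation on public methods and avoid calls within the same class |
| Switching data sources inside a transaction | After the switch, statements still run in the original data source's transaction | Use @DSTransactional instead of @Transactional |
| ThreadLocal leak | Stale data source keys linger on async or pooled threads | Clean up DynamicDataSourceContextHolder in a finally block |
| Connection pool exhaustion | Connection waits time out under high concurrency | Size each pool according to that data source's QPS |
| Stale reads from replication lag | A read from a replica right after a write misses the new row | Read from the primary after writes, or use @MasterOnly |
| Full scan when the sharding key is missing | Queries without the sharding key are routed to every shard | Require the sharding key in queries, or serve them from a denormalized Elasticsearch index |
| Tenant ID not injected | SQL lacks the tenant_id condition | Global tenant interceptor plus an allowlist of shared tables |
| Distributed transaction timeout | The Seata global transaction times out and rolls back | Set a sensible timeout and avoid long transactions |
| Dynamic data source not closed | The pool is not released after a tenant is deregistered | Close the pool when calling removeDataSource |
| Poor HikariCP settings | minimumIdle = maximumPoolSize wastes resources | Configure baseline and peak sizes separately |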
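The first pitfall in the table (an annotation that has no effect on calls made within the same class) follows from how proxies work in general, and can be reproduced with a plain JDK dynamic proxy. In the sketch below the invocation handler stands in for the interceptor that would switch data sources; `SelfInvocationDemo` and its nested types are hypothetical.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

final class SelfInvocationDemo {
    interface UserService {
        void createThenLoad();
        void load();
    }

    static final class UserServiceImpl implements UserService {
        @Override
        public void createThenLoad() {
            load(); // plain this.load(): the call never passes through the proxy
        }

        @Override
        public void load() {
        }
    }

    private SelfInvocationDemo() {}

    /** Returns the method names the proxy intercepted during one call to createThenLoad(). */
    static List<String> interceptedCalls() {
        List<String> seen = new ArrayList<>();
        UserService target = new UserServiceImpl();
        InvocationHandler handler = (proxy, method, args) -> {
            seen.add(method.getName()); // where an interceptor would switch the data source
            return method.invoke(target, args);
        };
        UserService proxied = (UserService) Proxy.newProxyInstance(
                UserService.class.getClassLoader(),
                new Class<?>[] {UserService.class},
                handler);
        proxied.createThenLoad();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(interceptedCalls()); // [createThenLoad]
    }
}
```

Only the outer call is intercepted, so an annotation on `load()` would be ignored here. Moving the inner method to another bean, or calling it through the proxied reference, brings the interceptor back into play.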
Connection Pool Configuration Reference
spring:
  datasource:
    dynamic:
      hikari:
        minimum-idle: 5
        maximum-pool-size: 20
        idle-timeout: 300000
        max-lifetime: 1800000
        connection-timeout: 30000
        connection-test-query: SELECT 1
        pool-name: DynamicHikariCP
      datasource:
        master:
          hikari:
            minimum-idle: 10
            maximum-pool-size: 50
        slave_1:
          hikari:
            minimum-idle: 10
            maximum-pool-size: 30
        slave_2:
          hikari:
            minimum-idle: 10
            maximum-pool-size: 30
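One common way to turn "size the pool by QPS" into a number is Little's Law: the average number of connections in use is roughly the request rate multiplied by the time each request holds a connection. The sketch below applies that rule with a headroom factor. `PoolSizing` and its parameters are illustrative, the figure is per application instance, and it is only a starting point to validate against real pool metrics.

```java
final class PoolSizing {
    private PoolSizing() {}

    /**
     * Estimated connections = peak QPS x average hold time, scaled by headroom and rounded up.
     * headroomPercent = 150 means 50% spare capacity on top of the average.
     */
    static int estimate(long peakQps, long avgHoldMillis, int headroomPercent) {
        long numerator = peakQps * avgHoldMillis * headroomPercent;
        long denominator = 1000L * 100L; // milliseconds per second x percent
        return (int) ((numerator + denominator - 1) / denominator); // ceiling division
    }

    public static void main(String[] args) {
        System.out.println(estimate(2000, 10, 150)); // 30
        System.out.println(estimate(500, 8, 120));   // 5
    }
}
```

For example, 2,000 queries per second that each hold a connection for 10 ms keep about 20 connections busy on average, so 30 leaves 50% headroom for bursts.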
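Encrypting Configuration Values
spring:
  datasource:
    dynamic:
      datasource:
        master:
          url: jdbc:mysql://mysql-master:3306/app_db
          username: ENC(g2U3x8vKpQ==)
          password: ENC(aB3dE7fG9hJ==)
@Configuration
public class DataSourceEncryptConfig {
    // DataSourceInitEvent runs before each pool is created; defining this bean replaces the
    // library's default ENC(...) handling. AesUtil and loadSecretKey() are application-provided.
    @Bean
    public DataSourceInitEvent dataSourceInitEvent() {
        return new DataSourceInitEvent() {
            @Override
            public void beforeCreate(DataSourceProperty property) {
                property.setUsername(decrypt(property.getUsername()));
                property.setPassword(decrypt(property.getPassword()));
            }
            @Override
            public void afterCreate(DataSource dataSource) {
                // nothing to do after creation
            }
        };
    }
    private String decrypt(String cipherText) {
        if (cipherText != null && cipherText.startsWith("ENC(") && cipherText.endsWith(")")) {
            // Strip the ENC( ... ) wrapper before decrypting
            String encrypted = cipherText.substring(4, cipherText.length() - 1);
            return AesUtil.decrypt(encrypted, loadSecretKey());
        }
        return cipherText;
    }
}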
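Graceful Shutdown
@Configuration
@Slf4j
public class GracefulShutdownConfig {
    @Autowired
    private DynamicRoutingDataSource dynamicRoutingDataSource;
    @PreDestroy
    public void gracefulShutdown() {
        log.info("Starting graceful shutdown of data sources...");
        // getDataSources() is the 4.x accessor; 3.x named it getCurrentDataSources()
        Map<String, DataSource> dataSources = dynamicRoutingDataSource.getDataSources();
        for (Map.Entry<String, DataSource> entry : dataSources.entrySet()) {
            // Pools are wrapped in ItemDataSource, so unwrap before testing for HikariDataSource
            DataSource real = entry.getValue() instanceof ItemDataSource item
                ? item.getRealDataSource() : entry.getValue();
            if (real instanceof HikariDataSource hikari) {
                HikariPoolMXBean pool = hikari.getHikariPoolMXBean();
                log.info("Closing data source [{}], active connections: {}", entry.getKey(),
                    pool != null ? pool.getActiveConnections() : 0);
                hikari.close();
            }
        }
        log.info("All data sources closed");
    }
}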
Managing Configuration Across Environments
# application-dev.yml
spring:
  datasource:
    dynamic:
      strict: false
      datasource:
        master:
          url: jdbc:mysql://localhost:3306/dev_db
# application-prod.yml
spring:
  datasource:
    dynamic:
      strict: true
      hikari:
        minimum-idle: 10
        maximum-pool-size: 50
      datasource:
        master:
          url: jdbc:mysql://mysql-master.internal:3306/prod_db
          username: ${DB_MASTER_USER}
          password: ${DB_MASTER_PWD}
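Summary
Spring Boot 3 with MyBatis-Plus and multiple data sources is a standard setup for enterprise Java applications. Core recommendations:
- Small to medium projects: dynamic-datasource-spring-boot-starter + annotation switching; simple and efficient
- Medium to large projects: ShardingSphere-JDBC + sharding; comprehensive features
- Polyglot projects: ShardingSphere-Proxy as a standalone proxy tier
- Transactions: @DSTransactional for a single database, Seata AT across databases, MQ-based eventual consistency for asynchronous flows
- Multi-tenancy: schema isolation for large customers; table isolation plus the MyBatis-Plus tenant plugin for small customers
Multiple data sources are not a silver bullet. Assess whether you really need them before adopting them, and do not over-engineer what a single database can handle.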