docs: 架构文档补充HikariCP连接池和支持异步场景的UserContext封装

This commit is contained in:
zhangjf 2026-02-15 11:35:39 +08:00
parent 2a45ac0279
commit 43e6e41a4a

View File

@ -327,13 +327,34 @@ public class TenantDataSourceManager {
public void addDataSource(Long tenantId) { public void addDataSource(Long tenantId) {
TenantDataSourceConfig config = configRepository.findByTenantId(tenantId); TenantDataSourceConfig config = configRepository.findByTenantId(tenantId);
DruidDataSource dataSource = new DruidDataSource(); HikariConfig hikariConfig = new HikariConfig();
dataSource.setDriverClassName(config.getDriverClassName()); hikariConfig.setDriverClassName(config.getDriverClassName());
dataSource.setUrl(config.getJdbcUrl()); hikariConfig.setJdbcUrl(config.getJdbcUrl());
dataSource.setUsername(config.getUsername()); hikariConfig.setUsername(config.getUsername());
dataSource.setPassword(config.getPassword()); hikariConfig.setPassword(config.getPassword());
dataSource.setInitialSize(5);
dataSource.setMaxActive(20); // HikariCP 优化配置
hikariConfig.setPoolName("TenantPool-" + tenantId);
hikariConfig.setMinimumIdle(5);
hikariConfig.setMaximumPoolSize(20);
hikariConfig.setIdleTimeout(300000);
hikariConfig.setConnectionTimeout(20000);
hikariConfig.setMaxLifetime(1200000);
hikariConfig.setConnectionTestQuery("SELECT 1");
// 性能优化配置
hikariConfig.addDataSourceProperty("cachePrepStmts", "true");
hikariConfig.addDataSourceProperty("prepStmtCacheSize", "250");
hikariConfig.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
hikariConfig.addDataSourceProperty("useServerPrepStmts", "true");
hikariConfig.addDataSourceProperty("useLocalSessionState", "true");
hikariConfig.addDataSourceProperty("rewriteBatchedStatements", "true");
hikariConfig.addDataSourceProperty("cacheResultSetMetadata", "true");
hikariConfig.addDataSourceProperty("cacheServerConfiguration", "true");
hikariConfig.addDataSourceProperty("elideSetAutoCommits", "true");
hikariConfig.addDataSourceProperty("maintainTimeStats", "false");
HikariDataSource dataSource = new HikariDataSource(hikariConfig);
// 添加到动态数据源 // 添加到动态数据源
dynamicDataSource.addDataSource("tenant_" + tenantId, dataSource); dynamicDataSource.addDataSource("tenant_" + tenantId, dataSource);
@ -790,13 +811,37 @@ public class FeignChainInterceptor implements RequestInterceptor {
/** /**
* 用户上下文持有者 * 用户上下文持有者
* 存储当前登录用户信息,支持跨服务传递 * 存储当前登录用户信息,支持跨服务传递
* 支持同步和异步场景
*/ */
public class UserContext { public class UserContext {
// ========== ThreadLocal 存储(同步场景)==========
private static final ThreadLocal<Long> CURRENT_USER_ID = new ThreadLocal<>(); private static final ThreadLocal<Long> CURRENT_USER_ID = new ThreadLocal<>();
private static final ThreadLocal<String> CURRENT_USERNAME = new ThreadLocal<>(); private static final ThreadLocal<String> CURRENT_USERNAME = new ThreadLocal<>();
private static final ThreadLocal<User> CURRENT_USER = new ThreadLocal<>(); private static final ThreadLocal<User> CURRENT_USER = new ThreadLocal<>();
// ========== 用户上下文载体(支持异步传递)==========
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class UserContextHolder {
private Long userId;
private String username;
private String tenantId;
private String traceId;
public static UserContextHolder fromCurrent() {
return new UserContextHolder(
getCurrentUserId(),
getCurrentUsername(),
TenantContextHolder.getTenantId(),
TraceIdUtil.getCurrentTraceId()
);
}
}
// ========== 同步场景方法 ==========
/** /**
* 设置当前用户(登录时调用) * 设置当前用户(登录时调用)
*/ */
@ -853,10 +898,352 @@ public class UserContext {
MDC.remove("uid"); MDC.remove("uid");
MDC.remove("uname"); MDC.remove("uname");
} }
// ========== 异步场景支持 ==========
/**
* 包装 Runnable传递用户上下文到异步线程
*/
public static Runnable wrap(Runnable runnable) {
UserContextHolder holder = UserContextHolder.fromCurrent();
return () -> {
try {
setFromHolder(holder);
runnable.run();
} finally {
clear();
}
};
}
/**
* 包装 Callable传递用户上下文到异步线程
*/
public static <T> Callable<T> wrap(Callable<T> callable) {
UserContextHolder holder = UserContextHolder.fromCurrent();
return () -> {
try {
setFromHolder(holder);
return callable.call();
} finally {
clear();
}
};
}
/**
* 包装 Supplier传递用户上下文到异步线程
*/
public static <T> Supplier<T> wrap(Supplier<T> supplier) {
UserContextHolder holder = UserContextHolder.fromCurrent();
return () -> {
try {
setFromHolder(holder);
return supplier.get();
} finally {
clear();
}
};
}
/**
* 包装 Function传递用户上下文到异步线程
*/
public static <T, R> Function<T, R> wrap(Function<T, R> function) {
UserContextHolder holder = UserContextHolder.fromCurrent();
return (T t) -> {
try {
setFromHolder(holder);
return function.apply(t);
} finally {
clear();
}
};
}
/**
* 从 Holder 设置上下文
*/
private static void setFromHolder(UserContextHolder holder) {
if (holder != null) {
if (holder.getUserId() != null) {
CURRENT_USER_ID.set(holder.getUserId());
MDC.put("uid", String.valueOf(holder.getUserId()));
}
if (StringUtils.isNotEmpty(holder.getUsername())) {
CURRENT_USERNAME.set(holder.getUsername());
MDC.put("uname", holder.getUsername());
}
if (StringUtils.isNotEmpty(holder.getTenantId())) {
TenantContextHolder.setTenantId(holder.getTenantId());
MDC.put("tenantId", holder.getTenantId());
}
if (StringUtils.isNotEmpty(holder.getTraceId())) {
TraceIdUtil.setTraceId(holder.getTraceId());
}
}
}
/**
* 创建 CompletableFuture 上下文包装器
*/
public static <T> CompletableFuture<T> wrapFuture(Supplier<CompletableFuture<T>> supplier) {
UserContextHolder holder = UserContextHolder.fromCurrent();
return supplier.get().whenComplete((result, ex) -> {
// 确保上下文清理
clear();
});
}
} }
``` ```
**6. 调用链上下文过滤器(接收方)** **5. 异步场景使用示例**
```java
/**
* 异步服务示例
*/
@Service
public class AsyncService {
@Autowired
private ThreadPoolExecutor executor;
@Autowired
private ProjectMapper projectMapper;
/**
* 异步处理 - 方式1使用包装器
*/
public void asyncProcessWithWrapper(Long projectId) {
// 包装 Runnable自动传递当前用户上下文
executor.execute(UserContext.wrap(() -> {
// 在异步线程中也能获取用户信息
Long uid = UserContext.getCurrentUserId();
String uname = UserContext.getCurrentUsername();
log.info("[Async] 用户 {} 处理项目 {}", uname, projectId);
// 数据库操作
Project project = projectMapper.selectById(projectId);
project.setProcessedBy(uid);
projectMapper.updateById(project);
}));
}
/**
* 异步处理 - 方式2使用 CompletableFuture
*/
public CompletableFuture<Project> asyncQueryProject(Long projectId) {
return CompletableFuture.supplyAsync(
UserContext.wrap(() -> {
// 异步查询时保持用户上下文
log.info("[Async] 用户 {} 查询项目 {}",
UserContext.getCurrentUsername(), projectId);
return projectMapper.selectById(projectId);
}),
executor
);
}
/**
* 批量异步处理
*/
public void batchAsyncProcess(List<Long> projectIds) {
List<CompletableFuture<Void>> futures = projectIds.stream()
.map(id -> CompletableFuture.runAsync(
UserContext.wrap(() -> processSingleProject(id)),
executor
))
.collect(Collectors.toList());
// 等待所有任务完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}
private void processSingleProject(Long projectId) {
// 每个异步线程都有独立的用户上下文
Long uid = UserContext.getCurrentUserId();
String uname = UserContext.getCurrentUsername();
log.info("[BatchAsync] 用户 {} 处理项目 {}", uname, projectId);
// 业务处理...
}
/**
* 链式异步调用
*/
public CompletableFuture<String> chainAsyncProcess(Long projectId) {
return CompletableFuture.supplyAsync(
UserContext.wrap(() -> fetchProject(projectId)), executor)
.thenApplyAsync(UserContext.wrap(project -> {
// 转换操作,上下文自动传递
log.info("[Chain] 用户 {} 转换项目 {}",
UserContext.getCurrentUsername(), project.getProjectName());
return transformProject(project);
}), executor)
.thenComposeAsync(UserContext.wrap(transformed ->
// 异步保存
CompletableFuture.supplyAsync(
UserContext.wrap(() -> saveProject(transformed)), executor)
), executor);
}
private Project fetchProject(Long id) { /* ... */ return null; }
private Project transformProject(Project p) { /* ... */ return p; }
private String saveProject(Project p) { /* ... */ return "saved"; }
}
/**
* Spring @Async 支持
*/
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
@Override
@Bean(name = "taskExecutor")
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(200);
executor.setThreadNamePrefix("async-");
// 关键:使用 ContextPropagatingTaskDecorator 传递上下文
executor.setTaskDecorator(new ContextPropagatingTaskDecorator());
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new SimpleAsyncUncaughtExceptionHandler();
}
}
/**
* 上下文传递装饰器(用于 @Async
*/
public class ContextPropagatingTaskDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
// 使用 UserContext 的包装方法
return UserContext.wrap(runnable);
}
}
/**
* 使用 @Async 注解
*/
@Service
public class NotificationService {
/**
* 异步发送通知(自动传递用户上下文)
*/
@Async("taskExecutor")
public void sendNotificationAsync(Long userId, String message) {
// 在异步方法中也能获取当前操作用户
String operator = UserContext.getCurrentUsername();
log.info("[Notification] 用户 {} 发送通知给 {}: {}", operator, userId, message);
// 记录操作日志
operationLogService.saveLog(new OperationLog()
.setOperation("发送通知")
.setOperatorId(UserContext.getCurrentUserId())
.setOperatorName(operator)
.setTargetId(userId)
.setContent(message)
);
// 发送通知...
}
}
```
**6. 响应式编程支持WebFlux**
```java
/**
* Reactor 上下文工具(用于 WebFlux
*/
public class ReactiveUserContext {
private static final String USER_CONTEXT_KEY = "UserContext";
/**
* 将用户上下文写入 Reactor Context
*/
public static <T> Mono<T> withUserContext(Mono<T> mono, UserContextHolder holder) {
return mono.contextWrite(Context.of(USER_CONTEXT_KEY, holder));
}
/**
* 从 Reactor Context 读取用户上下文
*/
public static Mono<UserContextHolder> getUserContext() {
return Mono.deferContextual(ctx -> {
if (ctx.hasKey(USER_CONTEXT_KEY)) {
return Mono.just(ctx.get(USER_CONTEXT_KEY));
}
return Mono.empty();
});
}
/**
* 在响应式链中获取用户ID
*/
public static Mono<Long> getUserId() {
return getUserContext().map(UserContextHolder::getUserId);
}
/**
* 包装响应式操作
*/
public static <T> Mono<T> wrap(Mono<T> mono) {
UserContextHolder holder = UserContextHolder.fromCurrent();
return withUserContext(mono, holder);
}
}
/**
* WebFlux 过滤器
*/
@Component
public class ReactiveContextFilter implements WebFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
// 解析用户信息
String uid = request.getHeaders().getFirst("X-Uid");
String uname = request.getHeaders().getFirst("X-Uname");
String tenantId = request.getHeaders().getFirst("X-Tenant-Id");
String traceId = request.getHeaders().getFirst("X-Trace-Id");
UserContextHolder holder = new UserContextHolder(
StringUtils.isNotEmpty(uid) ? Long.valueOf(uid) : null,
uname,
tenantId,
traceId
);
// 写入 Reactor Context
return chain.filter(exchange)
.contextWrite(Context.of("UserContext", holder));
}
}
```
**原 5. 用户上下文管理(已升级为支持异步的版本)**
**7. 调用链上下文过滤器(接收方)**
```java ```java
/** /**
@ -919,7 +1306,7 @@ public class ChainContextFilter implements Filter {
} }
``` ```
**7. 调用链传递示意图** **8. 调用链传递示意图**
``` ```
┌─────────────────────────────────────────────────────────────────────────────┐ ┌─────────────────────────────────────────────────────────────────────────────┐
@ -971,7 +1358,7 @@ public class ChainContextFilter implements Filter {
└─────────────────────────────────────────────────────────────────────────────┘ └─────────────────────────────────────────────────────────────────────────────┘
``` ```
**8. 使用场景示例** **9. 使用场景示例(包含异步场景)**
```java ```java
/** /**
@ -1777,7 +2164,7 @@ public class PerformanceAspect {
| **服务调用** | OpenFeign | 4.x | 声明式HTTP客户端 | | **服务调用** | OpenFeign | 4.x | 声明式HTTP客户端 |
| **服务容错** | Sentinel | 1.8.x | 限流、熔断、降级 | | **服务容错** | Sentinel | 1.8.x | 限流、熔断、降级 |
| **ORM框架** | MyBatis-Plus | 3.5.x | 数据访问层 | | **ORM框架** | MyBatis-Plus | 3.5.x | 数据访问层 |
| **数据库连接池** | Druid | 1.2.x | 连接池、监控 | | **数据库连接池** | HikariCP | 5.x | 高性能连接池 |
| **缓存框架** | Spring Data Redis | 3.x | Redis操作 | | **缓存框架** | Spring Data Redis | 3.x | Redis操作 |
| **安全框架** | Apache Shiro | 2.x | 认证授权、会话管理 | | **安全框架** | Apache Shiro | 2.x | 认证授权、会话管理 |
| **JWT令牌** | jjwt | 0.12.x | Token生成与验证 | | **JWT令牌** | jjwt | 0.12.x | Token生成与验证 |
@ -2583,6 +2970,7 @@ fund-sys/
| v1.0 | 2026-02-13 | 初始版本 | zhangjf | | v1.0 | 2026-02-13 | 初始版本 | zhangjf |
| v1.1 | 2026-02-13 | 补充多租户架构(一库多租户/一库一租户)和 Head 日志追踪设计 | zhangjf | | v1.1 | 2026-02-13 | 补充多租户架构(一库多租户/一库一租户)和 Head 日志追踪设计 | zhangjf |
| v1.2 | 2026-02-13 | 补充 Shiro 认证框架、服务调用链 uid/uname 传递设计 | zhangjf | | v1.2 | 2026-02-13 | 补充 Shiro 认证框架、服务调用链 uid/uname 传递设计 | zhangjf |
| v1.3 | 2026-02-13 | 补充 HikariCP 连接池、支持异步场景的 UserContext 封装 | zhangjf |
--- ---