diff --git a/doc/资金服务平台 FundPlatform 架构设计文档.md b/doc/资金服务平台 FundPlatform 架构设计文档.md index 8ad71dc..865cdb5 100644 --- a/doc/资金服务平台 FundPlatform 架构设计文档.md +++ b/doc/资金服务平台 FundPlatform 架构设计文档.md @@ -327,13 +327,34 @@ public class TenantDataSourceManager { public void addDataSource(Long tenantId) { TenantDataSourceConfig config = configRepository.findByTenantId(tenantId); - DruidDataSource dataSource = new DruidDataSource(); - dataSource.setDriverClassName(config.getDriverClassName()); - dataSource.setUrl(config.getJdbcUrl()); - dataSource.setUsername(config.getUsername()); - dataSource.setPassword(config.getPassword()); - dataSource.setInitialSize(5); - dataSource.setMaxActive(20); + HikariConfig hikariConfig = new HikariConfig(); + hikariConfig.setDriverClassName(config.getDriverClassName()); + hikariConfig.setJdbcUrl(config.getJdbcUrl()); + hikariConfig.setUsername(config.getUsername()); + hikariConfig.setPassword(config.getPassword()); + + // HikariCP 优化配置 + hikariConfig.setPoolName("TenantPool-" + tenantId); + hikariConfig.setMinimumIdle(5); + hikariConfig.setMaximumPoolSize(20); + hikariConfig.setIdleTimeout(300000); + hikariConfig.setConnectionTimeout(20000); + hikariConfig.setMaxLifetime(1200000); + hikariConfig.setConnectionTestQuery("SELECT 1"); + + // 性能优化配置 + hikariConfig.addDataSourceProperty("cachePrepStmts", "true"); + hikariConfig.addDataSourceProperty("prepStmtCacheSize", "250"); + hikariConfig.addDataSourceProperty("prepStmtCacheSqlLimit", "2048"); + hikariConfig.addDataSourceProperty("useServerPrepStmts", "true"); + hikariConfig.addDataSourceProperty("useLocalSessionState", "true"); + hikariConfig.addDataSourceProperty("rewriteBatchedStatements", "true"); + hikariConfig.addDataSourceProperty("cacheResultSetMetadata", "true"); + hikariConfig.addDataSourceProperty("cacheServerConfiguration", "true"); + hikariConfig.addDataSourceProperty("elideSetAutoCommits", "true"); + hikariConfig.addDataSourceProperty("maintainTimeStats", "false"); + + HikariDataSource dataSource = new HikariDataSource(hikariConfig); // 添加到动态数据源 dynamicDataSource.addDataSource("tenant_" + tenantId, dataSource); @@ -790,13 +811,37 @@ public class FeignChainInterceptor implements RequestInterceptor { /** * 用户上下文持有者 * 存储当前登录用户信息,支持跨服务传递 + * 支持同步和异步场景 */ public class UserContext { + // ========== ThreadLocal 存储(同步场景)========== private static final ThreadLocal CURRENT_USER_ID = new ThreadLocal<>(); private static final ThreadLocal CURRENT_USERNAME = new ThreadLocal<>(); private static final ThreadLocal CURRENT_USER = new ThreadLocal<>(); + // ========== 用户上下文载体(支持异步传递)========== + @Data + @AllArgsConstructor + @NoArgsConstructor + public static class UserContextHolder { + private Long userId; + private String username; + private String tenantId; + private String traceId; + + public static UserContextHolder fromCurrent() { + return new UserContextHolder( + getCurrentUserId(), + getCurrentUsername(), + TenantContextHolder.getTenantId(), + TraceIdUtil.getCurrentTraceId() + ); + } + } + + // ========== 同步场景方法 ========== + /** * 设置当前用户(登录时调用) */ @@ -853,10 +898,352 @@ public class UserContext { MDC.remove("uid"); MDC.remove("uname"); } + + // ========== 异步场景支持 ========== + + /** + * 包装 Runnable,传递用户上下文到异步线程 + */ + public static Runnable wrap(Runnable runnable) { + UserContextHolder holder = UserContextHolder.fromCurrent(); + return () -> { + try { + setFromHolder(holder); + runnable.run(); + } finally { + clear(); + } + }; + } + + /** + * 包装 Callable,传递用户上下文到异步线程 + */ + public static Callable wrap(Callable callable) { + UserContextHolder holder = UserContextHolder.fromCurrent(); + return () -> { + try { + setFromHolder(holder); + return callable.call(); + } finally { + clear(); + } + }; + } + + /** + * 包装 Supplier,传递用户上下文到异步线程 + */ + public static Supplier wrap(Supplier supplier) { + UserContextHolder holder = UserContextHolder.fromCurrent(); + return () -> { + try { + setFromHolder(holder); + return supplier.get(); + } finally { + clear(); + } + }; + } + + /** + * 包装 Function,传递用户上下文到异步线程 + */ + public static Function wrap(Function function) { + UserContextHolder holder = UserContextHolder.fromCurrent(); + return (T t) -> { + try { + setFromHolder(holder); + return function.apply(t); + } finally { + clear(); + } + }; + } + + /** + * 从 Holder 设置上下文 + */ + private static void setFromHolder(UserContextHolder holder) { + if (holder != null) { + if (holder.getUserId() != null) { + CURRENT_USER_ID.set(holder.getUserId()); + MDC.put("uid", String.valueOf(holder.getUserId())); + } + if (StringUtils.isNotEmpty(holder.getUsername())) { + CURRENT_USERNAME.set(holder.getUsername()); + MDC.put("uname", holder.getUsername()); + } + if (StringUtils.isNotEmpty(holder.getTenantId())) { + TenantContextHolder.setTenantId(holder.getTenantId()); + MDC.put("tenantId", holder.getTenantId()); + } + if (StringUtils.isNotEmpty(holder.getTraceId())) { + TraceIdUtil.setTraceId(holder.getTraceId()); + } + } + } + + /** + * 创建 CompletableFuture 上下文包装器 + */ + public static CompletableFuture wrapFuture(Supplier> supplier) { + UserContextHolder holder = UserContextHolder.fromCurrent(); + return supplier.get().whenComplete((result, ex) -> { + // 确保上下文清理 + clear(); + }); + } } ``` -**6. 调用链上下文过滤器(接收方)** +**5. 异步场景使用示例** + +```java +/** + * 异步服务示例 + */ +@Service +public class AsyncService { + + @Autowired + private ThreadPoolExecutor executor; + + @Autowired + private ProjectMapper projectMapper; + + /** + * 异步处理 - 方式1:使用包装器 + */ + public void asyncProcessWithWrapper(Long projectId) { + // 包装 Runnable,自动传递当前用户上下文 + executor.execute(UserContext.wrap(() -> { + // 在异步线程中也能获取用户信息 + Long uid = UserContext.getCurrentUserId(); + String uname = UserContext.getCurrentUsername(); + + log.info("[Async] 用户 {} 处理项目 {}", uname, projectId); + + // 数据库操作 + Project project = projectMapper.selectById(projectId); + project.setProcessedBy(uid); + projectMapper.updateById(project); + })); + } + + /** + * 异步处理 - 方式2:使用 CompletableFuture + */ + public CompletableFuture asyncQueryProject(Long projectId) { + return CompletableFuture.supplyAsync( + UserContext.wrap(() -> { + // 异步查询时保持用户上下文 + log.info("[Async] 用户 {} 查询项目 {}", + UserContext.getCurrentUsername(), projectId); + + return projectMapper.selectById(projectId); + }), + executor + ); + } + + /** + * 批量异步处理 + */ + public void batchAsyncProcess(List projectIds) { + List> futures = projectIds.stream() + .map(id -> CompletableFuture.runAsync( + UserContext.wrap(() -> processSingleProject(id)), + executor + )) + .collect(Collectors.toList()); + + // 等待所有任务完成 + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + } + + private void processSingleProject(Long projectId) { + // 每个异步线程都有独立的用户上下文 + Long uid = UserContext.getCurrentUserId(); + String uname = UserContext.getCurrentUsername(); + + log.info("[BatchAsync] 用户 {} 处理项目 {}", uname, projectId); + + // 业务处理... + } + + /** + * 链式异步调用 + */ + public CompletableFuture chainAsyncProcess(Long projectId) { + return CompletableFuture.supplyAsync( + UserContext.wrap(() -> fetchProject(projectId)), executor) + .thenApplyAsync(UserContext.wrap(project -> { + // 转换操作,上下文自动传递 + log.info("[Chain] 用户 {} 转换项目 {}", + UserContext.getCurrentUsername(), project.getProjectName()); + return transformProject(project); + }), executor) + .thenComposeAsync(UserContext.wrap(transformed -> + // 异步保存 + CompletableFuture.supplyAsync( + UserContext.wrap(() -> saveProject(transformed)), executor) + ), executor); + } + + private Project fetchProject(Long id) { /* ... */ return null; } + private Project transformProject(Project p) { /* ... */ return p; } + private String saveProject(Project p) { /* ... */ return "saved"; } +} + +/** + * Spring @Async 支持 + */ +@Configuration +@EnableAsync +public class AsyncConfig implements AsyncConfigurer { + + @Override + @Bean(name = "taskExecutor") + public Executor getAsyncExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(10); + executor.setMaxPoolSize(50); + executor.setQueueCapacity(200); + executor.setThreadNamePrefix("async-"); + + // 关键:使用 ContextPropagatingTaskDecorator 传递上下文 + executor.setTaskDecorator(new ContextPropagatingTaskDecorator()); + executor.initialize(); + return executor; + } + + @Override + public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { + return new SimpleAsyncUncaughtExceptionHandler(); + } +} + +/** + * 上下文传递装饰器(用于 @Async) + */ +public class ContextPropagatingTaskDecorator implements TaskDecorator { + + @Override + public Runnable decorate(Runnable runnable) { + // 使用 UserContext 的包装方法 + return UserContext.wrap(runnable); + } +} + +/** + * 使用 @Async 注解 + */ +@Service +public class NotificationService { + + /** + * 异步发送通知(自动传递用户上下文) + */ + @Async("taskExecutor") + public void sendNotificationAsync(Long userId, String message) { + // 在异步方法中也能获取当前操作用户 + String operator = UserContext.getCurrentUsername(); + log.info("[Notification] 用户 {} 发送通知给 {}: {}", operator, userId, message); + + // 记录操作日志 + operationLogService.saveLog(new OperationLog() + .setOperation("发送通知") + .setOperatorId(UserContext.getCurrentUserId()) + .setOperatorName(operator) + .setTargetId(userId) + .setContent(message) + ); + + // 发送通知... + } +} +``` + +**6. 响应式编程支持(WebFlux)** + +```java +/** + * Reactor 上下文工具(用于 WebFlux) + */ +public class ReactiveUserContext { + + private static final String USER_CONTEXT_KEY = "UserContext"; + + /** + * 将用户上下文写入 Reactor Context + */ + public static Mono withUserContext(Mono mono, UserContextHolder holder) { + return mono.contextWrite(Context.of(USER_CONTEXT_KEY, holder)); + } + + /** + * 从 Reactor Context 读取用户上下文 + */ + public static Mono getUserContext() { + return Mono.deferContextual(ctx -> { + if (ctx.hasKey(USER_CONTEXT_KEY)) { + return Mono.just(ctx.get(USER_CONTEXT_KEY)); + } + return Mono.empty(); + }); + } + + /** + * 在响应式链中获取用户ID + */ + public static Mono getUserId() { + return getUserContext().map(UserContextHolder::getUserId); + } + + /** + * 包装响应式操作 + */ + public static Mono wrap(Mono mono) { + UserContextHolder holder = UserContextHolder.fromCurrent(); + return withUserContext(mono, holder); + } +} + +/** + * WebFlux 过滤器 + */ +@Component +public class ReactiveContextFilter implements WebFilter { + + @Override + public Mono filter(ServerWebExchange exchange, WebFilterChain chain) { + ServerHttpRequest request = exchange.getRequest(); + + // 解析用户信息 + String uid = request.getHeaders().getFirst("X-Uid"); + String uname = request.getHeaders().getFirst("X-Uname"); + String tenantId = request.getHeaders().getFirst("X-Tenant-Id"); + String traceId = request.getHeaders().getFirst("X-Trace-Id"); + + UserContextHolder holder = new UserContextHolder( + StringUtils.isNotEmpty(uid) ? Long.valueOf(uid) : null, + uname, + tenantId, + traceId + ); + + // 写入 Reactor Context + return chain.filter(exchange) + .contextWrite(Context.of("UserContext", holder)); + } +} +``` + +**原 5. 用户上下文管理(已升级为支持异步的版本)** + + +**7. 调用链上下文过滤器(接收方)** ```java /** @@ -919,7 +1306,7 @@ public class ChainContextFilter implements Filter { } ``` -**7. 调用链传递示意图** +**8. 调用链传递示意图** ``` ┌─────────────────────────────────────────────────────────────────────────────┐ @@ -971,7 +1358,7 @@ public class ChainContextFilter implements Filter { └─────────────────────────────────────────────────────────────────────────────┘ ``` -**8. 使用场景示例** +**9. 使用场景示例(包含异步场景)** ```java /** @@ -1777,7 +2164,7 @@ public class PerformanceAspect { | **服务调用** | OpenFeign | 4.x | 声明式HTTP客户端 | | **服务容错** | Sentinel | 1.8.x | 限流、熔断、降级 | | **ORM框架** | MyBatis-Plus | 3.5.x | 数据访问层 | -| **数据库连接池** | Druid | 1.2.x | 连接池、监控 | +| **数据库连接池** | HikariCP | 5.x | 高性能连接池 | | **缓存框架** | Spring Data Redis | 3.x | Redis操作 | | **安全框架** | Apache Shiro | 2.x | 认证授权、会话管理 | | **JWT令牌** | jjwt | 0.12.x | Token生成与验证 | @@ -2583,6 +2970,7 @@ fund-sys/ | v1.0 | 2026-02-13 | 初始版本 | zhangjf | | v1.1 | 2026-02-13 | 补充多租户架构(一库多租户/一库一租户)和 Head 日志追踪设计 | zhangjf | | v1.2 | 2026-02-13 | 补充 Shiro 认证框架、服务调用链 uid/uname 传递设计 | zhangjf | +| v1.3 | 2026-02-13 | 补充 HikariCP 连接池、支持异步场景的 UserContext 封装 | zhangjf | ---