我需要解决以下问题,举例说明:
我的 Spring 应用程序每分钟都会检查是否有新作业。如果有作业,它的子任务应该并行运行。它们大多是在“其他地方”完成的,并且依赖于 API 调用。它们的运行时间从 10 分钟到几个小时不等。这些 API 调用以不记名令牌的形式进行身份验证,其生命周期为一小时,因此我们需要能够更新它们。 当所有子任务完成后,必须完成一些(单独的)转换,然后保存结果。
我当前的方法是让 Scheduler 每分钟运行一次并运行一个 @Async 函数,如下所示:
CompletableFuture<Result> subtask1 = subtaskService1.performTask()
.thenCompose(subtaskService1::transfrom)
CompletableFuture<Result> subtask2 = subtaskService2.performTask()
.thenCompose(subtaskService2::transfrom)
CompletableFuture<Result> subtask3 = subtaskService3.performTask()
.thenCompose(subtaskService3::transfrom)
CompletableFuture.allOf(subtask1, subtask2, subtask3).join()
saveResult(subtask1.get(), subtask2.get(), subtask3.get())
performTask 方法也是 @Async 并且主要处理逻辑:
现在我遇到了一些意想不到的问题:
WebClient.builder()
...
.filter((request, next) -> next.exchange(withBearerAuth(request)))
...
其中 withBearerAuth 进行同步 WebClient 调用(不同的实例),以在旧的 Bearer-token 过期时获取新的 Bearer-token。
多线程让我很困惑。有时,日志中的线程使用我在 Spring 异步配置的执行器中配置的前缀。有时它使用“ForkJoinPool.commonPool-worker-x”,有时使用“parallel-x”之类的东西。差别在哪里呢?这是出现问题的征兆吗?
最后,无论我将多少个线程配置为 CorePoolSize,并行任务的数量都低于我的预期。即使我将 CorePoolSize 配置为 10 并且只有 3 个不同的子任务,它们也并不总是并行运行。我必须以编程方式“释放”线程吗?
我应该研究 VirtualThreads 吗?
一些观察:
考虑到这些观察结果,我们可以慢慢来,坚持一个简单的线程模型:放弃任何有反应性/通量的东西,坚持简单的“Spring MVC”。将
WebClient
替换为同步(阻塞)客户端(例如 RestClient
)。并保持简单:每次需要在单独的调用中使用承载令牌之前都刷新它。
这应该可以修复 1) 和 2)。
CorePoolSize
(3) 仅用于设置准备执行工作的最小线程数,对正在使用的最大线程数没有影响(将根据需要创建新线程)。
Spring Boot 3.2 及 Java 21 中(正确)支持虚拟线程,但它们无法帮助您解决当前的问题。但是,如果您需要应用程序使用更少的资源(并且速度更快),您可以切换到使用虚拟线程(即将 Spring Boot 配置为使用虚拟线程,可能会对应用程序代码进行一些细微的更改)。这就是虚拟线程的美妙之处:您的“老派”阻塞代码(使用大量昂贵的平台线程)突然变得与非阻塞反应式解决方案一样高效和高性能,而没有这些解决方案的缺点。但是当前代码中的任何(逻辑)问题都无法通过切换到虚拟线程来解决。事实上,虚拟线程的使用甚至可能会突出当前代码中存在的一些逻辑问题,因为执行变得更快且更加并行(例如,引起您不知道存在的竞争条件)。