用于计划的、长时间运行的 api 调用的 Spring @Async 架构

问题描述 投票:0回答:1

我需要解决以下问题,举例说明:

我的 Spring 应用程序每分钟都会检查是否有新作业。如果有作业,它的子任务应该并行运行。它们大多是在“其他地方”完成的,并且依赖于 API 调用。它们的运行时间从 10 分钟到几个小时不等。这些 API 调用以不记名令牌的形式进行身份验证,其生命周期为一小时,因此我们需要能够更新它们。 当所有子任务完成后,必须完成一些(单独的)转换,然后保存结果。

我当前的方法是让 Scheduler 每分钟运行一次并运行一个 @Async 函数,如下所示:

CompletableFuture<Result> subtask1 = subtaskService1.performTask()
             .thenCompose(subtaskService1::transfrom)

CompletableFuture<Result> subtask2 = subtaskService2.performTask()
             .thenCompose(subtaskService2::transfrom)

CompletableFuture<Result> subtask3 = subtaskService3.performTask()
             .thenCompose(subtaskService3::transfrom)


CompletableFuture.allOf(subtask1, subtask2, subtask3).join()

saveResult(subtask1.get(), subtask2.get(), subtask3.get())

performTask 方法也是 @Async 并且主要处理逻辑:

  1. 如果没有或当前的身份验证令牌不再有效,则获取有效的身份验证令牌
  2. 启动一个进程
  3. 一个 while 循环,用于检查远程任务的状态,如果它仍在进行,则通过 Thread.sleep() 等待两分钟。这不是@Async
  4. 如果状态完成,它将获取结果并返回 CompletableFuture.supplyAsync() 或 CompletableFuture.completedFuture()

现在我遇到了一些意想不到的问题:

  1. 有时(不是每次)当我获得新令牌时,我会从 WebClient 收到“block()/blockFirst()/blockLast() areblocking error”。现在这个逻辑的实现是这样的:
WebClient.builder()
...
.filter((request, next) -> next.exchange(withBearerAuth(request)))
...

其中 withBearerAuth 进行同步 WebClient 调用(不同的实例),以在旧的 Bearer-token 过期时获取新的 Bearer-token。

  1. 多线程让我很困惑。有时,日志中的线程使用我在 Spring 异步配置的执行器中配置的前缀。有时它使用“ForkJoinPool.commonPool-worker-x”,有时使用“parallel-x”之类的东西。差别在哪里呢?这是出现问题的征兆吗?

  2. 最后,无论我将多少个线程配置为 CorePoolSize,并行任务的数量都低于我的预期。即使我将 CorePoolSize 配置为 10 并且只有 3 个不同的子任务,它们也并不总是并行运行。我必须以编程方式“释放”线程吗?

  3. 我应该研究 VirtualThreads 吗?

java spring asynchronous scheduled-tasks java-threads
1个回答
0
投票

一些观察:

  • 任务运行几分钟,而不是毫秒
  • 多线程令人困惑

考虑到这些观察结果,我们可以慢慢来,坚持一个简单的线程模型:放弃任何有反应性/通量的东西,坚持简单的“Spring MVC”。将

WebClient
替换为同步(阻塞)客户端(例如
RestClient
)。并保持简单:每次需要在单独的调用中使用承载令牌之前都刷新它。

这应该可以修复 1) 和 2)。

CorePoolSize
(3) 仅用于设置准备执行工作的最小线程数,对正在使用的最大线程数没有影响(将根据需要创建新线程)。

Spring Boot 3.2 及 Java 21 中(正确)支持虚拟线程,但它们无法帮助您解决当前的问题。但是,如果您需要应用程序使用更少的资源(并且速度更快),您可以切换到使用虚拟线程(即将 Spring Boot 配置为使用虚拟线程,可能会对应用程序代码进行一些细微的更改)。这就是虚拟线程的美妙之处:您的“老派”阻塞代码(使用大量昂贵的平台线程)突然变得与非阻塞反应式解决方案一样高效和高性能,而没有这些解决方案的缺点。但是当前代码中的任何(逻辑)问题都无法通过切换到虚拟线程来解决。事实上,虚拟线程的使用甚至可能会突出当前代码中存在的一些逻辑问题,因为执行变得更快且更加并行(例如,引起您不知道存在的竞争条件)。

© www.soinside.com 2019 - 2024. All rights reserved.