ExecutorService 未将完成的线程返回到池中?

问题描述 投票:0回答:1

我创建了一个异步任务线程,它为长时间运行的作业创建了 8 个工作线程。假设我有 8 个长期运行的作业,一切都工作正常。所有 8 个 Worker-Jobs 在例如 10 秒内完成。现在的问题是,如果我将 FixThreadPoolSize 设置为 7(比我的作业量少一个线程),我会再次获得 8 个作业。不是花了大约10秒,而是花了近70秒。但为什么会这样呢?池中似乎没有更多可用线程。但是前面的7个线程已经完成了。最后一个线程他花了 60 秒。

我的任务线程看起来像这样:

 private final ExecutorService executorService;
 private final OpenAiService openAiService;
 private final List<String> workerJobs;

 public TaskThread(List<String> workerJobs,OpenAiService openAiService;) {
    this.workerJobs = workerJobs;
this.openAiService = openAiService;
    this.executorService = Executors.newFixedThreadPool(7, Thread.ofVirtual().name("Thread-", 1).factory());
  }


  @Override
  public void run() {
    List<CompletableFuture<String>> jobs = workerJobs.stream().map(model ->
      CompletableFuture.supplyAsync(new WorkerJob(openAiService,model),executorService)
    ).toList();

    List<String> taskResultModels = jobs
      .stream()
      .map(CompletableFuture::join)
      .toList();
    
    //Save result to DB
  }
}

我的 WorkerThread 像这样:

public class WorkerJob implements Supplier<String> {

  private final OpenAiService openAiService;
  private final String model;

    public WorkerJob(OpenAiService openAiService,String model) {
        this.openAiService = openAiService;
        this.model = model;
    }

  @Override
  public String get() {
    try{
      return openAiService.generate(model);
    }catch(Exception e){
      return "Error";
    }
  }
}

服务看起来像这样: OpenAIClient 是全局创建的,因此所有线程都使用相同的客户端,但即使每个线程创建自己的客户端,我也会遇到同样的问题。 客户端来自这里:https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/openai/azure-ai-openai/README.md

@Service
class OpenAiService{
  private final OpenAIClient openAIClient;

  @Autowired
  public OpenAiService(OpenAIClient openAIClient) {
    this.openAIClient = openAIClient;
  }
  public String generate(String model) {
    try {
      String content = null;
      List<ChatRequestMessage> chatMessages = new ArrayList<>();
      chatMessages.add(new ChatRequestUserMessage(model));
      ChatCompletions chatCompletions = openAIClient.getChatCompletions("id", new ChatCompletionsOptions(chatMessages));
      for (ChatChoice choice : chatCompletions.getChoices()) {
        ChatResponseMessage message = choice.getMessage();
        content = message.getContent();
      }

      return content;
    } catch (Exception e) {
      return "Error";
    }
  }
}

我还使用 ExecutorService 启动 TaskThread: Executors.newFixedThreadPool(2).submit(new TaskThread(List.of(""),openAiService));

java multithreading executorservice
1个回答
0
投票

首先,永远不要池化虚拟线程。研究 JEP 444:虚拟线程。引用(我的重点):

虚拟线程便宜且充足,因此永远不应该池化:应该为每个应用程序任务创建一个新的虚拟线程。因此,大多数虚拟线程都是短暂的,并且具有浅层调用堆栈,只执行单个 HTTP 客户端调用或单个 JDBC 查询。相比之下,平台线程重量级且昂贵,因此通常必须进行池化。它们往往寿命很长,具有很深的调用堆栈,并且在许多任务之间共享。

虚拟线程的全部要点在于,它们在内存和 CPU 方面非常便宜,以至于可以像

面巾纸一样使用:当需要时,获取一个新的,使用并处理。

CompletableFuture.supplyAsync


这种代码是Alger现在用虚拟线程不需要的。只需用简单明了的 Java 代码编写您的任务,然后提交给由虚拟线程支持的执行器服务,每个任务一个新的虚拟线程。

ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor() ;

虚拟线程非常便宜,您可以合理地在传统硬件上同时运行数百万个虚拟线程。但当然,廉价的线程可能会运行昂贵的任务。如果您需要限制太多虚拟线程对宝贵资源的过度使用,请使用 
Semaphore

及其可重用许可池。

    

© www.soinside.com 2019 - 2024. All rights reserved.