我创建了一个异步任务线程,它为长时间运行的作业创建了 8 个工作线程。假设我有 8 个长期运行的作业,一切都工作正常。所有 8 个 Worker-Jobs 在例如 10 秒内完成。现在的问题是,如果我将 FixThreadPoolSize 设置为 7(比我的作业量少一个线程),我会再次获得 8 个作业。不是花了大约10秒,而是花了近70秒。但为什么会这样呢?池中似乎没有更多可用线程。但是前面的7个线程已经完成了。最后一个线程他花了 60 秒。
我的任务线程看起来像这样:
private final ExecutorService executorService;
private final OpenAiService openAiService;
private final List<String> workerJobs;
public TaskThread(List<String> workerJobs,OpenAiService openAiService;) {
this.workerJobs = workerJobs;
this.openAiService = openAiService;
this.executorService = Executors.newFixedThreadPool(7, Thread.ofVirtual().name("Thread-", 1).factory());
}
@Override
public void run() {
List<CompletableFuture<String>> jobs = workerJobs.stream().map(model ->
CompletableFuture.supplyAsync(new WorkerJob(openAiService,model),executorService)
).toList();
List<String> taskResultModels = jobs
.stream()
.map(CompletableFuture::join)
.toList();
//Save result to DB
}
}
我的 WorkerThread 像这样:
public class WorkerJob implements Supplier<String> {
private final OpenAiService openAiService;
private final String model;
public WorkerJob(OpenAiService openAiService,String model) {
this.openAiService = openAiService;
this.model = model;
}
@Override
public String get() {
try{
return openAiService.generate(model);
}catch(Exception e){
return "Error";
}
}
}
服务看起来像这样: OpenAIClient 是全局创建的,因此所有线程都使用相同的客户端,但即使每个线程创建自己的客户端,我也会遇到同样的问题。 客户端来自这里:https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/openai/azure-ai-openai/README.md
@Service
class OpenAiService{
private final OpenAIClient openAIClient;
@Autowired
public OpenAiService(OpenAIClient openAIClient) {
this.openAIClient = openAIClient;
}
public String generate(String model) {
try {
String content = null;
List<ChatRequestMessage> chatMessages = new ArrayList<>();
chatMessages.add(new ChatRequestUserMessage(model));
ChatCompletions chatCompletions = openAIClient.getChatCompletions("id", new ChatCompletionsOptions(chatMessages));
for (ChatChoice choice : chatCompletions.getChoices()) {
ChatResponseMessage message = choice.getMessage();
content = message.getContent();
}
return content;
} catch (Exception e) {
return "Error";
}
}
}
我还使用 ExecutorService 启动 TaskThread: Executors.newFixedThreadPool(2).submit(new TaskThread(List.of(""),openAiService));
首先,永远不要池化虚拟线程。研究 JEP 444:虚拟线程。引用(我的重点):
虚拟线程的全部要点在于,它们在内存和 CPU 方面非常便宜,以至于可以像虚拟线程便宜且充足,因此永远不应该池化:应该为每个应用程序任务创建一个新的虚拟线程。因此,大多数虚拟线程都是短暂的,并且具有浅层调用堆栈,只执行单个 HTTP 客户端调用或单个 JDBC 查询。相比之下,平台线程重量级且昂贵,因此通常必须进行池化。它们往往寿命很长,具有很深的调用堆栈,并且在许多任务之间共享。
CompletableFuture.supplyAsync
这种代码是Alger现在用虚拟线程不需要的。只需用简单明了的 Java 代码编写您的任务,然后提交给由虚拟线程支持的执行器服务,每个任务一个新的虚拟线程。
ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor() ;
虚拟线程非常便宜,您可以合理地在传统硬件上同时运行数百万个虚拟线程。但当然,廉价的线程可能会运行昂贵的任务。如果您需要限制太多虚拟线程对宝贵资源的过度使用,请使用
Semaphore
及其可重用许可池。