我想创建在单独的 Java 线程池上运行的虚拟线程池。
这是我正在尝试创建的架构:
这使我能够创建单独的池来在一个 JVM 中运行批处理任务,并利用每个池的 n:m 映射的虚拟线程。因此,如果我有 12 个核心,那么我可以创建 2 个 6 线程的线程池。每个池只会执行一个特定的任务。每个池将有 N 个虚拟线程。因此,这里的映射将是 2 个 {N VirtualThreads -> 6 Platform Threads} 池。
TLDR,我想限制虚拟线程池可以运行的 PlatformThreads 数量。
我能想到的一件事是,创建线程池,当传入可运行对象时,在 run 方法内我可以创建虚拟线程,但不确定它有多实用,以及我是否会得到我想要的池分区。这种方法的另一个问题是,虚拟线程将仅在一个 java 线程中运行,因此没有 N:M 映射
虚拟线程的发明是为了避免您似乎正在经历的麻烦。
而且,虚拟线程被明确记录为“不”用于池化。就像面巾纸一样,拿一张新的,使用后丢弃。 你说:
我的用例使用相互依赖的作业(基本上一个作业的输出为队列中的另一个作业提供输入)。
…并且:
传入可运行对象时,在 run 方法中我可以创建虚拟线程
如果您有一系列级联任务,每个任务都在前一个任务的结果之后执行,那么只需将所有工作封装在一个方法中即可。在虚拟线程中执行该方法。让该虚拟线程运行完成。
让我们设计一个简单的演示应用程序。我们有三项相互关联的任务:
TaskA
、
TaskB
和TaskC
。它们被安置在一起作为 AlphabetTask
。class AlphabetTask implements Callable < String >
{
private final UUID id = UUID.randomUUID ( );
@Override
public String call ( ) throws Exception
{
System.out.println ( "Starting AlphabetTask " + this.id + " " + Instant.now ( ) );
String a = new TaskA ( ).call ( );
String b = new TaskB ( a ).call ( );
String c = new TaskC ( b ).call ( );
System.out.println ( "Ending AlphabetTask " + this.id + " Result: " + c + " " + Instant.now ( ) );
return c;
}
}
class TaskA implements Callable < String >
{
@Override
public String call ( ) throws Exception
{
System.out.println ( "Running TaskA. " + Instant.now ( ) );
Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 100 , 800 ) ) );
return "A";
}
}
class TaskB implements Callable < String >
{
private final String input;
public TaskB ( final String input )
{
this.input = input;
}
@Override
public String call ( ) throws Exception
{
System.out.println ( "Running TaskB. " + Instant.now ( ) );
Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 100 , 800 ) ) );
return this.input + "B";
}
}
class TaskC implements Callable < String >
{
private final String input;
public TaskC ( final String input )
{
this.input = input;
}
@Override
public String call ( ) throws Exception
{
System.out.println ( "Running TaskC. " + Instant.now ( ) );
Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 100 , 800 ) ) );
return this.input + "C";
}
}
现在我们执行三个 AlphabetTask 实例,每个实例都在自己的虚拟线程中。
Collection < AlphabetTask > alphabetTasks =
List.of (
new AlphabetTask ( ) ,
new AlphabetTask ( ) ,
new AlphabetTask ( )
);
List < Future < String > > futures = List.of ( );
try (
ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor ( ) ;
)
{
try
{
futures = executorService.invokeAll ( alphabetTasks );
} catch ( InterruptedException e )
{
throw new RuntimeException ( e );
}
}
// Blocks here until executor service is done.
futures.forEach ( stringFuture -> {
try
{
System.out.println ( stringFuture.get ( ) );
} catch ( InterruptedException | ExecutionException e )
{
throw new RuntimeException ( e );
}
} );