为java中的平台线程池创建单独的线程池

问题描述 投票:0回答:1

我想创建在单独的 Java 线程池上运行的虚拟线程池。

这是我正在尝试创建的架构:

这使我能够创建单独的池来在一个 JVM 中运行批处理任务,并利用每个池的 n:m 映射的虚拟线程。因此,如果我有 12 个核心,那么我可以创建 2 个 6 线程的线程池。每个池只会执行一个特定的任务。每个池将有 N 个虚拟线程。因此,这里的映射将是 2 个 {N VirtualThreads -> 6 Platform Threads} 池。

TLDR,我想限制虚拟线程池可以运行的 PlatformThreads 数量。

我能想到的一件事是,创建线程池,当传入可运行对象时,在 run 方法内我可以创建虚拟线程,但不确定它有多实用,以及我是否会得到我想要的池分区。这种方法的另一个问题是,虚拟线程将仅在一个 java 线程中运行,因此没有 N:M 映射

java threadpoolexecutor java-threads java-21 virtual-threads
1个回答
0
投票

虚拟线程的发明是为了避免您似乎正在经历的麻烦。

而且,虚拟线程被明确记录为“不”用于池化。就像面巾纸一样,拿一张新的,使用后丢弃。 你说:

我的用例使用相互依赖的作业(基本上一个作业的输出为队列中的另一个作业提供输入)。

…并且:

传入可运行对象时,在 run 方法中我可以创建虚拟线程

如果您有一系列级联任务,每个任务都在前一个任务的结果之后执行,那么只需将所有工作封装在一个方法中即可。在虚拟线程中执行该方法。让该虚拟线程运行完成。

让我们设计一个简单的演示应用程序。我们有三项相互关联的任务:

TaskA

TaskB
TaskC
。它们被安置在一起作为
AlphabetTask
class AlphabetTask implements Callable < String >
{
    private final UUID id = UUID.randomUUID ( );

    @Override
    public String call ( ) throws Exception
    {
        System.out.println ( "Starting AlphabetTask " + this.id + " " + Instant.now ( ) );
        String a = new TaskA ( ).call ( );
        String b = new TaskB ( a ).call ( );
        String c = new TaskC ( b ).call ( );
        System.out.println ( "Ending AlphabetTask " + this.id + " Result: " + c + " " + Instant.now ( ) );
        return c;
    }
}

class TaskA implements Callable < String >
{
    @Override
    public String call ( ) throws Exception
    {
        System.out.println ( "Running TaskA. " + Instant.now ( ) );
        Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 100 , 800 ) ) );
        return "A";
    }
}

class TaskB implements Callable < String >
{
    private final String input;

    public TaskB ( final String input )
    {
        this.input = input;
    }

    @Override
    public String call ( ) throws Exception
    {
        System.out.println ( "Running TaskB. " + Instant.now ( ) );
        Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 100 , 800 ) ) );
        return this.input + "B";
    }
}

class TaskC implements Callable < String >
{
    private final String input;

    public TaskC ( final String input )
    {
        this.input = input;
    }

    @Override
    public String call ( ) throws Exception
    {
        System.out.println ( "Running TaskC. " + Instant.now ( ) );
        Thread.sleep ( Duration.ofMillis ( ThreadLocalRandom.current ( ).nextInt ( 100 , 800 ) ) );
        return this.input + "C";
    }
}

现在我们执行三个 AlphabetTask 实例,每个实例都在自己的虚拟线程中。

Collection < AlphabetTask > alphabetTasks = List.of ( new AlphabetTask ( ) , new AlphabetTask ( ) , new AlphabetTask ( ) ); List < Future < String > > futures = List.of ( ); try ( ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor ( ) ; ) { try { futures = executorService.invokeAll ( alphabetTasks ); } catch ( InterruptedException e ) { throw new RuntimeException ( e ); } } // Blocks here until executor service is done. futures.forEach ( stringFuture -> { try { System.out.println ( stringFuture.get ( ) ); } catch ( InterruptedException | ExecutionException e ) { throw new RuntimeException ( e ); } } );

© www.soinside.com 2019 - 2024. All rights reserved.