连续将Runnable任务提交给ExecutorService,直到工作完成,并获取java.util.concurrent.RejectedExecutionException

问题描述 投票:1回答:1

我的多线程类应该对operation1类的许多对象执行三个操作– operation2operation3ClassA,其中每种类型的操作都取决于较早的对象。操作。为此,我尝试使用多个BlockingQueue和一个ExecutorService实现生产者-消费者模式。

final ExecutorService executor = ForkJoinPool.commonPool();
final BlockingQueue<ClassA> operationOneQueue = new ArrayBlockingQueue<>(NO_OF_CLASS_A_OBJECTS);
final BlockingQueue<ClassA> operationTwoQueue = new ArrayBlockingQueue<>(NO_OF_CLASS_A_OBJECTS);
final BlockingQueue<ClassA> operationThreeQueue = new ArrayBlockingQueue<>(NO_OF_CLASS_A_OBJECTS);
final BlockingQueue<ClassA> resultQueue = new ArrayBlockingQueue<>(NO_OF_CLASS_A_OBJECTS);

操作是这样实现的:

void doOperationOne() throws InterruptedException {
    ClassA objectA = operationOneQueue.take();
    objectA.operationOne();
    operationTwoQueue.put(objectA);
}

其中每种类型的操作都有自己的对应方法,并具有“自己的”入队和出队。每个操作方法都会在ClassA对象上调用适当的方法。 doOperationThree方法将ClassA对象放入resultQueue中,表示它们已被完全处理。

首先,我用要操作的所有operationOneQueue对象填充ClassA。然后,我尝试将可执行任务分配给ExecutorService,如下所示:

    while (resultQueue.size() < NO_OF_CLASS_A_OBJECTS) {
        executor.execute(() -> {
            try {
                doOperationOne();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        executor.execute(() -> {
            try {
                doOperationTwo();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        executor.execute(() -> {
            try {
                doOperationThree();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
    executor.shutdown();

运行程序,我得到一个java.util.concurrent.RejectedExecutionException

    Operation1: ClassA object 0
    Operation2: ClassA object 0
    Operation1: ClassA object 1
    Operation3: ClassA object 0
    ....
    Operation1: ClassA object 46
    Operation2: ClassA object 45
    Operation3: ClassA object 45
    Exception in thread "main" java.util.concurrent.RejectedExecutionException: Queue capacity exceeded
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.growArray(ForkJoinPool.java:912)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.lockedPush(ForkJoinPool.java:867)
at java.base/java.util.concurrent.ForkJoinPool.externalPush(ForkJoinPool.java:1911)
at java.base/java.util.concurrent.ForkJoinPool.externalSubmit(ForkJoinPool.java:1930)
at java.base/java.util.concurrent.ForkJoinPool.execute(ForkJoinPool.java:2462)
at concurrent.operations.Program1.main(Program1.java:96)

我做错了什么?如何在不使线程池过度饱和的情况下实现这一目标?

编辑:全面披露-这是有一些要求的作业。 1.我必须使用ForkJoinPool.commonPool()并且不能自己设置线程数,2.我必须使用使用者-生产者模式,并且3.我不能修改ClassA

我的多线程类应该对ClassA类的许多对象执行三种操作-operation1,operation2和operation3,其中每种类型的操作都取决于...

java threadpool producer-consumer java.util.concurrent blockingqueue
1个回答
0
投票
我真的很喜欢做并发的东西,所以我确实尝试编写它。我确实使用了CompletableFuture,其中a)默认情况下确实在ForkJoinPool.commonPool中运行,b)使得实际处理变得非常容易:
© www.soinside.com 2019 - 2024. All rights reserved.