我的多线程类应该对operation1
类的许多对象执行三个操作– operation2
,operation3
和ClassA
,其中每种类型的操作都取决于较早的对象。操作。为此,我尝试使用多个BlockingQueue
和一个ExecutorService
实现生产者-消费者模式。
final ExecutorService executor = ForkJoinPool.commonPool(); final BlockingQueue<ClassA> operationOneQueue = new ArrayBlockingQueue<>(NO_OF_CLASS_A_OBJECTS); final BlockingQueue<ClassA> operationTwoQueue = new ArrayBlockingQueue<>(NO_OF_CLASS_A_OBJECTS); final BlockingQueue<ClassA> operationThreeQueue = new ArrayBlockingQueue<>(NO_OF_CLASS_A_OBJECTS); final BlockingQueue<ClassA> resultQueue = new ArrayBlockingQueue<>(NO_OF_CLASS_A_OBJECTS);
操作是这样实现的:
void doOperationOne() throws InterruptedException { ClassA objectA = operationOneQueue.take(); objectA.operationOne(); operationTwoQueue.put(objectA); }
其中每种类型的操作都有自己的对应方法,并具有“自己的”入队和出队。每个操作方法都会在
ClassA
对象上调用适当的方法。doOperationThree
方法将ClassA
对象放入resultQueue
中,表示它们已被完全处理。
首先,我用要操作的所有operationOneQueue
对象填充ClassA
。然后,我尝试将可执行任务分配给ExecutorService
,如下所示:
while (resultQueue.size() < NO_OF_CLASS_A_OBJECTS) { executor.execute(() -> { try { doOperationOne(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); executor.execute(() -> { try { doOperationTwo(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); executor.execute(() -> { try { doOperationThree(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); } executor.shutdown();
运行程序,我得到一个
java.util.concurrent.RejectedExecutionException
。
Operation1: ClassA object 0 Operation2: ClassA object 0 Operation1: ClassA object 1 Operation3: ClassA object 0 .... Operation1: ClassA object 46 Operation2: ClassA object 45 Operation3: ClassA object 45 Exception in thread "main" java.util.concurrent.RejectedExecutionException: Queue capacity exceeded at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.growArray(ForkJoinPool.java:912) at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.lockedPush(ForkJoinPool.java:867) at java.base/java.util.concurrent.ForkJoinPool.externalPush(ForkJoinPool.java:1911) at java.base/java.util.concurrent.ForkJoinPool.externalSubmit(ForkJoinPool.java:1930) at java.base/java.util.concurrent.ForkJoinPool.execute(ForkJoinPool.java:2462) at concurrent.operations.Program1.main(Program1.java:96)
我做错了什么?如何在不使线程池过度饱和的情况下实现这一目标?
编辑:全面披露-这是有一些要求的作业。 1.我必须使用ForkJoinPool.commonPool()
并且不能自己设置线程数,2.我必须使用使用者-生产者模式,并且3.我不能修改ClassA
。
我的多线程类应该对ClassA类的许多对象执行三种操作-operation1,operation2和operation3,其中每种类型的操作都取决于...
CompletableFuture
,其中a)默认情况下确实在ForkJoinPool.commonPool
中运行,b)使得实际处理变得非常容易: