如何在 BlockingQueue 上使用多线程处理数据(Consumer)? 我正在按照此处所述实施生产者和消费者问题https://javarevisited.blogspot.com/2012/02/producer-consumer-design-pattern-with.html#axzz81nA2tZnG 在我的用例中,我将项目添加到 BlockingQueue。 我对此很好。开始消费(在我处理添加到队列的数据的情况下),我想要多个线程来处理队列。
`BlockingQueue insertData = new LinkedBlockingQueue();
asnProcessor.setSharedQueue(insertData);
Thread asnProcessorThread = new Thread(asnProcessor);
asnProcessorThread.start();
if(!listOutput.isEmpty()) {
//here successfully populating insertData through multiple threads using ForkJoinPool
final int parallelism = 50;
ForkJoinPool forkJoinPool = null;
try {
forkJoinPool = new ForkJoinPool(parallelism);
List<AllAsnsResult> finalListOutput = listOutput;
final List<Boolean> output = (List<Boolean>) forkJoinPool.submit(() ->
finalListOutput.parallelStream().
forEach(allAsnsResult -> {
call(allAsnsResult, userId, insertData);
}
)
).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown();
}
}
}`
asnProcessor 如下所示
public class AsnProcessor implements Runnable {
运行方法如下
`@覆盖 公共无效运行(){
final int parallelism = 20;
ForkJoinPool forkJoinPool = null;
try {
forkJoinPool = new ForkJoinPool(parallelism);
List<Boolean> finalListOutput = (List<Boolean>) forkJoinPool.submit(() ->
sharedQueue.parallelStream().
forEach(asnBatch -> {
asnService.saveAllEntities(asnBatch); }
)
).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
} finally {
if (forkJoinPool != null) {
forkJoinPool.shutdown();
}
}
}
`
如果消费者(AsnProcessor.java)是单线程的,我可以写一些像下面这样的东西
`@覆盖 公共无效运行(){
while(true){
AsnEntity asnEntity = (AsnEntity) sharedQueue.take();
asnService.saveAllEntities(asnEntity)
}
}`
但是我需要多个线程来处理 insertData,无法用多个线程处理相同的数据。如何在 BlockingQueue 的处理中使用多线程?