多线程如何处理BlockingQueue

问题描述 投票:0回答:0

如何在 BlockingQueue 上使用多线程处理数据(Consumer)? 我正在按照此处所述实施生产者和消费者问题https://javarevisited.blogspot.com/2012/02/producer-consumer-design-pattern-with.html#axzz81nA2tZnG 在我的用例中,我将项目添加到 BlockingQueue。 我对此很好。开始消费(在我处理添加到队列的数据的情况下),我想要多个线程来处理队列。

 `BlockingQueue insertData = new LinkedBlockingQueue();
    asnProcessor.setSharedQueue(insertData);
    Thread asnProcessorThread = new Thread(asnProcessor);
    asnProcessorThread.start();
    if(!listOutput.isEmpty()) {
        //here successfully populating insertData through multiple threads using ForkJoinPool
        final int parallelism = 50;
        ForkJoinPool forkJoinPool = null;
        try {
            forkJoinPool = new ForkJoinPool(parallelism);
            List<AllAsnsResult> finalListOutput = listOutput;
            final List<Boolean> output = (List<Boolean>) forkJoinPool.submit(() ->
                    finalListOutput.parallelStream().
                            forEach(allAsnsResult -> {
                                        call(allAsnsResult, userId, insertData);
                                    }
                            )
            ).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            if (forkJoinPool != null) {
                forkJoinPool.shutdown();
            }
        }
    }`

asnProcessor 如下所示

public class AsnProcessor implements Runnable {
运行方法如下

`@覆盖 公共无效运行(){

    final int parallelism = 20;
    ForkJoinPool forkJoinPool = null;
    try {
        forkJoinPool = new ForkJoinPool(parallelism);
        List<Boolean> finalListOutput = (List<Boolean>) forkJoinPool.submit(() ->
                sharedQueue.parallelStream().
                        forEach(asnBatch -> {
                         
                            asnService.saveAllEntities(asnBatch);                                            }
                        )
        ).get();
    } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
    } finally {
        if (forkJoinPool != null) {
            forkJoinPool.shutdown();
        }
    }
    
}

`

如果消费者(AsnProcessor.java)是单线程的,我可以写一些像下面这样的东西

`@覆盖 公共无效运行(){

    while(true){
            AsnEntity asnEntity = (AsnEntity) sharedQueue.take();
            asnService.saveAllEntities(asnEntity)

    }
}`

但是我需要多个线程来处理 insertData,无法用多个线程处理相同的数据。如何在 BlockingQueue 的处理中使用多线程?

  1. 我在生产者中调用 .get() 以确保我在生产完成后获得控制权。还有一种方法可以知道消费者何时也完成了吗?
multithreading blockingqueue forkjoinpool linkedblockingqueue
© www.soinside.com 2019 - 2024. All rights reserved.