如何使用自动线程管理在Java中实现生产者/消费者

问题描述 投票:0回答:2

我需要实施一个生产者/消费者方案,出于性能原因,消费者试图一批处理多个工作项(每个工作项都消耗工作队列)。

目前,我只是创建固定数量的相同工作程序,它们在循环中对相同队列进行工作。由于其中一些可能会死,我需要注意更换它们。

我很乐意使用fixedThreadPool来管理线程替换,但是我的案例并没有映射到Executor方案,因为生产者和消费者所需的粒度不匹配-只有消费者才能收集合适的批次工作

当我的工作项不能表示为Runnables / Callables时,管理(固定大小)线程池有哪些选择?

(或者,我可以以某种方式将批量生产的工作项的要求保持在一起,并且仍然能够使用执行器服务吗?]

java executorservice producer-consumer
2个回答
1
投票

一种方法是将生产者/消费者作为Runnable,并使用BlockingQueue在它们之间传递任何数据。

例如,这是生产者的简化实现,生产者将String项生成到queue,而消费者则批量读取这些项:

class ProducerConsumerPing {
    private static final class PingProducer implements Runnable {
        private final BlockingQueue<String> queue;

        PingProducer(BlockingQueue<String> queue) {
            this.queue = queue;
        }

        public void run() {
            while (true) {
                queue.offer("ping");
            }
        }
    }

    private static final class PingConsumer implements Runnable {

        private final BlockingQueue<String> queue;
        private final int batchSize;

        PingConsumer(BlockingQueue<String> queue, int batchSize) {
            this.queue = queue;
            this.batchSize = batchSize;
        }

        public void run() {
            while (true) {
                List<String> batch = new ArrayList<>();
                queue.drainTo(batch, batchSize);
                System.out.println("Consumed: " + batch);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService producers = Executors.newFixedThreadPool(10);
        ExecutorService consumers = Executors.newFixedThreadPool(10);
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        for (int i = 0; i < 10; i++) {
            producers.submit(new PingProducer(queue));
        }

        for (int i = 0; i < 10; i++) {
            consumers.submit(new PingConsumer(queue, 10));
        }

        producers.shutdown();
        producers.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        consumers.shutdown();
        consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    }
}

注意:


0
投票

我通过将实际工作项保留在BlockingQueue中来解决,但是生产者将通知任务提交给Executor,该Executor指导工作线程耗尽队列。当通知任务开始排队时,一些工作人员将能够从BlockingQueue中获取一些工作项,而另一些工作人员将一无所获,这足以满足我的批​​处理目的。

© www.soinside.com 2019 - 2024. All rights reserved.