我需要实施一个生产者/消费者方案,出于性能原因,消费者试图一批处理多个工作项(每个工作项都消耗工作队列)。
目前,我只是创建固定数量的相同工作程序,它们在循环中对相同队列进行工作。由于其中一些可能会死,我需要注意更换它们。
我很乐意使用fixedThreadPool来管理线程替换,但是我的案例并没有映射到Executor
方案,因为生产者和消费者所需的粒度不匹配-只有消费者才能收集合适的批次工作
当我的工作项不能表示为Runnables / Callables时,管理(固定大小)线程池有哪些选择?
(或者,我可以以某种方式将批量生产的工作项的要求保持在一起,并且仍然能够使用执行器服务吗?]
一种方法是将生产者/消费者作为Runnable
,并使用BlockingQueue
在它们之间传递任何数据。
例如,这是生产者的简化实现,生产者将String
项生成到queue
,而消费者则批量读取这些项:
class ProducerConsumerPing {
private static final class PingProducer implements Runnable {
private final BlockingQueue<String> queue;
PingProducer(BlockingQueue<String> queue) {
this.queue = queue;
}
public void run() {
while (true) {
queue.offer("ping");
}
}
}
private static final class PingConsumer implements Runnable {
private final BlockingQueue<String> queue;
private final int batchSize;
PingConsumer(BlockingQueue<String> queue, int batchSize) {
this.queue = queue;
this.batchSize = batchSize;
}
public void run() {
while (true) {
List<String> batch = new ArrayList<>();
queue.drainTo(batch, batchSize);
System.out.println("Consumed: " + batch);
}
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService producers = Executors.newFixedThreadPool(10);
ExecutorService consumers = Executors.newFixedThreadPool(10);
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
for (int i = 0; i < 10; i++) {
producers.submit(new PingProducer(queue));
}
for (int i = 0; i < 10; i++) {
consumers.submit(new PingConsumer(queue, 10));
}
producers.shutdown();
producers.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
consumers.shutdown();
consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
}
}
注意:
在示例中,我将String
用作work items
,但是您当然可以将任何Object
放入队列中
我通过将实际工作项保留在BlockingQueue中来解决,但是生产者将通知任务提交给Executor,该Executor指导工作线程耗尽队列。当通知任务开始排队时,一些工作人员将能够从BlockingQueue中获取一些工作项,而另一些工作人员将一无所获,这足以满足我的批处理目的。