我有N个多头它们的ID。对于每一个ID,我需要执行一个Runnable(即我不在乎返回值),并等待,直到所有的人都完成了。每一个Runnable接口可以从几秒钟到几分钟,它是安全的并行运行约100个线程。
在我们目前的解决方案中,我们使用Executors.newFixedThreadPool()调用提交()每个ID,然后调用get()每个返回的未来。
该代码工作得很好,这是非常简单的,我没有要处理的线程,复杂的等待逻辑,等它有一个缺点:内存占用。
所有仍在排队Runnable接口的消耗内存(远远超过8个字节比通过长需要更多:这是我的Java类的一些内部状态),并且所有的N个未来的情况下,占用内存过多(这是Java类与状态,以及,我只用等待,但我并不需要实际的结果)。我看着堆转储和我估计的内存稍微超过1吉布吸收为N = 10万元。千万多头阵列中的只会消耗76个MIB。
有没有办法解决这个问题,只有持有在内存中的ID,最好不采取低层次并行编程的方法吗?
这是什么样的,我通常会用一个生产者/消费者模式做的事情,和一个阻塞队列协调两,或使用阿卡演员,如果我有手头上的项目。
但我想我会建议的东西有点不同,依靠Java流的行为。
直观上,流的懒惰执行将用于节流的工作单位,期货,其结果的产生。
public static void main(String[] args) {
// So we have a list of ids, I stream it
// (note : if we have an iterator, you could group it by a batch of, say 100,
// and then flat map each batch)
LongStream ids = LongStream.range(0, 10_000_000L);
// This is were the actual tasks will be dispatched
ExecutorService executor = Executors.newFixedThreadPool(4);
// For each id to compute, create a runnable, which I call "WorkUnit"
Optional<Exception> error = ids.mapToObj(WorkUnit::new)
// create a parralel stream
// this allows the stream engine to launch the next instructions concurrently
.parallel()
// We dispatch ("parallely") the work units to a thread and have them execute
.map(workUnit -> CompletableFuture.runAsync(workUnit, executor))
// And then we wait for the unit of work to complete
.map(future -> {
try {
future.get();
} catch (Exception e) {
// we do care about exceptions
return e;
} finally {
System.out.println("Done with a work unit ");
}
// we do not care for the result
return null;
})
// Keep exceptions on the stream
.filter(Objects::nonNull)
// Stop as soon as one is found
.findFirst();
executor.shutdown();
System.out.println(error.isPresent());
}
说实话,我不Quiete酒店肯定的行为是由规范保证,但根据我的经验它的工作原理。并行“chunck”中的每一个抓住几个IDS,饲料它管道(映射到工作单位,分派到线程池,等待结果,过滤器例外),这意味着达到平衡很快积极平衡工作单位的数量的executor
的。
如果并联的“块”的数量来进行微调,应该在这里跟进:qazxsw POI
是的:你可以有多头的共享队列。您将n Custom thread pool in Java 8 parallel streams提交给执行者,其中n是在执行的线程数,在Runnable
方法的末尾,你的下一个长从队列中,并重新提交一份新run
。
而不是创造的Runnable万元的,创建特定线程池这需要多头的任务。与其等待任务,那么Future.get(完成),使用CountdownLatch。
该线程池可以实现这样的:
Runnable
怎么样使用int N = 1000000;// number of tasks;
int T = 100; // number of threads;
CountdownLatch latch = new CountdownLatch(N);
ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue<>();
for (int k=0; k<N; k++) {
queue.put(createNumber(k));
}
for (int k=0; k<T; k++) {
new WorkingThread().start();
}
CountdownLatch.await();
class WorkingThread extends Thread {
public void run() {
while (latch.getCount() != 0) {
processNumber(queue.take());
latch.countDown();
}
}
}
?类似下面的(可能包含的错误,我没有测试):
ExecutorCompletionService
上述的另一个版本可以使用import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.function.LongFunction;
public class Foo {
private final ExecutorCompletionService<Void> completionService;
private final LongFunction<Runnable> taskCreator;
private final long maxRunning; // max tasks running or queued
public Foo(Executor executor, LongFunction<Runnable> taskCreator, long maxRunning) {
this.completionService = new ExecutorCompletionService<>(executor);
this.taskCreator = taskCreator;
this.maxRunning = maxRunning;
}
public synchronized void processIds(long[] ids) throws InterruptedException {
int completed = 0;
int running = 0;
for (long id : ids) {
if (running < maxRunning) {
completionService.submit(taskCreator.apply(id), null);
running++;
} else {
completionService.take();
running--;
completed++;
}
}
while (completed < ids.length) {
completionService.take();
completed++;
}
}
}
和Semaphore
,而不是CountDownLatch
。
CompletionService