执行Runnable的数以百万计的与小的内存占用

问题描述 投票:3回答:4

我有N个多头它们的ID。对于每一个ID,我需要执行一个Runnable(即我不在乎返回值),并等待,直到所有的人都完成了。每一个Runnable接口可以从几秒钟到几分钟,它是安全的并行运行约100个线程。

在我们目前的解决方案中,我们使用Executors.newFixedThreadPool()调用提交()每个ID,然后调用get()每个返回的未来。

该代码工作得很好,这是非常简单的,我没有要处理的线程,复杂的等待逻辑,等它有一个缺点:内存占用。

所有仍在排队Runnable接口的消耗内存(远远超过8个字节比通过长需要更多:这是我的Java类的一些内部状态),并且所有的N个未来的情况下,占用内存过多(这是Java类与状态,以及,我只用等待,但我并不需要实际的结果)。我看着堆转储和我估计的内存稍微超过1吉布吸收为N = 10万元。千万多头阵列中的只会消耗76个MIB。

有没有办法解决这个问题,只有持有在内存中的ID,最好不采取低层次并行编程的方法吗?

java multithreading
4个回答
1
投票

这是什么样的,我通常会用一个生产者/消费者模式做的事情,和一个阻塞队列协调两,或使用阿卡演员,如果我有手头上的项目。

但我想我会建议的东西有点不同,依​​靠Java流的行为。

直观上,流的懒惰执行将用于节流的工作单位,期货,其结果的产生。

public static void main(String[] args) {
    // So we have a list of ids, I stream it
    // (note : if we have an iterator, you could group it by a batch of, say 100,
    // and then flat map each batch)
    LongStream ids = LongStream.range(0, 10_000_000L);
    // This is were the actual tasks will be dispatched
    ExecutorService executor = Executors.newFixedThreadPool(4);

    // For each id to compute, create a runnable, which I call "WorkUnit"
    Optional<Exception> error = ids.mapToObj(WorkUnit::new)
             // create a parralel stream
             // this allows the stream engine to launch the next instructions concurrently
            .parallel()
            // We dispatch ("parallely") the work units to a thread and have them execute
            .map(workUnit -> CompletableFuture.runAsync(workUnit, executor))
            // And then we wait for the unit of work to complete
            .map(future -> {
                try {
                    future.get();
                } catch (Exception e) {
                    // we do care about exceptions
                    return e;
                } finally {
                    System.out.println("Done with a work unit ");
                }
                // we do not care for the result
                return null;
            })
            // Keep exceptions on the stream
            .filter(Objects::nonNull)
            // Stop as soon as one is found
            .findFirst();


    executor.shutdown();
    System.out.println(error.isPresent());
}

说实话,我不Quiete酒店肯定的行为是由规范保证,但根据我的经验它的工作原理。并行“chunck”中的每一个抓住几个IDS,饲料它管道(映射到工作单位,分派到线程池,等待结果,过滤器例外),这意味着达到平衡很快积极平衡工作单位的数量的executor的。

如果并联的“块”的数量来进行微调,应该在这里跟进:qazxsw POI


1
投票

是的:你可以有多头的共享队列。您将n Custom thread pool in Java 8 parallel streams提交给执行者,其中n是在执行的线程数,在Runnable方法的末尾,你的下一个长从队列中,并重新提交一份新run


1
投票

而不是创造的Runnable万元的,创建特定线程池这需要多头的任务。与其等待任务,那么Future.get(完成),使用CountdownLatch。

该线程池可以实现这样的:

Runnable

1
投票

怎么样使用int N = 1000000;// number of tasks; int T = 100; // number of threads; CountdownLatch latch = new CountdownLatch(N); ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue<>(); for (int k=0; k<N; k++) { queue.put(createNumber(k)); } for (int k=0; k<T; k++) { new WorkingThread().start(); } CountdownLatch.await(); class WorkingThread extends Thread { public void run() { while (latch.getCount() != 0) { processNumber(queue.take()); latch.countDown(); } } } ?类似下面的(可能包含的错误,我没有测试):

ExecutorCompletionService

上述的另一个版本可以使用import java.util.concurrent.Executor; import java.util.concurrent.ExecutorCompletionService; import java.util.function.LongFunction; public class Foo { private final ExecutorCompletionService<Void> completionService; private final LongFunction<Runnable> taskCreator; private final long maxRunning; // max tasks running or queued public Foo(Executor executor, LongFunction<Runnable> taskCreator, long maxRunning) { this.completionService = new ExecutorCompletionService<>(executor); this.taskCreator = taskCreator; this.maxRunning = maxRunning; } public synchronized void processIds(long[] ids) throws InterruptedException { int completed = 0; int running = 0; for (long id : ids) { if (running < maxRunning) { completionService.submit(taskCreator.apply(id), null); running++; } else { completionService.take(); running--; completed++; } } while (completed < ids.length) { completionService.take(); completed++; } } } Semaphore,而不是CountDownLatch

CompletionService
© www.soinside.com 2019 - 2024. All rights reserved.