带有CompletableFuture的Java多线程工作速度较慢

问题描述 投票:0回答:3

我试图编写代码以计算计算机上某些类型的文件。我同时测试了一个线程解决方案和多线程异步解决方案,看来一个线程的运行速度更快。我的代码有什么问题吗?如果没有,为什么它不能更快​​地运行?

以下代码:AsynchFileCounter-异步版本。ExtensionFilter-文件过滤器,仅列出指定扩展名的目录和文件BasicFileCounter-一线程版本。

public class AsynchFileCounter {
    public int countFiles(String path, String extension) throws InterruptedException, ExecutionException {
        ExtensionFilter filter = new ExtensionFilter(extension, true);
        File f = new File(path);
        return countFilesRecursive(f, filter);
    }

    private int countFilesRecursive(File f, ExtensionFilter filter) throws InterruptedException, ExecutionException {
        return CompletableFuture.supplyAsync(() -> f.listFiles(filter))
            .thenApplyAsync(files -> {
                int count = 0;
                for (File file : files) {
                    if(file.isFile())
                        count++;
                    else
                        try {
                            count += countFilesRecursive(file, filter);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                }
                return count;
            }).get();
    }

}

public class ExtensionFilter implements FileFilter {
    private String extension;
    private boolean allowDirectories;

    public ExtensionFilter(String extension, boolean allowDirectories) {
        if(extension.startsWith("."))
            extension = extension.substring(1);
        this.extension = extension;
        this.allowDirectories = allowDirectories;
    }

    @Override
    public boolean accept(File pathname) {
        if(pathname.isFile() && pathname.getName().endsWith("." + extension))
            return true;
        if(allowDirectories) {
            if(pathname.isDirectory())
                return true;
        }
        return false;
    }
}

public class BasicFileCounter {
    public int countFiles(String path, String extension) {
        ExtensionFilter filter = new ExtensionFilter(extension, true);
        File f = new File(path);
        return countFilesRecursive(f, filter);
    }

    private int countFilesRecursive(File f, ExtensionFilter filter) {
        int count = 0;
        File [] ar = f.listFiles(filter);
        for (File file : ar) {
            if(file.isFile())
                count++;
            else
                count += countFilesRecursive(file, filter);
        }
        return count;
    }
}

java asynchronous completable-future
3个回答
1
投票
public int countFiles(String path, String extension) { ExtensionFilter filter = new ExtensionFilter(extension, true); File f = new File(path); return countFilesRecursive(f, filter).join(); } private CompletableFuture<Integer> countFilesRecursive(File f, FileFilter filter) { return CompletableFuture.supplyAsync(() -> f.listFiles(filter)) .thenCompose(files -> { if(files == null) return CompletableFuture.completedFuture(0); int count = 0; CompletableFuture<Integer> fileCount = new CompletableFuture<>(), all=fileCount; for (File file : files) { if(file.isFile()) count++; else all = countFilesRecursive(file, filter).thenCombine(all, Integer::sum); } fileCount.complete(count); return all; }); }

注意,File.listFiles可能会返回null

此代码将立即计算目录中的所有文件,但会为子目录启动新的异步作业。子目录作业的结果通过thenCombine组合起来,对其结果求和。为了简化,我们创建另一个CompletableFuturefileCount来表示本地计数的文件。 thenCompose返回的Future将由指定函数返回的Future的结果完成,因此调用方可以使用join()等待整个操作的最终结果。

对于I / O操作,使用不同的线程池可能会有所帮助,因为默认ForkJoinPool配置为利用CPU内核而不是I / O带宽:

public int countFiles(String path, String extension) { ExecutorService es = Executors.newFixedThreadPool(30); ExtensionFilter filter = new ExtensionFilter(extension, true); File f = new File(path); int count = countFilesRecursive(f, filter, es).join(); es.shutdown(); return count; } private CompletableFuture<Integer> countFilesRecursive(File f,FileFilter filter,Executor e){ return CompletableFuture.supplyAsync(() -> f.listFiles(filter), e) .thenCompose(files -> { if(files == null) return CompletableFuture.completedFuture(0); int count = 0; CompletableFuture<Integer> fileCount = new CompletableFuture<>(), all=fileCount; for (File file : files) { if(file.isFile()) count++; else all = countFilesRecursive(file, filter,e).thenCombine(all,Integer::sum); } fileCount.complete(count); return all; }); }

没有最佳的线程数,这取决于实际的执行环境,并且需要进行测量和调整。当应用程序应该在不同的环境中运行时,这应该是可配置的参数。

但是请考虑您可能使用了错误的工具来完成这项工作。另一种方法是Fork / Join任务,该任务支持与线程池进行交互以确定当前的饱和度,因此一旦所有工作线程都忙后,它将以普通的递归方式继续进行本地扫描,而不是提交更多的异步作业:


public int countFiles(String path, String extension) { ExtensionFilter filter = new ExtensionFilter(extension, true); File f = new File(path); return POOL.invoke(new FileCountTask(f, filter)); } private static final int TARGET_SURPLUS = 3, TARGET_PARALLELISM = 30; private static final ForkJoinPool POOL = new ForkJoinPool(TARGET_PARALLELISM); static final class FileCountTask extends RecursiveTask<Integer> { private final File path; private final FileFilter filter; public FileCountTask(File file, FileFilter ff) { this.path = file; this.filter = ff; } @Override protected Integer compute() { return scan(path, filter); } private static int scan(File directory, FileFilter filter) { File[] fileList = directory.listFiles(filter); if(fileList == null || fileList.length == 0) return 0; List<FileCountTask> recursiveTasks = new ArrayList<>(); int count = 0; for(File file: fileList) { if(file.isFile()) count++; else { if(getSurplusQueuedTaskCount() < TARGET_SURPLUS) { FileCountTask task = new FileCountTask(file, filter); recursiveTasks.add(task); task.fork(); } else count += scan(file, filter); } } for(int ix = recursiveTasks.size() - 1; ix >= 0; ix--) { FileCountTask task = recursiveTasks.get(ix); if(task.tryUnfork()) task.complete(scan(task.path, task.filter)); } for(FileCountTask task: recursiveTasks) { count += task.join(); } return count; } }

0
投票

count + = countFilesRecursive(file,filter);


并且使用get()接收结果,我实际上是在等待结果,而不是真正地并行化代码。


这是我当前的代码,它实际上比一个线程的代码运行快得多。但是,我无法找出一种优雅的方式来了解何时完成并行方法。

我很想听听我应该如何解决?

这是我使用的丑陋方式:

public class AsynchFileCounter { private LongAdder count; public int countFiles(String path, String extension) { count = new LongAdder(); ExtensionFilter filter = new ExtensionFilter(extension, true); File f = new File(path); countFilesRecursive(f, filter); // ******** The way I check whether The function is done **************** // int prev = 0; int cur = 0; do { prev = cur; try { Thread.sleep(50); } catch (InterruptedException e) {} cur = (int)count.sum(); } while(cur>prev); // ******************************************************************** // return count.intValue(); } private void countFilesRecursive(File f, ExtensionFilter filter) { CompletableFuture.supplyAsync(() -> f.listFiles(filter)) .thenAcceptAsync(files -> { for (File file : files) { if(file.isFile()) count.increment(); else countFilesRecursive(file, filter); } }); } }


0
投票

我使用AtomicInteger而不是LongAdder来计数文件。

    阅读Holger的答案后,我决定计算正在处理的目录。当数字降为零时,工作就完成了。因此,我添加了一个锁和一个条件,以使主线程知道工作何时完成。
  1. 我添加了一个检查file.listFiles()是否返回空值。我在Windows上运行了代码,但从未执行(我有一个空目录,并且它返回了一个空数组),但是由于它使用的是本机代码,因此在其他操作系统上可能会返回null。
    public class AsynchFileCounter { private AtomicInteger count; private AtomicInteger countDirectories; private ReentrantLock lock; private Condition noMoreDirectories; public int countFiles(String path, String extension) { count = new AtomicInteger(); countDirectories = new AtomicInteger(); lock = new ReentrantLock(); noMoreDirectories = lock.newCondition(); ExtensionFilter filter = new ExtensionFilter(extension, true); File f = new File(path); countFilesRecursive(f, filter); lock.lock(); try { noMoreDirectories.await(); } catch (InterruptedException e) {} finally { lock.unlock(); } return count.intValue(); } private void countFilesRecursive(File f, ExtensionFilter filter) { countDirectories.getAndIncrement(); CompletableFuture.supplyAsync(() -> f.listFiles(filter)) .thenAcceptAsync(files -> countFiles(filter, files)); } private void countFiles(ExtensionFilter filter, File[] files) { if(files != null) { for (File file : files) { if(file.isFile()) count.incrementAndGet(); else countFilesRecursive(file, filter); } } int currentCount = countDirectories.decrementAndGet(); if(currentCount == 0) { lock.lock(); try { noMoreDirectories.signal(); } finally { lock.unlock(); } } } }
© www.soinside.com 2019 - 2024. All rights reserved.