我试图编写代码以计算计算机上某些类型的文件。我同时测试了一个线程解决方案和多线程异步解决方案,看来一个线程的运行速度更快。我的代码有什么问题吗?如果没有,为什么它不能更快地运行?
以下代码:AsynchFileCounter-异步版本。ExtensionFilter-文件过滤器,仅列出指定扩展名的目录和文件BasicFileCounter-一线程版本。
public class AsynchFileCounter {
public int countFiles(String path, String extension) throws InterruptedException, ExecutionException {
ExtensionFilter filter = new ExtensionFilter(extension, true);
File f = new File(path);
return countFilesRecursive(f, filter);
}
private int countFilesRecursive(File f, ExtensionFilter filter) throws InterruptedException, ExecutionException {
return CompletableFuture.supplyAsync(() -> f.listFiles(filter))
.thenApplyAsync(files -> {
int count = 0;
for (File file : files) {
if(file.isFile())
count++;
else
try {
count += countFilesRecursive(file, filter);
} catch (Exception e) {
e.printStackTrace();
}
}
return count;
}).get();
}
}
public class ExtensionFilter implements FileFilter {
private String extension;
private boolean allowDirectories;
public ExtensionFilter(String extension, boolean allowDirectories) {
if(extension.startsWith("."))
extension = extension.substring(1);
this.extension = extension;
this.allowDirectories = allowDirectories;
}
@Override
public boolean accept(File pathname) {
if(pathname.isFile() && pathname.getName().endsWith("." + extension))
return true;
if(allowDirectories) {
if(pathname.isDirectory())
return true;
}
return false;
}
}
public class BasicFileCounter {
public int countFiles(String path, String extension) {
ExtensionFilter filter = new ExtensionFilter(extension, true);
File f = new File(path);
return countFilesRecursive(f, filter);
}
private int countFilesRecursive(File f, ExtensionFilter filter) {
int count = 0;
File [] ar = f.listFiles(filter);
for (File file : ar) {
if(file.isFile())
count++;
else
count += countFilesRecursive(file, filter);
}
return count;
}
}
public int countFiles(String path, String extension) {
ExtensionFilter filter = new ExtensionFilter(extension, true);
File f = new File(path);
return countFilesRecursive(f, filter).join();
}
private CompletableFuture<Integer> countFilesRecursive(File f, FileFilter filter) {
return CompletableFuture.supplyAsync(() -> f.listFiles(filter))
.thenCompose(files -> {
if(files == null) return CompletableFuture.completedFuture(0);
int count = 0;
CompletableFuture<Integer> fileCount = new CompletableFuture<>(), all=fileCount;
for (File file : files) {
if(file.isFile())
count++;
else
all = countFilesRecursive(file, filter).thenCombine(all, Integer::sum);
}
fileCount.complete(count);
return all;
});
}
注意,File.listFiles
可能会返回null
。
此代码将立即计算目录中的所有文件,但会为子目录启动新的异步作业。子目录作业的结果通过thenCombine
组合起来,对其结果求和。为了简化,我们创建另一个CompletableFuture
,fileCount
来表示本地计数的文件。thenCompose
返回的Future将由指定函数返回的Future的结果完成,因此调用方可以使用join()
等待整个操作的最终结果。对于I / O操作,使用不同的线程池可能会有所帮助,因为默认
ForkJoinPool
配置为利用CPU内核而不是I / O带宽:
public int countFiles(String path, String extension) { ExecutorService es = Executors.newFixedThreadPool(30); ExtensionFilter filter = new ExtensionFilter(extension, true); File f = new File(path); int count = countFilesRecursive(f, filter, es).join(); es.shutdown(); return count; } private CompletableFuture<Integer> countFilesRecursive(File f,FileFilter filter,Executor e){ return CompletableFuture.supplyAsync(() -> f.listFiles(filter), e) .thenCompose(files -> { if(files == null) return CompletableFuture.completedFuture(0); int count = 0; CompletableFuture<Integer> fileCount = new CompletableFuture<>(), all=fileCount; for (File file : files) { if(file.isFile()) count++; else all = countFilesRecursive(file, filter,e).thenCombine(all,Integer::sum); } fileCount.complete(count); return all; }); }
没有最佳的线程数,这取决于实际的执行环境,并且需要进行测量和调整。当应用程序应该在不同的环境中运行时,这应该是可配置的参数。
但是请考虑您可能使用了错误的工具来完成这项工作。另一种方法是Fork / Join任务,该任务支持与线程池进行交互以确定当前的饱和度,因此一旦所有工作线程都忙后,它将以普通的递归方式继续进行本地扫描,而不是提交更多的异步作业:
public int countFiles(String path, String extension) { ExtensionFilter filter = new ExtensionFilter(extension, true); File f = new File(path); return POOL.invoke(new FileCountTask(f, filter)); } private static final int TARGET_SURPLUS = 3, TARGET_PARALLELISM = 30; private static final ForkJoinPool POOL = new ForkJoinPool(TARGET_PARALLELISM); static final class FileCountTask extends RecursiveTask<Integer> { private final File path; private final FileFilter filter; public FileCountTask(File file, FileFilter ff) { this.path = file; this.filter = ff; } @Override protected Integer compute() { return scan(path, filter); } private static int scan(File directory, FileFilter filter) { File[] fileList = directory.listFiles(filter); if(fileList == null || fileList.length == 0) return 0; List<FileCountTask> recursiveTasks = new ArrayList<>(); int count = 0; for(File file: fileList) { if(file.isFile()) count++; else { if(getSurplusQueuedTaskCount() < TARGET_SURPLUS) { FileCountTask task = new FileCountTask(file, filter); recursiveTasks.add(task); task.fork(); } else count += scan(file, filter); } } for(int ix = recursiveTasks.size() - 1; ix >= 0; ix--) { FileCountTask task = recursiveTasks.get(ix); if(task.tryUnfork()) task.complete(scan(task.path, task.filter)); } for(FileCountTask task: recursiveTasks) { count += task.join(); } return count; } }
count + = countFilesRecursive(file,filter);
并且使用get()接收结果,我实际上是在等待结果,而不是真正地并行化代码。
我很想听听我应该如何解决?
这是我使用的丑陋方式:
public class AsynchFileCounter {
private LongAdder count;
public int countFiles(String path, String extension) {
count = new LongAdder();
ExtensionFilter filter = new ExtensionFilter(extension, true);
File f = new File(path);
countFilesRecursive(f, filter);
// ******** The way I check whether The function is done **************** //
int prev = 0;
int cur = 0;
do {
prev = cur;
try {
Thread.sleep(50);
} catch (InterruptedException e) {}
cur = (int)count.sum();
} while(cur>prev);
// ******************************************************************** //
return count.intValue();
}
private void countFilesRecursive(File f, ExtensionFilter filter) {
CompletableFuture.supplyAsync(() -> f.listFiles(filter))
.thenAcceptAsync(files -> {
for (File file : files) {
if(file.isFile())
count.increment();
else
countFilesRecursive(file, filter);
}
});
}
}
我使用AtomicInteger而不是LongAdder来计数文件。
public class AsynchFileCounter {
private AtomicInteger count;
private AtomicInteger countDirectories;
private ReentrantLock lock;
private Condition noMoreDirectories;
public int countFiles(String path, String extension) {
count = new AtomicInteger();
countDirectories = new AtomicInteger();
lock = new ReentrantLock();
noMoreDirectories = lock.newCondition();
ExtensionFilter filter = new ExtensionFilter(extension, true);
File f = new File(path);
countFilesRecursive(f, filter);
lock.lock();
try {
noMoreDirectories.await();
} catch (InterruptedException e) {}
finally {
lock.unlock();
}
return count.intValue();
}
private void countFilesRecursive(File f, ExtensionFilter filter) {
countDirectories.getAndIncrement();
CompletableFuture.supplyAsync(() -> f.listFiles(filter))
.thenAcceptAsync(files -> countFiles(filter, files));
}
private void countFiles(ExtensionFilter filter, File[] files) {
if(files != null) {
for (File file : files) {
if(file.isFile())
count.incrementAndGet();
else
countFilesRecursive(file, filter);
}
}
int currentCount = countDirectories.decrementAndGet();
if(currentCount == 0) {
lock.lock();
try {
noMoreDirectories.signal();
}
finally {
lock.unlock();
}
}
}
}