假设我要读取1000个文件,由于某些限制,我想最多并行读取5个文件。而且,一旦其中一个完成,我希望一个新的开始。
我有一个具有文件列表的主要功能,每当一个线程完成时,我都尝试更改一个计数器。但它不起作用!
有任何建议吗?
以下为主要功能循环
for (final File filename : folder.listFiles()) {
Object lock1 = new Object();
new myThread(filename, lock1).start();
counter++;
while (counter > 5);
}
像这样产生线程不是要走的路。使用ExecutorService
并将池指定为5。将所有文件放在BlockingQueue
或另一个线程安全的集合中,所有正在执行的文件都可以随意将其poll()
。
public class ThreadReader {
public static void main(String[] args) {
File f = null;//folder
final BlockingQueue<File> queue = new ArrayBlockingQueue<File>(1000);
for(File kid : f.listFiles()){
queue.add(kid);
}
ExecutorService pool = Executors.newFixedThreadPool(5);
for(int i = 1; i <= 5; i++){
Runnable r = new Runnable(){
public void run() {
File workFile = null;
while((workFile = queue.poll()) != null){
//work on the file.
}
}
};
pool.execute(r);
}
}
}
Kylar回答中的方法是正确的。使用Java类库提供的执行程序类,而不是从头开始(非常糟糕)自己实现线程池。
但是我认为讨论您的问题中的代码以及为什么它不起作用可能很有用。 (我已尽我所能填写了您遗漏的某些部分...)
public class MyThread extends Thread {
private static int counter;
public MyThread(String fileName, Object lock) {
// Save parameters in instance variables
}
public void run() {
// Do stuff with instance variables
counter--;
}
public static void main(String[] args) {
// ...
for (final File filename : folder.listFiles()) {
Object lock1 = new Object();
new MyThread(filename, lock1).start();
counter++;
while (counter > 5);
}
// ...
}
}
好的,这怎么了?为什么不起作用?
第一个问题是,在main
中您正在读写counter
,而没有进行任何同步。我假设它也正在由工作线程更新-否则,该代码将毫无意义。因此,这意味着主线程很有可能看不到子线程进行更新的结果。换句话说,while (counter > 5);
可能是无限循环。 (实际上,这很有可能。允许JIT编译器生成代码,其中counter > 5
仅测试前一个counter
语句之后寄存器中剩余的counter++;
的值。
第二个问题是while (counter > 5);
循环非常浪费资源。您正在告诉JVM轮询一个变量...并且它将每秒执行数十亿次此操作...运行一个处理器(核心)。你不应该那样做。如果要使用低级原语来实现此类工作,则应使用Java的Object.wait()
和Object.notify()
方法;例如主线程等待,每个工作线程通知。
您可以将ExecutorService用作线程池和队列。
ExecutorService pool = Executors.newFixedThreadPool(5);
File f = new File(args[0]);
for (final File kid : f.listFiles()) {
pool.execute(new Runnable() {
@Override
public void run() {
process(kid);
}
});
}
pool.shutdown();
// wait for them to finish for up to one minute.
pool.awaitTermination(1, TimeUnit.MINUTES);
[无论您使用什么方法来创建新线程,增加全局计数器,在创建线程的周围添加条件语句,如果已达到限制,则不要创建新线程,可以将文件推送到队列中(一个列表?),然后您可以在创建线程后添加另一个条件语句,如果队列中有项目,则首先要处理这些项目。