So I have a web crawler. It first puts the baseURL task into a blocking queue. Each thread then picks up a task and, if it finds new links, enqueues new tasks into the queue. Termination is depth-based, so infinite execution is not a problem.
How should I gracefully wait for the processQueue tasks to drain?
Here are some snippets. The wrapper class puts the base URL in at depth 0 and starts the thread pool:
Constructor:
    this.maxDepth = maxDepth;
    processQueue.offer(new Task(URL, 0));
Executor setup:
    executor.prestartAllCoreThreads();
    executor.shutdown();
    executor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
    // ... other functions to export data ...
The Task class crawls the current URL and enqueues unvisited links as new tasks into processQueue:
public void run() {
    if (depth >= maxDepth) { return; }
    if (visited.contains(URL)) { return; }
    Document doc = requestConnection(URL, depth);
    for (Element link : doc.select("a[href]")) {
        String newLink = link.absUrl("href");
        // strip the fragment, if any
        newLink = newLink.contains("#") ? newLink.substring(0, newLink.indexOf("#")) : newLink;
        if (!visited.contains(newLink) && depth + 1 <= maxDepth) {
            processQueue.offer(new Task(newLink, depth + 1));
        }
    }
}
I can't pinpoint any specific errors, but according to the documentation this is the wrong way to implement it. Any help is appreciated.
"It first puts the baseURL task into a blocking queue. Each thread then picks up a task and, if it finds new links, enqueues new tasks into the queue."
A simple solution is to pass the ExecutorService into the Runnable that spiders each URL. Then, while processing a previously found URL, you can submit additional URL jobs to the same thread pool.
You can't just call awaitTermination(), because you can't shut down the executor before all of the jobs have finished. There are several ways to handle this. One is an AtomicInteger that counts the number of outstanding URLs. Then you would do something like this:
ExecutorService threadPool = Executors.newFixedThreadPool(MAX_NUM_THREADS);
AtomicInteger numberOutstandingUrls = new AtomicInteger();
// submit the first URL to the thread-pool
threadPool.submit(new UrlProcessingRunnable(initialUrl, threadPool,
        numberOutstandingUrls));
// wait for all of the outstanding URLs to be processed
while (numberOutstandingUrls.get() > 0) {
    // polling delay; change as necessary
    Thread.sleep(1000);
}
threadPool.shutdown();
threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
Your job would do something like this:
class UrlProcessingRunnable implements Runnable {
    private final String url;
    private final ExecutorService threadPool;
    private final AtomicInteger numberOutstandingUrls;

    public UrlProcessingRunnable(String url, ExecutorService threadPool,
            AtomicInteger numberOutstandingUrls) {
        this.url = url;
        this.threadPool = threadPool;
        this.numberOutstandingUrls = numberOutstandingUrls;
        // count this URL as outstanding before it is submitted
        numberOutstandingUrls.incrementAndGet();
    }

    @Override
    public void run() {
        try {
            // spider URL to get content ...
            // parse content to find other URLs ...
            for (String otherUrl : otherUrls) {
                threadPool.submit(new UrlProcessingRunnable(otherUrl, threadPool,
                        numberOutstandingUrls));
            }
        } finally {
            // now we are finished with this URL; decrement even if spidering threw
            numberOutstandingUrls.decrementAndGet();
        }
    }
}
If you don't like the polling approach, you can use wait/notify instead.
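A minimal sketch of that wait/notify variant, under the same assumptions as above: the hypothetical OutstandingCounter helper below replaces the AtomicInteger, each job is counted before it is queued and uncounted when it finishes, and the main thread blocks in awaitZero() instead of sleeping in a loop. The recursive submit() here just simulates a crawl that spawns child jobs.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: tracks outstanding jobs and lets a thread
// block until all of them have completed, without polling.
class OutstandingCounter {
    private int count = 0;

    public synchronized void increment() {
        count++;
    }

    public synchronized void decrement() {
        count--;
        if (count == 0) {
            // wake up anyone blocked in awaitZero()
            notifyAll();
        }
    }

    public synchronized void awaitZero() throws InterruptedException {
        while (count > 0) {
            wait();
        }
    }
}

public class WaitNotifyDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService threadPool = Executors.newFixedThreadPool(4);
        OutstandingCounter outstanding = new OutstandingCounter();

        // simulate a crawl: each job spawns two child jobs up to a depth limit
        submit(threadPool, outstanding, 0);

        outstanding.awaitZero();   // blocks until every job has finished
        threadPool.shutdown();
        threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        System.out.println("all jobs drained");
    }

    private static void submit(ExecutorService pool, OutstandingCounter outstanding,
            int depth) {
        // count the job *before* it is queued, so the counter can never
        // falsely reach zero while children are still pending
        outstanding.increment();
        pool.submit(() -> {
            try {
                if (depth < 3) {
                    submit(pool, outstanding, depth + 1);
                    submit(pool, outstanding, depth + 1);
                }
            } finally {
                outstanding.decrement();
            }
        });
    }
}
```

The key ordering is the same as in the AtomicInteger version: incrementing in the submit path (before the job is handed to the pool) and decrementing in a finally block guarantees the count only reaches zero once the whole tree of jobs is done.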