每个线程添加新任务时等待线程完成

问题描述 投票:0回答:1

所以我有一个网络爬虫。它首先将 baseURL 任务放入阻塞队列。然后,如果找到新链接,每个线程都会选择任务并在队列中输入新任务。终止是基于深度的,因此无限执行不是问题。

我应该如何优雅地等待 processqueue 任务耗尽?

以下是一些片段: Wrapper 类 - 将基本 URL 置于深度 0 并启动线程池

Constructor function:
        this.maxDepth = maxDepth;
        processQueue.offer(new Task(URL, 0));

executor function:
        executor.prestartAllCoreThreads();
        executor.shutdown();
        executor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);

... other functions to export data ...

Task 类 - 抓取当前 URL 并将未访问的链接作为任务输入到 processQueue

    public void run() {
        if(depth >= maxDepth){ return; }
        if(visited.contains(URL) == true){ return; }

        Document doc = null;
        doc = requestConnection(URL, depth);

        for(Element link: doc.select("a[href]")){
                String newLink = link.absUrl("href");
                newLink = (newLink.contains("#")) ? newLink.substring(0, newLink.indexOf("#")) : newLink;
                if (visited.contains(newLink) == false && depth + 1 <= maxDepth) {
                    processQueue.offer(new Task(newLink, depth + 1));
                }
        }
    }

我无法查明任何错误,但根据文档,这是错误的实现方式。任何帮助都适用。

java multithreading web-crawler threadpool threadpoolexecutor
1个回答
0
投票

首先将 baseURL 任务放入阻塞队列。然后,如果找到新链接,每个线程都会选择任务并在队列中输入新任务。

一个简单的解决方案是将

ExecutorService
传递给URL蜘蛛并处理
Runnable
。然后,您可以在处理之前找到的 URL 时使用线程池添加其他 URL 作业。

您不能使用

awaitTermination()
,因为在所有作业完成之前您无法关闭执行程序。有几种方法可以做到这一点。一种是
AtomicInteger
计算未完成的 URL 的数量。然后你会做这样的事情:

ExecutorService threadPool = Executors.newFixedThreadPool(MAX_NUM_THREADS);
AtomicInteger numberOutstandingUrls = new AtomicInteger();
// submit the first URL to the thread-pool
threadPool.submit(new UrlProcessingRunnable(initialUrl, threadPool,
             numberOutstandingUrls));
// wait for all of the outstanding urls to be processed
while (numberOutstandingUrls.get() > 0) {
   // change as necessary
   Thread.sleep(1000);
}
threadPool.shutdown();
threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);

你的工作会做这样的事情:

class UrlProcessingRunnable {
    private final String url;
    private final ExecutorService threadPool;
    private final AtomicInteger numberOutstandingUrls;
    public UrlProcessingRunnable(String url, ExecutorService threadPool,
              AtomicInteger numberOutstandingUrls) {
       this.url = url;
       this.threadPool = threadPool;
       this.numberOutstandingUrls = numberOutstandingUrls;
       numberOutstandingUrls.incrementAndGet();
    }
    public void run() {
       // spider URL to get content ...
       // parse content to find other URLs ...

       for (String url : otherUrls) {
           threadPool.submit(new UrlProcessingRunnable(url, threadPool,
                    numberOutstandingUrls);
       }
       // now we are finished with this URL
       numberOutstandingUrls.decrementAndGet();
    }
}

如果你不喜欢轮询方法,那么你可以使用等待/通知。

© www.soinside.com 2019 - 2024. All rights reserved.