我正在构建一个网络抓取项目。
我有两个列表:
private ConcurrentQueue<string> links = new ConcurrentQueue<string>();
private ConcurrentQueue<string> Visitedlinks = new ConcurrentQueue<string>();
对于我在一个页面上找到的所有链接,以及一个将包含我抓取的所有链接的链接。
处理业务的方法:
public async Task GetUrlContent(string url)
{
var page = string.Empty;
try
{
page = await service.Get(url);
if (page != string.Empty)
{
Regex regex = new Regex(@"<a[^>]*?href\s*=\s*[""']?([^'"" >]+?)[ '""][^>]*?>",
RegexOptions.Singleline | RegexOptions.CultureInvariant);
if (regex.IsMatch(page))
{
Console.WriteLine("Downloading url: " + url);
for (int i = 0; i < regex.Matches(page).Count; i++)
{
if (regex.Matches(page)[i].Groups[1].Value.StartsWith("/"))
{
if (!links.Contains(BaseUrl + regex.Matches(page)[i].Groups[1].Value.ToLower().Replace(".html", "")) &&
!Visitedlinks.Contains(BaseUrl + regex.Matches(page)[i].Groups[1].Value.ToLower()))
{
Uri ValidUri = GetUrl(regex.Matches(page)[i].Groups[1].Value);
if (ValidUri != null && HostUrls.Contains(ValidUri.Host))
links.Enqueue(regex.Matches(page)[i].Groups[1].Value.ToLower().Replace(".html", ""));
else
links.Enqueue(BaseUrl + regex.Matches(page)[i].Groups[1].Value.ToLower().Replace(".html", ""));
}
}
}
}
var results = links.Where(m => !Visitedlinks.Contains(m)); // problkem here, get multiple values
if (!results.Any())
{
// do nothing
}
else
{
Parallel.ForEach(results, new ParallelOptions { MaxDegreeOfParallelism = 4 },
webpage =>
{
if (ValidUrl(webpage))
{
if (!Visitedlinks.Contains(webpage))
{
Visitedlinks.Enqueue(webpage);
GetUrlContent(webpage).Wait();
}
}
});
}
}
}
catch (Exception e)
{
throw;
}
}
问题在这里:
var results = links.Where(m => !Visitedlinks.Contains(m));
我可能得到的第一次迭代:
链接 1、链接 2、链接 3、链接 4、
第二次迭代:
链接 2 链接 3 链接 4,链接 5,链接 6,链接 7
第三:
链接 3、链接 4、链接 5、链接 6 等
这意味着我将多次获得相同的链接,因为这是一个同时执行多个操作的并行 foreach。我不知道如何确保我没有得到多个值。
有谁可以伸出援手吗?
如果我没理解错的话,第一个队列包含你要抓取的链接,第二个队列包含你已经抓取的链接。
问题是您正在尝试迭代
ConcurrentQueue
的内容:
var results = links.Where(m => !Visitedlinks.Contains(m));
如果您从多个线程访问这些队列,这将无法正常工作。
你应该做的是从队列中取出项目并处理它们。值得注意的是,
TryDequeue
没有出现在您的代码中的任何地方。项目进入队列但永远不会出来。队列的全部目的是我们将东西放入并取出。 ConcurrentQueue
使多个线程可以安全地放入和取出物品而不会互相踩踏。
如果您将要处理的链接出队:
string linkToProcess = null;
if(links.TryDequeue(out linkToProcess)) // if this returns false, the queue was empty
{
// process it
}
然后一旦你从队列中取出一个项目来处理它,它就不会再在队列中了。其他线程不必检查项目是否已被处理。他们只是从队列中取出下一项,如果有的话。两个线程永远不会从队列中取出相同的项目。只有一个线程可以从队列中取出一个给定的项目,因为一旦它这样做,该项目就不再在队列中了。
感谢@Scott Hannen
最终解决方案如下:
Parallel.ForEach(links, new ParallelOptions { MaxDegreeOfParallelism = 25 },
webpage =>
{
try
{
if (WebPageValidator.ValidUrl(webpage))
{
string linkToProcess = webpage;
if (links.TryDequeue(out linkToProcess) && !Visitedlinks.Contains(linkToProcess))
{
Task obj = Scrape(linkToProcess);
Visitedlinks.Enqueue(linkToProcess);
}
}
}
catch (Exception e)
{
log.Error("Error occured: " + e.Message);
Console.WriteLine("Error occured, check log for further details.");
}