How to use concurrent.futures to fetch URLs and nested page URLs?


I am trying to collect the anchor tags' href values and the nested URLs from a web page, and to repeat the operation for each such URL.

I want to reduce the time this takes by fetching the URLs concurrently.

import concurrent.futures
from urllib.parse import urlsplit

import requests
from bs4 import BeautifulSoup


def get_href_from_url(url):
    try:
        response = requests.get(url)
        response.raise_for_status()  # Check for errors in HTTP response
        parts = urlsplit(url)
        base = "{0.netloc}".format(parts)
        strip_base = base.replace("www.", "")
        base_url = "{0.scheme}://{0.netloc}".format(parts)
        path = url[:url.rfind('/') + 1] if '/' in parts.path else url
        soup = BeautifulSoup(response.text, 'html.parser')
        href_values = set()
        regex = r'.*-c?([0-9]+).html'
        for link in soup.find_all('a'):
            anchor = link.attrs["href"] if "href" in link.attrs else ''
            if anchor.startswith('/'):
                local_link = base_url + anchor
                href_values.add(local_link)
            elif strip_base in anchor:
                href_values.add(anchor)

            elif not anchor.startswith('http'):
                local_link = path + anchor
                href_values.add(local_link)

        return href_values
    except Exception as e:
        print(f"Error while processing {url}: {e}")
        return []


def follow_nested_urls(seed_url):
    visited_urls = set()
    urls_to_visit = [seed_url]

    while len(urls_to_visit):
        current_url = urls_to_visit.pop()
        if current_url in visited_urls:
            continue

        visited_urls.add(current_url)
        with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
            href_values = get_href_from_url(current_url)

        nested_urls = [url for url in href_values if url.startswith('http')]

        urls_to_visit.extend(nested_urls)

        # Process href values or do other tasks as needed
        # print(f"URL: {current_url}, HREF values: {href_values}")
        print(f"visited_urls " + current_url)
        print(len(visited_urls))
        # print(len(urls_to_visit))


if __name__ == "__main__":
    seed_url = "https://www.tradeindia.com/"  # Replace with your desired starting URL
    follow_nested_urls(seed_url)
Tags: python, io, concurrent.futures
1 Answer

Your code, as written, executes normally, but note that the ThreadPoolExecutor it creates is never actually used: get_href_from_url is called directly inside the with block, so the pages are still fetched one at a time.
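
For contrast, here is a minimal sketch (crawl_batch is a hypothetical helper name, not something from your code; it reuses get_href_from_url exactly as defined in the question) of how a batch of pending URLs could actually be fanned out over the pool:

import concurrent.futures


def crawl_batch(urls, max_workers=20):
    """Fetch several pages concurrently and return the union of their href sets."""
    found = set()
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # submit the function plus its argument; the executor calls it in a worker thread
        futures = [executor.submit(get_href_from_url, url) for url in urls]
        for future in concurrent.futures.as_completed(futures):
            found.update(future.result())
    return found

follow_nested_urls could then pop a whole batch of not-yet-visited URLs per iteration and hand them to such a helper, instead of one URL at a time.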

Here is an example of how ThreadPoolExecutor works.

from concurrent.futures import ThreadPoolExecutor


# IO bound function
def scraper(url):
    print(f'working on {url}...')
    print(f'done with {url}...')
    return 1

urls = range(10)

# number of worker threads
n = 4
with ThreadPoolExecutor(max_workers=n) as executor:
    res = executor.map(scraper, urls)
    print(*res)

Note that map returns a generator that yields each function's result in the original submission order!
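
To see that ordering guarantee, here is a toy sketch (slow_square and its sleep times are made up for illustration): the earlier inputs sleep longer and therefore finish last, yet the results still come back in submission order.

from concurrent.futures import ThreadPoolExecutor
from time import sleep


def slow_square(x):
    # earlier inputs sleep longer, so they finish after the later ones
    sleep((5 - x) * 0.2)
    return x * x


with ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(slow_square, range(5)))

print(results)  # [0, 1, 4, 9, 16] -- submission order, not completion order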


Here is an abstraction of the problem: updating a container object in a thread-safe way while consuming it.

Note the following:

  • list is thread-safe only as long as you do not read/write a specific entry; use queue.Queue instead
  • print is not thread-safe; use logging
  • requests.Session is not thread-safe (but you are not using it, so it does not matter here)

Unfortunately, the program does not exit "nicely" on its own; it needs Ctrl + c. Using an Event, putting a stop value on the queue, a timeout, ... would fix that (a sketch of one such fix follows the output below).

from concurrent.futures import ThreadPoolExecutor
from queue import Queue

# for a thread-safe print
import logging
logging.basicConfig(format='Thread -> %(message)s', level=logging.WARNING)
lprint = logging.warning

# just for the example
from random import random, randint
from time import sleep


url_seed = 0 # test "url"
q_urls_to_visit = Queue()
q_urls_to_visit.put(url_seed)

urls_visited = set()

# just for the example
n = 15
L = list(range(10, n))


def scraper(url):
    lprint(f'working on {url}...')
    sleep(9*random())
    lprint(f'done with {url}...')

    if L:
        return [url] + [L.pop()] + [randint(0, 9) for _ in range(5)]
    return [url]


def update_queue(item):
    def wrapper(future):
        urls_visited.add(item)

        for res in future.result():
            if res not in urls_visited:
                q_urls_to_visit.put(res)
                urls_visited.add(res)

        q_urls_to_visit.task_done()

    return wrapper


if __name__ == '__main__':

    with ThreadPoolExecutor(max_workers=10) as executor:
        try:
            while True:
                url = q_urls_to_visit.get()
                future = executor.submit(scraper, url)
                future.add_done_callback(update_queue(url))
                              
        except (TimeoutError, KeyboardInterrupt):
            print('Forced shutdown')
            lprint(urls_visited)

Output (forced with Ctrl + c):

working on 0...
done with 0...
working on 14...
working on 9...
working on 1...
working on 5...
working on 7...
working on 8...
done with 5...
working on 13...
working on 6...
working on 4...
done with 9...
working on 12...
done with 1...
working on 11...
working on 3...
working on 2...
done with 6...
working on 10...
done with 11...
done with 12...
done with 8...
done with 7...
done with 10...
done with 4...
done with 13...
done with 14...
done with 2...
done with 3...
^CForced shutdown
ThreadPool -> {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14}
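
As a sketch of the clean-shutdown idea mentioned above, one of the simplest options is a timeout on Queue.get() (the 15-second value below is an arbitrary choice and should stay above the slowest scrape, otherwise the crawl may stop early). ThreadPoolExecutor, scraper, update_queue, q_urls_to_visit, urls_visited and lprint are assumed to be defined exactly as in the example above:

from queue import Empty


if __name__ == '__main__':

    with ThreadPoolExecutor(max_workers=10) as executor:
        try:
            while True:
                try:
                    # wait at most 15 seconds for a new url (arbitrary timeout)
                    url = q_urls_to_visit.get(timeout=15)
                except Empty:
                    # no new work showed up in time: assume the crawl is finished
                    lprint('queue drained, shutting down')
                    break
                future = executor.submit(scraper, url)
                future.add_done_callback(update_queue(url))
        except KeyboardInterrupt:
            print('Forced shutdown')

    lprint(urls_visited)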