How do I use concurrent.futures to get anchor href values and the href values of anchors on nested pages?

Problem description · 0 votes · 1 answer

I am trying to get the href values of anchor tags, and of the anchors on nested pages, using concurrent.futures. I want to reduce the time it takes to fetch the URLs with the code below; right now it takes quite a while.

import concurrent.futures
from urllib.parse import urlsplit

import requests
from bs4 import BeautifulSoup


def get_href_from_url(url):
    try:
        response = requests.get(url)
        response.raise_for_status()  # Check for errors in HTTP response
        parts = urlsplit(url)
        base = "{0.netloc}".format(parts)
        strip_base = base.replace("www.", "")
        base_url = "{0.scheme}://{0.netloc}".format(parts)
        path = url[:url.rfind('/') + 1] if '/' in parts.path else url
        soup = BeautifulSoup(response.text, 'html.parser')
        href_values = set()
        regex = r'.*-c?([0-9]+).html'
        for link in soup.find_all('a'):
            anchor = link.attrs["href"] if "href" in link.attrs else ''
            if anchor.startswith('/'):
                local_link = base_url + anchor
                href_values.add(local_link)
            elif strip_base in anchor:
                href_values.add(anchor)

            elif not anchor.startswith('http'):
                local_link = path + anchor
                href_values.add(local_link)

        return href_values
    except Exception as e:
        print(f"Error while processing {url}: {e}")
        return []


def follow_nested_urls(seed_url):
    visited_urls = set()
    urls_to_visit = [seed_url]

    while len(urls_to_visit):
        current_url = urls_to_visit.pop()
        if current_url in visited_urls:
            continue

        visited_urls.add(current_url)
        with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
            href_values = get_href_from_url(current_url)

        nested_urls = [url for url in href_values if url.startswith('http')]

        urls_to_visit.extend(nested_urls)

        # Process href values or do other tasks as needed
        # print(f"URL: {current_url}, HREF values: {href_values}")
        print(f"visited_urls " + current_url)
        print(len(visited_urls))
        # print(len(urls_to_visit))


if __name__ == "__main__":
    seed_url = "https://www.tradeindia.com/"  # Replace with your desired starting URL
    follow_nested_urls(seed_url)
Tags: python, beautifulsoup, concurrent.futures
1 Answer · 0 votes

Your code, as written, does run. Note, however, that the ThreadPoolExecutor created in follow_nested_urls is never actually given any work: get_href_from_url is called directly in the main thread, so the pages are still fetched one at a time.
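
For reference, here is a minimal sketch, not part of the original answer, of one way to actually hand get_href_from_url to the pool, fetching one frontier batch per round (follow_nested_urls_concurrent and max_pages are illustrative names):

import concurrent.futures

# assumes get_href_from_url from the question above is in scope
def follow_nested_urls_concurrent(seed_url, max_pages=200):
    visited = {seed_url}
    frontier = [seed_url]
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        while frontier and len(visited) < max_pages:
            # fetch the whole current frontier in parallel; map preserves order
            results = executor.map(get_href_from_url, frontier)
            next_frontier = []
            for href_values in results:
                for url in href_values:
                    if url.startswith('http') and url not in visited:
                        visited.add(url)
                        next_frontier.append(url)
            frontier = next_frontier
    return visited

Keeping the bookkeeping (visited, frontier) in the main thread means no locks are needed around the shared collections.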

Here is an example of how ThreadPoolExecutor works.

from concurrent.futures import ThreadPoolExecutor


# IO bound function
def scraper(url):
    print(f'working on {url}...')
    print(f'done with {url}...')
    return 1

urls = range(10)

# number of workers
n = 4
with ThreadPoolExecutor(max_workers=n) as executor:
    res = executor.map(scraper, urls)
    print(*res)

Note that map returns a generator that yields each function call's result in the original submission order!
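
If you would rather consume results as soon as each call finishes, submit the jobs individually and use as_completed instead; a small self-contained sketch of the difference (slow_square is just an illustrative stand-in for an IO-bound call):

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep


def slow_square(x):
    sleep((5 - x) * 0.1)  # later inputs finish sooner
    return x * x


with ThreadPoolExecutor(max_workers=5) as executor:
    # map: results arrive in submission order -> 0 1 4 9 16
    print(*executor.map(slow_square, range(5)))

    # as_completed: futures arrive in completion order -> likely 16 9 4 1 0
    futures = [executor.submit(slow_square, x) for x in range(5)]
    print(*(f.result() for f in as_completed(futures)))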


Here is an abstraction of your problem: updating a container object in a thread-safe way.

Note the following:

  • list is thread-safe only as long as you do not read/write specific entries; use queue.Queue instead
  • print is not thread-safe; use logging instead
  • requests.Session is not thread-safe (you are not using one here, so it does not matter; see the sketch after this list)
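
If you later do want connection reuse with requests.Session, one common workaround is to keep a separate session per worker thread in threading.local storage; a minimal sketch (get_session and fetch are illustrative names):

import threading

import requests

# one Session per worker thread, created lazily on first use
_thread_local = threading.local()


def get_session():
    if not hasattr(_thread_local, 'session'):
        _thread_local.session = requests.Session()
    return _thread_local.session


def fetch(url):
    # each thread reuses its own session (and its own connection pool)
    return get_session().get(url, timeout=10)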

Unfortunately, the program does not exit "nicely" on its own; it has to be stopped with Ctrl + C. Using an Event, putting a stop value (sentinel) on the queue, a timer, ... would fix that; a timeout-based sketch is shown after the output below.

from concurrent.futures import ThreadPoolExecutor
from queue import Queue

# for a thread-safe print
import logging
logging.basicConfig(format='ThreadPool -> %(message)s', level=logging.WARNING)
lprint = logging.warning

# just for the example
from random import random, randint
from time import sleep


url_seed = 0 # test "url"
q_urls_to_visit = Queue()
q_urls_to_visit.put(url_seed)

urls_visited = set()

# just for the example
n = 15
L = list(range(10, n))


def scraper(url):
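    # stand-in for a real I/O-bound scraper: sleep to simulate the request,
    # then return this "url" plus a handful of newly "discovered" urls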
    lprint(f'working on {url}...')
    sleep(9*random())
    lprint(f'done with {url}...')

    if L:
        return [url] + [L.pop()] + [randint(0, 9) for _ in range(5)]
    return [url]


def update_queue(item):
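    # build a done-callback bound to `item`: when scraper(item) completes,
    # record it as visited and enqueue any returned urls not seen before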
    def wrapper(future):
        urls_visited.add(item)

        for res in future.result():
            if res not in urls_visited:
                q_urls_to_visit.put(res)
                urls_visited.add(res)

        q_urls_to_visit.task_done()

    return wrapper


if __name__ == '__main__':

    with ThreadPoolExecutor(max_workers=10) as executor:
        try:
            while True:
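                # blocking get(): this is why the loop never exits on its own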
                url = q_urls_to_visit.get()
                future = executor.submit(scraper, url)
                future.add_done_callback(update_queue(url))
                sleep(2*random())

        except (TimeoutError, KeyboardInterrupt):
            print('Forced shutdown')
            lprint(urls_visited)

Output (forced with Ctrl + C):

working on 0...
done with 0...
working on 14...
working on 3...
done with 3...
working on 2...
working on 7...
done with 14...
working on 13...
working on 8...
done with 13...
working on 4...
done with 4...
working on 5...
working on 12...
done with 7...
done with 2...
working on 9...
done with 8...
working on 11...
working on 10...
done with 12...
working on 6...
working on 1...
done with 9...
done with 5...
done with 11...
done with 1...
done with 10...
done with 6...
^CForced shutdown
ThreadPool -> {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14}
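
As a follow-up to the note about the forced shutdown above, here is a minimal sketch of the simplest fix, giving Queue.get a timeout so the loop ends on its own once no new URLs have arrived for a while (drain and idle_timeout are illustrative names):

from queue import Empty, Queue


def drain(q_urls, handle, idle_timeout=5.0):
    while True:
        try:
            url = q_urls.get(timeout=idle_timeout)
        except Empty:
            break  # nothing new for idle_timeout seconds -> stop cleanly
        handle(url)


q = Queue()
q.put('https://www.tradeindia.com/')
drain(q, print)  # prints the seed url, then returns after ~5 s of inactivity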