我正在尝试使用 concurrent.futures 获取锚标记的 href 值和嵌套网址,该怎么做?代码如下:

import concurrent.futures
from urllib.parse import urlsplit
import requests
from bs4 import BeautifulSoup
def get_href_from_url(url):
    """Fetch *url* and return the set of link targets found in its <a> tags.

    Relative links are resolved: root-relative hrefs are joined to the
    scheme+netloc, other relative hrefs to the current URL's directory.
    On any failure an empty set is returned (same type as the success
    path) and the error is reported to stdout.
    """
    try:
        # Always set a timeout: requests.get without one can hang forever.
        response = requests.get(url, timeout=10)
        response.raise_for_status()  # turn HTTP error codes into exceptions
        parts = urlsplit(url)
        # Host without the "www." prefix, used to recognise same-site
        # absolute links.
        strip_base = parts.netloc.replace("www.", "")
        base_url = "{0.scheme}://{0.netloc}".format(parts)
        # Directory of the current URL, for resolving relative hrefs.
        path = url[:url.rfind('/') + 1] if '/' in parts.path else url
        soup = BeautifulSoup(response.text, 'html.parser')
        href_values = set()
        for link in soup.find_all('a'):
            anchor = link.attrs.get("href", '')
            if anchor.startswith('/'):
                href_values.add(base_url + anchor)
            elif strip_base in anchor:
                href_values.add(anchor)
            elif not anchor.startswith('http'):
                href_values.add(path + anchor)
        return href_values
    except Exception as e:
        # Broad catch keeps the crawler alive when a single page fails.
        print(f"Error while processing {url}: {e}")
        return set()  # was: [] — keep the return type consistent
def follow_nested_urls(seed_url):
    """Crawl outward from *seed_url*, fetching each frontier concurrently.

    The original code built a new ThreadPoolExecutor on every loop
    iteration and never submitted any work to it; here one pool is
    created for the whole crawl and each batch of unvisited URLs is
    fetched in parallel via ``executor.map``.
    """
    visited_urls = set()
    urls_to_visit = [seed_url]
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        while urls_to_visit:
            # Current frontier = pending URLs not seen before.
            batch = [u for u in urls_to_visit if u not in visited_urls]
            urls_to_visit = []
            if not batch:
                continue
            visited_urls.update(batch)
            # Fetch the whole frontier concurrently; results come back
            # in the same order as `batch`.
            for current_url, href_values in zip(
                    batch, executor.map(get_href_from_url, batch)):
                urls_to_visit.extend(
                    u for u in href_values if u.startswith('http'))
                # Process href values or do other tasks as needed
                print(f"visited_urls " + current_url)
                print(len(visited_urls))
if __name__ == "__main__":
    # Entry point: kick off the crawl from the configured seed URL.
    start = "https://www.tradeindia.com/"  # Replace with your desired starting URL
    follow_nested_urls(start)
您的代码按原样已能正常执行。下面是一个 ThreadPoolExecutor 如何工作的示例:
from concurrent.futures import ThreadPoolExecutor
# IO bound function
def scraper(url):
    """Toy I/O-bound job: log start and finish for *url*, then report success."""
    outcome = 1
    print(f'working on {url}...')
    print(f'done with {url}...')
    return outcome
# stand-in "URLs" for the demo
urls = range(10)
# amount of workers
n = 4
with ThreadPoolExecutor(max_workers=n) as executor:
    res = executor.map(scraper, urls)
# res is a lazy generator; results come back in submission order
print(*res)
注意,map 返回一个生成器,按原始提交顺序产出每个函数调用的结果!
这里是问题的抽象:以线程安全的方式更新容器对象。注意以下事项:

- list 不是线程安全的,请使用 queue.Queue
- print 不是线程安全的,请使用 logging
- requests.Session 不是线程安全的(但你没有使用它,所以没关系)

不幸的是,程序不会"很好地"退出,而需要 Ctrl + c 中断。使用 Event、向队列添加停止值、计时器等可以修复这一点。
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
# for a thread-safe print (plain print is not thread-safe)
import logging
logging.basicConfig(format='Thread -> %(message)s', level=logging.WARNING)
lprint = logging.warning  # shorthand: thread-safe replacement for print
# just for the example
from random import random, randint
from time import sleep
url_seed = 0  # test "url" the crawl starts from
q_urls_to_visit = Queue()  # thread-safe queue of pending "urls"
q_urls_to_visit.put(url_seed)
urls_visited = set()  # everything already submitted or discovered
# just for the example: a finite pool of fresh "urls" scraper can hand out
n = 15
L = list(range(10, n))
def scraper(url):
    """Simulated scrape: sleep a random while, then return the "urls" found.

    Always includes *url* itself; while the global pool ``L`` is non-empty,
    also hands out one fresh url from it plus five random ones.
    """
    lprint(f'working on {url}...')
    sleep(9*random())
    lprint(f'done with {url}...')
    if not L:
        return [url]
    return [url] + [L.pop()] + [randint(0, 9) for _ in range(5)]
def update_queue(item):
    """Build a done-callback for the future that scraped *item*.

    The callback marks *item* as visited, enqueues every newly discovered
    result exactly once, and acknowledges the completed queue task.
    """
    def wrapper(future):
        urls_visited.add(item)
        for discovered in future.result():
            if discovered in urls_visited:
                continue
            urls_visited.add(discovered)
            q_urls_to_visit.put(discovered)
        q_urls_to_visit.task_done()
    return wrapper
if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=10) as executor:
        try:
            # Pull work forever; each finished future re-feeds the queue
            # through its done-callback.  Stop with Ctrl+C.
            while True:
                next_url = q_urls_to_visit.get()
                job = executor.submit(scraper, next_url)
                job.add_done_callback(update_queue(next_url))
                sleep(2*random())
        except (TimeoutError, KeyboardInterrupt):
            print('Forced shutdown')
    lprint(urls_visited)
输出(用 Ctrl + c 强制中断):
working on 0...
done with 0...
working on 14...
working on 3...
done with 3...
working on 2...
working on 7...
done with 14...
working on 13...
working on 8...
done with 13...
working on 4...
done with 4...
working on 5...
working on 12...
done with 7...
done with 2...
working on 9...
done with 8...
working on 11...
working on 10...
done with 12...
working on 6...
working on 1...
done with 9...
done with 5...
done with 11...
done with 1...
done with 10...
done with 6...
^CForced shutdown
ThreadPool -> {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14}