我有一个异步脚本,它向不同的 url(本例中为 223)发送 1 个请求,因此不是同一主机。问题是,在大约 150 个请求之后,它开始使 url 超时,但是当单独处理超时的 url 时,它工作正常并且它们得到处理。我尝试使用 aiohttp 并得到了相同的结果。我尝试添加 asyncio.sleep() 但仍然存在同样的问题。我尝试为每个请求创建一个客户端,但结果仍然相同。我不知道为什么它会超时。这是请求的函数:
async def fetch(url, client):
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.99 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
}
response = await client.get(url, follow_redirects=True, headers=headers)
return response
和运行部分:
urls_list = [site.strip() for site in start_urls.replace(',', ' ').split() if site.strip()]
if second_input:
pattern_list = second_input
else:
pattern_list = []
async with httpx.AsyncClient() as client:
tasks = [process_url(url, skip_empty_results, depth, pattern_list, dataset, 5, client) for url in urls_list]
await asyncio.gather(*tasks)
fetch()
正在用于 process_url()
。
asyncio.gather
调用,正如这段代码中的那样,将简单地同时启动所有 200 和一些请求 - 即使 URL 不同,如果它们指向同一个服务,它们可能会限制您(甚至被淹没) )。
(对不同服务的 223 个同时传出呼叫不应减慢您的系统速度,但即使这样也可能发生,例如,如果您的网络配置使用有限的 DNS 服务)。
而不是
gather
,您可以尝试一种编码模式,例如一次进行 50 个调用,并在一段时间后补充这些调用,直到完成?可以使用 asyncio.wait
和一些变量来控制状态来做到这一点。顺便说一句,你的代码片段缺少 process_url
函数 - 我假设它只是调用它 fetch
。
import asyncio
...
async def fetch(url, client):
...
return response
async def process_url(*args):
...
response = await fetch(...)
...
async def main():
urls_list = [site.strip() for site in start_urls.replace(',', ' ').split() if site.strip()]
if second_input:
pattern_list = second_input
else:
pattern_list = []
batch_size = 50 # roughly. to simplify the code, we will let these go up to this value * 1.33
async with httpx.AsyncClient() as client:
tasks = [process_url(url, skip_empty_results, depth, pattern_list, dataset, 5, client) for url in urls_list]
current_tasks = set()
done_tasks = set()
while tasks or current_tasks:
if tasks and len(current_tasks) < batch_size // 3:
current_tasks.update(tasks[:batch_size])
tasks[:batch_size] = []
done, current_tasks = await asyncio.wait(current_tasks, timeout=10)
done_tasks.update(done)
results = [task.result() for task in in done_tasks if not task.exception()]
# optionally handle tasks that raised an exception here
...