我有一个脚本,从文件中获取 x url(例如:0-5 个文件行,然后 50-100 个文件行等),然后它使用
asyncio
批量请求这些 x url。但经过 1 次迭代后,它给出了错误。
Python 3.8.5
Win6
import requests
import os
import logging
import sys
import httpx
import asyncio
from time import sleep
THIS_DIR = os.path.dirname(os.path.realpath(__file__)).replace('\\', '/')
class Processor:
def open_file(self, file_name, fr=False, to=False):
if (not fr) and (not to):
with open(f'{THIS_DIR}/{file_name}', "r", encoding='utf-8') as f:
text = f.readlines()
else:
with open(f'{THIS_DIR}/{file_name}', "r", encoding='utf-8') as num_lines:
text = num_lines.readlines()[fr:to]
return text
def log(self, log_name, data):
FORMAT = '%(message)s'
logging.basiacConfig(
handlers=[
logging.FileHandler(
filename=f'{THIS_DIR}/{log_name}',
encoding='utf-8'
)
],
level=logging.INFO,
format=FORMAT,
force=True
)
logging.info(data.strip())
async def testing(self):
async with httpx.AsyncClient(limits=httpx.Limits(max_connections=None, max_keepalive_connections=100)) as cl:
async_resp = await asyncio.gather(*[cl.get(x, timeout=10.0) for x in text])
for i, j in enumerate(async_resp):
data = str(j) + ' ' + text[i]
print(data)
self.log('async_tested', data)
p = Processor()
for i in range(0, 1000, 50):
text = []
try:
fr = i
to = i + 50
for i in p.open_file('ordered', fr, to):
i = i.strip()
text.append(i)
print(fr, to)
huj = p.testing()
asyncio.run(huj)
except:
ended = False
while not ended:
print(f'Sleeping on: {fr}, {to}')
for i in p.open_file('ordered', fr, to):
i = i.strip()
text.append(i)
huj = p.testing()
asyncio.run(huj)
ended = True`
0 50
.... 48 links here
<Response [200 OK]> https://www.example.ru/book/1053533/
<Response [200 OK]> https://www.example.ru/book/1053538/
50 100
Sleeping on: 50, 100
Traceback (most recent call last):
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\anyio\_core\_tasks.py", line 115, in fail_after
yield cancel_scope
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpcore\_backends\anyio.py", line 114, in connect_tcp
stream: anyio.abc.ByteStream = await anyio.connect_tcp(
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\anyio\_core\_sockets.py", line 224, in connect_tcp
await event.wait()
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\anyio\_backends\_asyncio.py", line 688, in __aexit__
raise cancelled_exc_while_waiting_tasks
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\anyio\_backends\_asyncio.py", line 668, in __aexit__
await asyncio.wait(self._tasks)
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\asyncio\tasks.py", line 426, in wait
return await _wait(fs, timeout, return_when, loop)
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\asyncio\tasks.py", line 534, in _wait
await waiter
asyncio.exceptions.CancelledError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpcore\_exceptions.py", line 10, in map_exceptions
yield
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpcore\_backends\anyio.py", line 121, in connect_tcp
stream._raw_socket.setsockopt(*option) # type: ignore[attr-defined] # pragma: no cover
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\contextlib.py", line 131, in __exit__
self.gen.throw(type, value, traceback)
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\anyio\_core\_tasks.py", line 118, in fail_after
raise TimeoutError
TimeoutError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpx\_transports\default.py", line 67, in map_httpcore_exceptions
yield
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpx\_transports\default.py", line 371, in handle_async_request
resp = await self._pool.handle_async_request(req)
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpcore\_async\connection_pool.py", line 268, in handle_async_request
raise exc
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpcore\_async\connection_pool.py", line 251, in handle_async_request
response = await connection.handle_async_request(request)
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpcore\_async\connection.py", line 99, in handle_async_request
raise exc
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpcore\_async\connection.py", line 76, in handle_async_request
stream = await self._connect(request)
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpcore\_async\connection.py", line 124, in _connect
stream = await self._network_backend.connect_tcp(**kwargs)
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpcore\_backends\auto.py", line 30, in connect_tcp
return await self._backend.connect_tcp(
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpcore\_backends\anyio.py", line 121, in connect_tcp
stream._raw_socket.setsockopt(*option) # type: ignore[attr-defined] # pragma: no cover
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\contextlib.py", line 131, in __exit__
self.gen.throw(type, value, traceback)
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpcore\_exceptions.py", line 14, in map_exceptions
raise to_exc(exc) from exc
httpcore.ConnectTimeout
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "G:\Desktop\re email\bh\moscowbooks.ru\processor.py", line 357, in <module>
asyncio.run(huj)
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\asyncio\runners.py", line 44, in run
return loop.run_until_complete(main)
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\asyncio\base_events.py", line 616, in run_until_complete
return future.result()
File "G:\Desktop\re email\bh\moscowbooks.ru\processor.py", line 263, in testing
async_resp = await asyncio.gather(*[cl.get(x, timeout=10.0) for x in text])
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpx\_client.py", line 1786, in get
return await self.request(
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpx\_client.py", line 1559, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpx\_client.py", line 1646, in send
response = await self._send_handling_auth(
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpx\_client.py", line 1674, in _send_handling_auth
response = await self._send_handling_redirects(
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpx\_client.py", line 1711, in _send_handling_redirects
response = await self._send_single_request(request)
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpx\_client.py", line 1748, in _send_single_request
response = await transport.handle_async_request(request)
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpx\_transports\default.py", line 371, in handle_async_request
resp = await self._pool.handle_async_request(req)
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\contextlib.py", line 131, in __exit__
self.gen.throw(type, value, traceback)
File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpx\_transports\default.py", line 84, in map_httpcore_exceptions
raise mapped_exc(message) from exc
httpx.ConnectTimeout
[Finished in 24.3s]
可能与某个 URL 的连接超时。我该如何解决这个问题?
我尝试阅读并添加一些参数,也许将 url 数量降低到 10 个有助于 4-5 次迭代,但又会这样。
您的错误消息似乎没有显示代码错误中的一行,因此这可能是您使用
httpx
的方式。只需调用 asyncio.run()
一次,即可在单个事件循环中管理您的协同例程。
管理一次活动呼叫数量的更好方法是使用
Semaphore(3)
,它可以保持三个请求处于活动状态,每次完成后替换一个请求。使用信号量无需多次 asyncio.run
调用。
import asyncio
import random
import httpx
async def fetch_url(i, url, semaphore):
async with semaphore:
async with httpx.AsyncClient() as client:
print(f'getting {i}')
reply = await client.get(url)
print(f'got {i}')
return i, reply
async def main():
urls = [f'http://httpbingo.org/delay/{random.randint(2,4)}'
for _ in range(10)]
# urls = ['http://httpbingo.org/unstable' for _ in range(10)]
semaphore = asyncio.Semaphore(3)
tasks = [fetch_url(i, url, semaphore) for i, url in enumerate(urls)]
responses = await asyncio.gather(*tasks)
for i, r in responses:
print(i, r)
if __name__ == "__main__":
asyncio.run(main())