asyncio 在循环中批量请求 url 时出错

问题描述 投票:0回答:1

我有一个脚本,从文件中获取 x url(例如:0-5 个文件行,然后 50-100 个文件行等),然后它使用

asyncio
批量请求这些 x url。但经过 1 次迭代后,它给出了错误。 Python 3.8.5 Win6

代码

import requests
import os
import logging
import sys
import httpx
import asyncio
from time import sleep


THIS_DIR = os.path.dirname(os.path.realpath(__file__)).replace('\\', '/')


class Processor:
    def open_file(self, file_name, fr=False, to=False):
        if (not fr) and (not to):
            with open(f'{THIS_DIR}/{file_name}', "r", encoding='utf-8') as f:
                text = f.readlines()

        else:
            with open(f'{THIS_DIR}/{file_name}', "r", encoding='utf-8') as num_lines:
                text = num_lines.readlines()[fr:to]
    
        return text
    

    def log(self, log_name, data):
        FORMAT = '%(message)s'
        logging.basiacConfig(
            handlers=[
                logging.FileHandler(
                    filename=f'{THIS_DIR}/{log_name}',
                    encoding='utf-8'
                )
            ],

            level=logging.INFO,
            format=FORMAT,
            force=True 
        )

        logging.info(data.strip())


    async def testing(self): 
        async with httpx.AsyncClient(limits=httpx.Limits(max_connections=None,     max_keepalive_connections=100)) as cl:
            async_resp = await asyncio.gather(*[cl.get(x, timeout=10.0) for x in text])
        for i, j in enumerate(async_resp):
            data = str(j) + ' ' + text[i]
            print(data)

            self.log('async_tested', data)
              


p = Processor()

for i in range(0, 1000, 50):
    text = []

    try:
        fr = i
        to = i + 50

        for i in p.open_file('ordered', fr, to):
            i = i.strip()                    
            text.append(i)

        print(fr, to)

        huj = p.testing()
        asyncio.run(huj)

     except:
         ended = False

         while not ended:
             print(f'Sleeping on: {fr}, {to}')

             for i in p.open_file('ordered', fr, to):
                 i = i.strip()
                 text.append(i)
        
             huj = p.testing()
             asyncio.run(huj)
             ended = True`

错误

 0 50
 .... 48 links here
 <Response [200 OK]> https://www.example.ru/book/1053533/
 <Response [200 OK]> https://www.example.ru/book/1053538/
 50 100
 Sleeping on: 50, 100
 Traceback (most recent call last):
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\anyio\_core\_tasks.py", line 115, in fail_after
 yield cancel_scope
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpcore\_backends\anyio.py", line 114, in connect_tcp
 stream: anyio.abc.ByteStream = await anyio.connect_tcp(
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\anyio\_core\_sockets.py", line 224, in connect_tcp
 await event.wait()
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\anyio\_backends\_asyncio.py", line 688, in __aexit__
 raise cancelled_exc_while_waiting_tasks
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\anyio\_backends\_asyncio.py", line 668, in __aexit__
 await asyncio.wait(self._tasks)
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\asyncio\tasks.py", line 426, in wait
 return await _wait(fs, timeout, return_when, loop)
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\asyncio\tasks.py", line 534, in _wait
 await waiter
 asyncio.exceptions.CancelledError
 
 During handling of the above exception, another exception occurred:
 
 Traceback (most recent call last):
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpcore\_exceptions.py", line 10, in map_exceptions
 yield
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpcore\_backends\anyio.py", line 121, in connect_tcp
 stream._raw_socket.setsockopt(*option)  # type: ignore[attr-defined] # pragma: no cover
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\contextlib.py", line 131, in __exit__
 self.gen.throw(type, value, traceback)
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\anyio\_core\_tasks.py", line 118, in fail_after
 raise TimeoutError
 TimeoutError
 
 The above exception was the direct cause of the following exception:
 
 Traceback (most recent call last):
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpx\_transports\default.py", line 67, in map_httpcore_exceptions
 yield
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpx\_transports\default.py", line 371, in handle_async_request
 resp = await self._pool.handle_async_request(req)
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpcore\_async\connection_pool.py", line 268, in handle_async_request
 raise exc
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpcore\_async\connection_pool.py", line 251, in handle_async_request
 response = await connection.handle_async_request(request)
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpcore\_async\connection.py", line 99, in handle_async_request
 raise exc
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpcore\_async\connection.py", line 76, in handle_async_request
 stream = await self._connect(request)
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpcore\_async\connection.py", line 124, in _connect
 stream = await self._network_backend.connect_tcp(**kwargs)
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpcore\_backends\auto.py", line 30, in connect_tcp
 return await self._backend.connect_tcp(
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpcore\_backends\anyio.py", line 121, in connect_tcp
 stream._raw_socket.setsockopt(*option)  # type: ignore[attr-defined] # pragma: no cover
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\contextlib.py", line 131, in __exit__
 self.gen.throw(type, value, traceback)
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpcore\_exceptions.py", line 14, in map_exceptions
 raise to_exc(exc) from exc
 httpcore.ConnectTimeout
 
 The above exception was the direct cause of the following exception:
 
 Traceback (most recent call last):
 File "G:\Desktop\re email\bh\moscowbooks.ru\processor.py", line 357, in <module>
 asyncio.run(huj)
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\asyncio\runners.py", line 44, in run
 return loop.run_until_complete(main)
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\asyncio\base_events.py", line 616, in run_until_complete
 return future.result()
 File "G:\Desktop\re email\bh\moscowbooks.ru\processor.py", line 263, in testing
 async_resp = await asyncio.gather(*[cl.get(x, timeout=10.0) for x in text])
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpx\_client.py", line 1786, in get
 return await self.request(
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpx\_client.py", line 1559, in request
 return await self.send(request, auth=auth, follow_redirects=follow_redirects)
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpx\_client.py", line 1646, in send
 response = await self._send_handling_auth(
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpx\_client.py", line 1674, in _send_handling_auth
 response = await self._send_handling_redirects(
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpx\_client.py", line 1711, in _send_handling_redirects
 response = await self._send_single_request(request)
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpx\_client.py", line 1748, in _send_single_request
 response = await transport.handle_async_request(request)
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpx\_transports\default.py", line 371, in handle_async_request
 resp = await self._pool.handle_async_request(req)
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\contextlib.py", line 131, in __exit__
 self.gen.throw(type, value, traceback)
 File "C:\Users\User\AppData\Local\Programs\Python\Python38\lib\site-packages\httpx\_transports\default.py", line 84, in map_httpcore_exceptions
 raise mapped_exc(message) from exc
 httpx.ConnectTimeout
 [Finished in 24.3s]

可能与某个 URL 的连接超时。我该如何解决这个问题?

我尝试阅读并添加一些参数,也许将 url 数量降低到 10 个有助于 4-5 次迭代,但又会这样。

python python-requests python-asyncio
1个回答
0
投票

您的错误消息似乎没有显示代码错误中的一行,因此这可能是您使用

httpx
的方式。只需调用
asyncio.run()
一次,即可在单个事件循环中管理您的协同例程。

管理一次活动呼叫数量的更好方法是使用

Semaphore(3)
,它可以保持三个请求处于活动状态,每次完成后替换一个请求。使用信号量无需多次
asyncio.run
调用。

import asyncio
import random
import httpx


async def fetch_url(i, url, semaphore):
    async with semaphore:
        async with httpx.AsyncClient() as client:
            print(f'getting {i}')
            reply = await client.get(url)
            print(f'got {i}')
            return i, reply


async def main():
    urls = [f'http://httpbingo.org/delay/{random.randint(2,4)}'
            for _ in range(10)]
    # urls = ['http://httpbingo.org/unstable' for _ in range(10)]
    semaphore = asyncio.Semaphore(3)
    tasks = [fetch_url(i, url, semaphore) for i, url in enumerate(urls)]
    responses = await asyncio.gather(*tasks)
    for i, r in responses:
        print(i, r)


if __name__ == "__main__":
    asyncio.run(main())
© www.soinside.com 2019 - 2024. All rights reserved.