超时上下文管理器应该在任务内部使用

问题描述 投票:0回答:1

我尝试在 aiohttp 中使用 ClientSession,但在多个请求之间重用同一个 aiohttp.ClientSession 时,代码会崩溃并抛出 RuntimeError,错误消息如下:

Timeout context manager should be used inside a task

如果为每个请求都创建一个新会话(而不是重用同一个 ClientSession),代码可以正常工作。

import asyncio
import json
import logging
from abc import ABC, abstractmethod
from random import choice
from typing import Dict, List, Any, Optional

import aiohttp


def get_headers(extra: Optional[Dict[str, Any]] = None) -> Dict[str, str]:
    """Build the default browser-like request headers, optionally extended.

    Args:
        extra: Additional headers to merge in. An entry whose key matches one
            of the defaults replaces that default. ``None`` (the default) or
            an empty mapping adds nothing.

    Returns:
        A new dict on every call, so the caller may mutate the result without
        affecting later calls.
    """
    headers = {
        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
        "accept-language": "en-GB,en;q=0.9,ja-JP;q=0.8,ja;q=0.7,en-US;q=0.6",
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.102 Safari/537.36",
    }
    # ``None`` replaces the former ``{}`` default: a mutable default object is
    # created once and shared by every call.
    if extra:
        headers.update(extra)
    return headers


class Scraper(ABC):
    """Base scraper that keeps an ``aiohttp.ClientSession`` for reuse.

    ``get_session`` stores the session on the class, so it is visible to every
    instance; ``__init__`` additionally stores it on the instance.
    """

    # No session exists until ``get_session`` has run.
    session: aiohttp.ClientSession = None

    def __init__(self):
        """Create the session synchronously if none exists yet."""
        # NOTE(review): ``get_session`` is driven to completion with
        # ``run_until_complete`` on the loop returned by
        # ``asyncio.get_event_loop()``, i.e. before the script's
        # ``asyncio.run(main())`` call. ``asyncio.run`` is documented to create
        # a new event loop, so the session is later used from a task that runs
        # on a different loop than the one it was created on -- presumably the
        # cause of the reported "Timeout context manager should be used inside
        # a task" RuntimeError.
        if not self.session:
            self.session = asyncio.get_event_loop().run_until_complete(self.get_session())

    @classmethod
    async def get_session(cls):
        """Create a new ``ClientSession``, store it on the class and return it."""
        cls.session = aiohttp.ClientSession()
        return cls.session

    async def __aenter__(self):
        """Enter ``async with``; returns the scraper itself."""
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Leave ``async with``; closes the session if one is set."""
        if self.session:
            await self.session.close()


class Animepahe(Scraper):
    """Scraper for the animepahe site and its JSON API."""

    _SITE_NAME: str = "animepahe"
    site_url: str = "https://animepahe.ru"
    api_url: str = "https://animepahe.ru/api"
    manifest_header = get_headers({"referer": "https://kwik.cx", "origin": "https://kwik.cx"})

    async def get_api(self, data: dict, headers: Optional[dict] = None) -> dict:
        """Query ``api_url`` and return the decoded JSON payload.

        Args:
            data: Query-string parameters for the API call.
            headers: Request headers; ``None`` uses ``get_headers()``.

        Returns:
            The parsed JSON response body.

        Raises:
            aiohttp.ClientResponseError: propagated from :meth:`get`.
        """
        resp = await self.get(self.api_url, data, headers)
        return await resp.json()

    async def get(self, url: str, data=None, headers: Optional[dict] = None) -> aiohttp.ClientResponse:
        """GET ``url``, retrying up to 10 times on connection errors/timeouts.

        Args:
            url: Address to request.
            data: Query-string parameters; ``None`` means no parameters.
            headers: Request headers; ``None`` uses ``get_headers()``.

        Returns:
            The response, with its body already read so that ``json()`` /
            ``text()`` / ``read()`` still work after the connection has been
            released.

        Raises:
            aiohttp.ClientResponseError: immediately on a non-200 status (this
                error is not retried), or after all 10 attempts failed with a
                connection error or timeout.
        """
        # Resolve defaults per call instead of once at definition time.
        headers = get_headers() if headers is None else headers
        params = data or {}  # the original ``{} or data`` never applied the fallback
        err = None
        last_exc = None

        for attempt in range(1, 11):
            try:
                async with self.session.get(url=url, params=params, headers=headers) as resp:
                    if resp.status != 200:
                        # Read the body text; ``resp.content`` is the stream
                        # object, not the message sent by the server.
                        body = await resp.text(errors="replace")
                        err = f"request failed with status: {resp.status}\n err msg: {body}"
                        # Not caught below, so no retry happens on this path.
                        logging.error(err)
                        raise aiohttp.ClientResponseError(
                            resp.request_info, resp.history, status=resp.status, message=err
                        )

                    # Leaving ``async with`` releases the connection; read the
                    # body first so it is cached on the response object.
                    await resp.read()
                    return resp
            except (aiohttp.ClientOSError, asyncio.TimeoutError, aiohttp.ServerDisconnectedError, aiohttp.ServerTimeoutError) as exc:
                last_exc = exc
                err = f"request to {url} failed after {attempt} attempt(s): {exc!r}"
                logging.warning("%s\nRetrying...", err)
                await asyncio.sleep(choice([5, 4, 3, 2, 1]))  # random back-off

        # Same exception type as before for callers; no response is available
        # here, so request info and history stay ``None``.
        raise aiohttp.ClientResponseError(None, None, message=err) from last_exc


if __name__ == "__main__":
    # NOTE(review): this instantiation runs ``Scraper.__init__``, which creates
    # the session via ``run_until_complete`` *before* ``asyncio.run`` is
    # called further down; the session is stored on the ``Scraper`` class.
    Scraper()

    async def main():
        # ``Animepahe()`` finds the class-level session already set, so its
        # ``__init__`` does not create a new one; the request then reuses the
        # session created outside of this coroutine.
        scraper = Animepahe()
        print(await scraper.get_api({"m": "search", "q": "attack"}))

    asyncio.run(main())

回溯

RuntimeError: Timeout context manager should be used inside a task
Unclosed client session

进程已结束,退出代码为 1

预期行为:返回 JSON 响应

我正在使用 python 版本:3.10.2 和 aiohttp 版本:3.8.5

python async-await python-asyncio aiohttp
1个回答
0
投票

所有与异步相关的对象(包括 ClientSession)都应该在正在运行的事件循环内部创建和使用

class Scraper(ABC):
    """Base scraper holding one ``aiohttp.ClientSession`` on the class."""

    # No session exists until ``create_session`` has been awaited.
    session: aiohttp.ClientSession = None

    @classmethod
    async def create_session(cls) -> None:
        """Create the class-level session unless a usable one already exists.

        Being a coroutine, this has to be awaited from code that already runs
        inside the event loop, so the session is created there.
        """
        # A closed session is still truthy, so also check ``closed``;
        # otherwise a closed session would be kept and reused.
        if cls.session is None or cls.session.closed:
            cls.session = aiohttp.ClientSession()


class Animepahe(Scraper):
    """Scraper for the animepahe site and its JSON API."""

    _SITE_NAME: str = "animepahe"
    site_url: str = "https://animepahe.ru"
    api_url: str = "https://animepahe.ru/api"
    manifest_header = get_headers({"referer": "https://kwik.cx", "origin": "https://kwik.cx"})

    @classmethod
    async def set_session(cls):
        """Create a new ``ClientSession`` and store it on this class.

        Any session previously stored is replaced without being closed.
        """
        cls.session = aiohttp.ClientSession()

    async def get_api(self, data: dict, headers: Optional[dict] = None) -> dict:
        """Query ``api_url`` and return the decoded JSON payload.

        Args:
            data: Query-string parameters for the API call.
            headers: Request headers; ``None`` uses ``get_headers()``.

        Returns:
            The parsed JSON response body.

        Raises:
            aiohttp.ClientResponseError: propagated from :meth:`get`.
            json.JSONDecodeError: if the body is not valid JSON.
        """
        # ``get`` returns the raw body bytes, which have no ``json()`` method;
        # decode them here.
        body = await self.get(self.api_url, data, headers)
        return json.loads(body)

    async def get(self, url: str, data=None, headers: Optional[dict] = None) -> bytes:
        """GET ``url``, retrying up to 10 times on connection errors/timeouts.

        Args:
            url: Address to request.
            data: Query-string parameters; ``None`` means no parameters.
            headers: Request headers; ``None`` uses ``get_headers()``.

        Returns:
            The raw response body.

        Raises:
            aiohttp.ClientResponseError: immediately on a non-200 status (this
                error is not retried), or after all 10 attempts failed with a
                connection error or timeout.
        """
        # Resolve defaults per call instead of once at definition time.
        headers = get_headers() if headers is None else headers
        params = data or {}  # the original ``{} or data`` never applied the fallback
        err = None
        last_exc = None

        for attempt in range(1, 11):
            try:
                async with self.session.get(url=url, params=params, headers=headers) as resp:
                    if resp.status != 200:
                        # Read the body text; ``resp.content`` is the stream
                        # object, not the message sent by the server.
                        body = await resp.text(errors="replace")
                        err = f"request failed with status: {resp.status}\n err msg: {body}"
                        # Not caught below, so no retry happens on this path.
                        logging.error(err)
                        raise aiohttp.ClientResponseError(
                            resp.request_info, resp.history, status=resp.status, message=err
                        )

                    # Read inside the context, before the connection is released.
                    return await resp.read()
            except (aiohttp.ClientOSError, asyncio.TimeoutError, aiohttp.ServerDisconnectedError, aiohttp.ServerTimeoutError) as exc:
                last_exc = exc
                err = f"request to {url} failed after {attempt} attempt(s): {exc!r}"
                logging.warning("%s\nRetrying...", err)
                await asyncio.sleep(choice([5, 4, 3, 2, 1]))  # random back-off

        # Same exception type as before for callers; no response is available
        # here, so request info and history stay ``None``.
        raise aiohttp.ClientResponseError(None, None, message=err) from last_exc


if __name__ == "__main__":

    async def main():
        """Create the session inside the running loop, fetch once, clean up."""
        # ``set_session`` is defined on Animepahe (Scraper only provides
        # ``create_session``), so call it on that class. Awaiting it here means
        # the session is created inside the task started by ``asyncio.run``.
        await Animepahe.set_session()
        scraper = Animepahe()
        try:
            print(await scraper.get("https://animepahe.ru/api", {"m": "search", "q": "attack"}))
        finally:
            # Close the session explicitly to avoid "Unclosed client session".
            await scraper.session.close()

    asyncio.run(main())

在异步函数内部(即在 main 协程中、事件循环已经运行之后)创建会话,而不是在事件循环之外创建,即可解决该错误。

© www.soinside.com 2019 - 2024. All rights reserved.