我尝试在 aiohttp 中使用 ClientSession,但在对多个请求重用 aiohttp.ClientSession 时,代码会崩溃并出现 RunTimeError,并显示消息
Timeout context manager should be used inside a task
。
当我根据每个请求创建新会话时,它正在工作,而不是重新使用 ClientSession。
import asyncio
from abc import ABC, abstractmethod
import aiohttp
from typing import Dict, List, Any
def get_headers(extra: Dict[str, Any] = {}) -> Dict[str, str]:
headers = {
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
"accept-language": "en-GB,en;q=0.9,ja-JP;q=0.8,ja;q=0.7,en-US;q=0.6",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.102 Safari/537.36",
}
for key, val in extra.items():
headers[key] = val
return headers
class Scraper(ABC):
session: aiohttp.ClientSession = None
def __init__(self):
if not self.session:
self.session = asyncio.get_event_loop().run_until_complete(self.get_session())
@classmethod
async def get_session(cls):
cls.session = aiohttp.ClientSession()
return cls.session
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_value, traceback):
if self.session:
await self.session.close()
class Animepahe(Scraper):
_SITE_NAME: str = "animepahe"
site_url: str = "https://animepahe.ru"
api_url: str = "https://animepahe.ru/api"
manifest_header = get_headers({"referer": "https://kwik.cx", "origin": "https://kwik.cx"})
async def get_api(self, data: dict, headers: dict = get_headers()) -> dict:
resp = await self.get(self.api_url, data, headers)
return await resp.json()
async def get(self, url: str, data=None, headers: dict = get_headers()) -> aiohttp.ClientResponse:
data = {} or data
err, tries = None, 0
while tries < 10:
try:
async with self.session.get(url=url, params=data, headers=headers) as resp:
if resp.status != 200:
err = f"request failed with status: {resp.status}\n err msg: {resp.content}"
logging.error(f"{err}\nRetrying...")
raise aiohttp.ClientResponseError(None, None, message=err)
return resp
except (aiohttp.ClientOSError, asyncio.TimeoutError, aiohttp.ServerDisconnectedError, aiohttp.ServerTimeoutError):
await asyncio.sleep(choice([5, 4, 3, 2, 1])) # randomly await
tries += 1
continue
raise aiohttp.ClientResponseError(None, None, message=err)
if __name__ == "__main__":
Scraper()
async def main():
scraper = Animepahe()
print(await scraper.get_api({"m": "search", "q": "attack"}))
asyncio.run(main())
RuntimeError: Timeout context manager should be used inside a task
Unclosed client session
プロセスは終了コード 1 で終了しました
我正在使用 python 版本:3.10.2 和 aiohttp 版本:3.8.5
所有与异步相关的内容都应该在事件循环内
class Scraper(ABC):
session: aiohttp.ClientSession = None
@classmethod
async def create_session(cls):
if not cls.session:
cls.session = aiohttp.ClientSession()
class Animepahe(Scraper):
_SITE_NAME: str = "animepahe"
site_url: str = "https://animepahe.ru"
api_url: str = "https://animepahe.ru/api"
manifest_header = get_headers({"referer": "https://kwik.cx", "origin": "https://kwik.cx"})
@classmethod
async def set_session(cls):
cls.session = aiohttp.ClientSession()
async def get_api(self, data: dict, headers: dict = get_headers()) -> dict:
resp = await self.get(self.api_url, data, headers)
return await resp.json()
async def get(self, url: str, data=None, headers: dict = get_headers()) -> aiohttp.ClientResponse:
data = {} or data
err, tries = None, 0
while tries < 10:
try:
async with self.session.get(url=url, params=data, headers=headers) as resp:
if resp.status != 200:
err = f"request failed with status: {resp.status}\n err msg: {resp.content}"
logging.error(f"{err}\nRetrying...")
raise aiohttp.ClientResponseError(None, None, message=err)
return await resp.read()
except (aiohttp.ClientOSError, asyncio.TimeoutError, aiohttp.ServerDisconnectedError, aiohttp.ServerTimeoutError):
await asyncio.sleep(choice([5, 4, 3, 2, 1])) # randomly await
tries += 1
continue
raise aiohttp.ClientResponseError(None, None, message=err)
if __name__ == "__main__":
async def main():
await Scraper().set_session()
scraper = Animepahe()
print(await scraper.get("https://animepahe.ru/api", {"m": "search", "q": "attack"}))
asyncio.run(main())
从异步函数内部调用 Scraper() 后,即在主函数内部初始化 Scraper 将解决该错误。