如何在进程池内实现连接池?

问题描述 投票:0回答:1

我正在尝试有效地发出数百万个 http 请求。为此,我利用 aiomultiprocess 在每个 CPU 进程上运行异步事件循环。

我的代码如下所示:

from aiohttp import request
from aiomultiprocess import Pool

async def execute_request(url: str, payload: dict = {}) -> tuple[int, dict]:
        """Execute the request and return the response status code and json response
        Args:
            url: url to query
            payload: dict of query params
        Returns:
            (status_code, json_response)
                status_code: int of the response status code
                json_response: dict of the json response
        """
        async with request(method="GET", url=url, params=payload) as response:
            status_code = response.status
            json_response = await response.json() if status_code == 200 else {}
            return (status_code, json_response)

async def api_pool_uploader(url_args):
    async with Pool(**pool_kwargs) as pool:
        async for result in pool.starmap(execute_request, url_args):
            if result is not None:
                await upload_to_db_function(result)

asyncio.run(api_pool_uploader(url_args))

此代码在第一分钟运行良好,但在执行大约 1500 个请求后,所有进程的请求函数都会返回

ClientConnectionError
,并且我的计算机会断开所有互联网连接。就像这样,我的浏览器中的所有选项卡都开始旋转,并且来自我的计算机上任何位置的请求都无法发出。这持续了大约 45 秒,然后互联网恢复,直到我完成接下来的约 1500 个请求,然后它再次发生......

我已经确认这不是我的家庭互联网(所有其他设备上的 WiFi 等始终保持稳定),也不是 API 限制了我。这似乎是一个只发生在我的机器上的问题,甚至在我的 python 实例之外也会影响整个计算机。

值得注意的是,即使我将池中的进程数减少到 1,这种情况仍然会发生。一旦达到大约 1500 个请求,所有网络请求都会下降,尽管需要更长的时间才能达到 1500 个请求标记。

如果有人知道可能会发生什么,我很想听听。

我的主要怀疑是 aiohttp.request 正在与每个单独的请求创建一个新的“会话”连接。拥有 1500 个连接可能会使我的计算机的网络适配器超载? (这是一件事吗?)并导致它们全部掉线并在可能的情况下重新连接?

为了避免这种情况,我尝试利用 aiohttp.clientsession 的连接池,以便每个请求都不会打开一个全新的连接。然而我一直没能做到。

aiohttp docs 解释说,客户端会话应该在应用程序的整个范围内实例化一次,然后将

client
对象作为参数传递到请求函数中。但是,当生成多个进程时,传递给进程池中的协同例程的所有参数都必须是可序列化和可导入的。尝试执行此操作时,我收到此错误:

AttributeError: Can't pickle local object 'WeakSet.__init__.<locals>._remove'

此后,我尝试通过创建自己的 aiomultiprocess 分支来更改我的代码,以便在每个工作程序生成时为每个工作程序实例化一个客户端会话,但这令人惊讶地导致了相同的错误。

代码如下所示:

from aiohttp import ClientSession
class PoolWorker(Process):
    """Individual worker process for the async pool."""

    def __init__(
        self.client_session = ClientSession() #i added this
    )

    async def run(self) -> None:
    # I'm only showing the change I made to this function with relevant context
        task: PoolTask = self.tx.get_nowait()
        tid, func, args, kwargs = task
        args = [*args,self.client_session] #i added this
        future = asyncio.ensure_future(func(*args, **kwargs))

正如我之前所说,这会产生相同的错误...客户端无法被pickle...

在使用进程池进行多处理时是否可以利用连接池?如果是这样,它会帮助我解决我的问题还是我完全找错了树?

python multiprocessing python-asyncio python-multiprocessing aiohttp
1个回答
0
投票

你找到这个问题的答案了吗?我遇到了与我自己非常相似的问题完全相同的限制,我想知道您是否知道如何通过 aiomultiprocess 正确利用 ClientSession 对象。

© www.soinside.com 2019 - 2024. All rights reserved.