aiomultiprocess Pool freezes and OSError: [Errno 24] Too many open files


I have seen this question and have talked to ChatGPT / Bing (lol), but I still can't figure this out.

I am trying to make roughly 7 million requests to an API on a machine with 32 CPUs and load the data into Postgres. My code is basically set up like this:

from aiomultiprocess import Pool

CPUS = 30  # (32 CPUs available)
batch_size = 5000

for i in range(0, len(args_list), batch_size):
    log.info(f"<log message>")
    async with Pool(
        processes=CPUS,
        maxtasksperchild=100,
        childconcurrency=3,
        queuecount=int(CPUS / 3),
    ) as pool:
        await pool.starmap(fetch_api_results, args_list[i : i + batch_size])
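
(For context, args_list is built elsewhere in the script; it is a list of per-request argument tuples, and starmap unpacks each tuple into fetch_api_results. Something like this, purely as an illustration:)

# illustrative only -- the real args_list is built elsewhere in the script
args_list = [
    ("<endpoint>", "<query param>"),  # each tuple becomes fetch_api_results(*args)
    ("<endpoint>", "<query param>"),
    # ... roughly 7 million entries in total
]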

-------- EDIT: Adding a revision of fetch_api_results, as requested in the comments. It is basically a set of functions that constructs the API URL and then makes aiohttp requests recursively until there is no longer a next_url token in the API response.

Here it is:

from aiohttp import request

async def fetch_api_results(*args):
    try:
        result_objects = APIPaginator(*args)
        await result_objects.fetch()
        log.info("uploading data")
        #upload to db function

    except planned_exceptions as e:
        log.warning(e, exc_info=False)


class APIPaginator(object):
    # (__init__ that sets self.api_base and self.results is omitted)
    async def query_data(self):
        url = self.api_base + "<str from arg>"
        payload = {"limit": 1000}
        await self.query_all(url, payload)

    async def query_all(self, url, payload):
        try:
            async with request(method="GET", url=url, params=payload) as response:
                log.info(f"status code: {response.status}")
                if response.status == 200:
                    results = await response.json()
                    self.results.append(results)
                    next_url = results.get("next_url")
                    if next_url:
                        await self.query_all(next_url, payload)
                else:
                    response.raise_for_status()
        except:  # (except block omitted)

    async def fetch(self):
        await self.query_data()

END EDIT --------------

It will run for an hour or two (the whole job is expected to take a day or two) and then freeze. No error is thrown. When I interrupt it with a keyboard interrupt, I see an

OSError: [Errno 24] Too many open files

error.

I have put the full traceback below.

From my understanding, the problem of too many open file handles seems to be related to the spawning of new worker processes in the pool. What confuses me is that the documentation says that hitting the maxtasksperchild limit should cause the old worker process to be killed and a new one to be spawned. This is meant to prevent memory leaks and, I would have thought, to keep exactly this kind of problem from happening.

However, changing the maxtasksperchild parameter did not make any difference.
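
A rough way to pin down where the descriptors are actually going would be to watch per-process descriptor counts while the job runs, e.g. something like this sketch (it assumes psutil, which is not part of the script above):

import time

import psutil  # assumed dependency, not used in the script above

def watch_fds(pid: int, interval: float = 30.0) -> None:
    """Periodically print open-descriptor counts for the pool's parent process
    and all of its spawned workers, to see whether they keep climbing."""
    parent = psutil.Process(pid)
    while True:
        procs = [parent] + parent.children(recursive=True)
        total = 0
        for p in procs:
            try:
                total += p.num_fds()  # Unix-only count of open descriptors
            except psutil.NoSuchProcess:
                pass  # worker exited between children() and num_fds()
        print(f"{len(procs)} processes, {total} open file descriptors")
        time.sleep(interval)

# e.g. run in a separate shell session: watch_fds(<pid of the main script>)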

Additionally, I implemented the batching to effectively kill the pool and start a new one after every 5000 tasks, to prevent any accumulation of file handles. Once the with block closes, the with pool: implementation should effectively kill everything tied to that pool. But this failed too; nothing changed after implementing the batching approach.
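
To make that explicit: my understanding is that the context manager amounts to roughly the teardown below (a sketch using aiomultiprocess's close() and join(), reusing CPUS and fetch_api_results from above):

from aiomultiprocess import Pool

async def run_batch(batch):
    pool = Pool(
        processes=CPUS,
        maxtasksperchild=100,
        childconcurrency=3,
        queuecount=int(CPUS / 3),
    )
    try:
        await pool.starmap(fetch_api_results, batch)
    finally:
        pool.close()       # stop feeding work to the child processes
        await pool.join()  # wait for the workers to exit and release their pipes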

This all has me stumped. It clearly has something to do with the pipes of the newly spawned processes, but I'm not sure what to do about it. Any feedback is welcome.

A short-term fix, which would only buy me more time before the script fails, would be to increase the maximum number of files that can be open (using ulimit -n, per the linked answer). But I worry that limit would also be exceeded eventually, since this is going to be a fairly long-running job.
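
For reference, the same limit can also be checked and bumped from inside Python with the resource module (a minimal sketch; the hard limit itself still has to be raised via the shell or by a privileged user):

import resource

# current soft/hard limits on open file descriptors for this process
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"RLIMIT_NOFILE: soft={soft}, hard={hard}")

# raise the soft limit as far as the hard limit allows (roughly what
# `ulimit -n` does for the shell and its child processes)
resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))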

Any help is greatly appreciated!

Here is the full traceback:

File "<path-to-file>.py", line 127, in import_data
    await pool.starmap(fetch_api_results, args_list[i : i + batch_size])
  File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 136, in results
    return await self.pool.results(self.task_ids)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 312, in results
    await asyncio.sleep(0.005)
  File "/<path-to-env>/lib/python3.11/asyncio/tasks.py", line 639, in sleep
    return await future

File "/<path-to-env>/3.11.1/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/<path-to-env>/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/<path-to-file>/main.py", line 39, in add_all_data
    await import_data(args)
  File "/<path-to-file>/orchestrator.py", line 120, in import_data
    async with Pool(
  File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 196, in __aexit__
    await self.join()
  File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 379, in join
    await self._loop
  File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 229, in loop
    self.processes[self.create_worker(qid)] = qid
                   ^^^^^^^^^^^^^^^^^^^^^^^
  File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 261, in create_worker
    process.start()
  File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/core.py", line 153, in start
    return self.aio_process.start()
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/<path-to-env>/lib/python3.11/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
                  ^^^^^^^^^^^^^^^^^
  File "/<path-to-env>/lib/python3.11/multiprocessing/context.py", line 288, in _Popen
    return Popen(process_obj)
           ^^^^^^^^^^^^^^^^^^
  File "/<path-to-env>/lib/python3.11/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/<path-to-env>/lib/python3.11/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/home/<path-to-env>/lib/python3.11/multiprocessing/popen_spawn_posix.py", line 58, in _launch
    self.pid = util.spawnv_passfds(spawn.get_executable(),
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/<path-to-env>/lib/python3.11/multiprocessing/util.py", line 451, in spawnv_passfds
    errpipe_read, errpipe_write = os.pipe()
                                  ^^^^^^^^^
OSError: [Errno 24] Too many open files