I've seen this question and talked to ChatGPT/Bing (lol), but I still can't figure this out.

I'm trying to make roughly 7 million requests to an API on a machine with 32 CPUs and load the data into postgres. My code is basically set up like this:
```python
from aiomultiprocess import Pool

CPUS = 30  # (32 CPUs available)
batch_size = 5000

for i in range(0, len(args_list), batch_size):
    log.info(f"<log message>")
    async with Pool(
        processes=CPUS,
        maxtasksperchild=100,
        childconcurrency=3,
        queuecount=int(CPUS / 3),
    ) as pool:
        await pool.starmap(fetch_api_results, args_list[i : i + batch_size])
```
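For reference, here is a minimal, self-contained version of that setup. The stub coroutine and the dummy args_list are hypothetical stand-ins for my real fetch-and-upload code, but the Pool parameters are the ones I'm actually using:

```python
import asyncio
import logging

from aiomultiprocess import Pool

log = logging.getLogger(__name__)

CPUS = 30  # 32 CPUs available
BATCH_SIZE = 5000

async def fetch_api_results(*args):
    # stub standing in for the real fetch-and-upload coroutine
    await asyncio.sleep(0)

async def import_data(args_list):
    for i in range(0, len(args_list), BATCH_SIZE):
        log.info("starting batch %s", i // BATCH_SIZE)
        async with Pool(
            processes=CPUS,
            maxtasksperchild=100,
            childconcurrency=3,
            queuecount=CPUS // 3,
        ) as pool:
            await pool.starmap(fetch_api_results, args_list[i : i + BATCH_SIZE])

if __name__ == "__main__":
    # aiomultiprocess uses the spawn start method (see the popen_spawn_posix
    # frames in the traceback below), so the entry point must be guarded
    asyncio.run(import_data([("arg",)] * 10))
```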
-------- EDIT: adding the revised `fetch_api_results`, as requested in the comments. It's basically a set of functions that construct the API URL and then recursively make aiohttp requests until there is no longer a `next_url` token in the API response. Here it is:
```python
from aiohttp import request

async def fetch_api_results(*args):
    try:
        result_objects = APIPaginator(*args)
        await result_objects.fetch()
        log.info("uploading data")
        # upload-to-db function
    except planned_exceptions as e:
        log.warning(e, exc_info=False)


class APIPaginator(object):
    async def query_data(self):
        url = self.api_base + "<str from arg>"
        payload = {"limit": 1000}
        await self.query_all(url, payload)

    async def query_all(self, url, payload=None):  # default added so the recursive call works
        try:
            async with request(method="GET", url=url, params=payload) as response:
                log.info(f"status code: {response.status}")
                if response.status == 200:
                    results = await response.json()
                    self.results.append(results)
                    next_url = results.get("next_url")
                    if next_url:
                        await self.query_all(next_url)
                else:
                    response.raise_for_status()
        except:  # (except block omitted)
            ...

    async def fetch(self):
        await self.query_data()
```
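One thing worth flagging about the snippet above: the bare `request()` helper from aiohttp opens a new implicit ClientSession (with its own connector) on every call. Purely for reference, the same pagination written iteratively against a single shared session would look roughly like this standalone sketch (the function name, and the assumption that `next_url` already carries the paging parameters, are mine):

```python
import aiohttp

async def query_all_pages(api_base: str, path: str, limit: int = 1000) -> list[dict]:
    # one ClientSession (and one connector) reused for the whole pagination
    results: list[dict] = []
    url = api_base + path
    payload: dict | None = {"limit": limit}
    async with aiohttp.ClientSession() as session:
        while url:
            async with session.get(url, params=payload) as response:
                response.raise_for_status()
                page = await response.json()
                results.append(page)
                url = page.get("next_url")
                payload = None  # assume next_url already encodes the paging params
    return results
```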
END EDIT --------------

It runs for an hour or two (the whole job is expected to take a day or two) and then freezes. No error is thrown. When I interrupt it with a keyboard interrupt, I see the `OSError: [Errno 24] Too many open files` error. I've put the traceback below.
From what I understand, the problem of too many open file handles seems to be related to the spawning of new worker processes in the pool. What confuses me is that the docs say that hitting the maxtasksperchild limit should cause the old worker process to be killed and a new one to be spawned. This is supposed to prevent memory leaks and, I would have thought, to prevent exactly this problem from happening.

However, changing the maxtasksperchild parameter didn't change anything.
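To confirm whether descriptors really are accumulating between batches, something like this Linux-only helper (a hypothetical snippet, it assumes /proc is available) can be logged once per batch:

```python
import os

def open_fd_count(pid: int | None = None) -> int:
    # count the open file descriptors of a process via /proc (Linux only)
    pid = os.getpid() if pid is None else pid
    return len(os.listdir(f"/proc/{pid}/fd"))

# e.g. in the batch loop: log.info(f"open fds: {open_fd_count()}")
```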
Furthermore, I implemented the batching to effectively kill the pool and start a new one after every 5000 tasks, precisely to prevent file handles from accumulating. Once the `with pool:` block closes, it should effectively kill everything to do with that pool. But this failed too; nothing changed after implementing the batching approach.
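For what it's worth, my reading of the with-block teardown (the traceback below shows `__aexit__` awaiting `join()`) is that it is roughly equivalent to this explicit sketch; the `close()` call is my assumption, based on the multiprocessing-style API:

```python
pool = Pool(
    processes=CPUS,
    maxtasksperchild=100,
    childconcurrency=3,
    queuecount=CPUS // 3,
)
try:
    await pool.starmap(fetch_api_results, args_list[i : i + batch_size])
finally:
    pool.close()       # assumed: stop accepting new work, as in multiprocessing.Pool
    await pool.join()  # wait for the workers and queues to wind down
```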
All of this leaves me stumped. It clearly has something to do with the pipes of the newly spawned processes, but I'm not sure what to do about it. Any feedback is welcome.
A short-term fix, which would only buy me more time before the script fails, might be to increase the maximum number of files that can be open (per the linked answer, using `ulimit -n`). But I worry this limit will also be exceeded, since this will be a fairly long-running job.
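For reference, the same stopgap can be applied from inside the script at startup; this is the per-process equivalent of `ulimit -n`:

```python
import resource

# raise the soft open-files limit as far as the hard limit allows
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
```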
Any help is greatly appreciated!

Here is the full traceback:
File "<path-to-file>.py", line 127, in import_data
await pool.starmap(fetch_api_results, args_list[i : i + batch_size])
File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 136, in results
return await self.pool.results(self.task_ids)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 312, in results
await asyncio.sleep(0.005)
File "/<path-to-env>/lib/python3.11/asyncio/tasks.py", line 639, in sleep
return await future
File "/<path-to-env>/3.11.1/lib/python3.11/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/<path-to-env>/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "/<path-to-file>/main.py", line 39, in add_all_data
await import_data(args)
File "/<path-to-file>/orchestrator.py", line 120, in import_data
async with Pool(
File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 196, in __aexit__
await self.join()
File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 379, in join
await self._loop
File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 229, in loop
self.processes[self.create_worker(qid)] = qid
^^^^^^^^^^^^^^^^^^^^^^^
File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/pool.py", line 261, in create_worker
process.start()
File "/<path-to-env>/lib/python3.11/site-packages/aiomultiprocess/core.py", line 153, in start
return self.aio_process.start()
^^^^^^^^^^^^^^^^^^^^^^^^
File "/<path-to-env>/lib/python3.11/multiprocessing/process.py", line 121, in start
self._popen = self._Popen(self)
^^^^^^^^^^^^^^^^^
File "/<path-to-env>/lib/python3.11/multiprocessing/context.py", line 288, in _Popen
return Popen(process_obj)
^^^^^^^^^^^^^^^^^^
File "/<path-to-env>/lib/python3.11/multiprocessing/popen_spawn_posix.py", line 32, in __init__
super().__init__(process_obj)
File "/<path-to-env>/lib/python3.11/multiprocessing/popen_fork.py", line 19, in __init__
self._launch(process_obj)
File "/home/<path-to-env>/lib/python3.11/multiprocessing/popen_spawn_posix.py", line 58, in _launch
self.pid = util.spawnv_passfds(spawn.get_executable(),
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/<path-to-env>/lib/python3.11/multiprocessing/util.py", line 451, in spawnv_passfds
errpipe_read, errpipe_write = os.pipe()
^^^^^^^^^
OSError: [Errno 24] Too many open files