我正在编写一个使用Python的 multiprocessing
模块来加速CPU绑定的任务,我希望我创建的子进程能够访问最初在父进程中创建的内存映射,而不需要重复。根据 multiprocessing
文件在Python 3.4中,子进程默认不再继承文件描述符,所以我试着使用了 os.set_inheritable()
来覆盖该行为。
下面是我做的一个快速模拟图来演示这个问题。
DATA = r"data.csv"
from sys import platform
WINDOWS = platform.startswith("win")
import os
from multiprocessing import Process
import mmap
from typing import Optional
def child(fd: int, shm_tag: Optional[str]) -> None:
if shm_tag: # i.e. if using Windows
mm = mmap.mmap(fd, 0, shm_tag, mmap.ACCESS_READ)
else:
mm = mmap.mmap(fd, 0, mmap.MAP_SHARED, mmap.PROT_READ)
mm.close()
if __name__ == "__main__":
# Some code differs on Windows
WINDOWS = platform.startswith("win")
# Open file
fd = os.open(DATA, os.O_RDONLY | os.O_BINARY if WINDOWS else os.O_RDONLY)
os.set_inheritable(fd, True)
# Create memory map from file descriptor
if WINDOWS:
shm_tag = "shm_mmap"
mm = mmap.mmap(fd, 0, shm_tag, mmap.ACCESS_READ)
else:
shm_tag = None
mm = mmap.mmap(fd, 0, mmap.MAP_SHARED, mmap.PROT_READ)
# Run child process
(p := Process(target = child, args = (fd, shm_tag), daemon = True)).start()
p.join()
p.close()
mm.close()
os.close(fd)
这个问题一直无法解决--或者说至少在我主要测试的Windows*上无法解决。我在子进程中收到了一个错误,这在很大程度上意味着文件描述符实际上并没有被继承。
Process Process-1:
Traceback (most recent call last):
File "C:\Program Files\Python38\lib\multiprocessing\process.py", line 315, in _bootstrap
self.run()
File "C:\Program Files\Python38\lib\multiprocessing\process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "C:\Users\[N.D.]\Documents\test.py", line 12, in child
mm = mmap.mmap(fd, 0, shm_tag, mmap.ACCESS_READ)
ValueError: cannot mmap an empty file
此外,无论我是否传递了以下信息,我都得到了同样的错误 True
或 False
到 os.set_inheritable()
仿佛这一切都没有什么不同。
这到底是怎么回事?我是不是用了 mmap
模块不正确?
* 可能相关。Windows使用 spawn()
以创建新的流程,而不是 fork()
如果你试图对一个空文件进行内存映射,就会抛出一个异常。
多亏了Eryk Sun的评论,我才得以做出一个可行的实现。
DATA = r"data.csv"
from sys import platform
if platform.startswith("win"):
WINDOWS = True
from msvcrt import get_osfhandle
else:
WINDOWS = False
import os
from multiprocessing import Process
import mmap
from typing import Optional
def child(fd_or_size: int, shm_tag: Optional[str]) -> None:
if WINDOWS:
mm = mmap.mmap(-1, fd_or_size, shm_tag, mmap.ACCESS_READ)
else:
mm = mmap.mmap(fd_or_size, 0, mmap.MAP_SHARED, mmap.PROT_READ)
mm.close()
if __name__ == "__main__":
# Open file
fd = os.open(DATA, os.O_RDONLY | os.O_BINARY if WINDOWS else os.O_RDONLY)
# Create memory map from file descriptor
if WINDOWS:
# Obtain underlying file handle from file descriptor
os.set_handle_inheritable(get_osfhandle(fd), True)
shm_tag = f"test_mmap_{os.getpid()}"
mm = mmap.mmap(fd, 0, shm_tag, mmap.ACCESS_READ)
else:
os.set_inheritable(fd, True)
mm = mmap.mmap(fd, 0, mmap.MAP_SHARED, mmap.PROT_READ)
# Run child process
(p := Process(target = child, args = (mm.size() if WINDOWS else fd, shm_tag),
daemon = True)).start()
p.join()
p.close()
mm.close()
os.close(fd)
重要的变化(全部在Windows上)。
get_osfhandle()
从文件描述符中获取底层文件句柄。