In my code, I generate data in a multiprocessing pool and then process it in another pool. Until now, my implementation has been to save the data to disk and load it again later. Now I want to keep the data in memory instead, so this is the approach I took: I use multiprocessing SharedMemory to pass around the BytesIO objects I work with (that way, multiprocessing doesn't have to pickle the data).
from io import BytesIO
from multiprocessing import Pool
from multiprocessing.shared_memory import SharedMemory

def generate_data(_dummy_arg) -> str:
    data = BytesIO(bytearray(1024 * 1024 * 38))
    # prepare SharedMemory to send the bytes back to the main process
    buf = data.getbuffer()
    shared_memory = SharedMemory(create=True, size=buf.nbytes)
    shared_memory.buf[:] = buf
    shared_memory.close()
    return shared_memory.name

def process_data(data_name: str) -> str:
    # recover the data from its name, this is where the error happens
    data = SharedMemory(data_name)
    # FileNotFoundError: [Errno 2] No such file or directory: '/psm_607cb218'
    return "some_result"
datas: list[SharedMemory] = []

with Pool(5) as p:
    for data_name in p.map(generate_data, range(5)):
        # recover SharedMemory from the name
        data = SharedMemory(data_name)
        datas.append(data)

# some code

with Pool(5) as p:
    for returned in p.map(process_data, (data.name for data in datas)):
        ...
But this implementation raises FileNotFoundError: [Errno 2] No such file or directory: '/psm_201b67a1' in the second pool, as if the SharedMemory objects had somehow vanished. Any ideas?
This is probably because the shared memory segment gets unlinked when the worker process that created it exits: each child process has a resource tracker that cleans up every segment the process created, even if other processes still need it. As far as I know this is a long-standing bug that has not been fixed yet. As a workaround, try calling the close method after processing the data:
def process_data(data_name: str) -> str:
    data = SharedMemory(data_name)
    # Process the data here...
    # After processing, close the shared memory
    data.close()
    return "some_result"
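Another workaround that is often suggested is to tell the creating worker's resource tracker to forget the segment, so it survives the worker's exit and the main process takes over cleanup. Note this relies on `multiprocessing.resource_tracker.unregister`, a private, unsupported API (Python 3.13 adds an official `track=False` parameter to SharedMemory instead); the small payload here is just for illustration. A minimal sketch under those assumptions:

```python
from multiprocessing import Pool, resource_tracker
from multiprocessing.shared_memory import SharedMemory

def generate_data(_dummy_arg) -> str:
    shm = SharedMemory(create=True, size=1024)
    shm.buf[:5] = b"hello"
    # Unregister from this worker's resource tracker so the segment is NOT
    # unlinked when the worker exits; the main process now owns cleanup.
    # shm._name is the tracker-registered name (private attribute).
    resource_tracker.unregister(shm._name, "shared_memory")
    shm.close()
    return shm.name

def process_data(name: str) -> bytes:
    shm = SharedMemory(name)           # attach by name in the second pool
    result = bytes(shm.buf[:5])
    shm.close()                        # close, but don't unlink, in workers
    return result

def main() -> list[bytes]:
    with Pool(2) as p:
        names = p.map(generate_data, range(2))
    with Pool(2) as p:
        results = p.map(process_data, names)
    # The main process is now responsible for the final unlink.
    for name in names:
        shm = SharedMemory(name)
        shm.close()
        shm.unlink()
    return results

if __name__ == "__main__":
    print(main())
```

Since `unregister` is private, it may change between Python versions; on 3.13+ prefer `SharedMemory(create=True, size=..., track=False)` and handle the unlink yourself.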