我注意到在重负载下不断关闭和重新打开共享内存会影响性能(示例 1 的 get/set_shm)。
我可以在不同的进程中保留一个打开的 SharedMemory 并保留该内存的 numpy 数组的链接(示例 2 get/set_shm)吗?或者这样的决定会导致我的代码运行不稳定?
示例1
class MyClass(object):
    """Share a 400x400 uint8 array between processes via shared memory.

    Example-1 pattern: every get/set re-attaches to the segment by name
    and detaches again when done.
    """

    def __init__(self):
        pass

    def create_shm(self):
        """Create the shared segment, zero-fill it, and remember its name.

        The creator's handle is deliberately kept open (stored on
        ``self.shm``): on Windows the segment is destroyed as soon as the
        last open handle is closed, even without an explicit ``unlink()``,
        so closing here would make later attaches fail.
        """
        self.lock = Lock()
        zero_arr = np.zeros(shape=(400, 400), dtype=np.uint8)
        self.shm = shared_memory.SharedMemory(create=True, size=zero_arr.nbytes)
        shm_arr = np.ndarray(zero_arr.shape, dtype=np.uint8, buffer=self.shm.buf)
        shm_arr[:] = zero_arr[:]
        # Save the name so other processes can attach to the same segment.
        self.shm_name = self.shm.name

    def get_shm(self):
        """Return a private copy of the shared array."""
        existing_shm = shared_memory.SharedMemory(name=self.shm_name, create=False)
        try:
            ar_shm = np.ndarray((400, 400), dtype=np.uint8, buffer=existing_shm.buf)
            with self.lock:  # guard against a concurrent set_shm()
                ar = ar_shm.copy()
        finally:
            # Detach even if the copy raises; the copy owns its own
            # memory, so closing the handle afterwards is safe.
            existing_shm.close()
        return ar

    def set_shm(self, ar):
        """Copy *ar* into the shared array; returns 1 on success."""
        existing_shm = shared_memory.SharedMemory(name=self.shm_name, create=False)
        try:
            ar_shm = np.ndarray((400, 400), dtype=np.uint8, buffer=existing_shm.buf)
            with self.lock:
                ar_shm[:] = ar[:]
        finally:
            existing_shm.close()
        return 1

    def func1(self):
        """Worker: write 30 random frames into shared memory."""
        for i in range(30):
            new_arr = np.random.randint(0, high=255, size=(400,400), dtype=np.uint8)
            self.set_shm(new_arr)
            print(f'{i} | func1 | set_shm | {new_arr[:4,0]}')
            time.sleep(random.random())

    def func2(self):
        """Worker: read the shared frame, then overwrite it with 2s."""
        for i in range(30):
            new_arr = self.get_shm()
            print(f'{i} | func2 | get_shm | {new_arr[:4,0]}')
            new_arr[:] = 2
            self.set_shm(new_arr)
            print(f'{i} | func2 | set_shm | {new_arr[:4,0]}')
            time.sleep(random.random())
if __name__ == '__main__':
    foo = MyClass()
    foo.create_shm()
    # Two workers hammer the same segment concurrently.
    p1 = Process(target=foo.func1)
    p2 = Process(target=foo.func2)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    # Release the segment once both workers are done; without unlink()
    # the memory lingers until the OS reclaims it (the original example
    # skipped this step on purpose).
    shm = shared_memory.SharedMemory(name=foo.shm_name, create=False)
    shm.unlink()
    shm.close()
示例2
class MyClass(object):
    """Share a 400x400 uint8 array between processes via shared memory.

    Example-2 pattern: each process attaches to the segment once, caches
    the SharedMemory handle and the numpy view (``shm_link``/``arr_link``),
    and reuses them for every subsequent get/set.
    """

    def __init__(self):
        pass

    def create_shm(self):
        """Create the shared segment, zero-fill it, and remember its name.

        The creator's handle is deliberately kept open (stored on
        ``self.shm``): on Windows the segment is destroyed as soon as the
        last open handle is closed, even without an explicit ``unlink()``,
        so closing here would make later attaches fail.
        """
        self.lock = Lock()
        zero_arr = np.zeros(shape=(400, 400), dtype=np.uint8)
        self.shm = shared_memory.SharedMemory(create=True, size=zero_arr.nbytes)
        shm_arr = np.ndarray(zero_arr.shape, dtype=np.uint8, buffer=self.shm.buf)
        shm_arr[:] = zero_arr[:]
        # Save the name; each process attaches lazily via _ensure_open().
        self.shm_name = self.shm.name
        self.shm_link = None
        self.arr_link = None

    def _ensure_open(self):
        """Attach to the segment once per process; cache handle and view."""
        if self.shm_link is None:
            self.shm_link = shared_memory.SharedMemory(name=self.shm_name, create=False)
            self.arr_link = np.ndarray((400, 400), dtype=np.uint8, buffer=self.shm_link.buf)

    def get_shm(self):
        """Return a private copy of the shared array."""
        self._ensure_open()
        with self.lock:  # guard against a concurrent set_shm()
            return self.arr_link.copy()

    def set_shm(self, ar):
        """Copy *ar* into the shared array; returns 1 on success."""
        self._ensure_open()
        with self.lock:
            self.arr_link[:] = ar[:]
        return 1

    def func1(self):
        """Worker: write 30 random frames into shared memory."""
        for i in range(30):
            new_arr = np.random.randint(0, high=255, size=(400,400), dtype=np.uint8)
            self.set_shm(new_arr)
            print(f'{i} | func1 | set_shm | {new_arr[:4,0]}')
            time.sleep(random.random())

    def func2(self):
        """Worker: read the shared frame, then overwrite it with 2s."""
        for i in range(30):
            new_arr = self.get_shm()
            print(f'{i} | func2 | get_shm | {new_arr[:4,0]}')
            new_arr[:] = 2
            self.set_shm(new_arr)
            print(f'{i} | func2 | set_shm | {new_arr[:4,0]}')
            time.sleep(random.random())
if __name__ == '__main__':
    foo = MyClass()
    foo.create_shm()
    # Two workers hammer the same segment concurrently.
    p1 = Process(target=foo.func1)
    p2 = Process(target=foo.func2)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    # Release the segment once both workers are done; without unlink()
    # the memory lingers until the OS reclaims it (the original example
    # skipped this step on purpose).
    shm = shared_memory.SharedMemory(name=foo.shm_name, create=False)
    shm.unlink()
    shm.close()
我只是想确定示例 2 没问题
我实际上认为,不断关闭共享内存文件不仅会损害性能,而且实际上是错误的。在 Windows 上,如果没有任何进程持有已打开的 SHM 文件句柄,该段将被自动删除,而无需调用 unlink。如果您希望代码可跨平台使用,则应确保至少有一个进程(比如主进程)保持句柄打开,直到数据应当被取消链接(unlink)为止。除此之外,我认为没有任何理由在子进程中的工作结束之前关闭文件。我遵循的设计模式是创建一个上下文管理器来处理 SHM 的打开和关闭。我甚至将它放入一个类中以提供共享的 numpy 数组:https://stackoverflow.com/a/73279376/3220135