是否需要一直关闭SharedMemory?

问题描述 投票:0回答:1

我注意到在重负载下不断关闭和打开内存会影响性能(示例 1 get/set_shm)。

我可以在不同的进程中保留一个打开的 SharedMemory 并保留该内存的 numpy 数组的链接(示例 2 get/set_shm)吗?或者这样的决定会导致我的代码运行不稳定?

示例1

class MyClass(object):

    def __init__(self):
        pass

    def create_shm(self):
        self.lock = Lock()
        
        zero_arr = np.zeros(shape=(400,400), dtype=np.uint8)
        existing_shm = shared_memory.SharedMemory(create=True, size=zero_arr.nbytes)
        shm_arr = np.ndarray(zero_arr.shape, dtype=np.uint8, buffer=existing_shm.buf)
        shm_arr[:] = zero_arr[:]
        existing_shm.close()

        #save memory name
        self.shm_name = existing_shm.name

    def get_shm(self):
        existing_shm = shared_memory.SharedMemory(name=self.shm_name, create=False)
        ar_shm = np.ndarray((400,400), dtype=np.uint8, buffer=existing_shm.buf)

        self.lock.acquire()
        ar = ar_shm.copy()
        self.lock.release()
        existing_shm.close()
        return ar

    def set_shm(self, ar):
        existing_shm = shared_memory.SharedMemory(name=self.shm_name, create=False)
        ar_shm = np.ndarray((400,400), dtype=np.uint8, buffer=existing_shm.buf)

        self.lock.acquire()
        ar_shm[:] = ar[:]
        self.lock.release()
        existing_shm.close()
        return 1

    def func1(self):
        for i in range(30):
            new_arr = np.random.randint(0, high=255, size=(400,400), dtype=np.uint8)
            self.set_shm(new_arr)
            print(f'{i} | func1 | set_shm | {new_arr[:4,0]}')
            time.sleep(random.random())
        
    def func2(self):
        for i in range(30):
            new_arr = self.get_shm()
            print(f'{i} | func2 | get_shm | {new_arr[:4,0]}')
            new_arr[:] = 2
            self.set_shm(new_arr)
            print(f'{i} | func2 | set_shm | {new_arr[:4,0]}')
            time.sleep(random.random())

if __name__ == '__main__':

    foo = MyClass()
    foo.create_shm()

    p1 = Process(target=foo.func1)
    p2 = Process(target=foo.func2)

    p1.start()
    p2.start()


    p1.join()
    p2.join()
    
    ###need unlink but skip it for example

示例2

class MyClass(object):

    def __init__(self):
        pass

    def create_shm(self):
        self.lock = Lock()
        
        zero_arr = np.zeros(shape=(400,400), dtype=np.uint8)
        existing_shm = shared_memory.SharedMemory(create=True, size=zero_arr.nbytes)
        shm_arr = np.ndarray(zero_arr.shape, dtype=np.uint8, buffer=existing_shm.buf)
        shm_arr[:] = zero_arr[:]
        existing_shm.close()

        #save memory name
        self.shm_name = existing_shm.name
        self.shm_link = None
        self.arr_link = None

    def get_shm(self):
        if self.shm_link is None:
            existing_shm = shared_memory.SharedMemory(name=self.shm_name, create=False)
            ar_shm = np.ndarray((400,400), dtype=np.uint8, buffer=existing_shm.buf)
            self.shm_link = existing_shm
            self.arr_link = ar_shm
        self.lock.acquire()
        ar = self.arr_link.copy()
        self.lock.release()
        return ar

    def set_shm(self, ar):
        if self.shm_link is None:
            existing_shm = shared_memory.SharedMemory(name=self.shm_name, create=False)
            ar_shm = np.ndarray((400,400), dtype=np.uint8, buffer=existing_shm.buf)
            self.shm_link = existing_shm
            self.arr_link = ar_shm
        self.lock.acquire()
        self.arr_link[:] = ar[:]
        self.lock.release()
        return 1

    def func1(self):
        for i in range(30):
            new_arr = np.random.randint(0, high=255, size=(400,400), dtype=np.uint8)
            self.set_shm(new_arr)
            print(f'{i} | func1 | set_shm | {new_arr[:4,0]}')
            time.sleep(random.random())
        
    def func2(self):
        for i in range(30):
            new_arr = self.get_shm()
            print(f'{i} | func2 | get_shm | {new_arr[:4,0]}')
            new_arr[:] = 2
            self.set_shm(new_arr)
            print(f'{i} | func2 | set_shm | {new_arr[:4,0]}')
            time.sleep(random.random())

if __name__ == '__main__':

    foo = MyClass()
    foo.create_shm()

    p1 = Process(target=foo.func1)
    p2 = Process(target=foo.func2)

    p1.start()
    p2.start()


    p1.join()
    p2.join()
    
    ###need unlink but skip it for example

我只是想确定示例 2 没问题

python multiprocessing locking shared-memory
1个回答
0
投票

我实际上建议一直关闭文件不仅会损害性能,而且实际上是错误的。在 Windows 上,如果没有进程打开 SHM 文件句柄,它将被自动删除,而无需调用

unlink
。如果您希望代码可跨平台使用,则应确保至少一个进程(可能是主进程)保持句柄打开,直到数据should 取消链接。除此之外,我认为没有任何理由要求关闭文件,直到子进程中的进程结束。我遵循的设计模式是创建一个上下文管理器来处理 SHM 的打开和关闭。我什至将它放入一个类中以提供共享的 numpy 数组:https://stackoverflow.com/a/73279376/3220135

© www.soinside.com 2019 - 2024. All rights reserved.