重写 multiprocessing.queues.Queue put 方法

问题描述 投票:0回答:1

我想实现一个不添加已经存在的元素的

multiprocessing.Queue
。 使用 Python STL 队列我没有任何问题,遵循 this 响应。对于多处理,我解决了一些问题,感谢this 为此,我执行以下操作:

from multiprocessing.queues import Queue
from multiprocessing import get_context


class CustomQueue(Queue):
    def put(self, obj, block=True, timeout=None):
        if obj not in self:
            return super().put(obj, block, timeout)

    def __contains__(self, item):
        with self.mutex:
            return item in self.queue

custom_queue = CustomQueue(ctx=get_context())

但是,当我调用 put 方法时,我得到

AttributeError: 'CustomQueue' object has no attribute 'mutex'

如何解决这个问题? 预先感谢您。

python multiprocessing queue
1个回答
0
投票

所以我显然解决了这个问题,而不是

self.queue
,我需要使用
self._buffer
。不过这个选项不是线程安全的。有一个名为
_rlock
的变量,它似乎相当于
mutex
,但我害怕使用它,因为我不太了解,并且还有第二个名为
_wlock
的变量仅适用于 win32,所以 IDK。如果有人知道如何使该线程无风险,请告诉我。

class CustomQueue(Queue):
    def put(self, obj, block=True, timeout=None):
        # Note: Can have issues with threads. i.e. obj not in buffer but
        # added later, so we added it twice.
        if obj not in self:
            return super().put(obj, block, timeout)

    def __contains__(self, item):
        return item in self._buffer
© www.soinside.com 2019 - 2024. All rights reserved.