我想实现一个不添加已经存在的元素的
multiprocessing.Queue
。
使用 Python STL 队列我没有任何问题,遵循 this 响应。对于多处理,我解决了一些问题,感谢this
为此,我执行以下操作:
from multiprocessing.queues import Queue
from multiprocessing import get_context
class CustomQueue(Queue):
def put(self, obj, block=True, timeout=None):
if obj not in self:
return super().put(obj, block, timeout)
def __contains__(self, item):
with self.mutex:
return item in self.queue
custom_queue = CustomQueue(ctx=get_context())
但是,当我调用 put 方法时,我得到
AttributeError: 'CustomQueue' object has no attribute 'mutex'
如何解决这个问题? 预先感谢您。
所以我显然解决了这个问题,而不是
self.queue
,我需要使用self._buffer
。不过这个选项不是线程安全的。有一个名为 _rlock
的变量,它似乎相当于 mutex
,但我害怕使用它,因为我不太了解,并且还有第二个名为 _wlock
的变量仅适用于 win32,所以 IDK。如果有人知道如何使该线程无风险,请告诉我。
class CustomQueue(Queue):
def put(self, obj, block=True, timeout=None):
# Note: Can have issues with threads. i.e. obj not in buffer but
# added later, so we added it twice.
if obj not in self:
return super().put(obj, block, timeout)
def __contains__(self, item):
return item in self._buffer