在某些用例中,我需要等待所有已创建的线程完成并根据其结果做出一些决定,并查看是否需要进一步处理-没有ThreadPoolExecutor.shutdown()
。
我这样实现:
from threading import BoundedSemaphore, Event
class JoinSemaphore(BoundedSemaphore):
def __init__(self, value=1):
super().__init__(value)
self._empty = Event()
def join(self, timeout=None):
if self._value < self._initial_value:
self._empty.wait(timeout)
def release(self):
with self._cond:
if self._value >= self._initial_value:
raise ValueError("Semaphore released too many times")
elif self._value == self._initial_value - 1:
self._empty.set()
self._value += 1
self._cond.notify()
def acquired(self):
with self._cond:
return self._initial_value - self._value
这里,我在没有任何保护的情况下计算self._value < self._initial_value
,并且有风险。当我编写波纹管这样的join()函数以防止在计算self._value < self._initial_value
时进行不必要的更改时,当主线程连接到信号量上并且此时其他线程并没有获得release()
锁定时,我将面临死锁主线程已经产生了它,但是主线程仍在等待其他线程。因此此实现不正确。
def join(self, timeout=None):
with self._cond:
if self._value < self._initial_value:
self._empty.wait(timeout)
在第三种实现中,我不能保证当我想要为wait()
事件提供_empty
时,其他线程可能不会提交其结果并释放锁。
def join(self, timeout=None):
with self._cond:
if self._value == self._initial_value:
return
self._empty.wait(timeout)
问题是:
如何使用锁正确计算self._value < self._initial_value
并等待_empty
事件并在等待之前释放锁以避免死锁?
非常感谢
from threading import Event, Condition, Lock
from time import monotonic as _time
class JoinSemaphore:
def __init__(self, value=1):
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
self._cond = Condition(Lock())
self._value = value
self._initial_value = value
self._empty = Event()
self._empty.set()
def acquire(self, blocking=True, timeout=None):
if not blocking and timeout is not None:
raise ValueError("can't specify timeout for non-blocking acquire")
rc = False
endtime = None
with self._cond:
while self._value == 0:
if not blocking:
break
if timeout is not None:
if endtime is None:
endtime = _time() + timeout
else:
timeout = endtime - _time()
if timeout <= 0:
break
self._cond.wait(timeout)
else:
self._empty.clear()
self._value -= 1
rc = True
return rc
__enter__ = acquire
def join(self, timeout=None):
self._empty.wait(timeout)
def release(self):
with self._cond:
if self._value >= self._initial_value:
raise ValueError("Semaphore released too many times")
elif self._value == self._initial_value - 1:
self._empty.set()
self._value += 1
self._cond.notify()
def acquired(self):
with self._cond:
return self._initial_value - self._value
差异是:
class JoinSemaphore:
def __init__(self, value=1):
...
self._empty = Event()
self._empty.set()
def acquire(self, blocking=True, timeout=None):
...
with self._cond:
while ...:
...
else:
self._empty.clear()
...
def join(self, timeout=None):
self._empty.wait(timeout)
def release(self):
...
elif self._value == self._initial_value - 1:
self._empty.set()
...
def acquired(self):
with self._cond:
return self._initial_value - self._value