如何在python中创建可连接的信号量?

问题描述 投票:0回答:1

在某些用例中,我需要等待所有已创建的线程完成并根据其结果做出一些决定,并查看是否需要进一步处理-没有ThreadPoolExecutor.shutdown()

我这样实现:

from threading import BoundedSemaphore, Event


class JoinSemaphore(BoundedSemaphore):
    def __init__(self, value=1):
        super().__init__(value)
        self._empty = Event()

    def join(self, timeout=None):
        if self._value < self._initial_value:
            self._empty.wait(timeout)

    def release(self):
        with self._cond:
            if self._value >= self._initial_value:
                raise ValueError("Semaphore released too many times")
            elif self._value == self._initial_value - 1:
                self._empty.set()

            self._value += 1
            self._cond.notify()

    def acquired(self):
        with self._cond:
            return self._initial_value - self._value

这里,我在没有任何保护的情况下计算self._value < self._initial_value,并且有风险。当我编写波纹管这样的join()函数以防止在计算self._value < self._initial_value时进行不必要的更改时,当主线程连接到信号量上并且此时其他线程并没有获得release()锁定时,我将面临死锁主线程已经产生了它,但是主线程仍在等待其他线程。因此此实现不正确。

    def join(self, timeout=None):
        with self._cond:
            if self._value < self._initial_value:
                self._empty.wait(timeout)

在第三种实现中,我不能保证当我想要为wait()事件提供_empty时,其他线程可能不会提交其结果并释放锁。

    def join(self, timeout=None):
        with self._cond:
            if self._value == self._initial_value:
                return
        self._empty.wait(timeout)

问题是:

如何使用锁正确计算self._value < self._initial_value并等待_empty事件并在等待之前释放锁以避免死锁?

非常感谢

python python-3.x multithreading semaphore python-multithreading
1个回答
0
投票
from threading import Event, Condition, Lock from time import monotonic as _time class JoinSemaphore: def __init__(self, value=1): if value < 0: raise ValueError("semaphore initial value must be >= 0") self._cond = Condition(Lock()) self._value = value self._initial_value = value self._empty = Event() self._empty.set() def acquire(self, blocking=True, timeout=None): if not blocking and timeout is not None: raise ValueError("can't specify timeout for non-blocking acquire") rc = False endtime = None with self._cond: while self._value == 0: if not blocking: break if timeout is not None: if endtime is None: endtime = _time() + timeout else: timeout = endtime - _time() if timeout <= 0: break self._cond.wait(timeout) else: self._empty.clear() self._value -= 1 rc = True return rc __enter__ = acquire def join(self, timeout=None): self._empty.wait(timeout) def release(self): with self._cond: if self._value >= self._initial_value: raise ValueError("Semaphore released too many times") elif self._value == self._initial_value - 1: self._empty.set() self._value += 1 self._cond.notify() def acquired(self): with self._cond: return self._initial_value - self._value

差异是:

class JoinSemaphore:

    def __init__(self, value=1):
        ...
        self._empty = Event()
        self._empty.set()

    def acquire(self, blocking=True, timeout=None):
        ...
        with self._cond:
            while ...:
        ...
            else:
                self._empty.clear()
        ...

    def join(self, timeout=None):
        self._empty.wait(timeout)

    def release(self):
        ...
            elif self._value == self._initial_value - 1:
                self._empty.set()
        ...

    def acquired(self):
        with self._cond:
            return self._initial_value - self._value

© www.soinside.com 2019 - 2024. All rights reserved.