在线程内调用condition.wait()会导致在主线程上检索任何未来的块。

问题描述 投票:38回答:1

我有一些在线程池中执行的任务,这些任务共享一个再入式读写锁。这些任务如果执行完毕就返回期货。当锁发生争用时,重入式读写锁将等待条件。

我正在使用的库暴露了一个 wait_for_any 方法从任务集合中检索一个或多个已完成的期货。然而,即使一个或多个期货已经完成了任务,也不一定能从任务集合中检索出一个或多个已完成的期货。wait_for_any 方法将无法返回,直到所有期货结束。此外, wait_for_any 方法暴露了一个超时参数,如果设置了这个参数,随后就会被忽略。

我的问题是,我到底做错了什么,导致了这样一个 wait_for_any 方法进行阻塞?我对Python的条件等待和通知的实现理解错误,这些构造会不会在Python中完全阻塞每个线程?

我使用的库叫Futurist,由OpenStack基金会维护。下面是我使用的相关类和方法的链接。GreenThreadPoolExecutorwaiters.wait_for_any

这里是ReentrantReadWriteLock。

class ReentrantReadWriteLock(object):
    def __init__(self):

        self._read_lock = RLock()
        self._write_lock = RLock()
        self._condition = Condition
        self._num_readers = 0
        self._wants_write = False

    def read_acquire(self, blocking=True):
        int_lock = False
        try:
            if self._read_lock.acquire(blocking):
                int_lock = True
                LOG.warning("read internal lock acquired")
                while self._wants_write:
                    LOG.warning("read wants write true")
                    if not blocking:
                        LOG.warning("read non blocking")
                        return False
                    LOG.warning("read wait")
                    with self._condition:
                        self._condition.wait()
                    first_it = False
                LOG.warning("read acquired lock")
                self._num_readers += 1
                return True
            LOG.warning("read internal lock failed")
            return False
        finally:
            if int_lock:
                 self._read_lock.release()

    def write_acquire(self, blocking=True):
        int_lock = False
        try:
            if self._write_lock.acquire(blocking):
                int_lock = True
                LOG.warning("write internal lock acquired")
                while self._num_readers > 0 or self._wants_write:
                    LOG.warning("write wants write true or num read")
                    if not blocking:
                        LOG.warning("write non blocking")
                        return False
                    LOG.warning("write wait")
                    with self._condition:
                        self._condition.wait()
                    first_it = False
                LOG.warning("write acquired lock")
                self._wants_write = True
                return True
            LOG.warning("write internal lock failed")
            return False
        finally:
            if int_lock:
                self._write_lock.release()

为了测试这个锁,并使它无限期地阻塞,我做了以下的工作。

def get_read(self, rrwlock):
    return rrwlock.read_acquire()

def get_write(self, rrwlock):
    return rrwlock.write_acquire()

def test():
    self._threadpool = futurist.GreenThreadPoolExecutor(max_workers=4)
    rrwlock = ReentrantReadWriteLock()
    futures = []
    futures.append(self._threadpool.submit(self.get_read, rrwlock))
    futures.append(self._threadpool.submit(self.get_write, rrwlock))

    # Get the results and verify only one of the calls succeeded
    # assert that the other call is still pending
    results = waiters.wait_for_any(futures)
    self.assertTrue(results[0].pop().result)
    self.assertEqual(1, len(results[1]))

在这个例子中,执行 results = waiters.wait_for_any(futures) 块无限期的。这让我彻底困惑了。希望有人能给我提供这种行为的解释。

2019-10-16 18:55:00 UTC更新:主线程的阻塞并不局限于这个ReentrantReadWriteLockimplementation,在使用库时也会发生,比如 读写器锁.

更新时间:2019-10-17 08:15:00 UTC我已将此作为一个错误报告提交给futurist的维护者,因为我认为这种行为是不正确的。发射场错误报告

更新时间:2019-10-20 09:02:00 UTC我后来观察到在哪个futurist库里面调用进度受阻。waiter. event. wait(timeout)类似的问题似乎也提交给了Python 3.3和3.4,后来被关闭了。已关闭的问题

世界协调时2019-10-21 09:06:00更新已经提交了一个未来学家库的补丁,以尝试并解决 本期.

更新时间:2019-10-22 08:03:00 UTC提交的补丁并没有解决这个问题。当追溯到 waiter.event.wait(timeout) 当调用Python threading.py中的wait函数时,在Python threading.py中的调用块。waiter.acquire().

更新时间:2019-10-23 07:17:00 UTC我创建了一个 小仓库 这证明了使用原生的ThreadPoolExecutor和期货是可能的。我开始怀疑这是CPython中GIL造成的限制。下面的代码使用上图所示的相同锁演示了演示的操作。

from rrwlock import ReentrantReadWriteLock
from concurrent.futures import ThreadPoolExecutor

def read_lock(lock):
    lock.read_acquire()

def write_lock(lock):
    lock.write_acquire()

def main():
    local_lock = ReentrantReadWriteLock()
    with ThreadPoolExecutor(max_workers=2) as executor:
        # First task will submit fine
        future = executor.submit(read_lock, local_lock)
        # Second one will block indefinitely
        future2 = executor.submit(write_lock, local_lock)

更新 2019-10-31 07:36:00 UTC更新了重入式读写锁,使它能在Python2.7中工作,并与在 Github上的演示库.

此外,还发现2019-10-23日描述的原生线程池演示无法使用,因为连同最后一句话在内

future2 = executor.submit(write_lock, local_lock)

__exit__ 的方法被调用。当然,这个方法会尝试干净地关闭所有当前运行的线程,但由于锁的保持,这是不可能的。这个例子已经更新了一个 自旋_for_any 的例子。

futures = []
futures.append(executor.submit(read_lock, local_lock))
futures.append(executor.submit(write_lock, local_lock))

# This will loop indefinitely as one future will
# never be done but it shouldn't block.
# although similar to waiters.wait_for_any this
# would rather be 'spin_for_any' since it does
# not use wait().
while len(futures) > 0:
    for f in futures:
        if f.done():
            futures.remove(f)
            f.result()
            print("Future done")

这个本地Python并发 自旋_for_any 这个例子完全按照预期工作。

python multithreading threadpool wait future
1个回答
2
投票

在你的 ReentrantReadWriteLock 类,尝试改变

self._condition = Condition()
© www.soinside.com 2019 - 2024. All rights reserved.