我有一些在线程池中执行的任务,这些任务共享一个再入式读写锁。这些任务如果执行完毕就返回期货。当锁发生争用时,重入式读写锁将等待条件。
我正在使用的库暴露了一个 wait_for_any
方法从任务集合中检索一个或多个已完成的期货。然而,即使一个或多个期货已经完成了任务,也不一定能从任务集合中检索出一个或多个已完成的期货。wait_for_any
方法将无法返回,直到所有期货结束。此外, wait_for_any
方法暴露了一个超时参数,如果设置了这个参数,随后就会被忽略。
我的问题是,我到底做错了什么,导致了这样一个 wait_for_any
方法进行阻塞?我对Python的条件等待和通知的实现理解错误,这些构造会不会在Python中完全阻塞每个线程?
我使用的库叫Futurist,由OpenStack基金会维护。下面是我使用的相关类和方法的链接。GreenThreadPoolExecutor
和 waiters.wait_for_any
这里是ReentrantReadWriteLock。
class ReentrantReadWriteLock(object):
def __init__(self):
self._read_lock = RLock()
self._write_lock = RLock()
self._condition = Condition
self._num_readers = 0
self._wants_write = False
def read_acquire(self, blocking=True):
int_lock = False
try:
if self._read_lock.acquire(blocking):
int_lock = True
LOG.warning("read internal lock acquired")
while self._wants_write:
LOG.warning("read wants write true")
if not blocking:
LOG.warning("read non blocking")
return False
LOG.warning("read wait")
with self._condition:
self._condition.wait()
first_it = False
LOG.warning("read acquired lock")
self._num_readers += 1
return True
LOG.warning("read internal lock failed")
return False
finally:
if int_lock:
self._read_lock.release()
def write_acquire(self, blocking=True):
int_lock = False
try:
if self._write_lock.acquire(blocking):
int_lock = True
LOG.warning("write internal lock acquired")
while self._num_readers > 0 or self._wants_write:
LOG.warning("write wants write true or num read")
if not blocking:
LOG.warning("write non blocking")
return False
LOG.warning("write wait")
with self._condition:
self._condition.wait()
first_it = False
LOG.warning("write acquired lock")
self._wants_write = True
return True
LOG.warning("write internal lock failed")
return False
finally:
if int_lock:
self._write_lock.release()
为了测试这个锁,并使它无限期地阻塞,我做了以下的工作。
def get_read(self, rrwlock):
return rrwlock.read_acquire()
def get_write(self, rrwlock):
return rrwlock.write_acquire()
def test():
self._threadpool = futurist.GreenThreadPoolExecutor(max_workers=4)
rrwlock = ReentrantReadWriteLock()
futures = []
futures.append(self._threadpool.submit(self.get_read, rrwlock))
futures.append(self._threadpool.submit(self.get_write, rrwlock))
# Get the results and verify only one of the calls succeeded
# assert that the other call is still pending
results = waiters.wait_for_any(futures)
self.assertTrue(results[0].pop().result)
self.assertEqual(1, len(results[1]))
在这个例子中,执行 results = waiters.wait_for_any(futures)
块无限期的。这让我彻底困惑了。希望有人能给我提供这种行为的解释。
2019-10-16 18:55:00 UTC更新:主线程的阻塞并不局限于这个ReentrantReadWriteLockimplementation,在使用库时也会发生,比如 读写器锁.
更新时间:2019-10-17 08:15:00 UTC我已将此作为一个错误报告提交给futurist的维护者,因为我认为这种行为是不正确的。发射场错误报告
更新时间:2019-10-20 09:02:00 UTC我后来观察到在哪个futurist库里面调用进度受阻。waiter. event. wait(timeout)类似的问题似乎也提交给了Python 3.3和3.4,后来被关闭了。已关闭的问题
世界协调时2019-10-21 09:06:00更新已经提交了一个未来学家库的补丁,以尝试并解决 本期.
更新时间:2019-10-22 08:03:00 UTC提交的补丁并没有解决这个问题。当追溯到 waiter.event.wait(timeout)
当调用Python threading.py中的wait函数时,在Python threading.py中的调用块。waiter.acquire().
更新时间:2019-10-23 07:17:00 UTC我创建了一个 小仓库 这证明了使用原生的ThreadPoolExecutor和期货是可能的。我开始怀疑这是CPython中GIL造成的限制。下面的代码使用上图所示的相同锁演示了演示的操作。
from rrwlock import ReentrantReadWriteLock
from concurrent.futures import ThreadPoolExecutor
def read_lock(lock):
lock.read_acquire()
def write_lock(lock):
lock.write_acquire()
def main():
local_lock = ReentrantReadWriteLock()
with ThreadPoolExecutor(max_workers=2) as executor:
# First task will submit fine
future = executor.submit(read_lock, local_lock)
# Second one will block indefinitely
future2 = executor.submit(write_lock, local_lock)
更新 2019-10-31 07:36:00 UTC更新了重入式读写锁,使它能在Python2.7中工作,并与在 Github上的演示库.
此外,还发现2019-10-23日描述的原生线程池演示无法使用,因为连同最后一句话在内
future2 = executor.submit(write_lock, local_lock)
该 __exit__
的方法被调用。当然,这个方法会尝试干净地关闭所有当前运行的线程,但由于锁的保持,这是不可能的。这个例子已经更新了一个 自旋_for_any 的例子。
futures = []
futures.append(executor.submit(read_lock, local_lock))
futures.append(executor.submit(write_lock, local_lock))
# This will loop indefinitely as one future will
# never be done but it shouldn't block.
# although similar to waiters.wait_for_any this
# would rather be 'spin_for_any' since it does
# not use wait().
while len(futures) > 0:
for f in futures:
if f.done():
futures.remove(f)
f.result()
print("Future done")
这个本地Python并发 自旋_for_any 这个例子完全按照预期工作。
在你的 ReentrantReadWriteLock
类,尝试改变
self._condition = Condition()