我一直在提炼和调试使用 multiprocessing.Queue 跨主线程和 multiprocessing.Pool 时发生的随机死锁条件。它在 multiprocessing.Pool 尝试加入时挂起。
更具体地说,您有一个额外的线程作为 multiprocessing.Queue 消费者和 multiprocessing.Pool 工作人员作为主要生产者,但是当您在启动池工作人员之前将一些消息添加到主线程中的队列时会发生死锁。请参阅以下提炼脚本:
import multiprocessing as mp
import threading as th
from typing import Optional
log_queue = mp.Queue()
def listner():
while True:
v: Optional[str] = log_queue.get()
if v is None:
break
print(v)
def worker(x: int):
# Waste some time
v: float = 3.141576 * x
for _ in range(10):
v *= 0.92
log_queue.put(f'{v}')
def worker_error(e: BaseException):
print(f'Error: {e}')
return None
def main():
lt = th.Thread(target=listner, name='listener',
args=())
lt.start()
# print('Launched listner thread')
# print('Place some messages in the queue.')
for i in range(500):
log_queue.put(f'|--------------------------------------------------|')
# print('Running workers')
with mp.Pool() as pool:
for i in range(50):
pool.apply_async(
worker,
args=(i,),
error_callback=worker_error,
)
pool.close()
pool.join()
# print('Telling listener to stop ...')
log_queue.put(None)
lt.join()
if __name__ == '__main__':
main()
我开始认为它与全局范围内定义的 multiprocessing.Queue 有关。这是由 multiprocessing.Pool(有意)继承的。 也许 multiprocessing.Pool 正在尝试在队列上运行一些析构函数/清理例程,这会挂起它?
可能与 MP 管道和队列 部分中的警告有关,关于 MP.Pool 如何在所有缓冲项目通过队列的管道发送之前不会加入。如果是这种情况,为什么消费者线程不清除队列,因为 main 无论如何都被阻塞了?
有什么猜测吗?