我刚刚开始使用 Python 进行多处理。 我在程序中使用
multiprocessing.queues
,从一个进程传递到另一个进程。
问题是,当我使用 myprocess.join()
时,它仅在我正在写入的队列为空时终止。
所以我想知道原因是什么,为什么要这样构建以及如何解决它。我可以清空主进程中的所有队列,但我更喜欢这样。
错误部分: 我向你解释的是超过 100 个元素的队列的行为。
在我的示例中,如果您输入 100 个初始数据的列表:
in range(100)
将会按预期调用 p4.join()
,但如果您输入 1000 或更多,它将停止工作,正如我所解释的。所以我有点失落。
编辑:该错误出现在输入数据大小为 533 或 534 个元素时
谢谢,
这里有一个小例子向您展示。
import random
import multiprocessing
import time
def process1(data, queue1):
for item in data:
queue1.put(item)
queue1.put(None)
def process2(queue1, queue2):
while True:
item = queue1.get()
if item is None:
queue2.put(None)
break
queue2.put(item * 2)
def process3(queue2, queue3):
while True:
item = queue2.get()
if item is None:
queue3.put(None)
break
queue3.put(item * 2)
def process4(queue3, queue4):
while True:
item = queue3.get()
if item is None:
queue4.put(None)
print("None received")
break
queue4.put(item * 2)
print("End process4")
if __name__ == "__main__":
start_time = time.time()
data = [random.randint(1, 10000) for _ in range(10000)]
queue1 = multiprocessing.Queue()
queue2 = multiprocessing.Queue()
queue3 = multiprocessing.Queue()
queue4 = multiprocessing.Queue()
p1 = multiprocessing.Process(target=process1, args=(data, queue1))
p2 = multiprocessing.Process(target=process2, args=(queue1, queue2))
p3 = multiprocessing.Process(target=process3, args=(queue2, queue3))
p4 = multiprocessing.Process(target=process4, args=(queue3, queue4))
p1.start()
p2.start()
p3.start()
p4.start()
print("Proc started")
p1.join()
print("p1 join")
p2.join()
print("p2 join")
p3.join()
print("p3 join")
p4.join()
print("p4 join")
results = []
print("While loop")
while True:
item = queue4.get()
if item is None:
break
results.append(item)
end_time = time.time()
print(end_time - start_time)
该问题在“加入使用队列的进程”部分中的https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming中进行了描述。
基本思想是进程将在终止之前等待,直到所有缓冲的项目都由“供给器”线程馈送到底层管道。
import multiprocessing
import random
import time
def process1(data, queue1):
for item in data:
queue1.put(item)
queue1.put(None)
def process2(queue1, queue2):
while True:
item = queue1.get()
if item is None:
queue2.put(None)
break
queue2.put(item * 2)
def process3(queue2, queue3):
while True:
item = queue2.get()
if item is None:
queue3.put(None)
break
queue3.put(item * 2)
def process4(queue3, queue4):
# calling queue4.cancel_join_thread() will correctly terminate the process
# queue4.cancel_join_thread()
# BUT not all items are put into the queue4 (rest are discarded)
# see https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming
# section "Joining processes that use queues"
while True:
item = queue3.get()
if item is None:
queue4.put(None)
print("None received")
break
queue4.put(item * 2)
print("End process4")
print(queue4.qsize())
if __name__ == "__main__":
start_time = time.time()
data = [random.randint(1, 10000) for _ in range(10000)]
queue1 = multiprocessing.Queue()
queue2 = multiprocessing.Queue()
queue3 = multiprocessing.Queue()
queue4 = multiprocessing.Queue()
p1 = multiprocessing.Process(target=process1, args=(data, queue1))
p2 = multiprocessing.Process(target=process2, args=(queue1, queue2))
p3 = multiprocessing.Process(target=process3, args=(queue2, queue3))
p4 = multiprocessing.Process(target=process4, args=(queue3, queue4))
p1.start()
p2.start()
p3.start()
p4.start()
print("Proc started")
p1.join()
print("p1 join")
p2.join()
print("p2 join")
p3.join()
print("p3 join")
# don't call p4.join() here
# p4.join()
# print("p4 join")
results = []
print("While loop")
while True:
item = queue4.get()
if item is None:
break
results.append(item)
end_time = time.time()
# call p4.join() here instead:
p4.join()
print("p4 join")
print(end_time - start_time)
打印:
Proc started
p1 join
None received
End process4
10001
p2 join
p3 join
While loop
p4 join
0.1303730010986328