如何终止带有队列的Python进程? + 队列大小的错误?

问题描述 投票:0回答:1

我刚刚开始使用 Python 进行多处理。 我在程序中使用

multiprocessing.queues 
,从一个进程传递到另一个进程。 问题是,当我使用
myprocess.join()
时,它仅在我正在写入的队列为空时终止。 所以我想知道原因是什么,为什么要这样构建以及如何解决它。我可以清空主进程中的所有队列,但我更喜欢这样。

错误部分: 我向你解释的是超过 100 个元素的队列的行为。

在我的示例中,如果您输入 100 个初始数据的列表:

in range(100)
将会按预期调用
p4.join()
,但如果您输入 1000 或更多,它将停止工作,正如我所解释的。所以我有点失落。

编辑:该错误出现在输入数据大小为 533 或 534 个元素时

谢谢,

这里有一个小例子向您展示。



import random
import multiprocessing
import time


def process1(data, queue1):
    for item in data:
        queue1.put(item)

    queue1.put(None)


def process2(queue1, queue2):
    while True:
        item = queue1.get()
        if item is None:
            queue2.put(None)
            break
        queue2.put(item * 2)


def process3(queue2, queue3):
    while True:
        item = queue2.get()
        if item is None:
            queue3.put(None)
            break
        queue3.put(item * 2)


def process4(queue3, queue4):
    while True:
        item = queue3.get()
        if item is None:
            queue4.put(None)
            print("None received")
            break
        queue4.put(item * 2)
    print("End process4")


if __name__ == "__main__":
    start_time = time.time()

    data = [random.randint(1, 10000) for _ in range(10000)]

    queue1 = multiprocessing.Queue()
    queue2 = multiprocessing.Queue()
    queue3 = multiprocessing.Queue()
    queue4 = multiprocessing.Queue()

    p1 = multiprocessing.Process(target=process1, args=(data, queue1))
    p2 = multiprocessing.Process(target=process2, args=(queue1, queue2))
    p3 = multiprocessing.Process(target=process3, args=(queue2, queue3))
    p4 = multiprocessing.Process(target=process4, args=(queue3, queue4))

    p1.start()
    p2.start()
    p3.start()
    p4.start()
    print("Proc started")

    p1.join()
    print("p1 join")
    p2.join()
    print("p2 join")
    p3.join()
    print("p3 join")
    p4.join()
    print("p4 join")

    results = []
    print("While loop")
    while True:
        item = queue4.get()
        if item is None:
            break
        results.append(item)

    end_time = time.time()

    print(end_time - start_time)



python multiprocessing
1个回答
0
投票

该问题在“加入使用队列的进程”部分中的https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming中进行了描述。

基本思想是进程将在终止之前等待,直到所有缓冲的项目都由“供给器”线程馈送到底层管道。

import multiprocessing
import random
import time


def process1(data, queue1):
    for item in data:
        queue1.put(item)

    queue1.put(None)


def process2(queue1, queue2):
    while True:
        item = queue1.get()
        if item is None:
            queue2.put(None)
            break
        queue2.put(item * 2)


def process3(queue2, queue3):
    while True:
        item = queue2.get()
        if item is None:
            queue3.put(None)
            break
        queue3.put(item * 2)


def process4(queue3, queue4):
    # calling queue4.cancel_join_thread() will correctly terminate the process
    # queue4.cancel_join_thread()
    # BUT not all items are put into the queue4 (rest are discarded)

    # see https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming
    # section "Joining processes that use queues"

    while True:
        item = queue3.get()
        if item is None:
            queue4.put(None)
            print("None received")
            break
        queue4.put(item * 2)
    print("End process4")
    print(queue4.qsize())


if __name__ == "__main__":
    start_time = time.time()

    data = [random.randint(1, 10000) for _ in range(10000)]

    queue1 = multiprocessing.Queue()
    queue2 = multiprocessing.Queue()
    queue3 = multiprocessing.Queue()
    queue4 = multiprocessing.Queue()

    p1 = multiprocessing.Process(target=process1, args=(data, queue1))
    p2 = multiprocessing.Process(target=process2, args=(queue1, queue2))
    p3 = multiprocessing.Process(target=process3, args=(queue2, queue3))
    p4 = multiprocessing.Process(target=process4, args=(queue3, queue4))

    p1.start()
    p2.start()
    p3.start()
    p4.start()
    print("Proc started")

    p1.join()
    print("p1 join")
    p2.join()
    print("p2 join")
    p3.join()
    print("p3 join")

    # don't call p4.join() here
    # p4.join()
    # print("p4 join")

    results = []
    print("While loop")
    while True:
        item = queue4.get()
        if item is None:
            break
        results.append(item)

    end_time = time.time()

    # call p4.join() here instead:
    p4.join()
    print("p4 join")

    print(end_time - start_time)

打印:

Proc started
p1 join
None received
End process4
10001
p2 join
p3 join
While loop
p4 join
0.1303730010986328
© www.soinside.com 2019 - 2024. All rights reserved.