多队列 - 子进程被卡住,有时并没有收获

问题描述 投票:0回答:1

首先,我很抱歉,如果标题是有点怪怪的,但我真的想不出如何把成一条线我面临的问题。

所以,我有以下代码

import time
from multiprocessing import Process, current_process, Manager
from multiprocessing import JoinableQueue as Queue

# from threading import Thread, current_thread
# from queue import Queue


def checker(q):
    count = 0
    while True:
        if not q.empty():
            data = q.get()
            # print(f'{data} fetched by {current_process().name}')
            # print(f'{data} fetched by {current_thread().name}')
            q.task_done()
            count += 1
        else:
            print('Queue is empty now')
            print(current_process().name, '-----', count)
            # print(current_thread().name, '-----', count)


if __name__ == '__main__':
    t = time.time()
    # m = Manager()
    q = Queue()
    # with open("/tmp/c.txt") as ifile:
    #     for line in ifile:
    #         q.put((line.strip()))
    for i in range(1000):
        q.put(i)
    time.sleep(0.1)
    procs = []
    for _ in range(2):
        p = Process(target=checker, args=(q,), daemon=True)
        # p = Thread(target=checker, args=(q,))
        p.start()
        procs.append(p)
    q.join()
    for p in procs:
        p.join()

样本输出

1:当进程只是挂

Queue is empty now
Process-2 ----- 501
output hangs at this point

2:当一切工作就好了。

Queue is empty now
Process-1 ----- 515
Queue is empty now
Process-2 ----- 485

Process finished with exit code 0

现在的行为是间歇性的,有时但并不总是会发生。

我一直在使用Manager.Queue()以及在地方multiprocessing.Queue()的,但没有成功,都表现出同样的问题,试过。

我既multiprocessingmultithreading测试这一点,我得到完全相同的行为,有一个细微的差别,随着这种行为multithreading率相比multiprocessing少得多。

所以,我认为这是我丢失的东西概念或做错了,但我不能现在就抓住它,因为我花了在这条路上太多时间,现在我的心是没有看到的东西可能是很基本的。

因此,任何帮助表示赞赏。

python multiprocessing
1个回答
0
投票

我相信你在checker方法的竞争条件。您检查队列是否为空,然后出队在单独的步骤下一个任务。它通常没有互斥或锁定这两种类型的操作分开,因为队列的状态可以检查和流行之间改变是个好主意。这可能是一个非空,但随后另一个进程可能离队的等待工作之前,它通过检查的过程中能够做到这一点。

不过我一般喜欢在锁定尽可能通信;这是容易出错少,让人的意图更加清晰。在这种情况下,我会派一个警戒值的工作进程(如None),以表明所有的工作已经完成。每个工人然后就转移到下一个对象(这始终是线程安全的),并且,如果对象是None,子进程退出。

下面的示例代码是你的程序的简化版本,没有种族应该工作:

def checker(q):
    while True:
        data = q.get()
        if data is None:
            print(f'process f{current_process().name} ending')
            return
        else:
            pass # do work

if __name__ == '__main__':
    q = Queue()
    for i in range(1000):
        q.put(i)
    procs = []
    for _ in range(2):
        q.put(None) # Sentinel value
        p = Process(target=checker, args=(q,), daemon=True)
        p.start()
        procs.append(p)
    for proc in procs:
        proc.join()
© www.soinside.com 2019 - 2024. All rights reserved.