首先,我很抱歉,如果标题是有点怪怪的,但我真的想不出如何把成一条线我面临的问题。
所以,我有以下代码
import time
from multiprocessing import Process, current_process, Manager
from multiprocessing import JoinableQueue as Queue
# from threading import Thread, current_thread
# from queue import Queue
def checker(q):
count = 0
while True:
if not q.empty():
data = q.get()
# print(f'{data} fetched by {current_process().name}')
# print(f'{data} fetched by {current_thread().name}')
q.task_done()
count += 1
else:
print('Queue is empty now')
print(current_process().name, '-----', count)
# print(current_thread().name, '-----', count)
if __name__ == '__main__':
t = time.time()
# m = Manager()
q = Queue()
# with open("/tmp/c.txt") as ifile:
# for line in ifile:
# q.put((line.strip()))
for i in range(1000):
q.put(i)
time.sleep(0.1)
procs = []
for _ in range(2):
p = Process(target=checker, args=(q,), daemon=True)
# p = Thread(target=checker, args=(q,))
p.start()
procs.append(p)
q.join()
for p in procs:
p.join()
样本输出
1:当进程只是挂
Queue is empty now
Process-2 ----- 501
output hangs at this point
2:当一切工作就好了。
Queue is empty now
Process-1 ----- 515
Queue is empty now
Process-2 ----- 485
Process finished with exit code 0
现在的行为是间歇性的,有时但并不总是会发生。
我一直在使用Manager.Queue()
以及在地方multiprocessing.Queue()
的,但没有成功,都表现出同样的问题,试过。
我既multiprocessing
和multithreading
测试这一点,我得到完全相同的行为,有一个细微的差别,随着这种行为multithreading
率相比multiprocessing
少得多。
所以,我认为这是我丢失的东西概念或做错了,但我不能现在就抓住它,因为我花了在这条路上太多时间,现在我的心是没有看到的东西可能是很基本的。
因此,任何帮助表示赞赏。
我相信你在checker
方法的竞争条件。您检查队列是否为空,然后出队在单独的步骤下一个任务。它通常没有互斥或锁定这两种类型的操作分开,因为队列的状态可以检查和流行之间改变是个好主意。这可能是一个非空,但随后另一个进程可能离队的等待工作之前,它通过检查的过程中能够做到这一点。
不过我一般喜欢在锁定尽可能通信;这是容易出错少,让人的意图更加清晰。在这种情况下,我会派一个警戒值的工作进程(如None
),以表明所有的工作已经完成。每个工人然后就转移到下一个对象(这始终是线程安全的),并且,如果对象是None
,子进程退出。
下面的示例代码是你的程序的简化版本,没有种族应该工作:
def checker(q):
while True:
data = q.get()
if data is None:
print(f'process f{current_process().name} ending')
return
else:
pass # do work
if __name__ == '__main__':
q = Queue()
for i in range(1000):
q.put(i)
procs = []
for _ in range(2):
q.put(None) # Sentinel value
p = Process(target=checker, args=(q,), daemon=True)
p.start()
procs.append(p)
for proc in procs:
proc.join()