Hi, I want to parallelize the process of reading and analyzing multiple text files. I have one producer, say Producer, and two consumers, TextAnalyzerA and TextAnalyzerB.
Producer has a RequestQueue that holds tasks for TextAnalyzerA and TextAnalyzerB.
Say the TextAnalyzerA thread picks up a task and starts working. Once it finishes, it notifies the Producer; likewise the TextAnalyzerB thread finishes its task and notifies the Producer.
Once both consumers have finished their tasks, I want the Producer to start another thread, "ResultSender", to run.
My question is: what is the best way in Python to notify the Producer that the consumers have finished their work?
I wrote some sample code here that implements the communication between Producer and Consumer by introducing a RequestQueue and a FinishedQueue.
from time import sleep
from random import random
from threading import Thread
from queue import Queue

# global queues shared by consumer and producer
RequestQueue = Queue()
FinishedQueue = Queue()

class Task:
    def __init__(self, name, index, value, done):
        self.name = name
        self.index = index
        self.value = value
        self.done = done

    def getName(self):
        return self.name

    def getIndex(self):
        return self.index

    def getValue(self):
        return self.value

    def getDone(self):
        return self.done

    def setDone(self, done):
        self.done = done

def producer():
    print('Producer: Running')
    # generate items
    task = Task("(csv)", 1, "test", False)
    RequestQueue.put(task)
    print(">producer added {} {} {} {}".format(
        task.getName(), task.getIndex(), task.getValue(), task.getDone()))
    value = random()
    sleep(value)
    while True:
        if (FinishedQueue.empty() == False):
            fq = FinishedQueue.get()
            print(">producer found {} {} {} {}".format(
                fq.getName(), fq.getIndex(), fq.getValue(), fq.getDone()))
            break
        sleep(1)
    RequestQueue.put(None)
    print('Producer: Done')

# consumer task
def consumer():
    print('Consumer: Running')
    while True:
        item = RequestQueue.get()
        # check for stop
        if item is None:
            break
        print(">consumer is working on {} {} {} {}".format(
            item.getName(), item.getIndex(), item.getValue(), item.getDone()))
        # consumer does some work
        sleep(1)
        # set done to be true
        item.setDone(True)
        print(">consumer finished {} {} {} {}".format(
            item.getName(), item.getIndex(), item.getValue(), item.getDone()))
        FinishedQueue.put(item)
    # all done
    print('Consumer: Done')

def test():
    # start the consumer and producer threads
    tc = Thread(target=consumer, args=())
    tc.start()
    tp = Thread(target=producer, args=())
    tp.start()
    tp.join()
    tc.join()

test()
Queue.get is a blocking call, so you can use it to make the producer wait until the consumers have finished their tasks; you don't need the polling loop and the sleep.
while True:                               # not needed
    if (FinishedQueue.empty() == False):  # not needed
        fq = FinishedQueue.get()
        print(">producer found {} {} {} {}".format(
            fq.getName(), fq.getIndex(), fq.getValue(), fq.getDone()))
        break                             # not needed
    sleep(1)                              # not needed
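To make that concrete, here is a minimal sketch of the pattern you asked about: the producer hands out one task per consumer, then makes two blocking FinishedQueue.get() calls (one per consumer, no polling), and only then starts the ResultSender thread. The file names and the tuple payload are just placeholders for illustration.

```python
from queue import Queue
from threading import Thread

RequestQueue = Queue()
FinishedQueue = Queue()

def consumer(name):
    task = RequestQueue.get()          # blocks until a task is available
    # ... analyze the file here ...
    FinishedQueue.put((name, task))    # notify the producer

def result_sender(results):
    print("ResultSender got:", results)

def producer():
    RequestQueue.put("file_a.txt")
    RequestQueue.put("file_b.txt")
    # block until both consumers report back; no loop-and-sleep needed
    results = [FinishedQueue.get() for _ in range(2)]
    Thread(target=result_sender, args=(results,)).start()

workers = [Thread(target=consumer, args=(n,)) for n in ("A", "B")]
for w in workers:
    w.start()
producer()
for w in workers:
    w.join()
```

Because FinishedQueue.get() blocks, the producer cannot start ResultSender until both consumers have put their results on the queue.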
That said, you seem to be reinventing a ThreadPoolExecutor, so why not use what the Python standard library already provides? It shrinks your code and makes it more robust in the process.
import concurrent.futures

consumer_threadpool = concurrent.futures.ThreadPoolExecutor()
producer_threadpool = concurrent.futures.ThreadPoolExecutor()

class Task:
    def __init__(self, name, index, value, done):
        self.name = name
        self.index = index
        self.value = value
        self.done = done

    def getName(self):
        return self.name

    def getIndex(self):
        return self.index

    def getValue(self):
        return self.value

    def getDone(self):
        return self.done

    def setDone(self, done):
        self.done = done

def consumer(item):
    print(">consumer is working on {} {} {} {}".format(
        item.getName(), item.getIndex(), item.getValue(), item.getDone()))
    # set done to be true
    item.setDone(True)
    print(">consumer finished {} {} {} {}".format(
        item.getName(), item.getIndex(), item.getValue(), item.getDone()))
    return item

def producer():
    print('Producer: Running')
    # generate items
    task = Task("(csv)", 1, "test", False)
    future1 = consumer_threadpool.submit(consumer, task)
    future2 = consumer_threadpool.submit(consumer, task)
    print(">producer added {} {} {} {}".format(
        task.getName(), task.getIndex(), task.getValue(), task.getDone()))
    # result() blocks until the corresponding consumer is done
    fq = future1.result()
    fq2 = future2.result()
    print(">producer found {} {} {} {}".format(
        fq.getName(), fq.getIndex(), fq.getValue(), fq.getDone()))
    print('Producer: Done')

def main():
    tp = producer_threadpool.submit(producer)
    tp.result()

main()
A clear advantage of this approach is that you get timeout and error-handling mechanisms you can opt into (no deadlocking on errors), and you can run multiple consumers and producers on different tasks at the same time without worrying about task synchronization.
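To illustrate that last point, here is a small sketch of the opt-in timeout and error handling: Future.result() accepts a timeout and re-raises any exception raised inside the worker. The analyze function and the file names are hypothetical stand-ins for your real text analysis.

```python
import concurrent.futures

def analyze(path):
    # stand-in for real text analysis; fails on a "bad" file
    if path.endswith(".bad"):
        raise ValueError("cannot parse " + path)
    return len(path)

pool = concurrent.futures.ThreadPoolExecutor(max_workers=2)
ok = pool.submit(analyze, "report.csv")
bad = pool.submit(analyze, "broken.bad")

# result() blocks at most `timeout` seconds, so a hung consumer
# raises TimeoutError instead of deadlocking the producer
print(ok.result(timeout=5))        # prints 10

# exceptions raised in the worker surface in the producer
try:
    bad.result(timeout=5)
except ValueError as e:
    print("task failed:", e)

pool.shutdown()
```

With the hand-rolled queue version, an exception in a consumer would leave the producer blocked on FinishedQueue.get() forever; here the failure propagates to whoever calls result().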