How to let the producer know that the consumers have finished their work in Python

Problem description

Hi, I want to parallelize the process of reading and analyzing multiple text files. So I have one producer, Producer, and two consumers, TextAnalyzerA and TextAnalyzerB.

Producer has a RequestQueue that holds the tasks for TextAnalyzerA and TextAnalyzerB.

A TextAnalyzerA thread is assigned a task and starts working. Once it is done it notifies the Producer, and likewise a TextAnalyzerB thread finishes its task and notifies the Producer.

Once both consumers have finished their tasks, I want the Producer to start another thread, "ResultSender", to run.

My question is: what is the best way in Python to notify the Producer that the consumers have finished their work?

I have put together some sample code here that implements the communication between Producer and Consumer by introducing a RequestQueue and a FinishedQueue.

from time import sleep
from random import random
from threading import Thread
from queue import Queue


# global queues shared by the producer and the consumers
RequestQueue = Queue()
FinishedQueue = Queue()


class Task:
    def __init__(self, name, index, value, done):
        self.name = name
        self.index = index
        self.value = value
        self.done = done

    def getName(self):
        return self.name

    def getIndex(self):
        return self.index

    def getValue(self):
        return self.value

    def getDone(self):
        return self.done

    def setDone(self, done):
        self.done = done


def producer():
    print('Producer: Running')
    # generate items
    task = Task("(csv)", 1, "test", False)

    RequestQueue.put(task)
    print(">producer added {} {} {} {}".format(
        task.getName(), task.getIndex(), task.getValue(), task.getDone()))

    value = random()
    sleep(value)

    while True:
        if (FinishedQueue.empty() == False):
            fq = FinishedQueue.get()
            print(">producer found {} {} {} {}".format(
                fq.getName(), fq.getIndex(), fq.getValue(), fq.getDone()))

            break
        sleep(1)

    RequestQueue.put(None)
    print('Producer: Done')

# consumer task


def consumer():
    print('Consumer: Running')
    while True:

        item = RequestQueue.get()
        # check for stop
        if item is None:
            break

        print(">consumer is working on {} {} {} {}".format(
            item.getName(), item.getIndex(), item.getValue(), item.getDone()))

        # consumer do some work
        sleep(1)
        # set done to be true
        item.setDone(True)

        print(">consumer finished {} {} {} {}".format(
            item.getName(), item.getIndex(), item.getValue(), item.getDone()))
        FinishedQueue.put(item)

    # all done
    print('Consumer: Done')


def test():
    # create the shared queue

    tc = Thread(target=consumer, args=())
    tc.start()
    tp = Thread(target=producer, args=())
    tp.start()

    tp.join()
    tc.join()


test()
Tags: python, multithreading, python-multithreading, producer-consumer, condition-variable
1 Answer

Queue.get is a blocking call. You can use it to make the producer wait for the consumers to finish their tasks; you do not need the loop and the sleep.

while True:  # not needed
    if (FinishedQueue.empty() == False):  # not needed
        fq = FinishedQueue.get()
        print(">producer found {} {} {} {}".format(
            fq.getName(), fq.getIndex(), fq.getValue(), fq.getDone()))

        break  # not needed
    sleep(1)  # not needed
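
As a minimal sketch (assuming the same FinishedQueue and the two consumer threads described in the question, so the count of 2 is an assumption on my part), the producer's wait then reduces to one blocking get per consumer:

# producer side: block on get() instead of polling; one get() per consumer
for _ in range(2):  # 2 = number of consumer threads (assumed for this sketch)
    fq = FinishedQueue.get()  # blocks until a consumer puts a finished task
    print(">producer found {} {} {} {}".format(
        fq.getName(), fq.getIndex(), fq.getValue(), fq.getDone()))

Queue.task_done() together with Queue.join() is another standard-library way to let the producer block until every queued item has been processed.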

You seem to be reinventing a ThreadPoolExecutor, so why not use what is already in the Python standard library? That also shrinks your code and makes it more robust in the process.

import concurrent.futures
consumer_threadpool = concurrent.futures.ThreadPoolExecutor()
producer_threadpool = concurrent.futures.ThreadPoolExecutor()
class Task:
    def __init__(self, name, index, value, done):
        self.name = name
        self.index = index
        self.value = value
        self.done = done
    def getName(self):
        return self.name
    def getIndex(self):
        return self.index
    def getValue(self):
        return self.value
    def getDone(self):
        return self.done
    def setDone(self, done):
        self.done = done

def consumer(item):
    print(">consumer is working on {} {} {} {}".format(
        item.getName(), item.getIndex(), item.getValue(), item.getDone()))
    # set done to be true
    item.setDone(True)
    print(">consumer finished {} {} {} {}".format(
        item.getName(), item.getIndex(), item.getValue(), item.getDone()))
    return item

def producer():
    print('Producer: Running')
    # generate items
    task = Task("(csv)", 1, "test", False)
    future1 = consumer_threadpool.submit(consumer, task)
    future2 = consumer_threadpool.submit(consumer, task)
    print(">producer added {} {} {} {}".format(
        task.getName(), task.getIndex(), task.getValue(), task.getDone()))
    fq = future1.result()
    fq2 = future2.result()
    print(">producer found {} {} {} {}".format(
        fq.getName(), fq.getIndex(), fq.getValue(), fq.getDone()))
    print('Producer: Done')

def main():
    tp = producer_threadpool.submit(producer)
    tp.result()
main()

An obvious advantage of doing it this way is that you get a timeout and error-handling mechanism you can opt into (no deadlocks when a task fails), and you can run multiple consumers and producers on different tasks at the same time without having to worry about task synchronization.
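
For illustration only, here is a hedged sketch of that timeout and error handling (the batch of four tasks and the 5-second timeout are my assumptions, reusing the Task, consumer and consumer_threadpool defined above):

import concurrent.futures

# hypothetical batch of tasks, reusing Task/consumer/consumer_threadpool from above
tasks = [Task("(csv)", i, "test", False) for i in range(4)]
futures = [consumer_threadpool.submit(consumer, t) for t in tasks]

# as_completed yields futures as the consumers finish; the timeout bounds the
# total wait, and result() re-raises any exception a consumer raised.
for future in concurrent.futures.as_completed(futures, timeout=5):
    try:
        item = future.result()
        print("producer collected {} {}".format(item.getIndex(), item.getDone()))
    except Exception as exc:
        print("a consumer failed: {}".format(exc))

If the timeout expires before all futures have finished, as_completed raises concurrent.futures.TimeoutError, which the producer can catch instead of blocking forever.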
