我有一个非常普通的生产者/消费者场景,但有一个转折。
我需要从数GB的输入流(可以是文件或HTTP流)中读取文本行;使用慢速且占用大量CPU的算法处理每一行,该算法将为输入的每一行输出一行文本;然后将输出行写入另一个流。所不同的是,我需要按照与产生它们的输入线相同的顺序来编写输出线。
这些情况的通常方法是使用multiprocessing.Pool运行CPU密集型算法,其中Queue从读取器进程中馈入行(实际上是成批的行),而另一个Queue从Pool和进入编写器过程:
/ [Pool] \
[Reader] --> InQueue --[Pool]---> OutQueue --> [Writer]
\ [Pool] /
但是如何确保输出行(或批次)以正确的顺序排序?
一个简单的答案是,“只需将它们写入一个临时文件,然后对该文件进行排序并将其写入输出”。我可能最终会这样做,但是我真的很想尽快开始流输出行-而不是等待从头到尾处理整个输入流。
我可以轻松编写自己的multiprocessing.Queue实现,该实现将使用Dictionary(或循环缓冲区列表),Lock和两个Condition(可能还有一个整数计数器)在内部对其项进行排序。但是,我需要从Manager中获取所有这些对象,而且我担心在多个进程之间使用这样的共享状态会降低性能。因此,是否有解决此问题的适当Pythony方法?
我有一个非常普通的生产者/消费者场景,但有一个转折点。我需要从数GB的输入流(可以是文件或HTTP流)中读取文本行;用...
也许我遗漏了一些东西,但是看来您的问题有一个基本答案。