Python iterable Queue

Question (0 votes, 4 answers)

I need to know when the queue is closed and no more items are coming, so that I can end the iteration.

I do this by placing a sentinel in the queue:

from Queue import Queue  # Python 2; the module is named queue in Python 3

class IterableQueue(Queue): 

    _sentinel = object()

    def __iter__(self):
        return self

    def close(self):
        self.put(self._sentinel)

    def next(self):
        item = self.get()
        if item is self._sentinel:
            raise StopIteration
        else:
            return item

Since this is a common way to use a queue, is there a built-in implementation?

python queue iteration
4 Answers
15 votes

A sentinel is a reasonable way for the producer to send the message that no more queue tasks are coming.

FWIW, your code can be simplified considerably with the two-argument form of iter():
from Queue import Queue

class IterableQueue(Queue): 

    _sentinel = object()

    def __iter__(self):
        return iter(self.get, self._sentinel)

    def close(self):
        self.put(self._sentinel)
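Ported to Python 3 (where the module is named queue), the same two-argument iter(callable, sentinel) trick looks like this; a minimal sketch of the pattern, not a library API:

```python
import queue
import threading

class IterableQueue(queue.Queue):
    _sentinel = object()

    def __iter__(self):
        # iter(callable, sentinel) keeps calling self.get()
        # until it returns the sentinel, then stops the iteration
        return iter(self.get, self._sentinel)

    def close(self):
        self.put(self._sentinel)

q = IterableQueue()
results = []
consumer = threading.Thread(target=lambda: results.extend(q))
consumer.start()
for i in range(3):
    q.put(i)
q.close()       # lets the consumer's iteration terminate after draining
consumer.join()
print(results)  # -> [0, 1, 2]
```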

4 votes

The multiprocessing module has its own version of Queue, which includes a close method. I am not sure how it behaves with threads, but it is worth a try; I don't see why it shouldn't work the same way:

from multiprocessing import Queue

q = Queue()
q.put(1)
q.get_nowait()
# 1
q.close()
q.get_nowait()
# ...
# IOError: handle out of range in select()

You can catch the IOError as the closing signal.

Testing it:

from multiprocessing import Queue
from threading import Thread

def worker(q):
    while True:
        try:
            item = q.get(timeout=.5)
        except IOError:
            print "Queue closed. Exiting thread."
            return
        except:
            continue
        print "Got item:", item

q = Queue()
for i in xrange(3):
    q.put(i)
t = Thread(target=worker, args=(q,))
t.start()
# Got item: 0
# Got item: 1
# Got item: 2
q.close()
# Queue closed. Exiting thread.

Honestly, though, this is not much different from setting a flag on a Queue.Queue; multiprocessing.Queue just uses a closed file descriptor as the flag:

from Queue import Queue
from threading import Thread

def worker2(q):
    while True:
        if q.closed:
            print "Queue closed. Exiting thread."
            return
        try:
            item = q.get(timeout=.5)
        except:
            continue
        print "Got item:", item

q = Queue()
q.closed = False
for i in xrange(3):
    q.put(i)
t = Thread(target=worker2, args=(q,))
t.start()
# Got item: 0
# Got item: 1
# Got item: 2
q.closed = True
# Queue closed. Exiting thread.
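Worth noting for modern readers: since Python 3.13, this pattern is built into the standard library. queue.Queue has a shutdown() method, and once a shut-down queue is drained, get() raises queue.ShutDown. A sketch of the same worker, requiring Python 3.13+:

```python
import queue
import threading

def worker(q, out):
    while True:
        try:
            out.append(q.get())
        except queue.ShutDown:
            # raised once the queue is shut down and fully drained
            return

q = queue.Queue()
out = []
t = threading.Thread(target=worker, args=(q, out))
t.start()
for i in range(3):
    q.put(i)
q.shutdown()  # no more items; pending/later get() calls raise ShutDown
t.join()
print(out)    # -> [0, 1, 2]
```

With the default shutdown(immediate=False), items already in the queue can still be retrieved before ShutDown is raised.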

0 votes

An old question, and variants of self._sentinel = object() will work. Revisiting this in 2021, I would suggest using concurrent.futures combined with None as your sentinel:

# Note: this is Python 3.8+ code                                                                                                                                                   

import queue
import time
import functools
import random
from concurrent.futures import ThreadPoolExecutor

def worker(tup):
    (q,i) = tup
    print(f"Starting thread {i}")
    partial_sum = 0
    numbers_added = 0
    while True:
        try:
            item = q.get()
            if item is None:
                # 'propagate' this 'sentinel' to anybody else                                                                                                                      
                q.put(None)
                break
            numbers_added += 1
            partial_sum += item
            # need to pretend that we're doing something asynchronous                                                                                                              
            time.sleep(random.random()/100)

        except Exception as e:
            print(f"(warning) Thread {i} got an exception {e}, that shouldn't happen.")
            break

    print(f"Thread {i} is done, saw a total of {numbers_added} numbers to add up")
    return partial_sum

MAX_RANGE = 1024
MAX_THREADS = 12

with ThreadPoolExecutor() as executor:

    # create a queue with numbers to add up                                                                                                                                        
    (q := queue.Queue()).queue = queue.deque(range(MAX_RANGE))

    # kick off the threads                                                                                                                                                         
    future_partials = executor.map(worker, [(q,i) for i in range(MAX_THREADS)])

    # they'll be done more or less instantly, but we'll make them wait                                                                                                             
    print("Threads launched with first batch ... sleeping 2 seconds")
    time.sleep(2)

    # threads are still available for more work!                                                                                                                                   
    for i in range(MAX_RANGE):
        q.put(i)

    print("Finished giving them another batch, this time we're not sleeping")

    # now we tell them all to wrap it up                                                                                                                                           
    q.put(None)
    # this will nicely catch the outputs                                                                                                                                           
    sum = functools.reduce(lambda x, y: x+y, future_partials)
    print(f"Got total sum {sum} (correct answer is {(MAX_RANGE-1)*MAX_RANGE})")

# Starting thread 0                                                                                                                                                                
# Starting thread 1                                                                                                                                                                
# Starting thread 2                                                                                                                                                                
# Starting thread 3                                                                                                                                                                
# Starting thread 4                                                                                                                                                                
# Starting thread 5                                                                                                                                                                
# Starting thread 6                                                                                                                                                                
# Starting thread 7                                                                                                                                                                
# Starting thread 8                                                                                                                                                                
# Starting thread 9                                                                                                                                                                
# Starting thread 10                                                                                                                                                               
# Starting thread 11                                                                                                                                                               
# Threads launched with first batch ... sleeping 2 seconds                                                                                                                         
# Finished giving them another batch, this time we're not sleeping                                                                                                                 
# Thread 0 is done, saw a total of 175 numbers to add up                                                                                                                           
# Thread 3 is done, saw a total of 178 numbers to add up                                                                                                                           
# Thread 11 is done, saw a total of 173 numbers to add up                                                                                                                          
# Thread 4 is done, saw a total of 177 numbers to add up                                                                                                                           
# Thread 9 is done, saw a total of 169 numbers to add up                                                                                                                           
# Thread 1 is done, saw a total of 172 numbers to add up                                                                                                                           
# Thread 7 is done, saw a total of 162 numbers to add up                                                                                                                           
# Thread 10 is done, saw a total of 161 numbers to add up                                                                                                                          
# Thread 5 is done, saw a total of 169 numbers to add up                                                                                                                           
# Thread 2 is done, saw a total of 157 numbers to add up                                                                                                                           
# Thread 6 is done, saw a total of 169 numbers to add up                                                                                                                           
# Thread 8 is done, saw a total of 186 numbers to add up                                                                                                                           
# Got total sum 1047552 (correct answer is 1047552)

Note that the de facto "main thread" only has to push a single None into the queue, similar to signalling a condition variable, and the threads all pick it up (and propagate it).

Also, this does not use the heavier multiprocessing Queue, just the standard (thread-safe) queue. The code above has the added benefit of being easily modified to use a ProcessPoolExecutor, or a mix of both (in either case you would need to use a multiprocessing.Queue).

(Side note: generally speaking, if classes were needed to solve a "basic" problem in a given generation of Python, a more modern version usually offers a new option.)

(Second side note: the only reason the code is Python 3.8+ is that I am a fan of assignment expressions which, in line with the side note above, solve the problem of initializing the queue from a list without resorting to a non-functional solution.)
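The assignment-expression trick in the listing, which fills a Queue from an iterable in one expression, can be illustrated on its own. Note that it pokes at the queue's .queue attribute, an implementation detail of CPython's queue.Queue backed by a deque:

```python
import queue
from collections import deque

# one-expression initialization of a Queue from an iterable,
# by swapping in a pre-filled deque as its backing store
(q := queue.Queue()).queue = deque(range(3))

print(q.qsize())  # -> 3
print(q.get())    # -> 0
```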


0 votes

I have implemented a queue that is compatible with asyncio.Queue and can be iterated with an async for loop. See queutils.IterableQueue.

Features

  • asyncio.Queue interface
  • AsyncIterable support: async for item in queue:
  • Consumers terminate automatically with a QueueDone exception once the queue has been emptied
  • Producers must register with add_producer() and must notify the queue with finish() once they are done adding items
  • Countable interface: the count property tracks the number of items for which task_done() has been reported
  • Counting can be disabled with count_items=False, which is useful when you want to sum the counts of several IterableQueues

Installation

pip install queutils

Usage

Producer fills the queue

A producer is a "process" that adds items to the queue. Producers need to register with the queue using the add_producer() coroutine. Once a producer has added all the items it wants to add, it notifies the queue with finish():

from queutils import IterableQueue

async def producer(
    Q: IterableQueue[int], N: int
) -> None:

    # Add a producer to add items to the queue
    await Q.add_producer()
    
    for i in range(N):
        await Q.put(i)
    
    # notify the queue that this producer does not add more
    await Q.finish()
    
    return None

Consumer takes items from the queue

A consumer is a "process" that takes items from the queue with the get() coroutine. Since IterableQueue is AsyncIterable, it can be iterated with async for:

from queutils.iterablequeue import IterableQueue

async def consumer(Q: IterableQueue[int]):
    """
    Consume the queue
    """
    async for i in Q:
        print(f"consumer: got {i} from the queue")        
    print(f"consumer: queue is done")
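For readers who want the same shape without the third-party dependency, roughly the same producer/consumer flow can be sketched with only the standard library, using a plain asyncio.Queue and a None sentinel in place of queutils' add_producer()/finish() bookkeeping:

```python
import asyncio

async def producer(q: asyncio.Queue, n: int) -> None:
    for i in range(n):
        await q.put(i)
    await q.put(None)  # sentinel: no more items coming

async def consumer(q: asyncio.Queue, out: list) -> None:
    while True:
        item = await q.get()
        if item is None:
            break  # sentinel seen: stop consuming
        out.append(item)

async def main() -> list:
    q: asyncio.Queue = asyncio.Queue()
    out: list = []
    await asyncio.gather(producer(q, 3), consumer(q, out))
    return out

print(asyncio.run(main()))  # -> [0, 1, 2]
```

This single-producer/single-consumer version is what queutils generalizes with producer registration and the QueueDone exception.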