I need to know when the queue has been closed and no more items are coming, so that I can end the iteration.
I do this by putting a sentinel in the queue:
from Queue import Queue

class IterableQueue(Queue):

    _sentinel = object()

    def __iter__(self):
        return self

    def close(self):
        self.put(self._sentinel)

    def next(self):
        item = self.get()
        if item is self._sentinel:
            raise StopIteration
        else:
            return item
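For illustration, here is a minimal sketch of how I expect to use it (my assumption of typical usage: a producer thread that calls close() when it is done):

from threading import Thread

q = IterableQueue()

def produce():
    for i in range(3):
        q.put(i)
    q.close()  # no more items: ends the iteration in the consumer

Thread(target=produce).start()

for item in q:  # stops cleanly once the sentinel is seen
    print item
# 0
# 1
# 2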
Given that this seems a common use of a queue, is there a built-in implementation?
A sentinel is a reasonable way for the producer to send the message that no more queue tasks are coming.
FWIW, your code can be simplified quite a bit by using the two-argument form of iter():

from Queue import Queue

class IterableQueue(Queue):

    _sentinel = object()

    def __iter__(self):
        return iter(self.get, self._sentinel)

    def close(self):
        self.put(self._sentinel)
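For anyone unfamiliar with it, the two-argument form calls the callable repeatedly and stops as soon as it returns the sentinel value; a quick standalone sketch with a plain string sentinel:

from Queue import Queue

q = Queue()
for x in (1, 2, 3, 'STOP'):
    q.put(x)

# iter(q.get, 'STOP') keeps calling q.get() until it returns 'STOP'
for item in iter(q.get, 'STOP'):
    print item
# 1
# 2
# 3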
The multiprocessing module has its own version of Queue that does include a close method. I am not certain how it behaves with threads, but it is worth a try; I don't see why it shouldn't work just the same:
from multiprocessing import Queue
q = Queue()
q.put(1)
q.get_nowait()
# 1
q.close()
q.get_nowait()
# ...
# IOError: handle out of range in select()
You can catch the IOError as the signal that the queue has been closed.

Testing it:
from multiprocessing import Queue
from threading import Thread

def worker(q):
    while True:
        try:
            item = q.get(timeout=.5)
        except IOError:
            print "Queue closed. Exiting thread."
            return
        except:
            continue
        print "Got item:", item

q = Queue()
for i in xrange(3):
    q.put(i)
t = Thread(target=worker, args=(q,))
t.start()
# Got item: 0
# Got item: 1
# Got item: 2
q.close()
# Queue closed. Exiting thread.
Although, to be honest, this is not much different from setting a flag on a Queue.Queue; multiprocessing.Queue simply uses a closed file descriptor as its flag:

from Queue import Queue
from threading import Thread

def worker2(q):
    while True:
        if q.closed:
            print "Queue closed. Exiting thread."
            return
        try:
            item = q.get(timeout=.5)
        except:
            continue
        print "Got item:", item

q = Queue()
q.closed = False
for i in xrange(3):
    q.put(i)
t = Thread(target=worker2, args=(q,))
t.start()
# Got item: 0
# Got item: 1
# Got item: 2
q.closed = True
# Queue closed. Exiting thread.
This is an old question, and a variant of self._sentinel = object() will work. Revisiting it in 2021, I would suggest using concurrent.futures, combined with None as your sentinel:
# Note: this is Python 3.8+ code
import queue
import time
import functools
import random
from concurrent.futures import ThreadPoolExecutor

def worker(tup):
    (q, i) = tup
    print(f"Starting thread {i}")
    partial_sum = 0
    numbers_added = 0
    while True:
        try:
            item = q.get()
            if item is None:
                # 'propagate' this 'sentinel' to anybody else
                q.put(None)
                break
            numbers_added += 1
            partial_sum += item
            # need to pretend that we're doing something asynchronous
            time.sleep(random.random()/100)
        except Exception as e:
            print(f"(warning) Thread {i} got an exception {e}, that shouldn't happen.")
            break
    print(f"Thread {i} is done, saw a total of {numbers_added} numbers to add up")
    return partial_sum

MAX_RANGE = 1024
MAX_THREADS = 12

with ThreadPoolExecutor() as executor:
    # create a queue with numbers to add up
    (q := queue.Queue()).queue = queue.deque(range(MAX_RANGE))
    # kick off the threads
    future_partials = executor.map(worker, [(q, i) for i in range(MAX_THREADS)])
    # they'll be done more or less instantly, but we'll make them wait
    print("Threads launched with first batch ... sleeping 2 seconds")
    time.sleep(2)
    # threads are still available for more work!
    for i in range(MAX_RANGE):
        q.put(i)
    print("Finished giving them another batch, this time we're not sleeping")
    # now we tell them all to wrap it up
    q.put(None)
    # this will nicely catch the outputs
    sum = functools.reduce(lambda x, y: x + y, future_partials)
    print(f"Got total sum {sum} (correct answer is {(MAX_RANGE-1)*MAX_RANGE})")
# Starting thread 0
# Starting thread 1
# Starting thread 2
# Starting thread 3
# Starting thread 4
# Starting thread 5
# Starting thread 6
# Starting thread 7
# Starting thread 8
# Starting thread 9
# Starting thread 10
# Starting thread 11
# Threads launched with first batch ... sleeping 2 seconds
# Finished giving them another batch, this time we're not sleeping
# Thread 0 is done, saw a total of 175 numbers to add up
# Thread 3 is done, saw a total of 178 numbers to add up
# Thread 11 is done, saw a total of 173 numbers to add up
# Thread 4 is done, saw a total of 177 numbers to add up
# Thread 9 is done, saw a total of 169 numbers to add up
# Thread 1 is done, saw a total of 172 numbers to add up
# Thread 7 is done, saw a total of 162 numbers to add up
# Thread 10 is done, saw a total of 161 numbers to add up
# Thread 5 is done, saw a total of 169 numbers to add up
# Thread 2 is done, saw a total of 157 numbers to add up
# Thread 6 is done, saw a total of 169 numbers to add up
# Thread 8 is done, saw a total of 186 numbers to add up
# Got total sum 1047552 (correct answer is 1047552)
Note how the de facto "main thread" only has to push None into the queue, much like signalling a condition variable, and the threads all pick it up (and propagate it).
Also, this uses the standard (thread-safe) queue rather than the heavier multiprocessing Queue. A nice bonus of the code above is that it is easily modified to use a ProcessPoolExecutor, or a mix of the two (in either case you would need a multiprocessing.Queue).
(Side note: generally speaking, if classes are needed to solve a "basic" problem in a given generation of Python, a more modern version usually offers new options.)
(Second side note: the only reason the code is Python 3.8+ is that I am a fan of assignment expressions, which, in line with the side note above, solve the problem of how to initialize a queue from a list without having to resort to non-functional solutions.)
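(For reference, the equivalent without the assignment expression is the plain two-statement form:)

import queue

# same initialization as above, at the cost of an extra statement
q = queue.Queue()
q.queue = queue.deque(range(1024))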
I have implemented a queue that is compatible with asyncio.Queue and can be iterated with an async for loop; see queutils.IterableQueue.
Its main features are:

- asyncio.Queue interface
- AsyncIterable support: async for item in queue:
- QueueDone exception once the queue has been exhausted
- Producers must register with add_producer() and must notify the queue with finish() once they are done adding items
- count property for counting the number of items marked with task_done()

Install it with pip install queutils.
A producer is a "process" that adds items to the queue. A producer needs to register with the queue using the add_producer() coroutine. Once it has added all the items it wants to add, it notifies the queue with finish():
from queutils import IterableQueue

async def producer(
    Q: IterableQueue[int], N: int
) -> None:
    # Add a producer to add items to the queue
    await Q.add_producer()
    for i in range(N):
        await Q.put(i)
    # notify the queue that this producer does not add more
    await Q.finish()
    return None
A consumer is a "process" that gets items from the queue with the get() coroutine. Since IterableQueue is AsyncIterable, it can be iterated with async for:
from queutils.iterablequeue import IterableQueue

async def consumer(Q: IterableQueue[int]):
    """
    Consume the queue
    """
    async for i in Q:
        print(f"consumer: got {i} from the queue")
    print(f"consumer: queue is done")