如何在Python中迭代Queue.Queue项目?

问题描述 投票:0回答:5

有谁知道迭代

Queue.Queue
的元素而不将它们从队列中删除的 Pythonic 方法。我有一个生产者/消费者类型的程序,其中要处理的项目是使用
Queue.Queue
传递的,我希望能够打印剩余的项目是什么。有什么想法吗?

python queue producer-consumer
5个回答
52
投票

您可以循环访问底层数据存储的副本:

for elem in list(q.queue)

尽管这绕过了队列对象的锁,但列表复制是一个原子操作,应该可以正常工作。

如果您想保留锁,为什么不将所有任务从队列中取出,复制列表,然后将它们放回去。

mycopy = []
while True:
     try:
         elem = q.get(block=False)
     except Empty:
         break
     else:
         mycopy.append(elem)
for elem in mycopy:
    q.put(elem)
for elem in mycopy:
    # do something with the elements

24
投票

列出队列元素而不使用它们:

>>> from Queue import Queue
>>> q = Queue()
>>> q.put(1)
>>> q.put(2)
>>> q.put(3)
>>> print list(q.queue)
[1, 2, 3]

操作后,您仍然可以处理它们:

>>> q.get()
1
>>> print list(q.queue)
[2, 3]

9
投票

您可以子类化

queue.Queue
以线程安全的方式实现这一点:

import queue


class ImprovedQueue(queue.Queue):
    def to_list(self):
        """
        Returns a copy of all items in the queue without removing them.
        """

        with self.mutex:
            return list(self.queue)

1
投票

您可以在打印元素之前将双端队列转换为列表,以便您可以轻松地迭代它。

from collections import deque

d = deque([7,9,3,5])

d.append(2)
d.appendleft(1)
d.append(10)
d.pop()

for elem in list(d):
    print(elem, end=" ")

#Output: 1 7 9 3 5 2 

0
投票

我已经实现了一个支持

IterableQueue(asyncio.Queue)
迭代的
async for
。请参阅 GitHub 中的 pyutils

from pyutils import IterableQueue
from asyncio import run, Task, create_task

async def producer(Q: IterableQueue[int], n: int) -> None:
    await Q.add_producer(N=1) 
    for i in range(n):
        await Q.put(i)
    await Q.finish()
    return None

async def amain():
    q : IterableQueue[int] = IterableQueue(maxsize=5)
    task : Task = create_task(producer(q, 10))
    # Iterate over queue items
    async for i in q:
        print(f"Got {i}")

if __name__ == "__main__":
    run(amain())

IterableQueue()
计算带有
add_producer()
的生产者。一旦最后一个生产者完成(
finish()
),那么一个哨兵值(
None
)就会被添加到标记队列结束的队列中。

© www.soinside.com 2019 - 2024. All rights reserved.