我刚刚用Python写了一个任务队列,其作用是限制一次运行的任务数量。这与
Queue.Queue
略有不同,因为它不是限制队列中可以有多少项目,而是限制一次可以取出多少项目。它仍然使用无界的 Queue.Queue
来完成其工作,但它依赖于 Semaphore
来限制线程数量:
from Queue import Queue
from threading import BoundedSemaphore, Lock, Thread
class TaskQueue(object):
"""
Queues tasks to be run in separate threads and limits the number
concurrently running tasks.
"""
def __init__(self, limit):
"""Initializes a new instance of a TaskQueue."""
self.__semaphore = BoundedSemaphore(limit)
self.__queue = Queue()
self.__cancelled = False
self.__lock = Lock()
def enqueue(self, callback):
"""Indicates that the given callback should be ran."""
self.__queue.put(callback)
def start(self):
"""Tells the task queue to start running the queued tasks."""
thread = Thread(target=self.__process_items)
thread.start()
def stop(self):
self.__cancel()
# prevent blocking on a semaphore.acquire
self.__semaphore.release()
# prevent blocking on a Queue.get
self.__queue.put(lambda: None)
def __cancel(self):
print 'canceling'
with self.__lock:
self.__cancelled = True
def __process_items(self):
while True:
# see if the queue has been stopped before blocking on acquire
if self.__is_canceled():
break
self.__semaphore.acquire()
# see if the queue has been stopped before blocking on get
if self.__is_canceled():
break
callback = self.__queue.get()
# see if the queue has been stopped before running the task
if self.__is_canceled():
break
def runTask():
try:
callback()
finally:
self.__semaphore.release()
thread = Thread(target=runTask)
thread.start()
self.__queue.task_done()
def __is_canceled(self):
with self.__lock:
return self.__cancelled
Python 解释器将永远运行,除非我明确停止任务队列。这比我想象的要棘手得多。如果您查看
stop
方法,您会发现我在队列上设置了 canceled
标志、release
信号量和 put
无操作回调。最后两部分是必要的,因为代码可能会在 Semaphore
或 Queue
上阻塞。我基本上必须强制这些通过,以便循环有机会突破。
这段代码有效。当运行尝试并行运行数千个任务的服务时,此类非常有用。为了保持机器平稳运行并防止操作系统因活动线程过多而尖叫,此代码将限制任一时刻存活的线程数量。
我之前用 C# 写过类似的代码块。使该代码特别简洁的原因是 .NET 有一个称为
CancellationToken
的东西,几乎每个线程类都使用它。任何时候有一个阻塞操作,该操作都会使用一个可选的令牌。如果父任务被取消,任何使用该令牌阻塞的子任务也将立即被取消。与通过释放信号量或将值放入队列来“伪造”相比,这似乎是一种更干净的退出方式。
我想知道Python 中是否有等效的方法可以做到这一点?我绝对想使用线程而不是异步事件之类的东西。我想知道是否有一种方法可以使用两个
Queue.Queue
来实现相同的效果,其中一个具有最大大小,而另一个没有 - 但我仍然不确定如何处理取消。
我认为你的代码可以通过使用 poisoning 和
Thread.join()
来简化:
from Queue import Queue
from threading import Thread
poison = object()
class TaskQueue(object):
def __init__(self, limit):
def process_items():
while True:
callback = self._queue.get()
if callback is poison:
break
try:
callback()
except:
pass
finally:
self._queue.task_done()
self._workers = [Thread(target=process_items) for _ in range(limit)]
self._queue = Queue()
def enqueue(self, callback):
self._queue.put(callback)
def start(self):
for worker in self._workers:
worker.start()
def stop(self):
for worker in self._workers:
self._queue.put(poison)
while self._workers:
self._workers.pop().join()
未经测试。
为了简洁起见,我删除了评论。
此外,在这个版本中
process_items()
是真正私密的。
顺便说一句:
Queue
模块的全部目的是将您从可怕的锁定和事件中解放出来。
您似乎正在为队列中的每个任务创建一个新线程。这本身就是一种浪费,而且还会导致你遇到如何限制线程数量的问题。
相反,一种常见的做法是创建固定数量的工作线程,并让它们自由地从队列中拉取任务。要取消队列,您可以清除它,让工作人员保持活力以期待未来的工作。
我采纳了 Janne Karila 的建议并创建了一个线程池。这消除了对信号量的需要。问题是,如果您希望队列消失,则必须停止工作线程运行(只是我之前所做的一种变体)。新代码非常相似:
class TaskQueue(object):
"""
Queues tasks to be run in separate threads and limits the number
concurrently running tasks.
"""
def __init__(self, limit):
"""Initializes a new instance of a TaskQueue."""
self.__workers = []
for _ in range(limit):
worker = Thread(target=self.__process_items)
self.__workers.append(worker)
self.__queue = Queue()
self.__cancelled = False
self.__lock = Lock()
self.__event = Event()
def enqueue(self, callback):
"""Indicates that the given callback should be ran."""
self.__queue.put(callback)
def start(self):
"""Tells the task queue to start running the queued tasks."""
for worker in self.__workers:
worker.start()
def stop(self):
"""
Stops the queue from processing anymore tasks. Any actively running
tasks will run to completion.
"""
self.__cancel()
# prevent blocking on a Queue.get
for _ in range(len(self.__workers)):
self.__queue.put(lambda: None)
self.__event.wait()
def __cancel(self):
with self.__lock:
self.__queue.queue.clear()
self.__cancelled = True
def __process_items(self):
while True:
callback = self.__queue.get()
# see if the queue has been stopped before running the task
if self.__is_canceled():
break
try:
callback()
except:
pass
finally:
self.__queue.task_done()
self.__event.set()
def __is_canceled(self):
with self.__lock:
return self.__cancelled
如果你仔细看的话,我必须做一些会计才能杀死工人。我基本上等待
Event
的次数与工作人员的数量一样多。我clear
底层队列以防止工作人员以任何其他方式被取消。我也会在将每个虚假值放入队列后等待,因此一次只有一个工作人员可以取消。
我对此进行了一些测试,它似乎有效。消除对虚假值的需求仍然很好。
我已经实现了一个可以执行此操作的库。
安装:
pip install cantok
并使用:
from random import randint
from threading import Thread
from cantok import ConditionToken, CounterToken, TimeoutToken
counter = 0
def function(token):
global counter
while not token.cancelled:
counter += 1
token = ConditionToken(lambda: randint(1, 100_000) == 1984) + CounterToken(400_000, direct=False) + TimeoutToken(1)
thread = Thread(target=function, args=(token, ))
thread.start()
thread.join()
print(counter)
在此示例中,对循环施加了一些限制。它会一直持续到其中一个事件发生: 1 秒过去,将发生一个不太可能的随机事件(十亿分之一的概率的随机数将变成 1984),令牌将被轮询 40 万次或任务将被取消。
该库实现了取消令牌的标准功能+一些独特的功能,例如堆叠令牌的能力。