我想并行运行多个线程。每个线程从任务队列中选取一个任务并执行该任务。
from threading import Thread
from Queue import Queue
import time
class link(object):
def __init__(self, i):
self.name = str(i)
def run_jobs_in_parallel(consumer_func, jobs, results, thread_count,
async_run=False):
def consume_from_queue(jobs, results):
while not jobs.empty():
job = jobs.get()
try:
results.append(consumer_func(job))
except Exception as e:
print str(e)
results.append(False)
finally:
jobs.task_done()
#start worker threads
if jobs.qsize() < thread_count:
thread_count = jobs.qsize()
for tc in range(1,thread_count+1):
worker = Thread(
target=consume_from_queue,
name="worker_{0}".format(str(tc)),
args=(jobs,results,))
worker.start()
if not async_run:
jobs.join()
def create_link(link):
print str(link.name)
time.sleep(10)
return True
def consumer_func(link):
return create_link(link)
# create_link takes a while to execute
jobs = Queue()
results = list()
for i in range(0,10):
jobs.put(link(i))
run_jobs_in_parallel(consumer_func, jobs, results, 25, async_run=False)
现在发生的情况是,假设作业队列中有 10 个链接对象,当线程并行运行时,多个线程正在执行相同的任务。我怎样才能防止这种情况发生? 注意 - 上面的示例代码没有上面描述的问题,但我有完全相同的代码,除了 create_link 方法做了一些复杂的事情。
我认为你需要的是一个锁对象(docs,tutorial+examples)。如果您创建此类对象的实例,您可以“锁定”代码的某些部分,确保一次只有一个线程执行该部分。
我想在你的情况下你想锁定线路
job = jobs.get()
。
首先,您必须在所有线程都可以访问它的范围内创建锁。 (您不希望每个线程都有一个锁,而是希望所有线程都有一个锁。这意味着在获取锁之前在线程中创建锁是行不通的)
import threading
lock = threading.Lock()
然后你可以在你的线路上使用它,例如:
lock.acquire()
job = jobs.get()
lock.release()
或
with lock:
job = jobs.get()
第一个到达
acquire()
的线程将锁定该锁。尝试 acquire()
锁的其他线程将暂停,直到通过调用 release()
再次解锁锁。
多线程时应该不需要锁。队列是线程安全的。请参阅 - https://docs.python.org/3.8/library/queue.html#module-Queue
队列模块实现了多生产者、多消费者队列。它 当信息必须被传递时,在线程编程中特别有用 在多个线程之间安全地交换。其中的 Queue 类 模块实现了所有必需的锁定语义。