此脚本是我所拥有的较大脚本的非常简化的版本,如果有这样的数组值,我在这里要做的就是删除“ pip”。但是问题出在第二个周期来临(甚至更多)时,pip继续在屏幕上打印,我不想这样做。
import requests, threading, random, string, json, time, queue, re
num_worker_threads = int(input("Threads: "))
lista = ['asd', 'asdjk', 'pip', 'lasd', 'lol']
print(str(lista))
def do_work(i):
try:
print(i.strip())
print(str(lista))
if i == "pip":
lista.remove("pip")
except Exception as e:
print(e)
def worker():
while True:
item = q.get()
if item is not None:
do_work(item)
q.task_done()
q = queue.Queue()
threads = []
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
t.start()
threads.append(t)
for i in range(2): # I have only put 2 cycles
for line in lista:
q.put(line)
q.join()
for i in range(num_worker_threads):
q.put(None)
for t in threads:
t.join()
您的代码具有竞争条件。种族之间:
"pip"
添加到队列两次,每次在lista
上的迭代中一次。"pip"
条目,则会将其从lista
中删除。如果第一个"pip"
条目已由工作线程处理并从第二个lista
进行迭代并添加之前从"pip"
中删除,则不会再次对其进行处理。但是从您对问题的描述中,我猜测由于GIL和代码不同部分的时序的某种组合,您可以一贯地将两个"pip"
的副本都添加到队列中,然后工作人员才能执行关于它的任何事情。但这不是严格保证的。如果在主线程中的lista
迭代之间添加了延迟,则可能会给工作线程腾出时间来按预期方式赶上并删除"pip"
:
for i in range(2):
for line in lista:
q.put(line)
time.sleep(1) # delay for a second
显然,这可能不是您真正想要做的,因为它仍然是一场比赛,只是现在[[可能由其他赛车手赢得的比赛(不能保证其他线程不会花费过多的时间)一样长)。更好的解决方案是设计代码,以使根本没有竞争条件。
一个想法可能是在列表的两次迭代之间join
队列,以便所有值都由工作程序处理,然后再重新添加它们。这与time.sleep
方法非常相似,但是更可靠,因为主线程将等待,直到需要工人在添加更多内容之前处理队列中的项目。[另一个想法可能是在将值放入队列之前,对主线程中的"pip"
条目进行过滤。只需执行if line != "pip"
检查即可!
pip
之前,队列不再次读取列表。您可以利用Event
来控制多线程程序的流程,让我们声明一个名为first_iteration_processsed
的事件,队列将等待该事件得到满足,以便它开始对它的第二次迭代列出列表的所有元素。成功从列表中删除点后,该事件将由您的一个线程设置。
代码示例:
import requests, threading, random, string, json, time, queue, re
from threading import Event
num_worker_threads = int(input("Threads: "))
lista = ['asd', 'asdjk', 'pip', 'lasd', 'lol']
print(str(lista))
iteration_processsed = Event()
iteration_processsed.set()
def do_work(i):
# global lista
try:
print(i.strip())
print(str(lista))
if i == "pip":
lista.remove("pip")
iteration_processsed.set()
print("Removed pip successfully")
except Exception as e:
print(e)
def worker():
while True:
item = q.get()
if item is not None:
do_work(item)
q.task_done()
q = queue.Queue()
threads = []
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
t.start()
threads.append(t)
for i in range(2): # I have only put 2 cycles
iteration_processsed.wait()
print(f"Iteration {i} started")
iteration_processsed.clear()
for line in lista:
q.put(line)
q.join()
for i in range(num_worker_threads):
q.put(None)
for t in threads:
t.join()
让我们尝试一下:
Threads: 2 ['asd', 'asdjk', 'pip', 'lasd', 'lol'] Iteration 0 started asd ['asd', 'asdjk', 'pip', 'lasd', 'lol'] asdjk ['asd', 'asdjk', 'pip', 'lasd', 'lol'] pip ['asd', 'asdjk', 'pip', 'lasd', 'lol'] Removed pip successfully lol lasd ['asd', 'asdjk', 'lasd', 'lol'] Iteration 1 started ['asd', 'asdjk', 'lasd', 'lol'] asd asdjk ['asd', 'asdjk', 'lasd', 'lol'] ['asd', 'asdjk', 'lasd', 'lol'] lasd lol ['asd', 'asdjk', 'lasd', 'lol'] ['asd', 'asdjk', 'lasd', 'lol']
现在,您可以看到第二个迭代将永远不会在删除pip之前开始,当然,此处的实现非常特定于此情况,但是我想您可以将其调整为自己更通用的用途,并可能向其中添加更多事件锁定更多操作,使其以某些预定义的顺序执行。您可以从文档中了解有关事件的更多信息,或者本文也是一个好的开始https://www.bogotobogo.com/python/Multithread/python_multithreading_Event_Objects_between_Threads.php