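I wrote a Threader class using the threading module; it creates multiple threads to run a task (a function) over a list of elements. I pass slices of that list to threading.Thread objects inside a loop, but when the created threads are started, they all end up working on the same last slice.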
import threading
from typing import Callable, TypeAlias, Union, Generator

SubjectType: TypeAlias = Union[str, int]


def list_chunks(lst: list, size: int) -> Generator:
    """
    Sublist generator. Splits a list into sublists of a given size, yielding one sublist at a time.
    :param lst: List to be split
    :param size: size of each sublist
    yields each sublist
    """
    for i in range(0, len(lst), size):
        yield lst[i:i + size]
class Threader():
    def __init__(self, function: Callable, n_threads: int = 5):
        """
        Instantiates a Threader object.
        :param function: callable run by each thread
        :param n_threads: number of threads to create
        """
        self.function = function
        self.n_threads = n_threads

    def create(self, targets: list, kwargs: dict = {}):
        """
        Creates multiple threading.Thread objects
        :param targets: list of targets to be split between the threads.
        :param kwargs: kwargs to be passed to threading.Thread object
        """
        splits = int(len(targets)//self.n_threads) + 1
        targets_splitted: list[SubjectType] = []
        for sublist in list_chunks(targets, size=splits):
            targets_splitted.append(sublist)
        self.threads: list[threading.Thread] = []
        for idx, sublist in enumerate(targets_splitted):
            kwargs.update({'targets': sublist, 'thread_idx': idx})
            thread_ = threading.Thread(target=self.function, kwargs=kwargs)
            self.threads.append(thread_)
            print(f'Thread {idx} created. Kwargs: {kwargs}')

    def start_threads(self):
        """
        Calls start() and join() for each thread.
        """
        for thread in self.threads:
            thread.start()
        for thread in self.threads:
            thread.join()
Test code:
import time
from tqdm import tqdm


def simulate(targets: list, thread_idx: int) -> None:
    for element in tqdm(targets, position=thread_idx, desc=f'Thread {thread_idx}'):
        if element % 3 == 0:
            time.sleep(1)


tests = [n+1 for n in range(25)]
sim_threads = Threader(function=simulate, n_threads=2)
sim_threads.create(targets=tests)
sim_threads.start_threads()
Output:
I expected two threads to run over different parts of the tests list, but I can't figure out what is going wrong. Can anyone help?
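The problem is that your kwargs dictionary is a mutable object, and it is not "frozen" just because you pass it as a thread's arguments: all threads receive the same dictionary object as their arguments, and by the time they start, what they see is the last update.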
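To make the shared-dictionary effect visible without tqdm, here is a minimal sketch. The names worker, seen and shared_kwargs are made up for this illustration and are not part of the code in the question; the loop mirrors the one in create().

```python
import threading

seen = []  # records what each thread actually received


def worker(targets, thread_idx):
    # Hypothetical stand-in for the real task function.
    seen.append((thread_idx, targets))


shared_kwargs = {}
threads = []
for idx, chunk in enumerate([[1, 2], [3, 4]]):
    shared_kwargs.update({'targets': chunk, 'thread_idx': idx})
    # Each Thread keeps a reference to the same dict; nothing is copied here.
    threads.append(threading.Thread(target=worker, kwargs=shared_kwargs))

for t in threads:
    t.start()
for t in threads:
    t.join()

# Both threads ran with the values from the final update().
print(seen)  # [(1, [3, 4]), (1, [3, 4])]
```

The dictionary is only unpacked when each thread runs, so both calls see the values from the last iteration of the loop.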
So, first of all, in Python, never do this:
def create(self, targets: list, kwargs: dict = {}):
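Although this may not be the cause of the specific problem you are facing, it will bite you eventually: the function declaration line is executed only once during the program's lifetime, so a mutable default argument is the same object on every call, and any change made to it persists between function calls.
Do this instead: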
def create(self, targets: list, kwargs: dict | None = None):
    if kwargs is None:
        kwargs = {}  # ensures the creation of a fresh dictionary each function call
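The default-argument pitfall described above can be seen in isolation with a small sketch. The functions append_item and append_item_safe are hypothetical names used only for this demonstration.

```python
def append_item(item, bucket=[]):
    # The default list is created once, when the def statement runs.
    bucket.append(item)
    return bucket


first = append_item(1)
second = append_item(2)
print(second)           # [1, 2]
print(first is second)  # True: both calls used the same default list


def append_item_safe(item, bucket=None):
    # None is used as a sentinel, so a fresh list is built on every call.
    if bucket is None:
        bucket = []
    bucket.append(item)
    return bucket


print(append_item_safe(1))  # [1]
print(append_item_safe(2))  # [2]
```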
Then, create each thread with a copy of the kwargs dictionary instead of passing it directly:
for idx, sublist in enumerate(targets_splitted):
    kwargs.update({'targets': sublist, 'thread_idx': idx})
    thread_ = threading.Thread(target=self.function, kwargs=kwargs.copy())
    self.threads.append(thread_)
    ...
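The .copy() call creates an instant snapshot of the dictionary, and that snapshot is the object stored in the Thread instance and used when the thread starts. Note that .copy() is a shallow copy, which is enough here because each thread gets its own 'targets' and 'thread_idx' entries.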
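As a variation on the same idea, one could build a new dictionary for every thread instead of updating and copying a shared one. The sketch below is only an illustration under that assumption: build_threads, record and results are hypothetical names, not part of the original Threader class.

```python
import threading


def build_threads(function, chunks, kwargs=None):
    """Hypothetical helper: one Thread per chunk, each with its own kwargs dict."""
    base = dict(kwargs or {})  # never mutate the caller's dictionary
    threads = []
    for idx, chunk in enumerate(chunks):
        # A brand-new dict per iteration, so later iterations cannot affect it.
        per_thread = {**base, 'targets': chunk, 'thread_idx': idx}
        threads.append(threading.Thread(target=function, kwargs=per_thread))
    return threads


results = {}


def record(targets, thread_idx):
    # Hypothetical task: remember which chunk each thread received.
    results[thread_idx] = targets


threads = build_threads(record, [[1, 2, 3], [4, 5]])
for t in threads:
    t.start()
for t in threads:
    t.join()

print(results)  # each thread index maps to its own chunk
```

Because no dictionary is shared between iterations, there is nothing left to overwrite, which makes the intent a little easier to read than update() followed by copy().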