多线程。使用类内循环创建的线程无法存储正确的参数

问题描述 投票:0回答:1

我使用线程模块编写了一个 Threader 类,该模块创建多个线程来对元素列表执行任务(函数)。我将所述列表的切片传递到循环内的 threading.Thread 对象中,但是当启动创建的多个线程时,它们都开始使用相同的最后一个切片。

import threading
from typing import Callable, TypeAlias, Union, Generator


SubjectType: TypeAlias = Union[str, int]

    
def list_chunks(lst: list, size: int)-> Generator:
    """
    Sublist generator. Splits a list into n sublist of specific size, yelding each sublist at a time.

    :param lst: List to be splitted
    :param size: size of each sublist
    yields each sublist 
    """
    for i in range(0, len(lst), size):
        yield lst[i:i + size]

class Threader():
    
    def __init__(self, function: Callable, n_threads: int = 5):
        """
        Instanciates a Threader object.

        :param function:
        :param name:
        :param n_threads:
        :param sleep:     
        """
        self.function = function        
        self.n_threads = n_threads
                        

    def create(self, targets: list, kwargs: dict = {}):
        """
        Creates multiple threadind.Thread objects

        :param targets: list of targets to be splitted between the threads.
        :param kwargs: kwargs to be passed to threading.Thread object
        """
        splits = int(len(targets)//self.n_threads) + 1
        targets_splitted: list[SubjectType] = []
        for sublist in list_chunks(targets, size=splits):
            targets_splitted.append(sublist)        

        self.threads: list[threading.Thread] = []
        for idx, sublist in enumerate(targets_splitted):            
            kwargs.update({'targets': sublist, 'thread_idx': idx})
            thread_ = threading.Thread(target=self.function, kwargs=kwargs)
            self.threads.append(thread_)
            print(f'Thread {idx} created. Kwargs: {kwargs}')
    

    def start_threads(self):
        """
        Calls start() and join() for each thread.
        """
        for thread in self.threads:
            thread.start()
                    
        for thread in self.threads:
            thread.join()

测试代码:

def simulate(targets: list, thread_idx: int) -> None:
for element in tqdm(targets, position=thread_idx, desc=f'Thread {thread_idx}'):
    if element % 3 == 0:
        time.sleep(1)
tests = [n+1 for n in range(25)]
sim_threads = Threader(function=simulate, n_threads=2)
sim_threads.create(targets=tests)
sim_threads.start_threads()

输出:

预计有两个线程在测试列表的不同部分上运行,但无法弄清楚出了什么问题。有人可以帮我吗?

python multithreading subclass python-multithreading
1个回答
0
投票

问题是你的

kwargs
字典是一个可变对象,并且它不会仅仅因为你说它是线程的参数而“冻结” - 所有线程都接收 same 字典对象作为参数 - 并且当它们开始,这是他们看到的最后一次更新。

所以,首先,在 Python 中,永远不要这样做:

def create(self, targets: list, kwargs: dict = {}):

虽然这可能不是您遇到的特定情况下的问题,但它会告诉您:函数声明行在程序生命周期内执行一次,因此函数的这些默认参数是相同的可变对象 - 并且每当它们发生变化时,它们都会在函数调用之间保留更改。

改为这样做:

def create(self, targets: list, kwargs: dict | None = None):
     if kwargs is None:
          kwargs = {}  # ensures the creation of a fresh dictionary each function call

然后,使用

kwargs
字典的副本创建每个线程,而不是直接传递它:

        for idx, sublist in enumerate(targets_splitted):            
            kwargs.update({'targets': sublist, 'thread_idx': idx})
            thread_ = threading.Thread(target=self.function, kwargs=kwargs.copy())
            self.threads.append(thread_)
            ...

.copy()
调用会创建字典的即时快照,并且该对象在启动时在 Thread 实例中进行注释。

© www.soinside.com 2019 - 2024. All rights reserved.