How do I return a thread's first argument together with the thread's return value from a queue?


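Below is my Threader class. I use it to thread any number of functions and then, once the threads have been joined, return a list of the threaded functions' return values. One feature I want is an option to return a dictionary instead of a list. I found one way to do this by requiring the threaded functions to return a tuple, whose first value is then used as the key. I would rather use the threaded function's first argument as the key.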

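I learned that threads can be named, so when each thread is created I set its name to the threaded function's first argument. The thread itself can read that name with getName(), but how do I get the name of the thread whose result is returned by the next .get() on the queue? (How do I access the thread objects in the queue?)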
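The queue only holds whatever the threads put into it, so neither the thread objects nor their names can be recovered from it afterwards. One workaround, sketched here under the assumption that a thread name is an acceptable dict key, is to have the thread target read its own name with `threading.current_thread().name` and put it into the queue next to the result. The helpers `run_named` and `square` are hypothetical. Note that thread names are always strings, so the keys come back as `'2'`, not `2`.

```python
import queue
import threading

def run_named(result_queue, func, fargs):
    """Hypothetical thread target: tag the result with this thread's own name."""
    # Inside the target, current_thread() is the Thread object running it,
    # so .name is the value passed as name= when the thread was created.
    name = threading.current_thread().name
    result_queue.put((name, func(*fargs)))

def square(x):
    # Hypothetical threaded function.
    return x * x

results_q = queue.Queue()
threads = []
for n in (2, 3, 4):
    fargs = (n,)
    # Thread names are strings, so convert the first argument explicitly.
    t = threading.Thread(target=run_named, name=str(fargs[0]),
                         args=(results_q, square, fargs))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

results = {}
while not results_q.empty():
    name, value = results_q.get()
    results[name] = value

print(sorted(results.items()))  # [('2', 4), ('3', 9), ('4', 16)]
```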

I only need this to work as described in the first paragraph, so I am open to other approaches that achieve the same effect.
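Since other approaches are acceptable, here is a sketch that swaps in the standard library's `concurrent.futures.ThreadPoolExecutor` instead of hand-managed `Thread` objects plus a `Queue` (a different technique from the one in the question). Each `Future` is kept next to its call's first argument, so there is no need to recover thread names, and results come back in submission order. `probe` and `run_all` are hypothetical names used only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor

def probe(region, tag):
    # Hypothetical stand-in for a real threaded function.
    return f"{region}:{tag}"

def run_all(calls, return_dict=False):
    """Run each (func, fargs) pair in a thread pool.

    Results are returned in submission order; with return_dict=True the
    first argument of each call is used as the key.
    """
    with ThreadPoolExecutor() as pool:
        # Keep (first_arg, future) pairs in submission order.
        pending = [(fargs[0], pool.submit(func, *fargs)) for func, fargs in calls]
        # result() blocks until that call finishes and re-raises its exception.
        if return_dict:
            return {key: fut.result() for key, fut in pending}
        return [fut.result() for _, fut in pending]

calls = [(probe, ("us-east", "web")), (probe, ("eu-west", "db"))]
print(run_all(calls))                    # ['us-east:web', 'eu-west:db']
print(run_all(calls, return_dict=True))  # {'us-east': 'us-east:web', 'eu-west': 'eu-west:db'}
```

A side benefit of this design is that an exception raised in a worker is re-raised by `result()` in the calling thread instead of being lost.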

from queue import Queue
from threading import Thread

class Threader(object):
    """thread arbitrary number of functions, then block when results wanted

    Attributes:
        thread_queue (Queue): The queue that holds the threads.
        threads (Thread list): Threads of functions added with add_thread.
    """

    def __init__(self):
        self.thread_queue = Queue()
        self.threads = []


    def add_thread(self, func, args):
        """add a function to be threaded"""
        self.threads.append(Thread(
            name=args[0], # Custom name using function's first argument
            target=lambda queue, func_args: queue.put(func(*func_args)),
            args=(self.thread_queue, args)))
        self.threads[-1].start()


    def get_results(self, return_dict=False):
        """block threads until all are done, then return their results

        Args:
            return_dict (bool): Return a dict instead of a list. Requires 
                each thread to return a tuple with two values.
        """

        for thread in self.threads:
            thread.join()

        if return_dict:
            results = {}
            while not self.thread_queue.empty():
                # Setting the dictionary key with returned tuple
                # How to access thread's name?
                key, value = self.thread_queue.get()
                results[key] = value
        else:
            results = []
            while not self.thread_queue.empty():
                results.append(self.thread_queue.get())

        return results

Usage example:

threader = Threader()
for region in regions:
    # probe_region is a function, and (region, tag_filter) are args for it
    threader.add_thread(probe_region, (region, tag_filter))
results = threader.get_results()

Edit: what I am currently using:

My cleaned-up and improved version of Mackay's answer (results are returned in the order the threads were added):

from queue import Queue
from threading import Thread

class Threader(object):
    """thread arbitrary number of functions, then block when results wanted

    Attributes:
        result_queue (Queue): Thread-safe queue that holds the results.
        threads (list[Thread]): Threads of functions added with add_thread.
    """

    def __init__(self):
        self.result_queue = Queue()
        self.threads = []


    def worker(self, index, func, fargs):
        """insert threaded function's return into queue to make it retrievable

        The index of the thread and the threaded function's first arg are 
        inserted into the queue, preceding the threaded function's return value.

        Args:
            index (int): Position of the thread, fixed when it was added.
            func, fargs: See add_thread.
        """
        self.result_queue.put([index, fargs[0], func(*fargs)])


    def add_thread(self, func, fargs):
        """add a function to be threaded

        Args:
            func (function): Function to thread.
            fargs (tuple): Argument(s) to pass to the func function.

        Raises:
            ValueError: If func isn't callable, or if fargs is not a
                non-empty tuple.
        """

        if not callable(func):
            raise ValueError("func must be a function.")
        if not isinstance(fargs, tuple) or not fargs:
            raise ValueError("fargs must be a non-empty tuple.")

        # Capture the index now: reading len(self.threads) inside worker()
        # would race with threads added after this one has started.
        index = len(self.threads)
        self.threads.append(
            Thread(target=self.worker, args=(index, func, fargs)))
        self.threads[-1].start()


    def get_results(self, return_dict=False):
        """block all threads, sort by thread index, then return thread results

        Args:
            return_dict (bool): Return dict instead of list. Threads' 
                function's first argument used as key.
        """

        for thread in self.threads:
            thread.join()

        thread_data = []
        while not self.result_queue.empty():
            thread_data.append(self.result_queue.get())
        thread_data.sort(key=lambda item: item[0])  # sort by thread index

        if return_dict:
            results = {}
            for _, key, thread_return in thread_data:
                results[key] = thread_return
        else:
            results = []
            for _, _, thread_return in thread_data:
                results.append(thread_return)

        return results
python multithreading python-3.x python-multithreading
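The sort by thread index is what makes the returned order match insertion order even when threads finish out of order. A standalone sketch of that idea, with hypothetical `worker` and `slow_upper` functions, where later-added threads sleep less and therefore usually enqueue their results first; the index is fixed at creation time via `enumerate`:

```python
import queue
import threading
import time

def worker(result_queue, index, func, fargs):
    # index is fixed when the thread is created, not read later.
    result_queue.put((index, fargs[0], func(*fargs)))

def slow_upper(label, delay):
    # Hypothetical threaded function: sleep, then return the label upper-cased.
    time.sleep(delay)
    return label.upper()

result_queue = queue.Queue()
threads = []
# Later threads sleep less, so they usually enqueue their results first.
for index, fargs in enumerate([("a", 0.3), ("b", 0.2), ("c", 0.1)]):
    t = threading.Thread(target=worker,
                         args=(result_queue, index, slow_upper, fargs))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

thread_data = []
while not result_queue.empty():
    thread_data.append(result_queue.get())
thread_data.sort(key=lambda item: item[0])  # restore insertion order

ordered = [ret for _, _, ret in thread_data]
keyed = {key: ret for _, key, ret in thread_data}
print(ordered)  # ['A', 'B', 'C']
```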
Answer (1 vote):

If you only want the result outlined in your first paragraph, i.e. using the first argument as the key, you can modify the code as follows:

from queue import Queue
from threading import Thread

class Threader(object):
    """thread arbitrary number of functions, then block when results wanted

    Attributes:
        results_queue (Queue): The thread-safe queue that holds the results.
        threads (Thread list): Threads of functions added with add_thread.
    """

    def __init__(self):
        self.results_queue = Queue()
        self.threads = []

    def worker(self, func, args):
        """run the function and save its results"""
        result = func(*args)
        # save result, along with a key for use later if needed (first argument)
        self.results_queue.put([args[0], result])

    def add_thread(self, func, fargs):
        """add a function to be threaded"""
        self.threads.append(Thread(target=self.worker, args=(func, fargs)))
        self.threads[-1].start()

    def get_results(self, return_dict=False):
        """block threads until all are done, then return their results

        Args:
            return_dict (bool): Return a dict instead of a list, keyed by
                the first argument passed to each threaded function.
        """
        for thread in self.threads:
            thread.join()

        if return_dict:
            results = {}
            while not self.results_queue.empty():
                # set the dictionary key as first argument passed to worker
                key, value = self.results_queue.get()
                results[key] = value
        else:
            results = []
            while not self.results_queue.empty():
                # discard the key; only the return value goes in the list
                _, value = self.results_queue.get()
                results.append(value)

        return results

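Note that there is no need to store the threads themselves in the queue; store only the results there. (A queue is a good choice for holding results because it takes care of synchronizing access between threads.)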
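Because only results go into the queue, and every thread has been joined before the queue is read, nothing else is writing to it while it is drained. On its own, `Queue.empty()` does not guarantee that a following `get()` won't block while other threads are still active, so a drain loop built on `get_nowait()` and `queue.Empty` is a slightly more defensive way to express the same step. A minimal sketch, with a hypothetical `drain` helper and ten threads that each `put` one `(key, result)` pair:

```python
import queue
import threading

def drain(q):
    """Hypothetical helper: remove and return every item in q without blocking."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items

results_queue = queue.Queue()
# Each thread puts one (key, result) pair; Queue.put is thread-safe.
threads = [threading.Thread(target=results_queue.put, args=((i, i * 10),))
           for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

results = dict(drain(results_queue))
print(len(results))  # 10
```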

In the worker() function you can generate the key however you like; in the code above I used the first argument, as you suggested.
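To make "generate the key however you like" concrete, here is a sketch of a variant where the caller supplies the key function. `KeyedThreader` and its `key_func` parameter are hypothetical, not part of the answer's class; the default `key_func` reproduces the answer's behaviour of keying on the first argument.

```python
import queue
import threading

class KeyedThreader:
    """Hypothetical variant of Threader whose dict key comes from key_func."""

    def __init__(self, key_func=lambda fargs: fargs[0]):
        # key_func receives the argument tuple; the default keys on the
        # first argument, as in the answer.
        self.key_func = key_func
        self.results_queue = queue.Queue()
        self.threads = []

    def worker(self, func, fargs):
        self.results_queue.put((self.key_func(fargs), func(*fargs)))

    def add_thread(self, func, fargs):
        t = threading.Thread(target=self.worker, args=(func, fargs))
        self.threads.append(t)
        t.start()

    def get_results(self):
        for t in self.threads:
            t.join()
        results = {}
        while not self.results_queue.empty():
            key, value = self.results_queue.get()
            results[key] = value
        return results

def add(a, b):
    return a + b

# Key each result on both arguments instead of only the first.
threader = KeyedThreader(key_func=lambda fargs: f"{fargs[0]}+{fargs[1]}")
threader.add_thread(add, (1, 2))
threader.add_thread(add, (3, 4))
results = threader.get_results()
print(results["1+2"], results["3+4"])  # 3 7
```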

An example usage:

def foo(*args):
    return "foo() " + repr(len(args))

def bar(*args):
    return "bar() " + repr(len(args))

def baz(*args):
    return "baz() " + repr(len(args))

threader = Threader()

threader.add_thread(foo, ["foo_key", "a"])
threader.add_thread(bar, ["bar_key", "b", "c"])
threader.add_thread(baz, ["baz_key", "d", "e", "f"])

print(threader.get_results(True))

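This gives the following output (the order of the keys can vary, since it depends on which thread puts its result into the queue first):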

{'foo_key': 'foo() 2', 'bar_key': 'bar() 3', 'baz_key': 'baz() 4'}

Hope this helps.
