等待所有期货结束的Python并行期货

问题描述 投票:0回答:1

我一直在尝试使用并发期货,但是由于它是一个非常新鲜的Python库,因此其文档并不是最好的。

我正在使用一个称为广义随机Petri网的数学模型,但为简化我的问题,它们很像图。在每个节点上,应该有一个函数可以执行并代表我们当前在图形上的位置,我们有标记。我将在下面提供一张图片,以便您了解模型的外观:simple example。黑色小点是标记,而矩形称为过渡,但它们与我的问题无关。总之,这就是我要在项目中执行的操作:

  • 我想让多个线程执行不同的任务,这意味着当令牌到达新节点时,该线程必须执行相应的功能;
  • 每个线程有几种状态,“空闲”,“完成”和“已占用”,它们确定它们是否准备好执行任务(空闲),执行任务(已占用)或刚刚完成任务(完成)。

我想发生的是,当线程完成任务时,我希望它执行代码状态为“已占用”的部分,但是,这仅在所有线程完成其任务时才会发生,这实际上意味着并发不是100%起作用。

在显示我的代码之前,我只想解释一些变量:

  • self .__ token_states是一个len =令牌数的列表,并用代表令牌当前状态的字符串填充(由索引表示);
  • self .__ futures是我用来存储要操纵的期货的列表。

代码如下:

'''

def decide_function_to_execute(self):
    counter = 1
    while counter != 3:
        with ThreadPoolExecutor(max_workers=self.__number_of_tokens) as executor:
            number_tokens = self.__gspn.get_number_of_tokens()
            for thread_number in range(number_tokens):
                if self.__token_states[thread_number] == 'Free':
                    place = self.look_for_token(thread_number + 1)
                    splitted_path = self.__place_to_function_mapping[place].split(".")

                    # On the first case we have path = FILE.FUNCTION
                    if len(splitted_path) <= 2:
                        function_location = splitted_path[0]
                        function_name = splitted_path[1]
                        module_to_exec = __import__(function_location)
                        function_to_exec = getattr(module_to_exec, function_name)

                    # On the second case we have path = FOLDER. ... . FILE.FUNCTION
                    else:
                        new_path = splitted_path[0]
                        for element in splitted_path[1:]:
                            if element != splitted_path[-1]:
                                new_path = new_path + "." + element

                        function_location = new_path
                        function_name = splitted_path[-1]
                        module_to_exec = __import__(function_location, fromlist=[function_name])
                        function_to_exec = getattr(module_to_exec, function_name)

                    self.__token_states[thread_number] = 'Occupied'
                    self.__futures[thread_number] = executor.submit(function_to_exec, thread_number)


                if self.__token_states[thread_number] == 'Occupied' and self.__futures[thread_number].done():
                    print("I am occupied and finished the task", thread_number + 1)
                    self.__token_states[thread_number] = 'Done'
                    continue

                if self.__token_states[thread_number] == 'Done':
                    print("I am done", thread_number + 1)
                    self.apply_policy()
                    self.__token_states[thread_number] = 'Free'

        counter = counter + 1

'''

您可以忽略从“ place = self.look_for_token(thread_number + 1)”开始直到“ function_to_exec = getattr(module_to_exec,function_name)”的代码行,因为这与我需要的项目的另一部分有关以便能够在Python文件中查找函数。

为什么线程只有在全部完成后才执行“如果self .__ token_states [thread_number] =='被占用'和self .__ futures [thread_number] .done():“?

我已经在这个问题上停留了两个星期,我无法取得进展。谁能帮忙?

谢谢

python multithreading python-multithreading concurrent.futures
1个回答
0
投票

简化的功能如下。 (这是伪python,因此并非每个函数都存在)

def decide_function_to_execute(self):
    counter = 1
    while counter < 3:
        with ThreadPoolExecutor(...) as executor:
            for thread in pool:
                if not thread.started():
                    thread.start()
                if thread.done():
                    do_something()
        counter += 1

第一次访问每个线程后,您将退出ThreadPoolExecutor上下文。当代码离开执行程序上下文时,将隐式调用executor.shutdown()。连接每个线程(请参见documentation)。然后,您增加计数器并再次输入执行程序上下文。因为每个线程都已经加入,所以thread.done()对于每个线程都是True。然后每个线程都执行do_something()。

© www.soinside.com 2019 - 2024. All rights reserved.