How to run multiple threads/processes under a global timeout


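I'm trying to run many Python threads (or separate processes) from the "run" method of the GetDayMin class. I want the threads or processes to run concurrently, and after 40 seconds the class instance should write its data (from each thread/process) and exit. I can start every thread without waiting for anything to finish, but if I use the join method to wait on the threads, the timeouts can add up to a very long time, because successive threads may all be blocked. It seems that the join methods of both threading and multiprocessing block until the timeout expires whenever the worker is still running.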

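For example, if my class starts 5 threads and then waits on each one with a 40-second timeout, in the order the threads were created, the first thread might take 40 seconds to time out; then we move on to the second thread, which takes another 40 seconds to time out, and so on. I could end up waiting 200 seconds for 5 threads.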
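The worst case described above can be reproduced on a small scale. This is only an illustrative sketch: the helper names are hypothetical, and the numbers (3 threads that each hang for 1 second, a 0.3-second timeout) stand in for the 5 workers and the 40-second budget. Joining the workers one after another with the same per-call timeout takes roughly 3 × 0.3 s, while giving each join only the time left until one shared deadline keeps the total near 0.3 s.

```python
import threading
import time


def start_hung_workers(n, hang_seconds):
    # Start n daemon threads that each block for hang_seconds (simulated stuck work).
    # Daemon threads do not keep the interpreter alive once the main thread exits.
    workers = [
        threading.Thread(target=time.sleep, args=(hang_seconds,), daemon=True)
        for _ in range(n)
    ]
    for w in workers:
        w.start()
    return workers


def per_join_timeout(workers, timeout):
    # Each join may wait the full timeout, so the waits add up (n * timeout in the worst case).
    start = time.monotonic()
    for w in workers:
        w.join(timeout=timeout)
    return time.monotonic() - start


def shared_deadline(workers, budget):
    # Every join waits only for whatever is left of one common budget.
    start = time.monotonic()
    deadline = start + budget
    for w in workers:
        w.join(timeout=max(0.0, deadline - time.monotonic()))
    return time.monotonic() - start


if __name__ == "__main__":
    print(f"per-join timeout: {per_join_timeout(start_hung_workers(3, 1.0), 0.3):.2f} s")
    print(f"shared deadline:  {shared_deadline(start_hung_workers(3, 1.0), 0.3):.2f} s")
```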

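What I want is for no thread to take more than 40 seconds, so that the whole class instance also lasts at most 40 seconds. I'm willing to use multiprocessing instead of multithreading if that makes things easier. What I actually expect is that most threads will finish within 10 seconds, but three or four might hang, and I don't want to wait for them. How can I do this?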
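If threads are used, note that Python offers no way to forcibly stop a thread. The best a thread-based version can do is stop waiting once the shared deadline passes and leave any stragglers running as daemon threads: they are abandoned rather than killed, and they disappear only when the interpreter exits. The sketch below assumes each unit of work is a plain function that returns its result; run_tasks_with_deadline and fake_task are hypothetical names used for illustration.

```python
import queue
import threading
import time


def run_tasks_with_deadline(tasks, budget):
    """Run each callable on its own daemon thread; return the results that finish within budget seconds in total."""
    results = queue.Queue()  # thread-safe collection point for finished results
    deadline = time.monotonic() + budget

    def worker(index, func):
        results.put((index, func()))

    threads = [
        threading.Thread(target=worker, args=(i, f), daemon=True)
        for i, f in enumerate(tasks)
    ]
    for t in threads:
        t.start()
    for t in threads:
        # Wait only for what is left of the shared budget, never a fresh timeout per thread
        t.join(timeout=max(0.0, deadline - time.monotonic()))

    # Collect whatever has finished; threads that are still alive are simply abandoned
    finished = {}
    while True:
        try:
            index, value = results.get_nowait()
        except queue.Empty:
            break
        finished[index] = value
    return finished


def fake_task(seconds):
    # Hypothetical stand-in for real work: sleep, then report how long it slept
    def task():
        time.sleep(seconds)
        return seconds
    return task


if __name__ == "__main__":
    start = time.monotonic()
    done = run_tasks_with_deadline([fake_task(0.1), fake_task(0.2), fake_task(5.0)], budget=0.5)
    print(done, f"{time.monotonic() - start:.2f} s")
```

Because the hung threads keep running in the background, this only bounds how long the caller waits; if the stuck work must actually be stopped, separate processes (which can be terminated) are the better fit.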

import multiprocessing
import pandas as pd
import random
import time

class GetDayMin:
    def __init__(self):
        self.results = pd.DataFrame()  # Shared DataFrame to store results

    def add_result(self, result):
        # DataFrame.append was removed in pandas 2.0; pd.concat is the replacement
        self.results = pd.concat([self.results, pd.DataFrame([result])], ignore_index=True)

    def process_function(self):
        sleep_time = random.randint(30, 50)  # Random sleep time between 30 to 50 seconds
        time.sleep(sleep_time)  # Pretend I'm calculating something
        
        # Return the time slept to store in the results to simulate thread communication
        return {'process_id': multiprocessing.current_process().pid, 'time_slept': sleep_time}

    def run(self):
        processes = []
        for _ in range(30):
            process = multiprocessing.Process(target=self.process_function)
            processes.append(process)

        # Start all processes
        for process in processes:
            process.start()

        # Wait for all processes to finish or timeout after 40 seconds (each--unfortunately)
        for process in processes:
            process.join(timeout=40)
            if process.is_alive():
                process.terminate()
                process.join()  # wait on process--want this to be a collective 40 seconds
Answer

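It seems to me that you can compute an absolute time, expiration_time, by which all submitted tasks should have finished; any task still running after that time should be terminated. The timeout you pass to process.join() can then be computed from expiration_time and the current time: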

...
    def run(self):
        processes = []
        for _ in range(30):
            process = multiprocessing.Process(target=self.process_function)
            processes.append(process)

        # Start all processes
        for process in processes:
            process.start()

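        # Wait for all processes to finish, or terminate them, within a shared 40-second budget.
        # time.monotonic() is used because it cannot jump if the system clock is adjusted.
        expiration_time = time.monotonic() + 40
        time_expired = False
        for process in processes:
            if time_expired:
                process.terminate()
            else:
                wait_time = expiration_time - time.monotonic()
                if wait_time < 0:
                    process.terminate()
                    time_expired = True
                else:
                    process.join(timeout=wait_time)
                    if process.is_alive():
                        process.terminate()
                        time_expired = True

        # Reap every process, including the ones just terminated, so none is left as a zombie
        for process in processes:
            process.join()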
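The loop above enforces the shared 40-second budget, but it discards each worker's return value: a function passed as target to multiprocessing.Process cannot hand a result back to the parent, and a child process that updates self.results only changes its own copy. One alternative, sketched here under the assumption that each task can be a module-level function, uses multiprocessing.Pool: apply_async returns an AsyncResult whose get(timeout=...) is given only the time remaining until the deadline, and pool.terminate() kills whatever is still running afterwards. The names work and run_with_deadline are hypothetical, and fixed durations replace the random sleep so the behaviour is reproducible.

```python
import multiprocessing
import os
import time


def work(seconds):
    # Hypothetical stand-in for the real calculation: sleep, then report
    time.sleep(seconds)
    return {"process_id": os.getpid(), "time_slept": seconds}


def run_with_deadline(durations, budget):
    """Run one task per duration; return results of tasks that finish within budget seconds in total."""
    deadline = time.monotonic() + budget
    pool = multiprocessing.Pool(processes=len(durations))
    try:
        pending = [pool.apply_async(work, (d,)) for d in durations]
        results = []
        for async_result in pending:
            # Each get() waits only for what is left of the shared budget
            remaining = max(0.0, deadline - time.monotonic())
            try:
                results.append(async_result.get(timeout=remaining))
            except multiprocessing.TimeoutError:
                pass  # this task missed the deadline; its result is dropped
        return results
    finally:
        pool.terminate()  # kill workers that are still busy
        pool.join()


if __name__ == "__main__":
    found = run_with_deadline([0.5, 1.0, 60.0], budget=3.0)
    print(found)
```

Since the results come back as a list of dicts, pd.DataFrame(found) turns them into a single frame in one step, instead of appending row by row.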