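I'm trying to run a number of Python threads (or separate processes) from the `run` method of a `GetDayMin` class. I want the threads or processes to run concurrently. After 40 seconds, the class instance should write out its data (from each thread/process) and exit. I can start every thread without waiting for anything to finish. However, if I use `join` to wait on the threads, the waits can add up to a very long time, because the threads I wait on one after another might all be blocked. With both `threading` and `multiprocessing`, `join` seems to block for the full timeout whenever the thread or process is still running.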
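For example, suppose my class starts 5 threads and then waits 40 seconds on each one, in the order they were created. The first thread might take 40 seconds to time out. Then we move on to the second, which takes another 40 seconds to time out, and so on. I could end up waiting 200 seconds for 5 threads.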
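To see the accumulation concretely, here is a scaled-down sketch. It is an editorial illustration, not the asker's code. Five threads each run for 2 s and are joined one after another with a 0.2 s timeout each. Every `join` waits its full timeout, so the total is roughly 5 × 0.2 = 1 s rather than 0.2 s. This is the same effect as 5 × 40 = 200 s. The helper name `sequential_join_elapsed` is hypothetical.

```python
import threading
import time


def sequential_join_elapsed(n_threads: int, work_s: float, per_join_timeout: float) -> float:
    """Hypothetical demo helper: start n_threads that each sleep work_s seconds,
    then join them one by one with the same per-thread timeout and return the
    total time spent in the join loop."""
    threads = [
        threading.Thread(target=time.sleep, args=(work_s,), daemon=True)  # daemon: don't block exit
        for _ in range(n_threads)
    ]
    for t in threads:
        t.start()

    start = time.monotonic()
    for t in threads:
        # Each join blocks for the full timeout while the thread is still running,
        # so the waits add up instead of sharing one budget.
        t.join(timeout=per_join_timeout)
    return time.monotonic() - start
```

For example, `sequential_join_elapsed(5, work_s=2.0, per_join_timeout=0.2)` comes out at about 1 s, five times the per-thread timeout.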
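What I want is for no thread to take more than 40 seconds, so that the class instance as a whole also lasts at most 40 seconds. If it makes things easier, I'm willing to use multiprocessing instead of multithreading. I actually expect most threads to finish within 10 seconds, but three or four might hang, and I don't want to wait for them. How can I achieve this?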
```python
import multiprocessing
import pandas as pd
import random
import time

class GetDayMin:
    def __init__(self):
        self.results = pd.DataFrame()  # Shared DataFrame to store results

    def add_result(self, result):
        # DataFrame.append was removed in pandas 2.0; pd.concat replaces it
        self.results = pd.concat([self.results, pd.DataFrame([result])], ignore_index=True)

    def process_function(self):
        sleep_time = random.randint(30, 50)  # Random sleep time between 30 and 50 seconds
        time.sleep(sleep_time)  # Pretend I'm calculating something
        # Return the time slept to store in the results to simulate thread communication
        # (note: multiprocessing.Process discards its target's return value)
        return {'process_id': multiprocessing.current_process().pid, 'time_slept': sleep_time}

    def run(self):
        processes = []
        for _ in range(30):
            process = multiprocessing.Process(target=self.process_function)
            processes.append(process)

        # Start all processes
        for process in processes:
            process.start()

        # Wait for all processes to finish or timeout after 40 seconds (each--unfortunately)
        for process in processes:
            process.join(timeout=40)
            if process.is_alive():
                process.terminate()
                process.join()  # wait on process--want this to be a collective 40 seconds
```
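It seems to me that you can compute an absolute time, `expiration_time`, by which all submitted tasks should have finished. Any task still running after that time should be terminated. The timeout you pass to `process.join()` can then be computed from `expiration_time` and the current time: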
```python
...

    def run(self):
        processes = []
        for _ in range(30):
            process = multiprocessing.Process(target=self.process_function)
            processes.append(process)

        # Start all processes
        for process in processes:
            process.start()

        # Wait for all processes to finish or timeout after 40 seconds
        expiration_time = time.time() + 40
        time_expired = False
        for process in processes:
            if time_expired:
                process.terminate()
            else:
                wait_time = expiration_time - time.time()
                if wait_time < 0:
                    process.terminate()
                    time_expired = True
                else:
                    process.join(timeout=wait_time)
                    if process.is_alive():
                        process.terminate()
                        time_expired = True

        # Reap the terminated processes so they don't linger as zombies
        for process in processes:
            process.join()
```
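The question also needs each worker's result, but a plain `multiprocessing.Process` throws away its target's return value. The sketch below keeps the answer's single-deadline idea and swaps in `multiprocessing.Pool` with `apply_async`. This is a substitute technique, not the answer's code. Each `AsyncResult.get(timeout=...)` receives only the time left before the shared deadline, and `pool.terminate()` kills whatever is still running. `work` and `run_with_deadline` are hypothetical names. `work` stands in for `process_function` and takes an explicit sleep time so the behaviour is reproducible. The optional `ctx` parameter is an assumption that lets the caller pick a start method. The deadline uses `time.monotonic()` rather than `time.time()` so that system clock adjustments cannot move it.

```python
import multiprocessing
import time
from typing import List


def work(task_id: int, sleep_time: float) -> dict:
    """Hypothetical stand-in for process_function: pretend to compute, then report."""
    time.sleep(sleep_time)
    return {"task_id": task_id, "time_slept": sleep_time}


def run_with_deadline(sleep_times: List[float], deadline_s: float, ctx=None) -> List[dict]:
    """Run one task per entry in sleep_times and return the results of every task
    that finished before a single shared deadline; stragglers are terminated."""
    if ctx is None:
        ctx = multiprocessing.get_context()  # platform default start method
    results = []
    pool = ctx.Pool(processes=len(sleep_times))
    try:
        pending = [pool.apply_async(work, (i, s)) for i, s in enumerate(sleep_times)]
        # One absolute deadline for the whole batch, measured on the monotonic clock.
        expiration_time = time.monotonic() + deadline_s
        for async_result in pending:
            # Remaining budget, never negative; get(timeout=0) only checks readiness,
            # so tasks that finished early are still collected after the deadline.
            remaining = max(0.0, expiration_time - time.monotonic())
            try:
                results.append(async_result.get(timeout=remaining))
            except multiprocessing.TimeoutError:
                pass  # still running at the deadline; terminated below
    finally:
        pool.terminate()  # kill any worker still busy with a straggler
        pool.join()
    return results
```

In the question's setting this would be called as `run_with_deadline([random.randint(30, 50) for _ in range(30)], deadline_s=40)`. The returned list of dicts can go straight into `pd.DataFrame(...)`. Under the `spawn` start method (the default on Windows and macOS), call it from inside an `if __name__ == "__main__":` block and keep `work` at module level so it can be pickled.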