在我的项目中,我使用 Rocketry 来安排一些任务。我用于任务的功能对于所有这些都是相同的,但时间表不同。例如
def func(...):
...
for i in range(10, 20):
app_rocketry.task(name=f"func{i}", func=func, start_cond=every(f"{i**2} minutes"))
我的问题是我必须在场景中的运行时更改时间表。我需要做的是不改变计划,我的意思是,例如,如果该任务每 5 分钟运行一次,我会保持原样,但延迟下一次运行,因此强制。
例如任务将在 12:00 运行,并且计划每 5 分钟运行一次。在 12:03 我想将下一次运行更改为 12:08 而不是 12:05,之后的运行将基于此修改。我能做什么,主要问题是这是否可能?
class MyTask():
def __init__(self):
self.time_schedule = ''
def do_something(self):
print("TASK DONE")
def __call__(self):
# this magic method makes the instance callable as a function
self.do_something()
my_task = MyTask()
for i in range(10, 20):
my_task.time_schedule = f"{i**2} minutes"
# since 'my_task' is passed as a function the __call__ method will be called
app_rocketry.task(name=f"func{i}")(my_task)
我不确定我是否理解你的用例,但你可以重新安排任务,比如
app_rocketry = Rocketry()
@app_rocketry.task(daily.at('12:05'))
def my_task():
print('Hello World')
reschedule(5)
def parse_schedule_time(input:str)->tuple[int, int]:
pattern = r"(\d+) hours?(?:, (\d+) minutes?)?"
import re
match = re.search(pattern, input)
if match:
hours = int(match.group(1))
minutes = int(match.group(2))
return (hours, minutes)
else:
return None
def reschedule(add_minutes: int):
# The output looks like "task 'None' 12 hours, 5 minutes - 13 hours, 5 minutes"
current_condition = app_rocketry.session[my_task].start_cond
hours, minutes = parse_schedule_time(str(current_condition))
if hours is not None:
total_minutes = hours * 60 + minutes
total_minutes += add_minutes
new_hours, new_minutes = divmod(total_minutes, 60)
app_rocketry.session[my_task].start_cond = daily.at(f'{new_hours}:{new_minutes}')
其中最重要的部分是
app_rocketry.session[my_task].start_cond
这个变量为我们提供了启动条件,请注意任务名称
my_task
。
我不知道 Rocketry 是否为我们提供了一种更轻松地解析时间的方法,所以我写了一些简单的东西来做到这一点,但有一些边缘情况它没有考虑到。