APScheduler失火测试

问题描述 投票:1回答:1

我正在尝试使用APScheduler测试错误执行的任务,但是重新启动APScheduler时看不到丢失的任务在运行。我已经将APScheduler配置如下:

scheduler.py

def configure_scheduler():
    jobstores = {
        'default': SQLAlchemyJobStore(url=config('DATABASE_URL'))
    }
    sched = BlockingScheduler()
    sched.configure(jobstores=jobstores)
    sched.add_job(
        test_task,
        id='test_task', 
        'interval',  
        hours=1, 
        coalesce=True, 
        max_instances=1,
        misfire_grace_time=360, 
        replace_existing=True
    )
    return sched

if __name__ == '__main__':
    scheduler = configure_scheduler()
    scheduler.start()

[第一次启动调度程序时,test_task被添加到Postgres数据库中的apscheduler_jobs表中,并且从启动调度程序起的next_run_time为一小时。然后,我尝试通过以下方法测试失火:

  1. 将数据库中的next_run_time更改为当前时间
  2. 等待15秒
  3. 启动调度程序

当我按照此步骤进行操作时,next_run_time再次设置为距当前时间的一小时。 next_run_time似乎在SQLAlchemy作业存储的update_job方法中进行了更新。我已经看到一个与持久性作业存储任务相关的update_job在断火后未运行。我见过的大多数similar question other的解决方案是将questions参数添加到misfire_grace_time。我已经按照上面的配置尝试过此操作,但是在调度程序启动时没有运气错过任何作业。我是否缺少与add_jobreplace_existing参数交互方式有关的内容?我是否需要手动检查是否有任何作业的misfire_grace_time过去,然后在启动调度程序之前运行这些作业?

我正在使用APScheduler库的v3.6.1。

[有关其他情况,我将在Heroku上部署调度程序,并且尝试解决每天至少发生一次的Heroku的next_run_time

python apscheduler
1个回答
0
投票

[与AlexSchöler的创建者AlexGrönholm在APScheduler Gitter聊天室中的automatic dyno cycling之后,我能够确定该作业正在数据库中被“覆盖”,因为我对some discussion的呼叫中有replace_existing=True ]。这将导致调度程序每次启动时都替换作业存储中的作业。

我的解决方法

  1. 调度程序启动前,请检查add_job表中的现有作业。

  2. 对于数据库中的每个作业,请对照当前时间检查apscheduler_jobs。如果next_run_time是过去的,请立即运行作业。

  3. 使用next_run_time像以前一样安排作业。

  4. 启动调度程序。

© www.soinside.com 2019 - 2024. All rights reserved.