如何从apscheduler访问返回值?

问题描述 投票:1回答:2

我无法完全了解如何从apscheduler中的预定作业访问返回值。工作需要每天在不同的时间运行,我需要从今天的工作中获得返回值来安排明天的工作。

此链接(how to get return value from apscheduler jobs)似乎是此问题的最佳答案。它建议向调度程序添加一个监听器。我添加了一个监听器,但我不确定如何访问它的返回值。我可以访问附加到调度程序的侦听器,但我无法访问它们的输出。下面的代码中的侦听器job_runs()将在计划的作业运行时打印。

此外,我知道我需要访问一个JobExecutionEvent(https://apscheduler.readthedocs.io/en/latest/modules/events.html#module-apscheduler.events),它保存函数的返回值。

首先,我想访问的函数是run_all(),其中执行了一堆操作,但我只是为测试用例返回True。

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR, JobExecutionEvent
from datetime import datetime, timedelta
import logging


def run_all():
    return True


def job_runs(event):  # listener function
    if event.exception:
        print('The job did not run')
    else:
        print('The job completed @ {}'.format(datetime.now()))


def job_return_val(event):  # listener function
    return event.retval

然后,我设置调度程序,添加侦听器,并添加作业。将作业添加到调度程序后1分钟,触发器设置为运行该功能。

  scheduler = BackgroundScheduler()
  scheduler.add_listener(job_runs, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
  scheduler.add_listener(job_return_val, EVENT_JOB_EXECUTED)
  cron_args = datetime_to_dict(datetime.now() + timedelta(minutes=1))
  job = scheduler.add_job(run_all, "cron", **cron_args)

接下来,我启动调度程序并打印预定作业。另外,我设置了日志记录,所以我知道调度程序在哪里。

  test = scheduler.start()
  scheduler.print_jobs()
  logging.basicConfig()
  logging.getLogger('apscheduler').setLevel(logging.DEBUG)

启用日志记录后,调度程序将报告作业已运行并从调度程序中删除,正如我所期望的那样。 job_runs()将正确的输出打印到控制台。有了断点,我知道调用job_return_val()。但是,我不知道它返回的值被发送到哪里。该函数似乎在名为APScheduler的不同线程中调用。我对线程知之甚少,但这是有道理的。但是,我不明白该线程的输出何时返回到主线程。

最后,我尝试使用可从调度程序和作业的属性访问的代码,job_id,jobstore和scheduled_run_time来实例化JobExceptionEvent,但JobExceptionEvent似乎并不知道事件是在调度程序中运行的。由于前一段中描述的线程,这似乎也是有意义的。

任何帮助整理这将是伟大的!

python apscheduler
2个回答
0
投票

监听器的返回值不会在任何地方使用(请参阅code),因此无论如何都无法返回任何值。如果您需要根据以前作业的值(通过事件对象在侦听器中获取)安排另一个作业,则必须在该侦听器中正确执行。

编辑:为了说明如何做(并证明它是可能的),请参阅此示例代码:

from datetime import datetime
import time

from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from apscheduler.schedulers.background import BackgroundScheduler


def tick():
    print('Tick! The time is: %s' % datetime.now())


def tack():
    print('Tack! The time is: %s' % datetime.now())


def listener(event):
    if not event.exception:
        job = scheduler.get_job(event.job_id)
        if job.name == 'tick':
            scheduler.add_job(tack)


if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    scheduler.add_listener(listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
    scheduler.add_job(tick, 'interval', seconds=5)
    scheduler.start()

    try:
        while True:
            time.sleep(1)
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()

输出:

(venv) pasmen@nyx:~/tmp/x$ python test.py 
Tick! The time is: 2019-04-03 19:51:29.192420
Tack! The time is: 2019-04-03 19:51:29.195878
Tick! The time is: 2019-04-03 19:51:34.193145
Tack! The time is: 2019-04-03 19:51:34.194898
Tick! The time is: 2019-04-03 19:51:39.193207
Tack! The time is: 2019-04-03 19:51:39.194868
Tick! The time is: 2019-04-03 19:51:44.193223
Tack! The time is: 2019-04-03 19:51:44.195066
...

0
投票

您需要的是需要实施stateful jobs feature

© www.soinside.com 2019 - 2024. All rights reserved.