我无法完全了解如何从apscheduler中的预定作业访问返回值。工作需要每天在不同的时间运行,我需要从今天的工作中获得返回值来安排明天的工作。
此链接(how to get return value from apscheduler jobs)似乎是此问题的最佳答案。它建议向调度程序添加一个监听器。我添加了一个监听器,但我不确定如何访问它的返回值。我可以访问附加到调度程序的侦听器,但我无法访问它们的输出。下面的代码中的侦听器job_runs()将在计划的作业运行时打印。
此外,我知道我需要访问一个JobExecutionEvent(https://apscheduler.readthedocs.io/en/latest/modules/events.html#module-apscheduler.events),它保存函数的返回值。
首先,我想访问的函数是run_all(),其中执行了一堆操作,但我只是为测试用例返回True。
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR, JobExecutionEvent
from datetime import datetime, timedelta
import logging
def run_all():
return True
def job_runs(event): # listener function
if event.exception:
print('The job did not run')
else:
print('The job completed @ {}'.format(datetime.now()))
def job_return_val(event): # listener function
return event.retval
然后,我设置调度程序,添加侦听器,并添加作业。将作业添加到调度程序后1分钟,触发器设置为运行该功能。
scheduler = BackgroundScheduler()
scheduler.add_listener(job_runs, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
scheduler.add_listener(job_return_val, EVENT_JOB_EXECUTED)
cron_args = datetime_to_dict(datetime.now() + timedelta(minutes=1))
job = scheduler.add_job(run_all, "cron", **cron_args)
接下来,我启动调度程序并打印预定作业。另外,我设置了日志记录,所以我知道调度程序在哪里。
test = scheduler.start()
scheduler.print_jobs()
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
启用日志记录后,调度程序将报告作业已运行并从调度程序中删除,正如我所期望的那样。 job_runs()将正确的输出打印到控制台。有了断点,我知道调用job_return_val()。但是,我不知道它返回的值被发送到哪里。该函数似乎在名为APScheduler的不同线程中调用。我对线程知之甚少,但这是有道理的。但是,我不明白该线程的输出何时返回到主线程。
最后,我尝试使用可从调度程序和作业的属性访问的代码,job_id,jobstore和scheduled_run_time来实例化JobExceptionEvent,但JobExceptionEvent似乎并不知道事件是在调度程序中运行的。由于前一段中描述的线程,这似乎也是有意义的。
任何帮助整理这将是伟大的!
监听器的返回值不会在任何地方使用(请参阅code),因此无论如何都无法返回任何值。如果您需要根据以前作业的值(通过事件对象在侦听器中获取)安排另一个作业,则必须在该侦听器中正确执行。
编辑:为了说明如何做(并证明它是可能的),请参阅此示例代码:
from datetime import datetime
import time
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from apscheduler.schedulers.background import BackgroundScheduler
def tick():
print('Tick! The time is: %s' % datetime.now())
def tack():
print('Tack! The time is: %s' % datetime.now())
def listener(event):
if not event.exception:
job = scheduler.get_job(event.job_id)
if job.name == 'tick':
scheduler.add_job(tack)
if __name__ == '__main__':
scheduler = BackgroundScheduler()
scheduler.add_listener(listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
scheduler.add_job(tick, 'interval', seconds=5)
scheduler.start()
try:
while True:
time.sleep(1)
except (KeyboardInterrupt, SystemExit):
scheduler.shutdown()
输出:
(venv) pasmen@nyx:~/tmp/x$ python test.py
Tick! The time is: 2019-04-03 19:51:29.192420
Tack! The time is: 2019-04-03 19:51:29.195878
Tick! The time is: 2019-04-03 19:51:34.193145
Tack! The time is: 2019-04-03 19:51:34.194898
Tick! The time is: 2019-04-03 19:51:39.193207
Tack! The time is: 2019-04-03 19:51:39.194868
Tick! The time is: 2019-04-03 19:51:44.193223
Tack! The time is: 2019-04-03 19:51:44.195066
...
您需要的是需要实施stateful jobs feature。