我关于这个应用程序应该如何工作的心理模型是能够将调用add_job的进程与管理进程的进程分开。
当我在开始计划之前添加作业时,这可以正常工作,但是当我尝试将它分成单独的函数时它没有。为什么?我需要调用add_job后是否有提交函数
我使用sqlite和BlockingScheduler,虽然转移到postgresql进行调试,但对我的目的更有意义。
from datetime import datetime, timedelta
from time import sleep
import logging
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
import click
from crontab import CronTab
import pytz
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
jobstores = {
'default': SQLAlchemyJobStore(url='postgresql+psycopg2://myusername:mypassword@localhost/mydb')
}
executors = {
'default': ThreadPoolExecutor(5),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
sched = BackgroundScheduler(jobstores=jobstores, timezone=pytz.timezone('Australia/Sydney'))
def my_job(text):
now = datetime.now()
print(f'{now} text: {text}')
@click.group()
def cli():
pass
@click.command()
@click.option('--message', default='<BLANK>', help='to display when ')
@click.option('--crontab', default='*/1 * * * *', help='Timestamp of ')
def add_job(message, crontab):
# entry = CronTab('0 0 ? * TUE,THU')
entry = CronTab(crontab)
number_of_seconds = entry.next()
timestamp = datetime.now(pytz.timezone('Australia/Sydney')) + timedelta(seconds=number_of_seconds)
move_service_message = f'Service {message} will be moved @ {timestamp}'
sched.add_job(
my_job,
'date',
run_date=timestamp,
args=[move_service_message]
)
print('added job:' + move_service_message)
@click.command()
def start():
# this will wait forever
sched.start()
try:
# This is here to simulate application activity (which keeps the main thread alive).
while True:
sleep(10)
except (KeyboardInterrupt, SystemExit):
# Not strictly necessary if daemonic mode is enabled but should be done if possible
sched.shutdown()
cli.add_command(start)
cli.add_command(add_job)
if __name__ == "__main__":
exit_code = 0 # assume it will be okay
time_started = datetime.now()
try:
cli()
except Exception as e:
print('Exception:', e)
exit_code = 1
finally:
exit(exit_code)
我的包裹是最新的,包括
APScheduler 3.6.0
SQLAlchemy 1.3.1
看起来我认为它可能有可能不像FAQ中记录的那样。我仍然会使用它虽然只是改变了我的期望。