我有以下与 APScheduler 的 AsyncIOScheduler 一起使用的代码。 计划作业应每 15 分钟运行一次(x:00、x:15、x:30 和 x:45)。
from apscheduler.schedulers.asyncio import AsyncIOScheduler
scheduler = AsyncIOScheduler()
import logging
logging.basicConfig(filename="test.log", level=logging.DEBUG, format='[%(asctime)s]%(message)s', datefmt="%H:%M")
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
@scheduler.scheduled_job('cron', minute="*/15")
async def scheduled_job1():
logging.CRITICAL("Job 1 Ran!")
# Some async methods with Discord.py
scheduler.start()
问题是这项工作永远不会被触发。关键日志记录尚未完成。当我运行脚本时,APScheduler 会记录以下几行。
[14:16]Adding job tentatively -- it will be properly scheduled when the scheduler starts
[14:16]Added job "scheduled_job1" to job store "default"
但即使在触发时间,它也不会执行任何其他日志记录。
当我尝试使用
print scheduler.get_jobs()[0]
检查作业时,输出的“下一次运行”数据指向过去的时间。我猜想第一次计划触发的时间。
输出示例:
scheduled_job1 (trigger: cron[minute='*/15'], next run at: 2023-07-29 14:15:00 UTC)
当我使用
datetime
模块检查时,系统时间看起来是正确的。
我真的不明白为什么作业没有被触发。
您的代码中有错误:将行
logging.CRITICAL("Job 1 Ran!")
替换为 logging.critical("Job 1 Ran!")
logging.CRITICAL
是用于设置日志记录级别的常量。
logging.critical()
是一种向日志发送 critical
消息的方法。
您的代码可以在我的机器上运行。我稍微改变了它(这个版本每五分钟发送一次消息并添加了
asyncio.get_event_loop().run_forever()
):
import logging
import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler
scheduler = AsyncIOScheduler()
logging.basicConfig(level=logging.DEBUG, format='[%(asctime)s]%(message)s', datefmt="%H:%M:%S")
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
@scheduler.scheduled_job('cron', minute="*/5")
async def scheduled_job1():
logging.critical("Job 1 Ran!")
# Some async methods with Discord.py
scheduler.start()
print(scheduler.get_jobs()[0])
try:
asyncio.get_event_loop().run_forever()
except (KeyboardInterrupt, SystemExit):
pass