from flask import Flask
from celery.schedules import crontab
from celery import Celery
from datetime import datetime
app = Flask(__name__)
today = datetime.now().strftime("%d/%m")
@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
emails = DB.get_email_today(today) #getting a list of birthday emails today
for row in emails:
email = row['email']
first_name = row['first_name']
last_name = row['last_name']
sender.add_periodic_task(
crontab(hour=12, minute=20),
sender.send_email.s(email, first_name, last_name),
)
问题在于
today
仅在应用程序启动时计算,而不是每天更新。您正在应用程序的全局范围内定义
today
变量,因此它是在您首次运行应用程序时设置的。
您可以添加任务/路线以获取任务中的当前日期,然后使用 @app.on_after_configure.connect
安排它
@app.task
def task():
current_datetime = datetime.now()
emails = DB.get_email_today()
...
*Your desired code logic*
@app.on_after_configure.connect
def schedule_task(sender, **kwargs):
sender.add_periodic_task(crontab(hour=12, minute=20), task.s(), "Send
Birthday Mails")
....
参考这里