我正在尝试找到一种方法来获取触发芹菜节拍来触发任务的时间条件。
由于所有 celery 工作人员都很忙,获取
datetime.now()
通常会偏离 celery 节拍任务排队的时间。
例如:我设置每天12:30执行任务,但由于当时所有工人都很忙,所以任务在12:31开始运行。
我需要知道哪个时间条件触发了任务,无论任务执行的时间如何。
编辑:
这就是我定义周期性任务的方式:
CELERYBEAT_SCHEDULE = {
'periodic_clear_task': {
'task': 'app.tasks.periodic_clear_task',
'schedule': crontab(hour=2),
'args': ()
},
'periodic_update_task': {
'task': 'app.tasks.periodic_update_task',
'schedule': crontab(minute='00,30'),
'args': ()
},
}
与此问题中的另一个答案类似,我使用了过期(秒)来解决此问题。使用 Celery Beat 时间表时,可以指定如下:
limit = 60
CELERYBEAT_SCHEDULE = {
...,
'periodic_update_task': {
'task': 'app.tasks.periodic_update_task',
'schedule': crontab(minute='00,30'),
'args': (),
'options': {'expires': 60 * 60 * 24 * limit }
},
}
这里的限制很重要,因为现在我们分配了一个到期值,如果达到这个限制,任务将不会被执行。因此,需要选择足够大的值,以便根据需要处理任务。请记住,到期值以秒为单位指定。更多信息这里。
当指定expires参数时,Celery会自动将此秒值转换为内部使用的日期时间值。要访问函数中的过期时间,您必须绑定该方法,然后从任务请求对象中检索。该任务对象应如下所示。
<Context: {'lang': 'py', 'task': 'app.tasks.periodic_update_task', 'expires': '2021-10-23T21:11:26.031443+01:00', 'id': 'ef0e301a-b9b3-4930-8fa3-ca3fdf6b36cb', ... }>
一旦有时间,您可以进行简单的计算来重新获得触发时间。
@app.task(bind=True)
def periodic_update_task(self):
trigger_time = parser.parse(self.request.expires)-timedelta(days=limit))
我遇到了同样的问题,我想在周期性 celery 任务中使用任务触发时间而不是执行时间。我在芹菜任务中找不到具有任务触发时间的确切字段。作为一种解决方法,我使用了任务的过期时间。
task_expires_day =30
@app.on_after_configure.connect
def df_beat_setup(sender, **kwargs):
pipeline_config = {}
sender.add_periodic_task(
crontab(
minute=0,
hour=8,
),
df_scheduler_task.s(),
args=(pipeline_config),
name="df_trigger",
queue="df_scheduler_queue",
expires=60*60*24*task_expires_day,
options={"time": datetime.now()}
)
@app.task(name="df_scheduler_queue", bind=True, acks_late=True)
def df_scheduler_task(task: "celery.Task", pipeline_config: Dict, time: str) -> dict:
task_trigger_time = parser.parse(task.request.expires)-timedelta(days=task_expires_day))
...
after_task_publish信号可能会有所帮助。
from django.core.cache import cache
from django.utils import timezone
from celery import Celery
from celery.signals import after_task_publish
app = Celery()
app.conf.beat_schedule = {
"test-every-minute": {
"task": "xxx.tasks.test",
"schedule": 60
}
}
@after_task_publish.connect
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
info = headers if "task" in headers else body
task_id = info["id"]
trigger_time = timezone.now()
cache.set(task_id, trigger_time) # cache trigger_time
print(f"task {task_id} sent at {trigger_time}")
@app.task(bind=True)
def test(self):
execute_time = timezone.now()
task_id = self.request.id
print(f"task {task_id} executed at {execute_time}")
trigger_time = cache.get(task_id) # get trigger_time
print(f"task {task_id} triggered at {trigger_time}")
Beat
[2023-07-31 02:17:40,573: INFO/MainProcess] Scheduler: Sending due task test-every-minute (xxx.tasks.test)
[2023-07-31 02:17:40,576: WARNING/MainProcess] task 041af551-01e7-45f8-931f-f3b2fbbad14f sent at 2023-07-31 02:17:40.575942+00:00
Worker
[2023-07-31 02:17:40,582: WARNING/MainProcess] task 041af551-01e7-45f8-931f-f3b2fbbad14f executed at 2023-07-31 02:17:40.582441+00:00
[2023-07-31 02:17:40,585: WARNING/MainProcess] task 041af551-01e7-45f8-931f-f3b2fbbad14f triggered at 2023-07-31 02:17:40.575942+00:00
[2023-07-31 02:17:40,597: INFO/MainProcess] Task rule.tasks.tasks.test[041af551-01e7-45f8-931f-f3b2fbbad14f] succeeded in 0.015511923003941774s: None