假设 django 项目中有一个端点
def my_view(request, my_id):
if is_request_valid(request):
logger = logging.getLogger('celery.task')
logger.info('Starting a task for a thing with ID %i', my_id)
my_cool_task.apply_async()
现在,根据芹菜文档这可能会记录,因为
有一个名为“celery.task”的特殊记录器,您可以继承此记录器以自动获取任务名称和唯一ID作为日志的一部分。
但它仅适用于任务内部。
另外,我确实在 settings.py 中设置了一些日志变量,然后重新分配了 celery 的日志记录。确实使用了 CELERY_WORKER_HIJACK_ROOT_LOGGER,但即使我使用根记录器,日志也不会出现在控制台中。使用 getLogger("celery") 也没有帮助。
这似乎是一件简单的事情,但我一整天都在努力做到这一点,请帮忙。
使用信号来捕获任务何时被触发。
您可以定义自定义信号并将其连接到 Celery 提供的
task_sent
信号:
from django.dispatch import Signal
from celery.signals import task_sent
task_started = Signal(providing_args=["task_name", "task_id", "kwargs"])
@receiver(task_sent, sender=os.getenv('DJANGO_SETTINGS_MODULE'))
def handle_task_sent(sender, task, task_id, task_sent_args, task_sent_kwargs, **kwargs):
logger = logging.getLogger('my_task_signals') # Use a custom logger name
logger.info(f'Task "{task.name}" (ID: {task_id}) started for thing with ID (if provided): {task_sent_kwargs.get("thing_id")}')
task_started.connect(handle_task_sent)
附注我添加了上面的代码,没有测试。只是作为如何实现它的想法。