我有一个带有定义的 celery 函数 -
@async_worker.task(ignore_result=True, queue="data_path")
def publish_msg_from_lock(self, mac: str, data: bytes, gateway_euid: str):
以前它不是一个芹菜任务,而是这样称呼的 -
n.publish_msg_from_lock(addr, unhexlify(payload), gateway_euid)
将其设为芹菜任务后,我以这种方式更新了调用 -
n.publish_msg_from_lock.apply_async(args=(addr, unhexlify(payload), gateway_euid),)
我也尝试过-
n.publish_msg_from_lock.apply_async(args=(addr, unhexlify(payload), gateway_euid), kwargs={})
和
n.publish_msg_from_lock.apply_async(kwargs={"mac": addr, "data": unhexlify(payload), "gateway_euid": gateway_euid})
但我收到错误 - ** 文件“/usr/local/lib/python3.8/dist-packages/celery/app/task.py”,第 531 行,在 apply_async 中 check_arguments(*(args 或 ()), **(kwargs 或 {})) 类型错误:publish_msg_from_lock() 缺少 1 个必需的位置参数:'gateway_euid' **
你能帮忙修复一下吗?
您的任务失败,因为它没有绑定。 要在函数签名中使用参数 self,您需要将 bind=True 添加到 celery 装饰器中。
示例:
@async_worker.task(ignore_result=True, queue="data_path", bind=True)
def publish_msg_from_lock(self, mac: str, data: bytes, gateway_euid: str):
它允许访问此处描述的一些芹菜功能https://docs.celeryq.dev/en/stable/userguide/tasks.html#bound-tasks
如果不需要参数 self 也可以删除。
@async_worker.task(ignore_result=True, queue="data_path")
def publish_msg_from_lock(mac: str, data: bytes, gateway_euid: str):