这是我的芹菜配置:
celery = Celery(__name__)
celery.conf.broker_url = os.getenv("BROKER_URL")
celery.conf.result_backend = os.getenv("RESULT_BACKEND")
celery.conf.task_routes = ([
("worker_queue.gevent_tasks.*", {"queue": "gevent_queue"}),
("worker_queue.prefork_tasks.*", {"queue": "prefork_queue"})
])
这个工作方法实现(docx_file.py):
def mask_docx_file(upload_file, mask_words, upload_method, mask_method, key, minio_path):
return "111"
这是芹菜任务(gevent_tasks.py):
@celery.task
def docx_task(upload_file, mask_words, upload_method, mask_method, key, minio_path):
return mask_docx_file(upload_file, mask_words, upload_method, mask_method, key, minio_path)
当我使用任务时:
if file_type == 'docx':
task_id = docx_task.apply_async((file.filename, keywords, upload_method, mask_method, key, minio_path))
return task_id.id
这是我的芹菜树
fastapi_rebuild/
└── celery_task/
├── celery_utils/
│ ├── __init__.py
│ ├── encoding_format.py
│ ├── key_processor.py
│ └── minio_operation.py
├── worker_queue/
│ ├── __init__.py
│ ├── gevent_tasks.py
│ └── prefork_tasks.py
├── workers/
│ ├── __init__.py
│ └── docx_file.py
├── celery_app.py
这一行“task_id = docx_task.apply_async((file.filename, keywords, upload_method, mask_method, key, minio_path))”引发了错误 请告诉我问题是什么,谢谢
我尝试了 docx_task.apply_async 和 docx_task.delay,它引发了同样的错误
当您不传递 kwargs 参数时,似乎必须指定参数名称(
args=
)。至少他们在文档中做到了。
docx_task.apply_async(args=(file.filename, keywords, upload_method, mask_method, key, minio_path))
或者您可以使用缩写形式:
docx_task(file.filename, keywords, upload_method, mask_method, key, minio_path).apply_async()