@celery.task(bind=True, ignore_result=False)
def process_ml_model_dataset(self, dataset_serialized, media_serialized, ml_model_serialized, collection_name, n_factor):
if n_factor == 1:
result_group = group(process_ml_model_dataset_factor_x.s(dataset_serialized, media_serialized, ml_model_serialized, collection_name, n_factor))
else:
# tasks = [process_ml_model_dataset_factor_x.s(dataset_serialized, media_serialized, ml_model_serialized, collection_name, i) for i in range(1, n_factor+1)]
result_group = (group(process_ml_model_dataset_factor_x.s(dataset_serialized, media_serialized, ml_model_serialized, collection_name, i) for i in range(1, n_factor+1)))
# tasks = []
# for i in range(1, n_factor+1):
# tasks.append(process_ml_model_dataset_factor_x.s(dataset_serialized, media_serialized, ml_model_serialized, collection_name, i))
# result_group = group(*tasks)
result = result_group()
while not result.ready():
time.sleep(60)
@celery.task(bind=True, ignore_result=False)
def process_ml_model_dataset_factor_x(self, dataset_serialized, media_serialized, ml_model_serialized, collection_name, factor_x):
result_chain = chain(
process_ml_model_chunk.s(None, 1, dataset_serialized, ml_model_serialized, first_chunk_pickle, collection_name, factor_x))
reader_second_chunk = FILE_HELPER.csv_reader_chunked(
MEDIAS_UPLOADS_DEFAULT_DEST + "/" + str(media_serialized["id"]) + "/" + media_serialized["file_name"],
columns_header, DATASET_CHUNK_SIZE, DATASET_CHUNK_SIZE)
for i, chunk in enumerate(reader_second_chunk):
result_chain |= process_ml_model_chunk.s(i + 2, dataset_serialized, ml_model_serialized, pickle.dumps(chunk),
collection_name, factor_x)
在下面的简短代码示例中,我的代码/环境无法执行此任务。在此示例中,我请求一组包含任务链的任务执行某些工作,我在工作人员上得到以下输出:
worker-1 | [2024-04-03 09:42:23,143: INFO/MainProcess] Task src.tasks.dataset_full_ml_process.process_ml_model_chunk[d3c55d4c-2c1e-48e5-927f-50dce7ac6738] received
worker-1 | [2024-04-03 09:42:23,210: INFO/MainProcess] Task src.tasks.dataset_full_ml_process.process_ml_model_chunk[26af7b45-d1fd-41f6-88c3-04b2f3dd9f6a] received
worker-1 | [2024-04-03 09:42:23,273: INFO/MainProcess] Task src.tasks.dataset_full_ml_process.process_ml_model_chunk[0314e273-a66b-493a-88e6-6ae81f30cb5e] received
但任务从未执行。使用芹菜接缝很容易适用于任何情况,我已经阅读了文档中的解决方案,搜索了大量的 stackoverflow 线程,但到目前为止还没有机会执行此类任务。
我提供的代码只是一个示例,但我的项目基于这种方法:docker-flask-example by Nickjj
到目前为止我已经尝试过 celery 版本 5.3.6 和 5.4.0rc2。
我的芹菜设置非常困难:
REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0")
# Celery.
CELERY_CONFIG = {
"broker_url": REDIS_URL,
"result_backend": REDIS_URL,
"include": [
"src.tasks.dataset_full_ml_process"
],
}
正如 docker-flask-example 中的 Nickjj 所建议的那样 Worker 的 docker-compose.yml 部分是这样的:
worker:
<<: *default-app
command: celery -A "src.app.celery_app" worker -l "${CELERY_LOG_LEVEL:-debug}"
entrypoint: []
deploy:
resources:
limits:
cpus: "${DOCKER_WORKER_CPUS:-0}"
memory: "${DOCKER_WORKER_MEMORY:-0}"
profiles: ["worker"]
PS:当我请求任务 process_ml_model_dataset 且 n_factor == 1 时,一切正常,女巫只创建一个任务的组任务。
让一个任务等待另一个任务的结果确实是 效率低下,如果工作池不可用甚至可能导致死锁 筋疲力尽。
您可以在此处阅读更多相关信息。
你的
process_ml_model_dataset
正在等待其他celery任务的结果(一堆process_ml_model_dataset_factor_x
)。当 process_ml_model_dataset
运行时,它正在等待其他任务,并“占用”该工作人员,因此,其他任务无法运行(如果您没有足够的空闲工作人员)。
result_group
服务器检查 Flask
状态。retry
机制。这样,任务将尝试获取结果,如果还没有准备好,它将停止运行并在 X 秒内重试(您想要的重试次数)。
这样,工作人员将可以自由地处理您希望的任何子任务。只需运行流程并存储 result_group
的 UUID。然后在新任务中使用它,看看它是否准备好。这是一个伪代码:
@celery.task(bind=True, ignore_result=False, max_retries=10)
def process_ml_model_dataset_status(self, task_id):
num_retries = int(self.request.retries)
if num_retries == 0:
# optional: do your thing only in the first run if needed
result = celery.AsyncResult(task_id)
if not result.ready():
raise self.retry(countdown=60). # retry in 60 seconds
return result.get()