Celery:获取任务图中的所有任务 ID

问题描述 投票:0回答:1

我有一个 celery 任务图,我想恢复图中的所有任务 ID 以用于我的应用程序中的其他目的。然而,在某些情况下这似乎是不可能的。

虽然对于简单的图表使用

as_tuple()
一切都可以正常工作,但当事情变得更复杂时,事情就会失败。例如,假设我想并行运行
t1
中的任务:等待它们完成,运行
f1
,等待它完成,并行运行
t2
中的任务,等待它们完成,运行
f2

from celery import Celery
from celery import group, chord, states, chain
import tasks
from tasks import add, mul


t1 =[add.s(1,2), add.s(3,4), add.s(5,6)]
f1 = mul.s(2,2)
t2 =[add.s(1,2), add.s(3,4), add.s(5,6)]
f2 = mul.s(3,4)

为此,首先我想到运行

chain
chords

chain_task1 = chain(chord(t1, f1), chord(t2, f2))
res1 = chain_task1.apply_async()

但是登录后

res1.as_tuple()
我们可以看到一些ID丢失了。事实上我只得到 5 个,而我们的和弦链至少包含 8 个(我怀疑是 10 个)。

print(res1.as_tuple())
# Output: (('5ba907d3-6daa-43a6-9c8d-0edaa9b359d4', (('6dcace43-9187-4ae3-8cfb-e5f298e2651a', None), [(('ea493ce4-10ea-43c0-8d1d-7e7d1a6491f2', None), None), (('577f24b7-3bde-47b9-be7a-5db45d07f01d', None), None), (('c29ca951-1a11-489b-83d5-f22bc326c39e', None), None)])), None)

尝试后,我注意到,当将我们的任务简单地定义为

chain
groups
tasks
时,我可以获得所需的行为(10 个任务):

chain_task2 = chain(group(t1), f1, group(t2), f2)
res2 = chain_task2.apply_async()
print(res2.as_tuple())
# Output: (('b6125ca6-41f5-4310-8df8-8c632c9ea268', (('8a7bac12-a556-4087-bf20-4de9ccaed22a', (('dd3bd05f-b56c-428a-9ed2-ac09dd2d3960', (('6e5f3fe3-df02-482f-99fc-b9385d74255e', None), [(('e7219ae2-7f53-45ce-b59f-fa168f97b7c4', None), None), (('3804f8ee-ea01-4554-a1c6-66be220ab214', None), None), (('c2a402c0-5401-4931-abf0-4e7275cf198d', None), None)])), None)), [(('1992ef83-836a-457c-8aa4-90875be0ad9a', None), None), (('b9636615-7d24-4af0-9843-9dcfb9163c61', None), None), (('f597050e-e6c4-4c95-9283-681d80ad020d', None), None)])), None)

我仍然想知道我是否做错了什么。然而我怀疑芹菜使用

parent
存在问题。
chord
.parent
可能会覆盖和弦之一,从而可以重新构建完整的任务图。

注意:任务在两种设置中都能正常运行,我只是无法检索任务 ID。

python celery task
1个回答
0
投票

还有另一种方法可以主动管理您的任务集,即在

chain()/group()/chord()
冻结它们之前为其分配 id,例如

from celery import group, chord, chain, signature, Task
from celery.canvas import Signature
from celery.result import AsyncResult, GroupResult, ResultSet
import time

t1: Signature = tasks.progress.s(10).set(task_id=str(uuid.uuid4()))
t2: Signature = tasks.progress.s(5).set(task_id=str(uuid.uuid4()))

# t3: Signature = tasks.add.si(4, 4).set(task_id=str(uuid.uuid4()))
# t4: Signature = tasks.add.si(5, 5).set(task_id=str(uuid.uuid4()))
t3: Signature = tasks.log.s().set(task_id=str(uuid.uuid4()))
t4: Signature = tasks.log.s().set(task_id=str(uuid.uuid4()))

task = chord(header=[t1, t2], body=group([t3, t4]))
result: AsyncResult = task.apply_async()

print(f"task: {task}")
print(f"graph: {result.as_tuple()}")
print(f"result: #[{result}]")

res1 = AsyncResult(id=t1.id)
res2 = AsyncResult(id=t2.id)
res3 = AsyncResult(id=t3.id)
res4 = AsyncResult(id=t4.id)

ress = ResultSet(results=[res1, res2, res3, res4])

while not result.ready():
    print(f"result #1, result[{res1.result}] status[{res1.status}]")
    print(f"result #2, result[{res2.result}] status[{res2.status}]")
    print(f"result #3, result[{res3.result}] status[{res3.status}]")
    print(f"result #4, result[{res4.result}] status[{res4.status}]")
    print(f"result set[#1, #2, #3, #4] {ress.completed_count()} {ress.ready()}")
    time.sleep(1)

print(f"result #1, result[{res1.result}] status[{res1.status}]")
print(f"result #2, result[{res2.result}] status[{res2.status}]")
print(f"result #3, result[{res3.result}] status[{res3.status}]")
print(f"result #4, result[{res4.result}] status[{res4.status}]")
print(f"result set[#1, #2, #3, #4] {ress.completed_count()} {ress.ready()}")
print(f"result: {result.get()}")
© www.soinside.com 2019 - 2024. All rights reserved.