我有一系列异步步骤要做,我想将其放入一个方法中。每个步骤都会生成一个子结果并引发后续的异步步骤,该异步步骤再次生成子结果,直到到达最后一步。
看看我的代码:
class TextDetector(Detector):
lock: asyncio.Lock = asyncio.Lock()
async def detect(self, input: TextDetectorInput) -> TextDetectorResult:
pass
class TextAugmentationProcedure(AugmentationProcedure):
async def augment(
self, input: AugmentationProcedureTextInput
) -> AugmentationProcedureTextOutput:
pass
TextDetectorCoolDownAndResultType = Awaitable[Awaitable[ImageDetectorResult]]
RunTextDetectionType = AsyncIterator[tuple[DetectorIdentifier, TextDetector, TextDetectorCoolDownAndResultType]]
GetFilesTextType = AsyncIterator[tuple[InputFileIdentifier, TextInputFile, AugmentationProcedureTextOutput, RunTextDetectionType]]
CheckTextType = AsyncIterator[tuple[AugmentationProcedureIdentifier, TextAugmentationProcedure, GetFilesTextType]]
@dataclass
class TextTestCase(TestCase):
inputs: dict[InputFileIdentifier, TextInputFile]
augmentation_procedures: dict[AugmentationProcedureIdentifier, TextAugmentationProcedure]
detectors: dict[DetectorIdentifier, TextDetector]
async def get_files(self, augmentation_procedure: TextAugmentationProcedure) -> GetFilesTextType:
for input_identifier, text_file in self.inputs.items():
augmentation_input = await read_text_file(
text_file.file_name, text_file.language
)
augmentation_output = await augmentation_procedure.augment(augmentation_input)
detector_input = TextDetectorInput(
augmentation_output.language, augmentation_output.text
)
yield (input_identifier, text_file, augmentation_output, self.run_detection(detector_input))
async def run_detection(self, detector_input: TextDetectorInput) -> RunTextDetectionType:
for detector_identifier, detector in self.detectors.items():
async def cooldown_and_detect(detector: TextDetector, detector_input: TextDetectorInput):
# Acquire lock
with detector.lock
# Cooldown
cooleddown = await detector.cooldown()
return detector.detect(detector_input)
yield (detector_identifier, detector, cooldown_and_detect(detector, detector_input))
async def check(self) -> CheckImageType:
for augmentation_procedure_identifier, augmentation_procedure in self.augmentation_procedures.items():
yield (augmentation_procedure_identifier, augmentation_procedure, self.get_files(augmentation_procedure))
基本上,我想在
check(...)
的实例上调用方法 TextTestCase
时获得子结果。有趣的部分是run_detection()
。对于每个检测器,应通知调用者。之后,获取锁。然后 detector.cooldown()
被调用并等待。如果等待,应通知呼叫者并拨打 detector.detect()
。当结果可用时,应通知调用者并释放锁定。
目前,我通过以下方式致电
check()
:
test_case = TextTestCase()
async for (augmentation_procedure_identifier, augmentation_procedure, augmentation_results) in test_case.check():
async for (file_identifier, image_file, augmentation_output_awaitable, detectors) in augmentation_results:
results.append([image_file.file_name, str(procedure), "Augmenting...", ""])
live.update(update_table_with_results(results))
async for (detector_identifier, detector, cooldown_awaitable) in detectors:
try:
detection_awaitable = await cooldown_awaitable
detection_result = await detection_awaitable
# TODO: Do stuff here
except:
pass
# Error occured
# TODO: print error
因为
cooldown_and_detect()
返回一个可等待的,detector.detect()
,上下文管理器显然会在可等待返回后立即释放锁,即在等待 detector.cooldown()
并触发 detector.detect()
之后。但我想在等待 detector.detect()
后释放它,但我仍然想将控制流传递给调用者。
从上面的代码中理解你想要的东西有点混乱,但是通过添加适当的回调一切都应该是可能的。 IE。如果您的任务可以用可以并行运行的普通非循环流来描述,那么您应该一直使用
await
和上下文管理器,并且只使用 asyncio.TaskGroup 或对 gather
的调用您想要并行运行的“根级”任务。
否则,由于您想要自定义管理中间资源,例如由
lock
保护的中间资源,您可以显式调用 acquire
,并释放相应步骤的完成回调上的锁,而不是将锁用作上下文管理器(with
块),或无条件地在finally
块上释放它。
正如我之前所写,您的示例似乎比实际情况更人为,因此可以根据您的代码编写一个很好的示例 - 请在提出下一个问题之前阅读有关发布 最小可重现示例的信息 -
也就是说,如果我能正确理解您的意思,您内心
cooldown_and_detect
协同例程的代码中的某些内容可能会达到您想要的效果L
async def cooldown_and_detect(detector, detector_input: TextDetectorInput):
# Acquire lock
await detector.lock.acquire()
try: # Cooldown
cooleddown = await detector.cooldown()
except Exception as error:
# something wrong with cooldown - release lcok
detector.lock.release()
raise
detector_task = asyncio.create_task(detector.detect(detector_input))
detector_task.add_done_callback(lambda task: detector.lock.release())
return detector_task