嵌套异步协程,控制流返回调用者

问题描述 投票:0回答:1

我有一系列异步步骤要做,我想将其放入一个方法中。每个步骤都会生成一个子结果并引发后续的异步步骤,该异步步骤再次生成子结果,直到到达最后一步。

看看我的代码:

class TextDetector(Detector):
    lock: asyncio.Lock = asyncio.Lock()

    async def detect(self, input: TextDetectorInput) -> TextDetectorResult: 
        pass

class TextAugmentationProcedure(AugmentationProcedure):
    async def augment(
        self, input: AugmentationProcedureTextInput
    ) -> AugmentationProcedureTextOutput:
            pass
    
TextDetectorCoolDownAndResultType = Awaitable[Awaitable[ImageDetectorResult]]
RunTextDetectionType = AsyncIterator[tuple[DetectorIdentifier, TextDetector, TextDetectorCoolDownAndResultType]]
GetFilesTextType = AsyncIterator[tuple[InputFileIdentifier, TextInputFile, AugmentationProcedureTextOutput, RunTextDetectionType]]
CheckTextType = AsyncIterator[tuple[AugmentationProcedureIdentifier, TextAugmentationProcedure, GetFilesTextType]]

@dataclass
class TextTestCase(TestCase):

    inputs: dict[InputFileIdentifier, TextInputFile]

    augmentation_procedures: dict[AugmentationProcedureIdentifier, TextAugmentationProcedure]
    detectors: dict[DetectorIdentifier, TextDetector]
    
    async def get_files(self, augmentation_procedure: TextAugmentationProcedure) -> GetFilesTextType:
        for input_identifier, text_file in self.inputs.items():
            augmentation_input = await read_text_file(
                text_file.file_name, text_file.language
            )

            augmentation_output = await augmentation_procedure.augment(augmentation_input)
        
            detector_input = TextDetectorInput(
                        augmentation_output.language, augmentation_output.text
            )

            yield (input_identifier, text_file, augmentation_output, self.run_detection(detector_input))

    async def run_detection(self, detector_input: TextDetectorInput) -> RunTextDetectionType:
        for detector_identifier, detector in self.detectors.items():
            async def cooldown_and_detect(detector: TextDetector, detector_input: TextDetectorInput):
                # Acquire lock
                with detector.lock
                    # Cooldown
                    cooleddown = await detector.cooldown()
                    
                    return detector.detect(detector_input)
            
            yield (detector_identifier, detector, cooldown_and_detect(detector, detector_input))
    
    async def check(self) -> CheckImageType:
        for augmentation_procedure_identifier, augmentation_procedure in self.augmentation_procedures.items():
            yield (augmentation_procedure_identifier, augmentation_procedure, self.get_files(augmentation_procedure))

基本上,我想在

check(...)
的实例上调用方法
TextTestCase
时获得子结果。有趣的部分是
run_detection()
。对于每个检测器,应通知调用者。之后,获取锁。然后
detector.cooldown()
被调用并等待。如果等待,应通知呼叫者并拨打
detector.detect()
。当结果可用时,应通知调用者并释放锁定。

目前,我通过以下方式致电

check()

test_case = TextTestCase()

async for (augmentation_procedure_identifier, augmentation_procedure, augmentation_results) in test_case.check():
    async for (file_identifier, image_file, augmentation_output_awaitable, detectors) in augmentation_results:
        
        results.append([image_file.file_name, str(procedure), "Augmenting...", ""])
        live.update(update_table_with_results(results))

        async for (detector_identifier, detector, cooldown_awaitable) in detectors:
            try:
                detection_awaitable = await cooldown_awaitable
                detection_result = await detection_awaitable
            
                # TODO: Do stuff here
            except:
                pass
                # Error occured
                # TODO: print error

因为

cooldown_and_detect()
返回一个可等待的,
detector.detect()
,上下文管理器显然会在可等待返回后立即释放锁,即在等待
detector.cooldown()
并触发
detector.detect()
之后。但我想在等待
detector.detect()
后释放它,但我仍然想将控制流传递给调用者。

python concurrency python-asyncio
1个回答
0
投票

从上面的代码中理解你想要的东西有点混乱,但是通过添加适当的回调一切都应该是可能的。 IE。如果您的任务可以用可以并行运行的普通非循环流来描述,那么您应该一直使用

await
和上下文管理器,并且只使用 asyncio.TaskGroup 或对
gather
的调用您想要并行运行的“根级”任务。

否则,由于您想要自定义管理中间资源,例如由

lock
保护的中间资源,您可以显式调用
acquire
,并释放相应步骤的完成回调上的锁,而不是将锁用作上下文管理器(
with
块),或无条件地在
finally
块上释放它。

正如我之前所写,您的示例似乎比实际情况更人为,因此可以根据您的代码编写一个很好的示例 - 请在提出下一个问题之前阅读有关发布 最小可重现示例的信息 -

也就是说,如果我能正确理解您的意思,您内心

cooldown_and_detect
协同例程的代码中的某些内容可能会达到您想要的效果L

async def cooldown_and_detect(detector, detector_input: TextDetectorInput):
    # Acquire lock
    await detector.lock.acquire()
    try:    # Cooldown
        cooleddown = await detector.cooldown()
    except Exception as error:
        # something wrong with cooldown - release lcok
        detector.lock.release()
        raise
    detector_task = asyncio.create_task(detector.detect(detector_input))
    detector_task.add_done_callback(lambda task: detector.lock.release())
    return detector_task
                        

© www.soinside.com 2019 - 2024. All rights reserved.