我有一个场景,我将本地文件上传到 Cloud Storage 存储桶,从而触发 Cloud Function (xyz)。在此云功能中,我正在使用 Google Cloud Document AI 执行批处理任务。我面临的挑战是批处理可能需要比云函数允许的执行时间更长的时间,从而导致超时。
以下是相关代码片段:
from typing import Optional
from google.api_core.client_options import ClientOptions
from google.cloud import documentai
def batch_process_documents(
gcs_input_uri: str,
):
project_id = "tally-ai-394207"
location = "us"
processor_id = "d5efc50f9221c006" # invoice parser
gcs_output_uri = "gs://document-ai-output-bucket-haritha/ManishoOutput/" # Must end with a '/'
# Set the API endpoint based on the location
api_endpoint = f"{location}-documentai.googleapis.com"
client_options = ClientOptions(api_endpoint=api_endpoint)
client = documentai.DocumentProcessorServiceClient(client_options=client_options)
# Specify a GCS URI Prefix to process an entire directory
gcs_prefix = documentai.GcsPrefix(gcs_uri_prefix=gcs_input_uri)
input_config = documentai.BatchDocumentsInputConfig(gcs_prefix=gcs_prefix)
# Cloud Storage URI for the Output Directory
gcs_output_config = documentai.DocumentOutputConfig.GcsOutputConfig(
gcs_uri=gcs_output_uri
)
# Where to write results
output_config = documentai.DocumentOutputConfig(gcs_output_config=gcs_output_config)
name = client.processor_path(project_id, location, processor_id)
request = documentai.BatchProcessRequest(
name=name,
input_documents=input_config,
document_output_config=output_config,
)
# BatchProcess returns a Long Running Operation (LRO)
operation = client.batch_process_documents(request)
# Wait for the operation to complete
operation.result()
# NOTE: Can also use callbacks for asynchronous processing
#
# def my_callback(future):
# result = future.result()
#
# operation.add_done_callback(my_callback)
我想在批处理完成后触发另一个云函数(abc),或者我们可以传递云函数(abc)而不是my_callback以及通知云函数(abc)处理已完成的方法。此外,我需要将batch_process_documents返回的信息传递给abc。考虑到潜在的超时问题,我正在寻找一种解决方案来更有效地处理这种情况。
任何有关如何实现这一目标的指导或建议将不胜感激。
我实现了一个解决方案,将本地文件上传到 Cloud Storage 存储桶,从而触发 Cloud Function (xyz)。在此云功能中,我利用 Google Cloud Document AI 来执行批处理。为了处理批处理的异步性质,我考虑使用回调函数(my_callback),但我担心潜在的超时,因为批处理任务可能会超出云函数(xyz)允许的执行时间。
你在期待什么:
从设计角度来看,我预计云函数(xyz)可能会在批处理完成执行之前超时。我正在探索替代设计选项,以确保批处理任务能够成功完成,并且我对在批处理完成后触发另一个云函数(abc)特别感兴趣。此外,我正在寻找一种方法,在这种情况下将batch_process_documents返回的信息传递给abc。
我能想到的唯一解决方案如下:
未来其他可能的解决方案是: 如果 GCP eventarc 创建一个在批处理完成后发送的事件类型,您可以将其与所需的云功能一起使用