使用嵌套函数在 Google Dataflow 上运行 Apache Beam 管道时出现名称错误

问题描述 投票:0回答:1

我正在使用 Python 开发 Apache Beam 管道,在 Google Dataflow 上运行管道时遇到名称错误。该错误特别提到“json_encoder”未定义。管道在本地运行时工作正常。

这是代码的要点:

import apache_beam as beam
import decimal

# Simplified Apache Beam pipeline step
input | "Convert to string" >> beam.Map(encode_as_task)

def encode_as_task(element, cache_enabled=False):
    import orjson
    # Convert the element to a string before publishing to Pub/Sub
    task = dict()
    tasks = []
    task['task'] = element[2][0]
    task['task'].pop('client_id', None)
    task['clientIds'] = element[1]
    tasks.append(task)
    task_generator_request = {"tasks": tasks, "cacheEnabled": cache_enabled}
    message = orjson.dumps(task_generator_request, default=json_encoder)
    return message

def json_encoder(value):
    if isinstance(value, decimal.Decimal):
        return float(value)
    raise TypeError

此代码在本地运行良好。

但是当在云数据流中运行时,我收到以下错误。

NameError: name 'json_encoder' is not defined. 

我曾经在处理外部依赖项(例如 orjson)时遇到这些问题。 我通过直接在函数内导入包来解决这些问题,就像

import orjson
中的
encode_as_task

但是由于

json_encoder
是一个内部函数,不知道如何处理或导入它。

旁注,我使用 Beam 运行时参数

 --requirements_file=requirements.txt
发送需求文件,以告诉 apache Beam 和数据流有关依赖项的信息。仍然面临这些问题。

python google-cloud-dataflow apache-beam
1个回答
0
投票

您可以将 --save_main_session 传递给管道选项吗?保存主会话意味着它将函数和变量保存在main类中,以便它们可以在数据流上进行pickle和unpickled。

有关 save_main_session 的更多信息,请查看 https://beam.apache.org/releases/pydoc/2.26.0/_modules/apache_beam/options/pipeline_options.html

© www.soinside.com 2019 - 2024. All rights reserved.