我正在使用 Python 开发 Apache Beam 管道,在 Google Dataflow 上运行管道时遇到名称错误。该错误特别提到“json_encoder”未定义。管道在本地运行时工作正常。
这是代码的要点:
import apache_beam as beam
import decimal
# Simplified Apache Beam pipeline step
input | "Convert to string" >> beam.Map(encode_as_task)
def encode_as_task(element, cache_enabled=False):
import orjson
# Convert the element to a string before publishing to Pub/Sub
task = dict()
tasks = []
task['task'] = element[2][0]
task['task'].pop('client_id', None)
task['clientIds'] = element[1]
tasks.append(task)
task_generator_request = {"tasks": tasks, "cacheEnabled": cache_enabled}
message = orjson.dumps(task_generator_request, default=json_encoder)
return message
def json_encoder(value):
if isinstance(value, decimal.Decimal):
return float(value)
raise TypeError
此代码在本地运行良好。
但是当在云数据流中运行时,我收到以下错误。
NameError: name 'json_encoder' is not defined.
我曾经在处理外部依赖项(例如 orjson)时遇到这些问题。 我通过直接在函数内导入包来解决这些问题,就像
import orjson
中的 encode_as_task
。
但是由于
json_encoder
是一个内部函数,不知道如何处理或导入它。
旁注,我使用 Beam 运行时参数
--requirements_file=requirements.txt
发送需求文件,以告诉 apache Beam 和数据流有关依赖项的信息。仍然面临这些问题。
您可以将 --save_main_session 传递给管道选项吗?保存主会话意味着它将函数和变量保存在main类中,以便它们可以在数据流上进行pickle和unpickled。
有关 save_main_session 的更多信息,请查看 https://beam.apache.org/releases/pydoc/2.26.0/_modules/apache_beam/options/pipeline_options.html