我有一个使用 Java 外部转换的 Google Dataflow Batch 管道。
当我使用 Java -jar 启动扩展服务并使用本地的 Dataflow Runner 启动管道时,它运行良好。
如何将其打包为 Flex 模板,以便可以从 AirFlow 运行。
我使用 Pythonbase 图像创建了 Flextemplate 并将 FLEX_TEMPLATE_PYTHON_PY_FILE 设置为我的主 python 文件。
如何在 Docker 中启动 Expansion 服务?
我尝试从 python 作为子进程运行 jar,我可以在日志中看到,扩展服务已启动,但管道无法连接到它。
出现以下错误:
debug_error_string =“未知:无法连接到所有地址;最后一个错误:未知:ipv4:127.0.0.1:12345:无法连接到远程主机:连接被拒绝{created_time:”2023-11-02T21:14:34.044478242 +00:00", grpc_status:14}"
如何将此多语言管道打包为 Flex 模板。
如有任何帮助,我们将不胜感激。
我发现尽管文档没有显示任何示例,但它提到了 JavaJarExpansionService。
我们可以在Python代码中使用此类来启动扩展服务,并在使用外部转换时将其作为参数传递给beam.ExternalTransform。
扩展服务= JavaJarExpansionService(path_to_jar=input_args.expansion_jar_path)
beam.ExternalTransform('URN',ImplicitSchemaPayloadBuilder({Payload}),expansion_service))