我正在尝试使用 Google Cloud Dataflow Flex 模板运行 Apache Beam 流,但我创建的模板继续失败,并显示以下错误消息:
java.lang.IllegalArgumentException: Transform ReadingFileFromSamples-Read-Impulse is not a composite transform but does not have a specified URN.
outputs {
key: "org.apache.beam.sdk.values.PCollection.<init>:397#bb20b45fd4d95138"
value: "ReadingFileFromSamples/Read/Impulse.out"
}
代码是非常简单的 Kotlin 版本的 wordcount,使用 Apache Beam v2.51.0:
fun main(args: Array<String>) {
val logger = LoggerFactory.getLogger("app.fp8.sample.kbeam.MainTK")
val options = PipelineOptionsFactory.fromArgs(*args).withValidation().`as`(IAppOptions::class.java)
// options.gcpCredential = AppHelper.getGCloudCredentials(options.serviceAccountPath)
// Define pipeline
val pipe = Pipeline.create(options)
pipe.apply(
"ReadingFileFromSamples",
TextIO.read().from(options.inputFilePattern)
).apply(
"CountWords",
WordCounterTransform(options.outputCountGreaterThan)
).apply(
"OutputLogs",
PassthroughTransform<String> {
println("Result -> $it")
}
)
// Run the pipeline
try {
logger.info("Start running pipeline reading from {}", options.inputFilePattern)
pipe.run()
} catch (e: RuntimeException) {
logger.error("Job failed due to runtime exception: ${e.message}", e)
exitProcess(1)
} catch (e: Exception) {
logger.error("Unexpected Exception: ${e.message}", e)
exitProcess(1)
}
}
错误明显出现在
TextIO.read()
步骤,其中inputFilePattern
设置为gs://apache-beam-samples/shakespeare/*
。该错误似乎暗示我正在尝试读取一个不正确的Impulse.out
。
知道这个错误意味着什么吗?
经过大量研究,我发现以下问题与我提出的问题类似:
https://github.com/apache/beam/issues/28034
我也在构建一个胖罐子,当我将所有必需的罐子传递给
--jar
的gcloud dataflow flex-template build
方法时,我终于成功地在Dataflow中运行了flex模板。
这让我想知道为什么 Google 不允许将通配符传递到 --jar 选项中。缺少通配符选项是我首先创建 fat jar 的原因。