Kotlin - Unknown 'runner' specified 'DataflowRunner'


I'm running into a problem with this code:

import com.google.api.services.bigquery.model.TableRow
import com.google.cloud.bigquery.*
import org.apache.beam.runners.dataflow.DataflowRunner
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
import org.apache.beam.sdk.io.kafka.KafkaIO
import org.apache.beam.sdk.options.Description
import org.apache.beam.sdk.options.PipelineOptions
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.options.Validation
import org.apache.beam.sdk.transforms.MapElements
import org.apache.beam.sdk.transforms.SerializableFunction
import org.apache.beam.sdk.values.TypeDescriptor
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.joda.time.Duration

interface KafkaToBigQueryOptions : PipelineOptions {
    @get:Description("mytopic")
    @get:Validation.Required
    var inputTopic: String

    @get:Description("mytable")
    @get:Validation.Required
    var outputTable: String

    @get:Description("mydataset")
    @get:Validation.Required
    var outputDataset: String

    @get:Description("my-project")
    @get:Validation.Required
    var outputProject: String
}

fun main(args: Array<String>) {
    val options = PipelineOptionsFactory.fromArgs(*args)
        .withValidation()
        .`as`(KafkaToBigQueryOptions::class.java)
    options.runner = DataflowRunner::class.java
    val pipeline = Pipeline.create(options)

    val kafkaReadOptions = KafkaIO.read<ByteArray, ByteArray>()
        .withBootstrapServers("mykafka-server.local:9092")
        .withTopic(options.inputTopic)
        .withKeyDeserializer(ByteArrayDeserializer::class.java)
        .withValueDeserializer(ByteArrayDeserializer::class.java)
        .withDynamicRead(Duration.standardSeconds(5))

    val bigQueryTable = BigQueryHelpers.parseTableSpec(
        "${options.outputProject}:${options.outputDataset}.${options.outputTable}"
    )

    pipeline.apply("Read from Kafka", kafkaReadOptions)
        .apply("Transform to TableRow", MapElements.into(TypeDescriptor.of(TableRow::class.java))
            .via(SerializableFunction { value ->
                val row = TableRow()
                row.set("message", value.toString())
                row
            }))
        .apply("Write to BigQuery", BigQueryIO.writeTableRows()
            .to(bigQueryTable)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND))

    pipeline.run().waitUntilFinish()
}

When I run it from IntelliJ, I get the following error. The same error appears even if I set the input topic in code with options.inputTopic = "mytopic":

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" java.lang.IllegalArgumentException: Missing required value for [--inputTopic, "mytopic"].
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:440)
    at org.apache.beam.sdk.options.PipelineOptionsValidator.validate(PipelineOptionsValidator.java:93)
    at org.apache.beam.sdk.options.PipelineOptionsValidator.validateCli(PipelineOptionsValidator.java:65)
    at org.apache.beam.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:346)
    at MainKt.main(Main.kt:39)
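For reference, a minimal sketch of the timing involved (assuming the same KafkaToBigQueryOptions interface as above): withValidation() checks every @Validation.Required field while the arguments are parsed inside .as(...), as the stack trace shows, so assigning options.inputTopic afterwards comes too late. The required values either have to arrive through the program arguments passed to fromArgs, or validation has to be dropped and the fields set in code:

// Sketch only: skips withValidation() and fills the required fields in code
// before the pipeline is built. The literal values are placeholders.
val options = PipelineOptionsFactory.fromArgs(*args)
    .`as`(KafkaToBigQueryOptions::class.java)
options.inputTopic = "mytopic"
options.outputTable = "mytable"
options.outputDataset = "mydataset"
options.outputProject = "my-project"
options.runner = DataflowRunner::class.java
val pipeline = Pipeline.create(options)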

After rebuilding the .jar, I ran it with this command:

java -jar jarfile.jar --experiments=beam_fn_api --runner=DataflowRunner --project=myproject --region=us-central1 --inputTopic=mytopic --outputTable=mytable --outputDataset=mydataset --outputProject=myotherproject

and got this error:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" java.lang.IllegalArgumentException: Unknown 'runner' specified 'DataflowRunner', supported pipeline runners []
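The empty list in "supported pipeline runners []" suggests that no PipelineRunnerRegistrar was found on the classpath: Beam discovers runners through ServiceLoader entries under META-INF/services, and a fat jar built without merging those service files loses the DataflowRunner registration. A minimal sketch of one way to keep them, assuming the jar is built with Gradle and the Shadow plugin (the plugin id, versions, and build setup are assumptions, not taken from the question):

// build.gradle.kts (sketch only)
plugins {
    kotlin("jvm") version "1.9.24"
    id("com.github.johnrengelman.shadow") version "8.1.1"
    application
}

application {
    // MainKt matches the entry point shown in the stack trace above.
    mainClass.set("MainKt")
}

tasks.shadowJar {
    // Merge META-INF/services so the DataflowRunner registrar survives repackaging;
    // without it PipelineOptionsFactory reports "supported pipeline runners []".
    mergeServiceFiles()
}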

java kotlin google-cloud-dataflow