数据流上的Apache Beam不接受BigQuery查询的ValueProvider

问题描述 投票:0回答:1

目标

我的目标是创建一个指定Apache Beam管道的数据流模板。管道以批处理模式运行,从BigQuery读取,然后执行转换并在其他位置写入。最重要的是,我用于从BigQuery读取的查询必须提供运行时。

预期行为

预期结果是管道将使用运行时参数指定BigQuery查询,执行查询,然后继续其余的管道。

实际行为

实际行为是忽略我传递的运行时参数,而是使用在创建GCS模板时指定的参数I had

相关代码

下面是我如何指定读取操作,以及如何定义和传递查询参数。

public interface MyOptions extends PipelineOptions, StreamingOptions {
    @Description("Query String")
    ValueProvider<String> getQueryString();

    void setQueryString(ValueProvider<String> value);
}

public static void main(String[] args) {
        MyOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(MyOptions.class);
        Pipeline p = Pipeline.create(options);

        PCollection<TableRow> tableRows =
                p.apply(BigQueryIO.readTableRows()
                        .fromQuery(options.getQueryString())
                        .withTemplateCompatibility()
                        .withoutValidation());
// Add this point I run my transformations and loading
}

要实际构建模板并推送到GCS,请执行以下操作

mvn compile -Pdataflow-runner exec:java -Dexec.mainClass=com.Pipeline "-Dexec.args=--runner=DataflowRunner --queryString='SELECT time,type FROM [my-project:timeseries.my-data] where time between TIMESTAMP(\"2020-02-13T00:00:00Z\") and TIMESTAMP(\"2020-02-15T00:00:00Z\")'"

最后,我使用Dataflow Web UI从GCS中选择模板并进行部署。在Web UI的底部,指定运行时参数,在其中设置queryString和要使用的运行时查询。

[Note:当我在Dataflow中运行模板时,我指定queryString,并且我知道它已被传入。我重写了第一个转换以打印出queryString并正确打印指定的运行时选项。问题是“从BigQuery读取” queryString仍然是我制作模板时使用的原始内容。

java google-cloud-platform google-bigquery apache-beam
1个回答
0
投票

经过多次迭代,我找出了问题所在。实际上有2个,最大的一个是我实际上不需要将运行时参数传递给“构建模板”步骤。

  1. 构建管道时不要传递运行时参数。似乎很明显,但请从mvn compile args
  2. 中删除
  3. 很难将queryString格式化为运行时参数。下面经过多次迭代为我工作
SELECT time,type FROM `my-project.timeseries.my-data` where time between TIMESTAMP(\"2019-02-13T00:00:00Z\") and TIMESTAMP(\"2020-02-15T00:00:00Z\")

请注意,缺少整个参数的引号以及如何格式化projectId.dataset.tableId。

© www.soinside.com 2019 - 2024. All rights reserved.