我的目标是创建一个指定Apache Beam管道的数据流模板。管道以批处理模式运行,从BigQuery读取,然后执行转换并在其他位置写入。最重要的是,我用于从BigQuery读取的查询必须提供运行时。
预期结果是管道将使用运行时参数指定BigQuery查询,执行查询,然后继续其余的管道。
实际行为是忽略我传递的运行时参数,而是使用在创建GCS模板时指定的参数I had。
下面是我如何指定读取操作,以及如何定义和传递查询参数。
public interface MyOptions extends PipelineOptions, StreamingOptions {
@Description("Query String")
ValueProvider<String> getQueryString();
void setQueryString(ValueProvider<String> value);
}
public static void main(String[] args) {
MyOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(MyOptions.class);
Pipeline p = Pipeline.create(options);
PCollection<TableRow> tableRows =
p.apply(BigQueryIO.readTableRows()
.fromQuery(options.getQueryString())
.withTemplateCompatibility()
.withoutValidation());
// Add this point I run my transformations and loading
}
要实际构建模板并推送到GCS,请执行以下操作
mvn compile -Pdataflow-runner exec:java -Dexec.mainClass=com.Pipeline "-Dexec.args=--runner=DataflowRunner --queryString='SELECT time,type FROM [my-project:timeseries.my-data] where time between TIMESTAMP(\"2020-02-13T00:00:00Z\") and TIMESTAMP(\"2020-02-15T00:00:00Z\")'"
最后,我使用Dataflow Web UI从GCS中选择模板并进行部署。在Web UI的底部,指定运行时参数,在其中设置queryString和要使用的运行时查询。
[Note:当我在Dataflow中运行模板时,我指定queryString,并且我知道它已被传入。我重写了第一个转换以打印出queryString并正确打印指定的运行时选项。问题是“从BigQuery读取” queryString仍然是我制作模板时使用的原始内容。
经过多次迭代,我找出了问题所在。实际上有2个,最大的一个是我实际上不需要将运行时参数传递给“构建模板”步骤。
mvn compile args
SELECT time,type FROM `my-project.timeseries.my-data` where time between TIMESTAMP(\"2019-02-13T00:00:00Z\") and TIMESTAMP(\"2020-02-15T00:00:00Z\")
请注意,缺少整个参数的引号以及如何格式化projectId.dataset.tableId。