Google Cloud数据流:CloudBigtableScanConfiguration.withScan(),如何传递动态过滤器值?

问题描述 投票:0回答:1

SourceLocation是我的Bigtable的前缀,该前缀是从application.properties获取的。有没有一种方法可以在运行数据流模板时动态地获取它?

我的管道:

pipeline.apply("ReadTable", Read.from(CloudBigtableIO.read(configSetUp(options))))

CloudBigtableScanConfiguration

private static CloudBigtableScanConfiguration configSetUp(LocationSetupOptions options) {
    ValueProvider<Integer>  pageFilter = options.getPageFilter();
    Scan scan = new Scan(Bytes.toBytes(options.getSourceLocation().get()));

    FilterList filterList = new FilterList();
    PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes(options.getSourceLocation().get()));
    filterList.addFilter(new PageFilter(Long.valueOf(pageFilter.get())));
    filterList.addFilter(prefixFilter);
    scan.setFilter(filterList);

    return new CloudBigtableScanConfiguration.Builder()
        .withProjectId(options.getProjectId())
        .withInstanceId(options.getInstanceId())
        .withTableId(options.getTableId())
        .withScan(scan)
        .build();}
google-cloud-platform google-cloud-dataflow
1个回答
0
投票

[Bigtable CloudBigtableIOBigtableIO有两个客户端。 CloudBigtableIO参数未更新为由模板通过ValueProvider修改,但BigtableIO与ValueProviders兼容。

在您的特定情况下,如果您要与模板一起使用ValueProvider,那么我建议您转而使用BigtableIO。可以在此处找到一个示例AvroToBigtable

© www.soinside.com 2019 - 2024. All rights reserved.