从数据流管道的中间读取Bigtable数据

问题描述 投票:2回答:2

我有一个管道,该管道从pub sub接收一些数据,进行一些处理,并且需要根据该处理结果在Bigtable上处理所有数据。

例如,我有一个发布子消息,例如:{clientId: 10},因此我需要从Bigtable中读取clientId 10的所有数据(我知道如何基于clientId创建扫描)。问题在于,目前我们对Bigtable的读取(BigtableIO和CloudBigtableIO)都是基于管道以bigtable开头的事实,因此我无法(或找不到方法)在中间使用它们管道。如何解决这种情况?

简单的伪代码:

Pipeline p = Pipeline.create(...)
p.apply(PubsubIO.readMessagesWithAttributes ...)
.apply( PubsubMessageToScans()) // I know how to do this
.apply( ReadBigTable()) // How to do this?
google-cloud-dataflow apache-beam dataflow google-cloud-bigtable
2个回答
1
投票

为了补充@Billy的答案,您还可以尝试在ParDo转换中使用BigtableDataClient类。数据输入将是PubsubMessage中包含的参数以配置Scan对象,然后在ParDo中设置Scan参数,建立与BigTable的连接并获得过滤的结果。

此片段可能有用:

    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<String> out){

        String projectId = "<PROJECT_ID>";
        String instanceId = "<INSTANCE_ID>";
        String tableName = "<TABLENAME>";


        String[] scanParameters = element.split(",");

        try (Connection connection = BigtableConfiguration.connect(projectId, instanceId)){

            Table table = connection.getTable(TableName.valueOf(tableName));

            Scan scan = new Scan();
            scan.withStartRow(Bytes.toBytes(scanParameters[0]));
            scan.withStopRow(Bytes.toBytes(scanParameters[1]));

            ResultScanner scanner = table.getScanner(scan);

            for (Result row : scanner) {
                System.out.println(row);
            }

            catch (Exception e){
                e.printStackTrace();
            }

            out.output("");
        }

我没有直接使用PubsubMessage对其进行测试,但是,您可以进行另一次转换以适应该消息,或者直接获取PubsubMessage并设置Scan对象。


0
投票

您需要使用CloudBigtableIO.read,它返回HBase Result对象的PCollection。默认情况下,它会扫描所有行,因此您将要应用特定的扫描或过滤器以免表过载。

Scan rangeQuery =
    new Scan()
        .withStartRow(Bytes.toBytes(START_ROW))
        .withStopRow(Bytes.toBytes(STOP_ROW));

CloudBigtableScanConfiguration config =
    new CloudBigtableScanConfiguration.Builder()
        .withProjectId(PROJECT_ID)
        .withInstanceId(INSTANCE_ID)
        .withTableId(TABLE_ID)
        .withScan(scan)
        .build();

Pipeline p = Pipeline.create(options);

p.apply(PubsubIO.readMessagesWithAttributes ...)
.apply( PubsubMessageToScans())
.apply(Read.from(CloudBigtableIO.read(config)))

reading from Bigtable during Dataflow job上的文档

CloudBigtableScanConfiguration的API参考

© www.soinside.com 2019 - 2024. All rights reserved.