我有一个管道,该管道从pub sub接收一些数据,进行一些处理,并且需要根据该处理结果在Bigtable上处理所有数据。
例如,我有一个发布子消息,例如:{clientId: 10}
,因此我需要从Bigtable中读取clientId 10的所有数据(我知道如何基于clientId创建扫描)。问题在于,目前我们对Bigtable的读取(BigtableIO和CloudBigtableIO)都是基于管道以bigtable开头的事实,因此我无法(或找不到方法)在中间使用它们管道。如何解决这种情况?
简单的伪代码:
Pipeline p = Pipeline.create(...)
p.apply(PubsubIO.readMessagesWithAttributes ...)
.apply( PubsubMessageToScans()) // I know how to do this
.apply( ReadBigTable()) // How to do this?
为了补充@Billy的答案,您还可以尝试在ParDo转换中使用BigtableDataClient类。数据输入将是PubsubMessage中包含的参数以配置Scan对象,然后在ParDo中设置Scan参数,建立与BigTable的连接并获得过滤的结果。
此片段可能有用:
@ProcessElement
public void processElement(@Element String element, OutputReceiver<String> out){
String projectId = "<PROJECT_ID>";
String instanceId = "<INSTANCE_ID>";
String tableName = "<TABLENAME>";
String[] scanParameters = element.split(",");
try (Connection connection = BigtableConfiguration.connect(projectId, instanceId)){
Table table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
scan.withStartRow(Bytes.toBytes(scanParameters[0]));
scan.withStopRow(Bytes.toBytes(scanParameters[1]));
ResultScanner scanner = table.getScanner(scan);
for (Result row : scanner) {
System.out.println(row);
}
catch (Exception e){
e.printStackTrace();
}
out.output("");
}
我没有直接使用PubsubMessage对其进行测试,但是,您可以进行另一次转换以适应该消息,或者直接获取PubsubMessage并设置Scan对象。
您需要使用CloudBigtableIO.read,它返回HBase Result对象的PCollection。默认情况下,它会扫描所有行,因此您将要应用特定的扫描或过滤器以免表过载。
Scan rangeQuery =
new Scan()
.withStartRow(Bytes.toBytes(START_ROW))
.withStopRow(Bytes.toBytes(STOP_ROW));
CloudBigtableScanConfiguration config =
new CloudBigtableScanConfiguration.Builder()
.withProjectId(PROJECT_ID)
.withInstanceId(INSTANCE_ID)
.withTableId(TABLE_ID)
.withScan(scan)
.build();
Pipeline p = Pipeline.create(options);
p.apply(PubsubIO.readMessagesWithAttributes ...)
.apply( PubsubMessageToScans())
.apply(Read.from(CloudBigtableIO.read(config)))