Apache Beam数据流中的外部api调用

问题描述 投票:0回答:2

我有一个用例,我读了存储在谷歌云存储中的换行json元素,并开始处理每个json。在处理每个json时,无论是否先前发现了该json元素,我都必须调用外部API进行重复数据删除。我正在用每个json上的ParDo进行DoFn

我还没有看过任何在线教程,没有关于如何从apache beamDoFnDataflow调用外部API端点的信息。

我正在使用Beam的JAVA SDK。我研究过的一些教程说明使用startBundleFinishBundle,但我不清楚如何使用它

java google-cloud-dataflow apache-beam apache-beam-io
2个回答
0
投票

[以下博客文章中有一个使用有状态DoFn批量调用外部系统的示例:https://beam.apache.org/blog/2017/08/28/timely-processing.html,可能会有所帮助。


0
投票

如果您需要为每个JSON记录检查外部存储中的重复项,那么仍然可以使用DoFn。有几种注释,例如@Setup@StartBundle@FinishBundle等,可用于注释DoFn中的方法。

例如,如果您需要实例化一个客户端对象以将请求发送到外部数据库,那么您可能想要在@Setup方法(例如POJO构造函数)中执行此操作,然后在@ProcessElement方法中利用此客户端对象。

让我们考虑一个简单的例子:

static class MyDoFn extends DoFn<Record, Record> {

    static transient MyClient client;

    @Setup
    public void setup() {
        client = new MyClient("host");
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // process your records
        Record r = c.element();
        // check record ID for duplicates
        if (!client.isRecordExist(r.id()) {
            c.output(r);
        }
    }

    @Teardown
    public void teardown() {
        if (client != null) {
            client.close();
            client = null;
        }
    }
}

此外,为了避免对每个记录进行远程调用,您可以将束记录分批放入内部缓冲区(将输入数据束分成束),并以批处理方式检查重复项(如果客户端支持)。为此,您可以使用@StartBundle@FinishBundle带注释的方法,这些方法将在相应地处理Beam束之前和之后立即调用。

[对于更复杂的示例,我建议看一下不同Beam IO中的Sink实现,例如KinesisIO

© www.soinside.com 2019 - 2024. All rights reserved.