Apache Beam GCP在动态创建的目录中上传Avro

问题描述 投票:0回答:3

我想在GCP中创建一个流式Apache Beam管道,该管道从Google Pub / Sub读取数据并将其推送到GCS。我可以从Pub / Sub读取数据。我当前的代码如下所示(从GCP Apache Beam模板之一中选取)

pipeline.apply("Read PubSub Events",
  PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()))
                .apply("Map to Archive", ParDo.of(new PubsubMessageToArchiveDoFn()))
                .apply(
                        options.getWindowDuration() + " Window",
                        Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
                .apply(
                        "Write File(s)",
                        AvroIO.write(AdEvent.class)
                                .to(
                                        new WindowedFilenamePolicy(
                                                options.getOutputDirectory(),
                                                options.getOutputFilenamePrefix(),
                                                options.getOutputShardTemplate(),
                                                options.getOutputFilenameSuffix()))
                                .withTempDirectory(NestedValueProvider.of(
                                        options.getAvroTempDirectory(),
                                        (SerializableFunction<String, ResourceId>) input ->
                                                FileBasedSink.convertToFileResourceIfPossible(input)))
                                .withWindowedWrites()
                                .withNumShards(options.getNumShards()));

它可以生成如下所示的文件windowed-file2020-04-28T09:00:00.000Z-2020-04-28T09:02:00.000Z-pane-0-last-00-of-01.avro

我想将数据存储在动态创建的目录中的GCS中。在以下目录2020-04-28/012020-04-28/02等中-0102是子目录,表示数据流流传输管道处理数据的时刻。

示例:

gs://data/2020-04-28/01/0000000.avro
gs://data/2020-04-28/01/0000001.avro
gs://data/2020-04-28/01/....

gs://data/2020-04-28/02/0000000.avro
gs://data/2020-04-28/02/0000001.avro
gs://data/2020-04-28/02/....

gs://data/2020-04-28/03/0000000.avro
gs://data/2020-04-28/03/0000001.avro
gs://data/2020-04-28/03/....
...

0000000、0000001等是我用于说明的简单文件名,我不希望这些文件是顺序名称。您认为在GCP数据流流式传输设置中有可能吗?

我想在GCP中创建一个流式Apache Beam管道,该管道从Google Pub / Sub读取数据并将其推送到GCS。我可以从Pub / Sub读取数据。我当前的代码如下所示(...

google-cloud-platform pipeline apache-beam dataflow apache-beam-pipeline
3个回答
2
投票

您可以实现自己的FilenamePolicy(也许以WindowedFilenamePolicy作为起点)来使用自己的逻辑来定义输出路径。您可以根据需要在文件路径中使用/字符(顺便说一下,GCS存储桶为"flat",它们实际上没有目录)。要获取日期/时间,windowedFilename方法将窗口信息作为参数,因此您可以在返回值中使用它,但您认为合适。


1
投票

您需要使用writeDynamic而不是Write。不幸的是,AvroIO不像here那样本机支持writeDynamic,而是需要使用FileIO。


0
投票

您可以使用Pub/Sub to Cloud Storage Avro template是一种流传输管道,可从发布/订阅主题中读取数据,并将Avro文件写入指定的Cloud Storage存储桶。该管道支持可选的用户提供的窗口持续时间,用于执行窗口写入。

© www.soinside.com 2019 - 2024. All rights reserved.