如何用apache梁中的ParDo和DoFn写入GCS

问题描述 投票:0回答:1

使用apache_beam.io.filesystems.FileSystems如何使用ParDo和DoFn写入GCS?我已经从pardo获得csv格式的输出,我是否需要编写另一个pardo来将其写入gcs或者我可以直接导入模块将其直接写入gcs吗?请帮忙

python google-cloud-storage google-cloud-dataflow apache-beam apache-beam-io
1个回答
1
投票

我有一个例子here,我使用apache_beam.io.filesystems.FileSystems将b64编码的图像写入GCS。管道的最后一步是将b64作为包含两个字段key_idimage的PCollection并应用ParDo:

b64 | 'Save images' >> beam.ParDo(WriteToSeparateFiles(known_args.output))

其中known_args.output是GCS基本路径,WriteToSeparateFiles如下:

class WriteToSeparateFiles(beam.DoFn):
    def __init__(self, outdir):
        self.outdir = outdir
    def process(self, element):
        writer = filesystems.FileSystems.create(self.outdir + element['key_id'] + '.png')
        writer.write(element['image'])
        writer.close()

使用filesystems.FileSystems.create(),我可以控制目标路径。对于基本路径,我使用传递给函数的参数,并使用每个元素的key_id生成有意义的文件名。最后,当我正在写图像时,我附加了.png扩展名。

我使用writer.write(element['image'])为每个文件保存image字段的内容,并使用writer.close()关闭流。

© www.soinside.com 2019 - 2024. All rights reserved.