我的文件名包含我在管道中需要的信息,例如我的数据点的标识符是文件名的一部分而不是数据中的字段。例如,每个风力涡轮机产生文件涡轮机-loc-001-007.csv。例如,我需要管道中的loc数据。
Java(sdk 2.9.0):
Beams TextIO读者不能访问文件名本身,对于这些用例,我们需要使用FileIO来匹配文件并获取对文件名中存储的信息的访问权限。与TextIO不同,用户需要在FileIO读取下游的转换中处理文件的读取。 FileIO读取的结果是PCollection,ReadableFile类包含文件名作为元数据,可以与文件的内容一起使用。
FileIO确实有一个方便的方法readFullyAsUTF8String(),它将整个文件读入一个String对象,这将首先将整个文件读入内存。如果内存是一个问题,您可以使用FileSystems等实用程序类直接处理该文件。
PCollection<KV<String, String>> filesAndContents = p
.apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
// withCompression can be omitted - by default compression is detected from the filename.
.apply(FileIO.readMatches().withCompression(GZIP))
.apply(MapElements
// uses imports from TypeDescriptors
.into(KVs(strings(), strings()))
.via((ReadableFile f) -> KV.of(
f.getMetadata().resourceId().toString(), f.readFullyAsUTF8String())));
Python(sdk 2.9.0):
对于2.9.0 for python,您需要从Dataflow管道外部收集URI列表,并将其作为参数提供给管道。例如,使用FileSystems通过Glob模式读入文件列表,然后将其传递给PCollection进行处理。
一旦fileio看到PR https://github.com/apache/beam/pull/7791/可用,下面的代码也将是python的一个选项。
import apache_beam as beam
from apache_beam.io import fileio
with beam.Pipeline() as p:
readable_files = (p
| fileio.MatchFiles(‘hdfs://path/to/*.txt’)
| fileio.ReadMatches()
| beam.Reshuffle())
files_and_contents = (readable_files
| beam.Map(lambda x: (x.metadata.path,
x.read_utf8()))