What I want to do: continuously read and uncompress GZ files matched by a pattern (~3000 files; each is 1.2 MB compressed and about 9 MB uncompressed), replacing certain character sequences in every CSV file...

Actual code:
static void run(final BeeswaxDataflowOptions options) {
  final Pipeline pipeline = Pipeline.create(options);

  // Continuously match new files under the source path.
  final PCollection<MatchResult.Metadata> matches =
      pipeline.apply(
          "Read",
          FileIO.match()
              .filepattern(options.getSourcePath() + options.getSourceFilesPattern())
              .continuously(
                  Duration.standardSeconds(options.getInterval()),
                  Watch.Growth.<String>never()));

  matches
      .apply(FileIO.readMatches().withCompression(GZIP))
      // Fixed windows, firing on every element, accumulating fired panes.
      .apply(
          Window.<FileIO.ReadableFile>into(
                  FixedWindows.of(Duration.standardSeconds(options.getWindowInterval())))
              .accumulatingFiredPanes()
              .withAllowedLateness(Duration.ZERO)
              .triggering(
                  Repeatedly.forever(
                      AfterPane.elementCountAtLeast(1).getContinuationTrigger())))
      // Read each whole file into memory as a (path, contents) pair.
      .apply(
          "Uncompress",
          MapElements.into(
                  TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
              .via(
                  file -> {
                    final String filePath = file.getMetadata().resourceId().toString();
                    try {
                      return KV.of(filePath, file.readFullyAsUTF8String());
                    } catch (final IOException e) {
                      return KV.of(filePath, "");
                    }
                  }))
      .apply("Prepare for BigQuery import", ParDo.of(new BigQueryDataPreparatorFn()))
      // Write the results back out as GZIP-compressed files, keyed by source path.
      .apply(
          "Save results",
          FileIO.<String, KV<String, String>>writeDynamic()
              .withCompression(GZIP)
              .by(KV::getKey)
              .withDestinationCoder(StringUtf8Coder.of())
              .via(Contextful.fn(KV::getValue), TextIO.sink())
              .withNumShards(options.getShards())
              .to(options.getOutputPath())
              .withTempDirectory(options.getTempLocation())
              .withNaming(AbsoluteNaming::new));

  pipeline.run().waitUntilFinish();
}
The problem is an OutOfMemory exception (yes, I know readFullyAsUTF8String looks suspicious for this). How can I handle this situation?
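One direction I am considering (a minimal sketch of my own, not my actual code; StreamingUncompressFn is a hypothetical name) is to replace the "Uncompress" MapElements with a DoFn that streams each file line by line instead of materializing the whole contents with readFullyAsUTF8String:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Hypothetical replacement for the "Uncompress" step: emits one
// KV(filePath, line) per CSV line, so only a single line is buffered at a time
// instead of the whole ~9 MB file.
class StreamingUncompressFn extends DoFn<FileIO.ReadableFile, KV<String, String>> {
  @ProcessElement
  public void processElement(@Element FileIO.ReadableFile file,
      OutputReceiver<KV<String, String>> out) throws IOException {
    final String filePath = file.getMetadata().resourceId().toString();
    // open() returns an already-decompressed channel, because readMatches()
    // upstream was configured with GZIP compression.
    try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(Channels.newInputStream(file.open()), StandardCharsets.UTF_8))) {
      String line;
      while ((line = reader.readLine()) != null) {
        out.output(KV.of(filePath, line));
      }
    }
  }
}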
My observation is that all ~3000 files are read and collected at the "Uncompress" step. So before "Prepare for BigQuery import" and "Save results" even start, everything has somehow accumulated and been read into RAM.
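One thing I wonder about (an untested guess on my part): the window uses accumulatingFiredPanes(), which keeps every previously fired pane's elements in state and re-emits everything seen so far on each firing. A discarding variant would look like this (discardingWindow is a made-up helper name):

import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

// Hypothetical variant of the window in the pipeline above: discard fired
// panes instead of accumulating them, so earlier panes' elements are not
// retained in state and re-emitted on every firing.
static Window<FileIO.ReadableFile> discardingWindow(long windowIntervalSeconds) {
  return Window.<FileIO.ReadableFile>into(
          FixedWindows.of(Duration.standardSeconds(windowIntervalSeconds)))
      .discardingFiredPanes()
      .withAllowedLateness(Duration.ZERO)
      .triggering(
          Repeatedly.forever(AfterPane.elementCountAtLeast(1).getContinuationTrigger()));
}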
Ideally I would like to throttle this pipeline somehow, e.g. let at most 50 elements go through the steps and wait for the results before starting the next batch. Is that possible? If not, how else can this be handled?
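To illustrate the kind of batching I mean, a rough sketch using Beam's GroupIntoBatches (assuming the KV<String, String> elements produced by "Uncompress"; inBatchesOf50 is a made-up helper name, and I don't know whether this is the right tool here):

import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical helper: group at most 50 values per key into one batch, so the
// downstream steps see bounded chunks rather than all elements at once.
static PCollection<KV<String, Iterable<String>>> inBatchesOf50(
    PCollection<KV<String, String>> uncompressed) {
  return uncompressed.apply("Batch", GroupIntoBatches.<String, String>ofSize(50));
}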