AvroParquetWriter.<GenericRecord>builder(filePath)
.withSchema(schema)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withConf(Configuration)
.withDataModel(GenericData.get())
.withWriteMode(Mode.OVERWRITE)
.withRowGroupSize(8*1024*10124)
.withPageSize(64*1024*1024)
.build()
对于
Path
,我使用逻辑路径=“hdfsLocation”+String.format(tid%每个文件的行数),计数器/每个文件的行数)+_parquet。通过这些,我能够实现 382 KB 的文件大小,但我需要大约 100 MB 的文件大小。请分享一些解决方案。
通常您会指定滚动策略。例如:
FileSink<EnrichedNetflow> sink =
FileSink.forBulkFormat(
new Path(<my path>),
AvroParquetWriters.forSpecificRecord(
MyPojo.class))
.withRollingPolicy(new MyRollingPolicy())
.build();
其中滚动策略是
CheckpointRollingPolicy
,因为您需要在检查点上滚动文件。但这意味着,如果您没有为每个文件接收器子任务(每个检查点间隔)生成足够的数据,您的文件大小可能会小于您想要的大小。例如。如果您的接收器并行度为 128,并且您的检查点间隔为 1 分钟,那么每个文件大小将与每分钟到达工作流程的 1/128 的数据量相关。
为了解决这个问题,您可以(a)设置更长的检查点间隔,或者(b)在写入文件时降低并行度,或者(c)使用文件压缩策略。有关详细信息,请参阅 enableCompaction 方法。