我正在尝试扩展 Google 的 Dataflow 模板以将数据从 BQ 移动到 Cloud Storage 上的 parquet 文件,但我在尝试控制 parquet 文件大小时受阻。 https://cloud.google.com/dataflow/docs/guides/templates/provided/bigquery-to-parquet
我正在使用 Direct runner 在本地运行管道,它工作正常,但我无法控制 parquet 文件的大小。主要目标是确保推荐大小为 1GB 的文件大小。
我做的一些事实和测试:
我可以使用 .withNumShards(xxx) 控制分片的数量,它直接影响输出文件夹中生成的文件数量,因为这与管道中的并行性有关,如果我设置 1 个分片,我只有 1 个文件对于所有数据,如果我设置 5 个分片,我会得到 5 个镶木地板文件,如果我使用默认设置,镶木地板文件的数量将取决于分配给管道的分片数量。
有一个 .withRowGroupSize() 配置,但它只是修改数据在 parquet 文件中的存储方式,它不会直接影响最终大小。
我找到了一个FixedWindows的配置,但是它是一个基于时间的配置,所以它对文件大小没有确定性。
我也尝试使用页面大小配置,但它在我使用的版本中不直接可用(Apache beam 2.46.0,包括所有 sdk 和运行器依赖项)
我没有想法,找不到与此问题相关的信息。所以我想了解考虑到 Parquet 有推荐的文件大小,这是否是一种常见做法,以及是否有人可以提供有关如何实现这一目标的更多信息。
简版代码(未完成,但基本上是基于上面提到的模板)
TypedRead<GenericRecord> readFromBQ =
BigQueryIO.read(SchemaAndRecord::getRecord)
.from(options.getTableRef())
.withTemplateCompatibility()
.withMethod(Method.DIRECT_READ)
.withCoder(AvroCoder.of(schema));
/*
* Steps: 1) Read records from BigQuery via BigQueryIO.
* 2) Write records to Google Cloud Storage in Parquet format.
*/
pipeline
/*
* Step 1: Read records via BigQueryIO using supplied schema as a PCollection of
* {@link GenericRecord}.
*/
.apply("ReadFromBigQuery", readFromBQ)
/*
* Step 2: Write records to Google Cloud Storage as one or more Parquet files
* via {@link ParquetIO}.
*/
.apply(
"WriteToParquet",
FileIO.<GenericRecord>write()
.via(
ParquetIO.sink(schema))
.to("/my/path/to/local/bucket")
// .withNumShards(1)
.withSuffix(FILE_SUFFIX));
// Execute the pipeline and return the result.
return pipeline.run();
谢谢