我正在使用zeppelin和spark,我想从S3中取出一个2TB的文件,并在Spark中对其进行变换,然后将其上传到S3中,这样我就可以在Jupyter notebook中使用该文件。变换的过程非常直接。
我把文件作为一个parquet文件来读。我认为它大约有2TB,但我不知道如何验证。
它大约有10M行和5列,所以它是相当大的。
我试着做了 my_table.write.parquet(s3path)
我试过 my_table.write.option("maxRecordsPerFile", 200000).parquet(s3path)
. 我如何想出正确的方法来写一个大的parquet文件?
这几点你可以考虑....
用
my_table.write.parquet(s3path)
Spark每个任务写一个文件出来。
保存的文件数=被保存的RDDDataframe的分区数。因此,这可能会导致大得离谱的文件(当然,你可以重新分区数据并保存 重新分区是指将数据在网络间进行洗牌。.).
限制每个文件的记录数量
my_table.write.option("maxRecordsPerFile", numberOfRecordsPerFile..yourwish).parquet(s3path)
它可以避免产生巨大的文件。
emr-spark-s3-optimized-committer(优化的committer)
当没有使用EMRFS S3优化的提交者时 。
.config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", 2)
.config("spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored", true)
.config("spark.hadoop.parquet.enable.summary-metadata", false)
.config("spark.sql.parquet.mergeSchema", false)
.config("spark.sql.parquet.filterPushdown", true) // for reading purpose
.config("mapreduce.fileoutputcommitter.algorithm.version", "2")
.config("spark.sql.parquet.compression.codec", "snappy")
.getOrCreate()
.config("spark.hadoop.fs.s3a.fast.upload","true")
.config("spark.hadoop.fs.s3a.fast.upload","true")
.config("spark.hadoop.fs.s3a.connection.timeout","100000")
.config("spark.hadoop.fs.s3a.attempts.maximum","10")
.config("spark.hadoop.fs.s3a.fast.upload","true")
.config("spark.hadoop.fs.s3a.fast.upload.buffer","bytebuffer")
.config("spark.hadoop.fs.s3a.fast.upload.active.blocks","4")
.config("fs.s3a.connection.ssl.enabled", "true")
"fs.s3a.multipart.size
控制块的大小。有10K块的限制,所以你可以上传的最大文件是该大小*10,000。对于非常大的文件,使用比默认的 "64M "更大的数字。