我们目前从源接收 gzip 压缩的 csv 文件。我们发现在继续青铜加载之前使用gunzip命令解压缩文件的性能更高。
为了(在某种程度上)实现并行性,我们为每个文件创建一个线程并开始解压缩。显然这只是在驱动节点上完成的。我们使用的是Databricks,集群至少需要2个woker节点。有没有可能的方法让这两个其他节点参与解压?这是一个非常耗时的步骤,特别是对于大型历史批次,因此我们希望看看我们是否可以通过让工人参与来加快这一速度,同时也提高成本效率。
您可能会滥用 udfs。
import pyspark.sql.functions as F
@F.udf(returnType=StringType())
def decompress_file(file_name: Column):
try:
# code to download, decompress and upload the file
return f'decompressed {file_name}'
except Exception as e:
return f'exception {e} while decompressing {file_name}'
file_list = ['1.csv.gz', '2.csv.gz', '3.csv.gz']
df = spark.createDataFrame(data=file_list, schema='file_name: string')
df.withColumn('decompresion_result', decompress_file('file_name'))
假设输入文件在某些外部存储上可用,而不是在驱动器硬盘上。如果它们位于驱动程序上,那么您需要使它们在所有工作节点上可用。
一般来说,实现错误处理是很困难的。
我们发现在继续进行青铜级加载之前使用gunzip命令解压缩文件的性能更高。
我不太明白这一点。如果您说
spark.read.csv('folder/with/lot/of/gzipped/csv/files', compression='gzip')
,它将解压缩文件并加载它们,将工作分配给工作人员。也许比解压它们然后运行更有效spark.read.csv('folder/with/lot/of/csv/files')
。