是否可以在工作节点上运行操作系统命令?

问题描述 投票:0回答:1

我们目前从源接收 gzip 压缩的 csv 文件。我们发现在继续青铜加载之前使用gunzip命令解压缩文件的性能更高。

为了(在某种程度上)实现并行性,我们为每个文件创建一个线程并开始解压缩。显然这只是在驱动节点上完成的。我们使用的是Databricks,集群至少需要2个woker节点。有没有可能的方法让这两个其他节点参与解压?这是一个非常耗时的步骤,特别是对于大型历史批次,因此我们希望看看我们是否可以通过让工人参与来加快这一速度,同时也提高成本效率。

apache-spark pyspark databricks gzip
1个回答
0
投票

您可能会滥用 udfs。

import pyspark.sql.functions as F


@F.udf(returnType=StringType())
def decompress_file(file_name: Column):
    try:
        # code to download, decompress and upload the file
        return f'decompressed {file_name}'
    except Exception as e:
        return f'exception {e} while decompressing {file_name}'


file_list = ['1.csv.gz', '2.csv.gz', '3.csv.gz']
df = spark.createDataFrame(data=file_list, schema='file_name: string')
df.withColumn('decompresion_result', decompress_file('file_name'))

假设输入文件在某些外部存储上可用,而不是在驱动器硬盘上。如果它们位于驱动程序上,那么您需要使它们在所有工作节点上可用。

一般来说,实现错误处理是很困难的。


我们发现在继续进行青铜级加载之前使用gunzip命令解压缩文件的性能更高。

我不太明白这一点。如果您说

spark.read.csv('folder/with/lot/of/gzipped/csv/files', compression='gzip')
,它将解压缩文件并加载它们,将工作分配给工作人员。也许比解压它们然后运行更有效
spark.read.csv('folder/with/lot/of/csv/files')

© www.soinside.com 2019 - 2024. All rights reserved.