如何使用PySpark并行处理文件处理程序

问题描述 投票:1回答:1

我现在有一个大型python项目,其中驱动程序具有一个功能,该功能使用for循环遍历我的GCP(谷歌云平台)存储桶中的每个文件。我正在使用CLI将作业提交到GCP,然后让该作业在GCP上运行。

对于在此for循环中遍历的每个文件,我正在调用一个parse_file(...)函数,该函数解析文件并调用处理该文件的其他函数的序列。

整个项目运行并花费几分钟,这很慢,并且驱动程序尚未使用太多PySpark。问题是该文件级for循环中的每个parse_file(...)都按顺序执行。是否可以使用PySpark并行化该文件级的for循环,以对所有这些文件并行运行parse_file(...)函数,以减少程序执行时间并提高效率?如果是这样,由于该程序未使用PySpark,是否需要进行大量代码修改才能使其并行化?

所以程序的功能看起来像这样

# ... some other codes
attributes_table = ....
for obj in gcp_bucket.objects(path):
    if obj.key.endswith('sys_data.txt'):
        #....some other codes
        file_data = (d for d in obj.download().decode('utf-8').split('\n'))
        parse_file(file_data, attributes_table)
        #....some other codes ....

我如何使用PySpark并行化此部分,而不是一次使用遍历文件?

apache-spark for-loop pyspark parallel-processing
1个回答
0
投票

感谢您提出问题。

我建议根据您的gcp_bucket.objects(path)创建一个RDD。

您拥有您的SparkContext,因此创建RDD应该很简单:my_rdd = sc.parallelize(gcp_bucket.objects(path)

对于未启动的约定,是将SparkContext分配给变量sc。 for循环的内容必须放入一个函数中,我们称之为my_function。您现在拥有了所有的碎片。

您的下一步将这样映射您的函数:

results_dag = my_rdd.map(my_function)
results = results_dag.collect()

回想一下,Spark执行了惰性评估。这就是为什么我们需要在最后执行collect操作。

其他一些建议。首先是在GCP存储桶中的一小部分对象上运行代码。了解时机。为了促进良好的编码习惯,另一项建议是考虑将for循环内的操作进一步分解为其他RDD。您可以随时将它们链接在一起...

my_rdd = sc.parallelize(gcp_bucket.objects(path)
dag1 = my_rdd.map(function1)
dag2 = dag1.map(function2)
dag3 = dag2.map(function3)
results = dag3.collect()
© www.soinside.com 2019 - 2024. All rights reserved.