在数据帧上的
foreach()
中执行写入操作时,我遇到此错误。这段代码在 3 个多月内运行良好,但从上周开始出现故障。
为了提供一些上下文,我有一个数据框 extract_df,其中包含 2 列 xml_full_name 和内容。我使用下面的代码将这些记录作为 xml 写入 ADLS 中的目标文件夹。
extract_df.foreach(write_file)
write_file
定义为:
def write_file(row):
with open(row.extract_path, "wb") as f:
f.write(row.content)
笔记本还使用
spark.write
命令编写一些镶木地板,效果很好。
经过进一步调查,我发现这个问题可能与并行性有关,作为解决方法,我尝试了以下方法:
for row in extract_df.collect():
with open(row.extract_path, "wb") as f:
f.write(row.content)
这有效,这意味着连接正常,但并行性不起作用。但是,这不能用作修复方法,因为它会降低性能。
这里有人遇到过这个问题吗?如果这与某些配置有关,请提供有关我可以检查的内容和位置的任何建议。欢迎所有意见。 谢谢。
我已经尝试过以下方法:
import os
def write_file_partition(rows):
for row in rows:
with open(row.xml_full_name, "w") as f:
f.write(row.content)
extract_df.foreachPartition(write_file_partition)
write_file_partition(extract_df.collect())
结果:
在上面的代码中,
foreachPartition()
方法用于将函数应用于RDD/DataFrame的每个分区。
我定义了一个函数
write_file_partition()
,它采用 extract_df DataFrame 的分区并将内容写入文件。
然后您可以在 extract_df DataFrame 上调用
foreachPartition()
方法并将 write_file_partition()
函数作为参数传递。这会将函数并行应用于 DataFrame 的每个分区。