无法从 Databricks 笔记本在 ADLS 中执行并行写入操作。 OSError:[Errno 107] 传输端点未连接

问题描述 投票:0回答:1

在数据帧上的

foreach()
中执行写入操作时,我遇到此错误。这段代码在 3 个多月内运行良好,但从上周开始出现故障。

为了提供一些上下文,我有一个数据框 extract_df,其中包含 2 列 xml_full_name 和内容。我使用下面的代码将这些记录作为 xml 写入 ADLS 中的目标文件夹。

extract_df.foreach(write_file)

write_file
定义为:

def write_file(row):
    with open(row.extract_path, "wb") as f:
        f.write(row.content)

笔记本还使用

spark.write
命令编写一些镶木地板,效果很好。

经过进一步调查,我发现这个问题可能与并行性有关,作为解决方法,我尝试了以下方法:

for row in extract_df.collect():
    with open(row.extract_path, "wb") as f:
        f.write(row.content)

这有效,这意味着连接正常,但并行性不起作用。但是,这不能用作修复方法,因为它会降低性能。

这里有人遇到过这个问题吗?如果这与某些配置有关,请提供有关我可以检查的内容和位置的任何建议。欢迎所有意见。 谢谢。

python pyspark databricks azure-databricks azure-data-lake-gen2
1个回答
0
投票

我已经尝试过以下方法:

import os

def write_file_partition(rows):
    for row in rows:
        with open(row.xml_full_name, "w") as f:
            f.write(row.content)
extract_df.foreachPartition(write_file_partition)
write_file_partition(extract_df.collect())

结果:

Enter image description here

在上面的代码中,

foreachPartition()
方法用于将函数应用于RDD/DataFrame的每个分区。

我定义了一个函数

write_file_partition()
,它采用 extract_df DataFrame 的分区并将内容写入文件。

然后您可以在 extract_df DataFrame 上调用

foreachPartition()
方法并将
write_file_partition()
函数作为参数传递。这会将函数并行应用于 DataFrame 的每个分区。

© www.soinside.com 2019 - 2024. All rights reserved.