使用 pyspark dataframe 将所有数据收集到具有相应行的列中

问题描述 投票:0回答:1

我有一个数据框,其中包含如下屏幕截图所示的列。我想添加一个附加列“all_data”,它将保存其中列的所有数据。

这是我尝试过的

from pyspark.sql.functions import collect_list, udf
from pyspark.sql.types import ArrayType, StringType

def read_file_content(file_path):
    content = spark.read.json(file_path).rdd.map(lambda x: x[0]).collect()
    return content

read_file_content_udf = udf(read_file_content, ArrayType(StringType()))

file_with_all_data = daftrame.withColumn("all_data", read_file_content_udf("file_name_input"))

如何使用上述方法我得到错误

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 63.0 failed 4 times, most recent failure: Lost task 0.3 in stage 63.0 (TID 5275) (10.99.0.10 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/root/.ipykernel/2377/command-3710246798592077-2084292290", line 12, in read_and_collect_data
  File "/databricks/python/lib/python3.10/site-packages/IPython/core/interactiveshell.py", line 284, in _modified_open
    return io_open(file, *args, **kwargs)
FileNotFoundError: [Errno 2] No such file or directory: 'abfss://soruce@storage_abs.dfs.core.windows.net/bite/searc/2024/03/asdaf-adase2-47217e-31-0150bda34e47_20240308_09-19-35.json'

而文件可用,我可以在单独的数据框中读取

因此最终的数据框看起来像所有列以及附加列“all_data”,其中单独保存行中每个文件的数据。

列名“file_name_input”的文件位置基本上类似于“abfss://soruce@storage_abs.dfs.core.windows.net/bite/searc/2024/03/asdaf-adase2-47217e-31-0150bda34e47_20240308_09-19 -35.json" 同样,“file_name_input”列中还有 196 个其他文件名和位置。

是否可以单独读取所有文件并将数据分别存储在附加列“all_data”中

python pyspark databricks azure-databricks
1个回答
0
投票

您得到的错误是文件未找到错误,这是因为您正在 python 上下文中使用

abfss
协议读取路径,但实际上您的代码给出了不同的错误,如下所示。

PicklingError: Could not serialize object: PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] 您似乎正在尝试从广播变量、操作或转换引用 SparkContext。 SparkContext 只能在驱动程序上使用,不能在工作程序上运行的代码中使用。有关更多信息,请参阅 SPARK-5063。

这意味着您无法在 udf 内(即驱动程序节点中)引用 Spark 上下文。

因此,您可以按照以下方法进行操作。

通过将存储帐户安装到 databricks,您可以使用 pandas 或使用

open
函数读取它并获取数据。

要安装的代码

dbutils.fs.mount(
  source = "wasbs://[email protected]",
  mount_point = "/mnt/jadls2",
  extra_configs = {"fs.azure.account.key.jadls2.blob.core.windows.net":"z9XY9xxxxyyyy=="})

在这里,我安装到

/mnt/jadls2/
并使用带有帐户密钥的
wasbs
协议, 我建议您使用 SAS 令牌或服务主体进行安装。了解更多相关信息 这里

接下来,更改您的 UDF 函数,如下所示。

from pyspark.sql.functions import collect_list, udf,lit
from pyspark.sql.types import ArrayType, StringType
import pandas as pd

def read_file_content(file_path):
    mnt_path = file_path.replace("abfss://[email protected]","/dbfs/mnt/jadls2")
    content = list(pd.read_json(mnt_path).columns)
    return content

read_file_content_udf = udf(read_file_content, ArrayType(StringType()))

在这里,我将

abfss
路径替换为挂载路径,因为我们已挂载到容器,您需要替换,直到仅剩下的路径与文件相同。

我正在返回列名,您只需根据您的要求返回即可。

注意:确保在替换函数中给出正确的

abfss
路径,并在安装路径前加上
/dbfs/
前缀。

输出:

enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.