有没有办法将图像的内容(存储在spark Dataframe中)与pyspark并行写入文件?

问题描述 投票:0回答:1

我有一个 Spark Dataframe,其中每一行都包含两个项目:文件名(带有扩展名,例如 .jpg)和文件的内容(以字节为单位)。 我想编写一个过程,获取数据帧的每一行并将字节转换为“.jpg”图像,同时将其存储到 ADLS 容器中。

一切都需要在 Databricks 集群内运行,因此我使用 pyspark 来创建 Dataframe,并且我想利用它来将这些文件截断到目标位置。

但是,当我使用 azure-storage 库在映射函数中使用它来写入这些文件时,我遇到了麻烦。就像下面的例子,函数

consume_row
使用库创建文件并写入内容,如下example:

results_rdd = rdd.map(lambda row: consume_row(row, ...))

它返回以下错误:

PicklingError:无法序列化对象:TypeError:无法pickle“_thread._local”对象

有人尝试过做类似的事情吗?

azure pyspark serialization databricks azure-data-lake-gen2
1个回答
0
投票

PySpark 使用 Python 的

pickle
模块进行序列化,某些对象(如
_thread._local
对象)无法进行 pickle。

为了实现您的目标,您需要安装您的 adls2 帐户并保存文件。

请按照以下步骤操作。

这里,我使用帐户密钥进行安装。您还可以使用

SAS
令牌或
OAuth
进行安装。

dbutils.fs.mount(
  source = "wasbs://<container_name>@<account_name>.blob.core.windows.net",
  mount_point = "/mnt/jadls2",
  extra_configs = {"fs.azure.account.key.<account_name>.blob.core.windows.net":"<account_key>"})

接下来,使用下面的代码。

from io import BytesIO
from PIL import Image

sample_data = [
    ("image1.jpg", image1_content),
    ("image2.jpg", image2_content)
]

columns = ["filename", "content"]
sample_df = spark.createDataFrame(sample_data, columns)

def consume_row(row):
    filename, content = row
    image = Image.open(BytesIO(content))
    image.save(f"/dbfs/mnt/jadls2/databricks/images/{filename}",format="JPEG")

sample_df.rdd.foreach(consume_row)

输出:

enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.