我有一个 Spark Dataframe,其中每一行都包含两个项目:文件名(带有扩展名,例如 .jpg)和文件的内容(以字节为单位)。 我想编写一个过程,获取数据帧的每一行并将字节转换为“.jpg”图像,同时将其存储到 ADLS 容器中。
一切都需要在 Databricks 集群内运行,因此我使用 pyspark 来创建 Dataframe,并且我想利用它来将这些文件截断到目标位置。
但是,当我使用 azure-storage 库在映射函数中使用它来写入这些文件时,我遇到了麻烦。就像下面的例子,函数
consume_row
使用库创建文件并写入内容,如下example:
results_rdd = rdd.map(lambda row: consume_row(row, ...))
它返回以下错误:
PicklingError:无法序列化对象:TypeError:无法pickle“_thread._local”对象
有人尝试过做类似的事情吗?
PySpark 使用 Python 的
pickle
模块进行序列化,某些对象(如 _thread._local
对象)无法进行 pickle。
为了实现您的目标,您需要安装您的 adls2 帐户并保存文件。
请按照以下步骤操作。
这里,我使用帐户密钥进行安装。您还可以使用
SAS
令牌或 OAuth
进行安装。
dbutils.fs.mount(
source = "wasbs://<container_name>@<account_name>.blob.core.windows.net",
mount_point = "/mnt/jadls2",
extra_configs = {"fs.azure.account.key.<account_name>.blob.core.windows.net":"<account_key>"})
接下来,使用下面的代码。
from io import BytesIO
from PIL import Image
sample_data = [
("image1.jpg", image1_content),
("image2.jpg", image2_content)
]
columns = ["filename", "content"]
sample_df = spark.createDataFrame(sample_data, columns)
def consume_row(row):
filename, content = row
image = Image.open(BytesIO(content))
image.save(f"/dbfs/mnt/jadls2/databricks/images/{filename}",format="JPEG")
sample_df.rdd.foreach(consume_row)
输出: