Writing Parquet to Azure Blob Storage: "One of the request inputs is not valid"


I'm trying to write a simple DataFrame in parquet format to Azure Blob Storage. Note that the snippet below works locally, so my guess is that the issue has to do with the Azure libraries. I also tried the delta format and it works (even though it uses parquet under the hood).

Using Spark 3.1.1, Scala 2.12.10, OpenJDK 1.8.0_292.

I set up the Spark session as usual, like this:

$SPARK_HOME/bin/spark-shell \
  (...cluster settings...) \
  --conf spark.hadoop.fs.azure.account.key.<account>.blob.core.windows.net="${AZURE_BLOB_STORAGE_KEY}" \
  --conf spark.hadoop.fs.AbstractFileSystem.wasb.impl=org.apache.hadoop.fs.azure.Wasb \
  --conf spark.hadoop.fs.wasb.impl=org.apache.hadoop.fs.azure.NativeAzureFileSystem \
  --conf spark.delta.logStore.class=org.apache.spark.sql.delta.storage.AzureLogStore \
  --packages org.apache.hadoop:hadoop-azure:2.7.0,com.azure:azure-storage-blob:12.8.0,com.azure:azure-storage-common:12.8.0,com.microsoft.azure:azure-storage:2.0.0,io.delta:delta-core_2.12:0.8.0
  (...other irrelevant settings...)
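
For reference, the same settings can also be applied programmatically on the SparkSession builder. This is only a sketch of an equivalent setup, not taken from the original post; the app name is made up and <account> remains a placeholder:

import org.apache.spark.sql.SparkSession

// Sketch of the equivalent programmatic configuration (hypothetical app
// name; <account> is a placeholder, as in the spark-shell command above).
val spark = SparkSession.builder()
  .appName("wasb-parquet-repro")
  .config("spark.hadoop.fs.azure.account.key.<account>.blob.core.windows.net",
    sys.env("AZURE_BLOB_STORAGE_KEY"))
  .config("spark.hadoop.fs.AbstractFileSystem.wasb.impl",
    "org.apache.hadoop.fs.azure.Wasb")
  .config("spark.hadoop.fs.wasb.impl",
    "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
  .config("spark.delta.logStore.class",
    "org.apache.spark.sql.delta.storage.AzureLogStore")
  .getOrCreate()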

I tried other versions of the azure-storage-blob, azure-storage-common, and azure-storage packages, all leading to the same issue.

To reproduce the issue, I create a simple DataFrame and write it to storage:

val columns = Seq("language", "users_count")
val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))
val rdd = spark.sparkContext.parallelize(data)
val df = spark.createDataFrame(rdd).toDF(columns: _*)
df.show
// +--------+-----------+                                                                                                                                  
// |language|users_count|
// +--------+-----------+
// |    Java|      20000|
// |  Python|     100000|
// |   Scala|       3000|
// +--------+-----------+

df.write.parquet("wasb://<container>@<account>.blob.core.windows.net/<path>")

When writing in parquet format, I get a com.microsoft.azure.storage.StorageException: One of the request inputs is not valid exception:

21/09/21 13:38:14 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 83) (10.244.6.3 executor 6): org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:296)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.fs.azure.AzureException: com.microsoft.azure.storage.StorageException: One of the request inputs is not valid.
        at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2482)
        at org.apache.hadoop.fs.azure.NativeAzureFileSystem$FolderRenamePending.execute(NativeAzureFileSystem.java:424)
        at org.apache.hadoop.fs.azure.NativeAzureFileSystem.rename(NativeAzureFileSystem.java:1997)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:531)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:502)
        at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
        at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:77)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:260)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:79)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:280)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:286)
        ... 9 more
Caused by: com.microsoft.azure.storage.StorageException: One of the request inputs is not valid.
        at com.microsoft.azure.storage.StorageException.translateException(StorageException.java:162)
        at com.microsoft.azure.storage.core.StorageRequest.materializeException(StorageRequest.java:307)
        at com.microsoft.azure.storage.core.ExecutionEngine.executeWithRetry(ExecutionEngine.java:177)
        at com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob(CloudBlob.java:764)
        at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobWrapperImpl.startCopyFromBlob(StorageInterfaceImpl.java:399)
        at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2449)
        ... 20 more

Any hints or ideas on what is causing this, or how to make it work? Thanks!

scala apache-spark hadoop azure-blob-storage parquet
2 Answers

0 votes

Two things helped me write to Azure Storage using the WASB protocol:

  1. The storage container should be Data Lake Gen1 (I tried Gen2, but it failed).

  2. You need to add the following dependency/jar (see the sketch right after this list):

    org.codehaus.jackson:jackson-mapper-lgpl:1.9.13
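
For what it's worth, that jar can be pulled in the same way as the other dependencies. A hypothetical spark-shell invocation (the hadoop-azure version here is only a guess matching Hadoop 2.9.2):

$SPARK_HOME/bin/spark-shell \
  (...other settings as above...) \
  --packages org.apache.hadoop:hadoop-azure:2.9.2,org.codehaus.jackson:jackson-mapper-lgpl:1.9.13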

I had to use WASB (since my Hadoop version, 2.9.2, doesn't support ABFS), but if you have hadoop-2.10.1+, use ABFS instead.


0 votes

Just change wasb to abfss and blob to dfs. Example: wasb://<container>@<account>.blob.core.windows.net/<path> becomes abfss://<container>@<account>.dfs.core.windows.net/<path>.

This worked for me.
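
Put together, a minimal sketch of the ABFS variant, assuming a hadoop-azure build with ABFS support is on the classpath; <container>, <account>, and <path> are placeholders, as in the question:

// Account key for the Data Lake Gen2 (dfs) endpoint; session conf entries
// are copied into the Hadoop configuration used by DataFrame writes.
spark.conf.set(
  "fs.azure.account.key.<account>.dfs.core.windows.net",
  sys.env("AZURE_BLOB_STORAGE_KEY"))

// Same write as in the question, but over abfss:// against the dfs endpoint.
df.write.parquet("abfss://<container>@<account>.dfs.core.windows.net/<path>")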
