Azure Databricks Auto Loader Spark Streaming cannot read input files

Question (votes: 0, answers: 1)

I have set up a streaming job using the Auto Loader feature. The input lives in Azure ADLS Gen2 in Parquet format. The code is below.

# Incrementally ingest new Parquet files with Auto Loader (cloudFiles source)
df = spark.readStream.format("cloudFiles")\
          .options(**cloudfile)\
          .schema(schema).load(staging_path)

# Write each micro-batch via foreachBatch on a 10-minute trigger
df.writeStream\
    .trigger(processingTime="10 minutes")\
    .outputMode("append")\
    .option("checkpointLocation", checkpoint_path)\
    .foreachBatch(writeBatchToADXandDelta)\
    .start()
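
The cloudfile options dict is not shown in the question; for illustration only, a typical Auto Loader configuration for Parquet input might look like the sketch below (the variable value and path are hypothetical):

# Hypothetical Auto Loader options -- the actual contents of `cloudfile`
# are not shown in the question
cloudfile = {
    "cloudFiles.format": "parquet",                # format of the incoming files
    "cloudFiles.schemaLocation": schema_location,  # hypothetical path where Auto Loader tracks schema
    "cloudFiles.useNotifications": "false",        # directory-listing mode instead of event notifications
}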

When it runs, the streaming job throws an error like the following:

py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 11.0 failed 4 times, most recent failure: Lost task 5.3 in stage 11.0 (TID 115) (172.20.58.133 executor 1): com.databricks.sql.io.FileReadException: Error while reading file /mnt/adl2/environment=production/data_lake=main/tier=ingress/area=transient/domain=iotdata/entity=screens/topic=sensor/vendor=abc/source_system=iot_hub/parent=external/dataset=screens/kind=data/evolution=2/file_format=parquet/source=kevitsa/ingestion_date=2022/08/03/13/-136567710_c96a862c2aaf43cfbd62025cd3db4a48_1.parquet.
..
Caused by: java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:208)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$2.apply(ParquetFileFormat.scala:397)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$2.apply(ParquetFileFormat.scala:373)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:333)
    ... 18 more

What could be causing this?

Thanks in advance!

bigdata databricks azure-databricks spark-structured-streaming databricks-autoloader
1 Answer

0 votes

From the error message, it looks like there is a corrupted file at that location. You can use the ignoreCorruptFiles option (doc) to skip corrupted files instead of failing the job.
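
A minimal sketch of how this could be applied; the cluster-wide Spark conf spark.sql.files.ignoreCorruptFiles is one way, and passing it as a read option is another (the assumption being that Auto Loader forwards the option to the underlying Parquet reader):

# Option 1: enable globally for all file-based sources
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

# Option 2: pass it as a per-stream read option
# (assumption: Auto Loader forwards this option to the Parquet reader)
df = spark.readStream.format("cloudFiles")\
          .options(**cloudfile)\
          .option("ignoreCorruptFiles", "true")\
          .schema(schema)\
          .load(staging_path)

Note that skipped files are dropped silently, so it is worth finding out why the file is corrupt (for example, an incomplete upload from the ingestion pipeline) before relying on this.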
