我在Databricks中编写了一个spark结构化流。第一段代码是检查我的实体是否存在增量表。如果没有,则创建增量表。在这里,我想使用推断模式选项来获取增量表的模式。
# Check if the Delta table exists, if not create it
if not DeltaTable.isDeltaTable(spark, sink_path):
# Read the Parquet data and infer the schema
parquet_data = spark.read.option("inferSchema", "true").parquet(source_path)
# Create a Delta table with the inferred schema
#### does not create transaction log
parquet_data.write.format("delta").mode("overwrite").save(sink_path)
print('delta table created')
我收到此错误。源文件有大约 50 条记录。为什么模式推断不起作用?
广告商批量流失败:调用 o671.load 时出错。 :com.databricks.sql.cloudfiles.errors.CloudFilesException:当输入路径
dbfs:/mnt/raw/Entity1/fileA.parquet
为空时无法推断架构。请尝试在输入路径中有文件时启动流,或指定模式。
我已经尝试过你的增量创建代码,它工作正常, 该错误与您的流式查询有关。
我相信您的流式查询类似于下面的代码。
d1f = (
spark.readStream
.option("mode", "permissive")
.format("cloudFiles")
.option("cloudFiles.format", "parquet")
.option("cloudFiles.schemaLocation","/checkpoint_directory2/")
.option("cloudFiles.inferColumnTypes","true")
.load("dbfs:/FileStore/tables/user_reviews.parquet")
)
它给出了与您相同的错误。
因为你给出的路径是文件路径,而不是你应该给出包含
parquet
文件的目录路径。
例如:
dbfs:/FileStore/tables/
在你的情况下是
dbfs:/mnt/raw/Entity1/
输出: