AutoLoader 带有大量空镶木地板文件

问题描述 投票:0回答:2

我想使用 Databricks 中的 AutoLoader 处理一些镶木地板文件(使用快速压缩)。其中许多文件都是空的或仅包含一条记录。另外,我无法更改它们的创建方式,也无法压缩它们。

以下是我迄今为止尝试过的一些方法:

  1. 我在 Databricks 中创建了一个 python 笔记本,并尝试使用 AutoLoader 加载数据。当我为单个表/文件夹运行它时,我可以毫无问题地处理它。但是,当在其他表的 for 循环中调用该笔记本时(for active_tables_metadata: -> dbutils.notebook.run("process_raw", 0, item)),我只在目标中获得空文件夹。
  2. 我创建了一个 Databricks 工作流作业,并为每个表/文件夹调用相同的笔记本(通过参数发送表/文件夹的名称/路径)。 这样,每个表/文件夹都被处理了。
  3. 我使用 DBX 将 python 脚本打包到一个轮子中,并在 Databricks 工作流作业任务中使用它作为入口点。执行此操作时,我设法创建了与上面第 2 点相同的工作流程,但是我不是调用笔记本,而是调用 python 脚本(在任务的入口点中指定)。 不幸的是,这样我只能在目标中得到空文件夹。
  4. 将 DBX python 轮中使用的所有函数复制到 Databricks 中的 python 笔记本,并为一个表/文件夹运行该笔记本。 我在目标中只得到一个空文件夹。

我设置了以下 AutoLoader 配置:

  • “cloudFiles.tenantId”
  • “cloudFiles.clientId”
  • “cloudFiles.clientSecret”
  • “cloudFiles.resourceGroup”
  • “cloudFiles.subscriptionId”
  • “cloudFiles.format”:“镶木地板”
  • “pathGlobFilter”:“*.snappy”
  • “cloudFiles.useNotifications”:True
  • “cloudFiles.includeExistingFiles”:True
  • “cloudFiles.allowOverwrites”:True

我使用以下 readStream 配置:

spark.readStream.format("cloudFiles")
     .options(**CLOUDFILE_CONFIG)
     .option("cloudFiles.format", "parquet")
     .option("pathGlobFilter", "*.snappy")
     .option("recursiveFileLookup", True)
     .schema(schema)
     .option("locale", "de-DE")
     .option("dateFormat", "dd.MM.yyyy")
     .option("timestampFormat", "MM/dd/yyyy HH:mm:ss")
     .load(<path-to-source>)

以及以下 writeStream 配置:

df.writeStream.format("delta")
  .outputMode("append")
  .option("checkpointLocation", <path_to_checkpoint>)
  .queryName(<processed_table_name>)
  .partitionBy(<partition-key>)
  .option("mergeSchema", True)
  .trigger(once=True)
  .start(<path-to-target>)

我首选的解决方案是使用 DBX,但我不知道为什么作业会成功,我只在目标位置看到空文件夹。这是非常奇怪的行为,因为我认为 AutoLoader 一段时间后仅读取空文件超时!

附注当我使用 Parquet Spark Streaming 而不是 AutoLoader 时,也会发生同样的情况。

您知道发生这种情况的原因吗?我该如何克服这个问题?

azure-databricks autoload python-wheel dbx spark-notebook
2个回答
0
投票

您是否指定流读取的架构? (抱歉,还不能添加评论)


0
投票

如果您正在读取使用快速压缩的 parquet 写入的文件,则文件扩展名是“.snappy.parquet”。您可以尝试更改 pathGlobFilter 以匹配 *.snappy.parquet。我的第二个疑问是关于 cloudFiles.allowOverwrites": True ,您可以在没有此选项的情况下尝试一下。

© www.soinside.com 2019 - 2024. All rights reserved.