如何正确管理从Spark Streaming生成的分区镶木地板文件

问题描述 投票:3回答:2

我的Spark结构化流媒体作业不断生成镶木地板文件,我想在到期后删除(比方说30天后)。

我将使用分区键分区的镶木地板数据作为事件日期存储在RFC3339 / ISO8601中,这样可以在HDFS级别上基于cron作业轻松完成内务管理(删除所有镶嵌文件夹,其中partitionkey <oldestAllowedAge在字符串比较方面) 。

但是,自从我介绍Spark Streaming以来,Spark将元数据写入要写入数据本身旁边的名为_spark_metadata的文件夹中。如果我现在只删除过期的HDFS文件并在整个数据集上运行spark批处理作业,则由于找不到文件,作业将失败。 batchjob将读取元数据并期望已存在已删除的文件。

解决这个问题的简单方法是禁用_spark_metadata目录的创建,如下所述:disabling _spark_metadata in Structured streaming in spark 2.3.0。但是,由于我不想在阅读常规批量分析的数据时失去性能,我想知道是否没有更好的解决方案。

我想,我可以只使用spark进行删除,以便删除镶木地板hdfs文件并更新元数据。但是,只需执行一个

session.sql(String.format("DELETE FROM parquet.`%s` WHERE partitionKey < " + oldestAllowedPartitionAge, path.toString()));

不起作用。遗憾的是,DELETE在Spark中是一个不受支持的操作......

是否有任何解决方案,以便我可以删除旧数据,但仍然有_spark_metadata文件夹工作?

apache-spark spark-streaming spark-structured-streaming
2个回答
0
投票

这实际上是结构化流(SPARK-24295)中的已知问题之一,尽管它只发生在大量输入文件中,而最终用户正在采用自己的解决方法。例如,停止查询 - >删除旧的输入文件 - >手动操作元数据以清除它们 - >重新启动查询。

鉴于手动操作元数据并非易事且不理想(假设它应该停止流式查询,并强制最终用户理解元数据的格式),建议将SPARK-27188作为替代方案 - 它应用保留并从元数据中清除过时的输入文件。


2
投票

据我所知,_spark_metadata的主要目的是确保容错并避免列出要处理的所有文件:

为了在保持一次语义的同时正确处理部分失败,每个批处理的文件将写入唯一目录,然后以原子方式附加到元数据日志中。当基于镶木地板的DataSource被初始化以供阅读时,我们首先检查该日志目录并在存在时使用它而不是文件列表。

https://github.com/apache/spark/commit/6bc4be64f86afcb38e4444c80c9400b7b6b745de

您引用的链接(disabling _spark_metadata in Structured streaming in spark 2.3.0)解释了问题来自不一致的检查点状态 - 检查点生成的元数据,但后来用户手动删除了它,当他重新启动查询时,它失败了,因为检查点需要有元数据文件。

要查看缺少元数据是否会使批处理失败,请查看org.apache.spark.sql.execution.datasources.DataSource #defolutionRelation方法,您可以在其中找到与2种情况匹配的模式:

  // We are reading from the results of a streaming query. Load files from the metadata log
  // instead of listing them using HDFS APIs.
  case (format: FileFormat, _)
      if FileStreamSink.hasMetadata(
        caseInsensitiveOptions.get("path").toSeq ++ paths,
        sparkSession.sessionState.newHadoopConf()) =>
  case (format: FileFormat, _) =>
    val globbedPaths =
      checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)

hasMetadata方法看起来像:

  def hasMetadata(path: Seq[String], hadoopConf: Configuration): Boolean = {
    path match {
      case Seq(singlePath) =>
        try {
          val hdfsPath = new Path(singlePath)
          val fs = hdfsPath.getFileSystem(hadoopConf)
          if (fs.isDirectory(hdfsPath)) {
            fs.exists(new Path(hdfsPath, metadataDir))
          } else {
            false
          }
        } catch {
          case NonFatal(e) =>
            logWarning(s"Error while looking for metadata directory.")
            false
        }
      case _ => false
    }
  }

如您所见,没有失败的风险(至少通过阅读代码!)。如果您有一些,请提供更多背景信息,因为问题可能在其他地方。

关于你的性能问题,这个_spark_metadata只包含文件列表,所以当然,Spark首先需要列出你输入目录中的文件。但根据我的经验,这不是最昂贵的操作。例如,在AWS S3上列出包含1297个文件的目录大约需要9秒。在此之后,由您决定是否要进行简单的清洁过程或稍微减慢批处理。如果你有更多这样的文件,也许你也应该把它们分成更大的文件,比如256 MB或更多?

尽管如此,如果你想保留_spark_metadata,也许有一种方法可以通过清洁应用程序删除文件。但这将是一个挑战,因为你将有2个应用程序(流媒体和清洁)处理相同的数据。

你可以在这里找到更多关于_spark_metadata的信息:How to change the location of _spark_metadata directory?

© www.soinside.com 2019 - 2024. All rights reserved.