如何删除Spark结构化流创建的旧数据?

问题描述 投票:0回答:2

如何删除由Spark结构化流(Spark 2.4.5)创建的旧数据?

我有实木复合地板/ avro格式(非Delta)的HDFS数据,该数据是由火花结构化流创建并按时间(年,月,月,日,小时)进行分区的。

数据创建如下:

query = df.writeStream.format("avro").partitionBy("year", "month", "day", "hour").outputMode("append").option("checkpointLocation", "/data/avro.cp").start("/data/avro")

因此,我具有以下分区文件夹布局:

./year=2020/month=3/day=13/hour=12
./year=2020/month=3/day=13/hour=13
./year=2020/month=3/day=13/hour=14
./year=2020/month=3/day=13/hour=15
./year=2020/month=3/day=13/hour=16

如何删除旧数据,例如早于year = 2020,month = 2,day = 13,hour = 14?

仅删除相关文件夹

./year=2020/month=3/day=13/hour=12
./year=2020/month=3/day=13/hour=13

从文件系统读取批处理数据帧时引发异常:

df = spark.read.format("avro").load("/data/avro")
java.io.FileNotFoundException: File file:/data/avro/year=2020/month=3/day=13/hour=12/part-00000-0cc84e65-3f49-4686-85e3-1ecf48952794.c000.avro does not exist

正如我已经发现的那样,它与检查点使用的_spark_metadata文件夹有关。

感谢您的帮助。

apache-spark apache-spark-sql spark-structured-streaming apache-spark-2.0
2个回答
0
投票

除非您也删除了相应的检查点文件夹,否则无法删除该文件夹。您试图在检查点仍然了解该文件夹的同时删除该文件夹,因此这就是发生错误的原因。

但是,除非有必要,我真的不建议您将检查点文件夹弄乱。如果您有可能,我建议您将旧数据移至其他数据存储类型,例如AWS Standard-> Glacier。


0
投票

似乎我找到了解决方案/解决方法。关键概念是使用FileStreamSinkLog,然后将其与SinkFileStatus合并,并将操作设置为delete

  1. 加载FileStreamSinkLog

    sinkLog = new FileStreamSinkLog(1, spark, full-path-to-spark-metadata-dir);
    
  2. 获取最新的SinkFileStatus

    Option<Tuple2<Object, SinkFileStatus[]>> latest = sinkLog.getLatest();
    long batchId = (long)latest.get()._1;
    SinkFileStatus[] fileStatuses = latest.get()._2;
    
  3. 删除旧文件

  4. 将具有delete操作的新条目添加到fileStatuses数组

  5. 用更新的batchId回写fileStatuses日志文件

© www.soinside.com 2019 - 2024. All rights reserved.