我目前正在探索由databricks开源的三角洲湖泊。我正在读取kafka数据,并使用delta lake格式将其写入流中。 Delta Lake在从kafka进行流式写入期间创建了许多文件,这让我感到很高兴hdfs文件系统。
我已经尝试按照以下步骤将多个文件压缩为单个文件。
val spark = SparkSession.builder .master("local") .appName("spark session example") .getOrCreate() val df = spark.read.parquet("deltalakefile/data/") df.repartition(1).write.format("delta").mode("overwrite").save("deltalakefile/data/") df.show() spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled","false") DeltaTable.forPath("deltalakefile/data/").vacuum(1)
但是当我检查输出时,它正在创建新文件而不是删除任何现有文件。
是否有办法实现这一目标。另外,保留期限的关系是什么?使用时如何在HDFS中配置它?当我想以三角洲湖泊格式构建原始/青铜层并且要长期保留所有数据(在建筑物中使用年数/在云中无限时间)时,应保留的配置是什么?
我目前正在探索由databricks开源的三角洲湖泊。我正在读取kafka数据,并使用delta lake格式将其写入流中。 Delta Lake在流式写入过程中创建了许多文件...
根据设计,Delta不会立即删除文件以防止影响活动的使用者。它还提供了版本控制(又名时间旅行),因此您可以在必要时查看历史记录。要删除以前的版本或未提交的文件,您需要运行vacuum。