如何使用java Spark不将重复数据(parquet)写入hdfs

问题描述 投票:0回答:1

我是 java、spark 和 hdfs 的新手 这是我的用例: 我读取了一个日志文件,选择我感兴趣的事件,并使用 java Spark 将其作为镶木地板保存在 hdfs 中。 现在,如果重新发送相同的日志,选择我感兴趣的事件,那么如何确保我不会将这些重复的数据重新写回到hdfs中?如果只有一些日志消息是重复的而其他日志消息是新的怎么办?由于日志文件每 10 小时发送一次,无论是否滚动,都会有一些新数据和预先存在的数据。

java apache-spark parquet
1个回答
0
投票

在将数据帧保存到 hdfs 之前,您需要与所有现有文件进行比较以检查重复情况。

如果我要实现这一点,我将执行以下步骤:

将日志数据帧保存到hdfs后,我将直接重命名它(不可能为hdfs文件选择名称,因为spark使用hadoop文件格式,这需要对数据进行分区)。文件名将包含一个 id,例如内容的校验和

file_logs_120EA8A25E5D487BF68B5F7096440019.parquet

对于校验和计算,您会发现很多可用的库。

在保存日志数据帧之前,我将从目标目录中的文件中获取所有校验和,并检查当前的校验和是否存在于该列表中。如果它不存在,那么我将保存它,当然还会重命名它(如前面提到的)。

scala 中的伪代码:

def saveLogFileToHdfs(source): Unit = {
//create the dataframe from source
val df = createDFLogsFromSource(source)
//get the checksum of my dataframe content. It'll be used for comparaison later
val dfSourceChecksum = getChecksum(df)
//save the dataframe into hdfs
if(checkDuplication(dfSourceChecksum, destinationPath))
saveAndRenameDF(df, dfSourceChecksum)
} 


def checkDuplication(checksum: String, path: Path): boolean = {
//we'll get a list of checksums from the path
val checksumList : List[String] = getListOfChecksums(path)
checksumList.contains(checksum)
}

def getListOfChecksums(path: Path) : List[String] = {
//loop over the directoty, use split in order to get the cheksum and yield to a list the results
}

def saveAndRenameDF(df: DataFrame, dfSourceChecksum: String): Unit = {
//perform the save and rename
import org.apache.hadoop.fs._
val fs = FileSystem.get(sc.hadoopConfiguration)
fs.rename(new Path("path"), new Path("path_checksum.parquet"))
}

还可以使用另一种方法,即从目标目录读取所有文件,创建数据帧并将数据帧与所有文件进行比较。

为此,您可以使用this

if (firstDataFrame.exceptAll(secondDataFrame).head(1).isEmpty) {
//The two DF are equals, process here
} else {
//There is difference
}

希望这有帮助。

© www.soinside.com 2019 - 2024. All rights reserved.