我是 java、spark 和 hdfs 的新手 这是我的用例: 我读取了一个日志文件,选择我感兴趣的事件,并使用 java Spark 将其作为镶木地板保存在 hdfs 中。 现在,如果重新发送相同的日志,选择我感兴趣的事件,那么如何确保我不会将这些重复的数据重新写回到hdfs中?如果只有一些日志消息是重复的而其他日志消息是新的怎么办?由于日志文件每 10 小时发送一次,无论是否滚动,都会有一些新数据和预先存在的数据。
在将数据帧保存到 hdfs 之前,您需要与所有现有文件进行比较以检查重复情况。
如果我要实现这一点,我将执行以下步骤:
将日志数据帧保存到hdfs后,我将直接重命名它(不可能为hdfs文件选择名称,因为spark使用hadoop文件格式,这需要对数据进行分区)。文件名将包含一个 id,例如内容的校验和
file_logs_120EA8A25E5D487BF68B5F7096440019.parquet
对于校验和计算,您会发现很多可用的库。
在保存日志数据帧之前,我将从目标目录中的文件中获取所有校验和,并检查当前的校验和是否存在于该列表中。如果它不存在,那么我将保存它,当然还会重命名它(如前面提到的)。
scala 中的伪代码:
def saveLogFileToHdfs(source): Unit = {
//create the dataframe from source
val df = createDFLogsFromSource(source)
//get the checksum of my dataframe content. It'll be used for comparaison later
val dfSourceChecksum = getChecksum(df)
//save the dataframe into hdfs
if(checkDuplication(dfSourceChecksum, destinationPath))
saveAndRenameDF(df, dfSourceChecksum)
}
def checkDuplication(checksum: String, path: Path): boolean = {
//we'll get a list of checksums from the path
val checksumList : List[String] = getListOfChecksums(path)
checksumList.contains(checksum)
}
def getListOfChecksums(path: Path) : List[String] = {
//loop over the directoty, use split in order to get the cheksum and yield to a list the results
}
def saveAndRenameDF(df: DataFrame, dfSourceChecksum: String): Unit = {
//perform the save and rename
import org.apache.hadoop.fs._
val fs = FileSystem.get(sc.hadoopConfiguration)
fs.rename(new Path("path"), new Path("path_checksum.parquet"))
}
还可以使用另一种方法,即从目标目录读取所有文件,创建数据帧并将数据帧与所有文件进行比较。
为此,您可以使用this
if (firstDataFrame.exceptAll(secondDataFrame).head(1).isEmpty) {
//The two DF are equals, process here
} else {
//There is difference
}
希望这有帮助。