我正在运行下面的Spark代码(基本上是作为MVE创建的),它执行以下操作:
我正在努力理解为什么我在joined
数据框中得到不同数量的行,即每次运行应用程序后,在第3阶段之后的数据框中。为什么会这样?
我认为不应该发生的原因是limit
是确定性的,因此每次相同的行应位于分区数据帧中,尽管顺序不同。在联接中,我将联接完成分区的字段。我期望分区中具有成对的每种组合,但是我认为每次都应等于相同的数量。
def main(args: Array[String]) {
val maxRows = args(0)
val spark = SparkSession.builder.getOrCreate()
val windowSpec = Window.partitionBy("epoch_1min").orderBy("epoch")
val data = spark.read.parquet("srcfile.parquet").limit(maxRows.toInt)
val partitionDf = data.withColumn("row", row_number().over(windowSpec))
partitionDf.persist(StorageLevel.MEMORY_ONLY)
logger.debug(s"${partitionDf.count()} rows in partitioned data")
val dfOrig = partitionDf.withColumnRenamed("epoch_1min", "epoch_1min_orig").withColumnRenamed("row", "row_orig")
val dfDest = partitionDf.withColumnRenamed("epoch_1min", "epoch_1min_dest").withColumnRenamed("row", "row_dest")
val joined = dfOrig.join(dfDest, dfOrig("epoch_1min_orig") === dfDest("epoch_1min_dest"), "inner")
logger.debug(s"Rows in joined dataframe ${joined.count()}")
val filtered = joined.filter(col("row_orig") < col("row_dest"))
logger.debug(s"Rows in filtered dataframe ${filtered.count()}")
}