避免 Spark 数据框中的惰性求值

问题描述 投票:0回答:1

我有文件列表,我对它们执行以下操作

import org.apache.spark.sql.*;

for(File file :files){
    df = spark.read.csv(file)
    df = df.withColumn("last_update_date",from_unixtime(unix_timestamp(from_utc_timestamp(current_timestamp(), "PST"))))
    Thread.sleep(500)
}
Finaldf = Finaldf.union(df)

然后我会根据我添加的列进行重复数据删除。我开始知道,由于 Spark 的惰性评估,所有文件都具有相同的

last_update_date
值,最终我从最终的 DF 中删除了错误的重复项。

我已尝试以下方法,但有时仍然会遇到问题

  1. 使用
    df.cache()
  2. 在读取每个文件后将 df 加载到临时表中,并延迟一段时间。

如上所述,在尝试上述 2 个解决方案后,我发现该问题的频率降低了,但我想完全消除它。任何解决方案将不胜感激。

java dataframe apache-spark etl data-engineering
1个回答
0
投票

在 Spark 的惰性评估中,您使用的是

current_timestamp()
,在这种情况下,当最终触发操作时,所有行都会获得相同的时间戳值,这就是您面临问题的原因。 RealSkeptic 建议使用 int 值而不是 current_timestamp。

如果您想确保每个文件都有唯一的时间戳,那么:

import org.apache.spark.sql.*;

// Add python lib of time
import time

// assuming the files in array formate
Finaldf = spark.createDataFrame([], schema)

for (file in files){
    df = spark.read.csv(file)
    
    // Get current time in milliseconds at first using time lib
    current_time_in_millisec = int(time.mktime(Date().timetuple()) * 1000)

    // Your line of code
    //df = df.withColumn("last_update_date",from_unixtime(unix_timestamp(from_utc_timestamp(current_timestamp(), "PST"))))
    
    // New way of this: Here used lit() function to add static value at first then add the current timestamp
    df = df.withColumn("last_update_date", from_unixtime(lit(current_time_in_millisec / 1000).cast('long')))
    
    Finaldf = Finaldf.union(df)
    time.sleep(0.5)

    //Thread.sleep(500)
}
//Finaldf = Finaldf.union(df)

我评论了你的代码并并排编写了改进版本。

© www.soinside.com 2019 - 2024. All rights reserved.