我有另一种解决方案,但我更喜欢使用PySpark 2.3做到这一点。
我有这样的二维PySpark数据帧:
Date | ID
---------- | ----
08/31/2018 | 10
09/31/2018 | 10
09/01/2018 | null
09/01/2018 | null
09/01/2018 | 12
我想通过寻找过去的最近更换ID
空值,或者如果该值为空,由期待(如果是再次空,设置默认值)
我想象添加具有.withColumn
新列,并使用UDF功能,将查询到的数据帧本身。
类似的东西在伪代码(不是完美的,但它是主要的想法):
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
def return_value(value,date):
if value is not null:
return val
value1 = df.filter(df['date']<= date).select(df['value']).collect()
if (value1)[0][0] is not null:
return (value1)[0][0]
value2 = df.filter(tdf['date']>= date).select(df['value']).collect()
return (value2)[0][0]
value_udf = udf(return_value,StringType())
new_df = tr.withColumn("new_value", value_udf(df.value,df.date))
但是,这是行不通的。我是完全在错误的方式做到这一点?难道仅仅是可以查询的UDF功能火花数据帧?我错过了一个更简单的解决方案?
创建有一列新的数据框 - 所有日期的唯一列表:
datesDF = yourDF.select('Date').distinct()
再创建一个将包括日期和编号的,但唯一的不存在空值。而且还可以让只保留第一(无论将是第一个),每个日期ID的出现(从你的例子来看,你可以为每个日期多行)
noNullsDF = yourDF.dropna().dropDuplicates(subset='Date')
现在,让我们加入这两个让我们有什么价值,我们有这方面(或空)的所有日期列表
joinedDF = datesDF.join(noNullsDF, 'Date', 'left')
现在对每一个日期获取ID从以前的日期和未来日期值使用窗口功能,也可以让我们重命名ID列左右后会出现的问题与加入少:
from pyspark.sql.window import Window
from pyspark.sql import functions as f
w = Window.orderBy('Date')
joinedDF = joinedDF.withColumn('previousID',f.lag('ID').over(w))
.withColumn('nextID',f.lead('ID').over(w))
.withColumnRenamed('ID','newID')
现在,让我们一起回我们的日期数据框原创
yourDF = yourDF.join(joinedDF, 'Date', 'left')
现在,我们的数据帧有4个ID列:
现在,我们需要将它们组合成finalID依次是:
我们这样做是通过凝聚简直是:
default = 0
finalDF = yourDF.select('Date',
'ID',
f.coalesce('ID',
'newID',
'previousID',
'nextID',
f.lit(default)).alias('finalID')
)