PySpark UDF功能与数据帧查询?

问题描述 投票:0回答:1

我有另一种解决方案,但我更喜欢使用PySpark 2.3做到这一点。

我有这样的二维PySpark数据帧:

Date       | ID
---------- | ----
08/31/2018 | 10
09/31/2018 | 10
09/01/2018 | null
09/01/2018 | null
09/01/2018 | 12

我想通过寻找过去的最近更换ID空值,或者如果该值为空,由期待(如果是再次空,设置默认值)

我想象添加具有.withColumn新列,并使用UDF功能,将查询到的数据帧本身。

类似的东西在伪代码(不是完美的,但它是主要的想法):

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

def return_value(value,date):

    if value is not null:
        return val

    value1 = df.filter(df['date']<= date).select(df['value']).collect()

    if (value1)[0][0] is not null:
        return (value1)[0][0]

    value2 = df.filter(tdf['date']>= date).select(df['value']).collect()
        return (value2)[0][0]


value_udf = udf(return_value,StringType())
new_df = tr.withColumn("new_value", value_udf(df.value,df.date))

但是,这是行不通的。我是完全在错误的方式做到这一点?难道仅仅是可以查询的UDF功能火花数据帧?我错过了一个更简单的解决方案?

pyspark pyspark-sql
1个回答
-1
投票

创建有一列新的数据框 - 所有日期的唯一列表:

datesDF = yourDF.select('Date').distinct()

再创建一个将包括日期和编号的,但唯一的不存在空值。而且还可以让只保留第一(无论将是第一个),每个日期ID的出现(从你的例子来看,你可以为每个日期多行)

noNullsDF = yourDF.dropna().dropDuplicates(subset='Date')

现在,让我们加入这两个让我们有什么价值,我们有这方面(或空)的所有日期列表

joinedDF = datesDF.join(noNullsDF, 'Date', 'left')

现在对每一个日期获取ID从以前的日期和未来日期值使用窗口功能,也可以让我们重命名ID列左右后会出现的问题与加入少:

from pyspark.sql.window import Window
from pyspark.sql import functions as f
w = Window.orderBy('Date')

joinedDF = joinedDF.withColumn('previousID',f.lag('ID').over(w)) 
                   .withColumn('nextID',f.lead('ID').over(w))
                   .withColumnRenamed('ID','newID') 

现在,让我们一起回我们的日期数据框原创

yourDF = yourDF.join(joinedDF, 'Date', 'left')

现在,我们的数据帧有4个ID列:

  1. 原始ID
  2. NEWID - 给定的日期,如果任何或null任何非空值的ID
  3. 以前 - ID从以前的日期(不为空,如果任何或空)
  4. nextID - ID从下一个日期(非空,如果任何或空)

现在,我们需要将它们组合成finalID依次是:

  1. 原来的值,如果不为空
  2. 对于当前的日期值,如果存在的任何非空值(这是与你的问题相反,但你大熊猫代码建议你去<=上日期检查),如果结果不为空
  3. 对于以前的日期,如果它不是空值
  4. 因为如果它不是空下一个日期值
  5. 一些默认值

我们这样做是通过凝聚简直是:

default = 0
finalDF = yourDF.select('Date', 
                        'ID',
                        f.coalesce('ID',
                                   'newID',
                                   'previousID',
                                   'nextID',
                                   f.lit(default)).alias('finalID')
                       )
© www.soinside.com 2019 - 2024. All rights reserved.