Filling timestamps in the given code using a window function in PySpark


I have a dataset whose output looks like the attached picture.

I want to create 3 new columns named start_time_1, start_time_2, start_time_3, so that for each code in code1, code2, code3 I can fill in the first timestamp (based on start_timestamps) at which that code appears.

In the attached picture, in the first group id=1, the codes M, 8 and 6 first appear at timestamp 2023-05-06.

So every time M, 8 or 6 appears in code1, code2 or code3, it should be filled with that timestamp. A arrives a bit later, on 2023-08-13, so A's timestamp will be 2023-08-13.

Similarly, in group id=2, D appears first on 2023-06-07, so every time D appears in any of code1, code2, code3, its timestamp will be 2023-06-07.

How can I achieve this in PySpark? Also note that although the columns happen to be sorted in the example shown, in general they are not sorted in ascending order.

I tried using unbounded preceding, but I couldn't get my result. The problem I ran into is that I can't search across all the code columns: my window function only searches one column, and it doesn't work when I put all the columns in at once.

My output is:

1 Answer

The way I solved this is to build, per id, a map from each code to the minimum timestamp at which it appears, and then use that map to fill the start_time_i columns. The code is below.
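In miniature, the map the PySpark code below builds is just "per id, per code, the earliest timestamp". A plain-Python sketch of that idea (toy rows, not the Spark API — the real implementation follows):

```python
from collections import defaultdict

# Toy rows mirroring (id, start_timestamps, non-null codes)
rows = [
    ("1", "2023-08-30", ["6"]),
    ("1", "2023-08-13", ["A", "6"]),
    ("1", "2023-05-06", ["M", "8", "6"]),
    ("2", "2023-06-07", ["D"]),
    ("2", "2023-07-04", ["7", "D"]),
]

# Per-id map: code -> earliest timestamp it appears with
first_seen = defaultdict(dict)
for id_, ts, codes in rows:
    for c in codes:
        prev = first_seen[id_].get(c)
        if prev is None or ts < prev:  # ISO dates compare correctly as strings
            first_seen[id_][c] = ts

# Each code column is then looked up in its id's map
assert first_seen["1"]["6"] == "2023-05-06"
assert first_seen["1"]["A"] == "2023-08-13"
assert first_seen["2"]["D"] == "2023-06-07"
```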

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when

# SQLContext is deprecated; use a SparkSession instead
spark = SparkSession.builder.master("local").getOrCreate()

data1 = [
    ["1", "2023-08-30", "6", "6", "null", "null"],
    ["1", "2023-08-13", "A6", "A", "6", "null"],
    ["1", "2023-08-06", "86", "8", "6", "null"],
    ["1", "2023-07-03", "6", "6", "null", "null"],
    ["1", "2023-05-06", "M86", "M", "8", "6"],
    ["2", "2023-08-10", "GA", "G", "A", "null"],
    ["2", "2023-07-04", "7D", "7", "D", "null"],
    ["2", "2023-06-07", "D", "D", "null", "null"],
]

df1Columns = ["id", "start_timestamps", "combined_code", "code1", "code2", "code3"]
df1 = spark.createDataFrame(data=data1, schema=df1Columns)


# Replace the "null" strings with real nulls (None) in all columns
df1 = df1.select([when(col(c) == "null", None).otherwise(col(c)).alias(c) for c in df1.columns])
print("Showing the values")
df1.show(n=100, truncate=False)

# Collect the code columns into one array, dropping nulls
# (F.array_compact requires Spark >= 3.4)
code_cols = ["code1", "code2", "code3"]
df1 = df1.withColumn("array_col", F.array_compact(F.array(code_cols)))

print("Showing the values")
df1.show(n=100, truncate=False)

# Explode to one row per (id, timestamp, code)
df2 = df1.select("id", "start_timestamps", "array_col").withColumn("code_unique", F.explode("array_col")).drop("array_col").cache()
print("Showing the df2 values")
df2.show(n=100, truncate=False)

# Earliest timestamp at which each code appears, per id
df3 = df2.groupby("id", "code_unique").agg(F.min("start_timestamps").alias("min_start_ts"))
print("Showing the df3 values")
df3.show(n=100, truncate=False)

# Gather each id's codes and their minima into two parallel lists
# (both lists come from the same rows in the same order, so they stay aligned)
df4 = df3.groupby("id").agg(F.collect_list("code_unique").alias("unique_code_list"), F.collect_list("min_start_ts").alias("min_start_ts_list"))
print("Showing the df4 values")
df4.show(n=100, truncate=False)

# Zip the two lists into a per-id map: code -> earliest timestamp
df5 = df4.withColumn("per_id_map", F.map_from_arrays("unique_code_list", "min_start_ts_list")).select("id", "per_id_map").cache()
print("Showing the df5 values")
df5.show(n=100, truncate=False)

df_joined = df1.join(df5, on=["id"])
print("Showing the df_joined values")
df_joined.show(n=100, truncate=False)

# Look up each code column in its id's map; a missing key yields null
df_mapped = df_joined.withColumn("start_time_1", F.col("per_id_map").getItem(col("code1"))) \
                .withColumn("start_time_2", F.col("per_id_map").getItem(col("code2"))) \
                .withColumn("start_time_3", F.col("per_id_map").getItem(col("code3")))

print("Showing the df_mapped values")
df_mapped.show(n=100, truncate=False)

cols_select = df1Columns + ["start_time_1", "start_time_2", "start_time_3"]

print("Showing the final required dataframe values")
df_mapped.select(cols_select).show(n=100, truncate=False)

Output:

Showing the values
+---+----------------+-------------+-----+-----+-----+
|id |start_timestamps|combined_code|code1|code2|code3|
+---+----------------+-------------+-----+-----+-----+
|1  |2023-08-30      |6            |6    |null |null |
|1  |2023-08-13      |A6           |A    |6    |null |
|1  |2023-08-06      |86           |8    |6    |null |
|1  |2023-07-03      |6            |6    |null |null |
|1  |2023-05-06      |M86          |M    |8    |6    |
|2  |2023-08-10      |GA           |G    |A    |null |
|2  |2023-07-04      |7D           |7    |D    |null |
|2  |2023-06-07      |D            |D    |null |null |
+---+----------------+-------------+-----+-----+-----+

Showing the values
+---+----------------+-------------+-----+-----+-----+---------+
|id |start_timestamps|combined_code|code1|code2|code3|array_col|
+---+----------------+-------------+-----+-----+-----+---------+
|1  |2023-08-30      |6            |6    |null |null |[6]      |
|1  |2023-08-13      |A6           |A    |6    |null |[A, 6]   |
|1  |2023-08-06      |86           |8    |6    |null |[8, 6]   |
|1  |2023-07-03      |6            |6    |null |null |[6]      |
|1  |2023-05-06      |M86          |M    |8    |6    |[M, 8, 6]|
|2  |2023-08-10      |GA           |G    |A    |null |[G, A]   |
|2  |2023-07-04      |7D           |7    |D    |null |[7, D]   |
|2  |2023-06-07      |D            |D    |null |null |[D]      |
+---+----------------+-------------+-----+-----+-----+---------+

Showing the df2 values
+---+----------------+-----------+
|id |start_timestamps|code_unique|
+---+----------------+-----------+
|1  |2023-08-30      |6          |
|1  |2023-08-13      |A          |
|1  |2023-08-13      |6          |
|1  |2023-08-06      |8          |
|1  |2023-08-06      |6          |
|1  |2023-07-03      |6          |
|1  |2023-05-06      |M          |
|1  |2023-05-06      |8          |
|1  |2023-05-06      |6          |
|2  |2023-08-10      |G          |
|2  |2023-08-10      |A          |
|2  |2023-07-04      |7          |
|2  |2023-07-04      |D          |
|2  |2023-06-07      |D          |
+---+----------------+-----------+

Showing the df3 values
+---+-----------+------------+
|id |code_unique|min_start_ts|
+---+-----------+------------+
|1  |6          |2023-05-06  |
|1  |8          |2023-05-06  |
|1  |A          |2023-08-13  |
|1  |M          |2023-05-06  |
|2  |7          |2023-07-04  |
|2  |A          |2023-08-10  |
|2  |D          |2023-06-07  |
|2  |G          |2023-08-10  |
+---+-----------+------------+

Showing the df4 values
+---+----------------+------------------------------------------------+
|id |unique_code_list|min_start_ts_list                               |
+---+----------------+------------------------------------------------+
|1  |[6, 8, A, M]    |[2023-05-06, 2023-05-06, 2023-08-13, 2023-05-06]|
|2  |[7, A, D, G]    |[2023-07-04, 2023-08-10, 2023-06-07, 2023-08-10]|
+---+----------------+------------------------------------------------+

Showing the df5 values
+---+--------------------------------------------------------------------+
|id |per_id_map                                                          |
+---+--------------------------------------------------------------------+
|1  |{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|
|2  |{G -> 2023-08-10, D -> 2023-06-07, 7 -> 2023-07-04, A -> 2023-08-10}|
+---+--------------------------------------------------------------------+

Showing the df_joined values
+---+----------------+-------------+-----+-----+-----+---------+--------------------------------------------------------------------+
|id |start_timestamps|combined_code|code1|code2|code3|array_col|per_id_map                                                          |
+---+----------------+-------------+-----+-----+-----+---------+--------------------------------------------------------------------+
|1  |2023-08-30      |6            |6    |null |null |[6]      |{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|
|1  |2023-08-13      |A6           |A    |6    |null |[A, 6]   |{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|
|1  |2023-08-06      |86           |8    |6    |null |[8, 6]   |{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|
|1  |2023-07-03      |6            |6    |null |null |[6]      |{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|
|1  |2023-05-06      |M86          |M    |8    |6    |[M, 8, 6]|{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|
|2  |2023-08-10      |GA           |G    |A    |null |[G, A]   |{G -> 2023-08-10, D -> 2023-06-07, 7 -> 2023-07-04, A -> 2023-08-10}|
|2  |2023-07-04      |7D           |7    |D    |null |[7, D]   |{G -> 2023-08-10, D -> 2023-06-07, 7 -> 2023-07-04, A -> 2023-08-10}|
|2  |2023-06-07      |D            |D    |null |null |[D]      |{G -> 2023-08-10, D -> 2023-06-07, 7 -> 2023-07-04, A -> 2023-08-10}|
+---+----------------+-------------+-----+-----+-----+---------+--------------------------------------------------------------------+


Showing the df_mapped values
+---+----------------+-------------+-----+-----+-----+---------+--------------------------------------------------------------------+------------+------------+------------+
|id |start_timestamps|combined_code|code1|code2|code3|array_col|per_id_map                                                          |start_time_1|start_time_2|start_time_3|
+---+----------------+-------------+-----+-----+-----+---------+--------------------------------------------------------------------+------------+------------+------------+
|1  |2023-08-30      |6            |6    |null |null |[6]      |{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|2023-05-06  |null        |null        |
|1  |2023-08-13      |A6           |A    |6    |null |[A, 6]   |{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|2023-08-13  |2023-05-06  |null        |
|1  |2023-08-06      |86           |8    |6    |null |[8, 6]   |{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|2023-05-06  |2023-05-06  |null        |
|1  |2023-07-03      |6            |6    |null |null |[6]      |{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|2023-05-06  |null        |null        |
|1  |2023-05-06      |M86          |M    |8    |6    |[M, 8, 6]|{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|2023-05-06  |2023-05-06  |2023-05-06  |
|2  |2023-08-10      |GA           |G    |A    |null |[G, A]   |{G -> 2023-08-10, D -> 2023-06-07, 7 -> 2023-07-04, A -> 2023-08-10}|2023-08-10  |2023-08-10  |null        |
|2  |2023-07-04      |7D           |7    |D    |null |[7, D]   |{G -> 2023-08-10, D -> 2023-06-07, 7 -> 2023-07-04, A -> 2023-08-10}|2023-07-04  |2023-06-07  |null        |
|2  |2023-06-07      |D            |D    |null |null |[D]      |{G -> 2023-08-10, D -> 2023-06-07, 7 -> 2023-07-04, A -> 2023-08-10}|2023-06-07  |null        |null        |
+---+----------------+-------------+-----+-----+-----+---------+--------------------------------------------------------------------+------------+------------+------------+

Showing the final required dataframe values
+---+----------------+-------------+-----+-----+-----+------------+------------+------------+
|id |start_timestamps|combined_code|code1|code2|code3|start_time_1|start_time_2|start_time_3|
+---+----------------+-------------+-----+-----+-----+------------+------------+------------+
|1  |2023-08-30      |6            |6    |null |null |2023-05-06  |null        |null        |
|1  |2023-08-13      |A6           |A    |6    |null |2023-08-13  |2023-05-06  |null        |
|1  |2023-08-06      |86           |8    |6    |null |2023-05-06  |2023-05-06  |null        |
|1  |2023-07-03      |6            |6    |null |null |2023-05-06  |null        |null        |
|1  |2023-05-06      |M86          |M    |8    |6    |2023-05-06  |2023-05-06  |2023-05-06  |
|2  |2023-08-10      |GA           |G    |A    |null |2023-08-10  |2023-08-10  |null        |
|2  |2023-07-04      |7D           |7    |D    |null |2023-07-04  |2023-06-07  |null        |
|2  |2023-06-07      |D            |D    |null |null |2023-06-07  |null        |null        |
+---+----------------+-------------+-----+-----+-----+------------+------------+------------+