I have a dataset whose output is shown in the attached image.
I want to create 3 new columns named start_time_1, start_time_2, start_time_3, so that for each code in code1, code2, code3 I can record the first timestamp (based on start_timestamps) at which that code appears.
In the attached image, in the first group (id=1), M, 8 and 6 all start at timestamp 2023-05-06.
So every time M, 8 or 6 appears in code1, code2 or code3, it should be filled with that timestamp. A arrives a bit later, on 2023-08-13, so A's timestamp will be 2023-08-13.
Similarly, in group id=2, D appears first on 2023-06-07, so every time D appears in any of code1, code2, code3, its timestamp will be 2023-06-07.
How can I achieve this in PySpark? Note also that although the rows are sorted in the example shown, in general they are not sorted in ascending order.
I tried using unboundedPreceding, but I could not get the result I wanted. The problem I ran into is that I cannot search across all the code columns: my window function only searches one column, and it does not work when I put all the columns in at once.
The way I solved this is to build, per id, a map from each code to the minimum timestamp at which it appears, and then use that map to fill the start_time_i columns. The code is below.
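Before the Spark version, the two-phase idea can be sketched in plain Python (a sketch only, using the same example rows; ISO date strings compare correctly as plain strings, so no date parsing is needed):

```python
from collections import defaultdict

# (id, start_timestamp, code1, code2, code3) -- same rows as the example data
rows = [
    ("1", "2023-08-30", "6", None, None),
    ("1", "2023-08-13", "A", "6", None),
    ("1", "2023-08-06", "8", "6", None),
    ("1", "2023-07-03", "6", None, None),
    ("1", "2023-05-06", "M", "8", "6"),
    ("2", "2023-08-10", "G", "A", None),
    ("2", "2023-07-04", "7", "D", None),
    ("2", "2023-06-07", "D", None, None),
]

# Phase 1: per id, map every code to the minimum timestamp it occurs with.
first_seen = defaultdict(dict)
for id_, ts, *codes in rows:
    for c in codes:
        if c is not None:
            first_seen[id_][c] = min(ts, first_seen[id_].get(c, ts))

# Phase 2: fill start_time_1..3 by looking each code up in its id's map;
# a null code stays null because dict.get(None) finds no entry.
filled = [tuple(first_seen[id_].get(c) for c in codes)
          for id_, ts, *codes in rows]
```

The Spark solution below follows the same two phases: explode + groupBy/min builds `first_seen`, and a per-id map column plus `getItem` does the lookup.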
from pyspark import SparkContext, SQLContext
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when

sc = SparkContext('local')
sqlContext = SQLContext(sc)
data1 = [
["1", "2023-08-30", "6", "6", "null", "null" ],
["1", "2023-08-13", "A6", "A", "6", "null"],
["1", "2023-08-06", "86", "8", "6", "null"],
["1", "2023-07-03", "6", "6", "null", "null"],
["1", "2023-05-06", "M86", "M", "8", "6"],
["2", "2023-08-10", "GA", "G", "A", "null"],
["2", "2023-07-04", "7D", "7", "D", "null"],
["2", "2023-06-07", "D", "D", "null", "null"],
]
df1Columns = ["id", "start_timestamps", "combined_code", "code1", "code2", "code3"]
df1 = sqlContext.createDataFrame(data=data1, schema=df1Columns)

# Replace the "null" strings with real None values in every column
df1 = df1.select([when(col(c) == "null", None).otherwise(col(c)).alias(c) for c in df1.columns])
print("Showing the values")
df1.show(n=100, truncate=False)
code_cols = ["code1", "code2", "code3"]
# Gather each row's codes into one array and drop the nulls
# (F.array_compact needs Spark 3.4+)
df1 = df1.withColumn("array_col", F.array_compact(F.array(code_cols)))
print("Showing the values with array_col")
df1.show(n=100, truncate=False)
df2 = df1.select("id", "start_timestamps", "array_col").withColumn("code_unique", F.explode("array_col")).drop("array_col").cache()
print("Showing the df2 values")
df2.show(n=100, truncate=False)
df3 = df2.groupby("id", "code_unique").agg(F.min("start_timestamps").alias("min_start_ts"))
print("Showing the df3 values")
df3.show(n=100, truncate=False)
df4 = df3.groupby("id").agg(F.collect_list("code_unique").alias("unique_code_list"), F.collect_list("min_start_ts").alias("min_start_ts_list"))
print("Showing the df4 values")
df4.show(n=100, truncate=False)
df5 = df4.withColumn("per_id_map", F.map_from_arrays("unique_code_list", "min_start_ts_list")).select("id", "per_id_map").cache()
print("Showing the df5 values")
df5.show(n=100, truncate=False)
df_joined = df1.join(df5, on=["id"])
print("Showing the df_joined values")
df_joined.show(n=100, truncate=False)
df_mapped = df_joined.withColumn("start_time_1", F.col("per_id_map").getItem(col("code1"))) \
.withColumn("start_time_2", F.col("per_id_map").getItem(col("code2"))) \
.withColumn("start_time_3", F.col("per_id_map").getItem(col("code3")))
print("Showing the df_mapped values")
df_mapped.show(n=100, truncate=False)
cols_select = df1Columns + ["start_time_1", "start_time_2", "start_time_3"]
print("Showing the final required dataframe values")
df_mapped.select(cols_select).show(n=100, truncate=False)
Output:
Showing the values
+---+----------------+-------------+-----+-----+-----+
|id |start_timestamps|combined_code|code1|code2|code3|
+---+----------------+-------------+-----+-----+-----+
|1 |2023-08-30 |6 |6 |null |null |
|1 |2023-08-13 |A6 |A |6 |null |
|1 |2023-08-06 |86 |8 |6 |null |
|1 |2023-07-03 |6 |6 |null |null |
|1 |2023-05-06 |M86 |M |8 |6 |
|2 |2023-08-10 |GA |G |A |null |
|2 |2023-07-04 |7D |7 |D |null |
|2 |2023-06-07 |D |D |null |null |
+---+----------------+-------------+-----+-----+-----+
Showing the values with array_col
+---+----------------+-------------+-----+-----+-----+---------+
|id |start_timestamps|combined_code|code1|code2|code3|array_col|
+---+----------------+-------------+-----+-----+-----+---------+
|1 |2023-08-30 |6 |6 |null |null |[6] |
|1 |2023-08-13 |A6 |A |6 |null |[A, 6] |
|1 |2023-08-06 |86 |8 |6 |null |[8, 6] |
|1 |2023-07-03 |6 |6 |null |null |[6] |
|1 |2023-05-06 |M86 |M |8 |6 |[M, 8, 6]|
|2 |2023-08-10 |GA |G |A |null |[G, A] |
|2 |2023-07-04 |7D |7 |D |null |[7, D] |
|2 |2023-06-07 |D |D |null |null |[D] |
+---+----------------+-------------+-----+-----+-----+---------+
Showing the df2 values
+---+----------------+-----------+
|id |start_timestamps|code_unique|
+---+----------------+-----------+
|1 |2023-08-30 |6 |
|1 |2023-08-13 |A |
|1 |2023-08-13 |6 |
|1 |2023-08-06 |8 |
|1 |2023-08-06 |6 |
|1 |2023-07-03 |6 |
|1 |2023-05-06 |M |
|1 |2023-05-06 |8 |
|1 |2023-05-06 |6 |
|2 |2023-08-10 |G |
|2 |2023-08-10 |A |
|2 |2023-07-04 |7 |
|2 |2023-07-04 |D |
|2 |2023-06-07 |D |
+---+----------------+-----------+
Showing the df3 values
+---+-----------+------------+
|id |code_unique|min_start_ts|
+---+-----------+------------+
|1 |6 |2023-05-06 |
|1 |8 |2023-05-06 |
|1 |A |2023-08-13 |
|1 |M |2023-05-06 |
|2 |7 |2023-07-04 |
|2 |A |2023-08-10 |
|2 |D |2023-06-07 |
|2 |G |2023-08-10 |
+---+-----------+------------+
Showing the df4 values
+---+----------------+------------------------------------------------+
|id |unique_code_list|min_start_ts_list |
+---+----------------+------------------------------------------------+
|1 |[6, 8, A, M] |[2023-05-06, 2023-05-06, 2023-08-13, 2023-05-06]|
|2 |[7, A, D, G] |[2023-07-04, 2023-08-10, 2023-06-07, 2023-08-10]|
+---+----------------+------------------------------------------------+
Showing the df5 values
+---+--------------------------------------------------------------------+
|id |per_id_map |
+---+--------------------------------------------------------------------+
|1 |{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|
|2 |{G -> 2023-08-10, D -> 2023-06-07, 7 -> 2023-07-04, A -> 2023-08-10}|
+---+--------------------------------------------------------------------+
Showing the df_joined values
+---+----------------+-------------+-----+-----+-----+---------+--------------------------------------------------------------------+
|id |start_timestamps|combined_code|code1|code2|code3|array_col|per_id_map |
+---+----------------+-------------+-----+-----+-----+---------+--------------------------------------------------------------------+
|1 |2023-08-30 |6 |6 |null |null |[6] |{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|
|1 |2023-08-13 |A6 |A |6 |null |[A, 6] |{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|
|1 |2023-08-06 |86 |8 |6 |null |[8, 6] |{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|
|1 |2023-07-03 |6 |6 |null |null |[6] |{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|
|1 |2023-05-06 |M86 |M |8 |6 |[M, 8, 6]|{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|
|2 |2023-08-10 |GA |G |A |null |[G, A] |{G -> 2023-08-10, D -> 2023-06-07, 7 -> 2023-07-04, A -> 2023-08-10}|
|2 |2023-07-04 |7D |7 |D |null |[7, D] |{G -> 2023-08-10, D -> 2023-06-07, 7 -> 2023-07-04, A -> 2023-08-10}|
|2 |2023-06-07 |D |D |null |null |[D] |{G -> 2023-08-10, D -> 2023-06-07, 7 -> 2023-07-04, A -> 2023-08-10}|
+---+----------------+-------------+-----+-----+-----+---------+--------------------------------------------------------------------+
Showing the df_mapped values
+---+----------------+-------------+-----+-----+-----+---------+--------------------------------------------------------------------+------------+------------+------------+
|id |start_timestamps|combined_code|code1|code2|code3|array_col|per_id_map |start_time_1|start_time_2|start_time_3|
+---+----------------+-------------+-----+-----+-----+---------+--------------------------------------------------------------------+------------+------------+------------+
|1 |2023-08-30 |6 |6 |null |null |[6] |{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|2023-05-06 |null |null |
|1 |2023-08-13 |A6 |A |6 |null |[A, 6] |{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|2023-08-13 |2023-05-06 |null |
|1 |2023-08-06 |86 |8 |6 |null |[8, 6] |{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|2023-05-06 |2023-05-06 |null |
|1 |2023-07-03 |6 |6 |null |null |[6] |{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|2023-05-06 |null |null |
|1 |2023-05-06 |M86 |M |8 |6 |[M, 8, 6]|{A -> 2023-08-13, M -> 2023-05-06, 6 -> 2023-05-06, 8 -> 2023-05-06}|2023-05-06 |2023-05-06 |2023-05-06 |
|2 |2023-08-10 |GA |G |A |null |[G, A] |{G -> 2023-08-10, D -> 2023-06-07, 7 -> 2023-07-04, A -> 2023-08-10}|2023-08-10 |2023-08-10 |null |
|2 |2023-07-04 |7D |7 |D |null |[7, D] |{G -> 2023-08-10, D -> 2023-06-07, 7 -> 2023-07-04, A -> 2023-08-10}|2023-07-04 |2023-06-07 |null |
|2 |2023-06-07 |D |D |null |null |[D] |{G -> 2023-08-10, D -> 2023-06-07, 7 -> 2023-07-04, A -> 2023-08-10}|2023-06-07 |null |null |
+---+----------------+-------------+-----+-----+-----+---------+--------------------------------------------------------------------+------------+------------+------------+
Showing the final required dataframe values
+---+----------------+-------------+-----+-----+-----+------------+------------+------------+
|id |start_timestamps|combined_code|code1|code2|code3|start_time_1|start_time_2|start_time_3|
+---+----------------+-------------+-----+-----+-----+------------+------------+------------+
|1 |2023-08-30 |6 |6 |null |null |2023-05-06 |null |null |
|1 |2023-08-13 |A6 |A |6 |null |2023-08-13 |2023-05-06 |null |
|1 |2023-08-06 |86 |8 |6 |null |2023-05-06 |2023-05-06 |null |
|1 |2023-07-03 |6 |6 |null |null |2023-05-06 |null |null |
|1 |2023-05-06 |M86 |M |8 |6 |2023-05-06 |2023-05-06 |2023-05-06 |
|2 |2023-08-10 |GA |G |A |null |2023-08-10 |2023-08-10 |null |
|2 |2023-07-04 |7D |7 |D |null |2023-07-04 |2023-06-07 |null |
|2 |2023-06-07 |D |D |null |null |2023-06-07 |null |null |
+---+----------------+-------------+-----+-----+-----+------------+------------+------------+