我的情况是这样的: 我有一个由符号(分类)值的时间序列组成的数据框。它看起来类似于:
idx symbol partition
0 A 0
1 C 0
2 B 0
3 C 0
4 A 0
5 C 1
6 B 1
7 D 1
8 C 1
9 B 1
我现在的目标是制作一个滑动窗口并将 n 个主值收集到一个数组中。
我通过以下方式实现了这一目标:
sliding_window = Window.partitionBy("partition").orderBy("idx").rowsBetween(Window.currentRow, 2)
sliding_df = df.withColumn("sliding", collect_list("symbol").over(sliding_window))
这会产生以下数据框:
idx symbol partition sliding
0 A 0 [A, C, B]
1 C 0 [C, B, C]
2 B 0 [B, C, A]
3 C 0 [C, A]
4 A 0 [A]
5 C 1 [C, B, D]
6 B 1 [B, D, C]
7 D 1 [D, C, B]
8 C 1 [C, B]
9 B 1 [B]
到目前为止一切顺利。由于 Spark 中的分区性质,滑动数组在到达分区末尾时会变得更短,因为缺少另一个分区中存在的前导行的信息。对于无法避免的时间序列的末尾,但希望滑动窗口不会错过中间的任何信息(本例中的索引 3 和 4)。
所需的数据框将如下所示:
idx symbol partition sliding
0 A 0 [A, C, B]
1 C 0 [C, B, C]
2 B 0 [B, C, A]
3 C 0 [C, A, C]
4 A 0 [A, C, B]
5 C 1 [C, B, D]
6 B 1 [B, D, C]
7 D 1 [D, C, B]
8 C 1 [C, B]
9 B 1 [B]
最佳方案是具有重叠的分区,以便索引 5 和 6 存在于两个冗余分区中,我可以计算所需的滑动窗口。有什么办法可以达到这个目的吗?
对于重叠数据,原始数据框将如下所示:
idx symbol partition
0 A 0
1 C 0
2 B 0
3 C 0
4 A 0
5 C 0
6 B 0
5 C 1
6 B 1
7 D 1
8 C 1
9 B 1
因此基本上分区 1 的前两行将被复制并附加为分区 0 的最后几行。
我考虑过过滤分区边界信息并在本地计算必要的信息,然后再连接回原始数据帧,但我想要一种更简单的方法。
在您的示例中,如果您只是不分区窗口,它会给您您想要的
sliding_window = Window.orderBy("idx").rowsBetween(Window.currentRow, 2)
sliding_df = df.withColumn("sliding", collect_list("symbol").over(sliding_window))
给予
idx symbol block sliding
0 A 0 [A, C, B]
1 C 0 [C, B, C]
2 B 0 [B, C, A]
3 C 0 [C, A, C]
4 A 0 [A, C, B]
5 C 1 [C, B, D]
6 B 1 [B, D, C]
7 D 1 [D, C, B]
8 C 1 [C, B]
9 B 1 [B]
另外,请小心,
collect_list()
不尊重顺序(由于 Spark 的分布式性质),因此您的符号会在列表中混淆。
我遇到了同样的问题,我建议采用替代方法。这会重复必须转到两个单独窗口的行。
输入:
from pyspark.sql import functions as F, Window as W
df0 = spark.createDataFrame(
[(0, 'A', 0),
(1, 'C', 0),
(2, 'B', 0),
(3, 'C', 0),
(4, 'A', 0),
(5, 'C', 1),
(6, 'B', 1),
(7, 'D', 1),
(8, 'C', 1),
(9, 'B', 1)],
['idx', 'symbol', 'partition'])
解决方案:
w = W.partitionBy('partition').orderBy('idx')
df = df0.withColumn('_rn', F.row_number().over(w))
df_previous_part = df.select('partition').distinct() \
.withColumn('_prev', F.lag('partition').over(W.orderBy('partition')))
df = df.join(df_previous_part, 'partition')
df = df.withColumn(
'_to_explode',
F.when((F.col('_rn') > 2) | F.isnull('_prev'), F.array('partition'))
.otherwise(F.array('partition', '_prev'))
)
df = df.withColumn('partition', F.explode('_to_explode')).select(df0.columns)
df = df.withColumn('sliding', F.collect_list('symbol').over(w.rowsBetween(W.currentRow, 2)))
max_part = df_previous_part.agg(F.max('partition')).head()[0]
df = df.filter((F.size('sliding') == 3) | (F.col('partition') == max_part))
df.show()
# +---+------+---------+---------+
# |idx|symbol|partition| sliding|
# +---+------+---------+---------+
# | 0| A| 0|[A, C, B]|
# | 1| C| 0|[C, B, C]|
# | 2| B| 0|[B, C, A]|
# | 3| C| 0|[C, A, C]|
# | 4| A| 0|[A, C, B]|
# | 5| C| 1|[C, B, D]|
# | 6| B| 1|[B, D, C]|
# | 7| D| 1|[D, C, B]|
# | 8| C| 1| [C, B]|
# | 9| B| 1| [B]|
# +---+------+---------+---------+