在Spark Dataframe中实现Window的重叠分区

问题描述 投票:0回答:2

我的情况是这样的: 我有一个由符号(分类)值的时间序列组成的数据框。它看起来类似于:


    idx    symbol    partition
    0      A         0
    1      C         0
    2      B         0
    3      C         0
    4      A         0
    5      C         1
    6      B         1
    7      D         1
    8      C         1
    9      B         1

我现在的目标是制作一个滑动窗口并将 n 个主值收集到一个数组中。

我通过以下方式实现了这一目标:

sliding_window = Window.partitionBy("partition").orderBy("idx").rowsBetween(Window.currentRow, 2)
sliding_df = df.withColumn("sliding", collect_list("symbol").over(sliding_window))

这会产生以下数据框:

    idx    symbol    partition    sliding
    0      A         0            [A, C, B]
    1      C         0            [C, B, C]
    2      B         0            [B, C, A]
    3      C         0               [C, A]
    4      A         0                  [A]
    5      C         1            [C, B, D]
    6      B         1            [B, D, C]
    7      D         1            [D, C, B]
    8      C         1               [C, B]
    9      B         1                  [B]

到目前为止一切顺利。由于 Spark 中的分区性质,滑动数组在到达分区末尾时会变得更短,因为缺少另一个分区中存在的前导行的信息。对于无法避免的时间序列的末尾,但希望滑动窗口不会错过中间的任何信息(本例中的索引 3 和 4)。

所需的数据框将如下所示:

    idx    symbol    partition    sliding
    0      A         0            [A, C, B]
    1      C         0            [C, B, C]
    2      B         0            [B, C, A]
    3      C         0            [C, A, C]
    4      A         0            [A, C, B]
    5      C         1            [C, B, D]
    6      B         1            [B, D, C]
    7      D         1            [D, C, B]
    8      C         1               [C, B]
    9      B         1                  [B]

最佳方案是具有重叠的分区,以便索引 5 和 6 存在于两个冗余分区中,我可以计算所需的滑动窗口。有什么办法可以达到这个目的吗?

对于重叠数据,原始数据框将如下所示:

    idx    symbol    partition    
    0      A         0        
    1      C         0        
    2      B         0        
    3      C         0        
    4      A         0
    5      C         0
    6      B         0
    5      C         1        
    6      B         1        
    7      D         1        
    8      C         1           
    9      B         1              

因此基本上分区 1 的前两行将被复制并附加为分区 0 的最后几行。

我考虑过过滤分区边界信息并在本地计算必要的信息,然后再连接回原始数据帧,但我想要一种更简单的方法。

apache-spark pyspark time-series apache-spark-sql
2个回答
1
投票

在您的示例中,如果您只是不分区窗口,它会给您您想要的

sliding_window = Window.orderBy("idx").rowsBetween(Window.currentRow, 2)
sliding_df = df.withColumn("sliding", collect_list("symbol").over(sliding_window))

给予

 idx    symbol    block    sliding
    0      A         0        [A, C, B]
    1      C         0        [C, B, C]
    2      B         0        [B, C, A]
    3      C         0        [C, A, C]
    4      A         0        [A, C, B]
    5      C         1        [C, B, D]
    6      B         1        [B, D, C]
    7      D         1        [D, C, B]
    8      C         1           [C, B]
    9      B         1              [B]

另外,请小心,

collect_list()
不尊重顺序(由于 Spark 的分布式性质),因此您的符号会在列表中混淆。


0
投票

我遇到了同样的问题,我建议采用替代方法。这会重复必须转到两个单独窗口的行。

输入:

from pyspark.sql import functions as F, Window as W
df0 = spark.createDataFrame(
    [(0, 'A', 0),
     (1, 'C', 0),
     (2, 'B', 0),
     (3, 'C', 0),
     (4, 'A', 0),
     (5, 'C', 1),
     (6, 'B', 1),
     (7, 'D', 1),
     (8, 'C', 1),
     (9, 'B', 1)],
    ['idx', 'symbol', 'partition'])

解决方案:

w = W.partitionBy('partition').orderBy('idx')
df = df0.withColumn('_rn', F.row_number().over(w))

df_previous_part = df.select('partition').distinct() \
                     .withColumn('_prev', F.lag('partition').over(W.orderBy('partition')))
df = df.join(df_previous_part, 'partition')
df = df.withColumn(
    '_to_explode',
    F.when((F.col('_rn') > 2) | F.isnull('_prev'), F.array('partition'))
     .otherwise(F.array('partition', '_prev'))
)
df = df.withColumn('partition', F.explode('_to_explode')).select(df0.columns)

df = df.withColumn('sliding', F.collect_list('symbol').over(w.rowsBetween(W.currentRow, 2)))
max_part = df_previous_part.agg(F.max('partition')).head()[0]
df = df.filter((F.size('sliding') == 3) | (F.col('partition') == max_part))

df.show()
# +---+------+---------+---------+
# |idx|symbol|partition|  sliding|
# +---+------+---------+---------+
# |  0|     A|        0|[A, C, B]|
# |  1|     C|        0|[C, B, C]|
# |  2|     B|        0|[B, C, A]|
# |  3|     C|        0|[C, A, C]|
# |  4|     A|        0|[A, C, B]|
# |  5|     C|        1|[C, B, D]|
# |  6|     B|        1|[B, D, C]|
# |  7|     D|        1|[D, C, B]|
# |  8|     C|        1|   [C, B]|
# |  9|     B|        1|      [B]|
# +---+------+---------+---------+
© www.soinside.com 2019 - 2024. All rights reserved.