PySpark: concatenating a column of lists over weekly or monthly periods


I'm new to PySpark/Databricks. I have a question about concatenating a column of lists over weekly or monthly periods. Here is the code that sets up my data:

dates = ['2023-04-01', '2023-04-02', '2023-04-03', '2023-04-04', '2023-04-05', '2023-04-06', '2023-04-07', '2023-04-08', '2023-04-09', '2023-04-10', '2023-04-11', '2023-04-12', '2023-04-13', '2023-04-14']
brands = [['bmw', 'vw'], ['chevy', 'buick'], ['nissan', 'lexus', 'email'], ['bmw', 'nissan', 'lexus'], ['bmw', 'vw', 'nissan', 'lexus'], ['bmw', 'vw', 'chevy'], ['chevy', 'bmw', 'buick'], ['bmw', 'vw'], ['chevy', 'nissan'], ['nissan', 'lexus', 'vw'], ['bmw', 'security', 'vw'], ['bmw', 'vw', 'nissan', 'lexus'], ['bmw', 'lexus', 'chevy'], ['chevy', 'bmw', 'buick']]
weights = [[0.99, 0.98], [0.97, 0.96], [0.95, 0.94, 0.93], [0.98, 0.96, 0.95], [0.97, 0.96, 0.95, 0.94], [0.975, 0.964, 0.952, 0.943], [0.98, 0.976, 0.967], [0.99, 0.987, 0.978],
[0.978, 0.975], [0.972, 0.963], [0.955, 0.942, 0.936], [0.982, 0.961, 0.952], [0.97, 0.96, 0.952, 0.94], [0.975, 0.964, 0.952, 0.943], [0.982, 0.976, 0.967], [0.992, 0.987, 0.978]]

df = spark.createDataFrame(zip(dates, brands, weights), ['date', 'brands', 'weight'])
df.show()

+----------+--------------------+--------------------+
|      date|              brands|              weight|
+----------+--------------------+--------------------+
|2023-04-01|           [bmw, vw]|        [0.99, 0.98]|
|2023-04-02|      [chevy, buick]|        [0.97, 0.96]|
|2023-04-03|[nissan, lexus, e...|  [0.95, 0.94, 0.93]|
|2023-04-04|[bmw, nissan, lexus]|  [0.98, 0.96, 0.95]|
|2023-04-05|[bmw, vw, nissan,...|[0.97, 0.96, 0.95...|
|2023-04-06|    [bmw, vw, chevy]|[0.975, 0.964, 0....|
|2023-04-07| [chevy, bmw, buick]|[0.98, 0.976, 0.967]|
|2023-04-08|           [bmw, vw]|[0.99, 0.987, 0.978]|
|2023-04-09|     [chevy, nissan]|      [0.978, 0.975]|
|2023-04-10| [nissan, lexus, vw]|      [0.972, 0.963]|
|2023-04-11| [bmw, security, vw]|[0.955, 0.942, 0....|
|2023-04-12|[bmw, vw, nissan,...|[0.982, 0.961, 0....|
|2023-04-13| [bmw, lexus, chevy]|[0.97, 0.96, 0.95...|
|2023-04-14| [chevy, bmw, buick]|[0.975, 0.964, 0....|
+----------+--------------------+--------------------+

from pyspark.sql.functions import to_timestamp, col

df1 = df.withColumn("DateFormatted", to_timestamp(col("date"), "yyyy-MM-dd"))
df1.show()
    
+----------+--------------------+--------------------+-------------------+
|      date|              brands|              weight|      DateFormatted|
+----------+--------------------+--------------------+-------------------+
|2023-04-01|           [bmw, vw]|        [0.99, 0.98]|2023-04-01 00:00:00|
|2023-04-02|      [chevy, buick]|        [0.97, 0.96]|2023-04-02 00:00:00|
|2023-04-03|[nissan, lexus, e...|  [0.95, 0.94, 0.93]|2023-04-03 00:00:00|
|2023-04-04|[bmw, nissan, lexus]|  [0.98, 0.96, 0.95]|2023-04-04 00:00:00|
|2023-04-05|[bmw, vw, nissan,...|[0.97, 0.96, 0.95...|2023-04-05 00:00:00|
|2023-04-06|    [bmw, vw, chevy]|[0.975, 0.964, 0....|2023-04-06 00:00:00|
|2023-04-07| [chevy, bmw, buick]|[0.98, 0.976, 0.967]|2023-04-07 00:00:00|
|2023-04-08|           [bmw, vw]|[0.99, 0.987, 0.978]|2023-04-08 00:00:00|
|2023-04-09|     [chevy, nissan]|      [0.978, 0.975]|2023-04-09 00:00:00|
|2023-04-10| [nissan, lexus, vw]|      [0.972, 0.963]|2023-04-10 00:00:00|
|2023-04-11| [bmw, security, vw]|[0.955, 0.942, 0....|2023-04-11 00:00:00|
|2023-04-12|[bmw, vw, nissan,...|[0.982, 0.961, 0....|2023-04-12 00:00:00|
|2023-04-13| [bmw, lexus, chevy]|[0.97, 0.96, 0.95...|2023-04-13 00:00:00|
|2023-04-14| [chevy, bmw, buick]|[0.975, 0.964, 0....|2023-04-14 00:00:00|
+----------+--------------------+--------------------+-------------------+

I converted the date column to a timestamp column so that I can do any kind of timestamp-based aggregation. Now, here is what I want:

  1. For every week (say April 1 - April 7, April 8 - April 14, ...), I want the "brands" column and the "weight" column concatenated into a single row (of a new dataframe).
  2. I want another dataframe, monthly_aggregate_df, which does the same thing for each calendar month.

I tried the following... but ran into several problems:

window(timeColumn, windowDuration, slideDuration, startTime)

    df2 = df1.groupBy(window(col("DateFormatted"), "1 week", "1 week", "64 hours")).agg(concat("brands") as "brands_concat").select("window.start", "window.end", "DateFormatted", "brands_concat").show()

SyntaxError: invalid syntax
  File "<command-4423978228267630>", line 2
    df2 = df1.groupBy(window(col("DateFormatted"), "1 week", "1 week", "64 hours")).agg(concat("brands") as "brands_concat").select("window.start", "window.end", "DateFormatted", "brands_concat").show()
                                                                                                         ^
SyntaxError: invalid syntax
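
The SyntaxError comes from the Scala-style `as` alias; in Python a column is renamed with .alias(...). A rough sketch of that attempt in valid PySpark, a guess on my part, using collect_list + flatten because concat is not an aggregate function, and dropping DateFormatted since it is no longer available after the groupBy:

from pyspark.sql.functions import window, col, collect_list, flatten

# Sketch only: alias with .alias(), aggregate the per-day lists with collect_list,
# then flatten the resulting array of arrays into a single list per window.
df2 = (
    df1.groupBy(window(col("DateFormatted"), "1 week", "1 week", "64 hours"))
       .agg(flatten(collect_list("brands")).alias("brands_concat"))
       .select("window.start", "window.end", "brands_concat")
)
df2.show()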

Another attempt:

import pyspark.sql.functions as f
df.groupBy("date").agg(f.concat_ws(",", f.collect_list("brands")).alias("brands")).show()


AnalysisException: cannot resolve 'concat_ws(',', collect_list(`brands`))' due to data type mismatch: argument 2 requires (array<string> or string) type, however, 'collect_list(`brands`)' is of array<array<string>> type.;

It looks like concat_ws only concatenates strings, not lists. Maybe I need to use some kind of UDF for this. So I tried array_join ... but it doesn't like being used on grouped data ...

from pyspark.sql.functions import array_join, col, date_sub, next_day

df2.withColumn("week_strt_day", date_sub(next_day(col("DateFormatted"), "sunday"), 7)).groupBy("week_strt_day").apply(array_join("brands", ",").alias("brands")).orderBy("week_strt_day").show()
    
    ValueError: Invalid udf: the udf argument must be a pandas_udf of type GROUPED_MAP.
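
For what it's worth, the type mismatch above disappears if the collected array-of-arrays is flattened before joining. A minimal sketch, assuming a simple per-date grouping just to illustrate the type fix:

import pyspark.sql.functions as f

# Illustration only: flatten the array<array<string>> produced by collect_list
# into an array<string> so that array_join (or concat_ws) accepts it.
df.groupBy("date").agg(
    f.array_join(f.flatten(f.collect_list("brands")), ",").alias("brands")
).show()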
Tags: python, pyspark, concatenation, aggregate
2 Answers

2 votes
  1. You create an identifier for the week:
from pyspark.sql import functions as F


df1 = df1.groupBy(F.window(F.col("DateFormatted"), "1 weeks").getItem("start").alias("week"))
  2. You aggregate the values:
df1 = df1.agg(
    F.collect_list(F.col("brands")).alias("brands"),
    F.collect_list(F.col("weight")).alias("weight"),
)
  3. You flatten the arrays:
df1 = df1.select(
    "week",
    F.flatten(F.col("brands")).alias("brands"),
    F.flatten(F.col("weight")).alias("weight"),
)

Result:

df1.show()
+-------------------+--------------------+--------------------+
|               week|              brands|              weight|
+-------------------+--------------------+--------------------+
|2023-03-30 02:00:00|[bmw, vw, chevy, ...|[0.99, 0.98, 0.97...|
|2023-04-06 02:00:00|[chevy, bmw, buic...|[0.98, 0.976, 0.9...|
|2023-04-13 02:00:00| [chevy, bmw, buick]|[0.975, 0.964, 0....|
+-------------------+--------------------+--------------------+

Note: my results are shifted by 2 hours because I am in France, hence UTC+2.
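
If you want the window boundaries at midnight UTC instead, one option (my own assumption, not part of the original answer) is to pin the session time zone before grouping:

# Hypothetical tweak: make window boundaries fall on 00:00 UTC regardless of the cluster's local time zone.
spark.conf.set("spark.sql.session.timeZone", "UTC")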


For the monthly aggregation, just change step 1 to use "1 month" instead of "1 weeks".
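
Since time windows are fixed-length intervals, they may not line up with calendar months; a sketch of an alternative monthly grouping with date_trunc (my own variation, not part of this answer):

from pyspark.sql import functions as F

# df_with_dates stands for the question's dataframe that still has the
# DateFormatted column (i.e. df1 before the weekly groupBy above).
monthly_aggregate_df = (
    df_with_dates
    .groupBy(F.date_trunc("month", F.col("DateFormatted")).alias("month"))
    .agg(
        F.flatten(F.collect_list("brands")).alias("brands"),
        F.flatten(F.collect_list("weight")).alias("weight"),
    )
)
monthly_aggregate_df.show()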


0 votes

Let me know what you think of this solution:

import pyspark.sql.functions as f

df = spark.createDataFrame([
    ('2023-04-01', ['bmw', 'vw'], [0.99, 0.98]),
    ('2023-04-02', ['bmw', 'chevy'], [0.96, 0.95]),
    ('2023-04-03', ['bmw', 'nissan', 'lexus'], [0.98, 0.96, 0.95]),
    ('2023-04-04', ['chevy', 'bmw', 'buick'], [0.98, 0.976, 0.967]),
    ('2023-04-05', ['nissan', 'lexus', 'vw'], [0.972, 0.963, 0.98]),
    ('2023-05-06', ['bmw', 'vw'], [0.99, 0.98]),
    ('2023-05-07', ['bmw', 'chevy'], [0.96, 0.95]),
], ['Date', 'Brands', 'Weights'])

df_grouped_weekly = (
    df
    .withColumn('DateFormatted', f.to_timestamp(f.col('Date'), 'yyyy-MM-dd'))
    .withColumn('WeekOfYear', f.concat_ws('-', f.weekofyear(f.col('DateFormatted')), f.year(f.col('DateFormatted'))))
    .groupBy('WeekOfYear')
    .agg(
        f.collect_list(f.col('Brands')).alias('GroupedBrands'),
        f.collect_list(f.col('Weights')).alias('GroupedWeights'),
    )
    .withColumn('GroupedBrands', f.flatten(f.col('GroupedBrands')))
    .withColumn('GroupedWeights', f.flatten(f.col('GroupedWeights')))
)

df_grouped_monthly = (
    df
    .withColumn('DateFormatted', f.to_timestamp(f.col('Date'), 'yyyy-MM-dd'))
    .withColumn('Month', f.concat_ws('-', f.month(f.col('DateFormatted')), f.year(f.col('DateFormatted'))))
    .groupBy('Month')
    .agg(
        f.collect_list(f.col('Brands')).alias('GroupedBrands'),
        f.collect_list(f.col('Weights')).alias('GroupedWeights'),
    )
    .withColumn('GroupedBrands', f.flatten(f.col('GroupedBrands')))
    .withColumn('GroupedWeights', f.flatten(f.col('GroupedWeights')))
)

df_grouped_weekly.show(truncate = False)
df_grouped_monthly.show(truncate = False)

The output is:

+----------+----------------------------------------------------------+----------------------------------------------------------+
|WeekOfYear|GroupedBrands                                             |GroupedWeights                                            |
+----------+----------------------------------------------------------+----------------------------------------------------------+
|18-2023   |[bmw, vw, bmw, chevy]                                     |[0.99, 0.98, 0.96, 0.95]                                  |
|14-2023   |[bmw, nissan, lexus, chevy, bmw, buick, nissan, lexus, vw]|[0.98, 0.96, 0.95, 0.98, 0.976, 0.967, 0.972, 0.963, 0.98]|
|13-2023   |[bmw, vw, bmw, chevy]                                     |[0.99, 0.98, 0.96, 0.95]                                  |
+----------+----------------------------------------------------------+----------------------------------------------------------+

+------+-------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|Month |GroupedBrands                                                                  |GroupedWeights                                                                    |
+------+-------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|4-2023|[bmw, vw, bmw, chevy, bmw, nissan, lexus, chevy, bmw, buick, nissan, lexus, vw]|[0.99, 0.98, 0.96, 0.95, 0.98, 0.96, 0.95, 0.98, 0.976, 0.967, 0.972, 0.963, 0.98]|
|5-2023|[bmw, vw, bmw, chevy]                                                          |[0.99, 0.98, 0.96, 0.95]                                                          |
+------+-------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
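
If the weekly buckets need to be anchored exactly at April 1 (rather than the ISO weeks that weekofyear produces), a variation of my own using datediff from a fixed start date could look like this:

import pyspark.sql.functions as f

# Hypothetical variation: 7-day buckets anchored at 2023-04-01 instead of ISO weeks.
df_weekly_anchored = (
    df
    .withColumn('DateFormatted', f.to_date(f.col('Date'), 'yyyy-MM-dd'))
    .withColumn('WeekIndex', f.floor(f.datediff(f.col('DateFormatted'), f.lit('2023-04-01')) / 7))
    .groupBy('WeekIndex')
    .agg(
        f.flatten(f.collect_list('Brands')).alias('GroupedBrands'),
        f.flatten(f.collect_list('Weights')).alias('GroupedWeights'),
    )
)
df_weekly_anchored.show(truncate=False)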