PySpark code on Databricks never finishes executing and hangs partway through

Question

I have two DataFrames: df_selected and df_filtered_mins_60.

df_filtered_mins_60.columns

Output: ["CSku", "start_timestamp", "stop_timestamp"]

df_selected.columns

输出:["DATEUPDATED", "DATE", "HOUR", "CPSKU", "BB_Status", “ActivePrice”,“PrevPrice”,“MinPrice”,“AsCost”, “MinMargin”、“CPT”、“Comp_Price”、“AP_MSG”]

df_selected.count()

Output: 7,816,521

df_filtered_mins_60.count()

Output: 112,397

What I want to achieve: iterate over df_filtered_mins_60 and, for each row, take:

start_time = start_timestamp
stop_time = stop_timestamp
sku = CSku

then apply the following condition on df_selected:

DATEUPDATED is between start_time and stop_time (inclusive)
and CPSKU = sku

and assign a constant i to every row that satisfies it. Continue until the last row of df_filtered_mins_60, incrementing i = i + 1 after each update.

The code I wrote is below. It never finishes executing and gets stuck somewhere; it keeps running for hours until I force-stop it.

from pyspark.sql.functions import lit, when

i = 1
df_selected = df_selected.withColumn("counter", lit(0))

# Iterate through each row of df_filtered_mins_60
for row in df_filtered_mins_60.collect():
    sku = row['CSku']
    start_time = row['start_timestamp']
    stop_time = row['stop_timestamp']

    # Apply conditions on df_selected and update "counter" column
    df_selected = df_selected.withColumn("counter", 
                                         when((df_selected.DATEUPDATED >= start_time) & 
                                              (df_selected.DATEUPDATED <= stop_time) &
                                              (df_selected.CPSKU == sku),
                                              lit(i)).otherwise(df_selected.counter))
    
    i += 1

# Display the updated df_selected DataFrame with the "counter" column
display(df_selected)

I assign the counter because I need the set of rows in df_selected that fall within certain time windows for each SKU, and that information lives in df_filtered_mins_60. After assigning the counter, I need to run aggregations on other columns of df_selected; essentially, for each window I need to drill into what happened inside that particular time window.
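
For illustration, here is a minimal sketch of the kind of per-window aggregation I have in mind once the counter exists (the metrics here are placeholders, not my real ones):

import pyspark.sql.functions as F

# Hypothetical follow-up step: one summary row per (window, SKU).
window_summary = (
    df_selected
    .filter(F.col("counter") > 0)  # drop rows that fall in no window
    .groupBy("counter", "CPSKU")
    .agg(
        F.count("*").alias("rows_in_window"),
        F.avg("ActivePrice").alias("avg_active_price"),
        F.min("DATEUPDATED").alias("first_update"),
        F.max("DATEUPDATED").alias("last_update"),
    )
)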

I need the correct PySpark code to run this on Databricks.

Generating the sample data:

from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Create DataFrame") \
    .getOrCreate()

schema = StructType([
    StructField("DATEUPDATED", StringType(), True),
    StructField("DATE", StringType(), True),
    StructField("HOUR", IntegerType(), True),
    StructField("CPSKU", StringType(), True),
    StructField("BB_Status", IntegerType(), True),
    StructField("ActivePrice", DoubleType(), True),
    StructField("PrevPrice", DoubleType(), True),
    StructField("MinPrice", DoubleType(), True),
    StructField("AsCost", DoubleType(), True),
    StructField("MinMargin", DoubleType(), True),
    StructField("CPT", DoubleType(), True),
    StructField("Comp_Price", DoubleType(), True)
])

data=[('2024-01-01T19:45:39.151+00:00','2024-01-01',0,'MSAN10115836',0,14.86,14.86,14.86,12.63,0.00,13.90,5.84),
('2024-01-01T19:55:10.904+00:00','2024-01-01',0,'MSAN10115836',0,126.04,126.04,126.04,108.96,0.00,0.00,93.54),
('2024-01-01T20:35:10.904+00:00','2024-01-01',0,'MSAN10115836',0,126.04,126.04,126.04,108.96,0.00,0.00,93.54),
('2024-01-15T12:55:18.528+00:00','2024-01-01',1,'PFXNDDF4OX',1,18.16,18.16,10.56,26.85,-199.00,18.16,34.10),
('2024-01-15T13:25:18.528+00:00','2024-01-01',1,'PFXNDDF4OX',1,18.16,18.16,10.56,26.85,-199.00,18.16,34.10),
('2024-01-15T13:35:18.528+00:00','2024-01-01',1,'PFXNDDF4OX',1,18.16,18.16,10.56,26.85,-199.00,18.16,34.10),
('2024-01-15T13:51:09.574+00:00','2024-01-01',1,'PFXNDDF4OX',1,20.16,18.16,10.56,26.85,-199.00,18.16,34.10),
('2024-01-15T07:28:48.265+00:00','2024-01-01',1,'DEWNDCB135C',0,44.93,44.93,44.93,38.09,0.25,26.9,941.26),
('2024-01-15T07:50:32.412+00:00','2024-01-01',1,'DEWNDCB135C',0,44.93,44.93,44.93,38.09,0.25,26.9,941.26),
('2024-01-15T07:52:32.412+00:00','2024-01-01',1,'DEWNDCB135C',0,44.93,44.93,44.93,38.09,0.25,26.9,941.26)]

df_selected = spark.createDataFrame(data, schema=schema)
df_selected = df_selected.withColumn("DateUpdated", to_timestamp(df_selected["DATEUPDATED"], "yyyy-MM-dd'T'HH:mm:ss.SSS'+00:00'"))
display(df_selected)

The second DataFrame:

schema = StructType([
    StructField("CPSKU", StringType(), True),
    StructField("start_timestamp", StringType(), True),
    StructField("stop_timestamp", StringType(), True)
])
data_2=[('MSAN10115836','2024-01-01T19:45:39.151+00:00','2024-01-01T20:35:10.904+00:00'),
('MSAN10115836','2024-01-08T06:04:16.484+00:00','2024-01-08T06:42:14.912+00:00'),
('DEWNDCB135C','2024-01-15T07:28:48.265+00:00','2024-01-15T07:52:32.412+00:00'),
('DEWNDCB135C','2024-01-15T11:37:56.698+00:00','2024-01-15T12:35:09.693+00:00'),
('PFXNDDF4OX','2024-01-15T12:55:18.528+00:00','2024-01-15T13:51:09.574+00:00'),
('PFXNDDF4OX','2024-01-15T19:25:10.150+00:00','2024-01-15T20:24:36.385+00:00')]

df_filtered_mins_60 = spark.createDataFrame(data_2, schema=schema)
df_filtered_mins_60 = df_filtered_mins_60.withColumn("start_timestamp", to_timestamp(df_filtered_mins_60["start_timestamp"], "yyyy-MM-dd'T'HH:mm:ss.SSS'+00:00'"))
df_filtered_mins_60 = df_filtered_mins_60.withColumn("stop_timestamp", to_timestamp(df_filtered_mins_60["stop_timestamp"], "yyyy-MM-dd'T'HH:mm:ss.SSS'+00:00'"))
display(df_filtered_mins_60)
Tags: python, apache-spark, pyspark, databricks
1 Answer

The code block below is really inefficient: .collect() loads all of the data onto the driver (so you lose the benefit of the data being distributed), and then on every iteration of the loop you overwrite the entire column. On top of that, each withColumn call adds another layer to the query plan, so after thousands of iterations the driver spends all its time analyzing an enormous plan, which is why the job appears to hang.

# Iterate through each row of df_filtered_mins_60
for row in df_filtered_mins_60.collect():
    sku = row['CSku']
    start_time = row['start_timestamp']
    stop_time = row['stop_timestamp']

    # Apply conditions on df_selected and update "counter" column
    df_selected = df_selected.withColumn("counter", 
                                         when((df_selected.DATEUPDATED >= start_time) & 
                                              (df_selected.DATEUPDATED <= stop_time) &
                                              (df_selected.CPSKU == sku),
                                              lit(i)).otherwise(df_selected.counter))
    
    i += 1
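
As an aside: even if you kept a driver-side loop, you could at least build one combined expression and apply it with a single withColumn, rather than stacking one withColumn per window, which is what makes the plan explode. A rough sketch of that idea (column names as in the reproducible example below; with ~112k windows the expression tree would still be huge, so the join approach that follows is the better fix):

import pyspark.sql.functions as F

# Build one CASE WHEN ... chain on the driver, then apply it once.
counter_expr = F.lit(0)
for i, row in enumerate(df_filtered_mins_60.collect(), start=1):
    in_window = (
        (F.col("CPSKU") == row["CPSKU"])
        & (F.col("DATEUPDATED") >= row["start_timestamp"])
        & (F.col("DATEUPDATED") <= row["stop_timestamp"])
    )
    counter_expr = F.when(in_window, F.lit(i)).otherwise(counter_expr)

df_selected = df_selected.withColumn("counter", counter_expr)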

If you need the counter rows ordered, you can assign a row number to each row of df_filtered_mins_60 and then left join df_selected with df_filtered_mins_60 on these conditions:

[
    df_selected.CPSKU == df_filtered_mins_60.CPSKU,
    df_selected.DATEUPDATED >= df_filtered_mins_60.start_timestamp,
    df_selected.DATEUPDATED <= df_filtered_mins_60.stop_timestamp,
]

This keeps all the rows of df_selected, including those that do not satisfy the join conditions. You can then assign 0 to the counter of any row where it is NULL.

Below is a fully reproducible example (I added one row to df_selected that does not satisfy the join conditions, just to show what happens):

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
from pyspark.sql.window import Window

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Create DataFrame") \
    .getOrCreate()

schema = StructType([
    StructField("DATEUPDATED", StringType(), True),
    StructField("DATE", StringType(), True),
    StructField("HOUR", IntegerType(), True),
    StructField("CPSKU", StringType(), True),
    StructField("BB_Status", IntegerType(), True),
    StructField("ActivePrice", DoubleType(), True),
    StructField("PrevPrice", DoubleType(), True),
    StructField("MinPrice", DoubleType(), True),
    StructField("AsCost", DoubleType(), True),
    StructField("MinMargin", DoubleType(), True),
    StructField("CPT", DoubleType(), True),
    StructField("Comp_Price", DoubleType(), True)
])

data=[('2024-01-01T19:45:39.151+00:00','2024-01-01',0,'MSAN10115836',0,14.86,14.86,14.86,12.63,0.00,13.90,5.84),
('2024-01-01T19:55:10.904+00:00','2024-01-01',0,'MSAN10115836',0,126.04,126.04,126.04,108.96,0.00,0.00,93.54),
('2024-01-01T20:35:10.904+00:00','2024-01-01',0,'MSAN10115836',0,126.04,126.04,126.04,108.96,0.00,0.00,93.54),
('2024-01-15T12:55:18.528+00:00','2024-01-01',1,'PFXNDDF4OX',1,18.16,18.16,10.56,26.85,-199.00,18.16,34.10),
('2024-01-15T13:25:18.528+00:00','2024-01-01',1,'PFXNDDF4OX',1,18.16,18.16,10.56,26.85,-199.00,18.16,34.10),
('2024-01-15T13:35:18.528+00:00','2024-01-01',1,'PFXNDDF4OX',1,18.16,18.16,10.56,26.85,-199.00,18.16,34.10),
('2024-01-15T13:51:09.574+00:00','2024-01-01',1,'PFXNDDF4OX',1,20.16,18.16,10.56,26.85,-199.00,18.16,34.10),
('2024-01-15T07:28:48.265+00:00','2024-01-01',1,'DEWNDCB135C',0,44.93,44.93,44.93,38.09,0.25,26.9,941.26),
('2024-01-15T07:50:32.412+00:00','2024-01-01',1,'DEWNDCB135C',0,44.93,44.93,44.93,38.09,0.25,26.9,941.26),
('2024-01-15T07:52:32.412+00:00','2024-01-01',1,'DEWNDCB135C',0,44.93,44.93,44.93,38.09,0.25,26.9,941.26),
('2027-01-15T07:52:32.412+00:00','2024-01-01',1,'TEST',0,44.93,44.93,44.93,38.09,0.25,26.9,941.26)]

df_selected = spark.createDataFrame(data, schema=schema)
df_selected = df_selected.withColumn("DATEUPDATED", to_timestamp(df_selected["DATEUPDATED"], "yyyy-MM-dd'T'HH:mm:ss.SSS'+00:00'"))

schema = StructType([
    StructField("CPSKU", StringType(), True),
    StructField("start_timestamp", StringType(), True),
    StructField("stop_timestamp", StringType(), True)
])
data_2=[('MSAN10115836','2024-01-01T19:45:39.151+00:00','2024-01-01T20:35:10.904+00:00'),
('MSAN10115836','2024-01-08T06:04:16.484+00:00','2024-01-08T06:42:14.912+00:00'),
('DEWNDCB135C','2024-01-15T07:28:48.265+00:00','2024-01-15T07:52:32.412+00:00'),
('DEWNDCB135C','2024-01-15T11:37:56.698+00:00','2024-01-15T12:35:09.693+00:00'),
('PFXNDDF4OX','2024-01-15T12:55:18.528+00:00','2024-01-15T13:51:09.574+00:00'),
('PFXNDDF4OX','2024-01-15T19:25:10.150+00:00','2024-01-15T20:24:36.385+00:00')]

df_filtered_mins_60 = spark.createDataFrame(data_2, schema=schema)
df_filtered_mins_60 = df_filtered_mins_60.withColumn("start_timestamp", F.to_timestamp(df_filtered_mins_60["start_timestamp"], "yyyy-MM-dd'T'HH:mm:ss.SSS'+00:00'"))
df_filtered_mins_60 = df_filtered_mins_60.withColumn("stop_timestamp", F.to_timestamp(df_filtered_mins_60["stop_timestamp"], "yyyy-MM-dd'T'HH:mm:ss.SSS'+00:00'"))

df_filtered_mins_60 = df_filtered_mins_60.withColumn("counter", F.row_number().over(w))

df_selected looks like this:

+-----------------------+----------+----+------------+---------+-----------+---------+--------+------+---------+-----+----------+
|DATEUPDATED            |DATE      |HOUR|CPSKU       |BB_Status|ActivePrice|PrevPrice|MinPrice|AsCost|MinMargin|CPT  |Comp_Price|
+-----------------------+----------+----+------------+---------+-----------+---------+--------+------+---------+-----+----------+
|2024-01-01 19:45:39.151|2024-01-01|0   |MSAN10115836|0        |14.86      |14.86    |14.86   |12.63 |0.0      |13.9 |5.84      |
|2024-01-01 19:55:10.904|2024-01-01|0   |MSAN10115836|0        |126.04     |126.04   |126.04  |108.96|0.0      |0.0  |93.54     |
|2024-01-01 20:35:10.904|2024-01-01|0   |MSAN10115836|0        |126.04     |126.04   |126.04  |108.96|0.0      |0.0  |93.54     |
|2024-01-15 12:55:18.528|2024-01-01|1   |PFXNDDF4OX  |1        |18.16      |18.16    |10.56   |26.85 |-199.0   |18.16|34.1      |
|2024-01-15 13:25:18.528|2024-01-01|1   |PFXNDDF4OX  |1        |18.16      |18.16    |10.56   |26.85 |-199.0   |18.16|34.1      |
|2024-01-15 13:35:18.528|2024-01-01|1   |PFXNDDF4OX  |1        |18.16      |18.16    |10.56   |26.85 |-199.0   |18.16|34.1      |
|2024-01-15 13:51:09.574|2024-01-01|1   |PFXNDDF4OX  |1        |20.16      |18.16    |10.56   |26.85 |-199.0   |18.16|34.1      |
|2024-01-15 07:28:48.265|2024-01-01|1   |DEWNDCB135C |0        |44.93      |44.93    |44.93   |38.09 |0.25     |26.9 |941.26    |
|2024-01-15 07:50:32.412|2024-01-01|1   |DEWNDCB135C |0        |44.93      |44.93    |44.93   |38.09 |0.25     |26.9 |941.26    |
|2024-01-15 07:52:32.412|2024-01-01|1   |DEWNDCB135C |0        |44.93      |44.93    |44.93   |38.09 |0.25     |26.9 |941.26    |
|2027-01-15 07:52:32.412|2024-01-01|1   |TEST        |0        |44.93      |44.93    |44.93   |38.09 |0.25     |26.9 |941.26    |
+-----------------------+----------+----+------------+---------+-----------+---------+--------+------+---------+-----+----------+

df_filtered_mins_60 looks like this:

+------------+-----------------------+-----------------------+-------+
|CPSKU       |start_timestamp        |stop_timestamp         |counter|
+------------+-----------------------+-----------------------+-------+
|MSAN10115836|2024-01-01 19:45:39.151|2024-01-01 20:35:10.904|1      |
|MSAN10115836|2024-01-08 06:04:16.484|2024-01-08 06:42:14.912|2      |
|DEWNDCB135C |2024-01-15 07:28:48.265|2024-01-15 07:52:32.412|3      |
|DEWNDCB135C |2024-01-15 11:37:56.698|2024-01-15 12:35:09.693|4      |
|PFXNDDF4OX  |2024-01-15 12:55:18.528|2024-01-15 13:51:09.574|5      |
|PFXNDDF4OX  |2024-01-15 19:25:10.15 |2024-01-15 20:24:36.385|6      |
+------------+-----------------------+-----------------------+-------+

Here is the join and the result:

result = df_selected.join(
    df_filtered_mins_60, 
    on=[
        df_selected.CPSKU == df_filtered_mins_60.CPSKU,
        df_selected.DATEUPDATED >= df_filtered_mins_60.start_timestamp,
        df_selected.DATEUPDATED <= df_filtered_mins_60.stop_timestamp,
    ],
    how='left' # keep all rows from df_selected
).drop(
    df_filtered_mins_60.CPSKU, 
    df_filtered_mins_60.start_timestamp, 
    df_filtered_mins_60.stop_timestamp
).withColumn(
    'counter', F.coalesce(F.col('counter'), F.lit(0)) # rows matching no window get 0
)

result.show(truncate=False)

+-----------------------+----------+----+------------+---------+-----------+---------+--------+------+---------+-----+----------+-------+
|DATEUPDATED            |DATE      |HOUR|CPSKU       |BB_Status|ActivePrice|PrevPrice|MinPrice|AsCost|MinMargin|CPT  |Comp_Price|counter|
+-----------------------+----------+----+------------+---------+-----------+---------+--------+------+---------+-----+----------+-------+
|2024-01-01 19:45:39.151|2024-01-01|0   |MSAN10115836|0        |14.86      |14.86    |14.86   |12.63 |0.0      |13.9 |5.84      |1      |
|2024-01-01 19:55:10.904|2024-01-01|0   |MSAN10115836|0        |126.04     |126.04   |126.04  |108.96|0.0      |0.0  |93.54     |1      |
|2024-01-15 12:55:18.528|2024-01-01|1   |PFXNDDF4OX  |1        |18.16      |18.16    |10.56   |26.85 |-199.0   |18.16|34.1      |5      |
|2024-01-01 20:35:10.904|2024-01-01|0   |MSAN10115836|0        |126.04     |126.04   |126.04  |108.96|0.0      |0.0  |93.54     |1      |
|2024-01-15 13:25:18.528|2024-01-01|1   |PFXNDDF4OX  |1        |18.16      |18.16    |10.56   |26.85 |-199.0   |18.16|34.1      |5      |
|2024-01-15 13:35:18.528|2024-01-01|1   |PFXNDDF4OX  |1        |18.16      |18.16    |10.56   |26.85 |-199.0   |18.16|34.1      |5      |
|2024-01-15 07:28:48.265|2024-01-01|1   |DEWNDCB135C |0        |44.93      |44.93    |44.93   |38.09 |0.25     |26.9 |941.26    |3      |
|2024-01-15 13:51:09.574|2024-01-01|1   |PFXNDDF4OX  |1        |20.16      |18.16    |10.56   |26.85 |-199.0   |18.16|34.1      |5      |
|2024-01-15 07:50:32.412|2024-01-01|1   |DEWNDCB135C |0        |44.93      |44.93    |44.93   |38.09 |0.25     |26.9 |941.26    |3      |
|2024-01-15 07:52:32.412|2024-01-01|1   |DEWNDCB135C |0        |44.93      |44.93    |44.93   |38.09 |0.25     |26.9 |941.26    |3      |
|2027-01-15 07:52:32.412|2024-01-01|1   |TEST        |0        |44.93      |44.93    |44.93   |38.09 |0.25     |26.9 |941.26    |0      |
+-----------------------+----------+----+------------+---------+-----------+---------+--------+------+---------+-----+----------+-------+
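
One last performance note: df_filtered_mins_60 (~112k rows) is tiny next to df_selected (~7.8M rows), so you can hint Spark to broadcast the small side of the join. Spark may already pick a broadcast join on its own below spark.sql.autoBroadcastJoinThreshold, but the hint makes it explicit:

# Same join as above, with an explicit broadcast hint on the small side.
result = df_selected.join(
    F.broadcast(df_filtered_mins_60),
    on=[
        df_selected.CPSKU == df_filtered_mins_60.CPSKU,
        df_selected.DATEUPDATED >= df_filtered_mins_60.start_timestamp,
        df_selected.DATEUPDATED <= df_filtered_mins_60.stop_timestamp,
    ],
    how='left'
)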