I have two dataframes: df_selected and df_filtered_mins_60
df_filtered_mins_60.columns
Output: ["CSku", "start_timestamp", "stop_timestamp"]
df_selected.columns
Output: ["DATEUPDATED", "DATE", "HOUR", "CPSKU", "BB_Status", "ActivePrice", "PrevPrice", "MinPrice", "AsCost", "MinMargin", "CPT", "Comp_Price", "AP_MSG"]
df_selected.count()
Output: 7,816,521
df_filtered_mins_60.count()
Output: 112,397
What I want to achieve:
Iterate over df_filtered_mins_60 and, for each row, take:
start_time = start_timestamp
stop_time = stop_timestamp
sku = CSku
Then apply the following condition to df_selected:
DATEUPDATED is equal to or between start_time and stop_time
and CPSKU = sku
and assign a constant i to all rows that satisfy this condition. Continue until the last row of df_filtered_mins_60, incrementing i = i + 1 after each update.
The code I wrote is below. It never finishes; it gets stuck somewhere and keeps running for hours until I force-stop it.
from pyspark.sql.functions import lit, when

i = 1
df_selected = df_selected.withColumn("counter", lit(0))

# Iterate through each row of df_filtered_mins_60
for row in df_filtered_mins_60.collect():
    sku = row['CSku']
    start_time = row['start_timestamp']
    stop_time = row['stop_timestamp']
    # Apply conditions on df_selected and update "counter" column
    df_selected = df_selected.withColumn("counter",
                                         when((df_selected.DATEUPDATED >= start_time) &
                                              (df_selected.DATEUPDATED <= stop_time) &
                                              (df_selected.CPSKU == sku),
                                              lit(i)).otherwise(df_selected.counter))
    i += 1

# Display the updated df_selected DataFrame with the "counter" column
display(df_selected)
I assign the counter because I need the sets of rows in df_selected that fall inside certain time windows for each SKU, and that window information lives in df_filtered_mins_60. Once the counter is assigned, I need to run aggregations over the other columns of df_selected: essentially, for each window, I need to drill into what happened inside that specific time window.
I need correct PySpark code for this that runs on Databricks.
Sample data generation:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Create DataFrame") \
    .getOrCreate()
schema = StructType([
    StructField("DATEUPDATED", StringType(), True),
    StructField("DATE", StringType(), True),
    StructField("HOUR", IntegerType(), True),
    StructField("CPSKU", StringType(), True),
    StructField("BB_Status", IntegerType(), True),
    StructField("ActivePrice", DoubleType(), True),
    StructField("PrevPrice", DoubleType(), True),
    StructField("MinPrice", DoubleType(), True),
    StructField("AsCost", DoubleType(), True),
    StructField("MinMargin", DoubleType(), True),
    StructField("CPT", DoubleType(), True),
    StructField("Comp_Price", DoubleType(), True)
])
data = [('2024-01-01T19:45:39.151+00:00','2024-01-01',0,'MSAN10115836',0,14.86,14.86,14.86,12.63,0.00,13.90,5.84),
        ('2024-01-01T19:55:10.904+00:00','2024-01-01',0,'MSAN10115836',0,126.04,126.04,126.04,108.96,0.00,0.00,93.54),
        ('2024-01-01T20:35:10.904+00:00','2024-01-01',0,'MSAN10115836',0,126.04,126.04,126.04,108.96,0.00,0.00,93.54),
        ('2024-01-15T12:55:18.528+00:00','2024-01-01',1,'PFXNDDF4OX',1,18.16,18.16,10.56,26.85,-199.00,18.16,34.10),
        ('2024-01-15T13:25:18.528+00:00','2024-01-01',1,'PFXNDDF4OX',1,18.16,18.16,10.56,26.85,-199.00,18.16,34.10),
        ('2024-01-15T13:35:18.528+00:00','2024-01-01',1,'PFXNDDF4OX',1,18.16,18.16,10.56,26.85,-199.00,18.16,34.10),
        ('2024-01-15T13:51:09.574+00:00','2024-01-01',1,'PFXNDDF4OX',1,20.16,18.16,10.56,26.85,-199.00,18.16,34.10),
        ('2024-01-15T07:28:48.265+00:00','2024-01-01',1,'DEWNDCB135C',0,44.93,44.93,44.93,38.09,0.25,26.9,941.26),
        ('2024-01-15T07:50:32.412+00:00','2024-01-01',1,'DEWNDCB135C',0,44.93,44.93,44.93,38.09,0.25,26.9,941.26),
        ('2024-01-15T07:52:32.412+00:00','2024-01-01',1,'DEWNDCB135C',0,44.93,44.93,44.93,38.09,0.25,26.9,941.26)]
df_selected = spark.createDataFrame(data, schema=schema)
df_selected = df_selected.withColumn("DateUpdated", to_timestamp(df_selected["DATEUPDATED"], "yyyy-MM-dd'T'HH:mm:ss.SSS'+00:00'"))
display(df_selected)
The second dataframe:
schema = StructType([
    StructField("CPSKU", StringType(), True),
    StructField("start_timestamp", StringType(), True),
    StructField("stop_timestamp", StringType(), True)
])
data_2 = [('MSAN10115836','2024-01-01T19:45:39.151+00:00','2024-01-01T20:35:10.904+00:00'),
          ('MSAN10115836','2024-01-08T06:04:16.484+00:00','2024-01-08T06:42:14.912+00:00'),
          ('DEWNDCB135C','2024-01-15T07:28:48.265+00:00','2024-01-15T07:52:32.412+00:00'),
          ('DEWNDCB135C','2024-01-15T11:37:56.698+00:00','2024-01-15T12:35:09.693+00:00'),
          ('PFXNDDF4OX','2024-01-15T12:55:18.528+00:00','2024-01-15T13:51:09.574+00:00'),
          ('PFXNDDF4OX','2024-01-15T19:25:10.150+00:00','2024-01-15T20:24:36.385+00:00')]
df_filtered_mins_60 = spark.createDataFrame(data_2, schema=schema)
df_filtered_mins_60 = df_filtered_mins_60.withColumn("start_timestamp", to_timestamp(df_filtered_mins_60["start_timestamp"], "yyyy-MM-dd'T'HH:mm:ss.SSS'+00:00'"))
df_filtered_mins_60 = df_filtered_mins_60.withColumn("stop_timestamp", to_timestamp(df_filtered_mins_60["stop_timestamp"], "yyyy-MM-dd'T'HH:mm:ss.SSS'+00:00'"))
display(df_filtered_mins_60)
The code block below is very inefficient: .collect() pulls all of the data onto the driver (so nothing is distributed), and then every iteration of the loop overwrites the entire column. Each iteration also wraps the plan in another nested when(...), so after thousands of iterations the query plan itself becomes huge, which is most likely why the job appears to hang.
i = 1
# Iterate through each row of df_filtered_mins_60
for row in df_filtered_mins_60.collect():
    sku = row['CSku']
    start_time = row['start_timestamp']
    stop_time = row['stop_timestamp']
    # Apply conditions on df_selected and update "counter" column
    df_selected = df_selected.withColumn("counter",
                                         when((df_selected.DATEUPDATED >= start_time) &
                                              (df_selected.DATEUPDATED <= stop_time) &
                                              (df_selected.CPSKU == sku),
                                              lit(i)).otherwise(df_selected.counter))
    i += 1
If you need the counter to be ordered, you can assign a row number to each row of df_filtered_mins_60 and then left-join df_selected to df_filtered_mins_60 on the conditions:
[
    df_selected.CPSKU == df_filtered_mins_60.CPSKU,
    df_selected.DATEUPDATED >= df_filtered_mins_60.start_timestamp,
    df_selected.DATEUPDATED <= df_filtered_mins_60.stop_timestamp,
]
This keeps all rows of df_selected, including those that don't satisfy the join conditions; you can then assign 0 to the counter of any row where it is NULL.
Below is a fully reproducible example (I added one row to df_selected that doesn't satisfy the join conditions, just to show what happens):
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
from pyspark.sql.window import Window

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Create DataFrame") \
    .getOrCreate()
schema = StructType([
    StructField("DATEUPDATED", StringType(), True),
    StructField("DATE", StringType(), True),
    StructField("HOUR", IntegerType(), True),
    StructField("CPSKU", StringType(), True),
    StructField("BB_Status", IntegerType(), True),
    StructField("ActivePrice", DoubleType(), True),
    StructField("PrevPrice", DoubleType(), True),
    StructField("MinPrice", DoubleType(), True),
    StructField("AsCost", DoubleType(), True),
    StructField("MinMargin", DoubleType(), True),
    StructField("CPT", DoubleType(), True),
    StructField("Comp_Price", DoubleType(), True)
])
data = [('2024-01-01T19:45:39.151+00:00','2024-01-01',0,'MSAN10115836',0,14.86,14.86,14.86,12.63,0.00,13.90,5.84),
        ('2024-01-01T19:55:10.904+00:00','2024-01-01',0,'MSAN10115836',0,126.04,126.04,126.04,108.96,0.00,0.00,93.54),
        ('2024-01-01T20:35:10.904+00:00','2024-01-01',0,'MSAN10115836',0,126.04,126.04,126.04,108.96,0.00,0.00,93.54),
        ('2024-01-15T12:55:18.528+00:00','2024-01-01',1,'PFXNDDF4OX',1,18.16,18.16,10.56,26.85,-199.00,18.16,34.10),
        ('2024-01-15T13:25:18.528+00:00','2024-01-01',1,'PFXNDDF4OX',1,18.16,18.16,10.56,26.85,-199.00,18.16,34.10),
        ('2024-01-15T13:35:18.528+00:00','2024-01-01',1,'PFXNDDF4OX',1,18.16,18.16,10.56,26.85,-199.00,18.16,34.10),
        ('2024-01-15T13:51:09.574+00:00','2024-01-01',1,'PFXNDDF4OX',1,20.16,18.16,10.56,26.85,-199.00,18.16,34.10),
        ('2024-01-15T07:28:48.265+00:00','2024-01-01',1,'DEWNDCB135C',0,44.93,44.93,44.93,38.09,0.25,26.9,941.26),
        ('2024-01-15T07:50:32.412+00:00','2024-01-01',1,'DEWNDCB135C',0,44.93,44.93,44.93,38.09,0.25,26.9,941.26),
        ('2024-01-15T07:52:32.412+00:00','2024-01-01',1,'DEWNDCB135C',0,44.93,44.93,44.93,38.09,0.25,26.9,941.26),
        ('2027-01-15T07:52:32.412+00:00','2024-01-01',1,'TEST',0,44.93,44.93,44.93,38.09,0.25,26.9,941.26)]
df_selected = spark.createDataFrame(data, schema=schema)
df_selected = df_selected.withColumn("DATEUPDATED", F.to_timestamp(df_selected["DATEUPDATED"], "yyyy-MM-dd'T'HH:mm:ss.SSS'+00:00'"))
schema = StructType([
    StructField("CPSKU", StringType(), True),
    StructField("start_timestamp", StringType(), True),
    StructField("stop_timestamp", StringType(), True)
])
data_2 = [('MSAN10115836','2024-01-01T19:45:39.151+00:00','2024-01-01T20:35:10.904+00:00'),
          ('MSAN10115836','2024-01-08T06:04:16.484+00:00','2024-01-08T06:42:14.912+00:00'),
          ('DEWNDCB135C','2024-01-15T07:28:48.265+00:00','2024-01-15T07:52:32.412+00:00'),
          ('DEWNDCB135C','2024-01-15T11:37:56.698+00:00','2024-01-15T12:35:09.693+00:00'),
          ('PFXNDDF4OX','2024-01-15T12:55:18.528+00:00','2024-01-15T13:51:09.574+00:00'),
          ('PFXNDDF4OX','2024-01-15T19:25:10.150+00:00','2024-01-15T20:24:36.385+00:00')]
df_filtered_mins_60 = spark.createDataFrame(data_2, schema=schema)
df_filtered_mins_60 = df_filtered_mins_60.withColumn("start_timestamp", F.to_timestamp(df_filtered_mins_60["start_timestamp"], "yyyy-MM-dd'T'HH:mm:ss.SSS'+00:00'"))
df_filtered_mins_60 = df_filtered_mins_60.withColumn("stop_timestamp", F.to_timestamp(df_filtered_mins_60["stop_timestamp"], "yyyy-MM-dd'T'HH:mm:ss.SSS'+00:00'"))

# Number the windows in chronological order; this window spec defines the counter
w = Window.orderBy('start_timestamp')
df_filtered_mins_60 = df_filtered_mins_60.withColumn("counter", F.row_number().over(w))
df_selected looks like this:
+-----------------------+----------+----+------------+---------+-----------+---------+--------+------+---------+-----+----------+
|DATEUPDATED |DATE |HOUR|CPSKU |BB_Status|ActivePrice|PrevPrice|MinPrice|AsCost|MinMargin|CPT |Comp_Price|
+-----------------------+----------+----+------------+---------+-----------+---------+--------+------+---------+-----+----------+
|2024-01-01 19:45:39.151|2024-01-01|0 |MSAN10115836|0 |14.86 |14.86 |14.86 |12.63 |0.0 |13.9 |5.84 |
|2024-01-01 19:55:10.904|2024-01-01|0 |MSAN10115836|0 |126.04 |126.04 |126.04 |108.96|0.0 |0.0 |93.54 |
|2024-01-01 20:35:10.904|2024-01-01|0 |MSAN10115836|0 |126.04 |126.04 |126.04 |108.96|0.0 |0.0 |93.54 |
|2024-01-15 12:55:18.528|2024-01-01|1 |PFXNDDF4OX |1 |18.16 |18.16 |10.56 |26.85 |-199.0 |18.16|34.1 |
|2024-01-15 13:25:18.528|2024-01-01|1 |PFXNDDF4OX |1 |18.16 |18.16 |10.56 |26.85 |-199.0 |18.16|34.1 |
|2024-01-15 13:35:18.528|2024-01-01|1 |PFXNDDF4OX |1 |18.16 |18.16 |10.56 |26.85 |-199.0 |18.16|34.1 |
|2024-01-15 13:51:09.574|2024-01-01|1 |PFXNDDF4OX |1 |20.16 |18.16 |10.56 |26.85 |-199.0 |18.16|34.1 |
|2024-01-15 07:28:48.265|2024-01-01|1 |DEWNDCB135C |0 |44.93 |44.93 |44.93 |38.09 |0.25 |26.9 |941.26 |
|2024-01-15 07:50:32.412|2024-01-01|1 |DEWNDCB135C |0 |44.93 |44.93 |44.93 |38.09 |0.25 |26.9 |941.26 |
|2024-01-15 07:52:32.412|2024-01-01|1 |DEWNDCB135C |0 |44.93 |44.93 |44.93 |38.09 |0.25 |26.9 |941.26 |
|2027-01-15 07:52:32.412|2024-01-01|1 |TEST |0 |44.93 |44.93 |44.93 |38.09 |0.25 |26.9 |941.26 |
+-----------------------+----------+----+------------+---------+-----------+---------+--------+------+---------+-----+----------+
df_filtered_mins_60 looks like this:
+------------+-----------------------+-----------------------+-------+
|CPSKU |start_timestamp |stop_timestamp |counter|
+------------+-----------------------+-----------------------+-------+
|MSAN10115836|2024-01-01 19:45:39.151|2024-01-01 20:35:10.904|1 |
|MSAN10115836|2024-01-08 06:04:16.484|2024-01-08 06:42:14.912|2 |
|DEWNDCB135C |2024-01-15 07:28:48.265|2024-01-15 07:52:32.412|3 |
|DEWNDCB135C |2024-01-15 11:37:56.698|2024-01-15 12:35:09.693|4 |
|PFXNDDF4OX |2024-01-15 12:55:18.528|2024-01-15 13:51:09.574|5 |
|PFXNDDF4OX |2024-01-15 19:25:10.15 |2024-01-15 20:24:36.385|6 |
+------------+-----------------------+-----------------------+-------+
Here is the join and the result:
df_selected.join(
    df_filtered_mins_60,
    on=[
        df_selected.CPSKU == df_filtered_mins_60.CPSKU,
        df_selected.DATEUPDATED >= df_filtered_mins_60.start_timestamp,
        df_selected.DATEUPDATED <= df_filtered_mins_60.stop_timestamp,
    ],
    how='left'  # keep all rows from df_selected
).drop(
    df_filtered_mins_60.CPSKU,
    df_filtered_mins_60.start_timestamp,
    df_filtered_mins_60.stop_timestamp
).withColumn(
    'counter', F.coalesce(F.col('counter'), F.lit(0))
)
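A side note on scale: df_filtered_mins_60 (~112K rows) is tiny next to df_selected (~7.8M rows), so it can be worth hinting Spark to broadcast the small dataframe and avoid shuffling the large one. Here is a minimal sketch of the same join with that hint, assuming df_filtered_mins_60 fits in executor memory (df_counted is just a name I'm introducing for the result):

# Optional optimization: broadcast the small windows dataframe so the
# large dataframe is never shuffled; the result is identical to the join above.
df_counted = df_selected.join(
    F.broadcast(df_filtered_mins_60),
    on=[
        df_selected.CPSKU == df_filtered_mins_60.CPSKU,
        df_selected.DATEUPDATED >= df_filtered_mins_60.start_timestamp,
        df_selected.DATEUPDATED <= df_filtered_mins_60.stop_timestamp,
    ],
    how='left'
).drop(
    df_filtered_mins_60.CPSKU,
    df_filtered_mins_60.start_timestamp,
    df_filtered_mins_60.stop_timestamp
).withColumn('counter', F.coalesce(F.col('counter'), F.lit(0)))

With or without the hint, the result is: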
+-----------------------+----------+----+------------+---------+-----------+---------+--------+------+---------+-----+----------+-------+
|DATEUPDATED |DATE |HOUR|CPSKU |BB_Status|ActivePrice|PrevPrice|MinPrice|AsCost|MinMargin|CPT |Comp_Price|counter|
+-----------------------+----------+----+------------+---------+-----------+---------+--------+------+---------+-----+----------+-------+
|2024-01-01 19:45:39.151|2024-01-01|0 |MSAN10115836|0 |14.86 |14.86 |14.86 |12.63 |0.0 |13.9 |5.84 |1 |
|2024-01-01 19:55:10.904|2024-01-01|0 |MSAN10115836|0 |126.04 |126.04 |126.04 |108.96|0.0 |0.0 |93.54 |1 |
|2024-01-15 12:55:18.528|2024-01-01|1 |PFXNDDF4OX |1 |18.16 |18.16 |10.56 |26.85 |-199.0 |18.16|34.1 |5 |
|2024-01-01 20:35:10.904|2024-01-01|0 |MSAN10115836|0 |126.04 |126.04 |126.04 |108.96|0.0 |0.0 |93.54 |1 |
|2024-01-15 13:25:18.528|2024-01-01|1 |PFXNDDF4OX |1 |18.16 |18.16 |10.56 |26.85 |-199.0 |18.16|34.1 |5 |
|2024-01-15 13:35:18.528|2024-01-01|1 |PFXNDDF4OX |1 |18.16 |18.16 |10.56 |26.85 |-199.0 |18.16|34.1 |5 |
|2024-01-15 07:28:48.265|2024-01-01|1 |DEWNDCB135C |0 |44.93 |44.93 |44.93 |38.09 |0.25 |26.9 |941.26 |3 |
|2024-01-15 13:51:09.574|2024-01-01|1 |PFXNDDF4OX |1 |20.16 |18.16 |10.56 |26.85 |-199.0 |18.16|34.1 |5 |
|2024-01-15 07:50:32.412|2024-01-01|1 |DEWNDCB135C |0 |44.93 |44.93 |44.93 |38.09 |0.25 |26.9 |941.26 |3 |
|2024-01-15 07:52:32.412|2024-01-01|1 |DEWNDCB135C |0 |44.93 |44.93 |44.93 |38.09 |0.25 |26.9 |941.26 |3 |
|2027-01-15 07:52:32.412|2024-01-01|1 |TEST |0 |44.93 |44.93 |44.93 |38.09 |0.25 |26.9 |941.26 |0 |
+-----------------------+----------+----+------------+---------+-----------+---------+--------+------+---------+-----+----------+-------+
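Since you mentioned needing aggregations per window afterwards, here is a minimal sketch of that drill-down step on the df_counted result from above; the particular aggregates (row count, first/last update, average price) are placeholders for whatever analysis you actually need:

# Hypothetical per-window summary; counter = 0 is the rows that matched
# no window, so they are excluded before grouping.
df_window_stats = (
    df_counted
    .filter(F.col('counter') > 0)
    .groupBy('counter', 'CPSKU')
    .agg(
        F.count('*').alias('n_rows'),
        F.min('DATEUPDATED').alias('first_update'),
        F.max('DATEUPDATED').alias('last_update'),
        F.avg('ActivePrice').alias('avg_active_price'),
    )
)
display(df_window_stats)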