主要问题:如何将这个特定的基于 for 循环的逻辑转换为可扩展的 pyspark 逻辑?
我正在建模一个流程,通过一系列步骤跟踪生产项目的进度。我收到以下数据,描述某个项目何时到达给定的流程设置:
+-------+-----+-----+-------+----------+----------+----------+----------+----------+
|item_ID|brand|model|country| step1| step2| step3| step4| step5|
+-------+-----+-----+-------+----------+----------+----------+----------+----------+
| 1| A| X| US|2024-02-05|2024-02-07|2024-02-09|2024-02-11|2024-02-13|
| 2| A| Y| US|2024-02-01|2024-02-04|2024-02-09|2024-02-12|2024-02-15|
| 3| B| Z| CA|2024-02-06|2024-02-09|2024-02-10|2024-02-11|2024-02-16|
| 4| B| Z| CA|2024-02-08|2024-02-09|2024-02-12|2024-02-13|2024-02-14|
| 5| C| W| US|2024-02-05|2024-02-06|2024-02-07|2024-02-09|2024-02-10|
| 6| C| W| IT|2024-02-03|2024-02-07|2024-02-12|2024-02-14|2024-02-16|
+-------+-----+-----+-------+----------+----------+----------+----------+----------+
如何将此表转换为以下格式,unsing pyspark本机代码没有for循环,对于日期,国家/地区,品牌,型号的每个组合,指示每天特定步骤的项目数(此处过滤以显示目的):
+----------+-------+-----+-----+-----+-----+-----+-----+-----+
| Date|Country|Brand|Model|step1|step2|step3|step4|step5|
+----------+-------+-----+-----+-----+-----+-----+-----+-----+
|2024-02-06| CA| B| Z| 1| 0| 0| 0| 0|
|2024-02-07| CA| B| Z| 1| 0| 0| 0| 0|
|2024-02-08| CA| B| Z| 2| 0| 0| 0| 0|
|2024-02-09| CA| B| Z| 0| 2| 0| 0| 0|
|2024-02-10| CA| B| Z| 0| 1| 1| 0| 0|
|2024-02-11| CA| B| Z| 0| 1| 0| 1| 0|
|2024-02-12| CA| B| Z| 0| 0| 1| 1| 0|
|2024-02-13| CA| B| Z| 0| 0| 0| 2| 0|
|2024-02-14| CA| B| Z| 0| 0| 0| 1| 1|
|2024-02-15| CA| B| Z| 0| 0| 0| 1| 1|
|2024-02-16| CA| B| Z| 0| 0| 0| 0| 2|
|2024-02-17| CA| B| Z| 0| 0| 0| 0| 2|
|2024-02-18| CA| B| Z| 0| 0| 0| 0| 2|
+----------+-------+-----+-----+-----+-----+-----+-----+-----+
观察这两个加拿大物品如何通过 5 个步骤。
我能想到的最好的办法是下面基于 for 循环的逻辑,它不会在 Spark 集群上扩展,并且即使计算一小部分数据也需要足够的时间。
感谢您的帮助!
工作示例:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
# generate test data
data = {
'item_ID': [1, 2, 3, 4, 5, 6],
'brand': ['A', 'A', 'B', 'B', 'C', 'C'],
'model': ['X', 'Y', 'Z', 'Z', 'W', 'W'],
'country': ['US', 'US', 'CA', 'CA', 'US', 'IT'],
'step1': ['2024-02-05', '2024-02-01', '2024-02-05', '2024-02-08', '2024-02-05', '2024-02-03'],
'step2': ['2024-02-07', '2024-02-04', '2024-02-07', '2024-02-09', '2024-02-06', '2024-02-07'],
'step3': ['2024-02-09', '2024-02-09', '2024-02-10', '2024-02-12', '2024-02-07', '2024-02-12'],
'step4': ['2024-02-11', '2024-02-12', '2024-02-11', '2024-02-13', '2024-02-09', '2024-02-14'],
'step5': ['2024-02-13', '2024-02-15', '2024-02-16', '2024-02-14', '2024-02-10', '2024-02-16'],
}
df_pd = pd.DataFrame(data)
steps = ['step1', 'step2', 'step3', 'step4', 'step5']
# prepare datatypes for pyspark & create spark dataframe
for col in steps:
df_pd[col] = pd.to_datetime(df_pd[col]).dt.date
df_spark = spark.createDataFrame(df_pd)
# Define the start and end date for February 2024, generate time range of interest
start_date = '2024-02-01'
end_date = '2024-02-29'
date_range = pd.date_range(start=start_date, end=end_date, freq='D')
# replace the following _bad_ code by proper pyspark logic without for-loops
result_list = []
for step in ['step1', 'step2', 'step3', 'step4', 'step5']:
for country in df_pd.country.unique():
for brand in df_pd.brand.unique():
for model in df_pd.model.unique():
for date in date_range:
count = int((
(df_pd[step] <= date) &
(df_pd.country == country) &
(df_pd.brand == brand) &
(df_pd.model == model)
).sum())
result_list.append((date.to_pydatetime().date(), step, country, brand, model, count))
explicit_steps = spark.createDataFrame(result_list, ["Date", "Step", "Country", "Brand", "Model", "Count"])
pivoted_df = explicit_steps.groupBy("Date", "Country", "Brand", "Model").pivot("Step").sum("Count")
result_df = (pivoted_df.withColumn("step1", F.col("step1") - F.col("step2"))
.withColumn("step2", F.col("step2") - F.col("step3"))
.withColumn("step3", F.col("step3") - F.col("step4"))
.withColumn("step4", F.col("step4") - F.col("step5"))
)
我尝试了各种方法来删除 forloop,以便将其转换为可在 Spark 集群上与大数据一起运行的可扩展代码。但到目前为止还没有成功。
很难理解你想要实现什么目标 例如:国家 CA 品牌 B 日期为 2024-02-07 步骤 1 中的源数据中没有任何内容,但转换数据中的计数为 1。同样,国家 CA 品牌 B 日期为 2024 年 2 月 08 日,步骤 1 的计数为 2,步骤 1 中的源数据中的计数为 1,但转换后的数据中的计数为 2。缺乏明确的预期