用 (py)spark 本机逻辑替换 for 循环,以进行连续流程步骤跟踪

问题描述 投票:0回答:1

主要问题:如何将这个特定的基于 for 循环的逻辑转换为可扩展的 pyspark 逻辑?

我正在建模一个流程,通过一系列步骤跟踪生产项目的进度。我收到以下数据,描述某个项目何时到达给定的流程设置:

+-------+-----+-----+-------+----------+----------+----------+----------+----------+
|item_ID|brand|model|country|     step1|     step2|     step3|     step4|     step5|
+-------+-----+-----+-------+----------+----------+----------+----------+----------+
|      1|    A|    X|     US|2024-02-05|2024-02-07|2024-02-09|2024-02-11|2024-02-13|
|      2|    A|    Y|     US|2024-02-01|2024-02-04|2024-02-09|2024-02-12|2024-02-15|
|      3|    B|    Z|     CA|2024-02-06|2024-02-09|2024-02-10|2024-02-11|2024-02-16|
|      4|    B|    Z|     CA|2024-02-08|2024-02-09|2024-02-12|2024-02-13|2024-02-14|
|      5|    C|    W|     US|2024-02-05|2024-02-06|2024-02-07|2024-02-09|2024-02-10|
|      6|    C|    W|     IT|2024-02-03|2024-02-07|2024-02-12|2024-02-14|2024-02-16|
+-------+-----+-----+-------+----------+----------+----------+----------+----------+

如何将此表转换为以下格式,unsing pyspark本机代码没有for循环,对于日期,国家/地区,品牌,型号的每个组合,指示每天特定步骤的项目数(此处过滤以显示目的):

+----------+-------+-----+-----+-----+-----+-----+-----+-----+
|      Date|Country|Brand|Model|step1|step2|step3|step4|step5|
+----------+-------+-----+-----+-----+-----+-----+-----+-----+
|2024-02-06|     CA|    B|    Z|    1|    0|    0|    0|    0|
|2024-02-07|     CA|    B|    Z|    1|    0|    0|    0|    0|
|2024-02-08|     CA|    B|    Z|    2|    0|    0|    0|    0|
|2024-02-09|     CA|    B|    Z|    0|    2|    0|    0|    0|
|2024-02-10|     CA|    B|    Z|    0|    1|    1|    0|    0|
|2024-02-11|     CA|    B|    Z|    0|    1|    0|    1|    0|
|2024-02-12|     CA|    B|    Z|    0|    0|    1|    1|    0|
|2024-02-13|     CA|    B|    Z|    0|    0|    0|    2|    0|
|2024-02-14|     CA|    B|    Z|    0|    0|    0|    1|    1|
|2024-02-15|     CA|    B|    Z|    0|    0|    0|    1|    1|
|2024-02-16|     CA|    B|    Z|    0|    0|    0|    0|    2|
|2024-02-17|     CA|    B|    Z|    0|    0|    0|    0|    2|
|2024-02-18|     CA|    B|    Z|    0|    0|    0|    0|    2|
+----------+-------+-----+-----+-----+-----+-----+-----+-----+

观察这两个加拿大物品如何通过 5 个步骤。

我能想到的最好的办法是下面基于 for 循环的逻辑,它不会在 Spark 集群上扩展,并且即使计算一小部分数据也需要足够的时间。

感谢您的帮助!

工作示例:

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# generate test data
data = {
    'item_ID': [1, 2, 3, 4, 5, 6],
    'brand': ['A', 'A', 'B', 'B', 'C', 'C'],
    'model': ['X', 'Y', 'Z', 'Z', 'W', 'W'],
    'country': ['US', 'US', 'CA', 'CA', 'US', 'IT'],
    'step1': ['2024-02-05', '2024-02-01', '2024-02-05', '2024-02-08', '2024-02-05', '2024-02-03'],
    'step2': ['2024-02-07', '2024-02-04', '2024-02-07', '2024-02-09', '2024-02-06', '2024-02-07'],
    'step3': ['2024-02-09', '2024-02-09', '2024-02-10', '2024-02-12', '2024-02-07', '2024-02-12'],
    'step4': ['2024-02-11', '2024-02-12', '2024-02-11', '2024-02-13', '2024-02-09', '2024-02-14'],
    'step5': ['2024-02-13', '2024-02-15', '2024-02-16', '2024-02-14', '2024-02-10', '2024-02-16'],
}
df_pd = pd.DataFrame(data)
steps = ['step1', 'step2', 'step3', 'step4', 'step5']
# prepare datatypes for pyspark & create spark dataframe
for col in steps:
    df_pd[col] = pd.to_datetime(df_pd[col]).dt.date
df_spark = spark.createDataFrame(df_pd)

# Define the start and end date for February 2024, generate time range of interest
start_date = '2024-02-01'
end_date = '2024-02-29'
date_range = pd.date_range(start=start_date, end=end_date, freq='D')

# replace the following _bad_ code by proper pyspark logic without for-loops
result_list = []
for step in ['step1', 'step2', 'step3', 'step4', 'step5']:
    for country in df_pd.country.unique():
        for brand in df_pd.brand.unique():
            for model in df_pd.model.unique():
                for date in date_range:
                    count = int((
                                (df_pd[step] <= date) &
                                (df_pd.country == country) &
                                (df_pd.brand == brand) &
                                (df_pd.model == model)
                                ).sum())
                    result_list.append((date.to_pydatetime().date(), step, country, brand, model, count))
explicit_steps = spark.createDataFrame(result_list, ["Date", "Step", "Country", "Brand", "Model", "Count"])

pivoted_df = explicit_steps.groupBy("Date", "Country", "Brand", "Model").pivot("Step").sum("Count")
result_df = (pivoted_df.withColumn("step1", F.col("step1") - F.col("step2"))
             .withColumn("step2", F.col("step2") - F.col("step3"))
             .withColumn("step3", F.col("step3") - F.col("step4"))
             .withColumn("step4", F.col("step4") - F.col("step5"))
             )

我尝试了各种方法来删除 forloop,以便将其转换为可在 Spark 集群上与大数据一起运行的可扩展代码。但到目前为止还没有成功。

apache-spark for-loop pyspark
1个回答
0
投票

很难理解你想要实现什么目标 例如:国家 CA 品牌 B 日期为 2024-02-07 步骤 1 中的源数据中没有任何内容,但转换数据中的计数为 1。同样,国家 CA 品牌 B 日期为 2024 年 2 月 08 日,步骤 1 的计数为 2,步骤 1 中的源数据中的计数为 1,但转换后的数据中的计数为 2。缺乏明确的预期

© www.soinside.com 2019 - 2024. All rights reserved.