PySpark: developing a function based on timestamps and the presence/absence of certain status strings among the elements of an array-of-structs column

Problem description

I have a dataset of timestamped data tracking items on their journey from a warehouse to vendors. Normally, each item (tracking ID) should have three statuses. But sometimes the arrival at the distribution center is late, or the arrival scan is missing entirely, and that is what I want to flag. The schema and DataFrame are as follows:

sample_df = [("A001","23/10/2019  11:06:46","DEPART_FROM_FACTORY"),
              ("A001","23/10/2019  11:08:35","ARRIVED_AT_DISTRIBUTION"),# arrived on the same day, good compliance
              ("A001","25/10/2019  13:36:14","VENDOR_ACCEPTED"),

              ("A002","01/10/2019  13:06:46","DEPART_FROM_FACTORY"),
              ("A002","02/10/2019  09:08:35","ARRIVED_AT_DISTRIBUTION"),#Not arrived on the same day, bad compliance
              ("A002","03/10/2019  12:36:14","VENDOR_ACCEPTED"), 

              ("A003","07/10/2019  13:06:46","DEPART_FROM_FACTORY"),
              ("A003","08/10/2019  09:08:35","VENDOR_ACCEPTED"), # no ARRIVED_AT_DISTRIBUTION, bad compliance
              ]

schema = StructType([ StructField("tracking_id", StringType(), True),
                      StructField("created_at", StringType(), True),
                      StructField("status", StringType(), True),
                     ])

sample_df = spark.createDataFrame(sample_df, schema)
sample_df = sample_df.withColumn("created_at",to_timestamp(col("created_at"),"dd/MM/yyyy  HH:mm:ss"))
+-----------+-------------------+--------------------+
|tracking_id|         created_at|              status|
+-----------+-------------------+--------------------+
|       A001|2019-10-23 11:06:46| DEPART_FROM_FACTORY|
|       A001|2019-10-23 11:08:35|ARRIVED_AT_DISTRI...|
|       A001|2019-10-25 13:36:14|     VENDOR_ACCEPTED|
|       A002|2019-10-01 13:06:46| DEPART_FROM_FACTORY|
|       A002|2019-10-02 09:08:35|ARRIVED_AT_DISTRI...|
|       A002|2019-10-03 12:36:14|     VENDOR_ACCEPTED|
|       A003|2019-10-07 13:06:46| DEPART_FROM_FACTORY|
|       A003|2019-10-08 09:08:35|     VENDOR_ACCEPTED|
+-----------+-------------------+--------------------+

root
 |-- tracking_id: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- status: string (nullable = true)

After that, I tried grouping by tracking ID to create a grouped dataset:

agg = sample_df.groupBy("tracking_id").agg(collect_list(struct(
                col("tracking_id").alias("tracking_id"),
                col("created_at").alias("created_at"),
                col("status").alias("status")
            )).alias("tbl"))



agg.show(truncate=False) ; agg.printSchema()
root
 |-- tracking_id: string (nullable = true)
 |-- tbl: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- tracking_id: string (nullable = true)
 |    |    |-- created_at: timestamp (nullable = true)
 |    |    |-- status: string (nullable = true)

Expected output: as an initial idea, I want to create a UDF, or develop some code, that looks at the array of structs ("tbl") and returns a boolean in a new column based on the absence/presence of the statuses and on whether they were created on the same day; a rough sketch of this idea follows the table below. The final result should be:

+-----------+-------------------+-----------------------+
|tracking_id|tbl                |Compliance             |
+-----------+-------------------+-----------------------+
|A001       |...                |True                   |
|A002       |...                |False                  |
|A003       |...                |False                  |
+-----------+-------------------+-----------------------+
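
A minimal sketch of that UDF idea, assuming the grouped agg DataFrame built above (the name check_compliance is made up for illustration):

from pyspark.sql import functions as F, types as T

@F.udf(T.BooleanType())
def check_compliance(tbl):
    # tbl arrives as a list of Rows; map each status to its created_at
    times = {row['status']: row['created_at'] for row in tbl}
    needed = {'DEPART_FROM_FACTORY', 'ARRIVED_AT_DISTRIBUTION', 'VENDOR_ACCEPTED'}
    if not needed <= times.keys():
        return False  # a status is missing -> bad compliance
    # compliant only if departure and arrival fall on the same day
    return (times['DEPART_FROM_FACTORY'].date()
            == times['ARRIVED_AT_DISTRIBUTION'].date())

# agg.withColumn('Compliance', check_compliance('tbl'))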

Any help would be appreciated.

python arrays pyspark struct timestamp
1 Answer

You don't have to use collect_list + higher-order functions. The compliance column can be computed like this:

from pyspark.sql import types as T
from pyspark.sql import functions as F

sample_df = [
    ('A001', '23/10/2019  11:06:46', 'DEPART_FROM_FACTORY'),
    ('A001', '23/10/2019  11:08:35', 'ARRIVED_AT_DISTRIBUTION'),
    ('A001', '25/10/2019  13:36:14', 'VENDOR_ACCEPTED'),

    ('A002', '01/10/2019  13:06:46', 'DEPART_FROM_FACTORY'),
    ('A002', '02/10/2019  09:08:35', 'ARRIVED_AT_DISTRIBUTION'),
    ('A002', '03/10/2019  12:36:14', 'VENDOR_ACCEPTED'),

    ('A003', '07/10/2019  13:06:46', 'DEPART_FROM_FACTORY'),
    ('A003', '08/10/2019  09:08:35', 'VENDOR_ACCEPTED'),
]
schema = T.StructType([
    T.StructField('tracking_id', T.StringType(), True),
    T.StructField('created_at', T.StringType(), True),
    T.StructField('status', T.StringType(), True),
])
sample_df = spark.createDataFrame(sample_df, schema)
created_at = F.to_timestamp('created_at', 'dd/MM/yyyy  HH:mm:ss')
sample_df = sample_df.withColumn('created_at', created_at)

# Boolean flags marking each status type
is_depart = F.col('status') == 'DEPART_FROM_FACTORY'
is_arrived = F.col('status') == 'ARRIVED_AT_DISTRIBUTION'
is_accepted = F.col('status') == 'VENDOR_ACCEPTED'

# created_at only where the row matches the status, null otherwise
departed_day = F.when(is_depart, F.col('created_at'))
arrived_day = F.when(is_arrived, F.col('created_at'))
# Per tracking_id: count occurrences of each status and keep the
# earliest departure/arrival timestamps
agg_cols = [
    F.sum(is_depart.cast(T.ByteType())).alias('n_depart'),
    F.sum(is_arrived.cast(T.ByteType())).alias('n_arrived'),
    F.sum(is_accepted.cast(T.ByteType())).alias('n_accepted'),

    F.min(departed_day).alias('departed_day'),
    F.min(arrived_day).alias('arrived_day')
]
res = sample_df.groupBy('tracking_id').agg(*agg_cols)

all_statuses = """
n_depart > 0 AND n_arrived > 0 AND n_accepted > 0
"""
all_statuses = F.expr(all_statuses)
same_day = F.to_date('departed_day') == F.to_date('arrived_day')

compliance = all_statuses & same_day
# DataFrame.withColumns requires Spark 3.3+; use withColumn on older versions
res = res.withColumns({
    'compliance': compliance
})
res.show(3, False)

# +-----------+--------+---------+----------+-------------------+-------------------+----------+
# |tracking_id|n_depart|n_arrived|n_accepted|departed_day       |arrived_day        |compliance|
# +-----------+--------+---------+----------+-------------------+-------------------+----------+
# |A001       |1       |1        |1         |2019-10-23 11:06:46|2019-10-23 11:08:35|true      |
# |A002       |1       |1        |1         |2019-10-01 13:06:46|2019-10-02 09:08:35|false     |
# |A003       |1       |0        |1         |2019-10-07 13:06:46|null               |false     |
# +-----------+--------+---------+----------+-------------------+-------------------+----------+
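
As a side note, if you do want to keep the collect_list array from the question and test it directly, Spark's built-in higher-order functions (Spark 3.1+) can replace a Python UDF. A minimal sketch, assuming the agg DataFrame with the tbl column from the question; ts_of is a made-up helper name:

from pyspark.sql import functions as F

def ts_of(status):
    # Earliest created_at among the array elements with the given status
    # (null if the status is absent from the array)
    matches = F.filter('tbl', lambda x: x['status'] == status)
    return F.array_min(F.transform(matches, lambda x: x['created_at']))

departed = ts_of('DEPART_FROM_FACTORY')
arrived = ts_of('ARRIVED_AT_DISTRIBUTION')
accepted = ts_of('VENDOR_ACCEPTED')

# Compliant when all three statuses exist and departure/arrival share a date
compliance = (
    departed.isNotNull() & arrived.isNotNull() & accepted.isNotNull()
    & (F.to_date(departed) == F.to_date(arrived))
)
res2 = agg.withColumn('Compliance', compliance)

filter, transform, and array_min all run natively in the JVM, so this avoids the serialization overhead of a Python UDF.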