I have a dataset of timestamped records that track items on their journey from the warehouse to vendors. Normally each item (tracking ID) should go through three statuses. Sometimes, however, an item arrives at the distribution center late, or misses the arrival scan entirely, and that is what I want to flag. The schema and DataFrame are as follows:
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col, to_timestamp, collect_list, struct

sample_df = [("A001","23/10/2019 11:06:46","DEPART_FROM_FACTORY"),
("A001","23/10/2019 11:08:35","ARRIVED_AT_DISTRIBUTION"),# arrived on the same day, good compliance
("A001","25/10/2019 13:36:14","VENDOR_ACCEPTED"),
("A002","01/10/2019 13:06:46","DEPART_FROM_FACTORY"),
("A002","02/10/2019 09:08:35","ARRIVED_AT_DISTRIBUTION"),#Not arrived on the same day, bad compliance
("A002","03/10/2019 12:36:14","VENDOR_ACCEPTED"),
("A003","07/10/2019 13:06:46","DEPART_FROM_FACTORY"),
("A003","08/10/2019 09:08:35","VENDOR_ACCEPTED"), # no ARRIVED_AT_DISTRIBUTION, bad compliance
]
schema = StructType([
    StructField("tracking_id", StringType(), True),
    StructField("created_at", StringType(), True),
    StructField("status", StringType(), True),
])
sample_df = spark.createDataFrame(sample_df, schema)
sample_df = sample_df.withColumn("created_at",to_timestamp(col("created_at"),"dd/MM/yyyy HH:mm:ss"))
+-----------+-------------------+--------------------+
|tracking_id|         created_at|              status|
+-----------+-------------------+--------------------+
|       A001|2019-10-23 11:06:46| DEPART_FROM_FACTORY|
|       A001|2019-10-23 11:08:35|ARRIVED_AT_DISTRI...|
|       A001|2019-10-25 13:36:14|     VENDOR_ACCEPTED|
|       A002|2019-10-01 13:06:46| DEPART_FROM_FACTORY|
|       A002|2019-10-02 09:08:35|ARRIVED_AT_DISTRI...|
|       A002|2019-10-03 12:36:14|     VENDOR_ACCEPTED|
|       A003|2019-10-07 13:06:46| DEPART_FROM_FACTORY|
|       A003|2019-10-08 09:08:35|     VENDOR_ACCEPTED|
+-----------+-------------------+--------------------+
root
 |-- tracking_id: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- status: string (nullable = true)
I then group by tracking ID to collect each item's events into an array of structs:
agg = sample_df.groupBy("tracking_id").agg(collect_list(struct(
    col("tracking_id").alias("tracking_id"),
    col("created_at").alias("created_at"),
    col("status").alias("status")
)).alias("tbl"))
agg.show(truncate=False)
agg.printSchema()
root
 |-- tracking_id: string (nullable = true)
 |-- tbl: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- tracking_id: string (nullable = true)
 |    |    |-- created_at: timestamp (nullable = true)
 |    |    |-- status: string (nullable = true)
Expected output: as a first idea, I want to write a UDF (or some other code) that inspects the array of structs ("tbl") and returns a boolean in a new column, based on whether each status is present and, if present, whether the departure and arrival happened on the same day. The final result should be:
+-----------+-------------------+-----------------------+
|tracking_id|tbl |Compliance |
+-----------+-------------------+-----------------------+
|A001 |... |True |
|A002 |... |False |
|A003 |... |False |
+-----------+-------------------+-----------------------+
Any help would be greatly appreciated.
You don't have to use collect_list + higher-order functions. The compliance column can be computed directly from the flat rows, like this:
from pyspark.sql import types as T
from pyspark.sql import functions as F
sample_df = [
('A001', '23/10/2019 11:06:46', 'DEPART_FROM_FACTORY'),
('A001', '23/10/2019 11:08:35', 'ARRIVED_AT_DISTRIBUTION'),
('A001', '25/10/2019 13:36:14', 'VENDOR_ACCEPTED'),
('A002', '01/10/2019 13:06:46', 'DEPART_FROM_FACTORY'),
('A002', '02/10/2019 09:08:35', 'ARRIVED_AT_DISTRIBUTION'),
('A002', '03/10/2019 12:36:14', 'VENDOR_ACCEPTED'),
('A003', '07/10/2019 13:06:46', 'DEPART_FROM_FACTORY'),
('A003', '08/10/2019 09:08:35', 'VENDOR_ACCEPTED'),
]
schema = T.StructType([
    T.StructField('tracking_id', T.StringType(), True),
    T.StructField('created_at', T.StringType(), True),
    T.StructField('status', T.StringType(), True),
])
sample_df = spark.createDataFrame(sample_df, schema)
created_at = F.to_timestamp('created_at', 'dd/MM/yyyy HH:mm:ss')
sample_df = sample_df.withColumn('created_at', created_at)
is_depart = F.col('status') == 'DEPART_FROM_FACTORY'
is_arrived = F.col('status') == 'ARRIVED_AT_DISTRIBUTION'
is_accepted = F.col('status') == 'VENDOR_ACCEPTED'
departed_day = F.when(is_depart, F.col('created_at'))
arrived_day = F.when(is_arrived, F.col('created_at'))
agg_cols = [
F.sum(is_depart.cast(T.ByteType())).alias('n_depart'),
F.sum(is_arrived.cast(T.ByteType())).alias('n_arrived'),
F.sum(is_accepted.cast(T.ByteType())).alias('n_accepted'),
F.min(departed_day).alias('departed_day'),
F.min(arrived_day).alias('arrived_day')
]
res = sample_df.groupBy('tracking_id').agg(*agg_cols)
all_statuses = """
n_depart > 0 AND n_arrived > 0 AND n_accepted > 0
"""
all_statuses = F.expr(all_statuses)
same_day = F.to_date('departed_day') == F.to_date('arrived_day')
compliance = all_statuses & same_day
res = res.withColumn('compliance', compliance)
res.show(3, False)
# +-----------+--------+---------+----------+-------------------+-------------------+----------+
# |tracking_id|n_depart|n_arrived|n_accepted|departed_day |arrived_day |compliance|
# +-----------+--------+---------+----------+-------------------+-------------------+----------+
# |A001 |1 |1 |1 |2019-10-23 11:06:46|2019-10-23 11:08:35|true |
# |A002 |1 |1 |1 |2019-10-01 13:06:46|2019-10-02 09:08:35|false |
# |A003 |1 |0 |1 |2019-10-07 13:06:46|null |false |
# +-----------+--------+---------+----------+-------------------+-------------------+----------+
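For completeness, here is a sketch of the UDF route the question originally proposed: a plain-Python check over one item's (created_at, status) pairs. The function name `is_compliant` and the constant `REQUIRED` are my own, and this assumes each struct's fields reach Python as a datetime and a string; it mirrors the same rule as the aggregation above (all three statuses present, factory departure and distribution arrival on the same calendar day).

```python
from datetime import datetime

# Statuses every tracking_id is expected to have (assumption: exactly these three)
REQUIRED = {'DEPART_FROM_FACTORY', 'ARRIVED_AT_DISTRIBUTION', 'VENDOR_ACCEPTED'}

def is_compliant(rows):
    """rows: list of (created_at, status) pairs for one tracking_id.

    Compliant iff all three statuses are present and the factory
    departure and distribution-center arrival fall on the same day.
    """
    first_seen = {}
    for created_at, status in rows:
        # Keep the earliest timestamp per status, like F.min(...) above
        first_seen.setdefault(status, created_at)
    if not REQUIRED.issubset(first_seen):
        return False
    return (first_seen['DEPART_FROM_FACTORY'].date()
            == first_seen['ARRIVED_AT_DISTRIBUTION'].date())

# A001: departed and arrived on the same day
a001 = [
    (datetime(2019, 10, 23, 11, 6, 46), 'DEPART_FROM_FACTORY'),
    (datetime(2019, 10, 23, 11, 8, 35), 'ARRIVED_AT_DISTRIBUTION'),
    (datetime(2019, 10, 25, 13, 36, 14), 'VENDOR_ACCEPTED'),
]
print(is_compliant(a001))  # True
```

To apply it to the grouped `tbl` column you could wrap it with `F.udf(..., T.BooleanType())`, converting each struct row to a `(created_at, status)` pair first. Note, though, that a Python UDF serializes every row to the Python worker, so the column-expression answer above will generally be faster.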