我在pyspark中具有以下数据框:
Name | Seconds
|Enviar solicitud ...| 1415
|Analizar mapa de ...| 1209|
|Modificar solicit...| 591|
|Entregar servicio...|91049|
我希望将seconds
列转换为日期或时间戳(希望是todate),我正在尝试使用以下功能
def to_date(seconds=0):
dat = ''
if seconds == 0:
dat = '0'
if (seconds / 86400) >= 1:
day = (int(seconds / 86400))
seconds = (seconds - 86400 * int(seconds / 86400))
dat = f'{day}d '
if (seconds / 3600) >= 1:
hour = (int(seconds / 3600))
seconds = (seconds - 3600 * int(seconds / 3600))
dat = dat + f'{hour}hr '
if (seconds / 60) >= 1:
minutes = (int(seconds / 60))
dat = dat + f'{minutes}min'
else:
return '0min'
return dat
但是在pyspark中没有像Pandas .apply(to_date)
这样的简单方法,有没有实现我想要做的事情?
期望的输出:
Analizar mapa de comparacion de presupuestos 1209 20min
Crear mapa de comparacion de presupuestos 12155 3hr 22min
Entregar servicios de bienes 91049 1d 1hr 17min
我认为,如果没有UDF,就可以实现这一点,它将更快,更可伸缩地处理大数据。试试这个,让我知道我的逻辑中是否有漏洞。
from pyspark.sql import functions as F
from pyspark.sql.functions import when
df.withColumn("Minutes", F.round((F.col("Seconds")/60),2))\
.withColumn("Hours", F.floor((F.col("Minutes")/60)))\
.withColumn("hourmin", F.floor(F.col("Minutes")-(F.col("Hours").cast("int") * 60)))\
.withColumn("Days", F.floor((F.col("Hours")/24)))\
.withColumn("Days2", F.col("Days")*24)\
.withColumn("Time", F.when((F.col("Hours")==0) &(F.col("Days")==0), F.concat(F.col("hourmin"),F.lit("min"))).when((F.col("Hours")!=0)&(F.col("Days")==0), F.concat(F.col("Hours"),F.lit("hr "),F.col("hourmin"),F.lit("min"))).when(F.col("Days")!=0, F.concat(F.col("Days"),F.lit("d "),(F.col("Hours")-F.col("Days2")),F.lit("hr "),F.col("hourmin"),F.lit("min"))))\
.drop("Minutes","Hours","hourmin","Days","Days2")\
.show()
+-----------------+-------+---------------+
| Name|Seconds| Time|
+-----------------+-------+---------------+
| Enviar solicitud| 1209| 20min|
| Analizar mapa de| 12155| 3hr 22min|
|Entregar servicio| 91049| 1d 1hr 17min|
| example1| 1900| 31min|
| example2| 2500| 41min|
| example3|9282398|107d 10hr 26min|
+-----------------+-------+---------------+
Spark中没有内置函数,但是可以在没有UDF的情况下完成。您可以简单地使用除法和取模运算来计算它以获得不同的部分(天,小时,...),并进行串联以获得所需的格式。
对于Spark 2.4+,可以使用高阶函数zip_with
和zip_with
array_join
另一种方法是使用array_join
并添加duration_parts = [(86400, 7), (3600, 24), (60, 60), (1, 60)]
exp = "zip_with(parts, array('d', 'hr', 'min', 'sec'), (x, y) -> IF(x > 0, concat(x, y), null))"
df.withColumn("parts", array(*[(floor(col("Seconds") / d)) % m for d, m in duration_parts]))\
.withColumn("duration", array_join(expr(exp), ", "))\
.drop("parts")\
.show(truncate=False)
#+--------------------------------------------+-------+---------------------+
#|Name |Seconds|duration |
#+--------------------------------------------+-------+---------------------+
#|Analizar mapa de comparacion de presupuestos|1209 |20min, 9sec |
#|Crear mapa de comparacion de presupuestos |12155 |3hr, 22min, 35sec |
#|Entregar servicios de bienes |91049 |1d, 1hr, 17min, 29sec|
#+--------------------------------------------+-------+---------------------+
表达式,如果您不希望等于0的部分:
concat
这应该为您提供when
格式的输出。
df.withColumn("duration", concat(
floor(col("Seconds") / 86400), lit("d, "),
floor(col("Seconds") % 86400 / 3600), lit("hr, "),
floor((col("Seconds") % 86400) % 3600 / 60), lit("min, "),
floor(((col("Seconds") % 86400) % 3600) % 60), lit("sec "),
)).show(truncate=False)