I have a PySpark dataframe, one column of which is a date column.
I need to run this column through pandas/datetime functions to calculate business hours.
However, I can't seem to get the conversion right:
df3 = df2.withColumn('test_date', add_one(df2.AssignedDate.toPandas()))
which produces the error:
'Column' object is not callable
I am trying to run df2.AssignedDate through the following function:
def add_one(pd_date):
    if pd_date.isoweekday() == 6:
        pd_date = pd_date.replace(hour=7, minute=0)
    return pd_date
You can use a pandas UDF:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import TimestampType

@pandas_udf(returnType=TimestampType())
def add_one(dates: pd.Series) -> pd.Series:
    # A pandas UDF receives a whole pandas Series per batch, not a single
    # datetime, so use the Series .dt accessor instead of isoweekday().
    # If the column doesn't arrive as datetimes, uncomment the following line
    # dates = pd.to_datetime(dates)
    result = dates.copy()
    saturdays = dates.dt.dayofweek == 5  # pandas counts Monday=0, so Saturday=5
    result[saturdays] = dates[saturdays].apply(lambda d: d.replace(hour=7, minute=0))
    return result

df3 = df2.withColumn('test_date', add_one(df2.AssignedDate))
For more information, see this Databricks blog.
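The Saturday-replacement logic itself can be sanity-checked locally with plain pandas before wiring it into a Spark job (a minimal sketch; the sample timestamps are made up to include one Saturday):

```python
import pandas as pd

dates = pd.Series(pd.to_datetime([
    "2019-11-04 07:15:21",  # a Monday
    "2019-11-09 06:02:04",  # a Saturday
]))

result = dates.copy()
mask = dates.dt.dayofweek == 5  # Saturday (pandas counts Monday=0)
result[mask] = dates[mask].apply(lambda d: d.replace(hour=7, minute=0))

print(result.tolist())
# Only the Saturday row moves to 07:00, seconds preserved
```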
You can parse the timestamps with the regular pyspark.sql.functions and manipulate them directly:
In [1]: from datetime import datetime
...: from pyspark.sql.functions import col, date_format, to_timestamp, when, dayofweek
...:
...: frame = spark.createDataFrame(
...: [(1, datetime(2019, 11, 4, 7, 15, 21)),
...: (2, datetime(2019, 11, 9, 6, 2, 4))],
...: schema=("id", "time"))
...:
...: replaced_as_string = frame.withColumn(
...: "trunc",
...: when(
...: dayofweek(col("time")) == 7, # different convention
...: date_format(col("time"), "yyyy-MM-dd 07:00:ss")
...: ).otherwise(
...: date_format(col("time"), "yyyy-MM-dd HH:mm:ss"))
...: )
...: replaced_as_timestamp = replaced_as_string.withColumn(
...: "trunc",
...: to_timestamp(col("trunc")))
...: replaced_as_timestamp.show()
...:
+---+-------------------+-------------------+
| id| time| trunc|
+---+-------------------+-------------------+
| 1|2019-11-04 07:15:21|2019-11-04 07:15:21|
| 2|2019-11-09 06:02:04|2019-11-09 07:00:04|
+---+-------------------+-------------------+
This has the advantage of staying entirely inside the JVM, so you don't lose time converting back and forth between Python and Java objects.
Note that the dayofweek function counts days differently from Python's datetime.datetime.isoweekday().
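That counting difference is easy to trip over: Spark's dayofweek follows the SQL convention (Sunday=1 through Saturday=7), while isoweekday() uses Monday=1 through Sunday=7, and pandas' dt.dayofweek uses Monday=0 through Sunday=6. A quick check on the Saturday from the example data (no Spark session needed):

```python
from datetime import datetime

import pandas as pd

saturday = datetime(2019, 11, 9, 6, 2, 4)   # the Saturday row from the example
print(saturday.isoweekday())                 # 6 under the ISO convention
print(pd.Timestamp(saturday).dayofweek)      # 5 under the pandas convention
# Spark's dayofweek() reports 7 for this date (Sunday=1 ... Saturday=7)
```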