Filtering a pyspark dataframe by time difference

Question

I have a dataframe in pyspark that looks like this:

+----------+-------------------+-------+-----------------------+-----------------------+--------+
|Session_Id|Instance_Id        |Actions|Start_Date             |End_Date               |Duration|
+----------+-------------------+-------+-----------------------+-----------------------+--------+
|14252203  |i-051fc2d21fbe001e3|2      |2019-12-17 01:07:30.000|2019-12-17 01:07:30.000|0       |
|43024091  |i-051fc2d21fbe001e3|2      |2019-12-17 01:08:00.000|2019-12-17 01:08:00.000|0       |
|50961995  |i-0c733c7e356bc1615|2      |2019-12-17 01:07:30.000|2019-12-17 01:07:30.000|0       |
|56308963  |i-0c733c7e356bc1615|2      |2019-12-17 01:08:00.000|2019-12-17 01:08:00.000|0       |
|60120472  |i-0c733c7e356bc1615|2      |2019-12-17 01:07:30.000|2019-12-17 01:07:30.000|0       |
|69132492  |i-051fc2d21fbe001e3|2      |2019-12-17 01:07:30.000|2019-12-17 01:07:30.000|0       |
+----------+-------------------+-------+-----------------------+-----------------------+--------+

I'm trying to filter out all the rows that are too recent with this:

import datetime
import pyspark.sql.functions as f

now = datetime.datetime.now()

filtered = grouped.filter(f.abs(f.unix_timestamp(now) - f.unix_timestamp(datetime.datetime.strptime(f.col('End_Date')[:-4], '%Y-%m-%d %H:%M:%S'))) > 100)

This is supposed to convert End_Date to a timestamp, compute the difference between now and End_Date, and filter out anything less than 100 seconds old. I adapted it from Filter pyspark dataframe based on time difference between two columns.

Every time I run this, I get this error:

TypeError: Invalid argument, not a string or column: 2019-12-19 18:55:13.268489 of type <type 'datetime.datetime'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

How can I filter by comparing timestamps?


apache-spark pyspark pyspark-sql pyspark-dataframes
1 Answer

I think you're mixing up Python functions and Spark ones. unix_timestamp expects a string or a Column object, but you're passing it a Python datetime object, which is why you get that error. The same applies to datetime.strptime and the [:-4] slice: those are Python operations and can't be applied to a Column. Keep the whole expression on the Spark side by using Spark's built-in functions instead.
