我正在从事 pyspark 结构化流编程,以生成流数据的一些累积聚合。我已经使用mongodb和kafka作为spark读取流数据。我在火花水槽中尝试过
foreachBatch
和foreach
,但无法满足我的要求。另外,我不确定火花接收器的outputMode
哪一种模式最适合我的要求。在此之后我不知道如何继续前进。下面是我的读取流数据帧,到目前为止我已经从源数据转换了它。
+--------------------+-------+--------------+-------------------+-------------------+-------------------+----------+----------+----------------+
| id|user_id|profile_viewed| created_at| updated_at| settings_end_date|start_time| end_time|total_time_saved|
+--------------------+-------+--------------+-------------------+-------------------+-------------------+----------+----------+----------------+
|6500d4bce7c328000...| 2533| 143|2023-09-12 21:14:35|2023-09-15 09:01:48|2023-09-13 01:29:18|1694533475|1694548758| 19573|
|64fe9c7a39869c000...| 2660| 41|2023-09-11 04:50:02|2023-09-15 08:41:41|2023-09-11 04:53:56|1694388002|1694388236| 1464|
|650143cd2d1886000...| 2660| 6|2023-09-13 05:08:29|2023-09-15 08:41:34|2023-09-13 05:10:05|1694561909|1694562005| 276|
|6501453f6b71b8000...| 2660| 4|2023-09-13 05:14:39|2023-09-15 08:41:05|2023-09-13 05:17:06|1694562279|1694562426| 267|
|65014196ff8372000...| 2660| 70|2023-09-13 04:59:02|2023-09-15 08:40:44|2023-09-13 05:04:39|1694561342|1694561679| 2437|
|64e80cf5cfeb0e000...| 2655| 18|2023-09-15 05:07:49|2023-09-15 05:29:49|2023-09-15 05:20:49|1694734669|1694735449| 1320|
|650d5a497266eb2ca...| 2655| 2|2023-09-15 05:10:49|2023-09-15 05:14:49|2023-09-15 05:10:49|1694734849|1694734849| 60|
|6503b61c660ef2000...| 2672| 5|2023-09-15 01:40:44|2023-09-15 01:55:50|2023-09-15 01:55:50|1694722244|1694723150| 1056|
|6503b5621c53eb000...| 2672| 9|2023-09-15 01:37:38|2023-09-15 01:39:34|2023-09-15 01:39:34|1694722058|1694722174| 386|
|6503a7d81c53eb000...| 2515| 32|2023-09-15 00:39:52|2023-09-15 00:40:45|2023-09-15 00:40:45|1694718592|1694718645| 1013|
|6500c6e6602140000...| 1996| 98|2023-09-12 20:15:34|2023-09-14 22:54:31|2023-09-12 23:12:21|1694529934|1694540541| 13547|
|64fefdc4540b67000...| 2658| 65|2023-09-11 11:45:08|2023-09-14 20:04:15|2023-09-11 16:15:35|1694412908|1694429135| 18177|
|64f8ef0aec3e46000...| 1996| 41|2023-09-06 21:28:42|2023-09-14 18:53:06|2023-09-07 00:22:47|1694015922|1694026367| 11675|
+--------------------+-------+--------------+-------------------+-------------------+-------------------+----------+----------+----------------+
请注意:在此数据框中,
id
和user_id
列不是唯一的。我的意思是有重复的 id 和用户 id。
现在这就是我正在寻找的......
首先,我只想保留最后更新的
id
列中那些唯一的行。换句话说,如果有 5 条记录具有相同的 id,那么我只想保留 updated_at
列值是所有 5 条记录中最新日期时间值的一条记录。这里还需要考虑的是,当新数据具有相同的 id 时,该 id 已经存在于数据帧中,需要使用最新的 id 进行更新。希望这是有道理的
第二,在获取第一个数据帧后,我想通过
user_id
列和total_time_saved
列的总和进行累积聚合组。
我对 pyspark 很陌生。请建议我最好的方法。我的Spark和Python版本分别是v3.4.1和v3.8。预先感谢
您似乎需要的是有状态操作。
例如,flatMapGroupsWithState 将存储每个组的状态,并且对于每条新消息,您都可以刷新相应组的状态。
由于这在 python 中不可用,您可能需要考虑切换到 scala 或使用本文中提到的两种方法之一
https://stackoverflow.com/a/49825585/8726538
希望这有帮助