PySpark(Spark v3.4.1)结构化流如何实现累积聚合数据写入spark sink?

问题描述 投票:0回答:1

我正在从事 pyspark 结构化流编程,以生成流数据的一些累积聚合。我已经使用mongodb和kafka作为spark读取流数据。我在火花水槽中尝试过

foreachBatch
foreach
,但无法满足我的要求。另外,我不确定火花接收器的
outputMode
哪一种模式最适合我的要求。在此之后我不知道如何继续前进。下面是我的读取流数据帧,到目前为止我已经从源数据转换了它。

+--------------------+-------+--------------+-------------------+-------------------+-------------------+----------+----------+----------------+
|                  id|user_id|profile_viewed|         created_at|         updated_at|  settings_end_date|start_time|  end_time|total_time_saved|
+--------------------+-------+--------------+-------------------+-------------------+-------------------+----------+----------+----------------+
|6500d4bce7c328000...|   2533|           143|2023-09-12 21:14:35|2023-09-15 09:01:48|2023-09-13 01:29:18|1694533475|1694548758|           19573|
|64fe9c7a39869c000...|   2660|            41|2023-09-11 04:50:02|2023-09-15 08:41:41|2023-09-11 04:53:56|1694388002|1694388236|            1464|
|650143cd2d1886000...|   2660|             6|2023-09-13 05:08:29|2023-09-15 08:41:34|2023-09-13 05:10:05|1694561909|1694562005|             276|
|6501453f6b71b8000...|   2660|             4|2023-09-13 05:14:39|2023-09-15 08:41:05|2023-09-13 05:17:06|1694562279|1694562426|             267|
|65014196ff8372000...|   2660|            70|2023-09-13 04:59:02|2023-09-15 08:40:44|2023-09-13 05:04:39|1694561342|1694561679|            2437|
|64e80cf5cfeb0e000...|   2655|            18|2023-09-15 05:07:49|2023-09-15 05:29:49|2023-09-15 05:20:49|1694734669|1694735449|            1320|
|650d5a497266eb2ca...|   2655|             2|2023-09-15 05:10:49|2023-09-15 05:14:49|2023-09-15 05:10:49|1694734849|1694734849|              60|
|6503b61c660ef2000...|   2672|             5|2023-09-15 01:40:44|2023-09-15 01:55:50|2023-09-15 01:55:50|1694722244|1694723150|            1056|
|6503b5621c53eb000...|   2672|             9|2023-09-15 01:37:38|2023-09-15 01:39:34|2023-09-15 01:39:34|1694722058|1694722174|             386|
|6503a7d81c53eb000...|   2515|            32|2023-09-15 00:39:52|2023-09-15 00:40:45|2023-09-15 00:40:45|1694718592|1694718645|            1013|
|6500c6e6602140000...|   1996|            98|2023-09-12 20:15:34|2023-09-14 22:54:31|2023-09-12 23:12:21|1694529934|1694540541|           13547|
|64fefdc4540b67000...|   2658|            65|2023-09-11 11:45:08|2023-09-14 20:04:15|2023-09-11 16:15:35|1694412908|1694429135|           18177|
|64f8ef0aec3e46000...|   1996|            41|2023-09-06 21:28:42|2023-09-14 18:53:06|2023-09-07 00:22:47|1694015922|1694026367|           11675|
+--------------------+-------+--------------+-------------------+-------------------+-------------------+----------+----------+----------------+

请注意:在此数据框中,

id
user_id
列不是唯一的。我的意思是有重复的 id 和用户 id。

现在这就是我正在寻找的......

首先,我只想保留最后更新的

id
列中那些唯一的行。换句话说,如果有 5 条记录具有相同的 id,那么我只想保留
updated_at
列值是所有 5 条记录中最新日期时间值的一条记录。这里还需要考虑的是,当新数据具有相同的 id 时,该 id 已经存在于数据帧中,需要使用最新的 id 进行更新。希望这是有道理的

第二,在获取第一个数据帧后,我想通过

user_id
列和
total_time_saved
列的总和进行累积聚合组。

我对 pyspark 很陌生。请建议我最好的方法。我的Spark和Python版本分别是v3.4.1和v3.8。预先感谢

apache-spark pyspark spark-structured-streaming
1个回答
0
投票

您似乎需要的是有状态操作。

例如,flatMapGroupsWithState 将存储每个组的状态,并且对于每条新消息,您都可以刷新相应组的状态。

由于这在 python 中不可用,您可能需要考虑切换到 scala 或使用本文中提到的两种方法之一

https://stackoverflow.com/a/49825585/8726538

希望这有帮助

© www.soinside.com 2019 - 2024. All rights reserved.