要理解此问题,可以想象在t1
时刻有一群人在某个位置L
。您想知道所有这些人在t0
的时间在哪里。
我这里有一个示例代码来引导练习:
row = Row("user_id", "start", "location_id")
df = spark.sparkContext.parallelize([
row(1, "2015-01-01 01:00:00", 100),
row(2, "2015-01-01 01:00:00", 100),
row(2, "2015-01-01 02:00:00", 100),
row(1, "2015-01-01 02:00:00", 300),
row(2, "2015-01-01 02:00:00", 300),
row(1, "2015-01-01 03:00:00", 300),
row(3, "2015-01-01 03:00:00", 300),
row(2, "2015-01-01 03:00:00", 300),
]).toDF().withColumn("timebucket_start", col("start").cast("timestamp"))
预期结果应该是这样的:
+--------------------+--------------+-------------------+-----+
|camefrom_location_id|in_location_id| at_bucket|count|
+--------------------+--------------+-------------------+-----+
| 100| 300|2015-01-01 02:00:00| 2|
| 100| 100|2015-01-01 02:00:00| 1|
| 100| 300|2015-01-01 03:00:00| 1|
| 300| 300|2015-01-01 03:00:00| 2|
+--------------------+--------------+-------------------+-----+
您可以看到,在位置03:00:00
的时间段300
,有2个人来自位置300
(在时间02:00:00
的原始数据中,1
和2
都位于[ C0])和另外一个来自位置300
的(仅100
)。
我通过自连接计算了该解决方案:
2
我想使用Window函数来复制相同的控件,因为它的执行速度可能更快(在较大的数据集上,自连接会爆炸)。在这里,我正在努力用SparkSQL构建它。
根本上,我的想法是“让我们按位置和存储区分组”(以便获得df.createOrReplaceTempView("dataset")
res = spark.sql(f"""
SELECT
j1.location_id as camefrom_location_id,
j2.location_id as in_location_id,
j2.timebucket_start as at_bucket,
COUNT(DISTINCT j1.user_id) as count
FROM
dataset j1 INNER JOIN dataset j2
ON
j2.timebucket_start == (j1.timebucket_start + INTERVAL 1 HOUR) AND
j1.user_id == j2.user_id
GROUP BY
j1.location_id, j2.location_id, j2.timebucket_start
ORDER BY
at_bucket
""")
的列表),然后基于该列表进行窗口显示。但是我不太确定您是否可以基于ID列表进行窗口操作。同时,如果我在这个阶段爆炸结果,那么我正在违抗使用窗口函数的目的。。不是吗?
另一个想法是创建一个1小时的窗口,并按user_id进行分区,然后在窗口内进行分组。但是,再次看来,使用自定义UDAF也许可以实现?
您是您想要的结果的数据有误,或者您提供的数据是错误的结果。取决于您如何看待它。例如。 id 3输入的第一个条目是row(3,“ 2015-01-01 03:00:00”,300),因此它没有'from'位置。因此,“ 2015-01-01 03:00:00”,“ 300”中只有两个ID具有从位置,但您的预期结果将显示3,300-300的计数为2,而100-300的计数为1。
我会让您重命名列,但这大致就是我要做的:
user_id
我建议在提出问题之前仔细检查问题中的数据,这将使您更有可能得到一个好的答案。