I have a database with a table. The table has 2 columns: user_id (numeric) and score (numeric). The table can contain multiple rows with the same user_id, the same score, or both.
For each score, I want to show the number of distinct users with that score, with a lower score, and with a higher score.
I tried a join, but I would like a cleaner solution.
This is actually quite hard to do with window functions, because of the DISTINCT part of the requirement and the fact that Spark does not support count(distinct ...) over(). I would love to hear if anyone has a clever workaround, but the standard sledgehammer approach works.
Build the example:
"""
+-------+-----+
|user_id|score|
+-------+-----+
| 100| 1|
| 100| 1|
| 100| 2|
| 200| 3|
| 300| 4|
| 400| 4|
+-------+-----+
"""
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType

# create (or reuse) the SparkSession
spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField("user_id", IntegerType(), True),
    StructField("score", IntegerType(), True)
])
data = [(100, 1), (100, 1), (100, 2), (200, 3), (300, 4), (400, 4)]
df = spark.createDataFrame(data, schema=schema)
df.show()
df.createOrReplaceTempView('your_data')
Option 1: the plain, old-fashioned way with a self-join. Self-join with the appropriate inequality condition and count the distinct ids from the outer table.
with below_counts as (
    select yd1.score,
           count(distinct yd2.user_id) as count_below
    from your_data yd1
    left join your_data yd2
        on yd1.score > yd2.score
    group by yd1.score
),
above_counts as (
    select yd1.score,
           count(distinct yd2.user_id) as count_above
    from your_data yd1
    left join your_data yd2
        on yd1.score < yd2.score
    group by yd1.score
),
equal_counts as (
    select yd1.score,
           count(distinct yd1.user_id) as count_at
    from your_data yd1
    group by yd1.score
)
select e.score,
       e.count_at,
       a.count_above,
       b.count_below
from equal_counts e
inner join above_counts a
    on e.score = a.score
inner join below_counts b
    on e.score = b.score;
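To sanity-check what the self-join computes, here is a plain-Python sketch of the same logic on the sample data (my own illustration, using set comprehensions instead of joins; the expected numbers are hand-calculated, not taken from the answer):

```python
data = [(100, 1), (100, 1), (100, 2), (200, 3), (300, 4), (400, 4)]

counts = {}
for sc in sorted({s for _, s in data}):
    at = {u for u, s in data if s == sc}     # distinct users at this score
    above = {u for u, s in data if s > sc}   # distinct users strictly above
    below = {u for u, s in data if s < sc}   # distinct users strictly below
    counts[sc] = (len(at), len(above), len(below))

for sc, (c_at, c_above, c_below) in counts.items():
    print(sc, c_at, c_above, c_below)
# score 4 has two distinct users (300 and 400), two below (100 and 200), none above
```

Duplicate rows like the two (100, 1) entries are absorbed by the sets, which is exactly the role DISTINCT plays in the SQL.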
Option 2: finagle the window functions. Derive an indicator for whether a row is the first occurrence of its user_id in either direction of the score ordering; this simulates count(distinct ...) over() for a range in each direction. You can then sum that binary indicator to get the distinct counts.
It is almost the same number of lines, but I would bet that at sufficient scale it performs better than the self-join. Spark can be tricky here, though: with a lot of data, PARTITION BY 1 sometimes shuffles everything into a single partition, which is a spill nightmare.
spark.sql("""
with distinct_user_scores as (
    select
        yd.score,
        distinct_at.count_at_score,
        case when row_number() over (
                 partition by yd.user_id
                 order by yd.score
             ) = 1 then 1 else 0 end as is_first_ascending,
        case when row_number() over (
                 partition by yd.user_id
                 order by yd.score desc
             ) = 1 then 1 else 0 end as is_first_descending
    from your_data yd
    inner join (
        select score,
               count(distinct user_id) as count_at_score
        from your_data
        group by score
    ) as distinct_at
        on yd.score = distinct_at.score
)
select distinct
    score,
    count_at_score as distinct_users_with_score,
    coalesce(
        sum(is_first_ascending) over (
            partition by 1
            order by score
            range between unbounded preceding and 1 preceding
        ), 0) as distinct_users_with_lower_score,
    coalesce(
        sum(is_first_descending) over (
            partition by 1
            order by score
            range between 1 following and unbounded following
        ), 0) as distinct_users_with_higher_score
from distinct_user_scores
order by score""").show()
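The first-occurrence trick can also be checked outside Spark. A minimal Python sketch (my own illustration on the sample data, not part of the answer): flagging each user's minimum and maximum score mimics the two row_number() indicators, and summing the ascending flags over strictly lower scores reproduces the distinct count below each score, while the descending flags do the same above it.

```python
data = [(100, 1), (100, 1), (100, 2), (200, 3), (300, 4), (400, 4)]

# Each user contributes exactly one "first occurrence" per direction:
# the row with their minimum score (ascending) and maximum score (descending).
mins, maxs = {}, {}
for u, s in data:
    mins[u] = min(s, mins.get(u, s))
    maxs[u] = max(s, maxs.get(u, s))

scores = sorted({s for _, s in data})
# sum of is_first_ascending over rows with a strictly lower score
lower = {sc: sum(1 for m in mins.values() if m < sc) for sc in scores}
# sum of is_first_descending over rows with a strictly higher score
higher = {sc: sum(1 for m in maxs.values() if m > sc) for sc in scores}
```

Because row_number() assigns 1 to only one row per user per direction, each user is counted at most once on each side, which is what makes the summed indicator equal a distinct count.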
A more compact alternative is conditional aggregation: cross join the distinct scores against the full table and count distinct user_ids per condition (a CASE with no ELSE yields NULL, which COUNT ignores). Note the comparison for the "above" column should be strict (>) to match the requirement, and the table name should match the view created above:
SELECT
    s.score,
    COUNT(DISTINCT CASE WHEN u.score < s.score THEN u.user_id END) AS user_lt,
    COUNT(DISTINCT CASE WHEN u.score = s.score THEN u.user_id END) AS user_eq,
    COUNT(DISTINCT CASE WHEN u.score > s.score THEN u.user_id END) AS user_gt
FROM
(
    SELECT DISTINCT score FROM your_data
) AS s
CROSS JOIN
    your_data AS u
GROUP BY
    s.score