我正在开发一个 SQL 解决方案:当同一名字下一条记录的“关闭”值与下一条记录的“开始”值相匹配时,将这些行合并为一行。下面第一个表是示例输入,第二个表是所需的输出:
名字 | 开始 | 关闭 |
---|---|---|
A | 120 | 130 |
A | 130 | 140 |
A | 140 | 150 |
A | 152 | 160 |
A | 160 | 180 |
B | 100 | 130 |
B | 130 | 200 |
B | 202 | 250 |
C | 300 | 400 |
名字 | 开始 | 关闭 |
---|---|---|
A | 120 | 150 |
A | 152 | 180 |
B | 100 | 200 |
B | 202 | 250 |
C | 300 | 400 |
我尝试使用
lag()
函数,但没有得到正确的输出。零边界被跨越。
-- Attempted solution using lag(): keep only the rows whose start differs from
-- the previous row's close within the same name.
-- Note: this filter returns only the first row of each run, so the close
-- column is that row's own close rather than the close of the run's last row.
with
t1 as (
    select
        name,
        start,
        close,
        -- close of the preceding row (by start) for the same name; null on the first row
        lag(close) over (partition by name order by start) as pclose
    from event
),
t2 as (
    select *
    from t1
    where 1 = (
        case
            when pclose is null then 1  -- first row of a name: keep
            when start = pclose then 0  -- continues the previous row: drop
            else 1                      -- start differs from previous close: keep
        end
    )
)
select * from t2 order by name, start
欢迎任何符合标准(standard-compliant)的解决方案,因为我可以轻松地将其移植到 Spark。
又是一个经典的
gaps-and-islands
问题。
以下是实现结果的方法:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number, min, max, when, lit, sum
from pyspark.sql.types import DateType
from pyspark.sql.window import Window
# Create (or reuse) the Spark session used by this example
spark = (
    SparkSession.builder
    .appName("MergeIntervals")
    .getOrCreate()
)
# Sample intervals, one (name, start, close) tuple per row
data = [
    ("A", 120, 130), ("A", 130, 140), ("A", 140, 150),
    ("A", 152, 160), ("A", 160, 180),
    ("B", 100, 130), ("B", 130, 200),
    ("B", 202, 250),
    ("C", 300, 400),
]
# Build the input DataFrame with named columns
df = spark.createDataFrame(data, schema=["name", "start", "close"])
# Merge touching or overlapping [start, close] intervals per name
# (gaps-and-islands). `min`, `max` and `sum` below are the pyspark.sql.functions
# versions imported at the top of the file, not the Python builtins.

# Rows of one name, visited in (start, close) order. Partitioning by name keeps
# every window computation distributed instead of forcing a single partition.
ordered_by_name = Window.partitionBy("name").orderBy("start", "close")

df = (
    df
    # Largest close among all earlier rows of the same name;
    # null on the first row of each name (empty frame).
    .withColumn(
        "previous_close",
        max(col("close")).over(
            ordered_by_name.rowsBetween(Window.unboundedPreceding, -1)
        ),
    )
    # 0 when the row touches/overlaps what came before, 1 when it opens a new
    # island (a null previous_close falls through to otherwise() -> 1).
    .withColumn(
        "island_start_indicator",
        when(col("previous_close") >= col("start"), lit(0)).otherwise(lit(1)),
    )
    # Running count of island starts within the name. The default frame of an
    # ordered window (range up to the current row) gives exact duplicate rows
    # the same island_id.
    .withColumn("island_id", sum("island_start_indicator").over(ordered_by_name))
    # Collapse each island to a single row.
    .groupBy("name", "island_id")
    .agg(min("start").alias("start"), max("close").alias("close"))
    .select("name", "start", "close")
    # Deterministic output order.
    .orderBy("name", "start")
)
df.show()
+----+-----+-----+
|name|start|close|
+----+-----+-----+
| A| 120| 150|
| A| 152| 180|
| B| 100| 200|
| B| 202| 250|
| C| 300| 400|
+----+-----+-----+
这是 SQL 版本:
%sql
-- Merge touching or overlapping [start, close] intervals per name
-- (gaps-and-islands).
with sample_data as (
    select 'A' as name, 120 as start, 130 as close union all
    select 'A' as name, 130 as start, 140 as close union all
    select 'A' as name, 140 as start, 150 as close union all
    select 'A' as name, 152 as start, 160 as close union all
    select 'A' as name, 160 as start, 180 as close union all
    select 'B' as name, 100 as start, 130 as close union all
    select 'B' as name, 130 as start, 200 as close union all
    select 'B' as name, 202 as start, 250 as close union all
    select 'C' as name, 300 as start, 400 as close
),
-- previous_close: largest close among all earlier rows of the same name;
-- null on the first row of each name because the frame is empty.
windowed_data as (
    select
        name,
        start,
        close,
        max(close) over (
            partition by name
            order by start, close
            rows between unbounded preceding and 1 preceding
        ) as previous_close
    from sample_data
),
-- island_start_indicator: 0 when the row touches/overlaps what came before,
-- 1 when it opens a new island (a null previous_close falls to the else branch).
flagged_data as (
    select
        name,
        start,
        close,
        case
            when previous_close >= start then 0
            else 1
        end as island_start_indicator
    from windowed_data
),
-- island_id: running count of island starts within the name. The default
-- window frame is kept on purpose so exact duplicate rows (peers in the
-- ordering) receive the same island_id.
island as (
    select
        name,
        start,
        close,
        sum(island_start_indicator) over (
            partition by name
            order by start, close
        ) as island_id
    from flagged_data
)
-- One output row per island.
select
    name,
    min(start) as start,
    max(close) as close
from island
group by
    name,
    island_id
order by
    name,
    start