I am working on a SQL solution to collapse rows whenever one record's "close" value matches the next record's "start" value. Here is a sample input and the desired output.
Input:
+----+-----+-----+
|name|start|close|
+----+-----+-----+
|A |120 |130 |
|A |130 |140 |
|A |140 |150 |
|A |152 |160 |
|A |160 |180 |
|B |100 |130 |
|B |130 |200 |
|B |202 |250 |
|C |300 |400 |
+----+-----+-----+
Desired output:
+----+-----+-----+
|name|start|close|
+----+-----+-----+
|A |120 |150 |
|A |152 |180 |
|B |100 |200 |
|B |202 |250 |
|C |300 |400 |
+----+-----+-----+
I tried using the lag() function, but I am not getting the correct output; the interval boundaries are being crossed.
with
t1 as (
  select name, start, close,
         lag(close) over (partition by name order by start) as pclose
  from event
),
t2 as (
  select * from t1
  where 1 = (case when pclose is null then 1
                  when start = pclose then 0
                  else 1 end)
)
select * from t2 order by name, start
Any ANSI-compliant solution is welcome, since I can easily port it to Spark.
This is another classic gaps-and-islands problem.
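Before the Spark version, here is the idea in miniature: walk the rows in (name, start) order and extend the current interval whenever the next row's start touches the interval's close. A minimal plain-Python sketch, using the sample data from the question:

```python
# Rows from the question, already sorted by (name, start).
rows = [
    ("A", 120, 130), ("A", 130, 140), ("A", 140, 150),
    ("A", 152, 160), ("A", 160, 180),
    ("B", 100, 130), ("B", 130, 200), ("B", 202, 250),
    ("C", 300, 400),
]

merged = []
for name, start, close in rows:
    # Extend the open interval when the names match and the new row
    # starts at or before the interval's current close.
    if merged and merged[-1][0] == name and start <= merged[-1][2]:
        prev_name, prev_start, prev_close = merged[-1]
        merged[-1] = (prev_name, prev_start, max(prev_close, close))
    else:
        merged.append((name, start, close))

print(merged)
# [('A', 120, 150), ('A', 152, 180), ('B', 100, 200), ('B', 202, 250), ('C', 300, 400)]
```

The window-function code below expresses this same scan declaratively: a running maximum of previous closes stands in for `merged[-1][2]`, and a cumulative sum of "new island" flags stands in for appending a fresh tuple.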
Here is one way to achieve the result:
from pyspark.sql import SparkSession
# Note: min, max, and sum here shadow the Python builtins of the same name.
from pyspark.sql.functions import col, row_number, min, max, when, lit, sum
from pyspark.sql.window import Window
# Initialize Spark session
spark = SparkSession.builder.appName("MergeIntervals").getOrCreate()
# Sample data
data = [
("A", 120, 130),
("A", 130, 140),
("A", 140, 150),
("A", 152, 160),
("A", 160, 180),
("B", 100, 130),
("B", 130, 200),
("B", 202, 250),
("C", 300, 400),
]
# Create DataFrame
df = spark.createDataFrame(data, ["name", "start", "close"])
df = (
    # Highest close seen so far within each name, excluding the current row
    # (a frame end of -1 means "one row before the current row").
    df.withColumn(
        "previous_close",
        max(col("close")).over(
            Window.partitionBy("name")
            .orderBy("start", "close")
            .rowsBetween(Window.unboundedPreceding, -1)
        ),
    )
    # A row starts a new island when it does not touch or overlap
    # anything seen so far (previous_close is null on each name's first row).
    .withColumn(
        "island_start_indicator",
        when(col("previous_close") >= col("start"), lit(0)).otherwise(lit(1)),
    )
    # A running sum of the indicators gives each island a distinct id.
    .withColumn(
        "island_id",
        sum("island_start_indicator").over(Window.orderBy("name", "start", "close")),
    )
    .withColumn("island_min_start", min("start").over(Window.partitionBy("island_id")))
    .withColumn("island_max_close", max("close").over(Window.partitionBy("island_id")))
    .select(
        col("name"),
        col("island_min_start").alias("start"),
        col("island_max_close").alias("close"),
    )
    .distinct()
)
df.show()
+----+-----+-----+
|name|start|close|
+----+-----+-----+
| A| 120| 150|
| A| 152| 180|
| B| 100| 200|
| B| 202| 250|
| C| 300| 400|
+----+-----+-----+
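The same logic ports back to ANSI SQL, as the question asked. A sketch, verified here with Python's built-in sqlite3 module (SQLite's window-function support, available since version 3.25, covers the syntax used; the table name event is taken from the question's attempt):

```python
import sqlite3

rows = [("A", 120, 130), ("A", 130, 140), ("A", 140, 150),
        ("A", 152, 160), ("A", 160, 180),
        ("B", 100, 130), ("B", 130, 200), ("B", 202, 250),
        ("C", 300, 400)]

con = sqlite3.connect(":memory:")
con.execute("create table event (name text, start int, close int)")
con.executemany("insert into event values (?, ?, ?)", rows)

sql = """
with flagged as (
    -- Flag rows that start a new island: the current start lies beyond
    -- every close seen so far within the same name.
    select name, start, close,
           case when start <= max(close) over (
                    partition by name order by start, close
                    rows between unbounded preceding and 1 preceding)
                then 0 else 1 end as is_new_island
    from event
),
islands as (
    -- A running sum of the flags labels each island with a distinct id.
    select name, start, close,
           sum(is_new_island) over (order by name, start, close) as island_id
    from flagged
)
select name, min(start) as start, max(close) as close
from islands
group by name, island_id
order by name, start
"""
merged = con.execute(sql).fetchall()
print(merged)
# [('A', 120, 150), ('A', 152, 180), ('B', 100, 200), ('B', 202, 250), ('C', 300, 400)]
```

The only engine-specific detail to watch when porting is the explicit `rows between ... and 1 preceding` frame, which the Spark version writes as `rowsBetween(Window.unboundedPreceding, -1)`.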