SQL - 根据匹配的先前值和当前值压缩记录

问题描述 投票:0回答:1

我正在开发一个 SQL 解决方案,以在记录之间存在匹配的“开始”和“关闭”值时压缩行数。这是一个示例输入和所需的输出

输入:

+----+-----+-----+
|name|start|close|
+----+-----+-----+
|A   |120  |130  |
|A   |130  |140  |
|A   |140  |150  |
|A   |152  |160  |
|A   |160  |180  |
|B   |100  |130  |
|B   |130  |200  |
|B   |202  |250  |
|C   |300  |400  |
+----+-----+-----+

所需输出:

+----+-----+-----+
|name|start|close|
+----+-----+-----+
|A   |120  |150  |
|A   |152  |180  |
|B   |100  |200  |
|B   |202  |250  |
|C   |300  |400  |
+----+-----+-----+

我尝试使用 lag() 函数,但没有得到正确的输出..零边界正在跨越。

with 
     t1 ( 
            select name, start, close, lag(close) over(partition by name order by start) pclose from event 
        ),
     t2 (
            select * from t1 where 1 = (case when pclose is null then 1
                                             when start = pclose then 0 else 1 end)
        )
   select * from t2 order by name, start

欢迎任何 ANSI 投诉解决方案,因为我可以轻松地将其移植到 Spark。

sql mysql apache-spark-sql
1个回答
0
投票

又是一个经典的

gaps-and-islands
问题。

以下是实现结果的方法:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number, min, max, when, lit, sum
from pyspark.sql.types import DateType
from pyspark.sql.window import Window

# Initialize Spark session
spark = SparkSession.builder.appName("MergeIntervals").getOrCreate()

# Sample data

data = [
    ("A", 120, 130),
    ("A", 130, 140),
    ("A", 140, 150),
    ("A", 152, 160),
    ("A", 160, 180),
    ("B", 100, 130),
    ("B", 130, 200),
    ("B", 202, 250),
    ("C", 300, 400),
]

# Create DataFrame
df = spark.createDataFrame(data, ["name", "start", "close"])

df = (
    df.withColumn("row_num", row_number().over(Window.orderBy("start", "close")))
    .withColumn(
        "previous_close",
        max(col("close")).over(
            Window.partitionBy("name")
            .orderBy("name", "start", "close")
            .rowsBetween(Window.unboundedPreceding, Window.currentRow - 1)
        ),
    )
    .withColumn(
        "island_start_indicator",
        when(col("previous_close") >= col("start"), lit(0)).otherwise(lit(1)),
    )
    .withColumn(
        "island_id",
        sum("island_start_indicator").over(Window.orderBy("name", "start", "close")),
    )
    .withColumn("island_min_start", min("start").over(Window.partitionBy("island_id")))
    .withColumn("island_max_close", max("close").over(Window.partitionBy("island_id")))
    .select(
        col("name"),
        col("island_min_start").alias("start"),
        col("island_max_close").alias("close"),
    )
    .distinct()
)

df.show()
+----+-----+-----+
|name|start|close|
+----+-----+-----+
|   A|  120|  150|
|   A|  152|  180|
|   B|  100|  200|
|   B|  202|  250|
|   C|  300|  400|
+----+-----+-----+
© www.soinside.com 2019 - 2024. All rights reserved.