Compressing records by matching previous and current values


I am working on a SQL solution to compress the number of rows whenever a record's "start" value matches the previous record's "close" value. Here is a sample input and the desired output.

Input

name start close
A 120 130
A 130 140
A 140 150
A 152 160
A 160 180
B 100 130
B 130 200
B 202 250
C 300 400

Desired output

name start close
A 120 150
A 152 180
B 100 200
B 202 250
C 300 400

I tried using the lag() function, but did not get the correct output: rows whose start matches the previous close end up being filtered out rather than merged.

with
     t1 as (
            select name, start, close, lag(close) over(partition by name order by start) pclose from event
        ),
     t2 as (
            select * from t1 where 1 = (case when pclose is null then 1
                                             when start = pclose then 0 else 1 end)
        )
   select * from t2 order by name, start

Any standards-compliant solution is welcome, since I can easily port it to Spark.

sql mysql apache-spark-sql
1 Answer

This is another classic gaps-and-islands problem.
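Since the question asks for a standards-compliant query that ports cleanly, here is a minimal sketch of the same gaps-and-islands idea using only lag() and a running sum. It assumes the question's event(name, start, close) table and, like the question, merges only intervals whose start exactly matches the previous close:

with flagged as (
  select
    *,
    -- 0 if this row continues the previous interval, 1 if it starts a new island
    case
      when lag(close) over (partition by name order by start) = start then 0
      else 1
    end as is_island_start
  from event
),
islands as (
  select
    *,
    -- a running total of the flags gives every row its island's id
    sum(is_island_start) over (partition by name order by start) as island_id
  from flagged
)
select
  name,
  min(start) as start,
  max(close) as close
from islands
group by name, island_id
order by name, start

The piece the original attempt was missing is the running sum: instead of filtering out rows whose start matches the previous close, flag them with 0 and accumulate the flags, so every row keeps an island_id that it can be grouped by.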

Here is a PySpark implementation:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, min, max, when, lit, sum
from pyspark.sql.window import Window

# Initialize Spark session
spark = SparkSession.builder.appName("MergeIntervals").getOrCreate()

# Sample data

data = [
    ("A", 120, 130),
    ("A", 130, 140),
    ("A", 140, 150),
    ("A", 152, 160),
    ("A", 160, 180),
    ("B", 100, 130),
    ("B", 130, 200),
    ("B", 202, 250),
    ("C", 300, 400),
]

# Create DataFrame
df = spark.createDataFrame(data, ["name", "start", "close"])

df = (
    df
    # Largest close seen in any *earlier* row of the same name
    # (rowsBetween(unboundedPreceding, -1) excludes the current row).
    .withColumn(
        "previous_close",
        max(col("close")).over(
            Window.partitionBy("name")
            .orderBy("start", "close")
            .rowsBetween(Window.unboundedPreceding, -1)
        ),
    )
    # A row starts a new island when no earlier close reaches its start;
    # previous_close is null on the first row of each name, which also yields 1.
    .withColumn(
        "island_start_indicator",
        when(col("previous_close") >= col("start"), lit(0)).otherwise(lit(1)),
    )
    # Running sum of the flags assigns each island a distinct id.
    .withColumn(
        "island_id",
        sum("island_start_indicator").over(Window.orderBy("name", "start", "close")),
    )
    # Collapse each island to its earliest start and latest close.
    .withColumn("island_min_start", min("start").over(Window.partitionBy("island_id")))
    .withColumn("island_max_close", max("close").over(Window.partitionBy("island_id")))
    .select(
        col("name"),
        col("island_min_start").alias("start"),
        col("island_max_close").alias("close"),
    )
    .distinct()
)

df.show()
+----+-----+-----+
|name|start|close|
+----+-----+-----+
|   A|  120|  150|
|   A|  152|  180|
|   B|  100|  200|
|   B|  202|  250|
|   C|  300|  400|
+----+-----+-----+
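One detail worth noting: previous_close is a running max(close) over all earlier rows, not lag(close). lag only sees the single previous row, so with lag-based logic an interval that overlaps an earlier, longer one (say a hypothetical (A, 121, 125) row) would start a new island; the running max plus the >= comparison folds overlapping or contained intervals into the same island, a slightly more general merge rule than the exact start/close match in the question. For the sample data the two rules agree.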

And here is the SQL version:

%sql
with sample_data as (
  select "A" as name, 120 as start, 130 as close union all
  select "A" as name, 130 as start, 140 as close union all
  select "A" as name, 140 as start, 150 as close union all
  select "A" as name, 152 as start, 160 as close union all
  select "A" as name, 160 as start, 180 as close union all
  select "B" as name, 100 as start, 130 as close union all
  select "B" as name, 130 as start, 200 as close union all
  select "B" as name, 202 as start, 250 as close union all
  select "C" as name, 300 as start, 400
),
windowed_data as (
  select
    *,
    -- largest close seen in any earlier row of the same name
    max(close) over(
      partition by name
      order by
        start,
        close rows between unbounded preceding
        and 1 preceding
    ) as previous_close
  from
    sample_data
),
island as (
  select
    *,
    -- running sum of the start flags assigns one id per island
    sum(island_start_indicator) over (
      order by
        name,
        start,
        close
    ) as island_id
  from
    (
      select
        *,
        -- a row starts a new island when no earlier close reaches its start
        case
          when previous_close >= start then 0
          else 1
        end as island_start_indicator
      from
        windowed_data
    )
)
select
  distinct name,
  min(start) over(partition by island_id) as start,
  max(close) over(partition by island_id) as close
from
  island
order by
  name,
  start
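If you run this in the same notebook (the %sql magic assumes a Databricks-style environment), it should return the same five rows as the df.show() output above.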