How can I reunify two tables that come from the same source?


I have a table in which some values have a flow view (recours) while the others have a stock view (PSAP). I split this table in two so I could convert my flow-view field into a stock view, which works fine, but I get stuck when I try to reunify the two tables into one.

I tried several approaches, which you can see in the code below.

Either I generate a new CTE, columnsStockFinal, so that it has the same number of rows as columnsFluxToStock, which lets me do an inner join between columnsFluxToStock and columnsStockFinal.

Or I keep columnsStock and do a full outer join with columnsFluxToStock.

For each missing date, a row must appear whose PSAP value is the last known stock for the Silo / GAR / Lob tuple/key.

Concretely:

For 2023-03-31 / Completude / GAR1 / lob1, I should get the last PSAP value just before it, i.e. the one for 2023-02-28, which is 2100.

For 2023-03-31 / EDI / GAR1 / lob1, I should get the last PSAP value just before it, i.e. the one for 2023-02-28, which is 1100.

For 2023-04-30 / EDI / GAR1 / lob1, I should get the last PSAP value just before it, i.e. the one for 2023-02-28 (or equivalently 2023-03-31), which is 1100.

These values must not be hardcoded, of course.
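To make the intent concrete, what I am after is the classic forward-fill over a complete date/key spine, roughly like the sketch below (the key_spine CTE is hypothetical; date_range and ColumnsStock are the CTEs from my code further down):

with key_spine as (
  -- one row per month-end per Silo / GAR / Lob key
  select dr.ViewDate, k.Silo, k.GAR, k.Lob
  from date_range dr
  cross join (select distinct Silo, GAR, Lob from ColumnsStock) k
)
select s.ViewDate, s.Silo, s.GAR, s.Lob,
       -- last(..., true) ignores NULLs, so gap rows inherit the previous PSAP
       last(cs.PSAP, true) over (
         partition by s.Silo, s.GAR, s.Lob
         order by s.ViewDate
         rows between unbounded preceding and current row) as PSAP
from key_spine s
left join ColumnsStock cs
  on  cs.Silo = s.Silo and cs.GAR = s.GAR
  and cs.Lob = s.Lob and cs.ViewDate = s.ViewDate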

I should then get the following result:

ViewDate    Silo        GAR   Lob   recours  PSAP
2023-01-31  Completude  GAR1  lob1  100      2000
2023-01-31  EDI         GAR1  lob1  10       1000
2023-02-28  Completude  GAR1  lob1  150      2100
2023-02-28  EDI         GAR1  lob1  15       1100
2023-03-31  Completude  GAR1  lob1  150      2100
2023-03-31  EDI         GAR1  lob1  15       1100
2023-04-30  Completude  GAR1  lob1  210      2200
2023-04-30  EDI         GAR1  lob1  15       1100
2023-05-31  Completude  GAR1  lob1  280      2300
2023-05-31  EDI         GAR1  lob1  21       1200

Here is the code I have so far, with some of my other attempts left in the comments:

drop table if exists ColumnsMixedFluxStock;
CREATE TABLE ColumnsMixedFluxStock (
    ViewDate DATE,
    silo varchar(10),
    GAR varchar(10),
    lob varchar(10),
    recours int,
    PSAP int
);
 
INSERT INTO ColumnsMixedFluxStock VALUES
('2023-01-31', 'EDI', 'GAR1', 'lob1', 10, 1000),
('2023-02-28', 'EDI', 'GAR1', 'lob1', 5, 1100),
('2023-05-31', 'EDI', 'GAR1', 'lob1', 6, 1200),

('2023-01-31', 'Completude', 'GAR1', 'lob1', 100, 2000),
('2023-02-28', 'Completude', 'GAR1', 'lob1', 50, 2100),
('2023-04-30', 'Completude', 'GAR1', 'lob1', 60, 2200),
('2023-05-31', 'Completude', 'GAR1', 'lob1', 70, 2300)
;
 
with ColumnsFlux as (
select Silo, ViewDate, GAR, Lob, recours
from ColumnsMixedFluxStock
),
ColumnsStock as (
select Silo, ViewDate, GAR, Lob, PSAP
from ColumnsMixedFluxStock
),
min_max_dates AS (
  SELECT MIN(ViewDate) as min_date, MAX(ViewDate) as max_date
  FROM ColumnsMixedFluxStock
),
date_range AS (
  SELECT explode(sequence(to_date(min_date), to_date(max_date), interval 1 month)) as ViewDate
  FROM min_max_dates
),
columnsFluxToStock AS (
SELECT i1.ViewDate, i2.silo, i2.GAR, i2.Lob,
       SUM(coalesce(i2.recours, 0)) as recours
FROM  ColumnsFlux i2
cross join date_range i1 on 
                   i1.ViewDate >= i2.ViewDate
GROUP BY i1.ViewDate, i2.silo, i2.GAR, i2.Lob
)
 
/*columnsStockFinal AS (
SELECT i1.ViewDate, i2.silo, i2.GAR, i2.Lob,
       coalesce(last_value(i2.PSAP) OVER (PARTITION BY /*i2.ViewDate,*/    i2.silo, i2.GAR, i2.Lob ORDER BY i2.ViewDate ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), 0) as PSAP
FROM date_range i1
LEFT JOIN ColumnsStock i2 on 
                   i1.ViewDate >= i2.ViewDate
) select * from columnsStockFinal order by ViewDate, Silo, GAR, Lob*/
 
/*all_combinations AS (
  SELECT dr.ViewDate, cs.silo, cs.GAR, cs.Lob
  FROM date_range dr
  CROSS JOIN (SELECT DISTINCT silo, GAR, Lob FROM ColumnsStock) cs
),
columnsStockFinal AS (
  SELECT ac.ViewDate, ac.silo, ac.GAR, ac.Lob,
         COALESCE(LAST_VALUE(cs.PSAP) OVER (PARTITION BY cs.silo, cs.GAR, cs.Lob ORDER BY ac.ViewDate ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), 0) AS PSAP
  FROM all_combinations ac
  LEFT JOIN ColumnsStock cs ON ac.silo = cs.silo AND ac.GAR = cs.GAR AND ac.Lob = cs.Lob AND ac.ViewDate = cs.ViewDate
) select * from columnsStockFinal order by ViewDate, Silo, GAR, Lob*/
 
/*
columnsStockFinal AS (
SELECT i1.ViewDate, i2.silo, i2.GAR, i2.Lob,
       coalesce(i2.PSAP, 0) as PSAP
FROM  ColumnsStock i2
cross join date_range i1 on 
                   i1.ViewDate = i2.ViewDate
GROUP BY i1.ViewDate, i2.silo, i2.GAR, i2.Lob
) --select * from columnsStockFinal order by ViewDate, Silo, GAR, Lob
*/
 
 
 
/*select cs.Silo, cs.ViewDate, cs.GAR, cs.Lob,
  cfts.recours,
  cs.PSAP
from columnsStockFinal cs  --columnsStock cs 
inner join
--full outer join
--inner join after creating another CTE columnsStockFinal with a cross join of ColumnsStock and date_range
columnsFluxToStock cfts on 
                        cs.Silo = cfts.Silo
                    AND cs.ViewDate = cfts.ViewDate
                            AND cs.GAR = cfts.GAR
                            AND cs.Lob = cfts.Lob
--where recours != 0 or PSAP != 0
order by ViewDate, Silo, GAR, Lob*/
 
select coalesce(cs.Silo, cfts.Silo) as Silo,
  coalesce(cs.ViewDate, cfts.ViewDate) as ViewDate,
  coalesce(cs.GAR, cfts.GAR) as GAR,
  coalesce(cs.Lob, cfts.Lob) as Lob,
  cfts.recours,
  cs.PSAP
from columnsStock cs
full outer join
columnsFluxToStock cfts on
        cs.Silo = cfts.Silo
    AND cs.ViewDate = cfts.ViewDate
    AND cs.GAR = cfts.GAR
    AND cs.Lob = cfts.Lob
--where recours != 0 or PSAP != 0
order by ViewDate, Silo, GAR, Lob

Thanks for your help.

I am using Spark SQL on Databricks.

sql apache-spark-sql databricks
1 Answer

There may be some gaps in this, depending on how representative your sample data is.

It is cluttered with some boilerplate for reproducibility/testing; the main solution to look at is the Spark SQL query that builds actual_df.

The principle: compute the rolling total of the raw data independently, using a cumulative sum. You can then impute the missing dates without having to take them into account in that calculation.
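Distilled to its core, the trick looks like this (illustrative only; t is a stand-in for any table with columns key, ViewDate and val, and month_ends for the generated month-end calendar):

select m.month_ends as ViewDate, t.key, t.val
from (
  select *,
         -- each real row owns the half-open date interval [ViewDate, nextViewDate)
         lead(ViewDate, 1, cast('9999-12-31' as date))
           over (partition by key order by ViewDate) as nextViewDate
  from t
) t
join month_ends m
  on  m.month_ends >= t.ViewDate
  and m.month_ends <  t.nextViewDate

The full, runnable version: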

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from pyspark.sql.functions import to_date, col
from pyspark.testing.utils import assertDataFrameEqual

# `spark` is predefined on Databricks; getOrCreate() makes the script runnable elsewhere too
spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField("viewdate", StringType(), True),
    StructField("silo", StringType(), True),
    StructField("gar", StringType(), True),
    StructField("lob", StringType(), True),
    StructField("recours", IntegerType(), True),
    StructField("psap", IntegerType(), True)
])

sample_data = [
    ('2023-01-31', 'EDI', 'GAR1', 'lob1', 10, 1000),
    ('2023-02-28', 'EDI', 'GAR1', 'lob1', 5, 1100),
    ('2023-05-31', 'EDI', 'GAR1', 'lob1', 6, 1200),
    ('2023-01-31', 'Completude', 'GAR1', 'lob1', 100, 2000),
    ('2023-02-28', 'Completude', 'GAR1', 'lob1', 50, 2100),
    ('2023-04-30', 'Completude', 'GAR1', 'lob1', 60, 2200),
    ('2023-05-31', 'Completude', 'GAR1', 'lob1', 70, 2300)
]

expected_data = [
    ('2023-01-31', 'Completude', 'GAR1', 'lob1', 100, 2000),
    ('2023-01-31', 'EDI', 'GAR1', 'lob1', 10, 1000),
    ('2023-02-28', 'Completude', 'GAR1', 'lob1', 150, 2100),
    ('2023-02-28', 'EDI', 'GAR1', 'lob1', 15, 1100),
    ('2023-03-31', 'Completude', 'GAR1', 'lob1', 150, 2100),
    ('2023-03-31', 'EDI', 'GAR1', 'lob1', 15, 1100),
    ('2023-04-30', 'Completude', 'GAR1', 'lob1', 210, 2200),
    ('2023-04-30', 'EDI', 'GAR1', 'lob1', 15, 1100),
    ('2023-05-31', 'Completude', 'GAR1', 'lob1', 280, 2300),
    ('2023-05-31', 'EDI', 'GAR1', 'lob1', 21, 1200)
]

def create_df_with_cleaned_date(data):
  return (
    spark.
    createDataFrame(data, schema=schema).
    withColumn("ViewDate", to_date(col("ViewDate")))
  )

create_df_with_cleaned_date(sample_data).createOrReplaceTempView("sample_vw")
expected_df = create_df_with_cleaned_date(expected_data)

actual_df = spark.sql("""
  with month_ends_in_range as (
    SELECT
      explode(
        sequence(min(ViewDate), max(ViewDate), interval 1 month)
      ) as month_ends
    FROM
      sample_vw
  ),
  cum_sums_calculated as (
    select
      *,
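      -- running total per Silo: this converts the flow column (recours) into its stock view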
      sum(recours) over (
        PARTITION BY Silo
        ORDER BY
          viewDate rows between unbounded preceding
          and current row) as cum_sum_recours,
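      -- date of the next real row; the 9999-12-31 sentinel leaves the last interval open-ended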
      lead(ViewDate, 1, cast('9999-12-31' as date)) over (
        partition by Silo
        order by
          ViewDate
      ) nextViewDate
    from
      sample_vw
  ),
  fill_in_gap_dates as (
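    -- attach every generated month-end to the interval [ViewDate, nextViewDate) that covers it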
    select
      *
    from
      cum_sums_calculated csc
      inner join month_ends_in_range mer 
      on mer.month_ends >= csc.ViewDate
      and mer.month_ends < csc.NextViewDate
  )
  select
    month_ends as ViewDate,
    silo,
    gar,
    lob,
    cast(cum_sum_recours as int) recours,
    psap  
  from
    fill_in_gap_dates
  order by
    ViewDate""")

actual_df.show()
expected_df.show()
assertDataFrameEqual(actual_df, expected_df)
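Two caveats: assertDataFrameEqual requires PySpark 3.5 or later (on older runtimes, drop the assertion and compare the show() outputs), and the windows above partition by Silo alone, which is enough for this sample but would need GAR and Lob added to the partition keys for data with several keys per silo.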