我有一个表格,其中的值具有流程愿景(recours),而其他值则具有库存愿景(PSAP)。我将此表分成两部分,以便将我的流视觉场转换为库存视觉,效果很好,但当我尝试将两个表重新统一为一个表时,我陷入困境。
我尝试了几种方法,您可以在下面的代码中看到。
我要么生成一个新的 CTE columnsStockFinal,以便使其具有与 columnsFluxToStock 相同的行数,并且我可以在 columnsFluxToStock 和 columnsStockFinal 之间进行内部联接。
或者我保留 columnsStock 并使用 columnsFluxToStock 进行完整的外部联接。
对于每个缺失的日期,必须出现一行,其中包含 PSAP 值,该值表示元组/键的最后已知库存:Silo / GAR / Lob。
这导致:
对于 2023-03-31 / Completude / GAR1 / lob1,我应该直接提前获得 PSAP 的最后一个值,因此对于 2023-02-28,即 2100。
对于 2023-03-31 / EDI / GAR1 / lob1,我应该直接提前获得 PSAP 的最后一个值,因此对于 2023-02-28,即 1100。
对于 2023-04-30 / EDI / GAR1 / lob1,我应该直接在前面获得 PSAP 的最后一个值,因此对于 2023-02-28 或 2023-03-31,即 1100。
当然,这些值不能是硬编码的。
然后我必须得到以下结果:
ViewDate Silo GAR Lob recours PSAP
2023-01-31 Completude GAR1 lob1 100 2000
2023-01-31 EDI GAR1 lob1 10 1000
2023-02-28 Completude GAR1 lob1 150 2100
2023-02-28 EDI GAR1 lob1 15 1100
2023-03-31 Completude GAR1 lob1 150 2100
2023-03-31 EDI GAR1 lob1 15 1100
2023-04-30 Completude GAR1 lob1 210 2200
2023-04-30 EDI GAR1 lob1 15 1100
2023-05-31 Completude GAR1 lob1 280 2300
2023-05-31 EDI GAR1 lob1 21 1200
这是我迄今为止所做的代码,并在评论中进行了一些其他尝试:
drop table if exists ColumnsMixedFluxStock;
CREATE TABLE ColumnsMixedFluxStock (
ViewDate DATE,
silo varchar(10),
GAR varchar(10),
lob varchar(10),
recours int,
PSAP int
);
INSERT INTO ColumnsMixedFluxStock VALUES
('2023-01-31', 'EDI', 'GAR1', 'lob1', 10, 1000),
('2023-02-28', 'EDI', 'GAR1', 'lob1', 5, 1100),
('2023-05-31', 'EDI', 'GAR1', 'lob1', 6, 1200),
('2023-01-31', 'Completude', 'GAR1', 'lob1', 100, 2000),
('2023-02-28', 'Completude', 'GAR1', 'lob1', 50, 2100)
,('2023-04-30', 'Completude', 'GAR1', 'lob1', 60, 2200),
('2023-05-31', 'Completude', 'GAR1', 'lob1', 70, 2300)
;
with ColumnsFlux as (
select Silo, ViewDate, GAR, Lob, recours
from ColumnsMixedFluxStock
),
ColumnsStock as (
select Silo, ViewDate, GAR, Lob, PSAP
from ColumnsMixedFluxStock
),
min_max_dates AS (
SELECT MIN(ViewDate) as min_date, MAX(ViewDate) as max_date
FROM ColumnsMixedFluxStock
),
date_range AS (
SELECT explode(sequence(to_date(min_date), to_date(max_date), interval 1 month)) as ViewDate
FROM min_max_dates
),
columnsFluxToStock AS (
SELECT i1.ViewDate, i2.silo, i2.GAR, i2.Lob,
SUM(coalesce(i2.recours, 0)) as recours
FROM ColumnsFlux i2
cross join date_range i1 on
i1.ViewDate >= i2.ViewDate
GROUP BY i1.ViewDate, i2.silo, i2.GAR, i2.Lob
)
/*columnsStockFinal AS (
SELECT i1.ViewDate, i2.silo, i2.GAR, i2.Lob,
coalesce(last_value(i2.PSAP) OVER (PARTITION BY /*i2.ViewDate,*/ i2.silo, i2.GAR, i2.Lob ORDER BY i2.ViewDate ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), 0) as PSAP
FROM date_range i1
LEFT JOIN ColumnsStock i2 on
i1.ViewDate >= i2.ViewDate
) select * from columnsStockFinal order by ViewDate, Silo, GAR, Lob*/
/*all_combinations AS (
SELECT dr.ViewDate, cs.silo, cs.GAR, cs.Lob
FROM date_range dr
CROSS JOIN (SELECT DISTINCT silo, GAR, Lob FROM ColumnsStock) cs
),
columnsStockFinal AS (
SELECT ac.ViewDate, ac.silo, ac.GAR, ac.Lob,
COALESCE(LAST_VALUE(cs.PSAP) OVER (PARTITION BY cs.silo, cs.GAR, cs.Lob ORDER BY ac.ViewDate ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), 0) AS PSAP
FROM all_combinations ac
LEFT JOIN ColumnsStock cs ON ac.silo = cs.silo AND ac.GAR = cs.GAR AND ac.Lob = cs.Lob AND ac.ViewDate = cs.ViewDate
) select * from columnsStockFinal order by ViewDate, Silo, GAR, Lob*/
/*
columnsStockFinal AS (
SELECT i1.ViewDate, i2.silo, i2.GAR, i2.Lob,
coalesce(i2.PSAP, 0) as PSAP
FROM ColumnsStock i2
cross join date_range i1 on
i1.ViewDate = i2.ViewDate
GROUP BY i1.ViewDate, i2.silo, i2.GAR, i2.Lob
) --select * from columnsStockFinal order by ViewDate, Silo, GAR, Lob
*/
/*select cs.Silo, cs.ViewDate, cs.GAR, cs.Lob,
cfts.recours,
cs.PSAP
from columnsStockFinal cs --columnsStock cs
inner join
--full outer join
--inner join après avoir fait une autre CTE columnsStockFinal avec un cross join de ColumnsStock et date_range
columnsFluxToStock cfts on
cs.Silo = cfts.Silo
AND cs.ViewDate = cfts.ViewDate
AND cs.GAR = cfts.GAR
AND cs.Lob = cfts.Lob
--where recours != 0 or PSAP != 0
order by ViewDate, Silo, GAR, Lob*/
select coalesce(cs.Silo, cfts.Silo) as Silo, coalesce(cs.ViewDate, cfts.ViewDate) ViewDate, coalesce(cs.GAR, cfts.GAR) as GAR, coalesce(cs.Lob, cfts.Lob) as Lob,
cfts.recours,
cs.PSAP
from columnsStock cs --columnsStock cs
full outer join
columnsFluxToStock cfts on
cs.Silo = cfts.Silo
AND cs.ViewDate = cfts.ViewDate
AND cs.GAR = cfts.GAR
AND cs.Lob = cfts.Lob
--where recours != 0 or PSAP != 0
order by ViewDate, Silo, GAR, Lob
感谢您的帮助。
我在 Databricks 上使用 Spark sql。
这可能存在一些差距,具体取决于您的样本数据的代表性。
杂乱地堆满了一些用于再现性/测试的样板。主要解决方案是在 Spark sql 查询中查找
actual_df
。
原理 - 使用累积和独立计算原始数据的滚动总和。然后您可以估算缺失的日期,而无需在计算中考虑它。
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from pyspark.sql.functions import to_date, col
from pyspark.testing.utils import assertDataFrameEqual
schema = StructType([
StructField("viewdate", StringType(), True),
StructField("silo", StringType(), True),
StructField("gar", StringType(), True),
StructField("lob", StringType(), True),
StructField("recours", IntegerType(), True),
StructField("psap", IntegerType(), True)
])
sample_data = [
('2023-01-31', 'EDI', 'GAR1', 'lob1', 10, 1000),
('2023-02-28', 'EDI', 'GAR1', 'lob1', 5, 1100),
('2023-05-31', 'EDI', 'GAR1', 'lob1', 6, 1200),
('2023-01-31', 'Completude', 'GAR1', 'lob1', 100, 2000),
('2023-02-28', 'Completude', 'GAR1', 'lob1', 50, 2100),
('2023-04-30', 'Completude', 'GAR1', 'lob1', 60, 2200),
('2023-05-31', 'Completude', 'GAR1', 'lob1', 70, 2300)
]
expected_data = [
('2023-01-31', 'Completude', 'GAR1', 'lob1', 100, 2000),
('2023-01-31', 'EDI', 'GAR1', 'lob1', 10, 1000),
('2023-02-28', 'Completude', 'GAR1', 'lob1', 150, 2100),
('2023-02-28', 'EDI', 'GAR1', 'lob1', 15, 1100),
('2023-03-31', 'Completude', 'GAR1', 'lob1', 150, 2100),
('2023-03-31', 'EDI', 'GAR1', 'lob1', 15, 1100),
('2023-04-30', 'Completude', 'GAR1', 'lob1', 210, 2200),
('2023-04-30', 'EDI', 'GAR1', 'lob1', 15, 1100),
('2023-05-31', 'Completude', 'GAR1', 'lob1', 280, 2300),
('2023-05-31', 'EDI', 'GAR1', 'lob1', 21, 1200)
]
def create_df_with_cleaned_date(data):
return (
spark.
createDataFrame(data, schema=schema).
withColumn("ViewDate", to_date(col("ViewDate")))
)
create_df_with_cleaned_date(sample_data).createOrReplaceTempView("sample_vw")
expected_df = create_df_with_cleaned_date(expected_data)
actual_df = spark.sql("""
with month_ends_in_range as (
SELECT
explode(
sequence(min(ViewDate), max(ViewDate), interval 1 month)
) as month_ends
FROM
sample_vw
),
cum_sums_calculated as (
select
*,
sum(recours) over (
PARTITION BY Silo
ORDER BY
viewDate rows between unbounded preceding
and current row) as cum_sum_recours,
lead(ViewDate, 1, cast('9999-12-31' as date)) over (
partition by Silo
order by
ViewDate
) nextViewDate
from
sample_vw
),
fill_in_gap_dates as (
select
*
from
cum_sums_calculated csc
inner join month_ends_in_range mer
on mer.month_ends >= csc.ViewDate
and mer.month_ends < csc.NextViewDate
)
select
month_ends as ViewDate,
silo,
gar,
lob,
cast(cum_sum_recours as int) recours,
psap
from
fill_in_gap_dates
order by
ViewDate""")
actual_df.show()
expected_df.show()
assertDataFrameEqual(actual_df, expected_df)