使用来自多个目录的镶木地板创建增量表

问题描述 投票:0回答:1

我们正在从大型仓库中提取数据并保存,如下所示

table1
  2024-01-01_2024-03-31
  ├── _SUCCESS
  ├── _committed_1849751916443920415
  ├── _started_1849751916443920415
  ├── part-00000-tid-1849751916443920415-b7c7-b932ec8f07df-76362-1.c000.snappy.parquet
  ...
  ├── part-00100-tid-1849751916443920415-b7c7-b932ec8f07df-76364-1.c000.snappy.parquet  
  ...
  2023-10-01_2024-12-31
   ...
  2023-07-01_2024-10-31
  ...
  2011-01-01_2011-03-31

table2
  2024-01-01_2024-03-31
  2023-10-01_2024-12-31
  2023-07-01_2024-10-31
  ...
  2011-01-01_2011-03-31

不同的日期范围是并行拉取的。之后运行

CONVERT TO DELTA parquet. /mnt/azureblobshare/raw_data/.../table1

为我们提供了所需的增量表。

这些表需要每月更新 - 在每个表目录中添加更新显然是可行的。有没有一种方法可以在单独的目录中创建每周更新并为每个表创建增量表?

update_2024-04-01_2024-04-08
  table1
    *.parquet
  table2
    *.parquet
update_2024-04-08_2024-04-15
  table1
    *.parquet
  table2
    *.parquet

转换为delta可以指向多个独立目录吗?

azure pyspark databricks azure-databricks delta-lake
1个回答
0
投票

我尝试过以下方法:

我为表 1 和表 2 创建了一些示例数据,如下所示,并将数据保存为 Parquet。

from pyspark.sql import SparkSession
import os
dilip_table1_data = [
    (1, 'John', 90),
    (2, 'Jane', 85),
    (3, 'Mark', 95),
    (4, 'Emily', 88)
]
dilip_df_table1 = spark.createDataFrame(dilip_table1_data, ["id", "name", "score"])
dilip_df_table1.write.parquet("/FileStore/tables/RAW/dilip/table1/2024-01-01_2024-03-31/part-00000.parquet")
dilip_table2_data = [
    (1, 'Tom', 78),
    (2, 'Lisa', 92),
    (3, 'Mike', 80),
    (4, 'Sarah', 87)
]
dilip_df_table2 = spark.createDataFrame(dilip_table2_data, ["id", "name", "score"])
dilip_df_table2.write.parquet("/FileStore/tables/RAW/dilip/table2/2024-01-01_2024-03-31/part-00000.parquet")

接下来,我更新了表1和表2。

update_1_dilip_table1_data = [
    (5, 'Oliver', 92),
    (6, 'Emma', 87)
]
dilip_df_update_1_table1 = spark.createDataFrame(update_1_dilip_table1_data, ["id", "name", "score"])
update_1_table1_dir = "/FileStore/tables/RAW/dilip/new/update_2024-04-01_2024-04-08/table1"
os.makedirs(update_1_table1_dir, exist_ok=True)
dilip_df_update_1_table1.write.parquet(f"{update_1_table1_dir}/part-00000.parquet")
spark.sql(f"CONVERT TO DELTA parquet.`{update_1_table1_dir}`")

update_1_dilip_table2_data = [
    (5, 'Olivia', 85),
    (6, 'Daniel', 90)
]
dilip_df_update_1_table2 = spark.createDataFrame(update_1_dilip_table2_data, ["id", "name", "score"])
update_1_table2_dir = "/FileStore/tables/RAW/dilip/new/update_2024-04-01_2024-04-08/table2"
os.makedirs(update_1_table2_dir, exist_ok=True)
dilip_df_update_1_table2.write.parquet(f"{update_1_table2_dir}/part-00000.parquet")
spark.sql(f"CONVERT TO DELTA parquet.`{update_1_table2_dir}`")

在上面的代码中,我创建了更新目录,将更新数据保存为Parquet,并将更新目录转换为Delta。

结果:

df = spark.read.format("delta").load("/FileStore/tables/RAW/dilip/new/update_2024-04-01_2024-04-08/table2")
df.show()

+---+------+-----+
| id|  name|score|
+---+------+-----+
|  5|Olivia|   85|
|  6|Daniel|   90|
+---+------+-----+

df1 = spark.read.format("delta").load("/FileStore/tables/RAW/dilip/new/update_2024-04-01_2024-04-08/table1")
df1.show()

+---+------+-----+
| id|  name|score|
+---+------+-----+
|  5|Oliver|   92|
|  6|  Emma|   87|
+---+------+-----+
© www.soinside.com 2019 - 2024. All rights reserved.