Python Databricks DataFrame join/filter returns unexpected records

Problem description

In Azure Databricks, I have the following tables:

[File_Processing_History]
Id  bigint
Client  varchar(255)
FileName    varchar(255)
FileType    varchar(3)
EventType   varchar(100)
EventContext    varchar(255)
Checksum    char(40)
OccurredOn  timestamp
Status  varchar(50)
Retry   boolean

[Raw_Data]
XYZFileKey  string
FileName    string
FileContent string

The File_Processing_History table is a history table that tracks the events that have happened to files in our system. The Raw_Data table holds the raw data extracted from files in our system, along with metadata such as the source file name and the content (data). Both tables have a FileName column containing the name of the file on the system.

I also have the following Python/Spark code in my Azure Databricks notebook:

from pyspark.sql import functions as F
from pyspark.sql.functions import col, max
from pyspark.sql.window import Window

# Client and fileType are set earlier in the notebook
file_processing_df = spark.table("myapp.control.File_Processing_History")

# Attach the latest OccurredOn per FileName to every row
latest_records_df = file_processing_df.join(
    file_processing_df.groupBy("FileName").agg(max("OccurredOn").alias("MaxOccurredOn")),
    on=["FileName"],
    how="inner"
)
result_df = latest_records_df.filter(
    (col("Client") == Client) &
    (col("FileType") == fileType) &
    (col("Status") == "Loaded")
)

windowSpec = Window.partitionBy("FileName").orderBy(col("OccurredOn").desc())
result_df = result_df.withColumn("row_number", F.row_number().over(windowSpec))
filtered_result_df = result_df.filter((col("OccurredOn") == col("MaxOccurredOn")) & (col("row_number") == 1))
filtered_result_df = filtered_result_df.drop("row_number")

filtered_result_df.show()
FileNames = filtered_result_df.select("FileName").distinct()
FileNames.show()
rawDF = spark.table(f"`usurint-mvp`.raw.Raw_Data")
filtered_rawDF = rawDF.join(FileNames, on="FileName", how="inner")
filtered_rawDF.show()

This code attempts to:

  1. Find all File_Processing_History rows for the given Client + FileType that not only have the latest OccurredOn date but also a Status of "Loaded" (if such rows exist), and save those rows to a DataFrame called filtered_result_df.
  2. Find all distinct FileNames in that DataFrame and store them in a FileNames DataFrame.
  3. Take every row in the FileNames DataFrame and select from the Raw_Data table any row whose FileName value matches one of the values in FileNames.

When I run this, the show() calls produce the following console output:

+--------------------+---+-------+--------+--------------+--------------------+--------+--------------------+------+-----+--------------------+
|            FileName| Id| Tenant|FileType|     EventType|        EventContext|Checksum|          OccurredOn|Status|Retry|       MaxOccurredOn|
+--------------------+---+-------+--------+--------------+--------------------+--------+--------------------+------+-----+--------------------+
|z823021-01-04f42s...|175|acme-1|     123|FILE_DECRYPTED|Things were done and...|     N/A|2024-04-09 15:38:...|Loaded|false|2024-04-09 15:38:...|
+--------------------+---+-------+--------+--------------+--------------------+--------+--------------------+------+-----+--------------------+

+--------------------+
|            FileName|
+--------------------+
|z823021-01-04f42s...|
+--------------------+

+--------------+----------+--------------+
|      FileName|XYZFileKey|SourceFileData|
+--------------+----------+--------------+
+--------------+----------+--------------+

Since z823021-01-04f42s... is present in both filtered_result_df and FileNames, I expected filtered_rawDF to be non-empty. Can anyone spot where I went wrong?
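For reference, here is a sketch of the kind of check I could run to see whether the FileName values on the two sides really match (FileNames and rawDF are the DataFrames from the code above; the left_anti join returns names from FileNames that have no match in Raw_Data, e.g. because of hidden whitespace):

from pyspark.sql import functions as F

# Names from FileNames with no matching (trimmed) FileName in Raw_Data
unmatched = (
    FileNames
    .select(F.trim(F.col("FileName")).alias("FileName"))
    .join(
        rawDF.select(F.trim(F.col("FileName")).alias("FileName")),
        on="FileName",
        how="left_anti",
    )
)
unmatched.show(truncate=False)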

python apache-spark databricks azure-databricks
1 Answer

I created two DataFrames, file_processing_df and raw_data_df. Here is the sample data:

file_processing_df:
+---+-------+--------+--------+---------+------------+--------+----------+------+-----+
| Id| Client|FileName|FileType|EventType|EventContext|Checksum|OccurredOn|Status|Retry|
+---+-------+--------+--------+---------+------------+--------+----------+------+-----+
|  1|Client1|   File1|     CSV|   Loaded|    Context1|   hash1|2022-01-01|Loaded|false|
|  2|Client1|   File1|     CSV|Processed|    Context2|   hash2|2022-01-02|Loaded|false|
|  3|Client1|   File2|     TXT|   Loaded|    Context3|   hash3|2022-01-03|Loaded|false|
|  4|Client2|   File3|     CSV|   Loaded|    Context4|   hash4|2022-01-04|Failed|false|
|  5|Client2|   File3|     CSV|Processed|    Context5|   hash5|2022-01-05|Loaded|false|
+---+-------+--------+--------+---------+------------+--------+----------+------+-----+

raw_data_df:
+--------+-----------+
|FileName|FileContent|
+--------+-----------+
|   File1|   Content1|
|   File2|   Content2|
|   File3|   Content3|
+--------+-----------+
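The two sample DataFrames above can be built directly in a notebook like this (a minimal sketch; OccurredOn is kept as a string here for brevity):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample history rows matching the table shown above
file_processing_df = spark.createDataFrame(
    [
        (1, "Client1", "File1", "CSV", "Loaded",    "Context1", "hash1", "2022-01-01", "Loaded", False),
        (2, "Client1", "File1", "CSV", "Processed", "Context2", "hash2", "2022-01-02", "Loaded", False),
        (3, "Client1", "File2", "TXT", "Loaded",    "Context3", "hash3", "2022-01-03", "Loaded", False),
        (4, "Client2", "File3", "CSV", "Loaded",    "Context4", "hash4", "2022-01-04", "Failed", False),
        (5, "Client2", "File3", "CSV", "Processed", "Context5", "hash5", "2022-01-05", "Loaded", False),
    ],
    ["Id", "Client", "FileName", "FileType", "EventType", "EventContext", "Checksum", "OccurredOn", "Status", "Retry"],
)

# Sample raw data rows
raw_data_df = spark.createDataFrame(
    [("File1", "Content1"), ("File2", "Content2"), ("File3", "Content3")],
    ["FileName", "FileContent"],
)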

I tried the following approach:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Example filter values assumed for this run; they produce the result shown below
Client = "Client1"
fileType = "CSV"

latest_records_df = file_processing_df.join(
    file_processing_df.groupBy("FileName").agg(F.max("OccurredOn").alias("MaxOccurredOn")),
    on=["FileName"],
    how="inner"
)
result_df = latest_records_df.filter(
    (F.col("Client") == Client) &
    (F.col("FileType") == fileType) &
    (F.col("Status") == "Loaded")
)
windowSpec = Window.partitionBy("FileName").orderBy(F.col("OccurredOn").desc())
result_df = result_df.withColumn("row_number", F.row_number().over(windowSpec))
filtered_result_df = result_df.filter(
    (F.col("OccurredOn") == F.col("MaxOccurredOn")) & (F.col("row_number") == 1)
)
filtered_result_df = filtered_result_df.drop("row_number")
filtered_result_df.show()
FileNames = filtered_result_df.select("FileName").distinct()
FileNames.show()
filtered_rawDF = raw_data_df.join(FileNames, on="FileName", how="inner")
filtered_rawDF.show()

Result:

+--------+---+-------+--------+---------+------------+--------+----------+------+-----+-------------+
|FileName| Id| Client|FileType|EventType|EventContext|Checksum|OccurredOn|Status|Retry|MaxOccurredOn|
+--------+---+-------+--------+---------+------------+--------+----------+------+-----+-------------+
|   File1|  2|Client1|     CSV|Processed|    Context2|   hash2|2022-01-02|Loaded|false|   2022-01-02|
+--------+---+-------+--------+---------+------------+--------+----------+------+-----+-------------+

+--------+
|FileName|
+--------+
|   File1|
+--------+

+--------+-----------+
|FileName|FileContent|
+--------+-----------+
|   File1|   Content1|
+--------+-----------+

In the code above, I filter the records with the specified conditions, add a row_number to identify the most recent occurrence per file, and keep only that latest row. I then drop the row_number column, take the distinct file names from the filtered result, and join them with Raw_Data before showing the result.
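As a side note, the same "latest Loaded record per FileName" selection can also be written without the self-join, by computing the per-file maximum with a window function. This is only a sketch, assuming the same imports and filter variables as above; the maximum here is taken after the Client/FileType filter, which can differ from the original if a file name appears under more than one client:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("FileName")

# Keep rows whose OccurredOn equals the per-file maximum and whose Status is Loaded
latest_loaded_df = (
    file_processing_df
    .filter((F.col("Client") == Client) & (F.col("FileType") == fileType))
    .withColumn("MaxOccurredOn", F.max("OccurredOn").over(w))
    .filter((F.col("OccurredOn") == F.col("MaxOccurredOn")) & (F.col("Status") == "Loaded"))
    .drop("MaxOccurredOn")
)

filtered_rawDF = raw_data_df.join(
    latest_loaded_df.select("FileName").distinct(), on="FileName", how="inner"
)
filtered_rawDF.show()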
