In Azure Databricks, I have the following tables:
[File_Processing_History]
Id bigint
Client varchar(255)
FileName varchar(255)
FileType varchar(3)
EventType varchar(100)
EventContext varchar(255)
Checksum char(40)
OccurredOn timestamp
Status varchar(50)
Retry boolean
[Raw_Data]
XYZFileKey string
FileName string
FileContent string
The File_Processing_History table is a history table that tracks the event history for files in the system. The Raw_Data table contains raw data extracted from files in our system, along with metadata such as the source file name and the content (data). Both tables have a FileName column that holds the name of a file on the system.
I also have the following Python/Spark code in my Azure Databricks notebook:
from pyspark.sql.functions import col, max
from pyspark.sql.window import Window
from pyspark.sql import functions as F

file_processing_df = spark.table("myapp.control.File_Processing_History")
latest_records_df = file_processing_df.join(
file_processing_df.groupBy("FileName").agg(max("OccurredOn").alias("MaxOccurredOn")),
on=["FileName"],
how="inner"
)
result_df = latest_records_df.filter(
(col("Client") == Client) &
(col("FileType") == fileType) &
(col("Status") == "Loaded")
)
windowSpec = Window.partitionBy("FileName").orderBy(col("OccurredOn").desc())
result_df = result_df.withColumn("row_number", F.row_number().over(windowSpec))
filtered_result_df = result_df.filter((col("OccurredOn") == col("MaxOccurredOn")) & (col("row_number") == 1))
filtered_result_df = filtered_result_df.drop("row_number")
filtered_result_df.show()
FileNames = filtered_result_df.select("FileName").distinct()
FileNames.show()
rawDF = spark.table(f"`usurint-mvp`.raw.Raw_Data")
filtered_rawDF = rawDF.join(FileNames, on="FileName", how="inner")
filtered_rawDF.show()
This code attempts to:

1. Get the File_Processing_History rows that not only have the latest OccurredOn date but also a Status of "Loaded" (if such rows exist), and save those rows to a dataframe called filtered_result_df
2. Select the distinct FileName values from filtered_result_df and store them in the FileNames dataframe
3. Select all rows from the Raw_Data table whose FileName value matches one in the FileNames dataframe

When I run it, I get the following console output from the show() calls:
+--------------------+---+-------+--------+--------------+--------------------+--------+--------------------+------+-----+--------------------+
| FileName| Id| Tenant|FileType| EventType| EventContext|Checksum| OccurredOn|Status|Retry| MaxOccurredOn|
+--------------------+---+-------+--------+--------------+--------------------+--------+--------------------+------+-----+--------------------+
|z823021-01-04f42s...|175|acme-1| 123|FILE_DECRYPTED|Things were done and...| N/A|2024-04-09 15:38:...|Loaded|false|2024-04-09 15:38:...|
+--------------------+---+-------+--------+--------------+--------------------+--------+--------------------+------+-----+--------------------+
+--------------------+
| FileName|
+--------------------+
|z823021-01-04f42s...|
+--------------------+
+--------------+----------+--------------+
| FileName|XYZFileKey|SourceFileData|
+--------------+----------+--------------+
+--------------+----------+--------------+
Since z823021-01-04f42s... is present in both filtered_result_df and FileNames, I expected filtered_rawDF to be non-empty. Can anyone spot where I went wrong?
I created the two tables file_processing_df and raw_data_df. Here is the sample data:
+---+-------+--------+--------+---------+------------+--------+----------+------+-----+
| Id| Client|FileName|FileType|EventType|EventContext|Checksum|OccurredOn|Status|Retry|
+---+-------+--------+--------+---------+------------+--------+----------+------+-----+
| 1|Client1| File1| CSV| Loaded| Context1| hash1|2022-01-01|Loaded|false|
| 2|Client1| File1| CSV|Processed| Context2| hash2|2022-01-02|Loaded|false|
| 3|Client1| File2| TXT| Loaded| Context3| hash3|2022-01-03|Loaded|false|
| 4|Client2| File3| CSV| Loaded| Context4| hash4|2022-01-04|Failed|false|
| 5|Client2| File3| CSV|Processed| Context5| hash5|2022-01-05|Loaded|false|
+---+-------+--------+--------+---------+------------+--------+----------+------+-----+
+--------+-----------+
|FileName|FileContent|
+--------+-----------+
| File1| Content1|
| File2| Content2|
| File3| Content3|
+--------+-----------+
I tried the following:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

latest_records_df = file_processing_df.join(
file_processing_df.groupBy("FileName").agg(F.max("OccurredOn").alias("MaxOccurredOn")),
on=["FileName"],
how="inner"
)
result_df = latest_records_df.filter(
(F.col("Client") == Client) &
(F.col("FileType") == fileType) &
(F.col("Status") == "Loaded")
)
windowSpec = Window.partitionBy("FileName").orderBy(F.col("OccurredOn").desc())
result_df = result_df.withColumn("row_number", F.row_number().over(windowSpec))
filtered_result_df = result_df.filter(
(F.col("OccurredOn") == F.col("MaxOccurredOn")) & (F.col("row_number") == 1)
)
filtered_result_df = filtered_result_df.drop("row_number")
filtered_result_df.show()
FileNames = filtered_result_df.select("FileName").distinct()
FileNames.show()
filtered_rawDF = raw_data_df.join(FileNames, on="FileName", how="inner")
filtered_rawDF.show()
Result:
+--------+---+-------+--------+---------+------------+--------+----------+------+-----+-------------+
|FileName| Id| Client|FileType|EventType|EventContext|Checksum|OccurredOn|Status|Retry|MaxOccurredOn|
+--------+---+-------+--------+---------+------------+--------+----------+------+-----+-------------+
| File1| 2|Client1| CSV|Processed| Context2| hash2|2022-01-02|Loaded|false| 2022-01-02|
+--------+---+-------+--------+---------+------------+--------+----------+------+-----+-------------+
+--------+
|FileName|
+--------+
| File1|
+--------+
+--------+-----------+
|FileName|FileContent|
+--------+-----------+
| File1| Content1|
+--------+-----------+
In the code above, I filter for the latest records using the specified conditions, add row_number to identify the latest occurrence, and filter to the latest-occurring row. I then drop the row_number column, take the filtered result, get the distinct file names, and join them with Raw_Data before showing the result.